agents.memory.multi_agent_coordinator
Multi-Agent Memory Coordinator using MetaStateSchema patterns.
This module provides a comprehensive coordinator that orchestrates multiple memory agents using the MetaStateSchema pattern for proper state management and agent composition.
Classes
MemoryAgentCapabilities | Describes the capabilities and characteristics of a memory agent.
MemoryTask | Represents a memory-related task for multi-agent coordination.
MultiAgentCoordinatorConfig | Configuration for Multi-Agent Memory Coordinator.
MultiAgentMemoryCoordinator | Orchestrates multiple memory agents using MetaStateSchema patterns.
Module Contents
- class agents.memory.multi_agent_coordinator.MemoryAgentCapabilities(/, **data)
Bases:
pydantic.BaseModel
Describes the capabilities and characteristics of a memory agent.
This model defines what a memory agent can do, its performance characteristics, and its specializations. It's used by the multi-agent coordinator for intelligent task routing and load balancing.
- Parameters:
data (Any)
- agent_name
Unique identifier for the agent
- agent_type
Class or type name of the agent (e.g., "KGGeneratorAgent")
- can_store_memories
Whether the agent can store new memories
- can_retrieve_memories
Whether the agent can retrieve existing memories
- can_analyze_memories
Whether the agent can analyze memory content
- can_generate_knowledge_graph
Whether the agent can build knowledge graphs
- can_coordinate_retrieval
Whether the agent can coordinate retrieval strategies
- supported_memory_types
List of memory types the agent can handle
- typical_latency_ms
Expected response time in milliseconds
- max_concurrent_tasks
Maximum number of concurrent tasks the agent can handle
- specialization
List of agent specializations and strengths
Examples
KG Generator Agent capabilities:
    kg_capabilities = MemoryAgentCapabilities(
        agent_name="kg_generator",
        agent_type="KGGeneratorAgent",
        can_analyze_memories=True,
        can_generate_knowledge_graph=True,
        supported_memory_types=[
            MemoryType.SEMANTIC,
            MemoryType.EPISODIC,
            MemoryType.CONTEXTUAL
        ],
        typical_latency_ms=2000,
        max_concurrent_tasks=2,
        specialization=[
            "entity_extraction",
            "relationship_discovery",
            "graph_construction"
        ]
    )
Agentic RAG Coordinator capabilities:
    rag_capabilities = MemoryAgentCapabilities(
        agent_name="agentic_rag",
        agent_type="AgenticRAGCoordinator",
        can_retrieve_memories=True,
        can_coordinate_retrieval=True,
        supported_memory_types=list(MemoryType),  # Supports all types
        typical_latency_ms=1500,
        max_concurrent_tasks=3,
        specialization=[
            "strategy_selection",
            "result_fusion",
            "intelligent_retrieval"
        ]
    )
Memory Store Agent capabilities:
    store_capabilities = MemoryAgentCapabilities(
        agent_name="memory_store",
        agent_type="MemoryStoreAgent",
        can_store_memories=True,
        can_retrieve_memories=True,
        supported_memory_types=list(MemoryType),
        typical_latency_ms=500,
        max_concurrent_tasks=5,
        specialization=[
            "memory_storage",
            "basic_retrieval",
            "memory_management"
        ]
    )
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
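The capability fields above are described as inputs to task routing and load balancing. The following is a minimal sketch of how such routing could work, not the coordinator's actual implementation. AgentCaps is a hypothetical stand-in for MemoryAgentCapabilities, and pick_agent and its "fastest agent with spare capacity" rule are assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class AgentCaps:
    """Hypothetical stand-in mirroring a few MemoryAgentCapabilities fields."""
    agent_name: str
    can_store_memories: bool = False
    can_retrieve_memories: bool = False
    typical_latency_ms: int = 1000
    max_concurrent_tasks: int = 1


def pick_agent(agents, capability, active_counts):
    """Return the fastest agent that has the capability and spare capacity."""
    candidates = [
        a for a in agents
        if getattr(a, capability, False)
        and active_counts.get(a.agent_name, 0) < a.max_concurrent_tasks
    ]
    if not candidates:
        return None
    return min(candidates, key=lambda a: a.typical_latency_ms)


agents = [
    AgentCaps("agentic_rag", can_retrieve_memories=True,
              typical_latency_ms=1500, max_concurrent_tasks=3),
    AgentCaps("memory_store", can_store_memories=True, can_retrieve_memories=True,
              typical_latency_ms=500, max_concurrent_tasks=5),
]

# memory_store has the lower latency, so it is chosen while it has capacity.
chosen = pick_agent(agents, "can_retrieve_memories", {"memory_store": 0})
print(chosen.agent_name)

# With memory_store saturated, the sketch falls back to agentic_rag.
fallback = pick_agent(agents, "can_retrieve_memories", {"memory_store": 5})
print(fallback.agent_name)
```

A real router would likely weigh specialization and supported_memory_types as well; this sketch only combines a capability flag, latency, and the concurrency limit.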
- class agents.memory.multi_agent_coordinator.MemoryTask(/, **data)
Bases:
pydantic.BaseModel
Represents a memory-related task for multi-agent coordination.
A MemoryTask encapsulates a specific memory operation (store, retrieve, analyze, etc.) that can be executed by the multi-agent coordinator system. It contains all necessary information for task routing, execution, and result tracking.
- Parameters:
data (Any)
- id
Unique identifier for the task, used for tracking and coordination
- type
Type of memory operation (store, retrieve, analyze, generate_kg, etc.)
- query
Natural language description of the task or query content
- parameters
Dictionary of task-specific parameters and configuration
- priority
Task priority level (1=highest, 10=lowest) for execution ordering
- namespace
Memory namespace to operate within (e.g., ("user", "personal"))
- memory_types
Specific memory types to target (semantic, episodic, etc.)
- status
Current task status (pending, routing, executing, completed, failed)
- assigned_agent
Name of the agent assigned to execute this task
- result
Task execution result (populated after completion)
- error
Error message if task execution failed
- created_at
UTC timestamp when the task was created
- started_at
UTC timestamp when task execution started
- completed_at
UTC timestamp when task execution completed
Examples
Creating a memory storage task:
    task = MemoryTask(
        id="store_001",
        type="store_memory",
        query="Store information about Python programming",
        parameters={"content": "Python is a programming language"},
        priority=3,
        namespace=("user", "learning")
    )
Creating a retrieval task:
    task = MemoryTask(
        id="retrieve_001",
        type="retrieve_memories",
        query="Find information about machine learning",
        parameters={"limit": 10, "use_graph_rag": True},
        priority=1,
        memory_types=[MemoryType.SEMANTIC, MemoryType.EPISODIC]
    )
Creating an analysis task:
    task = MemoryTask(
        id="analyze_001",
        type="analyze_memory",
        query="Analyze patterns in my learning history",
        parameters={"analysis_type": "pattern_detection"},
        priority=2
    )
Creating a knowledge graph generation task:
    task = MemoryTask(
        id="kg_001",
        type="generate_knowledge_graph",
        query="Build knowledge graph from recent memories",
        parameters={"max_memories": 100, "confidence_threshold": 0.7},
        priority=4,
        namespace=("user", "work")
    )
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
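The priority attribute is documented as 1=highest, 10=lowest. As an illustration of what that ordering implies, here is a small sketch of a priority queue built on heapq. Task and PriorityTaskQueue are hypothetical names; how the coordinator actually orders its task_queue is not stated in this reference.

```python
import heapq
import itertools
from dataclasses import dataclass


@dataclass
class Task:
    """Hypothetical stand-in with the id and priority fields of MemoryTask."""
    id: str
    priority: int = 5  # 1 = highest, 10 = lowest


class PriorityTaskQueue:
    def __init__(self):
        self._heap = []
        # The counter breaks ties so equal priorities keep insertion order.
        self._counter = itertools.count()

    def push(self, task):
        heapq.heappush(self._heap, (task.priority, next(self._counter), task))

    def pop(self):
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)


queue = PriorityTaskQueue()
for task in [Task("kg_001", 4), Task("retrieve_001", 1),
             Task("analyze_001", 2), Task("store_001", 3)]:
    queue.push(task)

# Lowest priority number comes out first.
order = [queue.pop().id for _ in range(len(queue))]
print(order)
```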
- class agents.memory.multi_agent_coordinator.MultiAgentCoordinatorConfig(/, **data)
Bases:
pydantic.BaseModel
Configuration for Multi-Agent Memory Coordinator.
This configuration class defines all parameters needed to create and configure a MultiAgentMemoryCoordinator, including agent configurations, coordination settings, and performance parameters.
- Parameters:
data (Any)
- name
Unique identifier for the coordinator instance
- memory_store_manager
Manager for memory storage and retrieval operations
- memory_classifier
Classifier for analyzing memory content and types
- kg_generator_config
Configuration for the knowledge graph generator agent
- agentic_rag_config
Configuration for the agentic RAG coordinator agent
- max_concurrent_tasks
Maximum number of tasks that can execute simultaneously
- task_timeout_seconds
Maximum time (in seconds) a task can run before timing out
- enable_agent_communication
Whether to enable communication between agents
- coordinator_llm
LLM configuration for the coordinator's decision-making
- routing_strategy
Strategy for routing tasks to agents (capability_based, load_balanced, etc.)
- enable_task_decomposition
Whether to enable breaking complex tasks into subtasks
- enable_caching
Whether to enable result caching for performance
- cache_ttl_seconds
Time-to-live for cached results in seconds
- persistence
Persistence configuration passed to sub-agents
Examples
Basic configuration:
    config = MultiAgentCoordinatorConfig(
        name="my_coordinator",
        memory_store_manager=store_manager,
        memory_classifier=classifier,
        kg_generator_config=kg_config,
        agentic_rag_config=rag_config,
        max_concurrent_tasks=3,
        task_timeout_seconds=180
    )
Advanced configuration with custom settings:
    config = MultiAgentCoordinatorConfig(
        name="advanced_coordinator",
        memory_store_manager=store_manager,
        memory_classifier=classifier,
        kg_generator_config=kg_config,
        agentic_rag_config=rag_config,
        # Coordination settings
        max_concurrent_tasks=10,
        task_timeout_seconds=600,
        enable_agent_communication=True,
        # Coordinator LLM
        coordinator_llm=AugLLMConfig(
            model="gpt-4",
            temperature=0.2,
            max_tokens=1000
        ),
        # Task routing
        routing_strategy="capability_based",
        enable_task_decomposition=True,
        # Performance
        enable_caching=True,
        cache_ttl_seconds=7200,  # 2 hours
        # Persistence
        persistence=False  # Disable for testing
    )
Production configuration:
    config = MultiAgentCoordinatorConfig(
        name="production_coordinator",
        memory_store_manager=store_manager,
        memory_classifier=classifier,
        kg_generator_config=kg_config,
        agentic_rag_config=rag_config,
        # High-performance settings
        max_concurrent_tasks=20,
        task_timeout_seconds=900,
        enable_agent_communication=True,
        # Optimized coordinator
        coordinator_llm=AugLLMConfig(
            model="gpt-4-turbo",
            temperature=0.1,
            max_tokens=2000
        ),
        # Advanced routing
        routing_strategy="load_balanced",
        enable_task_decomposition=True,
        # Production caching
        enable_caching=True,
        cache_ttl_seconds=3600
    )
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
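To make the meaning of max_concurrent_tasks and task_timeout_seconds concrete, the sketch below enforces both limits with standard asyncio primitives. It is an illustration under assumptions, not the coordinator's code: run_bounded and fake_task are hypothetical helpers, and returning the string "timeout" is a simplification chosen for the example.

```python
import asyncio


async def run_bounded(coros, max_concurrent_tasks, task_timeout_seconds):
    """Run coroutines with a concurrency cap and a per-task timeout."""
    semaphore = asyncio.Semaphore(max_concurrent_tasks)

    async def guarded(coro):
        # At most max_concurrent_tasks coroutines hold the semaphore at once.
        async with semaphore:
            try:
                return await asyncio.wait_for(coro, timeout=task_timeout_seconds)
            except asyncio.TimeoutError:
                return "timeout"

    return await asyncio.gather(*(guarded(c) for c in coros))


async def fake_task(name, delay):
    # Hypothetical workload standing in for an agent call.
    await asyncio.sleep(delay)
    return name


results = asyncio.run(
    run_bounded(
        [fake_task("fast", 0.01), fake_task("slow", 0.5)],
        max_concurrent_tasks=2,
        task_timeout_seconds=0.1,
    )
)
print(results)
```

In this sketch the timeout clock starts only once a task has acquired a slot, so time spent waiting in the queue does not count against it.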
- class agents.memory.multi_agent_coordinator.MultiAgentMemoryCoordinator(config)
Orchestrates multiple memory agents using MetaStateSchema patterns.
The MultiAgentMemoryCoordinator is the central orchestrator for the memory system, managing a collection of specialized memory agents and intelligently routing tasks based on agent capabilities, performance characteristics, and current load.
This coordinator provides:
- Intelligent task routing based on agent capabilities
- Load balancing across multiple agents
- Task decomposition for complex operations
- Performance monitoring and optimization
- Fault tolerance and error handling
- Agent communication and coordination
- config
Configuration object containing all coordinator settings
- memory_store
Memory store manager for direct storage operations
- classifier
Memory classifier for content analysis
- coordinator_llm
LLM runnable for coordinator decision-making
- meta_agents
Dictionary of agents wrapped in MetaStateSchema
- agent_capabilities
Dictionary mapping agent names to their capabilities
- task_queue
List of pending tasks waiting for execution
- active_tasks
Dictionary of currently executing tasks
- completed_tasks
Dictionary of completed tasks with results
- performance_metrics
Dictionary tracking system performance metrics
Examples
Basic coordinator usage:
    # Create coordinator
    coordinator = MultiAgentMemoryCoordinator(config)

    # Store memory
    result = await coordinator.store_memory(
        "I learned about machine learning algorithms today"
    )

    # Retrieve memories
    memories = await coordinator.retrieve_memories(
        query="machine learning",
        limit=5
    )

    # Analyze memory content
    analysis = await coordinator.analyze_memory(
        "Complex analysis of learning patterns"
    )
Advanced task execution:
    # Create custom task
    task = MemoryTask(
        id="complex_analysis",
        type="analyze_and_graph",
        query="Analyze learning patterns and build knowledge graph",
        parameters={
            "analysis_depth": "comprehensive",
            "graph_confidence": 0.8
        },
        priority=1
    )

    # Execute task
    result = await coordinator.execute_task(task)

    # Check task status
    if result.status == "completed":
        print(f"Task completed: {result.result}")
    else:
        print(f"Task failed: {result.error}")
System monitoring:
    # Get system status
    status = coordinator.get_system_status()
    print(f"Total agents: {status['total_agents']}")
    print(f"Active tasks: {status['active_tasks']}")

    # Run diagnostic
    diagnostic = await coordinator.run_diagnostic()
    if diagnostic["system_status"] == "healthy":
        print("System is healthy")
    else:
        print("System issues detected")
Performance monitoring:
    # Get performance metrics
    metrics = coordinator.performance_metrics
    total = metrics['total_tasks']
    print(f"Total tasks: {total}")
    if total:  # avoid ZeroDivisionError before any task has run
        print(f"Success rate: {metrics['successful_tasks'] / total * 100:.1f}%")
    print(f"Average latency: {metrics['avg_latency_ms']:.1f}ms")
Initialize the multi-agent coordinator.
Sets up the coordinator with the provided configuration, initializes all managed agents, and prepares the task management system.
- Parameters:
config (MultiAgentCoordinatorConfig) – Configuration object containing all coordinator settings
Examples
Basic initialization:
    config = MultiAgentCoordinatorConfig(
        memory_store_manager=store_manager,
        memory_classifier=classifier,
        kg_generator_config=kg_config,
        agentic_rag_config=rag_config
    )
    coordinator = MultiAgentMemoryCoordinator(config)
Advanced initialization with custom settings:
    config = MultiAgentCoordinatorConfig(
        name="production_coordinator",
        memory_store_manager=store_manager,
        memory_classifier=classifier,
        kg_generator_config=kg_config,
        agentic_rag_config=rag_config,
        max_concurrent_tasks=10,
        task_timeout_seconds=600,
        enable_caching=True
    )
    coordinator = MultiAgentMemoryCoordinator(config)
- async analyze_memory(content)
Analyze memory content using the multi-agent system with specialized routing.
This method creates a memory analysis task and routes it to the most appropriate agent (typically the memory classifier). The system provides comprehensive analysis including classification, entity extraction, and importance scoring.
- Parameters:
content (str) – The memory content to analyze (text, structured data, etc.)
- Returns:
- Analysis results containing:
analysis: Detailed analysis results from the assigned agent
success: Boolean indicating if analysis completed successfully
error: Error message if analysis failed
- Return type:
Dict[str, Any]
Examples
Basic memory analysis:
    analysis = await coordinator.analyze_memory(
        "I attended a machine learning conference where I learned about neural networks"
    )
    if analysis["success"]:
        result = analysis["analysis"]
        print(f"Memory type: {result.get('memory_type')}")
        print(f"Entities: {result.get('entities')}")
        print(f"Importance: {result.get('importance_score')}")
    else:
        print(f"Analysis failed: {analysis['error']}")
Complex content analysis:
    analysis = await coordinator.analyze_memory(
        '''
        Meeting Notes: Q1 Planning
        Attendees: Alice (PM), Bob (Engineer), Carol (Designer)
        Decisions:
        - Use React for the frontend
        - Deploy on AWS with auto-scaling
        - Launch beta by March 15th
        '''
    )
    if analysis["success"]:
        result = analysis["analysis"]
        print(f"Extracted entities: {result.get('entities')}")
        print(f"Key decisions: {result.get('decisions')}")
        print(f"Action items: {result.get('action_items')}")
        print(f"Participants: {result.get('participants')}")
Note
The analysis typically includes:
- Memory type classification (semantic, episodic, procedural, etc.)
- Entity extraction (people, organizations, concepts, etc.)
- Importance and relevance scoring
- Metadata extraction (dates, locations, etc.)
- Relationship identification
- Content summarization
- async execute_task(task)
Execute a memory task using appropriate agents with intelligent routing.
This method is the core of the multi-agent coordinator, responsible for:
1. Routing tasks to the most appropriate agent(s)
2. Executing tasks based on routing decisions
3. Handling different execution strategies (single, multi, sequential, decomposed)
4. Updating performance metrics and task status
- Parameters:
task (MemoryTask) – The task to execute, containing query, parameters, and metadata
- Returns:
The same task object updated with results, status, and timing
- Return type:
- Raises:
ValueError โ If routing decision is unknown or invalid
RuntimeError โ If task execution fails due to agent errors
Examples
Basic task execution:
    task = MemoryTask(
        id="simple_task",
        type="retrieve_memories",
        query="Find information about Python programming",
        priority=1
    )
    result_task = await coordinator.execute_task(task)
    if result_task.status == "completed":
        print(f"Task completed: {result_task.result}")
    else:
        print(f"Task failed: {result_task.error}")
Complex task with custom parameters:
    task = MemoryTask(
        id="complex_analysis",
        type="analyze_and_graph",
        query="Analyze learning patterns and build knowledge graph",
        parameters={
            "analysis_depth": "comprehensive",
            "graph_confidence": 0.8,
            "include_relationships": True
        },
        priority=1,
        namespace=("user", "work")
    )
    result_task = await coordinator.execute_task(task)

    # Check execution details
    print(f"Assigned agent: {result_task.assigned_agent}")
    print(f"Duration: {result_task.completed_at - result_task.started_at}")
    print(f"Result: {result_task.result}")
Error handling:
    import logging

    logger = logging.getLogger(__name__)

    try:
        result_task = await coordinator.execute_task(task)
        if result_task.status == "failed":
            logger.error(f"Task {task.id} failed: {result_task.error}")
            # Handle failure - maybe retry or use fallback
    except Exception as e:
        logger.error(f"Unexpected error executing task: {e}")
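The route, execute, and update-status flow described for execute_task can be pictured with the self-contained sketch below. It is a simplified model under assumptions: Task is a hypothetical stand-in for MemoryTask, the handlers dictionary replaces the real routing logic, and an unknown task type is recorded as a failed task here, whereas the documented method is said to raise ValueError for unknown routing decisions.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass
class Task:
    """Hypothetical stand-in with a subset of the MemoryTask fields."""
    id: str
    type: str
    status: str = "pending"
    assigned_agent: Optional[str] = None
    result: Any = None
    error: Optional[str] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None


async def execute(task, handlers):
    """Route the task by type, run the handler, and record the outcome."""
    task.status = "routing"
    handler = handlers.get(task.type)
    if handler is None:
        task.status = "failed"
        task.error = f"no agent for task type {task.type!r}"
        return task

    task.assigned_agent = handler.__name__
    task.status = "executing"
    task.started_at = datetime.now(timezone.utc)
    try:
        task.result = await handler(task)
        task.status = "completed"
    except Exception as exc:  # record the failure on the task instead of raising
        task.status = "failed"
        task.error = str(exc)
    task.completed_at = datetime.now(timezone.utc)
    return task


async def retrieve_agent(task):
    # Hypothetical agent that returns a canned result.
    return ["memory about Python"]


handlers = {"retrieve_memories": retrieve_agent}
done = asyncio.run(execute(Task("t1", "retrieve_memories"), handlers))
unknown = asyncio.run(execute(Task("t2", "unsupported"), handlers))
print(done.status, unknown.status)
```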
- async generate_knowledge_graph(namespace=None)
Generate knowledge graph using the multi-agent system with KG specialization.
This method creates a knowledge graph generation task and routes it to the specialized KG generator agent. The system extracts entities, relationships, and builds a comprehensive knowledge graph from stored memories.
- Parameters:
namespace (tuple[str, ...] | None) – Optional namespace tuple to limit graph generation scope
- Returns:
- Knowledge graph results containing:
knowledge_graph: Generated graph with nodes and relationships
success: Boolean indicating if generation completed successfully
error: Error message if generation failed
- Return type:
Dict[str, Any]
Examples
Basic knowledge graph generation:
    kg_result = await coordinator.generate_knowledge_graph()
    if kg_result["success"]:
        graph = kg_result["knowledge_graph"]
        print(f"Nodes: {len(graph.get('nodes', []))}")
        print(f"Relationships: {len(graph.get('relationships', []))}")

        # Explore entities
        for node in graph.get('nodes', []):
            print(f"Entity: {node['name']} ({node['type']})")

        # Explore relationships
        for rel in graph.get('relationships', []):
            print(f"{rel['source']} -> {rel['target']} ({rel['type']})")
    else:
        print(f"KG generation failed: {kg_result['error']}")
Scoped knowledge graph generation:
    kg_result = await coordinator.generate_knowledge_graph(
        namespace=("user", "work", "projects")
    )
    if kg_result["success"]:
        graph = kg_result["knowledge_graph"]

        # Analyze work-related entities
        work_entities = [
            node for node in graph.get('nodes', [])
            if node.get('type') in ['person', 'organization', 'project']
        ]
        print(f"Work entities: {len(work_entities)}")

        # Find project relationships
        project_rels = [
            rel for rel in graph.get('relationships', [])
            if 'project' in rel.get('type', '').lower()
        ]
        print(f"Project relationships: {len(project_rels)}")
Note
The knowledge graph typically includes:
- Entities: People, organizations, concepts, technologies, etc.
- Relationships: Works_at, uses, knows, creates, etc.
- Confidence scores for entities and relationships
- Metadata: Creation timestamps, memory references, etc.
- Graph statistics: Node counts, relationship types, etc.
- get_system_status()
Get comprehensive system status and health information.
This method provides a complete overview of the multi-agent system's current state, including agent health, performance metrics, and operational status.
- Returns:
- System status containing:
coordinator_status: Overall coordinator status (active, degraded, error)
total_agents: Number of managed agents
active_tasks: Number of currently executing tasks
completed_tasks: Number of completed tasks
performance_metrics: System performance statistics
agent_status: Individual agent status and health
agent_capabilities: Summary of each agent's capabilities
- Return type:
Dict[str, Any]
Examples
Basic system status check:
    status = coordinator.get_system_status()
    print(f"Coordinator: {status['coordinator_status']}")
    print(f"Total agents: {status['total_agents']}")
    print(f"Active tasks: {status['active_tasks']}")
    metrics = status['performance_metrics']
    if metrics['total_tasks']:  # avoid ZeroDivisionError before any task has run
        print(f"Success rate: {metrics['successful_tasks'] / metrics['total_tasks'] * 100:.1f}%")
Detailed agent status:
    status = coordinator.get_system_status()
    for agent_name, agent_info in status['agent_status'].items():
        print(f"Agent: {agent_name}")
        print(f"  Type: {agent_info['agent_type']}")
        print(f"  Status: {agent_info['execution_status']}")
        print(f"  Executions: {agent_info['execution_count']}")
        print(f"  Needs recompile: {agent_info['needs_recompile']}")
Performance monitoring:
    status = coordinator.get_system_status()
    metrics = status['performance_metrics']
    print(f"Total tasks: {metrics['total_tasks']}")
    print(f"Successful: {metrics['successful_tasks']}")
    print(f"Failed: {metrics['failed_tasks']}")
    print(f"Average latency: {metrics['avg_latency_ms']:.1f}ms")

    # Agent utilization (skipped until at least one task has run)
    if metrics['total_tasks']:
        for agent, count in metrics['agent_utilization'].items():
            utilization = count / metrics['total_tasks'] * 100
            print(f"Agent {agent}: {utilization:.1f}% utilization")
Note
This method is synchronous and provides a snapshot of the current system state. For continuous monitoring, call this method periodically or use the run_diagnostic() method for health checks.
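When a status snapshot is polled periodically, the derived ratios need a guard for the case where no task has run yet. The helper below is one possible way to do that. summarize_metrics is a hypothetical function, and the sample dictionary only imitates the key names used in the examples for this method.

```python
def summarize_metrics(metrics):
    """Return success rate and per-agent share, guarding against zero tasks."""
    total = metrics.get("total_tasks", 0)
    if total == 0:
        return {"success_rate": None, "agent_share": {}}
    return {
        "success_rate": metrics.get("successful_tasks", 0) / total,
        "agent_share": {
            agent: count / total
            for agent, count in metrics.get("agent_utilization", {}).items()
        },
    }


# Hypothetical snapshot shaped like status["performance_metrics"].
sample = {
    "total_tasks": 4,
    "successful_tasks": 3,
    "failed_tasks": 1,
    "agent_utilization": {"agentic_rag": 3, "memory_store": 1},
}
summary = summarize_metrics(sample)
empty = summarize_metrics({"total_tasks": 0})
print(summary)
print(empty)
```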
- async retrieve_memories(query, limit=10, memory_types=None, namespace=None)
Retrieve memories using the multi-agent system with intelligent routing.
This method creates a memory retrieval task and routes it to the most appropriate agent (typically the agentic RAG coordinator). The system automatically selects the best retrieval strategy based on the query characteristics.
- Parameters:
query (str) – Natural language query describing what memories to retrieve
limit (int) – Maximum number of memories to return (default: 10)
memory_types (list[haive.agents.memory.core.types.MemoryType] | None) – Optional list of specific memory types to search within
namespace (tuple[str, ...] | None) – Optional namespace tuple to limit search scope
- Returns:
List of memory objects with content, metadata, and relevance scores
- Return type:
List[Dict[str, Any]]
Examples
Basic memory retrieval:
    memories = await coordinator.retrieve_memories(
        "What did I learn about Python programming?"
    )
    for memory in memories:
        print(f"Content: {memory['content']}")
        print(f"Relevance: {memory['relevance_score']}")
        print(f"Timestamp: {memory['timestamp']}")
Targeted retrieval with filters:
    memories = await coordinator.retrieve_memories(
        query="machine learning algorithms",
        limit=5,
        memory_types=[MemoryType.SEMANTIC, MemoryType.EPISODIC],
        namespace=("user", "learning")
    )
Complex query with context:
    memories = await coordinator.retrieve_memories(
        "Find all meetings where we discussed the API project and show related decisions",
        limit=20,
        namespace=("user", "work")
    )

    # System automatically uses graph traversal for complex queries
    for memory in memories:
        if memory.get('graph_connections'):
            print(f"Connected entities: {memory['graph_connections']}")
Note
The system automatically:
- Analyzes query complexity and selects appropriate retrieval strategy
- Uses vector similarity, graph traversal, or hybrid approaches
- Applies relevance scoring and ranking
- Returns structured results with metadata and provenance
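As a rough picture of how the namespace and limit arguments might narrow a result set, the sketch below filters in-memory records by namespace prefix and ranks them by relevance_score. Prefix matching is an assumption made for illustration; this reference does not say how namespaces are compared. The in_namespace and select helpers and the sample records are hypothetical.

```python
def in_namespace(memory_namespace, scope):
    """True when scope is None or a prefix of the memory's namespace."""
    return scope is None or memory_namespace[:len(scope)] == scope


def select(memories, scope=None, limit=10):
    """Filter by namespace scope, rank by relevance, and apply the limit."""
    matches = [m for m in memories if in_namespace(m["namespace"], scope)]
    matches.sort(key=lambda m: m["relevance_score"], reverse=True)
    return matches[:limit]


# Hypothetical records using the result keys shown in the examples above.
memories = [
    {"content": "API design review",
     "namespace": ("user", "work", "projects"), "relevance_score": 0.9},
    {"content": "Standup notes",
     "namespace": ("user", "work"), "relevance_score": 0.4},
    {"content": "Piano practice",
     "namespace": ("user", "personal"), "relevance_score": 0.95},
]

work = select(memories, scope=("user", "work"), limit=5)
top = select(memories, limit=1)
print([m["content"] for m in work])
print([m["content"] for m in top])
```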
- async run_diagnostic()
Run comprehensive system diagnostic with agent health checks.
This method performs a complete system diagnostic by testing each agent with a simple diagnostic query. It identifies unhealthy agents and provides detailed error information for troubleshooting.
- Returns:
- Diagnostic results containing:
system_status: Overall system health (healthy, degraded, critical)
agent_diagnostics: Individual agent diagnostic results
performance_metrics: Current system performance metrics
- Return type:
Dict[str, Any]
Examples
Basic diagnostic check:
    diagnostic = await coordinator.run_diagnostic()
    print(f"System status: {diagnostic['system_status']}")
    if diagnostic['system_status'] != 'healthy':
        print("Issues detected:")
        for agent, result in diagnostic['agent_diagnostics'].items():
            if result['status'] != 'healthy':
                print(f"  {agent}: {result.get('error', 'Unknown error')}")
    else:
        print("All agents are healthy")
Detailed diagnostic analysis:
    diagnostic = await coordinator.run_diagnostic()
    for agent_name, result in diagnostic['agent_diagnostics'].items():
        print(f"Agent: {agent_name}")
        print(f"  Status: {result['status']}")
        if result['status'] == 'healthy':
            print(f"  Test result: {result.get('test_result', 'N/A')}")
        else:
            print(f"  Error: {result.get('error', 'Unknown error')}")
Performance analysis:
    diagnostic = await coordinator.run_diagnostic()
    metrics = diagnostic['performance_metrics']
    if metrics['total_tasks'] > 0:
        success_rate = metrics['successful_tasks'] / metrics['total_tasks']
        print(f"Success rate: {success_rate * 100:.1f}%")
        if success_rate < 0.9:
            print("Warning: Low success rate detected")
    if metrics['avg_latency_ms'] > 5000:
        print("Warning: High latency detected")
Note
This diagnostic runs a simple test query on each agent to verify basic functionality. For production systems, consider running this periodically to monitor system health and detect degradation early.
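The reference lists three values for system_status (healthy, degraded, critical) but does not state how they are derived from the per-agent results. The sketch below shows one plausible aggregation rule, given purely as an assumption: all agents healthy gives healthy, no agent healthy gives critical, and anything in between gives degraded. overall_status is a hypothetical helper.

```python
def overall_status(agent_diagnostics):
    """Collapse per-agent results into healthy, degraded, or critical."""
    statuses = [result["status"] for result in agent_diagnostics.values()]
    if not statuses:
        # Assumption: a coordinator with no agents cannot serve any task.
        return "critical"
    unhealthy = sum(1 for status in statuses if status != "healthy")
    if unhealthy == 0:
        return "healthy"
    if unhealthy == len(statuses):
        return "critical"
    return "degraded"


# Hypothetical results shaped like diagnostic["agent_diagnostics"].
all_ok = {"kg_generator": {"status": "healthy"},
          "agentic_rag": {"status": "healthy"}}
one_down = {"kg_generator": {"status": "error", "error": "timeout"},
            "agentic_rag": {"status": "healthy"}}
all_down = {"kg_generator": {"status": "error"},
            "agentic_rag": {"status": "error"}}

print(overall_status(all_ok), overall_status(one_down), overall_status(all_down))
```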
- async store_memory(content, namespace=None)
Store a memory using the multi-agent system with intelligent routing.
This method creates a memory storage task and routes it to the appropriate agent (typically the memory store agent). The system automatically handles classification, metadata extraction, and storage optimization.
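- Parameters:
content (str) – The memory content to store
namespace (tuple[str, ...] | None) – Optional namespace tuple to store the memory under
- Returns:
Success message with storage details or error message
- Return type:
str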
Examples
Basic memory storage:
    result = await coordinator.store_memory(
        "I learned about machine learning algorithms today"
    )
    print(result)  # "Memory stored successfully: {...}"
Memory with namespace:
    result = await coordinator.store_memory(
        "Completed project milestone: API integration",
        namespace=("user", "work", "projects")
    )
Structured memory storage:
    import json

    result = await coordinator.store_memory(
        json.dumps({
            "event": "meeting",
            "participants": ["Alice", "Bob"],
            "decisions": ["Use React for frontend", "Deploy on AWS"]
        }),
        namespace=("user", "work", "meetings")
    )
Note
The system automatically classifies the memory type, extracts metadata, and updates relevant knowledge graphs based on the content.