agents.memory.multi_agent_coordinator

Multi-Agent Memory Coordinator using MetaStateSchema patterns.

This module provides a comprehensive coordinator that orchestrates multiple memory agents using the MetaStateSchema pattern for proper state management and agent composition.

Classes

MemoryAgentCapabilities

Describes the capabilities and characteristics of a memory agent.

MemoryTask

Represents a memory-related task for multi-agent coordination.

MultiAgentCoordinatorConfig

Configuration for Multi-Agent Memory Coordinator.

MultiAgentMemoryCoordinator

Orchestrates multiple memory agents using MetaStateSchema patterns.

Module Contents

class agents.memory.multi_agent_coordinator.MemoryAgentCapabilities(/, **data)

Bases: pydantic.BaseModel

Describes the capabilities and characteristics of a memory agent.

This model defines what a memory agent can do, its performance characteristics, and specializations. It's used by the multi-agent coordinator for intelligent task routing and load balancing.

Parameters:

data (Any)

agent_name

Unique identifier for the agent

agent_type

Class or type name of the agent (e.g., "KGGeneratorAgent")

can_store_memories

Whether the agent can store new memories

can_retrieve_memories

Whether the agent can retrieve existing memories

can_analyze_memories

Whether the agent can analyze memory content

can_generate_knowledge_graph

Whether the agent can build knowledge graphs

can_coordinate_retrieval

Whether the agent can coordinate retrieval strategies

supported_memory_types

List of memory types the agent can handle

typical_latency_ms

Expected response time in milliseconds

max_concurrent_tasks

Maximum number of concurrent tasks the agent can handle

specialization

List of agent specializations and strengths

Examples

KG Generator Agent capabilities:

kg_capabilities = MemoryAgentCapabilities(
    agent_name="kg_generator",
    agent_type="KGGeneratorAgent",
    can_analyze_memories=True,
    can_generate_knowledge_graph=True,
    supported_memory_types=[
        MemoryType.SEMANTIC,
        MemoryType.EPISODIC,
        MemoryType.CONTEXTUAL
    ],
    typical_latency_ms=2000,
    max_concurrent_tasks=2,
    specialization=[
        "entity_extraction",
        "relationship_discovery",
        "graph_construction"
    ]
)

Agentic RAG Coordinator capabilities:

rag_capabilities = MemoryAgentCapabilities(
    agent_name="agentic_rag",
    agent_type="AgenticRAGCoordinator",
    can_retrieve_memories=True,
    can_coordinate_retrieval=True,
    supported_memory_types=list(MemoryType),  # Supports all types
    typical_latency_ms=1500,
    max_concurrent_tasks=3,
    specialization=[
        "strategy_selection",
        "result_fusion",
        "intelligent_retrieval"
    ]
)

Memory Store Agent capabilities:

store_capabilities = MemoryAgentCapabilities(
    agent_name="memory_store",
    agent_type="MemoryStoreAgent",
    can_store_memories=True,
    can_retrieve_memories=True,
    supported_memory_types=list(MemoryType),
    typical_latency_ms=500,
    max_concurrent_tasks=5,
    specialization=[
        "memory_storage",
        "basic_retrieval",
        "memory_management"
    ]
)

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

class agents.memory.multi_agent_coordinator.MemoryTask(/, **data)

Bases: pydantic.BaseModel

Represents a memory-related task for multi-agent coordination.

A MemoryTask encapsulates a specific memory operation (store, retrieve, analyze, etc.) that can be executed by the multi-agent coordinator system. It contains all necessary information for task routing, execution, and result tracking.

Parameters:

data (Any)

id

Unique identifier for the task, used for tracking and coordination

type

Type of memory operation (store, retrieve, analyze, generate_kg, etc.)

query

Natural language description of the task or query content

parameters

Dictionary of task-specific parameters and configuration

priority

Task priority level (1=highest, 10=lowest) for execution ordering

namespace

Memory namespace to operate within (e.g., ("user", "personal"))

memory_types

Specific memory types to target (semantic, episodic, etc.)

status

Current task status (pending, routing, executing, completed, failed)

assigned_agent

Name of the agent assigned to execute this task

result

Task execution result (populated after completion)

error

Error message if task execution failed

created_at

UTC timestamp when the task was created

started_at

UTC timestamp when task execution started

completed_at

UTC timestamp when task execution completed

Examples

Creating a memory storage task:

task = MemoryTask(
    id="store_001",
    type="store_memory",
    query="Store information about Python programming",
    parameters={"content": "Python is a programming language"},
    priority=3,
    namespace=("user", "learning")
)

Creating a retrieval task:

task = MemoryTask(
    id="retrieve_001",
    type="retrieve_memories",
    query="Find information about machine learning",
    parameters={"limit": 10, "use_graph_rag": True},
    priority=1,
    memory_types=[MemoryType.SEMANTIC, MemoryType.EPISODIC]
)

Creating an analysis task:

task = MemoryTask(
    id="analyze_001",
    type="analyze_memory",
    query="Analyze patterns in my learning history",
    parameters={"analysis_type": "pattern_detection"},
    priority=2
)

Creating a knowledge graph generation task:

task = MemoryTask(
    id="kg_001",
    type="generate_knowledge_graph",
    query="Build knowledge graph from recent memories",
    parameters={"max_memories": 100, "confidence_threshold": 0.7},
    priority=4,
    namespace=("user", "work")
)

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

class agents.memory.multi_agent_coordinator.MultiAgentCoordinatorConfig(/, **data)

Bases: pydantic.BaseModel

Configuration for Multi-Agent Memory Coordinator.

This configuration class defines all parameters needed to create and configure a MultiAgentMemoryCoordinator, including agent configurations, coordination settings, and performance parameters.

Parameters:

data (Any)

name

Unique identifier for the coordinator instance

memory_store_manager

Manager for memory storage and retrieval operations

memory_classifier

Classifier for analyzing memory content and types

kg_generator_config

Configuration for the knowledge graph generator agent

agentic_rag_config

Configuration for the agentic RAG coordinator agent

max_concurrent_tasks

Maximum number of tasks that can execute simultaneously

task_timeout_seconds

Maximum time (in seconds) a task can run before timing out

enable_agent_communication

Whether to enable communication between agents

coordinator_llm

LLM configuration for the coordinator's decision-making

routing_strategy

Strategy for routing tasks to agents (capability_based, load_balanced, etc.)

enable_task_decomposition

Whether to enable breaking complex tasks into subtasks

enable_caching

Whether to enable result caching for performance

cache_ttl_seconds

Time-to-live for cached results in seconds

persistence

Persistence configuration passed to sub-agents

Examples

Basic configuration:

config = MultiAgentCoordinatorConfig(
    name="my_coordinator",
    memory_store_manager=store_manager,
    memory_classifier=classifier,
    kg_generator_config=kg_config,
    agentic_rag_config=rag_config,
    max_concurrent_tasks=3,
    task_timeout_seconds=180
)

Advanced configuration with custom settings:

config = MultiAgentCoordinatorConfig(
    name="advanced_coordinator",
    memory_store_manager=store_manager,
    memory_classifier=classifier,
    kg_generator_config=kg_config,
    agentic_rag_config=rag_config,

    # Coordination settings
    max_concurrent_tasks=10,
    task_timeout_seconds=600,
    enable_agent_communication=True,

    # Coordinator LLM
    coordinator_llm=AugLLMConfig(
        model="gpt-4",
        temperature=0.2,
        max_tokens=1000
    ),

    # Task routing
    routing_strategy="capability_based",
    enable_task_decomposition=True,

    # Performance
    enable_caching=True,
    cache_ttl_seconds=7200,  # 2 hours

    # Persistence
    persistence=False  # Disable for testing
)

Production configuration:

config = MultiAgentCoordinatorConfig(
    name="production_coordinator",
    memory_store_manager=store_manager,
    memory_classifier=classifier,
    kg_generator_config=kg_config,
    agentic_rag_config=rag_config,

    # High-performance settings
    max_concurrent_tasks=20,
    task_timeout_seconds=900,
    enable_agent_communication=True,

    # Optimized coordinator
    coordinator_llm=AugLLMConfig(
        model="gpt-4-turbo",
        temperature=0.1,
        max_tokens=2000
    ),

    # Advanced routing
    routing_strategy="load_balanced",
    enable_task_decomposition=True,

    # Production caching
    enable_caching=True,
    cache_ttl_seconds=3600
)

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config

Configuration for the model, which should be a dictionary conforming to pydantic.ConfigDict.

class agents.memory.multi_agent_coordinator.MultiAgentMemoryCoordinator(config)

Orchestrates multiple memory agents using MetaStateSchema patterns.

The MultiAgentMemoryCoordinator is the central orchestrator for the memory system, managing a collection of specialized memory agents and intelligently routing tasks based on agent capabilities, performance characteristics, and current load.

This coordinator provides:

  • Intelligent task routing based on agent capabilities

  • Load balancing across multiple agents

  • Task decomposition for complex operations

  • Performance monitoring and optimization

  • Fault tolerance and error handling

  • Agent communication and coordination

config

Configuration object containing all coordinator settings

memory_store

Memory store manager for direct storage operations

classifier

Memory classifier for content analysis

coordinator_llm

LLM runnable for coordinator decision-making

meta_agents

Dictionary of agents wrapped in MetaStateSchema

agent_capabilities

Dictionary mapping agent names to their capabilities

task_queue

List of pending tasks waiting for execution

active_tasks

Dictionary of currently executing tasks

completed_tasks

Dictionary of completed tasks with results

performance_metrics

Dictionary tracking system performance metrics

Examples

Basic coordinator usage:

# Create coordinator
coordinator = MultiAgentMemoryCoordinator(config)

# Store memory
result = await coordinator.store_memory(
    "I learned about machine learning algorithms today"
)

# Retrieve memories
memories = await coordinator.retrieve_memories(
    query="machine learning",
    limit=5
)

# Analyze memory content
analysis = await coordinator.analyze_memory(
    "Complex analysis of learning patterns"
)

Advanced task execution:

# Create custom task
task = MemoryTask(
    id="complex_analysis",
    type="analyze_and_graph",
    query="Analyze learning patterns and build knowledge graph",
    parameters={
        "analysis_depth": "comprehensive",
        "graph_confidence": 0.8
    },
    priority=1
)

# Execute task
result = await coordinator.execute_task(task)

# Check task status
if result.status == "completed":
    print(f"Task completed: {result.result}")
else:
    print(f"Task failed: {result.error}")

System monitoring:

# Get system status
status = coordinator.get_system_status()
print(f"Total agents: {status['total_agents']}")
print(f"Active tasks: {status['active_tasks']}")

# Run diagnostic
diagnostic = await coordinator.run_diagnostic()
if diagnostic["system_status"] == "healthy":
    print("System is healthy")
else:
    print("System issues detected")

Performance monitoring:

# Get performance metrics
metrics = coordinator.performance_metrics
print(f"Total tasks: {metrics['total_tasks']}")
print(f"Success rate: {metrics['successful_tasks'] / metrics['total_tasks'] * 100:.1f}%")
print(f"Average latency: {metrics['avg_latency_ms']:.1f}ms")

Initialize the multi-agent coordinator.

Sets up the coordinator with the provided configuration, initializes all managed agents, and prepares the task management system.

Parameters:

config (MultiAgentCoordinatorConfig) – Configuration containing all coordinator settings

Examples

Basic initialization:

config = MultiAgentCoordinatorConfig(
    memory_store_manager=store_manager,
    memory_classifier=classifier,
    kg_generator_config=kg_config,
    agentic_rag_config=rag_config
)

coordinator = MultiAgentMemoryCoordinator(config)

Advanced initialization with custom settings:

config = MultiAgentCoordinatorConfig(
    name="production_coordinator",
    memory_store_manager=store_manager,
    memory_classifier=classifier,
    kg_generator_config=kg_config,
    agentic_rag_config=rag_config,
    max_concurrent_tasks=10,
    task_timeout_seconds=600,
    enable_caching=True
)

coordinator = MultiAgentMemoryCoordinator(config)

async analyze_memory(content)

Analyze memory content using the multi-agent system with specialized routing.

This method creates a memory analysis task and routes it to the most appropriate agent (typically the memory classifier). The system provides comprehensive analysis including classification, entity extraction, and importance scoring.

Parameters:

content (str) – The memory content to analyze (text, structured data, etc.)

Returns:

Analysis results containing:
  • analysis: Detailed analysis results from the assigned agent

  • success: Boolean indicating if analysis completed successfully

  • error: Error message if analysis failed

Return type:

Dict[str, Any]

Examples

Basic memory analysis:

analysis = await coordinator.analyze_memory(
    "I attended a machine learning conference where I learned about neural networks"
)

if analysis["success"]:
    result = analysis["analysis"]
    print(f"Memory type: {result.get('memory_type')}")
    print(f"Entities: {result.get('entities')}")
    print(f"Importance: {result.get('importance_score')}")
else:
    print(f"Analysis failed: {analysis['error']}")

Complex content analysis:

analysis = await coordinator.analyze_memory(
    '''
    Meeting Notes: Q1 Planning
    Attendees: Alice (PM), Bob (Engineer), Carol (Designer)
    Decisions:
    - Use React for the frontend
    - Deploy on AWS with auto-scaling
    - Launch beta by March 15th
    '''
)

if analysis["success"]:
    result = analysis["analysis"]
    print(f"Extracted entities: {result.get('entities')}")
    print(f"Key decisions: {result.get('decisions')}")
    print(f"Action items: {result.get('action_items')}")
    print(f"Participants: {result.get('participants')}")

Note

The analysis typically includes:

  • Memory type classification (semantic, episodic, procedural, etc.)

  • Entity extraction (people, organizations, concepts, etc.)

  • Importance and relevance scoring

  • Metadata extraction (dates, locations, etc.)

  • Relationship identification

  • Content summarization

async execute_task(task)

Execute a memory task using appropriate agents with intelligent routing.

This method is the core of the multi-agent coordinator, responsible for:

  1. Routing tasks to the most appropriate agent(s)

  2. Executing tasks based on routing decisions

  3. Handling different execution strategies (single, multi, sequential, decomposed)

  4. Updating performance metrics and task status

Parameters:

task (MemoryTask) – The task to execute, containing its query, parameters, and metadata

Returns:

The same task object updated with results, status, and timing

Return type:

MemoryTask

Raises:
  • ValueError – If the routing decision is unknown or invalid

  • RuntimeError – If task execution fails due to agent errors

Examples

Basic task execution:

task = MemoryTask(
    id="simple_task",
    type="retrieve_memories",
    query="Find information about Python programming",
    priority=1
)

result_task = await coordinator.execute_task(task)

if result_task.status == "completed":
    print(f"Task completed: {result_task.result}")
else:
    print(f"Task failed: {result_task.error}")

Complex task with custom parameters:

task = MemoryTask(
    id="complex_analysis",
    type="analyze_and_graph",
    query="Analyze learning patterns and build knowledge graph",
    parameters={
        "analysis_depth": "comprehensive",
        "graph_confidence": 0.8,
        "include_relationships": True
    },
    priority=1,
    namespace=("user", "work")
)

result_task = await coordinator.execute_task(task)

# Check execution details
print(f"Assigned agent: {result_task.assigned_agent}")
print(f"Duration: {result_task.completed_at - result_task.started_at}")
print(f"Result: {result_task.result}")

Error handling:

try:
    result_task = await coordinator.execute_task(task)

    if result_task.status == "failed":
        logger.error(f"Task {task.id} failed: {result_task.error}")
        # Handle failure - maybe retry or use fallback

except Exception as e:
    logger.error(f"Unexpected error executing task: {e}")

async generate_knowledge_graph(namespace=None)

Generate knowledge graph using the multi-agent system with KG specialization.

This method creates a knowledge graph generation task and routes it to the specialized KG generator agent. The system extracts entities, relationships, and builds a comprehensive knowledge graph from stored memories.

Parameters:

namespace (tuple[str, ...] | None) – Optional namespace tuple to limit graph generation scope

Returns:

Knowledge graph results containing:
  • knowledge_graph: Generated graph with nodes and relationships

  • success: Boolean indicating if generation completed successfully

  • error: Error message if generation failed

Return type:

Dict[str, Any]

Examples

Basic knowledge graph generation:

kg_result = await coordinator.generate_knowledge_graph()

if kg_result["success"]:
    graph = kg_result["knowledge_graph"]
    print(f"Nodes: {len(graph.get('nodes', []))}")
    print(f"Relationships: {len(graph.get('relationships', []))}")

    # Explore entities
    for node in graph.get('nodes', []):
        print(f"Entity: {node['name']} ({node['type']})")

    # Explore relationships
    for rel in graph.get('relationships', []):
        print(f"{rel['source']} -> {rel['target']} ({rel['type']})")
else:
    print(f"KG generation failed: {kg_result['error']}")

Scoped knowledge graph generation:

kg_result = await coordinator.generate_knowledge_graph(
    namespace=("user", "work", "projects")
)

if kg_result["success"]:
    graph = kg_result["knowledge_graph"]

    # Analyze work-related entities
    work_entities = [
        node for node in graph.get('nodes', [])
        if node.get('type') in ['person', 'organization', 'project']
    ]

    print(f"Work entities: {len(work_entities)}")

    # Find project relationships
    project_rels = [
        rel for rel in graph.get('relationships', [])
        if 'project' in rel.get('type', '').lower()
    ]

    print(f"Project relationships: {len(project_rels)}")

Note

The knowledge graph typically includes:

  • Entities: People, organizations, concepts, technologies, etc.

  • Relationships: Works_at, uses, knows, creates, etc.

  • Confidence scores for entities and relationships

  • Metadata: Creation timestamps, memory references, etc.

  • Graph statistics: Node counts, relationship types, etc.

get_system_status()

Get comprehensive system status and health information.

This method provides a complete overview of the multi-agent system's current state, including agent health, performance metrics, and operational status.

Returns:

System status containing:
  • coordinator_status: Overall coordinator status (active, degraded, error)

  • total_agents: Number of managed agents

  • active_tasks: Number of currently executing tasks

  • completed_tasks: Number of completed tasks

  • performance_metrics: System performance statistics

  • agent_status: Individual agent status and health

  • agent_capabilities: Summary of each agent's capabilities

Return type:

Dict[str, Any]

Examples

Basic system status check:

status = coordinator.get_system_status()

print(f"Coordinator: {status['coordinator_status']}")
print(f"Total agents: {status['total_agents']}")
print(f"Active tasks: {status['active_tasks']}")
print(f"Success rate: {status['performance_metrics']['successful_tasks'] / status['performance_metrics']['total_tasks'] * 100:.1f}%")

Detailed agent status:

status = coordinator.get_system_status()

for agent_name, agent_info in status['agent_status'].items():
    print(f"Agent: {agent_name}")
    print(f"  Type: {agent_info['agent_type']}")
    print(f"  Status: {agent_info['execution_status']}")
    print(f"  Executions: {agent_info['execution_count']}")
    print(f"  Needs recompile: {agent_info['needs_recompile']}")

Performance monitoring:

status = coordinator.get_system_status()
metrics = status['performance_metrics']

print(f"Total tasks: {metrics['total_tasks']}")
print(f"Successful: {metrics['successful_tasks']}")
print(f"Failed: {metrics['failed_tasks']}")
print(f"Average latency: {metrics['avg_latency_ms']:.1f}ms")

# Agent utilization
for agent, count in metrics['agent_utilization'].items():
    utilization = count / metrics['total_tasks'] * 100
    print(f"Agent {agent}: {utilization:.1f}% utilization")

Note

This method is synchronous and provides a snapshot of the current system state. For continuous monitoring, call this method periodically or use the run_diagnostic() method for health checks.
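
For example, a background polling loop could log these snapshots on a fixed interval. The monitor_system helper, its interval, and the printed fields below are an illustrative sketch, not part of this module:

import asyncio

async def monitor_system(coordinator, interval_seconds=300):
    """Periodically log a snapshot of coordinator health and throughput."""
    while True:
        status = coordinator.get_system_status()
        metrics = status["performance_metrics"]

        print(f"Coordinator: {status['coordinator_status']}")
        print(f"Active tasks: {status['active_tasks']}")

        # Only compute a success rate once at least one task has run.
        if metrics["total_tasks"] > 0:
            rate = metrics["successful_tasks"] / metrics["total_tasks"]
            print(f"Success rate: {rate * 100:.1f}%")

        await asyncio.sleep(interval_seconds)

# Run in the background alongside normal coordinator usage:
# asyncio.create_task(monitor_system(coordinator))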

async retrieve_memories(query, limit=10, memory_types=None, namespace=None)

Retrieve memories using the multi-agent system with intelligent routing.

This method creates a memory retrieval task and routes it to the most appropriate agent (typically the agentic RAG coordinator). The system automatically selects the best retrieval strategy based on the query characteristics.

Parameters:
  • query (str) – Natural language query describing what memories to retrieve

  • limit (int) – Maximum number of memories to return (default: 10)

  • memory_types (list[haive.agents.memory.core.types.MemoryType] | None) – Optional list of specific memory types to search within

  • namespace (tuple[str, ...] | None) – Optional namespace tuple to limit search scope

Returns:

List of memory objects with content, metadata, and relevance scores

Return type:

List[Dict[str, Any]]

Examples

Basic memory retrieval:

memories = await coordinator.retrieve_memories(
    "What did I learn about Python programming?"
)

for memory in memories:
    print(f"Content: {memory['content']}")
    print(f"Relevance: {memory['relevance_score']}")
    print(f"Timestamp: {memory['timestamp']}")

Targeted retrieval with filters:

memories = await coordinator.retrieve_memories(
    query="machine learning algorithms",
    limit=5,
    memory_types=[MemoryType.SEMANTIC, MemoryType.EPISODIC],
    namespace=("user", "learning")
)

Complex query with context:

memories = await coordinator.retrieve_memories(
    "Find all meetings where we discussed the API project and show related decisions",
    limit=20,
    namespace=("user", "work")
)

# System automatically uses graph traversal for complex queries
for memory in memories:
    if memory.get('graph_connections'):
        print(f"Connected entities: {memory['graph_connections']}")

Note

The system automatically:

  • Analyzes query complexity and selects the appropriate retrieval strategy

  • Uses vector similarity, graph traversal, or hybrid approaches

  • Applies relevance scoring and ranking

  • Returns structured results with metadata and provenance

async run_diagnostic()

Run comprehensive system diagnostic with agent health checks.

This method performs a complete system diagnostic by testing each agent with a simple diagnostic query. It identifies unhealthy agents and provides detailed error information for troubleshooting.

Returns:

Diagnostic results containing:
  • system_status: Overall system health (healthy, degraded, critical)

  • agent_diagnostics: Individual agent diagnostic results

  • performance_metrics: Current system performance metrics

Return type:

Dict[str, Any]

Examples

Basic diagnostic check:

diagnostic = await coordinator.run_diagnostic()

print(f"System status: {diagnostic['system_status']}")

if diagnostic['system_status'] != 'healthy':
    print("Issues detected:")
    for agent, result in diagnostic['agent_diagnostics'].items():
        if result['status'] != 'healthy':
            print(f"  {agent}: {result.get('error', 'Unknown error')}")
else:
    print("All agents are healthy")

Detailed diagnostic analysis:

diagnostic = await coordinator.run_diagnostic()

for agent_name, result in diagnostic['agent_diagnostics'].items():
    print(f"Agent: {agent_name}")
    print(f"  Status: {result['status']}")

    if result['status'] == 'healthy':
        print(f"  Test result: {result.get('test_result', 'N/A')}")
    else:
        print(f"  Error: {result.get('error', 'Unknown error')}")

Performance analysis:

diagnostic = await coordinator.run_diagnostic()
metrics = diagnostic['performance_metrics']

if metrics['total_tasks'] > 0:
    success_rate = metrics['successful_tasks'] / metrics['total_tasks']
    print(f"Success rate: {success_rate * 100:.1f}%")

    if success_rate < 0.9:
        print("Warning: Low success rate detected")

    if metrics['avg_latency_ms'] > 5000:
        print("Warning: High latency detected")

Note

This diagnostic runs a simple test query on each agent to verify basic functionality. For production systems, consider running this periodically to monitor system health and detect degradation early.
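
One way to schedule such checks is a long-running task that calls run_diagnostic() and logs any unhealthy agents. The periodic_diagnostic helper and its logging choices are an illustrative sketch, not part of the coordinator API:

import asyncio
import logging

logger = logging.getLogger(__name__)

async def periodic_diagnostic(coordinator, interval_seconds=600):
    """Run the coordinator diagnostic on a schedule and log unhealthy agents."""
    while True:
        diagnostic = await coordinator.run_diagnostic()

        if diagnostic["system_status"] == "healthy":
            logger.info("All agents healthy")
        else:
            # Surface each failing agent so degradation is caught early.
            for agent, result in diagnostic["agent_diagnostics"].items():
                if result["status"] != "healthy":
                    logger.warning(
                        "Agent %s unhealthy: %s",
                        agent,
                        result.get("error", "Unknown error"),
                    )

        await asyncio.sleep(interval_seconds)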

async store_memory(content, namespace=None)

Store a memory using the multi-agent system with intelligent routing.

This method creates a memory storage task and routes it to the appropriate agent (typically the memory store agent). The system automatically handles classification, metadata extraction, and storage optimization.

Parameters:
  • content (str) – The memory content to store (text, structured data, etc.)

  • namespace (tuple[str, ...] | None) – Optional namespace tuple for organizing memories (e.g., ("user", "work"))

Returns:

Success message with storage details or error message

Return type:

str

Examples

Basic memory storage:

result = await coordinator.store_memory(
    "I learned about machine learning algorithms today"
)
print(result)  # "Memory stored successfully: {...}"

Memory with namespace:

result = await coordinator.store_memory(
    "Completed project milestone: API integration",
    namespace=("user", "work", "projects")
)

Structured memory storage:

result = await coordinator.store_memory(
    json.dumps({
        "event": "meeting",
        "participants": ["Alice", "Bob"],
        "decisions": ["Use React for frontend", "Deploy on AWS"]
    }),
    namespace=("user", "work", "meetings")
)

Note

The system automatically classifies the memory type, extracts metadata, and updates relevant knowledge graphs based on the content.
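
A minimal store-and-retrieve round trip illustrating this behavior (the content and namespace values below are examples only):

# Store new content; classification and metadata extraction happen automatically.
result = await coordinator.store_memory(
    "Deployed the new recommendation service to staging",
    namespace=("user", "work", "deployments")
)

# The stored memory can then be retrieved by topic within the same namespace.
memories = await coordinator.retrieve_memories(
    query="recommendation service deployment",
    limit=5,
    namespace=("user", "work", "deployments")
)

# Optionally rebuild the scoped knowledge graph to pick up the new entities.
kg_result = await coordinator.generate_knowledge_graph(
    namespace=("user", "work", "deployments")
)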