agents.memory.agentic_rag_coordinator

Agentic RAG Coordinator for Memory System.

This module provides an intelligent coordinator that selects and combines multiple retrieval strategies based on query analysis and context.

Classes

AgenticRAGCoordinator

Intelligent coordinator that selects and combines retrieval strategies for optimal memory retrieval.

AgenticRAGCoordinatorConfig

Configuration for Agentic RAG Coordinator with intelligent strategy selection.

AgenticRAGResult

Comprehensive result from agentic RAG coordinator with performance metrics and analysis.

RetrievalStrategy

Represents a retrieval strategy with its configuration and performance characteristics.

Module Contents

class agents.memory.agentic_rag_coordinator.AgenticRAGCoordinator(config)

Bases: haive.agents.simple.agent.SimpleAgent

Intelligent coordinator that selects and combines retrieval strategies for optimal memory retrieval.

The AgenticRAGCoordinator is an advanced memory retrieval agent that analyzes incoming queries and dynamically selects the most appropriate retrieval strategies from a comprehensive set of available options. It combines results from multiple strategies to provide comprehensive, diverse, and relevant memory retrieval.

Key Features:
  • Intelligent strategy selection based on query analysis

  • Multi-strategy parallel execution for comprehensive coverage

  • Advanced result fusion with diversity and coverage optimization

  • Performance monitoring and optimization

  • Adaptive strategy weighting based on query characteristics

  • Support for graph-based, semantic, temporal, and procedural retrieval

memory_store

Memory store manager for basic memory operations

classifier

Memory classifier for query analysis and intent detection

kg_generator

Optional knowledge graph generator for graph-based retrieval

enhanced_retriever_config

Configuration for enhanced vector retrieval

graph_rag_config

Configuration for graph-enhanced RAG retrieval

max_strategies

Maximum number of strategies to use per query

min_confidence_threshold

Minimum confidence score required to use a strategy

enable_strategy_combination

Whether to enable multi-strategy result fusion

fusion_method

Method used for combining results from multiple strategies

diversity_weight

Weight given to diversity in result ranking (0.0-1.0)

coverage_weight

Weight given to coverage in result ranking (0.0-1.0)

relevance_weight

Weight given to relevance in result ranking (0.0-1.0)

coordinator_llm

LLM runnable for strategy selection and coordination

retrievers

Dictionary of available retrieval components

strategies

Dictionary of available retrieval strategies

strategy_selection_prompt

Prompt template for strategy selection

result_fusion_prompt

Prompt template for result fusion

Examples

Basic coordinator usage:

# Create coordinator
coordinator = AgenticRAGCoordinator(config)

# Retrieve memories with intelligent strategy selection
result = await coordinator.retrieve_memories(
    "How do I deploy a web application with Docker?"
)

print(f"Retrieved {len(result.final_memories)} memories")
print(f"Used strategies: {result.selected_strategies}")
print(f"Processing time: {result.total_time_ms:.1f}ms")

# Access retrieved memories
for i, memory in enumerate(result.final_memories):
    score = result.final_scores[i]
    print(f"Memory {i+1}: {memory['content'][:100]}... (score: {score:.2f})")

Advanced retrieval with filtering:

# Retrieve with specific memory types and limits
result = await coordinator.retrieve_memories(
    query="What are the best practices for API security?",
    limit=5,
    memory_types=[MemoryType.SEMANTIC, MemoryType.PROCEDURAL],
    namespace=("user", "security")
)

# Analyze strategy performance
print(f"Strategy reasoning: {result.strategy_reasoning}")
print(f"Quality scores: diversity={result.diversity_score:.2f}, "
      f"coverage={result.coverage_score:.2f}, confidence={result.confidence_score:.2f}")

Performance analysis:

result = await coordinator.retrieve_memories("complex technical query")

# Analyze individual strategy performance
for strategy_name, strategy_data in result.strategy_results.items():
    memory_count = len(strategy_data.get('memories', []))
    exec_time = strategy_data.get('execution_time_ms', 0)
    print(f"{strategy_name}: {memory_count} memories in {exec_time:.1f}ms")

Forced strategy selection (for testing):

# Force specific strategies for testing or optimization
result = await coordinator.retrieve_memories(
    query="test query",
    force_strategies=["enhanced_similarity", "graph_traversal"]
)

print(f"Forced strategies: {result.selected_strategies}")

Note

The coordinator automatically selects the best strategies based on query analysis, but can be configured with custom weights and thresholds for different use cases and performance requirements.

Initialize the Agentic RAG Coordinator with comprehensive strategy setup.

Sets up the coordinator with all necessary components including retrievers, strategies, and prompts for intelligent memory retrieval coordination.

Parameters:

config (AgenticRAGCoordinatorConfig) – AgenticRAGCoordinatorConfig containing all coordinator settings

Examples

Basic initialization:

config = AgenticRAGCoordinatorConfig(
    name="my_coordinator",
    memory_store_manager=store_manager,
    memory_classifier=classifier,
    kg_generator=kg_generator
)

coordinator = AgenticRAGCoordinator(config)

Advanced initialization with custom settings:

config = AgenticRAGCoordinatorConfig(
    name="advanced_coordinator",
    memory_store_manager=store_manager,
    memory_classifier=classifier,
    kg_generator=kg_generator,
    enhanced_retriever_config=enhanced_config,
    graph_rag_config=graph_config,
    max_strategies=3,
    min_confidence_threshold=0.7,
    enable_strategy_combination=True,
    coordinator_llm=AugLLMConfig(model="gpt-4", temperature=0.2),
    fusion_method="weighted_rank",
    diversity_weight=0.25,
    coverage_weight=0.35,
    relevance_weight=0.4
)

coordinator = AgenticRAGCoordinator(config)

Note

The coordinator automatically sets up all available retrieval strategies based on the provided configuration. Optional components (like graph RAG) will only be available if their configurations are provided.

async retrieve_memories(query, limit=None, memory_types=None, namespace=None, force_strategies=None)

Retrieve memories using intelligent strategy coordination and result fusion.

This method is the core of the agentic RAG coordinator. It analyzes the query, selects appropriate retrieval strategies, executes them in parallel, and fuses the results to provide comprehensive, diverse, and relevant memory retrieval.

Parameters:
  • query (str) – Natural language query for memory retrieval

  • limit (int | None) – Maximum number of memories to return (default: 10)

  • memory_types (list[haive.agents.memory.core.types.MemoryType] | None) – Specific memory types to focus on (optional filtering)

  • namespace (tuple[str, Ellipsis] | None) – Memory namespace to search within (optional scoping)

  • force_strategies (list[str] | None) – Force specific strategies for testing or optimization

Returns:

Comprehensive result with memories, strategy info, and metrics

Return type:

AgenticRAGResult

Examples

Basic memory retrieval:

result = await coordinator.retrieve_memories(
    "How do I deploy a web application?"
)

print(f"Retrieved {len(result.final_memories)} memories")
print(f"Used strategies: {result.selected_strategies}")
print(f"Processing time: {result.total_time_ms:.1f}ms")

# Access retrieved memories
for i, memory in enumerate(result.final_memories):
    score = result.final_scores[i]
    print(f"Memory {i+1}: {memory['content'][:100]}... (score: {score:.2f})")

Advanced retrieval with filtering:

result = await coordinator.retrieve_memories(
    query="What are machine learning best practices?",
    limit=5,
    memory_types=[MemoryType.SEMANTIC, MemoryType.PROCEDURAL],
    namespace=("user", "ml", "practices")
)

# Analyze strategy performance
print(f"Strategy reasoning: {result.strategy_reasoning}")
print(f"Quality scores:")
print(f"  Diversity: {result.diversity_score:.2f}")
print(f"  Coverage: {result.coverage_score:.2f}")
print(f"  Confidence: {result.confidence_score:.2f}")

Performance analysis:

result = await coordinator.retrieve_memories("complex technical query")

# Analyze individual strategy performance
for strategy_name, strategy_data in result.strategy_results.items():
    memory_count = len(strategy_data.get('memories', []))
    exec_time = strategy_data.get('execution_time_ms', 0)
    print(f"{strategy_name}: {memory_count} memories in {exec_time:.1f}ms")

# Overall performance metrics
print(f"Total execution time: {result.total_time_ms:.1f}ms")
print(f"Average memory score: {sum(result.final_scores) / len(result.final_scores):.2f}")

Forced strategy selection (for testing):

result = await coordinator.retrieve_memories(
    query="test query",
    force_strategies=["enhanced_similarity", "graph_traversal"]
)

print(f"Forced strategies: {result.selected_strategies}")
assert result.selected_strategies == ["enhanced_similarity", "graph_traversal"]

Note

The coordinator automatically analyzes the query to select the most appropriate strategies. It considers query complexity, memory types needed, temporal scope, and other factors to optimize retrieval performance and quality.

async run(user_input)

Main execution method for the Agentic RAG Coordinator with comprehensive reporting.

This method serves as the primary interface for the coordinator, accepting natural language queries and returning formatted results with detailed performance metrics and strategy information.

Parameters:

user_input (str) – Natural language query for memory retrieval

Returns:

Formatted response with retrieval results, strategy information, and performance metrics

Return type:

str

Examples

Basic query execution:

coordinator = AgenticRAGCoordinator(config)

response = await coordinator.run(
    "How do I deploy a Docker container to AWS?"
)

print(response)
# Output:
# Retrieved 8 memories using 2 strategies:
# - Strategies: enhanced_similarity, procedural_search
# - Total time: 1247.3ms
# - Quality scores: Diversity=0.78, Coverage=0.85, Confidence=0.82
# - Strategy reasoning: Selected enhanced similarity for factual content and procedural search for deployment steps
#
# Top memories:
# 1. [0.91] Docker deployment to AWS ECS requires proper image tagging and service configuration...
# 2. [0.87] AWS deployment best practices include using IAM roles, security groups, and monitoring...
# 3. [0.84] Container orchestration with ECS involves task definitions, services, and load balancers...

Complex query with multiple strategies:

response = await coordinator.run(
    "What are the relationships between machine learning algorithms and their applications in healthcare?"
)

print(response)
# Output shows graph traversal strategy was used for relationship discovery
# along with enhanced similarity for semantic content

Error handling:

response = await coordinator.run("complex query that might fail")

# Response will include error information if retrieval fails
# but still provide a graceful user experience

Note

The response format includes: - Number of memories retrieved and strategies used - Processing time and quality scores - Strategy selection reasoning - Top 3 memories with relevance scores - Graceful error handling for failed retrievals

class agents.memory.agentic_rag_coordinator.AgenticRAGCoordinatorConfig(/, **data)

Bases: pydantic.BaseModel

Configuration for Agentic RAG Coordinator with intelligent strategy selection.

This configuration class defines all parameters needed to create and configure an AgenticRAGCoordinator, including core components, retrieval strategies, coordination settings, and result fusion parameters.

Parameters:

data (Any)

name

Unique identifier for the coordinator instance

memory_store_manager

Manager for memory storage and retrieval operations

memory_classifier

Classifier for analyzing query intent and memory types

kg_generator

Optional knowledge graph generator for graph-based retrieval

enhanced_retriever_config

Configuration for enhanced vector retrieval

graph_rag_config

Configuration for graph-enhanced RAG retrieval

max_strategies

Maximum number of strategies to use per query

min_confidence_threshold

Minimum confidence score required to use a strategy

enable_strategy_combination

Whether to enable multi-strategy result fusion

coordinator_llm

LLM configuration for strategy selection and coordination

fusion_method

Method used for combining results from multiple strategies

diversity_weight

Weight given to diversity in result ranking (0.0-1.0)

coverage_weight

Weight given to coverage in result ranking (0.0-1.0)

relevance_weight

Weight given to relevance in result ranking (0.0-1.0)

Examples

Basic configuration:

config = AgenticRAGCoordinatorConfig(
    name="my_rag_coordinator",
    memory_store_manager=store_manager,
    memory_classifier=classifier,
    kg_generator=kg_generator,
    max_strategies=2,
    min_confidence_threshold=0.6
)

Advanced configuration with custom retrieval components:

config = AgenticRAGCoordinatorConfig(
    name="advanced_rag_coordinator",
    memory_store_manager=store_manager,
    memory_classifier=classifier,
    kg_generator=kg_generator,

    # Enhanced retrieval configuration
    enhanced_retriever_config=EnhancedRetrieverConfig(
        enable_query_expansion=True,
        importance_boost=0.2
    ),

    # Graph RAG configuration
    graph_rag_config=GraphRAGRetrieverConfig(
        enable_graph_traversal=True,
        max_graph_depth=3,
        min_confidence_threshold=0.6
    ),

    # Coordination settings
    max_strategies=3,
    min_confidence_threshold=0.5,
    enable_strategy_combination=True,

    # LLM configuration
    coordinator_llm=AugLLMConfig(
        model="gpt-4",
        temperature=0.2,
        max_tokens=1000
    ),

    # Result fusion weights
    fusion_method="weighted_rank",
    diversity_weight=0.25,
    coverage_weight=0.35,
    relevance_weight=0.4
)

Performance-optimized configuration:

config = AgenticRAGCoordinatorConfig(
    name="fast_rag_coordinator",
    memory_store_manager=store_manager,
    memory_classifier=classifier,

    # Single strategy for speed
    max_strategies=1,
    min_confidence_threshold=0.7,
    enable_strategy_combination=False,

    # Fast LLM configuration
    coordinator_llm=AugLLMConfig(
        model="gpt-3.5-turbo",
        temperature=0.1,
        max_tokens=500
    ),

    # Relevance-focused fusion
    fusion_method="relevance_only",
    diversity_weight=0.0,
    coverage_weight=0.0,
    relevance_weight=1.0
)

Note

The weights for diversity, coverage, and relevance should sum to 1.0 for optimal result fusion. The coordinator will normalize them if needed.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class agents.memory.agentic_rag_coordinator.AgenticRAGResult(/, **data)

Bases: pydantic.BaseModel

Comprehensive result from agentic RAG coordinator with performance metrics and analysis.

This class encapsulates all information from an agentic RAG retrieval operation, including the retrieved memories, strategy details, performance metrics, and quality scores for analysis and optimization.

Parameters:

data (Any)

query

Original user query that was processed

selected_strategies

List of strategy names that were selected and executed

strategy_results

Detailed results from each individual strategy

final_memories

Final ranked and deduplicated memories after strategy fusion

final_scores

Relevance scores for each memory in final_memories

total_time_ms

Total processing time for the entire operation

strategy_times

Execution time for each individual strategy

query_analysis

Detailed analysis of the query (intent, complexity, etc.)

strategy_reasoning

Explanation of why specific strategies were selected

diversity_score

Measure of diversity in the retrieved memories (0.0-1.0)

coverage_score

Measure of how well results cover query aspects (0.0-1.0)

confidence_score

Overall confidence in the result quality (0.0-1.0)

Examples

Accessing retrieval results:

result = await coordinator.retrieve_memories("How do I deploy web apps?")

print(f"Query: {result.query}")
print(f"Strategies used: {result.selected_strategies}")
print(f"Total memories: {len(result.final_memories)}")
print(f"Processing time: {result.total_time_ms:.1f}ms")

# Access individual memories
for i, memory in enumerate(result.final_memories):
    score = result.final_scores[i] if i < len(result.final_scores) else 0.0
    print(f"Memory {i+1}: {memory['content'][:100]}... (score: {score:.2f})")

Analyzing strategy performance:

result = await coordinator.retrieve_memories("complex query")

print(f"Strategy reasoning: {result.strategy_reasoning}")
print(f"Quality scores:")
print(f"  Diversity: {result.diversity_score:.2f}")
print(f"  Coverage: {result.coverage_score:.2f}")
print(f"  Confidence: {result.confidence_score:.2f}")

# Strategy timing analysis
for strategy, time_ms in result.strategy_times.items():
    print(f"  {strategy}: {time_ms:.1f}ms")

Query analysis details:

result = await coordinator.retrieve_memories("What did I learn about ML?")

if result.query_analysis:
    analysis = result.query_analysis
    print(f"Memory types needed: {analysis.get('memory_types', [])}")
    print(f"Query complexity: {analysis.get('complexity', 'unknown')}")
    print(f"Requires reasoning: {analysis.get('requires_reasoning', False)}")
    print(f"Entities: {analysis.get('entities', [])}")
    print(f"Topics: {analysis.get('topics', [])}")

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

class agents.memory.agentic_rag_coordinator.RetrievalStrategy(/, **data)

Bases: pydantic.BaseModel

Represents a retrieval strategy with its configuration and performance characteristics.

A retrieval strategy defines how to retrieve memories for specific types of queries, including the strategy’s strengths, supported memory types, and performance parameters.

Parameters:

data (Any)

name

Unique identifier for the strategy (e.g., “enhanced_similarity”)

description

Human-readable description of what the strategy does

best_for

List of query types this strategy excels at handling

memory_types

List of memory types this strategy is optimized for

confidence_threshold

Minimum confidence score required to use this strategy

typical_latency_ms

Expected response time in milliseconds for this strategy

max_results

Maximum number of results this strategy can return

parameters

Strategy-specific configuration parameters

Examples

Creating an enhanced similarity strategy:

strategy = RetrievalStrategy(
    name="enhanced_similarity",
    description="Multi-factor similarity search with memory type awareness",
    best_for=["factual_queries", "recent_events", "personal_information"],
    memory_types=[MemoryType.SEMANTIC, MemoryType.EPISODIC],
    confidence_threshold=0.6,
    typical_latency_ms=300,
    max_results=15,
    parameters={"enable_query_expansion": True, "importance_boost": 0.2}
)

Creating a graph traversal strategy:

strategy = RetrievalStrategy(
    name="graph_traversal",
    description="Knowledge graph traversal for relationship discovery",
    best_for=["relationship_queries", "entity_connections"],
    memory_types=[MemoryType.CONTEXTUAL, MemoryType.SEMANTIC],
    confidence_threshold=0.7,
    typical_latency_ms=800,
    max_results=12,
    parameters={"max_depth": 3, "min_confidence": 0.6}
)

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.