agents.memory.agentic_rag_coordinator¶
Agentic RAG Coordinator for Memory System.
This module provides an intelligent coordinator that selects and combines multiple retrieval strategies based on query analysis and context.
Classes¶
Intelligent coordinator that selects and combines retrieval strategies for optimal memory retrieval. |
|
Configuration for Agentic RAG Coordinator with intelligent strategy selection. |
|
Comprehensive result from agentic RAG coordinator with performance metrics and analysis. |
|
Represents a retrieval strategy with its configuration and performance characteristics. |
Module Contents¶
- class agents.memory.agentic_rag_coordinator.AgenticRAGCoordinator(config)¶
Bases:
haive.agents.simple.agent.SimpleAgent
Intelligent coordinator that selects and combines retrieval strategies for optimal memory retrieval.
The AgenticRAGCoordinator is an advanced memory retrieval agent that analyzes incoming queries and dynamically selects the most appropriate retrieval strategies from a comprehensive set of available options. It combines results from multiple strategies to provide comprehensive, diverse, and relevant memory retrieval.
- Key Features:
Intelligent strategy selection based on query analysis
Multi-strategy parallel execution for comprehensive coverage
Advanced result fusion with diversity and coverage optimization
Performance monitoring and optimization
Adaptive strategy weighting based on query characteristics
Support for graph-based, semantic, temporal, and procedural retrieval
- memory_store¶
Memory store manager for basic memory operations
- classifier¶
Memory classifier for query analysis and intent detection
- kg_generator¶
Optional knowledge graph generator for graph-based retrieval
- enhanced_retriever_config¶
Configuration for enhanced vector retrieval
- graph_rag_config¶
Configuration for graph-enhanced RAG retrieval
- max_strategies¶
Maximum number of strategies to use per query
- min_confidence_threshold¶
Minimum confidence score required to use a strategy
- enable_strategy_combination¶
Whether to enable multi-strategy result fusion
- fusion_method¶
Method used for combining results from multiple strategies
- diversity_weight¶
Weight given to diversity in result ranking (0.0-1.0)
- coverage_weight¶
Weight given to coverage in result ranking (0.0-1.0)
- relevance_weight¶
Weight given to relevance in result ranking (0.0-1.0)
- coordinator_llm¶
LLM runnable for strategy selection and coordination
- retrievers¶
Dictionary of available retrieval components
- strategies¶
Dictionary of available retrieval strategies
- strategy_selection_prompt¶
Prompt template for strategy selection
- result_fusion_prompt¶
Prompt template for result fusion
Examples
Basic coordinator usage:
# Create coordinator coordinator = AgenticRAGCoordinator(config) # Retrieve memories with intelligent strategy selection result = await coordinator.retrieve_memories( "How do I deploy a web application with Docker?" ) print(f"Retrieved {len(result.final_memories)} memories") print(f"Used strategies: {result.selected_strategies}") print(f"Processing time: {result.total_time_ms:.1f}ms") # Access retrieved memories for i, memory in enumerate(result.final_memories): score = result.final_scores[i] print(f"Memory {i+1}: {memory['content'][:100]}... (score: {score:.2f})")
Advanced retrieval with filtering:
# Retrieve with specific memory types and limits result = await coordinator.retrieve_memories( query="What are the best practices for API security?", limit=5, memory_types=[MemoryType.SEMANTIC, MemoryType.PROCEDURAL], namespace=("user", "security") ) # Analyze strategy performance print(f"Strategy reasoning: {result.strategy_reasoning}") print(f"Quality scores: diversity={result.diversity_score:.2f}, " f"coverage={result.coverage_score:.2f}, confidence={result.confidence_score:.2f}")
Performance analysis:
result = await coordinator.retrieve_memories("complex technical query") # Analyze individual strategy performance for strategy_name, strategy_data in result.strategy_results.items(): memory_count = len(strategy_data.get('memories', [])) exec_time = strategy_data.get('execution_time_ms', 0) print(f"{strategy_name}: {memory_count} memories in {exec_time:.1f}ms")
Forced strategy selection (for testing):
# Force specific strategies for testing or optimization result = await coordinator.retrieve_memories( query="test query", force_strategies=["enhanced_similarity", "graph_traversal"] ) print(f"Forced strategies: {result.selected_strategies}")
Note
The coordinator automatically selects the best strategies based on query analysis, but can be configured with custom weights and thresholds for different use cases and performance requirements.
Initialize the Agentic RAG Coordinator with comprehensive strategy setup.
Sets up the coordinator with all necessary components including retrievers, strategies, and prompts for intelligent memory retrieval coordination.
- Parameters:
config (AgenticRAGCoordinatorConfig) – AgenticRAGCoordinatorConfig containing all coordinator settings
Examples
Basic initialization:
config = AgenticRAGCoordinatorConfig( name="my_coordinator", memory_store_manager=store_manager, memory_classifier=classifier, kg_generator=kg_generator ) coordinator = AgenticRAGCoordinator(config)
Advanced initialization with custom settings:
config = AgenticRAGCoordinatorConfig( name="advanced_coordinator", memory_store_manager=store_manager, memory_classifier=classifier, kg_generator=kg_generator, enhanced_retriever_config=enhanced_config, graph_rag_config=graph_config, max_strategies=3, min_confidence_threshold=0.7, enable_strategy_combination=True, coordinator_llm=AugLLMConfig(model="gpt-4", temperature=0.2), fusion_method="weighted_rank", diversity_weight=0.25, coverage_weight=0.35, relevance_weight=0.4 ) coordinator = AgenticRAGCoordinator(config)
Note
The coordinator automatically sets up all available retrieval strategies based on the provided configuration. Optional components (like graph RAG) will only be available if their configurations are provided.
- async retrieve_memories(query, limit=None, memory_types=None, namespace=None, force_strategies=None)¶
Retrieve memories using intelligent strategy coordination and result fusion.
This method is the core of the agentic RAG coordinator. It analyzes the query, selects appropriate retrieval strategies, executes them in parallel, and fuses the results to provide comprehensive, diverse, and relevant memory retrieval.
- Parameters:
query (str) – Natural language query for memory retrieval
limit (int | None) – Maximum number of memories to return (default: 10)
memory_types (list[haive.agents.memory.core.types.MemoryType] | None) – Specific memory types to focus on (optional filtering)
namespace (tuple[str, Ellipsis] | None) – Memory namespace to search within (optional scoping)
force_strategies (list[str] | None) – Force specific strategies for testing or optimization
- Returns:
Comprehensive result with memories, strategy info, and metrics
- Return type:
Examples
Basic memory retrieval:
result = await coordinator.retrieve_memories( "How do I deploy a web application?" ) print(f"Retrieved {len(result.final_memories)} memories") print(f"Used strategies: {result.selected_strategies}") print(f"Processing time: {result.total_time_ms:.1f}ms") # Access retrieved memories for i, memory in enumerate(result.final_memories): score = result.final_scores[i] print(f"Memory {i+1}: {memory['content'][:100]}... (score: {score:.2f})")
Advanced retrieval with filtering:
result = await coordinator.retrieve_memories( query="What are machine learning best practices?", limit=5, memory_types=[MemoryType.SEMANTIC, MemoryType.PROCEDURAL], namespace=("user", "ml", "practices") ) # Analyze strategy performance print(f"Strategy reasoning: {result.strategy_reasoning}") print(f"Quality scores:") print(f" Diversity: {result.diversity_score:.2f}") print(f" Coverage: {result.coverage_score:.2f}") print(f" Confidence: {result.confidence_score:.2f}")
Performance analysis:
result = await coordinator.retrieve_memories("complex technical query") # Analyze individual strategy performance for strategy_name, strategy_data in result.strategy_results.items(): memory_count = len(strategy_data.get('memories', [])) exec_time = strategy_data.get('execution_time_ms', 0) print(f"{strategy_name}: {memory_count} memories in {exec_time:.1f}ms") # Overall performance metrics print(f"Total execution time: {result.total_time_ms:.1f}ms") print(f"Average memory score: {sum(result.final_scores) / len(result.final_scores):.2f}")
Forced strategy selection (for testing):
result = await coordinator.retrieve_memories( query="test query", force_strategies=["enhanced_similarity", "graph_traversal"] ) print(f"Forced strategies: {result.selected_strategies}") assert result.selected_strategies == ["enhanced_similarity", "graph_traversal"]
Note
The coordinator automatically analyzes the query to select the most appropriate strategies. It considers query complexity, memory types needed, temporal scope, and other factors to optimize retrieval performance and quality.
- async run(user_input)¶
Main execution method for the Agentic RAG Coordinator with comprehensive reporting.
This method serves as the primary interface for the coordinator, accepting natural language queries and returning formatted results with detailed performance metrics and strategy information.
- Parameters:
user_input (str) – Natural language query for memory retrieval
- Returns:
Formatted response with retrieval results, strategy information, and performance metrics
- Return type:
Examples
Basic query execution:
coordinator = AgenticRAGCoordinator(config) response = await coordinator.run( "How do I deploy a Docker container to AWS?" ) print(response) # Output: # Retrieved 8 memories using 2 strategies: # - Strategies: enhanced_similarity, procedural_search # - Total time: 1247.3ms # - Quality scores: Diversity=0.78, Coverage=0.85, Confidence=0.82 # - Strategy reasoning: Selected enhanced similarity for factual content and procedural search for deployment steps # # Top memories: # 1. [0.91] Docker deployment to AWS ECS requires proper image tagging and service configuration... # 2. [0.87] AWS deployment best practices include using IAM roles, security groups, and monitoring... # 3. [0.84] Container orchestration with ECS involves task definitions, services, and load balancers...
Complex query with multiple strategies:
response = await coordinator.run( "What are the relationships between machine learning algorithms and their applications in healthcare?" ) print(response) # Output shows graph traversal strategy was used for relationship discovery # along with enhanced similarity for semantic content
Error handling:
response = await coordinator.run("complex query that might fail") # Response will include error information if retrieval fails # but still provide a graceful user experience
Note
The response format includes: - Number of memories retrieved and strategies used - Processing time and quality scores - Strategy selection reasoning - Top 3 memories with relevance scores - Graceful error handling for failed retrievals
- class agents.memory.agentic_rag_coordinator.AgenticRAGCoordinatorConfig(/, **data)¶
Bases:
pydantic.BaseModel
Configuration for Agentic RAG Coordinator with intelligent strategy selection.
This configuration class defines all parameters needed to create and configure an AgenticRAGCoordinator, including core components, retrieval strategies, coordination settings, and result fusion parameters.
- Parameters:
data (Any)
- name¶
Unique identifier for the coordinator instance
- memory_store_manager¶
Manager for memory storage and retrieval operations
- memory_classifier¶
Classifier for analyzing query intent and memory types
- kg_generator¶
Optional knowledge graph generator for graph-based retrieval
- enhanced_retriever_config¶
Configuration for enhanced vector retrieval
- graph_rag_config¶
Configuration for graph-enhanced RAG retrieval
- max_strategies¶
Maximum number of strategies to use per query
- min_confidence_threshold¶
Minimum confidence score required to use a strategy
- enable_strategy_combination¶
Whether to enable multi-strategy result fusion
- coordinator_llm¶
LLM configuration for strategy selection and coordination
- fusion_method¶
Method used for combining results from multiple strategies
- diversity_weight¶
Weight given to diversity in result ranking (0.0-1.0)
- coverage_weight¶
Weight given to coverage in result ranking (0.0-1.0)
- relevance_weight¶
Weight given to relevance in result ranking (0.0-1.0)
Examples
Basic configuration:
config = AgenticRAGCoordinatorConfig( name="my_rag_coordinator", memory_store_manager=store_manager, memory_classifier=classifier, kg_generator=kg_generator, max_strategies=2, min_confidence_threshold=0.6 )
Advanced configuration with custom retrieval components:
config = AgenticRAGCoordinatorConfig( name="advanced_rag_coordinator", memory_store_manager=store_manager, memory_classifier=classifier, kg_generator=kg_generator, # Enhanced retrieval configuration enhanced_retriever_config=EnhancedRetrieverConfig( enable_query_expansion=True, importance_boost=0.2 ), # Graph RAG configuration graph_rag_config=GraphRAGRetrieverConfig( enable_graph_traversal=True, max_graph_depth=3, min_confidence_threshold=0.6 ), # Coordination settings max_strategies=3, min_confidence_threshold=0.5, enable_strategy_combination=True, # LLM configuration coordinator_llm=AugLLMConfig( model="gpt-4", temperature=0.2, max_tokens=1000 ), # Result fusion weights fusion_method="weighted_rank", diversity_weight=0.25, coverage_weight=0.35, relevance_weight=0.4 )
Performance-optimized configuration:
config = AgenticRAGCoordinatorConfig( name="fast_rag_coordinator", memory_store_manager=store_manager, memory_classifier=classifier, # Single strategy for speed max_strategies=1, min_confidence_threshold=0.7, enable_strategy_combination=False, # Fast LLM configuration coordinator_llm=AugLLMConfig( model="gpt-3.5-turbo", temperature=0.1, max_tokens=500 ), # Relevance-focused fusion fusion_method="relevance_only", diversity_weight=0.0, coverage_weight=0.0, relevance_weight=1.0 )
Note
The weights for diversity, coverage, and relevance should sum to 1.0 for optimal result fusion. The coordinator will normalize them if needed.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class agents.memory.agentic_rag_coordinator.AgenticRAGResult(/, **data)¶
Bases:
pydantic.BaseModel
Comprehensive result from agentic RAG coordinator with performance metrics and analysis.
This class encapsulates all information from an agentic RAG retrieval operation, including the retrieved memories, strategy details, performance metrics, and quality scores for analysis and optimization.
- Parameters:
data (Any)
- query¶
Original user query that was processed
- selected_strategies¶
List of strategy names that were selected and executed
- strategy_results¶
Detailed results from each individual strategy
- final_memories¶
Final ranked and deduplicated memories after strategy fusion
- final_scores¶
Relevance scores for each memory in final_memories
- total_time_ms¶
Total processing time for the entire operation
- strategy_times¶
Execution time for each individual strategy
- query_analysis¶
Detailed analysis of the query (intent, complexity, etc.)
- strategy_reasoning¶
Explanation of why specific strategies were selected
- diversity_score¶
Measure of diversity in the retrieved memories (0.0-1.0)
- coverage_score¶
Measure of how well results cover query aspects (0.0-1.0)
- confidence_score¶
Overall confidence in the result quality (0.0-1.0)
Examples
Accessing retrieval results:
result = await coordinator.retrieve_memories("How do I deploy web apps?") print(f"Query: {result.query}") print(f"Strategies used: {result.selected_strategies}") print(f"Total memories: {len(result.final_memories)}") print(f"Processing time: {result.total_time_ms:.1f}ms") # Access individual memories for i, memory in enumerate(result.final_memories): score = result.final_scores[i] if i < len(result.final_scores) else 0.0 print(f"Memory {i+1}: {memory['content'][:100]}... (score: {score:.2f})")
Analyzing strategy performance:
result = await coordinator.retrieve_memories("complex query") print(f"Strategy reasoning: {result.strategy_reasoning}") print(f"Quality scores:") print(f" Diversity: {result.diversity_score:.2f}") print(f" Coverage: {result.coverage_score:.2f}") print(f" Confidence: {result.confidence_score:.2f}") # Strategy timing analysis for strategy, time_ms in result.strategy_times.items(): print(f" {strategy}: {time_ms:.1f}ms")
Query analysis details:
result = await coordinator.retrieve_memories("What did I learn about ML?") if result.query_analysis: analysis = result.query_analysis print(f"Memory types needed: {analysis.get('memory_types', [])}") print(f"Query complexity: {analysis.get('complexity', 'unknown')}") print(f"Requires reasoning: {analysis.get('requires_reasoning', False)}") print(f"Entities: {analysis.get('entities', [])}") print(f"Topics: {analysis.get('topics', [])}")
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- class agents.memory.agentic_rag_coordinator.RetrievalStrategy(/, **data)¶
Bases:
pydantic.BaseModel
Represents a retrieval strategy with its configuration and performance characteristics.
A retrieval strategy defines how to retrieve memories for specific types of queries, including the strategy’s strengths, supported memory types, and performance parameters.
- Parameters:
data (Any)
- name¶
Unique identifier for the strategy (e.g., “enhanced_similarity”)
- description¶
Human-readable description of what the strategy does
- best_for¶
List of query types this strategy excels at handling
- memory_types¶
List of memory types this strategy is optimized for
- confidence_threshold¶
Minimum confidence score required to use this strategy
- typical_latency_ms¶
Expected response time in milliseconds for this strategy
- max_results¶
Maximum number of results this strategy can return
- parameters¶
Strategy-specific configuration parameters
Examples
Creating an enhanced similarity strategy:
strategy = RetrievalStrategy( name="enhanced_similarity", description="Multi-factor similarity search with memory type awareness", best_for=["factual_queries", "recent_events", "personal_information"], memory_types=[MemoryType.SEMANTIC, MemoryType.EPISODIC], confidence_threshold=0.6, typical_latency_ms=300, max_results=15, parameters={"enable_query_expansion": True, "importance_boost": 0.2} )
Creating a graph traversal strategy:
strategy = RetrievalStrategy( name="graph_traversal", description="Knowledge graph traversal for relationship discovery", best_for=["relationship_queries", "entity_connections"], memory_types=[MemoryType.CONTEXTUAL, MemoryType.SEMANTIC], confidence_threshold=0.7, typical_latency_ms=800, max_results=12, parameters={"max_depth": 3, "min_confidence": 0.6} )
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.