agents.rag.db_rag.graph_db.agent¶
Graph Database RAG Agent implementation.
This module implements the main Graph Database RAG Agent that provides natural language querying capabilities for Neo4j databases. The agent uses a multi-step workflow to convert questions to Cypher queries, validate them, execute them, and generate natural language responses.
The agent workflow consists of the following steps:
1. Domain Relevance Check: Validates whether the query is within the configured domain
2. Query Generation: Converts natural language to Cypher using few-shot learning
3. Query Validation: Checks the Cypher query against the database schema
4. Query Correction: Fixes any errors found during validation
5. Query Execution: Runs the validated query against Neo4j
6. Answer Generation: Converts database results to natural language
Examples
Basic usage of the Graph DB RAG Agent:
>>> from haive.agents.rag.db_rag.graph_db import GraphDBRAGAgent, GraphDBRAGConfig
>>>
>>> # Configure the agent for a movie domain
>>> config = GraphDBRAGConfig(
...     domain_name="movies",
...     domain_categories=["movie", "actor", "director"],
...     graph_db_config=GraphDBConfig(
...         graph_db_uri="bolt://localhost:7687",
...         graph_db_user="neo4j",
...         graph_db_password="password"
...     )
... )
>>>
>>> # Create and use the agent
>>> agent = GraphDBRAGAgent(config)
>>> result = agent.invoke({"question": "Who directed The Matrix?"})
>>> print(result["answer"])
The Wachowskis directed The Matrix.
Using the agent with streaming:
>>> # Stream the workflow execution
>>> for chunk in agent.stream({"question": "What are the top 5 rated movies?"}):
...     if "answer" in chunk:
...         print(chunk["answer"])
Note
The agent requires a connection to a Neo4j database and uses environment variables for configuration if not explicitly provided.
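The fallback described in the note can be sketched roughly as follows: explicit values win, and environment variables fill whatever is left. The helper name `resolve_neo4j_settings` and the variable names `NEO4J_URI`, `NEO4J_USERNAME`, and `NEO4J_PASSWORD` are assumptions made for illustration; this page does not state which variables the agent actually reads.

```python
import os
from typing import Dict, Optional


def resolve_neo4j_settings(
    uri: Optional[str] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
) -> Dict[str, str]:
    """Prefer explicit values, then fall back to environment variables.

    The environment variable names are assumed, not taken from the library.
    """
    settings = {
        "uri": uri or os.environ.get("NEO4J_URI"),
        "user": user or os.environ.get("NEO4J_USERNAME"),
        "password": password or os.environ.get("NEO4J_PASSWORD"),
    }
    # Fail early with a clear message instead of at connection time.
    missing = [key for key, value in settings.items() if not value]
    if missing:
        raise ValueError(f"Missing Neo4j settings: {', '.join(missing)}")
    return settings
```

Raising `ValueError` here mirrors the error the agent is documented to raise when the connection cannot be established, but the check itself is only a sketch.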
See also
GraphDBRAGConfig: Configuration options for the agent
OverallState: State management during workflow execution
haive.agents.rag.db_rag.graph_db.engines: LLM engines used by the agent
Classes¶
GraphDBRAGAgent | Graph Database RAG Agent for natural language querying of Neo4j databases.
Functions¶
check_domain_relevance | Check if a query is relevant to the specified domain.
correct_query | Correct a Cypher query based on provided errors.
domain_router | Route queries based on domain relevance.
execute_query | Execute a Cypher query against the database.
generate_answer | Generate natural language answer from query results.
generate_query | Generate Cypher query from natural language.
setup_workflow | Set up the graph DB RAG workflow.
validate_query | Validate a Cypher query against database schema.
validation_router | Route based on validation results.
Module Contents¶
- class agents.rag.db_rag.graph_db.agent.GraphDBRAGAgent(config=GraphDBRAGConfig())¶
Bases: haive.core.engine.agent.agent.Agent[haive.agents.rag.db_rag.graph_db.config.GraphDBRAGConfig]
Graph Database RAG Agent for natural language querying of Neo4j databases.
This agent implements a sophisticated workflow for converting natural language questions into Cypher queries, executing them against a Neo4j database, and generating human-readable responses. It includes domain validation, query validation, error correction, and result formatting.
The agent uses few-shot learning with domain-specific examples to improve query generation accuracy and includes robust error handling for common Cypher mistakes.
- config¶
Configuration object containing all settings.
- Type:
GraphDBRAGConfig
- graph_db¶
Connected Neo4j database instance.
- Type:
Neo4jGraph
- graph_db_enhanced_schema¶
Enhanced schema information from the database.
- graph_db_structured_schema¶
Structured schema for relationship validation.
- corrector_schema¶
Schema used for correcting relationship directions.
- cypher_query_corrector¶
Utility for fixing common Cypher errors.
- example_selector¶
Semantic similarity selector for few-shot examples.
Examples
Creating and using the agent:
>>> # Create agent with minimal config
>>> agent = GraphDBRAGAgent()
>>>
>>> # Query the database
>>> result = agent.invoke({
...     "question": "What movies has Tom Hanks acted in?"
... })
>>> print(f"Answer: {result['answer']}")
>>> print(f"Cypher used: {result['cypher_statement']}")
>>> # Use with custom domain
>>> config = GraphDBRAGConfig(
...     domain_name="healthcare",
...     domain_categories=["patient", "doctor", "medication"]
... )
>>> healthcare_agent = GraphDBRAGAgent(config)
Note
The agent automatically sets up the workflow graph upon initialization. All node functions return Command objects for state updates and routing.
Initialize the Graph DB RAG Agent.
Sets up the Neo4j connection, schema information, example selector, and workflow graph. Handles initialization errors gracefully with appropriate logging.
- Parameters:
config (haive.agents.rag.db_rag.graph_db.config.GraphDBRAGConfig) - Configuration object. Defaults to GraphDBRAGConfig(), which uses environment variables for the Neo4j connection.
- Raises:
ValueError - If the Neo4j connection cannot be established.
Exception - For other initialization errors.
Examples
>>> # Using default config (from environment)
>>> agent = GraphDBRAGAgent()
>>> # Using custom config
>>> custom_config = GraphDBRAGConfig(
...     domain_name="movies",
...     graph_db_config=GraphDBConfig(
...         graph_db_uri="bolt://localhost:7687"
...     )
... )
>>> agent = GraphDBRAGAgent(custom_config)
- check_domain_relevance(state)¶
Check if the user's question is relevant to the configured domain.
This is the first step in the workflow. It uses the guardrails engine to determine if the question should be processed or rejected as out-of-domain.
- Parameters:
state (haive.agents.rag.db_rag.graph_db.state.OverallState) - Current workflow state containing the user's question.
- Returns:
next_action: "end" if out-of-domain, otherwise continue
database_records: Error message if out-of-domain
steps: Updated with "check_domain_relevance"
- Return type:
Command object with updates
Examples
>>> state = OverallState(question="What's the weather like?")
>>> command = agent.check_domain_relevance(state)
>>> # For a movie domain agent, this would return:
>>> # Command(update={"next_action": "end", ...})
Note
This node acts as a guardrail to prevent processing of irrelevant queries, saving computational resources and improving accuracy.
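A minimal sketch of the state update this node produces, assuming plain keyword matching in place of the LLM-based guardrails engine. The function name `check_domain_relevance_sketch`, the `"generate_query"` value used for the in-domain case, and the wording of the error message are illustrative assumptions; only the field names `next_action`, `database_records`, and `steps` come from the description above.

```python
from typing import Dict, List


def check_domain_relevance_sketch(
    question: str, domain_categories: List[str], steps: List[str]
) -> Dict[str, object]:
    """Build an update dict shaped like the one the node is documented to return.

    Keyword matching is a stand-in for the guardrails engine.
    """
    text = question.lower()
    in_domain = any(category.lower() in text for category in domain_categories)

    update: Dict[str, object] = {"steps": steps + ["check_domain_relevance"]}
    if in_domain:
        # Assumed value for "continue"; the real node may use a different one.
        update["next_action"] = "generate_query"
    else:
        update["next_action"] = "end"
        update["database_records"] = "Question is outside the configured domain."
    return update
```

Rejecting a question at this point means no Cypher is generated or executed for it, which is where the saving in computation comes from.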
- correct_query(state)¶
Correct errors in the Cypher query based on validation feedback.
Uses the correct_cypher engine to fix identified errors and produce a valid query that matches the database schema.
- Parameters:
state (haive.agents.rag.db_rag.graph_db.state.OverallState) - Current state containing the invalid query and errors.
- Returns:
next_action: "validate_query" (to re-validate)
cypher_statement: The corrected Cypher query
steps: Updated with "correct_query"
- Return type:
Command object with updates
Examples
>>> state = OverallState(
...     cypher_statement="MATCH (p:Actor)-[:DIRECTED]->(m:Film) RETURN p.name",
...     cypher_errors=["Label 'Film' does not exist, use 'Movie'"]
... )
>>> command = agent.correct_query(state)
>>> print(command.update["cypher_statement"])
MATCH (p:Person)-[:DIRECTED]->(m:Movie) RETURN p.name
Note
The corrected query is sent back to validation to ensure all errors are resolved.
- domain_router(state)¶
Route based on domain relevance check result.
- Parameters:
state (haive.agents.rag.db_rag.graph_db.state.OverallState) - Current state with next_action field.
- Returns:
Next node name: END if out-of-domain, "generate_query" otherwise.
- Return type:
Note
This is used as a conditional edge function in the workflow graph.
- execute_query(state)¶
Execute the validated Cypher query against the Neo4j database.
Runs the query and captures the results for answer generation. Handles empty results gracefully.
- Parameters:
state (haive.agents.rag.db_rag.graph_db.state.OverallState) - Current state containing the validated Cypher statement.
- Returns:
database_records: Query results or "No results found"
next_action: "generate_answer"
steps: Updated with "execute_query"
- Return type:
Command object with updates
Examples
>>> state = OverallState(
...     cypher_statement="MATCH (m:Movie) RETURN m.title LIMIT 3"
... )
>>> command = agent.execute_query(state)
>>> print(command.update["database_records"])
[{"m.title": "The Matrix"}, {"m.title": "Inception"}, ...]
Note
The query is executed with proper sanitization and timeout settings configured in the Neo4j connection.
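The empty-result handling can be sketched as below. `run_query` is a hypothetical callable standing in for the Neo4j connection, and `execute_query_sketch` is an illustrative name; the returned keys follow the Returns list above.

```python
from typing import Any, Callable, Dict, List, Union

Records = List[Dict[str, Any]]


def execute_query_sketch(
    cypher_statement: str,
    run_query: Callable[[str], Records],
    steps: List[str],
) -> Dict[str, Any]:
    """Run the query and build the documented state update."""
    records = run_query(cypher_statement)
    # An empty result becomes a fixed message so the answer step still has input.
    database_records: Union[Records, str] = records if records else "No results found"
    return {
        "database_records": database_records,
        "next_action": "generate_answer",
        "steps": steps + ["execute_query"],
    }
```

Passing the runner in as a callable keeps the sketch testable without a live database; the agent itself holds the connection in its `graph_db` attribute.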
- generate_answer(state)¶
Generate a natural language answer from the query results.
Uses the generate_final_answer engine to convert database records into a human-friendly response that directly answers the question.
- Parameters:
state (haive.agents.rag.db_rag.graph_db.state.OverallState) - Current state containing question and database results.
- Returns:
answer: The natural language response
next_action: "end"
steps: Updated with "generate_answer"
- Return type:
Command object with updates
Examples
>>> state = OverallState(
...     question="Who directed The Matrix?",
...     database_records=[{"p.name": "Lana Wachowski"}, {"p.name": "Lilly Wachowski"}]
... )
>>> command = agent.generate_answer(state)
>>> print(command.update["answer"])
The Matrix was directed by Lana Wachowski and Lilly Wachowski.
Note
The engine is prompted to provide direct, conversational answers without mentioning the database or technical details.
- generate_query(state)¶
Generate a Cypher query from the natural language question.
Uses the text2cypher engine with few-shot examples to convert the user's question into a valid Cypher query for the database schema.
- Parameters:
state (haive.agents.rag.db_rag.graph_db.state.OverallState) - Current state containing the user's question.
- Returns:
cypher_statement: The generated Cypher query
steps: Updated with "generate_query"
- Return type:
Command object with updates
Examples
>>> state = OverallState(question="Who directed Inception?")
>>> command = agent.generate_query(state)
>>> print(command.update["cypher_statement"])
MATCH (p:Person)-[:DIRECTED]->(m:Movie {title: 'Inception'}) RETURN p.name
Note
The quality of generation depends heavily on the provided examples and their similarity to the user's question.
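To illustrate why example similarity matters, the sketch below picks the closest few-shot examples and assembles a prompt from them. It uses Jaccard overlap on word tokens as a simple stand-in for the semantic similarity selector the agent uses; `select_examples`, `build_prompt`, and the `question`/`query` keys are hypothetical names, not the library's API.

```python
import re
from typing import Dict, List, Set


def _tokens(text: str) -> Set[str]:
    """Lowercase word tokens, ignoring punctuation."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def select_examples(
    question: str, examples: List[Dict[str, str]], k: int = 2
) -> List[Dict[str, str]]:
    """Return the k examples whose questions overlap most with the input."""
    query_tokens = _tokens(question)

    def score(example: Dict[str, str]) -> float:
        example_tokens = _tokens(example["question"])
        union = query_tokens | example_tokens
        return len(query_tokens & example_tokens) / len(union) if union else 0.0

    return sorted(examples, key=score, reverse=True)[:k]


def build_prompt(question: str, examples: List[Dict[str, str]], k: int = 2) -> str:
    """Format the selected examples followed by the new question."""
    shots = select_examples(question, examples, k)
    blocks = [f"Question: {ex['question']}\nCypher: {ex['query']}" for ex in shots]
    blocks.append(f"Question: {question}\nCypher:")
    return "\n\n".join(blocks)
```

With a question such as "Who directed Inception?", an example about who directed another film ranks above unrelated ones, so the model sees a Cypher pattern that already has the right shape.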
- setup_workflow()¶
Set up the complete Graph DB RAG workflow.
Configures the workflow graph with all nodes and edges, including conditional routing based on validation results. This method is called automatically during agent initialization.
The workflow structure:
START → check_domain_relevance → (conditional) → generate_query
generate_query → validate_query → (conditional)
    errors found: correct_query → validate_query
    no errors:    execute_query → generate_answer → END
Note
The workflow includes loops for query correction and multiple exit points for error handling.
- Return type:
None
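The structure above can be sketched in plain Python as a table of nodes plus a table of edge functions. This is not the agent's implementation: every node body is a stub standing in for an LLM or database call, and the `max_steps` guard is an added assumption to keep the correction loop bounded.

```python
from typing import Any, Callable, Dict

State = Dict[str, Any]
END = "END"


def check_domain_relevance(state: State) -> State:
    in_domain = "movie" in state["question"].lower()
    return {"next_action": "generate_query" if in_domain else "end"}


def generate_query(state: State) -> State:
    # Deliberately uses a wrong label so the correction loop is exercised.
    return {"cypher_statement": "MATCH (m:Film) RETURN m.title"}


def validate_query(state: State) -> State:
    bad = ":Film" in state["cypher_statement"]
    errors = ["Label 'Film' does not exist, use 'Movie'"] if bad else []
    return {
        "cypher_errors": errors,
        "next_action": "correct_query" if errors else "execute_query",
    }


def correct_query(state: State) -> State:
    return {"cypher_statement": state["cypher_statement"].replace(":Film", ":Movie")}


def execute_query(state: State) -> State:
    return {"database_records": [{"m.title": "The Matrix"}]}


def generate_answer(state: State) -> State:
    titles = ", ".join(r["m.title"] for r in state["database_records"])
    return {"answer": f"Found: {titles}"}


NODES: Dict[str, Callable[[State], State]] = {
    "check_domain_relevance": check_domain_relevance,
    "generate_query": generate_query,
    "validate_query": validate_query,
    "correct_query": correct_query,
    "execute_query": execute_query,
    "generate_answer": generate_answer,
}

# Each edge function picks the next node name from the current state.
EDGES: Dict[str, Callable[[State], str]] = {
    "check_domain_relevance": lambda s: END if s["next_action"] == "end" else "generate_query",
    "generate_query": lambda s: "validate_query",
    "validate_query": lambda s: s["next_action"],
    "correct_query": lambda s: "validate_query",
    "execute_query": lambda s: "generate_answer",
    "generate_answer": lambda s: END,
}


def run_workflow(question: str, max_steps: int = 20) -> State:
    """Walk the graph from the first node until END or the step limit."""
    state: State = {"question": question, "steps": []}
    node = "check_domain_relevance"
    for _ in range(max_steps):
        if node == END:
            break
        state.update(NODES[node](state))
        state["steps"].append(node)
        node = EDGES[node](state)
    return state
```

Calling `run_workflow("List one movie")` visits `validate_query` twice, once before and once after `correct_query`, while an out-of-domain question stops after the first node.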
- validate_query(state)¶
Validate the generated Cypher query against the database schema.
Checks for syntax errors, schema mismatches, and logical issues in the generated query. Routes to correction if errors are found.
- Parameters:
state (haive.agents.rag.db_rag.graph_db.state.OverallState) - Current state containing the Cypher statement to validate.
- Returns:
next_action: "correct_cypher" if errors, "execute_query" if valid
cypher_errors: List of validation errors (if any)
steps: Updated with "validate_query"
- Return type:
Command object with updates
Examples
>>> state = OverallState(
...     cypher_statement="MATCH (p:Actor)-[:DIRECTED]->(m:Film) RETURN p.name"
... )
>>> command = agent.validate_query(state)
>>> # Would return errors about "Film" label and "Actor" directing
Note
Validation checks include label existence, property names, relationship types, and query completeness.
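A rough sketch of the label and relationship-type checks, assuming a schema given as two sets of names. The agent validates with an LLM engine and the database's own schema objects, so the regex approach, the `validate_cypher_sketch` name, and the schema layout here are illustrative only. The patterns cover single-label node and relationship patterns and would need a real Cypher parser for anything more complex.

```python
import re
from typing import Dict, List, Set


def validate_cypher_sketch(cypher: str, schema: Dict[str, Set[str]]) -> List[str]:
    """Report labels and relationship types that the schema does not define."""
    errors: List[str] = []

    # Node labels appear as (var:Label ...); relationship types as [var:TYPE ...].
    labels = re.findall(r"\(\s*\w*\s*:\s*(\w+)", cypher)
    rel_types = re.findall(r"\[\s*\w*\s*:\s*(\w+)", cypher)

    for label in labels:
        if label not in schema["labels"]:
            errors.append(f"Label '{label}' does not exist")
    for rel_type in rel_types:
        if rel_type not in schema["relationship_types"]:
            errors.append(f"Relationship type '{rel_type}' does not exist")

    # Simplified completeness check: a read query should return something.
    if not re.search(r"\bRETURN\b", cypher, flags=re.IGNORECASE):
        errors.append("Query has no RETURN clause")
    return errors
```

An empty list corresponds to the valid case that routes to execution; a non-empty list is what the correction step would receive as `cypher_errors`.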
- validation_router(state)¶
Route based on query validation result.
- Parameters:
state (haive.agents.rag.db_rag.graph_db.state.OverallState) - Current state with next_action field.
- Returns:
Next node name: "correct_query", "execute_query", or END.
- Return type:
Note
This is used as a conditional edge function in the workflow graph.
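A conditional edge function of this kind can be sketched as a lookup from `next_action` to a node name. The `END` sentinel value and the function name are placeholders, and mapping "correct_cypher" to the "correct_query" node is an assumption that the two names refer to the same correction step.

```python
from typing import Any, Dict

END = "__end__"  # placeholder sentinel; a graph framework would supply its own

# Assumed mapping from next_action values to node names.
_ROUTES = {
    "correct_cypher": "correct_query",
    "correct_query": "correct_query",
    "execute_query": "execute_query",
}


def validation_router_sketch(state: Dict[str, Any]) -> str:
    """Return the next node for the state's next_action, or END if unrecognised."""
    return _ROUTES.get(state.get("next_action"), END)
```

Falling back to END for any unrecognised value gives the workflow a safe exit instead of raising inside the graph.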
- agents.rag.db_rag.graph_db.agent.check_domain_relevance(query, domain_categories=None)¶
Check if a query is relevant to the specified domain.
- agents.rag.db_rag.graph_db.agent.correct_query(query, errors=None)¶
Correct a Cypher query based on provided errors.
- agents.rag.db_rag.graph_db.agent.domain_router(query, domain_categories=None)¶
Route queries based on domain relevance.
- agents.rag.db_rag.graph_db.agent.execute_query(query, db_connection=None)¶
Execute a Cypher query against the database.
- agents.rag.db_rag.graph_db.agent.generate_answer(query_results, original_query='')¶
Generate natural language answer from query results.
- agents.rag.db_rag.graph_db.agent.generate_query(natural_language_query)¶
Generate Cypher query from natural language.
- agents.rag.db_rag.graph_db.agent.setup_workflow()¶
Set up the graph DB RAG workflow.
- agents.rag.db_rag.graph_db.agent.validate_query(query, schema=None)¶
Validate a Cypher query against database schema.