agents.rag.db_rag.graph_db.agent¶

Graph Database RAG Agent implementation.

This module implements the main Graph Database RAG Agent that provides natural language querying capabilities for Neo4j databases. The agent uses a multi-step workflow to convert questions to Cypher queries, validate them, execute them, and generate natural language responses.

The agent workflow consists of the following steps:
  1. Domain Relevance Check: Checks whether the question falls within the configured domain

  2. Query Generation: Converts natural language to Cypher using few-shot learning

  3. Query Validation: Checks the Cypher query against the database schema

  4. Query Correction: Fixes any errors found during validation

  5. Query Execution: Runs the validated query against Neo4j

  6. Answer Generation: Converts database results to natural language

Examples

Basic usage of the Graph DB RAG Agent:

>>> from haive.agents.rag.db_rag.graph_db import GraphDBRAGAgent, GraphDBRAGConfig
>>> # GraphDBConfig (used below) must also be in scope; import it from the module that defines it
>>>
>>> # Configure the agent for a movie domain
>>> config = GraphDBRAGConfig(
...     domain_name="movies",
...     domain_categories=["movie", "actor", "director"],
...     graph_db_config=GraphDBConfig(
...         graph_db_uri="bolt://localhost:7687",
...         graph_db_user="neo4j",
...         graph_db_password="password"
...     )
... )
>>>
>>> # Create and use the agent
>>> agent = GraphDBRAGAgent(config)
>>> result = agent.invoke({"question": "Who directed The Matrix?"})
>>> print(result["answer"])
The Wachowskis directed The Matrix.

Using the agent with streaming:

>>> # Stream the workflow execution
>>> for chunk in agent.stream({"question": "What are the top 5 rated movies?"}):
...     if "answer" in chunk:
...         print(chunk["answer"])

Note

The agent requires a connection to a Neo4j database. If connection settings are not provided explicitly, it reads them from environment variables.
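As a rough illustration of the fallback described in this note, the sketch below prefers explicitly passed settings and falls back to environment variables. The helper `resolve_neo4j_settings` and the variable names `NEO4J_URI`, `NEO4J_USERNAME`, and `NEO4J_PASSWORD` are assumptions made for this example; the names the agent actually reads are not stated in this page.

```python
import os
from typing import Mapping, Optional


def resolve_neo4j_settings(
    uri: Optional[str] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> dict:
    """Prefer explicit values, then fall back to environment variables.

    Hypothetical helper: the environment variable names are assumptions.
    """
    env = os.environ if env is None else env
    settings = {
        "graph_db_uri": uri or env.get("NEO4J_URI"),
        "graph_db_user": user or env.get("NEO4J_USERNAME"),
        "graph_db_password": password or env.get("NEO4J_PASSWORD"),
    }
    # Fail early if any setting is still missing after the fallback.
    missing = [key for key, value in settings.items() if not value]
    if missing:
        raise ValueError(f"Missing Neo4j settings: {', '.join(missing)}")
    return settings
```

Passing `env` explicitly keeps the helper easy to test without touching the real process environment.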

See also

  • GraphDBRAGConfig: Configuration options for the agent

  • OverallState: State management during workflow execution

  • haive.agents.rag.db_rag.graph_db.engines: LLM engines used by the agent

Classes¶

GraphDBRAGAgent

Graph Database RAG Agent for natural language querying of Neo4j databases.

Functions¶

check_domain_relevance(query[, domain_categories])

Check if a query is relevant to the specified domain.

correct_query(query[, errors])

Correct a Cypher query based on provided errors.

domain_router(query[, domain_categories])

Route queries based on domain relevance.

execute_query(query[, db_connection])

Execute a Cypher query against the database.

generate_answer(query_results[, original_query])

Generate natural language answer from query results.

generate_query(natural_language_query)

Generate Cypher query from natural language.

setup_workflow()

Set up the graph DB RAG workflow.

validate_query(query[, schema])

Validate a Cypher query against database schema.

validation_router(validation_result)

Route based on validation results.

Module Contents¶

class agents.rag.db_rag.graph_db.agent.GraphDBRAGAgent(config=GraphDBRAGConfig())¶

Bases: haive.core.engine.agent.agent.Agent[haive.agents.rag.db_rag.graph_db.config.GraphDBRAGConfig]

Graph Database RAG Agent for natural language querying of Neo4j databases.

This agent converts natural language questions into Cypher queries, executes them against a Neo4j database, and generates human-readable responses. Its workflow includes domain validation, query validation, error correction, and result formatting.

The agent uses few-shot learning with domain-specific examples to improve query generation accuracy, and it corrects common Cypher mistakes such as reversed relationship directions.

config¶

Configuration object containing all settings.

Type:

GraphDBRAGConfig

graph_db¶

Connected Neo4j database instance.

Type:

Neo4jGraph

graph_db_enhanced_schema¶

Enhanced schema information from the database.

graph_db_structured_schema¶

Structured schema for relationship validation.

corrector_schema¶

Schema used for correcting relationship directions.

cypher_query_corrector¶

Utility for fixing common Cypher errors.

example_selector¶

Semantic similarity selector for few-shot examples.

no_results¶

Default message when no results are found.

Type:

str

Examples

Creating and using the agent:

>>> # Create agent with minimal config
>>> agent = GraphDBRAGAgent()
>>>
>>> # Query the database
>>> result = agent.invoke({
...     "question": "What movies has Tom Hanks acted in?"
... })
>>> print(f"Answer: {result['answer']}")
>>> print(f"Cypher used: {result['cypher_statement']}")

>>> # Use with custom domain
>>> config = GraphDBRAGConfig(
...     domain_name="healthcare",
...     domain_categories=["patient", "doctor", "medication"]
... )
>>> healthcare_agent = GraphDBRAGAgent(config)

Note

The agent automatically sets up the workflow graph upon initialization. All node functions return Command objects for state updates and routing.

Initialize the Graph DB RAG Agent.

Sets up the Neo4j connection, schema information, example selector, and workflow graph. Handles initialization errors gracefully with appropriate logging.

Parameters:

config (haive.agents.rag.db_rag.graph_db.config.GraphDBRAGConfig) – Configuration object. Defaults to GraphDBRAGConfig() which uses environment variables for Neo4j connection.

Raises:
  • ValueError – If Neo4j connection cannot be established.

  • Exception – For other initialization errors.

Examples

>>> # Using default config (from environment)
>>> agent = GraphDBRAGAgent()
>>> # Using custom config
>>> custom_config = GraphDBRAGConfig(
...     domain_name="movies",
...     graph_db_config=GraphDBConfig(
...         graph_db_uri="bolt://localhost:7687"
...     )
... )
>>> agent = GraphDBRAGAgent(custom_config)

check_domain_relevance(state)¶

Check if the user’s question is relevant to the configured domain.

This is the first step in the workflow. It uses the guardrails engine to determine if the question should be processed or rejected as out-of-domain.

Parameters:

state (haive.agents.rag.db_rag.graph_db.state.OverallState) – Current workflow state containing the user’s question.

Returns:

Command object with the following updates:

  • next_action: “end” if out-of-domain, otherwise continue

  • database_records: Error message if out-of-domain

  • steps: Updated with “check_domain_relevance”

Return type:

Command

Examples

>>> state = OverallState(question="What's the weather like?")
>>> command = agent.check_domain_relevance(state)
>>> # For a movie domain agent, this would return:
>>> # Command(update={"next_action": "end", ...})

Note

This node acts as a guardrail to prevent processing of irrelevant queries, saving computational resources and improving accuracy.
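The shape of this guardrail node can be sketched as follows. This is only an illustration: `Command` here is a hypothetical stand-in dataclass rather than the class the agent uses, simple keyword matching replaces the guardrails engine, and the "generate_query" value for in-domain questions is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Command:
    """Hypothetical stand-in for the Command object returned by node functions."""
    update: Dict[str, Any] = field(default_factory=dict)


def check_domain_relevance_sketch(
    question: str, domain_categories: List[str], steps: List[str]
) -> Command:
    # Keyword matching stands in for the LLM guardrails engine (assumption).
    text = question.lower()
    relevant = any(category.lower() in text for category in domain_categories)

    update: Dict[str, Any] = {"steps": steps + ["check_domain_relevance"]}
    if relevant:
        # Assumed continuation value; the source only says "otherwise continue".
        update["next_action"] = "generate_query"
    else:
        update["next_action"] = "end"
        update["database_records"] = "This question is outside the configured domain."
    return Command(update=update)
```

Rejecting the question here means no query generation or database call is made for it.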

correct_query(state)¶

Correct errors in the Cypher query based on validation feedback.

Uses the correct_cypher engine to fix identified errors and produce a valid query that matches the database schema.

Parameters:

state (haive.agents.rag.db_rag.graph_db.state.OverallState) – Current state containing the invalid query and errors.

Returns:

Command object with the following updates:

  • next_action: “validate_query” (to re-validate)

  • cypher_statement: The corrected Cypher query

  • steps: Updated with “correct_query”

Return type:

Command

Examples

>>> state = OverallState(
...     cypher_statement="MATCH (p:Actor)-[:DIRECTED]->(m:Film) RETURN p.name",
...     cypher_errors=["Label 'Film' does not exist, use 'Movie'"]
... )
>>> command = agent.correct_query(state)
>>> print(command.update["cypher_statement"])
MATCH (p:Person)-[:DIRECTED]->(m:Movie) RETURN p.name

Note

The corrected query is sent back to validation to ensure all errors are resolved.

domain_router(state)¶

Route based on domain relevance check result.

Parameters:

state (haive.agents.rag.db_rag.graph_db.state.OverallState) – Current state with next_action field.

Returns:

Next node name - END if out-of-domain, “generate_query” otherwise.

Return type:

str

Note

This is used as a conditional edge function in the workflow graph.

execute_query(state)¶

Execute the validated Cypher query against the Neo4j database.

Runs the query and captures the results for answer generation. Handles empty results gracefully.

Parameters:

state (haive.agents.rag.db_rag.graph_db.state.OverallState) – Current state containing the validated Cypher statement.

Returns:

Command object with the following updates:

  • database_records: Query results or “No results found”

  • next_action: “generate_answer”

  • steps: Updated with “execute_query”

Return type:

Command

Examples

>>> state = OverallState(
...     cypher_statement="MATCH (m:Movie) RETURN m.title LIMIT 3"
... )
>>> command = agent.execute_query(state)
>>> print(command.update["database_records"])
[{"m.title": "The Matrix"}, {"m.title": "Inception"}, ...]

Note

The query is executed with proper sanitization and timeout settings configured in the Neo4j connection.
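The empty-result handling described above might look like the sketch below. The `run` callable is a hypothetical stand-in for whatever executes Cypher against Neo4j, and the node returns a plain dict of updates rather than a Command object to keep the example self-contained.

```python
from typing import Any, Callable, Dict, List

# Mirrors the documented fallback text for empty results.
NO_RESULTS = "No results found"

Records = List[Dict[str, Any]]


def execute_query_sketch(
    cypher: str, run: Callable[[str], Records], steps: List[str]
) -> Dict[str, Any]:
    """Run the query and substitute a fallback message when nothing is returned."""
    records = run(cypher)  # hypothetical database call
    return {
        "database_records": records if records else NO_RESULTS,
        "next_action": "generate_answer",
        "steps": steps + ["execute_query"],
    }
```

Substituting a message for an empty list lets the answer-generation step respond sensibly instead of receiving no context at all.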

generate_answer(state)¶

Generate a natural language answer from the query results.

Uses the generate_final_answer engine to convert database records into a human-friendly response that directly answers the question.

Parameters:

state (haive.agents.rag.db_rag.graph_db.state.OverallState) – Current state containing question and database results.

Returns:

Command object with the following updates:

  • answer: The natural language response

  • next_action: “end”

  • steps: Updated with “generate_answer”

Return type:

Command

Examples

>>> state = OverallState(
...     question="Who directed The Matrix?",
...     database_records=[{"p.name": "Lana Wachowski"}, {"p.name": "Lilly Wachowski"}]
... )
>>> command = agent.generate_answer(state)
>>> print(command.update["answer"])
The Matrix was directed by Lana Wachowski and Lilly Wachowski.

Note

The engine is prompted to provide direct, conversational answers without mentioning the database or technical details.

generate_query(state)¶

Generate a Cypher query from the natural language question.

Uses the text2cypher engine with few-shot examples to convert the user’s question into a valid Cypher query for the database schema.

Parameters:

state (haive.agents.rag.db_rag.graph_db.state.OverallState) – Current state containing the user’s question.

Returns:

Command object with the following updates:

  • cypher_statement: The generated Cypher query

  • steps: Updated with “generate_query”

Return type:

Command

Examples

>>> state = OverallState(question="Who directed Inception?")
>>> command = agent.generate_query(state)
>>> print(command.update["cypher_statement"])
MATCH (p:Person)-[:DIRECTED]->(m:Movie {title: 'Inception'}) RETURN p.name

Note

The quality of generation depends heavily on the provided examples and their similarity to the user’s question.
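To make the role of example similarity concrete, here is a minimal sketch of few-shot example selection and prompt assembly. It uses token-overlap (Jaccard) similarity as a simple substitute for the semantic similarity selector the agent uses; the helper names `select_examples` and `build_prompt` and the prompt layout are assumptions for illustration.

```python
import re
from typing import Dict, List, Set


def _tokens(text: str) -> Set[str]:
    """Lowercased alphanumeric tokens of a question."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def select_examples(
    question: str, examples: List[Dict[str, str]], k: int = 2
) -> List[Dict[str, str]]:
    """Return the k examples whose questions overlap most with the input.

    Jaccard overlap is a stand-in for embedding-based similarity.
    """
    q = _tokens(question)

    def jaccard(example: Dict[str, str]) -> float:
        e = _tokens(example["question"])
        union = q | e
        return len(q & e) / len(union) if union else 0.0

    return sorted(examples, key=jaccard, reverse=True)[:k]


def build_prompt(question: str, examples: List[Dict[str, str]]) -> str:
    """Place the selected examples before the new question (hypothetical layout)."""
    shots = "\n\n".join(
        f"Question: {ex['question']}\nCypher: {ex['query']}" for ex in examples
    )
    return f"{shots}\n\nQuestion: {question}\nCypher:"
```

With a pool containing "Who directed The Matrix?", the question "Who directed Inception?" selects that example first because the two share the tokens "who" and "directed".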

setup_workflow()¶

Set up the complete Graph DB RAG workflow.

Configures the workflow graph with all nodes and edges, including conditional routing based on validation results. This method is called automatically during agent initialization.

The workflow structure:

START
  ↓
check_domain_relevance
  ↓ (conditional: END if out-of-domain)
generate_query
  ↓
validate_query ← ─ ─ ─ ─ ─ ┐
  ↓ (conditional)          │
  ├─ errors: correct_query ┘
  ↓ valid
execute_query
  ↓
generate_answer
  ↓
END

Note

The workflow includes loops for query correction and multiple exit points for error handling.

Return type:

None
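The wiring described above can be approximated without any graph library, as in the sketch below. Everything in it is illustrative: the node functions are stubs that share names with the agent's methods but contain no LLM or database calls, `END` is a hypothetical sentinel, and state is a plain dict rather than OverallState. The first generated query is deliberately invalid so that the correction loop runs once.

```python
from typing import Any, Callable, Dict, Union

END = "__end__"  # hypothetical sentinel for the terminal node
State = Dict[str, Any]


def check_domain_relevance(state: State) -> State:
    in_domain = "movie" in state["question"].lower()
    return {"next_action": "generate_query" if in_domain else "end"}


def generate_query(state: State) -> State:
    # Deliberately uses a wrong label so validation fails the first time.
    return {"cypher_statement": "MATCH (m:Film) RETURN m.title"}


def validate_query(state: State) -> State:
    errors = ["Label 'Film' does not exist"] if ":Film" in state["cypher_statement"] else []
    return {"cypher_errors": errors,
            "next_action": "correct_query" if errors else "execute_query"}


def correct_query(state: State) -> State:
    return {"cypher_statement": state["cypher_statement"].replace(":Film", ":Movie")}


def execute_query(state: State) -> State:
    return {"database_records": [{"m.title": "The Matrix"}]}


def generate_answer(state: State) -> State:
    titles = ", ".join(r["m.title"] for r in state["database_records"])
    return {"answer": f"Found: {titles}"}


NODES: Dict[str, Callable[[State], State]] = {
    f.__name__: f for f in (check_domain_relevance, generate_query, validate_query,
                            correct_query, execute_query, generate_answer)
}

# A string is a fixed edge; a callable is a conditional edge (router).
EDGES: Dict[str, Union[str, Callable[[State], str]]] = {
    "check_domain_relevance": lambda s: END if s["next_action"] == "end" else "generate_query",
    "generate_query": "validate_query",
    "validate_query": lambda s: s["next_action"],
    "correct_query": "validate_query",  # corrected queries are re-validated
    "execute_query": "generate_answer",
    "generate_answer": END,
}


def run(question: str, max_steps: int = 20) -> State:
    state: State = {"question": question, "steps": []}
    node = "check_domain_relevance"
    # max_steps guards against an endless validate/correct loop.
    while node != END and len(state["steps"]) < max_steps:
        state.update(NODES[node](state))
        state["steps"].append(node)
        edge = EDGES[node]
        node = edge(state) if callable(edge) else edge
    return state
```

Calling `run("List a movie title")` visits validate_query twice, once before and once after correct_query, while an out-of-domain question stops after the first node.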

validate_query(state)¶

Validate the generated Cypher query against the database schema.

Checks for syntax errors, schema mismatches, and logical issues in the generated query. Routes to correction if errors are found.

Parameters:

state (haive.agents.rag.db_rag.graph_db.state.OverallState) – Current state containing the Cypher statement to validate.

Returns:

Command object with the following updates:

  • next_action: “correct_cypher” if errors, “execute_query” if valid

  • cypher_errors: List of validation errors (if any)

  • steps: Updated with “validate_query”

Return type:

Command

Examples

>>> state = OverallState(
...     cypher_statement="MATCH (p:Actor)-[:DIRECTED]->(m:Film) RETURN p.name"
... )
>>> command = agent.validate_query(state)
>>> # Would return errors about "Film" label and "Actor" directing

Note

Validation checks include label existence, property names, relationship types, and query completeness.
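A few of these checks can be approximated with plain pattern matching, as in the sketch below. It is not the agent's validator: the schema is a hypothetical dict of label and relationship-type sets, the regular expressions only cover simple patterns such as `(p:Person)` and `[:DIRECTED]`, and property names are not checked.

```python
import re
from typing import Dict, List, Set


def validate_cypher_sketch(cypher: str, schema: Dict[str, Set[str]]) -> List[str]:
    """Return a list of error messages; an empty list means no issue was found."""
    errors: List[str] = []

    # Node labels appear as (var:Label ...).
    for label in re.findall(r"\(\s*\w*\s*:\s*(\w+)", cypher):
        if label not in schema["labels"]:
            errors.append(f"Label '{label}' does not exist")

    # Relationship types appear as [var:TYPE ...].
    for rel_type in re.findall(r"\[\s*\w*\s*:\s*(\w+)", cypher):
        if rel_type not in schema["relationship_types"]:
            errors.append(f"Relationship type '{rel_type}' does not exist")

    # Completeness check for read queries: a RETURN clause must be present.
    if not re.search(r"\bRETURN\b", cypher, flags=re.IGNORECASE):
        errors.append("Query has no RETURN clause")

    return errors
```

An error list in this form is what a correction step needs: each message names the offending element so the query can be rewritten and validated again.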

validation_router(state)¶

Route based on query validation result.

Parameters:

state (haive.agents.rag.db_rag.graph_db.state.OverallState) – Current state with next_action field.

Returns:

Next node name - “correct_query”, “execute_query”, or END.

Return type:

str

Note

This is used as a conditional edge function in the workflow graph.
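A conditional edge function only reads the state and returns the name of the next node. The sketch below shows that pattern for both routers, using plain dicts for state. `END` is a hypothetical sentinel, and the mapping from "correct_cypher" to the "correct_query" node is an assumption that reconciles the next_action value documented for validate_query with the node names listed here.

```python
from typing import Any, Dict

END = "__end__"  # hypothetical sentinel for the terminal node

# Assumed mapping from next_action values to node names.
_VALIDATION_TARGETS = {
    "correct_cypher": "correct_query",
    "correct_query": "correct_query",
    "execute_query": "execute_query",
}


def domain_router_sketch(state: Dict[str, Any]) -> str:
    """Stop when the question was rejected, otherwise move on to query generation."""
    return END if state.get("next_action") == "end" else "generate_query"


def validation_router_sketch(state: Dict[str, Any]) -> str:
    """Pick correction or execution; anything unexpected ends the run."""
    return _VALIDATION_TARGETS.get(state.get("next_action"), END)
```

Keeping routers free of side effects means all state changes happen in the nodes, which makes the routing easy to test in isolation.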

agents.rag.db_rag.graph_db.agent.check_domain_relevance(query, domain_categories=None)¶

Check if a query is relevant to the specified domain.

Parameters:
  • query (str) – The query to check

  • domain_categories (list) – List of domain categories to check against

Returns:

True if the query is domain-relevant, False otherwise

Return type:

bool

agents.rag.db_rag.graph_db.agent.correct_query(query, errors=None)¶

Correct a Cypher query based on provided errors.

Parameters:
  • query (str) – The original query

  • errors (list) – List of error messages

Returns:

Corrected query string

Return type:

str

agents.rag.db_rag.graph_db.agent.domain_router(query, domain_categories=None)¶

Route queries based on domain relevance.

Parameters:
  • query (str)

  • domain_categories (list)

Return type:

str

agents.rag.db_rag.graph_db.agent.execute_query(query, db_connection=None)¶

Execute a Cypher query against the database.

Parameters:
  • query (str)

  • db_connection – Optional; defaults to None

Return type:

dict

agents.rag.db_rag.graph_db.agent.generate_answer(query_results, original_query='')¶

Generate natural language answer from query results.

Parameters:
  • query_results (dict)

  • original_query (str)

Return type:

str

agents.rag.db_rag.graph_db.agent.generate_query(natural_language_query)¶

Generate Cypher query from natural language.

Parameters:

natural_language_query (str)

Return type:

str

agents.rag.db_rag.graph_db.agent.setup_workflow()¶

Set up the graph DB RAG workflow.

agents.rag.db_rag.graph_db.agent.validate_query(query, schema=None)¶

Validate a Cypher query against database schema.

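Parameters:
  • query – The Cypher query to validate

  • schema – Database schema to validate against (optional; defaults to None)

Return type:

dict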

agents.rag.db_rag.graph_db.agent.validation_router(validation_result)¶

Route based on validation results.

Parameters:

validation_result (dict)

Return type:

str