agents.chain.examples

Examples of using DeclarativeChainAgent to build complex RAG flows.

Shows how to recreate our complex agents using declarative specifications.

Classes

StrategyDecision

Strategy decision for RAG routing.

Functions

create_agentic_router_declarative(documents)

Create an agentic RAG router using declarative chain building.

create_complex_flow_from_spec()

Create a complex flow using raw ChainSpec.

create_plan(state)

Create query execution plan.

create_query_planning_declarative(documents)

Create a query planning RAG using declarative chain building.

create_rag_with_fallback()

Create a RAG with fallback strategies.

create_self_reflective_declarative(documents)

Create a self-reflective RAG using declarative chain building.

execute_sub_query(state)

Execute one sub-query.

finalize_answer(state)

Finalize the answer.

improve_answer(state)

Improve the answer based on critique.

reflect_and_critique(state)

Reflect on answer quality.

synthesize_results(state)

Synthesize all sub-query results.

Module Contents

class agents.chain.examples.StrategyDecision(/, **data)

Bases: pydantic.BaseModel

Strategy decision for RAG routing.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:

data (Any)

agents.chain.examples.create_agentic_router_declarative(documents)

Create an agentic RAG router using declarative chain building.

Parameters:

documents (list[langchain_core.documents.Document])

agents.chain.examples.create_complex_flow_from_spec()

Create a complex flow using raw ChainSpec.

Return type:

Any

agents.chain.examples.create_plan(state)

Create query execution plan.

Parameters:

state (dict[str, Any])

Return type:

dict[str, Any]

agents.chain.examples.create_query_planning_declarative(documents)

Create a query planning RAG using declarative chain building.

Parameters:

documents (list[langchain_core.documents.Document])

agents.chain.examples.create_rag_with_fallback()

Create a RAG with fallback strategies.

Return type:

Any

agents.chain.examples.create_self_reflective_declarative(documents)

Create a self-reflective RAG using declarative chain building.

Parameters:

documents (list[langchain_core.documents.Document])

agents.chain.examples.execute_sub_query(state)

Execute one sub-query.

Parameters:

state (dict[str, Any])

Return type:

dict[str, Any]

agents.chain.examples.finalize_answer(state)

Finalize the answer.

Parameters:

state (dict[str, Any])

Return type:

dict[str, Any]

agents.chain.examples.improve_answer(state)

Improve the answer based on critique.

Parameters:

state (dict[str, Any])

Return type:

dict[str, Any]

agents.chain.examples.reflect_and_critique(state)

Reflect on answer quality.

Parameters:

state (dict[str, Any])

Return type:

dict[str, Any]

agents.chain.examples.synthesize_results(state)

Synthesize all sub-query results.

Parameters:

state (dict[str, Any])

Return type:

dict[str, Any]