🔀 Graph System - Visual AI Workflow Orchestration¶
WHERE AI BECOMES A SYMPHONY OF INTELLIGENT COMPONENTS
Welcome to the Graph System - a revolutionary visual programming paradigm that transforms how AI workflows are designed, executed, and understood. This isn’t just another workflow engine; it’s a fundamental reimagining of how intelligent systems should be composed.
The Visual Revolution 🎨¶
```dot
digraph graph_revolution {
    rankdir=LR;
    node [shape=box, style="rounded,filled"];
    subgraph cluster_traditional {
        label="Traditional Approach";
        style=filled;
        fillcolor=lightgray;
        code [label="Code-based\nWorkflows", fillcolor=white];
        implicit [label="Implicit\nFlow", fillcolor=white];
        debug [label="Hard to\nDebug", fillcolor=white];
        code -> implicit -> debug;
    }
    subgraph cluster_haive {
        label="Haive Graph Revolution";
        style=filled;
        fillcolor=lightgreen;
        visual [label="Visual\nWorkflows", fillcolor=lightblue];
        explicit [label="Explicit\nFlow", fillcolor=lightblue];
        observable [label="Observable\nExecution", fillcolor=lightblue];
        visual -> explicit -> observable;
    }
    debug -> visual [label="Paradigm Shift", style=dashed, color=red];
}
```

🔀 Beyond Linear Execution¶
Transform Your AI from Sequential to Symphonic:
- Visual Workflow Construction
Build complex AI behaviors by connecting nodes and edges, with real-time visualization and debugging
- Intelligent Routing Logic
Dynamic path selection based on state, with conditional branches, parallel execution, and loop detection
- Built-in State Persistence
Automatic checkpointing, workflow replay, and time-travel debugging for resilient AI systems
- Agent Orchestration
Coordinate multiple agents with shared state, message passing, and synchronization primitives
- LangGraph Foundation
Built on the industry-standard LangGraph with Haive-specific enhancements for production use
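The bullets above translate into a fluent builder API. As a minimal sketch of what this looks like in practice (the node callables and the has_tool_calls condition are hypothetical placeholders; method signatures follow the BaseGraph reference below):

graph = BaseGraph(name="research_flow")
graph.add_sequence(
    ["ingest", "analyze"],
    node_objects=[ingest_fn, analyze_fn],  # hypothetical callables
    connect_start=True,                    # START -> ingest
)
graph.add_node("tools", tool_fn)           # hypothetical tool-executing callable
graph.add_boolean_conditional_edges(
    "analyze",
    has_tool_calls,    # hypothetical condition returning True/False
    "tools",           # go here when True
    "__end__",         # go here when False
)
# (a full flow would also route 'tools' onward, e.g. back to 'analyze')
app = graph.compile()  # compiled, runnable LangGraph StateGraph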
Core Architecture 🏗️¶
Revolutionary Features 🚀¶
Visual Workflow Design¶
```mermaid
graph TD
    subgraph "Visual Design"
        A[Start] --> B[Agent Node]
        B --> C{Decision}
        C -->|Route 1| D[Tool Node]
        C -->|Route 2| E[Validation]
        D --> F[Merge]
        E --> F
        F --> G[End]
    end
    style A fill:#f9f,stroke:#333,stroke-width:4px
    style G fill:#9f9,stroke:#333,stroke-width:4px
```
BaseGraph Architecture¶
- class haive.core.graph.BaseGraph(*, id=<factory>, name, description=None, metadata=<factory>, nodes=<factory>, edges=<factory>, branches=<factory>, entry_points=<factory>, finish_points=<factory>, conditional_entries=<factory>, conditional_exits=<factory>, entry_point=None, finish_point=None, state_schema=None, default_config=None, subgraphs=<factory>, node_types=<factory>, created_at=<factory>, updated_at=<factory>, needs_recompile_flag=False, last_compiled_at=None, compilation_state_hash=None, last_input_schema=None, last_output_schema=None, last_config_schema=None, last_interrupt_before=None, last_interrupt_after=None, last_compile_kwargs=None, allow_cycles=False, require_end_path=True)[source]¶
Bases: BaseModel, ValidationMixin
Base class for graph management in the Haive framework.
Provides comprehensive graph management capabilities including:
- Node management (add, remove, update)
- Edge management (direct and branch-based)
- Branch management
- Graph validation
- Serialization
- Parameters:
data (Any)
id (str)
name (str)
description (str | None)
nodes (dict[str, Node | NodeConfig | Any | None])
entry_point (str | None)
finish_point (str | None)
state_schema (Any | None)
default_config (RunnableConfig | None)
created_at (datetime)
updated_at (datetime)
needs_recompile_flag (bool)
last_compiled_at (datetime | None)
compilation_state_hash (str | None)
last_input_schema (Any | None)
last_output_schema (Any | None)
last_config_schema (Any | None)
last_interrupt_before (list | None)
last_interrupt_after (list | None)
allow_cycles (bool)
require_end_path (bool)
- classmethod from_langgraph(state_graph, name=None)[source]¶
Create a BaseGraph from a LangGraph StateGraph.
- add_boolean_conditional_edges(source_node, condition, true_destination, false_destination='__end__', also_accept_strings=True)[source]¶
Add conditional edges that explicitly handle boolean results.
This is a convenience method for the common case where you have a condition that returns True/False and you want clear routing.
- Parameters:
source_node (str) – Source node name
condition (Callable[[Any], bool]) – Function that returns True or False
true_destination (str) – Where to go when condition returns True
false_destination (str) – Where to go when condition returns False
also_accept_strings (bool) – Whether to also accept string equivalents like ‘has_tool_calls’/’no_tool_calls’
- Returns:
Self for method chaining
- Return type:
Example:
graph.add_boolean_conditional_edges(
    'agent_node',
    has_tool_calls,   # Function that returns True/False
    'validation',     # Go here when True
    END               # Go here when False
)
- add_branch(branch_or_name, source_node=None, condition=None, routes=None, branch_type=None, **kwargs)[source]¶
Add a branch to the graph with flexible input options.
- Parameters:
branch_or_name (Branch | str) – Branch object or branch name
source_node (str | None) – Source node for the branch (required if branch_or_name is a string)
condition (Any | None) – Condition function or key/value for evaluation
routes (dict[bool | str, str] | None) – Mapping of condition results to target nodes
branch_type (BranchType | None) – Type of branch (determined automatically if not provided)
**kwargs – Additional parameters for branch creation
- Returns:
Self for method chaining
- Return type:
- add_conditional_edges(source_node, condition, destinations=None, default='__end__', create_missing_nodes=False)[source]¶
Add conditional edges from a source node based on a condition.
This method supports multiple ways to handle True/False routing:
Boolean destinations: Use True/False as keys:
graph.add_conditional_edges(
    'agent_node',
    has_tool_calls,
    {True: 'validation', False: END}
)
String destinations with optional boolean fallbacks: String keys like ‘has_tool_calls’/’no_tool_calls’ can optionally get True/False fallbacks added by setting add_boolean_fallbacks=True:
graph.add_conditional_edges(
    'agent_node',
    has_tool_calls,
    {'has_tool_calls': 'validation', 'no_tool_calls': END},
    add_boolean_fallbacks=True
)
# With add_boolean_fallbacks=True, adds: {True: 'validation', False: END}
List format: First item = True destination, Second item = False destination:
graph.add_conditional_edges(
    'agent_node',
    has_tool_calls,
    ['validation', END]  # validation when True, END when False
)
Alternative: For simple boolean routing, consider using add_boolean_conditional_edges() which provides cleaner syntax for True/False conditions.
- Parameters:
source_node (str) – Source node name
condition (Branch | Callable[[StateLike, ConfigLike | None], str | bool | list[str] | list[Send] | Send | Command | None] | Any) – A function, Branch, NodeConfig or any object that can determine branching. For callables, takes (state, optional config) and returns a node name, boolean, list of nodes, list of Send objects, Send object, Command object, or a Branch object itself
destinations (str | list[str] | dict[bool | str | int, str] | None) – Target node(s). Can be: a single node name string (which will be mapped to True), a list of node names (which will be mapped by index), or a dictionary mapping condition results to target nodes
default (str | Literal['END'] | None) – Default destination if no condition matches (defaults to END). IMPORTANT: When destinations is a dictionary, no default is ever added.
create_missing_nodes (bool) – Whether to create missing destination nodes automatically (defaults to False)
add_boolean_fallbacks – Whether to automatically add True/False keys for string-keyed destinations (defaults to False)
- Returns:
Self for method chaining
- Return type:
- add_function_branch(source_node, condition, routes, default_route='__end__', name=None)[source]¶
Add a function-based branch.
- Parameters:
- Returns:
Self for method chaining
- Return type:
- add_intelligent_agent_routing(agents, execution_mode='infer', branches=None, prefix='agent_')[source]¶
Add intelligent agent routing with sequence inference and branching.
- Parameters:
- Returns:
Self for method chaining
- Return type:
- add_key_value_branch(source_node, key, value, comparison=ComparisonType.EQUALS, true_dest='continue', false_dest='__end__', name=None)[source]¶
Add a key-value comparison branch.
- Parameters:
- Returns:
Self for method chaining
- Return type:
- add_node(node_or_name, node_like=None, **kwargs)[source]¶
Add a node to the graph with flexible input options.
- Parameters:
- Returns:
Self for method chaining
- Return type:
- add_parallel_branches(source_node, branches, branch_names=None, join_node=None, join_node_obj=None, **kwargs)[source]¶
Add parallel branches from a source node, optionally joining at a common node.
- Parameters:
source_node (str) – Name of the source node
branches (list[list[str] | list[Node] | list[dict] | list[Any]]) – List of node sequences (each sequence is a branch)
branch_names (list[str] | None) – Optional names for the branches (used in branch creation)
join_node (str | Node | dict | Any | None) – Optional node to join all branches
join_node_obj (Any | None) – If join_node is a string, this is the node object/callable
**kwargs – Additional properties for nodes
- Returns:
Self for method chaining
- Return type:
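For example, the fork/join pattern from the diagrams above might be wired like this (a sketch; the analyzer node names and the synthesize callable are hypothetical):

graph.add_parallel_branches(
    "fork",
    branches=[["analyzer_1"], ["analyzer_2"], ["analyzer_3"]],
    branch_names=["fast", "deep", "web"],
    join_node="synthesize",
    join_node_obj=synthesize_fn,  # callable backing the join node
)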
- add_postlude_node(postlude_node, node_obj=None, **kwargs)[source]¶
Add a node at the end of the graph (before END).
- add_prelude_node(prelude_node, node_obj=None, **kwargs)[source]¶
Add a node at the beginning of the graph (after START).
- add_sequence(nodes, node_objects=None, connect_start=False, connect_end=False, **kwargs)[source]¶
Add a sequence of nodes and connect them in order.
- Parameters:
nodes (list[str | Node | dict | Any]) – List of nodes to add (names, objects, or dictionaries)
node_objects (list[Any] | None) – If nodes contains strings, these are the corresponding objects/callables
connect_start (bool) – Whether to connect the first node to START
connect_end (bool) – Whether to connect the last node to END
**kwargs – Additional properties to apply to all nodes
- Returns:
Self for method chaining
- Return type:
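A linear pipeline wired from START to END in one call might look like this (node callables are hypothetical):

graph.add_sequence(
    ["retrieve", "rank", "answer"],
    node_objects=[retrieve_fn, rank_fn, answer_fn],
    connect_start=True,  # START -> retrieve
    connect_end=True,    # answer -> END
)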
- check_full_recompilation_needed(input_schema=None, output_schema=None, config_schema=None, interrupt_before=None, interrupt_after=None, **compile_kwargs)[source]¶
Check if recompilation is needed for any reason and provide details.
- Parameters:
input_schema (Any | None) – Input schema to check
output_schema (Any | None) – Output schema to check
config_schema (Any | None) – Config schema to check
interrupt_before (list | None) – Interrupt before configuration to check
interrupt_after (list | None) – Interrupt after configuration to check
**compile_kwargs – Additional compilation parameters to check
- Returns:
Dictionary with recompilation status and reasons
- Return type:
- check_graph_validity()[source]¶
Validate the graph structure.
- Returns:
List of validation issues (empty if graph is valid)
- Return type:
- compile(raise_on_validation_error=False)[source]¶
Validate and compile the graph to a runnable LangGraph StateGraph.
- Parameters:
raise_on_validation_error (bool) – Whether to raise an exception on validation errors
- Returns:
Compiled LangGraph StateGraph
- Raises:
ValueError – If validation fails and raise_on_validation_error is True
- Return type:
- debug_conditional_routing(source_node)[source]¶
Debug conditional routing for a specific node.
This shows how boolean and string results will be routed for debugging purposes.
- Parameters:
source_node (str) – The node to debug routing for
- Return type:
None
- extend_from(other_graph, prefix='')[source]¶
Extend this graph with nodes and edges from another graph.
- Parameters:
other_graph – Graph to extend from
prefix – Optional prefix for imported node names
- Returns:
Self for method chaining
- Return type:
- find_all_paths(start_node='__start__', end_node='__end__', max_depth=100, include_loops=False, debug=False)[source]¶
Find all possible paths between two nodes.
- Parameters:
start_node – Starting node (defaults to START)
end_node – Ending node (defaults to END)
max_depth – Maximum path depth to prevent infinite loops
include_loops – Whether to include paths with loops/cycles
debug – Whether to show detailed debug logging
- Returns:
List of GraphPath objects
- find_nodes_without_end_path()[source]¶
Find nodes that can’t reach END.
- Returns:
List of node names that can’t reach END
- Return type:
Any | None
- find_nodes_without_finish_path()[source]¶
Find nodes that can’t reach a finish point. Alias for find_nodes_without_end_path for API consistency.
- Returns:
List of node names that can’t reach a finish point
- Return type:
Any | None
- find_unreachable_nodes()[source]¶
Find nodes that can’t be reached from START.
- Returns:
List of unreachable node names
- Return type:
Any | None
- get_sink_nodes()[source]¶
Get nodes that have no outgoing edges (other than to END).
- Returns:
List of sink node names
- Return type:
Any | None
- get_source_nodes()[source]¶
Get nodes that have no incoming edges (other than START).
- Returns:
List of source node names
- Return type:
Any | None
- insert_node_after(target_node, new_node, new_node_obj=None, **kwargs)[source]¶
Insert a new node after an existing node, redirecting all outgoing connections.
- Parameters:
- Returns:
Self for method chaining
- Return type:
- insert_node_before(target_node, new_node, new_node_obj=None, **kwargs)[source]¶
Insert a new node before an existing node, redirecting all incoming connections.
- Parameters:
- Returns:
Self for method chaining
- Return type:
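A sketch of retrofitting nodes into an existing flow with these two methods (node names and callables are hypothetical):

# agent -> tools becomes agent -> guardrail -> tools
graph.insert_node_after("agent", "guardrail", guardrail_fn)

# rank -> answer becomes rank -> rerank -> answer
graph.insert_node_before("answer", "rerank", rerank_fn)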
- mark_compiled(input_schema=None, output_schema=None, config_schema=None, interrupt_before=None, interrupt_after=None, **compile_kwargs)[source]¶
Mark the graph as compiled and reset the recompilation flag.
- Parameters:
input_schema (Any | None) – Input schema used for compilation
output_schema (Any | None) – Output schema used for compilation
config_schema (Any | None) – Config schema used for compilation
interrupt_before (list | None) – Interrupt before nodes used for compilation
interrupt_after (list | None) – Interrupt after nodes used for compilation
**compile_kwargs – Additional compilation parameters
- Return type:
None
- needs_recompile()[source]¶
Check if the graph needs recompilation.
- Returns:
True if the graph has been modified since last compilation
- Return type:
- needs_recompile_for_interrupts(interrupt_before=None, interrupt_after=None)[source]¶
Check if recompilation is needed due to interrupt changes.
- needs_recompile_for_schemas(input_schema=None, output_schema=None, config_schema=None)[source]¶
Check if recompilation is needed due to schema changes.
- replace_node(node_name, new_node, preserve_connections=True)[source]¶
Replace a node while optionally preserving its connections.
- set_conditional_entry(condition, entry_node, default_entry=None)[source]¶
Set a conditional entry point for the graph.
- Parameters:
- Returns:
Self for method chaining
- Return type:
- set_conditional_exit(node_name, condition, exit_if_true=True)[source]¶
Set a conditional exit point for the graph.
- Parameters:
- Returns:
Self for method chaining
- Return type:
- set_end_point(node_name)[source]¶
Deprecated: Use set_finish_point instead.
Set a finish point of the graph.
- to_json(**kwargs)[source]¶
Convert graph to JSON string.
- Parameters:
**kwargs – Additional parameters for JSON serialization
- Returns:
JSON string representation of the graph
- Return type:
- to_langgraph(state_schema=None, input_schema=None, output_schema=None, config_schema=None, **kwargs)[source]¶
Convert to LangGraph StateGraph with proper schema handling.
Schema Resolution Logic:
1. If state_schema is provided: use it, and default input/output to state_schema
2. If input_schema and output_schema are provided: use them, and create a PassThroughState for state_schema
3. If only input_schema is provided: use it for both input and state; output defaults to state
4. If only output_schema is provided: use it for both output and state; input defaults to state
5. If none are provided: use self.state_schema or dict
Note: This method marks the graph as compiled after successful conversion.
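The resolution rules can be exercised directly; a hedged sketch (the schema classes are hypothetical Pydantic models):

# Rule 1: a single schema serves as state, input, and output
sg = graph.to_langgraph(state_schema=WorkflowState)

# Rule 2: explicit input/output; a PassThroughState is generated for state
sg = graph.to_langgraph(input_schema=QueryInput, output_schema=AnswerOutput)

# Rule 5: nothing provided; falls back to self.state_schema or dict
sg = graph.to_langgraph()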
- to_mermaid(include_subgraphs=True, theme='default', subgraph_mode='cluster', show_default_branches=False)[source]¶
Generate a Mermaid graph diagram string.
- Parameters:
- Returns:
Mermaid diagram as string
- Return type:
- visualize(output_path=None, include_subgraphs=True, highlight_nodes=None, highlight_paths=None, save_png=True, width='100%', theme='default', subgraph_mode='cluster', show_default_branches=False, debug=False)[source]¶
Generate and display a visualization of the graph.
This method attempts multiple rendering approaches based on the environment, with fallbacks to ensure something is always displayed.
- Parameters:
output_path (str | None) – Optional path to save the diagram
include_subgraphs (bool) – Whether to visualize subgraphs as clusters
highlight_nodes (list[str] | None) – List of node names to highlight
highlight_paths (list[list[str]] | None) – List of paths to highlight (each path is a list of node names)
save_png (bool) – Whether to save the diagram as PNG
width (str) – Width of the displayed diagram
theme (str) – Mermaid theme to use (e.g., “default”, “forest”, “dark”, “neutral”)
subgraph_mode (str) – How to render subgraphs (“cluster”, “inline”, or “separate”) - DEPRECATED
show_default_branches (bool) – Whether to show default branches
debug (bool) – Whether to enable debug output
- Returns:
The generated Mermaid code
- Return type:
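A typical call, using only the parameters documented above:

mermaid_src = graph.visualize(
    output_path="workflow_diagram",
    highlight_nodes=["agent", "tools"],
    highlight_paths=[["__start__", "agent", "tools", "__end__"]],
    theme="dark",
    save_png=True,
)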
- property all_entry_points: dict[str, Any]¶
Property that returns all entry points (regular and conditional).
- property all_finish_points: dict[str, Any]¶
Property that returns all finish points (regular and conditional).
- property conditional_edges: Any¶
Property for accessing branches as conditional edges (compatibility).
- Returns:
Dictionary of branches indexed by ID
- Return type:
Any
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
The Foundation of Visual AI Programming
BaseGraph provides a powerful abstraction for building graph-based workflows that can handle everything from simple linear flows to complex multi-agent orchestrations.
Basic Graph Construction
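A minimal construction sketch using only methods from the reference above (agent_fn, tool_fn, and has_tool_calls are hypothetical):

graph = BaseGraph(name="qa_flow", entry_point="agent")
graph.add_node("agent", agent_fn)
graph.add_node("tools", tool_fn)
graph.add_conditional_edges(
    "agent",
    has_tool_calls,
    {True: "tools", False: "__end__"},
)

issues = graph.check_graph_validity()  # [] when the structure is sound
app = graph.compile(raise_on_validation_error=True)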
Dynamic Graph Composition¶
```dot
digraph dynamic_composition {
    rankdir=TB;
    node [shape=record, style="rounded,filled"];
    runtime [label="{Runtime Engine|+graphs: Dict\l+active: Graph\l}", fillcolor=lightblue];
    subgraph cluster_graphs {
        label="Dynamic Graph Library";
        style=filled;
        fillcolor=lightyellow;
        simple [label="{Simple Flow|Linear execution}", fillcolor=white];
        react [label="{ReAct Pattern|Tool + Reasoning}", fillcolor=white];
        multi [label="{Multi-Agent|Coordination}", fillcolor=white];
        custom [label="{Custom Graph|User defined}", fillcolor=white];
    }
    runtime -> simple [label="select"];
    runtime -> react [label="select"];
    runtime -> multi [label="select"];
    runtime -> custom [label="select"];
    compose [label="Graph Composer", shape=ellipse, fillcolor=lightgreen];
    simple -> compose [label="combine"];
    react -> compose [label="combine"];
    compose -> custom [label="creates", style=dashed];
}
```

Observable Execution¶
```mermaid
sequenceDiagram
    participant User
    participant Graph
    participant Node1 as Agent Node
    participant Node2 as Tool Node
    participant Observer
    User->>Graph: invoke(input)
    Graph->>Observer: on_graph_start
    Graph->>Node1: process(state)
    Node1->>Observer: on_node_start("agent")
    Node1-->>Observer: on_node_end("agent", output)
    Graph->>Node2: process(state)
    Node2->>Observer: on_node_start("tools")
    Node2-->>Observer: on_node_end("tools", output)
    Graph->>Observer: on_graph_end(final_state)
    Graph-->>User: result
```
Advanced Patterns 🎯¶
Conditional Routing¶
```dot
digraph conditional_routing {
    rankdir=TB;
    node [shape=box, style="rounded,filled"];
    start [label="Input", shape=ellipse, fillcolor=lightgreen];
    classifier [label="Classifier\nNode", fillcolor=lightblue];
    subgraph cluster_routes {
        label="Dynamic Routes";
        style=filled;
        fillcolor=lightyellow;
        simple [label="Simple\nQuery", fillcolor=white];
        complex [label="Complex\nAnalysis", fillcolor=white];
        multi [label="Multi-step\nReasoning", fillcolor=white];
        error [label="Error\nHandling", fillcolor=pink];
    }
    end [label="Output", shape=ellipse, fillcolor=lightgreen];
    start -> classifier;
    classifier -> simple [label="confidence > 0.8"];
    classifier -> complex [label="0.5 < confidence < 0.8"];
    classifier -> multi [label="confidence < 0.5"];
    classifier -> error [label="error", style=dashed];
    simple -> end;
    complex -> end;
    multi -> end;
    error -> end;
}
```

Parallel Execution¶
```mermaid
graph TD
    Start[Input] --> Fork{Fork}
    Fork --> A1[Analyzer 1]
    Fork --> A2[Analyzer 2]
    Fork --> A3[Analyzer 3]
    A1 --> Join{Join}
    A2 --> Join
    A3 --> Join
    Join --> Synthesize[Synthesize Results]
    Synthesize --> End[Output]
    style Fork fill:#ff9,stroke:#333,stroke-width:2px
    style Join fill:#ff9,stroke:#333,stroke-width:2px
```
Loop Patterns¶
```dot
digraph loop_patterns {
    rankdir=TB;
    node [shape=box, style="rounded,filled"];
    subgraph cluster_refinement {
        label="Refinement Loop";
        style=filled;
        fillcolor=lightblue;
        generate [label="Generate", fillcolor=white];
        evaluate [label="Evaluate", fillcolor=white];
        improve [label="Improve", fillcolor=white];
        generate -> evaluate;
        evaluate -> improve [label="needs work"];
        improve -> generate [label="retry"];
        evaluate -> "cluster_refinement_end" [label="satisfied", style=invis];
    }
    subgraph cluster_exploration {
        label="Exploration Loop";
        style=filled;
        fillcolor=lightgreen;
        explore [label="Explore", fillcolor=white];
        discover [label="Discover", fillcolor=white];
        expand [label="Expand", fillcolor=white];
        explore -> discover;
        discover -> expand;
        expand -> explore [label="continue"];
        discover -> "cluster_exploration_end" [label="complete", style=invis];
    }
}
```

Node System Architecture¶
🧠 Node System - Intelligent Graph Components Engine
THE NEURAL NETWORK OF AI WORKFLOWS
Welcome to the Node System - the revolutionary foundation that transforms individual AI components into intelligent, interconnected processing units. This isn’t just another workflow node library; it’s a comprehensive neural architecture where every node is a specialized neuron that learns, adapts, and collaborates to create emergent intelligence.
⚡ REVOLUTIONARY NODE INTELLIGENCE¶
The Node System represents a paradigm shift from static processing units to living, adaptive components that evolve with your AI workflows:
- 🧠 Intelligent Processing: Nodes that learn from execution patterns and optimize performance
- 🔄 Dynamic Adaptation: Real-time reconfiguration based on data flow requirements
- 🤝 Collaborative Intelligence: Nodes that communicate and coordinate seamlessly
- 📊 Self-Monitoring: Built-in performance analytics and bottleneck detection
- 🎯 Type-Safe Execution: Guaranteed type safety with intelligent field mapping
🌟 CORE NODE CATEGORIES¶
- 1. Engine Nodes - The Powerhouses 🚀
High-performance execution units for AI engines:
Examples
>>> from haive.core.graph.node import EngineNodeConfig
>>> from haive.core.engine.aug_llm import AugLLMConfig
>>>
>>> # Create intelligent LLM processing node
>>> llm_engine = AugLLMConfig(
>>> model="gpt-4",
>>> tools=[calculator, web_search],
>>> structured_output_model=AnalysisResult
>>> )
>>>
>>> analysis_node = EngineNodeConfig(
>>> name="intelligent_analyzer",
>>> engine=llm_engine,
>>> input_mapping={
>>> "user_query": "messages",
>>> "context": "analysis_context"
>>> },
>>> output_mapping={
>>> "structured_analysis": "analysis_result",
>>> "tool_calls": "tool_execution_log"
>>> },
>>> performance_tracking=True,
>>> adaptive_routing=True
>>> )
>>>
>>> # Node automatically optimizes based on execution patterns
>>> builder.add_node("analyze", analysis_node)
- 2. Agent Nodes - The Coordinators 🤝
Sophisticated multi-agent orchestration and coordination:
>>> from haive.core.graph.node import AgentNodeV3
>>> from haive.agents.multi import EnhancedMultiAgentV4
>>>
>>> # Create collaborative agent coordination node
>>> research_team = EnhancedMultiAgentV4([
>>>     ResearchAgent(name="researcher"),
>>>     AnalysisAgent(name="analyst"),
>>>     SynthesisAgent(name="synthesizer")
>>> ], mode="sequential")
>>>
>>> team_node = AgentNodeV3(
>>>     name="research_coordination",
>>>     agent=research_team,
>>>     shared_fields=["knowledge_base", "research_context"],
>>>     private_fields=["internal_state", "agent_memory"],
>>>     coordination_strategy="consensus",
>>>     conflict_resolution="semantic_merge",
>>>     state_projection_enabled=True
>>> )
>>>
>>> # Intelligent state management across agents
>>> builder.add_node("coordinate_research", team_node)
- 3. Validation & Routing Nodes - The Decision Makers 🧭
Intelligent workflow control with adaptive routing:
>>> from haive.core.graph.node import UnifiedValidationNode, RoutingValidationNode
>>>
>>> # Create intelligent validation with routing
>>> smart_validator = UnifiedValidationNode(
>>>     name="intelligent_gatekeeper",
>>>     validation_schemas=[InputSchema, QualitySchema],
>>>     routing_conditions={
>>>         "high_confidence": lambda state: state.confidence > 0.8,
>>>         "needs_review": lambda state: state.quality_score < 0.6,
>>>         "ready_for_output": lambda state: state.is_complete
>>>     },
>>>     adaptive_thresholds=True,
>>>     learning_enabled=True,
>>>     fallback_strategy="human_review"
>>> )
>>>
>>> # Routes become smarter over time
>>> builder.add_conditional_edges(
>>>     "validate",
>>>     smart_validator.route_based_on_validation,
>>>     {
>>>         "high_confidence": "finalize",
>>>         "needs_review": "manual_review",
>>>         "ready_for_output": "output"
>>>     }
>>> )
- 4. Field Mapping & Composition Nodes - The Transformers 🔄
Advanced data transformation and schema adaptation:
>>> from haive.core.graph.node.composer import NodeSchemaComposer, FieldMapping
>>>
>>> # Create intelligent field mapping
>>> smart_mapper = FieldMapping(
>>>     input_transformations={
>>>         "user_input": "standardized_query",
>>>         "context_data": "enriched_context",
>>>         "metadata": "processing_metadata"
>>>     },
>>>     output_transformations={
>>>         "llm_response": "structured_output",
>>>         "tool_results": "verified_tool_data",
>>>         "confidence_scores": "quality_metrics"
>>>     },
>>>     type_coercion_enabled=True,
>>>     validation_on_transform=True,
>>>     semantic_mapping=True  # AI-powered field mapping
>>> )
>>>
>>> # Dynamic schema composition
>>> composer = NodeSchemaComposer(
>>>     base_schema=WorkflowState,
>>>     dynamic_adaptation=True,
>>>     optimization_enabled=True
>>> )
>>>
>>> # Learns optimal field mappings over time
>>> optimized_schema = composer.compose_for_workflow(workflow_nodes)
🎯 ADVANCED NODE FEATURES¶
Self-Optimizing Execution 🔮
>>> from haive.core.graph.node import create_adaptive_node
>>>
>>> # Node that learns and optimizes itself
>>> adaptive_node = create_adaptive_node(
>>> base_engine=llm_engine,
>>> learning_mode="online",
>>> optimization_strategy="genetic_algorithm",
>>> performance_targets={
>>> "response_time": "<2s",
>>> "accuracy": ">95%",
>>> "cost_efficiency": "minimize"
>>> }
>>> )
>>>
>>> # Automatically adjusts parameters for optimal performance
>>> @adaptive_node.optimization_callback
>>> def performance_optimization(metrics):
>>> if metrics.response_time > 2.0:
>>> adaptive_node.reduce_complexity()
>>> if metrics.accuracy < 0.95:
>>> adaptive_node.increase_validation()
Collaborative Node Networks 🌐
>>> # Create networks of cooperating nodes
>>> node_network = NodeNetwork([
>>> SpecialistNode("domain_expert"),
>>> GeneralistNode("coordinator"),
>>> ValidatorNode("quality_assurance"),
>>> OptimizerNode("performance_monitor")
>>> ])
>>>
>>> # Nodes share knowledge and coordinate decisions
>>> network.enable_knowledge_sharing()
>>> network.configure_consensus_protocols()
>>> network.add_collective_learning()
Real-time Node Analytics 📊
>>> # Comprehensive node monitoring
>>> node_monitor = NodeAnalytics(
>>> metrics=["execution_time", "memory_usage", "accuracy", "throughput"],
>>> alerting_enabled=True,
>>> optimization_suggestions=True,
>>> predictive_analytics=True
>>> )
>>>
>>> # Automatic performance optimization
>>> @node_monitor.on_performance_degradation
>>> def auto_optimize(node, metrics):
>>> if metrics.memory_usage > 0.8:
>>> node.enable_memory_optimization()
>>> if metrics.execution_time > threshold:
>>> node.switch_to_fast_mode()
🏗️ NODE COMPOSITION PATTERNS¶
Hierarchical Node Architecture 🏛️
>>> # Build complex node hierarchies
>>> master_controller = MasterNode(
>>> name="workflow_orchestrator",
>>> subnodes={
>>> "preprocessing": PreprocessingCluster([
>>> TokenizerNode(), NormalizerNode(), ValidatorNode()
>>> ]),
>>> "processing": ProcessingCluster([
>>> LLMNode(), ToolNode(), AnalysisNode()
>>> ]),
>>> "postprocessing": PostprocessingCluster([
>>> FormatterNode(), ValidatorNode(), OutputNode()
>>> ])
>>> },
>>> coordination_strategy="hierarchical_control"
>>> )
Pipeline Node Patterns 🔗
>>> # Create intelligent processing pipelines
>>> pipeline = NodePipeline([
>>> InputValidationNode(),
>>> ContextEnrichmentNode(),
>>> LLMProcessingNode(),
>>> OutputValidationNode(),
>>> ResultFormattingNode()
>>> ],
>>> error_handling="graceful_degradation",
>>> parallel_optimization=True,
>>> adaptive_routing=True
>>> )
>>>
>>> # Pipeline automatically optimizes execution order
>>> optimized_pipeline = pipeline.optimize_for_throughput()
Event-Driven Node Systems 📡
>>> # Reactive node networks
>>> event_system = EventDrivenNodeSystem()
>>>
>>> # Nodes react to events intelligently
>>> @event_system.on_event("data_quality_alert")
>>> def handle_quality_issue(event_data):
>>> quality_node.increase_validation_strictness()
>>> fallback_node.activate_backup_processing()
>>>
>>> @event_system.on_event("performance_threshold_exceeded")
>>> def optimize_performance(event_data):
>>> load_balancer.redistribute_workload()
>>> cache_node.increase_cache_size()
🛠️ NODE FACTORY SYSTEM¶
Intelligent Node Creation 🏭
>>> from haive.core.graph.node import NodeFactory, create_adaptive_node
>>>
>>> # Smart factory that creates optimal nodes
>>> factory = NodeFactory(
>>> optimization_enabled=True,
>>> best_practices_enforcement=True,
>>> automatic_configuration=True
>>> )
>>>
>>> # Create nodes with intelligent defaults
>>> smart_node = factory.create_optimal_node(
>>> purpose="text_analysis",
>>> input_schema=TextInput,
>>> output_schema=AnalysisResult,
>>> performance_requirements={
>>> "max_latency": "1s",
>>> "min_accuracy": "95%",
>>> "cost_budget": "low"
>>> }
>>> )
>>>
>>> # Factory selects best engine and configuration
>>> optimized_config = factory.optimize_for_requirements(smart_node)
Template-Based Node Generation 📋
>>> # Predefined node templates for common patterns
>>> templates = {
>>> "research_pipeline": ResearchPipelineTemplate(),
>>> "validation_gateway": ValidationGatewayTemplate(),
>>> "multi_agent_coordinator": MultiAgentTemplate(),
>>> "performance_optimizer": OptimizationTemplate()
>>> }
>>>
>>> # Generate nodes from templates
>>> research_node = factory.from_template(
>>> "research_pipeline",
>>> customizations={
>>> "domain": "medical_research",
>>> "sources": ["pubmed", "arxiv", "clinical_trials"],
>>> "quality_threshold": 0.9
>>> }
>>> )
📊 PERFORMANCE & MONITORING¶
Real-Time Performance Metrics:
- Execution Time: < 100ms overhead per node
- Memory Efficiency: 90%+ memory utilization optimization
- Throughput: 10,000+ node executions/second
- Accuracy: 99%+ field mapping accuracy
- Adaptability: Real-time parameter optimization
Advanced Monitoring Features:
>>> # Comprehensive node monitoring
>>> monitor = NodePerformanceMonitor(
>>> metrics_collection=["latency", "throughput", "accuracy", "resource_usage"],
>>> anomaly_detection=True,
>>> predictive_analytics=True,
>>> auto_optimization=True
>>> )
>>>
>>> # Performance dashboards
>>> dashboard = NodeDashboard(
>>> real_time_visualization=True,
>>> performance_heatmaps=True,
>>> optimization_suggestions=True,
>>> cost_analysis=True
>>> )
🎓 BEST PRACTICES¶
Design for Adaptability: Use adaptive nodes that learn and optimize
Implement Monitoring: Always include performance tracking
Use Type Safety: Leverage field mapping for guaranteed type safety
Plan for Scale: Design nodes for horizontal scaling
Test Thoroughly: Validate node behavior with comprehensive tests
Monitor Continuously: Track performance and optimize regularly
Document Patterns: Clear documentation for node interaction patterns
🚀 GETTING STARTED¶
>>> from haive.core.graph.node import (
>>> EngineNodeConfig, AgentNodeV3, create_adaptive_node
>>> )
>>> from haive.core.engine.aug_llm import AugLLMConfig
>>>
>>> # 1. Create intelligent engine node
>>> engine = AugLLMConfig(model="gpt-4", tools=[calculator])
>>> processing_node = EngineNodeConfig(
>>> name="intelligent_processor",
>>> engine=engine,
>>> adaptive_optimization=True
>>> )
>>>
>>> # 2. Create collaborative agent node
>>> agent_node = AgentNodeV3(
>>> name="team_coordinator",
>>> agent=multi_agent_system,
>>> coordination_strategy="consensus"
>>> )
>>>
>>> # 3. Build adaptive workflow
>>> workflow = builder.add_node("process", processing_node)
>>> workflow.add_node("coordinate", agent_node)
>>> workflow.add_adaptive_edges(source="process", target="coordinate")
>>>
>>> # 4. Compile with intelligence
>>> app = workflow.compile(
>>> optimization="neural_network",
>>> learning_enabled=True,
>>> monitoring=True
>>> )
🧠 NODE INTELLIGENCE GALLERY¶
Available Node Types:
- EngineNodeConfig - High-performance AI engine execution
- AgentNodeV3 - Multi-agent coordination and management
- ValidationNodeConfig - Input/output validation and quality control
- RoutingValidationNode - Intelligent routing with validation
- UnifiedValidationNode - Advanced validation with learning
- NodeSchemaComposer - Dynamic schema composition and optimization

Factory Functions:
- create_node() - Universal node creation with intelligence
- create_engine_node() - Optimized engine node generation
- create_adaptive_node() - Self-optimizing node creation
- create_validation_node() - Smart validation node generation
- create_tool_node() - Intelligent tool execution nodes

Analytics & Monitoring:
- NodeAnalytics - Comprehensive performance monitoring
- NodeOptimizer - AI-powered optimization engine
- NodeRegistry - Intelligent node type management
- PerformanceProfiler - Deep performance analysis
—
Node System: Where Individual Components Become Collective Intelligence 🧠
- haive.core.graph.node.AgentNodeV3¶
alias of AgentNodeV3Config
- class haive.core.graph.node.AsyncNodeFunction(*args, **kwargs)[source]¶
Protocol for async node functions.
An async node function is like a regular node function but executes asynchronously.
- __init__(*args, **kwargs)¶
- class haive.core.graph.node.Command(*, graph=None, update=None, resume=None, goto=())[source]¶
One or more commands to update the graph’s state and send messages to nodes.
Added in version 0.2.24.
- Parameters:
graph (str | None) – graph to send the command to. Supported values are:
- None: the current graph (default)
- Command.PARENT: closest parent graph
update (Any | None) – update to apply to the graph’s state.
resume (Any | dict[str, Any] | None) – value to resume execution with. To be used together with [interrupt()][langgraph.types.interrupt].
goto (Send | Sequence[Send | str] | str) – can be one of the following:
- name of the node to navigate to next (any node that belongs to the specified graph)
- sequence of node names to navigate to next
- Send object (to execute a node with the input provided)
- sequence of Send objects
- __init__(*, graph=None, update=None, resume=None, goto=())¶
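In a node function, Command lets you combine a state update with routing in a single return value. A minimal example (node and field names are hypothetical):

from langgraph.types import Command

def review_node(state):
    # Update state and jump straight to the "publish" node
    return Command(update={"status": "approved"}, goto="publish")

def escalate_node(state):
    # Route to a node in the closest parent graph instead
    return Command(graph=Command.PARENT, goto="human_review")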
- class haive.core.graph.node.EngineNodeConfig(*, id=<factory>, name, node_type=NodeType.ENGINE, input_schema=None, output_schema=None, input_field_defs=<factory>, output_field_defs=<factory>, auto_add_engine_attribution=True, engine_name=None, command_goto='__end__', config_overrides=<factory>, config_schema=None, metadata=<factory>, engine=None, input_fields=None, output_fields=None, retry_policy=None, use_send=False, debug=True)[source]¶
Engine-based node with intelligent I/O handling and schema support.
This node config extends the base NodeConfig with engine-specific functionality while maintaining the new input/output schema pattern for better state utilization.
- Parameters:
data (Any)
id (str)
name (str)
node_type (NodeType)
input_schema (type[BaseModel] | None)
output_schema (type[BaseModel] | None)
input_field_defs (list[FieldDefinition])
output_field_defs (list[FieldDefinition])
auto_add_engine_attribution (bool)
engine_name (str | None)
command_goto (str | Literal['END'] | ~langgraph.types.Send | list[~langgraph.types.Send | str] | None)
config_schema (type[BaseModel] | None)
engine (Engine | None)
retry_policy (RetryPolicy | None)
use_send (bool)
debug (bool)
- extract_input_from_state(state)[source]¶
Override to properly extract input fields from state using engine-aware logic.
- model_post_init(_EngineNodeConfig__context)[source]¶
Post-initialization to setup engine-specific configurations.
- Return type:
None
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- retry_policy: RetryPolicy | None¶
- class haive.core.graph.node.IntelligentMultiAgentNode(*, id=<factory>, name, node_type=NodeType.AGENT, input_schema=None, output_schema=None, input_field_defs=<factory>, output_field_defs=<factory>, auto_add_engine_attribution=True, engine_name=None, command_goto='__end__', config_overrides=<factory>, config_schema=None, metadata=<factory>, execution_mode='infer', infer_sequence=True, branches=<factory>, current_sequence=None, execution_index=0)[source]¶
Intelligent multi-agent node with sequence inference and branching.
This node provides advanced multi-agent coordination with:
- Automatic sequence inference from agent patterns
- Conditional branching and routing
- Dynamic agent execution planning
- Smart fallback strategies
- Parameters:
id (str)
name (str)
node_type (NodeType)
input_schema (type[BaseModel] | None)
output_schema (type[BaseModel] | None)
input_field_defs (list[FieldDefinition])
output_field_defs (list[FieldDefinition])
auto_add_engine_attribution (bool)
engine_name (str | None)
command_goto (str | Literal['END'] | ~langgraph.types.Send | list[~langgraph.types.Send | str] | None)
config_schema (type[BaseModel] | None)
execution_mode (str)
infer_sequence (bool)
execution_index (int)
- __call__(state, config=None)[source]¶
Execute intelligent multi-agent coordination.
- Parameters:
state (MultiAgentState)
config (ConfigLike | None)
- Return type:
- add_branch_condition(source_agent, condition, target_agents)[source]¶
Add a branch condition for an agent.
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- node_type: NodeType¶
- class haive.core.graph.node.MultiAgentNode(*, id=<factory>, name, node_type, input_schema=None, output_schema=None, input_field_defs=<factory>, output_field_defs=<factory>, auto_add_engine_attribution=True, engine_name=None, command_goto='__end__', config_overrides=<factory>, config_schema=None, metadata=<factory>)[source]¶
Node for executing agents within a multi-agent state container.
This node handles:
- State projection from MultiAgentState to agent-specific schema
- Agent execution with its expected state type
- State updates back to the container
- Recompilation tracking
The key innovation is that each agent receives its exact expected state schema, not a flattened global state.
- Parameters:
id (str)
name (str)
node_type (NodeType)
input_schema (type[BaseModel] | None)
output_schema (type[BaseModel] | None)
input_field_defs (list[FieldDefinition])
output_field_defs (list[FieldDefinition])
auto_add_engine_attribution (bool)
engine_name (str | None)
command_goto (str | Literal['END'] | ~langgraph.types.Send | list[~langgraph.types.Send | str] | None)
config_schema (type[BaseModel] | None)
- __call__(state, config=None)[source]¶
Execute agent with state projection.
- Parameters:
state (MultiAgentState)
config (ConfigLike | None)
- Return type:
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- node_type: NodeType¶
- class haive.core.graph.node.NodeConfig(*, id=<factory>, name, node_type=None, schemas=<factory>, engine=None, engine_name=None, callable_func=None, callable_ref=None, state_schema=None, input_schema=None, output_schema=None, input_fields=None, output_fields=None, command_goto=None, retry_policy=None, tools=None, messages_field='messages', handle_tool_errors=True, validation_schemas=None, condition=None, condition_ref=None, routes=None, send_targets=None, send_field=None, config_overrides=<factory>, metadata=<factory>)[source]¶
Configuration for a node in a graph.
A NodeConfig defines all aspects of a node’s behavior, including:
- Core identification (id, name)
- Engine/callable to execute
- State schema integration
- Input/output field mappings
- Control flow behavior
- Node type-specific options
- Parameters:
data (Any)
id (str)
name (str)
node_type (NodeType | None)
engine (Engine | None)
engine_name (str | None)
callable_func (Callable | None)
callable_ref (str | None)
state_schema (type[BaseModel] | None)
input_schema (type[BaseModel] | None)
output_schema (type[BaseModel] | None)
command_goto (str | Literal['END'] | ~langgraph.types.Send | list[~langgraph.types.Send | str] | None)
retry_policy (RetryPolicy | None)
messages_field (str | None)
validation_schemas (list[type[BaseModel] | Callable] | None)
condition (Callable | None)
condition_ref (str | None)
send_field (str | None)
- validate_and_determine_node_type()[source]¶
Validate the configuration and determine the node type automatically if not specified.
- Return type:
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'validate_assignment': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- retry_policy: RetryPolicy | None¶
- class haive.core.graph.node.NodeFactory[source]¶
Factory for creating node functions from configurations.
This class provides methods for creating different types of node functions based on their configuration, engine type, or specialized functionality.
- classmethod create_node_function(config)[source]¶
Create a node function from a node configuration.
- Parameters:
config (NodeConfig) – NodeConfig with all node configuration
- Returns:
A callable node function for use in LangGraph
- Return type:
- class haive.core.graph.node.NodeFunction(*args, **kwargs)[source]¶
Protocol for node functions.
A node function takes a state and optional config and returns an output. This output can be a dictionary (state update), Command, Send, or list of Send objects.
- __init__(*args, **kwargs)¶
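Because NodeFunction is a Protocol, any callable with a compatible signature satisfies it; no subclassing is required. A minimal sketch:

def count_messages(state, config=None):
    # A valid NodeFunction: takes state (plus optional config) and
    # returns a state-update dict. Command, Send, or list[Send] are
    # equally legal return types per the protocol.
    return {"message_count": len(state.get("messages", []))}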
- class haive.core.graph.node.NodeRegistry[source]¶
Registry for node configurations and types.
This registry keeps track of all registered node configurations and implements the AbstractRegistry interface from the Haive framework.
It provides methods for:
- Registering node configurations
- Looking up nodes by ID, name, or type
- Listing all nodes or nodes of a specific type
- Registering custom node types
- find_by_id(id)[source]¶
Find a node configuration by ID.
- Parameters:
id (str) – Node ID
- Returns:
Node configuration if found, None otherwise
- Return type:
NodeConfig | None
- find_by_name(name)[source]¶
Find a node configuration by name (searches all types).
- Parameters:
name (str) – Node name
- Returns:
Node configuration if found, None otherwise
- Return type:
NodeConfig | None
- get(item_type, name)[source]¶
Get a node configuration by type and name.
- Parameters:
- Returns:
Node configuration if found, None otherwise
- Return type:
NodeConfig | None
- get_all(item_type)[source]¶
Get all nodes of a specific type.
- Parameters:
item_type (NodeType) – Node type
- Returns:
Dictionary mapping node names to configurations
- Return type:
- register(item)[source]¶
Register a node configuration.
- Parameters:
item (NodeConfig) – Node configuration to register
- Returns:
The registered node configuration
- Return type:
- register_custom_node_type(name, config_class)[source]¶
Register a custom node configuration class.
- Parameters:
name (str) – Name of the custom node type
config_class (type[NodeConfig]) – Custom NodeConfig class
- Return type:
None
- class haive.core.graph.node.NodeSchemaComposer[source]¶
Main composer for flexible node I/O configuration.
This class enables "result → potato" style field mappings by:
1. Registering custom extract/update functions
2. Composing field mappings for nodes
3. Wrapping existing nodes with new I/O schemas
4. Creating adapters for type compatibility
Examples
# Change retriever output from "documents" to "retrieved_documents"
composer = NodeSchemaComposer()

retriever_node = composer.compose_node(
    base_node=existing_retriever_node,
    output_mappings=[
        FieldMapping("documents", "retrieved_documents")
    ]
)

# Create callable node with custom field mapping
check_node = composer.from_callable(
    func=lambda msgs: len(msgs) > 5,
    input_mappings=[
        FieldMapping("messages", "msgs")
    ],
    output_mappings=[
        FieldMapping("result", "should_continue", transform=["bool_to_str"])
    ]
)
- compose_node(base_node, input_mappings=None, output_mappings=None, name=None)[source]¶
Compose a node with custom I/O mappings.
- Parameters:
base_node (Any) – Existing node to wrap
input_mappings (list[FieldMapping] | None) – How to map state fields to node inputs
output_mappings (list[FieldMapping] | None) – How to map node outputs to state fields
name (str | None) – Optional name for the composed node
- Returns:
ComposedNode with custom I/O mappings
- Return type:
Examples
# Change retriever output key
retriever = composer.compose_node(
    base_node=existing_retriever,
    output_mappings=[
        FieldMapping("documents", "retrieved_documents")
    ]
)

# Add input/output transforms
agent = composer.compose_node(
    base_node=existing_agent,
    input_mappings=[
        FieldMapping("messages", "conversation", transform=["filter_human"])
    ],
    output_mappings=[
        FieldMapping("response", "ai_response", transform=["strip"])
    ]
)
- create_adapter(source_schema, target_schema, field_mappings, name=None)[source]¶
Create an adapter between two schemas.
- Parameters:
source_schema (type[BaseModel]) – Source Pydantic model
target_schema (type[BaseModel]) – Target Pydantic model
field_mappings (list[FieldMapping]) – How to map fields between schemas
name (str | None) – Optional name for the adapter
- Returns:
SchemaAdapter that converts between schemas
- Return type:
Examples
# Adapt between different state schemas
adapter = composer.create_adapter(
    source_schema=OldState,
    target_schema=NewState,
    field_mappings=[
        FieldMapping("old_field", "new_field"),
        FieldMapping("data", "processed_data", transform=["validate"])
    ]
)
- create_extract_function(mappings, fallback_extract=None)[source]¶
Create extract function from field mappings.
- Parameters:
mappings (list[FieldMapping]) – List of field mappings for extraction
fallback_extract (str | None) – Name of fallback extract function if mappings fail
- Returns:
Extract function that handles all mappings
- Return type:
- create_update_function(mappings, merge_mode='replace')[source]¶
Create update function from field mappings.
- Parameters:
mappings (list[FieldMapping]) – List of field mappings for updates
merge_mode (str) – How to merge updates (“replace”, “merge”, “append”)
- Returns:
Update function that handles all mappings
- Return type:
- from_callable(func, input_mappings=None, output_mappings=None, name=None, **callable_kwargs)[source]¶
Create a composed node from a callable function.
- Parameters:
func (Callable) – Function to wrap as a node
input_mappings (list[FieldMapping] | None) – How to extract function parameters from state
output_mappings (list[FieldMapping] | None) – How to map function result to state updates
name (str | None) – Optional name for the node
**callable_kwargs – Additional arguments for CallableNodeConfig
- Returns:
ComposedCallableNode with custom I/O mappings
- Return type:
Examples
# Simple boolean check with field mapping
check_node = composer.from_callable(
    func=lambda msgs: len(msgs) > 5,
    input_mappings=[
        FieldMapping("messages", "msgs")
    ],
    output_mappings=[
        FieldMapping("result", "should_continue")
    ]
)

# Complex processing with transforms
process_node = composer.from_callable(
    func=process_documents,
    input_mappings=[
        FieldMapping("documents", "docs"),
        FieldMapping("config.batch_size", "batch_size", default=10)
    ],
    output_mappings=[
        FieldMapping("processed", "result", transform=["validate"])
    ]
)
- register_extract_function(name, func)[source]¶
Register a custom extract function.
- Parameters:
name (str) – Name to register under
func (ExtractFunction) – Extract function that takes (state, config) -> value
- register_transform_function(name, func)[source]¶
Register a custom transform function.
- Parameters:
name (str) – Name to register under
func (TransformFunction) – Transform function that takes value -> transformed_value
- register_update_function(name, func)[source]¶
Register a custom update function.
- Parameters:
name (str) – Name to register under
func (UpdateFunction) – Update function that takes (result, state, config) -> dict
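A sketch of wiring custom functions into a composer, matching the signatures documented above (the registered names are illustrative):

composer = NodeSchemaComposer()

# Extract: (state, config) -> value
composer.register_extract_function(
    "last_message", lambda state, config: state["messages"][-1]
)

# Transform: value -> transformed value
composer.register_transform_function("strip", lambda value: value.strip())

# Update: (result, state, config) -> dict of state updates
composer.register_update_function(
    "as_summary", lambda result, state, config: {"summary": result}
)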
- class haive.core.graph.node.NodeType(*values)[source]¶
Types of nodes in the graph.
- AGENT = 'agent'¶
- CALLABLE = 'callable'¶
- COORDINATOR = 'coordinator'¶
- CUSTOM = 'custom'¶
- ENGINE = 'engine'¶
- MESSAGE_TRANSFORMER = 'message_transformer'¶
- OUTPUT_PARSER = 'output_parser'¶
- PARSER = 'parser'¶
- TOOL = 'tool'¶
- TRANSFORM = 'transform'¶
- VALIDATION = 'validation'¶
- class haive.core.graph.node.RetryPolicy(initial_interval=0.5, backoff_factor=2.0, max_interval=128.0, max_attempts=3, jitter=True, retry_on=<function default_retry_on>)[source]¶
Configuration for retrying nodes.
Added in version 0.2.24.
- Parameters:
- class haive.core.graph.node.Send(node, arg)[source]¶
A message or packet to send to a specific node in the graph.
The Send class is used within a StateGraph’s conditional edges to dynamically invoke a node with a custom state at the next step.
Importantly, the sent state can differ from the core graph’s state, allowing for flexible and dynamic workflow management.
One such example is a “map-reduce” workflow where your graph invokes the same node multiple times in parallel with different states, before aggregating the results back into the main graph’s state.
- arg¶
The state or message to send to the target node.
- Type:
Any
Examples
>>> from typing import Annotated
>>> import operator
>>> class OverallState(TypedDict):
...     subjects: list[str]
...     jokes: Annotated[list[str], operator.add]
...
>>> from langgraph.types import Send
>>> from langgraph.graph import END, START
>>> def continue_to_jokes(state: OverallState):
...     return [Send("generate_joke", {"subject": s}) for s in state['subjects']]
...
>>> from langgraph.graph import StateGraph
>>> builder = StateGraph(OverallState)
>>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]})
>>> builder.add_conditional_edges(START, continue_to_jokes)
>>> builder.add_edge("generate_joke", END)
>>> graph = builder.compile()
>>>
>>> # Invoking with two subjects results in a generated joke for each
>>> graph.invoke({"subjects": ["cats", "dogs"]})
{'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}
- class haive.core.graph.node.ToolNode(tools, *, name='tools', tags=None, handle_tool_errors=True, messages_key='messages')[source]¶
A node that runs the tools called in the last AIMessage.
It can be used either in StateGraph with a “messages” state key (or a custom key passed via ToolNode’s ‘messages_key’). If multiple tool calls are requested, they will be run in parallel. The output will be a list of ToolMessages, one for each tool call.
Tool calls can also be passed directly as a list of ToolCall dicts.
- Parameters:
tools (Sequence[BaseTool | Callable]) – A sequence of tools that can be invoked by the ToolNode.
name (str) – The name of the ToolNode in the graph. Defaults to “tools”.
tags (list[str] | None) – Optional tags to associate with the node. Defaults to None.
handle_tool_errors (bool | str | Callable[..., str] | tuple[type[Exception], ...]) – How to handle tool errors raised by tools inside the node. Defaults to True. Must be one of the following:
- True: all errors will be caught and a ToolMessage with a default error message (TOOL_CALL_ERROR_TEMPLATE) will be returned.
- str: all errors will be caught and a ToolMessage with the string value of ‘handle_tool_errors’ will be returned.
- tuple[type[Exception], ...]: exceptions in the tuple will be caught and a ToolMessage with a default error message (TOOL_CALL_ERROR_TEMPLATE) will be returned.
- Callable[..., str]: exceptions from the signature of the callable will be caught and a ToolMessage with the string value of the result of the ‘handle_tool_errors’ callable will be returned.
- False: none of the errors raised by the tools will be caught.
messages_key (str) – The state key in the input that contains the list of messages. The same key will be used for the output from the ToolNode. Defaults to “messages”.
The ToolNode is roughly analogous to:
```python tools_by_name = {tool.name: tool for tool in tools} def tool_node(state: dict):
result = [] for tool_call in state[“messages”][-1].tool_calls:
tool = tools_by_name[tool_call[“name”]] observation = tool.invoke(tool_call[“args”]) result.append(ToolMessage(content=observation, tool_call_id=tool_call[“id”]))
return {“messages”: result}
Tool calls can also be passed directly to a ToolNode. This can be useful when using the Send API, e.g., in a conditional edge:
```python def example_conditional_edge(state: dict) -> List[Send]:
tool_calls = state[“messages”][-1].tool_calls # If tools rely on state or store variables (whose values are not generated # directly by a model), you can inject them into the tool calls. tool_calls = [
tool_node.inject_tool_args(call, state, store) for call in last_message.tool_calls
] return [Send(“tools”, [tool_call]) for tool_call in tool_calls]
Important
The input state can be one of the following:
- A dict with a messages key containing a list of messages.
- A list of messages.
- A list of tool calls.

If operating on a message list, the last message must be an AIMessage with tool_calls populated.
- __init__(tools, *, name='tools', tags=None, handle_tool_errors=True, messages_key='messages')[source]¶
- class haive.core.graph.node.ValidationNode(schemas, *, format_error=None, name='validation', tags=None)[source]¶
A node that validates all tools requests from the last AIMessage.
It can be used either in StateGraph with a “messages” key or in MessageGraph.
Note: This node does not actually run the tools; it only validates the tool calls, which is useful for extraction and other use cases where you need to generate structured output that conforms to a complex schema without losing the original messages and tool IDs (for use in multi-turn conversations).
- Parameters:
schemas (Sequence[BaseTool | Type[BaseModel] | Callable]) – A list of schemas to validate the tool calls with. These can be any of the following:
- A pydantic BaseModel class
- A BaseTool instance (the args_schema will be used)
- A function (a schema will be created from the function signature)
format_error (Callable[[BaseException, ToolCall, Type[BaseModel]], str] | None) – A function that takes an exception, a ToolCall, and a schema and returns a formatted error string. By default, it returns the exception repr and a message to respond after fixing validation errors.
name (str | None) – The name of the node.
tags (list[str] | None) – A list of tags to add to the node.
- Returns:
A list of ToolMessages with the validated content or error messages.
- Return type:
(Union[Dict[str, List[ToolMessage]], Sequence[ToolMessage]])
Examples
Example usage for re-prompting the model to generate a valid response:

>>> from typing import Literal, Annotated
>>> from typing_extensions import TypedDict
>>> from langchain_anthropic import ChatAnthropic
>>> from pydantic import BaseModel, field_validator
>>> from langgraph.graph import END, START, StateGraph
>>> from langgraph.prebuilt import ValidationNode
>>> from langgraph.graph.message import add_messages
>>>
>>> class SelectNumber(BaseModel):
...     a: int
...
...     @field_validator("a")
...     def a_must_be_meaningful(cls, v):
...         if v != 37:
...             raise ValueError("Only 37 is allowed")
...         return v
...
>>> builder = StateGraph(Annotated[list, add_messages])
>>> llm = ChatAnthropic(model="claude-3-5-haiku-latest").bind_tools([SelectNumber])
>>> builder.add_node("model", llm)
>>> builder.add_node("validation", ValidationNode([SelectNumber]))
>>> builder.add_edge(START, "model")
>>>
>>> def should_validate(state: list) -> Literal["validation", "__end__"]:
...     if state[-1].tool_calls:
...         return "validation"
...     return END
...
>>> builder.add_conditional_edges("model", should_validate)
>>>
>>> def should_reprompt(state: list) -> Literal["model", "__end__"]:
...     for msg in state[::-1]:
...         # None of the tool calls were errors
...         if msg.type == "ai":
...             return END
...         if msg.additional_kwargs.get("is_error"):
...             return "model"
...     return END
...
>>> builder.add_conditional_edges("validation", should_reprompt)
>>>
>>> graph = builder.compile()
>>> res = graph.invoke(("user", "Select a number, any number"))
>>> # Show the retry logic
>>> for msg in res:
...     msg.pretty_print()
================================ Human Message =================================
Select a number, any number
================================== Ai Message ==================================
[{'id': 'toolu_01JSjT9Pq8hGmTgmMPc6KnvM', 'input': {'a': 42}, 'name': 'SelectNumber', 'type': 'tool_use'}]
Tool Calls:
  SelectNumber (toolu_01JSjT9Pq8hGmTgmMPc6KnvM)
  Call ID: toolu_01JSjT9Pq8hGmTgmMPc6KnvM
  Args:
    a: 42
================================= Tool Message =================================
Name: SelectNumber
ValidationError(model='SelectNumber', errors=[{'loc': ('a',), 'msg': 'Only 37 is allowed', 'type': 'value_error'}])
Respond after fixing all validation errors.
================================== Ai Message ==================================
[{'id': 'toolu_01PkxSVxNxc5wqwCPW1FiSmV', 'input': {'a': 37}, 'name': 'SelectNumber', 'type': 'tool_use'}]
Tool Calls:
  SelectNumber (toolu_01PkxSVxNxc5wqwCPW1FiSmV)
  Call ID: toolu_01PkxSVxNxc5wqwCPW1FiSmV
  Args:
    a: 37
================================= Tool Message =================================
Name: SelectNumber
{"a": 37}
- class haive.core.graph.node.ValidationNodeConfig(*, tool_routes=<factory>, tool_metadata=<factory>, tools_dict=<factory>, routed_tools=<factory>, before_tool_validator=None, tools=<factory>, tool_instances=<factory>, id=<factory>, name='validation', node_type=NodeType.VALIDATION, input_schema=None, output_schema=None, input_field_defs=<factory>, output_field_defs=<factory>, auto_add_engine_attribution=True, engine_name=None, command_goto='__end__', config_overrides=<factory>, config_schema=None, metadata=<factory>, messages_key='messages', schemas=<factory>, format_error=None, agent_node='agent_node', tool_node='tool_node', parser_node='parse_output', retriever_node='retriever', available_nodes=<factory>, custom_route_mappings=<factory>, direct_node_routes=<factory>)[source]¶
Configuration for a validation node that routes tool calls to appropriate nodes.
- Parameters:
id (str)
name (str)
node_type (NodeType)
input_schema (type[BaseModel] | None)
output_schema (type[BaseModel] | None)
input_field_defs (list[FieldDefinition])
output_field_defs (list[FieldDefinition])
auto_add_engine_attribution (bool)
engine_name (str | None)
command_goto (str | Literal['END'] | Send | list[Send | str] | None)
config_schema (type[BaseModel] | None)
messages_key (str)
format_error (Callable | None)
agent_node (str)
tool_node (str)
parser_node (str)
retriever_node (str)
- __call__(state, config=None)[source]¶
Validate and route tool calls to appropriate nodes.
Returns ONLY routing decisions - no state updates!
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- haive.core.graph.node.branch_node(condition, routes, name=None, input_mapping=None)[source]¶
Create a branch node.
This decorator creates a node that evaluates a condition on the state and routes to different nodes based on the result.
- Parameters:
condition (Callable) – Function that evaluates the state and returns a key for routing
routes (dict[Any, str]) – Mapping from condition outputs to node names
name (str | None) – Optional name for the node
input_mapping (dict[str, str] | None) – Mapping from state keys to condition function input keys
- haive.core.graph.node.create_branch_node(condition, routes, name=None, input_mapping=None)[source]¶
Create a branch node.
This creates a node that evaluates a condition on the state and routes to different nodes based on the result.
- Parameters:
condition (Callable) – Function that evaluates the state and returns a key for routing
routes (dict[Any, str]) – Mapping from condition outputs to node names
name (str | None) – Optional name for the node
input_mapping (dict[str, str] | None) – Mapping from state keys to condition function input keys
- Returns:
Branch node function
- Return type:
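For instance, a confidence-based router might look like the following sketch (the condition, threshold, and route names are illustrative assumptions, with builder being a graph builder as in the create_node example below):

# Route on a confidence threshold; the condition's return value
# must be a key in the routes mapping.
confidence_router = create_branch_node(
    condition=lambda state: "high" if state.get("confidence", 0) >= 0.8 else "low",
    routes={"high": "synthesis", "low": "extra_verification"},
    name="confidence_router"
)
builder.add_node("confidence_router", confidence_router)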
- haive.core.graph.node.create_engine_node(engine, name=None, command_goto=None, input_mapping=None, output_mapping=None, retry_policy=None)[source]¶
Create a node function specifically from an engine.
This is a specialized version of create_node for engines.
- Parameters:
engine (Any) – Engine to use for the node
name (str | None) – Optional name for the node
command_goto (str | Literal['END'] | Send | list[Send | str] | None) – Optional next node to go to
input_mapping (dict[str, str] | None) – Optional mapping from state keys to engine input keys
output_mapping (dict[str, str] | None) – Optional mapping from engine output keys to state keys
retry_policy (RetryPolicy | None) – Optional retry policy for the node
- Returns:
Node function that can be added to a graph
- Return type:
- haive.core.graph.node.create_node(engine_or_callable, name=None, command_goto=None, input_mapping=None, output_mapping=None, retry_policy=None, **kwargs)[source]¶
Create a node function from an engine or callable.
This is the main function for creating nodes in the Haive framework. It handles various input types and creates the appropriate node function.
- Parameters:
engine_or_callable (Any) – Engine or callable to use for the node
name (str | None) – Optional name for the node
command_goto (str | Literal['END'] | Send | list[Send | str] | None) – Optional next node to go to
input_mapping (dict[str, str] | None) – Optional mapping from state keys to engine input keys
output_mapping (dict[str, str] | None) – Optional mapping from engine output keys to state keys
retry_policy (RetryPolicy | None) – Optional retry policy for the node
**kwargs – Additional options for the node configuration
- Returns:
Node function that can be added to a graph
- Return type:
Examples
Create a node from an engine:
retriever_node = create_node(
    retriever_engine,
    name="retrieve",
    command_goto="generate"
)

# Add to graph
builder.add_node("retrieve", retriever_node)
- haive.core.graph.node.create_send_node(send_targets, send_field=None, name=None, input_mapping=None, distribution='round_robin', payload_extractor=None, send_all_to_each=False, **kwargs)[source]¶
Create a send node for fan-out operations.
- Parameters:
send_targets (list[str]) – List of target node names to send items to
send_field (str | None) – Key in the state containing items to distribute (not required if using payload_extractor)
name (str | None) – Optional name for the node
input_mapping (dict[str, str] | None) – Mapping from state keys to node input keys
distribution (str) – Strategy for distributing items (“round_robin”, “all”, “balanced”)
payload_extractor (Callable | None) – Optional function to extract payloads from state (overrides send_field)
send_all_to_each (bool) – If True, sends all items to each target (instead of distributing)
**kwargs – Additional options for the node configuration
- Returns:
Send node function
- Return type:
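A minimal fan-out sketch, assuming the state carries a list under a "documents" key and that "summarizer_a" and "summarizer_b" are existing nodes (all names here are illustrative):

# Distribute items from state["documents"] round-robin
# across two downstream summarizer nodes.
fan_out = create_send_node(
    send_targets=["summarizer_a", "summarizer_b"],
    send_field="documents",
    name="fan_out_docs",
    distribution="round_robin"
)
builder.add_node("fan_out_docs", fan_out)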
- haive.core.graph.node.create_tool_node(tools, name=None, command_goto=None, messages_key='messages', handle_tool_errors=True)[source]¶
Create a tool node.
This creates a node that uses LangGraph’s ToolNode to handle tool calls.
- Parameters:
tools (list) – List of tools for the node
name (str | None) – Optional name for the node
command_goto (str | Literal['END'] | Send | list[Send | str] | None) – Optional next node to go to
messages_key (str) – Name of the messages key in the state
handle_tool_errors (bool | str | Callable[[...], str]) – How to handle tool errors
- Returns:
Tool node function
- Return type:
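As a sketch, a tool node wrapping a single LangChain tool might be built as follows (the add tool and the node names are illustrative assumptions):

from langchain_core.tools import tool
from haive.core.graph.node import create_tool_node

@tool
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b

# Wrap the tool list in a node; route back to the agent afterwards.
calculator = create_tool_node([add], name="calculator", command_goto="agent")
builder.add_node("calculator", calculator)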
- haive.core.graph.node.create_validation_node(schemas, name=None, command_goto=None, messages_key='messages')[source]¶
Create a validation node.
This creates a node that uses LangGraph’s ValidationNode to validate inputs against a schema.
- Parameters:
schemas (list[type[BaseModel] | Callable]) – List of validation schemas
name (str | None) – Optional name for the node
command_goto (str | Literal['END'] | Send | list[Send | str] | None) – Optional next node to go to
messages_key (str) – Name of the messages key in the state
- Returns:
Validation node function
- Return type:
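A short sketch of schema-based validation, assuming tool calls should conform to a hypothetical Citation model (schema and node names are illustrative):

from pydantic import BaseModel
from haive.core.graph.node import create_validation_node

class Citation(BaseModel):
    """Illustrative schema that tool calls must satisfy."""
    source: str
    url: str

validator = create_validation_node([Citation], name="validate_citations")
builder.add_node("validate_citations", validator)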
- haive.core.graph.node.extract_io_mapping_from_schema(schema, engine_id)[source]¶
Extract input and output mappings from a schema for a specific engine.
- haive.core.graph.node.register_custom_node_type(name, config_class)[source]¶
Register a custom node type.
- Parameters:
name (str)
config_class (type[NodeConfig])
- Return type:
None
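A sketch of registering a custom node type, assuming NodeConfig is importable from the same module (the config class and its field are hypothetical):

from haive.core.graph.node import NodeConfig, register_custom_node_type

class MetricsNodeConfig(NodeConfig):
    """Hypothetical config for a metrics-emitting node type."""
    sink_url: str = "http://localhost:9090"

register_custom_node_type("metrics", MetricsNodeConfig)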
- haive.core.graph.node.register_node(name=None, node_type=None, command_goto=None, input_mapping=None, output_mapping=None, retry_policy=None, **kwargs)[source]¶
Decorator to register a function as a node.
This decorator wraps a function as a node function, with proper configuration for node type, command routing, input/output mapping, and retry policy.
- Parameters:
name (str | None) – Optional name for the node (defaults to function name)
node_type (NodeType | None) – Type of node to create
command_goto (str | Literal['END'] | Send | list[Send | str] | None) – Next node to go to after this node
input_mapping (dict[str, str] | None) – Mapping from state keys to function input keys
output_mapping (dict[str, str] | None) – Mapping from function output keys to state keys
retry_policy (RetryPolicy | None) – Retry policy for the node
**kwargs – Additional options for the node configuration
- Returns:
Decorated function as a node function
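For example, a plain function can be registered as a node like this (the function body and node names are illustrative):

from haive.core.graph.node import register_node

@register_node(name="summarize", command_goto="review")
def summarize(state: dict) -> dict:
    """Illustrative node body: condense accumulated findings."""
    return {"summary": " ".join(state.get("findings", []))}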
- haive.core.graph.node.send_node(send_targets, send_field, name=None, input_mapping=None)[source]¶
Create a send node.
This decorator creates a node that generates Send objects to route to different nodes with different states. It’s useful for fan-out operations.
- haive.core.graph.node.tool_node(tools, name=None, command_goto=None, messages_field='messages', handle_tool_errors=True)[source]¶
Create a tool node.
This decorator creates a node that handles tool calls using LangGraph’s ToolNode. It’s a specialized version of register_node.
- Parameters:
tools (list) – List of tools for the node
name (str | None) – Optional name for the node
command_goto (str | Literal['END'] | Send | list[Send | str] | None) – Next node to go to after this node
messages_field (str) – Name of the messages key in the state
handle_tool_errors (bool | str | Callable[[...], str]) – How to handle tool errors
- haive.core.graph.node.validation_node(schemas, name=None, command_goto=None, messages_field='messages')[source]¶
Create a validation node.
This decorator creates a node that validates inputs against a schema using LangGraph’s ValidationNode. It’s a specialized version of register_node.
- Parameters:
schemas (list[type[BaseModel] | Callable]) – List of validation schemas
name (str | None) – Optional name for the node
command_goto (str | Literal['END'] | Send | list[Send | str] | None) – Next node to go to after this node
messages_field (str) – Name of the messages key in the state
Intelligent Node Processing
The node system provides sophisticated primitives for building reusable workflow components.
Advanced Node Patterns:
from haive.core.graph.node import (
    Node, AgentNode, ToolNode,
    ConditionalNode, ParallelNode
)

# Agent node with full capabilities
class ResearchAgentNode(AgentNode):
    """Sophisticated research agent node."""

    agent_config = AugLLMConfig(
        model="gpt-4",
        tools=[web_search, arxiv_search, semantic_scholar],
        structured_output_model=ResearchFindings
    )

    async def process(self, state: WorkflowState) -> WorkflowState:
        """Execute research with the agent."""
        # Pre-processing
        query = self.preprocess_query(state.query)

        # Agent execution
        findings = await self.agent.ainvoke({
            "query": query,
            "context": state.research_data
        })

        # Post-processing and state update
        state.research_data.append(findings.dict())
        state.confidence = findings.confidence_score

        # Emit metrics
        await self.emit_metrics({
            "sources_found": len(findings.sources),
            "confidence": findings.confidence_score,
            "execution_time": self.execution_time
        })

        return state

# Tool node with validation
class DataValidationNode(ToolNode):
    """Validate and clean research data."""

    tools = [
        validate_sources,
        check_citations,
        verify_facts
    ]

    async def execute_tools(self, state: WorkflowState) -> Dict[str, Any]:
        """Run validation tools in sequence."""
        validation_results = {}
        for tool in self.tools:
            try:
                result = await tool.ainvoke(state.research_data)
                validation_results[tool.name] = result
            except Exception as e:
                self.logger.error(f"Tool {tool.name} failed: {e}")
                validation_results[tool.name] = {"error": str(e)}
        return validation_results

# Conditional node with complex logic
class RoutingNode(ConditionalNode):
    """Intelligent routing based on multiple factors."""

    def evaluate_conditions(self, state: WorkflowState) -> str:
        """Determine next node based on state analysis."""
        # Multi-factor decision making
        factors = {
            "data_quality": self.assess_data_quality(state),
            "confidence": state.confidence,
            "completeness": self.check_completeness(state),
            "time_remaining": self.get_time_remaining()
        }

        # Use decision matrix
        decision = self.decision_engine.evaluate(factors)

        # Log decision reasoning
        self.logger.info(f"Routing decision: {decision.route}")
        self.logger.debug(f"Factors: {factors}")
        self.logger.debug(f"Reasoning: {decision.explanation}")

        return decision.route
State Management in Graphs¶
Sophisticated State Handling
State Persistence and Recovery:
from haive.core.graph.checkpointer import (
    PostgresCheckpointer,
    checkpoint_config
)

# Configure checkpointing
checkpointer = PostgresCheckpointer(
    connection_string=os.getenv("DATABASE_URL"),
    schema="workflow_states",
    auto_checkpoint=True,
    checkpoint_frequency=5  # Every 5 nodes
)

# Compile with checkpointing
app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["human_review"],     # Pause for human input
    interrupt_after=["critical_decision"]  # Pause after critical nodes
)

# Execute with thread management
thread_id = "research_session_123"
config = {"configurable": {"thread_id": thread_id}}

# Run workflow (will checkpoint automatically)
result = await app.ainvoke(initial_state, config=config)

# Later: Resume from checkpoint
resumed_result = await app.ainvoke(None, config=config)

# Time-travel debugging
history = await checkpointer.get_history(thread_id)
for checkpoint in history:
    print(f"Node: {checkpoint.node}")
    print(f"State: {checkpoint.state}")
    print(f"Timestamp: {checkpoint.timestamp}")

# Restore to specific checkpoint
await checkpointer.restore(thread_id, checkpoint_id="xyz123")
Graph Patterns Library¶
📐 Graph Patterns - Intelligent Workflow Templates Engine
THE BLUEPRINT LIBRARY FOR AI WORKFLOW MASTERY
Welcome to Graph Patterns - the revolutionary collection of battle-tested, intelligent workflow templates that transform complex AI orchestration challenges into simple, reusable patterns. This isn’t just a template library; it’s a sophisticated pattern recognition and generation system that learns from successful workflows and creates optimized blueprints for every AI use case.
⚡ REVOLUTIONARY PATTERN INTELLIGENCE¶
Graph Patterns represents a paradigm shift from manual workflow construction to intelligent pattern-based composition that automatically selects and adapts proven architectures:
🧠 Intelligent Pattern Recognition: AI-powered analysis of workflow requirements
🔄 Self-Adapting Templates: Patterns that evolve based on usage and performance
⚡ Performance Optimization: Automatically optimized for speed, accuracy, and cost
📊 Success Metrics Integration: Patterns proven through real-world deployments
🎯 Domain-Specific Intelligence: Specialized patterns for every AI domain
🌟 CORE PATTERN CATEGORIES¶
Multi-Agent Orchestration Patterns:
- Sequential agent coordination
- Parallel consensus decision making
- Hierarchical team structures
- Specialist coordination workflows

RAG (Retrieval-Augmented Generation) Patterns:
- Simple RAG with optimization
- Hybrid RAG with multiple sources
- Collective RAG with multiple agents
- Adaptive RAG with learning capabilities

Planning & Reasoning Patterns:
- Hierarchical task planning
- Reactive reasoning for dynamic environments
- Strategic long-term planning
- Continuous reflection patterns

Validation & Quality Assurance Patterns:
- Multi-layer validation systems
- Consensus-based validation
- Continuous quality assurance
- Adaptive quality patterns
For complete examples and advanced patterns, see the documentation.
Pre-built Workflow Patterns
The patterns library provides battle-tested workflow templates for common AI tasks.
Using Workflow Patterns:
from haive.core.graph.patterns import (
    MapReducePattern,
    ScatterGatherPattern,
    PipelinePattern,
    CircuitBreakerPattern
)

# Map-Reduce for parallel processing
map_reduce = MapReducePattern(
    map_function=process_chunk,
    reduce_function=combine_results,
    chunk_size=100,
    max_workers=10
)

graph.add_pattern(
    "parallel_analysis",
    map_reduce,
    input_field="research_data",
    output_field="analysis"
)

# Scatter-Gather for multi-source aggregation
scatter_gather = ScatterGatherPattern(
    scatter_targets=[
        ("google_search", search_google),
        ("bing_search", search_bing),
        ("duckduckgo", search_ddg)
    ],
    gather_strategy="merge_unique",
    timeout_per_target=10
)

graph.add_pattern(
    "multi_search",
    scatter_gather,
    input_field="query"
)

# Circuit breaker for unreliable services
circuit_breaker = CircuitBreakerPattern(
    protected_node="external_api",
    failure_threshold=3,
    recovery_timeout=60,
    fallback_node="cached_data"
)

graph.wrap_node_with_pattern("external_api", circuit_breaker)
Visual Workflow Builder¶
Interactive Graph Construction
Visual Builder Integration:
from haive.core.graph.builder import GraphBuilder, visualize

# Create builder with visual interface
builder = GraphBuilder(
    name="visual_workflow",
    enable_ui=True,
    port=8080
)

# Define node library
builder.register_node_types({
    "agents": [ResearchAgent, AnalysisAgent, WriterAgent],
    "tools": [WebSearch, Calculator, DatabaseQuery],
    "logic": [Conditional, Loop, Parallel]
})

# Build interactively (opens browser UI)
graph = builder.build_interactive()

# Or build programmatically with visual feedback
with builder.visual_context():
    builder.add_node("start", StartNode())
    builder.add_node("research", ResearchAgent())
    builder.add_edge("start", "research")
    # See real-time graph visualization
    builder.show()

# Export graph
builder.export_to_file("workflow.json")
builder.export_as_image("workflow.png")

# Generate code from visual graph
code = builder.generate_code(language="python")
Real-World Examples 🌟¶
Multi-Agent Orchestration¶
Tool-Augmented Reasoning¶
graph TD Input[User Query] --> Agent[ReAct Agent] Agent --> Think{Think} Think -->|Need Info| Tools[Tool Execution] Tools --> Observe[Observe Results] Observe --> Think Think -->|Have Answer| Final[Final Response] Tools -.-> T1[Calculator] Tools -.-> T2[Web Search] Tools -.-> T3[Database] style Agent fill:#f96,stroke:#333,stroke-width:2px style Tools fill:#69f,stroke:#333,stroke-width:2px
Dynamic Workflow Adaptation¶
digraph dynamic_adaptation { rankdir=LR; node [shape=box, style="rounded,filled"]; monitor [label="Workflow\nMonitor", fillcolor=lightblue]; analyzer [label="Performance\nAnalyzer", fillcolor=lightgreen]; optimizer [label="Graph\nOptimizer", fillcolor=lightyellow]; subgraph cluster_versions { label="Graph Versions"; style=filled; fillcolor=lavender; v1 [label="Version 1\n(Original)", fillcolor=white]; v2 [label="Version 2\n(Optimized)", fillcolor=white]; v3 [label="Version 3\n(Adapted)", fillcolor=white]; } monitor -> analyzer [label="metrics"]; analyzer -> optimizer [label="insights"]; optimizer -> v2 [label="optimize"]; v1 -> v2 [label="evolve", style=dashed]; v2 -> v3 [label="adapt", style=dashed]; }
Advanced Orchestration¶
Multi-Agent Coordination¶
Sophisticated Agent Orchestration
Dynamic Graph Modification¶
Runtime Graph Evolution:
from haive.core.graph.dynamic import DynamicGraph, GraphMutator

# Create graph that can modify itself
dynamic_graph = DynamicGraph(
    base_graph=initial_graph,
    allow_runtime_modification=True
)

# Define mutation rules
mutator = GraphMutator()

@mutator.rule("add_verification")
def add_verification_node(graph, state):
    """Add verification when confidence is low."""
    if state.confidence < 0.6:
        graph.add_node(
            "extra_verification",
            VerificationNode(),
            after="analysis",
            before="synthesis"
        )
        return True
    return False

@mutator.rule("parallelize_research")
def parallelize_if_slow(graph, state, metrics):
    """Parallelize research if taking too long."""
    if metrics.elapsed_time > 30 and not graph.has_parallel_nodes("research"):
        graph.parallelize_node(
            "research",
            split_function=split_research_tasks,
            parallelism=3
        )
        return True
    return False

# Apply mutations during execution
dynamic_graph.set_mutator(mutator)

# Graph evolves based on runtime conditions
result = await dynamic_graph.execute(initial_state)
Performance Optimization¶
Graph Execution Metrics¶
High-Performance Workflow Execution
Node Latency: < 10ms overhead per node
Parallelism: Up to 1000 concurrent nodes
Checkpointing: < 50ms state persistence
Memory: O(1) state access with COW
Throughput: 10,000+ workflows/minute
Performance Optimization:
from haive.core.graph.optimization import (
    GraphOptimizer,
    ExecutionProfiler
)

# Optimize graph structure
optimizer = GraphOptimizer()
optimized_graph = optimizer.optimize(
    graph,
    strategies=[
        "merge_sequential_nodes",   # Combine simple sequences
        "parallelize_independent",  # Auto-detect parallelism
        "cache_deterministic",      # Cache pure functions
        "eliminate_dead_paths",     # Remove unreachable nodes
        "minimize_state_transfer"   # Reduce state copying
    ]
)

# Profile execution
profiler = ExecutionProfiler()
with profiler.profile():
    result = await optimized_graph.execute(state)

# Analyze performance
report = profiler.generate_report()
print(f"Total time: {report.total_time}ms")
print(f"Slowest node: {report.slowest_node}")
print(f"Parallelism achieved: {report.parallelism_factor}x")

# Visualize bottlenecks
profiler.visualize_bottlenecks("performance.html")
Distributed Execution¶
Scale Across Multiple Machines:
from haive.core.graph.distributed import DistributedGraph, WorkerPool

# Create distributed graph
distributed_graph = DistributedGraph(
    graph=workflow_graph,
    coordinator_url="redis://coordinator:6379",
    worker_pool=WorkerPool(
        workers=[
            "worker-1.compute.internal",
            "worker-2.compute.internal",
            "worker-3.compute.internal"
        ],
        load_balancing="least_loaded"
    )
)

# Configure node placement
distributed_graph.place_node("heavy_computation", "worker-1")
distributed_graph.place_nodes_by_type(AgentNode, "any")
distributed_graph.place_nodes_by_resources({
    "gpu_required": "gpu-workers",
    "memory_intensive": "high-memory-workers"
})

# Execute with distributed coordination
result = await distributed_graph.execute(
    state,
    partition_strategy="hash",  # How to split state
    replication_factor=2        # Fault tolerance
)
Integration Examples 🔌¶
LangChain Integration¶
Custom Node Development¶
Best Practices 📚¶
Graph Design Principles¶
Single Responsibility: Each node should do one thing well
Explicit State: All state changes should be explicit and traceable (see the sketch after this list)
Error Boundaries: Use validation nodes to catch and handle errors
Observability: Add logging and monitoring at key points
Reusability: Design nodes to be reusable across graphs
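A minimal sketch of the Explicit State principle: a node returns only the keys it changes, so every update is visible and traceable (the scoring logic is a toy illustration):

def score_sources(state: dict) -> dict:
    """Return only the keys this node modifies; no hidden mutation."""
    scores = [len(src) for src in state.get("sources", [])]  # toy scoring
    return {"source_scores": scores}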
Testing Strategies¶
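A minimal sketch of unit-testing graph logic in isolation, assuming a pytest-style suite; branch conditions are pure functions of state, so they need no compiled graph (all names are illustrative):

def route_on_confidence(state: dict) -> str:
    """Toy routing condition for a branch node."""
    return "synthesis" if state.get("confidence", 0) >= 0.8 else "extra_verification"

def test_route_on_confidence():
    # Exercise both routes without building or compiling a graph.
    assert route_on_confidence({"confidence": 0.9}) == "synthesis"
    assert route_on_confidence({"confidence": 0.3}) == "extra_verification"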
API Reference 📖¶
Core Classes¶
Node Types¶
- class haive.core.graph.node.ToolNode(tools, *, name='tools', tags=None, handle_tool_errors=True, messages_key='messages')[source]¶
Bases:
RunnableCallable
A node that runs the tools called in the last AIMessage.
It can be used either in StateGraph with a “messages” state key (or a custom key passed via ToolNode’s ‘messages_key’). If multiple tool calls are requested, they will be run in parallel. The output will be a list of ToolMessages, one for each tool call.
Tool calls can also be passed directly as a list of ToolCall dicts.
- Parameters:
tools (Sequence[BaseTool | Callable]) – A sequence of tools that can be invoked by the ToolNode.
name (str) – The name of the ToolNode in the graph. Defaults to “tools”.
tags (list[str] | None) – Optional tags to associate with the node. Defaults to None.
handle_tool_errors (bool | str | Callable[[...], str] | tuple[type[Exception], ...]) –
How to handle tool errors raised by tools inside the node. Defaults to True. Must be one of the following:
- True: all errors will be caught and a ToolMessage with a default error message (TOOL_CALL_ERROR_TEMPLATE) will be returned.
- str: all errors will be caught and a ToolMessage with the string value of 'handle_tool_errors' will be returned.
- tuple[type[Exception], ...]: exceptions in the tuple will be caught and a ToolMessage with a default error message (TOOL_CALL_ERROR_TEMPLATE) will be returned.
- Callable[..., str]: exceptions from the signature of the callable will be caught and a ToolMessage with the string value of the result of the 'handle_tool_errors' callable will be returned.
- False: none of the errors raised by the tools will be caught.
messages_key (str) – The state key in the input that contains the list of messages. The same key will be used for the output from the ToolNode. Defaults to “messages”.
The ToolNode is roughly analogous to:
```python
tools_by_name = {tool.name: tool for tool in tools}

def tool_node(state: dict):
    result = []
    for tool_call in state["messages"][-1].tool_calls:
        tool = tools_by_name[tool_call["name"]]
        observation = tool.invoke(tool_call["args"])
        result.append(
            ToolMessage(content=observation, tool_call_id=tool_call["id"])
        )
    return {"messages": result}
```
Tool calls can also be passed directly to a ToolNode. This can be useful when using the Send API, e.g., in a conditional edge:
```python
def example_conditional_edge(state: dict) -> List[Send]:
    tool_calls = state["messages"][-1].tool_calls
    # If tools rely on state or store variables (whose values are not generated
    # directly by a model), you can inject them into the tool calls.
    tool_calls = [
        tool_node.inject_tool_args(call, state, store)
        for call in last_message.tool_calls
    ]
    return [Send("tools", [tool_call]) for tool_call in tool_calls]
```
Important
- The input state can be one of the following:
A dict with a messages key containing a list of messages.
A list of messages.
A list of tool calls.
- If operating on a message list, the last message must be an AIMessage with tool_calls populated.
- __init__(tools, *, name='tools', tags=None, handle_tool_errors=True, messages_key='messages')[source]¶
Patterns & Utilities¶
See the Graph Patterns Library section above for the full pattern catalog.
Enterprise Features¶
Production-Ready Workflow Management
Workflow Versioning: Track and deploy workflow versions
Access Control: Node-level permissions and audit trails
Monitoring: Real-time metrics and alerting
Fault Tolerance: Automatic failover and recovery
Compliance: Workflow governance and approval chains
See Also 👀¶
Engine Architecture - The engine system that powers nodes
Schema System - State management for graphs
../../haive-agents/agent_development - Building agents for graphs
../../tutorials/graph_workflows - Step-by-step graph tutorials