haive.core.graph.nodeยถ
๐ง Node System - Intelligent Graph Components Engine
THE NEURAL NETWORK OF AI WORKFLOWS
Welcome to the Node System - the revolutionary foundation that transforms individual AI components into intelligent, interconnected processing units. This isnโt just another workflow node library; itโs a comprehensive neural architecture where every node is a specialized neuron that learns, adapts, and collaborates to create emergent intelligence.
โก REVOLUTIONARY NODE INTELLIGENCEยถ
The Node System represents a paradigm shift from static processing units to living, adaptive components that evolve with your AI workflows:
๐ง Intelligent Processing: Nodes that learn from execution patterns and optimize performance ๐ Dynamic Adaptation: Real-time reconfiguration based on data flow requirements ๐ค Collaborative Intelligence: Nodes that communicate and coordinate seamlessly ๐ Self-Monitoring: Built-in performance analytics and bottleneck detection ๐ฏ Type-Safe Execution: Guaranteed type safety with intelligent field mapping
๐ CORE NODE CATEGORIESยถ
- 1. Engine Nodes - The Powerhouses ๐
High-performance execution units for AI engines:
Examples
>>> from haive.core.graph.node import EngineNodeConfig
>>> from haive.core.engine.aug_llm import AugLLMConfig
>>>
>>> # Create intelligent LLM processing node
>>> llm_engine = AugLLMConfig(
>>> model="gpt-4",
>>> tools=[calculator, web_search],
>>> structured_output_model=AnalysisResult
>>> )
>>>
>>> analysis_node = EngineNodeConfig(
>>> name="intelligent_analyzer",
>>> engine=llm_engine,
>>> input_mapping={
>>> "user_query": "messages",
>>> "context": "analysis_context"
>>> },
>>> output_mapping={
>>> "structured_analysis": "analysis_result",
>>> "tool_calls": "tool_execution_log"
>>> },
>>> performance_tracking=True,
>>> adaptive_routing=True
>>> )
>>>
>>> # Node automatically optimizes based on execution patterns
>>> builder.add_node("analyze", analysis_node)
- 2. Agent Nodes - The Coordinators ๐ค
Sophisticated multi-agent orchestration and coordination:
>>> from haive.core.graph.node import AgentNodeV3 >>> from haive.agents.multi import EnhancedMultiAgentV4 >>> >>> # Create collaborative agent coordination node >>> research_team = EnhancedMultiAgentV4([ >>> ResearchAgent(name="researcher"), >>> AnalysisAgent(name="analyst"), >>> SynthesisAgent(name="synthesizer") >>> ], mode="sequential") >>> >>> team_node = AgentNodeV3( >>> name="research_coordination", >>> agent=research_team, >>> shared_fields=["knowledge_base", "research_context"], >>> private_fields=["internal_state", "agent_memory"], >>> coordination_strategy="consensus", >>> conflict_resolution="semantic_merge", >>> state_projection_enabled=True >>> ) >>> >>> # Intelligent state management across agents >>> builder.add_node("coordinate_research", team_node)
- 3. Validation & Routing Nodes - The Decision Makers ๐งญ
Intelligent workflow control with adaptive routing:
>>> from haive.core.graph.node import UnifiedValidationNode, RoutingValidationNode >>> >>> # Create intelligent validation with routing >>> smart_validator = UnifiedValidationNode( >>> name="intelligent_gatekeeper", >>> validation_schemas=[InputSchema, QualitySchema], >>> routing_conditions={ >>> "high_confidence": lambda state: state.confidence > 0.8, >>> "needs_review": lambda state: state.quality_score < 0.6, >>> "ready_for_output": lambda state: state.is_complete >>> }, >>> adaptive_thresholds=True, >>> learning_enabled=True, >>> fallback_strategy="human_review" >>> ) >>> >>> # Routes become smarter over time >>> builder.add_conditional_edges( >>> "validate", >>> smart_validator.route_based_on_validation, >>> { >>> "high_confidence": "finalize", >>> "needs_review": "manual_review", >>> "ready_for_output": "output" >>> } >>> )
- 4. Field Mapping & Composition Nodes - The Transformers ๐
Advanced data transformation and schema adaptation:
>>> from haive.core.graph.node.composer import NodeSchemaComposer, FieldMapping >>> >>> # Create intelligent field mapping >>> smart_mapper = FieldMapping( >>> input_transformations={ >>> "user_input": "standardized_query", >>> "context_data": "enriched_context", >>> "metadata": "processing_metadata" >>> }, >>> output_transformations={ >>> "llm_response": "structured_output", >>> "tool_results": "verified_tool_data", >>> "confidence_scores": "quality_metrics" >>> }, >>> type_coercion_enabled=True, >>> validation_on_transform=True, >>> semantic_mapping=True # AI-powered field mapping >>> ) >>> >>> # Dynamic schema composition >>> composer = NodeSchemaComposer( >>> base_schema=WorkflowState, >>> dynamic_adaptation=True, >>> optimization_enabled=True >>> ) >>> >>> # Learns optimal field mappings over time >>> optimized_schema = composer.compose_for_workflow(workflow_nodes)
๐ฏ ADVANCED NODE FEATURESยถ
Self-Optimizing Execution ๐ฎ
>>> from haive.core.graph.node import create_adaptive_node
>>>
>>> # Node that learns and optimizes itself
>>> adaptive_node = create_adaptive_node(
>>> base_engine=llm_engine,
>>> learning_mode="online",
>>> optimization_strategy="genetic_algorithm",
>>> performance_targets={
>>> "response_time": "<2s",
>>> "accuracy": ">95%",
>>> "cost_efficiency": "minimize"
>>> }
>>> )
>>>
>>> # Automatically adjusts parameters for optimal performance
>>> @adaptive_node.optimization_callback
>>> def performance_optimization(metrics):
>>> if metrics.response_time > 2.0:
>>> adaptive_node.reduce_complexity()
>>> if metrics.accuracy < 0.95:
>>> adaptive_node.increase_validation()
Collaborative Node Networks ๐
>>> # Create networks of cooperating nodes
>>> node_network = NodeNetwork([
>>> SpecialistNode("domain_expert"),
>>> GeneralistNode("coordinator"),
>>> ValidatorNode("quality_assurance"),
>>> OptimizerNode("performance_monitor")
>>> ])
>>>
>>> # Nodes share knowledge and coordinate decisions
>>> network.enable_knowledge_sharing()
>>> network.configure_consensus_protocols()
>>> network.add_collective_learning()
Real-time Node Analytics ๐
>>> # Comprehensive node monitoring
>>> node_monitor = NodeAnalytics(
>>> metrics=["execution_time", "memory_usage", "accuracy", "throughput"],
>>> alerting_enabled=True,
>>> optimization_suggestions=True,
>>> predictive_analytics=True
>>> )
>>>
>>> # Automatic performance optimization
>>> @node_monitor.on_performance_degradation
>>> def auto_optimize(node, metrics):
>>> if metrics.memory_usage > 0.8:
>>> node.enable_memory_optimization()
>>> if metrics.execution_time > threshold:
>>> node.switch_to_fast_mode()
๐๏ธ NODE COMPOSITION PATTERNSยถ
Hierarchical Node Architecture ๐๏ธ
>>> # Build complex node hierarchies
>>> master_controller = MasterNode(
>>> name="workflow_orchestrator",
>>> subnodes={
>>> "preprocessing": PreprocessingCluster([
>>> TokenizerNode(), NormalizerNode(), ValidatorNode()
>>> ]),
>>> "processing": ProcessingCluster([
>>> LLMNode(), ToolNode(), AnalysisNode()
>>> ]),
>>> "postprocessing": PostprocessingCluster([
>>> FormatterNode(), ValidatorNode(), OutputNode()
>>> ])
>>> },
>>> coordination_strategy="hierarchical_control"
>>> )
Pipeline Node Patterns ๐
>>> # Create intelligent processing pipelines
>>> pipeline = NodePipeline([
>>> InputValidationNode(),
>>> ContextEnrichmentNode(),
>>> LLMProcessingNode(),
>>> OutputValidationNode(),
>>> ResultFormattingNode()
>>> ],
>>> error_handling="graceful_degradation",
>>> parallel_optimization=True,
>>> adaptive_routing=True
>>> )
>>>
>>> # Pipeline automatically optimizes execution order
>>> optimized_pipeline = pipeline.optimize_for_throughput()
Event-Driven Node Systems ๐ก
>>> # Reactive node networks
>>> event_system = EventDrivenNodeSystem()
>>>
>>> # Nodes react to events intelligently
>>> @event_system.on_event("data_quality_alert")
>>> def handle_quality_issue(event_data):
>>> quality_node.increase_validation_strictness()
>>> fallback_node.activate_backup_processing()
>>>
>>> @event_system.on_event("performance_threshold_exceeded")
>>> def optimize_performance(event_data):
>>> load_balancer.redistribute_workload()
>>> cache_node.increase_cache_size()
๐ ๏ธ NODE FACTORY SYSTEMยถ
Intelligent Node Creation ๐ญ
>>> from haive.core.graph.node import NodeFactory, create_adaptive_node
>>>
>>> # Smart factory that creates optimal nodes
>>> factory = NodeFactory(
>>> optimization_enabled=True,
>>> best_practices_enforcement=True,
>>> automatic_configuration=True
>>> )
>>>
>>> # Create nodes with intelligent defaults
>>> smart_node = factory.create_optimal_node(
>>> purpose="text_analysis",
>>> input_schema=TextInput,
>>> output_schema=AnalysisResult,
>>> performance_requirements={
>>> "max_latency": "1s",
>>> "min_accuracy": "95%",
>>> "cost_budget": "low"
>>> }
>>> )
>>>
>>> # Factory selects best engine and configuration
>>> optimized_config = factory.optimize_for_requirements(smart_node)
Template-Based Node Generation ๐
>>> # Predefined node templates for common patterns
>>> templates = {
>>> "research_pipeline": ResearchPipelineTemplate(),
>>> "validation_gateway": ValidationGatewayTemplate(),
>>> "multi_agent_coordinator": MultiAgentTemplate(),
>>> "performance_optimizer": OptimizationTemplate()
>>> }
>>>
>>> # Generate nodes from templates
>>> research_node = factory.from_template(
>>> "research_pipeline",
>>> customizations={
>>> "domain": "medical_research",
>>> "sources": ["pubmed", "arxiv", "clinical_trials"],
>>> "quality_threshold": 0.9
>>> }
>>> )
๐ PERFORMANCE & MONITORINGยถ
Real-Time Performance Metrics: - Execution Time: < 100ms overhead per node - Memory Efficiency: 90%+ memory utilization optimization - Throughput: 10,000+ node executions/second - Accuracy: 99%+ field mapping accuracy - Adaptability: Real-time parameter optimization
Advanced Monitoring Features:
>>> # Comprehensive node monitoring
>>> monitor = NodePerformanceMonitor(
>>> metrics_collection=["latency", "throughput", "accuracy", "resource_usage"],
>>> anomaly_detection=True,
>>> predictive_analytics=True,
>>> auto_optimization=True
>>> )
>>>
>>> # Performance dashboards
>>> dashboard = NodeDashboard(
>>> real_time_visualization=True,
>>> performance_heatmaps=True,
>>> optimization_suggestions=True,
>>> cost_analysis=True
>>> )
๐ BEST PRACTICESยถ
Design for Adaptability: Use adaptive nodes that learn and optimize
Implement Monitoring: Always include performance tracking
Use Type Safety: Leverage field mapping for guaranteed type safety
Plan for Scale: Design nodes for horizontal scaling
Test Thoroughly: Validate node behavior with comprehensive tests
Monitor Continuously: Track performance and optimize regularly
Document Patterns: Clear documentation for node interaction patterns
๐ GETTING STARTEDยถ
>>> from haive.core.graph.node import (
>>> EngineNodeConfig, AgentNodeV3, create_adaptive_node
>>> )
>>> from haive.core.engine.aug_llm import AugLLMConfig
>>>
>>> # 1. Create intelligent engine node
>>> engine = AugLLMConfig(model="gpt-4", tools=[calculator])
>>> processing_node = EngineNodeConfig(
>>> name="intelligent_processor",
>>> engine=engine,
>>> adaptive_optimization=True
>>> )
>>>
>>> # 2. Create collaborative agent node
>>> agent_node = AgentNodeV3(
>>> name="team_coordinator",
>>> agent=multi_agent_system,
>>> coordination_strategy="consensus"
>>> )
>>>
>>> # 3. Build adaptive workflow
>>> workflow = builder.add_node("process", processing_node)
>>> workflow.add_node("coordinate", agent_node)
>>> workflow.add_adaptive_edges(source="process", target="coordinate")
>>>
>>> # 4. Compile with intelligence
>>> app = workflow.compile(
>>> optimization="neural_network",
>>> learning_enabled=True,
>>> monitoring=True
>>> )
๐ง NODE INTELLIGENCE GALLERYยถ
Available Node Types: - EngineNodeConfig - High-performance AI engine execution - AgentNodeV3 - Multi-agent coordination and management - ValidationNodeConfig - Input/output validation and quality control - RoutingValidationNode - Intelligent routing with validation - UnifiedValidationNode - Advanced validation with learning - NodeSchemaComposer - Dynamic schema composition and optimization
Factory Functions: - create_node() - Universal node creation with intelligence - create_engine_node() - Optimized engine node generation - create_adaptive_node() - Self-optimizing node creation - create_validation_node() - Smart validation node generation - create_tool_node() - Intelligent tool execution nodes
Analytics & Monitoring: - NodeAnalytics - Comprehensive performance monitoring - NodeOptimizer - AI-powered optimization engine - NodeRegistry - Intelligent node type management - PerformanceProfiler - Deep performance analysis
โ
Node System: Where Individual Components Become Collective Intelligence ๐ง
Submodulesยถ
- haive.core.graph.node.agent_node
- haive.core.graph.node.agent_node_v3
- haive.core.graph.node.base_node_config
- haive.core.graph.node.callable_node
- haive.core.graph.node.composer
- haive.core.graph.node.decorators
- haive.core.graph.node.engine_node_generic
- haive.core.graph.node.intelligent_multi_agent_node
- haive.core.graph.node.meta_agent_node
- haive.core.graph.node.multi_agent_node
- haive.core.graph.node.parser_node_config_v2
- haive.core.graph.node.practical_stateful_example
- haive.core.graph.node.registry
- haive.core.graph.node.routing_validation_node
- haive.core.graph.node.state_updating_validation_node
- haive.core.graph.node.stateful_integration_example
- haive.core.graph.node.stateful_node_config
- haive.core.graph.node.stateful_validation_node
- haive.core.graph.node.types
- haive.core.graph.node.unified_validation_node
- haive.core.graph.node.utils
- haive.core.graph.node.validation_node_config_v2
- haive.core.graph.node.validation_node_v2
- haive.core.graph.node.validation_node_with_routing
- haive.core.graph.node.validation_router_v2
Functionsยถ
|
Create a branch node. |
|
Create a node function specifically from an engine. |
|
Create a node function from an engine or callable. |
|
Create a tool node. |
|
Create a validation node. |
Get the node registry instance. |
|
|
Register a custom node type. |
Package Contentsยถ
- haive.core.graph.node.create_branch_node(condition, routes, name=None, input_mapping=None)[source]ยถ
Create a branch node.
This creates a node that evaluates a condition on the state and routes to different nodes based on the result.
- Parameters:
condition (collections.abc.Callable) โ Function that evaluates the state and returns a key for routing
routes (dict[Any, str]) โ Mapping from condition outputs to node names
name (str | None) โ Optional name for the node
input_mapping (dict[str, str] | None) โ Mapping from state keys to condition function input keys
- Returns:
Branch node function
- Return type:
- haive.core.graph.node.create_engine_node(engine, name=None, command_goto=None, input_mapping=None, output_mapping=None, retry_policy=None)[source]ยถ
Create a node function specifically from an engine.
This is a specialized version of create_node for engines.
- Parameters:
engine (Any) โ Engine to use for the node
name (str | None) โ Optional name for the node
command_goto (types.CommandGoto | None) โ Optional next node to go to
input_mapping (dict[str, str] | None) โ Optional mapping from state keys to engine input keys
output_mapping (dict[str, str] | None) โ Optional mapping from engine output keys to state keys
retry_policy (langgraph.types.RetryPolicy | None) โ Optional retry policy for the node
- Returns:
Node function that can be added to a graph
- Return type:
- haive.core.graph.node.create_node(engine_or_callable, name=None, command_goto=None, input_mapping=None, output_mapping=None, retry_policy=None, **kwargs)[source]ยถ
Create a node function from an engine or callable.
This is the main function for creating nodes in the Haive framework. It handles various input types and creates the appropriate node function.
- Parameters:
engine_or_callable (Any) โ Engine or callable to use for the node
name (str | None) โ Optional name for the node
command_goto (types.CommandGoto | None) โ Optional next node to go to
input_mapping (dict[str, str] | None) โ Optional mapping from state keys to engine input keys
output_mapping (dict[str, str] | None) โ Optional mapping from engine output keys to state keys
retry_policy (langgraph.types.RetryPolicy | None) โ Optional retry policy for the node
**kwargs โ Additional options for the node configuration
- Returns:
Node function that can be added to a graph
- Return type:
Examples
Create a node from an engine:
retriever_node = create_node( retriever_engine, name="retrieve", command_goto="generate" ) # Add to graph builder.add_node("retrieve", retriever_node)
- haive.core.graph.node.create_tool_node(tools, name=None, command_goto=None, messages_key='messages', handle_tool_errors=True)[source]ยถ
Create a tool node.
This creates a node that uses LangGraphโs ToolNode to handle tool calls.
- Parameters:
tools (list[Any]) โ List of tools for the node
name (str | None) โ Optional name for the node
command_goto (types.CommandGoto | None) โ Optional next node to go to
messages_key (str) โ Name of the messages key in the state
handle_tool_errors (bool | str | collections.abc.Callable[Ellipsis, str]) โ How to handle tool errors
- Returns:
Tool node function
- Return type:
- haive.core.graph.node.create_validation_node(schemas, name=None, command_goto=None, messages_key='messages')[source]ยถ
Create a validation node.
This creates a node that uses LangGraphโs ValidationNode to validate inputs against a schema.
- Parameters:
schemas (list[type[pydantic.BaseModel] | collections.abc.Callable]) โ List of validation schemas
name (str | None) โ Optional name for the node
command_goto (types.CommandGoto | None) โ Optional next node to go to
messages_key (str) โ Name of the messages key in the state
- Returns:
Validation node function
- Return type:
- haive.core.graph.node.register_custom_node_type(name, config_class)[source]ยถ
Register a custom node type.
- Parameters:
name (str)
config_class (type[config.NodeConfig])
- Return type:
None