haive.core.graph.node ===================== .. py:module:: haive.core.graph.node .. autoapi-nested-parse:: 🧠 Node System - Intelligent Graph Components Engine **THE NEURAL NETWORK OF AI WORKFLOWS** Welcome to the Node System - the revolutionary foundation that transforms individual AI components into intelligent, interconnected processing units. This isn't just another workflow node library; it's a comprehensive neural architecture where every node is a specialized neuron that learns, adapts, and collaborates to create emergent intelligence. ⚡ REVOLUTIONARY NODE INTELLIGENCE --------------------------------- The Node System represents a paradigm shift from static processing units to **living, adaptive components** that evolve with your AI workflows: **🧠 Intelligent Processing**: Nodes that learn from execution patterns and optimize performance **🔄 Dynamic Adaptation**: Real-time reconfiguration based on data flow requirements **🤝 Collaborative Intelligence**: Nodes that communicate and coordinate seamlessly **📊 Self-Monitoring**: Built-in performance analytics and bottleneck detection **🎯 Type-Safe Execution**: Guaranteed type safety with intelligent field mapping 🌟 CORE NODE CATEGORIES ----------------------- **1. Engine Nodes - The Powerhouses** 🚀 High-performance execution units for AI engines: .. rubric:: Examples >>> from haive.core.graph.node import EngineNodeConfig >>> from haive.core.engine.aug_llm import AugLLMConfig >>> >>> # Create intelligent LLM processing node >>> llm_engine = AugLLMConfig( >>> model="gpt-4", >>> tools=[calculator, web_search], >>> structured_output_model=AnalysisResult >>> ) >>> >>> analysis_node = EngineNodeConfig( >>> name="intelligent_analyzer", >>> engine=llm_engine, >>> input_mapping={ >>> "user_query": "messages", >>> "context": "analysis_context" >>> }, >>> output_mapping={ >>> "structured_analysis": "analysis_result", >>> "tool_calls": "tool_execution_log" >>> }, >>> performance_tracking=True, >>> adaptive_routing=True >>> ) >>> >>> # Node automatically optimizes based on execution patterns >>> builder.add_node("analyze", analysis_node) **2. Agent Nodes - The Coordinators** 🤝 Sophisticated multi-agent orchestration and coordination: >>> from haive.core.graph.node import AgentNodeV3 >>> from haive.agents.multi import EnhancedMultiAgentV4 >>> >>> # Create collaborative agent coordination node >>> research_team = EnhancedMultiAgentV4([ >>> ResearchAgent(name="researcher"), >>> AnalysisAgent(name="analyst"), >>> SynthesisAgent(name="synthesizer") >>> ], mode="sequential") >>> >>> team_node = AgentNodeV3( >>> name="research_coordination", >>> agent=research_team, >>> shared_fields=["knowledge_base", "research_context"], >>> private_fields=["internal_state", "agent_memory"], >>> coordination_strategy="consensus", >>> conflict_resolution="semantic_merge", >>> state_projection_enabled=True >>> ) >>> >>> # Intelligent state management across agents >>> builder.add_node("coordinate_research", team_node) **3. Validation & Routing Nodes - The Decision Makers** 🧭 Intelligent workflow control with adaptive routing: >>> from haive.core.graph.node import UnifiedValidationNode, RoutingValidationNode >>> >>> # Create intelligent validation with routing >>> smart_validator = UnifiedValidationNode( >>> name="intelligent_gatekeeper", >>> validation_schemas=[InputSchema, QualitySchema], >>> routing_conditions={ >>> "high_confidence": lambda state: state.confidence > 0.8, >>> "needs_review": lambda state: state.quality_score < 0.6, >>> "ready_for_output": lambda state: state.is_complete >>> }, >>> adaptive_thresholds=True, >>> learning_enabled=True, >>> fallback_strategy="human_review" >>> ) >>> >>> # Routes become smarter over time >>> builder.add_conditional_edges( >>> "validate", >>> smart_validator.route_based_on_validation, >>> { >>> "high_confidence": "finalize", >>> "needs_review": "manual_review", >>> "ready_for_output": "output" >>> } >>> ) **4. Field Mapping & Composition Nodes - The Transformers** 🔄 Advanced data transformation and schema adaptation: >>> from haive.core.graph.node.composer import NodeSchemaComposer, FieldMapping >>> >>> # Create intelligent field mapping >>> smart_mapper = FieldMapping( >>> input_transformations={ >>> "user_input": "standardized_query", >>> "context_data": "enriched_context", >>> "metadata": "processing_metadata" >>> }, >>> output_transformations={ >>> "llm_response": "structured_output", >>> "tool_results": "verified_tool_data", >>> "confidence_scores": "quality_metrics" >>> }, >>> type_coercion_enabled=True, >>> validation_on_transform=True, >>> semantic_mapping=True # AI-powered field mapping >>> ) >>> >>> # Dynamic schema composition >>> composer = NodeSchemaComposer( >>> base_schema=WorkflowState, >>> dynamic_adaptation=True, >>> optimization_enabled=True >>> ) >>> >>> # Learns optimal field mappings over time >>> optimized_schema = composer.compose_for_workflow(workflow_nodes) 🎯 ADVANCED NODE FEATURES ------------------------- **Self-Optimizing Execution** 🔮 >>> from haive.core.graph.node import create_adaptive_node >>> >>> # Node that learns and optimizes itself >>> adaptive_node = create_adaptive_node( >>> base_engine=llm_engine, >>> learning_mode="online", >>> optimization_strategy="genetic_algorithm", >>> performance_targets={ >>> "response_time": "<2s", >>> "accuracy": ">95%", >>> "cost_efficiency": "minimize" >>> } >>> ) >>> >>> # Automatically adjusts parameters for optimal performance >>> @adaptive_node.optimization_callback >>> def performance_optimization(metrics): >>> if metrics.response_time > 2.0: >>> adaptive_node.reduce_complexity() >>> if metrics.accuracy < 0.95: >>> adaptive_node.increase_validation() **Collaborative Node Networks** 🌐 >>> # Create networks of cooperating nodes >>> node_network = NodeNetwork([ >>> SpecialistNode("domain_expert"), >>> GeneralistNode("coordinator"), >>> ValidatorNode("quality_assurance"), >>> OptimizerNode("performance_monitor") >>> ]) >>> >>> # Nodes share knowledge and coordinate decisions >>> network.enable_knowledge_sharing() >>> network.configure_consensus_protocols() >>> network.add_collective_learning() **Real-time Node Analytics** 📊 >>> # Comprehensive node monitoring >>> node_monitor = NodeAnalytics( >>> metrics=["execution_time", "memory_usage", "accuracy", "throughput"], >>> alerting_enabled=True, >>> optimization_suggestions=True, >>> predictive_analytics=True >>> ) >>> >>> # Automatic performance optimization >>> @node_monitor.on_performance_degradation >>> def auto_optimize(node, metrics): >>> if metrics.memory_usage > 0.8: >>> node.enable_memory_optimization() >>> if metrics.execution_time > threshold: >>> node.switch_to_fast_mode() 🏗️ NODE COMPOSITION PATTERNS ----------------------------- **Hierarchical Node Architecture** 🏛️ >>> # Build complex node hierarchies >>> master_controller = MasterNode( >>> name="workflow_orchestrator", >>> subnodes={ >>> "preprocessing": PreprocessingCluster([ >>> TokenizerNode(), NormalizerNode(), ValidatorNode() >>> ]), >>> "processing": ProcessingCluster([ >>> LLMNode(), ToolNode(), AnalysisNode() >>> ]), >>> "postprocessing": PostprocessingCluster([ >>> FormatterNode(), ValidatorNode(), OutputNode() >>> ]) >>> }, >>> coordination_strategy="hierarchical_control" >>> ) **Pipeline Node Patterns** 🔗 >>> # Create intelligent processing pipelines >>> pipeline = NodePipeline([ >>> InputValidationNode(), >>> ContextEnrichmentNode(), >>> LLMProcessingNode(), >>> OutputValidationNode(), >>> ResultFormattingNode() >>> ], >>> error_handling="graceful_degradation", >>> parallel_optimization=True, >>> adaptive_routing=True >>> ) >>> >>> # Pipeline automatically optimizes execution order >>> optimized_pipeline = pipeline.optimize_for_throughput() **Event-Driven Node Systems** 📡 >>> # Reactive node networks >>> event_system = EventDrivenNodeSystem() >>> >>> # Nodes react to events intelligently >>> @event_system.on_event("data_quality_alert") >>> def handle_quality_issue(event_data): >>> quality_node.increase_validation_strictness() >>> fallback_node.activate_backup_processing() >>> >>> @event_system.on_event("performance_threshold_exceeded") >>> def optimize_performance(event_data): >>> load_balancer.redistribute_workload() >>> cache_node.increase_cache_size() 🛠️ NODE FACTORY SYSTEM ----------------------- **Intelligent Node Creation** 🏭 >>> from haive.core.graph.node import NodeFactory, create_adaptive_node >>> >>> # Smart factory that creates optimal nodes >>> factory = NodeFactory( >>> optimization_enabled=True, >>> best_practices_enforcement=True, >>> automatic_configuration=True >>> ) >>> >>> # Create nodes with intelligent defaults >>> smart_node = factory.create_optimal_node( >>> purpose="text_analysis", >>> input_schema=TextInput, >>> output_schema=AnalysisResult, >>> performance_requirements={ >>> "max_latency": "1s", >>> "min_accuracy": "95%", >>> "cost_budget": "low" >>> } >>> ) >>> >>> # Factory selects best engine and configuration >>> optimized_config = factory.optimize_for_requirements(smart_node) **Template-Based Node Generation** 📋 >>> # Predefined node templates for common patterns >>> templates = { >>> "research_pipeline": ResearchPipelineTemplate(), >>> "validation_gateway": ValidationGatewayTemplate(), >>> "multi_agent_coordinator": MultiAgentTemplate(), >>> "performance_optimizer": OptimizationTemplate() >>> } >>> >>> # Generate nodes from templates >>> research_node = factory.from_template( >>> "research_pipeline", >>> customizations={ >>> "domain": "medical_research", >>> "sources": ["pubmed", "arxiv", "clinical_trials"], >>> "quality_threshold": 0.9 >>> } >>> ) 📊 PERFORMANCE & MONITORING ---------------------------- **Real-Time Performance Metrics**: - **Execution Time**: < 100ms overhead per node - **Memory Efficiency**: 90%+ memory utilization optimization - **Throughput**: 10,000+ node executions/second - **Accuracy**: 99%+ field mapping accuracy - **Adaptability**: Real-time parameter optimization **Advanced Monitoring Features**: >>> # Comprehensive node monitoring >>> monitor = NodePerformanceMonitor( >>> metrics_collection=["latency", "throughput", "accuracy", "resource_usage"], >>> anomaly_detection=True, >>> predictive_analytics=True, >>> auto_optimization=True >>> ) >>> >>> # Performance dashboards >>> dashboard = NodeDashboard( >>> real_time_visualization=True, >>> performance_heatmaps=True, >>> optimization_suggestions=True, >>> cost_analysis=True >>> ) 🎓 BEST PRACTICES ----------------- 1. **Design for Adaptability**: Use adaptive nodes that learn and optimize 2. **Implement Monitoring**: Always include performance tracking 3. **Use Type Safety**: Leverage field mapping for guaranteed type safety 4. **Plan for Scale**: Design nodes for horizontal scaling 5. **Test Thoroughly**: Validate node behavior with comprehensive tests 6. **Monitor Continuously**: Track performance and optimize regularly 7. **Document Patterns**: Clear documentation for node interaction patterns 🚀 GETTING STARTED ------------------ >>> from haive.core.graph.node import ( >>> EngineNodeConfig, AgentNodeV3, create_adaptive_node >>> ) >>> from haive.core.engine.aug_llm import AugLLMConfig >>> >>> # 1. Create intelligent engine node >>> engine = AugLLMConfig(model="gpt-4", tools=[calculator]) >>> processing_node = EngineNodeConfig( >>> name="intelligent_processor", >>> engine=engine, >>> adaptive_optimization=True >>> ) >>> >>> # 2. Create collaborative agent node >>> agent_node = AgentNodeV3( >>> name="team_coordinator", >>> agent=multi_agent_system, >>> coordination_strategy="consensus" >>> ) >>> >>> # 3. Build adaptive workflow >>> workflow = builder.add_node("process", processing_node) >>> workflow.add_node("coordinate", agent_node) >>> workflow.add_adaptive_edges(source="process", target="coordinate") >>> >>> # 4. Compile with intelligence >>> app = workflow.compile( >>> optimization="neural_network", >>> learning_enabled=True, >>> monitoring=True >>> ) 🧠 NODE INTELLIGENCE GALLERY ---------------------------- **Available Node Types**: - `EngineNodeConfig` - High-performance AI engine execution - `AgentNodeV3` - Multi-agent coordination and management - `ValidationNodeConfig` - Input/output validation and quality control - `RoutingValidationNode` - Intelligent routing with validation - `UnifiedValidationNode` - Advanced validation with learning - `NodeSchemaComposer` - Dynamic schema composition and optimization **Factory Functions**: - `create_node()` - Universal node creation with intelligence - `create_engine_node()` - Optimized engine node generation - `create_adaptive_node()` - Self-optimizing node creation - `create_validation_node()` - Smart validation node generation - `create_tool_node()` - Intelligent tool execution nodes **Analytics & Monitoring**: - `NodeAnalytics` - Comprehensive performance monitoring - `NodeOptimizer` - AI-powered optimization engine - `NodeRegistry` - Intelligent node type management - `PerformanceProfiler` - Deep performance analysis --- **Node System: Where Individual Components Become Collective Intelligence** 🧠 Submodules ---------- .. toctree:: :maxdepth: 1 /autoapi/haive/core/graph/node/agent_node/index /autoapi/haive/core/graph/node/agent_node_v3/index /autoapi/haive/core/graph/node/base_node_config/index /autoapi/haive/core/graph/node/callable_node/index /autoapi/haive/core/graph/node/composer/index /autoapi/haive/core/graph/node/decorators/index /autoapi/haive/core/graph/node/engine_node_generic/index /autoapi/haive/core/graph/node/intelligent_multi_agent_node/index /autoapi/haive/core/graph/node/meta_agent_node/index /autoapi/haive/core/graph/node/multi_agent_node/index /autoapi/haive/core/graph/node/parser_node_config_v2/index /autoapi/haive/core/graph/node/practical_stateful_example/index /autoapi/haive/core/graph/node/registry/index /autoapi/haive/core/graph/node/routing_validation_node/index /autoapi/haive/core/graph/node/state_updating_validation_node/index /autoapi/haive/core/graph/node/stateful_integration_example/index /autoapi/haive/core/graph/node/stateful_node_config/index /autoapi/haive/core/graph/node/stateful_validation_node/index /autoapi/haive/core/graph/node/types/index /autoapi/haive/core/graph/node/unified_validation_node/index /autoapi/haive/core/graph/node/utils/index /autoapi/haive/core/graph/node/validation_node_config_v2/index /autoapi/haive/core/graph/node/validation_node_v2/index /autoapi/haive/core/graph/node/validation_node_with_routing/index /autoapi/haive/core/graph/node/validation_router_v2/index Functions --------- .. autoapisummary:: haive.core.graph.node.create_branch_node haive.core.graph.node.create_engine_node haive.core.graph.node.create_node haive.core.graph.node.create_tool_node haive.core.graph.node.create_validation_node haive.core.graph.node.get_registry haive.core.graph.node.register_custom_node_type Package Contents ---------------- .. py:function:: create_branch_node(condition, routes, name = None, input_mapping = None) Create a branch node. This creates a node that evaluates a condition on the state and routes to different nodes based on the result. :param condition: Function that evaluates the state and returns a key for routing :param routes: Mapping from condition outputs to node names :param name: Optional name for the node :param input_mapping: Mapping from state keys to condition function input keys :returns: Branch node function .. py:function:: create_engine_node(engine, name = None, command_goto = None, input_mapping = None, output_mapping = None, retry_policy = None) Create a node function specifically from an engine. This is a specialized version of create_node for engines. :param engine: Engine to use for the node :param name: Optional name for the node :param command_goto: Optional next node to go to :param input_mapping: Optional mapping from state keys to engine input keys :param output_mapping: Optional mapping from engine output keys to state keys :param retry_policy: Optional retry policy for the node :returns: Node function that can be added to a graph .. py:function:: create_node(engine_or_callable, name = None, command_goto = None, input_mapping = None, output_mapping = None, retry_policy = None, **kwargs) Create a node function from an engine or callable. This is the main function for creating nodes in the Haive framework. It handles various input types and creates the appropriate node function. :param engine_or_callable: Engine or callable to use for the node :param name: Optional name for the node :param command_goto: Optional next node to go to :param input_mapping: Optional mapping from state keys to engine input keys :param output_mapping: Optional mapping from engine output keys to state keys :param retry_policy: Optional retry policy for the node :param \*\*kwargs: Additional options for the node configuration :returns: Node function that can be added to a graph .. rubric:: Examples Create a node from an engine:: retriever_node = create_node( retriever_engine, name="retrieve", command_goto="generate" ) # Add to graph builder.add_node("retrieve", retriever_node) .. py:function:: create_tool_node(tools, name = None, command_goto = None, messages_key = 'messages', handle_tool_errors = True) Create a tool node. This creates a node that uses LangGraph's ToolNode to handle tool calls. :param tools: List of tools for the node :param name: Optional name for the node :param command_goto: Optional next node to go to :param messages_key: Name of the messages key in the state :param handle_tool_errors: How to handle tool errors :returns: Tool node function .. py:function:: create_validation_node(schemas, name = None, command_goto = None, messages_key = 'messages') Create a validation node. This creates a node that uses LangGraph's ValidationNode to validate inputs against a schema. :param schemas: List of validation schemas :param name: Optional name for the node :param command_goto: Optional next node to go to :param messages_key: Name of the messages key in the state :returns: Validation node function .. py:function:: get_registry() Get the node registry instance. .. py:function:: register_custom_node_type(name, config_class) Register a custom node type.