# Source code for haive.hap.models.graph

from typing import Dict, List, Optional, Any
from pydantic import BaseModel, Field
from haive.agents.base.agent import Agent

class HAPNode(BaseModel):
    """HAP node that can contain an agent.

    A node is identified by ``id``, names its agent through a
    ``module:Class`` entrypoint string, and lists the IDs of the nodes that
    follow it. The agent instance is created on first use by
    :meth:`load_agent` and is excluded from serialization.
    """

    id: str = Field(..., description="Node identifier")
    agent_entrypoint: str = Field(
        ..., description="Module:Class entrypoint for agent"
    )
    agent_instance: Optional[Agent] = Field(
        default=None,
        description="Loaded agent instance",
        exclude=True,  # Don't serialize instance
    )
    next_nodes: List[str] = Field(
        default_factory=list,
        description="Next node IDs in execution order",
    )

    def load_agent(self) -> Agent:
        """Load agent from entrypoint if not already loaded.

        Returns:
            The cached ``agent_instance`` when one is already set; otherwise
            a new instance of the class named by ``agent_entrypoint``, which
            is also stored on ``agent_instance`` for later calls.

        Raises:
            ValueError: If the entrypoint contains no ``":"``, or if the
                module cannot be imported / the attribute cannot be found
                (the original error is chained as the cause).
            TypeError: If the resolved attribute is not an ``Agent`` subclass.
        """
        # Compare against None explicitly so an agent object that happens to
        # be falsy is still treated as loaded.
        if self.agent_instance is not None:
            return self.agent_instance

        if ":" not in self.agent_entrypoint:
            raise ValueError(f"Invalid entrypoint: {self.agent_entrypoint}")
        module_name, class_name = self.agent_entrypoint.split(":", 1)

        try:
            import importlib

            module = importlib.import_module(module_name)
            agent_class = getattr(module, class_name)
            # issubclass() itself raises an unhelpful TypeError on non-class
            # objects, so check for a class first.
            if not (
                isinstance(agent_class, type) and issubclass(agent_class, Agent)
            ):
                raise TypeError(f"{agent_class} is not an Agent subclass")
            self.agent_instance = self._instantiate_agent(agent_class)
        except (ImportError, AttributeError) as e:
            raise ValueError(
                f"Could not load agent from {self.agent_entrypoint}: {e}"
            ) from e
        return self.agent_instance

    def _instantiate_agent(self, agent_class: type) -> Agent:
        """Construct ``agent_class``, trying progressively richer arguments.

        Attempts, in order: no arguments, ``name=self.id``, then
        ``name=self.id`` plus a default ``AugLLMConfig`` engine. Failures of
        the first two attempts are deliberately ignored; the last attempt's
        exception propagates to the caller.
        """
        try:
            # First try with no args (for agents with defaults)
            return agent_class()
        except Exception:
            try:
                # Try with name only
                return agent_class(name=self.id)
            except Exception:
                # Imported lazily: only the final fallback needs it, so a
                # missing engine module cannot break the simpler attempts.
                from haive.core.engine.aug_llm import AugLLMConfig

                # Try with name and engine
                return agent_class(name=self.id, engine=AugLLMConfig())

    async def execute(self, context: 'HAPContext') -> 'HAPContext':
        """Execute this node's agent.

        The node ID is appended to ``context.execution_path``, the agent is
        invoked with a shallow copy of ``context.inputs``, and the result is
        written to ``context.outputs``: a ``dict`` result is merged in, any
        other result is stored as ``str(result)`` under this node's ID. An
        entry describing the agent is added to ``context.agent_metadata``.

        Args:
            context: The context object to read inputs from and update.

        Returns:
            The same ``context`` object, after being updated in place.
        """
        agent = self.load_agent()

        # Add to execution path
        context.execution_path.append(self.id)

        # Prepare input for agent - use inputs dict
        agent_input = context.inputs.copy()

        # Prefer the async entry point; otherwise call run() directly.
        if hasattr(agent, 'arun'):
            result = await agent.arun(agent_input)
        else:
            result = agent.run(agent_input)

        # Update context with results
        if isinstance(result, dict):
            context.outputs.update(result)
        else:
            context.outputs[self.id] = str(result)

        # Add agent metadata
        context.agent_metadata[self.id] = {
            "agent_type": agent.__class__.__name__,
            "entrypoint": self.agent_entrypoint,
            "execution_time": None,  # Would add timing
        }

        return context
class HAPGraph(BaseModel):
    """HAP graph with agent orchestration capabilities.

    Nodes are stored by ID in ``nodes``; edges are the ``next_nodes`` lists
    held by each node.
    """

    nodes: Dict[str, HAPNode] = Field(
        default_factory=dict,
        description="HAP-specific nodes",
    )
    entry_node: str = Field(
        default="",
        description="Entry point node ID",
    )

    def add_agent_node(
        self,
        node_id: str,
        agent: Agent,
        next_nodes: Optional[List[str]] = None,
    ) -> HAPNode:
        """Add an agent as a node to the graph.

        Args:
            node_id: ID to register the node under; an existing node with the
                same ID is replaced.
            agent: Agent instance to attach to the node. Its module and class
                name are recorded as the node's entrypoint string.
            next_nodes: IDs of successor nodes; ``None`` means no successors.

        Returns:
            The newly created node.
        """
        entrypoint = f"{agent.__module__}:{agent.__class__.__name__}"
        node = HAPNode(
            id=node_id,
            agent_entrypoint=entrypoint,
            agent_instance=agent,
            next_nodes=next_nodes or [],
        )
        self.nodes[node_id] = node
        return node

    def add_entrypoint_node(
        self,
        node_id: str,
        entrypoint: str,
        next_nodes: Optional[List[str]] = None,
    ) -> HAPNode:
        """Add a node by entrypoint string.

        Args:
            node_id: ID to register the node under; an existing node with the
                same ID is replaced.
            entrypoint: ``module:Class`` string naming the agent class.
            next_nodes: IDs of successor nodes; ``None`` means no successors.

        Returns:
            The newly created node (its agent is not loaded here).
        """
        node = HAPNode(
            id=node_id,
            agent_entrypoint=entrypoint,
            next_nodes=next_nodes or [],
        )
        self.nodes[node_id] = node
        return node

    def topological_order(self) -> List[str]:
        """Get topological ordering of nodes.

        Successor IDs that do not name a node in ``nodes`` are ignored.
        Nodes that never reach in-degree zero (members of a cycle, or nodes
        reachable only through one) are omitted from the result.

        Returns:
            Node IDs ordered so that each node appears after all of its
            predecessors.
        """
        from collections import deque

        indeg = {k: 0 for k in self.nodes}
        for node in self.nodes.values():
            for next_id in node.next_nodes:
                if next_id in indeg:
                    indeg[next_id] += 1

        # deque gives O(1) pops from the left end of the work queue.
        queue = deque(k for k, d in indeg.items() if d == 0)
        order: List[str] = []

        while queue:
            node_id = queue.popleft()
            order.append(node_id)
            for next_id in self.nodes[node_id].next_nodes:
                # Skip dangling references, mirroring the counting pass
                # above; without this an unknown ID raises KeyError.
                if next_id not in indeg:
                    continue
                indeg[next_id] -= 1
                if indeg[next_id] == 0:
                    queue.append(next_id)

        return order

    async def execute(self, initial_context: Dict[str, Any]) -> 'HAPContext':
        """Execute the entire graph.

        Every node returned by :meth:`topological_order` is executed in that
        order, each receiving the context returned by the previous one.
        ``entry_node`` is not consulted here.

        Args:
            initial_context: Values merged into the new context's ``inputs``.

        Returns:
            The context returned by the last executed node, or the freshly
            created context if no node was executed.
        """
        from .context import HAPContext

        # Create HAP context
        context = HAPContext()

        # Seed the context's inputs with the caller-supplied values.
        context.inputs.update(initial_context)
        # 'name' is not a declared field on this class, so the fallback
        # applies unless a subclass or caller provides the attribute.
        context.graph_context["graph_name"] = getattr(self, 'name', 'unnamed')

        # Execute nodes in topological order
        execution_order = self.topological_order()

        for node_id in execution_order:
            node = self.nodes[node_id]
            context = await node.execute(context)

        return context
# Legacy names retained as aliases for backward compatibility.
AgentGraph = HAPGraph
AgentNode = HAPNode