class HAPNode(BaseModel):
    """HAP node that can contain an agent.

    A node is identified by ``id`` and refers to its agent through a
    ``"module:Class"`` entrypoint string.  The agent object itself is created
    lazily by :meth:`load_agent` (or supplied up front) and is excluded from
    serialization.
    """

    id: str = Field(..., description="Node identifier")
    agent_entrypoint: str = Field(
        ..., description="Module:Class entrypoint for agent"
    )
    agent_instance: Optional[Agent] = Field(
        default=None,
        description="Loaded agent instance",
        exclude=True,  # Don't serialize instance
    )
    next_nodes: List[str] = Field(
        default_factory=list,
        description="Next node IDs in execution order",
    )

    def load_agent(self) -> Agent:
        """Load agent from entrypoint if not already loaded.

        The entrypoint is split on the first ``":"`` into a module path and a
        class name.  The class is instantiated by trying, in order:
        no arguments, ``name=self.id``, and finally ``name=self.id`` together
        with a default ``AugLLMConfig`` engine.  The resulting instance is
        cached on ``agent_instance``.

        Returns:
            The cached or newly created agent instance.

        Raises:
            ValueError: If the entrypoint has no ``":"`` separator, or if an
                ``ImportError`` / ``AttributeError`` occurs while resolving
                or constructing the agent.
            TypeError: If the resolved attribute is not an ``Agent`` subclass.
        """
        # Identity check: only a missing instance should trigger a (re)load.
        if self.agent_instance is not None:
            return self.agent_instance

        if ":" not in self.agent_entrypoint:
            raise ValueError(f"Invalid entrypoint: {self.agent_entrypoint}")

        module_name, class_name = self.agent_entrypoint.split(":", 1)

        try:
            import importlib
            import logging

            logger = logging.getLogger(__name__)

            module = importlib.import_module(module_name)
            agent_class = getattr(module, class_name)

            # issubclass() itself raises on non-classes; check first so the
            # caller always gets the descriptive message below.
            if not (
                isinstance(agent_class, type)
                and issubclass(agent_class, Agent)
            ):
                raise TypeError(f"{agent_class} is not an Agent subclass")

            # Best-effort construction with progressively more arguments.
            # Failures of the cheap attempts are expected for agents without
            # defaults, so they are logged and the next form is tried.
            for kwargs in ({}, {"name": self.id}):
                try:
                    instance = agent_class(**kwargs)
                except Exception as exc:
                    logger.debug(
                        "Instantiating %s with %r failed: %s",
                        agent_class,
                        kwargs,
                        exc,
                    )
                else:
                    break
            else:
                # Imported only when actually needed, so agents with usable
                # defaults do not depend on the engine module being present.
                from haive.core.engine.aug_llm import AugLLMConfig

                # Last resort: errors from this attempt propagate.
                instance = agent_class(name=self.id, engine=AugLLMConfig())

            self.agent_instance = instance
        except (ImportError, AttributeError) as e:
            raise ValueError(
                f"Could not load agent from {self.agent_entrypoint}: {e}"
            ) from e

        return self.agent_instance

    async def execute(self, context: 'HAPContext') -> 'HAPContext':
        """Execute this node's agent.

        Appends this node's id to ``context.execution_path``, runs the agent
        on a shallow copy of ``context.inputs`` (awaiting ``arun`` when the
        agent has it, otherwise calling ``run`` synchronously), merges the
        result into ``context.outputs`` and records metadata under
        ``context.agent_metadata[self.id]``.

        Args:
            context: The context object threaded through the graph run.

        Returns:
            The same ``context`` object, updated in place.
        """
        import time

        agent = self.load_agent()

        # Add to execution path
        context.execution_path.append(self.id)

        # Prepare input for agent - use a copy of the inputs dict so the
        # agent cannot rebind top-level keys on the shared context.
        agent_input = context.inputs.copy()

        # Execute agent, measuring wall-clock duration of the call.
        started = time.perf_counter()
        if hasattr(agent, 'arun'):
            result = await agent.arun(agent_input)
        else:
            result = agent.run(agent_input)
        elapsed = time.perf_counter() - started

        # Update context with results: dict results are merged key by key,
        # anything else is stored as a string under this node's id.
        if isinstance(result, dict):
            context.outputs.update(result)
        else:
            context.outputs[self.id] = str(result)

        # Add agent metadata
        context.agent_metadata[self.id] = {
            "agent_type": agent.__class__.__name__,
            "entrypoint": self.agent_entrypoint,
            "execution_time": elapsed,  # seconds
        }

        return context
class HAPGraph(BaseModel):
    """HAP graph with agent orchestration capabilities.

    Nodes are stored by id in ``nodes``; edges are the ``next_nodes`` lists
    of the individual nodes.  :meth:`execute` runs every orderable node once,
    in the order produced by :meth:`topological_order`.
    """

    nodes: Dict[str, HAPNode] = Field(
        default_factory=dict, description="HAP-specific nodes"
    )
    entry_node: str = Field(default="", description="Entry point node ID")

    def add_agent_node(
        self,
        node_id: str,
        agent: Agent,
        next_nodes: Optional[List[str]] = None,
    ) -> HAPNode:
        """Add an agent as a node to the graph.

        The node's entrypoint string is derived from the agent's module and
        class name, and the given instance is attached so it does not need
        to be loaded again.  An existing node with the same id is replaced.

        Args:
            node_id: Identifier to register the node under.
            agent: Already constructed agent instance.
            next_nodes: Ids of successor nodes; defaults to no successors.

        Returns:
            The newly created node.
        """
        entrypoint = f"{agent.__module__}:{agent.__class__.__name__}"
        node = HAPNode(
            id=node_id,
            agent_entrypoint=entrypoint,
            agent_instance=agent,
            next_nodes=list(next_nodes or ()),
        )
        self.nodes[node_id] = node
        return node

    def add_entrypoint_node(
        self,
        node_id: str,
        entrypoint: str,
        next_nodes: Optional[List[str]] = None,
    ) -> HAPNode:
        """Add a node by entrypoint string.

        No agent is instantiated here; the node holds only the entrypoint.
        An existing node with the same id is replaced.

        Args:
            node_id: Identifier to register the node under.
            entrypoint: ``"module:Class"`` string naming the agent class.
            next_nodes: Ids of successor nodes; defaults to no successors.

        Returns:
            The newly created node.
        """
        node = HAPNode(
            id=node_id,
            agent_entrypoint=entrypoint,
            next_nodes=list(next_nodes or ()),
        )
        self.nodes[node_id] = node
        return node

    def topological_order(self) -> List[str]:
        """Get topological ordering of nodes.

        Uses in-degree counting (Kahn's algorithm).  Successor ids that do
        not correspond to a registered node are ignored.  Nodes that are part
        of a cycle (or only reachable through one) never reach in-degree zero
        and are therefore absent from the result.

        Returns:
            Node ids ordered so that every node appears after all of its
            registered predecessors.
        """
        from collections import deque

        indegree = dict.fromkeys(self.nodes, 0)
        for node in self.nodes.values():
            for next_id in node.next_nodes:
                if next_id in indegree:
                    indegree[next_id] += 1

        ready = deque(
            node_id for node_id, degree in indegree.items() if degree == 0
        )
        order: List[str] = []
        while ready:
            node_id = ready.popleft()
            order.append(node_id)
            for next_id in self.nodes[node_id].next_nodes:
                # Same guard as the counting pass: a dangling successor id
                # was never counted, so it must not be decremented either.
                if next_id not in indegree:
                    continue
                indegree[next_id] -= 1
                if indegree[next_id] == 0:
                    ready.append(next_id)
        return order

    async def execute(self, initial_context: Dict[str, Any]) -> 'HAPContext':
        """Execute the entire graph.

        Creates a fresh ``HAPContext``, seeds its ``inputs`` with
        ``initial_context``, records the graph name in ``graph_context`` and
        then awaits each node's ``execute`` in topological order, threading
        the context through.

        Args:
            initial_context: Key/value pairs copied into ``context.inputs``.

        Returns:
            The context returned by the last executed node (or the freshly
            seeded context when no node is executed).
        """
        # Imported at call time rather than module level.
        # NOTE(review): presumably to avoid a circular import - confirm.
        from .context import HAPContext

        # Create HAP context
        context = HAPContext()

        # Seed the context's inputs with the caller-supplied values.
        context.inputs.update(initial_context)
        # The model declares no ``name`` field, so this falls back to
        # 'unnamed' unless a subclass or instance provides one.
        context.graph_context["graph_name"] = getattr(self, 'name', 'unnamed')

        # Execute nodes in topological order
        for node_id in self.topological_order():
            context = await self.nodes[node_id].execute(context)

        return context