HAP Server
The server module contains the runtime engine for executing HAP workflows.
HAPRuntime
The HAPRuntime class is the main execution engine for agent graphs.
class haive.hap.server.runtime.HAPRuntime(graph)
Bases: object
Execute an HAP graph with proper error handling and Haive integration.
Parameters: graph (HAPGraph)
Core Functionality
Graph Execution: Runs agents in topological order
Context Management: Maintains state across nodes
Agent Loading: Dynamically loads agents from various sources
Error Handling: Comprehensive error propagation
Basic Usage
from haive.hap.server.runtime import HAPRuntime
from haive.hap.models.graph import HAPGraph
# Create graph with agents
graph = HAPGraph()
# ... build graph ...
# Create runtime
runtime = HAPRuntime(graph)
# Execute asynchronously
result = await runtime.run({"input": "data"})
# Or synchronously
result = runtime.run_sync({"input": "data"})
Agent Loading
The runtime supports multiple agent loading patterns:
From Entrypoint
# Standard format: module:ClassName
agent = runtime._load_agent("haive.agents.simple:SimpleAgent")
# Custom module
agent = runtime._load_agent("myproject.agents:CustomAgent")
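The `module:ClassName` format can be resolved with the standard library alone. The following is a minimal sketch, not the actual body of `_load_agent`: `resolve_entrypoint` is a hypothetical helper, and the choice to raise `ValueError` for a malformed string and `ImportError` for a missing module or class is an assumption modeled on the error types this page documents.

```python
import importlib


def resolve_entrypoint(entrypoint: str) -> type:
    """Resolve a 'module:ClassName' string to a class (hypothetical helper)."""
    module_path, sep, class_name = entrypoint.partition(":")
    if not sep or not module_path or not class_name:
        raise ValueError(f"Expected 'module:ClassName', got {entrypoint!r}")
    # import_module raises ImportError (ModuleNotFoundError) for unknown modules
    module = importlib.import_module(module_path)
    try:
        return getattr(module, class_name)
    except AttributeError as exc:
        # Assumption: a missing class is reported as an import failure too
        raise ImportError(f"{module_path!r} has no attribute {class_name!r}") from exc


# Demonstrated on a standard-library class so the sketch runs without Haive
cls = resolve_entrypoint("collections:OrderedDict")
print(cls.__name__)
```

Splitting with `partition` keeps the validation in one place: a string without a colon, or with an empty half, is rejected before any import is attempted.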
Smart Initialization
The runtime tries multiple initialization patterns:
# Pattern 1: No arguments
agent = AgentClass()
# Pattern 2: Name only
agent = AgentClass(name="agent_name")
# Pattern 3: Name + Engine
agent = AgentClass(
    name="agent_name",
    engine=AugLLMConfig()
)
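One way to try these patterns in sequence is a fallback loop over constructor calls. This is a sketch under stated assumptions: `instantiate_agent` and the three stub classes are hypothetical, and catching `TypeError` assumes a signature mismatch surfaces that way. Real agent classes may signal a missing argument with a different exception type.

```python
def instantiate_agent(agent_cls, name, engine_factory=None):
    """Try constructor patterns from least to most specific (hypothetical helper)."""
    attempts = [
        lambda: agent_cls(),            # Pattern 1: no arguments
        lambda: agent_cls(name=name),   # Pattern 2: name only
    ]
    if engine_factory is not None:
        # Pattern 3: name + engine; the engine is built only if this is reached
        attempts.append(lambda: agent_cls(name=name, engine=engine_factory()))
    last_error = None
    for attempt in attempts:
        try:
            return attempt()
        except TypeError as exc:  # signature mismatch: fall through to next pattern
            last_error = exc
    raise TypeError(f"Could not instantiate {agent_cls.__name__}") from last_error


# Stub classes standing in for real agents
class NoArgAgent:
    def __init__(self):
        self.name = "default"


class NamedAgent:
    def __init__(self, name):
        self.name = name


class EngineAgent:
    def __init__(self, name, engine):
        self.name = name
        self.engine = engine


print(instantiate_agent(NamedAgent, "agent_name").name)
print(type(instantiate_agent(EngineAgent, "agent_name", engine_factory=dict)).__name__)
```

Note that a blanket `except TypeError` also hides a `TypeError` raised inside a constructor body, so a production version would likely inspect the signature instead.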
Execution Flow
Graph Traversal
1. Start from entry_node
2. Execute the node's agent
3. Update context with results
4. Move to next_nodes
5. Repeat until complete
# Execution follows topological order
graph = HAPGraph()
graph.add_agent_node("A", agent_a, ["B", "C"])
graph.add_agent_node("B", agent_b, ["D"])
graph.add_agent_node("C", agent_c, ["D"])
graph.add_agent_node("D", agent_d)
graph.entry_node = "A"
# Order: A -> (B, C) -> D
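The ordering for this diamond-shaped graph can be reproduced with the standard library's `graphlib`. The sketch below uses a plain dictionary of successor lists rather than an `HAPGraph`, and it is not a claim about how the runtime computes its order internally.

```python
from graphlib import TopologicalSorter

# Successor lists mirroring the example graph (plain dict, not an HAPGraph)
next_nodes = {"A": ["B", "C"], "B": ["D"], "C": ["D"], "D": []}

# TopologicalSorter expects node -> predecessors, so invert the edges
predecessors = {node: set() for node in next_nodes}
for node, successors in next_nodes.items():
    for successor in successors:
        predecessors[successor].add(node)

sorter = TopologicalSorter(predecessors)
sorter.prepare()
levels = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # nodes whose predecessors are all done
    levels.append(ready)
    sorter.done(*ready)

print(levels)  # [['A'], ['B', 'C'], ['D']]
```

Each inner list is a group of nodes with no unmet dependencies, which is why B and C appear together between A and D.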
Context Flow
Context flows through each node:
import time
async def execute_node(node, context):
    # Get agent
    agent = node.load_agent()
    # Execute with context
    result = await agent.arun(context)
    # Update context
    context.execution_path.append(node.id)
    context.agent_metadata[node.id] = {
        "result": result,
        "timestamp": time.time()
    }
    return context
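To see the context accumulate across nodes without installing Haive, the same pattern can be run against stand-ins. In this sketch `SketchContext` and `UppercaseAgent` are hypothetical; only the field names `execution_path` and `agent_metadata` and the `arun` method name are taken from the snippet above.

```python
import asyncio
import time
from dataclasses import dataclass, field


@dataclass
class SketchContext:
    """Stand-in for the runtime context; not the real Haive class."""
    inputs: dict = field(default_factory=dict)
    execution_path: list = field(default_factory=list)
    agent_metadata: dict = field(default_factory=dict)


class UppercaseAgent:
    """Stub agent exposing an async arun() method."""

    async def arun(self, context):
        return context.inputs["text"].upper()


async def execute_node(node_id, agent, context):
    result = await agent.arun(context)
    # Record which node ran and what it produced
    context.execution_path.append(node_id)
    context.agent_metadata[node_id] = {"result": result, "timestamp": time.time()}
    return context


async def main():
    context = SketchContext(inputs={"text": "hello"})
    for node_id in ("A", "B"):
        context = await execute_node(node_id, UppercaseAgent(), context)
    return context


final = asyncio.run(main())
print(final.execution_path)  # ['A', 'B']
```

Because the same context object is passed from node to node, later nodes can read the metadata written by earlier ones.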
Error Handling
The runtime raises distinct exception types for import, validation, and execution failures:
Import Errors
try:
    agent = runtime._load_agent("invalid.module:Agent")
except ImportError as e:
    # Module not found
    print(f"Failed to import: {e}")
Validation Errors
try:
    # Invalid entrypoint format
    agent = runtime._load_agent("invalid_format")
except ValueError as e:
    # Format validation failed
    print(f"Invalid format: {e}")
Runtime Errors
try:
    result = await runtime.run(input_data)
except RuntimeError as e:
    # Execution failed
    print(f"Execution error: {e}")
Advanced Usage
Custom Context
# Start with pre-populated context
initial_context = {
    "user": "alice",
    "session_id": "12345",
    "config": {
        "timeout": 30,
        "retry": 3
    }
}
result = await runtime.run(initial_context)
Progress Tracking
# Track execution progress
class ProgressRuntime(HAPRuntime):
    async def execute_node(self, node, context):
        print(f"Executing: {node.id}")
        result = await super().execute_node(node, context)
        print(f"Completed: {node.id}")
        return result
Error Recovery
import asyncio
# Implement retry logic
async def run_with_retry(runtime, input_data, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await runtime.run(input_data)
        except Exception:
            if attempt == max_retries - 1:
                raise  # Out of attempts: propagate the last error
            print(f"Retry {attempt + 1}/{max_retries}")
            await asyncio.sleep(1)  # Fixed one-second pause between attempts
Performance Considerations
Agent Caching
Agents are loaded once per node:
class CachedNode(HAPNode):
    def load_agent(self):
        if self.agent_instance is None:
            # Load only once
            self.agent_instance = super().load_agent()
        return self.agent_instance
Parallel Execution
For independent nodes:
# Future enhancement
async def execute_parallel(nodes, context):
    tasks = [
        execute_node(node, context.copy())
        for node in nodes
    ]
    results = await asyncio.gather(*tasks)
    return merge_contexts(results)
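The snippet above leaves `merge_contexts` undefined. The sketch below shows one possible merge strategy using plain dictionaries as contexts: each branch works on a deep copy, and the merge unions per-node results and appends each branch's new path entries. The dictionary layout, `run_branch`, and this particular `merge_contexts` signature are all assumptions made for illustration, not part of the runtime.

```python
import asyncio
import copy


async def run_branch(name, context):
    """Stub branch: records its own result on a private copy of the context."""
    await asyncio.sleep(0)  # yield control, as a real agent call would
    context["results"][name] = f"{name} done"
    context["execution_path"].append(name)
    return context


def merge_contexts(base, branches):
    """Hypothetical merge: union per-node results, append new path entries."""
    merged = copy.deepcopy(base)
    seen = len(base["execution_path"])
    for branch in branches:
        merged["results"].update(branch["results"])
        merged["execution_path"].extend(branch["execution_path"][seen:])
    return merged


async def execute_parallel(names, context):
    # Deep copies keep branches from mutating shared nested state
    tasks = [run_branch(name, copy.deepcopy(context)) for name in names]
    branches = await asyncio.gather(*tasks)  # results keep the order of `tasks`
    return merge_contexts(context, branches)


base = {"results": {"A": "A done"}, "execution_path": ["A"]}
merged = asyncio.run(execute_parallel(["B", "C"], base))
print(merged["execution_path"])  # ['A', 'B', 'C']
```

A shallow `.copy()` would share nested dictionaries and lists between branches, so the sketch uses `copy.deepcopy` to keep them isolated until the merge.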
Memory Management
# Clear large data after processing
result = await runtime.run(large_input)
result.inputs.clear()  # Free memory
result.outputs.clear()
Future Development
HAP Integration
The runtime will integrate with the HAP protocol:
# Future implementation
hap_server = HAPServer()
hap_server.register_runtime(runtime)
await hap_server.start()
Plugin System
Dynamic agent discovery:
# Future plugin system
runtime.discover_agents("./agents")
runtime.register_plugin("custom_agents")
Distributed Execution
Cross-server workflow execution:
# Future distributed runtime
runtime = DistributedRuntime([
    "server1:8080",
    "server2:8080"
])