HAP Server

The server module contains the runtime engine for executing HAP workflows.

HAPRuntime

The HAPRuntime class is the main execution engine for agent graphs.

class haive.hap.server.runtime.HAPRuntime(graph)[source]

Bases: object

Execute an HAP graph with proper error handling and Haive integration.

Parameters:

graph (HAPGraph)

__init__(graph)[source]
Parameters:

graph (HAPGraph)

async run(initial_context)[source]

Execute the graph asynchronously.

Parameters:

initial_context (Dict[str, Any])

Return type:

HAPContext

run_sync(initial_context)[source]

Execute the graph synchronously (for backward compatibility).

Parameters:

initial_context (Dict[str, Any])

Return type:

HAPContext

Core Functionality

  • Graph Execution: Runs agents in topological order

  • Context Management: Maintains state across nodes

  • Agent Loading: Dynamically loads agents from various sources

  • Error Handling: Comprehensive error propagation

Basic Usage

from haive.hap.server.runtime import HAPRuntime
from haive.hap.models.graph import HAPGraph

# Create graph with agents
graph = HAPGraph()
# ... build graph ...

# Create runtime
runtime = HAPRuntime(graph)

# Execute asynchronously (await must be inside an async function,
# or wrap the call in asyncio.run)
result = await runtime.run({"input": "data"})

# Or synchronously
result = runtime.run_sync({"input": "data"})

Agent Loading

The runtime supports multiple agent loading patterns:

From Entrypoint

# Standard format: module:ClassName
agent = runtime._load_agent("haive.agents.simple:SimpleAgent")

# Custom module
agent = runtime._load_agent("myproject.agents:CustomAgent")
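Internally, resolving a module:ClassName entrypoint can be sketched with importlib. The helper name `load_agent_class` below is hypothetical, not the actual `_load_agent` implementation:

```python
import importlib


def load_agent_class(entrypoint: str):
    """Resolve a 'module:ClassName' entrypoint to a class (illustrative sketch)."""
    if ":" not in entrypoint:
        # Mirrors the ValueError raised for invalid formats (see Error Handling)
        raise ValueError(f"Expected 'module:ClassName', got {entrypoint!r}")
    module_path, class_name = entrypoint.split(":", 1)
    module = importlib.import_module(module_path)
    try:
        return getattr(module, class_name)
    except AttributeError as exc:
        raise ImportError(f"{class_name} not found in {module_path}") from exc
```

A missing module surfaces as ImportError and a malformed string as ValueError, matching the error categories documented below.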

Smart Initialization

The runtime tries multiple initialization patterns:

# Pattern 1: No arguments
agent = AgentClass()

# Pattern 2: Name only
agent = AgentClass(name="agent_name")

# Pattern 3: Name + Engine
agent = AgentClass(
    name="agent_name",
    engine=AugLLMConfig()
)
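The fallback cascade can be sketched as a chain of constructor attempts. The helper `smart_init` and the `engine_factory` parameter are hypothetical names for illustration; the real runtime's logic may differ:

```python
def smart_init(agent_cls, name, engine_factory=None):
    """Try progressively richer constructor signatures (illustrative sketch)."""
    # Pattern 1: no arguments; Pattern 2: name only; Pattern 3: name + engine
    attempts = [lambda: agent_cls(), lambda: agent_cls(name=name)]
    if engine_factory is not None:
        attempts.append(lambda: agent_cls(name=name, engine=engine_factory()))
    errors = []
    for attempt in attempts:
        try:
            return attempt()
        except TypeError as exc:
            errors.append(exc)
    raise RuntimeError(f"Could not initialize {agent_cls.__name__}: {errors}")
```

Catching only TypeError means a signature mismatch falls through to the next pattern, while genuine constructor failures propagate immediately.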

Execution Flow

Graph Traversal

  1. Start from entry_node

  2. Execute node’s agent

  3. Update context with results

  4. Move to next_nodes

  5. Repeat until complete

# Execution follows topological order
graph = HAPGraph()
graph.add_agent_node("A", agent_a, ["B", "C"])
graph.add_agent_node("B", agent_b, ["D"])
graph.add_agent_node("C", agent_c, ["D"])
graph.add_agent_node("D", agent_d)
graph.entry_node = "A"

# Order: A -> (B, C) -> D
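The traversal in steps 1-5 can be sketched with Kahn's algorithm, visiting a node only after all of its predecessors have run. The standalone `traversal_order` function and its dict-of-lists graph representation are simplifications of HAPGraph for illustration:

```python
from collections import deque


def traversal_order(nodes, entry):
    """Return node ids in topological order from the entry node.

    nodes: dict mapping node id -> list of next-node ids (illustrative sketch).
    """
    # First pass: count incoming edges among nodes reachable from the entry
    indegree = {entry: 0}
    queue, seen = deque([entry]), {entry}
    while queue:
        nid = queue.popleft()
        for nxt in nodes.get(nid, []):
            indegree[nxt] = indegree.get(nxt, 0) + 1
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    # Second pass: emit a node once all of its predecessors have been emitted
    order, ready = [], deque([entry])
    while ready:
        nid = ready.popleft()
        order.append(nid)
        for nxt in nodes.get(nid, []):
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    return order
```

For the diamond graph above this yields A, then B and C, then D: D is emitted only after both of its predecessors have run.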

Context Flow

Context flows through each node:

import time

async def execute_node(node, context):
    # Get agent
    agent = node.load_agent()

    # Execute with context
    result = await agent.arun(context)

    # Update context
    context.execution_path.append(node.id)
    context.agent_metadata[node.id] = {
        "result": result,
        "timestamp": time.time()
    }

    return context

Error Handling

The runtime provides comprehensive error handling:

Import Errors

try:
    agent = runtime._load_agent("invalid.module:Agent")
except ImportError as e:
    # Module not found
    print(f"Failed to import: {e}")

Validation Errors

try:
    # Invalid entrypoint format
    agent = runtime._load_agent("invalid_format")
except ValueError as e:
    # Format validation failed
    print(f"Invalid format: {e}")

Runtime Errors

try:
    result = await runtime.run(input_data)
except RuntimeError as e:
    # Execution failed
    print(f"Execution error: {e}")

Advanced Usage

Custom Context

# Start with pre-populated context
initial_context = {
    "user": "alice",
    "session_id": "12345",
    "config": {
        "timeout": 30,
        "retry": 3
    }
}

result = await runtime.run(initial_context)

Progress Tracking

# Track execution progress
class ProgressRuntime(HAPRuntime):
    async def execute_node(self, node, context):
        print(f"Executing: {node.id}")
        result = await super().execute_node(node, context)
        print(f"Completed: {node.id}")
        return result

Error Recovery

# Implement retry logic
import asyncio

async def run_with_retry(runtime, input_data, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await runtime.run(input_data)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            print(f"Retry {attempt + 1}/{max_retries}")
            await asyncio.sleep(1)

Performance Considerations

Agent Caching

Agents are loaded once per node:

class CachedNode(HAPNode):
    def load_agent(self):
        if self.agent_instance is None:
            # Load only once
            self.agent_instance = super().load_agent()
        return self.agent_instance

Parallel Execution

For independent nodes:

# Future enhancement
async def execute_parallel(nodes, context):
    tasks = [
        execute_node(node, context.copy())
        for node in nodes
    ]
    results = await asyncio.gather(*tasks)
    return merge_contexts(results)
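The `merge_contexts` helper above is not defined anywhere yet. A minimal sketch, assuming contexts behave like dicts and later results win on key conflicts:

```python
def merge_contexts(contexts):
    """Merge dict-like contexts; later entries override earlier keys.

    Illustrative sketch for the future parallel-execution path; a real
    implementation would need a conflict policy for HAPContext fields
    such as execution_path and agent_metadata.
    """
    merged = {}
    for ctx in contexts:
        merged.update(ctx)
    return merged
```

Last-writer-wins is the simplest policy; branches that must not clobber each other should write to distinct keys.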

Memory Management

# Clear large data after processing
result = await runtime.run(large_input)
result.inputs.clear()  # Free memory
result.outputs.clear()

Future Development

HAP Integration

The runtime will integrate with HAP protocol:

# Future implementation
hap_server = HAPServer()
hap_server.register_runtime(runtime)
await hap_server.start()

Plugin System

Dynamic agent discovery:

# Future plugin system
runtime.discover_agents("./agents")
runtime.register_plugin("custom_agents")

Distributed Execution

Cross-server workflow execution:

# Future distributed runtime
runtime = DistributedRuntime([
    "server1:8080",
    "server2:8080"
])