Architecture
This document describes the HAP (Haive Agent Protocol) architecture and design principles.
Note
Beta Documentation: HAP architecture is currently in beta and may evolve based on feedback and usage patterns.
Overview
HAP is designed as a layered architecture that separates concerns and enables flexible agent orchestration:
┌─────────────────────────────────────────────┐
│              Application Layer              │
│        User Workflows & Applications        │
└─────────────────────────────────────────────┘
                      │
┌─────────────────────────────────────────────┐
│             Protocol Layer (HAP)            │
│     JSON-RPC 2.0 • Auth • Resources • UI    │
└─────────────────────────────────────────────┘
                      │
┌─────────────────────────────────────────────┐
│                 Server Layer                │
│    Runtime Engine • Agent Loading • Exec    │
└─────────────────────────────────────────────┘
                      │
┌─────────────────────────────────────────────┐
│                 Models Layer                │
│       Graphs • Nodes • Context • State      │
└─────────────────────────────────────────────┘
                      │
┌─────────────────────────────────────────────┐
│               Haive Foundation              │
│      Agents • Tools • Engines • Schema      │
└─────────────────────────────────────────────┘
Design Principles
1. Separation of Concerns
Each layer has a distinct responsibility:
Models: Data structures and validation
Server: Execution logic and management
Protocol: Communication and integration
Application: User-facing functionality
2. Haive Integration
HAP is built on Haive foundations:
# Proper Haive integration
from typing import Any, Dict, List

from pydantic import Field

from haive.core.schema.state_schema import StateSchema
from haive.agents.simple.agent import SimpleAgent
from haive.core.engine.aug_llm import AugLLMConfig

# HAP extends, doesn't replace
class HAPContext(StateSchema):
    execution_path: List[str] = Field(default_factory=list)
    agent_metadata: Dict[str, Any] = Field(default_factory=dict)
3. Real Component Testing
Following Haive's no-mocks philosophy:
# ✅ CORRECT: Use real agents in tests
async def test_workflow_execution():
    agent = SimpleAgent(name="test", engine=AugLLMConfig())
    graph = HAPGraph()
    graph.add_agent_node("test", agent)
    runtime = HAPRuntime(graph)
    result = await runtime.run({"input": "test"})
    assert result.execution_path == ["test"]
4. Simplicity First
Use BaseModel and simple patterns:
# Simple, clear model definitions
class HAPNode(BaseModel):
    id: str = Field(..., description="Node identifier")
    agent_entrypoint: Optional[str] = Field(default=None)
    agent_instance: Optional[Agent] = Field(default=None)
    next_nodes: List[str] = Field(default_factory=list)
Layer Details
Models Layer (haive.hap.models)
Core Models:
HAPGraph: Workflow structure with topological ordering
HAPNode: Individual workflow steps with agent loading
HAPContext: Execution state extending StateSchema
Key Features:
class HAPGraph(BaseModel):
    nodes: Dict[str, HAPNode] = Field(default_factory=dict)
    entry_node: Optional[str] = Field(default=None)

    def add_agent_node(self, node_id: str, agent: Agent,
                       next_nodes: Optional[List[str]] = None):
        """Add node with agent instance."""

    def add_entrypoint_node(self, node_id: str, entrypoint: str,
                            next_nodes: Optional[List[str]] = None):
        """Add node with agent entrypoint string."""

    def topological_order(self) -> List[str]:
        """Get execution order for the graph."""
State Management:
class HAPContext(StateSchema):
    """Execution context extending Haive StateSchema."""

    # Core execution tracking
    execution_path: List[str] = Field(default_factory=list)
    agent_metadata: Dict[str, Any] = Field(default_factory=dict)
    graph_context: Dict[str, Any] = Field(default_factory=dict)

    # Backward compatibility
    legacy_inputs: Dict[str, Any] = Field(default_factory=dict)
    legacy_outputs: Dict[str, Any] = Field(default_factory=dict)
Server Layer (haive.hap.server)
HAPRuntime: Core execution engine
class HAPRuntime:
    """Runtime engine for executing HAP workflows."""

    def __init__(self, graph: HAPGraph):
        self.graph = graph
        self._validate_graph()

    async def run(self, initial_data: Dict[str, Any],
                  context: Optional[HAPContext] = None) -> HAPContext:
        """Execute the workflow graph."""

    def run_sync(self, initial_data: Dict[str, Any]) -> HAPContext:
        """Synchronous execution wrapper."""
Agent Loading Strategies:
async def _load_agent(self, node: HAPNode) -> Agent:
    """Load agent from node definition."""
    if node.agent_instance:
        # Direct agent instance
        return node.agent_instance
    elif node.agent_entrypoint:
        # Dynamic loading from entrypoint
        module_name, class_name = node.agent_entrypoint.split(':')
        module = importlib.import_module(module_name)
        agent_class = getattr(module, class_name)
        return agent_class()
    else:
        raise ValueError(f"Node {node.id} has no agent definition")
Error Handling:
try:
    agent = await self._load_agent(node)
    result = await agent.arun(current_context)
except ImportError as e:
    self._handle_import_error(node, e)
except Exception as e:
    self._handle_execution_error(node, e)
finally:
    self._update_metadata(node, execution_time, result)
Protocol Layer (haive.hap.hap)
HAP Protocol Context:
class HAPContext(BaseModel):
    """Protocol-level context for HAP communication."""

    request_id: str = Field(...)
    session_id: Optional[str] = Field(default=None)

    # Authentication
    auth_provider: Optional[AuthProvider] = Field(default=None)

    # Resource access
    resource_provider: Optional[ResourceProvider] = Field(default=None)

    # Progress tracking
    progress_handler: Optional[ProgressHandler] = Field(default=None)
Communication:
# JSON-RPC 2.0 message format
{
    "jsonrpc": "2.0",
    "method": "agent/execute",
    "params": {
        "agent": "data_analyzer",
        "input": {
            "data": "sales figures...",
            "analysis_type": "trend"
        },
        "context": {
            "session_id": "session-123",
            "user": "alice"
        }
    },
    "id": "req-456"
}
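A successful reply uses the standard JSON-RPC 2.0 result envelope, echoing the request id; the fields inside result below are illustrative, not a fixed HAP schema:

# JSON-RPC 2.0 response format (result fields are illustrative)
{
    "jsonrpc": "2.0",
    "result": {
        "output": "Sales show an upward trend over the analyzed period.",
        "execution_path": ["data_analyzer"]
    },
    "id": "req-456"
}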
Provider Interfaces:
class ResourceProvider(Protocol):
    """Interface for resource access."""

    async def read_resource(self, name: str) -> str: ...


class AuthProvider(Protocol):
    """Interface for authentication."""

    def get_current_user(self) -> str: ...
    def get_user_scopes(self, user: str) -> List[str]: ...
Data Flow
Typical Execution Flow:
1. Graph Definition: User creates HAPGraph with agents
2. Runtime Creation: HAPRuntime validates and prepares the graph
3. Execution Request: Client calls runtime.run(data)
4. Context Initialization: Create HAPContext with initial state
5. Topological Execution: Execute nodes in dependency order
6. Agent Loading: Load agents dynamically or use instances
7. Agent Execution: Run the agent with the current context
8. State Updates: Update context with results and metadata
9. Flow Control: Route to next nodes based on graph structure
10. Result Return: Return final context with execution history
Detailed Flow Diagram:
Input Data
     │
     ▼
┌─────────────┐
│ HAPRuntime  │
└─────────────┘
     │
     ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  HAPGraph   │───▶│   HAPNode   │───▶│    Agent    │
│ (structure) │    │   (step)    │    │  (process)  │
└─────────────┘    └─────────────┘    └─────────────┘
       │                  │                  │
       ▼                  ▼                  ▼
┌───────────────────────────────────────────────────┐
│                     HAPContext                    │
│  • execution_path: ["node1", "node2"]             │
│  • agent_metadata: {node1: {...}, ...}            │
│  • graph_context: {...}                           │
└───────────────────────────────────────────────────┘
                          │
                          ▼
                    Final Result
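Putting the flow together, a minimal end-to-end run might look like the sketch below (assuming HAPGraph and HAPRuntime are importable from the haive.hap.models and haive.hap.server modules named above; the agent name and input payload are illustrative):

from haive.agents.simple.agent import SimpleAgent
from haive.core.engine.aug_llm import AugLLMConfig
from haive.hap.models import HAPGraph
from haive.hap.server import HAPRuntime

# Define the graph and create the runtime
agent = SimpleAgent(name="analyzer", engine=AugLLMConfig())
graph = HAPGraph()
graph.add_agent_node("analyzer", agent)
runtime = HAPRuntime(graph)

# Execute and inspect the returned context
context = runtime.run_sync({"input": "quarterly sales data"})
print(context.execution_path)   # ["analyzer"]
print(context.agent_metadata)   # per-node execution metadata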
Performance Considerations
Async Execution:
# Async-first design
async def run(self, data: Dict[str, Any]) -> HAPContext:
    """All execution is async by default."""
    for node_id in self.graph.topological_order():
        node = self.graph.nodes[node_id]
        # Load the agent for this node
        agent = await self._load_agent(node)
        # Async agent execution
        result = await agent.arun(context)
Memory Management:
# Efficient context updates
def update_context(self, node_id: str, result: Any, metadata: Dict):
    """Update context without copying entire state."""
    self.context.execution_path.append(node_id)
    self.context.agent_metadata[node_id] = metadata
    # Only update changed fields
Parallel Execution (Future):
# Planned parallel execution for independent nodes
async def execute_parallel_nodes(self, nodes: List[str]):
    """Execute independent nodes concurrently."""
    tasks = [self._execute_node(node_id) for node_id in nodes]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results
Error Handling Architecture
Layered Error Handling:
# Model layer: Pydantic validation
class HAPGraph(BaseModel):
    @model_validator(mode="after")
    def validate_graph(self) -> "HAPGraph":
        if self.entry_node not in self.nodes:
            raise ValueError("Entry node must exist in graph")
        return self


# Server layer: Execution errors
class HAPRuntime:
    async def _execute_node(self, node_id: str):
        try:
            return await self._safe_execute(node_id)
        except Exception as e:
            self._handle_node_error(node_id, e)


# Protocol layer: Communication errors
class HAPServer:
    async def handle_request(self, request):
        try:
            return await self._process_request(request)
        except Exception as e:
            return self._create_error_response(request.id, e)
Error Recovery Strategies:
Graceful Degradation: Continue execution with default values
Retry Logic: Automatic retry with exponential backoff (see the sketch below)
Circuit Breaker: Prevent cascading failures
Error Propagation: Clear error messages with context
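For example, retry logic at the node level could wrap agent execution in an exponential backoff loop. A minimal sketch; the retries and base_delay parameters are illustrative, not HAP configuration options:

import asyncio

async def run_with_retry(agent, context, retries: int = 3, base_delay: float = 0.5):
    """Illustrative helper: retry agent execution with exponential backoff."""
    for attempt in range(retries):
        try:
            return await agent.arun(context)
        except Exception:
            if attempt == retries - 1:
                raise  # give up after the final attempt
            await asyncio.sleep(base_delay * (2 ** attempt))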
Security Architecture
Authentication Flow:
class HAPAuthProvider:
    """Secure authentication for HAP protocol."""

    def validate_request(self, request: HAPRequest) -> bool:
        """Validate request authentication."""

    def get_user_permissions(self, user: str) -> List[str]:
        """Get user's allowed operations."""
Authorization:
@require_scope("agent:execute")
async def execute_agent(self, request: ExecuteRequest):
"""Execute agent with proper authorization."""
@require_scope("graph:create")
async def create_graph(self, request: CreateGraphRequest):
"""Create graph with proper permissions."""
Input Validation:
class HAPRequest(BaseModel):
    """All requests use Pydantic validation."""

    method: str = Field(..., pattern=r"^[a-z_]+/[a-z_]+$")
    params: Dict[str, Any] = Field(...)

    @field_validator("params")
    @classmethod
    def validate_params(cls, v):
        """Sanitize and validate parameters."""
        # Remove potentially dangerous content
        # Validate against schema
        return v  # return the sanitized parameters
Extension Points
Custom Agents:
# HAP works with any Haive agent
class CustomAgent(Agent):
    async def arun(self, context: StateSchema) -> str:
        # Custom logic here
        return result

# Use in HAP
graph.add_agent_node("custom", CustomAgent())
Custom Providers:
class DatabaseResourceProvider(ResourceProvider):
    """Custom resource provider for database access."""

    async def read_resource(self, name: str) -> str:
        return await self.db.query(name)
Protocol Extensions:
class CustomHAPServer(HAPServer):
    """Extended HAP server with custom methods."""

    async def handle_custom_method(self, request):
        """Custom protocol method."""
        return custom_result
Future Architecture Plans
Planned Enhancements:
Distributed Execution: Multi-server agent coordination
Plugin System: Dynamic capability loading
Streaming Responses: Real-time progress updates
Graph Optimization: Automatic workflow optimization
Resource Management: Advanced resource allocation
Monitoring Integration: Built-in metrics and tracing
Backward Compatibility:
All HAP 1.0 APIs will remain supported
Graceful migration paths for breaking changes
Semantic versioning for API changes
Best Practices
Architecture Guidelines:
Keep Models Simple: Use BaseModel, avoid complex inheritance
Async by Default: All I/O operations should be async
Fail Fast: Validate early, provide clear error messages
Test with Real Components: No mocks, use actual agents
Document Extensively: Clear docstrings and examples
Follow Haive Patterns: Extend Haive foundations, don't replace them
Performance Guidelines:
Minimize Context Copying: Update in-place when possible
Cache Agent Instances: Reuse loaded agents when safe (see the sketch below)
Batch Operations: Group similar operations together
Monitor Resource Usage: Track memory and CPU usage
Profile Regularly: Identify and fix bottlenecks
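As a sketch of the agent-caching guideline above, a runtime could memoize agents loaded from entrypoints, keyed by node id; the _agent_cache dictionary is an assumption, not part of HAPRuntime's documented API:

async def _load_agent_cached(self, node: HAPNode) -> Agent:
    """Illustrative caching wrapper around _load_agent."""
    cached = self._agent_cache.get(node.id)   # assumed dict, e.g. initialized in __init__
    if cached is not None:
        return cached
    agent = await self._load_agent(node)
    self._agent_cache[node.id] = agent
    return agent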
This architecture enables HAP to be both powerful and maintainable, providing a solid foundation for complex agent workflows while remaining true to Haive's principles of simplicity and reliability.