User Guide¶
Complete guide for using HAP (Haive Agent Protocol) in your applications.
Note
📋 Beta Software: HAP is currently in beta. APIs may change and features are still being developed.
Introduction¶
HAP (Haive Agent Protocol) allows you to orchestrate multiple AI agents in complex workflows. This guide covers everything from basic usage to advanced patterns.
What You’ll Learn:
How to create and configure agents for HAP
Building workflow graphs with multiple agents
Managing state and context across executions
Handling errors and edge cases
Performance optimization techniques
Production deployment considerations
Prerequisites¶
Required Knowledge:
Python programming (intermediate level)
Basic understanding of async/await
Familiarity with AI/LLM concepts
Understanding of workflow/pipeline patterns
System Setup:
Python 3.12+ installed
HAP package installed (see Installation)
Access to an LLM provider (OpenAI, Anthropic, etc.)
Code editor with Python support
Basic Usage¶
Creating Your First Workflow¶
Step 1: Import Required Components
"""Basic HAP workflow setup."""
import asyncio
from haive.hap.models import HAPGraph
from haive.hap.server.runtime import HAPRuntime
from haive.agents.simple.agent import SimpleAgent
from haive.core.engine.aug_llm import AugLLMConfig
Step 2: Create and Configure Agents
# Create a research agent
researcher = SimpleAgent(
    name="researcher",
    engine=AugLLMConfig(
        temperature=0.7,
        system_message="You are a thorough researcher who gathers comprehensive information on given topics."
    )
)

# Create a writer agent
writer = SimpleAgent(
    name="writer",
    engine=AugLLMConfig(
        temperature=0.8,
        system_message="You create well-structured, engaging content based on research findings."
    )
)
Step 3: Build the Workflow Graph
# Create workflow: Research → Write
graph = HAPGraph()
# Add nodes with connections
graph.add_agent_node("research_phase", researcher, next_nodes=["writing_phase"])
graph.add_agent_node("writing_phase", writer)
# Set entry point
graph.entry_node = "research_phase"
Step 4: Execute the Workflow
async def main():
    """Execute the workflow."""
    # Create runtime engine
    runtime = HAPRuntime(graph)

    # Execute with input data
    result = await runtime.run({
        "topic": "Impact of AI on Education",
        "requirements": "Focus on benefits and challenges, 500 words"
    })

    # Display results
    print(f"Execution path: {' → '.join(result.execution_path)}")
    print(f"Final output: {result.outputs}")
    return result

# Run the workflow
if __name__ == "__main__":
    asyncio.run(main())
Agent Configuration¶
Agent Specialization:
# Specialized agents for different tasks

# Data analyst with low temperature for consistency
analyst = SimpleAgent(
    name="data_analyst",
    engine=AugLLMConfig(
        temperature=0.2,
        system_message="You are a data analyst specializing in trend analysis and insights.",
        max_tokens=1000
    )
)

# Creative writer with higher temperature
creative_writer = SimpleAgent(
    name="creative_writer",
    engine=AugLLMConfig(
        temperature=0.9,
        system_message="You are a creative writer who crafts engaging, original content.",
        max_tokens=2000
    )
)

# Technical reviewer with structured output
reviewer = SimpleAgent(
    name="reviewer",
    engine=AugLLMConfig(
        temperature=0.3,
        system_message="You review content for accuracy, clarity, and completeness."
    )
)
Agent with Tools (Future feature):
# Agent with external tools (planned feature)
from langchain_core.tools import tool

@tool
def web_search(query: str) -> str:
    """Search the web for information."""
    search_results = "..."  # Implementation here
    return search_results

research_agent = SimpleAgent(
    name="web_researcher",
    engine=AugLLMConfig(tools=[web_search])
)
Workflow Patterns¶
Sequential Processing¶
Linear workflow where each step depends on the previous:
def create_sequential_workflow():
    """Create a sequential processing workflow."""
    # Agents for each step
    collector = SimpleAgent(name="data_collector", engine=AugLLMConfig())
    processor = SimpleAgent(name="processor", engine=AugLLMConfig())
    analyzer = SimpleAgent(name="analyzer", engine=AugLLMConfig())
    reporter = SimpleAgent(name="reporter", engine=AugLLMConfig())

    # Linear chain: Collect → Process → Analyze → Report
    graph = HAPGraph()
    graph.add_agent_node("collect", collector, ["process"])
    graph.add_agent_node("process", processor, ["analyze"])
    graph.add_agent_node("analyze", analyzer, ["report"])
    graph.add_agent_node("report", reporter)
    graph.entry_node = "collect"

    return graph
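To try the workflow, drive the returned graph with the same runtime API shown earlier. A minimal sketch (the input key is illustrative):

async def run_sequential_example():
    """Run the sequential workflow end to end."""
    graph = create_sequential_workflow()
    runtime = HAPRuntime(graph)

    # Input keys are illustrative; supply whatever your agents expect
    result = await runtime.run({"source": "Q3 customer survey responses"})
    print(f"Execution path: {' → '.join(result.execution_path)}")
    return result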
Parallel Processing¶
Multiple agents working on different aspects simultaneously:
def create_parallel_workflow():
    """Create a workflow with parallel processing."""
    # Coordinator and specialist agents
    coordinator = SimpleAgent(name="coordinator", engine=AugLLMConfig())
    text_specialist = SimpleAgent(name="text_specialist", engine=AugLLMConfig())
    data_specialist = SimpleAgent(name="data_specialist", engine=AugLLMConfig())
    image_specialist = SimpleAgent(name="image_specialist", engine=AugLLMConfig())
    synthesizer = SimpleAgent(name="synthesizer", engine=AugLLMConfig())

    # Fork-join pattern: Coordinate → [Text, Data, Image] → Synthesize
    graph = HAPGraph()
    graph.add_agent_node("coordinate", coordinator, ["text", "data", "image"])
    graph.add_agent_node("text", text_specialist, ["synthesize"])
    graph.add_agent_node("data", data_specialist, ["synthesize"])
    graph.add_agent_node("image", image_specialist, ["synthesize"])
    graph.add_agent_node("synthesize", synthesizer)
    graph.entry_node = "coordinate"

    return graph
Conditional Routing¶
Routing based on content or conditions:
def create_conditional_workflow():
    """Create workflow with conditional routing."""
    # Router and specialist agents
    classifier = SimpleAgent(
        name="classifier",
        engine=AugLLMConfig(
            system_message="Classify input and determine the best processing path."
        )
    )
    technical_handler = SimpleAgent(name="technical", engine=AugLLMConfig())
    creative_handler = SimpleAgent(name="creative", engine=AugLLMConfig())
    general_handler = SimpleAgent(name="general", engine=AugLLMConfig())

    # Classification-based routing
    graph = HAPGraph()
    graph.add_agent_node("classify", classifier, ["technical", "creative", "general"])
    graph.add_agent_node("technical", technical_handler)
    graph.add_agent_node("creative", creative_handler)
    graph.add_agent_node("general", general_handler)
    graph.entry_node = "classify"

    return graph
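Which of the three branches actually runs depends on the classifier's output. One way to make that decision explicit — a sketch, assuming the runtime routes on what the classifier returns — is to constrain the classifier to answer with a downstream node name:

# Variant classifier whose output names the next node
# (assumption: routing follows the classifier's answer)
classifier = SimpleAgent(
    name="classifier",
    engine=AugLLMConfig(
        temperature=0.0,
        system_message=(
            "Classify the input and respond with exactly one word: "
            "'technical', 'creative', or 'general'."
        )
    )
)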
Complex Multi-Stage Workflows¶
Combining multiple patterns:
def create_complex_workflow():
    """Create a complex multi-stage workflow."""
    # Stage 1: Input processing
    intake = SimpleAgent(name="intake", engine=AugLLMConfig())
    validator = SimpleAgent(name="validator", engine=AugLLMConfig())

    # Stage 2: Parallel analysis
    content_analyzer = SimpleAgent(name="content_analyzer", engine=AugLLMConfig())
    sentiment_analyzer = SimpleAgent(name="sentiment_analyzer", engine=AugLLMConfig())
    quality_analyzer = SimpleAgent(name="quality_analyzer", engine=AugLLMConfig())

    # Stage 3: Synthesis and output
    synthesizer = SimpleAgent(name="synthesizer", engine=AugLLMConfig())
    formatter = SimpleAgent(name="formatter", engine=AugLLMConfig())

    # Complex workflow
    graph = HAPGraph()

    # Stage 1: Sequential validation
    graph.add_agent_node("intake", intake, ["validate"])
    graph.add_agent_node("validate", validator, ["content_analysis", "sentiment_analysis", "quality_analysis"])

    # Stage 2: Parallel analysis
    graph.add_agent_node("content_analysis", content_analyzer, ["synthesize"])
    graph.add_agent_node("sentiment_analysis", sentiment_analyzer, ["synthesize"])
    graph.add_agent_node("quality_analysis", quality_analyzer, ["synthesize"])

    # Stage 3: Sequential synthesis
    graph.add_agent_node("synthesize", synthesizer, ["format"])
    graph.add_agent_node("format", formatter)
    graph.entry_node = "intake"

    return graph
Context and State Management¶
Understanding HAP Context¶
Context flows through the entire workflow:
from haive.hap.models import HAPContext
# Context contains execution state
context = HAPContext()
# Execution tracking
print(f"Execution path: {context.execution_path}")
print(f"Agent metadata: {context.agent_metadata}")
print(f"Graph context: {context.graph_context}")
Working with Context Data:
async def context_aware_workflow():
    """Demonstrate context usage."""
    # Create initial context with data
    initial_context = HAPContext()
    initial_context.graph_context = {
        "user_id": "user123",
        "session_id": "session456",
        "preferences": {"style": "formal", "length": "concise"}
    }

    # Execute with context
    runtime = HAPRuntime(graph)
    result = await runtime.run(
        {"task": "Analyze customer feedback"},
        context=initial_context
    )

    # Access execution metadata
    for node_id in result.execution_path:
        metadata = result.agent_metadata.get(node_id, {})
        print(f"Node {node_id}: Duration={metadata.get('duration', 'N/A')}s")
Persistent State¶
Maintaining state across multiple executions:
from datetime import datetime

class StatefulWorkflow:
    """Workflow that maintains state across executions."""

    def __init__(self):
        self.graph = self._build_graph()  # _build_graph(): construct and return your HAPGraph (not shown)
        self.runtime = HAPRuntime(self.graph)
        self.persistent_context = HAPContext()

    async def execute_step(self, step_data: dict):
        """Execute a single step while maintaining state."""
        # Merge new data with persistent context
        execution_data = {
            **step_data,
            "previous_results": self.persistent_context.graph_context.get("results", [])
        }

        # Execute
        result = await self.runtime.run(execution_data, self.persistent_context)

        # Update persistent state
        self.persistent_context.graph_context["results"] = result.outputs
        self.persistent_context.graph_context["last_execution"] = {
            "timestamp": datetime.now(),
            "path": result.execution_path
        }

        return result
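A multi-turn session can then drive the same instance step by step (the task strings are illustrative):

async def run_stateful_session():
    """Execute several related steps against one StatefulWorkflow."""
    workflow = StatefulWorkflow()
    await workflow.execute_step({"task": "Summarize the January report"})
    await workflow.execute_step({"task": "Compare it with February"})

    # State accumulated between calls stays available
    print(workflow.persistent_context.graph_context["last_execution"]["path"])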
Error Handling¶
Graceful Error Handling¶
Handling different types of errors:
from pydantic import ValidationError  # assuming pydantic-based input validation

async def robust_workflow_execution():
    """Execute workflow with comprehensive error handling."""
    try:
        runtime = HAPRuntime(graph)
        result = await runtime.run(input_data)
        print("✅ Workflow completed successfully")
        return result

    except ImportError as e:
        print(f"❌ Agent import failed: {e}")
        print("📝 Check agent entrypoints and installed packages")

    except ValidationError as e:
        print(f"❌ Input validation failed: {e}")
        print("📝 Check input data format and required fields")

    except TimeoutError as e:
        print(f"❌ Execution timeout: {e}")
        print("📝 Consider increasing timeout or optimizing agents")

    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        print("📝 Enable debug logging for more details")

    return None
Error Recovery Strategies:
class ResilientWorkflow:
    """Workflow with built-in error recovery."""

    def __init__(self, graph: HAPGraph, max_retries: int = 3):
        self.graph = graph
        self.runtime = HAPRuntime(graph)
        self.max_retries = max_retries

    async def execute_with_retry(self, input_data: dict):
        """Execute with automatic retry on failure."""
        for attempt in range(self.max_retries):
            try:
                return await self.runtime.run(input_data)
            except Exception as e:
                if attempt < self.max_retries - 1:
                    wait_time = 2 ** attempt  # Exponential backoff
                    print(f"⚠️ Attempt {attempt + 1} failed: {e}")
                    print(f"⏳ Retrying in {wait_time} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    print(f"❌ All {self.max_retries} attempts failed")
                    raise
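Usage is a thin wrapper around a normal run (the input is illustrative):

async def run_resilient(graph: HAPGraph, input_data: dict):
    """Execute a workflow with automatic retries."""
    resilient = ResilientWorkflow(graph, max_retries=3)
    return await resilient.execute_with_retry(input_data)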
Partial Execution Recovery:
# Method on a workflow wrapper class such as ResilientWorkflow above
async def resume_from_checkpoint(self, checkpoint_context: HAPContext):
    """Resume execution from a previous checkpoint."""
    # Find where execution stopped
    completed_nodes = set(checkpoint_context.execution_path)
    all_nodes = set(self.graph.topological_order())
    remaining_nodes = all_nodes - completed_nodes

    if not remaining_nodes:
        print("✅ Workflow already completed")
        return checkpoint_context

    # Create subgraph with remaining nodes
    remaining_graph = self._create_subgraph(remaining_nodes)
    remaining_runtime = HAPRuntime(remaining_graph)

    # Continue execution
    print(f"🔄 Resuming from checkpoint, {len(remaining_nodes)} nodes remaining")
    final_result = await remaining_runtime.run({}, checkpoint_context)

    return final_result
Performance Optimization¶
Async Best Practices¶
Maximizing async efficiency:
async def optimized_execution():
    """Optimized workflow execution patterns."""
    # Use asyncio.gather for true parallelism where possible
    if graph.has_parallel_sections():
        parallel_tasks = []
        for section in graph.get_parallel_sections():
            task = asyncio.create_task(execute_section(section))
            parallel_tasks.append(task)

        results = await asyncio.gather(*parallel_tasks)
        return merge_results(results)

    # Regular sequential execution
    return await runtime.run(input_data)
Resource Management:
class ResourceManagedWorkflow:
    """Workflow with resource limits and management."""

    def __init__(self, graph: HAPGraph, max_concurrent: int = 5):
        self.graph = graph
        self.runtime = HAPRuntime(graph)
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def execute_with_limits(self, input_data: dict):
        """Execute with concurrency limits."""
        async with self.semaphore:
            return await self.runtime.run(input_data)
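The semaphore only matters when many workflows run at once; a minimal batch driver (inputs are illustrative) looks like this:

import asyncio

async def run_batch(workflow: ResourceManagedWorkflow, batch: list[dict]):
    """Run a batch of inputs with at most max_concurrent executing at a time."""
    tasks = [workflow.execute_with_limits(item) for item in batch]
    return await asyncio.gather(*tasks)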
Memory Optimization¶
Efficient context handling:
def optimize_context_memory(context: HAPContext):
    """Optimize context memory usage."""
    # Clear large intermediate results
    for node_id, metadata in context.agent_metadata.items():
        if 'large_intermediate_data' in metadata:
            metadata['large_intermediate_data'] = f"<{len(metadata['large_intermediate_data'])} bytes>"

    # Keep only essential graph context
    essential_keys = ['user_id', 'session_id', 'final_results']
    context.graph_context = {
        k: v for k, v in context.graph_context.items()
        if k in essential_keys
    }

    return context
Caching Strategies¶
Agent and result caching:
from functools import lru_cache
import hashlib
import json

class CachingWorkflow:
    """Workflow with intelligent caching."""

    def __init__(self, graph: HAPGraph):
        self.runtime = HAPRuntime(graph)
        self.agent_cache = {}
        self.result_cache = {}

    @lru_cache(maxsize=100)
    def get_cached_agent(self, agent_config_hash: str):
        """Cache agent instances for reuse."""
        return self.agent_cache.get(agent_config_hash)

    def cache_result(self, input_hash: str, result: HAPContext):
        """Cache results for identical inputs."""
        self.result_cache[input_hash] = result

    async def execute_with_caching(self, input_data: dict):
        """Execute with result caching."""
        # Check result cache first (sort keys so equal inputs hash identically)
        input_hash = hashlib.md5(json.dumps(input_data, sort_keys=True).encode()).hexdigest()
        if input_hash in self.result_cache:
            print("📦 Using cached result")
            return self.result_cache[input_hash]

        # Execute and cache result
        result = await self.runtime.run(input_data)
        self.cache_result(input_hash, result)
        return result
Production Deployment¶
Environment Configuration¶
Production environment setup:
import os
from typing import Optional

class ProductionConfig:
    """Production configuration management."""

    def __init__(self):
        self.api_key = os.getenv("OPENAI_API_KEY")
        self.environment = os.getenv("ENVIRONMENT", "development")
        self.log_level = os.getenv("LOG_LEVEL", "INFO")
        self.max_concurrent_workflows = int(os.getenv("MAX_CONCURRENT_WORKFLOWS", "10"))
        self.execution_timeout = int(os.getenv("EXECUTION_TIMEOUT", "300"))

    def validate(self):
        """Validate production configuration."""
        if not self.api_key:
            raise ValueError("OPENAI_API_KEY environment variable required")
        if self.environment == "production" and self.log_level == "DEBUG":
            print("⚠️ Warning: DEBUG logging in production")
Health Checks:
from datetime import datetime

async def health_check():
    """Perform health check for HAP service."""
    try:
        # Test basic workflow execution
        test_agent = SimpleAgent(name="health_check", engine=AugLLMConfig())
        test_graph = HAPGraph()
        test_graph.add_agent_node("test", test_agent)
        test_graph.entry_node = "test"

        runtime = HAPRuntime(test_graph)
        result = await runtime.run({"test": "health check"})

        return {
            "status": "healthy",
            "timestamp": datetime.now().isoformat(),
            "test_execution_time": result.agent_metadata.get("test", {}).get("duration", 0)
        }
    except Exception as e:
        return {
            "status": "unhealthy",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
Monitoring and Logging¶
Comprehensive logging:
import logging
import json
from datetime import datetime

class HAPLogger:
    """Structured logging for HAP workflows."""

    def __init__(self, name: str = "hap"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # JSON formatter for structured logs
        formatter = logging.Formatter(
            '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "message": %(message)s}'
        )
        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

    def log_workflow_start(self, workflow_id: str, input_data: dict):
        """Log workflow start."""
        self.logger.info(json.dumps({
            "event": "workflow_start",
            "workflow_id": workflow_id,
            "input_size": len(str(input_data))
        }))

    def log_workflow_complete(self, workflow_id: str, result: HAPContext):
        """Log workflow completion."""
        self.logger.info(json.dumps({
            "event": "workflow_complete",
            "workflow_id": workflow_id,
            "execution_path": result.execution_path,
            "total_steps": len(result.execution_path)
        }))
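A typical call site wraps a run with the two log methods (the workflow_id below is illustrative):

async def run_with_logging(runtime: HAPRuntime, input_data: dict):
    """Run a workflow with structured start/complete logs."""
    logger = HAPLogger()
    workflow_id = "wf-001"  # illustrative; generate a real identifier in production
    logger.log_workflow_start(workflow_id, input_data)
    result = await runtime.run(input_data)
    logger.log_workflow_complete(workflow_id, result)
    return result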
Metrics Collection:
class HAPMetrics:
    """Metrics collection for HAP workflows."""

    def __init__(self):
        self.metrics = {
            "workflows_executed": 0,
            "total_execution_time": 0,
            "error_count": 0,
            "agent_usage": {}
        }

    def record_workflow_execution(self, result: HAPContext):
        """Record workflow execution metrics."""
        self.metrics["workflows_executed"] += 1

        # Track agent usage
        for node_id in result.execution_path:
            agent_name = result.agent_metadata.get(node_id, {}).get("agent_name", "unknown")
            self.metrics["agent_usage"][agent_name] = self.metrics["agent_usage"].get(agent_name, 0) + 1

    def get_metrics_summary(self) -> dict:
        """Get metrics summary."""
        return {
            "total_workflows": self.metrics["workflows_executed"],
            "average_execution_time": self.metrics["total_execution_time"] / max(1, self.metrics["workflows_executed"]),
            "error_rate": self.metrics["error_count"] / max(1, self.metrics["workflows_executed"]),
            "most_used_agents": sorted(
                self.metrics["agent_usage"].items(),
                key=lambda x: x[1],
                reverse=True
            )[:5]
        }
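Recording happens once per run; for example:

async def run_and_record(runtime: HAPRuntime, metrics: HAPMetrics, input_data: dict):
    """Run a workflow and record it in the metrics collector."""
    result = await runtime.run(input_data)
    metrics.record_workflow_execution(result)
    print(metrics.get_metrics_summary())
    return result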
Best Practices¶
Development Best Practices:
Start Simple: Begin with single-agent workflows, then add complexity
Test Incrementally: Test each agent individually before integration (see the sketch after this list)
Use Real Components: Follow Haive’s no-mock philosophy
Document Workflows: Clear descriptions of what each workflow does
Version Control Graphs: Track changes to workflow definitions
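For incremental testing, a single-node graph (the same pattern as the health check above) exercises one agent in isolation before it is wired into a larger workflow. A minimal sketch:

async def smoke_test_agent(agent: SimpleAgent):
    """Exercise one agent in a single-node graph before composing it."""
    graph = HAPGraph()
    graph.add_agent_node("solo", agent)
    graph.entry_node = "solo"

    result = await HAPRuntime(graph).run({"task": "ping"})  # input is illustrative
    print(result.outputs)
    return result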
Production Best Practices:
Validate Inputs: Always validate input data before processing (see the sketch after this list)
Implement Timeouts: Set reasonable execution timeouts
Monitor Performance: Track execution times and error rates
Handle Failures Gracefully: Implement retry logic and fallbacks
Secure API Keys: Use environment variables and secret management
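For input validation, a small guard in front of runtime.run keeps malformed requests out of the workflow. A sketch (the required keys are illustrative):

REQUIRED_KEYS = {"task"}  # illustrative; match your workflow's input contract

def validate_input(data: dict) -> None:
    """Reject malformed input before it reaches the runtime."""
    missing = REQUIRED_KEYS - data.keys()
    if missing:
        raise ValueError(f"Missing required input fields: {sorted(missing)}")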
Performance Best Practices:
Use Async Everywhere: Leverage async/await for all I/O operations
Cache When Appropriate: Cache expensive computations and API calls
Limit Concurrency: Use semaphores to prevent resource exhaustion
Profile Regularly: Identify and fix performance bottlenecks
Optimize Context: Minimize context size and copying
Troubleshooting¶
Common Issues and Solutions:
1. Import Errors:
# Problem: Agent import fails
# Solution: Check entrypoint format and module paths
graph.add_entrypoint_node("worker", "haive.agents.simple:SimpleAgent")  # Correct format
2. Context Issues:
# Problem: Context not flowing between agents
# Solution: Ensure proper context handling
result = await runtime.run(input_data, context=initial_context)
3. Performance Issues:
# Problem: Slow execution
# Solution: Enable concurrent execution where possible
# Check for bottlenecks in agent configurations
4. Memory Issues:
# Problem: High memory usage
# Solution: Clear intermediate results and optimize context
context = optimize_context_memory(context)
Debug Tools:
# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)
# Use debug execution
result = await runtime.run(input_data, debug=True)
# Inspect context at each step
for node_id in result.execution_path:
    print(f"Node {node_id}: {result.agent_metadata[node_id]}")
This user guide provides comprehensive coverage of HAP usage patterns and best practices. For more specific topics, refer to the detailed API documentation and tutorial series.