User Guide

Complete guide for using HAP (Haive Agent Protocol) in your applications.

Note

📋 Beta Software: HAP is currently in beta. APIs may change and features are still being developed.

Introduction

HAP (Haive Agent Protocol) allows you to orchestrate multiple AI agents in complex workflows. This guide covers everything from basic usage to advanced patterns.

What You’ll Learn:

  • How to create and configure agents for HAP

  • Building workflow graphs with multiple agents

  • Managing state and context across executions

  • Handling errors and edge cases

  • Performance optimization techniques

  • Production deployment considerations

Prerequisites

Required Knowledge:

  • Python programming (intermediate level)

  • Basic understanding of async/await

  • Familiarity with AI/LLM concepts

  • Understanding of workflow/pipeline patterns

System Setup:

  • Python 3.12+ installed

  • HAP package installed (see Installation)

  • Access to an LLM provider (OpenAI, Anthropic, etc.)

  • Code editor with Python support
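
Before diving in, you can sanity-check your environment with a short script like the one below. This is a sketch: the import path mirrors the examples in this guide, and OPENAI_API_KEY is just one example of a provider key.

"""Quick environment check (adjust the provider key to your setup)."""
import os
import sys

# HAP targets Python 3.12+
assert sys.version_info >= (3, 12), "HAP requires Python 3.12 or newer"

# The HAP package should be importable
from haive.hap.models import HAPGraph  # noqa: F401

# An LLM provider credential should be configured (OpenAI shown as an example)
assert os.getenv("OPENAI_API_KEY"), "Set your LLM provider API key"

print("Environment looks ready for HAP")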

Basic Usage

Creating Your First Workflow

Step 1: Import Required Components

"""Basic HAP workflow setup."""
import asyncio
from haive.hap.models import HAPGraph
from haive.hap.server.runtime import HAPRuntime
from haive.agents.simple.agent import SimpleAgent
from haive.core.engine.aug_llm import AugLLMConfig

Step 2: Create and Configure Agents

# Create a research agent
researcher = SimpleAgent(
    name="researcher",
    engine=AugLLMConfig(
        temperature=0.7,
        system_message="You are a thorough researcher who gathers comprehensive information on given topics."
    )
)

# Create a writer agent
writer = SimpleAgent(
    name="writer",
    engine=AugLLMConfig(
        temperature=0.8,
        system_message="You create well-structured, engaging content based on research findings."
    )
)

Step 3: Build the Workflow Graph

# Create workflow: Research → Write
graph = HAPGraph()

# Add nodes with connections
graph.add_agent_node("research_phase", researcher, next_nodes=["writing_phase"])
graph.add_agent_node("writing_phase", writer)

# Set entry point
graph.entry_node = "research_phase"

Step 4: Execute the Workflow

async def main():
    """Execute the workflow."""

    # Create runtime engine
    runtime = HAPRuntime(graph)

    # Execute with input data
    result = await runtime.run({
        "topic": "Impact of AI on Education",
        "requirements": "Focus on benefits and challenges, 500 words"
    })

    # Display results
    print(f"Execution path: {' → '.join(result.execution_path)}")
    print(f"Final output: {result.outputs}")

    return result

# Run the workflow
if __name__ == "__main__":
    asyncio.run(main())

Agent Configuration

Agent Specialization:

# Specialized agents for different tasks

# Data analyst with low temperature for consistency
analyst = SimpleAgent(
    name="data_analyst",
    engine=AugLLMConfig(
        temperature=0.2,
        system_message="You are a data analyst specializing in trend analysis and insights.",
        max_tokens=1000
    )
)

# Creative writer with higher temperature
creative_writer = SimpleAgent(
    name="creative_writer",
    engine=AugLLMConfig(
        temperature=0.9,
        system_message="You are a creative writer who crafts engaging, original content.",
        max_tokens=2000
    )
)

# Technical reviewer with structured output
reviewer = SimpleAgent(
    name="reviewer",
    engine=AugLLMConfig(
        temperature=0.3,
        system_message="You review content for accuracy, clarity, and completeness."
    )
)

Agent with Tools (Future feature):

# Agent with external tools (planned feature)
from langchain_core.tools import tool

@tool
def web_search(query: str) -> str:
    """Search the web for information."""
    # Placeholder implementation; call a real search API here
    return f"Search results for: {query}"

research_agent = SimpleAgent(
    name="web_researcher",
    engine=AugLLMConfig(tools=[web_search])
)

Workflow Patterns

Sequential Processing

Linear workflow where each step depends on the previous:

def create_sequential_workflow():
    """Create a sequential processing workflow."""

    # Agents for each step
    collector = SimpleAgent(name="data_collector", engine=AugLLMConfig())
    processor = SimpleAgent(name="processor", engine=AugLLMConfig())
    analyzer = SimpleAgent(name="analyzer", engine=AugLLMConfig())
    reporter = SimpleAgent(name="reporter", engine=AugLLMConfig())

    # Linear chain: Collect → Process → Analyze → Report
    graph = HAPGraph()
    graph.add_agent_node("collect", collector, ["process"])
    graph.add_agent_node("process", processor, ["analyze"])
    graph.add_agent_node("analyze", analyzer, ["report"])
    graph.add_agent_node("report", reporter)
    graph.entry_node = "collect"

    return graph
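
Each factory in this section returns a graph that you execute the same way as in Step 4 above; a minimal sketch (the input keys are illustrative):

async def run_sequential_example():
    """Build and run the sequential workflow."""
    graph = create_sequential_workflow()
    runtime = HAPRuntime(graph)
    return await runtime.run({"source": "quarterly sales figures"})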

Parallel Processing

Multiple agents working on different aspects simultaneously:

def create_parallel_workflow():
    """Create a workflow with parallel processing."""

    # Coordinator and specialist agents
    coordinator = SimpleAgent(name="coordinator", engine=AugLLMConfig())
    text_specialist = SimpleAgent(name="text_specialist", engine=AugLLMConfig())
    data_specialist = SimpleAgent(name="data_specialist", engine=AugLLMConfig())
    image_specialist = SimpleAgent(name="image_specialist", engine=AugLLMConfig())
    synthesizer = SimpleAgent(name="synthesizer", engine=AugLLMConfig())

    # Fork-join pattern: Coordinate → [Text, Data, Image] → Synthesize
    graph = HAPGraph()
    graph.add_agent_node("coordinate", coordinator, ["text", "data", "image"])
    graph.add_agent_node("text", text_specialist, ["synthesize"])
    graph.add_agent_node("data", data_specialist, ["synthesize"])
    graph.add_agent_node("image", image_specialist, ["synthesize"])
    graph.add_agent_node("synthesize", synthesizer)
    graph.entry_node = "coordinate"

    return graph

Conditional Routing

Routing based on content or conditions:

def create_conditional_workflow():
    """Create workflow with conditional routing."""

    # Router and specialist agents
    classifier = SimpleAgent(
        name="classifier",
        engine=AugLLMConfig(
            system_message="Classify input and determine the best processing path."
        )
    )

    technical_handler = SimpleAgent(name="technical", engine=AugLLMConfig())
    creative_handler = SimpleAgent(name="creative", engine=AugLLMConfig())
    general_handler = SimpleAgent(name="general", engine=AugLLMConfig())

    # Classification-based routing
    graph = HAPGraph()
    graph.add_agent_node("classify", classifier, ["technical", "creative", "general"])
    graph.add_agent_node("technical", technical_handler)
    graph.add_agent_node("creative", creative_handler)
    graph.add_agent_node("general", general_handler)
    graph.entry_node = "classify"

    return graph

Complex Multi-Stage Workflows

Combining multiple patterns:

def create_complex_workflow():
    """Create a complex multi-stage workflow."""

    # Stage 1: Input processing
    intake = SimpleAgent(name="intake", engine=AugLLMConfig())
    validator = SimpleAgent(name="validator", engine=AugLLMConfig())

    # Stage 2: Parallel analysis
    content_analyzer = SimpleAgent(name="content_analyzer", engine=AugLLMConfig())
    sentiment_analyzer = SimpleAgent(name="sentiment_analyzer", engine=AugLLMConfig())
    quality_analyzer = SimpleAgent(name="quality_analyzer", engine=AugLLMConfig())

    # Stage 3: Synthesis and output
    synthesizer = SimpleAgent(name="synthesizer", engine=AugLLMConfig())
    formatter = SimpleAgent(name="formatter", engine=AugLLMConfig())

    # Complex workflow
    graph = HAPGraph()

    # Stage 1: Sequential validation
    graph.add_agent_node("intake", intake, ["validate"])
    graph.add_agent_node("validate", validator, ["content_analysis", "sentiment_analysis", "quality_analysis"])

    # Stage 2: Parallel analysis
    graph.add_agent_node("content_analysis", content_analyzer, ["synthesize"])
    graph.add_agent_node("sentiment_analysis", sentiment_analyzer, ["synthesize"])
    graph.add_agent_node("quality_analysis", quality_analyzer, ["synthesize"])

    # Stage 3: Sequential synthesis
    graph.add_agent_node("synthesize", synthesizer, ["format"])
    graph.add_agent_node("format", formatter)

    graph.entry_node = "intake"
    return graph

Context and State Management

Understanding HAP Context

Context flows through the entire workflow:

from haive.hap.models import HAPContext

# Context contains execution state
context = HAPContext()

# Execution tracking
print(f"Execution path: {context.execution_path}")
print(f"Agent metadata: {context.agent_metadata}")
print(f"Graph context: {context.graph_context}")

Working with Context Data:

async def context_aware_workflow():
    """Demonstrate context usage."""

    # Create initial context with data
    initial_context = HAPContext()
    initial_context.graph_context = {
        "user_id": "user123",
        "session_id": "session456",
        "preferences": {"style": "formal", "length": "concise"}
    }

    # Execute with context
    runtime = HAPRuntime(graph)
    result = await runtime.run(
        {"task": "Analyze customer feedback"},
        context=initial_context
    )

    # Access execution metadata
    for node_id in result.execution_path:
        metadata = result.agent_metadata.get(node_id, {})
        print(f"Node {node_id}: Duration={metadata.get('duration', 'N/A')}s")

Persistent State

Maintaining state across multiple executions:

from datetime import datetime

class StatefulWorkflow:
    """Workflow that maintains state across executions."""

    def __init__(self):
        self.graph = self._build_graph()  # Build a HAPGraph as in the earlier examples
        self.runtime = HAPRuntime(self.graph)
        self.persistent_context = HAPContext()

    async def execute_step(self, step_data: dict):
        """Execute a single step while maintaining state."""

        # Merge new data with persistent context
        execution_data = {
            **step_data,
            "previous_results": self.persistent_context.graph_context.get("results", [])
        }

        # Execute
        result = await self.runtime.run(execution_data, self.persistent_context)

        # Update persistent state
        self.persistent_context.graph_context["results"] = result.outputs
        self.persistent_context.graph_context["last_execution"] = {
            "timestamp": datetime.now(),
            "path": result.execution_path
        }

        return result
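
A minimal usage sketch of the class above; the task strings are illustrative. Each call sees the previous step's outputs through the persistent context:

async def run_stateful_session():
    """Run two dependent steps against the same persistent context."""
    workflow = StatefulWorkflow()
    await workflow.execute_step({"task": "Draft an outline for the report"})
    result = await workflow.execute_step({"task": "Expand the outline into sections"})
    return result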

Error Handling

Graceful Error Handling

Handling different types of errors:

from pydantic import ValidationError  # assuming Pydantic-based input validation

async def robust_workflow_execution():
    """Execute workflow with comprehensive error handling."""

    try:
        runtime = HAPRuntime(graph)
        result = await runtime.run(input_data)

        print("✅ Workflow completed successfully")
        return result

    except ImportError as e:
        print(f"❌ Agent import failed: {e}")
        print("📝 Check agent entrypoints and installed packages")

    except ValidationError as e:
        print(f"❌ Input validation failed: {e}")
        print("📝 Check input data format and required fields")

    except TimeoutError as e:
        print(f"❌ Execution timeout: {e}")
        print("📝 Consider increasing timeout or optimizing agents")

    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        print("📝 Enable debug logging for more details")

    return None

Error Recovery Strategies:

class ResilientWorkflow:
    """Workflow with built-in error recovery."""

    def __init__(self, graph: HAPGraph, max_retries: int = 3):
        self.graph = graph
        self.runtime = HAPRuntime(graph)
        self.max_retries = max_retries

    async def execute_with_retry(self, input_data: dict):
        """Execute with automatic retry on failure."""

        for attempt in range(self.max_retries):
            try:
                return await self.runtime.run(input_data)

            except Exception as e:
                if attempt < self.max_retries - 1:
                    wait_time = 2 ** attempt  # Exponential backoff
                    print(f"⚠️ Attempt {attempt + 1} failed: {e}")
                    print(f"⏳ Retrying in {wait_time} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    print(f"❌ All {self.max_retries} attempts failed")
                    raise

Partial Execution Recovery:

# Method sketch for a workflow class such as ResilientWorkflow above
async def resume_from_checkpoint(self, checkpoint_context: HAPContext):
    """Resume execution from a previous checkpoint."""

    # Find where execution stopped
    completed_nodes = set(checkpoint_context.execution_path)
    all_nodes = set(self.graph.topological_order())
    remaining_nodes = all_nodes - completed_nodes

    if not remaining_nodes:
        print("✅ Workflow already completed")
        return checkpoint_context

    # Create subgraph with remaining nodes
    remaining_graph = self._create_subgraph(remaining_nodes)
    remaining_runtime = HAPRuntime(remaining_graph)

    # Continue execution
    print(f"🔄 Resuming from checkpoint, {len(remaining_nodes)} nodes remaining")
    final_result = await remaining_runtime.run({}, checkpoint_context)

    return final_result

Performance Optimization

Async Best Practices

Maximizing async efficiency:

async def optimized_execution():
    """Optimized workflow execution patterns."""

    # Use asyncio.gather for true parallelism where possible
    if graph.has_parallel_sections():
        parallel_tasks = []
        for section in graph.get_parallel_sections():
            task = asyncio.create_task(execute_section(section))
            parallel_tasks.append(task)

        results = await asyncio.gather(*parallel_tasks)
        return merge_results(results)

    # Regular sequential execution
    return await runtime.run(input_data)

Resource Management:

class ResourceManagedWorkflow:
    """Workflow with resource limits and management."""

    def __init__(self, graph: HAPGraph, max_concurrent: int = 5):
        self.graph = graph
        self.runtime = HAPRuntime(graph)
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def execute_with_limits(self, input_data: dict):
        """Execute with concurrency limits."""
        async with self.semaphore:
            return await self.runtime.run(input_data)
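
For example, you can submit a whole batch of inputs at once and let the semaphore keep at most max_concurrent runs in flight (the payloads are illustrative):

async def run_batch(workflow: ResourceManagedWorkflow, batch: list[dict]):
    """Run many workflow inputs concurrently, bounded by the semaphore."""
    tasks = [workflow.execute_with_limits(item) for item in batch]
    return await asyncio.gather(*tasks)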

Memory Optimization

Efficient context handling:

def optimize_context_memory(context: HAPContext):
    """Optimize context memory usage."""

    # Clear large intermediate results
    for node_id, metadata in context.agent_metadata.items():
        if 'large_intermediate_data' in metadata:
            metadata['large_intermediate_data'] = f"<{len(metadata['large_intermediate_data'])} bytes>"

    # Keep only essential graph context
    essential_keys = ['user_id', 'session_id', 'final_results']
    context.graph_context = {
        k: v for k, v in context.graph_context.items()
        if k in essential_keys
    }

    return context

Caching Strategies

Agent and result caching:

from functools import lru_cache
import hashlib

class CachingWorkflow:
    """Workflow with intelligent caching."""

    def __init__(self, graph: HAPGraph):
        self.runtime = HAPRuntime(graph)
        self.agent_cache = {}
        self.result_cache = {}

    @lru_cache(maxsize=100)
    def get_cached_agent(self, agent_config_hash: str):
        """Cache agent instances for reuse."""
        return self.agent_cache.get(agent_config_hash)

    def cache_result(self, input_hash: str, result: HAPContext):
        """Cache results for identical inputs."""
        self.result_cache[input_hash] = result

    async def execute_with_caching(self, input_data: dict):
        """Execute with result caching."""

        # Check result cache first
        input_hash = hashlib.md5(str(input_data).encode()).hexdigest()
        if input_hash in self.result_cache:
            print("📦 Using cached result")
            return self.result_cache[input_hash]

        # Execute and cache result
        result = await self.runtime.run(input_data)
        self.cache_result(input_hash, result)

        return result

Production Deployment

Environment Configuration

Production environment setup:

import os
from typing import Optional

class ProductionConfig:
    """Production configuration management."""

    def __init__(self):
        self.api_key = os.getenv("OPENAI_API_KEY")
        self.environment = os.getenv("ENVIRONMENT", "development")
        self.log_level = os.getenv("LOG_LEVEL", "INFO")
        self.max_concurrent_workflows = int(os.getenv("MAX_CONCURRENT_WORKFLOWS", "10"))
        self.execution_timeout = int(os.getenv("EXECUTION_TIMEOUT", "300"))

    def validate(self):
        """Validate production configuration."""
        if not self.api_key:
            raise ValueError("OPENAI_API_KEY environment variable required")

        if self.environment == "production" and self.log_level == "DEBUG":
            print("⚠️ Warning: DEBUG logging in production")

Health Checks:

from datetime import datetime

async def health_check():
    """Perform health check for HAP service."""

    try:
        # Test basic workflow execution
        test_agent = SimpleAgent(name="health_check", engine=AugLLMConfig())
        test_graph = HAPGraph()
        test_graph.add_agent_node("test", test_agent)
        test_graph.entry_node = "test"

        runtime = HAPRuntime(test_graph)
        result = await runtime.run({"test": "health check"})

        return {
            "status": "healthy",
            "timestamp": datetime.now().isoformat(),
            "test_execution_time": result.agent_metadata.get("test", {}).get("duration", 0)
        }

    except Exception as e:
        return {
            "status": "unhealthy",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }

Monitoring and Logging

Comprehensive logging:

import logging
import json
from datetime import datetime

class HAPLogger:
    """Structured logging for HAP workflows."""

    def __init__(self, name: str = "hap"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # JSON formatter for structured logs
        formatter = logging.Formatter(
            '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "message": %(message)s}'
        )

        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

    def log_workflow_start(self, workflow_id: str, input_data: dict):
        """Log workflow start."""
        self.logger.info(json.dumps({
            "event": "workflow_start",
            "workflow_id": workflow_id,
            "input_size": len(str(input_data))
        }))

    def log_workflow_complete(self, workflow_id: str, result: HAPContext):
        """Log workflow completion."""
        self.logger.info(json.dumps({
            "event": "workflow_complete",
            "workflow_id": workflow_id,
            "execution_path": result.execution_path,
            "total_steps": len(result.execution_path)
        }))
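
A typical integration wraps each run with the logger; the workflow_id here is illustrative:

logger = HAPLogger()

async def run_logged(runtime: HAPRuntime, workflow_id: str, input_data: dict):
    """Log the start and completion of a single workflow run."""
    logger.log_workflow_start(workflow_id, input_data)
    result = await runtime.run(input_data)
    logger.log_workflow_complete(workflow_id, result)
    return result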

Metrics Collection:

class HAPMetrics:
    """Metrics collection for HAP workflows."""

    def __init__(self):
        self.metrics = {
            "workflows_executed": 0,
            "total_execution_time": 0,
            "error_count": 0,
            "agent_usage": {}
        }

    def record_workflow_execution(self, result: HAPContext, duration: float = 0.0):
        """Record workflow execution metrics."""
        self.metrics["workflows_executed"] += 1
        self.metrics["total_execution_time"] += duration

        # Track agent usage
        for node_id in result.execution_path:
            agent_name = result.agent_metadata.get(node_id, {}).get("agent_name", "unknown")
            self.metrics["agent_usage"][agent_name] = self.metrics["agent_usage"].get(agent_name, 0) + 1

    def get_metrics_summary(self) -> dict:
        """Get metrics summary."""
        return {
            "total_workflows": self.metrics["workflows_executed"],
            "average_execution_time": self.metrics["total_execution_time"] / max(1, self.metrics["workflows_executed"]),
            "error_rate": self.metrics["error_count"] / max(1, self.metrics["workflows_executed"]),
            "most_used_agents": sorted(
                self.metrics["agent_usage"].items(),
                key=lambda x: x[1],
                reverse=True
            )[:5]
        }
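
To populate the timing and error metrics, wrap each run with a timer; a sketch using time.monotonic:

import time

metrics = HAPMetrics()

async def run_measured(runtime: HAPRuntime, input_data: dict):
    """Execute a workflow while recording duration, usage, and errors."""
    start = time.monotonic()
    try:
        result = await runtime.run(input_data)
    except Exception:
        metrics.metrics["error_count"] += 1
        raise
    metrics.record_workflow_execution(result, duration=time.monotonic() - start)
    return result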

Best Practices

Development Best Practices:

  1. Start Simple: Begin with single-agent workflows, then add complexity

  2. Test Incrementally: Test each agent individually before integration (see the smoke-test sketch after this list)

  3. Use Real Components: Follow Haive’s no-mock philosophy

  4. Document Workflows: Clear descriptions of what each workflow does

  5. Version Control Graphs: Track changes to workflow definitions
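
A smoke-test sketch for point 2: run a single agent in a one-node graph before wiring it into a larger workflow (the prompt is illustrative):

async def smoke_test_agent(agent: SimpleAgent):
    """Execute one agent in isolation to verify its configuration."""
    graph = HAPGraph()
    graph.add_agent_node("solo", agent)
    graph.entry_node = "solo"
    runtime = HAPRuntime(graph)
    return await runtime.run({"task": "Briefly describe your role"})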

Production Best Practices:

  1. Validate Inputs: Always validate input data before processing (see the sketch after this list)

  2. Implement Timeouts: Set reasonable execution timeouts

  3. Monitor Performance: Track execution times and error rates

  4. Handle Failures Gracefully: Implement retry logic and fallbacks

  5. Secure API Keys: Use environment variables and secret management
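
A minimal sketch combining points 1 and 4; the required field names and fallback payload are illustrative:

async def run_production(runtime: HAPRuntime, input_data: dict):
    """Reject malformed input early and fail gracefully instead of crashing."""
    required = {"topic", "requirements"}
    missing = required - input_data.keys()
    if missing:
        raise ValueError(f"Missing required fields: {sorted(missing)}")
    try:
        return await runtime.run(input_data)
    except Exception as e:
        # Graceful fallback instead of propagating the error to the caller
        return {"status": "failed", "error": str(e)}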

Performance Best Practices:

  1. Use Async Everywhere: Leverage async/await for all I/O operations

  2. Cache When Appropriate: Cache expensive computations and API calls

  3. Limit Concurrency: Use semaphores to prevent resource exhaustion

  4. Profile Regularly: Identify and fix performance bottlenecks

  5. Optimize Context: Minimize context size and copying

Troubleshooting

Common Issues and Solutions:

1. Import Errors:

# Problem: Agent import fails
# Solution: Check entrypoint format and module paths
graph.add_entrypoint_node("worker", "haive.agents.simple:SimpleAgent")  # Correct format

2. Context Issues:

# Problem: Context not flowing between agents
# Solution: Ensure proper context handling
result = await runtime.run(input_data, context=initial_context)

3. Performance Issues:

# Problem: Slow execution
# Solution: Enable concurrent execution where possible
# Check for bottlenecks in agent configurations

4. Memory Issues:

# Problem: High memory usage
# Solution: Clear intermediate results and optimize context
context = optimize_context_memory(context)

Debug Tools:

# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)

# Use debug execution
result = await runtime.run(input_data, debug=True)

# Inspect context at each step
for node_id in result.execution_path:
    print(f"Node {node_id}: {result.agent_metadata[node_id]}")

This user guide provides comprehensive coverage of HAP usage patterns and best practices. For more specific topics, refer to the detailed API documentation and tutorial series.