🔧 Troubleshooting Guide

This comprehensive troubleshooting guide helps you diagnose and resolve common issues with Haive Agents.

Quick Diagnostics

System Health Check

Run this diagnostic script to quickly identify common issues:

import asyncio
import sys
import os
import pkg_resources
import platform
from typing import Dict, List, Any

async def system_diagnostics():
    """Print a health report for a Haive Agents installation.

    Reports, in order: interpreter and platform details, installed versions
    of the critical packages, which expected environment variables are set,
    whether the core classes can be imported, and whether a ``SimpleAgent``
    can be created (and, when ``OPENAI_API_KEY`` is set, executed).

    Each check reports its own failure and the function carries on, so one
    broken component does not hide the state of the others.

    Returns:
        None. All findings are written to stdout.
    """
    # importlib.metadata is the standard-library replacement for the
    # deprecated pkg_resources API (which also disappears when setuptools is
    # not installed). Imported locally so this function is self-contained.
    from importlib import metadata

    print("🔍 Haive Agents System Diagnostics")
    print("=" * 50)

    # System information
    print(f"Python version: {sys.version}")
    print(f"Platform: {platform.platform()}")
    print(f"Architecture: {platform.architecture()}")

    # Package versions
    print("\n📦 Package Versions:")
    critical_packages = [
        "haive-agents", "haive-core", "langchain", "langchain-core",
        "openai", "anthropic", "pydantic", "fastapi", "uvicorn"
    ]

    for package in critical_packages:
        try:
            version = metadata.version(package)
        except metadata.PackageNotFoundError:
            print(f"  ❌ {package}: NOT INSTALLED")
        else:
            print(f"  ✅ {package}: {version}")

    # Environment variables
    print("\n🔑 Environment Variables:")
    env_vars = [
        "OPENAI_API_KEY", "AZURE_OPENAI_API_KEY", "ANTHROPIC_API_KEY",
        "DATABASE_URL", "REDIS_URL", "NEO4J_URI"
    ]

    for var in env_vars:
        value = os.getenv(var)
        if value:
            # Only a capped run of asterisks is shown; the value itself is
            # never printed.
            print(f"  ✅ {var}: {'*' * min(len(value), 20)} (set)")
        else:
            print(f"  ⚠️  {var}: not set")

    # Basic import test
    print("\n🧪 Import Tests:")
    import_tests = [
        ("haive.agents", "SimpleAgent"),
        ("haive.agents", "ReactAgent"),
        ("haive.agents", "MultiAgent"),
        ("haive.core.engine.aug_llm", "AugLLMConfig"),
        ("langchain_core.tools", "tool")
    ]

    for module, cls in import_tests:
        try:
            # A non-empty fromlist makes __import__ return the leaf module
            # rather than the top-level package.
            mod = __import__(module, fromlist=[cls])
            getattr(mod, cls)
            print(f"  ✅ {module}.{cls}")
        except Exception as e:
            print(f"  ❌ {module}.{cls}: {e}")

    # Basic agent creation test
    print("\n🤖 Agent Creation Test:")
    agent = None
    try:
        from haive.agents import SimpleAgent
        from haive.core.engine.aug_llm import AugLLMConfig

        config = AugLLMConfig(model="gpt-3.5-turbo")
        agent = SimpleAgent(name="diagnostic", engine=config)
        print("  ✅ Agent creation successful")
    except Exception as e:
        print(f"  ❌ Agent creation failed: {e}")

    # Execution is reported separately from creation so that an API or
    # network failure is not mislabelled as a creation failure.
    if agent is not None:
        if os.getenv("OPENAI_API_KEY"):
            try:
                result = await agent.arun("Say 'test successful'", timeout=10)
                # str() guards against a non-string result object.
                if "test successful" in str(result).lower():
                    print("  ✅ Agent execution successful")
                else:
                    print("  ⚠️  Agent execution unexpected response")
            except Exception as e:
                print(f"  ❌ Agent execution failed: {e}")
        else:
            print("  ⚠️  Skipping execution test (no API key)")

    print("\n✅ Diagnostics complete")

if __name__ == "__main__":
    asyncio.run(system_diagnostics())

Common Issues

Installation Problems

Issue: “No module named ‘haive’”

# Check Python environment
which python
which pip

# Verify installation
pip show haive-agents

# Reinstall if needed
pip uninstall haive-agents
pip install haive-agents

# For development
cd haive-agents/
pip install -e .

Issue: “Package conflicts during installation”

# Create clean environment
python -m venv venv-haive
source venv-haive/bin/activate  # Linux/Mac
# OR
venv-haive\Scripts\activate     # Windows

# Install the package without its dependencies, then install pinned dependencies
pip install --no-deps haive-agents
pip install -r requirements.txt

Issue: “Poetry installation fails”

# Clear poetry cache
poetry cache clear --all pypi

# Update poetry
pip install --upgrade poetry

# Fresh install
rm poetry.lock
poetry install

API and Authentication Issues

Issue: “OpenAI API key not found”

import os

# Check if API key is set
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    print("❌ OPENAI_API_KEY not set")
    print("Set it with: export OPENAI_API_KEY='your-key-here'")
else:
    # Never echo any part of the secret: this output tends to end up in
    # terminals, CI logs and bug reports. Show a capped mask instead.
    print(f"✅ API key set: {'*' * min(len(api_key), 20)} (set)")

    # Test API connectivity (only meaningful when a key is available)
    import openai

    try:
        client = openai.OpenAI(api_key=api_key)
        # Smallest possible request: one short message, at most 5 tokens back.
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "test"}],
            max_tokens=5
        )
        print("✅ OpenAI API connection successful")
    except Exception as e:
        print(f"❌ OpenAI API error: {e}")

Issue: “Rate limit exceeded”

# Implement exponential backoff
import time
import random

async def retry_with_backoff(func, max_retries=3):
    """Await ``func()`` and retry with exponential backoff on rate limits.

    Args:
        func: Zero-argument callable returning an awaitable.
        max_retries: Total number of attempts. Values below 1 are treated as
            1 so that ``func`` is always tried at least once instead of the
            call silently returning ``None``.

    Returns:
        Whatever ``func()`` resolves to on the first successful attempt.

    Raises:
        Exception: The original exception, unchanged, when it is not a rate
            limit error or when the last attempt has been used up.
    """
    attempts = max(1, max_retries)

    for attempt in range(attempts):
        try:
            return await func()
        except Exception as e:
            # Rate limits are recognised by message text only.
            rate_limited = "rate limit" in str(e).lower()
            if not rate_limited or attempt == attempts - 1:
                raise

            # 1s, 2s, 4s, ... plus up to 1s of jitter so that concurrent
            # callers do not retry in lockstep.
            delay = (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited, waiting {delay:.1f}s...")
            await asyncio.sleep(delay)

# Use lower-tier models for high-volume requests
fallback_config = AugLLMConfig(
    model="gpt-3.5-turbo",  # Instead of gpt-4
    temperature=0.7
)

Issue: “Azure OpenAI configuration errors”

# Correct Azure configuration
from haive.core.engine.aug_llm import AugLLMConfig

azure_config = AugLLMConfig(
    provider="azure",
    model="gpt-4",  # Model name, not deployment name
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    api_base=os.getenv("AZURE_OPENAI_ENDPOINT"),
    api_version="2024-02-15-preview",  # Correct API version
    deployment_name="your-deployment-name"  # This is the deployment name
)

# Test Azure connection
try:
    agent = SimpleAgent(name="azure_test", engine=azure_config)
    result = await agent.arun("test connection")
    print("✅ Azure OpenAI connection successful")
except Exception as e:
    print(f"❌ Azure error: {e}")
    # Common fixes:
    print("Check: API key, endpoint URL, deployment name, API version")

Agent Execution Issues

Issue: “Agent takes too long to respond”

# Add timeout to agent calls
import asyncio

try:
    result = await asyncio.wait_for(
        agent.arun("your query"),
        timeout=30.0  # 30 second timeout
    )
except asyncio.TimeoutError:
    print("Agent request timed out")

# Use streaming for long responses
config_with_streaming = AugLLMConfig(
    model="gpt-4",
    stream=True,
    max_tokens=1000  # Limit response length
)

# Optimize model selection
fast_config = AugLLMConfig(model="gpt-3.5-turbo")  # Faster model
detailed_config = AugLLMConfig(model="gpt-4")      # More capable model

Issue: “Agent gives inconsistent responses”

# Lower temperature for consistency
consistent_config = AugLLMConfig(
    model="gpt-4",
    temperature=0.1,  # More deterministic
    seed=12345        # Reproducible outputs (if supported)
)

# Use structured output for consistency
from pydantic import BaseModel, Field

# Response schema passed as `structured_output_model` below.
# (Documented with comments rather than a class docstring, because pydantic
# copies a class docstring into the generated schema description.)
class ConsistentResponse(BaseModel):
    # Free-text answer.
    answer: str = Field(description="The main answer")
    # Validated to lie in the closed range [0.0, 1.0] (ge / le constraints).
    confidence: float = Field(ge=0.0, le=1.0, description="Confidence level")

structured_config = AugLLMConfig(
    structured_output_model=ConsistentResponse,
    temperature=0.2
)

Issue: “ReactAgent not using tools properly”

from langchain_core.tools import tool
from haive.agents import ReactAgent

@tool
def debug_calculator(expression: str) -> str:
    """Calculate mathematical expressions with debugging.

    Supports numbers, parentheses and the operators + - * / // %.
    """
    # NOTE(review): langchain's @tool presumably forwards the docstring above
    # to the model as the tool description, so keep it short and user-facing;
    # maintainer notes belong in comments like this one.
    #
    # SECURITY: `expression` is written by the model, i.e. untrusted input.
    # The previous implementation passed it to eval(), which would execute
    # arbitrary Python. The expression is now parsed with `ast` and only
    # plain arithmetic nodes are evaluated. Exponentiation is deliberately
    # left out because huge powers can stall the process.
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
    }
    unary_ops = {
        ast.UAdd: operator.pos,
        ast.USub: operator.neg,
    }

    def evaluate(node):
        """Recursively evaluate a whitelisted arithmetic AST node."""
        if isinstance(node, ast.Expression):
            return evaluate(node.body)
        # type() rather than isinstance() so that True/False are rejected.
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            return binary_ops[type(node.op)](
                evaluate(node.left), evaluate(node.right)
            )
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")

    print(f"🔧 Tool called with: {expression}")
    try:
        result = evaluate(ast.parse(expression.strip(), mode="eval"))
        print(f"🔧 Tool result: {result}")
        return f"Result: {result}"
    except Exception as e:
        # Errors are returned as text so the agent can read them and retry.
        print(f"🔧 Tool error: {e}")
        return f"Error: {e}"

# Create ReactAgent with debug tools
react_agent = ReactAgent(
    name="debug_react",
    engine=AugLLMConfig(temperature=0.1),
    tools=[debug_calculator],
    max_iterations=5,
    return_intermediate_steps=True  # Show reasoning steps
)

# Test with clear instructions
result = await react_agent.arun(
    "Calculate 15 * 23 using the calculator tool, then tell me the result"
)

# Check intermediate steps
if hasattr(result, 'intermediate_steps'):
    print("🔧 Reasoning steps:")
    for step in result.intermediate_steps:
        print(f"  {step}")

Memory and Performance Issues

Issue: “Out of memory errors”

# Monitor memory usage
import psutil
import gc

def check_memory():
    """Print and return the current process's resident set size in MB."""
    rss_bytes = psutil.Process().memory_info().rss
    memory_mb = rss_bytes / 1024 / 1024
    print(f"Memory usage: {memory_mb:.1f} MB")
    return memory_mb

# Memory-efficient agent configuration
memory_efficient_config = AugLLMConfig(
    model="gpt-3.5-turbo",  # Use smaller model
    max_tokens=1000,        # Limit response length
    stream=True             # Stream responses
)

# Cleanup after heavy operations
async def memory_safe_agent_call(agent, query):
    """Run ``agent.arun(query)`` and force a garbage collection afterwards.

    The collection runs whether the call returns or raises; the agent's
    result (or exception) is passed through unchanged.
    """
    try:
        return await agent.arun(query)
    finally:
        # Force garbage collection
        gc.collect()

# Limit conversation history
agent.max_history_length = 5  # Keep only recent messages

Issue: “Slow performance with large datasets”

# Use async/await properly
import asyncio

async def process_multiple_queries(agent, queries, batch_size=5, pause=0.1):
    """Run many queries through ``agent`` in concurrent batches.

    Args:
        agent: Object exposing an awaitable ``arun(query)`` method.
        queries: Sequence of queries (must support ``len`` and slicing).
        batch_size: Maximum number of queries in flight at once, chosen to
            avoid overwhelming the API. Must be at least 1.
        pause: Seconds to wait between consecutive batches. No pause is
            made after the final batch.

    Returns:
        A list with one entry per query, in input order. A query that failed
        contributes its exception object instead of a result, so one failure
        does not discard the rest of the batch.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    results = []

    for start in range(0, len(queries), batch_size):
        # Pause between batches only: sleeping before every batch except the
        # first avoids a useless delay after the last one.
        if start and pause > 0:
            await asyncio.sleep(pause)

        batch = queries[start:start + batch_size]

        # Process batch concurrently; exceptions are returned, not raised.
        batch_results = await asyncio.gather(
            *(agent.arun(query) for query in batch),
            return_exceptions=True,
        )
        results.extend(batch_results)

    return results

# Enable caching for repeated queries
cached_config = AugLLMConfig(
    model="gpt-4",
    enable_caching=True,
    cache_ttl=3600  # 1 hour cache
)

Database and Storage Issues

Issue: “Database connection errors”

# Test database connectivity
import asyncpg
import asyncio

async def test_postgres_connection(dsn="postgresql://user:password@localhost/db"):
    """Check that a PostgreSQL server is reachable and answers a query.

    Args:
        dsn: Connection string to test. Defaults to the placeholder used in
            this guide; pass your real ``DATABASE_URL`` value.

    Returns:
        None. The outcome is printed; failures are reported, never raised.
    """
    try:
        conn = await asyncpg.connect(dsn)
        try:
            result = await conn.fetchval("SELECT version()")
            print(f"✅ PostgreSQL connected: {result[:50]}")
        finally:
            # Close even when the query fails, so a failed check does not
            # leave a connection open on the server.
            await conn.close()
    except Exception as e:
        print(f"❌ PostgreSQL error: {e}")
        print("Check: connection string, user permissions, server status")

# Test Redis connectivity
import aioredis

async def test_redis_connection(url="redis://localhost:6379"):
    """Check that a Redis server is reachable by sending PING.

    Args:
        url: Redis URL to test. Defaults to a local server on the standard
            port; pass your real ``REDIS_URL`` value.

    Returns:
        None. The outcome is printed; failures are reported, never raised.
    """
    try:
        redis = await aioredis.from_url(url)
        try:
            await redis.ping()
            print("✅ Redis connected")
        finally:
            # Release the client even when PING fails.
            await redis.close()
    except Exception as e:
        print(f"❌ Redis error: {e}")
        print("Check: Redis server status, connection URL")

Issue: “Neo4j graph database issues”

# Test Neo4j connectivity
from neo4j import AsyncGraphDatabase

async def test_neo4j_connection():
    """Run a trivial Cypher query against a local Neo4j server.

    Prints the returned message on success, or the error plus a checklist on
    failure. The driver is closed in both cases.
    """
    uri = "neo4j://localhost:7687"
    credentials = ("neo4j", "password")
    driver = AsyncGraphDatabase.driver(uri, auth=credentials)

    try:
        async with driver.session() as session:
            cursor = await session.run("RETURN 'Connection successful' as message")
            row = await cursor.single()
            print(f"✅ Neo4j connected: {row['message']}")
    except Exception as exc:
        print(f"❌ Neo4j error: {exc}")
        print("Check: server status, credentials, plugins (APOC, GDS)")
    finally:
        await driver.close()

Multi-Agent Issues

Issue: “MultiAgent coordination failures”

# Debug multi-agent execution
from haive.agents import MultiAgent, SimpleAgent

# Create agents with clear roles
planner = SimpleAgent(
    name="planner",
    engine=AugLLMConfig(
        system_message="You are a planning expert. Create detailed plans.",
        temperature=0.3
    )
)

executor = SimpleAgent(
    name="executor",
    engine=AugLLMConfig(
        system_message="You execute plans step by step.",
        temperature=0.5
    )
)

# Multi-agent with debugging
multi_agent = MultiAgent(
    name="debug_workflow",
    agents=[planner, executor],
    execution_mode="sequential",
    enable_logging=True  # Enable detailed logging
)

# Test with clear task
result = await multi_agent.arun(
    "Create a plan to organize a team meeting, then execute the first step"
)

# Check individual agent results
if hasattr(result, 'agent_results'):
    for agent_name, agent_result in result.agent_results.items():
        print(f"🔧 {agent_name}: {agent_result[:100]}...")

Issue: “State sharing between agents”

# Proper state management
from haive.core.schema.prebuilt.messages_state import MessagesState

# Use shared state schema
shared_state = MessagesState(
    messages=[],
    shared_context={"workflow_id": "debug_001"}
)

# Configure agents to use shared state
multi_agent = MultiAgent(
    name="shared_state_workflow",
    agents=[agent1, agent2],
    execution_mode="sequential",
    state_schema=MessagesState,
    share_state=True
)

Debugging Tools

Debug Mode

import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("haive.agents")

# Create agent with debugging
debug_agent = SimpleAgent(
    name="debug_agent",
    engine=AugLLMConfig(
        model="gpt-4",
        temperature=0.1,
        enable_logging=True
    )
)

# Run with debug information
result = await debug_agent.arun("test query", debug=True)

Request Tracing

import time
import uuid
from typing import Dict, Any

class RequestTracer:
    """Trace agent requests for debugging.

    Traces are kept in memory in ``self.traces``, keyed by a short id. Each
    trace records the query, wall-clock start/end times, the list of steps
    added along the way and, once ended, the result and total duration.
    """

    def __init__(self):
        # trace_id -> trace record (see start_trace for the initial shape)
        self.traces: Dict[str, Dict[str, Any]] = {}

    def start_trace(self, query: str) -> str:
        """Start a new trace for ``query`` and return its id."""
        # First 8 characters of a UUID4: short enough to read in logs.
        trace_id = str(uuid.uuid4())[:8]

        self.traces[trace_id] = {
            "query": query,
            "start_time": time.time(),
            "steps": []
        }

        return trace_id

    def add_step(self, trace_id: str, step: str, data: Any = None):
        """Append a timestamped step to a trace; unknown ids are ignored."""
        if trace_id in self.traces:
            self.traces[trace_id]["steps"].append({
                "step": step,
                "timestamp": time.time(),
                "data": data
            })

    def end_trace(self, trace_id: str, result: Any):
        """Record the result and duration of a trace; unknown ids are ignored."""
        if trace_id in self.traces:
            trace = self.traces[trace_id]
            trace["result"] = result
            trace["end_time"] = time.time()
            trace["duration"] = trace["end_time"] - trace["start_time"]

    def get_trace(self, trace_id: str) -> Dict[str, Any]:
        """Return the trace record, or an empty dict for an unknown id."""
        return self.traces.get(trace_id, {})

    def print_trace(self, trace_id: str):
        """Print the query, duration and steps of a trace.

        A trace that has not been ended yet is printed with a duration of 0.
        """
        trace = self.get_trace(trace_id)

        if not trace:
            print(f"❌ Trace {trace_id} not found")
            return

        print(f"🔍 Trace {trace_id}")
        print(f"Query: {trace['query']}")
        print(f"Duration: {trace.get('duration', 0):.2f}s")
        print("Steps:")

        for step in trace.get("steps", []):
            elapsed = step["timestamp"] - trace["start_time"]
            print(f"  [{elapsed:.2f}s] {step['step']}")
            # Compare against None rather than truthiness: legitimate data
            # such as 0, "" or [] must still be shown.
            if step.get("data") is not None:
                print(f"    Data: {step['data']}")

# Usage
tracer = RequestTracer()

async def traced_agent_call(agent, query):
    """Run ``agent.arun(query)`` while recording the call in ``tracer``.

    The trace is printed whether the call succeeds or fails. On failure the
    error is added as a step, the trace is ended with a ``None`` result and
    the exception is re-raised to the caller.
    """
    trace_id = tracer.start_trace(query)

    try:
        tracer.add_step(trace_id, "Starting agent call")
        response = await agent.arun(query)
        # Only the size of the response is stored as step data.
        response_size = len(str(response))
        tracer.add_step(trace_id, "Agent call completed", response_size)
        tracer.end_trace(trace_id, response)

        tracer.print_trace(trace_id)
    except Exception as exc:
        tracer.add_step(trace_id, f"Error occurred: {exc}")
        tracer.end_trace(trace_id, None)
        tracer.print_trace(trace_id)
        raise

    return response

Performance Profiling

import cProfile
import pstats
import io
from functools import wraps

def profile_agent_call(func):
    """Decorator that profiles an async function with cProfile.

    After each call, whether it returned or raised, the 20 entries with the
    highest cumulative time are printed to stdout. The wrapped function's
    result or exception is passed through unchanged.
    """

    @wraps(func)
    async def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()

        try:
            return await func(*args, **kwargs)
        finally:
            profiler.disable()

            # Render the report into a buffer so it is printed in one piece
            # under the header.
            report = io.StringIO()
            stats = pstats.Stats(profiler, stream=report)
            stats.sort_stats('cumulative')
            stats.print_stats(20)

            print("🔧 Performance Profile:")
            print(report.getvalue())

    return wrapper

# Usage
@profile_agent_call
async def profiled_agent_run(agent, query):
    """Run ``agent.arun(query)`` under the ``profile_agent_call`` decorator."""
    return await agent.arun(query)

Error Analysis

Common Error Patterns

Error: “ValidationError: X validation errors”

# Debug Pydantic validation errors
from pydantic import ValidationError

try:
    result = await agent.arun("query")
except ValidationError as e:
    print("🔧 Validation errors:")
    for error in e.errors():
        print(f"  Field: {error['loc']}")
        print(f"  Message: {error['msg']}")
        print(f"  Value: {error.get('input')}")

    # Common fixes:
    # 1. Check field types in structured output models
    # 2. Verify required fields are present
    # 3. Check field constraints (min/max values)

Error: “JSONDecodeError: Expecting value”

# Debug JSON parsing issues
import json

def safe_json_parse(text: str):
    """Parse ``text`` as JSON, recovering an embedded object if needed.

    The whole text is tried first. If that fails, the error position and
    surrounding text are printed, and the text is then scanned for the first
    ``{`` from which a complete JSON object can be decoded. This recovers
    responses that wrap the JSON in prose, including prose that itself
    contains braces.

    Args:
        text: Raw response text.

    Returns:
        The decoded value, or ``None`` when no JSON object can be recovered.
    """
    try:
        return json.loads(text)
    except json.JSONDecodeError as e:
        print(f"🔧 JSON Parse Error at position {e.pos}")
        print(f"🔧 Text around error: ...{text[max(0, e.pos-20):e.pos+20]}...")

    # Try to extract JSON from response. raw_decode stops at the end of the
    # first complete value, so trailing text (even text containing braces)
    # does not break the extraction.
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            value, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            # Not a valid object at this brace; try the next one.
            start = text.find("{", start + 1)
        else:
            return value

    return None

Error: “AttributeError: ‘NoneType’ object has no attribute”

# Debug None value issues
def safe_get_attribute(obj, attr, default=None):
    """Return ``obj.attr``, or ``default`` with a printed warning.

    A warning is printed and ``default`` returned when ``obj`` is ``None``
    or when it lacks the attribute.
    """
    if obj is not None and hasattr(obj, attr):
        return getattr(obj, attr, default)

    if obj is None:
        warning = f"🔧 Warning: Trying to get '{attr}' from None object"
    else:
        warning = f"🔧 Warning: Object {type(obj)} has no attribute '{attr}'"

    print(warning)
    return default

# Use defensive programming
result = await agent.arun("query")
response_text = safe_get_attribute(result, 'content', 'No response')

Production Debugging

Log Analysis

# Structured logging for production debugging
import logging
import json
from datetime import datetime

class ProductionLogger:
    """Enhanced logging for production debugging.

    Wraps the standard-library logger called ``name`` and emits one line per
    agent call with a JSON payload appended to a short message.
    """

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.setup_logging()

    def setup_logging(self):
        """Set the level to INFO and attach a stream handler once.

        ``logging.getLogger`` returns the same logger object for the same
        name, so adding a handler on every instantiation would print each
        record once per ``ProductionLogger`` created. The handler is
        therefore added only when the logger has none yet.
        """
        self.logger.setLevel(logging.INFO)
        if self.logger.handlers:
            return

        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

    def log_agent_call(self, agent_name: str, query: str,
                      processing_time: float, success: bool, error: str = None):
        """Log agent call with structured data.

        Args:
            agent_name: Name of the agent that handled the call.
            query: The query text; only its length is logged.
            processing_time: Duration of the call, as supplied by the caller.
            success: True logs at INFO level, False at ERROR level.
            error: Optional error description.
        """
        # datetime.utcnow() is deprecated and returns a naive value; use an
        # explicitly UTC-aware timestamp (ISO string ends with "+00:00").
        from datetime import timezone

        log_data = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "agent_name": agent_name,
            "query_length": len(query),
            "processing_time": processing_time,
            "success": success,
            "error": error
        }

        if success:
            self.logger.info(f"Agent call successful: {json.dumps(log_data)}")
        else:
            self.logger.error(f"Agent call failed: {json.dumps(log_data)}")

Monitoring Integration

# Integration with monitoring systems
import time
from typing import Dict, Any

class MonitoringIntegration:
    """Integration with external monitoring systems.

    Keeps per-agent counters in ``self.metrics`` and forwards every recorded
    call to ``send_to_monitoring``, which is a no-op hook in this class.
    """

    def __init__(self):
        # agent name -> counters (shape defined in record_agent_metrics)
        self.metrics = {}

    def record_agent_metrics(self, agent_name: str,
                            processing_time: float,
                            success: bool,
                            error_type: str = None):
        """Record metrics for external monitoring.

        Updates the in-memory counters for ``agent_name`` and then forwards
        the call to ``send_to_monitoring``. A failed call without an
        ``error_type`` is counted under "unknown".
        """
        # Create the counters on first sight of this agent.
        stats = self.metrics.setdefault(agent_name, {
            "total_calls": 0,
            "successful_calls": 0,
            "total_time": 0.0,
            "error_counts": {}
        })

        stats["total_calls"] += 1
        stats["total_time"] += processing_time

        if not success:
            error_type = error_type or "unknown"
            failures = stats["error_counts"]
            failures[error_type] = failures.get(error_type, 0) + 1
        else:
            stats["successful_calls"] += 1

        # Send to external monitoring (Prometheus, DataDog, etc.)
        self.send_to_monitoring(agent_name, processing_time, success, error_type)

    def send_to_monitoring(self, agent_name: str, processing_time: float,
                          success: bool, error_type: str = None):
        """Send metrics to external monitoring system."""
        # Implementation depends on your monitoring system
        pass

    def get_agent_health_summary(self) -> Dict[str, Any]:
        """Get health summary for all agents.

        Agents with zero recorded calls are left out. An agent is "healthy"
        when its success rate is above 0.95, otherwise "degraded".
        """

        def describe(stats):
            calls = stats["total_calls"]
            success_rate = stats["successful_calls"] / calls
            return {
                "success_rate": success_rate,
                "average_time": stats["total_time"] / calls,
                "total_calls": calls,
                "health_status": "healthy" if success_rate > 0.95 else "degraded"
            }

        return {
            agent_name: describe(stats)
            for agent_name, stats in self.metrics.items()
            if stats["total_calls"] != 0
        }

Support Resources

Getting Help

When encountering issues:

  1. Check Documentation: Review relevant sections of this guide

  2. Search Issues: Check GitHub Issues

  3. Community Support: Join our Discord Server

  4. Create Issue: File detailed bug report with reproduction steps

Information to Include in Bug Reports:

  • System information (OS, Python version)

  • Haive Agents version and installation method

  • Complete error messages and stack traces

  • Minimal reproduction code

  • Expected vs. actual behavior

  • Environment variables (without sensitive data)

System Information Script:

def generate_support_info():
    """Generate system information for support requests.

    Returns:
        A JSON string (2-space indent) with four sections: ``system``
        (platform, Python version, architecture), ``packages`` (installed
        version or "not installed"), ``environment`` ("set" / "not set" per
        variable, never the value itself) and ``config`` (left empty here).
    """
    # Imports are local so the snippet can be pasted and run on its own.
    # importlib.metadata replaces the deprecated pkg_resources API.
    import json
    import os
    import platform
    import sys
    from importlib import metadata

    info = {
        "system": {
            "platform": platform.platform(),
            "python_version": sys.version,
            "architecture": platform.architecture()
        },
        "packages": {},
        "environment": {},
        "config": {}
    }

    # Package versions
    for pkg in ["haive-agents", "haive-core", "langchain", "openai"]:
        try:
            info["packages"][pkg] = metadata.version(pkg)
        except metadata.PackageNotFoundError:
            # Catch only "not installed"; a bare except would also hide
            # KeyboardInterrupt and genuine bugs.
            info["packages"][pkg] = "not installed"

    # Environment (without sensitive data): report presence only.
    env_vars = ["OPENAI_API_KEY", "ANTHROPIC_API_KEY", "DATABASE_URL"]
    for var in env_vars:
        info["environment"][var] = "set" if os.getenv(var) else "not set"

    return json.dumps(info, indent=2)

Quick Fixes Summary

Most Common Solutions:

  1. Import Errors: Reinstall package in clean environment

  2. API Errors: Check API keys and rate limits

  3. Timeout Issues: Reduce max_tokens or use faster models

  4. Memory Issues: Use streaming and limit history length

  5. Performance Issues: Enable caching and use async properly

  6. Validation Errors: Check structured output model fields

  7. Database Issues: Verify connection strings and permissions

Emergency Fixes:

# Quick agent with error handling
async def emergency_agent(query: str) -> str:
    """Emergency agent with comprehensive error handling.

    Builds a minimal ``SimpleAgent`` and tries the query up to three times,
    pausing one second between attempts. This function never raises:
    failures are returned as text.

    Args:
        query: The query to send to the agent.

    Returns:
        The agent's result on success; otherwise a string starting with
        "Emergency response:" (all attempts failed) or "Critical error:"
        (the agent could not be set up).
    """
    max_attempts = 3

    def describe(exc: Exception) -> str:
        # Some exceptions (timeouts in particular) have an empty message;
        # fall back to the class name so the reason is never blank.
        return str(exc) or type(exc).__name__

    try:
        # Basic configuration
        config = AugLLMConfig(
            model="gpt-3.5-turbo",  # Fast, reliable
            temperature=0.5,
            max_tokens=500,
            timeout=30
        )

        agent = SimpleAgent(name="emergency", engine=config)

        # Multiple attempts with fallbacks
        for attempt in range(max_attempts):
            try:
                return await asyncio.wait_for(
                    agent.arun(query),
                    timeout=30.0
                )
            except Exception as e:
                if attempt == max_attempts - 1:  # Last attempt
                    return f"Emergency response: Unable to process query due to {describe(e)}"
                await asyncio.sleep(1)  # Brief pause

    except Exception as e:
        return f"Critical error: {describe(e)}"

This troubleshooting guide should help you resolve most issues quickly. Remember to check the logs, verify your configuration, and don’t hesitate to ask for help when needed!