🎓 Examples & Tutorials

This guide provides practical examples for Haive Agents, from basic usage to advanced patterns.

Basic Agent Examples

Simple Conversation Agent

Goal: Create a basic conversational agent that can chat naturally.

import asyncio
from haive.agents import SimpleAgent
from haive.core.engine.aug_llm import AugLLMConfig

async def basic_conversation_example():
    """Basic conversation with a simple agent."""

    # Create conversational agent
    agent = SimpleAgent(
        name="friendly_assistant",
        engine=AugLLMConfig(
            temperature=0.7,
            system_message="You are a helpful and friendly assistant."
        )
    )

    # Single conversation
    response = await agent.arun("Tell me about renewable energy.")
    print(f"Agent: {response}")

    # Follow-up conversation (maintains context)
    followup = await agent.arun("What are the main challenges?")
    print(f"Agent: {followup}")

# Run the example
asyncio.run(basic_conversation_example())

Example output (responses vary between runs):

Agent: Renewable energy refers to energy sources that naturally replenish themselves,
such as solar, wind, hydroelectric, and geothermal power. These sources offer numerous
benefits including reduced greenhouse gas emissions...

Agent: The main challenges with renewable energy include intermittency (solar and wind
depend on weather), energy storage costs, grid integration complexity, and initial
capital investment requirements...

Structured Output Agent

Goal: Create an agent that returns structured, typed data instead of plain text.

import asyncio
from pydantic import BaseModel, Field
from typing import List
from haive.agents import SimpleAgent
from haive.core.engine.aug_llm import AugLLMConfig

class SentimentAnalysis(BaseModel):
    """Structured sentiment analysis result."""
    sentiment: str = Field(description="positive, negative, or neutral")
    confidence: float = Field(ge=0.0, le=1.0, description="Confidence score")
    emotions: List[str] = Field(description="Detected emotions")
    key_phrases: List[str] = Field(description="Important phrases")

async def structured_output_example():
    """Agent that returns structured data."""

    # Create agent with structured output
    analyzer = SimpleAgent(
        name="sentiment_analyzer",
        engine=AugLLMConfig(
            structured_output_model=SentimentAnalysis,
            temperature=0.3,  # Lower for more consistent structure
            system_message="You are an expert sentiment analyzer."
        )
    )

    # Analyze text and get structured result
    text = "I absolutely love this new product! It's innovative and well-designed."
    result = await analyzer.arun(text)

    # Result is a SentimentAnalysis object, not a string
    print(f"Sentiment: {result.sentiment}")
    print(f"Confidence: {result.confidence:.2f}")
    print(f"Emotions: {', '.join(result.emotions)}")
    print(f"Key phrases: {', '.join(result.key_phrases)}")

asyncio.run(structured_output_example())

Example output (exact values vary between runs):

Sentiment: positive
Confidence: 0.92
Emotions: love, excitement, satisfaction
Key phrases: absolutely love, innovative, well-designed

Tool-Enabled Agent Examples

Calculator Agent

Goal: Create an agent that can perform mathematical calculations using tools.

import asyncio
import math
from haive.agents import ReactAgent
from haive.core.engine.aug_llm import AugLLMConfig
from langchain_core.tools import tool

@tool
def calculator(expression: str) -> str:
    """Calculate mathematical expressions safely.

    Args:
        expression: Mathematical expression like "2+2", "sqrt(16)", "sin(pi/2)"
    """
    try:
        # Safe evaluation with math functions available
        safe_dict = {
            "__builtins__": {},
            "abs": abs, "round": round, "min": min, "max": max,
            "sum": sum, "pow": pow, "sqrt": math.sqrt, "sin": math.sin,
            "cos": math.cos, "tan": math.tan, "pi": math.pi, "e": math.e
        }
        result = eval(expression, safe_dict)
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {str(e)}"

@tool
def unit_converter(value: float, from_unit: str, to_unit: str) -> str:
    """Convert between units of measurement.

    Args:
        value: Numeric value to convert
        from_unit: Source unit (e.g., 'celsius', 'fahrenheit', 'meters', 'feet')
        to_unit: Target unit
    """
    conversions = {
        ('celsius', 'fahrenheit'): lambda x: x * 9/5 + 32,
        ('fahrenheit', 'celsius'): lambda x: (x - 32) * 5/9,
        ('meters', 'feet'): lambda x: x * 3.28084,
        ('feet', 'meters'): lambda x: x / 3.28084,
    }

    key = (from_unit.lower(), to_unit.lower())
    if key in conversions:
        result = conversions[key](value)
        return f"{value} {from_unit} = {result:.2f} {to_unit}"
    else:
        return f"Conversion from {from_unit} to {to_unit} not supported"

async def calculator_agent_example():
    """Agent that can perform calculations and conversions."""

    calculator_agent = ReactAgent(
        name="math_assistant",
        engine=AugLLMConfig(
            temperature=0.1,  # Low temperature for accuracy
            system_message="You are a precise mathematical assistant."
        ),
        tools=[calculator, unit_converter],
        max_iterations=3
    )

    # Complex mathematical problem
    problem = """
    I have a circular garden with radius 8.5 meters.
    What's the area in square feet? Also, if the temperature
    is 23°C, what is that in Fahrenheit?
    """

    result = await calculator_agent.arun(problem)
    print(result)

asyncio.run(calculator_agent_example())

Agent Reasoning Process:

  1. Think: “I need to calculate the area of a circle and convert units”

  2. Act: calculator("pi * 8.5**2") → Area = 226.98 m²

  3. Act: calculator("226.98 * 3.28084**2") → 2443.19 ft²

  4. Act: unit_converter(23, "celsius", "fahrenheit") → 73.4°F

  5. Respond: “The garden area is about 2443.19 square feet, and 23°C equals 73.4°F”
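
The `calculator` tool above restricts `__builtins__`, but that alone does not fully sandbox `eval`: attribute access on literals can still reach other objects. A minimal sketch of a stricter alternative is shown below. It walks the expression's syntax tree and allows only whitelisted operators, constants, and functions. The name `safe_eval` and the whitelists are assumptions for illustration and are not part of Haive. Because `unit_converter` only defines linear conversions (meters to feet), the usage lines convert the garden's area with the squared factor instead.

```python
import ast
import math
import operator

# Whitelisted operators, constants, and functions; anything else is rejected.
_BINARY_OPS = {
    ast.Add: operator.add, ast.Sub: operator.sub,
    ast.Mult: operator.mul, ast.Div: operator.truediv,
    ast.Pow: operator.pow, ast.Mod: operator.mod,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}
_NAMES = {"pi": math.pi, "e": math.e}
_FUNCS = {"sqrt": math.sqrt, "sin": math.sin, "cos": math.cos,
          "tan": math.tan, "abs": abs, "round": round}


def safe_eval(expression: str) -> float:
    """Evaluate an arithmetic expression by walking its AST (hypothetical helper)."""

    def visit(node: ast.AST) -> float:
        if isinstance(node, ast.Expression):
            return visit(node.body)
        if (isinstance(node, ast.Constant)
                and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](visit(node.left), visit(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](visit(node.operand))
        if isinstance(node, ast.Name) and node.id in _NAMES:
            return _NAMES[node.id]
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
                and node.func.id in _FUNCS and not node.keywords):
            return _FUNCS[node.func.id](*(visit(arg) for arg in node.args))
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")

    return visit(ast.parse(expression, mode="eval"))


# Area of the garden in square meters, then in square feet (squared linear factor).
area_m2 = safe_eval("pi * 8.5**2")
area_ft2 = safe_eval("pi * 8.5**2 * 3.28084**2")
print(f"{area_m2:.2f} m^2 = {area_ft2:.2f} ft^2")
```

This sketch still does not bound the cost of an expression (a very large exponent can take a long time), so a production tool would also want input length and magnitude limits.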

Web Research Agent

Goal: Create an agent that can search the web and synthesize information.

import requests
from bs4 import BeautifulSoup
from langchain_core.tools import tool

@tool
def web_search(query: str) -> str:
    """Search the web for current information.

    Args:
        query: Search query string
    """
    # Simulated web search (use real API in production)
    search_results = {
        "AI trends 2025": "Generative AI adoption accelerates, multimodal AI becomes mainstream, AI agents gain enterprise adoption...",
        "climate change": "Global temperatures continue rising, renewable energy reaches cost parity, climate adaptation strategies evolving...",
        "quantum computing": "IBM, Google make breakthroughs in error correction, quantum advantage demonstrated in optimization..."
    }

    # Find best match
    for topic, content in search_results.items():
        if any(word in query.lower() for word in topic.lower().split()):
            return f"Search results for '{query}':\n{content}"

    return f"Search results for '{query}': General web content about {query}"

@tool
def extract_webpage_content(url: str) -> str:
    """Extract text content from a webpage.

    Args:
        url: URL to extract content from
    """
    try:
        # Simulated webpage extraction (use real requests in production)
        mock_content = {
            "news.example.com": "Breaking: New AI breakthrough announced by research team...",
            "tech.example.com": "Analysis: The future of artificial intelligence in 2025...",
        }

        for domain, content in mock_content.items():
            if domain in url:
                return f"Content from {url}:\n{content}"

        return f"Content from {url}: Sample webpage content"
    except Exception as e:
        return f"Error extracting content: {str(e)}"

async def research_agent_example():
    """Agent that conducts web research and synthesizes findings."""

    researcher = ReactAgent(
        name="research_specialist",
        engine=AugLLMConfig(
            temperature=0.4,
            system_message="""You are a thorough research specialist. When researching:
            1. Search for multiple perspectives
            2. Verify information from different sources
            3. Synthesize findings into clear insights
            4. Cite your sources"""
        ),
        tools=[web_search, extract_webpage_content],
        max_iterations=5
    )

    research_query = """
    Research the current state of AI in healthcare. What are the main
    applications, benefits, and challenges? Provide a comprehensive overview.
    """

    result = await researcher.arun(research_query)
    print(result)

asyncio.run(research_agent_example())
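
One detail of the simulated `web_search` is worth seeing in isolation: it tests whether any topic word appears as a substring of the query, so a short word such as "ai" also matches inside unrelated words. The sketch below reproduces that rule next to a stricter whole-token variant. `token_match` is a hypothetical alternative, not something the tutorial's tool uses.

```python
import re

TOPICS = ["AI trends 2025", "climate change", "quantum computing"]


def substring_match(query: str, topic: str) -> bool:
    """Rule used by the simulated tool: any topic word occurs as a substring."""
    return any(word in query.lower() for word in topic.lower().split())


def token_match(query: str, topic: str) -> bool:
    """Stricter variant (an assumption): compare whole lowercase tokens."""
    query_tokens = set(re.findall(r"[a-z0-9]+", query.lower()))
    return any(word in query_tokens for word in topic.lower().split())


query = "How do I maintain a bicycle chain?"
# "ai" occurs inside "maintain" and "chain", so the substring rule matches.
print([t for t in TOPICS if substring_match(query, t)])
# No whole token equals "ai", "trends", or "2025", so nothing matches here.
print([t for t in TOPICS if token_match(query, t)])
```

For a mock tool the looser rule is harmless, but it is a useful reminder to check what a stubbed search actually returns before judging the agent's answer.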

Multi-Agent Workflow Examples

Content Creation Pipeline

Goal: Coordinate multiple agents to research, write, and review content.

from haive.agents import SimpleAgent, ReactAgent, MultiAgent
from pydantic import BaseModel, Field
from typing import List

# Structured outputs for each stage
class ResearchBrief(BaseModel):
    topic: str = Field(description="Main topic researched")
    key_findings: List[str] = Field(description="Important findings")
    sources: List[str] = Field(description="Information sources")
    recommendations: List[str] = Field(description="Content recommendations")

class ContentDraft(BaseModel):
    title: str = Field(description="Article title")
    introduction: str = Field(description="Opening paragraph")
    main_sections: List[str] = Field(description="Main content sections")
    conclusion: str = Field(description="Closing paragraph")
    word_count: int = Field(description="Approximate word count")

class ReviewFeedback(BaseModel):
    overall_rating: int = Field(ge=1, le=10, description="Overall quality rating")
    strengths: List[str] = Field(description="Content strengths")
    improvements: List[str] = Field(description="Areas for improvement")
    revised_content: str = Field(description="Improved version")

async def content_pipeline_example():
    """Multi-agent content creation workflow."""

    # Research Agent - Gathers information
    researcher = ReactAgent(
        name="content_researcher",
        engine=AugLLMConfig(
            structured_output_model=ResearchBrief,
            temperature=0.3,
            system_message="You are a thorough content researcher."
        ),
        tools=[web_search, extract_webpage_content]
    )

    # Writer Agent - Creates content
    writer = SimpleAgent(
        name="content_writer",
        engine=AugLLMConfig(
            structured_output_model=ContentDraft,
            temperature=0.8,
            system_message="You are a skilled content writer who creates engaging articles."
        )
    )

    # Editor Agent - Reviews and improves
    editor = SimpleAgent(
        name="content_editor",
        engine=AugLLMConfig(
            structured_output_model=ReviewFeedback,
            temperature=0.2,
            system_message="You are a meticulous editor focused on quality and clarity."
        )
    )

    # Create workflow
    content_team = MultiAgent(
        name="content_creation_team",
        agents=[researcher, writer, editor],
        execution_mode="sequential"
    )

    # Execute pipeline
    topic = "The Impact of Artificial Intelligence on Remote Work"
    result = await content_team.arun(f"Create a comprehensive article about: {topic}")

    # Extract results from each stage
    research = result.get("content_researcher", {})
    draft = result.get("content_writer", {})
    review = result.get("content_editor", {})

    print("📊 RESEARCH STAGE")
    print(f"Key Findings: {', '.join(research.get('key_findings', []))}")
    print()

    print("✏️ WRITING STAGE")
    print(f"Title: {draft.get('title', 'N/A')}")
    print(f"Word Count: {draft.get('word_count', 'N/A')}")
    print()

    print("📝 EDITING STAGE")
    print(f"Rating: {review.get('overall_rating', 'N/A')}/10")
    print(f"Improvements: {', '.join(review.get('improvements', []))}")
    print()

    print("📄 FINAL CONTENT")
    print(review.get('revised_content', 'Content not available'))

asyncio.run(content_pipeline_example())
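
To make the data flow of a sequential pipeline concrete, here is a minimal sketch with stub coroutines in place of real agents. It is not how `MultiAgent` is implemented. It only illustrates the pattern the example relies on: each stage receives the previous stage's output, and results are collected under the stage's name. `run_sequential` and the stub stages are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple

Stage = Tuple[str, Callable[[str], Awaitable[str]]]


async def run_sequential(stages: List[Stage], task: str) -> Dict[str, str]:
    """Run stages in order, feeding each stage the previous stage's output."""
    results: Dict[str, str] = {}
    current = task
    for name, stage in stages:
        current = await stage(current)
        results[name] = current
    return results


# Stub stages standing in for the researcher, writer, and editor agents.
async def research(text: str) -> str:
    return f"findings({text})"


async def write(text: str) -> str:
    return f"draft({text})"


async def edit(text: str) -> str:
    return f"final({text})"


pipeline = [
    ("content_researcher", research),
    ("content_writer", write),
    ("content_editor", edit),
]
results = asyncio.run(run_sequential(pipeline, "AI and remote work"))
print(results["content_editor"])  # final(draft(findings(AI and remote work)))
```

Keying results by stage name is what lets the caller pull out the research, draft, and review separately after the run.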

Data Analysis Team

Goal: Create a team of agents that analyze data, create visualizations, and generate insights.

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from io import StringIO

@tool
def analyze_dataset(csv_data: str) -> str:
    """Analyze a CSV dataset and return statistics.

    Args:
        csv_data: CSV data as string
    """
    try:
        df = pd.read_csv(StringIO(csv_data))
        analysis = {
            "rows": len(df),
            "columns": len(df.columns),
            "numeric_summary": df.describe().to_string(),
            "missing_values": df.isnull().sum().to_dict(),
            "data_types": df.dtypes.to_dict()
        }
        return str(analysis)
    except Exception as e:
        return f"Analysis error: {str(e)}"

@tool
def create_visualization(data_description: str, chart_type: str) -> str:
    """Create data visualization based on description.

    Args:
        data_description: Description of the data to visualize
        chart_type: Type of chart (bar, line, scatter, histogram)
    """
    # Simulated visualization creation
    return f"Created {chart_type} chart: {data_description}"

class DataInsights(BaseModel):
    key_metrics: List[str] = Field(description="Important metrics found")
    trends: List[str] = Field(description="Trends identified")
    anomalies: List[str] = Field(description="Unusual patterns")
    recommendations: List[str] = Field(description="Actionable insights")

async def data_analysis_team_example():
    """Multi-agent data analysis workflow."""

    # Data Analyst - Examines raw data
    analyst = ReactAgent(
        name="data_analyst",
        engine=AugLLMConfig(
            temperature=0.2,
            system_message="You are a meticulous data analyst."
        ),
        tools=[analyze_dataset, create_visualization]
    )

    # Insights Generator - Finds patterns and meaning
    insights_agent = SimpleAgent(
        name="insights_generator",
        engine=AugLLMConfig(
            structured_output_model=DataInsights,
            temperature=0.4,
            system_message="You are an expert at finding insights in data."
        )
    )

    # Report Writer - Creates final report
    reporter = SimpleAgent(
        name="report_writer",
        engine=AugLLMConfig(
            temperature=0.6,
            system_message="You create clear, actionable business reports."
        )
    )

    # Create analysis team
    analysis_team = MultiAgent(
        name="data_analysis_team",
        agents=[analyst, insights_agent, reporter],
        execution_mode="sequential"
    )

    # Sample dataset (joined line by line so no row carries leading whitespace)
    sample_data = "\n".join([
        "date,sales,customers,region",
        "2024-01-01,1200,45,North",
        "2024-01-02,1350,52,North",
        "2024-01-03,980,38,South",
        "2024-01-04,1400,58,North",
        "2024-01-05,1100,42,South",
    ])

    # Run analysis
    result = await analysis_team.arun(
        f"Analyze this sales data and provide insights: {sample_data}"
    )

    print("📊 Data Analysis Complete:")
    print(result)

asyncio.run(data_analysis_team_example())
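
When reviewing what the analysis team reports, it helps to have an independent figure to compare against. The sketch below aggregates the same sample rows with only the standard library. `sales_by_region` is a hypothetical helper and is not part of the agents above.

```python
import csv
from collections import defaultdict
from io import StringIO

sample_data = "\n".join([
    "date,sales,customers,region",
    "2024-01-01,1200,45,North",
    "2024-01-02,1350,52,North",
    "2024-01-03,980,38,South",
    "2024-01-04,1400,58,North",
    "2024-01-05,1100,42,South",
])


def sales_by_region(csv_text: str) -> dict:
    """Sum the `sales` column per `region`."""
    totals = defaultdict(int)
    for row in csv.DictReader(StringIO(csv_text)):
        # strip() guards against stray padding around the region name.
        totals[row["region"].strip()] += int(row["sales"])
    return dict(totals)


print(sales_by_region(sample_data))  # {'North': 3950, 'South': 2080}
```

If the agents' report disagrees with totals like these, the discrepancy points to either the tool call or the model's summary.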

Advanced Agent Examples

Self-Modifying Agent

Goal: Create an agent that adapts its behavior based on performance feedback.

import time
from haive.agents.agent import SelfModifyingAgent
from langchain_core.tools import tool
from pydantic import BaseModel, Field

class PerformanceMetrics(BaseModel):
    accuracy: float = Field(ge=0.0, le=1.0)
    response_time: float
    user_satisfaction: float = Field(ge=0.0, le=1.0)
    error_rate: float = Field(ge=0.0, le=1.0)

class LearningAgent(SelfModifyingAgent):
    """Agent that learns and adapts from experience."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.performance_history = []
        self.adaptation_count = 0

    async def arun(self, input_data, collect_metrics=True):
        """Run with performance monitoring and adaptation."""

        start_time = time.time()

        # Execute task
        result = await super().arun(input_data)

        # Collect performance metrics
        if collect_metrics:
            end_time = time.time()
            metrics = await self.evaluate_performance(
                input_data, result, end_time - start_time
            )

            self.performance_history.append(metrics)

            # Adapt if performance is declining
            if await self.should_adapt(metrics):
                await self.adapt_behavior(metrics)

        return result

    async def evaluate_performance(self, input_data, result, response_time):
        """Evaluate performance of the last interaction."""

        # Simplified metrics (in practice, use real evaluation)
        accuracy = 0.85 if len(result) > 50 else 0.60
        satisfaction = 0.90 if "helpful" in result.lower() else 0.70
        error_rate = 0.05 if "error" not in result.lower() else 0.20

        return PerformanceMetrics(
            accuracy=accuracy,
            response_time=response_time,
            user_satisfaction=satisfaction,
            error_rate=error_rate
        )

    async def should_adapt(self, current_metrics):
        """Determine if adaptation is needed."""

        if len(self.performance_history) < 3:
            return False

        # Check if performance is declining
        recent_accuracy = sum(m.accuracy for m in self.performance_history[-3:]) / 3
        recent_satisfaction = sum(m.user_satisfaction for m in self.performance_history[-3:]) / 3

        return recent_accuracy < 0.75 or recent_satisfaction < 0.75

    async def adapt_behavior(self, metrics):
        """Adapt agent behavior based on metrics."""

        adaptations = {}

        if metrics.accuracy < 0.70:
            # Lower temperature for more consistent responses
            adaptations["temperature"] = max(0.1, self.engine.temperature * 0.8)
            adaptations["system_message"] = self.engine.system_message + "\nPrioritize accuracy over creativity."

        if metrics.response_time > 10.0:
            # Optimize for faster responses
            adaptations["max_tokens"] = min(1000, self.engine.max_tokens or 2000)

        if metrics.error_rate > 0.15:
            # Add verification tools
            verification_tool = self.create_verification_tool()
            adaptations["tools"] = self.tools + [verification_tool]

        if adaptations:
            await self.modify_behavior(adaptations)
            self.adaptation_count += 1
            print(f"🔧 Adaptation #{self.adaptation_count}: {list(adaptations.keys())}")

    def create_verification_tool(self):
        """Create a verification tool for fact-checking."""

        @tool
        def verify_facts(statement: str) -> str:
            """Verify factual accuracy of a statement."""
            # Simplified verification
            return f"Verification: {statement} appears accurate based on available data."

        return verify_facts

async def self_modifying_example():
    """Demonstrate self-modifying agent capabilities."""

    learning_agent = LearningAgent(
        name="adaptive_assistant",
        engine=AugLLMConfig(
            temperature=0.9,  # Start with high creativity
            system_message="You are a helpful assistant."
        )
    )

    # Simulate interactions that trigger adaptations
    interactions = [
        "What is photosynthesis?",
        "Explain quantum mechanics simply",
        "What are the benefits of exercise?",
        "Describe the water cycle",
        "Explain machine learning"
    ]

    for i, query in enumerate(interactions):
        print(f"\n--- Interaction {i+1} ---")
        result = await learning_agent.arun(query)
        print(f"Query: {query}")
        print(f"Response length: {len(result)} characters")

        if learning_agent.performance_history:
            latest = learning_agent.performance_history[-1]
            print(f"Accuracy: {latest.accuracy:.2f}, Satisfaction: {latest.user_satisfaction:.2f}")

    print(f"\n🎯 Final Stats:")
    print(f"Total adaptations: {learning_agent.adaptation_count}")
    print(f"Current temperature: {learning_agent.engine.temperature}")
    print(f"Tools available: {len(learning_agent.tools)}")

asyncio.run(self_modifying_example())
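
The adaptation trigger in `should_adapt` is a rolling average over the last three interactions. The sketch below isolates that rule so it can be checked without running an agent. `Metrics` is a plain stand-in for `PerformanceMetrics`, and the `window` and `threshold` parameters are assumptions that mirror the hard-coded 3 and 0.75 above.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Metrics:
    """Plain stand-in for PerformanceMetrics (only the fields used here)."""
    accuracy: float
    user_satisfaction: float


def should_adapt(history: List[Metrics], window: int = 3, threshold: float = 0.75) -> bool:
    """Return True when the mean of the last `window` samples drops below `threshold`."""
    if len(history) < window:
        return False
    recent = history[-window:]
    mean_accuracy = sum(m.accuracy for m in recent) / window
    mean_satisfaction = sum(m.user_satisfaction for m in recent) / window
    return mean_accuracy < threshold or mean_satisfaction < threshold


history = [Metrics(0.85, 0.90), Metrics(0.60, 0.70), Metrics(0.60, 0.70)]
print(should_adapt(history[:2]))  # False: fewer than three samples
print(should_adapt(history))      # True: mean accuracy is about 0.68
```

With the simplified metrics in the example, accuracy is either 0.85 or 0.60, so two short responses within any three consecutive interactions are enough to pull the average below 0.75.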

Self-Replicating Agent System

Goal: Create agents that can spawn specialized copies of themselves.

from haive.agents.agent import SelfReplicatingAgent

class SwarmAgent(SelfReplicatingAgent):
    """Agent that can create specialized swarm members."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.offspring = []
        self.specialization = kwargs.get('specialization', 'general')

    async def analyze_and_delegate(self, complex_task: str):
        """Analyze task and create specialized agents to handle it."""

        # Analyze task complexity and requirements
        analysis = await self.arun(
            f"Analyze this task and identify what specializations are needed: {complex_task}"
        )

        # Create specialized offspring based on analysis
        specialists = await self.create_specialists(analysis, complex_task)

        # Coordinate specialist execution
        results = await self.coordinate_specialists(specialists, complex_task)

        return results

    async def create_specialists(self, analysis: str, task: str):
        """Create specialized agent copies."""

        specialists = []

        if "research" in analysis.lower():
            researcher = await self.replicate({
                "name": f"{self.name}_researcher",
                "specialization": "research",
                "temperature": 0.3,
                "system_message": "You are a thorough researcher. Gather comprehensive, accurate information.",
                "tools": [web_search, extract_webpage_content]
            })
            specialists.append(researcher)

        if "analysis" in analysis.lower():
            analyst = await self.replicate({
                "name": f"{self.name}_analyst",
                "specialization": "analysis",
                "temperature": 0.2,
                "system_message": "You are a data analyst. Focus on patterns, insights, and quantitative analysis.",
                "tools": [analyze_dataset, create_visualization]
            })
            specialists.append(analyst)

        if "writing" in analysis.lower():
            writer = await self.replicate({
                "name": f"{self.name}_writer",
                "specialization": "writing",
                "temperature": 0.7,
                "system_message": "You are a skilled writer. Create clear, engaging, well-structured content.",
                "tools": []
            })
            specialists.append(writer)

        if "review" in analysis.lower():
            reviewer = await self.replicate({
                "name": f"{self.name}_reviewer",
                "specialization": "review",
                "temperature": 0.1,
                "system_message": "You are a meticulous reviewer. Ensure quality, accuracy, and completeness.",
                "tools": []
            })
            specialists.append(reviewer)

        self.offspring.extend(specialists)
        return specialists

    async def coordinate_specialists(self, specialists, task):
        """Coordinate execution among specialist agents."""

        results = {}

        # Execute specialists in appropriate order
        execution_order = self.determine_execution_order(specialists)

        for specialist in execution_order:
            # Provide context from previous specialists
            context = self.build_context(results)
            specialist_task = f"{task}\n\nContext from other specialists:\n{context}"

            result = await specialist.arun(specialist_task)
            results[specialist.specialization] = result

            print(f"✅ {specialist.specialization.title()} completed")

        return results

    def determine_execution_order(self, specialists):
        """Determine optimal execution order for specialists."""

        # Define typical workflow order
        order_priority = {
            "research": 1,
            "analysis": 2,
            "writing": 3,
            "review": 4
        }

        return sorted(specialists, key=lambda s: order_priority.get(s.specialization, 5))

    def build_context(self, previous_results):
        """Build context from previous specialist results."""

        context_parts = []
        for spec, result in previous_results.items():
            context_parts.append(f"{spec.title()}: {result[:200]}...")

        return "\n".join(context_parts)

async def self_replicating_example():
    """Demonstrate self-replicating agent swarm."""

    # Create master agent
    swarm_master = SwarmAgent(
        name="swarm_coordinator",
        engine=AugLLMConfig(
            temperature=0.5,
            system_message="You are a master coordinator capable of creating specialized agents."
        )
    )

    # Complex task requiring multiple specializations
    complex_task = """
    Create a comprehensive market analysis report for electric vehicles in 2025.
    The report should include:
    1. Current market data and trends
    2. Competitive landscape analysis
    3. Consumer behavior insights
    4. Future projections and recommendations
    5. Executive summary and action items
    """

    print("🚀 Starting swarm delegation...")
    results = await swarm_master.analyze_and_delegate(complex_task)

    print(f"\n🎯 Task Results:")
    for specialization, result in results.items():
        print(f"\n--- {specialization.upper()} ---")
        print(result[:300] + "..." if len(result) > 300 else result)

    print(f"\n📊 Swarm Statistics:")
    print(f"Specialists created: {len(swarm_master.offspring)}")
    print(f"Specializations: {[s.specialization for s in swarm_master.offspring]}")

asyncio.run(self_replicating_example())
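
The ordering and context-sharing helpers in `SwarmAgent` are plain Python and can be exercised on their own. The sketch below uses a minimal `Specialist` stand-in (an assumption, not a Haive class) to show that unknown specializations fall to the end with priority 5 and that each earlier result is cut to 200 characters before being passed on.

```python
from dataclasses import dataclass
from typing import Dict, List

ORDER_PRIORITY = {"research": 1, "analysis": 2, "writing": 3, "review": 4}


@dataclass
class Specialist:
    """Minimal stand-in for a replicated agent."""
    specialization: str


def execution_order(specialists: List[Specialist]) -> List[Specialist]:
    # sorted() is stable, so specialists with equal priority keep their input order.
    return sorted(specialists, key=lambda s: ORDER_PRIORITY.get(s.specialization, 5))


def build_context(previous_results: Dict[str, str], limit: int = 200) -> str:
    # Each earlier result is truncated to `limit` characters before being shared.
    return "\n".join(
        f"{spec.title()}: {text[:limit]}..." for spec, text in previous_results.items()
    )


team = [Specialist("review"), Specialist("translation"),
        Specialist("research"), Specialist("writing")]
print([s.specialization for s in execution_order(team)])
# ['research', 'writing', 'review', 'translation']
```

The 200-character cut keeps prompts short, but it also means later specialists see only the opening of each earlier result, which matters for long research output.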

Memory-Enhanced Examples

Graph-Based Knowledge Agent

Goal: Create an agent with sophisticated graph-based memory that builds knowledge over time.

from haive.agents.memory_reorganized import SimpleMemoryAgent
from haive.agents.memory_reorganized.coordination import IntegratedMemorySystem

async def graph_memory_example():
    """Demonstrate graph-based memory capabilities."""

    # Configure advanced memory system
    memory_config = {
        "mode": "INTEGRATED",  # Graph + Vector + Temporal
        "enable_classification": True,
        "enable_consolidation": True,
        "enable_importance_scoring": True,
        # In production, use real Neo4j
        "neo4j_config": {
            "uri": "neo4j://localhost:7687",
            "username": "neo4j",
            "password": "password"
        }
    }

    # Create memory-enhanced agent
    knowledge_agent = SimpleMemoryAgent(
        name="knowledge_assistant",
        engine=AugLLMConfig(
            temperature=0.6,
            system_message="You are a knowledgeable assistant with perfect memory."
        ),
        memory_config=memory_config
    )

    # Build knowledge over multiple interactions
    knowledge_items = [
        "Alice Johnson is the CTO of TechCorp, a software company based in San Francisco.",
        "TechCorp specializes in artificial intelligence and machine learning solutions.",
        "Alice has a PhD in Computer Science from Stanford University, completed in 2018.",
        "She previously worked at Google as a Senior Research Scientist for 5 years.",
        "Alice is an expert in computer vision and natural language processing.",
        "TechCorp was founded in 2019 and has raised $50M in Series A funding.",
        "The company's main product is an AI platform called 'IntelliCore'.",
        "Alice frequently speaks at AI conferences and has published 23 research papers.",
        "TechCorp's biggest competitor is AILabs Inc, based in Boston.",
        "Alice mentors junior engineers and leads the research and development team."
    ]

    print("🧠 Building Knowledge Base...")
    for i, item in enumerate(knowledge_items):
        await knowledge_agent.store_memory(item)
        print(f"Stored item {i+1}/{len(knowledge_items)}")

    print("\n🔍 Querying Knowledge...")

    # Test different types of queries
    queries = [
        "Who is Alice Johnson?",
        "What companies are mentioned?",
        "Tell me about TechCorp's competition",
        "What is Alice's educational background?",
        "How are Alice and TechCorp connected?",
        "What expertise does Alice have in AI?"
    ]

    for query in queries:
        print(f"\n--- Query: {query} ---")
        response = await knowledge_agent.arun(query)
        print(f"Response: {response}")

        # Show memory retrieval details
        memory_results = await knowledge_agent.retrieve_memories(query)
        print(f"Memories retrieved: {len(memory_results)}")

        # Show graph relationships (simulated)
        print("Graph relationships explored: Alice → TechCorp → AI → Stanford")

asyncio.run(graph_memory_example())
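
The "graph relationships" line in the example is simulated. As a rough intuition for what a graph-backed memory adds over plain text retrieval, the toy sketch below stores a few hand-written entity pairs from the knowledge items and finds the chain that links two entities. It uses an in-memory dictionary and breadth-first search. It is an illustration only and makes no claim about how Haive's memory system or Neo4j integration works.

```python
from collections import defaultdict, deque
from typing import Dict, List, Optional, Set

# Hand-written (subject, object) pairs taken from the knowledge items above.
FACTS = [
    ("Alice Johnson", "TechCorp"),
    ("Alice Johnson", "Stanford University"),
    ("Alice Johnson", "Google"),
    ("TechCorp", "IntelliCore"),
    ("TechCorp", "AILabs Inc"),
]


def build_graph(facts) -> Dict[str, Set[str]]:
    """Store each fact as an undirected edge between two entities."""
    graph: Dict[str, Set[str]] = defaultdict(set)
    for subject, obj in facts:
        graph[subject].add(obj)
        graph[obj].add(subject)
    return graph


def find_path(graph, start: str, goal: str) -> Optional[List[str]]:
    """Breadth-first search for the shortest chain of entities from start to goal."""
    queue = deque([[start]])
    visited = {start}
    while queue:
        path = queue.popleft()
        if path[-1] == goal:
            return path
        for neighbor in sorted(graph.get(path[-1], ())):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append(path + [neighbor])
    return None


graph = build_graph(FACTS)
print(" -> ".join(find_path(graph, "Stanford University", "AILabs Inc")))
# Stanford University -> Alice Johnson -> TechCorp -> AILabs Inc
```

A question such as "How are Alice and TechCorp connected?" is a path query of this kind, which is why it is a good test of whether graph memory is actually being used.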

Long-Term Learning Agent

Goal: Create an agent that learns and improves from extended conversations.

from haive.agents.memory_reorganized.agents import LongTermMemoryAgent

async def long_term_learning_example():
    """Demonstrate long-term learning capabilities."""

    # Create agent with long-term memory
    learning_agent = LongTermMemoryAgent(
        name="learning_companion",
        engine=AugLLMConfig(
            temperature=0.7,
            system_message="""You are a learning companion who remembers everything
            about our conversations and grows more helpful over time."""
        ),
        memory_config={
            "enable_episodic_memory": True,
            "enable_semantic_memory": True,
            "enable_consolidation": True,
            "consolidation_interval": 3600  # 1 hour
        }
    )

    # Simulate extended conversation over time
    conversation_history = [
        # Day 1 - Initial interaction
        ("Hi, I'm Sarah. I'm a data scientist working on climate models.", "Day 1"),
        ("I specialize in time series analysis and predictive modeling.", "Day 1"),
        ("I'm currently working on predicting sea level rise patterns.", "Day 1"),

        # Day 2 - Follow-up conversation
        ("How's your climate model work progressing, Sarah?", "Day 2"),
        ("I've been struggling with handling missing data in historical records.", "Day 2"),
        ("Do you remember what I said about my specialization?", "Day 2"),

        # Day 3 - Building on previous knowledge
        ("I made a breakthrough with the missing data problem!", "Day 3"),
        ("The time series techniques you mentioned were really helpful.", "Day 3"),
        ("Can you help me plan the next phase of my sea level research?", "Day 3"),

        # Day 7 - Long-term relationship
        ("I'm presenting my climate model at a conference next week.", "Day 7"),
        ("Can you help me prepare based on everything you know about my work?", "Day 7"),
    ]

    print("📚 Simulating Long-Term Learning...")

    responses = []
    for i, (message, day) in enumerate(conversation_history):
        print(f"\n--- {day} - Message {i+1} ---")
        print(f"User: {message}")

        # Get response with memory context
        response = await learning_agent.arun(message)
        print(f"Agent: {response[:200]}...")

        responses.append(response)

        # Store interaction for learning
        await learning_agent.store_interaction(message, response, {"day": day})

    print(f"\n🧠 Memory Analysis:")

    # Check what the agent has learned
    memory_summary = await learning_agent.get_memory_summary()
    print(f"Total memories: {memory_summary.get('total_memories', 0)}")
    print(f"User profile completeness: {memory_summary.get('user_profile_completeness', 0):.1%}")
    print(f"Relationship strength: {memory_summary.get('relationship_strength', 0):.1f}/10")

    # Test personalized response
    final_query = "What do you know about me and how can you help with my work?"
    personalized_response = await learning_agent.arun(final_query)

    print(f"\n🎯 Personalized Response:")
    print(personalized_response)

asyncio.run(long_term_learning_example())

Production-Ready Examples

Error-Resilient Agent System

Goal: Create a robust agent system that handles errors gracefully and maintains uptime.

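import asyncio
import logging
import random  # jitter for the retry backoff
import time    # timing in the demo below
from typing import Optional
from datetime import datetime

from haive.agents import SimpleAgent
from haive.core.engine.aug_llm import AugLLMConfig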

class RobustAgent(SimpleAgent):
    """Production-ready agent with comprehensive error handling."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.error_count = 0
        self.last_error = None
        self.fallback_responses = {
            "openai_error": "I'm experiencing technical difficulties. Let me try a simpler approach.",
            "timeout_error": "That request is taking longer than expected. Could you rephrase it?",
            "rate_limit": "I'm receiving too many requests right now. Please wait a moment.",
            "general_error": "I encountered an issue. Let me try to help you differently."
        }

    async def arun(self, input_data, max_retries=3, fallback_model=None):
        """Run with comprehensive error handling and recovery."""

        for attempt in range(max_retries + 1):
            try:
                # Log attempt
                logging.info(f"Agent {self.name} attempt {attempt + 1}/{max_retries + 1}")

                # Try normal execution
                result = await super().arun(input_data)

                # Reset error count on success
                if self.error_count > 0:
                    logging.info(f"Agent {self.name} recovered after {self.error_count} errors")
                    self.error_count = 0

                return result

            except Exception as e:
                self.error_count += 1
                self.last_error = {
                    "error": str(e),
                    "timestamp": datetime.now(),
                    "attempt": attempt + 1
                }

                logging.error(f"Agent {self.name} error on attempt {attempt + 1}: {e}")

                # Last attempt - use fallback
                if attempt == max_retries:
                    return await self.fallback_response(input_data, e)

                # Wait before retry with exponential backoff
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                await asyncio.sleep(wait_time)

                # Try fallback model if available
                if fallback_model and attempt == max_retries - 1:
                    await self.switch_to_fallback_model(fallback_model)

    async def fallback_response(self, input_data: str, error: Exception) -> str:
        """Generate fallback response when all retries fail."""

        error_type = self.classify_error(error)
        fallback_msg = self.fallback_responses.get(error_type, self.fallback_responses["general_error"])

        # Try to provide some value even in failure
        if "question" in input_data.lower():
            return f"{fallback_msg} Could you ask your question in a different way?"
        elif "help" in input_data.lower():
            return f"{fallback_msg} Here are some things I can typically help with: answering questions, providing information, and assisting with tasks."
        else:
            return fallback_msg

    def classify_error(self, error: Exception) -> str:
        """Classify error type for appropriate fallback."""

        error_str = str(error).lower()

        if "rate limit" in error_str or "quota" in error_str:
            return "rate_limit"
        elif "timeout" in error_str or "timed out" in error_str:
            return "timeout_error"
        elif "openai" in error_str or "api" in error_str:
            return "openai_error"
        else:
            return "general_error"

    async def switch_to_fallback_model(self, fallback_config):
        """Switch to a fallback model configuration."""

        logging.info(f"Switching agent {self.name} to fallback model")
        original_engine = self.engine

        try:
            self.engine = fallback_config
            await self.recompile_if_needed()
            logging.info("Fallback model switch successful")
        except Exception as e:
            logging.error(f"Fallback model switch failed: {e}")
            self.engine = original_engine

async def robust_agent_example():
    """Demonstrate error-resilient agent system."""

    # Configure primary and fallback models
    primary_config = AugLLMConfig(
        model="gpt-4",
        temperature=0.7,
        timeout=30
    )

    fallback_config = AugLLMConfig(
        model="gpt-3.5-turbo",  # Faster, more reliable fallback
        temperature=0.5,
        timeout=15
    )

    # Create robust agent
    robust_agent = RobustAgent(
        name="production_assistant",
        engine=primary_config
    )

    # Test error scenarios
    test_cases = [
        "What are the benefits of renewable energy?",  # Normal case
        "Explain quantum mechanics in detail" * 100,   # Potentially problematic long input
        "Help me understand machine learning",         # Normal case
        "",  # Edge case - empty input
        "What is the meaning of life?" * 50           # Another potentially problematic case
    ]

    print("🛡️  Testing Robust Agent System...")

    for i, test_input in enumerate(test_cases):
        print(f"\n--- Test Case {i+1} ---")
        print(f"Input: {test_input[:100]}{'...' if len(test_input) > 100 else ''}")

        start_time = time.time()

        try:
            result = await robust_agent.arun(
                test_input,
                max_retries=2,
                fallback_model=fallback_config
            )

            end_time = time.time()

            print(f"✅ Success ({end_time - start_time:.1f}s)")
            print(f"Response: {result[:150]}...")

        except Exception as e:
            print(f"❌ Final failure: {e}")

        print(f"Error count: {robust_agent.error_count}")

    # System health check
    print(f"\n📊 System Health:")
    # error_count resets to 0 after every successful call, so this is not a lifetime total
    print(f"Errors since last success: {robust_agent.error_count}")
    print(f"Last error: {robust_agent.last_error}")
    print(f"Agent operational: {'Yes' if robust_agent.error_count < 5 else 'Degraded'}")

asyncio.run(robust_agent_example())
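
The retry loop in `RobustAgent.arun` can also be factored into a reusable helper. The following is a minimal sketch using only the standard library; `backoff_delay` and `retry_async` are hypothetical names that are not part of Haive, and the cap of 30 seconds is an assumed default.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Delay before retrying after failed attempt `attempt` (0-based).

    Exponential growth (base * 2**attempt) plus random jitter, capped at `cap`.
    """
    return min(cap, base * (2 ** attempt) + random.uniform(0, base))


async def retry_async(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base: float = 1.0,
    cap: float = 30.0,
) -> T:
    """Await `func()` up to max_retries + 1 times, sleeping between failures."""
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception:
            if attempt == max_retries:
                raise  # Out of attempts: let the caller choose a fallback
            await asyncio.sleep(backoff_delay(attempt, base, cap))
    raise RuntimeError("unreachable")  # The loop always returns or raises
```

A call such as `await retry_async(lambda: agent.arun(prompt), max_retries=2)` would wrap any agent this way. The jitter spreads retries from concurrent callers so they do not all hit a rate-limited API at the same instant.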

Monitoring and Analytics Agent

Goal: Create an agent system with comprehensive monitoring and analytics.

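import asyncio
import json
import logging
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional

from haive.agents import SimpleAgent
from haive.core.engine.aug_llm import AugLLMConfig

@dataclass
class AgentMetrics:
    """Metrics recorded for a single agent call."""
    agent_name: str
    start_time: float
    end_time: float
    input_length: int
    output_length: int
    success: bool
    error_message: Optional[str] = None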
    tool_calls: int = 0

class MonitoredAgent(SimpleAgent):
    """Agent with comprehensive monitoring and analytics."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.metrics_history: List[AgentMetrics] = []
        self.session_id = str(int(time.time()))

    async def arun(self, input_data, track_metrics=True):
        """Run with comprehensive metrics tracking."""

        if not track_metrics:
            return await super().arun(input_data)

        start_time = time.time()
        metrics = AgentMetrics(
            agent_name=self.name,
            start_time=start_time,
            end_time=0,
            input_length=len(str(input_data)),
            output_length=0,
            success=False
        )

        try:
            # Track tool usage if ReactAgent
            initial_tool_count = getattr(self, 'tool_call_count', 0)

            # Execute
            result = await super().arun(input_data)

            # Update metrics
            metrics.end_time = time.time()
            metrics.output_length = len(str(result))
            metrics.success = True
            metrics.tool_calls = getattr(self, 'tool_call_count', 0) - initial_tool_count

            return result

        except Exception as e:
            metrics.end_time = time.time()
            metrics.success = False
            metrics.error_message = str(e)
            raise

        finally:
            self.metrics_history.append(metrics)
            await self.log_metrics(metrics)

    async def log_metrics(self, metrics: AgentMetrics):
        """Log metrics for analysis."""

        duration = metrics.end_time - metrics.start_time

        log_data = {
            "timestamp": datetime.now().isoformat(),
            "session_id": self.session_id,
            "agent_name": metrics.agent_name,
            "duration_seconds": duration,
            "input_length": metrics.input_length,
            "output_length": metrics.output_length,
            "success": metrics.success,
            "error_message": metrics.error_message,
            "tool_calls": metrics.tool_calls,
            # Output length is measured in characters, not model tokens
            "chars_per_second": metrics.output_length / duration if duration > 0 else 0
        }

        # In production, send to logging service
        logging.info(f"AGENT_METRICS: {json.dumps(log_data)}")

    def get_analytics(self) -> Dict:
        """Get comprehensive analytics about agent performance."""

        if not self.metrics_history:
            return {"message": "No metrics available"}

        successful_calls = [m for m in self.metrics_history if m.success]
        failed_calls = [m for m in self.metrics_history if not m.success]

        durations = [m.end_time - m.start_time for m in successful_calls]

        analytics = {
            "total_calls": len(self.metrics_history),
            "successful_calls": len(successful_calls),
            "failed_calls": len(failed_calls),
            "success_rate": len(successful_calls) / len(self.metrics_history),
            "average_duration": sum(durations) / len(durations) if durations else 0,
            "min_duration": min(durations) if durations else 0,
            "max_duration": max(durations) if durations else 0,
            # Lengths are character counts, used here as a rough proxy for tokens
            "total_input_tokens": sum(m.input_length for m in self.metrics_history),
            "total_output_tokens": sum(m.output_length for m in successful_calls),
            "average_input_length": sum(m.input_length for m in self.metrics_history) / len(self.metrics_history),
            "average_output_length": sum(m.output_length for m in successful_calls) / len(successful_calls) if successful_calls else 0,
            "total_tool_calls": sum(m.tool_calls for m in self.metrics_history),
            "error_types": self.analyze_error_types(failed_calls)
        }

        return analytics

    def analyze_error_types(self, failed_calls: List[AgentMetrics]) -> Dict:
        """Analyze types of errors encountered."""

        error_counts = {}
        for call in failed_calls:
            if call.error_message:
                error_type = self.classify_error_type(call.error_message)
                error_counts[error_type] = error_counts.get(error_type, 0) + 1

        return error_counts

    def classify_error_type(self, error_message: str) -> str:
        """Classify error message into categories."""

        error_lower = error_message.lower()

        if "rate limit" in error_lower:
            return "rate_limit"
        elif "timeout" in error_lower:
            return "timeout"
        elif "api" in error_lower:
            return "api_error"
        elif "validation" in error_lower:
            return "validation_error"
        else:
            return "other"

async def monitoring_example():
    """Demonstrate comprehensive agent monitoring."""

    # Create monitored agent
    monitored_agent = MonitoredAgent(
        name="analytics_assistant",
        engine=AugLLMConfig(temperature=0.7)
    )

    # Simulate various interactions
    test_interactions = [
        "What is artificial intelligence?",
        "Explain machine learning in simple terms",
        "What are the benefits of cloud computing?",
        "How does blockchain technology work?",
        "What is the future of renewable energy?",
        "",  # This might cause an error
        "Tell me about quantum computing" * 20,  # Long input
        "What is data science?",
        "Explain neural networks",
        "What are the applications of AI in healthcare?"
    ]

    print("📊 Running Monitored Agent System...")

    for i, interaction in enumerate(test_interactions):
        print(f"\nInteraction {i+1}/{len(test_interactions)}")

        try:
            result = await monitored_agent.arun(interaction)
            print(f"✅ Success - {len(result)} characters")
        except Exception as e:
            print(f"❌ Error: {str(e)[:100]}")

    # Get comprehensive analytics
    analytics = monitored_agent.get_analytics()

    print("\n📈 Agent Analytics Report")
    print("=" * 50)
    print(f"Total Calls: {analytics['total_calls']}")
    print(f"Success Rate: {analytics['success_rate']:.1%}")
    print(f"Average Duration: {analytics['average_duration']:.2f}s")
    print(f"Total Input Characters: {analytics['total_input_tokens']:,}")
    print(f"Total Output Characters: {analytics['total_output_tokens']:,}")
    print(f"Average Response Length: {analytics['average_output_length']:.0f} characters")
    print(f"Total Tool Calls: {analytics['total_tool_calls']}")

    if analytics['error_types']:
        print(f"\nError Analysis:")
        for error_type, count in analytics['error_types'].items():
            print(f"  {error_type}: {count} occurrences")

    # Performance insights
    print(f"\nPerformance Insights:")
    print(f"  Fastest Response: {analytics['min_duration']:.2f}s")
    print(f"  Slowest Response: {analytics['max_duration']:.2f}s")

    if analytics['success_rate'] > 0.9:
        print("  🎯 Excellent reliability")
    elif analytics['success_rate'] > 0.7:
        print("  ⚠️  Good reliability with room for improvement")
    else:
        print("  ❌ Poor reliability - needs attention")

asyncio.run(monitoring_example())
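
Averages alone can hide slow outliers, so a production report often adds percentiles to the minimum, maximum, and mean computed above. The sketch below is one possible way to do that with the standard library; `latency_summary` is a hypothetical helper, not part of Haive, and it expects the same per-call durations (in seconds) that `get_analytics` derives from `metrics_history`.

```python
import statistics
from typing import Dict, List


def latency_summary(durations: List[float]) -> Dict[str, float]:
    """Summarize call durations (seconds) with mean, median, and 95th percentile."""
    if not durations:
        return {"count": 0, "mean": 0.0, "p50": 0.0, "p95": 0.0, "max": 0.0}

    if len(durations) == 1:
        # A single sample is its own mean and every percentile
        only = float(durations[0])
        return {"count": 1, "mean": only, "p50": only, "p95": only, "max": only}

    # n=100 yields 99 cut points; index 49 is the median, index 94 is p95
    cuts = statistics.quantiles(durations, n=100, method="inclusive")
    return {
        "count": len(durations),
        "mean": statistics.fmean(durations),
        "p50": cuts[49],
        "p95": cuts[94],
        "max": max(durations),
    }
```

The result could be merged into the analytics dictionary, for example with `analytics.update(latency_summary(durations))`.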

Best Practices Summary

Development Guidelines

  1. Start Simple: Begin with SimpleAgent, add complexity incrementally

  2. Use Type Hints: Always use Pydantic models for structured data

  3. Handle Errors: Implement comprehensive error handling and fallbacks

  4. Monitor Performance: Track metrics in production systems

  5. Test Thoroughly: Use real LLMs, never mock agent behavior

Production Readiness

  1. Error Resilience: Implement retry logic and fallback strategies

  2. Performance Monitoring: Track response times, success rates, costs

  3. Security: Validate inputs, sanitize outputs, secure API keys

  4. Scalability: Use async patterns, connection pooling, caching

  5. Observability: Comprehensive logging and metrics collection
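
The "validate inputs" item can be illustrated with a small guard that runs before a prompt reaches the agent. This is a sketch under stated assumptions: `validate_input`, `InvalidInputError`, and the 4,000-character limit are hypothetical and should be adapted to the model's real context limits.

```python
MAX_INPUT_CHARS = 4000  # Assumed limit; tune for the model in use


class InvalidInputError(ValueError):
    """Raised when user input should not be sent to the agent."""


def validate_input(text: object, max_chars: int = MAX_INPUT_CHARS) -> str:
    """Return the stripped input, or raise InvalidInputError if it is unusable."""
    if not isinstance(text, str):
        raise InvalidInputError("input must be a string")

    cleaned = text.strip()
    if not cleaned:
        raise InvalidInputError("input is empty")
    if len(cleaned) > max_chars:
        raise InvalidInputError(
            f"input has {len(cleaned)} characters; the limit is {max_chars}"
        )
    return cleaned
```

Rejecting empty and oversized prompts up front keeps cases like the `""` and repeated-string inputs in the examples above from consuming retries or API quota.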

Architecture Patterns

  1. Single Responsibility: Each agent has one clear purpose

  2. Composition: Build complex systems from simple, reusable agents

  3. Loose Coupling: Agents communicate through well-defined interfaces

  4. Self-Organization: Let agents adapt and coordinate autonomously

  5. Memory-First: Design around intelligent memory and context
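
The composition and loose-coupling items can be sketched without any framework code. In the example below, plain async functions stand in for agents, and the only interface between them is "take a string, return a string"; `run_pipeline`, `normalize`, and `shout` are hypothetical names used for illustration only.

```python
import asyncio
from typing import Awaitable, Callable, List

# The shared interface: each step takes text and returns text
Step = Callable[[str], Awaitable[str]]


async def run_pipeline(steps: List[Step], text: str) -> str:
    """Feed the output of each step into the next one, in order."""
    for step in steps:
        text = await step(text)
    return text


async def normalize(text: str) -> str:
    """Collapse runs of whitespace into single spaces."""
    return " ".join(text.split())


async def shout(text: str) -> str:
    """Upper-case the text."""
    return text.upper()
```

Running `asyncio.run(run_pipeline([normalize, shout], "  hello   world "))` passes the text through both steps in order. Because each step depends only on the shared signature, a step could be replaced by a wrapper around an agent's `arun` without changing the others.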

Next Steps

Continue exploring advanced Haive Agents capabilities:

Ready to build intelligent agent systems? 🚀

Start with these examples and adapt them to your specific needs. The modular design makes it easy to combine patterns and create sophisticated AI applications.