π Examples & TutorialsΒΆ
This comprehensive guide provides practical examples for every aspect of Haive Agents, from basic usage to advanced patterns.
Basic Agent ExamplesΒΆ
Simple Conversation AgentΒΆ
Goal: Create a basic conversational agent that can chat naturally.
import asyncio
from haive.agents import SimpleAgent
from haive.core.engine.aug_llm import AugLLMConfig
async def basic_conversation_example():
"""Basic conversation with a simple agent."""
# Create conversational agent
agent = SimpleAgent(
name="friendly_assistant",
engine=AugLLLConfig(
temperature=0.7,
system_message="You are a helpful and friendly assistant."
)
)
# Single conversation
response = await agent.arun("Tell me about renewable energy.")
print(f"Agent: {response}")
# Follow-up conversation (maintains context)
followup = await agent.arun("What are the main challenges?")
print(f"Agent: {followup}")
# Run the example
asyncio.run(basic_conversation_example())
Expected Output:
Agent: Renewable energy refers to energy sources that naturally replenish themselves,
such as solar, wind, hydroelectric, and geothermal power. These sources offer numerous
benefits including reduced greenhouse gas emissions...
Agent: The main challenges with renewable energy include intermittency (solar and wind
depend on weather), energy storage costs, grid integration complexity, and initial
capital investment requirements...
Structured Output AgentΒΆ
Goal: Create an agent that returns structured, typed data instead of plain text.
from pydantic import BaseModel, Field
from typing import List
from haive.agents import SimpleAgent
class SentimentAnalysis(BaseModel):
"""Structured sentiment analysis result."""
sentiment: str = Field(description="positive, negative, or neutral")
confidence: float = Field(ge=0.0, le=1.0, description="Confidence score")
emotions: List[str] = Field(description="Detected emotions")
key_phrases: List[str] = Field(description="Important phrases")
async def structured_output_example():
"""Agent that returns structured data."""
# Create agent with structured output
analyzer = SimpleAgent(
name="sentiment_analyzer",
engine=AugLLMConfig(
structured_output_model=SentimentAnalysis,
temperature=0.3, # Lower for more consistent structure
system_message="You are an expert sentiment analyzer."
)
)
# Analyze text and get structured result
text = "I absolutely love this new product! It's innovative and well-designed."
result = await analyzer.arun(text)
# Result is a SentimentAnalysis object, not a string
print(f"Sentiment: {result.sentiment}")
print(f"Confidence: {result.confidence:.2f}")
print(f"Emotions: {', '.join(result.emotions)}")
print(f"Key phrases: {', '.join(result.key_phrases)}")
asyncio.run(structured_output_example())
Expected Output:
Sentiment: positive
Confidence: 0.92
Emotions: love, excitement, satisfaction
Key phrases: absolutely love, innovative, well-designed
Tool-Enabled Agent ExamplesΒΆ
Calculator AgentΒΆ
Goal: Create an agent that can perform mathematical calculations using tools.
from haive.agents import ReactAgent
from langchain_core.tools import tool
import math
@tool
def calculator(expression: str) -> str:
"""Calculate mathematical expressions safely.
Args:
expression: Mathematical expression like "2+2", "sqrt(16)", "sin(pi/2)"
"""
try:
# Safe evaluation with math functions available
safe_dict = {
"__builtins__": {},
"abs": abs, "round": round, "min": min, "max": max,
"sum": sum, "pow": pow, "sqrt": math.sqrt, "sin": math.sin,
"cos": math.cos, "tan": math.tan, "pi": math.pi, "e": math.e
}
result = eval(expression, safe_dict)
return f"Result: {result}"
except Exception as e:
return f"Error: {str(e)}"
@tool
def unit_converter(value: float, from_unit: str, to_unit: str) -> str:
"""Convert between units of measurement.
Args:
value: Numeric value to convert
from_unit: Source unit (e.g., 'celsius', 'fahrenheit', 'meters', 'feet')
to_unit: Target unit
"""
conversions = {
('celsius', 'fahrenheit'): lambda x: x * 9/5 + 32,
('fahrenheit', 'celsius'): lambda x: (x - 32) * 5/9,
('meters', 'feet'): lambda x: x * 3.28084,
('feet', 'meters'): lambda x: x / 3.28084,
}
key = (from_unit.lower(), to_unit.lower())
if key in conversions:
result = conversions[key](value)
return f"{value} {from_unit} = {result:.2f} {to_unit}"
else:
return f"Conversion from {from_unit} to {to_unit} not supported"
async def calculator_agent_example():
"""Agent that can perform calculations and conversions."""
calculator_agent = ReactAgent(
name="math_assistant",
engine=AugLLMConfig(
temperature=0.1, # Low temperature for accuracy
system_message="You are a precise mathematical assistant."
),
tools=[calculator, unit_converter],
max_iterations=3
)
# Complex mathematical problem
problem = """
I have a circular garden with radius 8.5 meters.
What's the area in square feet? Also, if the temperature
is 23Β°C, what is that in Fahrenheit?
"""
result = await calculator_agent.arun(problem)
print(result)
asyncio.run(calculator_agent_example())
Agent Reasoning Process:
Think: βI need to calculate the area of a circle and convert unitsβ
Act:
calculator("pi * 8.5**2")
β Area = 226.98 mΒ²Act:
unit_converter(226.98, "square meters", "square feet")
β 2442.46 ftΒ²Act:
unit_converter(23, "celsius", "fahrenheit")
β 73.4Β°FRespond: βThe garden area is 2442.46 square feet, and 23Β°C equals 73.4Β°Fβ
Web Research AgentΒΆ
Goal: Create an agent that can search the web and synthesize information.
import requests
from bs4 import BeautifulSoup
from langchain_core.tools import tool
@tool
def web_search(query: str) -> str:
"""Search the web for current information.
Args:
query: Search query string
"""
# Simulated web search (use real API in production)
search_results = {
"AI trends 2025": "Generative AI adoption accelerates, multimodal AI becomes mainstream, AI agents gain enterprise adoption...",
"climate change": "Global temperatures continue rising, renewable energy reaches cost parity, climate adaptation strategies evolving...",
"quantum computing": "IBM, Google make breakthroughs in error correction, quantum advantage demonstrated in optimization..."
}
# Find best match
for topic, content in search_results.items():
if any(word in query.lower() for word in topic.lower().split()):
return f"Search results for '{query}':\n{content}"
return f"Search results for '{query}': General web content about {query}"
@tool
def extract_webpage_content(url: str) -> str:
"""Extract text content from a webpage.
Args:
url: URL to extract content from
"""
try:
# Simulated webpage extraction (use real requests in production)
mock_content = {
"news.example.com": "Breaking: New AI breakthrough announced by research team...",
"tech.example.com": "Analysis: The future of artificial intelligence in 2025...",
}
for domain, content in mock_content.items():
if domain in url:
return f"Content from {url}:\n{content}"
return f"Content from {url}: Sample webpage content"
except Exception as e:
return f"Error extracting content: {str(e)}"
async def research_agent_example():
"""Agent that conducts web research and synthesizes findings."""
researcher = ReactAgent(
name="research_specialist",
engine=AugLLMConfig(
temperature=0.4,
system_message="""You are a thorough research specialist. When researching:
1. Search for multiple perspectives
2. Verify information from different sources
3. Synthesize findings into clear insights
4. Cite your sources"""
),
tools=[web_search, extract_webpage_content],
max_iterations=5
)
research_query = """
Research the current state of AI in healthcare. What are the main
applications, benefits, and challenges? Provide a comprehensive overview.
"""
result = await researcher.arun(research_query)
print(result)
asyncio.run(research_agent_example())
Multi-Agent Workflow ExamplesΒΆ
Content Creation PipelineΒΆ
Goal: Coordinate multiple agents to research, write, and review content.
from haive.agents import SimpleAgent, ReactAgent, MultiAgent
from pydantic import BaseModel, Field
from typing import List
# Structured outputs for each stage
class ResearchBrief(BaseModel):
topic: str = Field(description="Main topic researched")
key_findings: List[str] = Field(description="Important findings")
sources: List[str] = Field(description="Information sources")
recommendations: List[str] = Field(description="Content recommendations")
class ContentDraft(BaseModel):
title: str = Field(description="Article title")
introduction: str = Field(description="Opening paragraph")
main_sections: List[str] = Field(description="Main content sections")
conclusion: str = Field(description="Closing paragraph")
word_count: int = Field(description="Approximate word count")
class ReviewFeedback(BaseModel):
overall_rating: int = Field(ge=1, le=10, description="Overall quality rating")
strengths: List[str] = Field(description="Content strengths")
improvements: List[str] = Field(description="Areas for improvement")
revised_content: str = Field(description="Improved version")
async def content_pipeline_example():
"""Multi-agent content creation workflow."""
# Research Agent - Gathers information
researcher = ReactAgent(
name="content_researcher",
engine=AugLLMConfig(
structured_output_model=ResearchBrief,
temperature=0.3,
system_message="You are a thorough content researcher."
),
tools=[web_search, extract_webpage_content]
)
# Writer Agent - Creates content
writer = SimpleAgent(
name="content_writer",
engine=AugLLMConfig(
structured_output_model=ContentDraft,
temperature=0.8,
system_message="You are a skilled content writer who creates engaging articles."
)
)
# Editor Agent - Reviews and improves
editor = SimpleAgent(
name="content_editor",
engine=AugLLMConfig(
structured_output_model=ReviewFeedback,
temperature=0.2,
system_message="You are a meticulous editor focused on quality and clarity."
)
)
# Create workflow
content_team = MultiAgent(
name="content_creation_team",
agents=[researcher, writer, editor],
execution_mode="sequential"
)
# Execute pipeline
topic = "The Impact of Artificial Intelligence on Remote Work"
result = await content_team.arun(f"Create a comprehensive article about: {topic}")
# Extract results from each stage
research = result.get("content_researcher", {})
draft = result.get("content_writer", {})
review = result.get("content_editor", {})
print("π RESEARCH STAGE")
print(f"Key Findings: {', '.join(research.get('key_findings', []))}")
print()
print("βοΈ WRITING STAGE")
print(f"Title: {draft.get('title', 'N/A')}")
print(f"Word Count: {draft.get('word_count', 'N/A')}")
print()
print("π EDITING STAGE")
print(f"Rating: {review.get('overall_rating', 'N/A')}/10")
print(f"Improvements: {', '.join(review.get('improvements', []))}")
print()
print("π FINAL CONTENT")
print(review.get('revised_content', 'Content not available'))
asyncio.run(content_pipeline_example())
Data Analysis TeamΒΆ
Goal: Create a team of agents that analyze data, create visualizations, and generate insights.
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from io import StringIO
@tool
def analyze_dataset(csv_data: str) -> str:
"""Analyze a CSV dataset and return statistics.
Args:
csv_data: CSV data as string
"""
try:
df = pd.read_csv(StringIO(csv_data))
analysis = {
"rows": len(df),
"columns": len(df.columns),
"numeric_summary": df.describe().to_string(),
"missing_values": df.isnull().sum().to_dict(),
"data_types": df.dtypes.to_dict()
}
return str(analysis)
except Exception as e:
return f"Analysis error: {str(e)}"
@tool
def create_visualization(data_description: str, chart_type: str) -> str:
"""Create data visualization based on description.
Args:
data_description: Description of the data to visualize
chart_type: Type of chart (bar, line, scatter, histogram)
"""
# Simulated visualization creation
return f"Created {chart_type} chart: {data_description}"
class DataInsights(BaseModel):
key_metrics: List[str] = Field(description="Important metrics found")
trends: List[str] = Field(description="Trends identified")
anomalies: List[str] = Field(description="Unusual patterns")
recommendations: List[str] = Field(description="Actionable insights")
async def data_analysis_team_example():
"""Multi-agent data analysis workflow."""
# Data Analyst - Examines raw data
analyst = ReactAgent(
name="data_analyst",
engine=AugLLMConfig(
temperature=0.2,
system_message="You are a meticulous data analyst."
),
tools=[analyze_dataset, create_visualization]
)
# Insights Generator - Finds patterns and meaning
insights_agent = SimpleAgent(
name="insights_generator",
engine=AugLLMConfig(
structured_output_model=DataInsights,
temperature=0.4,
system_message="You are an expert at finding insights in data."
)
)
# Report Writer - Creates final report
reporter = SimpleAgent(
name="report_writer",
engine=AugLLMConfig(
temperature=0.6,
system_message="You create clear, actionable business reports."
)
)
# Create analysis team
analysis_team = MultiAgent(
name="data_analysis_team",
agents=[analyst, insights_agent, reporter],
execution_mode="sequential"
)
# Sample dataset
sample_data = """date,sales,customers,region
2024-01-01,1200,45,North
2024-01-02,1350,52,North
2024-01-03,980,38,South
2024-01-04,1400,58,North
2024-01-05,1100,42,South"""
# Run analysis
result = await analysis_team.arun(
f"Analyze this sales data and provide insights: {sample_data}"
)
print("π Data Analysis Complete:")
print(result)
asyncio.run(data_analysis_team_example())
Advanced Agent ExamplesΒΆ
Self-Modifying AgentΒΆ
Goal: Create an agent that adapts its behavior based on performance feedback.
from haive.agents.agent import SelfModifyingAgent
from pydantic import BaseModel
class PerformanceMetrics(BaseModel):
accuracy: float = Field(ge=0.0, le=1.0)
response_time: float
user_satisfaction: float = Field(ge=0.0, le=1.0)
error_rate: float = Field(ge=0.0, le=1.0)
class LearningAgent(SelfModifyingAgent):
"""Agent that learns and adapts from experience."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.performance_history = []
self.adaptation_count = 0
async def arun(self, input_data, collect_metrics=True):
"""Run with performance monitoring and adaptation."""
start_time = time.time()
# Execute task
result = await super().arun(input_data)
# Collect performance metrics
if collect_metrics:
end_time = time.time()
metrics = await self.evaluate_performance(
input_data, result, end_time - start_time
)
self.performance_history.append(metrics)
# Adapt if performance is declining
if await self.should_adapt(metrics):
await self.adapt_behavior(metrics)
return result
async def evaluate_performance(self, input_data, result, response_time):
"""Evaluate performance of the last interaction."""
# Simplified metrics (in practice, use real evaluation)
accuracy = 0.85 if len(result) > 50 else 0.60
satisfaction = 0.90 if "helpful" in result.lower() else 0.70
error_rate = 0.05 if "error" not in result.lower() else 0.20
return PerformanceMetrics(
accuracy=accuracy,
response_time=response_time,
user_satisfaction=satisfaction,
error_rate=error_rate
)
async def should_adapt(self, current_metrics):
"""Determine if adaptation is needed."""
if len(self.performance_history) < 3:
return False
# Check if performance is declining
recent_accuracy = sum(m.accuracy for m in self.performance_history[-3:]) / 3
recent_satisfaction = sum(m.user_satisfaction for m in self.performance_history[-3:]) / 3
return recent_accuracy < 0.75 or recent_satisfaction < 0.75
async def adapt_behavior(self, metrics):
"""Adapt agent behavior based on metrics."""
adaptations = {}
if metrics.accuracy < 0.70:
# Lower temperature for more consistent responses
adaptations["temperature"] = max(0.1, self.engine.temperature * 0.8)
adaptations["system_message"] = self.engine.system_message + "\nPrioritize accuracy over creativity."
if metrics.response_time > 10.0:
# Optimize for faster responses
adaptations["max_tokens"] = min(1000, self.engine.max_tokens or 2000)
if metrics.error_rate > 0.15:
# Add verification tools
verification_tool = self.create_verification_tool()
adaptations["tools"] = self.tools + [verification_tool]
if adaptations:
await self.modify_behavior(adaptations)
self.adaptation_count += 1
print(f"π§ Adaptation #{self.adaptation_count}: {list(adaptations.keys())}")
def create_verification_tool(self):
"""Create a verification tool for fact-checking."""
@tool
def verify_facts(statement: str) -> str:
"""Verify factual accuracy of a statement."""
# Simplified verification
return f"Verification: {statement} appears accurate based on available data."
return verify_facts
async def self_modifying_example():
"""Demonstrate self-modifying agent capabilities."""
learning_agent = LearningAgent(
name="adaptive_assistant",
engine=AugLLMConfig(
temperature=0.9, # Start with high creativity
system_message="You are a helpful assistant."
)
)
# Simulate interactions that trigger adaptations
interactions = [
"What is photosynthesis?",
"Explain quantum mechanics simply",
"What are the benefits of exercise?",
"Describe the water cycle",
"Explain machine learning"
]
for i, query in enumerate(interactions):
print(f"\n--- Interaction {i+1} ---")
result = await learning_agent.arun(query)
print(f"Query: {query}")
print(f"Response length: {len(result)} characters")
if learning_agent.performance_history:
latest = learning_agent.performance_history[-1]
print(f"Accuracy: {latest.accuracy:.2f}, Satisfaction: {latest.user_satisfaction:.2f}")
print(f"\nπ― Final Stats:")
print(f"Total adaptations: {learning_agent.adaptation_count}")
print(f"Current temperature: {learning_agent.engine.temperature}")
print(f"Tools available: {len(learning_agent.tools)}")
asyncio.run(self_modifying_example())
Self-Replicating Agent SystemΒΆ
Goal: Create agents that can spawn specialized copies of themselves.
from haive.agents.agent import SelfReplicatingAgent
class SwarmAgent(SelfReplicatingAgent):
"""Agent that can create specialized swarm members."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.offspring = []
self.specialization = kwargs.get('specialization', 'general')
async def analyze_and_delegate(self, complex_task: str):
"""Analyze task and create specialized agents to handle it."""
# Analyze task complexity and requirements
analysis = await self.arun(
f"Analyze this task and identify what specializations are needed: {complex_task}"
)
# Create specialized offspring based on analysis
specialists = await self.create_specialists(analysis, complex_task)
# Coordinate specialist execution
results = await self.coordinate_specialists(specialists, complex_task)
return results
async def create_specialists(self, analysis: str, task: str):
"""Create specialized agent copies."""
specialists = []
if "research" in analysis.lower():
researcher = await self.replicate({
"name": f"{self.name}_researcher",
"specialization": "research",
"temperature": 0.3,
"system_message": "You are a thorough researcher. Gather comprehensive, accurate information.",
"tools": [web_search, extract_webpage_content]
})
specialists.append(researcher)
if "analysis" in analysis.lower():
analyst = await self.replicate({
"name": f"{self.name}_analyst",
"specialization": "analysis",
"temperature": 0.2,
"system_message": "You are a data analyst. Focus on patterns, insights, and quantitative analysis.",
"tools": [analyze_dataset, create_visualization]
})
specialists.append(analyst)
if "writing" in analysis.lower():
writer = await self.replicate({
"name": f"{self.name}_writer",
"specialization": "writing",
"temperature": 0.7,
"system_message": "You are a skilled writer. Create clear, engaging, well-structured content.",
"tools": []
})
specialists.append(writer)
if "review" in analysis.lower():
reviewer = await self.replicate({
"name": f"{self.name}_reviewer",
"specialization": "review",
"temperature": 0.1,
"system_message": "You are a meticulous reviewer. Ensure quality, accuracy, and completeness.",
"tools": []
})
specialists.append(reviewer)
self.offspring.extend(specialists)
return specialists
async def coordinate_specialists(self, specialists, task):
"""Coordinate execution among specialist agents."""
results = {}
# Execute specialists in appropriate order
execution_order = self.determine_execution_order(specialists)
for specialist in execution_order:
# Provide context from previous specialists
context = self.build_context(results)
specialist_task = f"{task}\n\nContext from other specialists:\n{context}"
result = await specialist.arun(specialist_task)
results[specialist.specialization] = result
print(f"β
{specialist.specialization.title()} completed")
return results
def determine_execution_order(self, specialists):
"""Determine optimal execution order for specialists."""
# Define typical workflow order
order_priority = {
"research": 1,
"analysis": 2,
"writing": 3,
"review": 4
}
return sorted(specialists, key=lambda s: order_priority.get(s.specialization, 5))
def build_context(self, previous_results):
"""Build context from previous specialist results."""
context_parts = []
for spec, result in previous_results.items():
context_parts.append(f"{spec.title()}: {result[:200]}...")
return "\n".join(context_parts)
async def self_replicating_example():
"""Demonstrate self-replicating agent swarm."""
# Create master agent
swarm_master = SwarmAgent(
name="swarm_coordinator",
engine=AugLLMConfig(
temperature=0.5,
system_message="You are a master coordinator capable of creating specialized agents."
)
)
# Complex task requiring multiple specializations
complex_task = """
Create a comprehensive market analysis report for electric vehicles in 2025.
The report should include:
1. Current market data and trends
2. Competitive landscape analysis
3. Consumer behavior insights
4. Future projections and recommendations
5. Executive summary and action items
"""
print("π Starting swarm delegation...")
results = await swarm_master.analyze_and_delegate(complex_task)
print(f"\nπ― Task Results:")
for specialization, result in results.items():
print(f"\n--- {specialization.upper()} ---")
print(result[:300] + "..." if len(result) > 300 else result)
print(f"\nπ Swarm Statistics:")
print(f"Specialists created: {len(swarm_master.offspring)}")
print(f"Specializations: {[s.specialization for s in swarm_master.offspring]}")
asyncio.run(self_replicating_example())
Memory-Enhanced ExamplesΒΆ
Graph-Based Knowledge AgentΒΆ
Goal: Create an agent with sophisticated graph-based memory that builds knowledge over time.
from haive.agents.memory_reorganized import SimpleMemoryAgent
from haive.agents.memory_reorganized.coordination import IntegratedMemorySystem
async def graph_memory_example():
"""Demonstrate graph-based memory capabilities."""
# Configure advanced memory system
memory_config = {
"mode": "INTEGRATED", # Graph + Vector + Temporal
"enable_classification": True,
"enable_consolidation": True,
"enable_importance_scoring": True,
# In production, use real Neo4j
"neo4j_config": {
"uri": "neo4j://localhost:7687",
"username": "neo4j",
"password": "password"
}
}
# Create memory-enhanced agent
knowledge_agent = SimpleMemoryAgent(
name="knowledge_assistant",
engine=AugLLMConfig(
temperature=0.6,
system_message="You are a knowledgeable assistant with perfect memory."
),
memory_config=memory_config
)
# Build knowledge over multiple interactions
knowledge_items = [
"Alice Johnson is the CTO of TechCorp, a software company based in San Francisco.",
"TechCorp specializes in artificial intelligence and machine learning solutions.",
"Alice has a PhD in Computer Science from Stanford University, completed in 2018.",
"She previously worked at Google as a Senior Research Scientist for 5 years.",
"Alice is an expert in computer vision and natural language processing.",
"TechCorp was founded in 2019 and has raised $50M in Series A funding.",
"The company's main product is an AI platform called 'IntelliCore'.",
"Alice frequently speaks at AI conferences and has published 23 research papers.",
"TechCorp's biggest competitor is AILabs Inc, based in Boston.",
"Alice mentors junior engineers and leads the research and development team."
]
print("π§ Building Knowledge Base...")
for i, item in enumerate(knowledge_items):
await knowledge_agent.store_memory(item)
print(f"Stored item {i+1}/10")
print("\nπ Querying Knowledge...")
# Test different types of queries
queries = [
"Who is Alice Johnson?",
"What companies are mentioned?",
"Tell me about TechCorp's competition",
"What is Alice's educational background?",
"How are Alice and TechCorp connected?",
"What expertise does Alice have in AI?"
]
for query in queries:
print(f"\n--- Query: {query} ---")
response = await knowledge_agent.arun(query)
print(f"Response: {response}")
# Show memory retrieval details
memory_results = await knowledge_agent.retrieve_memories(query)
print(f"Memories retrieved: {len(memory_results)}")
# Show graph relationships (simulated)
print("Graph relationships explored: Alice β TechCorp β AI β Stanford")
asyncio.run(graph_memory_example())
Long-Term Learning AgentΒΆ
Goal: Create an agent that learns and improves from extended conversations.
from haive.agents.memory_reorganized.agents import LongTermMemoryAgent
async def long_term_learning_example():
"""Demonstrate long-term learning capabilities."""
# Create agent with long-term memory
learning_agent = LongTermMemoryAgent(
name="learning_companion",
engine=AugLLMConfig(
temperature=0.7,
system_message="""You are a learning companion who remembers everything
about our conversations and grows more helpful over time."""
),
memory_config={
"enable_episodic_memory": True,
"enable_semantic_memory": True,
"enable_consolidation": True,
"consolidation_interval": 3600 # 1 hour
}
)
# Simulate extended conversation over time
conversation_history = [
# Day 1 - Initial interaction
("Hi, I'm Sarah. I'm a data scientist working on climate models.", "Day 1"),
("I specialize in time series analysis and predictive modeling.", "Day 1"),
("I'm currently working on predicting sea level rise patterns.", "Day 1"),
# Day 2 - Follow-up conversation
("How's your climate model work progressing, Sarah?", "Day 2"),
("I've been struggling with handling missing data in historical records.", "Day 2"),
("Do you remember what I said about my specialization?", "Day 2"),
# Day 3 - Building on previous knowledge
("I made a breakthrough with the missing data problem!", "Day 3"),
("The time series techniques you mentioned were really helpful.", "Day 3"),
("Can you help me plan the next phase of my sea level research?", "Day 3"),
# Day 7 - Long-term relationship
("I'm presenting my climate model at a conference next week.", "Day 7"),
("Can you help me prepare based on everything you know about my work?", "Day 7"),
]
print("π Simulating Long-Term Learning...")
responses = []
for i, (message, day) in enumerate(conversation_history):
print(f"\n--- {day} - Message {i+1} ---")
print(f"User: {message}")
# Get response with memory context
response = await learning_agent.arun(message)
print(f"Agent: {response[:200]}...")
responses.append(response)
# Store interaction for learning
await learning_agent.store_interaction(message, response, {"day": day})
print(f"\nπ§ Memory Analysis:")
# Check what the agent has learned
memory_summary = await learning_agent.get_memory_summary()
print(f"Total memories: {memory_summary.get('total_memories', 0)}")
print(f"User profile completeness: {memory_summary.get('user_profile_completeness', 0):.1%}")
print(f"Relationship strength: {memory_summary.get('relationship_strength', 0):.1f}/10")
# Test personalized response
final_query = "What do you know about me and how can you help with my work?"
personalized_response = await learning_agent.arun(final_query)
print(f"\nπ― Personalized Response:")
print(personalized_response)
asyncio.run(long_term_learning_example())
Production-Ready ExamplesΒΆ
Error-Resilient Agent SystemΒΆ
Goal: Create a robust agent system that handles errors gracefully and maintains uptime.
import asyncio
import logging
from typing import Optional
from datetime import datetime
class RobustAgent(SimpleAgent):
"""Production-ready agent with comprehensive error handling."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.error_count = 0
self.last_error = None
self.fallback_responses = {
"openai_error": "I'm experiencing technical difficulties. Let me try a simpler approach.",
"timeout_error": "That request is taking longer than expected. Could you rephrase it?",
"rate_limit": "I'm receiving too many requests right now. Please wait a moment.",
"general_error": "I encountered an issue. Let me try to help you differently."
}
async def arun(self, input_data, max_retries=3, fallback_model=None):
"""Run with comprehensive error handling and recovery."""
for attempt in range(max_retries + 1):
try:
# Log attempt
logging.info(f"Agent {self.name} attempt {attempt + 1}/{max_retries + 1}")
# Try normal execution
result = await super().arun(input_data)
# Reset error count on success
if self.error_count > 0:
logging.info(f"Agent {self.name} recovered after {self.error_count} errors")
self.error_count = 0
return result
except Exception as e:
self.error_count += 1
self.last_error = {
"error": str(e),
"timestamp": datetime.now(),
"attempt": attempt + 1
}
logging.error(f"Agent {self.name} error on attempt {attempt + 1}: {e}")
# Last attempt - use fallback
if attempt == max_retries:
return await self.fallback_response(input_data, e)
# Wait before retry with exponential backoff
wait_time = (2 ** attempt) + random.uniform(0, 1)
await asyncio.sleep(wait_time)
# Try fallback model if available
if fallback_model and attempt == max_retries - 1:
await self.switch_to_fallback_model(fallback_model)
async def fallback_response(self, input_data: str, error: Exception) -> str:
"""Generate fallback response when all retries fail."""
error_type = self.classify_error(error)
fallback_msg = self.fallback_responses.get(error_type, self.fallback_responses["general_error"])
# Try to provide some value even in failure
if "question" in input_data.lower():
return f"{fallback_msg} Could you ask your question in a different way?"
elif "help" in input_data.lower():
return f"{fallback_msg} Here are some things I can typically help with: answering questions, providing information, and assisting with tasks."
else:
return fallback_msg
def classify_error(self, error: Exception) -> str:
"""Classify error type for appropriate fallback."""
error_str = str(error).lower()
if "rate limit" in error_str or "quota" in error_str:
return "rate_limit"
elif "timeout" in error_str or "timed out" in error_str:
return "timeout_error"
elif "openai" in error_str or "api" in error_str:
return "openai_error"
else:
return "general_error"
async def switch_to_fallback_model(self, fallback_config):
"""Switch to a fallback model configuration."""
logging.info(f"Switching agent {self.name} to fallback model")
original_engine = self.engine
try:
self.engine = fallback_config
await self.recompile_if_needed()
logging.info("Fallback model switch successful")
except Exception as e:
logging.error(f"Fallback model switch failed: {e}")
self.engine = original_engine
async def robust_agent_example():
"""Demonstrate error-resilient agent system."""
# Configure primary and fallback models
primary_config = AugLLMConfig(
model="gpt-4",
temperature=0.7,
timeout=30
)
fallback_config = AugLLMConfig(
model="gpt-3.5-turbo", # Faster, more reliable fallback
temperature=0.5,
timeout=15
)
# Create robust agent
robust_agent = RobustAgent(
name="production_assistant",
engine=primary_config
)
# Test error scenarios
test_cases = [
"What are the benefits of renewable energy?", # Normal case
"Explain quantum mechanics in detail" * 100, # Potentially problematic long input
"Help me understand machine learning", # Normal case
"", # Edge case - empty input
"What is the meaning of life?" * 50 # Another potentially problematic case
]
print("π‘οΈ Testing Robust Agent System...")
for i, test_input in enumerate(test_cases):
print(f"\n--- Test Case {i+1} ---")
print(f"Input: {test_input[:100]}{'...' if len(test_input) > 100 else ''}")
start_time = time.time()
try:
result = await robust_agent.arun(
test_input,
max_retries=2,
fallback_model=fallback_config
)
end_time = time.time()
print(f"β
Success ({end_time - start_time:.1f}s)")
print(f"Response: {result[:150]}...")
except Exception as e:
print(f"β Final failure: {e}")
print(f"Error count: {robust_agent.error_count}")
# System health check
print(f"\nπ System Health:")
print(f"Total error count: {robust_agent.error_count}")
print(f"Last error: {robust_agent.last_error}")
print(f"Agent operational: {'Yes' if robust_agent.error_count < 5 else 'Degraded'}")
asyncio.run(robust_agent_example())
Monitoring and Analytics AgentΒΆ
Goal: Create an agent system with comprehensive monitoring and analytics.
import time
import json
from dataclasses import dataclass
from typing import Dict, List
@dataclass
class AgentMetrics:
agent_name: str
start_time: float
end_time: float
input_length: int
output_length: int
success: bool
error_message: str = None
tool_calls: int = 0
class MonitoredAgent(SimpleAgent):
"""Agent with comprehensive monitoring and analytics."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.metrics_history: List[AgentMetrics] = []
self.session_id = str(int(time.time()))
async def arun(self, input_data, track_metrics=True):
"""Run with comprehensive metrics tracking."""
if not track_metrics:
return await super().arun(input_data)
start_time = time.time()
metrics = AgentMetrics(
agent_name=self.name,
start_time=start_time,
end_time=0,
input_length=len(str(input_data)),
output_length=0,
success=False
)
try:
# Track tool usage if ReactAgent
initial_tool_count = getattr(self, 'tool_call_count', 0)
# Execute
result = await super().arun(input_data)
# Update metrics
metrics.end_time = time.time()
metrics.output_length = len(str(result))
metrics.success = True
metrics.tool_calls = getattr(self, 'tool_call_count', 0) - initial_tool_count
return result
except Exception as e:
metrics.end_time = time.time()
metrics.success = False
metrics.error_message = str(e)
raise
finally:
self.metrics_history.append(metrics)
await self.log_metrics(metrics)
async def log_metrics(self, metrics: AgentMetrics):
"""Log metrics for analysis."""
duration = metrics.end_time - metrics.start_time
log_data = {
"timestamp": datetime.now().isoformat(),
"session_id": self.session_id,
"agent_name": metrics.agent_name,
"duration_seconds": duration,
"input_length": metrics.input_length,
"output_length": metrics.output_length,
"success": metrics.success,
"error_message": metrics.error_message,
"tool_calls": metrics.tool_calls,
"tokens_per_second": metrics.output_length / duration if duration > 0 else 0
}
# In production, send to logging service
logging.info(f"AGENT_METRICS: {json.dumps(log_data)}")
def get_analytics(self) -> Dict:
"""Get comprehensive analytics about agent performance."""
if not self.metrics_history:
return {"message": "No metrics available"}
successful_calls = [m for m in self.metrics_history if m.success]
failed_calls = [m for m in self.metrics_history if not m.success]
durations = [m.end_time - m.start_time for m in successful_calls]
analytics = {
"total_calls": len(self.metrics_history),
"successful_calls": len(successful_calls),
"failed_calls": len(failed_calls),
"success_rate": len(successful_calls) / len(self.metrics_history),
"average_duration": sum(durations) / len(durations) if durations else 0,
"min_duration": min(durations) if durations else 0,
"max_duration": max(durations) if durations else 0,
"total_input_tokens": sum(m.input_length for m in self.metrics_history),
"total_output_tokens": sum(m.output_length for m in successful_calls),
"average_input_length": sum(m.input_length for m in self.metrics_history) / len(self.metrics_history),
"average_output_length": sum(m.output_length for m in successful_calls) / len(successful_calls) if successful_calls else 0,
"total_tool_calls": sum(m.tool_calls for m in self.metrics_history),
"error_types": self.analyze_error_types(failed_calls)
}
return analytics
def analyze_error_types(self, failed_calls: List[AgentMetrics]) -> Dict:
"""Analyze types of errors encountered."""
error_counts = {}
for call in failed_calls:
if call.error_message:
error_type = self.classify_error_type(call.error_message)
error_counts[error_type] = error_counts.get(error_type, 0) + 1
return error_counts
def classify_error_type(self, error_message: str) -> str:
"""Classify error message into categories."""
error_lower = error_message.lower()
if "rate limit" in error_lower:
return "rate_limit"
elif "timeout" in error_lower:
return "timeout"
elif "api" in error_lower:
return "api_error"
elif "validation" in error_lower:
return "validation_error"
else:
return "other"
async def monitoring_example():
"""Demonstrate comprehensive agent monitoring."""
# Create monitored agent
monitored_agent = MonitoredAgent(
name="analytics_assistant",
engine=AugLLMConfig(temperature=0.7)
)
# Simulate various interactions
test_interactions = [
"What is artificial intelligence?",
"Explain machine learning in simple terms",
"What are the benefits of cloud computing?",
"How does blockchain technology work?",
"What is the future of renewable energy?",
"", # This might cause an error
"Tell me about quantum computing" * 20, # Long input
"What is data science?",
"Explain neural networks",
"What are the applications of AI in healthcare?"
]
print("π Running Monitored Agent System...")
for i, interaction in enumerate(test_interactions):
print(f"\nInteraction {i+1}/10")
try:
result = await monitored_agent.arun(interaction)
print(f"β
Success - {len(result)} characters")
except Exception as e:
print(f"β Error: {str(e)[:100]}")
# Get comprehensive analytics
analytics = monitored_agent.get_analytics()
print("\nπ Agent Analytics Report")
print("=" * 50)
print(f"Total Calls: {analytics['total_calls']}")
print(f"Success Rate: {analytics['success_rate']:.1%}")
print(f"Average Duration: {analytics['average_duration']:.2f}s")
print(f"Total Input Tokens: {analytics['total_input_tokens']:,}")
print(f"Total Output Tokens: {analytics['total_output_tokens']:,}")
print(f"Average Response Length: {analytics['average_output_length']:.0f} characters")
print(f"Total Tool Calls: {analytics['total_tool_calls']}")
if analytics['error_types']:
print(f"\nError Analysis:")
for error_type, count in analytics['error_types'].items():
print(f" {error_type}: {count} occurrences")
# Performance insights
print(f"\nPerformance Insights:")
print(f" Fastest Response: {analytics['min_duration']:.2f}s")
print(f" Slowest Response: {analytics['max_duration']:.2f}s")
if analytics['success_rate'] > 0.9:
print(" π― Excellent reliability")
elif analytics['success_rate'] > 0.7:
print(" β οΈ Good reliability with room for improvement")
else:
print(" β Poor reliability - needs attention")
asyncio.run(monitoring_example())
Best Practices SummaryΒΆ
Development Guidelines
Start Simple: Begin with SimpleAgent, add complexity incrementally
Use Type Hints: Always use Pydantic models for structured data
Handle Errors: Implement comprehensive error handling and fallbacks
Monitor Performance: Track metrics in production systems
Test Thoroughly: Use real LLMs, never mock agent behavior
Production Readiness
Error Resilience: Implement retry logic and fallback strategies
Performance Monitoring: Track response times, success rates, costs
Security: Validate inputs, sanitize outputs, secure API keys
Scalability: Use async patterns, connection pooling, caching
Observability: Comprehensive logging and metrics collection
Architecture Patterns
Single Responsibility: Each agent has one clear purpose
Composition: Build complex systems from simple, reusable agents
Loose Coupling: Agents communicate through well-defined interfaces
Self-Organization: Let agents adapt and coordinate autonomously
Memory-First: Design around intelligent memory and context
Next StepsΒΆ
Continue exploring advanced Haive Agents capabilities:
Memory Systems - Advanced memory and knowledge management
Dynamic Supervisor System - Self-organizing agent teams
π Production Deployment Guide - Production deployment strategies
agents - Complete API reference
Ready to build intelligent agent systems? π
Start with these examples and adapt them to your specific needs. The modular design makes it easy to combine patterns and create sophisticated AI applications.