agents.multiΒΆ
MultiAgent Package - Advanced Multi-Agent Coordination and Orchestration.
This package provides the MultiAgent system for coordinating multiple AI agents in sophisticated workflows, enabling complex task decomposition, parallel processing, conditional routing, and hierarchical agent management for enterprise-scale applications.
The MultiAgent framework enables: - Sequential Workflows: Step-by-step agent coordination with state passing - Parallel Processing: Simultaneous agent execution with result aggregation - Conditional Routing: Dynamic agent selection based on state conditions - Hierarchical Management: Multi-level agent supervision and coordination - State Management: Sophisticated state sharing and transformation between agents
- Core Architecture:
MultiAgent implements advanced coordination patterns through several key components:
- Execution Engine:
Sequential execution with state passing between agents
Parallel execution with concurrent agent processing
Conditional routing with dynamic decision making
Hybrid execution modes combining multiple patterns
Error handling and recovery across agent boundaries
- State Management System:
Unified state schema across all coordinated agents
State transformation and projection for agent compatibility
Shared context preservation throughout workflow execution
Agent-specific state isolation and protection
Cross-agent communication and data sharing
- Agent Coordination:
Dynamic agent addition and removal during execution
Agent capability discovery and matching
Load balancing and resource optimization
Agent health monitoring and failure recovery
Performance monitoring and optimization
- Workflow Control:
Conditional branching and decision points
Loop and iteration control structures
Error handling and exception propagation
Timeout and resource limit enforcement
Progress monitoring and reporting
- Agent Coordination Patterns:
- Sequential Execution:
Linear workflow with agent-to-agent handoffs
State accumulation and transformation through pipeline
Dependency management and ordering constraints
Error propagation and recovery strategies
- Parallel Processing:
Concurrent agent execution for independent tasks
Result aggregation and synthesis
Resource optimization and load distribution
Synchronization and coordination mechanisms
- Conditional Routing:
Dynamic agent selection based on runtime conditions
Complex decision trees and branching logic
State-dependent workflow adaptation
Multi-criteria agent selection
- Hierarchical Coordination:
Multi-level agent supervision and management
Supervisor agents coordinating worker agents
Nested MultiAgent workflows
Delegation and task distribution patterns
Examples
Basic sequential workflow for content creation:
from haive.agents.multi import MultiAgent
from haive.agents.simple import SimpleAgent
from haive.agents.react import ReactAgent
from haive.core.engine.aug_llm import AugLLMConfig
# Create specialized agents for content pipeline
researcher = ReactAgent(
name="researcher",
engine=AugLLMConfig(
tools=[web_search, fact_checker],
temperature=0.4,
system_message="You are a thorough researcher who gathers comprehensive information."
)
)
writer = SimpleAgent(
name="writer",
engine=AugLLMConfig(
temperature=0.8,
system_message="You are a skilled writer who creates engaging, well-structured content."
)
)
editor = SimpleAgent(
name="editor",
engine=AugLLMConfig(
temperature=0.3,
system_message="You are a meticulous editor who improves clarity and correctness."
)
)
# Create sequential content creation workflow
content_pipeline = MultiAgent(
name="content_creation",
agents=[researcher, writer, editor],
execution_mode="sequential"
)
# Execute coordinated workflow
result = await content_pipeline.arun(
"Create a comprehensive article about renewable energy trends in 2024"
)
# Flow: researcher gathers data β writer creates content β editor refines
Parallel processing for multi-perspective analysis:
from haive.agents.multi import MultiAgent
from haive.agents.simple import SimpleAgent
from haive.core.engine.aug_llm import AugLLMConfig
# Create specialized analysis agents
technical_analyst = SimpleAgent(
name="technical_analyst",
engine=AugLLMConfig(
temperature=0.2,
system_message="You analyze technical feasibility and implementation details."
)
)
business_analyst = SimpleAgent(
name="business_analyst",
engine=AugLLMConfig(
temperature=0.3,
system_message="You analyze business impact, ROI, and market considerations."
)
)
risk_analyst = SimpleAgent(
name="risk_analyst",
engine=AugLLMConfig(
temperature=0.2,
system_message="You identify potential risks, challenges, and mitigation strategies."
)
)
user_experience_analyst = SimpleAgent(
name="ux_analyst",
engine=AugLLMConfig(
temperature=0.4,
system_message="You analyze user experience and usability implications."
)
)
# Create parallel analysis workflow
analysis_team = MultiAgent(
name="comprehensive_analysis",
agents=[technical_analyst, business_analyst, risk_analyst, user_experience_analyst],
execution_mode="parallel"
)
# Execute parallel analysis
analysis_results = await analysis_team.arun(
"Analyze the proposal to implement AI-powered customer service automation"
)
# All agents analyze simultaneously, results are aggregated
Conditional routing based on complexity assessment:
from haive.agents.multi import MultiAgent
from haive.agents.simple import SimpleAgent
from haive.agents.react import ReactAgent
from haive.core.engine.aug_llm import AugLLMConfig
# Create complexity classifier
classifier = SimpleAgent(
name="complexity_classifier",
engine=AugLLMConfig(
temperature=0.1,
system_message="You classify task complexity as 'simple', 'moderate', or 'complex'."
)
)
# Create agents for different complexity levels
simple_processor = SimpleAgent(
name="simple_processor",
engine=AugLLMConfig(
system_message="You handle straightforward tasks efficiently."
)
)
moderate_processor = ReactAgent(
name="moderate_processor",
engine=AugLLMConfig(
tools=[basic_tools],
system_message="You handle moderately complex tasks with some tool usage."
)
)
complex_processor = ReactAgent(
name="complex_processor",
engine=AugLLMConfig(
tools=[advanced_tools],
system_message="You handle complex tasks requiring extensive reasoning and tools."
),
max_iterations=10
)
# Create conditional routing workflow
adaptive_processor = MultiAgent(
name="adaptive_task_processor",
agents=[classifier, simple_processor, moderate_processor, complex_processor],
execution_mode="conditional"
)
# Add routing logic
adaptive_processor.add_conditional_edge(
from_agent="classifier",
condition=lambda state: state.get("complexity") == "simple",
true_agent="simple_processor",
false_agent="moderate_processor"
)
adaptive_processor.add_conditional_edge(
from_agent="classifier",
condition=lambda state: state.get("complexity") == "complex",
true_agent="complex_processor",
false_agent="moderate_processor"
)
# Execute with automatic routing
result = await adaptive_processor.arun("Analyze this data and provide insights")
Hierarchical multi-agent system with supervision:
from haive.agents.multi import MultiAgent
from haive.agents.simple import SimpleAgent
from haive.agents.react import ReactAgent
from haive.core.engine.aug_llm import AugLLMConfig
# Create worker agents for specific domains
frontend_team = MultiAgent(
name="frontend_development",
agents=[
ReactAgent(name="ui_designer", engine=design_config, tools=[design_tools]),
ReactAgent(name="frontend_dev", engine=dev_config, tools=[frontend_tools]),
SimpleAgent(name="ux_reviewer", engine=review_config)
],
execution_mode="sequential"
)
backend_team = MultiAgent(
name="backend_development",
agents=[
ReactAgent(name="api_designer", engine=api_config, tools=[api_tools]),
ReactAgent(name="backend_dev", engine=dev_config, tools=[backend_tools]),
SimpleAgent(name="security_reviewer", engine=security_config)
],
execution_mode="sequential"
)
qa_team = MultiAgent(
name="quality_assurance",
agents=[
ReactAgent(name="test_designer", engine=test_config, tools=[testing_tools]),
ReactAgent(name="automation_tester", engine=automation_config, tools=[automation_tools]),
SimpleAgent(name="qa_reviewer", engine=qa_config)
],
execution_mode="parallel"
)
# Create project supervisor
project_supervisor = MultiAgent(
name="software_project",
agents=[frontend_team, backend_team, qa_team],
execution_mode="parallel_then_sequential" # Teams work in parallel, then integration
)
# Execute hierarchical project
project_result = await project_supervisor.arun(
"Develop a complete e-commerce web application with payment integration"
)
Dynamic agent composition with runtime adaptation:
from haive.agents.multi import MultiAgent
from haive.agents.simple import SimpleAgent
from haive.agents.react import ReactAgent
class AdaptiveMultiAgent(MultiAgent):
"""MultiAgent that adapts its composition based on task requirements."""
async def analyze_requirements(self, task: str) -> List[str]:
"""Analyze task to determine required agent capabilities."""
# Use LLM to analyze task and determine needed capabilities
analyzer = SimpleAgent(name="requirement_analyzer", engine=analysis_config)
requirements = await analyzer.arun(f"What agent capabilities are needed for: {task}")
return self.parse_requirements(requirements)
async def compose_team(self, capabilities: List[str]) -> List[Agent]:
"""Dynamically compose team based on required capabilities."""
team = []
if "research" in capabilities:
team.append(ReactAgent(name="researcher", tools=[search_tools]))
if "analysis" in capabilities:
team.append(ReactAgent(name="analyst", tools=[analysis_tools]))
if "creative" in capabilities:
team.append(SimpleAgent(name="creative_writer", engine=creative_config))
if "technical" in capabilities:
team.append(ReactAgent(name="technical_expert", tools=[technical_tools]))
return team
async def arun(self, task: str):
"""Execute with dynamic team composition."""
# Analyze requirements
capabilities = await self.analyze_requirements(task)
# Compose optimal team
optimal_team = await self.compose_team(capabilities)
# Update agents
self.agents = optimal_team
# Execute with optimal team
return await super().arun(task)
# Use adaptive multi-agent
adaptive_system = AdaptiveMultiAgent(name="adaptive_solver")
result = await adaptive_system.arun("Complex multifaceted problem requiring various expertise")
Structured output coordination across agents:
from haive.agents.multi import MultiAgent
from haive.agents.simple import SimpleAgent
from haive.core.engine.aug_llm import AugLLMConfig
from pydantic import BaseModel, Field
from typing import List
# Define structured output schemas
class ResearchFindings(BaseModel):
key_facts: List[str] = Field(description="Important facts discovered")
sources: List[str] = Field(description="Information sources")
confidence: float = Field(ge=0.0, le=1.0, description="Confidence in findings")
class ContentDraft(BaseModel):
title: str = Field(description="Article title")
outline: List[str] = Field(description="Article structure")
content: str = Field(description="Full article content")
word_count: int = Field(description="Total word count")
class EditorialReview(BaseModel):
clarity_score: float = Field(ge=0.0, le=10.0, description="Clarity rating")
engagement_score: float = Field(ge=0.0, le=10.0, description="Engagement rating")
improvements: List[str] = Field(description="Suggested improvements")
final_content: str = Field(description="Edited content")
# Create agents with structured outputs
structured_researcher = SimpleAgent(
name="structured_researcher",
engine=AugLLMConfig(
structured_output_model=ResearchFindings,
temperature=0.3
)
)
structured_writer = SimpleAgent(
name="structured_writer",
engine=AugLLMConfig(
structured_output_model=ContentDraft,
temperature=0.7
)
)
structured_editor = SimpleAgent(
name="structured_editor",
engine=AugLLMConfig(
structured_output_model=EditorialReview,
temperature=0.2
)
)
# Create structured workflow
structured_pipeline = MultiAgent(
name="structured_content_creation",
agents=[structured_researcher, structured_writer, structured_editor],
execution_mode="sequential"
)
# Execute with structured data flow
final_result = await structured_pipeline.arun("Create article about space exploration")
# Each agent produces validated structured output passed to next agent
- Performance Characteristics:
- Execution Performance:
Sequential workflows: Sum of individual agent execution times + 10-50ms coordination overhead
Parallel workflows: Max individual agent time + 20-100ms aggregation overhead
Conditional routing: Classification time + selected agent execution + 5-20ms routing overhead
State management: <10ms per agent transition for typical state sizes
- Scalability Metrics:
Agent count: 100+ agents per MultiAgent workflow supported
Parallel execution: Limited by system resources and LLM provider rate limits
State size: Handles MB-scale state objects with efficient serialization
Nested workflows: 10+ levels of MultiAgent nesting supported
- Resource Management:
Memory efficiency: Optimized state sharing and agent lifecycle management
Error recovery: Configurable retry strategies and fallback mechanisms
Timeout handling: Per-agent and workflow-level timeout controls
Resource limits: CPU and memory usage monitoring and limits
- Integration Patterns:
- Workflow Orchestration:
Business process automation with agent-based steps
Data processing pipelines with specialized agent components
Content creation workflows with review and approval stages
Complex analysis workflows with multiple perspectives
- Microservice Architecture:
Agent-based microservices with MultiAgent coordination
Service mesh integration with agent discovery and routing
Load balancing and scaling across agent instances
Cross-service communication through agent interfaces
- Enterprise Integration:
ERP system integration with agent-based business logic
Customer service automation with escalation workflows
Document processing with multi-stage validation
Compliance and audit workflows with approval chains
- Advanced Features:
- Dynamic Reconfiguration:
Runtime agent addition and removal
Workflow pattern switching based on conditions
Performance-based agent selection and optimization
Adaptive resource allocation
- State Management:
Sophisticated state transformation between agents
Agent-specific state views and projections
State versioning and rollback capabilities
Cross-workflow state sharing and persistence
- Monitoring and Observability:
Comprehensive execution tracing and logging
Performance metrics and bottleneck identification
Agent health monitoring and failure detection
Workflow visualization and debugging tools
- Best Practices:
- Workflow Design:
Design agents with clear, focused responsibilities
Use appropriate execution modes for task characteristics
Implement proper error handling and recovery strategies
Design state schemas for forward compatibility
- Performance Optimization:
Minimize state transfer between agents
Use parallel execution for independent tasks
Implement caching for frequently used agent results
Monitor and optimize agent execution times
- Error Handling:
Implement comprehensive error recovery strategies
Use conditional routing for error handling workflows
Design fallback agents for critical workflow paths
Monitor and alert on workflow failures
- Version History:
- v4.0 (Current):
Enhanced state management with projection support
Improved parallel execution and synchronization
Advanced conditional routing with complex decision logic
Performance optimizations and monitoring improvements
- v3.0:
Hierarchical MultiAgent support
Dynamic agent composition capabilities
Enhanced error handling and recovery
Structured output coordination
- v2.0:
Parallel execution support
Conditional routing implementation
State management improvements
Performance monitoring integration
- v1.0:
Initial MultiAgent implementation
Sequential workflow execution
Basic agent coordination patterns
See also
haive.agents.multi.enhanced_multi_agent_v4
: Latest MultiAgent implementation
haive.agents.simple
: SimpleAgent for basic agent building blocks
haive.agents.react
: ReactAgent for tool-based reasoning
haive.agents.base
: Base Agent class and coordination patterns
SubmodulesΒΆ
- agents.multi.agent
- agents.multi.base
- agents.multi.compatibility
- agents.multi.core
- agents.multi.enhanced
- agents.multi.enhanced_clean_multi_agent
- agents.multi.enhanced_dynamic_supervisor
- agents.multi.enhanced_multi_agent_generic
- agents.multi.enhanced_multi_agent_standalone
- agents.multi.enhanced_parallel_agent
- agents.multi.enhanced_sequential_agent
- agents.multi.enhanced_supervisor_agent
- agents.multi.sequential
- agents.multi.simple
- agents.multi.utils