Engine Architecture
The Engine System is the beating heart of Haive Core - a revolutionary universal interface for AI components that provides seamless multi-provider orchestration, structured output handling, tool execution, and streaming capabilities through a unified, extensible architecture that adapts to any AI workload.
Beyond Simple LLM Wrappers
Transform Your AI Integration from Fragmented to Unified:
- Universal Engine Interface
Single API for LLMs, retrievers, embedders, and custom engines with automatic provider detection and configuration
- Multi-Provider Orchestration
Seamless switching between 20+ providers with automatic failover, load balancing, and cost optimization
- Structured Output Revolution
Type-safe responses with Pydantic v2, automatic validation, and intelligent parsing for reliable AI outputs
- Advanced Tool Integration
Parallel tool execution, automatic discovery, validation framework, and error recovery for robust workflows
- Streaming & Async First
Built for real-time applications with native streaming, async execution, and backpressure handling
Core Engine Components
AugLLM Configuration System
- class haive.core.engine.aug_llm.AugLLMConfig(*, id=<factory>, name=<factory>, engine_type=EngineType.LLM, description=None, input_schema=None, output_schema=None, version='1.0.0', metadata=<factory>, structured_output_model=None, structured_output_version=None, include_format_instructions=True, output_parser=None, parser_type=None, partial_variables=<factory>, tools=<factory>, pydantic_tools=<factory>, force_tool_use=False, force_tool_choice=None, tool_choice_mode='auto', bind_tools_kwargs=<factory>, tool_routes=<factory>, tool_metadata=<factory>, tools_dict=<factory>, routed_tools=<factory>, before_tool_validator=None, tool_instances=<factory>, llm_config=<factory>, prompt_template=<factory>, system_message=None, messages_placeholder_name='messages', add_messages_placeholder=True, force_messages_optional=True, examples=None, example_prompt=None, prefix=None, suffix=None, example_separator='\n\n', input_variables=None, schemas=<factory>, use_tool_for_format_instructions=False, tool_is_base_model=False, parse_raw_output=False, output_field_name=None, output_key=None, tool_kwargs=<factory>, bind_tools_config=<factory>, preprocess=None, postprocess=None, temperature=None, max_tokens=None, runtime_options=<factory>, custom_runnables=None, optional_variables=<factory>, uses_messages_field=None)
Bases: ToolRouteMixin, StructuredOutputMixin, InvokableEngine[Union[str, dict[str, Any], list[BaseMessage]], Union[BaseMessage, dict[str, Any]]]
Configuration for creating enhanced LLM chains with flexible message handling.
AugLLMConfig provides a structured way to configure and create LLM chains with prompts, tools, output parsers, and structured output models with comprehensive validation and automatic updates. It serves as the central configuration class for language model interactions in the Haive framework.
This class integrates several key functionalities:
1. Prompt template management with support for few-shot learning
2. Tool integration and discovery with automatic routing
3. Structured output handling (both parser-based and tool-based approaches)
4. Message handling for chat-based LLMs
5. Pre/post processing hooks for customization
The configuration system is designed to be highly flexible while enforcing consistent patterns and proper validation, making it easier to create reliable language model interactions.
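To make the parser-based side of structured output handling concrete, here is a minimal, dependency-free sketch. It is not Haive's implementation: it uses a plain dataclass as a stand-in for a Pydantic model, and `parse_structured` is a hypothetical helper that only checks field presence and basic types on a JSON reply.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class MovieReview:
    """Stand-in for a Pydantic model (assumption: plain dataclass for the sketch)."""
    title: str
    rating: int
    review: str


def parse_structured(raw: str, model=MovieReview):
    """Parse a JSON reply and validate field names and types (hypothetical helper)."""
    data = json.loads(raw)
    expected = {f.name: f.type for f in fields(model)}
    missing = expected.keys() - data.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    for name, typ in expected.items():
        if not isinstance(data[name], typ):
            raise TypeError(f"{name} should be {typ.__name__}")
    # Keys the model does not declare are dropped rather than passed through.
    return model(**{name: data[name] for name in expected})


reply = '{"title": "Inception", "rating": 9, "review": "Tightly plotted."}'
review = parse_structured(reply)
```

A tool-based approach would instead hand the schema to the model as a callable tool and read the arguments back, so no free-text parsing step is needed.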
- Parameters:
id (str)
name (str)
engine_type (EngineType)
description (str | None)
input_schema (type[BaseModel] | None)
output_schema (type[BaseModel] | None)
version (str)
structured_output_model (type[BaseModel] | None)
structured_output_version (Literal['v1', 'v2'] | None)
include_format_instructions (bool)
output_parser (Any | None)
parser_type (Literal['pydantic', 'pydantic_tools', 'str', 'json', 'custom'] | None)
force_tool_use (bool)
tool_choice_mode (Literal['auto', 'required', 'optional', 'none'])
llm_config (LLMConfig)
prompt_template (BasePromptTemplate | None)
system_message (str | None)
messages_placeholder_name (str)
add_messages_placeholder (bool)
force_messages_optional (bool)
example_prompt (PromptTemplate | None)
prefix (str | None)
suffix (str | None)
example_separator (str)
schemas (Sequence[type[BaseTool] | type[BaseModel] | Callable | StructuredTool | BaseModel])
use_tool_for_format_instructions (bool)
tool_is_base_model (bool)
parse_raw_output (bool)
output_field_name (str | None)
output_key (str | None)
temperature (float | None)
max_tokens (int | None)
custom_runnables (list[Runnable] | None)
uses_messages_field (bool | None)
- engine_type
The type of engine (always LLM).
- Type:
EngineType
- prompt_template
Template for structuring prompts.
- Type:
Optional[BasePromptTemplate]
- tools
Tools that can be bound to the LLM.
- Type:
Sequence[Union[Type[BaseTool], Type[BaseModel], Callable, StructuredTool, BaseModel]]
- structured_output_model
Pydantic model for structured outputs.
- Type:
Optional[Type[BaseModel]]
- structured_output_version
Version of structured output handling (v1: parser-based, v2: tool-based).
- Type:
Optional[StructuredOutputVersion]
- preprocess
Function to preprocess input before sending to LLM.
- Type:
Optional[Callable]
- postprocess
Function to postprocess output from LLM.
- Type:
Optional[Callable]
Examples
>>> from haive.core.engine.aug_llm.config import AugLLMConfig
>>> from haive.core.models.llm.base import AzureLLMConfig
>>> from pydantic import BaseModel, Field
>>>
>>> # Define a structured output model
>>> class MovieReview(BaseModel):
...     title: str = Field(description="Title of the movie")
...     rating: int = Field(description="Rating from 1-10")
...     review: str = Field(description="Detailed review of the movie")
>>>
>>> # Create a basic configuration
>>> config = AugLLMConfig(
...     name="movie_reviewer",
...     llm_config=AzureLLMConfig(model="gpt-4"),
...     system_message="You are a professional movie critic.",
...     structured_output_model=MovieReview,
...     temperature=0.7
... )
>>>
>>> # Create a runnable from the configuration
>>> reviewer = config.create_runnable()
>>>
>>> # Use the runnable
>>> result = reviewer.invoke("Review the movie 'Inception'")
- classmethod default_schemas_to_tools(data)
Default schemas to tools if schemas isn't provided but tools has values.
- classmethod ensure_structured_output_as_tool(data)
Ensure structured output model is properly configured for both v1 and v2.
- classmethod from_few_shot(examples, example_prompt, prefix, suffix, input_variables, llm_config=None, **kwargs)
Create with few-shot examples.
- classmethod from_few_shot_chat(examples, example_prompt, system_message=None, llm_config=None, **kwargs)
Create with few-shot examples for chat templates.
- classmethod from_format_instructions(model, system_message=None, llm_config=None, as_tool=False, var_name='format_instructions', **kwargs)
Create config with format instructions but without structured output.
- classmethod from_llm_config(llm_config, **kwargs)
Create from an existing LLMConfig.
- Parameters:
llm_config (LLMConfig)
- classmethod from_prompt(prompt, llm_config=None, **kwargs)
Create from a prompt template.
- Parameters:
prompt (BasePromptTemplate)
llm_config (LLMConfig | None)
- classmethod from_pydantic_tools(tool_models, system_message=None, llm_config=None, include_instructions=True, force_tool_use=False, **kwargs)
Create with Pydantic tool models.
- classmethod from_structured_output_v1(model, system_message=None, llm_config=None, include_instructions=True, **kwargs)
Create with v1 structured output using traditional parsing.
- classmethod from_structured_output_v2(model, system_message=None, llm_config=None, include_instructions=False, output_field_name=None, **kwargs)
Create with v2 structured output using the tool-based approach.
- classmethod from_system_and_few_shot(system_message, examples, example_prompt, prefix, suffix, input_variables, llm_config=None, **kwargs)
Create with system message and few-shot examples.
- classmethod from_system_prompt(system_prompt, llm_config=None, **kwargs)
Create from a system prompt string.
- Parameters:
system_prompt (str)
llm_config (LLMConfig | None)
- classmethod from_tools(tools, system_message=None, llm_config=None, use_tool_for_format_instructions=None, force_tool_use=False, **kwargs)
Create with specified tools.
- classmethod set_default_structured_output_version(data)
Set default structured output version to v2 (tools) when model is provided but version is not.
- classmethod validate_prompt_template(v)
Validate and reconstruct prompt template from dict data.
- Return type:
- classmethod validate_structured_output_model(v)
Validate structured output model and default to tools-based validation.
- Return type:
- add_format_instructions(model=None, as_tools=False, var_name='format_instructions')
Add format instructions to partial_variables without changing structured output configuration.
- Parameters:
- Return type:
- add_human_message(content)
Add a human message to the prompt template.
- Parameters:
content (str)
- Return type:
- add_optional_variable(var_name)
Add an optional variable to the prompt template.
- Parameters:
var_name (str)
- Return type:
- add_prompt_template(name, template)
Add a named prompt template for easy switching.
- Parameters:
name (str) - Unique name for the template
template (BasePromptTemplate) - The prompt template to store
- Return type:
None
- add_system_message(content)
Add or update system message in the prompt template.
- Parameters:
content (str)
- Return type:
- add_tool(tool)
Add a tool to the configuration.
- Parameters:
tool (Any) - Tool to add (LangChain tool, Pydantic model, or callable)
- Returns:
Self for method chaining
- Return type:
- add_tool_with_route(tool, route, name=None, metadata=None)
Add a tool with explicit route and metadata.
- apply_runnable_config(runnable_config=None)
Extract parameters from runnable_config relevant to this engine.
- comprehensive_validation_and_setup()
Comprehensive validation and setup after initialization.
- Return type:
- create_runnable(runnable_config=None)
Create a runnable LLM chain based on this configuration.
- Parameters:
runnable_config (RunnableConfig | None)
- Return type:
Runnable
- create_tool_from_config(config, name=None, route=None, **kwargs)
Create a tool from another config object.
- debug_tool_configuration()
Print detailed debug information about tool configuration.
- Return type:
- get_active_template()
Get the name of the currently active template.
- Return type:
str | None
- get_format_instructions(model=None, as_tools=False)
Get format instructions for a model without changing the config.
- model_post_init(_AugLLMConfig__context)
Proper Pydantic post-initialization.
- Return type:
None
- remove_message(index)
Remove a message from the prompt template.
- Parameters:
index (int)
- Return type:
- remove_prompt_template(name=None)
Remove a template or disable the active one.
- Parameters:
name (str | None) - Template name to remove. If None, disables active template.
- Returns:
Self for method chaining
- Return type:
- remove_tool(tool)
Remove a tool from the configuration.
- Parameters:
tool (Any) - Tool instance to remove
- Returns:
Self for method chaining
- Return type:
- replace_message(index, message)
Replace a message in the prompt template.
- Parameters:
- Return type:
- use_prompt_template(name)
Switch to using a specific named template.
- Parameters:
name (str) - Name of the template to activate
- Returns:
Self for method chaining
- Raises:
ValueError - If template name not found
- Return type:
- with_format_instructions(model, as_tool=False, var_name='format_instructions')
Add format instructions without setting up structured output or parser.
- Parameters:
- Return type:
- with_pydantic_tools(tool_models, include_instructions=True, force_use=False)
Configure with Pydantic tools output parsing.
- Parameters:
- Return type:
- with_structured_output(model, include_instructions=True, version='v2')
Configure with Pydantic structured output.
- Parameters:
- Return type:
- engine_type: EngineType
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'validate_assignment': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- tool_choice_mode: ToolChoiceMode
The Ultimate LLM Configuration Framework
AugLLMConfig provides a comprehensive configuration system that goes far beyond simple model wrappers, offering enterprise-grade features for production AI systems.
Configuration Power:
import asyncio
from typing import List

from haive.core.engine.aug_llm import AugLLMConfig
from haive.core.engine.providers import AzureLLMConfig, AnthropicConfig
from pydantic import BaseModel

# Define structured output
class AnalysisResult(BaseModel):
    sentiment: str
    confidence: float
    key_topics: List[str]
    recommendations: List[str]

# Create advanced configuration
# (web_search, calculator, and database_query are tools defined elsewhere)
config = AugLLMConfig(
    # Provider configuration
    llm_config=AzureLLMConfig(
        model="gpt-4",
        temperature=0.7,
        max_tokens=2000,
        api_version="2024-02-01"
    ),
    # System setup
    system_message="You are an expert data analyst.",
    # Tool integration
    tools=[web_search, calculator, database_query],
    force_tool_choice=["web_search"],  # Require web search
    # Structured output
    structured_output_model=AnalysisResult,
    structured_output_version="v2",  # Tool-based handling
    # Note: enable_streaming, retry_strategy, timeout, and callbacks are not
    # fields in the AugLLMConfig signature above, and model_config sets
    # extra='forbid', so passing them here would fail validation.
)

# Create runnable chain
analyzer = config.create_runnable()

# Execute with streaming; monitoring callbacks (token_counter and
# latency_tracker, defined elsewhere) go in the per-call runnable config
async def main():
    async for chunk in analyzer.astream(
        {"query": "Analyze market trends"},
        config={"callbacks": [token_counter, latency_tracker]},
    ):
        print(chunk)  # Real-time streaming output

asyncio.run(main())
Multi-Shot Learning Configuration:
from langchain_core.prompts import PromptTemplate

# Configure with examples (llm_config is an existing LLMConfig instance)
config = AugLLMConfig(
    llm_config=llm_config,
    # Few-shot examples
    examples=[
        {"input": "The product is amazing!", "output": "positive"},
        {"input": "Terrible experience", "output": "negative"},
        {"input": "It's okay, nothing special", "output": "neutral"}
    ],
    # Example formatting
    example_prompt=PromptTemplate(
        input_variables=["input", "output"],
        template="Input: {input}\nOutput: {output}"
    ),
    prefix="Classify the sentiment of the following texts:",
    suffix="Input: {text}\nOutput:",
    # This creates a few-shot sentiment classifier
)
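To show how the few-shot fields fit together, the sketch below assembles a prompt from `prefix`, rendered `examples`, and `suffix`, joined by `example_separator` (default `'\n\n'`, as in the signature). `build_few_shot_prompt` is a hypothetical helper written for illustration; it mirrors the usual few-shot template layout and is not Haive's own code.

```python
def build_few_shot_prompt(examples, example_template, prefix, suffix,
                          example_separator="\n\n", **variables):
    """Join prefix, rendered examples, and suffix into one prompt string."""
    rendered = [example_template.format(**example) for example in examples]
    # Fill input variables in prefix and suffix only, so braces inside
    # example text are never re-interpreted as placeholders.
    pieces = [prefix.format(**variables), *rendered, suffix.format(**variables)]
    return example_separator.join(piece for piece in pieces if piece)


examples = [
    {"input": "The product is amazing!", "output": "positive"},
    {"input": "Terrible experience", "output": "negative"},
    {"input": "It's okay, nothing special", "output": "neutral"},
]
prompt = build_few_shot_prompt(
    examples,
    example_template="Input: {input}\nOutput: {output}",
    prefix="Classify the sentiment of the following texts:",
    suffix="Input: {text}\nOutput:",
    text="Great value for the price",
)
print(prompt)
```

The model sees the instruction, three worked examples, and then the unanswered input, so the natural completion is a single label.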
Engine Factory System
Dynamic Engine Creation and Management
The factory system enables runtime engine creation with automatic configuration discovery and validation.
Factory Patterns:
from haive.core.engine import EngineFactory, EngineRegistry
# InvokableEngine must also be in scope; its import path is not shown here.

# Register custom engine
@EngineRegistry.register("custom_llm")
class CustomLLMEngine(InvokableEngine):
    """Custom LLM implementation."""

    def invoke(self, input_data):
        # Custom logic (process_with_custom_model is a placeholder)
        return process_with_custom_model(input_data)

# Create engines dynamically
factory = EngineFactory()

# Auto-detect best provider
engine = factory.create_engine(
    task_type="text-generation",
    requirements={
        "max_tokens": 4000,
        "supports_tools": True,
        "cost_limit": 0.10  # Per request
    }
)

# Create with specific config
azure_engine = factory.create_engine(
    engine_type="llm",
    provider="azure",
    config={"model": "gpt-4", "temperature": 0.5}
)
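The registration pattern above can be illustrated without the framework. The sketch below is a stand-alone registry with a decorator and a name-based constructor; `SimpleEngineRegistry` and `EchoEngine` are hypothetical names, and the real `EngineRegistry` may behave differently.

```python
from typing import Callable, Dict


class SimpleEngineRegistry:
    """Maps engine names to classes (illustrative stand-in, not Haive's registry)."""

    _engines: Dict[str, type] = {}

    @classmethod
    def register(cls, name: str) -> Callable[[type], type]:
        def decorator(engine_cls: type) -> type:
            if name in cls._engines:
                raise ValueError(f"engine {name!r} is already registered")
            cls._engines[name] = engine_cls
            return engine_cls  # return the class unchanged so it stays usable
        return decorator

    @classmethod
    def create(cls, name: str, **config):
        if name not in cls._engines:
            raise KeyError(f"unknown engine type: {name!r}")
        return cls._engines[name](**config)


@SimpleEngineRegistry.register("echo")
class EchoEngine:
    def __init__(self, prefix: str = ""):
        self.prefix = prefix

    def invoke(self, input_data: str) -> str:
        return f"{self.prefix}{input_data}"


engine = SimpleEngineRegistry.create("echo", prefix="> ")
```

Keeping the lookup table on the class means registration happens at import time, so a factory can build any registered engine from a plain string in a config file.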
Multi-Provider Support
20+ Provider Integrations
- OpenAI
GPT-4, GPT-3.5, DALL-E 3, Whisper, Embeddings
- Anthropic
Claude 3 Opus/Sonnet, Claude 2.1, Claude Instant, Constitutional AI
- Google
Gemini Pro/Ultra, PaLM 2, Vertex AI, Custom models
- Azure
Azure OpenAI, Cognitive Services, Custom deployments, Private endpoints
- AWS
Bedrock, SageMaker, Comprehend, Textract
- Open-source and local
Hugging Face, Ollama, LlamaCpp, vLLM
Provider Orchestration Example:
import asyncio

# OllamaConfig is assumed to live alongside the other provider configs
from haive.core.engine.providers import (
    AnthropicConfig,
    AzureLLMConfig,
    MultiProviderOrchestrator,
    OllamaConfig,
)

# Configure provider pool
orchestrator = MultiProviderOrchestrator([
    # Primary provider
    AzureLLMConfig(
        model="gpt-4",
        priority=1,
        rate_limit=100,  # Requests per minute
        cost_per_token=0.00003
    ),
    # Fallback for high load
    AnthropicConfig(
        model="claude-3-sonnet",
        priority=2,
        rate_limit=50,
        cost_per_token=0.00002
    ),
    # Budget option
    OllamaConfig(
        model="llama3-70b",
        priority=3,
        rate_limit=None,  # Unlimited local
        cost_per_token=0.0
    )
])

# Intelligent routing (await requires an async context)
async def main():
    response = await orchestrator.execute(
        prompt="Complex analysis task",
        routing_strategy="cost_optimized",  # or "latency_optimized"
        fallback_on_error=True,
        max_retries=3
    )
    return response

response = asyncio.run(main())
Tool Integration Framework
Advanced Tool Execution System
Tool Engine - Universal AI Tool Integration System
THE ULTIMATE TOOLKIT FOR INTELLIGENT AGENTS
Welcome to the Tool Engine, the system that turns any function, API, or service into an agent capability. This isn't just function calling; it's a comprehensive tool ecosystem that lets AI agents interact with external systems.
TOOL REVOLUTION
The Tool Engine changes how AI agents interact with external systems. Every function becomes a capability that agents can use, with:
- Universal Integration: Connect any API, database, service, or function
- Intelligent Discovery: Automatically analyze and categorize tool capabilities
- Dynamic Execution: Real-time tool selection and parallel execution
- Type Safety: Full validation and error handling for all tool interactions
- Advanced Analytics: Comprehensive tool usage metrics and optimization
CORE INNOVATIONS
- 1. Universal Tool Abstraction
Every tool follows the same interface, regardless of complexity:
Examples
>>> # Simple function becomes a tool
>>> @tool
... def calculator(expression: str) -> float:
...     """Calculate mathematical expressions."""
...     # eval() executes arbitrary code; never pass it untrusted input
...     return eval(expression)
>>>
>>> # Complex API becomes a tool
>>> @tool
... def web_search(query: str, max_results: int = 5) -> List[SearchResult]:
...     """Search the web for information."""
...     return search_api.query(query, limit=max_results)
>>>
>>> # All tools work the same way in agents
>>> agent = AugLLMConfig(
...     llm_config=AzureLLMConfig(model="gpt-4"),
...     tools=[calculator, web_search, database_query, email_sender]
... )
- 2. Intelligent Tool Analysis
Automatic discovery and categorization of tool capabilities:
>>> from haive.core.engine.tool import ToolAnalyzer
>>>
>>> analyzer = ToolAnalyzer()
>>>
>>> # Analyze any tool automatically
>>> analysis = analyzer.analyze_tool(my_complex_function)
>>> print(f"Category: {analysis.category}")  # e.g., "data_processing"
>>> print(f"Capabilities: {analysis.capabilities}")  # e.g., ["search", "filter"]
>>> print(f"Complexity: {analysis.complexity}")  # e.g., "medium"
>>> print(f"Risk Level: {analysis.risk_level}")  # e.g., "low"
>>>
>>> # Automatic tool optimization suggestions
>>> suggestions = analyzer.get_optimization_suggestions(my_function)
>>> for suggestion in suggestions:
...     print(f"💡 {suggestion.description}")
- 3. Advanced Tool Types
Rich type system for different tool behaviors:
>>> from haive.core.engine.tool import (
...     ToolType, ToolCategory, ToolCapability,
...     StateAwareTool, InterruptibleTool
... )
>>>
>>> # State-aware tools that remember context
>>> class DatabaseTool(StateAwareTool):
...     def __init__(self):
...         self.connection_pool = {}
...         self.transaction_state = {}
...
...     def execute(self, query: str, context: ToolContext) -> QueryResult:
...         # Use context for connection management
...         conn = self.get_connection(context.session_id)
...         return conn.execute(query)
>>>
>>> # Interruptible tools for long-running operations
>>> class DataProcessingTool(InterruptibleTool):
...     async def execute(self, data: LargeDataset) -> ProcessedData:
...         for chunk in data.chunks():
...             if self.should_interrupt():
...                 return PartialResult(processed_so_far)
...             await self.process_chunk(chunk)
- 4. Tool Composition & Orchestration
Build complex capabilities from simple tools:
>>> from haive.core.engine.tool import ToolEngine, compose_tools
>>>
>>> # Compose tools into workflows
>>> research_workflow = compose_tools([
...     web_search_tool,
...     content_extractor,
...     fact_checker,
...     summarizer
... ])
>>>
>>> # Parallel tool execution
>>> analysis_suite = ToolEngine.parallel_tools([
...     sentiment_analyzer,
...     entity_extractor,
...     keyword_extractor,
...     topic_classifier
... ])
>>>
>>> # Conditional tool chains
>>> smart_processor = ToolEngine.conditional_chain({
...     "text_input": text_processing_tools,
...     "image_input": image_processing_tools,
...     "audio_input": audio_processing_tools
... })
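As a framework-free illustration of the two ideas above, sequential composition and parallel execution, the following sketch uses only the standard library. `compose_pipeline` and `run_parallel` are hypothetical helpers, not the `compose_tools` or `ToolEngine.parallel_tools` APIs.

```python
import asyncio
from functools import reduce


def compose_pipeline(*steps):
    """Return a callable that feeds each step's output into the next step."""
    return lambda value: reduce(lambda acc, step: step(acc), steps, value)


async def run_parallel(tools, value):
    """Run independent tools on the same input concurrently in worker threads."""
    results = await asyncio.gather(
        *(asyncio.to_thread(tool, value) for tool in tools)
    )
    return {tool.__name__: result for tool, result in zip(tools, results)}


# Sequential: strip, lowercase, then split into words
normalize = compose_pipeline(str.strip, str.lower, str.split)
words = normalize("  Hello World ")

# Parallel: each tool receives the same input independently
report = asyncio.run(run_parallel([len, str.upper], "abc"))
```

Composition suits steps that depend on each other's output; the parallel form suits analyses that share an input but not intermediate results.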
ADVANCED FEATURES
Real-time Tool Discovery
>>> # Automatically discover tools from modules
>>> from haive.core.engine.tool import discover_tools
>>>
>>> # Scan entire modules for tools
>>> discovered = discover_tools([
...     "myproject.api_tools",
...     "myproject.data_tools",
...     "myproject.ml_tools"
... ])
>>>
>>> print(f"Found {len(discovered)} tools:")
>>> for tool in discovered:
...     print(f"  🛠️ {tool.name}: {tool.description}")
...     print(f"     Categories: {', '.join(tool.categories)}")
...     print(f"     Capabilities: {', '.join(tool.capabilities)}")
Tool Validation & Testing
>>> # Comprehensive tool validation
>>> validator = ToolValidator()
>>>
>>> validation_result = validator.validate_tool(my_tool)
>>> if validation_result.is_valid:
...     print("✅ Tool is valid and ready to use")
... else:
...     print("❌ Tool validation failed:")
...     for error in validation_result.errors:
...         print(f"  - {error.message}")
>>>
>>> # Automated tool testing
>>> test_suite = ToolTestSuite()
>>> test_suite.add_tool(my_tool)
>>>
>>> # Generate test cases automatically
>>> test_cases = test_suite.generate_test_cases(my_tool)
>>> results = test_suite.run_tests()
>>>
>>> for result in results:
...     if result.passed:
...         print(f"✅ {result.test_name}")
...     else:
...         print(f"❌ {result.test_name}: {result.error}")
Tool Performance Optimization
>>> # Performance monitoring and optimization
>>> optimizer = ToolOptimizer()
>>>
>>> # Analyze tool performance
>>> performance = optimizer.analyze_performance(my_tool)
>>> print(f"Average latency: {performance.avg_latency_ms}ms")
>>> print(f"Success rate: {performance.success_rate:.2%}")
>>> print(f"Memory usage: {performance.memory_usage_mb}MB")
>>>
>>> # Apply optimizations
>>> optimized_tool = optimizer.optimize(
...     my_tool,
...     strategies=[
...         "caching",             # Cache frequent results
...         "connection_pooling",  # Reuse connections
...         "batching",            # Batch similar requests
...         "async_execution"      # Use async when possible
...     ]
... )
>>>
>>> # Performance improvement
>>> improvement = optimizer.measure_improvement(my_tool, optimized_tool)
>>> print(f"Performance improved by {improvement.latency_improvement:.1f}x")
Tool Security & Sandboxing
>>> # Secure tool execution
>>> from haive.core.engine.tool import SecureToolEngine, SandboxConfig
>>>
>>> # Configure security sandbox
>>> sandbox_config = SandboxConfig(
...     max_execution_time=30.0,
...     max_memory_usage="500MB",
...     allowed_network_hosts=["api.example.com"],
...     blocked_filesystem_paths=["/etc", "/usr"],
...     enable_audit_logging=True
... )
>>>
>>> # Create secure tool engine
>>> secure_engine = SecureToolEngine(
...     tools=[potentially_unsafe_tool],
...     sandbox_config=sandbox_config
... )
>>>
>>> # All tool execution is sandboxed
>>> result = secure_engine.execute(
...     tool_name="data_processor",
...     input_data=untrusted_data,
...     context=execution_context
... )
>>>
>>> # Review security audit log
>>> audit_log = secure_engine.get_audit_log()
>>> for entry in audit_log:
...     print(f"{entry.timestamp}: {entry.action} - {entry.result}")
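One piece of the sandbox idea, an execution time limit with an audit trail, can be approximated with the standard library. This is a rough sketch under stated assumptions, not how `SecureToolEngine` works: `run_with_limit` is a hypothetical helper, and a thread-based timeout only stops waiting for the call; it cannot forcibly terminate the worker.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def run_with_limit(func, *args, max_execution_time=1.0, audit_log=None):
    """Run func(*args), giving up after max_execution_time seconds."""
    audit_log = audit_log if audit_log is not None else []
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(func, *args)
    try:
        result = future.result(timeout=max_execution_time)
        audit_log.append((func.__name__, "ok"))
        return result
    except FutureTimeout:
        audit_log.append((func.__name__, "timeout"))
        raise TimeoutError(
            f"{func.__name__} exceeded {max_execution_time}s"
        ) from None
    finally:
        # Do not block on a worker that may still be running
        pool.shutdown(wait=False)


log = []
total = run_with_limit(sum, [1, 2, 3], audit_log=log)
try:
    run_with_limit(time.sleep, 0.3, max_execution_time=0.05, audit_log=log)
except TimeoutError:
    pass  # the slow call was abandoned and recorded in the log
```

Real isolation of memory, network, and filesystem access needs process-level or OS-level mechanisms, which this sketch does not attempt.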
TOOL CATEGORIES & CAPABILITIES
Built-in Tool Categories
>>> from haive.core.engine.tool import ToolCategory
>>>
>>> # Members as listed in the ToolCategory reference
>>> categories = [
...     ToolCategory.COMMUNICATION,   # Email, messaging, notifications
...     ToolCategory.COMPUTATION,     # Mathematical calculations
...     ToolCategory.COORDINATION,
...     ToolCategory.GENERATION,
...     ToolCategory.MEMORY,
...     ToolCategory.RETRIEVAL,
...     ToolCategory.SEARCH,          # Information retrieval
...     ToolCategory.TRANSFORMATION,
...     ToolCategory.UNKNOWN,
...     ToolCategory.VALIDATION,
... ]
Tool Capabilities
>>> from haive.core.engine.tool import ToolCapability
>>>
>>> # Members as listed in the ToolCapability reference
>>> capabilities = [
...     ToolCapability.ASYNC_CAPABLE,
...     ToolCapability.BATCH_CAPABLE,
...     ToolCapability.FROM_STATE,
...     ToolCapability.INJECTED_STATE,
...     ToolCapability.INTERRUPTIBLE,
...     ToolCapability.READS_STATE,
...     ToolCapability.RETRIEVER,
...     ToolCapability.ROUTED,
...     ToolCapability.STATE_AWARE,
...     ToolCapability.STORE,
...     ToolCapability.STREAMING,
...     ToolCapability.STRUCTURED_OUTPUT,
...     ToolCapability.TO_STATE,
...     ToolCapability.TRANSFORMER,
...     ToolCapability.VALIDATED_OUTPUT,
...     ToolCapability.VALIDATOR,
...     ToolCapability.WRITES_STATE,
... ]
TOOL DEVELOPMENT PATTERNS
Simple Function Tool
>>> from haive.core.engine.tool import tool
>>>
>>> @tool
... def word_count(text: str) -> int:
...     """Count words in text."""
...     return len(text.split())
>>>
>>> # Category and capability metadata is attached automatically
>>> print(word_count.category)      # a ToolCategory member
>>> print(word_count.capabilities)  # ToolCapability members
Stateful Tool Class
>>> from typing import Dict, List
>>> from haive.core.engine.tool import StatefulTool
>>>
>>> class DatabaseTool(StatefulTool):
...     """Database interaction tool with connection management."""
...
...     def __init__(self, connection_string: str):
...         super().__init__()
...         self.connection_string = connection_string
...         self._connection = None
...
...     def connect(self):
...         """Establish database connection."""
...         if not self._connection:
...             self._connection = create_connection(self.connection_string)
...
...     @tool_method
...     def query(self, sql: str) -> List[Dict]:
...         """Execute SQL query."""
...         self.connect()
...         return self._connection.execute(sql).fetchall()
...
...     @tool_method
...     def insert(self, table: str, data: Dict) -> bool:
...         """Insert data into table."""
...         self.connect()
...         return self._connection.insert(table, data)
Async Tool Implementation
>>> import asyncio
>>> import aiohttp
>>> from haive.core.engine.tool import async_tool
>>>
>>> @async_tool
... async def fetch_url(url: str, timeout: float = 10.0) -> WebPage:
...     """Fetch web page content asynchronously."""
...     client_timeout = aiohttp.ClientTimeout(total=timeout)
...     async with aiohttp.ClientSession(timeout=client_timeout) as session:
...         async with session.get(url) as response:
...             content = await response.text()
...             return WebPage(
...                 url=url,
...                 content=content,
...                 status_code=response.status
...             )
>>>
>>> # Use in async agents
>>> async def research_topic(topic: str):
...     urls = await web_search(topic)
...     pages = await asyncio.gather(*[
...         fetch_url(url) for url in urls[:5]
...     ])
...     return analyze_pages(pages)
Tool with Complex Configuration
>>> import smtplib
>>> from haive.core.engine.tool import ConfigurableTool
>>> from pydantic import BaseModel, Field
>>>
>>> class EmailConfig(BaseModel):
...     smtp_server: str = Field(..., description="SMTP server address")
...     port: int = Field(587, description="SMTP port")
...     username: str = Field(..., description="Email username")
...     password: str = Field(..., description="Email password")
...     use_tls: bool = Field(True, description="Use TLS encryption")
>>>
>>> class EmailTool(ConfigurableTool[EmailConfig]):
...     """Email sending tool with full configuration."""
...
...     config_class = EmailConfig
...
...     def send_email(self, to: str, subject: str, body: str) -> bool:
...         """Send email message."""
...         with smtplib.SMTP(self.config.smtp_server, self.config.port) as server:
...             if self.config.use_tls:
...                 server.starttls()
...             server.login(self.config.username, self.config.password)
...             server.send_message(self._create_message(to, subject, body))
...         return True
>>>
>>> # Configure and use
>>> email_tool = EmailTool(config=EmailConfig(
...     smtp_server="smtp.gmail.com",
...     username="agent@example.com",
...     password="app-password"
... ))
TOOL ANALYTICS & INSIGHTS
Usage Analytics
>>> # Get comprehensive tool usage analytics
>>> analytics = ToolAnalytics()
>>>
>>> # Tool usage statistics
>>> usage_stats = analytics.get_usage_stats(time_range="last_7_days")
>>> print(f"Most used tool: {usage_stats.top_tool}")
>>> print(f"Total executions: {usage_stats.total_executions}")
>>> print(f"Average success rate: {usage_stats.avg_success_rate:.2%}")
>>>
>>> # Performance insights
>>> performance = analytics.get_performance_insights()
>>> for insight in performance.slow_tools:
...     print(f"{insight.tool_name}: {insight.avg_latency}ms")
...     print(f"   Suggestion: {insight.optimization_suggestion}")
>>>
>>> # Error analysis
>>> errors = analytics.get_error_analysis()
>>> for error_pattern in errors.common_patterns:
...     print(f"❌ {error_pattern.error_type}: {error_pattern.frequency}")
...     print(f"   Tools affected: {', '.join(error_pattern.affected_tools)}")
Tool Recommendation Engine
>>> # Intelligent tool recommendations
>>> recommender = ToolRecommendationEngine()
>>>
>>> # Get tool suggestions for specific tasks
>>> recommendations = recommender.recommend_tools(
...     task_description="I need to analyze customer feedback data",
...     context={
...         "data_type": "text",
...         "data_size": "large",
...         "output_format": "dashboard"
...     }
... )
>>>
>>> for rec in recommendations:
...     print(f"🛠️ {rec.tool_name} (confidence: {rec.confidence:.2f})")
...     print(f"   Why: {rec.reasoning}")
...     print(f"   Usage: {rec.example_usage}")
ENTERPRISE FEATURES
Tool Governance: Approval workflows for new tools
Access Control: Role-based tool permissions
Compliance: Audit trails for all tool executions
Multi-tenancy: Isolated tool environments per tenant
Cost Management: Tool usage tracking and budgets
Quality Assurance: Automated tool testing and validation
BEST PRACTICES
Clear Documentation: Every tool needs comprehensive docstrings
Type Safety: Use proper type hints for all parameters and returns
Error Handling: Implement robust error handling and recovery
Resource Management: Clean up connections and temporary files
Security: Validate all inputs and sanitize outputs
Performance: Optimize for common use cases and cache when appropriate
Testing: Include unit tests and integration tests for all tools
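Several of these practices (docstrings, type hints, input validation, and explicit errors) can be seen together in a small example. The sketch below is a hypothetical calculator tool that walks the parsed expression tree instead of calling `eval()`; it is plain Python and is not tied to the `@tool` decorator.

```python
import ast
import operator

# Only these operators are accepted; anything else is rejected.
_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.USub: operator.neg,
}


def calculate(expression: str) -> float:
    """Evaluate +, -, *, / arithmetic on numeric literals without eval()."""

    def _evaluate(node: ast.AST) -> float:
        if (isinstance(node, ast.Constant)
                and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _OPERATORS:
            return _OPERATORS[type(node.op)](
                _evaluate(node.left), _evaluate(node.right)
            )
        if isinstance(node, ast.UnaryOp) and type(node.op) in _OPERATORS:
            return _OPERATORS[type(node.op)](_evaluate(node.operand))
        raise ValueError(f"unsupported expression: {expression!r}")

    try:
        tree = ast.parse(expression, mode="eval")
    except SyntaxError as exc:
        raise ValueError(f"invalid expression: {expression!r}") from exc
    return float(_evaluate(tree.body))


answer = calculate("2 + 3 * 4")
```

Function calls, names, and attribute access never reach an operator, so input such as `__import__('os')` raises `ValueError` rather than executing.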
GETTING STARTED
>>> from haive.core.engine.tool import tool, ToolEngine
>>>
>>> # 1. Create a simple tool
>>> @tool
... def greet(name: str) -> str:
...     """Greet someone by name."""
...     return f"Hello, {name}!"
>>>
>>> # 2. Use in an agent
>>> from haive.core.engine.aug_llm import AugLLMConfig
>>> from haive.core.models.llm.base import AzureLLMConfig
>>>
>>> agent = AugLLMConfig(
...     llm_config=AzureLLMConfig(model="gpt-4"),
...     tools=[greet]
... )
>>>
>>> # 3. The agent can now greet people!
>>> result = agent.invoke("Please greet Alice")
>>> # Agent will call greet("Alice") and respond: "Hello, Alice!"
Tool Engine: Where Functions Become Intelligent Capabilities
- class haive.core.engine.tool.InterruptibleTool(*args, **kwargs)
Protocol for tools that support interruption.
- __init__(*args, **kwargs)
- class haive.core.engine.tool.StateAwareTool(*args, **kwargs)
Protocol for tools that interact with state.
- __init__(*args, **kwargs)
- class haive.core.engine.tool.ToolAnalyzer
Analyzes tools to determine their properties and capabilities.
This analyzer uses existing Haive utilities to detect tool capabilities, schemas, and other properties needed for routing and execution.
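To give a feel for what capability detection can look like, here is a small sketch based on the standard `inspect` module. The heuristics are assumptions chosen for illustration and are not ToolAnalyzer's actual rules; `Capability` is a hypothetical enum that reuses three value strings from the `ToolCapability` listing.

```python
import inspect
from dataclasses import dataclass, is_dataclass
from enum import Enum


class Capability(str, Enum):
    ASYNC_CAPABLE = "async_capable"
    STREAMING = "streaming"
    STRUCTURED_OUTPUT = "structured_output"


def detect_capabilities(func) -> set:
    """Infer capabilities from how a callable is defined (illustrative heuristics)."""
    found = set()
    if inspect.iscoroutinefunction(func) or inspect.isasyncgenfunction(func):
        found.add(Capability.ASYNC_CAPABLE)
    if inspect.isgeneratorfunction(func) or inspect.isasyncgenfunction(func):
        found.add(Capability.STREAMING)
    try:
        returns = inspect.signature(func).return_annotation
    except (TypeError, ValueError):
        returns = inspect.Signature.empty  # some callables expose no signature
    if isinstance(returns, type) and is_dataclass(returns):
        found.add(Capability.STRUCTURED_OUTPUT)
    return found


@dataclass
class Report:
    text: str


async def fetch(url: str) -> str:
    return url


def stream(n: int):
    yield from range(n)


def summarize(text: str) -> Report:
    return Report(text)
```

A router could then match a request's required capabilities against each tool's detected set before choosing which tool to run.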
- class haive.core.engine.tool.ToolCapability(*values)
Fine-grained tool capabilities for routing and execution.
- ASYNC_CAPABLE = 'async_capable'
- BATCH_CAPABLE = 'batch_capable'
- FROM_STATE = 'from_state'
- INJECTED_STATE = 'injected_state'
- INTERRUPTIBLE = 'interruptible'
- READS_STATE = 'reads_state'
- RETRIEVER = 'retriever'
- ROUTED = 'routed'
- STATE_AWARE = 'state_aware'
- STORE = 'store'
- STREAMING = 'streaming'
- STRUCTURED_OUTPUT = 'structured_output'
- TO_STATE = 'to_state'
- TRANSFORMER = 'transformer'
- VALIDATED_OUTPUT = 'validated_output'
- VALIDATOR = 'validator'
- WRITES_STATE = 'writes_state'
- class haive.core.engine.tool.ToolCategory(*values)
High-level tool categorization.
- COMMUNICATION = 'communication'
- COMPUTATION = 'computation'
- COORDINATION = 'coordination'
- GENERATION = 'generation'
- MEMORY = 'memory'
- RETRIEVAL = 'retrieval'
- SEARCH = 'search'
- TRANSFORMATION = 'transformation'
- UNKNOWN = 'unknown'
- VALIDATION = 'validation'
- class haive.core.engine.tool.ToolEngine(*, id=<factory>, name=<factory>, engine_type=EngineType.TOOL, description=None, input_schema=None, output_schema=None, version='1.0.0', metadata=<factory>, tools=None, toolkit=None, retry_policy=None, parallel=False, messages_key='messages', tool_choice='auto', return_source=True, timeout=None, max_iterations=None, auto_route=True, routing_strategy='auto', enable_analysis=True)[source]ΒΆ
Enhanced tool engine with universal typing and property analysis.
This engine manages tools with comprehensive property analysis, capability-based routing, and state interaction tracking.
- Parameters:
id (str)
name (str)
engine_type (EngineType)
description (str | None)
input_schema (type[BaseModel] | None)
output_schema (type[BaseModel] | None)
version (str)
tools (Sequence[BaseTool | StructuredTool | type[BaseTool] | BaseModel | type[BaseModel] | Callable[[...], Any] | BaseToolkit] | None)
toolkit (BaseToolkit | list[BaseToolkit] | None)
retry_policy (RetryPolicy | None)
parallel (bool)
messages_key (str)
tool_choice (Literal['auto', 'required', 'none'])
return_source (bool)
timeout (float | None)
max_iterations (int | None)
auto_route (bool)
routing_strategy (Literal['auto', 'capability', 'category', 'priority'])
enable_analysis (bool)
- classmethod augment_tool(tool, *, name=None, description=None, make_interruptible=False, interrupt_message='Tool execution interrupted', reads_state=False, writes_state=False, state_keys=None, structured_output_model=None)
Augment an existing tool with additional capabilities.
This method adds capabilities to a tool without creating nested wrappers. All capabilities are added directly to the tool instance.
- Parameters:
- Return type:
StructuredTool
- classmethod create_human_interrupt_tool(name='human_interrupt', description='Request human intervention or approval', *, allow_ignore=True, allow_respond=True, allow_edit=False, allow_accept=True, interrupt_message='Human input requested')
Create a tool that triggers human interrupt workflow.
This creates a tool that can pause graph execution and request human intervention, following LangGraph's interrupt patterns.
- Parameters:
name (str) - Tool name
description (str) - Tool description
allow_ignore (bool) - Whether the human can skip/ignore the request
allow_respond (bool) - Whether the human can provide a text response
allow_edit (bool) - Whether the human can edit the request
allow_accept (bool) - Whether the human can accept the request as-is
interrupt_message (str) - Default interrupt message
- Returns:
A StructuredTool that triggers human interrupts
- Return type:
StructuredTool
- classmethod create_interruptible_tool(func, name=None, description=None, *, interrupt_message='Tool execution interrupted')
Create or wrap a tool with interruption capability.
- classmethod create_retriever_tool(retriever, name, description, *, document_prompt=None, document_separator='\n\n', response_format='content')
Create a tool that retrieves documents.
- classmethod create_retriever_tool_from_config(retriever_config, name, description, *, document_prompt=None, document_separator='\n\n', response_format='content')
Create a retriever tool from a Haive retriever configuration.
This is a convenience method that creates a retriever from the config and then converts it to a tool with proper metadata.
- Parameters:
retriever_config (Any) - The BaseRetrieverConfig to use
name (str) - Tool name
description (str) - Tool description
document_prompt (BasePromptTemplate | None) - Optional prompt to format documents
document_separator (str) - Separator between documents
response_format (Literal['content', 'content_and_artifact']) - Format for tool response
- Returns:
A StructuredTool configured for retrieval
- Return type:
StructuredTool
- classmethod create_state_tool(func, name=None, description=None, *, reads_state=False, writes_state=False, state_keys=None)
Create or wrap a tool with state interaction metadata.
- classmethod create_store_tools_suite(store_manager, namespace=None, include_tools=None)
Create a suite of store/memory management tools.
This creates tools for memory operations like store, search, retrieve, update, and delete. These tools integrate with the StoreManager for persistent memory management.
- Parameters:
- Returns:
List of store tools with proper metadata
- Return type:
list[StructuredTool]
- classmethod create_structured_output_tool(func, name, description, output_model, *, infer_schema=True)
Create a tool that produces structured output.
- classmethod from_aug_llm_config(config, *, extract_tools=True, include_structured_output=True, name=None)
Create ToolEngine from AugLLMConfig.
- Parameters:
config (AugLLMConfig) - AugLLMConfig instance to convert
extract_tools (bool) - Whether to extract tools from config
include_structured_output (bool) - Whether to convert structured_output_model to a tool
name (str | None) - Optional name for the engine
- Returns:
ToolEngine with tools extracted from AugLLMConfig
- Return type:
- classmethod from_document_engine(engine, *, create_loader_tools=True, create_splitter_tools=True, create_processor_tools=False, name=None)
Create ToolEngine from DocumentEngine.
- Parameters:
engine (DocumentEngine) - DocumentEngine instance to convert
create_loader_tools (bool) - Whether to create document loading tools
create_splitter_tools (bool) - Whether to create document splitting tools
create_processor_tools (bool) - Whether to create document processing tools
name (str | None) - Optional name for the engine
- Returns:
ToolEngine with document processing tools
- Return type:
- classmethod from_multiple_engines(engines, *, engine_conversion_config=None, name=None)
Create ToolEngine from multiple engines.
- Parameters:
- Returns:
ToolEngine with tools from all engines
- Return type:
- classmethod from_retriever_config(config, *, tool_name=None, tool_description=None, name=None)
Create ToolEngine from BaseRetrieverConfig.
- Parameters:
- Returns:
ToolEngine with retriever converted to tool
- Return type:
- classmethod from_vectorstore_config(config, *, search_tool_name=None, similarity_search_tool=True, max_marginal_relevance_tool=False, name=None)
Create ToolEngine from VectorStoreConfig.
- Parameters:
config (VectorStoreConfig) - VectorStoreConfig instance to convert
search_tool_name (str | None) - Base name for search tools
similarity_search_tool (bool) - Whether to create a similarity search tool
max_marginal_relevance_tool (bool) - Whether to create a maximal marginal relevance (MMR) search tool
name (str | None) - Optional name for the engine
- Returns:
ToolEngine with vectorstore search tools
- Return type:
- classmethod get_capability_enum()
Get ToolCapability enum for other components.
- Return type:
- classmethod get_tool_type()
Get the universal ToolLike type for other components.
- Return type:
- create_runnable(runnable_config=None)
Create enhanced ToolNode with property awareness.
- Parameters:
runnable_config (RunnableConfig | None)
- Return type:
- get_tool_properties(tool_name)
Get properties for specific tool.
- Parameters:
tool_name (str)
- Return type:
ToolProperties | None
- get_tools_by_capability(*capabilities)
Get tool names with specified capabilities.
- Parameters:
capabilities (ToolCapability)
- Return type:
- get_tools_by_category(category)
Get tool names in category.
- Parameters:
category (ToolCategory)
- Return type:
- model_post_init(_ToolEngine__context)
Initialize and analyze tools after creation.
- Parameters:
_ToolEngine__context (Any)
- Return type:
None
- engine_type: EngineType
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- routing_strategy: Literal['auto', 'capability', 'category', 'priority']
- tool_choice: Literal['auto', 'required', 'none']
- class haive.core.engine.tool.ToolProperties(*, name, tool_type, category=ToolCategory.UNKNOWN, capabilities=<factory>, is_state_tool=False, to_state_tool=False, from_state_tool=False, state_dependencies=<factory>, state_outputs=<factory>, is_interruptible=False, is_async=False, is_routed=False, input_schema=None, output_schema=None, structured_output_model=None, is_structured_output_model=False, description=None, version='1.0', tags=<factory>, expected_duration=None, requires_network=False)
Comprehensive tool properties for analysis and routing.
This model captures all relevant properties of a tool including its type, capabilities, state interaction patterns, and schemas.
- Parameters:
name (str)
tool_type (ToolType)
category (ToolCategory)
capabilities (set[ToolCapability])
is_state_tool (bool)
to_state_tool (bool)
from_state_tool (bool)
is_interruptible (bool)
is_async (bool)
is_routed (bool)
input_schema (type[BaseModel] | None)
output_schema (type[BaseModel] | None)
structured_output_model (type[BaseModel] | None)
is_structured_output_model (bool)
description (str | None)
version (str)
expected_duration (float | None)
requires_network (bool)
- has_capability(capability)
Check if tool has specific capability.
- Parameters:
capability (ToolCapability)
- Return type:
- capabilities: set[ToolCapability]
- category: ToolCategory
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class haive.core.engine.tool.ToolType(*values)
Tool implementation types.
- FUNCTION = 'function'
- LANGCHAIN_TOOL = 'langchain_tool'
- PYDANTIC_MODEL = 'pydantic_model'
- RETRIEVER_TOOL = 'retriever_tool'
- STORE_TOOL = 'store_tool'
- STRUCTURED_TOOL = 'structured_tool'
- TOOLKIT = 'toolkit'
- VALIDATION_TOOL = 'validation_tool'
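The capability queries documented above (`get_tools_by_capability`, `has_capability`) can be pictured with a small stand-in. The sketch below does not import Haive: `Capability` mirrors three of the `ToolCapability` values, while `registry` and `tools_with` are hypothetical names. It also assumes a tool must hold every requested capability to match, which this reference does not state.

```python
from enum import Enum


class Capability(str, Enum):
    """Stand-in mirroring a few ToolCapability values."""

    ASYNC_CAPABLE = "async_capable"
    READS_STATE = "reads_state"
    RETRIEVER = "retriever"


# Hypothetical registry: tool name -> set of capabilities
registry = {
    "search_docs": {Capability.RETRIEVER, Capability.ASYNC_CAPABLE},
    "read_memory": {Capability.READS_STATE},
    "fetch_url": {Capability.ASYNC_CAPABLE},
}


def tools_with(*capabilities: Capability) -> list:
    """Return sorted names of tools that have every requested capability."""
    wanted = set(capabilities)
    # Subset test: all wanted capabilities must be present on the tool
    return sorted(name for name, caps in registry.items() if wanted <= caps)


print(tools_with(Capability.ASYNC_CAPABLE))
print(tools_with(Capability.ASYNC_CAPABLE, Capability.RETRIEVER))
```

Storing capabilities as a set makes the "has all of these" check a single subset comparison, which is one plausible reason `ToolProperties.capabilities` is typed as `set[ToolCapability]`.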
Tool Creation and Management:
from typing import Annotated, List

import numpy as np

from haive.core.engine.tool import tool, ToolEngine

# Define tools with rich metadata
@tool
def analyze_data(
    data: Annotated[List[float], "Numerical data to analyze"],
    method: Annotated[str, "Analysis method: mean, median, std"] = "mean"
) -> Annotated[dict, "Analysis results with statistics"]:
    """Perform statistical analysis on numerical data.

    This tool supports various statistical methods and returns
    comprehensive results including confidence intervals.
    """
    if method == "mean":
        return {
            "value": np.mean(data),
            "confidence_interval": calculate_ci(data),  # helper defined elsewhere
            "sample_size": len(data)
        }
    # ... more methods
    raise ValueError(f"Unsupported method: {method}")

# Create tool engine; web_search and database_query are tools defined elsewhere.
# Parameter names follow the ToolEngine signature documented above.
tool_engine = ToolEngine(
    tools=[analyze_data, web_search, database_query],
    parallel=True,  # Execute independent tools in parallel
    timeout=10
)

# Execute with validation (run inside an async function)
results = await tool_engine.execute_tools([
    {"tool": "analyze_data", "args": {"data": [1, 2, 3, 4, 5]}},
    {"tool": "web_search", "args": {"query": "latest statistics"}}
])
Structured Output System
Type-Safe AI Responses
Output parser engine module for the Haive framework.
This module provides engine implementations for LangChain output parsers, enabling structured output parsing from LLM responses within the Haive framework.
Output parsers transform raw LLM text outputs into structured data formats, including JSON, Pydantic models, enums, lists, and custom formats. This module wraps LangChain's output parser functionality in Haive's engine system for consistent usage across the framework.
- Key Components:
OutputParserEngine: Engine wrapper for LangChain output parsers
OutputParserType: Enumeration of supported parser types
- Supported Parser Types:
JSON: Parse output as JSON objects
PydanticOutputParser: Parse into Pydantic model instances
EnumOutputParser: Parse into enum values
DatetimeOutputParser: Parse datetime strings
BooleanOutputParser: Parse boolean values
CommaSeparatedListOutputParser: Parse comma-separated lists
MarkdownListOutputParser: Parse markdown formatted lists
NumberedListOutputParser: Parse numbered lists
XMLOutputParser: Parse XML formatted output
Examples
JSON output parsing:
from haive.core.engine.output_parser import OutputParserEngine, OutputParserType

parser = OutputParserEngine(
    parser_type=OutputParserType.JSON,
    name="json_parser"
)
result = parser.invoke('{"name": "John", "age": 30}')
# Returns: {"name": "John", "age": 30}

Pydantic model parsing:
from pydantic import BaseModel, Field
from haive.core.engine.output_parser import OutputParserEngine, OutputParserType

class Person(BaseModel):
    name: str = Field(description="Person's name")
    age: int = Field(description="Person's age")

parser = OutputParserEngine(
    parser_type=OutputParserType.PYDANTIC,
    pydantic_model=Person,  # field name as given in the OutputParserEngine signature
    name="person_parser"
)
# The underlying LangChain Pydantic parser expects JSON text
result = parser.invoke('{"name": "John", "age": 30}')
# Returns: Person(name="John", age=30)
See also
LangChain output parsers documentation
Output parser types: types.py
Base implementation: base.py
- class haive.core.engine.output_parser.OutputParserEngine(*, id=<factory>, name=<factory>, engine_type=EngineType.OUTPUT_PARSER, description=None, input_schema=None, output_schema=None, version='1.0.0', metadata=<factory>, parser_type, parser_config=<factory>, pydantic_model=None, regex_pattern=None, enum_class=None, response_schemas=None)
Engine that wraps LangChain OutputParsers to convert text to structured data.
The generic parameter TOut represents the specific return type of the parser. This allows for type-safe usage of parsers in your workflows.
Supported output types include:
- Primitive types (bool, str, int, float, datetime)
- Collection types (List[str], Dict[str, Any], etc.)
- Pydantic models (any BaseModel subclass)
- Specialized types (pandas.DataFrame, Enum values)
- Custom structured types
Examples
# Create a JSON parser
json_parser = create_output_parser_engine(
    OutputParserType.JSON, name="my_json_parser"
)
# Use in a graph
graph.add_node("parse_json", json_parser)
- Parameters:
- async ainvoke(input_data, runnable_config=None)
Asynchronously invoke the parser with the input data.
- Parameters:
input_data (TIn) - Input text, message, or dictionary
runnable_config (RunnableConfig | None) - Optional runtime configuration
- Returns:
Parsed output based on parser type
- Return type:
TOut
- create_runnable(runnable_config=None)
Create the appropriate parser based on configuration.
- Parameters:
runnable_config (RunnableConfig | None) - Optional runtime configuration
- Returns:
An instantiated output parser
- Raises:
ValueError - If parser type is unknown or required config is missing
ImportError - If required packages are not installed
- Return type:
- invoke(input_data, runnable_config=None)
Invoke the parser with the input data.
- Parameters:
input_data (TIn) - Input text, message, or dictionary
runnable_config (RunnableConfig | None) - Optional runtime configuration
- Returns:
Parsed output based on parser type
- Raises:
ValueError - If the input cannot be processed
Exception - If parsing fails and no error handler is provided
- Return type:
TOut
- engine_type: EngineType
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- parser_type: OutputParserType
- class haive.core.engine.output_parser.OutputParserType(*values)
Enumeration of supported output parser types.
- BOOLEAN = 'boolean'
- COMBINING = 'combining'
- COMMA_SEPARATED_LIST = 'comma_separated_list'
- DATETIME = 'datetime'
- ENUM = 'enum'
- JSON = 'json'
- LIST = 'list'
- MARKDOWN_LIST = 'markdown_list'
- NUMBERED_LIST = 'numbered_list'
- OPENAI_TOOLS = 'openai_tools'
- OPENAI_TOOLS_KEY = 'openai_tools_key'
- PANDAS_DATAFRAME = 'pandas_dataframe'
- PYDANTIC = 'pydantic'
- PYDANTIC_TOOLS = 'pydantic_tools'
- REGEX = 'regex'
- REGEX_DICT = 'regex_dict'
- SIMPLE_JSON = 'simple_json'
- STRING = 'string'
- STRUCTURED = 'structured'
- XML = 'xml'
- YAML = 'yaml'
Structured Output Patterns:
from typing import List, Optional

from pydantic import BaseModel, Field

from haive.core.engine.aug_llm import AugLLMConfig
from haive.core.engine.output_parser import create_structured_parser

# Complex nested structure
class Address(BaseModel):
    street: str
    city: str
    country: str
    postal_code: Optional[str] = None

class Person(BaseModel):
    name: str = Field(description="Full name")
    age: int = Field(ge=0, le=150)
    email: str = Field(pattern=r"[^@]+@[^@]+\.[^@]+")
    address: Address
    skills: List[str] = Field(min_length=1)  # Pydantic v2 name for min_items

class TeamAnalysis(BaseModel):
    team_name: str
    members: List[Person]
    average_age: float
    top_skills: List[str] = Field(max_length=5)  # Pydantic v2 name for max_items
    recommendations: List[str]

# Create parser with validation
parser = create_structured_parser(
    model=TeamAnalysis,
    strict_mode=True,  # Enforce all validations
    repair_json=True,  # Fix malformed JSON
    max_retries=3
)

# Use with LLM (llm_config is an LLM configuration defined elsewhere)
config = AugLLMConfig(
    llm_config=llm_config,
    structured_output_model=TeamAnalysis,
    output_parser=parser
)

# Get validated, typed response
result: TeamAnalysis = config.create_runnable().invoke(
    "Analyze our development team structure"
)
print(f"Average age: {result.average_age}")
print(f"Top skills: {', '.join(result.top_skills)}")
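How options such as `repair_json` behave internally is not documented in this reference. As a loose, standard-library-only illustration of the general idea (recovering a JSON object from noisy model output and checking it against a typed structure), consider the sketch below. `TeamSummary`, `extract_json`, and `parse_summary` are hypothetical names, and a dataclass stands in for a Pydantic model.

```python
import json
import re
from dataclasses import dataclass


@dataclass
class TeamSummary:
    """Simplified stand-in for a structured output model."""

    team_name: str
    average_age: float


def extract_json(raw: str) -> dict:
    """Pull a {...} object out of text that may include surrounding prose."""
    match = re.search(r"\{.*\}", raw, flags=re.DOTALL)
    if match is None:
        raise ValueError("no JSON object found")
    return json.loads(match.group(0))


def parse_summary(raw: str) -> TeamSummary:
    """Parse raw model output into a typed, range-checked TeamSummary."""
    data = extract_json(raw)
    summary = TeamSummary(str(data["team_name"]), float(data["average_age"]))
    if not 0 <= summary.average_age <= 150:
        raise ValueError("average_age out of range")
    return summary


raw_output = 'Here is the result: {"team_name": "Core", "average_age": 31.5} Hope that helps.'
print(parse_summary(raw_output))
```

A retry policy could wrap `parse_summary` and re-prompt the model whenever a `ValueError`, `KeyError`, or `json.JSONDecodeError` is raised.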
Streaming Architecture
Real-Time Data Processing
Advanced Streaming Patterns:
from haive.core.engine.streaming import StreamingEngine, ChunkProcessor

class TokenAggregator(ChunkProcessor):
    """Aggregate tokens with buffering."""

    def __init__(self, buffer_size: int = 10):
        self.buffer = []
        self.buffer_size = buffer_size

    async def process_chunk(self, chunk):
        self.buffer.append(chunk)
        if len(self.buffer) >= self.buffer_size:
            # Emit aggregated chunk
            result = "".join(self.buffer)
            self.buffer = []
            return result
        return None  # Buffer not full

# Create streaming pipeline
# (llm_engine, SentimentAnalyzer, and KeywordExtractor are defined elsewhere)
streaming_engine = StreamingEngine(
    engine=llm_engine,
    processors=[
        TokenAggregator(buffer_size=5),
        SentimentAnalyzer(),  # Analyze sentiment per chunk
        KeywordExtractor()  # Extract keywords in real-time
    ]
)

# Stream with processing (run inside an async function)
async for processed_chunk in streaming_engine.stream(prompt):
    print(f"Chunk: {processed_chunk.text}")
    print(f"Sentiment: {processed_chunk.sentiment}")
    print(f"Keywords: {processed_chunk.keywords}")
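The buffering idea in `TokenAggregator` can be tried without any Haive imports. The sketch below is a standalone approximation using `asyncio` async generators: `fake_token_stream`, `aggregate`, and `collect` are hypothetical names, and the token list is made up. Unlike the class above, it also flushes whatever remains in the buffer when the stream ends, so trailing tokens are not dropped.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_token_stream() -> AsyncIterator[str]:
    """Stand-in for an LLM token stream."""
    for token in ["Hel", "lo", ", ", "wor", "ld", "!", " Bye"]:
        await asyncio.sleep(0)  # yield control, as a real stream would
        yield token


async def aggregate(stream: AsyncIterator[str], buffer_size: int) -> AsyncIterator[str]:
    """Group tokens into chunks of buffer_size, flushing the remainder at the end."""
    buffer: List[str] = []
    async for token in stream:
        buffer.append(token)
        if len(buffer) >= buffer_size:
            yield "".join(buffer)
            buffer = []
    if buffer:  # emit leftover tokens so nothing is lost
        yield "".join(buffer)


async def collect(buffer_size: int = 3) -> List[str]:
    """Drain the aggregated stream into a list."""
    return [chunk async for chunk in aggregate(fake_token_stream(), buffer_size)]


chunks = asyncio.run(collect())
print(chunks)
```

Because `aggregate` only pulls the next token after its consumer asks for the next chunk, a slow consumer naturally slows the producer, which is a simple form of backpressure.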
Performance & Optimization
Engine Performance Metrics
Blazing Fast Execution
Initialization: < 10ms engine startup time
First Token: < 100ms time to first token (streaming)
Throughput: 10,000+ tokens/second (local models)
Concurrency: 1,000+ parallel requests
Memory: < 100MB base memory footprint
Optimization Strategies:
from haive.core.engine.optimization import EngineOptimizer

# Create optimized engine (base_engine is an existing engine instance)
optimizer = EngineOptimizer()
optimized_engine = optimizer.optimize(
    base_engine,
    strategies=[
        "connection_pooling",  # Reuse connections
        "response_caching",  # Cache common queries
        "batch_processing",  # Batch similar requests
        "token_budgeting",  # Manage token usage
        "adaptive_timeout"  # Dynamic timeout adjustment
    ]
)

# Monitor performance
metrics = optimized_engine.get_metrics()
print(f"Average latency: {metrics.avg_latency}ms")
print(f"Cache hit rate: {metrics.cache_hit_rate}%")
print(f"Token efficiency: {metrics.tokens_per_dollar}")
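To make the "response_caching" strategy and the cache hit rate metric concrete, here is a minimal standalone sketch. It is not the `EngineOptimizer` implementation: `CachedEngine` is a hypothetical wrapper, the cache key is simply the exact prompt text, and a lambda stands in for a model call.

```python
from typing import Callable, Dict


class CachedEngine:
    """Wrap a callable and cache responses by exact prompt text."""

    def __init__(self, call: Callable[[str], str]) -> None:
        self._call = call
        self._cache: Dict[str, str] = {}
        self.hits = 0
        self.misses = 0

    def invoke(self, prompt: str) -> str:
        """Return the cached response if present, otherwise call through."""
        if prompt in self._cache:
            self.hits += 1
            return self._cache[prompt]
        self.misses += 1
        result = self._call(prompt)
        self._cache[prompt] = result
        return result

    @property
    def cache_hit_rate(self) -> float:
        """Percentage of calls served from the cache."""
        total = self.hits + self.misses
        return 100.0 * self.hits / total if total else 0.0


engine = CachedEngine(lambda prompt: prompt.upper())  # stand-in for a model call
for prompt in ["hi", "hi", "bye", "hi"]:
    engine.invoke(prompt)
print(f"Cache hit rate: {engine.cache_hit_rate}%")
```

Exact-match caching only helps when prompts repeat verbatim and responses are expected to be deterministic; a production cache would also need eviction and expiry.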
Cost Optimization
Intelligent Cost Management:
from haive.core.engine.cost import CostOptimizer, BudgetManager

# Set up budget controls (email_handler and slack_handler are defined elsewhere)
budget_manager = BudgetManager(
    daily_limit=100.0,  # $100/day
    alert_thresholds=[0.5, 0.8, 0.95],
    notification_handlers=[email_handler, slack_handler]
)

# Cost-aware routing
cost_optimizer = CostOptimizer(
    providers={
        "gpt-4": {"cost_per_1k_tokens": 0.03, "quality": 0.95},
        "claude-3": {"cost_per_1k_tokens": 0.02, "quality": 0.93},
        "llama3": {"cost_per_1k_tokens": 0.0, "quality": 0.85}
    }
)

# Route based on query complexity
engine = cost_optimizer.select_engine(
    query=user_query,
    quality_threshold=0.9,  # Minimum quality required
    optimize_for="cost"  # or "quality" or "balanced"
)
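One simple reading of `optimize_for="cost"` with a `quality_threshold` is "pick the cheapest provider whose quality score meets the threshold". The sketch below implements just that rule in plain Python. It is an assumption about the selection logic, not the documented behavior of `CostOptimizer`; `select_provider` is a hypothetical name, and the figures are copied from the example above purely for illustration.

```python
from typing import Dict, Optional

# Figures copied from the example above; illustrative, not real pricing
PROVIDERS: Dict[str, Dict[str, float]] = {
    "gpt-4": {"cost_per_1k_tokens": 0.03, "quality": 0.95},
    "claude-3": {"cost_per_1k_tokens": 0.02, "quality": 0.93},
    "llama3": {"cost_per_1k_tokens": 0.0, "quality": 0.85},
}


def select_provider(quality_threshold: float) -> Optional[str]:
    """Return the cheapest provider meeting the threshold, or None if none qualifies."""
    eligible = [
        (info["cost_per_1k_tokens"], name)
        for name, info in PROVIDERS.items()
        if info["quality"] >= quality_threshold
    ]
    # min() compares cost first, then name as a deterministic tie-breaker
    return min(eligible)[1] if eligible else None


print(select_provider(0.9))
print(select_provider(0.8))
```

With a threshold of 0.9 only the two higher-quality entries qualify, so the cheaper of those is chosen; lowering the threshold to 0.8 admits the zero-cost entry.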
Advanced Patterns
Retrieval-Augmented Generation
Advanced RAG Implementation:
from haive.core.engine.aug_llm import AugLLMConfig
from haive.core.engine.retriever import RetrieverEngine
from haive.core.engine.patterns import RAGPattern

# Configure RAG system
# (pinecone_store, gpt4_config, and CohereReranker are defined elsewhere)
rag = RAGPattern(
    retriever=RetrieverEngine(
        vector_store=pinecone_store,
        embedding_model="text-embedding-3-large",
        top_k=5,
        similarity_threshold=0.8
    ),
    generator=AugLLMConfig(
        llm_config=gpt4_config,
        system_message="Use the provided context to answer questions accurately."
    ),
    reranker=CohereReranker(model="rerank-english-v2.0"),
    citation_mode="inline"  # Add citations to response
)

# Execute RAG query (run inside an async function)
result = await rag.query(
    "What are the latest developments in quantum computing?",
    filters={"date": {"$gte": "2024-01-01"}},
    include_sources=True
)
print(result.answer)
print(f"Sources: {result.sources}")
print(f"Confidence: {result.confidence}")
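The `top_k` and `similarity_threshold` settings in the retriever configuration can be illustrated with a tiny in-memory example. The sketch below assumes cosine similarity over embedding vectors, which is a common choice but is not confirmed by this reference; `cosine`, `retrieve`, and the two-dimensional toy vectors are all hypothetical.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity between two vectors (0.0 if either has zero length)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def retrieve(
    query: Sequence[float],
    docs: Dict[str, Sequence[float]],
    top_k: int,
    threshold: float,
) -> List[Tuple[str, float]]:
    """Return up to top_k (doc_id, score) pairs scoring at least threshold, best first."""
    scored = [(doc_id, cosine(query, vec)) for doc_id, vec in docs.items()]
    kept = [item for item in scored if item[1] >= threshold]
    return sorted(kept, key=lambda item: item[1], reverse=True)[:top_k]


# Toy embeddings: "a" matches the query exactly, "b" is close, "c" is orthogonal
docs = {"a": [1.0, 0.0], "b": [0.9, 0.1], "c": [0.0, 1.0]}
results = retrieve([1.0, 0.0], docs, top_k=5, threshold=0.8)
print([doc_id for doc_id, _ in results])
```

Applying the threshold before truncating to `top_k` means a query with few good matches returns fewer than `top_k` documents rather than padding the context with weak ones.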
Engine Composition
Building Complex Engines:
from haive.core.engine.aug_llm import AugLLMConfig
from haive.core.engine.composition import EngineComposer, Pipeline
from haive.core.engine.tool import ToolEngine

# Compose multi-stage engine
# (fast_model, powerful_model, ExtractedInfo, api_lookup, and database_query are defined elsewhere)
composer = EngineComposer()

# Stage 1: Information extraction
extractor = composer.create_stage(
    name="extractor",
    engine=AugLLMConfig(
        llm_config=fast_model,
        structured_output_model=ExtractedInfo
    )
)

# Stage 2: Enrichment with external data
enricher = composer.create_stage(
    name="enricher",
    engine=ToolEngine(tools=[api_lookup, database_query]),
    depends_on=["extractor"]
)

# Stage 3: Analysis and synthesis
analyzer = composer.create_stage(
    name="analyzer",
    engine=AugLLMConfig(
        llm_config=powerful_model,
        system_message="Perform deep analysis on enriched data"
    ),
    depends_on=["enricher"]
)

# Build pipeline
pipeline = composer.build_pipeline(
    stages=[extractor, enricher, analyzer],
    execution_mode="sequential",
    state_passing="incremental"  # Each stage adds to state
)

# Execute pipeline (run inside an async function)
final_result = await pipeline.execute(initial_input)
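The `depends_on` and `state_passing="incremental"` ideas can be sketched with the standard library alone. The example below orders stages with `graphlib.TopologicalSorter` (Python 3.9+) and lets each stage add keys to a shared state dictionary. It is a simplified model of the pattern, not the `EngineComposer` implementation; the stage functions and `run_pipeline` are hypothetical.

```python
from graphlib import TopologicalSorter
from typing import Callable, Dict, List

# Stage name -> names of stages it depends on (mirrors the example above)
DEPENDS_ON: Dict[str, List[str]] = {
    "extractor": [],
    "enricher": ["extractor"],
    "analyzer": ["enricher"],
}

# Hypothetical stage functions: each reads the state and returns new keys
STAGES: Dict[str, Callable[[dict], dict]] = {
    "extractor": lambda state: {"entities": state["text"].split()},
    "enricher": lambda state: {"enriched": [e.upper() for e in state["entities"]]},
    "analyzer": lambda state: {"summary": f"{len(state['enriched'])} entities"},
}


def run_pipeline(initial: dict) -> dict:
    """Run stages in dependency order, merging each stage's output into the state."""
    state = dict(initial)
    for name in TopologicalSorter(DEPENDS_ON).static_order():
        state.update(STAGES[name](state))  # incremental: stages only add keys
    return state


final_state = run_pipeline({"text": "alpha beta"})
print(final_state["summary"])
```

Deriving the order from declared dependencies, rather than from list position, means a stage can be added or moved without reordering the rest by hand, and a dependency cycle raises `graphlib.CycleError` instead of running silently in the wrong order.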
Enterprise Features
Production-Ready Capabilities
High Availability: Multi-region deployment with automatic failover
Security: End-to-end encryption, API key rotation, audit logging
Compliance: GDPR, HIPAA, SOC2 compliant configurations
Monitoring: OpenTelemetry integration, custom metrics, alerting
Governance: Model access control, usage policies, cost allocation
See Also
Schema System - Dynamic state management for engines
graph_workflows - Orchestrate engines in workflows
tool_integration - Deep dive into tool system
Examples - Real-world engine patterns