Engine ArchitectureΒΆ

The Engine System is the beating heart of Haive Core - a revolutionary universal interface for AI components that provides seamless multi-provider orchestration, structured output handling, tool execution, and streaming capabilities through a unified, extensible architecture that adapts to any AI workload.

πŸš€ Beyond Simple LLM WrappersΒΆ

Transform Your AI Integration from Fragmented to Unified:

Universal Engine Interface

Single API for LLMs, retrievers, embedders, and custom engines with automatic provider detection and configuration

Multi-Provider Orchestration

Seamless switching between 20+ providers with automatic failover, load balancing, and cost optimization

Structured Output Revolution

Type-safe responses with Pydantic v2, automatic validation, and intelligent parsing for reliable AI outputs

Advanced Tool Integration

Parallel tool execution, automatic discovery, validation framework, and error recovery for robust workflows

Streaming & Async First

Built for real-time applications with native streaming, async execution, and backpressure handling

Core Engine ComponentsΒΆ

AugLLM Configuration SystemΒΆ

class haive.core.engine.aug_llm.AugLLMConfig(*, id=<factory>, name=<factory>, engine_type=EngineType.LLM, description=None, input_schema=None, output_schema=None, version='1.0.0', metadata=<factory>, structured_output_model=None, structured_output_version=None, include_format_instructions=True, output_parser=None, parser_type=None, partial_variables=<factory>, tools=<factory>, pydantic_tools=<factory>, force_tool_use=False, force_tool_choice=None, tool_choice_mode='auto', bind_tools_kwargs=<factory>, tool_routes=<factory>, tool_metadata=<factory>, tools_dict=<factory>, routed_tools=<factory>, before_tool_validator=None, tool_instances=<factory>, llm_config=<factory>, prompt_template=<factory>, system_message=None, messages_placeholder_name='messages', add_messages_placeholder=True, force_messages_optional=True, examples=None, example_prompt=None, prefix=None, suffix=None, example_separator='\n\n', input_variables=None, schemas=<factory>, use_tool_for_format_instructions=False, tool_is_base_model=False, parse_raw_output=False, output_field_name=None, output_key=None, tool_kwargs=<factory>, bind_tools_config=<factory>, preprocess=None, postprocess=None, temperature=None, max_tokens=None, runtime_options=<factory>, custom_runnables=None, optional_variables=<factory>, uses_messages_field=None)[source]ΒΆ

Bases: ToolRouteMixin, StructuredOutputMixin, InvokableEngine[Union[str, dict[str, Any], list[BaseMessage]], Union[BaseMessage, dict[str, Any]]]

Configuration for creating enhanced LLM chains with flexible message handling.

AugLLMConfig provides a structured way to configure and create LLM chains with prompts, tools, output parsers, and structured output models with comprehensive validation and automatic updates. It serves as the central configuration class for language model interactions in the Haive framework.

This class integrates several key functionalities: 1. Prompt template management with support for few-shot learning 2. Tool integration and discovery with automatic routing 3. Structured output handling (both parser-based and tool-based approaches) 4. Message handling for chat-based LLMs 5. Pre/post processing hooks for customization

The configuration system is designed to be highly flexible while enforcing consistent patterns and proper validation, making it easier to create reliable language model interactions.

Parameters:
engine_typeΒΆ

The type of engine (always LLM).

Type:

EngineType

llm_configΒΆ

Configuration for the LLM provider.

Type:

LLMConfig

prompt_templateΒΆ

Template for structuring prompts.

Type:

Optional[BasePromptTemplate]

system_messageΒΆ

System message for chat models.

Type:

Optional[str]

toolsΒΆ

Tools that can be bound to the LLM.

Type:

Sequence[Union[Type[BaseTool], Type[BaseModel], Callable, StructuredTool, BaseModel]]

structured_output_modelΒΆ

Pydantic model for structured outputs.

Type:

Optional[Type[BaseModel]]

structured_output_versionΒΆ

Version of structured output handling (v1: parser-based, v2: tool-based).

Type:

Optional[StructuredOutputVersion]

temperatureΒΆ

Temperature parameter for the LLM.

Type:

Optional[float]

max_tokensΒΆ

Maximum number of tokens to generate.

Type:

Optional[int]

preprocessΒΆ

Function to preprocess input before sending to LLM.

Type:

Optional[Callable]

postprocessΒΆ

Function to postprocess output from LLM.

Type:

Optional[Callable]

Examples

>>> from haive.core.engine.aug_llm.config import AugLLMConfig
>>> from haive.core.models.llm.base import AzureLLMConfig
>>> from pydantic import BaseModel, Field
>>>
>>> # Define a structured output model
>>> class MovieReview(BaseModel):
...     title: str = Field(description="Title of the movie")
...     rating: int = Field(description="Rating from 1-10")
...     review: str = Field(description="Detailed review of the movie")
>>>
>>> # Create a basic configuration
>>> config = AugLLMConfig(
...     name="movie_reviewer",
...     llm_config=AzureLLMConfig(model="gpt-4"),
...     system_message="You are a professional movie critic.",
...     structured_output_model=MovieReview,
...     temperature=0.7
... )
>>>
>>> # Create a runnable from the configuration
>>> reviewer = config.create_runnable()
>>>
>>> # Use the runnable
>>> result = reviewer.invoke("Review the movie 'Inception'")
classmethod default_schemas_to_tools(data)[source]ΒΆ

Default schemas to tools if schemas isn’t provided but tools has values.

Parameters:

data (dict[str, Any])

classmethod ensure_structured_output_as_tool(data)[source]ΒΆ

Ensure structured output model is properly configured for both v1 and v2.

Parameters:

data (dict[str, Any])

classmethod from_few_shot(examples, example_prompt, prefix, suffix, input_variables, llm_config=None, **kwargs)[source]ΒΆ

Create with few-shot examples.

Parameters:
classmethod from_few_shot_chat(examples, example_prompt, system_message=None, llm_config=None, **kwargs)[source]ΒΆ

Create with few-shot examples for chat templates.

Parameters:
classmethod from_format_instructions(model, system_message=None, llm_config=None, as_tool=False, var_name='format_instructions', **kwargs)[source]ΒΆ

Create config with format instructions but without structured output.

Parameters:
classmethod from_llm_config(llm_config, **kwargs)[source]ΒΆ

Create from an existing LLMConfig.

Parameters:

llm_config (LLMConfig)

classmethod from_prompt(prompt, llm_config=None, **kwargs)[source]ΒΆ

Create from a prompt template.

Parameters:
  • prompt (BasePromptTemplate)

  • llm_config (LLMConfig | None)

classmethod from_pydantic_tools(tool_models, system_message=None, llm_config=None, include_instructions=True, force_tool_use=False, **kwargs)[source]ΒΆ

Create with Pydantic tool models.

Parameters:
  • tool_models (list[type[BaseModel]])

  • system_message (str | None)

  • llm_config (LLMConfig | None)

  • include_instructions (bool)

  • force_tool_use (bool)

classmethod from_structured_output_v1(model, system_message=None, llm_config=None, include_instructions=True, **kwargs)[source]ΒΆ

Create with v1 structured output using traditional parsing.

Parameters:
  • model (type[BaseModel])

  • system_message (str | None)

  • llm_config (LLMConfig | None)

  • include_instructions (bool)

classmethod from_structured_output_v2(model, system_message=None, llm_config=None, include_instructions=False, output_field_name=None, **kwargs)[source]ΒΆ

Create with v2 structured output using the tool-based approach.

Parameters:
  • model (type[BaseModel])

  • system_message (str | None)

  • llm_config (LLMConfig | None)

  • include_instructions (bool)

  • output_field_name (str | None)

classmethod from_system_and_few_shot(system_message, examples, example_prompt, prefix, suffix, input_variables, llm_config=None, **kwargs)[source]ΒΆ

Create with system message and few-shot examples.

Parameters:
classmethod from_system_prompt(system_prompt, llm_config=None, **kwargs)[source]ΒΆ

Create from a system prompt string.

Parameters:
classmethod from_tools(tools, system_message=None, llm_config=None, use_tool_for_format_instructions=None, force_tool_use=False, **kwargs)[source]ΒΆ

Create with specified tools.

Parameters:
  • tools (list[BaseTool | type[BaseTool] | str | type[BaseModel]])

  • system_message (str | None)

  • llm_config (LLMConfig | None)

  • use_tool_for_format_instructions (bool | None)

  • force_tool_use (bool)

classmethod set_default_structured_output_version(data)[source]ΒΆ

Set default structured output version to v2 (tools) when model is provided but version is not.

Parameters:

data (dict[str, Any])

classmethod validate_prompt_template(v)[source]ΒΆ

Validate and reconstruct prompt template from dict data.

Return type:

Any

classmethod validate_schemas(v)[source]ΒΆ

Validate and auto-name schemas.

Return type:

Any

classmethod validate_structured_output_model(v)[source]ΒΆ

Validate structured output model and default to tools-based validation.

Return type:

Any

classmethod validate_tools(v)[source]ΒΆ

Validate and auto-name tools.

Return type:

Any

add_format_instructions(model=None, as_tools=False, var_name='format_instructions')[source]ΒΆ

Add format instructions to partial_variables without changing structured output configuration.

Parameters:
  • model (type[BaseModel] | None)

  • as_tools (bool)

  • var_name (str)

Return type:

AugLLMConfig

add_human_message(content)[source]ΒΆ

Add a human message to the prompt template.

Parameters:

content (str)

Return type:

AugLLMConfig

add_optional_variable(var_name)[source]ΒΆ

Add an optional variable to the prompt template.

Parameters:

var_name (str)

Return type:

AugLLMConfig

add_prompt_template(name, template)[source]ΒΆ

Add a named prompt template for easy switching.

Parameters:
  • name (str) – Unique name for the template

  • template (BasePromptTemplate) – The prompt template to store

  • prompt_template (langchain_core.prompts.BasePromptTemplate)

Return type:

None

add_system_message(content)[source]ΒΆ

Add or update system message in the prompt template.

Parameters:

content (str)

Return type:

AugLLMConfig

add_tool(tool)[source]ΒΆ

Add a tool to the configuration.

Parameters:
  • tool (Any) – Tool to add (LangChain tool, Pydantic model, or callable)

  • name (str | None)

  • route (str | None)

Returns:

Self for method chaining

Return type:

AugLLMConfig

add_tool_with_route(tool, route, name=None, metadata=None)[source]ΒΆ

Add a tool with explicit route and metadata.

Parameters:
Return type:

AugLLMConfig

apply_runnable_config(runnable_config=None)[source]ΒΆ

Extract parameters from runnable_config relevant to this engine.

Parameters:

runnable_config (RunnableConfig | None)

Return type:

dict[str, Any]

clear_tools()[source]ΒΆ

Clear all tools.

Returns:

Self for method chaining

Return type:

AugLLMConfig

comprehensive_validation_and_setup()[source]ΒΆ

Comprehensive validation and setup after initialization.

Return type:

Self

create_runnable(runnable_config=None)[source]ΒΆ

Create a runnable LLM chain based on this configuration.

Parameters:

runnable_config (RunnableConfig | None)

Return type:

Runnable

create_tool_from_config(config, name=None, route=None, **kwargs)[source]ΒΆ

Create a tool from another config object.

Parameters:
  • config (Any) – Configuration object that has a to_tool method

  • name (str | None) – Tool name

  • route (str | None) – Tool route to set

  • **kwargs – Additional kwargs for tool creation

Returns:

Created tool

Return type:

Any

debug_tool_configuration()[source]ΒΆ

Print detailed debug information about tool configuration.

Return type:

AugLLMConfig

get_active_template()[source]ΒΆ

Get the name of the currently active template.

Return type:

str | None

get_format_instructions(model=None, as_tools=False)[source]ΒΆ

Get format instructions for a model without changing the config.

Parameters:
  • model (type[BaseModel] | None)

  • as_tools (bool)

Return type:

str

get_input_fields()[source]ΒΆ

Get schema fields for input.

Return type:

dict[str, tuple[type, Any]]

get_output_fields()[source]ΒΆ

Get schema fields for output.

Return type:

dict[str, tuple[type, Any]]

instantiate_llm()[source]ΒΆ

Instantiate the LLM based on the configuration.

Return type:

Any

list_prompt_templates()[source]ΒΆ

List available template names.

Return type:

list[str]

model_post_init(_AugLLMConfig__context)[source]ΒΆ

Proper Pydantic post-initialization.

Return type:

None

remove_message(index)[source]ΒΆ

Remove a message from the prompt template.

Parameters:

index (int)

Return type:

AugLLMConfig

remove_prompt_template(name=None)[source]ΒΆ

Remove a template or disable the active one.

Parameters:

name (str | None) – Template name to remove. If None, disables active template.

Returns:

Self for method chaining

Return type:

AugLLMConfig

remove_tool(tool)[source]ΒΆ

Remove a tool from the configuration.

Parameters:

tool (Any) – Tool instance to remove

Returns:

Self for method chaining

Return type:

AugLLMConfig

replace_message(index, message)[source]ΒΆ

Replace a message in the prompt template.

Parameters:
  • index (int)

  • message (str | BaseMessage)

Return type:

AugLLMConfig

use_prompt_template(name)[source]ΒΆ

Switch to using a specific named template.

Parameters:

name (str) – Name of the template to activate

Returns:

Self for method chaining

Raises:

ValueError – If template name not found

Return type:

AugLLMConfig

with_format_instructions(model, as_tool=False, var_name='format_instructions')[source]ΒΆ

Add format instructions without setting up structured output or parser.

Parameters:
  • model (type[BaseModel])

  • as_tool (bool)

  • var_name (str)

Return type:

AugLLMConfig

with_pydantic_tools(tool_models, include_instructions=True, force_use=False)[source]ΒΆ

Configure with Pydantic tools output parsing.

Parameters:
  • tool_models (list[type[BaseModel]])

  • include_instructions (bool)

  • force_use (bool)

Return type:

AugLLMConfig

with_structured_output(model, include_instructions=True, version='v2')[source]ΒΆ

Configure with Pydantic structured output.

Parameters:
  • model (type[BaseModel])

  • include_instructions (bool)

  • version (str)

Return type:

AugLLMConfig

with_tools(tools, force_use=False, specific_tool=None)[source]ΒΆ

Configure with specified tools.

Parameters:
Return type:

AugLLMConfig

add_messages_placeholder: boolΒΆ
bind_tools_config: dict[str, Any]ΒΆ
bind_tools_kwargs: dict[str, Any]ΒΆ
custom_runnables: list[Runnable] | NoneΒΆ
engine_type: EngineTypeΒΆ
example_prompt: PromptTemplate | NoneΒΆ
example_separator: strΒΆ
examples: list[dict[str, Any]] | NoneΒΆ
force_messages_optional: boolΒΆ
force_tool_choice: bool | str | list[str] | NoneΒΆ
force_tool_use: boolΒΆ
include_format_instructions: boolΒΆ
input_variables: list[str] | NoneΒΆ
llm_config: LLMConfigΒΆ
max_tokens: int | NoneΒΆ
messages_placeholder_name: strΒΆ
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'validate_assignment': True}ΒΆ

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

optional_variables: list[str]ΒΆ
output_field_name: str | NoneΒΆ
output_key: str | NoneΒΆ
output_parser: Any | NoneΒΆ
parse_raw_output: boolΒΆ
parser_type: ParserType | NoneΒΆ
partial_variables: dict[str, Any]ΒΆ
postprocess: Callable[[Any], Any] | NoneΒΆ
prefix: str | NoneΒΆ
preprocess: Callable[[Any], Any] | NoneΒΆ
prompt_template: BasePromptTemplate | NoneΒΆ
pydantic_tools: list[type[BaseModel]]ΒΆ
runtime_options: dict[str, Any]ΒΆ
schemas: Sequence[type[BaseTool] | type[BaseModel] | Callable | StructuredTool | BaseModel]ΒΆ
structured_output_model: type[BaseModel] | NoneΒΆ
structured_output_version: StructuredOutputVersion | NoneΒΆ
suffix: str | NoneΒΆ
system_message: str | NoneΒΆ
temperature: float | NoneΒΆ
tool_choice_mode: ToolChoiceModeΒΆ
tool_is_base_model: boolΒΆ
tool_kwargs: dict[str, dict[str, Any]]ΒΆ
use_tool_for_format_instructions: boolΒΆ
uses_messages_field: bool | NoneΒΆ

The Ultimate LLM Configuration Framework

AugLLMConfig provides a comprehensive configuration system that goes far beyond simple model wrappers, offering enterprise-grade features for production AI systems.

Configuration Power:

from haive.core.engine.aug_llm import AugLLMConfig
from haive.core.engine.providers import AzureLLMConfig, AnthropicConfig
from pydantic import BaseModel

# Define structured output
class AnalysisResult(BaseModel):
    sentiment: str
    confidence: float
    key_topics: List[str]
    recommendations: List[str]

# Create advanced configuration
config = AugLLMConfig(
    # Provider configuration
    llm_config=AzureLLMConfig(
        model="gpt-4",
        temperature=0.7,
        max_tokens=2000,
        api_version="2024-02-01"
    ),

    # System setup
    system_message="You are an expert data analyst.",

    # Tool integration
    tools=[web_search, calculator, database_query],
    force_tool_choice=["web_search"],  # Require web search

    # Structured output
    structured_output_model=AnalysisResult,
    structured_output_version="v2",  # Use latest handling

    # Advanced features
    enable_streaming=True,
    retry_strategy="exponential",
    timeout=30,

    # Callbacks for monitoring
    callbacks=[token_counter, latency_tracker]
)

# Create runnable chain
analyzer = config.create_runnable()

# Execute with streaming
async for chunk in analyzer.astream({"query": "Analyze market trends"}):
    print(chunk)  # Real-time streaming output

Multi-Shot Learning Configuration:

# Configure with examples
config = AugLLMConfig(
    llm_config=llm_config,

    # Few-shot examples
    examples=[
        {"input": "The product is amazing!", "output": "positive"},
        {"input": "Terrible experience", "output": "negative"},
        {"input": "It's okay, nothing special", "output": "neutral"}
    ],

    # Example formatting
    example_prompt=PromptTemplate(
        template="Input: {input}\nOutput: {output}"
    ),
    prefix="Classify the sentiment of the following texts:",
    suffix="Input: {text}\nOutput:",

    # This creates an intelligent few-shot classifier
)

Engine Factory SystemΒΆ

Dynamic Engine Creation and Management

The factory system enables runtime engine creation with automatic configuration discovery and validation.

Factory Patterns:

from haive.core.engine import EngineFactory, EngineRegistry

# Register custom engine
@EngineRegistry.register("custom_llm")
class CustomLLMEngine(InvokableEngine):
    """Custom LLM implementation."""

    def invoke(self, input_data):
        # Custom logic
        return process_with_custom_model(input_data)

# Create engines dynamically
factory = EngineFactory()

# Auto-detect best provider
engine = factory.create_engine(
    task_type="text-generation",
    requirements={
        "max_tokens": 4000,
        "supports_tools": True,
        "cost_limit": 0.10  # Per request
    }
)

# Create with specific config
azure_engine = factory.create_engine(
    engine_type="llm",
    provider="azure",
    config={"model": "gpt-4", "temperature": 0.5}
)

Multi-Provider SupportΒΆ

20+ Provider Integrations

OpenAI
  • GPT-4, GPT-3.5

  • DALL-E 3

  • Whisper

  • Embeddings

Anthropic
  • Claude 3 Opus/Sonnet

  • Claude 2.1

  • Claude Instant

  • Constitutional AI

Google
  • Gemini Pro/Ultra

  • PaLM 2

  • Vertex AI

  • Custom models

Azure
  • Azure OpenAI

  • Cognitive Services

  • Custom deployments

  • Private endpoints

AWS
  • Bedrock

  • SageMaker

  • Comprehend

  • Textract

Open Source
  • Hugging Face

  • Ollama

  • LlamaCpp

  • vLLM

Provider Orchestration Example:

from haive.core.engine.providers import MultiProviderOrchestrator

# Configure provider pool
orchestrator = MultiProviderOrchestrator([
    # Primary provider
    AzureLLMConfig(
        model="gpt-4",
        priority=1,
        rate_limit=100,  # Requests per minute
        cost_per_token=0.00003
    ),

    # Fallback for high load
    AnthropicConfig(
        model="claude-3-sonnet",
        priority=2,
        rate_limit=50,
        cost_per_token=0.00002
    ),

    # Budget option
    OllamaConfig(
        model="llama3-70b",
        priority=3,
        rate_limit=None,  # Unlimited local
        cost_per_token=0.0
    )
])

# Intelligent routing
response = await orchestrator.execute(
    prompt="Complex analysis task",
    routing_strategy="cost_optimized",  # or "latency_optimized"
    fallback_on_error=True,
    max_retries=3
)

Tool Integration FrameworkΒΆ

Advanced Tool Execution System

πŸ› οΈ Tool Engine - Universal AI Tool Integration System

THE ULTIMATE TOOLKIT FOR INTELLIGENT AGENTS

Welcome to the Tool Engine - the revolutionary system that transforms any function, API, or service into an intelligent agent capability. This isn’t just function calling; it’s a comprehensive tool ecosystem that makes AI agents capable of interacting with the entire digital world.

πŸš€ TOOL REVOLUTIONΒΆ

The Tool Engine represents a paradigm shift in how AI agents interact with external systems. Every function becomes an intelligent capability that agents can:

πŸ”Œ Universal Integration: Connect any API, database, service, or function instantly 🧠 Intelligent Discovery: Automatically analyze and categorize tool capabilities ⚑ Dynamic Execution: Real-time tool selection and parallel execution πŸ›‘οΈ Type Safety: Full validation and error handling for all tool interactions πŸ“Š Advanced Analytics: Comprehensive tool usage metrics and optimization

🌟 CORE INNOVATIONS¢

1. Universal Tool Abstraction 🎯

Every tool follows the same interface, regardless of complexity:

Examples

>>> # Simple function becomes a tool
>>> @tool
>>> def calculator(expression: str) -> float:
>>> """Calculate mathematical expressions."""
>>> return eval(expression)
>>>
>>> # Complex API becomes a tool
>>> @tool
>>> def web_search(query: str, max_results: int = 5) -> List[SearchResult]:
>>> """Search the web for information."""
>>> return search_api.query(query, limit=max_results)
>>>
>>> # All tools work the same way in agents
>>> agent = AugLLMConfig(
>>> model="gpt-4",
>>> tools=[calculator, web_search, database_query, email_sender]
>>> )
2. Intelligent Tool Analysis πŸ”

Automatic discovery and categorization of tool capabilities:

>>> from haive.core.engine.tool import ToolAnalyzer
>>>
>>> analyzer = ToolAnalyzer()
>>>
>>> # Analyze any tool automatically
>>> analysis = analyzer.analyze_tool(my_complex_function)
>>> print(f"Category: {analysis.category}")           # e.g., "data_processing"
>>> print(f"Capabilities: {analysis.capabilities}")   # e.g., ["search", "filter"]
>>> print(f"Complexity: {analysis.complexity}")       # e.g., "medium"
>>> print(f"Risk Level: {analysis.risk_level}")       # e.g., "low"
>>>
>>> # Automatic tool optimization suggestions
>>> suggestions = analyzer.get_optimization_suggestions(my_function)
>>> for suggestion in suggestions:
>>> print(f"πŸ’‘ {suggestion.description}")
3. Advanced Tool Types 🎨

Rich type system for different tool behaviors:

>>> from haive.core.engine.tool import (
>>> ToolType, ToolCategory, ToolCapability,
>>> StateAwareTool, InterruptibleTool
>>> )
>>>
>>> # State-aware tools that remember context
>>> class DatabaseTool(StateAwareTool):
>>> def __init__(self):
>>> self.connection_pool = {}
>>> self.transaction_state = {}
>>>
>>> def execute(self, query: str, context: ToolContext) -> QueryResult:
>>> # Use context for connection management
>>> conn = self.get_connection(context.session_id)
>>> return conn.execute(query)
>>>
>>> # Interruptible tools for long-running operations
>>> class DataProcessingTool(InterruptibleTool):
>>> async def execute(self, data: LargeDataset) -> ProcessedData:
>>> for chunk in data.chunks():
>>> if self.should_interrupt():
>>> return PartialResult(processed_so_far)
>>> await self.process_chunk(chunk)
4. Tool Composition & Orchestration 🧩

Build complex capabilities from simple tools:

>>> from haive.core.engine.tool import ToolEngine, compose_tools
>>>
>>> # Compose tools into workflows
>>> research_workflow = compose_tools([
>>> web_search_tool,
>>> content_extractor,
>>> fact_checker,
>>> summarizer
>>> ])
>>>
>>> # Parallel tool execution
>>> analysis_suite = ToolEngine.parallel_tools([
>>> sentiment_analyzer,
>>> entity_extractor,
>>> keyword_extractor,
>>> topic_classifier
>>> ])
>>>
>>> # Conditional tool chains
>>> smart_processor = ToolEngine.conditional_chain({
>>> "text_input": text_processing_tools,
>>> "image_input": image_processing_tools,
>>> "audio_input": audio_processing_tools
>>> })

🎯 ADVANCED FEATURES¢

Real-time Tool Discovery πŸ”

>>> # Automatically discover tools from modules
>>> from haive.core.engine.tool import discover_tools
>>>
>>> # Scan entire modules for tools
>>> discovered = discover_tools([
>>> "myproject.api_tools",
>>> "myproject.data_tools",
>>> "myproject.ml_tools"
>>> ])
>>>
>>> print(f"Found {len(discovered)} tools:")
>>> for tool in discovered:
>>> print(f"  πŸ› οΈ {tool.name}: {tool.description}")
>>> print(f"     Categories: {', '.join(tool.categories)}")
>>> print(f"     Capabilities: {', '.join(tool.capabilities)}")

Tool Validation & Testing βœ…

>>> # Comprehensive tool validation
>>> validator = ToolValidator()
>>>
>>> validation_result = validator.validate_tool(my_tool)
>>> if validation_result.is_valid:
>>> print("βœ… Tool is valid and ready to use")
>>> else:
>>> print("❌ Tool validation failed:")
>>> for error in validation_result.errors:
>>> print(f"  - {error.message}")
>>>
>>> # Automated tool testing
>>> test_suite = ToolTestSuite()
>>> test_suite.add_tool(my_tool)
>>>
>>> # Generate test cases automatically
>>> test_cases = test_suite.generate_test_cases(my_tool)
>>> results = test_suite.run_tests()
>>>
>>> for result in results:
>>> if result.passed:
>>> print(f"βœ… {result.test_name}")
>>> else:
>>> print(f"❌ {result.test_name}: {result.error}")

Tool Performance Optimization πŸš€

>>> # Performance monitoring and optimization
>>> optimizer = ToolOptimizer()
>>>
>>> # Analyze tool performance
>>> performance = optimizer.analyze_performance(my_tool)
>>> print(f"Average latency: {performance.avg_latency_ms}ms")
>>> print(f"Success rate: {performance.success_rate:.2%}")
>>> print(f"Memory usage: {performance.memory_usage_mb}MB")
>>>
>>> # Apply optimizations
>>> optimized_tool = optimizer.optimize(
>>> my_tool,
>>> strategies=[
>>> "caching",           # Cache frequent results
>>> "connection_pooling", # Reuse connections
>>> "batching",          # Batch similar requests
>>> "async_execution"    # Use async when possible
>>> ]
>>> )
>>>
>>> # Performance improvement
>>> improvement = optimizer.measure_improvement(my_tool, optimized_tool)
>>> print(f"Performance improved by {improvement.latency_improvement:.1f}x")

Tool Security & Sandboxing πŸ”’

>>> # Secure tool execution
>>> from haive.core.engine.tool import SecureToolEngine, SandboxConfig
>>>
>>> # Configure security sandbox
>>> sandbox_config = SandboxConfig(
>>> max_execution_time=30.0,
>>> max_memory_usage="500MB",
>>> allowed_network_hosts=["api.example.com"],
>>> blocked_filesystem_paths=["/etc", "/usr"],
>>> enable_audit_logging=True
>>> )
>>>
>>> # Create secure tool engine
>>> secure_engine = SecureToolEngine(
>>> tools=[potentially_unsafe_tool],
>>> sandbox_config=sandbox_config
>>> )
>>>
>>> # All tool execution is sandboxed
>>> result = secure_engine.execute(
>>> tool_name="data_processor",
>>> input_data=untrusted_data,
>>> context=execution_context
>>> )
>>>
>>> # Review security audit log
>>> audit_log = secure_engine.get_audit_log()
>>> for entry in audit_log:
>>> print(f"{entry.timestamp}: {entry.action} - {entry.result}")

πŸ—οΈ TOOL CATEGORIES & CAPABILITIESΒΆ

Built-in Tool Categories πŸ“š

>>> from haive.core.engine.tool import ToolCategory
>>>
>>> categories = [
>>> ToolCategory.DATA_PROCESSING,    # Data manipulation and analysis
>>> ToolCategory.WEB_INTERACTION,    # Web scraping, API calls
>>> ToolCategory.FILE_OPERATIONS,    # File I/O, format conversion
>>> ToolCategory.COMMUNICATION,      # Email, messaging, notifications
>>> ToolCategory.COMPUTATION,        # Mathematical calculations
>>> ToolCategory.SEARCH,             # Information retrieval
>>> ToolCategory.VISUALIZATION,      # Charts, graphs, reports
>>> ToolCategory.AI_MODELS,          # ML model inference
>>> ToolCategory.DATABASE,           # Database operations
>>> ToolCategory.SYSTEM_ADMIN,       # System management tasks
>>> ]

Tool Capabilities 🎨

>>> from haive.core.engine.tool import ToolCapability
>>>
>>> capabilities = [
>>> ToolCapability.READ,             # Read data/information
>>> ToolCapability.WRITE,            # Write/modify data
>>> ToolCapability.SEARCH,           # Search/query capabilities
>>> ToolCapability.TRANSFORM,        # Data transformation
>>> ToolCapability.ANALYZE,          # Analysis and insights
>>> ToolCapability.GENERATE,         # Content generation
>>> ToolCapability.VALIDATE,         # Data validation
>>> ToolCapability.MONITOR,          # Monitoring and alerting
>>> ToolCapability.SCHEDULE,         # Task scheduling
>>> ToolCapability.INTEGRATE,        # System integration
>>> ]

🎨 TOOL DEVELOPMENT PATTERNS¢

Simple Function Tool πŸ“

>>> from haive.core.engine.tool import tool
>>>
>>> @tool
>>> def word_count(text: str) -> int:
>>> """Count words in text."""
>>> return len(text.split())
>>>
>>> # Automatically gets proper metadata
>>> assert word_count.category == ToolCategory.DATA_PROCESSING
>>> assert ToolCapability.ANALYZE in word_count.capabilities

Stateful Tool Class πŸͺ

>>> from haive.core.engine.tool import StatefulTool
>>>
>>> class DatabaseTool(StatefulTool):
>>> """Database interaction tool with connection management."""
>>>
>>> def __init__(self, connection_string: str):
>>> super().__init__()
>>> self.connection_string = connection_string
>>> self._connection = None
>>>
>>> def connect(self):
>>> """Establish database connection."""
>>> if not self._connection:
>>> self._connection = create_connection(self.connection_string)
>>>
>>> @tool_method
>>> def query(self, sql: str) -> List[Dict]:
>>> """Execute SQL query."""
>>> self.connect()
>>> return self._connection.execute(sql).fetchall()
>>>
>>> @tool_method
>>> def insert(self, table: str, data: Dict) -> bool:
>>> """Insert data into table."""
>>> self.connect()
>>> return self._connection.insert(table, data)

Async Tool Implementation ⚑

>>> from haive.core.engine.tool import async_tool
>>>
>>> @async_tool
>>> async def fetch_url(url: str, timeout: float = 10.0) -> WebPage:
>>> """Fetch web page content asynchronously."""
>>> async with aiohttp.ClientSession() as session:
>>> async with session.get(url, timeout=timeout) as response:
>>> content = await response.text()
>>> return WebPage(
>>> url=url,
>>> content=content,
>>> status_code=response.status
>>> )
>>>
>>> # Use in async agents
>>> async def research_topic(topic: str):
>>> urls = await web_search(topic)
>>> pages = await asyncio.gather(*[
>>> fetch_url(url) for url in urls[:5]
>>> ])
>>> return analyze_pages(pages)

Tool with Complex Configuration βš™οΈ

>>> from haive.core.engine.tool import ConfigurableTool
>>> from pydantic import BaseModel, Field
>>>
>>> class EmailConfig(BaseModel):
>>> smtp_server: str = Field(..., description="SMTP server address")
>>> port: int = Field(587, description="SMTP port")
>>> username: str = Field(..., description="Email username")
>>> password: str = Field(..., description="Email password")
>>> use_tls: bool = Field(True, description="Use TLS encryption")
>>>
>>> class EmailTool(ConfigurableTool[EmailConfig]):
>>> """Email sending tool with full configuration."""
>>>
>>> config_class = EmailConfig
>>>
>>> def send_email(self, to: str, subject: str, body: str) -> bool:
>>> """Send email message."""
>>> with smtplib.SMTP(self.config.smtp_server, self.config.port) as server:
>>> if self.config.use_tls:
>>> server.starttls()
>>> server.login(self.config.username, self.config.password)
>>> server.send_message(self._create_message(to, subject, body))
>>> return True
>>>
>>> # Configure and use
>>> email_tool = EmailTool(config=EmailConfig(
>>> smtp_server="smtp.gmail.com",
>>> username="agent@example.com",
>>> password="app-password"
>>> ))

πŸ“Š TOOL ANALYTICS & INSIGHTSΒΆ

Usage Analytics πŸ“ˆ

>>> # Get comprehensive tool usage analytics
>>> analytics = ToolAnalytics()
>>>
>>> # Tool usage statistics
>>> usage_stats = analytics.get_usage_stats(time_range="last_7_days")
>>> print(f"Most used tool: {usage_stats.top_tool}")
>>> print(f"Total executions: {usage_stats.total_executions}")
>>> print(f"Average success rate: {usage_stats.avg_success_rate:.2%}")
>>>
>>> # Performance insights
>>> performance = analytics.get_performance_insights()
>>> for insight in performance.slow_tools:
>>> print(f"🐌 {insight.tool_name}: {insight.avg_latency}ms")
>>> print(f"   Suggestion: {insight.optimization_suggestion}")
>>>
>>> # Error analysis
>>> errors = analytics.get_error_analysis()
>>> for error_pattern in errors.common_patterns:
>>> print(f"❌ {error_pattern.error_type}: {error_pattern.frequency}")
>>> print(f"   Tools affected: {', '.join(error_pattern.affected_tools)}")

Tool Recommendation Engine 🎯

>>> # Intelligent tool recommendations
>>> recommender = ToolRecommendationEngine()
>>>
>>> # Get tool suggestions for specific tasks
>>> recommendations = recommender.recommend_tools(
>>> task_description="I need to analyze customer feedback data",
>>> context={
>>> "data_type": "text",
>>> "data_size": "large",
>>> "output_format": "dashboard"
>>> }
>>> )
>>>
>>> for rec in recommendations:
>>> print(f"πŸ› οΈ {rec.tool_name} (confidence: {rec.confidence:.2f})")
>>> print(f"   Why: {rec.reasoning}")
>>> print(f"   Usage: {rec.example_usage}")

πŸ”’ ENTERPRISE FEATURESΒΆ

  • Tool Governance: Approval workflows for new tools

  • Access Control: Role-based tool permissions

  • Compliance: Audit trails for all tool executions

  • Multi-tenancy: Isolated tool environments per tenant

  • Cost Management: Tool usage tracking and budgets

  • Quality Assurance: Automated tool testing and validation

πŸŽ“ BEST PRACTICESΒΆ

  1. Clear Documentation: Every tool needs comprehensive docstrings

  2. Type Safety: Use proper type hints for all parameters and returns

  3. Error Handling: Implement robust error handling and recovery

  4. Resource Management: Clean up connections and temporary files

  5. Security: Validate all inputs and sanitize outputs

  6. Performance: Optimize for common use cases and cache when appropriate

  7. Testing: Include unit tests and integration tests for all tools

πŸš€ GETTING STARTEDΒΆ

>>> from haive.core.engine.tool import tool, ToolEngine
>>>
>>> # 1. Create a simple tool
>>> @tool
>>> def greet(name: str) -> str:
>>> """Greet someone by name."""
>>> return f"Hello, {name}!"
>>>
>>> # 2. Use in an agent
>>> from haive.core.engine.aug_llm import AugLLMConfig
>>>
>>> agent = AugLLMConfig(
>>> model="gpt-4",
>>> tools=[greet]
>>> )
>>>
>>> # 3. The agent can now greet people!
>>> result = agent.invoke("Please greet Alice")
>>> # Agent will call greet("Alice") and respond: "Hello, Alice!"

β€”

Tool Engine: Where Functions Become Intelligent Capabilities πŸ› οΈ

class haive.core.engine.tool.InterruptibleTool(*args, **kwargs)[source]ΒΆ

Protocol for tools that support interruption.

__init__(*args, **kwargs)ΒΆ
interrupt()[source]ΒΆ

Interrupt tool execution.

Return type:

None

property is_interruptible: boolΒΆ

Check if tool can be interrupted.

Return type:

bool

class haive.core.engine.tool.StateAwareTool(*args, **kwargs)[source]ΒΆ

Protocol for tools that interact with state.

__init__(*args, **kwargs)ΒΆ
property reads_state: boolΒΆ

Check if tool reads from state.

Return type:

bool

property state_dependencies: list[str]ΒΆ

Get state keys this tool depends on.

Return type:

list[str]

property writes_state: boolΒΆ

Check if tool writes to state.

Return type:

bool

class haive.core.engine.tool.ToolAnalyzer[source]ΒΆ

Analyzes tools to determine their properties and capabilities.

This analyzer uses existing Haive utilities to detect tool capabilities, schemas, and other properties needed for routing and execution.

__init__()[source]ΒΆ

Initialize the tool analyzer.

analyze(tool, force=False)[source]ΒΆ

Perform comprehensive tool analysis.

Parameters:
  • tool (BaseTool | StructuredTool | type[BaseTool] | BaseModel | type[BaseModel] | Callable[[...], Any] | BaseToolkit) – Tool to analyze

  • force (bool) – Force re-analysis even if cached

Returns:

ToolProperties with complete analysis

Return type:

ToolProperties

class haive.core.engine.tool.ToolCapability(*values)[source]ΒΆ

Fine-grained tool capabilities for routing and execution.

ASYNC_CAPABLE = 'async_capable'ΒΆ
BATCH_CAPABLE = 'batch_capable'ΒΆ
FROM_STATE = 'from_state'ΒΆ
INJECTED_STATE = 'injected_state'ΒΆ
INTERRUPTIBLE = 'interruptible'ΒΆ
READS_STATE = 'reads_state'ΒΆ
RETRIEVER = 'retriever'ΒΆ
ROUTED = 'routed'ΒΆ
STATE_AWARE = 'state_aware'ΒΆ
STORE = 'store'ΒΆ
STREAMING = 'streaming'ΒΆ
STRUCTURED_OUTPUT = 'structured_output'ΒΆ
TO_STATE = 'to_state'ΒΆ
TRANSFORMER = 'transformer'ΒΆ
VALIDATED_OUTPUT = 'validated_output'ΒΆ
VALIDATOR = 'validator'ΒΆ
WRITES_STATE = 'writes_state'ΒΆ
class haive.core.engine.tool.ToolCategory(*values)[source]ΒΆ

High-level tool categorization.

COMMUNICATION = 'communication'ΒΆ
COMPUTATION = 'computation'ΒΆ
COORDINATION = 'coordination'ΒΆ
GENERATION = 'generation'ΒΆ
MEMORY = 'memory'ΒΆ
RETRIEVAL = 'retrieval'ΒΆ
SEARCH = 'search'ΒΆ
TRANSFORMATION = 'transformation'ΒΆ
UNKNOWN = 'unknown'ΒΆ
VALIDATION = 'validation'ΒΆ
class haive.core.engine.tool.ToolEngine(*, id=<factory>, name=<factory>, engine_type=EngineType.TOOL, description=None, input_schema=None, output_schema=None, version='1.0.0', metadata=<factory>, tools=None, toolkit=None, retry_policy=None, parallel=False, messages_key='messages', tool_choice='auto', return_source=True, timeout=None, max_iterations=None, auto_route=True, routing_strategy='auto', enable_analysis=True)[source]ΒΆ

Enhanced tool engine with universal typing and property analysis.

This engine manages tools with comprehensive property analysis, capability-based routing, and state interaction tracking.

Parameters:
  • id (str)

  • name (str)

  • engine_type (EngineType)

  • description (str | None)

  • input_schema (type[BaseModel] | None)

  • output_schema (type[BaseModel] | None)

  • version (str)

  • metadata (dict[str, Any])

  • tools (Sequence[BaseTool | StructuredTool | type[BaseTool] | BaseModel | type[BaseModel] | Callable[[...], Any] | BaseToolkit] | None)

  • toolkit (BaseToolkit | list[BaseToolkit] | None)

  • retry_policy (RetryPolicy | None)

  • parallel (bool)

  • messages_key (str)

  • tool_choice (Literal['auto', 'required', 'none'])

  • return_source (bool)

  • timeout (float | None)

  • max_iterations (int | None)

  • auto_route (bool)

  • routing_strategy (Literal['auto', 'capability', 'category', 'priority'])

  • enable_analysis (bool)

classmethod augment_tool(tool, *, name=None, description=None, make_interruptible=False, interrupt_message='Tool execution interrupted', reads_state=False, writes_state=False, state_keys=None, structured_output_model=None)[source]ΒΆ

Augment an existing tool with additional capabilities.

This method adds capabilities to a tool without creating nested wrappers. All capabilities are added directly to the tool instance.

Parameters:
  • tool (StructuredTool | BaseTool | Callable[[...], Any])

  • name (str | None)

  • description (str | None)

  • make_interruptible (bool)

  • interrupt_message (str)

  • reads_state (bool)

  • writes_state (bool)

  • state_keys (list[str] | None)

  • structured_output_model (type[BaseModel] | None)

Return type:

StructuredTool

classmethod create_human_interrupt_tool(name='human_interrupt', description='Request human intervention or approval', *, allow_ignore=True, allow_respond=True, allow_edit=False, allow_accept=True, interrupt_message='Human input requested')[source]ΒΆ

Create a tool that triggers human interrupt workflow.

This creates a tool that can pause graph execution and request human intervention, following langgraph’s interrupt patterns.

Parameters:
  • name (str) – Tool name

  • description (str) – Tool description

  • allow_ignore (bool) – Whether human can skip/ignore

  • allow_respond (bool) – Whether human can provide text response

  • allow_edit (bool) – Whether human can edit the request

  • allow_accept (bool) – Whether human can accept as-is

  • interrupt_message (str) – Default interrupt message

Returns:

A StructuredTool that triggers human interrupts

Return type:

StructuredTool

classmethod create_interruptible_tool(func, name=None, description=None, *, interrupt_message='Tool execution interrupted')[source]ΒΆ

Create or wrap a tool with interruption capability.

Parameters:
  • func (Callable[[...], Any] | StructuredTool | BaseTool)

  • name (str | None)

  • description (str | None)

  • interrupt_message (str)

Return type:

StructuredTool

classmethod create_retriever_tool(retriever, name, description, *, document_prompt=None, document_separator='\n\n', response_format='content')[source]ΒΆ

Create a tool to do retrieval of documents.

Parameters:
  • retriever (BaseRetriever)

  • name (str)

  • description (str)

  • document_prompt (BasePromptTemplate | None)

  • document_separator (str)

  • response_format (Literal['content', 'content_and_artifact'])

Return type:

StructuredTool

classmethod create_retriever_tool_from_config(retriever_config, name, description, *, document_prompt=None, document_separator='\n\n', response_format='content')[source]ΒΆ

Create a retriever tool from a Haive retriever configuration.

This is a convenience method that creates a retriever from the config and then converts it to a tool with proper metadata.

Parameters:
  • retriever_config (Any) – The BaseRetrieverConfig to use

  • name (str) – Tool name

  • description (str) – Tool description

  • document_prompt (BasePromptTemplate | None) – Optional prompt to format documents

  • document_separator (str) – Separator between documents

  • response_format (Literal['content', 'content_and_artifact']) – Format for tool response

Returns:

A StructuredTool configured for retrieval

Return type:

StructuredTool

classmethod create_state_tool(func, name=None, description=None, *, reads_state=False, writes_state=False, state_keys=None)[source]ΒΆ

Create or wrap a tool with state interaction metadata.

Parameters:
  • func (Callable[[...], Any] | StructuredTool | BaseTool)

  • name (str | None)

  • description (str | None)

  • reads_state (bool)

  • writes_state (bool)

  • state_keys (list[str] | None)

Return type:

StructuredTool

classmethod create_store_tools_suite(store_manager, namespace=None, include_tools=None)[source]ΒΆ

Create a suite of store/memory management tools.

This creates tools for memory operations like store, search, retrieve, update, and delete. These tools integrate with the StoreManager for persistent memory management.

Parameters:
  • store_manager (Any) – The StoreManager instance to use

  • namespace (tuple[str, ...] | None) – Optional namespace for memory isolation

  • include_tools (list[str] | None) – List of tools to include (defaults to all) Options: [β€œstore”, β€œsearch”, β€œretrieve”, β€œupdate”, β€œdelete”]

Returns:

List of store tools with proper metadata

Return type:

list[StructuredTool]

classmethod create_structured_output_tool(func, name, description, output_model, *, infer_schema=True)[source]ΒΆ

Create a tool that produces structured output.

Parameters:
Return type:

StructuredTool

classmethod from_aug_llm_config(config, *, extract_tools=True, include_structured_output=True, name=None)[source]ΒΆ

Create ToolEngine from AugLLMConfig.

Parameters:
  • config (AugLLMConfig) – AugLLMConfig instance to convert

  • extract_tools (bool) – Whether to extract tools from config

  • include_structured_output (bool) – Whether to convert structured_output_model to tool

  • name (str | None) – Optional name for the engine

Returns:

ToolEngine with tools extracted from AugLLMConfig

Return type:

ToolEngine

classmethod from_document_engine(engine, *, create_loader_tools=True, create_splitter_tools=True, create_processor_tools=False, name=None)[source]ΒΆ

Create ToolEngine from DocumentEngine.

Parameters:
  • engine (DocumentEngine) – DocumentEngine instance to convert

  • create_loader_tools (bool) – Whether to create document loading tools

  • create_splitter_tools (bool) – Whether to create document splitting tools

  • create_processor_tools (bool) – Whether to create document processing tools

  • name (str | None) – Optional name for the engine

Returns:

ToolEngine with document processing tools

Return type:

ToolEngine

classmethod from_multiple_engines(engines, *, engine_conversion_config=None, name=None)[source]ΒΆ

Create ToolEngine from multiple engines.

Parameters:
  • engines (dict[str, InvokableEngine]) – Dictionary of engine_name -> engine_instance

  • engine_conversion_config (dict[str, dict] | None) – Configuration for each engine’s conversion

  • name (str | None) – Optional name for the combined engine

Returns:

ToolEngine with tools from all engines

Return type:

ToolEngine

classmethod from_retriever_config(config, *, tool_name=None, tool_description=None, name=None)[source]ΒΆ

Create ToolEngine from BaseRetrieverConfig.

Parameters:
  • config (BaseRetrieverConfig) – BaseRetrieverConfig instance to convert

  • tool_name (str | None) – Name for the retriever tool

  • tool_description (str | None) – Description for the retriever tool

  • name (str | None) – Optional name for the engine

Returns:

ToolEngine with retriever converted to tool

Return type:

ToolEngine

classmethod from_vectorstore_config(config, *, search_tool_name=None, similarity_search_tool=True, max_marginal_relevance_tool=False, name=None)[source]ΒΆ

Create ToolEngine from VectorStoreConfig.

Parameters:
  • config (VectorStoreConfig) – VectorStoreConfig instance to convert

  • search_tool_name (str | None) – Base name for search tools

  • similarity_search_tool (bool) – Whether to create similarity search tool

  • max_marginal_relevance_tool (bool) – Whether to create MMR search tool

  • name (str | None) – Optional name for the engine

Returns:

ToolEngine with vectorstore search tools

Return type:

ToolEngine

classmethod get_analyzer()[source]ΒΆ

Get a tool analyzer instance.

Return type:

ToolAnalyzer

classmethod get_capability_enum()[source]ΒΆ

Get ToolCapability enum for other components.

Return type:

type[ToolCapability]

classmethod get_category_enum()[source]ΒΆ

Get ToolCategory enum for other components.

Return type:

type[ToolCategory]

classmethod get_tool_type()[source]ΒΆ

Get the universal ToolLike type for other components.

Return type:

type

classmethod validate_engine_type(v)[source]ΒΆ

Validate engine type is TOOL.

Return type:

Any

classmethod validate_tool_choice(v)[source]ΒΆ

Validate tool_choice has a valid value.

Return type:

Any

classmethod validate_toolkit(v)[source]ΒΆ

Validate toolkit is of the correct type.

Return type:

Any

classmethod validate_tools(v)[source]ΒΆ

Validate tools are of the correct type.

Return type:

Any

create_runnable(runnable_config=None)[source]ΒΆ

Create enhanced ToolNode with property awareness.

Parameters:

runnable_config (RunnableConfig | None)

Return type:

Any

get_input_fields()[source]ΒΆ

Define input fields for tool engine.

Return type:

dict[str, tuple[type, Any]]

get_interruptible_tools()[source]ΒΆ

Get all interruptible tool names.

Return type:

list[str]

get_output_fields()[source]ΒΆ

Define output fields for tool engine.

Return type:

dict[str, tuple[type, Any]]

get_state_tools()[source]ΒΆ

Get all state-aware tool names.

Return type:

list[str]

get_tool_properties(tool_name)[source]ΒΆ

Get properties for specific tool.

Parameters:

tool_name (str)

Return type:

ToolProperties | None

get_tools_by_capability(*capabilities)[source]ΒΆ

Get tool names with specified capabilities.

Parameters:

capabilities (ToolCapability)

Return type:

list[str]

get_tools_by_category(category)[source]ΒΆ

Get tool names in category.

Parameters:

category (ToolCategory)

Return type:

list[str]

get_tools_reading_state()[source]ΒΆ

Get tools that read from state.

Return type:

list[str]

get_tools_writing_state()[source]ΒΆ

Get tools that write to state.

Return type:

list[str]

model_post_init(_ToolEngine__context)[source]ΒΆ

Initialize and analyze tools after creation.

Parameters:
  • __context (Any)

  • _ToolEngine__context (Any)

Return type:

None

auto_route: boolΒΆ
enable_analysis: boolΒΆ
engine_type: EngineTypeΒΆ
max_iterations: int | NoneΒΆ
messages_key: strΒΆ
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}ΒΆ

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

parallel: boolΒΆ
retry_policy: RetryPolicy | NoneΒΆ
return_source: boolΒΆ
routing_strategy: Literal['auto', 'capability', 'category', 'priority']ΒΆ
timeout: float | NoneΒΆ
tool_choice: Literal['auto', 'required', 'none']ΒΆ
toolkit: BaseToolkit | list[BaseToolkit] | NoneΒΆ
tools: Sequence[ToolLike] | NoneΒΆ
class haive.core.engine.tool.ToolProperties(*, name, tool_type, category=ToolCategory.UNKNOWN, capabilities=<factory>, is_state_tool=False, to_state_tool=False, from_state_tool=False, state_dependencies=<factory>, state_outputs=<factory>, is_interruptible=False, is_async=False, is_routed=False, input_schema=None, output_schema=None, structured_output_model=None, is_structured_output_model=False, description=None, version='1.0', tags=<factory>, expected_duration=None, requires_network=False)[source]ΒΆ

Comprehensive tool properties for analysis and routing.

This model captures all relevant properties of a tool including its type, capabilities, state interaction patterns, and schemas.

Parameters:
has_capability(capability)[source]ΒΆ

Check if tool has specific capability.

Parameters:

capability (ToolCapability)

Return type:

bool

has_structured_output()[source]ΒΆ

Check if tool has structured output.

Return type:

bool

interacts_with_state()[source]ΒΆ

Check if tool has any state interaction.

Return type:

bool

is_retriever()[source]ΒΆ

Check if this is a retriever tool.

Return type:

bool

capabilities: set[ToolCapability]ΒΆ
category: ToolCategoryΒΆ
description: str | NoneΒΆ
expected_duration: float | NoneΒΆ
from_state_tool: boolΒΆ
input_schema: type[BaseModel] | NoneΒΆ
is_async: boolΒΆ
is_interruptible: boolΒΆ
is_routed: boolΒΆ
is_state_tool: boolΒΆ
is_structured_output_model: boolΒΆ
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}ΒΆ

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

name: strΒΆ
output_schema: type[BaseModel] | NoneΒΆ
requires_network: boolΒΆ
state_dependencies: list[str]ΒΆ
state_outputs: list[str]ΒΆ
structured_output_model: type[BaseModel] | NoneΒΆ
tags: list[str]ΒΆ
to_state_tool: boolΒΆ
tool_type: ToolTypeΒΆ
version: strΒΆ
class haive.core.engine.tool.ToolType(*values)[source]ΒΆ

Tool implementation types.

FUNCTION = 'function'ΒΆ
LANGCHAIN_TOOL = 'langchain_tool'ΒΆ
PYDANTIC_MODEL = 'pydantic_model'ΒΆ
RETRIEVER_TOOL = 'retriever_tool'ΒΆ
STORE_TOOL = 'store_tool'ΒΆ
STRUCTURED_TOOL = 'structured_tool'ΒΆ
TOOLKIT = 'toolkit'ΒΆ
VALIDATION_TOOL = 'validation_tool'ΒΆ

Tool Creation and Management:

from haive.core.engine.tool import tool, ToolEngine
from typing import Annotated

# Define tools with rich metadata
@tool
def analyze_data(
    data: Annotated[List[float], "Numerical data to analyze"],
    method: Annotated[str, "Analysis method: mean, median, std"] = "mean"
) -> Annotated[dict, "Analysis results with statistics"]:
    """Perform statistical analysis on numerical data.

    This tool supports various statistical methods and returns
    comprehensive results including confidence intervals.
    """
    import numpy as np

    if method == "mean":
        result = {
            "value": np.mean(data),
            "confidence_interval": calculate_ci(data),
            "sample_size": len(data)
        }
    # ... more methods

    return result

# Create tool engine
tool_engine = ToolEngine(
    tools=[analyze_data, web_search, database_query],
    execution_mode="parallel",  # Execute independent tools in parallel
    timeout_per_tool=10,
    error_handling="continue"  # Don't fail on single tool error
)

# Execute with validation
results = await tool_engine.execute_tools([
    {"tool": "analyze_data", "args": {"data": [1, 2, 3, 4, 5]}},
    {"tool": "web_search", "args": {"query": "latest statistics"}}
])

Structured Output SystemΒΆ

Type-Safe AI Responses

Output parser engine module for the Haive framework.

This module provides engine implementations for LangChain output parsers, enabling structured output parsing from LLM responses within the Haive framework.

Output parsers transform raw LLM text outputs into structured data formats, including JSON, Pydantic models, enums, lists, and custom formats. This module wraps LangChain’s output parser functionality in Haive’s engine system for consistent usage across the framework.

Key Components:

OutputParserEngine: Engine wrapper for LangChain output parsers OutputParserType: Enumeration of supported parser types

Supported Parser Types:
  • JSON: Parse output as JSON objects

  • PydanticOutputParser: Parse into Pydantic model instances

  • EnumOutputParser: Parse into enum values

  • DatetimeOutputParser: Parse datetime strings

  • BooleanOutputParser: Parse boolean values

  • CommaSeparatedListOutputParser: Parse comma-separated lists

  • MarkdownListOutputParser: Parse markdown formatted lists

  • NumberedListOutputParser: Parse numbered lists

  • XMLOutputParser: Parse XML formatted output

Examples

JSON output parsing:

from haive.core.engine.output_parser import OutputParserEngine, OutputParserType

parser = OutputParserEngine(
    parser_type=OutputParserType.JSON,
    name="json_parser"
)

result = parser.invoke('{"name": "John", "age": 30}')
# Returns: {"name": "John", "age": 30}

Pydantic model parsing:

from pydantic import BaseModel, Field
from haive.core.engine.output_parser import OutputParserEngine, OutputParserType

class Person(BaseModel):
    name: str = Field(description="Person's name")
    age: int = Field(description="Person's age")

parser = OutputParserEngine(
    parser_type=OutputParserType.PYDANTIC,
    pydantic_object=Person,
    name="person_parser"
)

result = parser.invoke("name: John\\nage: 30")
# Returns: Person(name="John", age=30)

See also

  • LangChain output parsers documentation

  • Output parser types: types.py

  • Base implementation: base.py

class haive.core.engine.output_parser.OutputParserEngine(*, id=<factory>, name=<factory>, engine_type=EngineType.OUTPUT_PARSER, description=None, input_schema=None, output_schema=None, version='1.0.0', metadata=<factory>, parser_type, parser_config=<factory>, pydantic_model=None, regex_pattern=None, enum_class=None, response_schemas=None)[source]ΒΆ

Engine that wraps LangChain OutputParsers to convert text to structured data.

The generic parameter TOut represents the specific return type of the parser. This allows for type-safe usage of parsers in your workflows.

Supported output types include: - Primitive types (bool, str, int, float, datetime) - Collection types (List[str], Dict[str, Any], etc.) - Pydantic models (any BaseModel subclass) - Specialized types (pandas.DataFrame, Enum values) - Custom structured types

Examples

# Create a JSON parser json_parser = create_output_parser_engine(

OutputParserType.JSON, name=”my_json_parser”

)

# Use in a graph graph.add_node(β€œparse_json”, json_parser)

Parameters:
async ainvoke(input_data, runnable_config=None)[source]ΒΆ

Asynchronously invoke the parser with the input data.

Parameters:
  • input_data (TIn) – Input text, message, or dictionary

  • runnable_config (RunnableConfig | None) – Optional runtime configuration

Returns:

Parsed output based on parser type

Return type:

TOut

create_runnable(runnable_config=None)[source]ΒΆ

Create the appropriate parser based on configuration.

Parameters:

runnable_config (RunnableConfig | None) – Optional runtime configuration

Returns:

An instantiated output parser

Raises:
  • ValueError – If parser type is unknown or required config is missing

  • ImportError – If required packages are not installed

Return type:

Any

get_input_fields()[source]ΒΆ

Define input field requirements.

Return type:

dict[str, tuple]

get_output_fields()[source]ΒΆ

Define output field requirements based on parser type.

Return type:

dict[str, tuple]

invoke(input_data, runnable_config=None)[source]ΒΆ

Invoke the parser with the input data.

Parameters:
  • input_data (TIn) – Input text, message, or dictionary

  • runnable_config (RunnableConfig | None) – Optional runtime configuration

Returns:

Parsed output based on parser type

Raises:
  • ValueError – If the input cannot be processed

  • Exception – If parsing fails and no error handler is provided

Return type:

TOut

engine_type: EngineTypeΒΆ
enum_class: type[Enum] | NoneΒΆ
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}ΒΆ

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

parser_config: dict[str, Any]ΒΆ
parser_type: OutputParserTypeΒΆ
pydantic_model: type[BaseModel] | NoneΒΆ
regex_pattern: str | NoneΒΆ
response_schemas: list[dict[str, Any]] | NoneΒΆ
class haive.core.engine.output_parser.OutputParserType(*values)[source]ΒΆ

Enumeration of supported output parser types.

BOOLEAN = 'boolean'ΒΆ
COMBINING = 'combining'ΒΆ
COMMA_SEPARATED_LIST = 'comma_separated_list'ΒΆ
DATETIME = 'datetime'ΒΆ
ENUM = 'enum'ΒΆ
JSON = 'json'ΒΆ
LIST = 'list'ΒΆ
MARKDOWN_LIST = 'markdown_list'ΒΆ
NUMBERED_LIST = 'numbered_list'ΒΆ
OPENAI_TOOLS = 'openai_tools'ΒΆ
OPENAI_TOOLS_KEY = 'openai_tools_key'ΒΆ
PANDAS_DATAFRAME = 'pandas_dataframe'ΒΆ
PYDANTIC = 'pydantic'ΒΆ
PYDANTIC_TOOLS = 'pydantic_tools'ΒΆ
REGEX = 'regex'ΒΆ
REGEX_DICT = 'regex_dict'ΒΆ
SIMPLE_JSON = 'simple_json'ΒΆ
STRING = 'string'ΒΆ
STRUCTURED = 'structured'ΒΆ
XML = 'xml'ΒΆ
YAML = 'yaml'ΒΆ

Structured Output Patterns:

from haive.core.engine.output_parser import create_structured_parser
from pydantic import BaseModel, Field
from typing import List, Optional

# Complex nested structure
class Address(BaseModel):
    street: str
    city: str
    country: str
    postal_code: Optional[str] = None

class Person(BaseModel):
    name: str = Field(description="Full name")
    age: int = Field(ge=0, le=150)
    email: str = Field(pattern=r"[^@]+@[^@]+\.[^@]+")
    address: Address
    skills: List[str] = Field(min_items=1)

class TeamAnalysis(BaseModel):
    team_name: str
    members: List[Person]
    average_age: float
    top_skills: List[str] = Field(max_items=5)
    recommendations: List[str]

# Create parser with validation
parser = create_structured_parser(
    model=TeamAnalysis,
    strict_mode=True,  # Enforce all validations
    repair_json=True,  # Fix malformed JSON
    max_retries=3
)

# Use with LLM
config = AugLLMConfig(
    llm_config=llm_config,
    structured_output_model=TeamAnalysis,
    output_parser=parser
)

# Get validated, typed response
result: TeamAnalysis = config.create_runnable().invoke(
    "Analyze our development team structure"
)
print(f"Average age: {result.average_age}")
print(f"Top skills: {', '.join(result.top_skills)}")

Streaming ArchitectureΒΆ

Real-Time Data Processing

Advanced Streaming Patterns:

from haive.core.engine.streaming import StreamingEngine, ChunkProcessor

class TokenAggregator(ChunkProcessor):
    """Aggregate tokens with buffering."""

    def __init__(self, buffer_size: int = 10):
        self.buffer = []
        self.buffer_size = buffer_size

    async def process_chunk(self, chunk):
        self.buffer.append(chunk)

        if len(self.buffer) >= self.buffer_size:
            # Emit aggregated chunk
            result = "".join(self.buffer)
            self.buffer = []
            return result
        return None  # Buffer not full

# Create streaming pipeline
streaming_engine = StreamingEngine(
    engine=llm_engine,
    processors=[
        TokenAggregator(buffer_size=5),
        SentimentAnalyzer(),  # Analyze sentiment per chunk
        KeywordExtractor()    # Extract keywords in real-time
    ]
)

# Stream with processing
async for processed_chunk in streaming_engine.stream(prompt):
    print(f"Chunk: {processed_chunk.text}")
    print(f"Sentiment: {processed_chunk.sentiment}")
    print(f"Keywords: {processed_chunk.keywords}")

Performance & OptimizationΒΆ

Engine Performance MetricsΒΆ

Blazing Fast Execution

  • Initialization: < 10ms engine startup time

  • First Token: < 100ms time to first token (streaming)

  • Throughput: 10,000+ tokens/second (local models)

  • Concurrency: 1,000+ parallel requests

  • Memory: < 100MB base memory footprint

Optimization Strategies:

from haive.core.engine.optimization import EngineOptimizer

# Create optimized engine
optimizer = EngineOptimizer()

optimized_engine = optimizer.optimize(
    base_engine,
    strategies=[
        "connection_pooling",    # Reuse connections
        "response_caching",      # Cache common queries
        "batch_processing",      # Batch similar requests
        "token_budgeting",      # Manage token usage
        "adaptive_timeout"      # Dynamic timeout adjustment
    ]
)

# Monitor performance
metrics = optimized_engine.get_metrics()
print(f"Average latency: {metrics.avg_latency}ms")
print(f"Cache hit rate: {metrics.cache_hit_rate}%")
print(f"Token efficiency: {metrics.tokens_per_dollar}")

Cost OptimizationΒΆ

Intelligent Cost Management:

from haive.core.engine.cost import CostOptimizer, BudgetManager

# Set up budget controls
budget_manager = BudgetManager(
    daily_limit=100.0,  # $100/day
    alert_thresholds=[0.5, 0.8, 0.95],
    notification_handlers=[email_handler, slack_handler]
)

# Cost-aware routing
cost_optimizer = CostOptimizer(
    providers={
        "gpt-4": {"cost_per_1k_tokens": 0.03, "quality": 0.95},
        "claude-3": {"cost_per_1k_tokens": 0.02, "quality": 0.93},
        "llama3": {"cost_per_1k_tokens": 0.0, "quality": 0.85}
    }
)

# Route based on query complexity
engine = cost_optimizer.select_engine(
    query=user_query,
    quality_threshold=0.9,  # Minimum quality required
    optimize_for="cost"     # or "quality" or "balanced"
)

Advanced PatternsΒΆ

Retrieval-Augmented GenerationΒΆ

Advanced RAG Implementation:

from haive.core.engine.retriever import RetrieverEngine
from haive.core.engine.patterns import RAGPattern

# Configure RAG system
rag = RAGPattern(
    retriever=RetrieverEngine(
        vector_store=pinecone_store,
        embedding_model="text-embedding-3-large",
        top_k=5,
        similarity_threshold=0.8
    ),
    generator=AugLLMConfig(
        llm_config=gpt4_config,
        system_message="Use the provided context to answer questions accurately."
    ),
    reranker=CohereReranker(model="rerank-english-v2.0"),
    citation_mode="inline"  # Add citations to response
)

# Execute RAG query
result = await rag.query(
    "What are the latest developments in quantum computing?",
    filters={"date": {"$gte": "2024-01-01"}},
    include_sources=True
)

print(result.answer)
print(f"Sources: {result.sources}")
print(f"Confidence: {result.confidence}")

Engine CompositionΒΆ

Building Complex Engines:

from haive.core.engine.composition import EngineComposer, Pipeline

# Compose multi-stage engine
composer = EngineComposer()

# Stage 1: Information extraction
extractor = composer.create_stage(
    name="extractor",
    engine=AugLLMConfig(
        llm_config=fast_model,
        structured_output_model=ExtractedInfo
    )
)

# Stage 2: Enrichment with external data
enricher = composer.create_stage(
    name="enricher",
    engine=ToolEngine(tools=[api_lookup, database_query]),
    depends_on=["extractor"]
)

# Stage 3: Analysis and synthesis
analyzer = composer.create_stage(
    name="analyzer",
    engine=AugLLMConfig(
        llm_config=powerful_model,
        system_message="Perform deep analysis on enriched data"
    ),
    depends_on=["enricher"]
)

# Build pipeline
pipeline = composer.build_pipeline(
    stages=[extractor, enricher, analyzer],
    execution_mode="sequential",
    state_passing="incremental"  # Each stage adds to state
)

# Execute pipeline
final_result = await pipeline.execute(initial_input)

Enterprise FeaturesΒΆ

Production-Ready Capabilities

  • High Availability: Multi-region deployment with automatic failover

  • Security: End-to-end encryption, API key rotation, audit logging

  • Compliance: GDPR, HIPAA, SOC2 compliant configurations

  • Monitoring: OpenTelemetry integration, custom metrics, alerting

  • Governance: Model access control, usage policies, cost allocation

See AlsoΒΆ