Source code for haive.core.engine.tool.analyzer

"""Tool analysis system leveraging Haive's existing utilities.

This module analyzes tools to determine their properties and capabilities
using the existing utilities in haive.core.common.
"""

import asyncio
import inspect
from collections.abc import Callable
from typing import get_args, get_origin, get_type_hints

from pydantic import BaseModel

from haive.core.utils.interrupt_utils import is_interruptible
from haive.core.utils.tools.tool_schema_generator import (
    extract_input_schema,
    extract_output_schema,
)

from haive.core.engine.tool.types import (
    InterruptibleTool,
    StateAwareTool,
    ToolCapability,
    ToolCategory,
    ToolLike,
    ToolProperties,
    ToolType,
)


[docs] class ToolAnalyzer: """Analyzes tools to determine their properties and capabilities. This analyzer uses existing Haive utilities to detect tool capabilities, schemas, and other properties needed for routing and execution. """
[docs] def __init__(self): """Initialize the tool analyzer.""" self._cache: dict[str, ToolProperties] = {}
[docs] def analyze(self, tool: ToolLike, force: bool = False) -> ToolProperties: """Perform comprehensive tool analysis. Args: tool: Tool to analyze force: Force re-analysis even if cached Returns: ToolProperties with complete analysis """ # Check cache tool_id = self._get_tool_id(tool) if not force and tool_id in self._cache: return self._cache[tool_id] # Get basic info name = self._get_tool_name(tool) tool_type = self._determine_tool_type(tool) # Create properties properties = ToolProperties( name=name, tool_type=tool_type, category=self._determine_category(tool), description=self._get_description(tool), ) # Analyze capabilities self._analyze_capabilities(tool, properties) # Analyze state interaction self._analyze_state_interaction(tool, properties) # Extract schemas using existing utilities properties.input_schema = self._safe_extract_schema(extract_input_schema, tool) properties.output_schema = self._safe_extract_schema( extract_output_schema, tool ) # Check for structured output model properties.structured_output_model = self._extract_structured_output_model(tool) properties.is_structured_output_model = ( properties.structured_output_model is not None ) # Analyze performance characteristics self._analyze_performance_hints(tool, properties) # Re-check structured output capability after schema extraction if ( properties.output_schema or properties.structured_output_model ) and ToolCapability.STRUCTURED_OUTPUT not in properties.capabilities: properties.capabilities.add(ToolCapability.STRUCTURED_OUTPUT) # Also check for validated output if we have a structured output model if ( properties.structured_output_model and ToolCapability.VALIDATED_OUTPUT not in properties.capabilities ): properties.capabilities.add(ToolCapability.VALIDATED_OUTPUT) # Cache result self._cache[tool_id] = properties return properties
def _determine_tool_type(self, tool: ToolLike) -> ToolType: """Determine the tool implementation type. This follows the patterns from ToolRouteMixin._analyze_tool. """ # Check for toolkit first from langchain_core.tools.base import BaseToolkit if isinstance(tool, BaseToolkit): return ToolType.TOOLKIT # Check if it's a class with BaseTool in MRO if hasattr(tool, "__bases__"): mro = inspect.getmro(tool) if any("BaseTool" in str(base) for base in mro): return ToolType.LANGCHAIN_TOOL # Check instances from langchain_core.tools import BaseTool, StructuredTool if isinstance(tool, StructuredTool): return ToolType.STRUCTURED_TOOL elif isinstance(tool, BaseTool): return ToolType.LANGCHAIN_TOOL # Check for Pydantic model with __call__ if isinstance(tool, BaseModel): if callable(tool) and callable(tool.__call__): return ToolType.PYDANTIC_MODEL # Could still be a validation tool if "validat" in tool.__class__.__name__.lower(): return ToolType.VALIDATION_TOOL # Check for retriever tool patterns if self._is_retriever(tool): return ToolType.RETRIEVER_TOOL # Default to function if callable(tool): return ToolType.FUNCTION return ToolType.FUNCTION def _determine_category(self, tool: ToolLike) -> ToolCategory: """Determine tool category from name, description, and type.""" # Check explicit category marker if hasattr(tool, "__tool_category__"): return tool.__tool_category__ name = self._get_tool_name(tool).lower() desc = self._get_description(tool).lower() # Category detection patterns patterns = { ToolCategory.RETRIEVAL: [ "retriev", "fetch", "search", "query", "lookup", "find", ], ToolCategory.COMPUTATION: [ "calculat", "comput", "math", "analyz", "process", "solve", ], ToolCategory.COMMUNICATION: [ "send", "email", "notify", "api", "webhook", "message", ], ToolCategory.TRANSFORMATION: [ "convert", "transform", "parse", "format", "encode", "decode", ], ToolCategory.VALIDATION: [ "validat", "check", "verify", "test", "assert", "ensure", ], ToolCategory.MEMORY: [ "remember", "store", "save", "persist", "cache", "recall", ], ToolCategory.SEARCH: [ "search", "find", "google", "bing", "web", "internet", ], ToolCategory.GENERATION: [ "generat", "create", "write", "compose", "build", "make", ], ToolCategory.COORDINATION: [ "coordinat", "orchestrat", "manage", "control", "route", ], } # Check patterns for category, keywords in patterns.items(): if any(kw in name or kw in desc for kw in keywords): return category return ToolCategory.UNKNOWN def _analyze_capabilities(self, tool: ToolLike, properties: ToolProperties) -> None: """Analyze and set tool capabilities.""" capabilities = set() # Check interruptibility using existing util if is_interruptible(tool) or isinstance(tool, InterruptibleTool): capabilities.add(ToolCapability.INTERRUPTIBLE) properties.is_interruptible = True # Check if tool has explicit interruptible marker if hasattr(tool, "__interruptible__") and tool.__interruptible__: capabilities.add(ToolCapability.INTERRUPTIBLE) properties.is_interruptible = True # Check async capability if self._is_async(tool): capabilities.add(ToolCapability.ASYNC_CAPABLE) properties.is_async = True # Check structured output if properties.output_schema or properties.structured_output_model: capabilities.add(ToolCapability.STRUCTURED_OUTPUT) # Check if it's a retriever if self._is_retriever(tool) or properties.tool_type == ToolType.RETRIEVER_TOOL: capabilities.add(ToolCapability.RETRIEVER) # Check if it's a validator if ( properties.tool_type == ToolType.VALIDATION_TOOL or "validat" in properties.name.lower() ): capabilities.add(ToolCapability.VALIDATOR) # Check for routed tool if hasattr(tool, "__tool_route__") or hasattr(tool, "route"): capabilities.add(ToolCapability.ROUTED) properties.is_routed = True # Check for transformer pattern if properties.category == ToolCategory.TRANSFORMATION: capabilities.add(ToolCapability.TRANSFORMER) # Check if tool has explicit capabilities if hasattr(tool, "__tool_capabilities__"): capabilities.update(tool.__tool_capabilities__) properties.capabilities = capabilities def _analyze_state_interaction( self, tool: ToolLike, properties: ToolProperties ) -> None: """Analyze how tool interacts with state.""" # Check if implements StateAwareTool protocol if isinstance(tool, StateAwareTool): properties.is_state_tool = True if tool.reads_state: properties.from_state_tool = True properties.capabilities.add(ToolCapability.FROM_STATE) properties.capabilities.add(ToolCapability.READS_STATE) if tool.writes_state: properties.to_state_tool = True properties.capabilities.add(ToolCapability.TO_STATE) properties.capabilities.add(ToolCapability.WRITES_STATE) if hasattr(tool, "state_dependencies"): properties.state_dependencies = list(tool.state_dependencies) properties.capabilities.add(ToolCapability.STATE_AWARE) # Also check for InjectedState annotation if self._uses_injected_state(tool): properties.is_state_tool = True properties.from_state_tool = True properties.capabilities.add(ToolCapability.INJECTED_STATE) properties.capabilities.add(ToolCapability.READS_STATE) properties.capabilities.add(ToolCapability.STATE_AWARE) # Check parameter names for state interaction if callable(tool): try: sig = inspect.signature(tool) param_names = set(sig.parameters.keys()) # State reading indicators state_read_params = {"state", "context", "graph_state", "agent_state"} if param_names.intersection(state_read_params): properties.is_state_tool = True properties.from_state_tool = True properties.capabilities.add(ToolCapability.FROM_STATE) properties.capabilities.add(ToolCapability.READS_STATE) properties.capabilities.add(ToolCapability.STATE_AWARE) # Check return annotation for state writing if sig.return_annotation != sig.empty: return_str = str(sig.return_annotation).lower() if "state" in return_str or "dict" in return_str: properties.to_state_tool = True properties.capabilities.add(ToolCapability.TO_STATE) properties.capabilities.add(ToolCapability.WRITES_STATE) properties.capabilities.add(ToolCapability.STATE_AWARE) except: pass # Check docstring for state interaction hints (only positive indicators) doc = self._get_description(tool).lower() if "state" in doc: # Look for positive state interaction phrases, not just individual words positive_read_phrases = [ "read state", "reads state", "get state", "gets state", "access state", "accesses state", "retrieve state", "retrieves state", "from state", "state data", "state information", ] positive_write_phrases = [ "write state", "writes state", "update state", "updates state", "modify state", "modifies state", "set state", "sets state", "to state", "store state", "save state", ] # Only trigger if we find positive phrases, not negative ones if not any( neg in doc for neg in ["without state", "no state", "not state"] ): if any(phrase in doc for phrase in positive_read_phrases): properties.from_state_tool = True properties.capabilities.add(ToolCapability.FROM_STATE) properties.capabilities.add(ToolCapability.READS_STATE) if any(phrase in doc for phrase in positive_write_phrases): properties.to_state_tool = True properties.capabilities.add(ToolCapability.TO_STATE) properties.capabilities.add(ToolCapability.WRITES_STATE) if properties.from_state_tool or properties.to_state_tool: properties.is_state_tool = True properties.capabilities.add(ToolCapability.STATE_AWARE) def _uses_injected_state(self, tool: ToolLike) -> bool: """Check if tool uses InjectedState annotation.""" # For StructuredTool/BaseTool, check the func attribute func_to_check = tool if hasattr(tool, "func") and callable(tool.func): func_to_check = tool.func elif not callable(tool): return False try: # Get type hints including extras for Annotated types hints = get_type_hints(func_to_check, include_extras=True) for param_name, param_type in hints.items(): # Skip return type if param_name == "return": continue # Check for Annotated type origin = get_origin(param_type) if origin is not None: args = get_args(param_type) # Look for InjectedState in the annotations if any("InjectedState" in str(arg) for arg in args): return True # Also check string representation as fallback if "InjectedState" in str(param_type): return True except: pass return False def _is_async(self, tool: ToolLike) -> bool: """Check if tool supports async execution.""" # Check if the tool itself is async if asyncio.iscoroutinefunction(tool): return True # Check if __call__ method is async if callable(tool): return asyncio.iscoroutinefunction(tool.__call__) # Check for async methods if hasattr(tool, "ainvoke") or hasattr(tool, "arun"): return True return False def _is_retriever(self, tool: ToolLike) -> bool: """Check if tool is a retriever.""" # Check name/description patterns name = self._get_tool_name(tool).lower() desc = self._get_description(tool).lower() retriever_patterns = [ "retriev", "fetch", "search", "query", "lookup", "find", "rag", ] if any(pattern in name or pattern in desc for pattern in retriever_patterns): return True # Check for retriever base classes or methods if hasattr(tool, "__class__"): class_name = tool.__class__.__name__.lower() if "retriever" in class_name: return True # Check for retriever-specific methods if hasattr(tool, "get_relevant_documents") or hasattr(tool, "retrieve"): return True return False def _extract_structured_output_model( self, tool: ToolLike ) -> type[BaseModel] | None: """Extract structured output model if present.""" # Check for explicit structured_output_model attribute if hasattr(tool, "structured_output_model"): model = tool.structured_output_model if isinstance(model, type) and issubclass(model, BaseModel): return model # Check return type annotation if callable(tool): try: sig = inspect.signature(tool) if sig.return_annotation != sig.empty: # Check if return type is a BaseModel subclass return_type = sig.return_annotation if isinstance(return_type, type) and issubclass( return_type, BaseModel ): return return_type except: pass # Check if output schema is a BaseModel if hasattr(tool, "output_schema"): schema = tool.output_schema if isinstance(schema, type) and issubclass(schema, BaseModel): return schema return None def _analyze_performance_hints( self, tool: ToolLike, properties: ToolProperties ) -> None: """Analyze performance characteristics of the tool.""" # Check for network requirements name = properties.name.lower() desc = properties.description or "" network_indicators = [ "api", "http", "request", "fetch", "download", "web", "url", ] if any( indicator in name or indicator in desc.lower() for indicator in network_indicators ): properties.requires_network = True # Check for explicit performance hints if hasattr(tool, "__performance_hints__"): hints = tool.__performance_hints__ if isinstance(hints, dict): properties.expected_duration = hints.get("expected_duration") properties.requires_network = hints.get( "requires_network", properties.requires_network ) def _safe_extract_schema( self, extractor: Callable, tool: ToolLike ) -> type[BaseModel] | None: """Safely extract schema using provided extractor.""" try: schema = extractor(tool) if isinstance(schema, type) and issubclass(schema, BaseModel): return schema return None except: return None def _get_tool_id(self, tool: ToolLike) -> str: """Get unique identifier for tool.""" if hasattr(tool, "id"): return str(tool.id) elif hasattr(tool, "__name__"): return tool.__name__ else: return f"{tool.__class__.__name__}_{id(tool)}" def _get_tool_name(self, tool: ToolLike) -> str: """Get tool name.""" if hasattr(tool, "name"): return str(tool.name) elif hasattr(tool, "__name__"): return tool.__name__ elif hasattr(tool, "__class__"): return tool.__class__.__name__ return "unknown_tool" def _get_description(self, tool: ToolLike) -> str: """Get tool description.""" if hasattr(tool, "description") and tool.description: return str(tool.description) elif hasattr(tool, "__doc__") and tool.__doc__: return tool.__doc__.strip() return ""