Source code for haive.core.engine.agent.protocols

"""Agent protocols - Protocol definitions for Agent capabilities.

This module defines protocol interfaces that describe the expected
functionality of Agent classes in the Haive framework. These protocols
enable runtime type checking and promote consistent interfaces across
different agent implementations.
"""

from collections.abc import AsyncGenerator, Generator
from typing import TYPE_CHECKING, Any, Protocol, TypeVar, runtime_checkable

from langchain_core.runnables import RunnableConfig

# Define type variables for input and output
TIn = TypeVar("TIn")
TOut = TypeVar("TOut")
TState = TypeVar("TState")


[docs] @runtime_checkable class AgentProtocol(Protocol[TIn, TOut, TState]): """Protocol defining the core functionality of an Agent. This protocol specifies the minimum interface requirements for Agent implementations in the Haive framework. """ @property def app(self) -> Any: """Return the compiled agent application.""" ...
[docs] def run( self, input_data: TIn, thread_id: str | None = None, debug: bool = True, config: RunnableConfig | None = None, **kwargs, ) -> TOut: """Synchronously run the agent with input data. Args: input_data: Input data for the agent thread_id: Optional thread ID for persistence debug: Whether to enable debug mode config: Optional runtime configuration **kwargs: Additional runtime configuration Returns: Output from the agent """ ...
[docs] async def arun( self, input_data: TIn, thread_id: str | None = None, config: RunnableConfig | None = None, **kwargs, ) -> TOut: """Asynchronously run the agent with input data. Args: input_data: Input data for the agent thread_id: Optional thread ID for persistence config: Optional runtime configuration **kwargs: Additional runtime configuration Returns: Output from the agent """ ...
[docs] def compile(self) -> None: """Compile the agent's workflow graph.""" ...
[docs] def setup_workflow(self) -> None: """Set up the workflow graph for this agent.""" ...
[docs] @runtime_checkable class StreamingAgentProtocol(Protocol[TIn, TOut]): """Protocol defining streaming functionality for Agents. This protocol extends the core agent functionality with streaming capabilities for real-time outputs. """
[docs] def stream( self, input_data: TIn, thread_id: str | None = None, stream_mode: str = "values", config: RunnableConfig | None = None, debug: bool = True, **kwargs, ) -> Generator[dict[str, Any], None, None]: """Stream agent execution with input data. Args: input_data: Input data for the agent thread_id: Optional thread ID for persistence stream_mode: Stream mode (values, updates, debug, etc.) config: Optional runtime configuration debug: Whether to enable debug mode **kwargs: Additional runtime configuration Yields: State updates during execution """ ...
[docs] async def astream( self, input_data: TIn, thread_id: str | None = None, stream_mode: str = "values", config: RunnableConfig | None = None, **kwargs, ) -> AsyncGenerator[dict[str, Any], None]: """Asynchronously stream agent execution with input data. Args: input_data: Input data for the agent thread_id: Optional thread ID for persistence stream_mode: Stream mode (values, updates, debug, etc.) config: Optional runtime configuration **kwargs: Additional runtime configuration Yields: Async iterator of state updates during execution """ ...
[docs] @runtime_checkable class PersistentAgentProtocol(Protocol): """Protocol defining persistence functionality for Agents. This protocol specifies methods related to state persistence and thread management in agents. """
[docs] def save_state_history(self, runnable_config: RunnableConfig | None = None) -> bool: """Save the current agent state to a JSON file. Args: runnable_config: Optional runnable configuration Returns: True if successful, False otherwise """ ...
[docs] def inspect_state( self, thread_id: str | None = None, config: RunnableConfig | None = None ) -> None: """Inspect the current state of the agent. Args: thread_id: Optional thread ID for persistence config: Optional runtime configuration """ ...
[docs] def reset_state( self, thread_id: str | None = None, config: RunnableConfig | None = None ) -> bool: """Reset the agent's state for a thread. Args: thread_id: Optional thread ID for persistence config: Optional runtime configuration Returns: True if successful, False otherwise """ ...
[docs] @runtime_checkable class VisualizationAgentProtocol(Protocol): """Protocol defining visualization capabilities for Agents. This protocol specifies methods related to graph visualization and debugging. """
[docs] def visualize_graph(self, output_path: str | None = None) -> None: """Generate and save a visualization of the agent's graph. Args: output_path: Optional custom path for visualization output """ ...
[docs] @runtime_checkable class ExtensibilityAgentProtocol(Protocol): """Protocol defining pattern-based extensibility for Agents. This protocol specifies methods related to pattern application and graph modification. """
[docs] def apply_pattern(self, pattern_name: str, **kwargs) -> None: """Apply a graph pattern to the agent's workflow. Args: pattern_name: Name of the pattern to apply **kwargs: Pattern-specific parameters """ ...
# Combined protocol for comprehensive agent capabilities
[docs] @runtime_checkable class FullAgentProtocol( AgentProtocol, StreamingAgentProtocol, PersistentAgentProtocol, VisualizationAgentProtocol, ExtensibilityAgentProtocol, Protocol, ): """Protocol combining all agent capabilities. This protocol represents a fully-featured agent with all available capabilities in the Haive framework. """
# Type assertions to verify that Agent class implements protocols if TYPE_CHECKING: from haive.core.engine.agent.agent import Agent def assert_agent_protocols(agent_class: type[Agent]): """Assert that Agent class implements all protocols.""" assert issubclass( agent_class, AgentProtocol ), "Agent must implement AgentProtocol" assert issubclass( agent_class, StreamingAgentProtocol ), "Agent must implement StreamingAgentProtocol" assert issubclass( agent_class, PersistentAgentProtocol ), "Agent must implement PersistentAgentProtocol" assert issubclass( agent_class, VisualizationAgentProtocol ), "Agent must implement VisualizationAgentProtocol" assert issubclass( agent_class, ExtensibilityAgentProtocol ), "Agent must implement ExtensibilityAgentProtocol" assert issubclass( agent_class, FullAgentProtocol ), "Agent must implement FullAgentProtocol"