dataflow.api.routes.agent_routes¶
WebSocket and REST API endpoints for agent interactions.
This module provides WebSocket-based communication with Haive agents, enabling real-time interactions, streaming responses, and persistent conversation state. It also includes REST endpoints for agent management and configuration.
The WebSocket protocol supports different message types for user messages, agent responses, status updates, and error handling. Connections are managed per thread, allowing multiple concurrent agent sessions.
Key components: - WebSocket connection manager for handling multiple clients - Message types and formats for structured communication - Authentication and authorization using Supabase - Agent configuration and customization options - Streaming response support for real-time feedback
Typical usage example:
# Client-side WebSocket example import websockets import json import asyncio
- async def connect_to_agent():
uri = “ws://localhost:8000/api/ws/agent/chat?token=YOUR_AUTH_TOKEN” async with websockets.connect(uri) as websocket:
# Send initial configuration await websocket.send(json.dumps({
“type”: “config”, “content”: {
“agent_name”: “TextAnalyzer”, “provider”: “openai”, “model”: “gpt-4”, “stream”: True
}
}))
# Send a message to the agent await websocket.send(json.dumps({
“type”: “message”, “content”: “Analyze this text for sentiment”
}))
# Receive streaming responses while True:
response = json.loads(await websocket.recv()) if response[“type”] == “response”:
print(response[“content”])
- elif response[“type”] == “state_complete”:
break
asyncio.run(connect_to_agent())
Classes¶
Configuration for agent chat sessions via WebSocket. |
|
Manages WebSocket connections for agent chat sessions. |
|
WebSocket message format for agent communication. |
|
WebSocket message types for agent communication. |
Functions¶
|
Configure agent with LLM settings. |
|
Validate JWT token and return user ID. |
|
Load agent configuration from package. |
|
Reset/clear a chat thread. |
|
WebSocket endpoint for real-time chat with an agent. |
Module Contents¶
- class dataflow.api.routes.agent_routes.AgentChatConfig(/, **data)¶
Bases:
pydantic.BaseModel
Configuration for agent chat sessions via WebSocket.
This model defines the configuration options for an agent chat session, including which agent to use, LLM settings, and behavior options like streaming and persistence.
- Parameters:
data (Any)
- agent_name¶
Name of the agent to use for this chat session
- provider¶
LLM provider to use (e.g., AZURE, OPENAI, ANTHROPIC)
- model¶
Specific model to use from the provider
- temperature¶
Sampling temperature for response generation (0.0-1.0)
- system_prompt¶
Optional override for the agent’s system prompt
- persistent¶
Whether to persist chat state between messages
- stream¶
Whether to stream responses incrementally
- extra_params¶
Additional provider-specific parameters
Examples
>>> config = AgentChatConfig( ... agent_name="TextAnalyzer", ... provider=LLMProvider.OPENAI, ... model="gpt-4", ... temperature=0.5, ... system_prompt="You are an expert text analyst.", ... stream=True ... )
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- class dataflow.api.routes.agent_routes.ConnectionManager¶
Manages WebSocket connections for agent chat sessions.
This class provides functionality for managing WebSocket connections, organizing them by thread, and handling connection lifecycle events. It supports multiple concurrent connections to the same thread, enabling features like shared sessions and observers.
The connection manager maintains: - Active WebSocket connections grouped by thread ID - Metadata for each thread (configuration, state, etc.) - Thread-safe operations with asyncio locks
- active_connections¶
Dictionary mapping thread IDs to lists of WebSocket connections
- thread_metadata¶
Dictionary mapping thread IDs to metadata dictionaries
- _lock¶
Asyncio lock for thread-safe operations on shared data structures
Initialize the connection manager.
Creates empty dictionaries for tracking connections and metadata, and initializes the asyncio lock for thread safety.
- async broadcast_to_thread(thread_id, message)¶
Broadcast message to all connections in a thread.
- async connect(websocket, thread_id, user_id)¶
Connect a WebSocket to a thread.
This method accepts a new WebSocket connection and associates it with the specified thread. If this is the first connection to the thread, it also initializes the thread metadata.
- Parameters:
- Returns:
True if the connection was successful, False otherwise
- Return type:
- Raises:
WebSocketDisconnect – If the connection cannot be established
Examples
>>> manager = ConnectionManager() >>> success = await manager.connect(websocket, "thread-123", "user-456") >>> if success: ... print("Connection established")
- class dataflow.api.routes.agent_routes.WSMessage(/, **data)¶
Bases:
pydantic.BaseModel
WebSocket message format for agent communication.
This model defines the standard format for all messages exchanged over the WebSocket connection. It provides a consistent structure with metadata for message handling and tracking.
- Parameters:
data (Any)
- type¶
The type of message (from WSMessageType enum)
- content¶
The actual message content (type depends on message type)
- thread_id¶
Optional ID for persistent chat threads
- stream_index¶
Optional index for streaming response chunks
- timestamp¶
When the message was created (defaults to current time)
Examples
>>> message = WSMessage( ... type=WSMessageType.MESSAGE, ... content="Analyze this text", ... thread_id="thread-123" ... ) >>> json_str = message.json()
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- class dataflow.api.routes.agent_routes.WSMessageType¶
-
WebSocket message types for agent communication.
This enumeration defines the different types of messages that can be exchanged over the WebSocket connection. Each type has a specific purpose and expected format.
- MESSAGE¶
User message sent to the agent
- RESPONSE¶
Agent response sent back to the user
- STATUS¶
System status updates (connection, processing, etc.)
- ERROR¶
Error messages for exception handling
- STATE¶
Incremental agent state updates during processing
- STATE_COMPLETE¶
Final agent state after processing completes
Initialize self. See help(type(self)) for accurate signature.
- async dataflow.api.routes.agent_routes.configure_agent(config, chat_config)¶
Configure agent with LLM settings.
- Parameters:
config (haive.core.engine.base.agent_config.AgentConfig)
chat_config (AgentChatConfig)
- Return type:
haive.core.engine.base.agent_config.AgentConfig
- dataflow.api.routes.agent_routes.get_user_from_token(token)¶
Validate JWT token and return user ID.
- async dataflow.api.routes.agent_routes.load_agent_config(agent_name, user_id, thread_id)¶
Load agent configuration from package.
- async dataflow.api.routes.agent_routes.reset_thread(thread_id, user_id=Depends(require_auth))¶
Reset/clear a chat thread.
- async dataflow.api.routes.agent_routes.websocket_chat_endpoint(websocket, agent_name, token=Query(..., description='JWT authentication token'), thread_id=Query(None, description='Existing thread ID for persistence'), config=Query(None, description='JSON encoded chat configuration'))¶
WebSocket endpoint for real-time chat with an agent.