dataflow.api.routes.agent_routes

WebSocket and REST API endpoints for agent interactions.

This module provides WebSocket-based communication with Haive agents, enabling real-time interactions, streaming responses, and persistent conversation state. It also includes REST endpoints for agent management and configuration.

The WebSocket protocol supports different message types for user messages, agent responses, status updates, and error handling. Connections are managed per thread, allowing multiple concurrent agent sessions.

Key components: - WebSocket connection manager for handling multiple clients - Message types and formats for structured communication - Authentication and authorization using Supabase - Agent configuration and customization options - Streaming response support for real-time feedback

Typical usage example:

# Client-side WebSocket example import websockets import json import asyncio

async def connect_to_agent():

uri = “ws://localhost:8000/api/ws/agent/chat?token=YOUR_AUTH_TOKEN” async with websockets.connect(uri) as websocket:

# Send initial configuration await websocket.send(json.dumps({

“type”: “config”, “content”: {

“agent_name”: “TextAnalyzer”, “provider”: “openai”, “model”: “gpt-4”, “stream”: True

}

}))

# Send a message to the agent await websocket.send(json.dumps({

“type”: “message”, “content”: “Analyze this text for sentiment”

}))

# Receive streaming responses while True:

response = json.loads(await websocket.recv()) if response[“type”] == “response”:

print(response[“content”])

elif response[“type”] == “state_complete”:

break

asyncio.run(connect_to_agent())

Classes

AgentChatConfig

Configuration for agent chat sessions via WebSocket.

ConnectionManager

Manages WebSocket connections for agent chat sessions.

WSMessage

WebSocket message format for agent communication.

WSMessageType

WebSocket message types for agent communication.

Functions

configure_agent(config, chat_config)

Configure agent with LLM settings.

get_user_from_token(token)

Validate JWT token and return user ID.

load_agent_config(agent_name, user_id, thread_id)

Load agent configuration from package.

reset_thread(thread_id[, user_id])

Reset/clear a chat thread.

websocket_chat_endpoint(websocket, agent_name[, ...])

WebSocket endpoint for real-time chat with an agent.

Module Contents

class dataflow.api.routes.agent_routes.AgentChatConfig(/, **data)

Bases: pydantic.BaseModel

Configuration for agent chat sessions via WebSocket.

This model defines the configuration options for an agent chat session, including which agent to use, LLM settings, and behavior options like streaming and persistence.

Parameters:

data (Any)

agent_name

Name of the agent to use for this chat session

provider

LLM provider to use (e.g., AZURE, OPENAI, ANTHROPIC)

model

Specific model to use from the provider

temperature

Sampling temperature for response generation (0.0-1.0)

system_prompt

Optional override for the agent’s system prompt

persistent

Whether to persist chat state between messages

stream

Whether to stream responses incrementally

extra_params

Additional provider-specific parameters

Examples

>>> config = AgentChatConfig(
...     agent_name="TextAnalyzer",
...     provider=LLMProvider.OPENAI,
...     model="gpt-4",
...     temperature=0.5,
...     system_prompt="You are an expert text analyst.",
...     stream=True
... )

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

class dataflow.api.routes.agent_routes.ConnectionManager

Manages WebSocket connections for agent chat sessions.

This class provides functionality for managing WebSocket connections, organizing them by thread, and handling connection lifecycle events. It supports multiple concurrent connections to the same thread, enabling features like shared sessions and observers.

The connection manager maintains: - Active WebSocket connections grouped by thread ID - Metadata for each thread (configuration, state, etc.) - Thread-safe operations with asyncio locks

active_connections

Dictionary mapping thread IDs to lists of WebSocket connections

thread_metadata

Dictionary mapping thread IDs to metadata dictionaries

_lock

Asyncio lock for thread-safe operations on shared data structures

Initialize the connection manager.

Creates empty dictionaries for tracking connections and metadata, and initializes the asyncio lock for thread safety.

async broadcast_to_thread(thread_id, message)

Broadcast message to all connections in a thread.

Parameters:
async connect(websocket, thread_id, user_id)

Connect a WebSocket to a thread.

This method accepts a new WebSocket connection and associates it with the specified thread. If this is the first connection to the thread, it also initializes the thread metadata.

Parameters:
  • websocket (fastapi.WebSocket) – The WebSocket connection to add

  • thread_id (str) – The ID of the thread to connect to

  • user_id (str) – The ID of the user making the connection

Returns:

True if the connection was successful, False otherwise

Return type:

bool

Raises:

WebSocketDisconnect – If the connection cannot be established

Examples

>>> manager = ConnectionManager()
>>> success = await manager.connect(websocket, "thread-123", "user-456")
>>> if success:
...     print("Connection established")
async disconnect(websocket, thread_id)

Disconnect a WebSocket from a thread.

Parameters:
  • websocket (fastapi.WebSocket)

  • thread_id (str)

async update_activity(thread_id)

Update last activity timestamp for a thread.

Parameters:

thread_id (str)

class dataflow.api.routes.agent_routes.WSMessage(/, **data)

Bases: pydantic.BaseModel

WebSocket message format for agent communication.

This model defines the standard format for all messages exchanged over the WebSocket connection. It provides a consistent structure with metadata for message handling and tracking.

Parameters:

data (Any)

type

The type of message (from WSMessageType enum)

content

The actual message content (type depends on message type)

thread_id

Optional ID for persistent chat threads

stream_index

Optional index for streaming response chunks

timestamp

When the message was created (defaults to current time)

Examples

>>> message = WSMessage(
...     type=WSMessageType.MESSAGE,
...     content="Analyze this text",
...     thread_id="thread-123"
... )
>>> json_str = message.json()

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

class dataflow.api.routes.agent_routes.WSMessageType

Bases: str, enum.Enum

WebSocket message types for agent communication.

This enumeration defines the different types of messages that can be exchanged over the WebSocket connection. Each type has a specific purpose and expected format.

MESSAGE

User message sent to the agent

RESPONSE

Agent response sent back to the user

STATUS

System status updates (connection, processing, etc.)

ERROR

Error messages for exception handling

STATE

Incremental agent state updates during processing

STATE_COMPLETE

Final agent state after processing completes

Initialize self. See help(type(self)) for accurate signature.

async dataflow.api.routes.agent_routes.configure_agent(config, chat_config)

Configure agent with LLM settings.

Parameters:
  • config (haive.core.engine.base.agent_config.AgentConfig)

  • chat_config (AgentChatConfig)

Return type:

haive.core.engine.base.agent_config.AgentConfig

dataflow.api.routes.agent_routes.get_user_from_token(token)

Validate JWT token and return user ID.

Parameters:

token (str)

Return type:

str | None

async dataflow.api.routes.agent_routes.load_agent_config(agent_name, user_id, thread_id)

Load agent configuration from package.

Parameters:
  • agent_name (str)

  • user_id (str)

  • thread_id (str)

Return type:

haive.core.engine.base.agent_config.AgentConfig | None

async dataflow.api.routes.agent_routes.reset_thread(thread_id, user_id=Depends(require_auth))

Reset/clear a chat thread.

Parameters:
  • thread_id (str)

  • user_id (str)

async dataflow.api.routes.agent_routes.websocket_chat_endpoint(websocket, agent_name, token=Query(..., description='JWT authentication token'), thread_id=Query(None, description='Existing thread ID for persistence'), config=Query(None, description='JSON encoded chat configuration'))

WebSocket endpoint for real-time chat with an agent.

Parameters:
  • websocket (fastapi.WebSocket) – WebSocket connection

  • agent_name (str) – Name of the agent to chat with

  • token (str) – JWT authentication token

  • thread_id (str | None) – Optional thread ID for persistent chat

  • config (str | None) – Optional JSON-encoded chat configuration