dataflow.api.game_socket

WebSocket server for game state streaming with Supabase integration.

This module provides a general-purpose WebSocket server that can stream game state for any agent-based game in the Haive framework. It supports:

  • Real-time state updates

  • Supabase persistence integration with RLS

  • Player move submissions

  • AI move requests

  • Multiple game types

  • Authentication and user management

The WebSocket server can be integrated with any game agent implementation that follows the standard Haive agent interface.

Classes

GameSocketFactory

Factory for creating game-specific socket servers.

GameSocketServer

General-purpose WebSocket server for game state streaming.

Module Contents

class dataflow.api.game_socket.GameSocketFactory

Factory for creating game-specific socket servers.

This class creates specialized socket servers for different game types, with appropriate message handling and state management for each game.

Examples

# Create a chess socket server chess_socket = GameSocketFactory.create_chess_socket(app)

# Or create a custom socket server custom_socket = GameSocketFactory.create_socket(

app, agent_class=ChessAgent, state_schema=ChessState, route_prefix=”/ws/chess”

)

static create_chess_socket(app)

Create a chess-specific socket server.

Parameters:

app (fastapi.FastAPI) – The FastAPI application

Returns:

A configured GameSocketServer instance for chess

Return type:

GameSocketServer

static create_connect4_socket(app)

Create a Connect4-specific socket server.

Parameters:

app (fastapi.FastAPI) – The FastAPI application

Returns:

A configured GameSocketServer instance for Connect4

Return type:

GameSocketServer

static create_socket(app, agent_class, state_schema, route_prefix='/ws/games')

Create a game socket server for any agent and state schema.

Parameters:
  • app (fastapi.FastAPI) – The FastAPI application

  • agent_class (type[haive.core.engine.agent.agent.Agent]) – The agent class for the game

  • state_schema (type[haive.core.schema.state_schema.StateSchema]) – The state schema for the game

  • route_prefix (str) – The URL prefix for WebSocket routes

Returns:

A configured GameSocketServer instance

Return type:

GameSocketServer

static create_tic_tac_toe_socket(app)

Create a Tic Tac Toe-specific socket server.

Parameters:

app (fastapi.FastAPI) – The FastAPI application

Returns:

A configured GameSocketServer instance for Tic Tac Toe

Return type:

GameSocketServer

class dataflow.api.game_socket.GameSocketServer(app, agent_class, state_schema, route_prefix='/ws/games')

General-purpose WebSocket server for game state streaming.

This class provides a WebSocket server that can be integrated with any game agent implementation to stream game state updates in real-time. It handles connection management, message routing, and state updates.

app

The FastAPI application to add routes to

Type:

FastAPI

agent_class

The agent class for the game

Type:

Type[Agent]

state_schema

The state schema for the game

Type:

Type[StateSchema]

active_connections

Set of active WebSocket connections

Type:

Set[WebSocket]

connection_thread_map

Map of connections to thread IDs

Type:

Dict[WebSocket, str]

agents

Map of thread IDs to agent instances

Type:

Dict[str, Agent]

Initialize the game socket server.

Parameters:
  • app (fastapi.FastAPI) – The FastAPI application to add routes to

  • agent_class (type[haive.core.engine.agent.agent.Agent]) – The agent class for the game

  • state_schema (type[haive.core.schema.state_schema.StateSchema]) – The state schema for the game

  • route_prefix (str) – The URL prefix for WebSocket routes

async broadcast_to_thread(thread_id, message)

Broadcast a message to all connections for a thread.

Parameters:
cleanup(thread_id=None)

Clean up resources.

Parameters:

thread_id (str | None)

get_or_create_agent(thread_id, config_overrides=None)

Get or create an agent for a thread ID.

Parameters:
  • thread_id (str)

  • config_overrides (dict[str, Any] | None)

Return type:

haive.core.engine.agent.agent.Agent

register_connection(websocket, thread_id)

Register a WebSocket connection.

Parameters:
  • websocket (fastapi.WebSocket)

  • thread_id (str)

unregister_connection(websocket)

Unregister a WebSocket connection.

Parameters:

websocket (fastapi.WebSocket)