dataflow.game_agent

Attributes

Classes

AgentManager

Manages agent instances and database connections.

AgentRequest

Base model for agent requests.

AgentResponseBase

Base model for agent responses.

CheckpointDB

Utilities for working with checkpoint database.

CheckpointInfo

Information about a checkpoint.

GenericAgentAPI

Generic API framework for any agent type.

Module Contents

class dataflow.game_agent.AgentManager(agent_class: type, config_class: type, default_persistence: str = 'postgres')

Manages agent instances and database connections.

Parameters:
  • agent_class (type)

  • config_class (type)

  • default_persistence (str)

cleanup(thread_id: str | None = None)

Clean up resources.

Parameters:

thread_id (str | None)

get_active_threads() → list[str]

Get list of active thread IDs.

Return type:

list[str]

get_agent_for_connection(websocket: fastapi.WebSocket) → Any | None

Get the agent for a WebSocket connection.

Parameters:

websocket (fastapi.WebSocket)

Return type:

Any | None

get_or_create_agent(thread_id: str, config_overrides: dict[str, Any] | None = None) → Any

Get or create an agent for a thread ID.

Parameters:
  • thread_id (str)

  • config_overrides (dict[str, Any] | None)

Return type:

Any

register_connection(websocket: fastapi.WebSocket, thread_id: str)

Register a WebSocket connection.

Parameters:
  • websocket (fastapi.WebSocket)

  • thread_id (str)

unregister_connection(websocket: fastapi.WebSocket)

Unregister a WebSocket connection.

Parameters:

websocket (fastapi.WebSocket)

active_connections
agent_class
agents
config_class
connection_thread_map
default_persistence = 'postgres'
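
The attribute and method names above suggest a per-thread agent cache plus a map from WebSocket connections to thread IDs. The following is a minimal sketch of that pattern, not the library's implementation: `SketchAgentManager`, `DemoAgent`, and `DemoConfig` are hypothetical stand-ins, a plain `object()` replaces `fastapi.WebSocket`, and the caching and cleanup behavior shown is an assumption.

```python
from typing import Any


class SketchAgentManager:
    """Illustrative stand-in; NOT the real dataflow.game_agent.AgentManager."""

    def __init__(self, agent_class: type, config_class: type,
                 default_persistence: str = "postgres") -> None:
        self.agent_class = agent_class
        self.config_class = config_class
        self.default_persistence = default_persistence
        self.agents: dict[str, Any] = {}                 # thread_id -> agent
        self.active_connections: list[Any] = []          # open connections
        self.connection_thread_map: dict[Any, str] = {}  # connection -> thread_id

    def get_or_create_agent(self, thread_id, config_overrides=None):
        # Assumption: one cached agent per thread; overrides apply only on creation.
        if thread_id not in self.agents:
            config = self.config_class(**(config_overrides or {}))
            self.agents[thread_id] = self.agent_class(config)
        return self.agents[thread_id]

    def register_connection(self, websocket, thread_id):
        self.active_connections.append(websocket)
        self.connection_thread_map[websocket] = thread_id

    def unregister_connection(self, websocket):
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
        self.connection_thread_map.pop(websocket, None)

    def get_agent_for_connection(self, websocket):
        thread_id = self.connection_thread_map.get(websocket)
        return self.agents.get(thread_id) if thread_id is not None else None

    def get_active_threads(self):
        return list(self.agents)

    def cleanup(self, thread_id=None):
        # Assumption: None means "drop every agent"; otherwise drop one thread.
        if thread_id is None:
            self.agents.clear()
        else:
            self.agents.pop(thread_id, None)


class DemoConfig:  # hypothetical config class
    def __init__(self, difficulty="easy"):
        self.difficulty = difficulty


class DemoAgent:  # hypothetical agent class
    def __init__(self, config):
        self.config = config


manager = SketchAgentManager(DemoAgent, DemoConfig)
socket = object()  # stand-in for a fastapi.WebSocket instance
agent = manager.get_or_create_agent("thread-1", {"difficulty": "hard"})
manager.register_connection(socket, "thread-1")
```

In this sketch a second call to `get_or_create_agent("thread-1")` returns the cached agent, and `get_agent_for_connection(socket)` resolves the same agent through the connection map.
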
class dataflow.game_agent.AgentRequest(/, **data: Any)

Bases: pydantic.BaseModel

Base model for agent requests.

Parameters:

data (Any)

config_overrides: dict[str, Any] | None = None
initial_state: dict[str, Any] | None = None
persistence_type: str = 'postgres'
thread_id: str | None = None
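
To make the field defaults concrete, the sketch below mirrors the four documented fields in a standard-library dataclass and serializes one request to JSON. `AgentRequestSketch` is a hypothetical stand-in so the snippet runs without the package installed; the `initial_state` contents are made up for illustration.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class AgentRequestSketch:
    """Stand-in mirroring the documented AgentRequest fields and defaults."""
    config_overrides: Optional[dict[str, Any]] = None
    initial_state: Optional[dict[str, Any]] = None
    persistence_type: str = "postgres"
    thread_id: Optional[str] = None


# Only thread_id and initial_state are set; the other fields keep their defaults.
request = AgentRequestSketch(thread_id="thread-1", initial_state={"turn": 0})
payload = json.dumps(asdict(request), sort_keys=True)
```

With the real Pydantic model, `model_dump_json()` would take the place of `json.dumps(asdict(...))`.
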
class dataflow.game_agent.AgentResponseBase(/, **data: Any)

Bases: pydantic.BaseModel

Base model for agent responses.

Parameters:

data (Any)

thread_id: str
timestamp: datetime.datetime = None
class dataflow.game_agent.CheckpointDB

Utilities for working with checkpoint database.

async static get_checkpoints(thread_id: str) → list[CheckpointInfo]

Get checkpoints for a thread.

Parameters:

thread_id (str)

Return type:

list[CheckpointInfo]

async static get_threads() → list[dict[str, Any]]

Get all threads in the database.

Return type:

list[dict[str, Any]]
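
Because both methods are static coroutines, they are awaited on the class itself rather than on an instance. The sketch below shows that calling pattern against a hypothetical in-memory stub (`CheckpointDBStub`) with canned data. The `"thread_id"` key in each thread dict is an assumption, since the real dict keys are not documented here, and the stub returns plain strings instead of `CheckpointInfo` objects.

```python
import asyncio
from typing import Any


class CheckpointDBStub:
    """Hypothetical in-memory stand-in for CheckpointDB (canned data only)."""

    _data = {
        "thread-1": ["ckpt-a", "ckpt-b"],
        "thread-2": ["ckpt-c"],
    }

    @staticmethod
    async def get_threads() -> list[dict[str, Any]]:
        # Assumed shape: one dict per thread with a "thread_id" key.
        return [{"thread_id": tid} for tid in CheckpointDBStub._data]

    @staticmethod
    async def get_checkpoints(thread_id: str) -> list[str]:
        return list(CheckpointDBStub._data.get(thread_id, []))


async def count_checkpoints() -> dict[str, int]:
    threads = await CheckpointDBStub.get_threads()
    ids = [t["thread_id"] for t in threads]
    # Fetch every thread's checkpoints concurrently.
    results = await asyncio.gather(
        *(CheckpointDBStub.get_checkpoints(tid) for tid in ids)
    )
    return {tid: len(cps) for tid, cps in zip(ids, results)}


counts = asyncio.run(count_checkpoints())
```

Inside an already running event loop, such as a FastAPI route handler, the coroutine would be awaited directly instead of passed to `asyncio.run`.
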

class dataflow.game_agent.CheckpointInfo(/, **data: Any)

Bases: pydantic.BaseModel

Information about a checkpoint.

Parameters:

data (Any)

checkpoint_id: str
checkpoint_ns: str | None = None
created_at: datetime.datetime
metadata: dict[str, Any] | None = None
parent_checkpoint_id: str | None = None
thread_id: str
type: str
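
One plausible use of `parent_checkpoint_id` is reconstructing the chain of checkpoints that led to a given one. The sketch below assumes the field holds the `checkpoint_id` of another checkpoint in the same list; the source does not confirm this. `CheckpointInfoSketch` is a hypothetical dataclass mirror of the documented fields, and the `type` value `"checkpoint"` is a placeholder.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass
class CheckpointInfoSketch:
    """Stand-in mirroring the documented CheckpointInfo fields."""
    checkpoint_id: str
    created_at: datetime
    thread_id: str
    type: str
    checkpoint_ns: Optional[str] = None
    metadata: Optional[dict[str, Any]] = None
    parent_checkpoint_id: Optional[str] = None


def lineage(checkpoints, leaf_id):
    """Return checkpoint IDs from the oldest known ancestor down to leaf_id."""
    by_id = {c.checkpoint_id: c for c in checkpoints}
    chain = []
    current = by_id.get(leaf_id)
    # Follow parent links; stop at a missing parent or a repeated ID (cycle guard).
    while current is not None and current.checkpoint_id not in chain:
        chain.append(current.checkpoint_id)
        current = by_id.get(current.parent_checkpoint_id)
    return chain[::-1]


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
checkpoints = [
    CheckpointInfoSketch("c1", now, "thread-1", "checkpoint"),
    CheckpointInfoSketch("c2", now, "thread-1", "checkpoint", parent_checkpoint_id="c1"),
    CheckpointInfoSketch("c3", now, "thread-1", "checkpoint", parent_checkpoint_id="c2"),
]
path = lineage(checkpoints, "c3")
```
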
class dataflow.game_agent.GenericAgentAPI(app_name: str, agent_class: type[T], config_class: type[S], state_schema: type[pydantic.BaseModel] | None = None, response_model: type[pydantic.BaseModel] | None = None, default_persistence: str = 'postgres')

Bases: Generic[T, S]

Generic API framework for any agent type.

Parameters:
  • app_name (str)

  • agent_class (type[T])

  • config_class (type[S])

  • state_schema (type[pydantic.BaseModel] | None)

  • response_model (type[pydantic.BaseModel] | None)

  • default_persistence (str)

run(host: str = '0.0.0.0', port: int = 8000)

Run the API server.

Parameters:
  • host (str)

  • port (int)

agent_class
agent_manager
app
app_name
config_class
default_persistence = ''
response_model
state_schema = None
dataflow.game_agent.S
dataflow.game_agent.T
dataflow.game_agent.logger