dataflow.api.db¶

Attributes¶

Classes¶

DatabaseManager

Manages database operations for the agent registry.

Module Contents¶

class dataflow.api.db.DatabaseManager(connection_params: dict[str, Any])¶

Manages database operations for the agent registry.

Parameters:

connection_params (dict[str, Any])

close() None¶

Close the database connection.

Return type:

None

connect() bool¶

Connect to the database.

Returns:

True if connection successful, False otherwise

Return type:

bool

create_schema() bool¶

Create the AI schema if it doesn’t exist.

Returns:

True if successful, False otherwise

Return type:

bool

create_tables() bool¶

Create the necessary tables in the AI schema.

Returns:

True if successful, False otherwise

Return type:

bool

get_agent_configs(agent_type: str | None = None) list[dict[str, Any]]¶

Get agent configurations from the database.

Parameters:

agent_type (str | None) – Optional agent type to filter by

Returns:

List of agent configuration dictionaries

Return type:

list[dict[str, Any]]

register_agent_config(name: str, class_obj: Any, agent_type: str = 'agent') bool¶

Register an agent configuration in the database.

Parameters:
  • name (str) – Agent name

  • class_obj (Any) – Agent configuration class

  • agent_type (str) – Type of agent (e.g., ‘agent’, ‘game’)

Returns:

True if successful, False otherwise

Return type:

bool

connection = None¶
connection_params¶
schema_name = 'ai'¶
dataflow.api.db.logger¶