haive.core.persistence.postgres_config

PostgreSQL-based persistence implementation for the Haive framework.

This module provides a PostgreSQL-backed checkpoint persistence implementation that stores state data in a PostgreSQL database. This allows for durable, reliable state persistence across application restarts and deployments.

The PostgreSQL implementation offers advanced features including connection pooling, automatic retry with exponential backoff, comprehensive error handling, and support for both synchronous and asynchronous operation modes. It integrates with LangGraph’s checkpoint system while adding enhanced robustness and configurability.

For production deployments, the PostgreSQL implementation is generally recommended over in-memory or SQLite options due to its scalability, reliability, and concurrent access capabilities.

Classes

PostgresCheckpointerConfig

Configuration for PostgreSQL-based checkpoint persistence.

Module Contents

class haive.core.persistence.postgres_config.PostgresCheckpointerConfig

Bases: haive.core.persistence.base.CheckpointerConfig[dict[str, Any]]

Configuration for PostgreSQL-based checkpoint persistence.

This implementation provides a robust, production-ready persistence solution using PostgreSQL as the storage backend. It offers comprehensive configuration options for database connections, connection pooling, security, and performance tuning.

PostgreSQL persistence is recommended for production deployments where durability, reliability, and concurrent access are important. It supports both full history tracking and space-efficient shallow mode that only retains the most recent state.

Key features include:

  • Connection pooling for optimal performance under load

  • Automatic retry with exponential backoff for resilience

  • Comprehensive security options including SSL/TLS support

  • Support for both synchronous and asynchronous operation

  • Transaction management and prepared statement optimization

  • Thread registration for tracking active sessions

  • Support for both full history and shallow (latest-only) storage modes

The implementation maintains connection pools separately for synchronous and asynchronous usage, ensuring optimal performance in both contexts. It also includes table setup and validation to ensure the database schema is properly configured.
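The retry behaviour listed among the key features can be illustrated with a minimal, standalone sketch. It is not the module's actual retry code: the function names, the base delay, the cap, and the choice of ConnectionError as the retryable exception are all assumptions made for illustration.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def backoff_delays(retries: int, base: float = 0.5, cap: float = 8.0) -> List[float]:
    """Delay before each retry: base * 2**attempt, capped at `cap` seconds."""
    return [min(cap, base * (2 ** attempt)) for attempt in range(retries)]


def call_with_retry(
    operation: Callable[[], T],
    retries: int = 3,
    base: float = 0.5,
    cap: float = 8.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `operation`, retrying on ConnectionError with exponential backoff.

    Hypothetical helper: the exception type and defaults are assumptions,
    not values taken from PostgresCheckpointerConfig.
    """
    delays = backoff_delays(retries, base, cap)
    for attempt in range(retries + 1):
        try:
            return operation()
        except ConnectionError:
            if attempt == retries:
                raise  # retries exhausted: surface the last error
            sleep(delays[attempt])
    raise AssertionError("unreachable")
```

Passing `sleep` as a parameter keeps the helper testable without real waiting. A production version would usually add random jitter to the delays so that many clients do not retry in lockstep.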

Examples

from haive.core.persistence import PostgresCheckpointerConfig
from haive.core.persistence.types import CheckpointerMode, CheckpointStorageMode

# Create a basic PostgreSQL checkpointer
config = PostgresCheckpointerConfig(
    db_host="localhost",
    db_port=5432,
    db_name="haive",
    db_user="postgres",
    db_pass="secure_password",
    ssl_mode="require",
    mode=CheckpointerMode.ASYNC,
    storage_mode=CheckpointStorageMode.SHALLOW,
)

# For async usage
async def setup():
    async_checkpointer = await config.create_async_checkpointer()
    # Use the checkpointer…

Notes

  • Requires the psycopg and psycopg_pool packages to be installed

  • For best performance, use connection pooling with appropriate sizing

  • Consider shallow mode for applications that don’t need full history
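To make the trade-off between full history and shallow mode concrete, here is a small in-memory sketch. The two classes are hypothetical stand-ins written for this illustration. They do not reflect the real saver classes or the database schema, only the retention behaviour described above.

```python
from collections import defaultdict
from typing import Any, Dict, List


class FullHistoryStore:
    """Stand-in for full mode: keeps every checkpoint written for a thread."""

    def __init__(self) -> None:
        self._history: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

    def put(self, thread_id: str, state: Dict[str, Any]) -> None:
        self._history[thread_id].append(dict(state))

    def latest(self, thread_id: str) -> Dict[str, Any]:
        return self._history[thread_id][-1]

    def count(self, thread_id: str) -> int:
        return len(self._history[thread_id])


class ShallowStore:
    """Stand-in for shallow mode: each write overwrites the previous checkpoint."""

    def __init__(self) -> None:
        self._latest: Dict[str, Dict[str, Any]] = {}

    def put(self, thread_id: str, state: Dict[str, Any]) -> None:
        self._latest[thread_id] = dict(state)

    def latest(self, thread_id: str) -> Dict[str, Any]:
        return self._latest[thread_id]

    def count(self, thread_id: str) -> int:
        return 1 if thread_id in self._latest else 0
```

After the same sequence of writes, both stores return the same latest state, but only the full store can replay earlier steps. Its storage grows with every write, while the shallow store stays constant per thread.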

async create_async_checkpointer()

Create an asynchronous PostgreSQL checkpointer.

This method creates and configures an asynchronous PostgreSQL checkpointer that matches the settings in this configuration. It handles async connection pool creation, checkpointer initialization, and database table setup.

The method automatically selects the appropriate implementation based on the storage_mode setting (full or shallow). It uses the asynchronous PostgreSQL driver and connection pool for non-blocking database operations.

Returns:

A configured AsyncPostgresSaver or AsyncShallowPostgresSaver instance

Return type:

Any

Raises:

RuntimeError – If the asynchronous PostgreSQL dependencies are missing or the connection fails

Examples

from haive.core.persistence import PostgresCheckpointerConfig
from haive.core.persistence.types import CheckpointerMode, CheckpointStorageMode

config = PostgresCheckpointerConfig(
    db_host="localhost",
    db_port=5432,
    mode=CheckpointerMode.ASYNC,
    storage_mode=CheckpointStorageMode.FULL,
)

async def setup_graph():
    try:
        # Creates an AsyncPostgresSaver instance
        async_checkpointer = await config.create_async_checkpointer()

        # Use with an async graph (AsyncGraph is not defined in this snippet)
        graph = AsyncGraph(checkpointer=async_checkpointer)
        return graph
    except RuntimeError as e:
        print(f"Failed to create async PostgreSQL checkpointer: {e}")
        # Handle error

Note

This method automatically forces the mode to ASYNC for consistency, ensuring that the configuration accurately reflects the type of checkpointer being created.

create_checkpointer()

Create a synchronous PostgreSQL checkpointer.

This method creates and configures a synchronous PostgreSQL checkpointer that matches the settings in this configuration. It handles connection pool creation, checkpointer initialization, and database table setup.

The method automatically selects the appropriate implementation based on the storage_mode setting (full or shallow), and performs error checking to ensure the requested configuration is valid.

Returns:

A configured PostgresSaver or ShallowPostgresSaver instance

Return type:

Any

Raises:
  • RuntimeError – If async mode is requested (use create_async_checkpointer instead)

  • RuntimeError – If the PostgreSQL dependencies are missing or the connection fails

Examples

from haive.core.persistence import PostgresCheckpointerConfig
from haive.core.persistence.types import CheckpointStorageMode

config = PostgresCheckpointerConfig(
    db_host="localhost",
    db_port=5432,
    storage_mode=CheckpointStorageMode.SHALLOW,
)

try:
    # Creates a ShallowPostgresSaver instance
    checkpointer = config.create_checkpointer()

    # Use with a graph (Graph is not defined in this snippet)
    graph = Graph(checkpointer=checkpointer)
except RuntimeError as e:
    print(f"Failed to create PostgreSQL checkpointer: {e}")
    # Handle error - perhaps fall back to memory checkpointer
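The validation and selection logic described for create_checkpointer can be sketched without a database. The enums below are stand-ins for CheckpointerMode and CheckpointStorageMode. The SYNC member, the enum values, and the helper name are assumptions, and the function returns only the name of the saver class rather than constructing one.

```python
from enum import Enum


class Mode(Enum):
    """Stand-in for CheckpointerMode (SYNC member and values are assumed)."""
    SYNC = "sync"
    ASYNC = "async"


class StorageMode(Enum):
    """Stand-in for CheckpointStorageMode (values are assumed)."""
    FULL = "full"
    SHALLOW = "shallow"


def select_sync_saver(mode: Mode, storage_mode: StorageMode) -> str:
    """Return the name of the saver a synchronous factory would build.

    Mirrors the documented behaviour: async mode is rejected with
    RuntimeError, and storage_mode picks the full or shallow saver.
    """
    if mode is Mode.ASYNC:
        raise RuntimeError(
            "async mode requested; use create_async_checkpointer instead"
        )
    if storage_mode is StorageMode.SHALLOW:
        return "ShallowPostgresSaver"
    return "PostgresSaver"
```

Rejecting the async mode up front means a misconfiguration is reported as a clear RuntimeError before any connection pool is opened.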

get_connection_kwargs()

Get connection keyword arguments for PostgreSQL connections.

This method constructs a dictionary of connection options to be passed to the PostgreSQL connection pool and individual connections. It combines the standard configuration parameters with any additional custom parameters specified in connection_kwargs.

The options include settings for transaction management, prepared statement handling, and timeout configuration, which can significantly impact performance and reliability.

Returns:

Dictionary of connection options ready to use with PostgreSQL connections or connection pools

Return type:

Dict[str, Any]

Examples

from haive.core.persistence import PostgresCheckpointerConfig

config = PostgresCheckpointerConfig(
    auto_commit=True,
    prepare_threshold=5,
    connection_kwargs={"application_name": "haive_app"},
)

kwargs = config.get_connection_kwargs()
# kwargs = {
#     "autocommit": True,
#     "prepare_threshold": 5,
#     "application_name": "haive_app"
# }
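The merge described above can be approximated with a plain function. This is a sketch, not the method's source: the function name is hypothetical, and letting custom connection_kwargs override the standard keys on conflict is an assumption, since the documentation only states that the two sets are combined.

```python
from typing import Any, Dict, Optional


def build_connection_kwargs(
    auto_commit: bool,
    prepare_threshold: Optional[int],
    connection_kwargs: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Combine standard connection options with custom ones.

    Assumption: custom keys are applied last and therefore win on conflict.
    """
    kwargs: Dict[str, Any] = {
        "autocommit": auto_commit,
        "prepare_threshold": prepare_threshold,
    }
    kwargs.update(connection_kwargs or {})
    return kwargs
```

Called with the values from the example, `build_connection_kwargs(True, 5, {"application_name": "haive_app"})` yields the same three keys shown in the comment above.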

get_connection_uri()

Generate a formatted connection URI for PostgreSQL.

This method constructs a properly formatted PostgreSQL connection string based on the configured connection parameters. It handles proper escaping of special characters in passwords and formatting according to PostgreSQL standards.

The method prioritizes using a direct connection string if one is provided, otherwise it builds the string from individual connection parameters.

Returns:

Formatted PostgreSQL connection string ready for use

Return type:

str

Examples

from haive.core.persistence import PostgresCheckpointerConfig

config = PostgresCheckpointerConfig(
    db_host="db.example.com",
    db_port=5432,
    db_name="haive",
    db_user="app_user",
    db_pass="secret_password",
    ssl_mode="require",
)

uri = config.get_connection_uri()
# uri = "postgresql://app_user:secret_password@db.example.com:5432/haive?sslmode=require"
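To show why password escaping matters, the following sketch builds a URI of the same shape using only the standard library. The function and its connection_string parameter are hypothetical names chosen for this illustration, and the real method may differ in detail.

```python
from typing import Optional
from urllib.parse import quote, urlencode


def build_postgres_uri(
    host: str,
    port: int,
    name: str,
    user: str,
    password: str,
    ssl_mode: Optional[str] = None,
    connection_string: Optional[str] = None,
) -> str:
    """Build a postgresql:// URI, percent-encoding the credentials."""
    # A directly supplied connection string takes priority, as described above.
    if connection_string:
        return connection_string
    # safe="" also encodes "/", ":" and "@", which would otherwise be read
    # as URI delimiters if they appear in a user name or password.
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    uri = f"postgresql://{credentials}@{host}:{port}/{name}"
    if ssl_mode:
        uri += "?" + urlencode({"sslmode": ssl_mode})
    return uri
```

With a password such as `p@ss:w/rd`, the unescaped form would be ambiguous because `@`, `:` and `/` are URI delimiters. The sketch encodes it as `p%40ss%3Aw%2Frd`.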

async initialize_async_checkpointer()

Initialize an async checkpointer with proper resource management.

This method creates and initializes an asynchronous PostgreSQL checkpointer with proper resource lifecycle management using an async context manager. This ensures that database connections are properly closed when they’re no longer needed, preventing connection leaks and other resource issues.

Unlike create_async_checkpointer, which returns a raw checkpointer instance, this method returns an async context manager that automatically handles resource cleanup when the context is exited, making it ideal for use in production environments.

Returns:

An async context manager that yields a configured checkpointer and automatically cleans up resources on exit

Return type:

Any

Raises:

RuntimeError – If the asynchronous PostgreSQL dependencies are missing or the connection fails

Examples

from haive.core.persistence import PostgresCheckpointerConfig
from haive.core.persistence.types import CheckpointerMode

config = PostgresCheckpointerConfig(
    db_host="localhost",
    db_port=5432,
    mode=CheckpointerMode.ASYNC,
)

async def run_with_managed_resources():
    # Resources will be properly initialized and cleaned up
    async with await config.initialize_async_checkpointer() as checkpointer:
        # Use checkpointer with async code (AsyncGraph is not defined in this snippet)
        graph = AsyncGraph(checkpointer=checkpointer)
        # Run operations with graph…

    # Connection pool is automatically closed here

Note

This is the recommended method for asynchronous usage in production environments, as it ensures proper resource cleanup even if errors occur.
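The cleanup guarantee can be demonstrated with a self-contained sketch built on contextlib.asynccontextmanager. FakePool and managed_checkpointer are hypothetical stand-ins, not part of the haive API. They only show the pattern of closing a pool in a finally block so that cleanup also runs when the body raises.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Dict, List


class FakePool:
    """Stand-in for an async connection pool; records open/close calls."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def open(self) -> None:
        self.events.append("open")

    async def close(self) -> None:
        self.events.append("close")


@asynccontextmanager
async def managed_checkpointer(pool: FakePool) -> AsyncIterator[Dict[str, Any]]:
    """Open the pool, yield a placeholder checkpointer, always close the pool."""
    await pool.open()
    try:
        yield {"pool": pool}  # placeholder for a real checkpointer object
    finally:
        await pool.close()  # runs on normal exit and when the body raises


async def demo() -> List[str]:
    """Use the manager, fail inside it, and report what happened to the pool."""
    pool = FakePool()
    try:
        async with managed_checkpointer(pool):
            raise ValueError("simulated failure while using the checkpointer")
    except ValueError:
        pass
    return pool.events
```

Running `asyncio.run(demo())` shows the pool being closed even though the body raised. Unlike this sketch, the documented method is itself awaited first (`async with await config.initialize_async_checkpointer()`), which suggests it is a coroutine that returns the context manager.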

is_async_mode()

Check if this configuration is set to operate in asynchronous mode.

This method determines whether the PostgreSQL checkpointer should use asynchronous operations based on the configured mode. It affects which connection pools and checkpointer implementations are used.

For PostgreSQL, this is an important distinction as it determines whether synchronous or asynchronous database drivers are used, which have different connection management patterns and performance characteristics.

Returns:

True if configured for async operations, False for synchronous

Return type:

bool