haive.core.persistence.postgres_config
PostgreSQL-based persistence implementation for the Haive framework.
This module provides a PostgreSQL-backed checkpoint persistence implementation that stores state data in a PostgreSQL database. This allows for durable, reliable state persistence across application restarts and deployments.
The PostgreSQL implementation provides connection pooling, automatic retry with exponential backoff, error handling for connection and setup failures, and both synchronous and asynchronous operation modes. It integrates with LangGraph’s checkpoint system and exposes these behaviors as configuration options.
For production deployments, the PostgreSQL implementation is generally recommended over in-memory or SQLite options due to its scalability, reliability, and concurrent access capabilities.
Classes
PostgresCheckpointerConfig | Configuration for PostgreSQL-based checkpoint persistence.
Module Contents
- class haive.core.persistence.postgres_config.PostgresCheckpointerConfig
Bases: haive.core.persistence.base.CheckpointerConfig[dict[str, Any]]
Configuration for PostgreSQL-based checkpoint persistence.
This implementation provides a robust, production-ready persistence solution using PostgreSQL as the storage backend. It offers comprehensive configuration options for database connections, connection pooling, security, and performance tuning.
PostgreSQL persistence is recommended for production deployments where durability, reliability, and concurrent access are important. It supports both full history tracking and space-efficient shallow mode that only retains the most recent state.
Key features include:
- Connection pooling for optimal performance under load
- Automatic retry with exponential backoff for resilience
- Comprehensive security options including SSL/TLS support
- Support for both synchronous and asynchronous operation
- Transaction management and prepared statement optimization
- Thread registration for tracking active sessions
- Support for both full history and shallow (latest-only) storage modes
The implementation maintains connection pools separately for synchronous and asynchronous usage, ensuring optimal performance in both contexts. It also includes table setup and validation to ensure the database schema is properly configured.
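The retry behavior named in the feature list is not spelled out on this page, so the following is only a minimal sketch of exponential backoff as a general technique, not Haive's implementation. The helper name `retry_with_backoff` and its parameters `max_attempts`, `base_delay`, and `sleep` are assumptions made for illustration.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `operation`, doubling the wait after each failed attempt.

    Hypothetical helper: it illustrates the technique only.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return operation()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Wait base_delay, then 2x, then 4x, and so on.
            sleep(base_delay * (2 ** attempt))
    raise AssertionError("unreachable")
```

Passing `sleep` as a parameter keeps the helper easy to test without real delays. A production version would typically also cap the delay and add jitter.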
Examples
    from haive.core.persistence import PostgresCheckpointerConfig
    from haive.core.persistence.types import CheckpointerMode, CheckpointStorageMode

    # Create a basic PostgreSQL checkpointer
    config = PostgresCheckpointerConfig(
        db_host="localhost",
        db_port=5432,
        db_name="haive",
        db_user="postgres",
        db_pass="secure_password",
        ssl_mode="require",
        mode=CheckpointerMode.ASYNC,
        storage_mode=CheckpointStorageMode.SHALLOW,
    )

    # For async usage
    async def setup():
        async_checkpointer = await config.create_async_checkpointer()
        # Use the checkpointer…
Notes
- Requires the psycopg and psycopg_pool packages to be installed
- For best performance, use connection pooling with appropriate sizing
- Consider shallow mode for applications that don’t need full history
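To make the difference between full-history and shallow storage concrete, here is a toy in-memory sketch. It does not reflect how the PostgreSQL savers store data. The class `InMemoryCheckpointStore` and its methods are hypothetical names used only to contrast the two retention policies.

```python
from collections import defaultdict
from typing import Any, Dict, List


class InMemoryCheckpointStore:
    """Toy store contrasting full-history and shallow (latest-only) retention."""

    def __init__(self, shallow: bool = False) -> None:
        self.shallow = shallow
        self._history: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

    def put(self, thread_id: str, state: Dict[str, Any]) -> None:
        if self.shallow:
            # Shallow mode: overwrite, so only the newest state survives.
            self._history[thread_id] = [state]
        else:
            # Full mode: append, so every checkpoint stays retrievable.
            self._history[thread_id].append(state)

    def latest(self, thread_id: str) -> Dict[str, Any]:
        return self._history[thread_id][-1]

    def count(self, thread_id: str) -> int:
        return len(self._history[thread_id])
```

Both modes return the same latest state. They differ only in how many earlier checkpoints remain available, which is the storage trade-off the note above refers to.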
- async create_async_checkpointer()
Create an asynchronous PostgreSQL checkpointer.
This method creates and configures an asynchronous PostgreSQL checkpointer that matches the settings in this configuration. It handles async connection pool creation, checkpointer initialization, and database table setup.
The method automatically selects the appropriate implementation based on the storage_mode setting (full or shallow). It uses the asynchronous PostgreSQL driver and connection pool for non-blocking database operations.
- Returns:
A configured AsyncPostgresSaver or AsyncShallowPostgresSaver instance
- Return type:
Any
- Raises:
RuntimeError – If the asynchronous PostgreSQL dependencies are missing or connection fails
Examples
    config = PostgresCheckpointerConfig(
        db_host="localhost",
        db_port=5432,
        mode=CheckpointerMode.ASYNC,
        storage_mode=CheckpointStorageMode.FULL,
    )

    async def setup_graph():
        try:
            # Creates an AsyncPostgresSaver instance
            async_checkpointer = await config.create_async_checkpointer()
            # Use with an async graph
            graph = AsyncGraph(checkpointer=async_checkpointer)
            return graph
        except RuntimeError as e:
            print(f"Failed to create async PostgreSQL checkpointer: {e}")
            # Handle error
Note
This method automatically forces the mode to ASYNC for consistency, ensuring that the configuration accurately reflects the type of checkpointer being created.
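Read together with `create_checkpointer`, the mode and storage settings determine which of the four saver classes named on this page is returned. The lookup below is a sketch of that mapping only. The enums `Mode` and `StorageMode` are stand-ins for `CheckpointerMode` and `CheckpointStorageMode`, and the `SYNC` member and all enum values are assumptions, since this page shows only `ASYNC`, `FULL`, and `SHALLOW`.

```python
from enum import Enum


class Mode(Enum):
    """Stand-in for CheckpointerMode (SYNC member assumed)."""
    SYNC = "sync"
    ASYNC = "async"


class StorageMode(Enum):
    """Stand-in for CheckpointStorageMode."""
    FULL = "full"
    SHALLOW = "shallow"


# Saver class names as documented for the two factory methods.
SAVER_BY_SETTINGS = {
    (Mode.SYNC, StorageMode.FULL): "PostgresSaver",
    (Mode.SYNC, StorageMode.SHALLOW): "ShallowPostgresSaver",
    (Mode.ASYNC, StorageMode.FULL): "AsyncPostgresSaver",
    (Mode.ASYNC, StorageMode.SHALLOW): "AsyncShallowPostgresSaver",
}


def saver_name(mode: Mode, storage_mode: StorageMode) -> str:
    """Return the name of the saver class implied by the two settings."""
    return SAVER_BY_SETTINGS[(mode, storage_mode)]
```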
- create_checkpointer()
Create a synchronous PostgreSQL checkpointer.
This method creates and configures a synchronous PostgreSQL checkpointer that matches the settings in this configuration. It handles connection pool creation, checkpointer initialization, and database table setup.
The method automatically selects the appropriate implementation based on the storage_mode setting (full or shallow), and performs error checking to ensure the requested configuration is valid.
- Returns:
A configured PostgresSaver or ShallowPostgresSaver instance
- Return type:
Any
- Raises:
RuntimeError – If async mode is requested (use create_async_checkpointer instead)
RuntimeError – If the PostgreSQL dependencies are missing or connection fails
Examples
    config = PostgresCheckpointerConfig(
        db_host="localhost",
        db_port=5432,
        storage_mode=CheckpointStorageMode.SHALLOW,
    )

    try:
        # Creates a ShallowPostgresSaver instance
        checkpointer = config.create_checkpointer()
        # Use with a graph
        graph = Graph(checkpointer=checkpointer)
    except RuntimeError as e:
        print(f"Failed to create PostgreSQL checkpointer: {e}")
        # Handle error - perhaps fall back to memory checkpointer
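The fallback idea mentioned in the example's last comment could be wrapped in a small helper. This is a sketch under assumptions: `create_with_fallback` is a hypothetical name, and the two factories are plain callables so the pattern does not depend on any particular checkpointer class.

```python
from typing import Any, Callable


def create_with_fallback(
    primary: Callable[[], Any],
    fallback: Callable[[], Any],
) -> Any:
    """Return primary() unless it raises RuntimeError, then return fallback().

    Hypothetical helper; only RuntimeError is caught because that is the
    exception documented for create_checkpointer.
    """
    try:
        return primary()
    except RuntimeError as exc:
        print(f"Primary checkpointer unavailable, falling back: {exc}")
        return fallback()
```

In practice `primary` would be `config.create_checkpointer` and `fallback` a factory for an in-memory checkpointer. A silent fallback trades durability for availability, so it is usually better suited to development than to production.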
- get_connection_kwargs()
Get connection keyword arguments for PostgreSQL connections.
This method constructs a dictionary of connection options to be passed to the PostgreSQL connection pool and individual connections. It combines the standard configuration parameters with any additional custom parameters specified in connection_kwargs.
The options include settings for transaction management, prepared statement handling, and timeout configuration, which can significantly impact performance and reliability.
- Returns:
Dictionary of connection options ready to use with PostgreSQL connections or connection pools
- Return type:
Dict[str, Any]
Examples
    config = PostgresCheckpointerConfig(
        auto_commit=True,
        prepare_threshold=5,
        connection_kwargs={"application_name": "haive_app"},
    )
    kwargs = config.get_connection_kwargs()
    # kwargs = {
    #     "autocommit": True,
    #     "prepare_threshold": 5,
    #     "application_name": "haive_app"
    # }
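The merge described for this method can be pictured as a plain dictionary update. The function below is a sketch, not the method's source: `build_connection_kwargs` is a hypothetical name, the default values are illustrative, and letting custom entries override the standard ones is an assumption about precedence that this page does not state.

```python
from typing import Any, Dict, Optional


def build_connection_kwargs(
    auto_commit: bool = True,
    prepare_threshold: Optional[int] = 0,
    extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Combine standard connection options with custom ones."""
    kwargs: Dict[str, Any] = {
        "autocommit": auto_commit,
        "prepare_threshold": prepare_threshold,
    }
    # Assumed precedence: custom entries are applied last and win on conflict.
    kwargs.update(extra or {})
    return kwargs
```

Called with the values from the example above, the sketch produces the same three-key dictionary.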
- get_connection_uri()
Generate a formatted connection URI for PostgreSQL.
This method constructs a properly formatted PostgreSQL connection string based on the configured connection parameters. It handles proper escaping of special characters in passwords and formatting according to PostgreSQL standards.
The method prioritizes using a direct connection string if one is provided, otherwise it builds the string from individual connection parameters.
- Returns:
Formatted PostgreSQL connection string ready for use
Return type:
str
Examples
    config = PostgresCheckpointerConfig(
        db_host="db.example.com",
        db_port=5432,
        db_name="haive",
        db_user="app_user",
        db_pass="secret_password",
        ssl_mode="require",
    )
    uri = config.get_connection_uri()
    # uri = "postgresql://app_user:secret_password@db.example.com:5432/haive?sslmode=require"
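The two behaviors described for this method, preferring a direct connection string and escaping special characters in the password, can be sketched with the standard library. This is an illustration, not the method's source: `build_connection_uri` and its parameter names are hypothetical, and percent-encoding with `urllib.parse.quote` is an assumed escaping strategy.

```python
from typing import Optional
from urllib.parse import quote


def build_connection_uri(
    host: str,
    port: int,
    name: str,
    user: str,
    password: str,
    ssl_mode: Optional[str] = None,
    connection_string: Optional[str] = None,
) -> str:
    """Build a postgresql:// URI, or return the direct string if given."""
    if connection_string:
        return connection_string
    # safe="" also escapes "/", which quote() leaves alone by default.
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    uri = f"postgresql://{credentials}@{host}:{port}/{name}"
    if ssl_mode:
        uri += f"?sslmode={ssl_mode}"
    return uri
```

Without escaping, a password containing `@`, `/`, or `:` would be misread as part of the host or path when the URI is parsed.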
- async initialize_async_checkpointer()
Initialize an async checkpointer with proper resource management.
This method creates and initializes an asynchronous PostgreSQL checkpointer with proper resource lifecycle management using an async context manager. This ensures that database connections are properly closed when they’re no longer needed, preventing connection leaks and other resource issues.
Unlike create_async_checkpointer, which returns a raw checkpointer instance, this method returns an async context manager that automatically handles resource cleanup when the context is exited, making it ideal for use in production environments.
- Returns:
An async context manager that yields a configured checkpointer and automatically cleans up resources on exit
- Return type:
Any
- Raises:
RuntimeError – If the asynchronous PostgreSQL dependencies are missing or connection fails
Examples
    config = PostgresCheckpointerConfig(
        db_host="localhost",
        db_port=5432,
        mode=CheckpointerMode.ASYNC,
    )

    async def run_with_managed_resources():
        # Resources will be properly initialized and cleaned up
        async with await config.initialize_async_checkpointer() as checkpointer:
            # Use checkpointer with async code
            graph = AsyncGraph(checkpointer=checkpointer)
            # Run operations with graph…
        # Connection pool is automatically closed here
Note
This is the recommended method for asynchronous usage in production environments, as it ensures proper resource cleanup even if errors occur.
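The cleanup guarantee described in the note follows the usual async context manager pattern. The sketch below demonstrates that pattern with a fake pool and is not Haive's code: `FakePool`, `managed_pool`, and `demo` are hypothetical names, and the real method is awaited before entering the context, which this simplified version omits.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


class FakePool:
    """Stand-in for an async connection pool; records open/close calls."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def open(self) -> None:
        self.events.append("open")

    async def close(self) -> None:
        self.events.append("close")


@asynccontextmanager
async def managed_pool(pool: FakePool) -> AsyncIterator[FakePool]:
    await pool.open()
    try:
        yield pool
    finally:
        # Runs on normal exit and when the body raises.
        await pool.close()


async def demo() -> List[str]:
    pool = FakePool()
    try:
        async with managed_pool(pool):
            raise ValueError("simulated failure while using the checkpointer")
    except ValueError:
        pass
    return pool.events
```

Running `asyncio.run(demo())` shows the pool being closed even though the body of the `async with` block raised.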
- is_async_mode()
Check if this configuration is set to operate in asynchronous mode.
This method determines whether the PostgreSQL checkpointer should use asynchronous operations based on the configured mode. It affects which connection pools and checkpointer implementations are used.
For PostgreSQL, this is an important distinction as it determines whether synchronous or asynchronous database drivers are used, which have different connection management patterns and performance characteristics.
- Returns:
True if configured for async operations, False for synchronous
- Return type: