haive.core.persistence.handlers¶
High-level persistence handling utilities for the Haive framework.
This module provides high-level functions for managing persistence operations, including checkpointer creation, configuration interpretation, state recovery, and thread management. It serves as a convenient interface layer that abstracts away the details of specific persistence implementations.
The utilities in this module are designed to work with both simple configuration dictionaries and full CheckpointerConfig objects, automatically handling fallbacks, error recovery, and sensible defaults. They provide a robust interface for both synchronous and asynchronous usage patterns.
Key functions: - setup_checkpointer: Create appropriate checkpointer based on configuration - get_checkpoint: Retrieve state from persistence - put_checkpoint: Store state in persistence - register_thread: Register a thread for tracking and management
This module enables a more declarative approach to persistence configuration, allowing users to specify what they want rather than how to implement it.
Functions¶
|
Close an async PostgreSQL connection pool if it was previously opened. |
|
Close an async PostgreSQL connection pool if it was previously opened. |
|
Close a PostgreSQL connection pool if it was previously opened. |
|
Ensure that any async PostgreSQL connection pool is properly opened. |
|
Ensure that any PostgreSQL connection pool is properly opened. |
|
Extract thread_id from a RunnableConfig. |
|
Process input data and merge with previous state if available. |
|
Register a thread in the persistence system asynchronously if needed. |
|
Register a thread in the persistence system asynchronously if needed. |
|
Register a thread in the persistence system if needed. |
|
Set up the appropriate async checkpointer based on persistence configuration. |
|
Set up the appropriate checkpointer based on persistence configuration. |
Module Contents¶
- async haive.core.persistence.handlers.close_async_pool_if_needed(checkpointer, pool=None)[source]¶
Close an async PostgreSQL connection pool if it was previously opened.
This should be called in finally blocks after async operations.
- Parameters:
checkpointer (Any) – The checkpointer to check
pool (Any) – The pool to close. If None, will try to find the pool from the checkpointer.
- Return type:
None
- async haive.core.persistence.handlers.close_async_pool_if_needed(checkpointer, pool=None)[source]¶
Close an async PostgreSQL connection pool if it was previously opened.
This should be called in finally blocks after operations.
- Parameters:
checkpointer (Any) – The checkpointer to check
pool (Any) – The pool to close. If None, will try to find the pool from the checkpointer.
- Return type:
None
- haive.core.persistence.handlers.close_pool_if_needed(checkpointer, pool=None)[source]¶
Close a PostgreSQL connection pool if it was previously opened.
This should be called in finally blocks after operations.
- Parameters:
checkpointer (Any) – The checkpointer to check
pool (Any) – The pool to close. If None, will try to find the pool from the checkpointer.
- Return type:
None
- async haive.core.persistence.handlers.ensure_async_pool_open(checkpointer)[source]¶
Ensure that any async PostgreSQL connection pool is properly opened.
This should be called before any async operation that uses the checkpointer.
- Parameters:
checkpointer (Any) – The checkpointer to check
- Returns:
The opened pool if one was found and opened, None otherwise
- Return type:
Any | None
- haive.core.persistence.handlers.ensure_pool_open(checkpointer)[source]¶
Ensure that any PostgreSQL connection pool is properly opened.
This should be called before any operation that uses the checkpointer.
- Parameters:
checkpointer (Any) – The checkpointer to check
- Returns:
The opened pool if one was found and opened, None otherwise
- Return type:
Any | None
- haive.core.persistence.handlers.get_thread_id_from_config(config)[source]¶
Extract thread_id from a RunnableConfig.
- haive.core.persistence.handlers.prepare_merged_input(input_data, previous_state=None, runtime_config=None, input_schema=None, state_schema=None)[source]¶
Process input data and merge with previous state if available.
- Parameters:
input_data (str | list[str] | dict[str, Any] | pydantic.BaseModel) – Input data in various formats
previous_state (Any | None) – Previous state from checkpointer
runtime_config (dict[str, Any] | None) – Runtime configuration
input_schema – Schema for input validation
state_schema – Schema for state validation
- Returns:
Processed input data merged with previous state
- Return type:
Any
- async haive.core.persistence.handlers.register_async_thread_if_needed(checkpointer, thread_id, metadata=None)[source]¶
Register a thread in the persistence system asynchronously if needed.
- async haive.core.persistence.handlers.register_async_thread_if_needed(checkpointer, thread_id, metadata=None)[source]¶
Register a thread in the persistence system asynchronously if needed.
- haive.core.persistence.handlers.register_thread_if_needed(checkpointer, thread_id, metadata=None)[source]¶
Register a thread in the persistence system if needed.
- async haive.core.persistence.handlers.setup_async_checkpointer(config)[source]¶
Set up the appropriate async checkpointer based on persistence configuration.
This function analyzes the provided configuration and creates the appropriate async checkpointer based on the persistence settings. It properly handles different checkpointer types with a focus on async PostgreSQL connections.
- Parameters:
config (Any) – Configuration containing persistence settings
- Returns:
A configured async checkpointer instance
- Return type:
Any
- haive.core.persistence.handlers.setup_checkpointer(config)[source]¶
Set up the appropriate checkpointer based on persistence configuration.
This function creates and configures a checkpointer instance based on the provided configuration. It handles a variety of configuration formats, including direct CheckpointerConfig objects and configuration dictionaries embedded in larger config objects.
The function provides intelligent fallbacks and error handling, ensuring that a working checkpointer is always returned - falling back to a memory checkpointer if the requested configuration cannot be satisfied.
- Parameters:
config (Any) – Configuration containing persistence settings, which can be: - A direct CheckpointerConfig instance - An object with a ‘persistence’ attribute containing configuration - An object with a ‘persistence’ dictionary specifying type and parameters
- Returns:
A configured checkpointer instance ready for use
- Return type:
Any
Examples:
# Using a direct config object from haive.core.persistence import MemoryCheckpointerConfig memory_config = MemoryCheckpointerConfig() checkpointer = setup_checkpointer(memory_config) # Using a config object with persistence attribute class AgentConfig: def __init__(self): self.persistence = {"type": "postgres", "db_host": "localhost"} agent_config = AgentConfig() checkpointer = setup_checkpointer(agent_config) # With fallback to memory if configuration fails try: checkpointer = setup_checkpointer({"persistence": {"type": "invalid"}}) # Will fall back to memory checkpointer except Exception: # Should not reach here - function handles errors internally pass