haive.core.persistence.handlers
===============================

.. py:module:: haive.core.persistence.handlers

.. autoapi-nested-parse::

   High-level persistence handling utilities for the Haive framework.

   This module provides high-level functions for managing persistence operations,
   including checkpointer creation, configuration interpretation, state recovery,
   and thread management. It serves as an interface layer that abstracts away the
   details of specific persistence implementations.

   The utilities in this module work with both simple configuration dictionaries
   and full CheckpointerConfig objects, and handle fallbacks, error recovery, and
   defaults automatically. They support both synchronous and asynchronous usage
   patterns.

   Key functions:

   - setup_checkpointer: Create the appropriate checkpointer based on configuration
   - get_checkpoint: Retrieve state from persistence
   - put_checkpoint: Store state in persistence
   - register_thread: Register a thread for tracking and management

   This module enables a more declarative approach to persistence configuration,
   allowing users to specify what they want rather than how to implement it.


Functions
---------

.. autoapisummary::

   haive.core.persistence.handlers.close_async_pool_if_needed
   haive.core.persistence.handlers.close_pool_if_needed
   haive.core.persistence.handlers.ensure_async_pool_open
   haive.core.persistence.handlers.ensure_pool_open
   haive.core.persistence.handlers.get_thread_id_from_config
   haive.core.persistence.handlers.prepare_merged_input
   haive.core.persistence.handlers.register_async_thread_if_needed
   haive.core.persistence.handlers.register_thread_if_needed
   haive.core.persistence.handlers.setup_async_checkpointer
   haive.core.persistence.handlers.setup_checkpointer


Module Contents
---------------

.. py:function:: close_async_pool_if_needed(checkpointer, pool = None)
   :async:

   Close an async PostgreSQL connection pool if it was previously opened.

   This should be called in finally blocks after async operations.

   :param checkpointer: The checkpointer to check
   :param pool: The pool to close. If None, will try to find the pool from the checkpointer.


.. py:function:: close_pool_if_needed(checkpointer, pool = None)

   Close a PostgreSQL connection pool if it was previously opened.

   This should be called in finally blocks after operations.

   :param checkpointer: The checkpointer to check
   :param pool: The pool to close. If None, will try to find the pool from the checkpointer.


.. py:function:: ensure_async_pool_open(checkpointer)
   :async:

   Ensure that any async PostgreSQL connection pool is properly opened.

   This should be called before any async operation that uses the checkpointer.

   :param checkpointer: The checkpointer to check

   :returns: The opened pool if one was found and opened, None otherwise


.. py:function:: ensure_pool_open(checkpointer)

   Ensure that any PostgreSQL connection pool is properly opened.

   This should be called before any operation that uses the checkpointer.

   :param checkpointer: The checkpointer to check

   :returns: The opened pool if one was found and opened, None otherwise


.. py:function:: get_thread_id_from_config(config)

   Extract thread_id from a RunnableConfig.

   :param config: Configuration to extract from

   :returns: Thread ID if found, None otherwise


.. py:function:: prepare_merged_input(input_data, previous_state = None, runtime_config = None, input_schema=None, state_schema=None)

   Process input data and merge with previous state if available.

   :param input_data: Input data in various formats
   :param previous_state: Previous state from checkpointer
   :param runtime_config: Runtime configuration
   :param input_schema: Schema for input validation
   :param state_schema: Schema for state validation

   :returns: Processed input data merged with previous state


.. py:function:: register_async_thread_if_needed(checkpointer, thread_id, metadata = None)
   :async:

   Register a thread in the persistence system asynchronously if needed.

   :param checkpointer: The checkpointer to use
   :param thread_id: Thread ID to register
   :param metadata: Optional metadata to associate with the thread


.. py:function:: register_thread_if_needed(checkpointer, thread_id, metadata = None)

   Register a thread in the persistence system if needed.

   :param checkpointer: The checkpointer to use
   :param thread_id: Thread ID to register
   :param metadata: Optional metadata to associate with the thread


.. py:function:: setup_async_checkpointer(config)
   :async:

   Set up the appropriate async checkpointer based on persistence configuration.

   This function analyzes the provided configuration and creates the appropriate
   async checkpointer based on the persistence settings. It handles different
   checkpointer types, with a focus on async PostgreSQL connections.

   :param config: Configuration containing persistence settings

   :returns: A configured async checkpointer instance


.. py:function:: setup_checkpointer(config)

   Set up the appropriate checkpointer based on persistence configuration.

   This function creates and configures a checkpointer instance based on the
   provided configuration. It handles a variety of configuration formats,
   including direct CheckpointerConfig objects and configuration dictionaries
   embedded in larger config objects.

   The function handles errors internally and always returns a working
   checkpointer, falling back to a memory checkpointer if the requested
   configuration cannot be satisfied.

   :param config: Configuration containing persistence settings, which can be:

      - A direct CheckpointerConfig instance
      - An object with a 'persistence' attribute containing configuration
      - An object with a 'persistence' dictionary specifying type and parameters

   :returns: A configured checkpointer instance ready for use
   :rtype: Any

   Examples::

       from haive.core.persistence import MemoryCheckpointerConfig
       from haive.core.persistence.handlers import setup_checkpointer

       # Using a direct config object
       memory_config = MemoryCheckpointerConfig()
       checkpointer = setup_checkpointer(memory_config)

       # Using a config object with a persistence attribute
       class AgentConfig:
           def __init__(self):
               self.persistence = {"type": "postgres", "db_host": "localhost"}

       agent_config = AgentConfig()
       checkpointer = setup_checkpointer(agent_config)

       # With fallback to memory if configuration fails
       try:
           checkpointer = setup_checkpointer({"persistence": {"type": "invalid"}})
           # Will fall back to memory checkpointer
       except Exception:
           # Should not reach here - function handles errors internally
           pass
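
The pool helpers above are documented as a pair: ``ensure_pool_open`` before an operation and ``close_pool_if_needed`` in a ``finally`` block afterwards. The following is a minimal sketch of that calling order. It does not import Haive: ``FakePool``, ``FakeCheckpointer``, and ``run_with_pool`` are hypothetical names, and the function bodies are stand-ins that only mirror the documented signatures. In particular, reading ``thread_id`` from ``config["configurable"]`` is an assumption about the config layout, not confirmed library behavior.

```python
class FakePool:
    """Hypothetical stand-in for a PostgreSQL connection pool."""

    def __init__(self):
        self.is_open = False


class FakeCheckpointer:
    """Hypothetical stand-in for a checkpointer that owns a pool."""

    def __init__(self):
        self.pool = FakePool()
        self.threads = {}


# Stand-ins with the documented signatures; the bodies are assumptions.
def ensure_pool_open(checkpointer):
    pool = getattr(checkpointer, "pool", None)
    if pool is None or pool.is_open:
        return None  # nothing was found, or nothing had to be opened
    pool.is_open = True
    return pool


def close_pool_if_needed(checkpointer, pool=None):
    # If no pool is passed, try to find it on the checkpointer.
    if pool is None:
        pool = getattr(checkpointer, "pool", None)
    if pool is not None and pool.is_open:
        pool.is_open = False


def get_thread_id_from_config(config):
    # Assumed layout: {"configurable": {"thread_id": ...}}
    return (config or {}).get("configurable", {}).get("thread_id")


def register_thread_if_needed(checkpointer, thread_id, metadata=None):
    # Register only once; an existing registration is left untouched.
    checkpointer.threads.setdefault(thread_id, metadata or {})


def run_with_pool(checkpointer, config, work):
    """Open the pool, register the thread, run ``work``, always close."""
    pool = ensure_pool_open(checkpointer)
    try:
        thread_id = get_thread_id_from_config(config)
        if thread_id is not None:
            register_thread_if_needed(checkpointer, thread_id)
        return work(checkpointer)
    finally:
        close_pool_if_needed(checkpointer, pool)


checkpointer = FakeCheckpointer()
config = {"configurable": {"thread_id": "thread-1"}}
was_open_during_work = run_with_pool(checkpointer, config, lambda c: c.pool.is_open)
```

With the real module, the stand-ins would be replaced by imports from ``haive.core.persistence.handlers``; the ``try``/``finally`` shape is the part this sketch is meant to show, since it closes the pool even when the work raises.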