haive.core.persistence.handlers

High-level persistence handling utilities for the Haive framework.

This module provides high-level functions for managing persistence operations, including checkpointer creation, configuration interpretation, state recovery, and thread management. It serves as a convenient interface layer that abstracts away the details of specific persistence implementations.

The utilities in this module are designed to work with both simple configuration dictionaries and full CheckpointerConfig objects, automatically handling fallbacks, error recovery, and sensible defaults. They provide a robust interface for both synchronous and asynchronous usage patterns.

Key functions: - setup_checkpointer: Create appropriate checkpointer based on configuration - get_checkpoint: Retrieve state from persistence - put_checkpoint: Store state in persistence - register_thread: Register a thread for tracking and management

This module enables a more declarative approach to persistence configuration, allowing users to specify what they want rather than how to implement it.

Functions

close_async_pool_if_needed(checkpointer[, pool])

Close an async PostgreSQL connection pool if it was previously opened.

close_async_pool_if_needed(checkpointer[, pool])

Close an async PostgreSQL connection pool if it was previously opened.

close_pool_if_needed(checkpointer[, pool])

Close a PostgreSQL connection pool if it was previously opened.

ensure_async_pool_open(checkpointer)

Ensure that any async PostgreSQL connection pool is properly opened.

ensure_pool_open(checkpointer)

Ensure that any PostgreSQL connection pool is properly opened.

get_thread_id_from_config(config)

Extract thread_id from a RunnableConfig.

prepare_merged_input(input_data[, previous_state, ...])

Process input data and merge with previous state if available.

register_async_thread_if_needed(checkpointer, thread_id)

Register a thread in the persistence system asynchronously if needed.

register_async_thread_if_needed(checkpointer, thread_id)

Register a thread in the persistence system asynchronously if needed.

register_thread_if_needed(checkpointer, thread_id[, ...])

Register a thread in the persistence system if needed.

setup_async_checkpointer(config)

Set up the appropriate async checkpointer based on persistence configuration.

setup_checkpointer(config)

Set up the appropriate checkpointer based on persistence configuration.

Module Contents

async haive.core.persistence.handlers.close_async_pool_if_needed(checkpointer, pool=None)[source]

Close an async PostgreSQL connection pool if it was previously opened.

This should be called in finally blocks after async operations.

Parameters:
  • checkpointer (Any) – The checkpointer to check

  • pool (Any) – The pool to close. If None, will try to find the pool from the checkpointer.

Return type:

None

async haive.core.persistence.handlers.close_async_pool_if_needed(checkpointer, pool=None)[source]

Close an async PostgreSQL connection pool if it was previously opened.

This should be called in finally blocks after operations.

Parameters:
  • checkpointer (Any) – The checkpointer to check

  • pool (Any) – The pool to close. If None, will try to find the pool from the checkpointer.

Return type:

None

haive.core.persistence.handlers.close_pool_if_needed(checkpointer, pool=None)[source]

Close a PostgreSQL connection pool if it was previously opened.

This should be called in finally blocks after operations.

Parameters:
  • checkpointer (Any) – The checkpointer to check

  • pool (Any) – The pool to close. If None, will try to find the pool from the checkpointer.

Return type:

None

async haive.core.persistence.handlers.ensure_async_pool_open(checkpointer)[source]

Ensure that any async PostgreSQL connection pool is properly opened.

This should be called before any async operation that uses the checkpointer.

Parameters:

checkpointer (Any) – The checkpointer to check

Returns:

The opened pool if one was found and opened, None otherwise

Return type:

Any | None

haive.core.persistence.handlers.ensure_pool_open(checkpointer)[source]

Ensure that any PostgreSQL connection pool is properly opened.

This should be called before any operation that uses the checkpointer.

Parameters:

checkpointer (Any) – The checkpointer to check

Returns:

The opened pool if one was found and opened, None otherwise

Return type:

Any | None

haive.core.persistence.handlers.get_thread_id_from_config(config)[source]

Extract thread_id from a RunnableConfig.

Parameters:

config (dict[str, Any]) – Configuration to extract from

Returns:

Thread ID if found, None otherwise

Return type:

str | None

haive.core.persistence.handlers.prepare_merged_input(input_data, previous_state=None, runtime_config=None, input_schema=None, state_schema=None)[source]

Process input data and merge with previous state if available.

Parameters:
  • input_data (str | list[str] | dict[str, Any] | pydantic.BaseModel) – Input data in various formats

  • previous_state (Any | None) – Previous state from checkpointer

  • runtime_config (dict[str, Any] | None) – Runtime configuration

  • input_schema – Schema for input validation

  • state_schema – Schema for state validation

Returns:

Processed input data merged with previous state

Return type:

Any

async haive.core.persistence.handlers.register_async_thread_if_needed(checkpointer, thread_id, metadata=None)[source]

Register a thread in the persistence system asynchronously if needed.

Parameters:
  • checkpointer (Any) – The checkpointer to use

  • thread_id (str) – Thread ID to register

  • metadata (dict[str, Any] | None) – Optional metadata to associate with the thread

Return type:

None

async haive.core.persistence.handlers.register_async_thread_if_needed(checkpointer, thread_id, metadata=None)[source]

Register a thread in the persistence system asynchronously if needed.

Parameters:
  • checkpointer (Any) – The checkpointer to use

  • thread_id (str) – Thread ID to register

  • metadata (dict[str, Any] | None) – Optional metadata to associate with the thread

Return type:

None

haive.core.persistence.handlers.register_thread_if_needed(checkpointer, thread_id, metadata=None)[source]

Register a thread in the persistence system if needed.

Parameters:
  • checkpointer (Any) – The checkpointer to use

  • thread_id (str) – Thread ID to register

  • metadata (dict[str, Any] | None) – Optional metadata to associate with the thread

Return type:

None

async haive.core.persistence.handlers.setup_async_checkpointer(config)[source]

Set up the appropriate async checkpointer based on persistence configuration.

This function analyzes the provided configuration and creates the appropriate async checkpointer based on the persistence settings. It properly handles different checkpointer types with a focus on async PostgreSQL connections.

Parameters:

config (Any) – Configuration containing persistence settings

Returns:

A configured async checkpointer instance

Return type:

Any

haive.core.persistence.handlers.setup_checkpointer(config)[source]

Set up the appropriate checkpointer based on persistence configuration.

This function creates and configures a checkpointer instance based on the provided configuration. It handles a variety of configuration formats, including direct CheckpointerConfig objects and configuration dictionaries embedded in larger config objects.

The function provides intelligent fallbacks and error handling, ensuring that a working checkpointer is always returned - falling back to a memory checkpointer if the requested configuration cannot be satisfied.

Parameters:

config (Any) – Configuration containing persistence settings, which can be: - A direct CheckpointerConfig instance - An object with a ‘persistence’ attribute containing configuration - An object with a ‘persistence’ dictionary specifying type and parameters

Returns:

A configured checkpointer instance ready for use

Return type:

Any

Examples:

# Using a direct config object
from haive.core.persistence import MemoryCheckpointerConfig
memory_config = MemoryCheckpointerConfig()
checkpointer = setup_checkpointer(memory_config)

# Using a config object with persistence attribute
class AgentConfig:
    def __init__(self):
        self.persistence = {"type": "postgres", "db_host": "localhost"}

agent_config = AgentConfig()
checkpointer = setup_checkpointer(agent_config)

# With fallback to memory if configuration fails
try:
    checkpointer = setup_checkpointer({"persistence": {"type": "invalid"}})
    # Will fall back to memory checkpointer
except Exception:
    # Should not reach here - function handles errors internally
    pass