haive.core.persistence.utils¶

Utility functions for the Haive persistence system.

This module provides helper functions for working with checkpointers and their associated resources. It includes utilities for connection pool management, serialization/deserialization of metadata, and other common operations needed across different persistence implementations.

The utilities are designed to be used by the persistence system internals and generally aren’t intended to be used directly by application code. They provide consistent behavior across different checkpointer implementations and handle edge cases and error conditions gracefully.

Functions¶

deserialize_metadata(metadata_str)

Deserialize metadata from JSON string to dictionary.

ensure_async_pool_open(checkpointer)

Ensure that an async PostgreSQL connection pool is properly opened.

ensure_pool_open(checkpointer)

Ensure that a PostgreSQL connection pool is properly opened.

register_thread(checkpointer, thread_id[, metadata])

Register a thread in the PostgreSQL database if needed.

register_thread_async(checkpointer, thread_id[, metadata])

Register a thread in the PostgreSQL database asynchronously.

serialize_metadata(metadata)

Serialize metadata dictionary to JSON string.

Module Contents¶

haive.core.persistence.utils.deserialize_metadata(metadata_str)[source]¶

Deserialize metadata from JSON string to dictionary.

Parameters:

metadata_str (str) – JSON string containing serialized metadata

Returns:

Deserialized metadata dictionary

Return type:

Dict[str, Any]

async haive.core.persistence.utils.ensure_async_pool_open(checkpointer)[source]¶

Ensure that an async PostgreSQL connection pool is properly opened.

This asynchronous function checks if an async checkpointer has an associated connection pool and ensures that it’s properly opened. It handles different async pool implementations and versions, checking appropriate attributes and calling the async open method if needed.

The function is particularly important for async contexts, where proper connection management is critical for maintaining good performance and resource utilization. It prevents errors from closed or unopened pools in async code.

Parameters:

checkpointer (Any) – The async checkpointer instance to check for a connection pool

Returns:

The opened async connection pool if one was found and

opened, None otherwise

Return type:

Optional[Any]

Note

This function gracefully handles the case where the async PostgreSQL dependencies are not available, making it safe to call even if the async database modules are not installed.

Examples

async def prepare_checkpointer(checkpointer):

# Ensure the pool is open before using it pool = await ensure_async_pool_open(checkpointer) if pool:

print(“Pool is ready for use”)

# Continue with checkpointer operations…

haive.core.persistence.utils.ensure_pool_open(checkpointer)[source]¶

Ensure that a PostgreSQL connection pool is properly opened.

This function checks if a checkpointer has an associated connection pool and ensures that it’s properly opened. It handles different pool implementations and versions, checking appropriate attributes and calling the open method if needed.

The function is used to ensure that connection pools are ready for use before attempting database operations, preventing errors from closed or unopened pools.

Parameters:

checkpointer (Any) – The checkpointer instance to check for a connection pool

Returns:

The opened connection pool if one was found and opened,

None otherwise

Return type:

Optional[Any]

Note

This function gracefully handles the case where psycopg_pool is not available, making it safe to call even if the PostgreSQL dependencies are not installed.

haive.core.persistence.utils.register_thread(checkpointer, thread_id, metadata=None)[source]¶

Register a thread in the PostgreSQL database if needed.

Parameters:
  • checkpointer (Any)

  • thread_id (str)

  • metadata (dict[str, Any] | None)

Return type:

bool

async haive.core.persistence.utils.register_thread_async(checkpointer, thread_id, metadata=None)[source]¶

Register a thread in the PostgreSQL database asynchronously.

Parameters:
  • checkpointer (Any)

  • thread_id (str)

  • metadata (dict[str, Any] | None)

Return type:

bool

haive.core.persistence.utils.serialize_metadata(metadata)[source]¶

Serialize metadata dictionary to JSON string.

Parameters:

metadata (dict[str, Any]) – Dictionary containing metadata to serialize

Returns:

JSON string representation of the metadata

Return type:

str