# Source code for haive.core.persistence.sqlite_config

"""SQLite-based persistence implementation for the Haive framework.

This module provides a SQLite-backed checkpoint persistence implementation that
stores state data in a local SQLite database file. This allows for durable state
persistence without requiring external database services, making it ideal for
local development, testing, and single-instance deployments.

The SQLite implementation strikes a balance between the simplicity of in-memory
storage and the durability of full database solutions like PostgreSQL. It offers
file-based persistence with minimal setup, while still providing basic thread
tracking and checkpoint management capabilities.

Key advantages of the SQLite implementation include:
- A storage backend that needs only the Python standard library's ``sqlite3`` module
- Simple file-based storage requiring no separate database service
- Async entry points that return the same synchronous saver, so one
  configuration serves both synchronous and asynchronous callers
- Full checkpoint history per thread (a shallow, latest-only mode is not
  implemented in this module)
- Automatic schema creation and management
"""

import contextlib
import json
import logging
import os
import sqlite3
import uuid
from typing import Any

from pydantic import Field

from haive.core.persistence.base import CheckpointerConfig
from haive.core.persistence.types import CheckpointerType
from haive.core.persistence.utils import deserialize_metadata, serialize_metadata

logger = logging.getLogger(__name__)


[docs] class SQLiteSaver: """A LangGraph-compatible checkpointer implementation using SQLite. This class provides a simple but effective implementation of the LangGraph checkpointer interface using SQLite as the storage backend. It handles state persistence, thread tracking, and checkpoint management through a local SQLite database file. The implementation automatically creates and manages the necessary database schema, including tables for threads and checkpoints. It provides methods for storing and retrieving checkpoint data, managing thread information, and tracking checkpoint relationships. Key features include: - File-based persistence with minimal setup requirements - Support for tracking parent-child relationships between checkpoints - Thread management with metadata and activity tracking - Automatic schema creation and database directory management - Efficient storage and retrieval of checkpoint data - JSON serialization for flexible data storage This implementation is ideal for local development, testing, and single-instance deployments where a full database service like PostgreSQL would be overkill. """ def __init__(self, db_path: str): """Initialize the SQLite saver with a database file path. This constructor sets up the SQLite checkpointer, ensuring the database directory exists and initializing the required schema. It automatically creates the database file if it doesn't exist. Args: db_path: Path to the SQLite database file where state will be stored. This can be an absolute or relative path. The directory structure will be created if it doesn't exist. 
Examples: # Create a SQLite checkpointer in the 'data' directory saver = SQLiteSaver("data/agent_state.db") # Use with a graph from langgraph.graph import Graph graph = Graph(checkpointer=saver) """ self.db_path = db_path self._ensure_db_dir() self.setup() def _ensure_db_dir(self): """Ensure the database directory exists.""" db_dir = os.path.dirname(self.db_path) if db_dir and not os.path.exists(db_dir): os.makedirs(db_dir, exist_ok=True)
[docs] def setup(self) -> None: """Set up the SQLite database schema. This method creates the necessary database tables for storing checkpoints and thread information if they don't already exist. It establishes the schema structure with appropriate relationships and constraints. The schema includes: 1. A 'threads' table for tracking conversation threads with: - thread_id: Unique identifier for each thread - created_at: Timestamp when the thread was created - last_access: Timestamp of the most recent activity - metadata: JSON blob for storing additional thread information 2. A 'checkpoints' table for storing state checkpoints with: - checkpoint_id: Unique identifier for each checkpoint - thread_id: Foreign key linking to the thread - checkpoint_ns: Namespace for organizing checkpoints - parent_checkpoint_id: For tracking checkpoint relationships - created_at: Timestamp when the checkpoint was created - data: The serialized checkpoint data - metadata: JSON blob for storing additional checkpoint information The method is automatically called during initialization but can also be called explicitly to recreate or validate the schema. """ with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Create threads table cursor.execute( """ CREATE TABLE IF NOT EXISTS threads ( thread_id TEXT PRIMARY KEY, created_at TEXT DEFAULT CURRENT_TIMESTAMP, last_access TEXT DEFAULT CURRENT_TIMESTAMP, metadata TEXT DEFAULT '{}' ) """ ) # Create checkpoints table cursor.execute( """ CREATE TABLE IF NOT EXISTS checkpoints ( checkpoint_id TEXT, thread_id TEXT, checkpoint_ns TEXT DEFAULT '', parent_checkpoint_id TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, data TEXT, metadata TEXT DEFAULT '{}', PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id), FOREIGN KEY (thread_id) REFERENCES threads (thread_id) ON DELETE CASCADE ) """ ) conn.commit()
[docs] def get(self, config: dict[str, Any]) -> dict[str, Any] | None: """Retrieve a checkpoint from the SQLite database. This method retrieves a specific checkpoint from the database based on the provided configuration. It handles extracting the necessary identifiers from the configuration, constructing and executing the appropriate query, and deserializing the retrieved data. The method can retrieve either a specific checkpoint (if checkpoint_id is provided) or the most recent checkpoint for a thread (if only thread_id is specified). Args: config: Configuration dictionary containing: - thread_id: The thread identifier (required) - checkpoint_id: Optional specific checkpoint to retrieve - configurable: Optional nested dictionary with additional parameters Returns: Dict[str, Any]: The checkpoint data if found, including: - channel_values: The actual state data - metadata: Additional information about the checkpoint - id: The checkpoint identifier Or None if no matching checkpoint is found Examples: # Get the latest checkpoint for a thread checkpoint = saver.get({"thread_id": "user_123"}) # Get a specific checkpoint checkpoint = saver.get({ "thread_id": "user_123", "checkpoint_id": "checkpoint_456" }) if checkpoint: # Use the state data state_data = checkpoint["channel_values"] """ thread_id = config["configurable"]["thread_id"] checkpoint_id = config["configurable"].get("checkpoint_id") checkpoint_ns = config["configurable"].get("checkpoint_ns", "") with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() if checkpoint_id: # Get specific checkpoint cursor.execute( """ SELECT data FROM checkpoints WHERE thread_id = ? AND checkpoint_ns = ? AND checkpoint_id = ? """, (thread_id, checkpoint_ns, checkpoint_id), ) else: # Get latest checkpoint cursor.execute( """ SELECT data FROM checkpoints WHERE thread_id = ? AND checkpoint_ns = ? 
ORDER BY created_at DESC LIMIT 1 """, (thread_id, checkpoint_ns), ) row = cursor.fetchone() if row: return json.loads(row["data"]) return None
[docs] def put( self, config: dict[str, Any], checkpoint: dict[str, Any], metadata: dict[str, Any] | None = None, new_versions: dict[str, Any] | None = None, ) -> dict[str, Any]: """Save a checkpoint to the database. Args: config: Configuration with thread_id and optional checkpoint_id checkpoint: The checkpoint data to save metadata: Optional metadata to associate with the checkpoint new_versions: Optional channel versions (ignored in SQLite implementation) Returns: Updated config with checkpoint_id """ thread_id = config["configurable"]["thread_id"] checkpoint_ns = config["configurable"].get("checkpoint_ns", "") parent_checkpoint_id = config["configurable"].get("checkpoint_id") # Generate a checkpoint ID if not present in the data checkpoint_id = checkpoint.get("id") if not checkpoint_id: checkpoint_id = str(uuid.uuid4()) checkpoint["id"] = checkpoint_id # Serialize data and metadata serialized_data = json.dumps(checkpoint) serialized_metadata = serialize_metadata(metadata or {}) with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Ensure thread exists cursor.execute( """ INSERT OR IGNORE INTO threads (thread_id, last_access) VALUES (?, CURRENT_TIMESTAMP) """, (thread_id,), ) # Update last access time if thread already exists cursor.execute( """ UPDATE threads SET last_access = CURRENT_TIMESTAMP WHERE thread_id = ? """, (thread_id,), ) # Insert checkpoint cursor.execute( """ INSERT INTO checkpoints (checkpoint_id, thread_id, checkpoint_ns, parent_checkpoint_id, data, metadata) VALUES (?, ?, ?, ?, ?, ?) """, ( checkpoint_id, thread_id, checkpoint_ns, parent_checkpoint_id, serialized_data, serialized_metadata, ), ) conn.commit() # Return updated config return { "configurable": { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": checkpoint_id, } }
[docs] def list( self, config: dict[str, Any], limit: int | None = None, filter: dict[str, Any] | None = None, before: dict[str, Any] | None = None, ) -> list[tuple[dict[str, Any], dict[str, Any]]]: """List checkpoints for a thread. Args: config: Configuration with thread_id limit: Optional maximum number of checkpoints to return filter: Optional filter conditions (not implemented) before: Optional checkpoint to start listing from (not implemented) Returns: List of (config, checkpoint) tuples """ thread_id = config["configurable"]["thread_id"] checkpoint_ns = config["configurable"].get("checkpoint_ns", "") with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() query = """ SELECT checkpoint_id, parent_checkpoint_id, data, metadata FROM checkpoints WHERE thread_id = ? AND checkpoint_ns = ? ORDER BY created_at DESC """ params = [thread_id, checkpoint_ns] if limit is not None: query += " LIMIT ?" params.append(limit) cursor.execute(query, params) rows = cursor.fetchall() result = [] for row in rows: checkpoint_data = json.loads(row["data"]) checkpoint_config = { "configurable": { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": row["checkpoint_id"], } } # Create parent config if available parent_config = None if row["parent_checkpoint_id"]: parent_config = { "configurable": { "thread_id": thread_id, "checkpoint_ns": checkpoint_ns, "checkpoint_id": row["parent_checkpoint_id"], } } # Use a namedtuple-like structure to match LangGraph's API class CheckpointTuple: def __init__( self, config, checkpoint, metadata, parent_config, writes=None ): """Init . 
Args: config: [TODO: Add description] checkpoint: [TODO: Add description] metadata: [TODO: Add description] parent_config: [TODO: Add description] writes: [TODO: Add description] """ self.config = config self.checkpoint = checkpoint self.metadata = metadata self.parent_config = parent_config self.writes = writes or [] metadata = deserialize_metadata(row["metadata"]) result.append( CheckpointTuple( checkpoint_config, checkpoint_data, metadata, parent_config ) ) return result
class SQLiteCheckpointerConfig(CheckpointerConfig):
    """Configuration for SQLite-based checkpoint persistence.

    Stores agent state in a single SQLite file through :class:`SQLiteSaver`.
    This gives persistence across application restarts without an external
    database service, sitting between in-memory storage and a full database
    such as PostgreSQL.

    Key features:
        - File-based persistence; the database file and its parent
          directory are created on demand.
        - Thread registration with JSON metadata and last-access tracking.
        - Checkpoints linked to their parent checkpoint.
        - One cached :class:`SQLiteSaver` per configuration instance.

    Well suited to:
        - Local development and testing environments.
        - Single-instance deployments where simplicity is preferred.
        - Applications with modest concurrency requirements.

    Examples:
        from haive.core.persistence import SQLiteCheckpointerConfig

        config = SQLiteCheckpointerConfig(db_path="data/agent_state.db")
        saved = config.put_checkpoint(
            {"configurable": {"thread_id": "user_123"}}, {"step": 1}
        )
        config.get_checkpoint(saved)  # -> {"step": 1}

    Note:
        While SQLite supports concurrent readers, it has limitations for
        concurrent writers. For high-concurrency production environments,
        consider using PostgresCheckpointerConfig instead.
    """

    type: CheckpointerType = CheckpointerType.SQLITE

    # SQLite configuration
    db_path: str = Field(
        default="./checkpoints.db", description="Path to SQLite database file"
    )

    # Runtime settings
    # NOTE(review): not read anywhere in this class; SQLiteSaver.__init__
    # always runs setup(). It may be consulted by CheckpointerConfig -- confirm.
    setup_needed: bool = Field(
        default=True, description="Whether to initialize DB tables on startup"
    )

    # Internal state (excluded from serialization): the cached SQLiteSaver.
    checkpointer: Any | None = Field(default=None, exclude=True)

    def create_checkpointer(self) -> Any:
        """Return the cached :class:`SQLiteSaver`, creating it on first call.

        Constructing the saver creates the database file's parent directory
        if needed, along with the ``threads`` and ``checkpoints`` tables.
        Every other method in this class therefore calls this first.

        Returns:
            Any: The :class:`SQLiteSaver` for ``db_path``. Repeated calls
            return the same instance until :meth:`close` is called.

        Examples:
            config = SQLiteCheckpointerConfig(db_path="data/state.db")
            saver = config.create_checkpointer()
            assert saver is config.create_checkpointer()
        """
        if self.checkpointer is None:
            self.checkpointer = SQLiteSaver(self.db_path)
        return self.checkpointer

    def register_thread(
        self,
        thread_id: str,
        name: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Register a thread, or refresh an existing one.

        A new thread is inserted with ``metadata`` (``{}`` when omitted).
        For an existing thread, ``last_access`` is bumped. If ``metadata`` is
        given, the stored metadata is replaced as well; with ``metadata=None``
        it is left untouched.

        Args:
            thread_id: Unique identifier of the thread to register or update.
            name: Optional human-readable name (currently unused).
            metadata: Optional JSON-serializable metadata for the thread
                (user info, session data, ...).

        Examples:
            config = SQLiteCheckpointerConfig(db_path="data/state.db")
            config.register_thread(
                thread_id="user_123",
                name="John's Conversation",
                metadata={"user_id": "user_123", "source": "web_app"},
            )
            config.put_checkpoint(
                {"configurable": {"thread_id": "user_123"}}, {"key": "value"}
            )
        """
        # Building the saver also guarantees that the schema exists.
        self.create_checkpointer()

        metadata_json = serialize_metadata(metadata or {})

        # closing() is required: sqlite3's own context manager (the second
        # item) only commits or rolls back and never closes the connection.
        with contextlib.closing(sqlite3.connect(self.db_path)) as conn, conn:
            existing = conn.execute(
                "SELECT 1 FROM threads WHERE thread_id = ?", (thread_id,)
            ).fetchone()

            if existing is None:
                conn.execute(
                    "INSERT INTO threads (thread_id, metadata) VALUES (?, ?)",
                    (thread_id, metadata_json),
                )
                logger.info("Thread %s registered in SQLite", thread_id)
                return

            if metadata is not None:
                # Honour the documented contract: caller-supplied metadata
                # replaces the stored blob for an existing thread.
                conn.execute(
                    "UPDATE threads SET metadata = ?, "
                    "last_access = CURRENT_TIMESTAMP WHERE thread_id = ?",
                    (metadata_json, thread_id),
                )
            else:
                conn.execute(
                    "UPDATE threads SET last_access = CURRENT_TIMESTAMP "
                    "WHERE thread_id = ?",
                    (thread_id,),
                )
            logger.debug("Thread %s already exists in SQLite", thread_id)

    def put_checkpoint(
        self,
        config: dict[str, Any],
        data: Any,
        metadata: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Store ``data`` as a new checkpoint and return the config addressing it.

        Args:
            config: ``{"configurable": {...}}`` with ``thread_id`` (required),
                optional ``checkpoint_ns`` and optional ``checkpoint_id``. As
                in :meth:`SQLiteSaver.put`, ``checkpoint_id`` names the
                checkpoint being extended and is stored as the new
                checkpoint's parent. The config returned by a previous call
                can therefore be passed straight back in to chain checkpoints.
            data: The state to store, saved under ``"channel_values"``. It must
                be JSON-serializable.
            metadata: Optional metadata to associate with the checkpoint.

        Returns:
            Config whose ``checkpoint_id`` is the newly generated checkpoint id.
        """
        checkpointer = self.create_checkpointer()
        # Leave "id" empty so SQLiteSaver.put generates a fresh one. Reusing
        # the config's checkpoint_id as the new id would make put() record the
        # checkpoint as its own parent. Chained calls would then fail with a
        # duplicate-primary-key IntegrityError.
        checkpoint_data = {"id": "", "channel_values": data}
        return checkpointer.put(config, checkpoint_data, metadata)

    def get_checkpoint(self, config: dict[str, Any]) -> dict[str, Any] | None:
        """Retrieve a checkpoint's stored state.

        Args:
            config: ``{"configurable": {...}}`` with ``thread_id`` and
                optional ``checkpoint_ns`` / ``checkpoint_id``. Without a
                ``checkpoint_id``, the latest checkpoint is used.

        Returns:
            The checkpoint's ``channel_values`` when present. Otherwise the
            raw checkpoint dict, or ``None`` when no checkpoint matches.
        """
        checkpointer = self.create_checkpointer()
        result = checkpointer.get(config)
        if result and "channel_values" in result:
            return result["channel_values"]
        return result

    def list_checkpoints(
        self, config: dict[str, Any], limit: int | None = None
    ) -> list[tuple[dict[str, Any], Any]]:
        """List a thread's checkpoints, newest first.

        Args:
            config: ``{"configurable": {...}}`` with ``thread_id`` and
                optional ``checkpoint_ns``.
            limit: Optional maximum number of checkpoints to return.

        Returns:
            ``(config, state)`` tuples. ``state`` is the checkpoint's
            ``channel_values``, or the whole checkpoint dict when that key
            is absent.
        """
        checkpointer = self.create_checkpointer()
        return [
            (cp.config, cp.checkpoint.get("channel_values", cp.checkpoint))
            for cp in checkpointer.list(config, limit=limit)
        ]

    async def create_async_checkpointer(self) -> Any:
        """Return the checkpointer for async callers.

        SQLite is used synchronously here, so this returns the same cached
        :class:`SQLiteSaver` as :meth:`create_checkpointer`.

        Returns:
            Any: The synchronous :class:`SQLiteSaver` instance.
        """
        return self.create_checkpointer()

    async def initialize_async_checkpointer(self) -> Any:
        """Initialize the checkpointer for async callers.

        No async initialization is needed; this returns the cached
        :class:`SQLiteSaver` from :meth:`create_checkpointer`.

        Returns:
            Any: The :class:`SQLiteSaver` instance, ready for use.
        """
        return self.create_checkpointer()

    def close(self) -> None:
        """Drop the cached saver.

        The saver and :meth:`register_thread` open a connection per
        operation, so no connection is held here. A later
        :meth:`create_checkpointer` call builds a fresh :class:`SQLiteSaver`.
        """
        self.checkpointer = None