haive.core.common.mixins.checkpointer_mixin

Checkpointer mixin for stateful graphs and execution persistence.

This module provides a mixin that adds checkpointing capabilities to any class that uses LangGraph or LangChain for stateful execution. It handles both synchronous and asynchronous checkpointing patterns, state restoration, and runtime configuration management.

Usage:

from pydantic import BaseModel, Field from langgraph.graph import StateGraph from haive.core.common.mixins import CheckpointerMixin from haive.core.persistence.config import CheckpointerConfig

class MyAgent(CheckpointerMixin, BaseModel):

# Define the required fields persistence: Optional[CheckpointerConfig] = Field(default=None) checkpoint_mode: str = Field(default=”sync”)

def __init__(self, **data):

super().__init__(**data) # Create graph builder = StateGraph(…) self.app = builder.compile()

# Use run with automatic checkpointing def process(self, input_data, thread_id=None):

return self.run(input_data, thread_id=thread_id)

# Use streaming with automatic checkpointing def process_stream(self, input_data, thread_id=None):

return self.stream(input_data, thread_id=thread_id)

Classes

CheckpointerMixin

Mixin that provides checkpointing capabilities for stateful graph execution.

Module Contents

class haive.core.common.mixins.checkpointer_mixin.CheckpointerMixin(/, **data)[source]

Bases: pydantic.BaseModel

Mixin that provides checkpointing capabilities for stateful graph execution.

This mixin adds methods for running stateful graph executions with checkpointing support, including automatic state restoration, thread management, and proper configuration handling for both synchronous and asynchronous execution patterns.

The mixin expects the host class to provide: - persistence: Optional[CheckpointerConfig] - Configuration for the checkpointer - checkpoint_mode: str - Mode of checkpointing (“sync”, “async”, or “none”) - runnable_config: RunnableConfig - Base configuration for runnables - input_schema, state_schema (optional) - Schemas for input and state validation - app or compile() method - The LangGraph compiled application

Parameters:

data (Any)

None publicly, but requires the above attributes from the host class.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

async arun(input_data, thread_id=None, config=None, **kwargs)[source]

Async run with checkpointer support.

This method runs a graph execution asynchronously with checkpointing support, automatically handling state restoration and persistence.

Parameters:
  • input_data (Any) – The input data for the execution.

  • thread_id (str | None) – Optional thread ID for state tracking.

  • config (langchain_core.runnables.RunnableConfig | None) – Optional configuration override.

  • **kwargs – Additional configuration parameters.

Returns:

The result of the graph execution.

Return type:

Any

async astream(input_data, thread_id=None, stream_mode='values', config=None, **kwargs)[source]

Async stream with checkpointer support.

This method streams graph execution results asynchronously with checkpointing support, automatically handling state restoration and persistence.

Parameters:
  • input_data (Any) – The input data for the execution.

  • thread_id (str | None) – Optional thread ID for state tracking.

  • stream_mode (str) – The streaming mode to use (values, actions, etc.).

  • config (langchain_core.runnables.RunnableConfig | None) – Optional configuration override.

  • **kwargs – Additional configuration parameters.

Yields:

Execution chunks as they become available.

Return type:

collections.abc.AsyncGenerator[dict[str, Any], None]

get_checkpointer(async_mode=False)[source]

Get the appropriate checkpointer.

Parameters:

async_mode (bool) – Whether to return the async checkpointer.

Returns:

The appropriate checkpointer instance or None if not available.

Return type:

Any

run(input_data, thread_id=None, config=None, **kwargs)[source]

Run with checkpointer support.

This method runs a graph execution with checkpointing support, automatically handling state restoration and persistence.

Parameters:
  • input_data (Any) – The input data for the execution.

  • thread_id (str | None) – Optional thread ID for state tracking.

  • config (langchain_core.runnables.RunnableConfig | None) – Optional configuration override.

  • **kwargs – Additional configuration parameters.

Returns:

The result of the graph execution.

Return type:

Any

stream(input_data, thread_id=None, stream_mode='values', config=None, **kwargs)[source]

Stream with checkpointer support.

This method streams graph execution results with checkpointing support, automatically handling state restoration and persistence.

Parameters:
  • input_data (Any) – The input data for the execution.

  • thread_id (str | None) – Optional thread ID for state tracking.

  • stream_mode (str) – The streaming mode to use (values, actions, etc.).

  • config (langchain_core.runnables.RunnableConfig | None) – Optional configuration override.

  • **kwargs – Additional configuration parameters.

Returns:

A generator yielding execution chunks.

Return type:

collections.abc.Generator[dict[str, Any], None, None]

model_config

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].