haive.core.persistence.supabase_config

Supabase-based persistence implementation for the Haive framework.

This module provides a Supabase-backed checkpoint persistence implementation that stores state data in a Supabase Postgres database. This allows for cloud-based, scalable state persistence with built-in security features like Row Level Security (RLS) policies.

Supabase offers a fully-managed Postgres database service with authentication, realtime features, and other cloud infrastructure benefits. This implementation leverages these capabilities to provide a production-ready persistence solution with proper relational design and security policies.

Key advantages of the Supabase implementation include:

  • Cloud-hosted and fully-managed database with high availability

  • Built-in authentication and security features

  • Realtime capabilities for live state updates

  • Compatibility with both synchronous and asynchronous operations

  • Support for both full history and shallow storage modes

  • Automatic schema creation and management with migrations

Classes

SupabaseCheckpointerConfig

Configuration for Supabase-based checkpoint persistence.

SupabaseSaver

A LangGraph-compatible checkpointer implementation using Supabase.

Functions

get_supabase_client()

Lazily import the Supabase client to avoid heavy initialization at module import time.

sanitize_sql(sql)

Basic SQL sanitization without dataflow dependency.

Module Contents

class haive.core.persistence.supabase_config.SupabaseCheckpointerConfig(/, **data)

Bases: haive.core.persistence.base.CheckpointerConfig

Configuration for Supabase-based checkpoint persistence.

This class provides a comprehensive configuration for using Supabase as a cloud-based persistence backend for agent state. It leverages Supabase’s managed PostgreSQL service with additional security features like Row Level Security (RLS) policies and authentication integration.

Supabase offers a fully-managed database service with cloud infrastructure benefits, making it ideal for production deployments where scalability, reliability, and security are paramount. The implementation includes proper relational database design with appropriate indexes and constraints for optimal performance.

Key features include:

  • Cloud-hosted and fully-managed PostgreSQL database

  • Row Level Security (RLS) policies for multi-tenant data isolation

  • Authentication integration for secure access control

  • Proper relational design with foreign key relationships

  • Support for both full and shallow storage modes

  • Efficient indexing for optimal query performance

  • Optional integration with shared Supabase clients

The implementation is particularly well-suited for:

  • Multi-tenant SaaS applications requiring data isolation

  • Production deployments needing cloud infrastructure benefits

  • Applications requiring secure data access controls

  • Environments needing scalable, managed database services

Example:

from haive.core.persistence.supabase_config import SupabaseCheckpointerConfig

# Create a Supabase checkpointer with direct credentials
config = SupabaseCheckpointerConfig(
    supabase_url="https://your-project.supabase.co",
    supabase_key="your-api-key",
    user_id="user-123"  # For RLS policies
)

# Or use shared client from dataflow module (if available)
config = SupabaseCheckpointerConfig(
    user_id="user-123"  # Only need user_id when using shared client
)

# Create a checkpointer
checkpointer = config.create_checkpointer()

Note

Requires the supabase-py package (published on PyPI as supabase) to be installed. For shared client functionality, the haive.dataflow.db.supabase module must also be available.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:

data (Any)

close()

Close any resources associated with this checkpointer.

This is a no-op for Supabase as HTTP clients don’t need explicit closing.

Return type:

None

create_checkpointer()

Create a Supabase checkpointer based on this configuration.

This method instantiates and returns a SupabaseSaver object configured with the Supabase credentials and settings specified in this configuration. It handles the creation of the Supabase client, either directly or by using a shared client from the dataflow module if available.

The method includes robust error handling with automatic fallback to an in-memory checkpointer if the Supabase connection fails. This ensures that the application can continue to function even if the Supabase service is temporarily unavailable.

Returns:

A SupabaseSaver instance ready for use with LangGraph, or a MemorySaver instance as a fallback if an error occurs

Return type:

Any

Example:

config = SupabaseCheckpointerConfig(
    supabase_url="https://your-project.supabase.co",
    supabase_key="your-api-key",
    user_id="user-123"
)

try:
    # Create the checkpointer
    checkpointer = config.create_checkpointer()

    # Use with a graph
    graph = Graph(checkpointer=checkpointer)
except Exception as e:
    print(f"Error creating Supabase checkpointer: {e}")
    # Handle error...

Note

This method caches the created checkpointer instance, ensuring that multiple calls return the same instance for efficiency. If schema setup is needed, it’s performed only on the first call.
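
The fallback and caching behavior described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the Haive implementation: SketchConfig, FakeSupabaseSaver, and FakeMemorySaver are hypothetical stand-ins, and a missing credential is used to simulate a failed connection.

```python
import logging
from typing import Any, Optional

logger = logging.getLogger(__name__)


class FakeMemorySaver:
    """Hypothetical stand-in for an in-memory checkpointer."""


class FakeSupabaseSaver:
    """Hypothetical stand-in for a Supabase-backed checkpointer."""

    def __init__(self, url: Optional[str], key: Optional[str]) -> None:
        if not url or not key:
            # Simulate a connection failure when credentials are missing.
            raise ConnectionError("missing Supabase credentials")
        self.url = url
        self.key = key


class SketchConfig:
    """Hypothetical config that caches the checkpointer it creates."""

    def __init__(self, url: Optional[str] = None, key: Optional[str] = None) -> None:
        self.url = url
        self.key = key
        self._checkpointer: Any = None  # cache slot, filled on first call

    def create_checkpointer(self) -> Any:
        # Repeated calls return the cached instance.
        if self._checkpointer is not None:
            return self._checkpointer
        try:
            self._checkpointer = FakeSupabaseSaver(self.url, self.key)
        except Exception as exc:
            # Fall back to in-memory storage instead of propagating the error.
            logger.warning("Supabase unavailable, using memory fallback: %s", exc)
            self._checkpointer = FakeMemorySaver()
        return self._checkpointer
```

Because a fallback of this kind hides connection failures, callers that require durable storage may want to check the type of the returned object before relying on it.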

get_checkpoint(config)

Retrieve a checkpoint from the Supabase database.

Parameters:

config (dict[str, Any]) – Configuration with thread_id and optional checkpoint_id

Returns:

The checkpoint data if found, None otherwise

Return type:

dict[str, Any] | None

list_checkpoints(config, limit=None)

List checkpoints for a thread.

Parameters:
  • config (dict[str, Any]) – Configuration with thread_id

  • limit (int | None) – Optional maximum number of checkpoints to return

Returns:

List of (config, checkpoint) tuples

Return type:

list[tuple[dict[str, Any], Any]]

put_checkpoint(config, data, metadata=None)

Store a checkpoint in the Supabase database.

Parameters:
  • config (dict[str, Any]) – Configuration with thread_id and optional checkpoint_id

  • data (Any) – The checkpoint data to store

  • metadata (dict[str, Any] | None) – Optional metadata to associate with the checkpoint

Returns:

Updated config with checkpoint_id

Return type:

dict[str, Any]
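
The contract of put_checkpoint(), get_checkpoint(), and list_checkpoints() can be illustrated with an in-memory stand-in. The sketch below only mirrors the documented method shapes. It assumes a flat config dict with thread_id and optional checkpoint_id keys, assumes that omitting checkpoint_id returns the latest checkpoint, and assumes that listings are ordered newest first. None of these details are confirmed by this reference, and InMemoryCheckpoints is a hypothetical name.

```python
import uuid
from typing import Any, Optional


class InMemoryCheckpoints:
    """Hypothetical stand-in mirroring the put/get/list method shapes."""

    def __init__(self) -> None:
        # thread_id -> list of (checkpoint_id, data, metadata), oldest first
        self._rows: dict[str, list[tuple[str, Any, dict[str, Any]]]] = {}

    def put_checkpoint(
        self,
        config: dict[str, Any],
        data: Any,
        metadata: Optional[dict[str, Any]] = None,
    ) -> dict[str, Any]:
        # Reuse a supplied checkpoint_id, otherwise generate one.
        checkpoint_id = config.get("checkpoint_id") or uuid.uuid4().hex
        rows = self._rows.setdefault(config["thread_id"], [])
        rows.append((checkpoint_id, data, metadata or {}))
        return {**config, "checkpoint_id": checkpoint_id}

    def get_checkpoint(self, config: dict[str, Any]) -> Optional[Any]:
        rows = self._rows.get(config["thread_id"], [])
        wanted = config.get("checkpoint_id")
        # Walk newest to oldest; without an ID the newest row wins.
        for checkpoint_id, data, _ in reversed(rows):
            if wanted is None or checkpoint_id == wanted:
                return data
        return None

    def list_checkpoints(
        self, config: dict[str, Any], limit: Optional[int] = None
    ) -> list[tuple[dict[str, Any], Any]]:
        thread_id = config["thread_id"]
        rows = list(reversed(self._rows.get(thread_id, [])))
        if limit is not None:
            rows = rows[:limit]
        return [
            ({"thread_id": thread_id, "checkpoint_id": cid}, data)
            for cid, data, _ in rows
        ]
```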

register_thread(thread_id, name=None, metadata=None)

Register a thread in the Supabase database.

Parameters:
  • thread_id (str) – The thread ID to register

  • name (str | None) – Optional thread name

  • metadata (dict[str, Any] | None) – Optional metadata dict

Return type:

None

validate_supabase_available()

Validate that Supabase dependencies are available.

Return type:

Self

class haive.core.persistence.supabase_config.SupabaseSaver(supabase_url=None, supabase_key=None, client=None, user_id=None, initialize_schema=True)

A LangGraph-compatible checkpointer implementation using Supabase.

This class provides a robust implementation of the LangGraph checkpointer interface using Supabase as the storage backend. It stores state data in a Supabase Postgres database with proper relational design, security policies, and cloud infrastructure benefits.

The implementation automatically creates and manages the necessary database schema, including tables for threads and checkpoints with appropriate relationships and indexes for optimal performance. It integrates with Supabase’s authentication system and Row Level Security (RLS) policies to ensure data isolation and security.

Key features include:

  • Cloud-hosted and fully-managed database infrastructure

  • Proper relational design with foreign key relationships

  • Row Level Security (RLS) policies for data isolation

  • Authentication integration for secure multi-tenant deployments

  • Efficient storage and retrieval of checkpoint data

  • JSON serialization for flexible data storage

  • Support for both full history and shallow (latest-only) storage modes

This implementation is ideal for production deployments where scalability, reliability, and security are paramount considerations. It offers a balance of performance and features suitable for multi-tenant applications and environments where proper data isolation is required.
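
The difference between full history and shallow (latest-only) storage can be sketched with a small function. This is an illustration of the two modes only: save() and its shallow flag are hypothetical names, and how the mode is actually selected is not documented here.

```python
from typing import Any


def save(
    rows: list[dict[str, Any]], checkpoint: dict[str, Any], shallow: bool
) -> list[dict[str, Any]]:
    """Return the stored rows for one thread after saving a checkpoint."""
    if shallow:
        # Shallow mode: the new checkpoint replaces everything stored so far.
        return [checkpoint]
    # Full mode: append, so earlier checkpoints remain available.
    return [*rows, checkpoint]
```

Full history allows earlier states to be inspected or restored at the cost of storage that grows with every step, while shallow mode keeps storage constant per thread.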

Initialize the Supabase saver.

Parameters:
  • supabase_url (str | None) – Supabase project URL (not needed if client provided)

  • supabase_key (str | None) – Supabase API key (not needed if client provided)

  • client (Any | None) – Existing Supabase client (if provided, URL and key are ignored)

  • user_id (str | None) – Optional user ID for RLS policies

  • initialize_schema (bool) – Whether to initialize database schema
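
The precedence between client and the URL/key pair can be sketched as below. resolve_client() and its factory argument are hypothetical helpers for illustration. Raising ValueError when neither a client nor both credentials are supplied is an assumption, since this reference does not say how that case is handled.

```python
from typing import Any, Callable, Optional


def resolve_client(
    supabase_url: Optional[str],
    supabase_key: Optional[str],
    client: Optional[Any],
    factory: Callable[[str, str], Any],
) -> Any:
    """Pick the client to use, preferring an existing one."""
    if client is not None:
        # An existing client wins; URL and key are ignored.
        return client
    if not supabase_url or not supabase_key:
        # Assumed behavior: fail loudly when nothing usable was passed.
        raise ValueError("provide either a client or both URL and key")
    # With supabase-py, the factory would typically be supabase.create_client.
    return factory(supabase_url, supabase_key)
```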

delete_thread(thread_id, user_id=None)

Delete a thread and all its checkpoints.

Parameters:
  • thread_id (str) – Thread ID

  • user_id (str | None) – Optional ID of the user who owns the thread

Returns:

True if successful, False otherwise

Return type:

bool
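
Deleting a thread together with its checkpoints is commonly expressed with a foreign key that cascades on delete. The sketch below demonstrates that idea with SQLite from the Python standard library rather than Supabase Postgres. The table and column names are hypothetical, and returning False when no thread matched is an assumption.

```python
import sqlite3
from typing import Optional


def make_db() -> sqlite3.Connection:
    """Create an in-memory database with threads and checkpoints tables."""
    conn = sqlite3.connect(":memory:")
    # SQLite enforces foreign keys only when enabled per connection.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript(
        """
        CREATE TABLE threads (
            id INTEGER PRIMARY KEY,
            thread_id TEXT NOT NULL,
            user_id TEXT
        );
        CREATE TABLE checkpoints (
            id INTEGER PRIMARY KEY,
            thread_pk INTEGER NOT NULL
                REFERENCES threads(id) ON DELETE CASCADE,
            data TEXT NOT NULL
        );
        """
    )
    return conn


def delete_thread(
    conn: sqlite3.Connection, thread_id: str, user_id: Optional[str] = None
) -> bool:
    """Delete a thread; its checkpoints go with it through the cascade."""
    if user_id is None:
        cur = conn.execute(
            "DELETE FROM threads WHERE thread_id = ?", (thread_id,)
        )
    else:
        cur = conn.execute(
            "DELETE FROM threads WHERE thread_id = ? AND user_id = ?",
            (thread_id, user_id),
        )
    conn.commit()
    # rowcount counts the thread rows deleted directly, not cascaded rows.
    return cur.rowcount > 0
```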

get(config)

Get a checkpoint from the database.

Parameters:

config (dict[str, Any]) – Configuration with thread_id and optional checkpoint_id

Returns:

Checkpoint data if found, None otherwise

Return type:

dict[str, Any] | None

get_internal_thread_id(thread_id, user_id=None)

Get the internal thread ID from an external thread ID.

Parameters:
  • thread_id (str) – External thread ID

  • user_id (str | None) – Optional ID of the user who owns the thread

Returns:

Internal thread ID if found, None otherwise

Return type:

str | None

list(config, limit=None, filter=None, before=None)

List checkpoints for a thread.

Parameters:
  • config (dict[str, Any]) – Configuration with thread_id

  • limit (int | None) – Optional maximum number of checkpoints to return

  • filter (dict[str, Any] | None) – Optional filter conditions

  • before (dict[str, Any] | None) – Optional checkpoint to start listing from

Returns:

List of checkpoint tuples

Return type:

list[Any]
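
How limit, filter, and before might combine can be sketched over plain dictionaries. The semantics here are assumptions borrowed from the general LangGraph checkpointer interface: filter matches metadata keys by equality, before keeps only checkpoints whose ID sorts lower than the given one, and results are ordered newest first. list_rows() and the row layout are hypothetical.

```python
from typing import Any, Optional


def list_rows(
    rows: list[dict[str, Any]],
    limit: Optional[int] = None,
    filter: Optional[dict[str, Any]] = None,
    before: Optional[dict[str, Any]] = None,
) -> list[dict[str, Any]]:
    """Select checkpoint rows, newest first (assumes sortable IDs)."""
    selected = sorted(rows, key=lambda r: r["checkpoint_id"], reverse=True)
    if before is not None:
        # Keep only checkpoints older than the given one.
        cutoff = before["checkpoint_id"]
        selected = [r for r in selected if r["checkpoint_id"] < cutoff]
    if filter:
        # Every filter key must match the row's metadata exactly.
        selected = [
            r
            for r in selected
            if all(r["metadata"].get(k) == v for k, v in filter.items())
        ]
    if limit is not None:
        selected = selected[:limit]
    return selected
```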

put(config, checkpoint, metadata=None, new_versions=None)

Save a checkpoint to the database.

Parameters:
  • config (dict[str, Any]) – Configuration with thread_id and optional checkpoint_id

  • checkpoint (dict[str, Any]) – The checkpoint data to save

  • metadata (dict[str, Any] | None) – Optional metadata to associate with the checkpoint

  • new_versions (dict[str, Any] | None) – Optional channel versions

Returns:

Updated config with checkpoint_id

Return type:

dict[str, Any]

register_thread(thread_id, user_id=None, name=None, metadata=None)

Register a thread in the system.

Parameters:
  • thread_id (str) – Thread ID

  • user_id (str | None) – User ID who owns this thread

  • name (str | None) – Optional thread name

  • metadata (dict[str, Any] | None) – Optional thread metadata

Returns:

The internal database ID for the thread

Return type:

str
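
The relationship between register_thread() and get_internal_thread_id() can be pictured as a mapping from an external thread ID, scoped by user, to an internal database ID. The sketch below is an in-memory illustration only. ThreadRegistry is a hypothetical name, and treating repeated registration as idempotent is an assumption.

```python
import uuid
from typing import Optional


class ThreadRegistry:
    """Hypothetical mapping from (user_id, thread_id) to an internal ID."""

    def __init__(self) -> None:
        self._ids: dict[tuple[Optional[str], str], str] = {}

    def register_thread(
        self, thread_id: str, user_id: Optional[str] = None
    ) -> str:
        key = (user_id, thread_id)
        # Assumed idempotent: registering twice returns the same internal ID.
        if key not in self._ids:
            self._ids[key] = str(uuid.uuid4())
        return self._ids[key]

    def get_internal_thread_id(
        self, thread_id: str, user_id: Optional[str] = None
    ) -> Optional[str]:
        # Unknown threads yield None rather than raising.
        return self._ids.get((user_id, thread_id))
```

Scoping the key by user means two users can reuse the same external thread ID without colliding, which matches the multi-tenant isolation this class is described as providing.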

register_user(user_id, email=None, metadata=None)

Register a user in the system.

Parameters:
  • user_id (str) – User ID

  • email (str | None) – Optional user email

  • metadata (dict[str, Any] | None) – Optional user metadata

Returns:

The user ID

Return type:

str

setup()

Set up the Supabase database schema.

Creates necessary tables, foreign key relationships, indexes, and RLS policies for secure access.

Return type:

None
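
A schema of the kind described (tables, a foreign key, an index, and RLS policies) might look roughly like the Postgres statements below. Every table, column, index, and policy name is hypothetical, and the actual schema created by setup() is not shown in this reference. auth.uid() is the Supabase helper that returns the ID of the authenticated user.

```python
# Hypothetical schema: all names below are illustrative, not Haive's own.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS threads (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    thread_id TEXT NOT NULL,
    user_id TEXT NOT NULL,
    name TEXT,
    metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
    UNIQUE (user_id, thread_id)
);
CREATE TABLE IF NOT EXISTS checkpoints (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    thread_pk UUID NOT NULL REFERENCES threads(id) ON DELETE CASCADE,
    checkpoint_id TEXT NOT NULL,
    data JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created
    ON checkpoints (thread_pk, created_at DESC);
ALTER TABLE threads ENABLE ROW LEVEL SECURITY;
ALTER TABLE checkpoints ENABLE ROW LEVEL SECURITY;
CREATE POLICY threads_owner ON threads
    USING (user_id = auth.uid()::text);
CREATE POLICY checkpoints_owner ON checkpoints
    USING (EXISTS (
        SELECT 1 FROM threads t
        WHERE t.id = checkpoints.thread_pk
          AND t.user_id = auth.uid()::text
    ))
"""


def split_statements(sql: str) -> list[str]:
    """Split the script on semicolons (no literal here contains one)."""
    return [part.strip() for part in sql.split(";") if part.strip()]
```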

haive.core.persistence.supabase_config.get_supabase_client()

Lazily import the Supabase client to avoid heavy initialization at module import time.

Return type:

Any | None
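
A lazy import that returns None when the package is missing, consistent with the Any | None return type, can be sketched as follows. lazy_import() is a hypothetical helper, and the real function may behave differently, for example by returning a client factory rather than a module.

```python
import importlib
from typing import Any, Optional


def lazy_import(name: str) -> Optional[Any]:
    """Import a module on first use; return None if it is not installed."""
    try:
        # importlib caches modules in sys.modules, so repeat calls are cheap.
        return importlib.import_module(name)
    except ImportError:
        return None
```

For the Supabase client the module name would be "supabase", and callers would check the result for None before using it.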

haive.core.persistence.supabase_config.sanitize_sql(sql)

Basic SQL sanitization without dataflow dependency.

Parameters:

sql (str)

Return type:

str