dataflow.persistence.supabase_adapter¶

Supabase persistence adapter for the Haive framework.

This module provides an adapter for persisting data to Supabase’s PostgreSQL database. It handles the connection management, Row-Level Security (RLS) context, and provides methods for storing and retrieving data.

The adapter integrates with Haive’s core persistence system, specifically the PostgreSQL checkpointer, to provide a consistent interface for data storage and retrieval while respecting Supabase’s security model.

Key features: - RLS context management for proper access control - Connection pooling and management - Checkpointing for LangGraph state persistence - Thread registration for conversation tracking

Typical usage example:

from haive.dataflow.persistence.supabase_adapter import SupabasePersistence

# Create the persistence adapter persistence = SupabasePersistence()

# Register a thread thread_id = await persistence.register_thread(

user_id=”user-123”, metadata={“agent_id”: “agent-456”}

)

# Store a checkpoint await persistence.store_checkpoint(

thread_id=thread_id, checkpoint_id=”checkpoint-1”, state={“key”: “value”}, user_id=”user-123”

)

# Retrieve a checkpoint checkpoint = await persistence.get_checkpoint(

thread_id=thread_id, checkpoint_id=”checkpoint-1”, user_id=”user-123”

)

Classes¶

SupabasePersistence

Adapter for PostgreSQL persistence with Supabase RLS support.

Module Contents¶

class dataflow.persistence.supabase_adapter.SupabasePersistence¶

Adapter for PostgreSQL persistence with Supabase RLS support.

This class provides an adapter for persisting data to Supabase’s PostgreSQL database with proper Row-Level Security (RLS) context management. It handles the complexities of setting the RLS context for database operations while providing a simple interface for storing and retrieving data.

The adapter is designed to work with Haive’s core persistence system, particularly the PostgreSQL checkpointer for LangGraph state persistence.

postgres_config¶

Configuration for PostgreSQL connection

supabase_config¶

Configuration for Supabase connection

Initialize the persistence adapter.

Loads the PostgreSQL and Supabase configurations from environment variables and prepares the adapter for use.

async get_checkpointer()¶

Get a PostgreSQL checkpointer configured for Supabase.

This method returns a checkpointer that can be passed to an agent’s configuration to enable state persistence.

Returns:

A configured PostgreSQL checkpointer

Return type:

PostgresSaver

Examples

persistence = SupabasePersistence() checkpointer = await persistence.get_checkpointer()

# Pass to agent config agent_config.runnable_config = {

“configurable”: {

“thread_id”: thread_id, “checkpointer”: checkpointer

}

}

async get_state(thread_id, user_id)¶

Get conversation state with RLS enforcement.

Parameters:
  • thread_id (str) – Thread ID

  • user_id (str) – User ID for RLS enforcement

Returns:

State data if found and accessible, None otherwise

Return type:

Any | None

async get_thread_info(thread_id, user_id)¶

Get thread information from the public.threads table.

Parameters:
  • thread_id (str) – Thread ID

  • user_id (str) – User ID for RLS enforcement

Returns:

Thread information dictionary or None if not found

Return type:

dict[str, Any] | None

async get_user_threads(user_id, limit=50)¶

Get all threads for a user from the public.threads table.

Parameters:
  • user_id (str) – User ID for RLS enforcement

  • limit (int) – Maximum number of threads to return

Returns:

List of thread dictionaries

Return type:

list[dict[str, Any]]

async register_thread(thread_id, user_id, metadata=None)¶

Register a thread with user ownership.

Parameters:
  • thread_id (str) – Thread ID

  • user_id (str) – User ID for ownership

  • metadata (dict[str, Any] | None) – Optional metadata

Returns:

True if successful, False otherwise

Return type:

bool

async rls_context(connection, user_id)¶

Set RLS context for the duration of an operation.

This async context manager sets the Row-Level Security (RLS) context for a database connection, allowing operations to be performed with the security context of a specific user. It ensures that the context is properly cleared after the operation completes, even if an exception occurs.

Parameters:
  • connection – The PostgreSQL database connection

  • user_id (str) – The ID of the user to set as the RLS context

Yields:

None – Control is yielded back to the caller with the RLS context set

Examples

async with persistence.rls_context(connection, “user-123”):

# Operations here will be performed with the RLS context of user-123 await connection.execute(“SELECT * FROM protected_table”)

# RLS context is cleared after the block exits

async update_state(thread_id, user_id, data, metadata=None)¶

Update conversation state with RLS enforcement.

Parameters:
  • thread_id (str) – Thread ID

  • user_id (str) – User ID for RLS enforcement

  • data (Any) – State data to store

  • metadata (dict[str, Any] | None) – Optional metadata

Returns:

True if successful, False otherwise

Return type:

bool