dataflow.persistence.supabase_adapter¶
Supabase persistence adapter for the Haive framework.
This module provides an adapter for persisting data to Supabase’s PostgreSQL database. It handles the connection management, Row-Level Security (RLS) context, and provides methods for storing and retrieving data.
The adapter integrates with Haive’s core persistence system, specifically the PostgreSQL checkpointer, to provide a consistent interface for data storage and retrieval while respecting Supabase’s security model.
Key features: - RLS context management for proper access control - Connection pooling and management - Checkpointing for LangGraph state persistence - Thread registration for conversation tracking
Typical usage example:
from haive.dataflow.persistence.supabase_adapter import SupabasePersistence
# Create the persistence adapter persistence = SupabasePersistence()
# Register a thread thread_id = await persistence.register_thread(
user_id=”user-123”, metadata={“agent_id”: “agent-456”}
)
# Store a checkpoint await persistence.store_checkpoint(
thread_id=thread_id, checkpoint_id=”checkpoint-1”, state={“key”: “value”}, user_id=”user-123”
)
# Retrieve a checkpoint checkpoint = await persistence.get_checkpoint(
thread_id=thread_id, checkpoint_id=”checkpoint-1”, user_id=”user-123”
)
Classes¶
Adapter for PostgreSQL persistence with Supabase RLS support. |
Module Contents¶
- class dataflow.persistence.supabase_adapter.SupabasePersistence¶
Adapter for PostgreSQL persistence with Supabase RLS support.
This class provides an adapter for persisting data to Supabase’s PostgreSQL database with proper Row-Level Security (RLS) context management. It handles the complexities of setting the RLS context for database operations while providing a simple interface for storing and retrieving data.
The adapter is designed to work with Haive’s core persistence system, particularly the PostgreSQL checkpointer for LangGraph state persistence.
- postgres_config¶
Configuration for PostgreSQL connection
- supabase_config¶
Configuration for Supabase connection
Initialize the persistence adapter.
Loads the PostgreSQL and Supabase configurations from environment variables and prepares the adapter for use.
- async get_checkpointer()¶
Get a PostgreSQL checkpointer configured for Supabase.
This method returns a checkpointer that can be passed to an agent’s configuration to enable state persistence.
- Returns:
A configured PostgreSQL checkpointer
- Return type:
PostgresSaver
Examples
persistence = SupabasePersistence() checkpointer = await persistence.get_checkpointer()
# Pass to agent config agent_config.runnable_config = {
- “configurable”: {
“thread_id”: thread_id, “checkpointer”: checkpointer
}
}
- async get_state(thread_id, user_id)¶
Get conversation state with RLS enforcement.
- async get_thread_info(thread_id, user_id)¶
Get thread information from the public.threads table.
- async get_user_threads(user_id, limit=50)¶
Get all threads for a user from the public.threads table.
- async register_thread(thread_id, user_id, metadata=None)¶
Register a thread with user ownership.
- async rls_context(connection, user_id)¶
Set RLS context for the duration of an operation.
This async context manager sets the Row-Level Security (RLS) context for a database connection, allowing operations to be performed with the security context of a specific user. It ensures that the context is properly cleared after the operation completes, even if an exception occurs.
- Parameters:
connection – The PostgreSQL database connection
user_id (str) – The ID of the user to set as the RLS context
- Yields:
None – Control is yielded back to the caller with the RLS context set
Examples
- async with persistence.rls_context(connection, “user-123”):
# Operations here will be performed with the RLS context of user-123 await connection.execute(“SELECT * FROM protected_table”)
# RLS context is cleared after the block exits
- async update_state(thread_id, user_id, data, metadata=None)¶
Update conversation state with RLS enforcement.