# Source code for haive.core.config.auth_runnable

"""Haive-specific extension of runnable config management with PostgreSQL integration.

This module extends RunnableConfigManager to provide both Supabase authentication
integration and PostgreSQL persistence support for the Haive framework. It creates
a unified configuration system that handles authentication, session management,
and database persistence in a cohesive manner.

The HaiveRunnableConfigManager inherits all functionality from the base RunnableConfigManager
while adding specialized methods for Supabase user authentication, thread persistence,
and PostgreSQL integration. This design ensures proper user context is maintained
throughout conversation threads and persisted correctly in PostgreSQL.

Classes:
    HaiveRunnableConfigManager: Extended config manager with Supabase auth and PostgreSQL integration

Examples:
    >>> # Create a config with Supabase authentication
    >>> config = HaiveRunnableConfigManager.create_with_auth(
    ...     supabase_user_id="auth0|1234567890",
    ...     username="john.doe",
    ...     email="john.doe@example.com"
    ... )
    >>>
    >>> # Add PostgreSQL persistence information
    >>> config = HaiveRunnableConfigManager.add_persistence_info(
    ...     config,
    ...     db_session_id="pgsql-session-123",
    ...     persistence_type="postgres"
    ... )
    >>>
    >>> # Add engine-specific configuration
    >>> config = HaiveRunnableConfigManager.add_engine_config(
    ...     config,
    ...     "my_llm_engine",
    ...     temperature=0.7
    ... )
"""

import copy
import json
import uuid
from datetime import datetime
from typing import Any

# RunnableConfig is imported from langchain_core when that package is available.
# In the code visible in this module the name is only used in type annotations,
# so an empty placeholder class is enough to keep the module importable when
# langchain_core is missing.
try:
    from langchain_core.runnables import RunnableConfig
except ImportError:
    # Fallback for documentation builds: define a stand-in with the same name
    # so the annotations below still resolve.
    class RunnableConfig: pass

from haive.core.config.runnable import RunnableConfigManager


class HaiveRunnableConfigManager(RunnableConfigManager):
    """Enhanced runnable config manager with Supabase authentication and PostgreSQL integration.

    Extends the base RunnableConfigManager with methods for Supabase user
    authentication, enhanced session management, and PostgreSQL persistence
    configuration. This class provides a unified interface for managing
    authentication context and database persistence throughout the Haive
    framework.

    Key capabilities:
        - Authentication context management with Supabase user IDs
        - Session tracking with user-agent associations
        - Thread persistence configuration for PostgreSQL
        - Permission and authorization management
        - Serialization utilities for database storage

    All methods are static. Methods that modify an existing config work on a
    deep copy and return it, leaving the config passed in untouched.
    """

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _copy_with_configurable(
        config: RunnableConfig,
    ) -> tuple[RunnableConfig, dict[str, Any]]:
        """Deep-copy ``config`` and make sure it has a ``configurable`` section.

        Args:
            config: Config to copy. ``None`` is treated as an empty config.

        Returns:
            A ``(copy, configurable)`` pair where ``configurable`` is the
            ``"configurable"`` mapping stored inside ``copy``.
        """
        result = copy.deepcopy(config) if config is not None else {}
        return result, result.setdefault("configurable", {})

    @staticmethod
    def _read_configurable_section(config: RunnableConfig, section: str) -> Any:
        """Return ``config["configurable"][section]``, or ``{}`` when absent.

        A falsy ``config`` and a missing or ``None`` ``configurable`` section
        are all treated as "not found" instead of raising.
        """
        configurable = (config or {}).get("configurable") or {}
        return configurable.get(section, {})

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    @staticmethod
    def create_with_auth(
        supabase_user_id: str,
        username: str | None = None,
        email: str | None = None,
        tenant_id: str | None = None,
        permissions: list[str] | None = None,
        thread_id: str | None = None,
        **kwargs,
    ) -> RunnableConfig:
        """Create a RunnableConfig with Supabase authentication information.

        Args:
            supabase_user_id: The Supabase/Auth0 user ID
            username: Optional username for user identification
            email: Optional email address for user identification
            tenant_id: Optional tenant/organization ID for multi-tenant systems
            permissions: Optional list of user permissions
            thread_id: Optional thread ID for persistence; forwarded to
                ``RunnableConfigManager.create`` (presumably generated there
                when omitted -- confirm against the base class)
            **kwargs: Additional parameters forwarded to
                ``RunnableConfigManager.create``

        Returns:
            A RunnableConfig whose ``configurable`` section carries an ``auth``
            dictionary and a ``user_id``, and whose ``metadata`` names
            ``"supabase"`` as the auth provider.
        """
        # Initialize with base configuration.
        # NOTE(review): assumes the base ``create`` always returns a config
        # that already has a "configurable" section.
        config = RunnableConfigManager.create(thread_id=thread_id, **kwargs)

        auth_info: dict[str, Any] = {
            "supabase_user_id": supabase_user_id,
            "auth_timestamp": datetime.now().isoformat(),
        }

        # Optional fields are only recorded when they carry a truthy value.
        if username:
            auth_info["username"] = username
        if email:
            auth_info["email"] = email
        if tenant_id:
            auth_info["tenant_id"] = tenant_id
        if permissions:
            # Store a copy so later changes to the caller's list do not leak
            # into the config.
            auth_info["permissions"] = list(permissions)

        config["configurable"]["auth"] = auth_info
        config["configurable"]["user_id"] = supabase_user_id

        # Add metadata for tracing
        config.setdefault("metadata", {})["auth_provider"] = "supabase"

        return config

    @staticmethod
    def update_auth_info(config: RunnableConfig, **auth_updates) -> RunnableConfig:
        """Update authentication information in an existing config.

        Args:
            config: Existing RunnableConfig to update (not modified)
            **auth_updates: Authentication fields to set or overwrite

        Returns:
            Updated copy of the config. ``auth_timestamp`` is always refreshed
            to the current time, even if it was passed in ``auth_updates``.
        """
        result, configurable = HaiveRunnableConfigManager._copy_with_configurable(
            config
        )
        auth = configurable.setdefault("auth", {})
        auth.update(auth_updates)
        auth["auth_timestamp"] = datetime.now().isoformat()
        return result

    @staticmethod
    def create_agent_session(
        supabase_user_id: str,
        agent_id: str,
        agent_type: str | None = None,
        session_data: dict[str, Any] | None = None,
        thread_id: str | None = None,
        **kwargs,
    ) -> RunnableConfig:
        """Create a session configuration for agent interaction.

        Args:
            supabase_user_id: The Supabase/Auth0 user ID
            agent_id: Unique identifier for the agent
            agent_type: Optional type of agent
            session_data: Optional additional session data
            thread_id: Optional thread ID (a UUID4 is generated if not provided)
            **kwargs: Additional parameters forwarded to ``create_with_auth``

        Returns:
            RunnableConfig with authentication and session information
        """
        # Create base config with auth
        config = HaiveRunnableConfigManager.create_with_auth(
            supabase_user_id=supabase_user_id,
            thread_id=thread_id or str(uuid.uuid4()),
            **kwargs,
        )

        session_info: dict[str, Any] = {
            "agent_id": agent_id,
            "session_id": str(uuid.uuid4()),
            "started_at": datetime.now().isoformat(),
            "status": "active",
        }
        if agent_type:
            session_info["agent_type"] = agent_type
        if session_data:
            session_info["data"] = session_data

        config["configurable"]["session"] = session_info
        return config

    @staticmethod
    def get_auth_info(config: RunnableConfig) -> dict[str, Any]:
        """Extract authentication information from a RunnableConfig.

        Args:
            config: RunnableConfig to extract from

        Returns:
            Authentication information dictionary or empty dict if not found.
            The stored dictionary itself is returned, not a copy.
        """
        return HaiveRunnableConfigManager._read_configurable_section(config, "auth")

    @staticmethod
    def get_supabase_user_id(config: RunnableConfig) -> str | None:
        """Extract Supabase user ID from a RunnableConfig.

        Args:
            config: RunnableConfig to extract from

        Returns:
            Supabase user ID if present, otherwise None
        """
        auth_info = HaiveRunnableConfigManager.get_auth_info(config)
        return auth_info.get("supabase_user_id")

    @staticmethod
    def has_permission(config: RunnableConfig, permission: str) -> bool:
        """Check if the configuration has a specific permission.

        Args:
            config: RunnableConfig to check
            permission: Permission to check for

        Returns:
            True if the permission is present, False otherwise
        """
        auth_info = HaiveRunnableConfigManager.get_auth_info(config)
        # ``or []`` also covers a "permissions" entry explicitly stored as None.
        granted = auth_info.get("permissions") or []
        return permission in granted

    @staticmethod
    def add_permissions(config: RunnableConfig, *permissions: str) -> RunnableConfig:
        """Add permissions to the configuration.

        Args:
            config: RunnableConfig to update (not modified)
            *permissions: Permissions to add; ones already present are skipped

        Returns:
            Updated copy of the config
        """
        result, configurable = HaiveRunnableConfigManager._copy_with_configurable(
            config
        )
        auth = configurable.setdefault("auth", {})

        current_permissions = auth.get("permissions")
        if current_permissions is None:
            # Missing or explicitly None: start from an empty list.
            current_permissions = auth["permissions"] = []

        for permission in permissions:
            if permission not in current_permissions:
                current_permissions.append(permission)

        return result

    # ------------------------------------------------------------------
    # Session
    # ------------------------------------------------------------------

    @staticmethod
    def get_session_info(config: RunnableConfig) -> dict[str, Any]:
        """Extract session information from a RunnableConfig.

        Args:
            config: RunnableConfig to extract from

        Returns:
            Session information dictionary or empty dict if not found
        """
        return HaiveRunnableConfigManager._read_configurable_section(
            config, "session"
        )

    @staticmethod
    def update_session_status(config: RunnableConfig, status: str) -> RunnableConfig:
        """Update session status in the configuration.

        Args:
            config: RunnableConfig to update (not modified)
            status: New session status

        Returns:
            Updated copy of the config, with ``status`` set and ``updated_at``
            stamped with the current time
        """
        result, configurable = HaiveRunnableConfigManager._copy_with_configurable(
            config
        )
        session = configurable.setdefault("session", {})
        session["status"] = status
        session["updated_at"] = datetime.now().isoformat()
        return result

    # ------------------------------------------------------------------
    # Engine configuration
    # ------------------------------------------------------------------

    @staticmethod
    def add_engine_by_id(
        config: RunnableConfig, engine_id: str, **params
    ) -> RunnableConfig:
        """Add configuration specifically targeting an engine by ID.

        Thin wrapper that delegates to
        ``RunnableConfigManager.add_engine_config``.

        Args:
            config: RunnableConfig to update
            engine_id: Engine ID to target
            **params: Parameters for the engine

        Returns:
            Updated RunnableConfig
        """
        return RunnableConfigManager.add_engine_config(config, engine_id, **params)

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    @staticmethod
    def add_persistence_info(
        config: RunnableConfig,
        db_session_id: str | None = None,
        persistence_type: str = "postgres",
        db_pool_id: str | None = None,
        checkpoint_ns: str = "",
        **persistence_params,
    ) -> RunnableConfig:
        """Add PostgreSQL persistence information to a config.

        Args:
            config: Existing RunnableConfig to update (not modified)
            db_session_id: Optional database session identifier
            persistence_type: Type of persistence (postgres, memory, etc.)
            db_pool_id: Optional connection pool identifier
            checkpoint_ns: Checkpoint namespace for organizing checkpoints.
                It is always written, so leaving the default resets any
                namespace already present in ``config`` to ``""``.
            **persistence_params: Additional persistence parameters; these are
                applied last and can override the fields above

        Returns:
            Updated copy of the config with persistence information merged
            into any existing ``persistence`` section
        """
        result, configurable = HaiveRunnableConfigManager._copy_with_configurable(
            config
        )
        persistence = configurable.setdefault("persistence", {})

        persistence_info: dict[str, Any] = {
            "type": persistence_type,
            "timestamp": datetime.now().isoformat(),
        }
        if db_session_id:
            persistence_info["db_session_id"] = db_session_id
        if db_pool_id:
            persistence_info["db_pool_id"] = db_pool_id
        persistence_info.update(persistence_params)

        configurable["checkpoint_ns"] = checkpoint_ns
        persistence.update(persistence_info)

        return result

    @staticmethod
    def get_persistence_info(config: RunnableConfig) -> dict[str, Any]:
        """Extract persistence information from a RunnableConfig.

        Args:
            config: RunnableConfig to extract from

        Returns:
            Persistence information dictionary or empty dict if not found
        """
        return HaiveRunnableConfigManager._read_configurable_section(
            config, "persistence"
        )

    @staticmethod
    def is_postgres_persistence(config: RunnableConfig) -> bool:
        """Check if a config is using PostgreSQL persistence.

        Args:
            config: RunnableConfig to check

        Returns:
            True if the persistence ``type`` equals ``"postgres"``
            (case-insensitive), False otherwise -- including when the type is
            missing or is not a string
        """
        persistence_info = HaiveRunnableConfigManager.get_persistence_info(config)
        persistence_type = persistence_info.get("type")
        return (
            isinstance(persistence_type, str)
            and persistence_type.lower() == "postgres"
        )

    @staticmethod
    def create_with_postgres(
        thread_id: str | None = None,
        user_id: str | None = None,
        db_connection_info: dict[str, Any] | None = None,
        checkpoint_ns: str = "",
        **kwargs,
    ) -> RunnableConfig:
        """Create a config with PostgreSQL persistence configuration.

        Args:
            thread_id: Optional thread ID; forwarded to
                ``RunnableConfigManager.create`` (presumably generated there
                when omitted -- confirm against the base class)
            user_id: Optional user ID for authentication context
            db_connection_info: Optional database connection parameters. The
                keys read are ``session_id``, ``host``, ``port``, ``database``,
                ``user`` and ``setup_needed``; missing ones fall back to a new
                UUID4, ``"localhost"``, ``5432``, ``"postgres"``,
                ``"postgres"`` and ``True`` respectively.
            checkpoint_ns: Checkpoint namespace for organizing checkpoints
            **kwargs: Additional parameters forwarded to
                ``RunnableConfigManager.create``

        Returns:
            RunnableConfig with PostgreSQL persistence configuration
        """
        # Create base config
        config = RunnableConfigManager.create(
            thread_id=thread_id, user_id=user_id, **kwargs
        )

        db_info = db_connection_info or {}
        persistence_info = {
            "type": "postgres",
            "db_session_id": db_info.get("session_id", str(uuid.uuid4())),
            "db_host": db_info.get("host", "localhost"),
            "db_port": db_info.get("port", 5432),
            "db_name": db_info.get("database", "postgres"),
            "db_user": db_info.get("user", "postgres"),
            "setup_needed": db_info.get("setup_needed", True),
            "timestamp": datetime.now().isoformat(),
        }

        config["configurable"]["persistence"] = persistence_info
        config["configurable"]["checkpoint_ns"] = checkpoint_ns

        return config

    # ------------------------------------------------------------------
    # Checkpoints
    # ------------------------------------------------------------------

    @staticmethod
    def update_checkpoint_id(
        config: RunnableConfig, checkpoint_id: str
    ) -> RunnableConfig:
        """Update the checkpoint ID in a config.

        Args:
            config: RunnableConfig to update (not modified)
            checkpoint_id: New checkpoint ID

        Returns:
            Updated copy of the config
        """
        result, configurable = HaiveRunnableConfigManager._copy_with_configurable(
            config
        )
        configurable["checkpoint_id"] = checkpoint_id
        return result

    @staticmethod
    def get_checkpoint_id(config: RunnableConfig) -> str | None:
        """Extract checkpoint ID from a RunnableConfig.

        Delegates to the ``extract_value`` helper (not defined in this class;
        presumably inherited from RunnableConfigManager).

        Args:
            config: RunnableConfig to extract from

        Returns:
            Checkpoint ID if present, otherwise None
        """
        return HaiveRunnableConfigManager.extract_value(config, "checkpoint_id")

    @staticmethod
    def get_checkpoint_ns(config: RunnableConfig) -> str:
        """Extract checkpoint namespace from a RunnableConfig.

        Delegates to the ``extract_value`` helper (not defined in this class;
        presumably inherited from RunnableConfigManager), passing ``""`` as
        the default.

        Args:
            config: RunnableConfig to extract from

        Returns:
            Checkpoint namespace or empty string if not found
        """
        return HaiveRunnableConfigManager.extract_value(config, "checkpoint_ns", "")

    @staticmethod
    def create_thread_checkpoint_config(
        thread_id: str,
        checkpoint_id: str | None = None,
        checkpoint_ns: str = "",
        **kwargs,
    ) -> RunnableConfig:
        """Create a minimal config for checkpoint operations with thread ID.

        Args:
            thread_id: Thread ID for the conversation
            checkpoint_id: Optional specific checkpoint ID (omitted from the
                config when falsy)
            checkpoint_ns: Checkpoint namespace
            **kwargs: Additional entries for the ``configurable`` section;
                applied last, so they can override the fields above

        Returns:
            RunnableConfig suitable for checkpoint operations
        """
        configurable: dict[str, Any] = {
            "thread_id": thread_id,
            "checkpoint_ns": checkpoint_ns,
        }
        if checkpoint_id:
            configurable["checkpoint_id"] = checkpoint_id
        configurable.update(kwargs)

        return {"configurable": configurable}

    # ------------------------------------------------------------------
    # Serialization
    # ------------------------------------------------------------------

    @staticmethod
    def serialize_to_json(config: RunnableConfig) -> str:
        """Serialize a RunnableConfig to a JSON string.

        Args:
            config: RunnableConfig to serialize

        Returns:
            JSON string representation

        Raises:
            TypeError: If the config contains a value that is neither JSON
                serializable nor a ``datetime``.
        """

        def encoder(obj) -> Any:
            # datetime objects are written as ISO-8601 strings; anything else
            # json cannot handle natively is rejected.
            if isinstance(obj, datetime):
                return obj.isoformat()
            raise TypeError(f"Object of type {type(obj)} is not JSON serializable")

        return json.dumps(config, default=encoder)

    @staticmethod
    def deserialize_from_json(json_str: str) -> RunnableConfig:
        """Deserialize a RunnableConfig from a JSON string.

        Values that ``serialize_to_json`` wrote as ISO-8601 strings come back
        as plain strings; they are not converted back to ``datetime``.

        Args:
            json_str: JSON string to deserialize

        Returns:
            Deserialized RunnableConfig
        """
        return json.loads(json_str)