Persistence Layer ================= .. currentmodule:: haive.core.persistence The **Persistence Layer** is the memory backbone of intelligent AI systems - a **revolutionary data persistence framework** that provides **automatic conversation tracking**, **state checkpointing**, **session management**, **vector storage integration**, and **multi-tenant support** for building AI that remembers, learns, and evolves across time. 💾 **Beyond Simple Storage** ---------------------------- **Transform Your AI from Forgetful to Forever:** **Automatic Conversation Tracking** Every interaction automatically persisted with rich metadata, token usage, and performance metrics **State Checkpointing System** Time-travel through workflow states with point-in-time recovery and replay capabilities **Intelligent Session Management** Thread-based conversations with branching, merging, and multi-user collaboration support **Vector Memory Integration** Semantic memory storage with automatic embedding generation and similarity search **Enterprise Multi-Tenancy** Isolated data spaces with encryption, access control, and compliance features Core Persistence Components --------------------------- PostgreSQL/Supabase Integration ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. automodule:: haive.core.persistence.postgres :members: :undoc-members: **Enterprise-Grade Data Persistence** The PostgreSQL integration provides robust, scalable persistence with Supabase for modern cloud deployments. **Advanced PostgreSQL Setup**:: from haive.core.persistence import PostgresStore, SupabaseStore from haive.core.persistence.config import PersistenceConfig # Configure PostgreSQL persistence config = PersistenceConfig( connection_string=os.getenv("DATABASE_URL"), schema="haive_production", pool_size=20, max_overflow=10, pool_timeout=30, echo_sql=False, # Set True for debugging # Advanced options statement_timeout="30s", lock_timeout="10s", idle_in_transaction_session_timeout="60s", # Performance tuning work_mem="256MB", maintenance_work_mem="512MB", effective_cache_size="4GB" ) # Create store with automatic schema migration store = PostgresStore(config) await store.initialize( create_schema=True, run_migrations=True, check_version=True ) # Supabase cloud persistence supabase_store = SupabaseStore( url=os.getenv("SUPABASE_URL"), key=os.getenv("SUPABASE_ANON_KEY"), schema="public", # Realtime subscriptions enable_realtime=True, realtime_channels=["conversations", "agent_states"], # Row-level security enable_rls=True, auth_header="Authorization" ) **Conversation Persistence**:: from haive.core.persistence.models import ( Conversation, Message, MessageRole, ConversationMetadata, TokenUsage ) # Create conversation conversation = await store.create_conversation( agent_id="assistant-001", user_id="user-123", metadata=ConversationMetadata( title="Technical Discussion", tags=["python", "ai", "architecture"], context={"project": "haive-core", "version": "2.0"}, participants=["user-123", "assistant-001"], settings={"temperature": 0.7, "model": "gpt-4"} ) ) # Add messages with rich metadata message = await store.add_message( conversation_id=conversation.id, role=MessageRole.USER, content="Explain the persistence architecture", metadata={ "timestamp": datetime.utcnow(), "client_ip": "192.168.1.1", "client_version": "1.2.0", "input_tokens": 5 } ) # Add AI response with token tracking response = await store.add_message( conversation_id=conversation.id, role=MessageRole.ASSISTANT, content=ai_response_text, token_usage=TokenUsage( input_tokens=150, output_tokens=500, total_tokens=650, model="gpt-4", cost=0.0195 # $0.03 per 1K tokens ), metadata={ "processing_time": 2.5, "tools_used": ["web_search", "calculator"], "confidence_score": 0.85 } ) # Query conversations recent_conversations = await store.query_conversations( user_id="user-123", limit=10, filters={ "created_after": datetime.utcnow() - timedelta(days=7), "tags": ["python"], "min_messages": 5 }, order_by="last_message_at", include_messages=True ) State Checkpointing ~~~~~~~~~~~~~~~~~~~ .. automodule:: haive.core.persistence.checkpointer :members: :undoc-members: **Time-Travel Through AI States** The checkpointing system enables sophisticated state management with branching and recovery. **Advanced Checkpointing**:: from haive.core.persistence.checkpointer import ( Checkpointer, CheckpointConfig, CheckpointStrategy, RecoveryPolicy ) # Configure checkpointer checkpointer = Checkpointer( store=postgres_store, config=CheckpointConfig( auto_checkpoint=True, checkpoint_interval=5, # Every 5 nodes max_checkpoints_per_thread=50, compression="zstd", # Better compression encryption_key=os.getenv("CHECKPOINT_KEY"), # Retention policy retention_days=30, keep_minimum=10, keep_failed_states=True ), strategy=CheckpointStrategy.ADAPTIVE # Smart checkpointing ) # Checkpoint workflow state checkpoint_id = await checkpointer.save( thread_id="workflow-123", state=current_state, metadata={ "node": "analysis", "iteration": 3, "confidence": 0.85, "parent_checkpoint": parent_id }, tags=["milestone", "validated"] ) # Time-travel queries history = await checkpointer.get_history( thread_id="workflow-123", include_failed=False, include_metadata=True ) # Find specific checkpoint checkpoint = await checkpointer.find_checkpoint( thread_id="workflow-123", tags=["milestone"], before_timestamp=cutoff_time, with_state_matching={"status": "completed"} ) # Restore to checkpoint restored_state = await checkpointer.restore( checkpoint_id=checkpoint.id, validate_schema=True ) # Branch from checkpoint branch_id = await checkpointer.create_branch( from_checkpoint=checkpoint.id, branch_name="experiment-1", metadata={"purpose": "testing new approach"} ) # Compare checkpoints diff = await checkpointer.compare( checkpoint_a=checkpoint_id_1, checkpoint_b=checkpoint_id_2, include_details=True ) Session Management ~~~~~~~~~~~~~~~~~~ .. automodule:: haive.core.persistence.session :members: :undoc-members: **Intelligent Conversation Threading** Advanced session management for complex multi-turn interactions. **Session Patterns**:: from haive.core.persistence.session import ( SessionManager, SessionConfig, ThreadingStrategy, Session ) # Configure session manager session_manager = SessionManager( store=postgres_store, config=SessionConfig( default_ttl=3600, # 1 hour sessions max_messages_per_session=1000, enable_branching=True, enable_merging=True, # Threading strategy threading_strategy=ThreadingStrategy.TOPIC_BASED, topic_similarity_threshold=0.8 ) ) # Create session with context session = await session_manager.create_session( user_id="user-123", agent_id="assistant-001", context={ "mode": "technical_support", "expertise_level": "advanced", "previous_sessions": ["session-456", "session-789"] }, parent_session_id=None # New conversation thread ) # Session branching for exploration branch_session = await session_manager.branch_session( from_session=session.id, at_message_index=5, # Branch from 5th message branch_context={ "reason": "explore_alternative", "hypothesis": "different approach" } ) # Merge sessions back merged_session = await session_manager.merge_sessions( primary_session=session.id, secondary_session=branch_session.id, merge_strategy="intelligent", # AI-powered merge conflict_resolution="primary_wins" ) # Multi-user collaboration collab_session = await session_manager.create_collaborative_session( participants=["user-123", "user-456", "expert-789"], permissions={ "user-123": ["read", "write"], "user-456": ["read", "write"], "expert-789": ["read", "write", "moderate"] }, synchronization="real-time" ) Vector Memory Storage ~~~~~~~~~~~~~~~~~~~~~ .. automodule:: haive.core.persistence.vector :members: :undoc-members: **Semantic Memory for AI** Integration with vector databases for semantic search and memory. **Vector Memory Patterns**:: from haive.core.persistence.vector import ( VectorMemory, MemoryConfig, EmbeddingStrategy, IndexType ) # Configure vector memory vector_memory = VectorMemory( vector_store="pinecone", # or "weaviate", "qdrant", "pgvector" config=MemoryConfig( api_key=os.getenv("PINECONE_API_KEY"), environment="production", index_name="haive-memories", # Embedding configuration embedding_model="text-embedding-3-large", embedding_dimensions=3072, embedding_batch_size=100, # Index configuration index_type=IndexType.HNSW, metric="cosine", ef_construction=200, m=16 ) ) # Store conversation as memory memory_id = await vector_memory.store_conversation( conversation=conversation, metadata={ "importance": 0.9, "topics": ["architecture", "persistence"], "entities": ["PostgreSQL", "Supabase"], "timestamp": datetime.utcnow() }, # Chunking strategy chunk_size=512, chunk_overlap=50, # Processing extract_keywords=True, generate_summary=True ) # Semantic search relevant_memories = await vector_memory.search( query="How does the persistence layer handle state?", top_k=10, filters={ "timestamp": {"$gte": datetime.utcnow() - timedelta(days=30)}, "importance": {"$gte": 0.7} }, include_metadata=True, rerank=True # Use cross-encoder for reranking ) # Memory consolidation consolidated = await vector_memory.consolidate_memories( time_window=timedelta(days=7), similarity_threshold=0.85, consolidation_strategy="hierarchical", preserve_important=True ) # Episodic memory retrieval episodes = await vector_memory.get_episodic_memories( user_id="user-123", context_query="Previous discussions about persistence", temporal_weight=0.3, # Balance recency vs relevance max_episodes=5 ) Advanced Persistence Features ----------------------------- Multi-Tenant Architecture ~~~~~~~~~~~~~~~~~~~~~~~~~ **Enterprise Isolation and Scaling**:: from haive.core.persistence.multitenancy import ( TenantManager, TenantConfig, IsolationLevel, ResourceQuota ) # Configure multi-tenancy tenant_manager = TenantManager( base_store=postgres_store, config=TenantConfig( isolation_level=IsolationLevel.SCHEMA, # Schema-based isolation enable_row_level_security=True, enable_encryption_at_rest=True, # Resource quotas default_quota=ResourceQuota( max_conversations=10000, max_storage_gb=100, max_requests_per_minute=1000, max_tokens_per_month=10_000_000 ) ) ) # Create tenant tenant = await tenant_manager.create_tenant( tenant_id="org-123", name="Acme Corporation", settings={ "timezone": "America/New_York", "data_retention_days": 90, "compliance_mode": "HIPAA" }, custom_quota=ResourceQuota( max_conversations=50000, max_storage_gb=500 ) ) # Tenant-scoped operations async with tenant_manager.tenant_context("org-123") as tenant_store: # All operations scoped to tenant conversation = await tenant_store.create_conversation(...) # Cross-tenant analytics (admin only) analytics = await tenant_manager.get_analytics( include_tenants=["org-123", "org-456"], metrics=["conversation_count", "token_usage", "storage_used"], time_range="last_30_days" ) Data Migration & Backup ~~~~~~~~~~~~~~~~~~~~~~~ **Robust Data Management**:: from haive.core.persistence.migration import ( MigrationManager, BackupManager, MigrationStrategy, BackupSchedule ) # Migration management migration_manager = MigrationManager(store=postgres_store) # Run migrations await migration_manager.migrate( target_version="2.0.0", strategy=MigrationStrategy.PROGRESSIVE, # No downtime batch_size=1000, on_progress=lambda p: print(f"Progress: {p}%") ) # Backup management backup_manager = BackupManager( store=postgres_store, backup_location="s3://backups/haive", encryption_key=os.getenv("BACKUP_KEY") ) # Scheduled backups backup_manager.schedule( BackupSchedule.DAILY, retention_days=30, incremental=True, compress=True ) # Manual backup backup_id = await backup_manager.backup( include_patterns=["conversations", "checkpoints"], exclude_patterns=["temp_*", "*_draft"], metadata={"reason": "pre_deployment", "version": "1.9.0"} ) # Restore from backup await backup_manager.restore( backup_id=backup_id, target_schema="restored_data", verify_integrity=True ) Performance Optimization ------------------------ Caching Layer ~~~~~~~~~~~~~ **High-Performance Data Access**:: from haive.core.persistence.cache import ( CacheLayer, CacheStrategy, RedisCache, InMemoryCache ) # Multi-level caching cache = CacheLayer( levels=[ InMemoryCache(max_size="100MB", ttl=60), RedisCache(url="redis://localhost", ttl=3600) ], strategy=CacheStrategy.WRITE_THROUGH, # Cache warming warm_on_startup=True, warm_patterns=["recent_conversations", "active_sessions"] ) # Wrap store with cache cached_store = cache.wrap(postgres_store) # Automatic caching conversation = await cached_store.get_conversation( conversation_id, cache_key_prefix="conv", cache_ttl=300 ) Query Optimization ~~~~~~~~~~~~~~~~~~ **Efficient Data Retrieval**:: from haive.core.persistence.optimization import ( QueryOptimizer, IndexAdvisor, QueryPlan, PerformanceMonitor ) # Query optimization optimizer = QueryOptimizer(store=postgres_store) # Analyze query patterns analysis = await optimizer.analyze_queries( time_range="last_7_days", min_execution_time=100 # ms ) # Get index recommendations advisor = IndexAdvisor() recommendations = advisor.recommend_indexes( analysis.slow_queries, analysis.table_statistics ) # Apply optimizations for rec in recommendations: await store.execute(rec.create_index_sql) Performance Metrics ~~~~~~~~~~~~~~~~~~~ **Lightning-Fast Persistence** * **Write Throughput**: 10,000+ messages/second * **Query Latency**: < 10ms for indexed queries * **Checkpoint Size**: < 1MB compressed average * **Vector Search**: < 50ms for 1M vectors * **Cache Hit Rate**: > 90% for active sessions Enterprise Features ------------------- **Production-Ready Persistence** * **Compliance**: GDPR, HIPAA, SOC2 compliant storage * **Encryption**: At-rest and in-transit encryption * **Audit Logging**: Complete data access audit trail * **Disaster Recovery**: Multi-region replication * **Data Governance**: Retention policies and PII handling See Also -------- * :doc:`schema_system` - State schemas for persistence * :doc:`graph_workflows` - Checkpointing in workflows * :doc:`examples` - Persistence patterns * :doc:`configuration` - Database configuration