Source code for haive.core.tools.store_manager

"""Store management system for Haive agents.

This module provides a comprehensive store management system similar to LangMem,
with tools for storing, retrieving, and managing agent memories using our
PostgreSQL store infrastructure.
"""

import logging
import uuid
from datetime import datetime
from typing import Any

from pydantic import BaseModel, Field

from haive.core.persistence.store.base import SerializableStoreWrapper
from haive.core.persistence.store.factory import create_store
from haive.core.persistence.store.types import StoreType

logger = logging.getLogger(__name__)


[docs] class MemoryEntry(BaseModel): """A single memory entry in the store.""" id: str = Field(default_factory=lambda: str(uuid.uuid4())) content: str = Field(description="The memory content") category: str = Field(default="general", description="Memory category") importance: float = Field( default=0.5, ge=0.0, le=1.0, description="Importance score 0-1" ) tags: list[str] = Field(default_factory=list, description="Memory tags") metadata: dict[str, Any] = Field( default_factory=dict, description="Additional metadata" ) created_at: datetime = Field(default_factory=datetime.utcnow) updated_at: datetime = Field(default_factory=datetime.utcnow)
[docs] def to_store_value(self) -> dict[str, Any]: """Convert to store-compatible dictionary.""" return { "id": self.id, "content": self.content, "category": self.category, "importance": self.importance, "tags": self.tags, "metadata": self.metadata, "created_at": self.created_at.isoformat(), "updated_at": self.updated_at.isoformat(), }
[docs] @classmethod def from_store_value(cls, value: dict[str, Any]) -> "MemoryEntry": """Create from store dictionary.""" if "created_at" in value and isinstance(value["created_at"], str): value["created_at"] = datetime.fromisoformat(value["created_at"]) if "updated_at" in value and isinstance(value["updated_at"], str): value["updated_at"] = datetime.fromisoformat(value["updated_at"]) return cls(**value)
[docs] class StoreManager: """Centralized store management for agent memories. This class provides a high-level interface for managing agent memories using the Haive store infrastructure, similar to LangMem but customized for our architecture. Features: - Namespace-based memory isolation (user_id, agent_id, session_id) - Semantic search capabilities - Memory categorization and tagging - Importance-based retrieval - Automatic metadata management """ def __init__( self, store: SerializableStoreWrapper | None = None, default_namespace: tuple[str, ...] | None = None, store_config: dict[str, Any] | None = None, ): """Initialize the store manager. Args: store: Pre-configured store wrapper (optional) default_namespace: Default namespace for operations store_config: Configuration for creating a new store """ if store is not None: self.store = store elif store_config is not None: self.store = create_store(**store_config) else: # Default to memory store for development self.store = create_store(store_type=StoreType.MEMORY) self.default_namespace = default_namespace or ("haive", "memories") logger.info(f"StoreManager initialized with store: {type(self.store).__name__}") def _get_namespace( self, namespace: tuple[str, ...] | None = None ) -> tuple[str, ...]: """Get the namespace to use for operations.""" return namespace or self.default_namespace
[docs] def store_memory( self, content: str, category: str = "general", importance: float = 0.5, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, namespace: tuple[str, ...] | None = None, memory_id: str | None = None, ) -> str: """Store a new memory. Args: content: The memory content category: Memory category (e.g., "user_preference", "fact", "event") importance: Importance score 0-1 tags: Optional tags for the memory metadata: Additional metadata namespace: Storage namespace (defaults to default_namespace) memory_id: Optional custom memory ID Returns: Memory ID """ memory = MemoryEntry( id=memory_id or str(uuid.uuid4()), content=content, category=category, importance=importance, tags=tags or [], metadata=metadata or {}, ) namespace = self._get_namespace(namespace) self.store.put(namespace, memory.id, memory.to_store_value()) logger.debug(f"Stored memory {memory.id} in namespace {namespace}") return memory.id
[docs] def retrieve_memory( self, memory_id: str, namespace: tuple[str, ...] | None = None ) -> MemoryEntry | None: """Retrieve a specific memory by ID. Args: memory_id: The memory ID to retrieve namespace: Storage namespace Returns: MemoryEntry if found, None otherwise """ namespace = self._get_namespace(namespace) value = self.store.get(namespace, memory_id) if value is None: return None return MemoryEntry.from_store_value(value)
[docs] def update_memory( self, memory_id: str, content: str | None = None, category: str | None = None, importance: float | None = None, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, namespace: tuple[str, ...] | None = None, ) -> bool: """Update an existing memory. Args: memory_id: Memory ID to update content: New content (optional) category: New category (optional) importance: New importance score (optional) tags: New tags (optional) metadata: New metadata (optional) namespace: Storage namespace Returns: True if updated, False if memory not found """ memory = self.retrieve_memory(memory_id, namespace) if memory is None: return False # Update fields if provided if content is not None: memory.content = content if category is not None: memory.category = category if importance is not None: memory.importance = importance if tags is not None: memory.tags = tags if metadata is not None: memory.metadata.update(metadata) memory.updated_at = datetime.utcnow() namespace = self._get_namespace(namespace) self.store.put(namespace, memory.id, memory.to_store_value()) logger.debug(f"Updated memory {memory_id}") return True
[docs] def delete_memory( self, memory_id: str, namespace: tuple[str, ...] | None = None ) -> bool: """Delete a memory. Args: memory_id: Memory ID to delete namespace: Storage namespace Returns: True if deleted, False if not found """ namespace = self._get_namespace(namespace) # Check if memory exists first if self.store.get(namespace, memory_id) is None: return False self.store.delete(namespace, memory_id) logger.debug(f"Deleted memory {memory_id}") return True
[docs] def search_memories( self, query: str, category: str | None = None, min_importance: float | None = None, tags: list[str] | None = None, limit: int = 10, namespace: tuple[str, ...] | None = None, ) -> list[MemoryEntry]: """Search memories using semantic search. Args: query: Search query category: Filter by category min_importance: Minimum importance score tags: Required tags limit: Maximum results namespace: Storage namespace Returns: List of matching memories """ namespace = self._get_namespace(namespace) # Build filter criteria filter_criteria = {} if category: filter_criteria["category"] = category if min_importance is not None: filter_criteria["importance"] = {"$gte": min_importance} if tags: filter_criteria["tags"] = {"$in": tags} try: # Use store's search capability if available results = self.store.search( namespace=namespace, query=query, limit=limit, filter=filter_criteria ) memories = [] for result in results: # Handle different result formats if hasattr(result, "value"): memory_data = result.value elif isinstance(result, dict): memory_data = result else: continue memories.append(MemoryEntry.from_store_value(memory_data)) return memories except Exception as e: logger.warning( f"Semantic search failed: {e}. Falling back to basic search." ) # Fallback: This would require implementing a basic search # For now, return empty list return []
[docs] def list_memories_by_category( self, category: str, namespace: tuple[str, ...] | None = None, limit: int = 50, ) -> list[MemoryEntry]: """List memories by category. Args: category: Category to filter by namespace: Storage namespace limit: Maximum results Returns: List of memories in the category """ return self.search_memories( query="", # Empty query to get all category=category, limit=limit, namespace=namespace, )
[docs] def get_memory_stats( self, namespace: tuple[str, ...] | None = None ) -> dict[str, Any]: """Get statistics about stored memories. Args: namespace: Storage namespace Returns: Dictionary with memory statistics """ # This would require iterating through all memories # For now, return basic stats return { "namespace": self._get_namespace(namespace), "store_type": type(self.store).__name__, "last_updated": datetime.utcnow().isoformat(), }
[docs] def create_user_namespace(self, user_id: str) -> tuple[str, ...]: """Create a user-specific namespace. Args: user_id: User identifier Returns: Namespace tuple for the user """ return ("haive", "users", user_id, "memories")
[docs] def create_agent_namespace( self, agent_id: str, user_id: str | None = None ) -> tuple[str, ...]: """Create an agent-specific namespace. Args: agent_id: Agent identifier user_id: Optional user identifier for user-agent isolation Returns: Namespace tuple for the agent """ if user_id: return ("haive", "users", user_id, "agents", agent_id, "memories") return ("haive", "agents", agent_id, "memories")
[docs] def create_session_namespace( self, session_id: str, agent_id: str | None = None, user_id: str | None = None, ) -> tuple[str, ...]: """Create a session-specific namespace. Args: session_id: Session identifier agent_id: Optional agent identifier user_id: Optional user identifier Returns: Namespace tuple for the session """ if user_id and agent_id: return ( "haive", "users", user_id, "agents", agent_id, "sessions", session_id, ) if agent_id: return ("haive", "agents", agent_id, "sessions", session_id) return ("haive", "sessions", session_id, "memories")