Schema System ============= .. currentmodule:: haive.core.schema The **Schema System** represents a paradigm shift in AI state management - a **revolutionary dynamic architecture** that provides **type-safe state composition**, **intelligent field sharing**, **runtime schema evolution**, and **advanced reducer patterns** for building AI systems that adapt their very structure as they learn and grow. 🧬 **Beyond Static Data Models** --------------------------------- **Transform Your AI State from Fixed to Fluid:** **Dynamic Schema Composition** Build, modify, and evolve schemas at runtime with full type safety and validation, enabling AI that reshapes itself **Intelligent Field Sharing** Sophisticated field visibility controls between parent and child graphs with automatic conflict resolution **Reducer-Based Intelligence** Custom merge logic for state updates that goes beyond simple assignment to intelligent data fusion **Runtime Evolution** Hot-reload schemas, add fields on the fly, and migrate state without stopping workflows **Type-Safe Serialization** Complete state persistence with Pydantic v2, supporting complex types and custom serializers Core Schema Components ---------------------- StateSchema Foundation ~~~~~~~~~~~~~~~~~~~~~~ .. autoclass:: haive.core.schema.StateSchema :members: :undoc-members: :show-inheritance: **The Revolutionary Base for AI State Management** StateSchema extends Pydantic's BaseModel with AI-specific capabilities that enable sophisticated state management patterns impossible with traditional approaches. **Basic State Definition**:: from haive.core.schema import StateSchema, Field from typing import List, Dict, Any, Optional class AgentState(StateSchema): """Advanced agent state with intelligent features.""" # Conversation management messages: List[BaseMessage] = Field( default_factory=list, description="Conversation history with token tracking" ) # Working memory context: Dict[str, Any] = Field( default_factory=dict, description="Agent's working memory and context" ) # Analysis results insights: List[str] = Field( default_factory=list, description="Accumulated insights from analysis" ) # Confidence tracking confidence_scores: Dict[str, float] = Field( default_factory=dict, description="Confidence scores for different aspects" ) # Define sharing rules __shared_fields__ = ["messages", "context"] # Share with parent # Define merge strategies __reducer_fields__ = { "messages": lambda old, new: old + new, # Append messages "insights": lambda old, new: list(set(old + new)), # Unique insights "confidence_scores": lambda old, new: {**old, **new}, # Merge scores } **Advanced Schema Features**:: class MultiAgentState(StateSchema): """State for multi-agent coordination.""" # Agent-specific states agent_states: Dict[str, Dict[str, Any]] = Field( default_factory=dict, description="Individual agent states" ) # Shared knowledge base shared_knowledge: Dict[str, Any] = Field( default_factory=dict, description="Knowledge shared across agents" ) # Coordination metadata coordination: Dict[str, Any] = Field( default_factory=lambda: { "leader": None, "phase": "initialization", "consensus": {} } ) # Custom reducer for agent coordination @staticmethod def merge_agent_states(old: Dict, new: Dict) -> Dict: """Intelligently merge agent states with conflict resolution.""" merged = old.copy() for agent_id, state in new.items(): if agent_id in merged: # Merge with timestamp priority merged[agent_id] = merge_with_timestamps( merged[agent_id], state ) else: merged[agent_id] = state return merged __reducer_fields__ = { "agent_states": merge_agent_states, "shared_knowledge": deep_merge_dicts, } # Engine I/O mappings __engine_io__ = { "research_engine": { "input": ["query", "context"], "output": ["findings", "sources"] }, "analysis_engine": { "input": ["findings"], "output": ["insights", "confidence_scores"] } } Schema Composition System ~~~~~~~~~~~~~~~~~~~~~~~~~ .. autoclass:: haive.core.schema.SchemaComposer :members: :undoc-members: **Dynamic Schema Building at Runtime** The SchemaComposer enables building complex schemas from components, perfect for adaptive AI systems. **Dynamic Composition Example**:: from haive.core.schema import SchemaComposer, create_field from haive.core.schema.field_utils import infer_field_type # Create composer composer = SchemaComposer(name="DynamicAgentState") # Add fields from various sources composer.add_field( "messages", List[BaseMessage], default_factory=list, shared=True, reducer=append_messages ) # Add fields from engine composer.add_fields_from_engine( llm_engine, include_input=True, include_output=True ) # Add fields from Pydantic model composer.add_fields_from_model(AnalysisResult) # Add computed fields composer.add_computed_field( "token_count", lambda self: sum(msg.token_count for msg in self.messages), return_type=int ) # Build the schema DynamicState = composer.build() # Use the dynamic schema state = DynamicState() print(state.model_fields) # Shows all composed fields **Schema Evolution Pattern**:: from haive.core.schema import SchemaManager, migration # Define schema versions class UserStateV1(StateSchema): name: str preferences: Dict[str, Any] class UserStateV2(StateSchema): name: str preferences: Dict[str, Any] interaction_history: List[Dict[str, Any]] = Field(default_factory=list) preference_embeddings: Optional[List[float]] = None # Create migration @migration(from_version="1.0", to_version="2.0") def migrate_user_state(old_state: UserStateV1) -> UserStateV2: """Migrate from V1 to V2 with intelligent defaults.""" new_state = UserStateV2( name=old_state.name, preferences=old_state.preferences, interaction_history=[] # Start fresh ) # Generate embeddings from preferences if old_state.preferences: new_state.preference_embeddings = generate_embeddings( old_state.preferences ) return new_state # Apply migration manager = SchemaManager() new_state = manager.migrate(old_state, target_version="2.0") Field Management System ~~~~~~~~~~~~~~~~~~~~~~~ .. automodule:: haive.core.schema.field_utils :members: :undoc-members: **Intelligent Field Operations** The field management system provides sophisticated tools for field manipulation, type inference, and metadata handling. **Advanced Field Patterns**:: from haive.core.schema.field_utils import ( create_annotated_field, extract_type_metadata, resolve_reducer ) from typing import Annotated # Create fields with rich metadata confidence_field = create_annotated_field( field_type=Annotated[ float, Field(ge=0.0, le=1.0, description="Confidence score"), {"ui_widget": "slider", "precision": 2} ], default=0.5, shared=True ) # Extract metadata for UI generation metadata = extract_type_metadata(confidence_field) print(metadata) # {'min': 0.0, 'max': 1.0, 'ui_widget': 'slider'} # Smart reducer resolution reducer = resolve_reducer( field_type=List[str], merge_strategy="unique" # or "append", "replace", "custom" ) Multi-Agent State Coordination ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. autoclass:: haive.core.schema.MultiAgentStateSchema :members: :undoc-members: **Orchestrating Complex Multi-Agent Systems** The MultiAgentStateSchema provides advanced patterns for coordinating state across multiple agents with different schemas. **Multi-Agent Coordination Example**:: from haive.core.schema import MultiAgentStateSchema, AgentView class ResearchTeamState(MultiAgentStateSchema): """State for a research team of specialized agents.""" # Global research objective research_goal: str = Field(description="Main research objective") # Shared research data research_data: Dict[str, Any] = Field( default_factory=dict, description="Accumulated research data" ) # Agent-specific states with schemas agent_schemas = { "researcher": ResearcherState, "analyst": AnalystState, "writer": WriterState, "reviewer": ReviewerState } # Define agent interactions __agent_edges__ = { "researcher": ["analyst"], # Researcher → Analyst "analyst": ["writer"], # Analyst → Writer "writer": ["reviewer"], # Writer → Reviewer "reviewer": ["writer"] # Reviewer ↔ Writer (revision loop) } # Custom view for each agent def get_researcher_view(self) -> AgentView[ResearcherState]: """Get view tailored for researcher agent.""" return AgentView( agent_state=self.get_agent_state("researcher"), shared_data={ "goal": self.research_goal, "keywords": self.extract_keywords() }, permissions=["read", "write:findings"] ) def get_analyst_view(self) -> AgentView[AnalystState]: """Get view tailored for analyst agent.""" researcher_state = self.get_agent_state("researcher") return AgentView( agent_state=self.get_agent_state("analyst"), shared_data={ "findings": researcher_state.findings, "research_data": self.research_data }, permissions=["read", "write:analysis"] ) Reducer Patterns ~~~~~~~~~~~~~~~~ **Intelligent State Merging Strategies** .. automodule:: haive.core.schema.reducers :members: :undoc-members: **Advanced Reducer Implementations**:: from haive.core.schema.reducers import ( create_reducer, combine_reducers, conditional_reducer ) # Create custom reducers @create_reducer def merge_insights(old: List[Dict], new: List[Dict]) -> List[Dict]: """Merge insights with deduplication and scoring.""" # Create insight map with scores insight_map = {} for insight in old + new: key = insight.get("key", str(insight)) if key in insight_map: # Increase confidence for repeated insights insight_map[key]["confidence"] *= 1.1 insight_map[key]["sources"].extend(insight.get("sources", [])) else: insight_map[key] = insight # Sort by confidence and return top insights sorted_insights = sorted( insight_map.values(), key=lambda x: x.get("confidence", 0), reverse=True ) return sorted_insights[:10] # Keep top 10 # Conditional reducers message_reducer = conditional_reducer( condition=lambda old, new: len(old) + len(new) < 100, if_true=lambda old, new: old + new, # Append if under limit if_false=lambda old, new: old[-50:] + new # Keep last 50 + new ) # Combine multiple reducers complex_reducer = combine_reducers([ dedup_reducer, # Remove duplicates sort_by_timestamp, # Sort chronologically limit_size(1000), # Limit total size enrich_metadata # Add metadata ]) Schema Validation & Testing ~~~~~~~~~~~~~~~~~~~~~~~~~~~ **Ensuring Schema Integrity** .. automodule:: haive.core.schema.validation :members: :undoc-members: **Comprehensive Validation Patterns**:: from haive.core.schema.validation import ( SchemaValidator, create_test_suite, validate_reducers ) # Create validator validator = SchemaValidator(strict_mode=True) # Validate schema definition validation_result = validator.validate_schema( MyComplexState, checks=[ "field_types", # Type annotations valid "default_values", # Defaults match types "reducer_compatibility", # Reducers match field types "circular_dependencies", # No circular refs "serialization" # Can serialize/deserialize ] ) if not validation_result.is_valid: for error in validation_result.errors: print(f"Error: {error.field} - {error.message}") # Test reducer logic reducer_test = validate_reducers( MyComplexState, test_cases=[ { "field": "messages", "old": [msg1, msg2], "new": [msg3], "expected": [msg1, msg2, msg3] } ] ) # Generate comprehensive test suite test_suite = create_test_suite( MyComplexState, include_edge_cases=True, include_performance=True ) # Run all tests results = test_suite.run() print(f"Passed: {results.passed}/{results.total}") Performance Optimization ------------------------ Schema Performance Metrics ~~~~~~~~~~~~~~~~~~~~~~~~~~ **Lightning-Fast State Operations** * **Creation**: < 1ms for complex schemas with 50+ fields * **Validation**: < 5ms for deep nested structures * **Serialization**: 100MB/s for state persistence * **Reducer Execution**: < 0.1ms per field update * **Memory**: O(1) field access, efficient COW updates **Optimization Techniques**:: from haive.core.schema.optimization import ( OptimizedSchema, enable_caching, lazy_field ) @enable_caching class HighPerformanceState(OptimizedSchema): """Optimized state for high-frequency updates.""" # Lazy-loaded expensive fields embeddings: List[float] = lazy_field( loader=lambda self: generate_embeddings(self.text), cache_ttl=3600 # Cache for 1 hour ) # Batched updates metrics: Dict[str, float] = Field( default_factory=dict, metadata={"batch_updates": True} ) # Custom optimized reducer __reducer_fields__ = { "metrics": optimized_merge_metrics # C-extension reducer } class Config: # Performance optimizations validate_assignment = False # Skip validation on assignment copy_on_model_validation = False # Reference, don't copy use_enum_values = True # Store enum values, not objects Advanced Patterns ----------------- Schema Inheritance Hierarchies ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ **Building Complex Schema Hierarchies**:: from haive.core.schema import StateSchema, create_schema_hierarchy # Base schema for all agents class BaseAgentState(StateSchema): id: str created_at: datetime messages: List[BaseMessage] = Field(default_factory=list) class Meta: abstract = True # Specialized schemas class ResearchAgentState(BaseAgentState): sources: List[str] = Field(default_factory=list) findings: Dict[str, Any] = Field(default_factory=dict) credibility_scores: Dict[str, float] = Field(default_factory=dict) class AnalysisAgentState(BaseAgentState): hypotheses: List[str] = Field(default_factory=list) evidence: Dict[str, List[str]] = Field(default_factory=dict) confidence_matrix: List[List[float]] = Field(default_factory=list) # Create hierarchy with shared behavior hierarchy = create_schema_hierarchy({ "base": BaseAgentState, "research": ResearchAgentState, "analysis": AnalysisAgentState, "synthesis": SynthesisAgentState }) # Automatic schema selection appropriate_schema = hierarchy.select_schema( task_type="research", capabilities_required=["web_search", "source_validation"] ) Dynamic Schema Generation ~~~~~~~~~~~~~~~~~~~~~~~~~ **Runtime Schema Creation from Data**:: from haive.core.schema import infer_schema, generate_schema # Infer schema from data sample_data = { "user_id": "123", "preferences": {"theme": "dark", "language": "en"}, "history": [ {"action": "click", "timestamp": "2024-01-01"}, {"action": "purchase", "item": "book", "price": 29.99} ] } InferredSchema = infer_schema( data=sample_data, name="UserActivityState", include_validators=True, detect_patterns=True # Detect emails, URLs, etc. ) # Generate schema from specification spec = { "fields": { "query": {"type": "str", "description": "User query"}, "embedding": {"type": "List[float]", "length": 1536}, "metadata": {"type": "Dict[str, Any]", "optional": True} }, "reducers": { "embedding": "average_vectors" }, "shared": ["query"] } GeneratedSchema = generate_schema(spec, name="QueryState") Schema Composition Patterns ~~~~~~~~~~~~~~~~~~~~~~~~~~~ **Advanced Composition Techniques**:: from haive.core.schema import ( compose_schemas, merge_schemas, extend_schema ) # Compose multiple schemas ComposedState = compose_schemas({ "conversation": ConversationState, "knowledge": KnowledgeState, "planning": PlanningState }, name="SuperAgentState") # Merge with conflict resolution MergedState = merge_schemas( [StateA, StateB, StateC], conflict_resolution="last_wins", # or "first_wins", "error" name="MergedState" ) # Extend with additional fields ExtendedState = extend_schema( BaseState, additional_fields={ "new_field": (str, Field(default="")), "computed": (int, Field(default_factory=lambda: 42)) }, name="ExtendedState" ) Enterprise Features ------------------- **Production-Ready Schema Management** * **Schema Registry**: Centralized schema versioning and discovery * **Migration Framework**: Zero-downtime schema migrations * **Validation Pipeline**: Comprehensive validation before deployment * **Performance Monitoring**: Real-time schema operation metrics * **Access Control**: Field-level permissions and encryption See Also -------- * :doc:`engine_architecture` - Integrate schemas with engines * :doc:`graph_workflows` - Use schemas in graph workflows * :doc:`patterns` - Advanced schema patterns * :doc:`examples` - Real-world schema examples