# Source code for haive.core.schema.composer.schema_composer
"""Simplified SchemaComposer using modular mixins."""

import logging
from typing import Any

from haive.core.schema.composer.engine.engine_detector import EngineDetectorMixin
from haive.core.schema.composer.engine.engine_manager import EngineComposerMixin
from haive.core.schema.composer.field.field_manager import FieldManagerMixin
from haive.core.schema.state_schema import StateSchema

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class SchemaComposer(EngineComposerMixin, EngineDetectorMixin, FieldManagerMixin):
    """Streamlined schema composer using modular mixins.

    This is a much smaller, focused version of SchemaComposer that delegates
    most functionality to specialized mixins:

    - EngineComposerMixin: Engine management and tracking
    - EngineDetectorMixin: Base class detection from components
    - FieldManagerMixin: Field definition and metadata management

    The core class focuses only on:

    - Initialization and coordination
    - High-level composition workflows
    - Schema building and finalization
    """

    def __init__(self, name: str = "ComposedState") -> None:
        """Initialize the schema composer.

        Args:
            name: Name for the generated schema class
        """
        self.name = name
        # One dict per processed component, appended by
        # add_fields_from_components().
        self.processing_history: list[dict] = []

        # Initialize all mixins
        super().__init__()

        logger.debug("SchemaComposer: %s", name)

    def add_fields_from_components(self, components: list[Any]) -> "SchemaComposer":
        """Add fields from a list of components.

        Each non-None component is recorded in ``processing_history`` and then
        dispatched by shape: objects with an ``engine_type`` attribute are
        treated as engines, objects with ``model_fields`` as Pydantic models
        (instances are replaced by their class), and ``dict`` objects as field
        dictionaries. Anything else is skipped with a debug message.

        Args:
            components: List of components to extract fields from

        Returns:
            Self for chaining
        """
        if not components:
            return self

        # Detect base class requirements from components
        self._detect_base_class_requirements(components)

        # Process each component
        for i, component in enumerate(components):
            if component is None:
                continue

            component_type = component.__class__.__name__
            component_id = getattr(component, "name", f"component_{i}")

            logger.debug(
                "Processing component %s: %s (%s)", i, component_id, component_type
            )

            # Add tracking entry
            self.processing_history.append(
                {
                    "action": "process_component",
                    "component_type": component_type,
                    "component_id": component_id,
                }
            )

            # Process based on type
            if hasattr(component, "engine_type"):
                # It's an Engine
                logger.debug("Component %s is an Engine", component_id)
                self.add_engine(component)
                self.add_fields_from_engine(component)
            elif hasattr(component, "model_fields"):
                # It's a Pydantic model
                logger.debug("Component %s is a Pydantic model", component_id)
                self.add_fields_from_model(
                    component if isinstance(component, type) else component.__class__
                )
            elif isinstance(component, dict):
                # Dictionary
                logger.debug("Component %s is a dictionary", component_id)
                self.add_fields_from_dict(component)
            else:
                logger.debug("Skipping unsupported component type: %s", component_type)

        # Ensure standard fields are present
        self._ensure_standard_fields()

        return self

    def add_fields_from_engine(self, engine: Any) -> "SchemaComposer":
        """Extract fields from an engine component.

        Currently a placeholder: it only logs the engine's name and returns.

        Args:
            engine: Engine component to inspect

        Returns:
            Self for chaining
        """
        # This will be implemented by extracting from the original SchemaComposer
        # For now, just track that we processed it
        logger.debug(
            "Would extract fields from engine: %s", getattr(engine, "name", "unnamed")
        )
        return self

    def add_fields_from_model(self, model: type) -> "SchemaComposer":
        """Extract fields from a Pydantic model.

        Currently a placeholder: it only logs the model's name and returns.

        Args:
            model: Model class to inspect

        Returns:
            Self for chaining
        """
        # This will be implemented by extracting from the original
        # SchemaComposer
        logger.debug("Would extract fields from model: %s", model.__name__)
        return self

    def add_fields_from_dict(self, fields_dict: dict) -> "SchemaComposer":
        """Add fields from a dictionary definition.

        Currently a placeholder: it only logs the number of entries and returns.

        Args:
            fields_dict: Dictionary of field definitions

        Returns:
            Self for chaining
        """
        # This will be implemented by extracting from the original
        # SchemaComposer
        logger.debug("Would add fields from dict with %s entries", len(fields_dict))
        return self

    def _ensure_standard_fields(self) -> None:
        """Ensure standard fields are present if needed."""
        # Add runnable_config if we have engines
        if (
            self.engines
            and "runnable_config" not in self.fields
            and "runnable_config" not in self.base_class_fields
        ):
            from typing import Optional

            self.add_field(
                name="runnable_config",
                field_type=Optional[dict[str, Any]],
                default=None,
                description="Runtime configuration for engines",
                source="auto_added",
            )
            logger.debug("Added standard field 'runnable_config'")

    def build(self) -> type[StateSchema]:
        """Build and return the final schema class.

        Fields already listed in ``base_class_fields`` are left out of the
        generated model. When the base class is a ``StateSchema`` subclass the
        result also receives ``__shared_fields__``, ``__engine_io_mappings__``,
        ``engines`` and ``engines_by_type``.

        Returns:
            A StateSchema subclass with all defined fields and metadata
        """
        # Ensure base class is detected
        if self.detected_base_class is None:
            # NOTE(review): called without components here, unlike in
            # add_fields_from_components() - confirm the mixin accepts that.
            self._detect_base_class_requirements()

        base_class = self.detected_base_class
        if base_class is None:
            # Detection left nothing behind; fall back to StateSchema (the
            # annotated return base) so issubclass() below cannot raise a
            # TypeError on None.
            base_class = StateSchema

        # Auto-add engine management if we have engines and using StateSchema
        # base
        if self.engines and issubclass(base_class, StateSchema):
            self.add_engine_management()
            logger.debug(
                "Auto-added engine management fields based on detected engines"
            )

        # Show what we're building
        logger.debug(
            "Building %s with %s fields using base class %s",
            self.name,
            len(self.fields),
            base_class.__name__,
        )

        # Create field definitions for the model
        field_defs = {}
        for name, field_def in self.fields.items():
            # Skip fields that the base class provides
            if name in self.base_class_fields:
                logger.debug("Skipping field %s - provided by base class", name)
                continue

            field_type, field_info = field_def.to_field_info()
            field_defs[name] = (field_type, field_info)

        # Create the schema using pydantic's create_model
        from pydantic import create_model

        schema = create_model(self.name, __base__=base_class, **field_defs)

        # Add StateSchema-specific attributes
        if issubclass(base_class, StateSchema):
            # Copy shared fields
            if hasattr(base_class, "__shared_fields__"):
                base_shared = set(getattr(base_class, "__shared_fields__", []))
                schema.__shared_fields__ = list(base_shared | self.shared_fields)
            else:
                schema.__shared_fields__ = list(self.shared_fields)

            # Add engine I/O mappings
            schema.__engine_io_mappings__ = dict(self.engine_io_mappings)

            # Store engines on the schema class
            # NOTE(review): engines is assigned without a copy, so the schema
            # and this composer reference the same object; engines_by_type is
            # shallow-copied. Confirm the sharing is intended.
            schema.engines = self.engines
            schema.engines_by_type = dict(self.engines_by_type)

            logger.debug("Stored %s engines on schema class", len(schema.engines))

        return schema

    @classmethod
    def from_components(
        cls, components: list[Any], name: str = "ComposedState"
    ) -> type[StateSchema]:
        """Create a schema from components using the class method interface.

        This maintains backward compatibility with the original API.

        Args:
            components: List of components to compose
            name: Name for the generated schema

        Returns:
            Generated schema class
        """
        composer = cls(name=name)
        composer.add_fields_from_components(components)
        return composer.build()