Source code for haive.core.graph.branches.send_mapping
"""Send mapping functionality for routing and state transformation."""importloggingfromcollections.abcimportCallablefromtypingimportAnyfrompydanticimportBaseModel,Fieldfromhaive.core.graph.common.field_utilsimportextract_field# Import from common utilitiesfromhaive.core.graph.common.typesimportStateLikelogger=logging.getLogger(__name__)
class SendMapping(BaseModel):
    """Declarative recipe for building a single ``Send`` from graph state.

    Attributes:
        node: Name of the node the resulting ``Send`` targets.
        fields: Maps each key of the ``Send`` payload to the state field
            whose value is looked up via ``extract_field``.
        condition: Optional state field reference; when set, a ``Send`` is
            produced only if the extracted value is truthy.
        transform: Optional per-payload-key callables applied to the
            extracted value before it is stored in the payload.
    """

    node: str = Field(..., description="Target node name")
    fields: dict[str, str] = Field(
        default_factory=dict,
        description="Field mapping from state to Send arg",
    )
    condition: str | None = Field(
        None,
        description="Optional condition expression to evaluate",
    )
    transform: dict[str, Callable] | None = Field(
        None,
        description="Transformations to apply to fields",
    )

    model_config = {"arbitrary_types_allowed": True}

    def create_send(self, state: StateLike) -> Any | None:
        """Build the ``Send`` described by this mapping, or ``None`` to skip.

        Args:
            state: State object handed to ``extract_field`` for every lookup.

        Returns:
            ``Send(self.node, payload)``, or ``None`` when ``condition`` is
            set and either extracts to a falsy value or raises while being
            evaluated.
        """
        from langgraph.types import Send

        # Gate: the condition is currently just a field lookup whose
        # truthiness decides whether anything is sent at all.
        if self.condition:
            try:
                if not extract_field(state, self.condition):
                    return None
            except Exception as e:
                logger.exception(f"Error evaluating condition {self.condition}: {e}")
                return None

        transforms = self.transform or {}
        payload = {}
        for target_field, source_field in self.fields.items():
            extracted = extract_field(state, source_field)

            if target_field in transforms:
                try:
                    extracted = transforms[target_field](extracted)
                except Exception as e:
                    # Best effort: a failing transform is logged and the
                    # untransformed value is sent instead.
                    logger.exception(f"Error applying transform to {target_field}: {e}")

            payload[target_field] = extracted

        return Send(self.node, payload)
class SendGenerator(BaseModel):
    """Fan-out helper that emits one ``Send`` per item of a state collection.

    Attributes:
        target_node: Name of the node every generated ``Send`` targets.
        collection_field: State field (resolved via ``extract_field``) that
            holds the collection to iterate.
        item_field: Payload key under which each item is placed.
        filter_function: Optional predicate; items for which it returns a
            falsy value are skipped.
    """

    target_node: str = Field(..., description="Target node name")
    collection_field: str = Field(
        ...,
        description="State field containing the collection",
    )
    item_field: str = Field(
        "item",
        description="Field name for the item in the send object",
    )
    filter_function: Callable[[Any], bool] | None = Field(
        None,
        description="Function to filter items",
    )

    model_config = {"arbitrary_types_allowed": True}

    def create_sends(self, state: StateLike) -> list[Any]:
        """Produce a ``Send`` for every accepted item in the collection.

        Args:
            state: State object handed to ``extract_field``.

        Returns:
            A list of ``Send(self.target_node, {self.item_field: item})``
            objects. The list is empty when the extracted value is falsy or
            is not a ``list``, ``tuple`` or ``set``.
        """
        from langgraph.types import Send

        items = extract_field(state, self.collection_field)
        # Only non-empty list/tuple/set values are fanned out.
        if not (items and isinstance(items, list | tuple | set)):
            return []

        keep = self.filter_function
        return [
            Send(self.target_node, {self.item_field: element})
            for element in items
            if not keep or keep(element)
        ]
class SendMappingList(BaseModel):
    """Collection of send mappings and generators applied together.

    Attributes:
        mappings: ``SendMapping`` entries, each contributing at most one
            ``Send``.
        generators: ``SendGenerator`` entries, each contributing zero or
            more ``Send`` objects.
    """

    mappings: list[SendMapping] = Field(default_factory=list)
    generators: list[SendGenerator] = Field(default_factory=list)

    model_config = {"arbitrary_types_allowed": True}

    def create_sends(self, state: StateLike) -> list[Any]:
        """Apply all mappings, then all generators, to ``state``.

        Args:
            state: State object forwarded to every mapping and generator.

        Returns:
            The ``Send`` objects produced by the mappings (in declaration
            order, skipping mappings that returned ``None``) followed by
            those produced by the generators.
        """
        sends: list[Any] = []

        # ``create_send`` signals "nothing to send" with ``None``; compare
        # against that sentinel explicitly so a valid result can never be
        # discarded merely because it evaluates as falsy.
        for mapping in self.mappings:
            send_obj = mapping.create_send(state)
            if send_obj is not None:
                sends.append(send_obj)

        for generator in self.generators:
            sends.extend(generator.create_sends(state))

        return sends