Source code for haive.core.schema.preserve_messages_reducer
"""Custom message reducer for preserving BaseMessage fields.This module provides a custom reducer function that preserves BaseMessage objectsintact during state updates in multi-agent systems. Unlike LangGraph's defaultadd_messages reducer, this implementation avoids converting messages to dictsand back, which can cause loss of important fields like tool_call_id.The preserve_messages_reducer is critical for multi-agent tool coordination,ensuring that ToolMessage objects maintain their tool_call_id field whenpassed between agents. This prevents KeyError exceptions and enables propertool result routing.Example: >>> from haive.core.schema.preserve_messages_reducer import preserve_messages_reducer >>> from langchain_core.messages import ToolMessage >>> >>> # Create a ToolMessage with tool_call_id >>> tool_msg = ToolMessage(content="Result: 42", tool_call_id="call_123") >>> >>> # Using preserve_messages_reducer maintains the field >>> messages = preserve_messages_reducer([], [tool_msg]) >>> assert messages[0].tool_call_id == "call_123" # Preserved!Note: This reducer is automatically used by AgentSchemaComposer for message fields to ensure proper multi-agent message handling."""importloggingfromlangchain_core.messagesimportBaseMessage,convert_to_messageslogger=logging.getLogger(__name__)
[docs]defpreserve_messages_reducer(left:list,right:list)->list:"""Custom reducer that preserves BaseMessage objects to avoid losing fields. This is a replacement for LangGraph's add_messages reducer that avoids calling convert_to_messages when the messages are already BaseMessage objects. Args: left: Existing messages in state right: New messages to add Returns: Combined list of messages with BaseMessage objects preserved """# Start with existing messagesresult=list(left)ifleftelse[]ifnotright:returnresult# Handle the new messagesformsginright:ifisinstance(msg,BaseMessage):# Already a BaseMessage object - preserve it exactlyresult.append(msg)logger.debug(f"Preserved BaseMessage: {type(msg).__name__}")ifhasattr(msg,"tool_call_id"):logger.debug(f" tool_call_id: {getattr(msg,'tool_call_id','None')}")else:# Need to convert from dict - use convert_to_messages# but only for this individual messagetry:converted=convert_to_messages([msg])result.extend(converted)logger.debug(f"Converted message from dict: {type(msg)}")exceptExceptionase:logger.warning(f"Failed to convert message: {e}")# Fallback: add as-isresult.append(msg)returnresult