haive.core.graph.node.utils

Utility functions for creating and working with nodes.

This module provides convenience functions for creating different types of nodes and extracting information from schemas for node integration.

Functions

create_branch_node(condition, routes[, name, ...])

Create a branch node.

create_node(engine_or_callable[, name, command_goto, ...])

Create a node from an engine or callable.

create_send_node(send_targets[, send_field, name, ...])

Create a send node for fan-out operations.

create_validation_node(schemas[, name, command_goto, ...])

Create a validation node.

extract_io_mapping_from_schema(schema, engine_id)

Extract input and output mappings from a schema for a specific engine.

Module Contents

haive.core.graph.node.utils.create_branch_node(condition, routes, name=None, input_mapping=None, **kwargs)

Create a branch node that routes to one of several nodes based on the result of a condition function.

Parameters:
  • condition (collections.abc.Callable) – Function that evaluates the state and returns a key for routing

  • routes (dict[Any, str]) – Mapping from condition outputs to node names

  • name (str | None) – Optional name for the node

  • input_mapping (dict[str, str] | None) – Mapping from state keys to condition function input keys

  • **kwargs – Additional options for the node configuration

Returns:

Branch node function

Return type:

collections.abc.Callable
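
The routing idea described by the parameters above can be sketched in plain Python. This is a minimal illustration, not the library's implementation: make_branch, is_long, and the node names are hypothetical, and passing mapped inputs as keyword arguments is an assumption about how input_mapping might be applied.

```python
from collections.abc import Callable
from typing import Any, Optional


def make_branch(
    condition: Callable[..., Any],
    routes: dict[Any, str],
    input_mapping: Optional[dict[str, str]] = None,
) -> Callable[[dict[str, Any]], str]:
    """Hypothetical stand-in: build a function that maps a state to a node name."""

    def branch(state: dict[str, Any]) -> str:
        if input_mapping:
            # Assumption: state keys are renamed to the condition's argument names.
            kwargs = {arg: state[key] for key, arg in input_mapping.items()}
            key = condition(**kwargs)
        else:
            key = condition(state)
        # The condition's return value selects the target node name.
        return routes[key]

    return branch


def is_long(text: str) -> bool:
    return len(text) > 20


router = make_branch(
    is_long,
    routes={True: "summarize", False: "respond"},
    input_mapping={"draft": "text"},
)
short_target = router({"draft": "hello"})   # "respond"
long_target = router({"draft": "x" * 50})   # "summarize"
```

Keeping routes as a plain dictionary means every possible condition output has to be listed explicitly. In this sketch an unlisted key raises KeyError rather than silently falling through.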

haive.core.graph.node.utils.create_node(engine_or_callable, name=None, command_goto=None, input_mapping=None, output_mapping=None, retry_policy=None, **kwargs)

Create a node from an engine or callable.

Parameters:
  • engine_or_callable (Any) – Engine, engine name, or callable function

  • name (str | None) – Optional name for the node (defaults to engine name or function name)

  • command_goto (haive.core.graph.node.types.CommandGoto | None) – Next node to go to after this node

  • input_mapping (dict[str, str] | None) – Mapping from state keys to engine input keys

  • output_mapping (dict[str, str] | None) – Mapping from engine output keys to state keys

  • retry_policy (langgraph.types.RetryPolicy | None) – Retry policy for node execution

  • **kwargs – Additional options for the node configuration

Returns:

Node function that can be added to a graph

Return type:

collections.abc.Callable
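
The direction of the two mappings is easy to mix up, so here is a small plain-Python sketch of how they could wrap a callable. It does not use the library: make_node and shout are hypothetical, and the assumption is that input_mapping is read as state key to engine key and output_mapping as engine key to state key, as the parameter descriptions suggest.

```python
from typing import Any, Callable, Optional


def make_node(
    fn: Callable[[dict[str, Any]], dict[str, Any]],
    input_mapping: Optional[dict[str, str]] = None,
    output_mapping: Optional[dict[str, str]] = None,
) -> Callable[[dict[str, Any]], dict[str, Any]]:
    """Hypothetical stand-in: wrap fn so it reads from and writes to state keys."""

    def node(state: dict[str, Any]) -> dict[str, Any]:
        if input_mapping:
            # state key -> engine input key
            inputs = {eng: state[key] for key, eng in input_mapping.items()}
        else:
            inputs = dict(state)
        outputs = fn(inputs)
        if output_mapping:
            # engine output key -> state key
            return {key: outputs[eng] for eng, key in output_mapping.items()}
        return outputs

    return node


def shout(inputs: dict[str, Any]) -> dict[str, Any]:
    return {"text": inputs["prompt"].upper()}


node = make_node(
    shout,
    input_mapping={"question": "prompt"},
    output_mapping={"text": "answer"},
)
update = node({"question": "why?", "history": []})  # {"answer": "WHY?"}
```

In this sketch the node returns only the mapped keys as a state update, so unrelated state such as "history" is left untouched.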

haive.core.graph.node.utils.create_send_node(send_targets, send_field=None, name=None, input_mapping=None, distribution='round_robin', payload_extractor=None, send_all_to_each=False, **kwargs)

Create a send node for fan-out operations.

Parameters:
  • send_targets (list[str]) – List of target node names to send items to

  • send_field (str | None) – Key in the state containing items to distribute (not required if using payload_extractor)

  • name (str | None) – Optional name for the node

  • input_mapping (dict[str, str] | None) – Mapping from state keys to node input keys

  • distribution (str) – Strategy for distributing items ("round_robin", "all", or "balanced")

  • payload_extractor (collections.abc.Callable | None) – Optional function to extract payloads from state (overrides send_field)

  • send_all_to_each (bool) – If True, sends all items to each target (instead of distributing)

  • **kwargs – Additional options for the node configuration

Returns:

Send node function

Return type:

collections.abc.Callable
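
To make the difference between round-robin distribution and send_all_to_each concrete, the sketch below plans the fan-out as (target, payload) pairs. It is an illustration only: plan_sends and the worker names are hypothetical, the pairs stand in for whatever send objects the real node emits, and the exact payload shape under send_all_to_each is an assumption.

```python
from itertools import cycle
from typing import Any


def plan_sends(
    items: list[Any],
    targets: list[str],
    send_all_to_each: bool = False,
) -> list[tuple[str, Any]]:
    """Hypothetical stand-in: decide which payload goes to which target."""
    if send_all_to_each:
        # Every target receives the full list of items.
        return [(target, list(items)) for target in targets]
    # Round robin: items are handed to the targets in rotating order.
    return [(target, item) for target, item in zip(cycle(targets), items)]


sends = plan_sends(["a", "b", "c"], ["worker_1", "worker_2"])
# [("worker_1", "a"), ("worker_2", "b"), ("worker_1", "c")]

broadcast = plan_sends(["a", "b"], ["worker_1", "worker_2"], send_all_to_each=True)
# [("worker_1", ["a", "b"]), ("worker_2", ["a", "b"])]
```

With round robin each item is processed once. Broadcasting multiplies the work by the number of targets, which is only worthwhile when each target does something different with the same items.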

haive.core.graph.node.utils.create_validation_node(schemas, name=None, command_goto=None, messages_field='messages', **kwargs)

Create a validation node.

Parameters:
  • schemas (list[type[pydantic.BaseModel] | collections.abc.Callable]) – List of validation schemas or functions

  • name (str | None) – Optional name for the node

  • command_goto (haive.core.graph.node.types.CommandGoto | None) – Next node to go to after this node

  • messages_field (str) – Name of the messages key in the state

  • **kwargs – Additional options for the node configuration

Returns:

Validation node function

Return type:

collections.abc.Callable
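
The schemas parameter accepts validation functions as well as Pydantic models. The sketch below covers only the function case, using the standard library, and reads the latest entry from the key named by messages_field. Everything here is an assumption for illustration: make_validator, the two checks, and the returned "valid"/"errors" keys are hypothetical and do not reflect what the library's node returns.

```python
from typing import Any, Callable


def make_validator(
    checks: list[Callable[[str], None]],
    messages_field: str = "messages",
) -> Callable[[dict[str, Any]], dict[str, Any]]:
    """Hypothetical stand-in: run each check against the latest message."""

    def node(state: dict[str, Any]) -> dict[str, Any]:
        last = state[messages_field][-1]
        errors = []
        for check in checks:
            try:
                check(last)
            except ValueError as exc:
                # Collect every failure instead of stopping at the first one.
                errors.append(str(exc))
        return {"valid": not errors, "errors": errors}

    return node


def non_empty(message: str) -> None:
    if not message.strip():
        raise ValueError("message is empty")


def short_enough(message: str) -> None:
    if len(message) > 10:
        raise ValueError("message is too long")


validate = make_validator([non_empty, short_enough], messages_field="chat")
ok = validate({"chat": ["hi"]})
bad = validate({"chat": ["hi", "this is far too long"]})
```

Here ok reports no errors, while bad reports the single "message is too long" failure, because only the last entry under "chat" is checked.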

haive.core.graph.node.utils.extract_io_mapping_from_schema(schema, engine_id)

Extract input and output mappings from a schema for a specific engine.

Parameters:
  • schema (type) – StateSchema class

  • engine_id (str) – ID or name of the engine to extract mappings for

Returns:

Dictionary with "inputs" and "outputs" mappings

Return type:

dict[str, dict[str, str]]
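
The return shape is the important part of this function, so the sketch below shows one way a dictionary of that shape could be derived from a schema class. How a real StateSchema records its per-engine fields is not stated here, so DemoSchema, its engine_io attribute, and io_mapping_for are all hypothetical.

```python
from typing import Any


class DemoSchema:
    # Hypothetical attribute: engine id -> declared input and output field names.
    engine_io = {
        "summarizer": {"inputs": ["document"], "outputs": ["summary"]},
    }


def io_mapping_for(schema: type, engine_id: str) -> dict[str, dict[str, str]]:
    """Hypothetical stand-in: build identity mappings for one engine's fields."""
    declared: dict[str, Any] = getattr(schema, "engine_io", {}).get(engine_id, {})
    return {
        "inputs": {field: field for field in declared.get("inputs", [])},
        "outputs": {field: field for field in declared.get("outputs", [])},
    }


mapping = io_mapping_for(DemoSchema, "summarizer")
# {"inputs": {"document": "document"}, "outputs": {"summary": "summary"}}

missing = io_mapping_for(DemoSchema, "unknown")
# {"inputs": {}, "outputs": {}}
```

A result of this shape lines up with the input_mapping and output_mapping parameters of create_node: each inner dictionary is already a dict[str, str].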