# Source code for haive.core.schema.field_utils
"""Field utilities for the Haive Schema System.
from typing import Any
This module provides a comprehensive set of utilities for creating, extracting, and
manipulating Pydantic fields within the Haive Schema System. It ensures consistent
handling of field metadata, types, and defaults across the entire framework.
The utilities in this module serve as the low-level foundation for the Schema System,
handling technical details like:
- Creating fields with standardized metadata
- Working with Annotated types for metadata embedding
- Extracting metadata from type annotations
- Type inference and manipulation
- Resolver functions for reducers
Core functions include:
- create_field: Create a standard Pydantic field with metadata
- create_annotated_field: Create a field using Python's Annotated type for metadata
- extract_type_metadata: Extract base type and metadata from annotations
- infer_field_type: Intelligently determine types from values
- get_common_reducers: Access standard reducer functions
- resolve_reducer: Convert reducer names to functions
These utilities are primarily used by FieldDefinition, SchemaComposer, and
StateSchemaManager to implement higher-level functionality.
Examples:
from haive.core.schema.field_utils import (
create_field, create_annotated_field, get_common_reducers
)
from typing import List
import operator
# Create a standard field
field_type, field_info = create_field(
field_type=List[str],
default_factory=list,
description="List of items",
shared=True,
reducer=operator.add
)
# Create an annotated field with embedded metadata
field_type, field_info = create_annotated_field(
field_type=List[str],
default_factory=list,
description="List of items",
shared=True,
reducer=operator.add
)
# Get common reducer functions
reducers = get_common_reducers()
add_messages = reducers["add_messages"] # LangGraph's message list combiner
"""
import logging
import operator
import re
from collections.abc import Callable
from typing import (
    Annotated,
    Any,
    ForwardRef,
    Optional,
    TypeVar,
    Union,
    get_args,
    get_origin,
)

from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
# ============================================================================
# FIELD NAMING UTILITIES
# ============================================================================
# Compiled once at import time: acronym runs ("APIKey" -> "API_Key") and
# ordinary lower/digit-to-upper transitions ("QueryRefinement" -> "Query_Refinement").
_ACRONYM_RE = re.compile(r"([A-Z]+)([A-Z][a-z])")
_CAMEL_RE = re.compile(r"([a-z0-9])([A-Z])")


def camel_to_snake_case(name: str) -> str:
    """Convert a CamelCase identifier to snake_case.

    Args:
        name: CamelCase string to convert

    Returns:
        snake_case version of the string

    Examples:
        >>> camel_to_snake_case("QueryRefinementResponse")
        'query_refinement_response'
        >>> camel_to_snake_case("UserProfile")
        'user_profile'
        >>> camel_to_snake_case("APIKey")
        'api_key'
    """
    # Break acronym runs first so "APIKey" becomes "API_Key" rather than
    # "A_P_I_Key", then insert underscores at normal case transitions.
    snake = _ACRONYM_RE.sub(r"\1_\2", name)
    snake = _CAMEL_RE.sub(r"\1_\2", snake)
    return snake.lower()
def create_field_name_from_model(
    model_class: type[BaseModel], remove_suffixes: bool = False
) -> str:
    """Derive a snake_case field name from a Pydantic model class.

    Args:
        model_class: The Pydantic model class
        remove_suffixes: Whether to remove common suffixes like "Response", "Result"

    Returns:
        A properly formatted snake_case field name

    Examples:
        >>> class QueryRefinementResponse(BaseModel): pass
        >>> create_field_name_from_model(QueryRefinementResponse)
        'query_refinement_response'
        >>> create_field_name_from_model(QueryRefinementResponse, remove_suffixes=True)
        'query_refinement'
    """
    name = camel_to_snake_case(model_class.__name__)

    if remove_suffixes:
        # Strip at most one common trailing word.
        for trailing in ("_response", "_result", "_output", "_data"):
            if name.endswith(trailing):
                name = name.removesuffix(trailing)
                break

    # Guard against degenerate class names producing an empty/underscore name.
    return name if name and name != "_" else "parsed_result"
def get_field_info_from_model(model_class: type[BaseModel]) -> dict[str, Any]:
    """Build schema-integration field info for a model class.

    Reads the integration annotations (``__field_name__``,
    ``__field_description__``, ``__field_type__``, ``__field_config__``)
    from the class, falling back to generated defaults for any that are
    missing.

    Args:
        model_class: The Pydantic model class

    Returns:
        Dictionary with field configuration for schema integration
    """
    info: dict[str, Any] = {
        # Custom field name annotation wins; otherwise auto-generate one.
        "field_name": getattr(model_class, "__field_name__", None)
        or create_field_name_from_model(model_class),
        # Custom description annotation wins; otherwise use a generic one.
        "description": getattr(model_class, "__field_description__", None)
        or f"Parsed {model_class.__name__}",
        # Custom type override wins; otherwise the model itself, optional.
        "field_type": getattr(model_class, "__field_type__", None)
        or Optional[model_class],
    }
    # Any explicit per-field configuration overrides the defaults above.
    info.update(getattr(model_class, "__field_config__", {}))
    return info
# Decorators for adding field info to models
def field_name(name: str):
    """Simple decorator to set the field name for schema integration.

    Args:
        name: The field name to use in schema integration

    Returns:
        A class decorator that stores ``name`` as ``__field_name__`` on the
        decorated class and returns the class unchanged.

    Examples:
        >>> @field_name("query_refinement")
        ... class QueryRefinementResponse(BaseModel):
        ...     pass
    """

    def decorator(cls) -> Any:
        """Attach the configured field name and return the class unchanged."""
        cls.__field_name__ = name
        return cls

    return decorator
def field_description(description: str):
    """Decorator to set the field description for schema integration.

    Args:
        description: The field description to use

    Returns:
        A class decorator that stores ``description`` as
        ``__field_description__`` on the decorated class and returns the
        class unchanged.

    Examples:
        >>> @field_description("Refined query results")
        ... class QueryRefinementResponse(BaseModel):
        ...     pass
    """

    def decorator(cls) -> Any:
        """Attach the configured description and return the class unchanged."""
        cls.__field_description__ = description
        return cls

    return decorator
def field_config(**config) -> Any:
    """Decorator to set field configuration for schema integration.

    Args:
        **config: Field configuration options

    Returns:
        A class decorator that stores the keyword arguments as
        ``__field_config__`` on the decorated class and returns the class
        unchanged.

    Examples:
        >>> @field_config(required=True, default=None)
        ... class QueryRefinementResponse(BaseModel):
        ...     pass
    """

    def decorator(cls) -> Any:
        """Attach the configuration dict and return the class unchanged."""
        cls.__field_config__ = config
        return cls

    return decorator
# Type variables
T = TypeVar("T")  # generic value type used by the field-creation helpers
FieldType = TypeVar("FieldType")  # a field's annotated type
DefaultType = TypeVar("DefaultType")  # a field's default-value type
ReducerType = TypeVar("ReducerType", bound=Callable[[Any, Any], Any])  # binary reducer signature
[docs]
class FieldMetadata:
"""Standardized container for field metadata in the Haive Schema System.
This class encapsulates all metadata associated with a field, serving as a
comprehensive representation of field properties beyond what Pydantic directly
supports. It provides a structured way to manage:
- Basic field properties (description, title, etc.)
- Haive-specific properties (shared status, reducer functions, etc.)
- Engine I/O tracking (input/output relationships with engines)
- Structured output model associations
FieldMetadata provides methods for converting between different metadata
representations, including dictionaries for Field instantiation and annotation
objects for Annotated types. It also supports merging metadata from different
sources and serializing reducer functions.
This class serves as a single source of truth for field metadata throughout
the Schema System, ensuring consistent handling of field properties across
schema composition, manipulation, and serialization operations.
Attributes:
description (Optional[str]): Human-readable description of the field
shared (bool): Whether the field is shared with parent graphs
reducer (Optional[Callable]): Function to combine field values during updates
source (Optional[str]): Component that provided this field
input_for (List[str]): Engines this field serves as input for
output_from (List[str]): Engines this field is output from
structured_model (Optional[str]): Name of structured model this field belongs to
title (Optional[str]): Field title (for OpenAPI/Schema generation)
extra (Dict[str, Any]): Additional metadata properties
"""
[docs]
def __init__(
self,
description: str | None = None,
shared: bool = False,
reducer: Callable | None = None,
source: str | None = None,
input_for: list[str] | None = None,
output_from: list[str] | None = None,
structured_model: str | None = None,
title: str | None = None,
**extra,
):
"""Initialize field metadata with comprehensive properties.
Args:
description: Human-readable description of the field
shared: Whether field is shared with parent graphs
reducer: Function to combine field values during state updates
source: Component that provided this field
input_for: List of engines this field serves as input for
output_from: List of engines this field is output from
structured_model: Name of structured model this field belongs to
title: Optional field title (for OpenAPI/Schema generation)
**extra: Additional metadata properties
"""
self.description = description
self.shared = shared
self.reducer = reducer
self.source = source
self.input_for = input_for or []
self.output_from = output_from or []
self.structured_model = structured_model
self.title = title
self.extra = extra
[docs]
def to_dict(self) -> dict[str, Any]:
"""Convert metadata to dictionary for Field instantiation.
Returns:
Dictionary of metadata suitable for pydantic.Field constructor
"""
result = {}
if self.description:
result["description"] = self.description
if self.title:
result["title"] = self.title
# Add standard Field parameters from extra
standard_params = {
"gt",
"ge",
"lt",
"le",
"multiple_of",
"min_items",
"max_items",
"min_length",
"max_length",
"pattern",
"discriminator",
"json_schema_extra",
"validation_alias",
"serialization_alias",
"validation_alias_priority",
"serialization_alias_priority",
"deprecated",
"examples",
}
for k, v in self.extra.items():
if k in standard_params:
result[k] = v
return result
[docs]
def to_annotation_metadata(self) -> list[Any]:
"""Convert to a list of metadata objects for Annotated types.
Returns:
List of metadata objects for use in Annotated[Type, ...]
"""
metadata = []
# Add reducer as first metadata if present
if self.reducer:
metadata.append(self.reducer)
# Create a dictionary for other metadata
other_meta = {}
if self.shared:
other_meta["shared"] = True
if self.source:
other_meta["source"] = self.source
if self.input_for:
other_meta["input_for"] = self.input_for
if self.output_from:
other_meta["output_from"] = self.output_from
if self.structured_model:
other_meta["structured_model"] = self.structured_model
# Add to metadata list if we have any other metadata
if other_meta:
metadata.append(other_meta)
return metadata
[docs]
def merge(self, other: "FieldMetadata") -> "FieldMetadata":
"""Merge with another FieldMetadata instance.
Args:
other: FieldMetadata instance to merge with
Returns:
New FieldMetadata instance with merged data
"""
# Start with a copy of the current metadata
result = FieldMetadata(
description=other.description or self.description,
shared=other.shared or self.shared,
reducer=other.reducer or self.reducer,
source=other.source or self.source,
title=other.title or self.title,
)
# Merge lists
result.input_for = list(set(self.input_for + other.input_for))
result.output_from = list(set(self.output_from + other.output_from))
# Structured model takes the newer value
result.structured_model = other.structured_model or self.structured_model
# Merge extra attributes, with other taking precedence
result.extra = self.extra.copy()
for k, v in other.extra.items():
result.extra[k] = v
return result
[docs]
def get_reducer_name(self) -> str | None:
"""Get serializable name for the reducer function.
Returns:
String name of the reducer function or None
"""
if not self.reducer:
return None
# Handle operator module functions
if hasattr(self.reducer, "__module__") and self.reducer.__module__ in (
"operator",
"_operator",
):
return f"operator.{self.reducer.__name__}"
# Handle lambda functions
if hasattr(self.reducer, "__name__") and self.reducer.__name__ == "<lambda>":
return "<lambda>"
# Handle standard functions
if hasattr(self.reducer, "__name__"):
if (
hasattr(self.reducer, "__module__")
and self.reducer.__module__ != "__main__"
):
# Use fully qualified name for imported functions
return f"{self.reducer.__module__}.{self.reducer.__name__}"
return self.reducer.__name__
# Last resort: string representation
return str(self.reducer)
[docs]
@classmethod
def from_annotation(cls, annotation: type) -> Optional["FieldMetadata"]:
"""Extract field metadata from an annotated type.
Args:
annotation: Type annotation to extract metadata from
Returns:
FieldMetadata if metadata was found, None otherwise
"""
# Check if this is an Annotated type
if get_origin(annotation) is not Annotated:
return None
# Get args from Annotated[Type, ...]
args = get_args(annotation)
if len(args) < 2:
return None
# Extract metadata from annotation
metadata = FieldMetadata()
# Process each metadata object
for meta in args[1:]:
if callable(meta) and not isinstance(meta, type):
# Found a callable, assume it's a reducer
metadata.reducer = meta
elif isinstance(meta, dict):
# Dictionary of metadata
if "shared" in meta:
metadata.shared = meta["shared"]
if "source" in meta:
metadata.source = meta["source"]
if "input_for" in meta:
metadata.input_for = meta["input_for"]
if "output_from" in meta:
metadata.output_from = meta["output_from"]
if "structured_model" in meta:
metadata.structured_model = meta["structured_model"]
if "description" in meta:
metadata.description = meta["description"]
if "title" in meta:
metadata.title = meta["title"]
# Add any other keys to extra
for k, v in meta.items():
if k not in {
"shared",
"source",
"input_for",
"output_from",
"structured_model",
"description",
"title",
}:
metadata.extra[k] = v
return metadata
def create_field(
    field_type: type[T],
    default: Any = None,
    default_factory: Callable[[], T] | None = None,
    metadata: FieldMetadata | None = None,
    description: str | None = None,
    shared: bool = False,
    reducer: Callable | None = None,
    make_optional: bool = True,
    **kwargs,
) -> tuple[type, Field]:
    """Create a standardized Pydantic field with consistent metadata handling.

    Args:
        field_type: The type of the field
        default: Default value (used if default_factory is None)
        default_factory: Optional factory function for default value
        metadata: Optional FieldMetadata object for comprehensive metadata
        description: Optional field description (ignored if metadata is provided)
        shared: Whether field is shared with parent (ignored if metadata is provided)
        reducer: Optional reducer function (ignored if metadata is provided)
        make_optional: Whether to make the field Optional if it's not already
        **kwargs: Additional field parameters

    Returns:
        Tuple of (field_type, field_info) ready for Pydantic model creation
    """
    # Optional[X] is really Union[X, None]: get_origin() reports Union and
    # never Optional, so optionality must be detected via the union args.
    # (The previous `get_origin(...) is Optional` check could never be true.)
    already_optional = get_origin(field_type) is Union and type(None) in get_args(
        field_type
    )
    if make_optional and field_type is not type(None) and not already_optional:
        field_type = Optional[field_type]

    # An explicit metadata object wins over the individual keyword shortcuts.
    meta = metadata or FieldMetadata(
        description=description, shared=shared, reducer=reducer, **kwargs
    )

    # Only the Field-compatible subset of the metadata is forwarded.
    field_params = meta.to_dict()

    if default_factory is not None:
        field_info = Field(default_factory=default_factory, **field_params)
    else:
        # Use Pydantic's natural default handling - don't force explicit None
        # This matches the behavior from commit 99cfc8b4
        field_info = Field(default=default, **field_params)

    return field_type, field_info
def create_annotated_field(
    field_type: type[T],
    default: Any = None,
    default_factory: Callable[[], T] | None = None,
    metadata: FieldMetadata | None = None,
    description: str | None = None,
    shared: bool = False,
    reducer: Callable | None = None,
    make_optional: bool = True,
    **kwargs,
) -> tuple[type, Field]:
    """Create a Pydantic field using Python's Annotated type for embedded metadata.

    This function creates a field for Pydantic models using the Annotated type
    to embed metadata directly in the type annotation. By embedding metadata in
    the Annotated type, field properties like shared status and reducer
    functions stay attached to the field type itself, so they are preserved
    during operations like schema composition and subclassing.

    The function supports both direct metadata parameters (description, shared,
    reducer) and a comprehensive FieldMetadata object for more complex metadata.

    Args:
        field_type: The Python type of the field (e.g. str, List[int]); may
            already be Optional, in which case it is unwrapped before the
            Annotated wrapper is applied and re-wrapped afterwards.
        default: Default value for the field. Used if default_factory is not
            provided. Defaults to None.
        default_factory: Factory function that returns the default value.
            Takes precedence over default. Defaults to None.
        metadata: Comprehensive field metadata object. If provided, the
            description/shared/reducer parameters are ignored.
        description: Human-readable description of the field. Ignored if
            metadata is provided.
        shared: Whether the field is shared with parent graphs. Ignored if
            metadata is provided.
        reducer: Function to combine field values during updates. Ignored if
            metadata is provided.
        make_optional: Whether to make the field Optional if it's not already,
            so the field can be None for state management. Defaults to True.
        **kwargs: Additional field parameters passed to FieldMetadata or Field.

    Returns:
        Tuple of (field_type, field_info) where field_type is the annotated
        type with embedded metadata and field_info is the Pydantic Field.

    Examples:
        from typing import List
        from pydantic import create_model
        import operator

        field_type, field_info = create_annotated_field(
            field_type=List[str],
            default_factory=list,
            description="List of items",
            shared=True,
            reducer=operator.add,
        )
        MyModel = create_model("MyModel", items=(field_type, field_info))
    """
    # An explicit metadata object wins over the individual keyword shortcuts.
    meta = metadata or FieldMetadata(
        description=description, shared=shared, reducer=reducer, **kwargs
    )

    # Optional[X] is Union[X, None]; get_origin() never returns Optional, so
    # detect and unwrap optionality through the union arguments.  (The
    # previous `get_origin(...) is Optional` check was always false, so
    # Optional inputs were never unwrapped before annotation.)
    union_args = get_args(field_type)
    was_optional = get_origin(field_type) is Union and type(None) in union_args
    base_type = field_type
    if was_optional:
        non_none = [arg for arg in union_args if arg is not type(None)]
        base_type = non_none[0] if len(non_none) == 1 else Union[tuple(non_none)]

    # Embed the metadata in the type via Annotated, then restore optionality.
    meta_objects = meta.to_annotation_metadata()
    if meta_objects:
        annotated_type = Annotated[(base_type, *meta_objects)]
        if make_optional or was_optional:
            field_type = Optional[annotated_type]
        else:
            field_type = annotated_type
    elif make_optional and not was_optional:
        field_type = Optional[field_type]

    # Build field parameters (only ones the Field constructor understands).
    field_params: dict[str, Any] = {}
    if meta.description:
        field_params["description"] = meta.description
    if meta.title:
        field_params["title"] = meta.title
    standard_params = {
        "gt",
        "ge",
        "lt",
        "le",
        "multiple_of",
        "min_items",
        "max_items",
        "min_length",
        "max_length",
        "pattern",
        "discriminator",
        "json_schema_extra",
        "validation_alias",
        "serialization_alias",
        "validation_alias_priority",
        "serialization_alias_priority",
        "deprecated",
        "examples",
    }
    field_params.update(
        (key, value) for key, value in meta.extra.items() if key in standard_params
    )

    if default_factory is not None:
        field_info = Field(default_factory=default_factory, **field_params)
    else:
        field_info = Field(default=default, **field_params)

    return field_type, field_info
def extract_field_info(
    field_info: Field,
) -> tuple[Any, Callable | None, dict[str, Any]]:
    """Pull the default, default factory, and metadata out of a Field object.

    Args:
        field_info: Pydantic Field object

    Returns:
        Tuple of (default_value, default_factory, metadata_dict); the default
        is ``...`` (Ellipsis) when the object exposes no ``default`` attribute.
    """
    default_value = getattr(field_info, "default", ...)
    factory = getattr(field_info, "default_factory", None)

    meta: dict[str, Any] = {}
    for attr in ("description", "title"):
        attr_value = getattr(field_info, attr, None)
        if attr_value is not None:
            meta[attr] = attr_value

    # Pydantic v2 exposes extras via json_schema_extra; v1 used .extra.
    schema_extra = getattr(field_info, "json_schema_extra", None)
    if schema_extra:
        meta.update(schema_extra)
    else:
        legacy_extra = getattr(field_info, "extra", None)
        if legacy_extra:
            meta.update(legacy_extra)

    return default_value, factory, meta
def extract_type_metadata(
    type_annotation: type,
) -> "tuple[type, FieldMetadata | None]":
    """Extract the base type and any embedded metadata from an annotation.

    Handles both the direct ``Annotated[...]`` form and the
    ``Optional[Annotated[...]]`` form.

    Args:
        type_annotation: Type annotation to extract from

    Returns:
        Tuple of (base_type, field_metadata); field_metadata is None when the
        annotation carries no embedded metadata.
    """
    # Direct Annotated[...] form: the first arg is the base type.
    if get_origin(type_annotation) is Annotated:
        base_type = get_args(type_annotation)[0]
        return base_type, FieldMetadata.from_annotation(type_annotation)

    # Optional[Annotated[...]] form.  Optional[X] is Union[X, None] and
    # get_origin() reports Union — never Optional — so it must be detected
    # through the union arguments.  (The previous `is Optional` check was
    # always false, making this branch unreachable.)
    if get_origin(type_annotation) is Union:
        non_none = [a for a in get_args(type_annotation) if a is not type(None)]
        if len(non_none) == 1 and get_origin(non_none[0]) is Annotated:
            base_type, metadata = extract_type_metadata(non_none[0])
            return Optional[base_type], metadata

    # Plain type: no embedded metadata.
    return type_annotation, None
[docs]
def format_type_annotation(type_annotation: type) -> str:
"""Format a type annotation for display or documentation.
Args:
type_annotation: Type annotation to format
Returns:
Formatted string representation of the type
"""
# Handle primitive types
if type_annotation is str:
return "str"
if type_annotation is int:
return "int"
if type_annotation is float:
return "float"
if type_annotation is bool:
return "bool"
if type_annotation is list or type_annotation is list:
return "List"
if type_annotation is dict or type_annotation is dict:
return "Dict"
if type_annotation is None or type_annotation is type(None):
return "None"
# Get origin and arguments
origin = get_origin(type_annotation)
args = get_args(type_annotation)
# Handle None origin
if origin is None:
if isinstance(type_annotation, ForwardRef):
return f'"{type_annotation.__forward_arg__}"'
if hasattr(type_annotation, "__name__"):
if (
hasattr(type_annotation, "__module__")
and type_annotation.__module__ != "builtins"
):
return f"{type_annotation.__module__}.{type_annotation.__name__}"
return type_annotation.__name__
return str(type_annotation).replace("typing.", "")
# Extract base type name
if hasattr(origin, "__name__"):
origin_name = origin.__name__
else:
origin_name = str(origin).replace("typing.", "")
# Handle special cases
from typing import Optional as OptionalType
if origin is OptionalType:
inner_type = format_type_annotation(args[0])
return f"Optional[{inner_type}]"
if origin is Union:
# Check if it's Optional (Union with None)
if len(args) == 2 and (args[1] is None or args[1] is type(None)):
inner_type = format_type_annotation(args[0])
return f"Optional[{inner_type}]"
if len(args) == 2 and (args[0] is None or args[0] is type(None)):
inner_type = format_type_annotation(args[1])
return f"Optional[{inner_type}]"
inner_types = [format_type_annotation(arg) for arg in args]
return f"Union[{', '.join(inner_types)}]"
if origin is list or origin is list:
if args:
inner_type = format_type_annotation(args[0])
return f"List[{inner_type}]"
return "List"
if origin is dict or origin is dict:
if len(args) == 2:
key_type = format_type_annotation(args[0])
value_type = format_type_annotation(args[1])
return f"Dict[{key_type}, {value_type}]"
return "Dict"
if origin is Annotated:
# For Annotated types, format the base type but ignore metadata
return format_type_annotation(args[0])
# Generic handling
if args:
inner_types = [format_type_annotation(arg) for arg in args]
return f"{origin_name}[{', '.join(inner_types)}]"
return origin_name
def get_common_reducers() -> dict[str, Callable]:
    """Get a registry of common reducer functions.

    The registry maps names to callables and contains every public callable
    from :mod:`operator` (under both ``"operator.<name>"`` and the bare
    ``"<name>"``), list/string/numeric combiners that treat None as an empty
    value, and LangGraph's ``add_messages`` when available.

    Returns:
        Dictionary of reducer name -> reducer function
    """
    reducers: dict[str, Callable] = {}

    # Every public callable from the operator module, registered both with
    # and without the "operator." prefix (bare name kept for backward
    # compatibility).
    for name in dir(operator):
        if name.startswith("_"):
            continue
        func = getattr(operator, name)
        if callable(func):
            reducers[f"operator.{name}"] = func
            reducers[name] = func

    # Common list operations.
    def add_lists(a, b) -> Any:
        """Concatenate two lists, treating None as an empty list."""
        return (a or []) + (b or [])

    def concat_lists(a, b) -> Any:
        """Concatenate two lists, treating None as an empty list."""
        return (a or []) + (b or [])

    reducers["add_lists"] = add_lists
    reducers["concat_lists"] = concat_lists

    # Prefer LangGraph's message combiner; fall back to list concatenation
    # when langgraph is not installed.
    try:
        from langgraph.graph import add_messages
    except ImportError:

        def add_messages(a, b) -> Any:
            """Concatenate two message lists, treating None as an empty list."""
            return (a or []) + (b or [])

    reducers["add_messages"] = add_messages

    # String operations.
    def concat_strings(a, b) -> Any:
        """Concatenate two strings, treating None as an empty string."""
        return (a or "") + (b or "")

    reducers["concat_strings"] = concat_strings

    # Numeric operations.
    def sum_values(a, b) -> Any:
        """Add two numbers, treating None as zero."""
        return (a or 0) + (b or 0)

    reducers["sum_values"] = sum_values

    # Builtin aggregation helpers.
    reducers["max"] = max
    reducers["min"] = min

    return reducers
def resolve_reducer(reducer_name: str) -> Callable | None:
    """Resolve a reducer function from its serialized name.

    Resolution order: the common-reducer registry, the :mod:`operator`
    module, then a dynamic ``module.function`` import.  The special name
    ``"<lambda>"`` resolves to a generic fallback reducer.

    Args:
        reducer_name: Name of the reducer to resolve (e.g. "operator.add")

    Returns:
        Callable reducer function or None if not found
    """
    # Check common reducers first.
    common_reducers = get_common_reducers()
    if reducer_name in common_reducers:
        return common_reducers[reducer_name]

    # "operator.<func>" names not present in the registry.
    if reducer_name.startswith("operator."):
        func_name = reducer_name.split(".", 1)[1]
        if hasattr(operator, func_name):
            return getattr(operator, func_name)

    # Fully qualified "module.function" names.
    if "." in reducer_name:
        import importlib

        try:
            module_name, func_name = reducer_name.rsplit(".", 1)
            module = importlib.import_module(module_name)
            return getattr(module, func_name)
        except (ImportError, AttributeError):
            logger.warning(f"Could not resolve reducer: {reducer_name}")

    # Lambdas cannot be resolved by name; provide a best-effort stand-in.
    if reducer_name == "<lambda>":

        def generic_lambda_reducer(a, b) -> Any:
            """Best-effort combiner standing in for an unserializable lambda.

            Concatenates sequences, merges dicts (``b`` wins on key
            conflicts), and otherwise returns ``b``.
            """
            if isinstance(a, list | tuple) and isinstance(b, list | tuple):
                return a + b
            if isinstance(a, dict) and isinstance(b, dict):
                return {**a, **b}
            return b

        return generic_lambda_reducer

    # Could not resolve.
    return None
def infer_field_type(value: Any) -> type:
    """Infer an Optional-wrapped field type from a runtime value.

    Args:
        value: Value to infer type from

    Returns:
        Inferred type, always wrapped in Optional so the field may be unset
        in state schemas
    """
    from typing import Any as AnyType
    from typing import Optional, Union

    if value is None:
        return Optional[AnyType]
    # bool is a subclass of int, so it must be tested before int; otherwise
    # booleans would be typed as Optional[int].
    if isinstance(value, bool):
        return Optional[bool]
    if isinstance(value, str):
        return Optional[str]
    if isinstance(value, int):
        return Optional[int]
    if isinstance(value, float):
        return Optional[float]
    if isinstance(value, list):
        # Homogeneous lists get a precise item type; otherwise fall back to Any.
        if value and all(isinstance(x, type(value[0])) for x in value):
            item_type = infer_field_type(value[0])
            # infer_field_type wraps in Optional (i.e. Union[X, None]):
            # unwrap via the union args — get_origin never returns Optional,
            # so the old `is Optional` check silently left the wrapper on.
            if get_origin(item_type) is Union and type(None) in get_args(item_type):
                inner = [a for a in get_args(item_type) if a is not type(None)]
                item_type = inner[0] if len(inner) == 1 else Union[tuple(inner)]
            return Optional[list[item_type]]
        return Optional[list[AnyType]]
    if isinstance(value, dict):
        # String-keyed dicts are the common case; keep precise keys there.
        if value and all(isinstance(k, str) for k in value):
            return Optional[dict[str, AnyType]]
        return Optional[dict[AnyType, AnyType]]
    return Optional[AnyType]