"""
Output parser engine for the Haive framework.

This module provides an engine implementation for LangChain output parsers,
allowing them to be used consistently within the Haive framework's engine system.
"""

import asyncio
import inspect
import logging
import uuid
from datetime import datetime
from enum import Enum
from typing import Any, TypeVar

from langchain_core.messages import BaseMessage
from langchain_core.runnables import RunnableConfig
from pydantic import BaseModel, Field

from haive.core.engine.base import InvokableEngine
from haive.core.engine.base.types import EngineType
from haive.core.engine.output_parser.types import OutputParserType

# Deferred import to avoid circular dependency - registry imports engine

logger = logging.getLogger(__name__)
# Output type variable. Parsers may return primitives (bool, str, int, float,
# datetime), collections, Pydantic models, Enum values, or DataFrames, so the
# variable is deliberately left unbounded.
TOut = TypeVar("TOut")
# Input can be string, message, or collection
TIn = TypeVar("TIn", bound=str | BaseMessage | list[BaseMessage] | dict[str, Any])
# TODO: Re-enable registration after fixing circular imports
# @register_component(registry_getter="engine_registry", component_type=EngineType.OUTPUT_PARSER)
class OutputParserEngine(InvokableEngine[TIn, TOut]):
"""Engine that wraps LangChain OutputParsers to convert text to structured data.
The generic parameter TOut represents the specific return type of the parser.
This allows for type-safe usage of parsers in your workflows.
Supported output types include:
- Primitive types (bool, str, int, float, datetime)
- Collection types (List[str], Dict[str, Any], etc.)
- Pydantic models (any BaseModel subclass)
- Specialized types (pandas.DataFrame, Enum values)
- Custom structured types
Examples:
# Create a JSON parser
json_parser = create_output_parser_engine(
OutputParserType.JSON,
name="my_json_parser"
)
# Use in a graph
graph.add_node("parse_json", json_parser)
"""
engine_type: EngineType = Field(default=EngineType.OUTPUT_PARSER)
# Parser configuration
parser_type: OutputParserType = Field(
description="Type of parser to use (from OutputParserType enum)"
)
parser_config: dict[str, Any] = Field(
default_factory=dict,
description="Additional configuration parameters for the parser",
)
# Special configuration fields for specific parsers
pydantic_model: type[BaseModel] | None = Field(
default=None,
description="Pydantic model for PydanticOutputParser",
exclude=True, # Exclude from serialization
)
regex_pattern: str | None = Field(
default=None, description="Regex pattern for RegexParser"
)
enum_class: type[Enum] | None = Field(
default=None, description="Enum class for EnumOutputParser", exclude=True
)
response_schemas: list[dict[str, Any]] | None = Field(
default=None, description="Response schemas for StructuredOutputParser"
)
def get_output_fields(self) -> dict[str, tuple]:
"""Define output field requirements based on parser type."""
if self.parser_type == OutputParserType.BOOLEAN:
return {"result": (bool, Field(description="Parsed boolean result"))}
if self.parser_type in [
OutputParserType.LIST,
OutputParserType.COMMA_SEPARATED_LIST,
OutputParserType.NUMBERED_LIST,
OutputParserType.MARKDOWN_LIST,
]:
return {
"result": (
list[str],
Field(default_factory=list, description="Parsed list result"),
)
}
if self.parser_type == OutputParserType.DATETIME:
return {"result": (datetime, Field(description="Parsed datetime result"))}
if self.parser_type in [
OutputParserType.JSON,
OutputParserType.SIMPLE_JSON,
OutputParserType.COMBINING,
OutputParserType.REGEX_DICT,
OutputParserType.XML,
OutputParserType.YAML,
]:
return {
"result": (
dict[str, Any],
Field(default_factory=dict, description="Parsed dictionary result"),
)
}
if self.parser_type == OutputParserType.PYDANTIC and self.pydantic_model:
return {
"result": (
self.pydantic_model,
Field(description="Parsed Pydantic model"),
)
}
if self.parser_type == OutputParserType.STRING:
return {"result": (str, Field(description="Parsed string result"))}
        if self.parser_type == OutputParserType.PANDAS_DATAFRAME:
            # Using Any since pandas isn't imported for the type hint
            return {"result": (Any, Field(description="Parsed pandas DataFrame"))}
if self.parser_type == OutputParserType.ENUM and self.enum_class:
return {"result": (self.enum_class, Field(description="Parsed enum value"))}
if self.parser_type in [
OutputParserType.OPENAI_TOOLS,
OutputParserType.OPENAI_TOOLS_KEY,
OutputParserType.PYDANTIC_TOOLS,
]:
return {"result": (Any, Field(description="Parsed tool output"))}
# Default for unknown types
return {"result": (Any, Field(description="Parsed result"))}
def create_runnable(self, runnable_config: RunnableConfig | None = None) -> Any:
"""Create the appropriate parser based on configuration.
Args:
runnable_config: Optional runtime configuration
Returns:
An instantiated output parser
Raises:
ValueError: If parser type is unknown or required config is missing
ImportError: If required packages are not installed
"""
params = self.apply_runnable_config(runnable_config) or {}
config = {**self.parser_config, **params}
# Common module prefixes
MODULES = {
"core": "langchain_core.output_parsers",
"langchain": "langchain.output_parsers",
"openai_tools": "langchain_core.output_parsers.openai_tools",
}
# General parser configuration with more concise format
# Format: parser_type: (module_key, class_name_override, config_handler)
# If class_name_override is None, derive from enum value
# If config_handler is None, just pass filtered kwargs
        PARSER_MAP = {
            # Core simple parsers
            OutputParserType.LIST: ("core", None, None),
            OutputParserType.COMMA_SEPARATED_LIST: ("core", None, None),
            OutputParserType.NUMBERED_LIST: ("core", None, None),
            OutputParserType.MARKDOWN_LIST: ("core", None, None),
            OutputParserType.STRING: ("core", "StrOutputParser", None),
            OutputParserType.JSON: (
                "core",
                None,
                # JsonOutputParser takes pydantic_object, not schema
                lambda s, c: {"pydantic_object": c.get("schema")},
            ),
            OutputParserType.SIMPLE_JSON: ("core", None, None),
            # The core class is XMLOutputParser, which name derivation
            # would miss ("XmlOutputParser")
            OutputParserType.XML: ("core", "XMLOutputParser", None),
            OutputParserType.PYDANTIC: (
                "core",
                None,
                lambda s, c: {"pydantic_object": s.pydantic_model},
            ),
            # LangChain extension parsers
            OutputParserType.BOOLEAN: ("langchain", None, None),
            OutputParserType.DATETIME: ("langchain", None, None),
            OutputParserType.COMBINING: (
                "langchain",
                None,
                lambda s, c: {"parsers": c.get("parsers", [])},
            ),
            # LangChain's class is RegexParser and its field is "regex"
            OutputParserType.REGEX: (
                "langchain",
                "RegexParser",
                lambda s, c: {
                    "regex": s.regex_pattern,
                    "output_keys": c.get("output_keys", []),
                },
            ),
            # RegexDictParser's constructor fields vary across LangChain
            # versions, so pass filtered kwargs straight through
            OutputParserType.REGEX_DICT: ("langchain", "RegexDictParser", None),
            OutputParserType.PANDAS_DATAFRAME: (
                "langchain",
                "PandasDataFrameOutputParser",
                None,
            ),
            OutputParserType.YAML: ("langchain", None, None),
            OutputParserType.ENUM: (
                "langchain",
                None,
                lambda s, c: {"enum": s.enum_class},
            ),
            # OpenAI tools parsers (constructor options such as
            # first_tool_only are passed through via filtered kwargs)
            OutputParserType.OPENAI_TOOLS: (
                "openai_tools",
                "JsonOutputToolsParser",
                None,
            ),
            OutputParserType.OPENAI_TOOLS_KEY: (
                "openai_tools",
                "JsonOutputKeyToolsParser",
                # JsonOutputKeyToolsParser expects key_name
                lambda s, c: {"key_name": c.get("key")},
            ),
            OutputParserType.PYDANTIC_TOOLS: (
                "openai_tools",
                "PydanticToolsParser",
                # PydanticToolsParser expects tools, a list of BaseModel types
                lambda s, c: {"tools": c.get("pydantic_schemas", [])},
            ),
        }
# Special case handlers - for parsers that need custom instantiation
SPECIAL_HANDLERS = {
OutputParserType.STRUCTURED: lambda s, c: self._create_structured_parser(
s.response_schemas
)
}
# Required attributes and parameters validation
REQUIRED_ATTRS = {
OutputParserType.PYDANTIC: ["pydantic_model"],
OutputParserType.REGEX: ["regex_pattern"],
OutputParserType.ENUM: ["enum_class"],
OutputParserType.STRUCTURED: ["response_schemas"],
}
        REQUIRED_PARAMS = {
            OutputParserType.COMBINING: ["parsers"],
            OutputParserType.REGEX: ["output_keys"],
            OutputParserType.OPENAI_TOOLS_KEY: ["key"],
            OutputParserType.PYDANTIC_TOOLS: ["pydantic_schemas"],
        }
try:
# Check for special case handlers first
if self.parser_type in SPECIAL_HANDLERS:
return SPECIAL_HANDLERS[self.parser_type](self, config)
# Check if we have a configuration for this parser type
            if self.parser_type not in PARSER_MAP:
                raise ValueError(f"Unknown parser type: {self.parser_type}")
module_key, class_override, config_handler = PARSER_MAP[self.parser_type]
# Get the module path
if module_key not in MODULES:
raise ValueError(f"Unknown module key: {module_key}")
module_path = MODULES[module_key]
# Determine class name
if class_override:
class_name = class_override
else:
# Convert enum value to CamelCase and append OutputParser
parts = self.parser_type.value.split("_")
class_name = "".join(p.capitalize() for p in parts) + "OutputParser"
# Validate required attributes
if self.parser_type in REQUIRED_ATTRS:
for attr in REQUIRED_ATTRS[self.parser_type]:
if getattr(self, attr, None) is None:
raise ValueError(
f"{self.parser_type} requires {attr} to be set"
)
# Validate required parameters
if self.parser_type in REQUIRED_PARAMS:
for param in REQUIRED_PARAMS[self.parser_type]:
if param not in config:
raise ValueError(
f"{self.parser_type} requires '{param}' in configuration"
)
# Import the parser class
parser_class = self._import_class(module_path, class_name)
# Determine parameters
if config_handler:
params = config_handler(self, config)
else:
params = self._filter_kwargs_for_class(parser_class, config)
# Create and return instance
return parser_class(**params)
        except ImportError as e:
            logger.exception(f"Failed to import parser for {self.parser_type}: {e!s}")
            raise ImportError(
                f"Required package for {self.parser_type} parser not installed: {e!s}"
            ) from e
def invoke(
self, input_data: TIn, runnable_config: RunnableConfig | None = None
) -> TOut:
"""Invoke the parser with the input data.
Args:
input_data: Input text, message, or dictionary
runnable_config: Optional runtime configuration
Returns:
Parsed output based on parser type
Raises:
ValueError: If the input cannot be processed
Exception: If parsing fails and no error handler is provided
"""
# Create the parser
parser = self.create_runnable(runnable_config)
# Extract text from various input types
text = self._extract_text(input_data)
# Parse the text
try:
if (
hasattr(parser, "parse_with_prompt")
and runnable_config
and "prompt" in runnable_config.get("configurable", {})
):
# If we have a prompt in the config and the parser supports it
prompt = runnable_config["configurable"]["prompt"]
return parser.parse_with_prompt(text, prompt)
# Otherwise, use regular parse
return parser.parse(text)
except Exception as e:
# Handle parsing errors
logger.exception(f"Error parsing with {self.parser_type}: {e!s}")
if "error_handler" in self.parser_config:
error_handler = self.parser_config["error_handler"]
return error_handler(text, e)
# Re-raise if no error handler
raise
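    # Example (sketch): recovering from parse failures with an error_handler
    # supplied through parser_config; the handler receives the raw text and
    # the exception. "boolean" is assumed to be a valid OutputParserType value.
    #
    #     engine = create_output_parser_engine(
    #         "boolean",
    #         error_handler=lambda text, exc: False,
    #     )
    #     engine.invoke("maybe?")  # -> False instead of raising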
async def ainvoke(
self, input_data: TIn, runnable_config: RunnableConfig | None = None
) -> TOut:
"""Asynchronously invoke the parser with the input data.
Args:
input_data: Input text, message, or dictionary
runnable_config: Optional runtime configuration
Returns:
Parsed output based on parser type
"""
# Create the parser
parser = self.create_runnable(runnable_config)
# Extract text
text = self._extract_text(input_data)
# Parse asynchronously if supported
try:
if (
hasattr(parser, "aparse_with_prompt")
and runnable_config
and "prompt" in runnable_config.get("configurable", {})
):
prompt = runnable_config["configurable"]["prompt"]
return await parser.aparse_with_prompt(text, prompt)
if hasattr(parser, "aparse"):
return await parser.aparse(text)
            # Fall back to synchronous parsing in an async context
            return await asyncio.to_thread(parser.parse, text)
except Exception as e:
logger.exception(
f"Error parsing asynchronously with {self.parser_type}: {e!s}"
)
if "error_handler" in self.parser_config:
error_handler = self.parser_config["error_handler"]
return await asyncio.to_thread(error_handler, text, e)
raise
def _extract_text(self, input_data: TIn) -> str:
"""Extract text content from various input types.
Args:
input_data: Input data in various formats
Returns:
Extracted text string
Raises:
ValueError: If text cannot be extracted
"""
if isinstance(input_data, str):
# If input is already a string, use it directly
return input_data
if hasattr(input_data, "content") and isinstance(input_data.content, str):
# If input is a message with text content, extract it
return input_data.content
if isinstance(input_data, list) and all(
hasattr(m, "content") for m in input_data
):
# If input is a list of messages, concatenate their contents
return "\n".join(
m.content for m in input_data if isinstance(m.content, str)
)
if isinstance(input_data, dict):
# Try to extract text from dictionary
if "text" in input_data:
return str(input_data["text"])
if "content" in input_data:
return str(input_data["content"])
if "input" in input_data:
return str(input_data["input"])
# Try to convert to string as a fallback
try:
return str(input_data)
except Exception as e:
raise ValueError(f"Cannot extract text from input: {e!s}")
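    # Resolution order above: plain strings pass through, message content is
    # used directly, message lists are joined with newlines, and dicts are
    # probed for "text", then "content", then "input" before the str()
    # fallback. For example, {"text": "a", "content": "b"} yields "a".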
def _import_class(self, module_path: str, class_name: str) -> type:
"""Dynamically import a class."""
import importlib
module = importlib.import_module(module_path)
return getattr(module, class_name)
    def _filter_kwargs_for_class(
        self, cls: type, kwargs: dict[str, Any]
    ) -> dict[str, Any]:
        """Filter kwargs to only include those accepted by the class constructor."""
        try:
            # inspect.signature(cls) honors a generated __signature__ (as on
            # Pydantic models), which introspecting cls.__init__ can miss
            sig = inspect.signature(cls)
        except (TypeError, ValueError):
            return {}
        # A **kwargs-style constructor accepts everything
        if any(
            p.kind is inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()
        ):
            return dict(kwargs)
        # Otherwise return only the kwargs that match named parameters
        return {k: v for k, v in kwargs.items() if k in sig.parameters}
def _create_structured_parser(self, schemas: list[dict[str, Any]]) -> Any:
"""Create a StructuredOutputParser instance."""
from langchain.output_parsers import ResponseSchema, StructuredOutputParser
response_schemas = [ResponseSchema(**schema) for schema in schemas]
return StructuredOutputParser.from_response_schemas(response_schemas)
def create_output_parser_engine(
parser_type: str | OutputParserType, name: str | None = None, **kwargs
) -> OutputParserEngine:
"""Factory function to create an OutputParserEngine.
Args:
parser_type: Type of parser (from OutputParserType enum or string)
name: Optional name for the engine
**kwargs: Additional configuration parameters
Returns:
Configured OutputParserEngine
"""
    # Convert string to enum if needed
    if isinstance(parser_type, str):
        try:
            parser_type = OutputParserType(parser_type)
        except ValueError:
            raise ValueError(f"Unknown parser type: {parser_type}") from None
if name is None:
name = f"{parser_type.value}_parser_{uuid.uuid4().hex[:6]}"
# Extract special parameters
parser_config = {
k: v
for k, v in kwargs.items()
if k
not in ["pydantic_model", "regex_pattern", "enum_class", "response_schemas"]
}
return OutputParserEngine(
name=name,
parser_type=parser_type,
parser_config=parser_config,
pydantic_model=kwargs.get("pydantic_model"),
regex_pattern=kwargs.get("regex_pattern"),
enum_class=kwargs.get("enum_class"),
response_schemas=kwargs.get("response_schemas"),
)
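# Example (sketch): special attributes ("pydantic_model", "regex_pattern",
# "enum_class", "response_schemas") are routed to dedicated fields, while all
# other kwargs land in parser_config. "json" is assumed to be a valid
# OutputParserType value.
#
#     engine = create_output_parser_engine("json", name="json_parser")
#     engine.invoke('{"ok": true}')  # -> {"ok": True}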
# Convenience functions for common parser types
def create_str_parser(name: str | None = None, **kwargs) -> OutputParserEngine:
"""Create a StrOutputParser engine."""
return create_output_parser_engine(OutputParserType.STRING, name, **kwargs)
def create_list_parser(
list_type: str = "list", name: str | None = None, **kwargs
) -> OutputParserEngine:
"""Create a list parser engine.
Args:
list_type: Type of list parser ('list', 'comma_separated', 'numbered', 'markdown')
name: Optional name for the engine
**kwargs: Additional configuration
Returns:
List parser engine
"""
type_map = {
"list": OutputParserType.LIST,
"comma_separated": OutputParserType.COMMA_SEPARATED_LIST,
"numbered": OutputParserType.NUMBERED_LIST,
"markdown": OutputParserType.MARKDOWN_LIST,
}
if list_type not in type_map:
raise ValueError(
f"Unknown list type: {list_type}. Must be one of {list(type_map.keys())}"
)
return create_output_parser_engine(type_map[list_type], name, **kwargs)
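# Example (sketch): assuming the comma-separated type resolves to
# CommaSeparatedListOutputParser, create_list_parser("comma_separated")
# .invoke("a, b, c") yields ["a", "b", "c"].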
def create_json_parser(
name: str | None = None, schema: dict | None = None, **kwargs
) -> OutputParserEngine:
"""Create a JsonOutputParser engine."""
if schema:
kwargs["schema"] = schema
return create_output_parser_engine(OutputParserType.JSON, name, **kwargs)
def create_pydantic_parser(
pydantic_model: type[BaseModel], name: str | None = None, **kwargs
) -> OutputParserEngine:
"""Create a PydanticOutputParser engine."""
return create_output_parser_engine(
OutputParserType.PYDANTIC, name, pydantic_model=pydantic_model, **kwargs
)
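# Example (sketch): the Person model below is a hypothetical illustration.
#
#     class Person(BaseModel):
#         name: str
#         age: int
#
#     parser = create_pydantic_parser(Person)
#     parser.invoke('{"name": "Ada", "age": 36}')  # -> Person(name='Ada', age=36)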
def create_structured_parser(
response_schemas: list[dict[str, Any]], name: str | None = None, **kwargs
) -> OutputParserEngine:
"""Create a StructuredOutputParser engine."""
return create_output_parser_engine(
OutputParserType.STRUCTURED, name, response_schemas=response_schemas, **kwargs
)
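# Example (sketch): each schema dict mirrors ResponseSchema's fields
# (name and description, optionally type).
#
#     engine = create_structured_parser([
#         {"name": "answer", "description": "The answer to the question"},
#         {"name": "source", "description": "Source used for the answer"},
#     ])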
def create_regex_parser(
regex_pattern: str, output_keys: list[str], name: str | None = None, **kwargs
) -> OutputParserEngine:
"""Create a RegexParser engine."""
return create_output_parser_engine(
OutputParserType.REGEX,
name,
regex_pattern=regex_pattern,
output_keys=output_keys,
**kwargs,
)
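# Example (sketch): regex capture groups map positionally onto output_keys.
#
#     parser = create_regex_parser(
#         regex_pattern=r"Name: (\w+), Age: (\d+)",
#         output_keys=["name", "age"],
#     )
#     parser.invoke("Name: Ada, Age: 36")  # -> {"name": "Ada", "age": "36"}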
def create_enum_parser(
enum_class: type[Enum], name: str | None = None, **kwargs
) -> OutputParserEngine:
"""Create an EnumOutputParser engine."""
return create_output_parser_engine(
OutputParserType.ENUM, name, enum_class=enum_class, **kwargs
)
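# Example (sketch): EnumOutputParser matches the stripped response against the
# enum's values; the Sentiment enum below is a hypothetical illustration.
#
#     class Sentiment(Enum):
#         POSITIVE = "positive"
#         NEGATIVE = "negative"
#
#     create_enum_parser(Sentiment).invoke("positive")  # -> Sentiment.POSITIVE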