"""Loader Strategy System for Document Engine.
This module implements the loader strategy system for intelligent loader selection
based on source type, performance requirements, and capabilities.
"""
import importlib
import logging
from enum import Enum
from typing import Any, Literal
from langchain_core.document_loaders.base import BaseLoader
from pydantic import BaseModel, Field
from haive.core.engine.document.loaders.sources.implementation import EnhancedSource
logger = logging.getLogger(__name__)
class LoaderPriority(str, Enum):
    """Priority levels for loader selection.

    Members are declared in descending order, ``HIGHEST`` first. Because the
    class mixes in ``str``, every member compares equal to its lowercase
    string value (``LoaderPriority.HIGH == "high"``) and can be looked up
    from it (``LoaderPriority("high")``).

    Note:
        Do not rank members with ``<`` / ``sorted``: the ``str`` mix-in makes
        those comparisons lexicographic on the value (``"high" < "low" <
        "medium"``), not by priority. Use the declaration order
        (``list(LoaderPriority)``) when a ranking is needed.
    """

    HIGHEST = "highest"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    LOWEST = "lowest"
class LoaderCapability(str, Enum):
    """Capabilities that loaders may support.

    Used as the element type of ``LoaderStrategy.capabilities``. The ``str``
    mix-in lets each member compare equal to, and be constructed from, its
    lowercase string value (``LoaderCapability("async")``).
    """

    ASYNC = "async"
    METADATA = "metadata"
    CONTENT_EXTRACTION = "content_extraction"
    TEXT_EXTRACTION = "text_extraction"
    IMAGE_EXTRACTION = "image_extraction"
    TABLE_EXTRACTION = "table_extraction"
    STRUCTURE_PRESERVATION = "structure_preservation"
    LAZY_LOADING = "lazy_loading"
    PAGINATION = "pagination"
    CHUNKING = "chunking"
    FILTERING = "filtering"
    BATCHING = "batching"
class LoaderStrategy(BaseModel):
    """Describe one document-loader option and how to instantiate it.

    A strategy names a loader class and the module it is imported from, and
    carries descriptive fields (speed, quality, capabilities, requirements,
    priority) alongside it. The loader module is imported only when
    :meth:`create_loader` or :meth:`check_availability` is called.
    """

    # Identification
    strategy_name: str = Field(..., description="Unique name for this loader strategy")

    # Loader class information
    loader_class: str = Field(..., description="Name of the loader class")
    module_path: str = Field(
        default="langchain_community.document_loaders",
        description="Import path for the loader module",
    )

    # Performance characteristics
    speed: Literal["fast", "medium", "slow"] = Field(
        default="medium", description="Relative speed of the loader"
    )
    quality: Literal["low", "medium", "high"] = Field(
        default="medium", description="Quality of document extraction"
    )
    resource_usage: Literal["low", "medium", "high"] = Field(
        default="medium", description="Resource consumption of the loader"
    )

    # Capabilities
    supports_async: bool = Field(
        default=False, description="Whether the loader supports async loading"
    )
    supports_metadata: bool = Field(
        default=True, description="Whether the loader extracts document metadata"
    )
    supports_batching: bool = Field(
        default=False, description="Whether the loader supports batch loading"
    )
    capabilities: list[LoaderCapability] = Field(
        default_factory=list, description="Special capabilities of this loader"
    )

    # Suitability indicators
    best_for: list[str] = Field(
        default_factory=list,
        description="Types of content this loader is best suited for",
    )

    # Requirements
    requires_dependencies: list[str] = Field(
        default_factory=list,
        description="Additional dependencies required for this loader",
    )
    requires_authentication: bool = Field(
        default=False, description="Whether this loader requires authentication"
    )

    # Priority and availability
    priority: LoaderPriority = Field(
        default=LoaderPriority.MEDIUM, description="Selection priority for this loader"
    )
    is_available: bool = Field(
        default=True, description="Whether this loader is currently available"
    )

    def create_loader(
        self, source: EnhancedSource, options: dict[str, Any] | None = None
    ) -> BaseLoader | None:
        """Create a loader instance for the given source.

        Args:
            source: Source to load. When it exposes a ``source_path``
                attribute whose value is not ``None``, that value is passed
                to the loader class as its first positional argument;
                otherwise the loader is built from ``options`` alone.
            options: Extra keyword arguments forwarded to the loader
                constructor. ``None`` is treated as "no options".

        Returns:
            The constructed loader, or ``None`` if importing the module,
            resolving the class, or calling the constructor failed. Failures
            are logged with a traceback instead of being raised.
        """
        kwargs = options or {}
        try:
            module = importlib.import_module(self.module_path)
            loader_cls = getattr(module, self.loader_class)
            # A source may declare ``source_path`` but leave it unset; passing
            # ``None`` positionally would only make the constructor fail.
            source_path = getattr(source, "source_path", None)
            if source_path is not None:
                return loader_cls(source_path, **kwargs)
            return loader_cls(**kwargs)
        except Exception as e:
            # Best-effort factory: any failure is reported and mapped to None.
            logger.exception("Failed to create loader %s: %s", self.loader_class, e)
            return None

    @staticmethod
    def _dependency_installed(dep: str) -> bool:
        """Return whether ``dep`` is an importable module or an installed distribution."""
        # Function-scope import: ``import importlib`` alone does not guarantee
        # that the ``importlib.metadata`` submodule has been loaded.
        from importlib import metadata as importlib_metadata

        try:
            importlib.import_module(dep)
        except ImportError:
            pass
        else:
            return True

        # Requirement lists commonly use pip distribution names (for example
        # "beautifulsoup4" or "youtube-transcript-api") that differ from the
        # import name, so fall back to the installed-distribution metadata.
        try:
            importlib_metadata.distribution(dep)
        except (ImportError, ValueError):
            # PackageNotFoundError subclasses ImportError; ValueError covers
            # an empty/invalid distribution name.
            return False
        return True

    def check_availability(self) -> bool:
        """Check whether this strategy's loader class and dependencies are usable.

        Returns:
            ``True`` when ``module_path`` imports, exposes ``loader_class``,
            and every entry of ``requires_dependencies`` is either importable
            as a module or installed as a distribution; ``False`` otherwise.
            The reason for a ``False`` result is logged as a warning.
        """
        try:
            # The loader module and class must both resolve.
            module = importlib.import_module(self.module_path)
            getattr(module, self.loader_class)

            for dep in self.requires_dependencies:
                if not self._dependency_installed(dep):
                    logger.warning(
                        "Dependency %s not available for %s", dep, self.strategy_name
                    )
                    return False
            return True
        except Exception as e:
            # Importing third-party modules can fail in arbitrary ways; treat
            # any failure as "not available" rather than propagating it.
            logger.warning("Loader %s not available: %s", self.strategy_name, e)
            return False
class LoaderStrategyRegistry:
"""Registry for managing loader strategies."""
def __init__(self) -> None:
    """Create an empty registry and register the default loader strategies."""
    # Strategy storage; presumably keyed by strategy name -- ``register`` is
    # defined outside this view, confirm there.
    self._strategies: dict[str, LoaderStrategy] = {}
    # Called after ``_strategies`` is created; it registers each built-in
    # strategy through ``self.register(...)``.
    self._register_default_strategies()
def _register_default_strategies(self):
"""Register default loader strategies."""
# PDF loaders
self.register(
LoaderStrategy(
strategy_name="pdf_pymupdf",
loader_class="PyMuPDFLoader",
speed="fast",
quality="high",
resource_usage="medium",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["pdf", "document"],
requires_dependencies=["pymupdf"],
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="pdf_pdfplumber",
loader_class="PDFPlumberLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.TABLE_EXTRACTION,
],
best_for=["pdf", "tables"],
requires_dependencies=["pdfplumber"],
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="pdf_unstructured",
loader_class="UnstructuredPDFLoader",
speed="slow",
quality="high",
resource_usage="high",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.IMAGE_EXTRACTION,
LoaderCapability.TABLE_EXTRACTION,
],
best_for=["pdf", "scanned", "ocr"],
requires_dependencies=["unstructured", "pdf2image", "pytesseract"],
priority=LoaderPriority.MEDIUM,
)
)
# Web loaders
self.register(
LoaderStrategy(
strategy_name="web_base",
loader_class="WebBaseLoader",
speed="fast",
quality="medium",
resource_usage="low",
capabilities=[LoaderCapability.TEXT_EXTRACTION],
best_for=["web", "html", "url"],
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="playwright",
loader_class="PlaywrightURLLoader",
speed="slow",
quality="high",
resource_usage="high",
capabilities=[LoaderCapability.TEXT_EXTRACTION, LoaderCapability.ASYNC],
best_for=["web", "javascript", "dynamic"],
requires_dependencies=["playwright"],
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="selenium",
loader_class="SeleniumURLLoader",
speed="slow",
quality="high",
resource_usage="high",
capabilities=[LoaderCapability.TEXT_EXTRACTION],
best_for=["web", "javascript", "dynamic"],
requires_dependencies=["selenium"],
priority=LoaderPriority.MEDIUM,
)
)
# Document loaders
self.register(
LoaderStrategy(
strategy_name="docx",
loader_class="Docx2txtLoader",
speed="fast",
quality="medium",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["docx", "doc", "document"],
requires_dependencies=["docx2txt"],
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="unstructured_word",
loader_class="UnstructuredWordDocumentLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
LoaderCapability.TABLE_EXTRACTION,
],
best_for=["docx", "doc"],
requires_dependencies=["unstructured"],
priority=LoaderPriority.HIGH,
)
)
# Text loaders
self.register(
LoaderStrategy(
strategy_name="text_file",
loader_class="TextLoader",
speed="fast",
quality="low",
resource_usage="low",
capabilities=[LoaderCapability.TEXT_EXTRACTION],
best_for=["text", "txt", "log"],
priority=LoaderPriority.LOW,
)
)
# CSV loaders
self.register(
LoaderStrategy(
strategy_name="csv",
loader_class="CSVLoader",
speed="fast",
quality="medium",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["csv", "data"],
priority=LoaderPriority.MEDIUM,
)
)
# JSON loaders
self.register(
LoaderStrategy(
strategy_name="json",
loader_class="JSONLoader",
speed="fast",
quality="medium",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
LoaderCapability.STRUCTURE_PRESERVATION,
],
best_for=["json", "jsonl"],
priority=LoaderPriority.MEDIUM,
)
)
# Markdown loaders
self.register(
LoaderStrategy(
strategy_name="markdown",
loader_class="UnstructuredMarkdownLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.STRUCTURE_PRESERVATION,
],
best_for=["md", "markdown"],
requires_dependencies=["unstructured"],
priority=LoaderPriority.HIGH,
)
)
# Excel loaders
self.register(
LoaderStrategy(
strategy_name="excel",
loader_class="UnstructuredExcelLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.TABLE_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["xlsx", "xls", "excel"],
requires_dependencies=["unstructured", "openpyxl"],
priority=LoaderPriority.HIGH,
)
)
# PowerPoint loaders
self.register(
LoaderStrategy(
strategy_name="powerpoint",
loader_class="UnstructuredPowerPointLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.IMAGE_EXTRACTION,
],
best_for=["pptx", "ppt", "powerpoint"],
requires_dependencies=["unstructured"],
priority=LoaderPriority.HIGH,
)
)
# Email loaders
self.register(
LoaderStrategy(
strategy_name="email",
loader_class="UnstructuredEmailLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["eml", "msg", "email"],
requires_dependencies=["unstructured"],
priority=LoaderPriority.HIGH,
)
)
# HTML loaders
self.register(
LoaderStrategy(
strategy_name="html",
loader_class="UnstructuredHTMLLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.STRUCTURE_PRESERVATION,
],
best_for=["html", "htm"],
requires_dependencies=["unstructured"],
priority=LoaderPriority.HIGH,
)
)
# XML loaders
self.register(
LoaderStrategy(
strategy_name="xml",
loader_class="UnstructuredXMLLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.STRUCTURE_PRESERVATION,
],
best_for=["xml"],
requires_dependencies=["unstructured"],
priority=LoaderPriority.HIGH,
)
)
# RTF loaders
self.register(
LoaderStrategy(
strategy_name="rtf",
loader_class="UnstructuredRTFLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[LoaderCapability.TEXT_EXTRACTION],
best_for=["rtf"],
requires_dependencies=["unstructured"],
priority=LoaderPriority.MEDIUM,
)
)
# Directory loaders
self.register(
LoaderStrategy(
strategy_name="directory",
loader_class="DirectoryLoader",
speed="medium",
quality="medium",
resource_usage="medium",
capabilities=[LoaderCapability.BATCHING],
best_for=["directory", "folder"],
priority=LoaderPriority.MEDIUM,
)
)
# Git repository loaders
self.register(
LoaderStrategy(
strategy_name="git",
loader_class="GitLoader",
speed="slow",
quality="high",
resource_usage="medium",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["git", "repository", "code"],
requires_dependencies=["gitpython"],
priority=LoaderPriority.MEDIUM,
)
)
# YouTube loaders
self.register(
LoaderStrategy(
strategy_name="youtube",
loader_class="YoutubeLoader",
module_path="langchain_community.document_loaders",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["youtube", "video"],
requires_dependencies=["youtube-transcript-api"],
priority=LoaderPriority.HIGH,
)
)
# Wikipedia loaders
self.register(
LoaderStrategy(
strategy_name="wikipedia",
loader_class="WikipediaLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["wikipedia", "wiki"],
requires_dependencies=["wikipedia"],
priority=LoaderPriority.HIGH,
)
)
# ArXiv loaders
self.register(
LoaderStrategy(
strategy_name="arxiv",
loader_class="ArxivLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["arxiv", "research", "paper"],
requires_dependencies=["arxiv"],
priority=LoaderPriority.HIGH,
)
)
# Additional PDF loaders
self.register(
LoaderStrategy(
strategy_name="pypdf",
loader_class="PyPDFLoader",
speed="fast",
quality="medium",
resource_usage="low",
capabilities=[LoaderCapability.TEXT_EXTRACTION],
best_for=["pdf"],
requires_dependencies=["pypdf"],
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="pdfminer",
loader_class="PDFMinerLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.STRUCTURE_PRESERVATION,
],
best_for=["pdf", "complex"],
requires_dependencies=["pdfminer.six"],
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="pypdfium2",
loader_class="PyPDFium2Loader",
speed="fast",
quality="high",
resource_usage="medium",
capabilities=[LoaderCapability.TEXT_EXTRACTION],
best_for=["pdf"],
requires_dependencies=["pypdfium2"],
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="mathpix_pdf",
loader_class="MathpixPDFLoader",
speed="slow",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.IMAGE_EXTRACTION,
],
best_for=["pdf", "math", "scientific"],
requires_dependencies=["mathpix-pdf-to-html"],
requires_authentication=True,
priority=LoaderPriority.LOW,
)
)
# YAML loaders - use JSONLoader which supports YAML
self.register(
LoaderStrategy(
strategy_name="yaml",
loader_class="JSONLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.STRUCTURE_PRESERVATION,
],
best_for=["yaml", "yml", "config"],
requires_dependencies=["pyyaml"],
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="toml",
loader_class="TomlLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.STRUCTURE_PRESERVATION,
],
best_for=["toml", "config"],
requires_dependencies=["toml"],
priority=LoaderPriority.MEDIUM,
)
)
# Code/Programming loaders
self.register(
LoaderStrategy(
strategy_name="python",
loader_class="PythonLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.STRUCTURE_PRESERVATION,
],
best_for=["py", "python", "code"],
requires_dependencies=[],
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="notebook",
loader_class="NotebookLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.STRUCTURE_PRESERVATION,
],
best_for=["ipynb", "jupyter", "notebook"],
requires_dependencies=["nbformat"],
priority=LoaderPriority.HIGH,
)
)
# Scientific/Academic loaders
self.register(
LoaderStrategy(
strategy_name="bibtex",
loader_class="BibtexLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["bib", "bibtex", "bibliography"],
requires_dependencies=["pybtex"],
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="pubmed",
loader_class="PubMedLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["pubmed", "medical", "research"],
requires_dependencies=["biopython"],
priority=LoaderPriority.HIGH,
)
)
# OpenDocument formats
self.register(
LoaderStrategy(
strategy_name="odt",
loader_class="UnstructuredODTLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[LoaderCapability.TEXT_EXTRACTION],
best_for=["odt", "opendocument"],
requires_dependencies=["unstructured"],
priority=LoaderPriority.MEDIUM,
)
)
# E-book formats
self.register(
LoaderStrategy(
strategy_name="epub",
loader_class="UnstructuredEPubLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.STRUCTURE_PRESERVATION,
],
best_for=["epub", "ebook"],
requires_dependencies=["unstructured", "ebooklib"],
priority=LoaderPriority.HIGH,
)
)
# Media/Subtitle loaders
self.register(
LoaderStrategy(
strategy_name="srt",
loader_class="SRTLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["srt", "subtitle", "caption"],
requires_dependencies=[],
priority=LoaderPriority.MEDIUM,
)
)
# Image loaders
self.register(
LoaderStrategy(
strategy_name="image",
loader_class="UnstructuredImageLoader",
speed="slow",
quality="high",
resource_usage="high",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.IMAGE_EXTRACTION,
],
best_for=["jpg", "jpeg", "png", "gif", "bmp", "tiff", "image"],
requires_dependencies=["unstructured", "pytesseract", "pillow"],
priority=LoaderPriority.MEDIUM,
)
)
# Cloud storage loaders
self.register(
LoaderStrategy(
strategy_name="s3_file",
loader_class="S3FileLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[LoaderCapability.TEXT_EXTRACTION],
best_for=["s3", "aws"],
requires_dependencies=["boto3"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="s3_directory",
loader_class="S3DirectoryLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[LoaderCapability.BATCHING],
best_for=["s3", "aws", "directory"],
requires_dependencies=["boto3"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="gcs_file",
loader_class="GCSFileLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[LoaderCapability.TEXT_EXTRACTION],
best_for=["gcs", "google"],
requires_dependencies=["google-cloud-storage"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="azure_blob",
loader_class="AzureBlobStorageFileLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[LoaderCapability.TEXT_EXTRACTION],
best_for=["azure", "blob"],
requires_dependencies=["azure-storage-blob"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
# Database loaders
self.register(
LoaderStrategy(
strategy_name="sql_database",
loader_class="SQLDatabaseLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["sql", "database"],
requires_dependencies=["sqlalchemy"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="mongodb",
loader_class="MongodbLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["mongodb", "nosql"],
requires_dependencies=["pymongo"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
# API/Service loaders
self.register(
LoaderStrategy(
strategy_name="notion",
loader_class="NotionDBLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["notion"],
requires_dependencies=["notion-client"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="confluence",
loader_class="ConfluenceLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["confluence", "wiki"],
requires_dependencies=["atlassian-python-api"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="google_drive",
loader_class="GoogleDriveLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["gdrive", "google"],
requires_dependencies=["google-auth", "google-api-python-client"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
# Chat/Messaging loaders
self.register(
LoaderStrategy(
strategy_name="slack",
loader_class="SlackDirectoryLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["slack", "chat"],
requires_dependencies=[],
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="discord",
loader_class="DiscordChatLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["discord", "chat"],
requires_dependencies=[],
priority=LoaderPriority.MEDIUM,
)
)
# News/Content loaders
self.register(
LoaderStrategy(
strategy_name="reddit",
loader_class="RedditPostsLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["reddit", "social"],
requires_dependencies=["praw"],
requires_authentication=True,
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="hackernews",
loader_class="HNLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["hackernews", "hn"],
requires_dependencies=["beautifulsoup4"],
priority=LoaderPriority.MEDIUM,
)
)
# More Unstructured file loaders
self.register(
LoaderStrategy(
strategy_name="unstructured_file",
loader_class="UnstructuredFileLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.STRUCTURE_PRESERVATION,
],
best_for=["generic", "unknown"],
requires_dependencies=["unstructured"],
priority=LoaderPriority.LOW,
)
)
self.register(
LoaderStrategy(
strategy_name="unstructured_rst",
loader_class="UnstructuredRSTLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.STRUCTURE_PRESERVATION,
],
best_for=["rst", "restructuredtext"],
requires_dependencies=["unstructured"],
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="unstructured_org_mode",
loader_class="UnstructuredOrgModeLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.STRUCTURE_PRESERVATION,
],
best_for=["org", "orgmode"],
requires_dependencies=["unstructured", "pandoc"],
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="unstructured_tsv",
loader_class="UnstructuredTSVLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.TABLE_EXTRACTION,
],
best_for=["tsv", "tab"],
requires_dependencies=["unstructured"],
priority=LoaderPriority.MEDIUM,
)
)
# Archive/Compressed file loaders
self.register(
LoaderStrategy(
strategy_name="unstructured_chm",
loader_class="UnstructuredCHMLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[LoaderCapability.TEXT_EXTRACTION],
best_for=["chm", "help"],
requires_dependencies=["unstructured"],
priority=LoaderPriority.LOW,
)
)
self.register(
LoaderStrategy(
strategy_name="mhtml",
loader_class="MHTMLLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.STRUCTURE_PRESERVATION,
],
best_for=["mhtml", "mht"],
requires_dependencies=["beautifulsoup4"],
priority=LoaderPriority.MEDIUM,
)
)
# Specialized document loaders
self.register(
LoaderStrategy(
strategy_name="vsdx",
loader_class="VsdxLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[LoaderCapability.TEXT_EXTRACTION],
best_for=["vsdx", "visio"],
requires_dependencies=["vsdx", "beautifulsoup4"],
priority=LoaderPriority.LOW,
)
)
self.register(
LoaderStrategy(
strategy_name="outlook_message",
loader_class="OutlookMessageLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["msg", "outlook"],
requires_dependencies=["extract_msg"],
priority=LoaderPriority.MEDIUM,
)
)
# Data science loaders
self.register(
LoaderStrategy(
strategy_name="dataframe",
loader_class="DataFrameLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.TABLE_EXTRACTION,
],
best_for=["dataframe", "pandas"],
requires_dependencies=["pandas"],
priority=LoaderPriority.MEDIUM,
)
)
# Enhanced web loaders
self.register(
LoaderStrategy(
strategy_name="recursive_url",
loader_class="RecursiveUrlLoader",
speed="slow",
quality="high",
resource_usage="high",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.BATCHING,
],
best_for=["website", "crawl"],
requires_dependencies=["beautifulsoup4"],
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="sitemap",
loader_class="SitemapLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.BATCHING,
],
best_for=["sitemap", "xml"],
requires_dependencies=["beautifulsoup4"],
priority=LoaderPriority.MEDIUM,
)
)
# Async loaders
self.register(
LoaderStrategy(
strategy_name="async_html",
loader_class="AsyncHtmlLoader",
speed="fast",
quality="medium",
resource_usage="low",
capabilities=[LoaderCapability.TEXT_EXTRACTION, LoaderCapability.ASYNC],
best_for=["html", "async"],
requires_dependencies=["aiohttp", "beautifulsoup4"],
priority=LoaderPriority.MEDIUM,
)
)
# More API loaders
self.register(
LoaderStrategy(
strategy_name="unstructured_api",
loader_class="UnstructuredAPIFileLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.IMAGE_EXTRACTION,
LoaderCapability.TABLE_EXTRACTION,
],
best_for=["api", "unstructured"],
requires_dependencies=["unstructured"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
# News/RSS loaders
self.register(
LoaderStrategy(
strategy_name="rss_feed",
loader_class="RSSFeedLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["rss", "feed"],
requires_dependencies=["feedparser"],
priority=LoaderPriority.MEDIUM,
)
)
# Knowledge base loaders
self.register(
LoaderStrategy(
strategy_name="obsidian",
loader_class="ObsidianLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
LoaderCapability.BATCHING,
],
best_for=["obsidian", "vault", "markdown"],
requires_dependencies=[],
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="roam",
loader_class="RoamLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["roam", "graph"],
requires_dependencies=[],
priority=LoaderPriority.MEDIUM,
)
)
# More cloud/service loaders
self.register(
LoaderStrategy(
strategy_name="dropbox",
loader_class="DropboxLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["dropbox", "cloud"],
requires_dependencies=["dropbox"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="onedrive",
loader_class="OneDriveLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["onedrive", "microsoft"],
requires_dependencies=["o365"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="sharepoint",
loader_class="SharePointLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["sharepoint", "microsoft"],
requires_dependencies=["shareplum"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
# Blockchain/Crypto loaders
self.register(
LoaderStrategy(
strategy_name="etherscan",
loader_class="EtherscanLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["ethereum", "blockchain", "crypto"],
requires_dependencies=["web3"],
requires_authentication=True,
priority=LoaderPriority.MEDIUM,
)
)
# Scientific data loaders
self.register(
LoaderStrategy(
strategy_name="conllu",
loader_class="CoNLLULoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.STRUCTURE_PRESERVATION,
],
best_for=["conllu", "nlp", "linguistics"],
requires_dependencies=["conllu"],
priority=LoaderPriority.MEDIUM,
)
)
# Chat export loaders
self.register(
LoaderStrategy(
strategy_name="whatsapp",
loader_class="WhatsAppChatLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["whatsapp", "chat"],
requires_dependencies=[],
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="telegram",
loader_class="TelegramChatLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["telegram", "chat"],
requires_dependencies=[],
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="facebook_chat",
loader_class="FacebookChatLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["facebook", "messenger", "chat"],
requires_dependencies=[],
priority=LoaderPriority.MEDIUM,
)
)
# More specialized loaders
self.register(
LoaderStrategy(
strategy_name="chatgpt",
loader_class="ChatGPTLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["chatgpt", "openai", "conversation"],
requires_dependencies=[],
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="figma",
loader_class="FigmaFileLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["figma", "design"],
requires_dependencies=[],
requires_authentication=True,
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="trello",
loader_class="TrelloLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["trello", "kanban", "boards"],
requires_dependencies=[],
requires_authentication=True,
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="airtable",
loader_class="AirtableLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.TABLE_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["airtable", "database", "spreadsheet"],
requires_dependencies=["pyairtable"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
# More database loaders
self.register(
LoaderStrategy(
strategy_name="elasticsearch",
loader_class="ElasticsearchLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["elasticsearch", "search", "elastic"],
requires_dependencies=["elasticsearch"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="cassandra",
loader_class="CassandraLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["cassandra", "nosql"],
requires_dependencies=["cassandra-driver"],
requires_authentication=True,
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="couchbase",
loader_class="CouchbaseLoader",
speed="medium",
quality="high",
resource_usage="medium",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["couchbase", "nosql"],
requires_dependencies=["couchbase"],
requires_authentication=True,
priority=LoaderPriority.MEDIUM,
)
)
# Document collaboration platforms
self.register(
LoaderStrategy(
strategy_name="jira",
loader_class="JiraLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["jira", "issues", "tickets"],
requires_dependencies=["atlassian-python-api"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="asana",
loader_class="AsanaLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["asana", "tasks", "projects"],
requires_dependencies=["asana"],
requires_authentication=True,
priority=LoaderPriority.MEDIUM,
)
)
# Code repository loaders
self.register(
LoaderStrategy(
strategy_name="github_issues",
loader_class="GitHubIssuesLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["github", "issues", "bugs"],
requires_dependencies=[],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="gitlab",
loader_class="GitLabLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["gitlab", "code", "repository"],
requires_dependencies=["python-gitlab"],
requires_authentication=True,
priority=LoaderPriority.MEDIUM,
)
)
# Scientific/Academic loaders
self.register(
LoaderStrategy(
strategy_name="semantic_scholar",
loader_class="SemanticScholarLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["research", "papers", "academic"],
requires_dependencies=["semanticscholar"],
priority=LoaderPriority.HIGH,
)
)
# Streaming/Media platforms
self.register(
LoaderStrategy(
strategy_name="twitch",
loader_class="TwitchLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["twitch", "streaming", "chat"],
requires_dependencies=["twitchio"],
requires_authentication=True,
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="spotify",
loader_class="SpotifyLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["spotify", "music", "podcasts"],
requires_dependencies=["spotipy"],
requires_authentication=True,
priority=LoaderPriority.MEDIUM,
)
)
# E-commerce/Payment loaders
self.register(
LoaderStrategy(
strategy_name="stripe",
loader_class="StripeLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["stripe", "payments", "transactions"],
requires_dependencies=["stripe"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="shopify",
loader_class="AirbyteShopifyLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["shopify", "ecommerce", "products"],
requires_dependencies=["airbyte-cdk"],
requires_authentication=True,
priority=LoaderPriority.MEDIUM,
)
)
# Weather/Environmental data
self.register(
LoaderStrategy(
strategy_name="weather",
loader_class="WeatherDataLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["weather", "climate", "forecast"],
requires_dependencies=[],
requires_authentication=True,
priority=LoaderPriority.MEDIUM,
)
)
# Note-taking apps
self.register(
LoaderStrategy(
strategy_name="evernote",
loader_class="EverNoteLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["evernote", "notes"],
requires_dependencies=["lxml"],
priority=LoaderPriority.HIGH,
)
)
self.register(
LoaderStrategy(
strategy_name="onenote",
loader_class="OneNoteLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["onenote", "notes", "microsoft"],
requires_dependencies=["msal"],
requires_authentication=True,
priority=LoaderPriority.HIGH,
)
)
# Additional specialized loaders
self.register(
LoaderStrategy(
strategy_name="gutenberg",
loader_class="GutenbergLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["gutenberg", "books", "literature"],
requires_dependencies=[],
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="mastodon",
loader_class="MastodonTootsLoader",
speed="medium",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
],
best_for=["mastodon", "fediverse", "social"],
requires_dependencies=["mastodon.py"],
requires_authentication=True,
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="mediawiki",
loader_class="MWDumpLoader",
speed="slow",
quality="high",
resource_usage="high",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.METADATA,
LoaderCapability.BATCHING,
],
best_for=["mediawiki", "wiki", "dump"],
requires_dependencies=["mwparserfromhell", "mwxml"],
priority=LoaderPriority.MEDIUM,
)
)
self.register(
LoaderStrategy(
strategy_name="duckdb",
loader_class="DuckDBLoader",
speed="fast",
quality="high",
resource_usage="low",
capabilities=[
LoaderCapability.TEXT_EXTRACTION,
LoaderCapability.TABLE_EXTRACTION,
],
best_for=["duckdb", "analytics", "sql"],
requires_dependencies=["duckdb"],
priority=LoaderPriority.HIGH,
)
)
[docs]
def register(self, strategy: LoaderStrategy):
    """Add ``strategy`` to the registry under its ``strategy_name``.

    The strategy's ``is_available`` flag is refreshed from
    ``strategy.check_availability()`` before it is stored. A strategy
    already stored under the same name is replaced.

    Args:
        strategy: The loader strategy to store.
    """
    # Probe availability once, at registration time, and cache the result
    # on the strategy itself so later lookups can filter on the flag.
    availability = strategy.check_availability()
    strategy.is_available = availability

    key = strategy.strategy_name
    self._strategies[key] = strategy

    logger.debug(
        f"Registered loader strategy: {strategy.strategy_name} "
        f"(available: {strategy.is_available})"
    )
[docs]
def get_strategy(self, name: str) -> LoaderStrategy | None:
    """Look up a registered strategy by its name.

    Args:
        name: The ``strategy_name`` the strategy was registered under.

    Returns:
        The stored strategy, or ``None`` when nothing is registered under
        ``name``.
    """
    registered = self._strategies
    return registered.get(name, None)
[docs]
def list_strategies(self, available_only: bool = True) -> list[LoaderStrategy]:
    """Return the registered strategies in registration order.

    Args:
        available_only: When true (the default), strategies whose
            ``is_available`` flag is falsy are left out.

    Returns:
        A new list of strategies; the registry itself is not modified.
    """
    if not available_only:
        return list(self._strategies.values())
    return [entry for entry in self._strategies.values() if entry.is_available]
[docs]
def find_strategies_for_source(
    self, source: EnhancedSource, preferences: dict[str, Any] | None = None
) -> list[LoaderStrategy]:
    """Find suitable strategies for a source, best candidate first.

    A registered strategy is kept when all of the following hold:

    * it is marked available;
    * its ``best_for`` list is empty, or contains the source type value,
      or contains the (lower-cased, dot-less) file extension of
      ``source.source_path``;
    * it does not require authentication, or
      ``source.requires_authentication()`` is true;
    * it satisfies every requested preference.

    Args:
        source: The source to match. ``source.source_type.value`` is
            compared against each strategy's ``best_for`` entries.
        preferences: Optional filter flags. Recognised keys are
            ``"prefer_speed"`` (keep only ``speed == "fast"``),
            ``"prefer_quality"`` (keep only ``quality == "high"``) and
            ``"require_async"`` (keep only strategies with
            ``supports_async``).

    Returns:
        Matching strategies sorted by priority, then quality, then speed,
        all descending. Strategies that tie keep their registration order.
    """
    from pathlib import Path

    if preferences is None:
        preferences = {}

    # Priority values are strings, so they must be ranked explicitly:
    # comparing them directly would order "medium" above "highest".
    priority_rank = {"highest": 5, "high": 4, "medium": 3, "low": 2, "lowest": 1}
    quality_rank = {"high": 3, "medium": 2, "low": 1}
    speed_rank = {"fast": 3, "medium": 2, "slow": 1}

    # Both values depend only on the source, so compute them once.
    source_type_str = source.source_type.value
    extension = None
    if hasattr(source, "source_path") and source.source_path:
        extension = Path(source.source_path).suffix.lower().lstrip(".")

    want_fast = preferences.get("prefer_speed")
    want_quality = preferences.get("prefer_quality")
    want_async = preferences.get("require_async")

    suitable_strategies = []
    for strategy in self._strategies.values():
        if not strategy.is_available:
            continue

        # Match on source type first, then fall back to the file extension.
        if strategy.best_for and source_type_str not in strategy.best_for:
            if extension is None or extension not in strategy.best_for:
                continue

        # NOTE(review): authenticated strategies are only offered to sources
        # that themselves report requiring authentication - confirm intent.
        if strategy.requires_authentication and not source.requires_authentication():
            continue

        if want_fast and strategy.speed != "fast":
            continue
        if want_quality and strategy.quality != "high":
            continue
        if want_async and not strategy.supports_async:
            continue

        suitable_strategies.append(strategy)

    def sort_key(candidate):
        # Accept either an enum member or a plain string priority.
        priority = getattr(candidate.priority, "value", candidate.priority)
        return (
            priority_rank.get(priority, 0),
            quality_rank[candidate.quality],
            speed_rank[candidate.speed],
        )

    suitable_strategies.sort(key=sort_key, reverse=True)
    return suitable_strategies
[docs]
def select_best_strategy(
    self, source: EnhancedSource, preferences: dict[str, Any] | None = None
) -> LoaderStrategy | None:
    """Pick the top-ranked strategy for ``source``.

    Args:
        source: The source to find a loader strategy for.
        preferences: Optional preference flags, forwarded unchanged to
            ``find_strategies_for_source``.

    Returns:
        The first entry of the ranked candidate list, or ``None`` when no
        strategy is suitable.
    """
    ranked = self.find_strategies_for_source(source, preferences)
    if not ranked:
        return None
    return ranked[0]
# Global registry instance.
# Built once when this module is imported and shared module-wide:
# create_loader() resolves named strategies and auto-selects strategies
# through this object.
strategy_registry = LoaderStrategyRegistry()
[docs]
def create_loader(
    source: EnhancedSource,
    strategy_name: str | None = None,
    options: dict[str, Any] | None = None,
    preferences: dict[str, Any] | None = None,
) -> BaseLoader | None:
    """Build a loader for ``source`` using the global strategy registry.

    Args:
        source: The source the loader should read from.
        strategy_name: Name of a registered strategy to use. When empty or
            ``None`` the registry auto-selects the best strategy.
        options: Options handed to the strategy's ``create_loader``;
            defaults to an empty dict.
        preferences: Preference flags used only for auto-selection.

    Returns:
        Whatever the chosen strategy's ``create_loader(source, options)``
        returns, or ``None`` (after logging an error) when the named
        strategy is unknown or unavailable, or when no strategy suits the
        source.
    """
    loader_options = {} if options is None else options

    if not strategy_name:
        # No explicit choice: let the registry rank the candidates.
        chosen = strategy_registry.select_best_strategy(source, preferences)
        if not chosen:
            logger.error(
                f"No suitable strategy found for source type {source.source_type}"
            )
            return None
        return chosen.create_loader(source, loader_options)

    # Explicit choice: it must exist and be usable in this environment.
    chosen = strategy_registry.get_strategy(strategy_name)
    if not chosen:
        logger.error(f"Strategy {strategy_name} not found")
        return None
    if not chosen.is_available:
        logger.error(f"Strategy {strategy_name} not available")
        return None
    return chosen.create_loader(source, loader_options)
# Export key components: the names listed here are the module's declared
# public API (what a star-import of this module exposes).
__all__ = [
"LoaderCapability",
"LoaderPriority",
"LoaderStrategy",
"LoaderStrategyRegistry",
"create_loader",
"strategy_registry",
]