"""Elasticsearch Retriever implementation for the Haive framework.
from typing import Any
This module provides a configuration class for the Elasticsearch retriever,
which performs full-text search and retrieval using Elasticsearch. Elasticsearch
is a distributed, RESTful search and analytics engine capable of solving complex
search problems and providing real-time search capabilities.
The ElasticsearchRetriever works by:
1. Connecting to an Elasticsearch cluster
2. Executing search queries with various scoring methods
3. Supporting both keyword and vector-based search
4. Returning ranked search results as documents
This retriever is particularly useful when:
- Working with large-scale document collections
- Need advanced search capabilities (faceting, aggregations, etc.)
- Require real-time search and indexing
- Building enterprise search applications
- Need scalable and distributed search infrastructure
The implementation integrates with LangChain's ElasticsearchRetriever while
providing a consistent Haive configuration interface with secure connection management.
"""
from typing import Any
from langchain_core.documents import Document
from pydantic import Field, SecretStr
from haive.core.common.mixins.secure_config import SecureConfigMixin
from haive.core.engine.retriever.retriever import BaseRetrieverConfig
from haive.core.engine.retriever.types import RetrieverType
[docs]
@BaseRetrieverConfig.register(RetrieverType.ELASTICSEARCH)
class ElasticsearchRetrieverConfig(SecureConfigMixin, BaseRetrieverConfig):
"""Configuration for Elasticsearch retriever in the Haive framework.
This retriever performs full-text search using Elasticsearch with support
for various search types including keyword, vector, and hybrid search.
Attributes:
retriever_type (RetrieverType): The type of retriever (always ELASTICSEARCH).
elasticsearch_url (str): Elasticsearch cluster URL.
index_name (str): Name of the Elasticsearch index to search.
username (Optional[str]): Username for Elasticsearch authentication.
password (Optional[SecretStr]): Password for authentication (auto-resolved).
k (int): Number of documents to retrieve.
search_type (str): Type of search to perform.
custom_query (Optional[Dict]): Custom Elasticsearch query DSL.
Examples:
>>> from haive.core.engine.retriever import ElasticsearchRetrieverConfig
>>>
>>> # Create the Elasticsearch retriever config
>>> config = ElasticsearchRetrieverConfig(
... name="elasticsearch_retriever",
... elasticsearch_url="https://localhost:9200",
... index_name="documents",
... username="elastic",
... k=10,
... search_type="match"
... )
>>>
>>> # Instantiate and use the retriever
>>> retriever = config.instantiate()
>>> docs = retriever.get_relevant_documents("machine learning algorithms")
>>>
>>> # Example with custom query
>>> custom_config = ElasticsearchRetrieverConfig(
... name="custom_elasticsearch_retriever",
... elasticsearch_url="https://localhost:9200",
... index_name="documents",
... custom_query={
... "bool": {
... "must": [{"match": {"content": "{query}"}}],
... "filter": [{"range": {"date": {"gte": "2023-01-01"}}}]
... }
... }
... )
"""
retriever_type: RetrieverType = Field(
default=RetrieverType.ELASTICSEARCH, description="The type of retriever"
)
# Elasticsearch connection configuration
elasticsearch_url: str = Field(
..., description="Elasticsearch cluster URL (e.g., 'https://localhost:9200')"
)
index_name: str = Field(
..., description="Name of the Elasticsearch index to search"
)
# Authentication with SecureConfigMixin
username: str | None = Field(
default=None, description="Username for Elasticsearch authentication"
)
api_key: SecretStr | None = Field(
default=None,
description="Elasticsearch API key or password (auto-resolved from ELASTICSEARCH_API_KEY)",
)
# Provider for SecureConfigMixin
provider: str = Field(
default="elasticsearch", description="Provider name for credential resolution"
)
# Search configuration
k: int = Field(
default=10, ge=1, le=100, description="Number of documents to retrieve"
)
search_type: str = Field(
default="match",
description="Search type: 'match', 'multi_match', 'bool', 'fuzzy', 'custom'",
)
# Advanced configuration
custom_query: dict[str, Any] | None = Field(
default=None,
description="Custom Elasticsearch query DSL (overrides search_type)",
)
source_fields: list[str] | None = Field(
default=None, description="Specific fields to retrieve from documents"
)
# Connection parameters
verify_certs: bool = Field(
default=True, description="Whether to verify SSL certificates"
)
timeout: int = Field(
default=30, ge=1, le=300, description="Request timeout in seconds"
)
[docs]
def get_output_fields(self) -> dict[str, tuple[type, Any]]:
"""Return output field definitions for Elasticsearch retriever."""
return {
"documents": (
list[Document],
Field(
default_factory=list,
description="Documents from Elasticsearch search",
),
),
}
[docs]
def instantiate(self) -> Any:
"""Create an Elasticsearch retriever from this configuration.
Returns:
ElasticsearchRetriever: Instantiated retriever ready for search.
Raises:
ImportError: If required packages are not available.
ValueError: If connection configuration is invalid.
"""
try:
from elasticsearch import Elasticsearch
from langchain_community.retrievers import ElasticsearchRetriever
except ImportError:
raise ImportError(
"ElasticsearchRetriever requires langchain-community and elasticsearch packages. "
"Install with: pip install langchain-community elasticsearch"
)
# Prepare connection configuration
es_config = {
"hosts": [self.elasticsearch_url],
"verify_certs": self.verify_certs,
"timeout": self.timeout,
}
# Add authentication if provided
if self.username:
api_key = self.get_api_key()
if api_key:
es_config["basic_auth"] = (self.username, api_key)
else:
es_config["basic_auth"] = (
self.username,
self.api_key.get_secret_value() if self.api_key else "",
)
# Create Elasticsearch client
es_client = Elasticsearch(**es_config)
# Prepare retriever configuration
retriever_config = {
"es_client": es_client,
"index_name": self.index_name,
"k": self.k,
}
# Add search configuration
if self.custom_query:
retriever_config["body_func"] = lambda query: self._build_custom_query(
query
)
else:
retriever_config["body_func"] = lambda query: self._build_standard_query(
query
)
if self.source_fields:
retriever_config["source_fields"] = self.source_fields
return ElasticsearchRetriever(**retriever_config)
def _build_standard_query(self, query: str) -> dict[str, Any]:
"""Build standard Elasticsearch query based on search_type."""
if self.search_type == "match":
return {"query": {"match": {"content": query}}}
if self.search_type == "multi_match":
return {
"query": {
"multi_match": {
"query": query,
"fields": ["title^2", "content", "keywords"],
}
}
}
if self.search_type == "fuzzy":
return {
"query": {"fuzzy": {"content": {"value": query, "fuzziness": "AUTO"}}}
}
# Default to match
return {"query": {"match": {"content": query}}}
def _build_custom_query(self, query: str) -> dict[str, Any]:
"""Build custom Elasticsearch query from template."""
import json
# Replace {query} placeholder in custom query
query_str = json.dumps(self.custom_query)
query_str = query_str.replace("{query}", query)
return json.loads(query_str)