FastAPI Integration GuideΒΆ
Complete guide to integrating Haive MCP with FastAPI for web APIs and microservices.
OverviewΒΆ
The Haive MCP platform provides built-in FastAPI integration through:
Auto-generated routers - Platforms automatically create FastAPI routers
Pydantic serialization - Seamless JSON API responses
Async support - Full async/await patterns
Type safety - Complete type hints and validation
OpenAPI docs - Automatic API documentation
Basic IntegrationΒΆ
Simple FastAPI AppΒΆ
from fastapi import FastAPI
from pathlib import Path
from haive.mcp.plugins import MCPBrowserPlugin
# Create FastAPI app
app = FastAPI(
title="MCP Server Manager",
description="Manage and browse MCP servers",
version="1.0.0"
)
# Create MCP plugin
mcp_plugin = MCPBrowserPlugin(
server_directory=Path("/home/will/Downloads/mcp_servers"),
cache_ttl=3600
)
# Mount plugin router
app.include_router(
mcp_plugin.get_router(),
prefix="/mcp",
tags=["MCP Servers"]
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Multiple Plugin IntegrationΒΆ
from fastapi import FastAPI
from pathlib import Path
from haive.mcp.plugins import MCPBrowserPlugin
app = FastAPI(title="Multi-Plugin MCP Manager")
# Development servers
dev_plugin = MCPBrowserPlugin(
server_directory=Path("/dev/mcp_servers"),
name="dev-browser"
)
app.include_router(
dev_plugin.get_router(),
prefix="/dev/mcp",
tags=["Development MCP"]
)
# Production servers
prod_plugin = MCPBrowserPlugin(
server_directory=Path("/prod/mcp_servers"),
name="prod-browser"
)
app.include_router(
prod_plugin.get_router(),
prefix="/prod/mcp",
tags=["Production MCP"]
)
# Health check
@app.get("/health")
async def health():
return {
"status": "healthy",
"plugins": {
"dev": dev_plugin.enabled,
"prod": prod_plugin.enabled
}
}
Available EndpointsΒΆ
Default Plugin EndpointsΒΆ
Each MCPBrowserPlugin provides these endpoints:
GET /servers # List all servers
GET /servers/{server_name} # Get specific server
GET /categories # List unique capabilities
GET /search # Search servers (query param: q)
GET /stats # Server statistics
POST /cache/refresh # Refresh server cache
DELETE /cache # Clear cache
Endpoint DetailsΒΆ
List Servers - GET /servers
# Example response
[
{
"name": "postgresql-server",
"description": "PostgreSQL MCP server",
"version": "1.2.0",
"capabilities": ["database", "sql"],
"mcp_version": "0.1.0",
"transport_types": ["stdio"],
"local_path": "/servers/postgresql-server",
"file_size": 1024000,
"installed_date": "2025-01-15T10:30:00",
"download_source": "npm",
"is_verified": true
}
]
Get Server - GET /servers/{server_name}
# Example: GET /servers/postgresql-server
{
"name": "postgresql-server",
"description": "PostgreSQL MCP server",
"version": "1.2.0",
"capabilities": ["database", "sql"],
"mcp_version": "0.1.0",
"transport_types": ["stdio"],
"command_template": "npx -y @modelcontextprotocol/server-postgres {connection_string}",
"local_path": "/servers/postgresql-server",
"file_size": 1024000,
"installed_date": "2025-01-15T10:30:00",
"download_source": "npm",
"is_verified": true
}
Search Servers - GET /search?q={query}
# Example: GET /search?q=database
[
{
"name": "postgresql-server",
"description": "PostgreSQL MCP server",
"version": "1.2.0",
"capabilities": ["database", "sql"]
},
{
"name": "mysql-server",
"description": "MySQL MCP server",
"version": "1.0.0",
"capabilities": ["database", "sql"]
}
]
Custom Router ExtensionsΒΆ
Extending Plugin RoutersΒΆ
from fastapi import APIRouter, HTTPException, Query
from typing import Optional, List
from haive.mcp.plugins import MCPBrowserPlugin
from haive.mcp.models import DownloadedServerInfo
class ExtendedMCPPlugin(MCPBrowserPlugin):
"""Plugin with custom endpoints"""
def get_router(self) -> APIRouter:
# Start with base router
router = super().get_router()
@router.get("/servers/by-capability/{capability}")
async def get_servers_by_capability(capability: str):
"""Get all servers with specific capability"""
servers = await self.filter_by_category(capability)
return [s.model_dump() for s in servers]
@router.get("/servers/large")
async def get_large_servers(
min_size: int = Query(default=1000000, description="Minimum size in bytes")
):
"""Get servers larger than specified size"""
servers = await self.load_servers()
large_servers = [s for s in servers if s.file_size >= min_size]
return [
{
"name": s.name,
"size": s.file_size,
"size_mb": round(s.file_size / (1024*1024), 2)
}
for s in large_servers
]
@router.post("/servers/{server_name}/verify")
async def verify_server(server_name: str):
"""Verify server installation"""
servers = await self.load_servers()
server = next((s for s in servers if s.name == server_name), None)
if not server:
raise HTTPException(404, f"Server not found: {server_name}")
# Verification logic
exists = server.local_path.exists()
correct_size = False
if exists:
actual_size = server.local_path.stat().st_size
correct_size = actual_size == server.file_size
return {
"server": server_name,
"exists": exists,
"correct_size": correct_size,
"verified": exists and correct_size
}
@router.get("/analytics/capabilities")
async def get_capability_analytics():
"""Get capability distribution analytics"""
servers = await self.load_servers()
capability_counts = {}
for server in servers:
for cap in server.capabilities:
capability_counts[cap] = capability_counts.get(cap, 0) + 1
total_servers = len(servers)
return {
"total_servers": total_servers,
"capability_distribution": [
{
"capability": cap,
"count": count,
"percentage": round(count / total_servers * 100, 1)
}
for cap, count in sorted(
capability_counts.items(),
key=lambda x: x[1],
reverse=True
)
]
}
return router
Request/Response ModelsΒΆ
Custom Pydantic ModelsΒΆ
Create custom request/response models:
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
class ServerFilter(BaseModel):
"""Request model for server filtering"""
capabilities: Optional[List[str]] = Field(default=None)
min_version: Optional[str] = Field(default=None)
max_size: Optional[int] = Field(default=None)
verified_only: bool = Field(default=False)
class ServerSummary(BaseModel):
"""Response model for server summary"""
name: str
version: str
capabilities: List[str]
size_mb: float
last_updated: datetime
class AnalyticsResponse(BaseModel):
"""Response model for analytics"""
total_servers: int
total_size_mb: float
verified_count: int
capability_distribution: List[dict]
# Usage in endpoints
@router.post("/servers/filter", response_model=List[ServerSummary])
async def filter_servers(filter_params: ServerFilter):
"""Filter servers with custom criteria"""
servers = await plugin.load_servers()
# Apply filters
filtered = servers
if filter_params.capabilities:
filtered = [
s for s in filtered
if any(cap in s.capabilities for cap in filter_params.capabilities)
]
if filter_params.verified_only:
filtered = [s for s in filtered if s.is_verified]
# Convert to response model
return [
ServerSummary(
name=s.name,
version=s.version,
capabilities=s.capabilities,
size_mb=round(s.file_size / (1024*1024), 2),
last_updated=s.installed_date
)
for s in filtered
]
Error HandlingΒΆ
Custom Exception HandlersΒΆ
from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import ValidationError
import logging
logger = logging.getLogger(__name__)
@app.exception_handler(ValidationError)
async def validation_exception_handler(request: Request, exc: ValidationError):
"""Handle Pydantic validation errors"""
logger.error(f"Validation error: {exc}")
return JSONResponse(
status_code=422,
content={
"error": "Validation Error",
"details": exc.errors(),
"message": "Invalid input data"
}
)
@app.exception_handler(FileNotFoundError)
async def file_not_found_handler(request: Request, exc: FileNotFoundError):
"""Handle file not found errors"""
logger.error(f"File not found: {exc}")
return JSONResponse(
status_code=404,
content={
"error": "File Not Found",
"message": str(exc)
}
)
class MCPException(Exception):
"""Custom MCP exception"""
def __init__(self, message: str, status_code: int = 500):
self.message = message
self.status_code = status_code
super().__init__(message)
@app.exception_handler(MCPException)
async def mcp_exception_handler(request: Request, exc: MCPException):
"""Handle custom MCP exceptions"""
logger.error(f"MCP error: {exc.message}")
return JSONResponse(
status_code=exc.status_code,
content={
"error": "MCP Error",
"message": exc.message
}
)
Graceful Error ResponsesΒΆ
from fastapi import HTTPException
class ExtendedMCPPlugin(MCPBrowserPlugin):
def get_router(self) -> APIRouter:
router = super().get_router()
@router.get("/servers/{server_name}")
async def get_server_safe(server_name: str):
"""Get server with proper error handling"""
try:
servers = await self.load_servers()
server = next((s for s in servers if s.name == server_name), None)
if not server:
raise HTTPException(
status_code=404,
detail={
"error": "Server not found",
"server_name": server_name,
"available_servers": [s.name for s in servers[:5]]
}
)
return server.model_dump()
except Exception as e:
logger.error(f"Error loading server {server_name}: {e}")
raise HTTPException(
status_code=500,
detail={
"error": "Internal server error",
"message": "Failed to load server information"
}
)
Middleware & CORSΒΆ
CORS ConfigurationΒΆ
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000", "https://myapp.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
Custom MiddlewareΒΆ
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
import time
import logging
class LoggingMiddleware(BaseHTTPMiddleware):
"""Log all API requests"""
async def dispatch(self, request: Request, call_next):
start_time = time.time()
# Log request
logger.info(f"Request: {request.method} {request.url}")
response = await call_next(request)
# Log response
process_time = time.time() - start_time
logger.info(
f"Response: {response.status_code} "
f"({process_time:.3f}s)"
)
return response
app.add_middleware(LoggingMiddleware)
class CacheMiddleware(BaseHTTPMiddleware):
"""Add cache headers to responses"""
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
# Add cache headers for GET requests
if request.method == "GET":
response.headers["Cache-Control"] = "public, max-age=300" # 5 minutes
return response
app.add_middleware(CacheMiddleware)
TestingΒΆ
FastAPI Test ClientΒΆ
from fastapi.testclient import TestClient
import pytest
from pathlib import Path
import tempfile
import json
@pytest.fixture
def test_app():
"""Create test FastAPI app"""
app = FastAPI()
# Create temporary server directory
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Create mock server files
mock_server_dir = temp_path / "postgresql-server"
mock_server_dir.mkdir()
(mock_server_dir / "package.json").write_text(
json.dumps({
"name": "@modelcontextprotocol/server-postgres",
"version": "1.2.0",
"description": "PostgreSQL MCP server"
})
)
plugin = MCPBrowserPlugin(server_directory=temp_path)
app.include_router(plugin.get_router(), prefix="/mcp")
yield app
def test_list_servers(test_app):
"""Test server listing endpoint"""
client = TestClient(test_app)
response = client.get("/mcp/servers")
assert response.status_code == 200
servers = response.json()
assert isinstance(servers, list)
assert len(servers) >= 1
server = servers[0]
assert "name" in server
assert "version" in server
assert "capabilities" in server
def test_get_specific_server(test_app):
"""Test getting specific server"""
client = TestClient(test_app)
# First get list to find a server name
servers_response = client.get("/mcp/servers")
servers = servers_response.json()
if servers:
server_name = servers[0]["name"]
response = client.get(f"/mcp/servers/{server_name}")
assert response.status_code == 200
server = response.json()
assert server["name"] == server_name
def test_server_not_found(test_app):
"""Test 404 for non-existent server"""
client = TestClient(test_app)
response = client.get("/mcp/servers/nonexistent")
assert response.status_code == 404
Async TestingΒΆ
import pytest
import asyncio
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_async_endpoints():
"""Test async endpoints"""
app = create_test_app() # Your app factory
async with AsyncClient(app=app, base_url="http://test") as client:
# Test server loading
response = await client.get("/mcp/servers")
assert response.status_code == 200
# Test search
response = await client.get("/mcp/search?q=database")
assert response.status_code == 200
results = response.json()
# Verify search results
for server in results:
assert "database" in server["name"].lower() or \
"database" in server["description"].lower() or \
"database" in server["capabilities"]
Production DeploymentΒΆ
Docker DeploymentΒΆ
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY pyproject.toml poetry.lock ./
RUN pip install poetry && \
poetry config virtualenvs.create false && \
poetry install --only=main
# Copy application
COPY . .
# Expose port
EXPOSE 8000
# Run with uvicorn
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
Docker ComposeΒΆ
version: '3.8'
services:
mcp-api:
build: .
ports:
- "8000:8000"
environment:
- MCP_SERVER_DIR=/data/mcp_servers
- MCP_CACHE_TTL=3600
- MCP_API_KEY=${MCP_API_KEY}
volumes:
- ./mcp_servers:/data/mcp_servers:ro
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- ./ssl:/etc/nginx/ssl:ro
depends_on:
- mcp-api
restart: unless-stopped
Production ConfigurationΒΆ
import os
from fastapi import FastAPI
from pathlib import Path
def create_production_app():
"""Create production-ready FastAPI app"""
app = FastAPI(
title="MCP Server Manager",
description="Production MCP server management API",
version="1.0.0",
docs_url="/docs" if os.getenv("ENABLE_DOCS") else None,
redoc_url="/redoc" if os.getenv("ENABLE_DOCS") else None
)
# Production plugin configuration
plugin = MCPBrowserPlugin(
server_directory=Path(os.getenv("MCP_SERVER_DIR", "/data/mcp_servers")),
cache_ttl=int(os.getenv("MCP_CACHE_TTL", "3600"))
)
app.include_router(
plugin.get_router(),
prefix="/api/v1/mcp",
tags=["MCP Servers"]
)
# Health check
@app.get("/health")
async def health():
return {"status": "healthy", "version": "1.0.0"}
return app
app = create_production_app()
Performance OptimizationΒΆ
Response CachingΒΆ
from functools import lru_cache
from typing import Dict, Any
import json
import hashlib
class CachedMCPPlugin(MCPBrowserPlugin):
"""Plugin with response caching"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._response_cache = {}
def _cache_key(self, endpoint: str, params: Dict[str, Any]) -> str:
"""Generate cache key for endpoint and parameters"""
key_data = f"{endpoint}:{json.dumps(params, sort_keys=True)}"
return hashlib.md5(key_data.encode()).hexdigest()
@lru_cache(maxsize=100)
def _get_cached_response(self, cache_key: str, timestamp: float):
"""LRU cached response getter"""
return self._response_cache.get(cache_key)
def get_router(self) -> APIRouter:
router = super().get_router()
@router.get("/servers/cached")
async def get_servers_cached():
"""Cached server listing"""
cache_key = self._cache_key("servers", {})
timestamp = time.time()
# Check cache first
cached = self._get_cached_response(cache_key, int(timestamp / 60)) # 1-minute buckets
if cached:
return cached
# Load fresh data
servers = await self.load_servers()
response = [s.model_dump() for s in servers]
# Cache response
self._response_cache[cache_key] = response
return response
Async OptimizationΒΆ
import asyncio
from concurrent.futures import ThreadPoolExecutor
class OptimizedMCPPlugin(MCPBrowserPlugin):
"""Optimized plugin with async improvements"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._executor = ThreadPoolExecutor(max_workers=4)
async def load_servers_parallel(self) -> List[DownloadedServerInfo]:
"""Load servers in parallel"""
server_dirs = [
d for d in self.server_directory.iterdir()
if d.is_dir()
]
# Process servers in parallel
tasks = [
asyncio.create_task(self._load_single_server(server_dir))
for server_dir in server_dirs
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter successful results
servers = [
result for result in results
if isinstance(result, DownloadedServerInfo)
]
return servers
async def _load_single_server(self, server_dir: Path) -> DownloadedServerInfo:
"""Load single server info asynchronously"""
loop = asyncio.get_event_loop()
# Run CPU-intensive operations in thread pool
server_info = await loop.run_in_executor(
self._executor,
self._extract_server_info,
server_dir
)
return server_info
Next StepsΒΆ
Real-World Examples - Production usage patterns
performance-optimization - Scaling and optimization
troubleshooting - Common issues and solutions