Managing MCP Servers
An operational guide to running MCP servers in production, drawing on an ecosystem of 1,900+ available servers.
Production Deployment
Pre-Installation Setup
Prepare your environment for MCP servers:
# Install prerequisites
sudo apt-get update
sudo apt-get install -y nodejs npm python3-pip docker.io git
# Verify installations
node --version # Should be v18+
npm --version # Should be v9+
python3 --version # Should be 3.8+
docker --version # Should be 20+
# Create MCP directories
sudo mkdir -p /opt/mcp/{servers,configs,logs,data}
sudo chown -R $USER:$USER /opt/mcp
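Before starting any servers, a quick programmatic sanity check can catch a missing prerequisite early. The following is a minimal sketch using only the standard library; the binary and directory lists mirror the setup commands above:
import shutil
from pathlib import Path

REQUIRED_BINARIES = ["node", "npm", "python3", "docker", "git"]
REQUIRED_DIRS = [Path("/opt/mcp") / d for d in ("servers", "configs", "logs", "data")]

def preflight():
    """Return a list of problems; an empty list means the host is ready."""
    problems = [f"missing binary: {b}" for b in REQUIRED_BINARIES if shutil.which(b) is None]
    problems += [f"missing directory: {d}" for d in REQUIRED_DIRS if not d.is_dir()]
    return problems

if __name__ == "__main__":
    issues = preflight()
    if issues:
        raise SystemExit("Preflight failed:\n" + "\n".join(issues))
    print("Environment ready for MCP servers")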
Server Installation Strategies
Choose the right installation strategy:
from haive.mcp.manager import MCPManager, InstallStrategy
# EnhancedMCPAgent and AugLLMConfig are assumed to come from the haive agents
# packages; adjust the imports to match your installation
manager = MCPManager()
# Strategy 1: Pre-install all core servers
await manager.batch_install(
servers=["filesystem", "postgres", "github", "search"],
strategy=InstallStrategy.PARALLEL,
verify=True
)
# Strategy 2: On-demand installation
agent = EnhancedMCPAgent(
name="prod_agent",
engine=AugLLMConfig(),
mcp_categories=["core"],
auto_install=True, # Install as needed
install_timeout=120
)
# Strategy 3: Docker-based deployment
await manager.deploy_with_docker(
compose_file="docker-compose.mcp.yml",
env_file=".env.production"
)
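Whichever strategy you choose, verify the result before routing traffic. A minimal sketch, assuming the manager exposes `list_servers()` and `health_check()` as used elsewhere in this guide:
async def verify_installation(manager, expected):
    """Confirm every expected server is installed and responding."""
    installed = set(manager.list_servers())
    missing = set(expected) - installed
    if missing:
        raise RuntimeError(f"Servers failed to install: {sorted(missing)}")
    for server in expected:
        if not await manager.health_check(server):
            raise RuntimeError(f"{server} installed but unhealthy")

await verify_installation(manager, ["filesystem", "postgres", "github", "search"])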
Health Monitoring
Continuous Health Checks
Implement robust health monitoring:
from haive.mcp.monitoring import HealthMonitor
import asyncio
class ProductionHealthMonitor:
"""Production-grade health monitoring."""
def __init__(self, manager):
self.manager = manager
self.monitor = HealthMonitor(manager)
self.alert_threshold = 3 # Failures before alert
self.failure_counts = {}
async def monitor_loop(self):
"""Continuous monitoring loop."""
while True:
try:
# Check all servers
health_status = await self.monitor.check_all()
for server, status in health_status.items():
if status['healthy']:
# Reset failure count on success
self.failure_counts[server] = 0
else:
# Increment failure count
self.failure_counts[server] = \
self.failure_counts.get(server, 0) + 1
# Alert if threshold reached
if self.failure_counts[server] >= self.alert_threshold:
await self.alert_unhealthy(server, status)
# Attempt recovery
await self.attempt_recovery(server)
# Log metrics
self.log_metrics(health_status)
except Exception as e:
print(f"Monitor error: {e}")
# Wait before next check
await asyncio.sleep(30)
async def alert_unhealthy(self, server, status):
"""Send alerts for unhealthy servers."""
# Send to monitoring system
print(f"ALERT: {server} unhealthy - {status['error']}")
# Could integrate with PagerDuty, Slack, etc.
async def attempt_recovery(self, server):
"""Try to recover unhealthy server."""
try:
# Restart the server
await self.manager.restart_server(server)
print(f"Restarted {server}")
        except Exception as e:
            print(f"Recovery failed for {server}: {e}")

    def log_metrics(self, health_status):
        """Record the latest health snapshot (stub; forward to your metrics backend)."""
        healthy = sum(1 for s in health_status.values() if s['healthy'])
        print(f"Health: {healthy}/{len(health_status)} servers healthy")
Metrics and Observability
Track detailed metrics:
from prometheus_client import Counter, Histogram, Gauge
import time
# Define metrics
mcp_requests = Counter('mcp_requests_total', 'Total MCP requests', ['server', 'tool'])
mcp_errors = Counter('mcp_errors_total', 'Total MCP errors', ['server', 'error_type'])
mcp_latency = Histogram('mcp_latency_seconds', 'MCP request latency', ['server'])
mcp_active_servers = Gauge('mcp_active_servers', 'Number of active MCP servers')
class MetricsCollector:
"""Collect and export metrics."""
def track_request(self, server, tool, duration, success):
"""Track individual request metrics."""
mcp_requests.labels(server=server, tool=tool).inc()
mcp_latency.labels(server=server).observe(duration)
if not success:
mcp_errors.labels(server=server, error_type='request_failed').inc()
def update_active_servers(self, count):
"""Update active server count."""
mcp_active_servers.set(count)
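Wiring the collector into the request path is a matter of timing each call. A sketch, assuming `manager.execute_tool` as used throughout this guide:
import time

collector = MetricsCollector()

async def instrumented_execute(server, tool, params):
    """Execute a tool call and record latency and outcome metrics."""
    start = time.perf_counter()
    success = True
    try:
        return await manager.execute_tool(server, tool, params)
    except Exception:
        success = False
        raise
    finally:
        collector.track_request(server, tool, time.perf_counter() - start, success)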
Performance Optimization
Connection Pooling
Optimize connections for high throughput:
from haive.mcp.pooling import ConnectionPool
# Configure connection pool
pool_config = {
"min_connections": 5,
"max_connections": 50,
"connection_timeout": 10,
"idle_timeout": 300,
"max_lifetime": 3600,
"validation_interval": 60
}
pool = ConnectionPool(**pool_config)
# Use with manager
manager = MCPManager(connection_pool=pool)
# Monitor pool statistics
stats = pool.get_stats()
print(f"Active connections: {stats['active']}")
print(f"Idle connections: {stats['idle']}")
print(f"Total created: {stats['total_created']}")
Caching Strategies
Implement intelligent caching:
from haive.mcp.cache import MCPCache, CacheStrategy
import redis
# Redis-backed cache
redis_client = redis.Redis(host='localhost', port=6379)
cache = MCPCache(
backend=redis_client,
strategy=CacheStrategy.LRU,
max_size=10000,
ttl=3600
)
# Cache decorator for expensive operations
@cache.cached(ttl=1800)
async def expensive_mcp_operation(server, params):
"""Cached MCP operation."""
return await manager.execute_tool(server, "expensive_tool", params)
# Warm up cache
async def warm_cache():
"""Pre-populate cache with common operations."""
common_operations = [
("filesystem", {"action": "list", "path": "/"}),
("github", {"action": "list_repos"}),
("postgres", {"action": "list_tables"})
]
for server, params in common_operations:
await expensive_mcp_operation(server, params)
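Run the warm-up once at startup, before serving traffic; for example:
import asyncio

async def startup():
    await warm_cache()
    print("Cache warmed; ready to serve")

asyncio.run(startup())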
Load Balancing
Distribute load across server instances:
from haive.mcp.loadbalancer import LoadBalancer, Strategy
# Configure load balancer
lb = LoadBalancer(
strategy=Strategy.ROUND_ROBIN,
health_check_interval=10
)
# Add server instances
lb.add_instance("filesystem", "fs-1", "localhost:8001")
lb.add_instance("filesystem", "fs-2", "localhost:8002")
lb.add_instance("filesystem", "fs-3", "localhost:8003")
# Use load-balanced servers
async def balanced_request(tool, params):
"""Execute request on load-balanced server."""
instance = lb.get_instance("filesystem")
return await instance.execute(tool, params)
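Because `get_instance` can hand back an instance that fails mid-request, a thin retry wrapper makes the balancer more forgiving. A sketch using only the balancer methods shown in this guide (`get_instance`, plus `remove_instance` from the rolling-update section below):
async def balanced_request_with_retry(tool, params, attempts=3):
    """Retry across instances, ejecting ones that fail."""
    last_error = None
    for _ in range(attempts):
        instance = lb.get_instance("filesystem")
        try:
            return await instance.execute(tool, params)
        except Exception as e:
            last_error = e
            # Eject the failing instance; health checks can re-add it later
            lb.remove_instance("filesystem", instance.id)
    raise RuntimeError(f"All {attempts} attempts failed: {last_error}")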
Disaster Recovery
Backup and Restore
Implement backup strategies:
from haive.mcp.backup import BackupManager
import asyncio
import schedule
backup_manager = BackupManager(
backup_dir="/backup/mcp",
retention_days=30
)
async def backup_servers():
"""Backup all server configurations and data."""
# Backup configurations
await backup_manager.backup_configs(manager.get_all_configs())
# Backup server data
for server in manager.list_servers():
data = await manager.export_server_data(server)
await backup_manager.backup_server_data(server, data)
print(f"Backup completed: {backup_manager.last_backup}")
async def restore_servers(backup_id):
"""Restore servers from backup."""
# Restore configurations
configs = await backup_manager.restore_configs(backup_id)
for server, config in configs.items():
await manager.apply_config(server, config)
# Restore server data
for server in configs.keys():
data = await backup_manager.restore_server_data(server, backup_id)
await manager.import_server_data(server, data)
print(f"Restore completed from backup: {backup_id}")
# Schedule regular backups
schedule.every(6).hours.do(lambda: asyncio.run(backup_servers()))
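Note that the `schedule` library only registers jobs; something must call `schedule.run_pending()` for them to fire. A typical runner loop:
import time

def run_scheduler():
    """Drive the schedule library's pending jobs (blocks forever)."""
    while True:
        schedule.run_pending()
        time.sleep(60)  # check once a minute; backups fire every 6 hours

run_scheduler()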
Failover Strategies
Implement automatic failover:
class FailoverManager:
"""Manage server failover."""
def __init__(self, manager):
self.manager = manager
self.primary_servers = {}
self.backup_servers = {}
def configure_failover(self, server, primary_url, backup_urls):
"""Configure failover for a server."""
self.primary_servers[server] = primary_url
self.backup_servers[server] = backup_urls
async def handle_failure(self, server):
"""Handle server failure with failover."""
        if server not in self.backup_servers:
            raise RuntimeError(f"No backup configured for {server}")
# Try each backup
for backup_url in self.backup_servers[server]:
try:
# Switch to backup
await self.manager.switch_server_url(server, backup_url)
print(f"Failed over {server} to {backup_url}")
return True
except Exception as e:
print(f"Backup {backup_url} failed: {e}")
return False
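The failover manager slots naturally into the health monitor's recovery path. A sketch combining the two classes defined in this guide, with illustrative URLs:
failover = FailoverManager(manager)
failover.configure_failover(
    "postgres",
    primary_url="localhost:5001",          # illustrative endpoints
    backup_urls=["localhost:5002", "localhost:5003"],
)

class FailoverHealthMonitor(ProductionHealthMonitor):
    async def attempt_recovery(self, server):
        """Try a restart first; fall back to a configured backup."""
        try:
            await self.manager.restart_server(server)
        except Exception:
            if not await failover.handle_failure(server):
                await self.alert_unhealthy(server, {"error": "all backups exhausted"})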
Security Best Practices
Access Control
Implement fine-grained access control:
from haive.mcp.security import AccessControl, Permission
# Configure access control
ac = AccessControl()
# Define roles
ac.add_role("admin", [Permission.ALL])
ac.add_role("developer", [
Permission.READ,
Permission.EXECUTE,
Permission.LIST
])
ac.add_role("viewer", [Permission.READ, Permission.LIST])
# Assign permissions to servers
ac.set_server_permissions("filesystem", {
"admin": ["*"],
"developer": ["read_file", "list_directory"],
"viewer": ["list_directory"]
})
# Check permissions before executing
tool = "read_file"
if ac.check_permission("developer", "filesystem", tool):
    await manager.execute_tool("filesystem", tool, params)
else:
    raise PermissionError(f"developer may not call {tool} on filesystem")
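For repeated checks, wrapping the pattern in a decorator keeps call sites clean. A minimal sketch built around `ac.check_permission`:
import functools

def require_permission(role, server, tool):
    """Deny the wrapped coroutine unless the role may call the tool."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            if not ac.check_permission(role, server, tool):
                raise PermissionError(f"{role} may not call {tool} on {server}")
            return await func(*args, **kwargs)
        return wrapper
    return decorator

@require_permission("developer", "filesystem", "read_file")
async def read_config_file(path):
    return await manager.execute_tool("filesystem", "read_file", {"path": path})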
Audit Logging
Track all server operations:
from haive.mcp.audit import AuditLogger
from datetime import datetime, timedelta
audit = AuditLogger(
log_file="/var/log/mcp/audit.log",
include_params=True,
mask_sensitive=True
)
# Log server operations
@audit.log_operation
async def execute_sensitive_operation(server, tool, params):
"""Execute and audit sensitive operations."""
result = await manager.execute_tool(server, tool, params)
return result
# Query audit logs
logs = audit.query(
server="filesystem",
user="admin",
start_time=datetime.now() - timedelta(hours=24)
)
Scaling Strategies
Horizontal Scaling
Scale servers horizontally:
# Docker Swarm deployment
docker swarm init
docker service create \
--name mcp-filesystem \
--replicas 3 \
--publish 8080:8080 \
mcp/filesystem-server
# Kubernetes deployment
kubectl apply -f mcp-deployment.yaml
kubectl scale deployment mcp-filesystem --replicas=5
Auto-scaling
Implement auto-scaling based on load:
from haive.mcp.autoscale import AutoScaler
autoscaler = AutoScaler(
manager=manager,
min_instances=2,
max_instances=10,
target_cpu=70,
target_memory=80,
scale_up_threshold=3, # Consecutive checks
scale_down_threshold=5
)
# Register scaling event hooks before starting
autoscaler.on_scale_up = lambda s, n: print(f"Scaled up {s} to {n} instances")
autoscaler.on_scale_down = lambda s, n: print(f"Scaled down {s} to {n} instances")

# Start auto-scaling
await autoscaler.start()
Maintenance Operations
Rolling Updates
Update servers without downtime:
async def rolling_update(server_name, new_version):
"""Perform rolling update of server instances."""
instances = manager.get_server_instances(server_name)
for instance in instances:
# Remove from load balancer
lb.remove_instance(server_name, instance.id)
# Update instance
await instance.update(new_version)
# Health check
if await instance.health_check():
# Add back to load balancer
lb.add_instance(server_name, instance.id, instance.url)
        else:
            print(f"Update failed for {instance.id}; rolling back")
            await instance.rollback()
            # Re-register the instance only if the rollback leaves it healthy
            if await instance.health_check():
                lb.add_instance(server_name, instance.id, instance.url)
Maintenance Mode
Put servers in maintenance mode:
async def enter_maintenance_mode(server):
"""Put server in maintenance mode."""
# Drain connections
await manager.drain_connections(server, timeout=30)
# Set maintenance flag
await manager.set_maintenance_mode(server, True)
    # Redirect traffic to a standby (backup_servers maps server name -> standby URL)
    if server in backup_servers:
        await lb.redirect_traffic(server, backup_servers[server])
print(f"{server} in maintenance mode")
async def exit_maintenance_mode(server):
"""Exit maintenance mode."""
# Health check
if await manager.health_check(server):
# Remove maintenance flag
await manager.set_maintenance_mode(server, False)
# Restore traffic
await lb.restore_traffic(server)
print(f"{server} back online")
Next Steps
Review the Server Manager Guide for manager details
Check Advanced Configuration for configuration options
Explore Advanced Topics for advanced patterns
See the API Reference