Managing MCP Servers¶

Operational guide for managing MCP servers in production environments, drawing on the ecosystem of 1900+ available servers.

Production Deployment¶

Pre-Installation Setup¶

Prepare your environment for MCP servers:

# Install prerequisites
sudo apt-get update
sudo apt-get install -y nodejs npm python3-pip docker.io git

# Verify installations
node --version    # Should be v18+
npm --version     # Should be v9+
python3 --version # Should be 3.8+
docker --version  # Should be 20+

# Create MCP directories
sudo mkdir -p /opt/mcp/{servers,configs,logs,data}
sudo chown -R $USER:$USER /opt/mcp

Server Installation Strategies¶

Choose the right installation strategy:

from haive.mcp.manager import MCPManager, InstallStrategy

manager = MCPManager()

# Strategy 1: Pre-install all core servers
await manager.batch_install(
    servers=["filesystem", "postgres", "github", "search"],
    strategy=InstallStrategy.PARALLEL,
    verify=True
)

# Strategy 2: On-demand installation
# (assumes EnhancedMCPAgent and AugLLMConfig have already been imported from their haive packages)
agent = EnhancedMCPAgent(
    name="prod_agent",
    engine=AugLLMConfig(),
    mcp_categories=["core"],
    auto_install=True,  # Install as needed
    install_timeout=120
)

# Strategy 3: Docker-based deployment
await manager.deploy_with_docker(
    compose_file="docker-compose.mcp.yml",
    env_file=".env.production"
)
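
Whichever strategy you use, verify the result before routing traffic to it. A minimal sketch, reusing the manager's list_servers() call shown later in this guide (the exact return shape is an assumption):

# Post-install verification sketch; list_servers() is assumed to return installed server names
installed = manager.list_servers()
for server in ["filesystem", "postgres", "github", "search"]:
    print(f"{server}: {'ok' if server in installed else 'MISSING'}")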

Health Monitoring¶

Continuous Health Checks¶

Implement robust health monitoring:

from haive.mcp.monitoring import HealthMonitor
import asyncio

class ProductionHealthMonitor:
    """Production-grade health monitoring."""

    def __init__(self, manager):
        self.manager = manager
        self.monitor = HealthMonitor(manager)
        self.alert_threshold = 3  # Failures before alert
        self.failure_counts = {}

    async def monitor_loop(self):
        """Continuous monitoring loop."""
        while True:
            try:
                # Check all servers
                health_status = await self.monitor.check_all()

                for server, status in health_status.items():
                    if status['healthy']:
                        # Reset failure count on success
                        self.failure_counts[server] = 0
                    else:
                        # Increment failure count
                        self.failure_counts[server] = \
                            self.failure_counts.get(server, 0) + 1

                        # Alert if threshold reached
                        if self.failure_counts[server] >= self.alert_threshold:
                            await self.alert_unhealthy(server, status)

                            # Attempt recovery
                            await self.attempt_recovery(server)

                # Log metrics
                self.log_metrics(health_status)

            except Exception as e:
                print(f"Monitor error: {e}")

            # Wait before next check
            await asyncio.sleep(30)

    async def alert_unhealthy(self, server, status):
        """Send alerts for unhealthy servers."""
        # Send to monitoring system
        print(f"ALERT: {server} unhealthy - {status['error']}")
        # Could integrate with PagerDuty, Slack, etc.

    async def attempt_recovery(self, server):
        """Try to recover unhealthy server."""
        try:
            # Restart the server
            await self.manager.restart_server(server)
            print(f"Restarted {server}")
        except Exception as e:
            print(f"Recovery failed for {server}: {e}")

Metrics and Observability¶

Track detailed metrics:

from prometheus_client import Counter, Histogram, Gauge

# Define metrics
mcp_requests = Counter('mcp_requests_total', 'Total MCP requests', ['server', 'tool'])
mcp_errors = Counter('mcp_errors_total', 'Total MCP errors', ['server', 'error_type'])
mcp_latency = Histogram('mcp_latency_seconds', 'MCP request latency', ['server'])
mcp_active_servers = Gauge('mcp_active_servers', 'Number of active MCP servers')

class MetricsCollector:
    """Collect and export metrics."""

    def track_request(self, server, tool, duration, success):
        """Track individual request metrics."""
        mcp_requests.labels(server=server, tool=tool).inc()
        mcp_latency.labels(server=server).observe(duration)

        if not success:
            mcp_errors.labels(server=server, error_type='request_failed').inc()

    def update_active_servers(self, count):
        """Update active server count."""
        mcp_active_servers.set(count)
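
To make these metrics scrapeable, expose them over HTTP and record timings around each tool call. A sketch using prometheus_client's built-in exporter (the wrapper function below is illustrative, not part of the manager API):

import time
from prometheus_client import start_http_server

# Expose /metrics on port 9090 for Prometheus to scrape
start_http_server(9090)

collector = MetricsCollector()

async def tracked_execute(server, tool, params):
    """Execute a tool and record request count, latency, and errors."""
    start = time.perf_counter()
    try:
        result = await manager.execute_tool(server, tool, params)
        collector.track_request(server, tool, time.perf_counter() - start, success=True)
        return result
    except Exception:
        collector.track_request(server, tool, time.perf_counter() - start, success=False)
        raise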

Performance Optimization¶

Connection Pooling¶

Optimize connections for high throughput:

from haive.mcp.pooling import ConnectionPool

# Configure connection pool
pool_config = {
    "min_connections": 5,
    "max_connections": 50,
    "connection_timeout": 10,
    "idle_timeout": 300,
    "max_lifetime": 3600,
    "validation_interval": 60
}

pool = ConnectionPool(**pool_config)

# Use with manager
manager = MCPManager(connection_pool=pool)

# Monitor pool statistics
stats = pool.get_stats()
print(f"Active connections: {stats['active']}")
print(f"Idle connections: {stats['idle']}")
print(f"Total created: {stats['total_created']}")

Caching Strategies¶

Implement intelligent caching:

from haive.mcp.cache import MCPCache, CacheStrategy
import redis

# Redis-backed cache
redis_client = redis.Redis(host='localhost', port=6379)

cache = MCPCache(
    backend=redis_client,
    strategy=CacheStrategy.LRU,
    max_size=10000,
    ttl=3600
)

# Cache decorator for expensive operations
@cache.cached(ttl=1800)
async def expensive_mcp_operation(server, params):
    """Cached MCP operation."""
    return await manager.execute_tool(server, "expensive_tool", params)

# Warm up cache
async def warm_cache():
    """Pre-populate cache with common operations."""
    common_operations = [
        ("filesystem", {"action": "list", "path": "/"}),
        ("github", {"action": "list_repos"}),
        ("postgres", {"action": "list_tables"})
    ]

    for server, params in common_operations:
        await expensive_mcp_operation(server, params)

Load Balancing¶

Distribute load across server instances:

from haive.mcp.loadbalancer import LoadBalancer, Strategy

# Configure load balancer
lb = LoadBalancer(
    strategy=Strategy.ROUND_ROBIN,
    health_check_interval=10
)

# Add server instances
lb.add_instance("filesystem", "fs-1", "localhost:8001")
lb.add_instance("filesystem", "fs-2", "localhost:8002")
lb.add_instance("filesystem", "fs-3", "localhost:8003")

# Use load-balanced servers
async def balanced_request(tool, params):
    """Execute request on load-balanced server."""
    instance = lb.get_instance("filesystem")
    return await instance.execute(tool, params)

Disaster Recovery¶

Backup and Restore¶

Implement backup strategies:

from haive.mcp.backup import BackupManager
import asyncio
import schedule

backup_manager = BackupManager(
    backup_dir="/backup/mcp",
    retention_days=30
)

async def backup_servers():
    """Backup all server configurations and data."""
    # Backup configurations
    await backup_manager.backup_configs(manager.get_all_configs())

    # Backup server data
    for server in manager.list_servers():
        data = await manager.export_server_data(server)
        await backup_manager.backup_server_data(server, data)

    print(f"Backup completed: {backup_manager.last_backup}")

async def restore_servers(backup_id):
    """Restore servers from backup."""
    # Restore configurations
    configs = await backup_manager.restore_configs(backup_id)
    for server, config in configs.items():
        await manager.apply_config(server, config)

    # Restore server data
    for server in configs.keys():
        data = await backup_manager.restore_server_data(server, backup_id)
        await manager.import_server_data(server, data)

    print(f"Restore completed from backup: {backup_id}")

# Schedule regular backups
schedule.every(6).hours.do(lambda: asyncio.run(backup_servers()))
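
Note that the schedule library only fires jobs when run_pending() is called, so a long-running process needs a small driver loop (or trigger backup_servers() from cron or a systemd timer instead):

import time

# Driver loop so the scheduled backups actually run
while True:
    schedule.run_pending()
    time.sleep(60)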

Failover Strategies¶

Implement automatic failover:

class FailoverManager:
    """Manage server failover."""

    def __init__(self, manager):
        self.manager = manager
        self.primary_servers = {}
        self.backup_servers = {}

    def configure_failover(self, server, primary_url, backup_urls):
        """Configure failover for a server."""
        self.primary_servers[server] = primary_url
        self.backup_servers[server] = backup_urls

    async def handle_failure(self, server):
        """Handle server failure with failover."""
        if server not in self.backup_servers:
            raise Exception(f"No backup configured for {server}")

        # Try each backup
        for backup_url in self.backup_servers[server]:
            try:
                # Switch to backup
                await self.manager.switch_server_url(server, backup_url)
                print(f"Failed over {server} to {backup_url}")
                return True
            except Exception as e:
                print(f"Backup {backup_url} failed: {e}")

        return False
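
A typical setup registers backup URLs up front and calls handle_failure() from the health monitor's recovery path. A minimal sketch using the classes above (the URLs are placeholders):

failover = FailoverManager(manager)
failover.configure_failover(
    "filesystem",
    primary_url="http://fs-primary:8080",  # placeholder URLs
    backup_urls=["http://fs-backup-1:8080", "http://fs-backup-2:8080"],
)

# Invoked when the primary cannot be recovered
switched = await failover.handle_failure("filesystem")
if not switched:
    print("No healthy backup available for filesystem")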

Security Best Practices¶

Access Control¶

Implement fine-grained access control:

from haive.mcp.security import AccessControl, Permission

# Configure access control
ac = AccessControl()

# Define roles
ac.add_role("admin", [Permission.ALL])
ac.add_role("developer", [
    Permission.READ,
    Permission.EXECUTE,
    Permission.LIST
])
ac.add_role("viewer", [Permission.READ, Permission.LIST])

# Assign permissions to servers
ac.set_server_permissions("filesystem", {
    "admin": ["*"],
    "developer": ["read_file", "list_directory"],
    "viewer": ["list_directory"]
})

# Check permissions
if ac.check_permission("developer", "filesystem", "write_file"):
    await manager.execute_tool("filesystem", "write_file", params)
else:
    raise PermissionError("Insufficient permissions")

Audit Logging¶

Track all server operations:

from datetime import datetime, timedelta

from haive.mcp.audit import AuditLogger

audit = AuditLogger(
    log_file="/var/log/mcp/audit.log",
    include_params=True,
    mask_sensitive=True
)

# Log server operations
@audit.log_operation
async def execute_sensitive_operation(server, tool, params):
    """Execute and audit sensitive operations."""
    result = await manager.execute_tool(server, tool, params)
    return result

# Query audit logs
logs = audit.query(
    server="filesystem",
    user="admin",
    start_time=datetime.now() - timedelta(hours=24)
)

Scaling Strategies¶

Horizontal Scaling¶

Scale servers horizontally:

# Docker Swarm deployment
docker swarm init
docker service create \
  --name mcp-filesystem \
  --replicas 3 \
  --publish 8080:8080 \
  mcp/filesystem-server

# Kubernetes deployment
kubectl apply -f mcp-deployment.yaml
kubectl scale deployment mcp-filesystem --replicas=5

Auto-scaling¶

Implement auto-scaling based on load:

from haive.mcp.autoscale import AutoScaler

autoscaler = AutoScaler(
    manager=manager,
    min_instances=2,
    max_instances=10,
    target_cpu=70,
    target_memory=80,
    scale_up_threshold=3,  # Consecutive checks
    scale_down_threshold=5
)

# Register scaling event hooks before starting so no events are missed
autoscaler.on_scale_up = lambda s, n: print(f"Scaled up {s} to {n} instances")
autoscaler.on_scale_down = lambda s, n: print(f"Scaled down {s} to {n} instances")

# Start auto-scaling
await autoscaler.start()

Maintenance Operations¶

Rolling Updates¶

Update servers without downtime:

async def rolling_update(server_name, new_version):
    """Perform rolling update of server instances."""
    instances = manager.get_server_instances(server_name)

    for instance in instances:
        # Remove from load balancer
        lb.remove_instance(server_name, instance.id)

        # Update instance
        await instance.update(new_version)

        # Health check
        if await instance.health_check():
            # Add back to load balancer
            lb.add_instance(server_name, instance.id, instance.url)
        else:
            print(f"Update failed for {instance.id}")
            # Rollback if needed
            await instance.rollback()

Maintenance Mode¶

Put servers in maintenance mode:

async def enter_maintenance_mode(server):
    """Put server in maintenance mode."""
    # Drain connections
    await manager.drain_connections(server, timeout=30)

    # Set maintenance flag
    await manager.set_maintenance_mode(server, True)

    # Redirect traffic to a backup; backup_servers maps server name -> backup target
    # (configured during failover setup, see Failover Strategies above)
    if server in backup_servers:
        await lb.redirect_traffic(server, backup_servers[server])

    print(f"{server} in maintenance mode")

async def exit_maintenance_mode(server):
    """Exit maintenance mode."""
    # Health check
    if await manager.health_check(server):
        # Remove maintenance flag
        await manager.set_maintenance_mode(server, False)

        # Restore traffic
        await lb.restore_traffic(server)

        print(f"{server} back online")

Next Steps¶