πŸš€ Production Deployment GuideΒΆ

This guide covers everything needed to deploy Haive Agents in production environments, from single-server deployments to large-scale distributed systems.

Deployment ArchitectureΒΆ

Single Server DeploymentΒΆ

Best for: Small to medium applications, development staging

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚           Production Server         β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  Web Server (Nginx/Apache)         β”‚
β”‚  β”œβ”€β”€ Load Balancer                 β”‚
β”‚  └── SSL Termination               β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  Application Server                 β”‚
β”‚  β”œβ”€β”€ Haive Agents Service          β”‚
β”‚  β”œβ”€β”€ API Gateway                   β”‚
β”‚  └── Background Workers            β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  Database Layer                     β”‚
β”‚  β”œβ”€β”€ PostgreSQL (primary data)     β”‚
β”‚  β”œβ”€β”€ Redis (caching/queues)        β”‚
β”‚  └── Neo4j (graph memory)          β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Multi-Server ArchitectureΒΆ

Best for: High-traffic applications, enterprise deployments

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Load Balancerβ”‚    β”‚  API Gateway  β”‚    β”‚   Monitoring  β”‚
β”‚   (HAProxy)   β”‚    β”‚   (Kong)      β”‚    β”‚  (Prometheus) β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
        β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                Agent Cluster                              β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ Agent Node 1β”‚ Agent Node 2β”‚ Agent Node 3β”‚ Background Jobs β”‚
β”‚             β”‚             β”‚             β”‚                 β”‚
β”‚ - ReactAgentβ”‚ - SimpleAgentβ”‚ - MultiAgentβ”‚ - Memory Cons. β”‚
β”‚ - Tools     β”‚ - Memory    β”‚ - Coord.    β”‚ - Maintenance  β”‚
β”‚ - API       β”‚ - Vector DB β”‚ - Workflow  β”‚ - Analytics    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
        β”‚                           β”‚                β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  PostgreSQL    β”‚     β”‚     Redis      β”‚ β”‚     Neo4j      β”‚
β”‚   Cluster      β”‚     β”‚    Cluster     β”‚ β”‚   Cluster      β”‚
β”‚ (Primary/Read) β”‚     β”‚  (Cache/Queue) β”‚ β”‚ (Graph Memory) β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Docker DeploymentΒΆ

Basic Docker SetupΒΆ

# Dockerfile
FROM python:3.11-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Create app user
RUN groupadd -r appuser && useradd -r -g appuser appuser

# Set working directory
WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .
RUN chown -R appuser:appuser /app

# Switch to app user
USER appuser

# Expose port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:8000/health || exit 1

# Start command
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose SetupΒΆ

# docker-compose.yml
version: '3.8'

services:
  # Main application
  haive-agents:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql://haive:password@postgres:5432/haive_agents
      - REDIS_URL=redis://redis:6379/0
      - NEO4J_URI=neo4j://neo4j:7687
      - NEO4J_PASSWORD=${NEO4J_PASSWORD}
      - LOG_LEVEL=INFO
    depends_on:
      - postgres
      - redis
      - neo4j
    volumes:
      - ./logs:/app/logs
      - ./memory:/app/memory
    restart: unless-stopped

  # PostgreSQL database
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: haive_agents
      POSTGRES_USER: haive
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init-db.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "5432:5432"
    restart: unless-stopped

  # Redis cache and queues
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    restart: unless-stopped

  # Neo4j graph database
  neo4j:
    image: neo4j:5.15
    environment:
      NEO4J_AUTH: neo4j/${NEO4J_PASSWORD}
      NEO4J_PLUGINS: '["apoc", "graph-data-science"]'
      NEO4J_dbms_security_procedures_unrestricted: gds.*,apoc.*
    ports:
      - "7474:7474"
      - "7687:7687"
    volumes:
      - neo4j_data:/data
      - neo4j_logs:/logs
    restart: unless-stopped

  # Background worker
  worker:
    build: .
    command: python -m celery worker -A app.celery --loglevel=info
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql://haive:password@postgres:5432/haive_agents
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - postgres
      - redis
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped

  # Monitoring with Prometheus
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
    restart: unless-stopped

  # Grafana dashboard
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
    volumes:
      - grafana_data:/var/lib/grafana
      - ./monitoring/dashboards:/etc/grafana/provisioning/dashboards
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:
  neo4j_data:
  neo4j_logs:
  prometheus_data:
  grafana_data:

Production FastAPI ApplicationΒΆ

# main.py
import asyncio
import logging
import time
from contextlib import asynccontextmanager
from typing import Dict, Any, List

from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from pydantic import BaseModel, Field
import uvicorn

from haive.agents import SimpleAgent, ReactAgent, MultiAgent
from haive.core.engine.aug_llm import AugLLMConfig
from monitoring import setup_metrics, track_request
from auth import get_current_user
from rate_limiting import RateLimiter

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global agent instances (initialized at startup)
agents: Dict[str, Any] = {}
rate_limiter = RateLimiter()

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifecycle management."""
    # Startup
    logger.info("Starting Haive Agents service...")

    # Initialize agents
    await initialize_agents()

    # Setup monitoring
    setup_metrics(app)

    logger.info("Service ready!")
    yield

    # Shutdown
    logger.info("Shutting down service...")
    await cleanup_agents()

# FastAPI app
app = FastAPI(
    title="Haive Agents API",
    description="Production API for Haive AI Agents",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    lifespan=lifespan
)

# Middleware
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure appropriately for production
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["*"]
)

# Request/Response models
class AgentRequest(BaseModel):
    message: str = Field(..., min_length=1, max_length=10000)
    agent_type: str = Field("simple", regex="^(simple|react|multi)$")
    user_id: str = Field(..., min_length=1, max_length=100)
    session_id: str = Field(None, max_length=100)
    parameters: Dict[str, Any] = Field(default_factory=dict)

class AgentResponse(BaseModel):
    response: str
    agent_type: str
    processing_time: float
    session_id: str
    metadata: Dict[str, Any] = Field(default_factory=dict)

class HealthCheck(BaseModel):
    status: str
    timestamp: float
    version: str
    agents_loaded: int
    uptime: float

# Global variables for health tracking
start_time = time.time()

async def initialize_agents():
    """Initialize all agent instances."""
    global agents

    try:
        # Basic configuration
        basic_config = AugLLMConfig(
            provider="openai",
            model="gpt-3.5-turbo",
            temperature=0.7
        )

        # Advanced configuration
        advanced_config = AugLLMConfig(
            provider="openai",
            model="gpt-4",
            temperature=0.5
        )

        # Create agent instances
        agents = {
            "simple": SimpleAgent(
                name="production_simple",
                engine=basic_config
            ),

            "react": ReactAgent(
                name="production_react",
                engine=advanced_config,
                tools=[]  # Add your tools here
            ),

            "multi": MultiAgent(
                name="production_multi",
                agents=[
                    SimpleAgent(name="coordinator", engine=basic_config),
                    SimpleAgent(name="processor", engine=basic_config)
                ],
                execution_mode="sequential"
            )
        }

        logger.info(f"Initialized {len(agents)} agents")

    except Exception as e:
        logger.error(f"Failed to initialize agents: {e}")
        raise

async def cleanup_agents():
    """Clean up agent resources."""
    global agents

    for name, agent in agents.items():
        try:
            # Cleanup agent resources if needed
            if hasattr(agent, 'cleanup'):
                await agent.cleanup()
            logger.info(f"Cleaned up agent: {name}")
        except Exception as e:
            logger.error(f"Error cleaning up agent {name}: {e}")

@app.get("/health", response_model=HealthCheck)
async def health_check():
    """Health check endpoint."""
    return HealthCheck(
        status="healthy",
        timestamp=time.time(),
        version="1.0.0",
        agents_loaded=len(agents),
        uptime=time.time() - start_time
    )

@app.post("/chat", response_model=AgentResponse)
async def chat_with_agent(
    request: AgentRequest,
    background_tasks: BackgroundTasks,
    current_user: str = Depends(get_current_user)
):
    """Main chat endpoint."""

    start_time = time.time()

    try:
        # Rate limiting
        if not await rate_limiter.check_rate_limit(request.user_id):
            raise HTTPException(status_code=429, detail="Rate limit exceeded")

        # Get agent
        agent = agents.get(request.agent_type)
        if not agent:
            raise HTTPException(status_code=400, detail="Invalid agent type")

        # Process request
        response = await agent.arun(request.message)

        # Calculate processing time
        processing_time = time.time() - start_time

        # Background tasks
        background_tasks.add_task(
            track_request,
            user_id=request.user_id,
            agent_type=request.agent_type,
            processing_time=processing_time,
            success=True
        )

        return AgentResponse(
            response=response,
            agent_type=request.agent_type,
            processing_time=processing_time,
            session_id=request.session_id or "default",
            metadata={
                "model": agent.engine.model,
                "temperature": agent.engine.temperature
            }
        )

    except HTTPException:
        raise
    except Exception as e:
        # Log error
        logger.error(f"Chat error: {e}", exc_info=True)

        # Track failed request
        background_tasks.add_task(
            track_request,
            user_id=request.user_id,
            agent_type=request.agent_type,
            processing_time=time.time() - start_time,
            success=False,
            error=str(e)
        )

        raise HTTPException(status_code=500, detail="Internal server error")

@app.get("/agents")
async def list_agents():
    """List available agents."""
    return {
        "agents": list(agents.keys()),
        "total": len(agents)
    }

@app.get("/metrics")
async def get_metrics():
    """Prometheus metrics endpoint."""
    # Return Prometheus metrics
    from monitoring import generate_metrics
    return generate_metrics()

if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=False,  # Disable in production
        workers=1,     # Single worker for shared state
        log_level="info"
    )

Kubernetes DeploymentΒΆ

Kubernetes ManifestsΒΆ

# namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: haive-agents

---
# configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: haive-config
  namespace: haive-agents
data:
  LOG_LEVEL: "INFO"
  DATABASE_URL: "postgresql://haive:password@postgres:5432/haive_agents"
  REDIS_URL: "redis://redis:6379/0"
  NEO4J_URI: "neo4j://neo4j:7687"

---
# secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: haive-secrets
  namespace: haive-agents
type: Opaque
data:
  OPENAI_API_KEY: <base64-encoded-key>
  NEO4J_PASSWORD: <base64-encoded-password>

---
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: haive-agents
  namespace: haive-agents
  labels:
    app: haive-agents
spec:
  replicas: 3
  selector:
    matchLabels:
      app: haive-agents
  template:
    metadata:
      labels:
        app: haive-agents
    spec:
      containers:
      - name: haive-agents
        image: haive-agents:latest
        ports:
        - containerPort: 8000
        envFrom:
        - configMapRef:
            name: haive-config
        - secretRef:
            name: haive-secrets
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
        volumeMounts:
        - name: memory-storage
          mountPath: /app/memory
      volumes:
      - name: memory-storage
        persistentVolumeClaim:
          claimName: memory-pvc

---
# service.yaml
apiVersion: v1
kind: Service
metadata:
  name: haive-agents-service
  namespace: haive-agents
spec:
  selector:
    app: haive-agents
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: ClusterIP

---
# ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: haive-agents-ingress
  namespace: haive-agents
  annotations:
    kubernetes.io/ingress.class: nginx
    cert-manager.io/cluster-issuer: letsencrypt-prod
    nginx.ingress.kubernetes.io/rate-limit: "100"
spec:
  tls:
  - hosts:
    - api.haive.example.com
    secretName: haive-tls
  rules:
  - host: api.haive.example.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: haive-agents-service
            port:
              number: 80

Horizontal Pod AutoscalerΒΆ

# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: haive-agents-hpa
  namespace: haive-agents
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: haive-agents
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 10
        periodSeconds: 60

Monitoring and ObservabilityΒΆ

Prometheus MetricsΒΆ

# monitoring.py
import time
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Request

# Metrics
REQUEST_COUNT = Counter(
    'haive_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status_code', 'agent_type']
)

REQUEST_DURATION = Histogram(
    'haive_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint', 'agent_type']
)

AGENT_PROCESSING_TIME = Histogram(
    'haive_agent_processing_seconds',
    'Agent processing time',
    ['agent_type']
)

ACTIVE_AGENTS = Gauge(
    'haive_active_agents',
    'Number of active agent instances',
    ['agent_type']
)

MEMORY_USAGE = Gauge(
    'haive_memory_usage_bytes',
    'Memory usage by component',
    ['component']
)

ERROR_COUNT = Counter(
    'haive_errors_total',
    'Total errors',
    ['error_type', 'agent_type']
)

def setup_metrics(app):
    """Setup metrics collection."""

    @app.middleware("http")
    async def metrics_middleware(request: Request, call_next):
        start_time = time.time()

        response = await call_next(request)

        # Record metrics
        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=request.url.path,
            status_code=response.status_code,
            agent_type=request.headers.get('X-Agent-Type', 'unknown')
        ).inc()

        REQUEST_DURATION.labels(
            method=request.method,
            endpoint=request.url.path,
            agent_type=request.headers.get('X-Agent-Type', 'unknown')
        ).observe(time.time() - start_time)

        return response

async def track_request(user_id: str, agent_type: str, processing_time: float,
                       success: bool, error: str = None):
    """Track individual request metrics."""

    # Record processing time
    AGENT_PROCESSING_TIME.labels(agent_type=agent_type).observe(processing_time)

    # Record errors
    if not success and error:
        ERROR_COUNT.labels(
            error_type=classify_error(error),
            agent_type=agent_type
        ).inc()

def classify_error(error_msg: str) -> str:
    """Classify error for metrics."""
    error_lower = error_msg.lower()

    if "rate limit" in error_lower:
        return "rate_limit"
    elif "timeout" in error_lower:
        return "timeout"
    elif "validation" in error_lower:
        return "validation"
    else:
        return "other"

def generate_metrics():
    """Generate Prometheus metrics."""
    return generate_latest()

Logging ConfigurationΒΆ

# logging_config.py
import logging
import json
import sys
from datetime import datetime
from typing import Dict, Any

class StructuredFormatter(logging.Formatter):
    """JSON structured logging formatter."""

    def format(self, record):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }

        # Add extra fields
        if hasattr(record, 'user_id'):
            log_entry['user_id'] = record.user_id
        if hasattr(record, 'session_id'):
            log_entry['session_id'] = record.session_id
        if hasattr(record, 'agent_type'):
            log_entry['agent_type'] = record.agent_type
        if hasattr(record, 'processing_time'):
            log_entry['processing_time'] = record.processing_time

        # Add exception info if present
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)

        return json.dumps(log_entry)

def setup_logging(log_level: str = "INFO", log_file: str = None):
    """Setup structured logging."""

    # Create formatter
    formatter = StructuredFormatter()

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)

    # Configure root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(getattr(logging, log_level.upper()))
    root_logger.addHandler(console_handler)

    # File handler if specified
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)

    # Suppress noisy loggers
    logging.getLogger("urllib3").setLevel(logging.WARNING)
    logging.getLogger("httpx").setLevel(logging.WARNING)

# Usage
setup_logging("INFO", "/app/logs/haive-agents.log")

Performance OptimizationΒΆ

Connection PoolingΒΆ

# database.py
import asyncpg
import aioredis
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

class DatabaseManager:
    """Async database connection management."""

    def __init__(self):
        self.pg_pool = None
        self.redis_pool = None
        self.engine = None
        self.session_factory = None

    async def initialize(self):
        """Initialize database connections."""

        # PostgreSQL pool
        self.pg_pool = await asyncpg.create_pool(
            "postgresql://haive:password@postgres:5432/haive_agents",
            min_size=5,
            max_size=20,
            command_timeout=60
        )

        # Redis pool
        self.redis_pool = await aioredis.from_url(
            "redis://redis:6379",
            max_connections=20,
            encoding="utf-8",
            decode_responses=True
        )

        # SQLAlchemy engine
        self.engine = create_async_engine(
            "postgresql+asyncpg://haive:password@postgres:5432/haive_agents",
            pool_size=10,
            max_overflow=20,
            pool_pre_ping=True,
            echo=False
        )

        self.session_factory = sessionmaker(
            self.engine, class_=AsyncSession, expire_on_commit=False
        )

    async def get_pg_connection(self):
        """Get PostgreSQL connection from pool."""
        return await self.pg_pool.acquire()

    async def get_redis_connection(self):
        """Get Redis connection from pool."""
        return self.redis_pool

    async def get_session(self):
        """Get SQLAlchemy session."""
        async with self.session_factory() as session:
            yield session

    async def cleanup(self):
        """Cleanup connections."""
        if self.pg_pool:
            await self.pg_pool.close()
        if self.redis_pool:
            await self.redis_pool.close()
        if self.engine:
            await self.engine.dispose()

# Global database manager
db_manager = DatabaseManager()

Caching StrategyΒΆ

# caching.py
import json
import hashlib
from typing import Any, Optional
import aioredis
from functools import wraps

class CacheManager:
    """Intelligent caching for agent responses."""

    def __init__(self, redis_url: str):
        self.redis_url = redis_url
        self.redis = None

    async def initialize(self):
        """Initialize Redis connection."""
        self.redis = await aioredis.from_url(self.redis_url)

    def cache_key(self, agent_type: str, message: str, **kwargs) -> str:
        """Generate cache key for agent request."""
        key_data = {
            "agent_type": agent_type,
            "message": message,
            **kwargs
        }
        key_str = json.dumps(key_data, sort_keys=True)
        return f"agent_cache:{hashlib.md5(key_str.encode()).hexdigest()}"

    async def get_cached_response(self, agent_type: str, message: str,
                                **kwargs) -> Optional[str]:
        """Get cached agent response."""
        if not self.redis:
            return None

        key = self.cache_key(agent_type, message, **kwargs)
        cached = await self.redis.get(key)

        if cached:
            return json.loads(cached)
        return None

    async def cache_response(self, agent_type: str, message: str,
                           response: str, ttl: int = 3600, **kwargs):
        """Cache agent response."""
        if not self.redis:
            return

        key = self.cache_key(agent_type, message, **kwargs)
        await self.redis.setex(
            key,
            ttl,
            json.dumps({
                "response": response,
                "timestamp": time.time()
            })
        )

    async def invalidate_pattern(self, pattern: str):
        """Invalidate cache entries matching pattern."""
        if not self.redis:
            return

        keys = await self.redis.keys(f"agent_cache:{pattern}*")
        if keys:
            await self.redis.delete(*keys)

# Cache decorator
def cached_agent_call(ttl: int = 3600):
    def decorator(func):
        @wraps(func)
        async def wrapper(agent_type: str, message: str, *args, **kwargs):
            cache_manager = kwargs.get('cache_manager')

            if cache_manager:
                # Try cache first
                cached = await cache_manager.get_cached_response(
                    agent_type, message, **kwargs
                )
                if cached:
                    return cached['response']

            # Execute function
            response = await func(agent_type, message, *args, **kwargs)

            # Cache result
            if cache_manager:
                await cache_manager.cache_response(
                    agent_type, message, response, ttl, **kwargs
                )

            return response
        return wrapper
    return decorator

Load TestingΒΆ

Performance Testing ScriptΒΆ

# load_test.py
import asyncio
import aiohttp
import time
import statistics
from typing import List, Dict

async def single_request(session: aiohttp.ClientSession,
                        url: str, payload: Dict) -> Dict:
    """Make a single request and measure performance."""
    start_time = time.time()

    try:
        async with session.post(url, json=payload) as response:
            end_time = time.time()

            return {
                "status_code": response.status,
                "response_time": end_time - start_time,
                "success": response.status == 200,
                "size": len(await response.text())
            }
    except Exception as e:
        return {
            "status_code": 0,
            "response_time": time.time() - start_time,
            "success": False,
            "error": str(e)
        }

async def load_test(base_url: str, concurrent_users: int = 10,
                   requests_per_user: int = 10):
    """Run load test against agent API."""

    url = f"{base_url}/chat"
    payload = {
        "message": "What is artificial intelligence?",
        "agent_type": "simple",
        "user_id": "load_test_user"
    }

    async with aiohttp.ClientSession() as session:
        # Create tasks for concurrent users
        tasks = []

        for user_id in range(concurrent_users):
            for request_id in range(requests_per_user):
                user_payload = payload.copy()
                user_payload["user_id"] = f"user_{user_id}"

                task = single_request(session, url, user_payload)
                tasks.append(task)

        # Execute all requests
        print(f"Starting load test: {concurrent_users} users, {requests_per_user} requests each")
        start_time = time.time()

        results = await asyncio.gather(*tasks)

        end_time = time.time()

        # Analyze results
        analyze_results(results, end_time - start_time)

def analyze_results(results: List[Dict], total_time: float):
    """Analyze load test results."""

    successful_requests = [r for r in results if r['success']]
    failed_requests = [r for r in results if not r['success']]

    response_times = [r['response_time'] for r in successful_requests]

    print(f"\n=== Load Test Results ===")
    print(f"Total requests: {len(results)}")
    print(f"Successful: {len(successful_requests)}")
    print(f"Failed: {len(failed_requests)}")
    print(f"Success rate: {len(successful_requests)/len(results):.1%}")
    print(f"Total time: {total_time:.2f}s")
    print(f"Requests/second: {len(results)/total_time:.1f}")

    if response_times:
        print(f"\nResponse Times:")
        print(f"Average: {statistics.mean(response_times):.2f}s")
        print(f"Median: {statistics.median(response_times):.2f}s")
        print(f"95th percentile: {sorted(response_times)[int(len(response_times)*0.95)]:.2f}s")
        print(f"Min: {min(response_times):.2f}s")
        print(f"Max: {max(response_times):.2f}s")

    if failed_requests:
        print(f"\nErrors:")
        error_types = {}
        for req in failed_requests:
            error = req.get('error', f"HTTP {req['status_code']}")
            error_types[error] = error_types.get(error, 0) + 1

        for error, count in error_types.items():
            print(f"  {error}: {count}")

if __name__ == "__main__":
    asyncio.run(load_test("http://localhost:8000", 50, 20))

Security HardeningΒΆ

Security ChecklistΒΆ

Application Security

  • βœ… Input validation and sanitization

  • βœ… Output encoding and CSRF protection

  • βœ… API authentication and authorization

  • βœ… Rate limiting and DDoS protection

  • βœ… Secure headers and HTTPS enforcement

  • βœ… Secret management (not hardcoded)

  • βœ… Dependency scanning and updates

  • βœ… Error handling (no sensitive data leakage)

Infrastructure Security

  • βœ… Container security scanning

  • βœ… Network segmentation and firewalls

  • βœ… Database encryption at rest and in transit

  • βœ… Regular security patches and updates

  • βœ… Backup encryption and testing

  • βœ… Access logging and monitoring

  • βœ… Intrusion detection systems

  • βœ… Vulnerability assessments

Deployment Security

# Security-hardened Dockerfile
FROM python:3.11-slim

# Security updates
RUN apt-get update && apt-get upgrade -y \
    && apt-get install -y --no-install-recommends \
    gcc g++ curl \
    && rm -rf /var/lib/apt/lists/* \
    && apt-get clean

# Create non-root user
RUN groupadd -r appuser \
    && useradd -r -g appuser -d /app -s /bin/false appuser

# Set secure permissions
WORKDIR /app
COPY --chown=appuser:appuser . .

# Install dependencies
RUN pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -r requirements.txt

# Remove unnecessary packages
RUN apt-get purge -y gcc g++ \
    && apt-get autoremove -y

# Switch to non-root user
USER appuser

# Security headers
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=/app

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s \
  CMD curl -f http://localhost:8000/health || exit 1

EXPOSE 8000
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Deployment Best PracticesΒΆ

Pre-Deployment Checklist

  1. Code Review: All code changes reviewed and approved

  2. Testing: Unit, integration, and load tests passing

  3. Security: Security scanning and vulnerability assessment

  4. Configuration: Environment-specific configurations validated

  5. Dependencies: All dependencies updated and scanned

  6. Documentation: Deployment and rollback procedures documented

  7. Monitoring: Alerts and monitoring configured

  8. Backup: Database and configuration backups verified

Deployment Process

  1. Blue-Green Deployment: Deploy to staging environment

  2. Smoke Testing: Basic functionality verification

  3. Load Testing: Performance validation under load

  4. Gradual Rollout: Progressive traffic switching

  5. Monitoring: Real-time metrics and alerts monitoring

  6. Rollback Plan: Immediate rollback capability if issues arise

Post-Deployment Verification

  1. Health Checks: All services responding correctly

  2. Performance Metrics: Response times within SLA

  3. Error Rates: Error rates below acceptable thresholds

  4. Resource Usage: CPU, memory, disk usage normal

  5. User Experience: End-to-end functionality working

  6. Alerts: No critical alerts firing

This comprehensive deployment guide ensures your Haive Agents run reliably and securely in production environments, from small-scale deployments to enterprise-grade systems handling thousands of concurrent users.