🚀 Production Deployment Guide
This guide covers everything needed to deploy Haive Agents in production environments, from single-server deployments to large-scale distributed systems.
Deployment Architecture
Single Server Deployment
Best for: Small to medium applications, development and staging environments
┌─────────────────────────────────────┐
│          Production Server          │
├─────────────────────────────────────┤
│  Web Server (Nginx/Apache)          │
│  ├── Load Balancer                  │
│  └── SSL Termination                │
├─────────────────────────────────────┤
│  Application Server                 │
│  ├── Haive Agents Service           │
│  ├── API Gateway                    │
│  └── Background Workers             │
├─────────────────────────────────────┤
│  Database Layer                     │
│  ├── PostgreSQL (primary data)      │
│  ├── Redis (caching/queues)         │
│  └── Neo4j (graph memory)           │
└─────────────────────────────────────┘
Multi-Server Architecture
Best for: High-traffic applications, enterprise deployments
┌───────────────┐  ┌───────────────┐  ┌───────────────┐
│ Load Balancer │  │  API Gateway  │  │  Monitoring   │
│   (HAProxy)   │  │    (Kong)     │  │ (Prometheus)  │
└───────┬───────┘  └───────────────┘  └───────────────┘
        │
┌───────▼──────────────────────────────────────────────────────┐
│                        Agent Cluster                          │
├──────────────┬──────────────┬──────────────┬─────────────────┤
│ Agent Node 1 │ Agent Node 2 │ Agent Node 3 │ Background Jobs │
│              │              │              │                 │
│ - ReactAgent │ - SimpleAgent│ - MultiAgent │ - Memory Cons.  │
│ - Tools      │ - Memory     │ - Coord.     │ - Maintenance   │
│ - API        │ - Vector DB  │ - Workflow   │ - Analytics     │
└──────┬───────┴──────┬───────┴─────────┬────┴─────────────────┘
       │              │                 │
┌──────▼─────────┐ ┌──▼─────────────┐ ┌─▼──────────────┐
│   PostgreSQL   │ │     Redis      │ │     Neo4j      │
│    Cluster     │ │    Cluster     │ │    Cluster     │
│ (Primary/Read) │ │ (Cache/Queue)  │ │ (Graph Memory) │
└────────────────┘ └────────────────┘ └────────────────┘
Docker Deployment
Basic Docker Setup
# Dockerfile
FROM python:3.11-slim
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
g++ \
curl \
&& rm -rf /var/lib/apt/lists/*
# Create app user
RUN groupadd -r appuser && useradd -r -g appuser appuser
# Set working directory
WORKDIR /app
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
RUN chown -R appuser:appuser /app
# Switch to app user
USER appuser
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Start command
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Docker Compose Setup
# docker-compose.yml
version: '3.8'
services:
# Main application
haive-agents:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql://haive:${POSTGRES_PASSWORD}@postgres:5432/haive_agents
- REDIS_URL=redis://redis:6379/0
- NEO4J_URI=neo4j://neo4j:7687
- NEO4J_PASSWORD=${NEO4J_PASSWORD}
- LOG_LEVEL=INFO
depends_on:
- postgres
- redis
- neo4j
volumes:
- ./logs:/app/logs
- ./memory:/app/memory
restart: unless-stopped
# PostgreSQL database
postgres:
image: postgres:15
environment:
POSTGRES_DB: haive_agents
POSTGRES_USER: haive
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
- ./init-db.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"
restart: unless-stopped
# Redis cache and queues
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes
restart: unless-stopped
# Neo4j graph database
neo4j:
image: neo4j:5.15
environment:
NEO4J_AUTH: neo4j/${NEO4J_PASSWORD}
NEO4J_PLUGINS: '["apoc", "graph-data-science"]'
NEO4J_dbms_security_procedures_unrestricted: gds.*,apoc.*
ports:
- "7474:7474"
- "7687:7687"
volumes:
- neo4j_data:/data
- neo4j_logs:/logs
restart: unless-stopped
# Background worker
worker:
build: .
    command: python -m celery -A app.celery worker --loglevel=info
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql://haive:${POSTGRES_PASSWORD}@postgres:5432/haive_agents
- REDIS_URL=redis://redis:6379/0
depends_on:
- postgres
- redis
volumes:
- ./logs:/app/logs
restart: unless-stopped
# Monitoring with Prometheus
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
restart: unless-stopped
# Grafana dashboard
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
volumes:
- grafana_data:/var/lib/grafana
- ./monitoring/dashboards:/etc/grafana/provisioning/dashboards
restart: unless-stopped
volumes:
postgres_data:
redis_data:
neo4j_data:
neo4j_logs:
prometheus_data:
grafana_data:
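Compose reads variables like ${OPENAI_API_KEY} and ${NEO4J_PASSWORD} from a local .env file. A sketch of the workflow (example values only; never commit real secrets):
# .env
# OPENAI_API_KEY=sk-...
# NEO4J_PASSWORD=change-me
# POSTGRES_PASSWORD=change-me
# GRAFANA_PASSWORD=change-me
docker compose up -d
docker compose ps
docker compose logs -f haive-agents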
Production FastAPI Application
# main.py
import asyncio
import logging
import time
from contextlib import asynccontextmanager
from typing import Any, Dict, Optional
from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from pydantic import BaseModel, Field
import uvicorn
from haive.agents import SimpleAgent, ReactAgent, MultiAgent
from haive.core.engine.aug_llm import AugLLMConfig
from monitoring import setup_metrics, track_request
from auth import get_current_user
from rate_limiting import RateLimiter
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global agent instances (initialized at startup)
agents: Dict[str, Any] = {}
rate_limiter = RateLimiter()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifecycle management."""
# Startup
logger.info("Starting Haive Agents service...")
# Initialize agents
await initialize_agents()
# Setup monitoring
setup_metrics(app)
logger.info("Service ready!")
yield
# Shutdown
logger.info("Shutting down service...")
await cleanup_agents()
# FastAPI app
app = FastAPI(
title="Haive Agents API",
description="Production API for Haive AI Agents",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
lifespan=lifespan
)
# Middleware
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure appropriately for production
allow_credentials=True,
allow_methods=["GET", "POST"],
allow_headers=["*"]
)
# Request/Response models
class AgentRequest(BaseModel):
message: str = Field(..., min_length=1, max_length=10000)
agent_type: str = Field("simple", regex="^(simple|react|multi)$")
user_id: str = Field(..., min_length=1, max_length=100)
    session_id: Optional[str] = Field(None, max_length=100)
parameters: Dict[str, Any] = Field(default_factory=dict)
class AgentResponse(BaseModel):
response: str
agent_type: str
processing_time: float
session_id: str
metadata: Dict[str, Any] = Field(default_factory=dict)
class HealthCheck(BaseModel):
status: str
timestamp: float
version: str
agents_loaded: int
uptime: float
# Global variables for health tracking
start_time = time.time()
async def initialize_agents():
"""Initialize all agent instances."""
global agents
try:
# Basic configuration
basic_config = AugLLMConfig(
provider="openai",
model="gpt-3.5-turbo",
temperature=0.7
)
# Advanced configuration
advanced_config = AugLLMConfig(
provider="openai",
model="gpt-4",
temperature=0.5
)
# Create agent instances
agents = {
"simple": SimpleAgent(
name="production_simple",
engine=basic_config
),
"react": ReactAgent(
name="production_react",
engine=advanced_config,
tools=[] # Add your tools here
),
"multi": MultiAgent(
name="production_multi",
agents=[
SimpleAgent(name="coordinator", engine=basic_config),
SimpleAgent(name="processor", engine=basic_config)
],
execution_mode="sequential"
)
}
logger.info(f"Initialized {len(agents)} agents")
except Exception as e:
logger.error(f"Failed to initialize agents: {e}")
raise
async def cleanup_agents():
"""Clean up agent resources."""
global agents
for name, agent in agents.items():
try:
# Cleanup agent resources if needed
if hasattr(agent, 'cleanup'):
await agent.cleanup()
logger.info(f"Cleaned up agent: {name}")
except Exception as e:
logger.error(f"Error cleaning up agent {name}: {e}")
@app.get("/health", response_model=HealthCheck)
async def health_check():
"""Health check endpoint."""
return HealthCheck(
status="healthy",
timestamp=time.time(),
version="1.0.0",
agents_loaded=len(agents),
uptime=time.time() - start_time
)
@app.post("/chat", response_model=AgentResponse)
async def chat_with_agent(
request: AgentRequest,
background_tasks: BackgroundTasks,
current_user: str = Depends(get_current_user)
):
"""Main chat endpoint."""
start_time = time.time()
try:
# Rate limiting
if not await rate_limiter.check_rate_limit(request.user_id):
raise HTTPException(status_code=429, detail="Rate limit exceeded")
# Get agent
agent = agents.get(request.agent_type)
if not agent:
raise HTTPException(status_code=400, detail="Invalid agent type")
# Process request
response = await agent.arun(request.message)
# Calculate processing time
processing_time = time.time() - start_time
# Background tasks
background_tasks.add_task(
track_request,
user_id=request.user_id,
agent_type=request.agent_type,
processing_time=processing_time,
success=True
)
return AgentResponse(
response=response,
agent_type=request.agent_type,
processing_time=processing_time,
session_id=request.session_id or "default",
metadata={
"model": agent.engine.model,
"temperature": agent.engine.temperature
}
)
except HTTPException:
raise
except Exception as e:
# Log error
logger.error(f"Chat error: {e}", exc_info=True)
# Track failed request
background_tasks.add_task(
track_request,
user_id=request.user_id,
agent_type=request.agent_type,
processing_time=time.time() - start_time,
success=False,
error=str(e)
)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/agents")
async def list_agents():
"""List available agents."""
return {
"agents": list(agents.keys()),
"total": len(agents)
}
@app.get("/metrics")
async def get_metrics():
"""Prometheus metrics endpoint."""
# Return Prometheus metrics
from monitoring import generate_metrics
return generate_metrics()
if __name__ == "__main__":
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
reload=False, # Disable in production
workers=1, # Single worker for shared state
log_level="info"
)
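The auth and rate_limiting modules imported above are project-specific and not shown in this guide. As one possible shape, a minimal fixed-window limiter backed by Redis that satisfies the check_rate_limit call could look like this (the module layout, default limits, and key schema are assumptions):
# rate_limiting.py (illustrative sketch)
import aioredis
class RateLimiter:
    """Allow at most max_requests per window_seconds per user."""
    def __init__(self, redis_url: str = "redis://redis:6379/0",
                 max_requests: int = 60, window_seconds: int = 60):
        self.redis = aioredis.from_url(redis_url)
        self.max_requests = max_requests
        self.window_seconds = window_seconds
    async def check_rate_limit(self, user_id: str) -> bool:
        key = f"ratelimit:{user_id}"
        count = await self.redis.incr(key)  # atomic increment
        if count == 1:
            # First hit in this window: start the expiry clock
            await self.redis.expire(key, self.window_seconds)
        return count <= self.max_requests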
Kubernetes Deployment
Kubernetes Manifests
# namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
name: haive-agents
---
# configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: haive-config
namespace: haive-agents
data:
LOG_LEVEL: "INFO"
  # NOTE: credentials belong in the Secret below, not in a ConfigMap
  DATABASE_URL: "postgresql://haive:password@postgres:5432/haive_agents"
REDIS_URL: "redis://redis:6379/0"
NEO4J_URI: "neo4j://neo4j:7687"
---
# secret.yaml
apiVersion: v1
kind: Secret
metadata:
name: haive-secrets
namespace: haive-agents
type: Opaque
data:
OPENAI_API_KEY: <base64-encoded-key>
NEO4J_PASSWORD: <base64-encoded-password>
---
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: haive-agents
namespace: haive-agents
labels:
app: haive-agents
spec:
replicas: 3
selector:
matchLabels:
app: haive-agents
template:
metadata:
labels:
app: haive-agents
spec:
containers:
- name: haive-agents
image: haive-agents:latest
ports:
- containerPort: 8000
envFrom:
- configMapRef:
name: haive-config
- secretRef:
name: haive-secrets
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "2Gi"
cpu: "1000m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
volumeMounts:
- name: memory-storage
mountPath: /app/memory
volumes:
- name: memory-storage
persistentVolumeClaim:
claimName: memory-pvc
---
# service.yaml
apiVersion: v1
kind: Service
metadata:
name: haive-agents-service
namespace: haive-agents
spec:
selector:
app: haive-agents
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: ClusterIP
---
# ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: haive-agents-ingress
namespace: haive-agents
annotations:
kubernetes.io/ingress.class: nginx
cert-manager.io/cluster-issuer: letsencrypt-prod
nginx.ingress.kubernetes.io/rate-limit: "100"
spec:
tls:
- hosts:
- api.haive.example.com
secretName: haive-tls
rules:
- host: api.haive.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: haive-agents-service
port:
number: 80
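Applying the manifests follows the usual kubectl flow; creating the Secret from literals avoids hand-encoding base64 values (resource names match the manifests above):
kubectl apply -f namespace.yaml
kubectl create secret generic haive-secrets -n haive-agents \
  --from-literal=OPENAI_API_KEY="$OPENAI_API_KEY" \
  --from-literal=NEO4J_PASSWORD="$NEO4J_PASSWORD"
kubectl apply -f configmap.yaml -f deployment.yaml -f service.yaml -f ingress.yaml
kubectl -n haive-agents rollout status deployment/haive-agents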
Horizontal Pod Autoscaler
# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: haive-agents-hpa
namespace: haive-agents
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: haive-agents
minReplicas: 2
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 50
periodSeconds: 60
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 10
periodSeconds: 60
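Note that the CPU and memory utilization targets require metrics-server to be installed in the cluster. Once applied, scaling decisions can be watched live:
kubectl apply -f hpa.yaml
kubectl -n haive-agents get hpa haive-agents-hpa --watch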
Monitoring and Observability
Prometheus Metrics
# monitoring.py
import time
from typing import Optional
from fastapi import Request
from prometheus_client import Counter, Histogram, Gauge, generate_latest
# Metrics
REQUEST_COUNT = Counter(
'haive_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status_code', 'agent_type']
)
REQUEST_DURATION = Histogram(
'haive_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint', 'agent_type']
)
AGENT_PROCESSING_TIME = Histogram(
'haive_agent_processing_seconds',
'Agent processing time',
['agent_type']
)
ACTIVE_AGENTS = Gauge(
'haive_active_agents',
'Number of active agent instances',
['agent_type']
)
MEMORY_USAGE = Gauge(
'haive_memory_usage_bytes',
'Memory usage by component',
['component']
)
ERROR_COUNT = Counter(
'haive_errors_total',
'Total errors',
['error_type', 'agent_type']
)
def setup_metrics(app):
"""Setup metrics collection."""
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
# Record metrics
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status_code=response.status_code,
agent_type=request.headers.get('X-Agent-Type', 'unknown')
).inc()
REQUEST_DURATION.labels(
method=request.method,
endpoint=request.url.path,
agent_type=request.headers.get('X-Agent-Type', 'unknown')
).observe(time.time() - start_time)
return response
async def track_request(user_id: str, agent_type: str, processing_time: float,
                        success: bool, error: Optional[str] = None):
"""Track individual request metrics."""
# Record processing time
AGENT_PROCESSING_TIME.labels(agent_type=agent_type).observe(processing_time)
# Record errors
if not success and error:
ERROR_COUNT.labels(
error_type=classify_error(error),
agent_type=agent_type
).inc()
def classify_error(error_msg: str) -> str:
"""Classify error for metrics."""
error_lower = error_msg.lower()
if "rate limit" in error_lower:
return "rate_limit"
elif "timeout" in error_lower:
return "timeout"
elif "validation" in error_lower:
return "validation"
else:
return "other"
def generate_metrics():
"""Generate Prometheus metrics."""
return generate_latest()
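The docker-compose file above mounts ./monitoring/prometheus.yml into the Prometheus container. A minimal scrape configuration targeting the /metrics endpoint might look like this (the job name and interval are assumptions):
# monitoring/prometheus.yml
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: haive-agents
    metrics_path: /metrics
    static_configs:
      - targets: ['haive-agents:8000']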
Logging Configuration
# logging_config.py
import logging
import json
import sys
from datetime import datetime
from typing import Dict, Any
class StructuredFormatter(logging.Formatter):
"""JSON structured logging formatter."""
def format(self, record):
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno
}
# Add extra fields
if hasattr(record, 'user_id'):
log_entry['user_id'] = record.user_id
if hasattr(record, 'session_id'):
log_entry['session_id'] = record.session_id
if hasattr(record, 'agent_type'):
log_entry['agent_type'] = record.agent_type
if hasattr(record, 'processing_time'):
log_entry['processing_time'] = record.processing_time
# Add exception info if present
if record.exc_info:
log_entry['exception'] = self.formatException(record.exc_info)
return json.dumps(log_entry)
def setup_logging(log_level: str = "INFO", log_file: str = None):
"""Setup structured logging."""
# Create formatter
formatter = StructuredFormatter()
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
# Configure root logger
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, log_level.upper()))
root_logger.addHandler(console_handler)
# File handler if specified
if log_file:
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
# Suppress noisy loggers
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
# Usage
setup_logging("INFO", "/app/logs/haive-agents.log")
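Because StructuredFormatter promotes user_id, session_id, agent_type, and processing_time into top-level JSON fields, request context can be attached through the standard extra parameter:
# Contextual fields become top-level keys in the JSON log line
import logging
logger = logging.getLogger("haive.chat")
logger.info("chat request completed",
            extra={"user_id": "u-123", "agent_type": "react",
                   "processing_time": 1.42})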
Performance Optimization
Connection Pooling
# database.py
import asyncpg
import aioredis
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
class DatabaseManager:
"""Async database connection management."""
def __init__(self):
self.pg_pool = None
self.redis_pool = None
self.engine = None
self.session_factory = None
async def initialize(self):
"""Initialize database connections."""
# PostgreSQL pool
self.pg_pool = await asyncpg.create_pool(
"postgresql://haive:password@postgres:5432/haive_agents",
min_size=5,
max_size=20,
command_timeout=60
)
# Redis pool
self.redis_pool = await aioredis.from_url(
"redis://redis:6379",
max_connections=20,
encoding="utf-8",
decode_responses=True
)
# SQLAlchemy engine
self.engine = create_async_engine(
"postgresql+asyncpg://haive:password@postgres:5432/haive_agents",
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
echo=False
)
self.session_factory = sessionmaker(
self.engine, class_=AsyncSession, expire_on_commit=False
)
    async def get_pg_connection(self):
        """Acquire a PostgreSQL connection from the pool (the caller must release it)."""
        return await self.pg_pool.acquire()
async def get_redis_connection(self):
"""Get Redis connection from pool."""
return self.redis_pool
async def get_session(self):
"""Get SQLAlchemy session."""
async with self.session_factory() as session:
yield session
async def cleanup(self):
"""Cleanup connections."""
if self.pg_pool:
await self.pg_pool.close()
if self.redis_pool:
await self.redis_pool.close()
if self.engine:
await self.engine.dispose()
# Global database manager
db_manager = DatabaseManager()
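Connections acquired from a pool must be returned; asyncpg's pool doubles as an async context manager that guarantees release even on errors. A usage sketch (the table and key names are illustrative):
# Example usage of the pools above
async def save_interaction(user_id: str, message: str, response: str):
    # asyncpg releases the connection back to the pool on exit
    async with db_manager.pg_pool.acquire() as conn:
        await conn.execute(
            "INSERT INTO interactions (user_id, message, response) VALUES ($1, $2, $3)",
            user_id, message, response,
        )
    # Cache the latest response per user with a 5-minute TTL
    redis = await db_manager.get_redis_connection()
    await redis.set(f"last_response:{user_id}", response, ex=300)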
Caching Strategy
# caching.py
import hashlib
import json
import time
from functools import wraps
from typing import Any, Dict, Optional
import aioredis
class CacheManager:
"""Intelligent caching for agent responses."""
def __init__(self, redis_url: str):
self.redis_url = redis_url
self.redis = None
async def initialize(self):
"""Initialize Redis connection."""
self.redis = await aioredis.from_url(self.redis_url)
def cache_key(self, agent_type: str, message: str, **kwargs) -> str:
"""Generate cache key for agent request."""
key_data = {
"agent_type": agent_type,
"message": message,
**kwargs
}
key_str = json.dumps(key_data, sort_keys=True)
return f"agent_cache:{hashlib.md5(key_str.encode()).hexdigest()}"
    async def get_cached_response(self, agent_type: str, message: str,
                                  **kwargs) -> Optional[Dict[str, Any]]:
        """Get a cached agent response (a dict with 'response' and 'timestamp')."""
if not self.redis:
return None
key = self.cache_key(agent_type, message, **kwargs)
cached = await self.redis.get(key)
if cached:
return json.loads(cached)
return None
async def cache_response(self, agent_type: str, message: str,
response: str, ttl: int = 3600, **kwargs):
"""Cache agent response."""
if not self.redis:
return
key = self.cache_key(agent_type, message, **kwargs)
await self.redis.setex(
key,
ttl,
json.dumps({
"response": response,
"timestamp": time.time()
})
)
async def invalidate_pattern(self, pattern: str):
"""Invalidate cache entries matching pattern."""
if not self.redis:
return
keys = await self.redis.keys(f"agent_cache:{pattern}*")
if keys:
await self.redis.delete(*keys)
# Cache decorator
def cached_agent_call(ttl: int = 3600):
def decorator(func):
@wraps(func)
        async def wrapper(agent_type: str, message: str, *args, **kwargs):
            # Pop the manager so it is not serialized into the cache key
            cache_manager = kwargs.pop('cache_manager', None)
if cache_manager:
# Try cache first
cached = await cache_manager.get_cached_response(
agent_type, message, **kwargs
)
if cached:
return cached['response']
# Execute function
response = await func(agent_type, message, *args, **kwargs)
# Cache result
if cache_manager:
await cache_manager.cache_response(
agent_type, message, response, ttl, **kwargs
)
return response
return wrapper
return decorator
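Wired together, the decorator fronts the actual agent dispatch; note that the wrapper consumes the cache_manager keyword rather than forwarding it (the dispatch function here is illustrative):
@cached_agent_call(ttl=1800)
async def run_agent(agent_type: str, message: str, **kwargs) -> str:
    agent = agents[agent_type]  # agent registry from main.py
    return await agent.arun(message)
# response = await run_agent("simple", "What is AI?", cache_manager=cache_manager)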
Load Testing
Performance Testing Script
# load_test.py
import asyncio
import aiohttp
import time
import statistics
from typing import List, Dict
async def single_request(session: aiohttp.ClientSession,
url: str, payload: Dict) -> Dict:
"""Make a single request and measure performance."""
start_time = time.time()
try:
async with session.post(url, json=payload) as response:
end_time = time.time()
return {
"status_code": response.status,
"response_time": end_time - start_time,
"success": response.status == 200,
"size": len(await response.text())
}
except Exception as e:
return {
"status_code": 0,
"response_time": time.time() - start_time,
"success": False,
"error": str(e)
}
async def load_test(base_url: str, concurrent_users: int = 10,
requests_per_user: int = 10):
"""Run load test against agent API."""
url = f"{base_url}/chat"
payload = {
"message": "What is artificial intelligence?",
"agent_type": "simple",
"user_id": "load_test_user"
}
async with aiohttp.ClientSession() as session:
# Create tasks for concurrent users
tasks = []
for user_id in range(concurrent_users):
for request_id in range(requests_per_user):
user_payload = payload.copy()
user_payload["user_id"] = f"user_{user_id}"
task = single_request(session, url, user_payload)
tasks.append(task)
# Execute all requests
print(f"Starting load test: {concurrent_users} users, {requests_per_user} requests each")
start_time = time.time()
results = await asyncio.gather(*tasks)
end_time = time.time()
# Analyze results
analyze_results(results, end_time - start_time)
def analyze_results(results: List[Dict], total_time: float):
"""Analyze load test results."""
successful_requests = [r for r in results if r['success']]
failed_requests = [r for r in results if not r['success']]
response_times = [r['response_time'] for r in successful_requests]
print(f"\n=== Load Test Results ===")
print(f"Total requests: {len(results)}")
print(f"Successful: {len(successful_requests)}")
print(f"Failed: {len(failed_requests)}")
print(f"Success rate: {len(successful_requests)/len(results):.1%}")
print(f"Total time: {total_time:.2f}s")
print(f"Requests/second: {len(results)/total_time:.1f}")
if response_times:
print(f"\nResponse Times:")
print(f"Average: {statistics.mean(response_times):.2f}s")
print(f"Median: {statistics.median(response_times):.2f}s")
print(f"95th percentile: {sorted(response_times)[int(len(response_times)*0.95)]:.2f}s")
print(f"Min: {min(response_times):.2f}s")
print(f"Max: {max(response_times):.2f}s")
if failed_requests:
print(f"\nErrors:")
error_types = {}
for req in failed_requests:
error = req.get('error', f"HTTP {req['status_code']}")
error_types[error] = error_types.get(error, 0) + 1
for error, count in error_types.items():
print(f" {error}: {count}")
if __name__ == "__main__":
asyncio.run(load_test("http://localhost:8000", 50, 20))
Security Hardening
Security Checklist
Application Security
✅ Input validation and sanitization
✅ Output encoding and CSRF protection
✅ API authentication and authorization
✅ Rate limiting and DDoS protection
✅ Secure headers and HTTPS enforcement
✅ Secret management (not hardcoded)
✅ Dependency scanning and updates
✅ Error handling (no sensitive data leakage)
Infrastructure Security
✅ Container security scanning
✅ Network segmentation and firewalls
✅ Database encryption at rest and in transit
✅ Regular security patches and updates
✅ Backup encryption and testing
✅ Access logging and monitoring
✅ Intrusion detection systems
✅ Vulnerability assessments
Deployment Security
# Security-hardened Dockerfile
FROM python:3.11-slim
# Security updates
RUN apt-get update && apt-get upgrade -y \
&& apt-get install -y --no-install-recommends \
gcc g++ curl \
&& rm -rf /var/lib/apt/lists/* \
&& apt-get clean
# Create non-root user
RUN groupadd -r appuser \
&& useradd -r -g appuser -d /app -s /bin/false appuser
# Set secure permissions
WORKDIR /app
COPY --chown=appuser:appuser . .
# Install dependencies
RUN pip install --no-cache-dir --upgrade pip \
&& pip install --no-cache-dir -r requirements.txt
# Remove build-only packages (a multi-stage build would shrink the final image further)
RUN apt-get purge -y gcc g++ \
    && apt-get autoremove -y
# Switch to non-root user
USER appuser
# Python runtime hardening
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=/app
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s \
CMD curl -f http://localhost:8000/health || exit 1
EXPOSE 8000
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Deployment Best Practices
Pre-Deployment Checklist
Code Review: All code changes reviewed and approved
Testing: Unit, integration, and load tests passing
Security: Security scanning and vulnerability assessment
Configuration: Environment-specific configurations validated
Dependencies: All dependencies updated and scanned
Documentation: Deployment and rollback procedures documented
Monitoring: Alerts and monitoring configured
Backup: Database and configuration backups verified
Deployment Process
Blue-Green Deployment: Deploy to staging environment
Smoke Testing: Basic functionality verification
Load Testing: Performance validation under load
Gradual Rollout: Progressive traffic switching
Monitoring: Real-time metrics and alerts monitoring
Rollback Plan: Immediate rollback capability if issues arise
Post-Deployment Verification
Health Checks: All services responding correctly
Performance Metrics: Response times within SLA
Error Rates: Error rates below acceptable thresholds
Resource Usage: CPU, memory, disk usage normal
User Experience: End-to-end functionality working
Alerts: No critical alerts firing
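Several of these checks can be scripted into a post-deployment smoke test; a minimal sketch against the endpoints defined earlier (the base URL is an example):
# smoke_test.py
import asyncio
import aiohttp
async def smoke_test(base_url: str = "https://api.haive.example.com"):
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{base_url}/health") as resp:
            assert resp.status == 200, f"health check failed: {resp.status}"
            body = await resp.json()
            assert body["status"] == "healthy" and body["agents_loaded"] > 0
        async with session.get(f"{base_url}/agents") as resp:
            assert resp.status == 200
    print("smoke test passed")
if __name__ == "__main__":
    asyncio.run(smoke_test())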
This guide covers the core practices for running Haive Agents reliably and securely in production, from single-server deployments to enterprise-scale systems serving thousands of concurrent users.