Deployment Guide
Chatbot Evaluation System - Deployment Guide and Performance Optimization
Deployment Strategies
1. Development Deployment
Local Development Setup
# 1. Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# 2. Install dependencies
pip install -r requirements.txt
# 3. Configure API keys (choose one method)
# Method A: Environment variables (recommended)
export OPENAI_API_KEY="sk-your-key"
export GEMINI_API_KEY="your-gemini-key"
export OPENROUTER_API_KEY="sk-or-your-key"
# Method B: Config file
cp config.example.json config.json
# Edit config.json with your API keys
# 4. Run development server
python app.py
# Application available at http://localhost:5000
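Once the server is up, a quick smoke test against the /health endpoint described later in this guide can confirm the deployment (a minimal sketch, assuming the requests package is installed):

import requests

# Expect {"status": "ok", ...} if the development server started correctly
response = requests.get("http://localhost:5000/health", timeout=5)
print(response.status_code, response.json())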
Development Configuration
# app.py development settings
if __name__ == "__main__":
    _set_cache_env_defaults()
    preload_qa_models()
    _warm_core_caches()

    # Development server configuration
    app.run(
        host="0.0.0.0",
        port=5000,
        debug=True,
        use_reloader=False  # Disable for model caching
    )
2. Production Deployment
Docker Deployment
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
RUN mkdir -p hf_cache
# Set cache directory
ENV HF_HOME=/app/hf_cache
ENV TRANSFORMERS_CACHE=/app/hf_cache
# Expose port
EXPOSE 5000
# Run application
CMD ["python", "app.py"]
# docker-compose.yml
version: '3.8'
services:
  chatbot-eval:
    build: .
    ports:
      - "5000:5000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - GEMINI_API_KEY=${GEMINI_API_KEY}
    volumes:
      - ./hf_cache:/app/hf_cache
    restart: unless-stopped
Kubernetes Deployment
# deployment.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: chatbot-evaluation
spec:
  replicas: 3
  selector:
    matchLabels:
      app: chatbot-evaluation
  template:
    metadata:
      labels:
        app: chatbot-evaluation
    spec:
      containers:
        - name: chatbot-evaluation
          image: chatbot-evaluation:latest
          ports:
            - containerPort: 5000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: api-keys
                  key: openai
          resources:
            requests:
              memory: "4Gi"
              cpu: "1000m"
            limits:
              memory: "8Gi"
              cpu: "2000m"
3. Cloud Deployment Options
AWS Deployment
# Using Elastic Beanstalk
eb init chatbot-evaluation
eb create production-env
# Using ECS Fargate
aws ecs create-cluster --cluster-name chatbot-eval-cluster
# task-definition.json: the task definition referencing the image built above
aws ecs register-task-definition --cli-input-json file://task-definition.json
aws ecs create-service --cluster chatbot-eval-cluster --service-name chatbot-eval-service \
  --task-definition chatbot-eval --desired-count 1
Google Cloud Deployment
# Using Cloud Run
gcloud run deploy chatbot-evaluation \
  --source . \
  --platform managed \
  --region us-central1 \
  --allow-unauthenticated
# Using GKE
gcloud container clusters create chatbot-eval-cluster
kubectl apply -f kubernetes/
Azure Deployment
# Using Container Instances
az container create \
  --resource-group chatbot-eval-rg \
  --name chatbot-evaluation \
  --image chatbot-evaluation:latest \
  --ports 5000 \
  --environment-variables OPENAI_API_KEY=$OPENAI_API_KEY
Performance Optimization
1. Model Loading Optimization
Preloading Strategy
# app.py - Model preloading on startup
@app.before_request
def _maybe_warm_once():
    global _warm_once_done
    if not _warm_once_done:
        with _warm_lock:
            if not _warm_once_done:  # re-check inside the lock so only one request warms
                _set_cache_env_defaults()
                preload_qa_models()   # Load QA models
                _warm_core_caches()   # Warm sentence transformers
                _warm_once_done = True
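The hook above relies on module-level warm-up state that is not shown; a minimal sketch of what app.py is assumed to declare near the top:

import threading

# Module-level warm-up state used by _maybe_warm_once (names follow the hook above)
_warm_lock = threading.Lock()
_warm_once_done = False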
Selective Warming Based on Environment
import os

def _warm_core_caches():
    # Always warm essential components
    from models.loader import load_sentence_transformer
    load_sentence_transformer("sentence-transformers/all-mpnet-base-v2")

    # Optional: warm BERTScore (medium size)
    if os.getenv("PRELOAD_OPTIONAL_METRICS") == "1":
        _warm_bert_score()

    # Optional: warm heavy metrics (large models)
    if os.getenv("PRELOAD_HEAVY_METRICS") == "1":
        _warm_heavy_metrics()

def _warm_bert_score():
    import bert_score
    bert_score.score(
        cands=["test"], refs=[["test"]],
        model_type="microsoft/deberta-base-mnli",
        verbose=False, device="cpu", batch_size=1
    )

def _warm_heavy_metrics():
    # Warm Detoxify
    from detoxify import Detoxify
    Detoxify("original")

    # Warm BLEURT
    import evaluate
    evaluate.load("bleurt", module_type="metric")

    # Warm COMET
    evaluate.load("comet", module_type="metric")
2. Memory Optimization
Model Quantization
# 8-bit quantization for memory reduction
from transformers import AutoModelForCausalLM

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    load_in_8bit=True,
    device_map="auto"
)

# Benefits:
# - 50-60% memory reduction
# - Minimal quality loss (1-2%)
# - Faster loading times
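On recent transformers releases the load_in_8bit argument is deprecated in favor of an explicit BitsAndBytesConfig; an equivalent sketch (requires the bitsandbytes package):

from transformers import AutoModelForCausalLM, BitsAndBytesConfig

# Same 8-bit behaviour expressed through the quantization config object
quant_config = BitsAndBytesConfig(load_in_8bit=True)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=quant_config,
    device_map="auto"
)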
KV-Cache Optimization
import torch

class OptimizedGeneration:
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def generate_with_cache(self, prompt, max_tokens=100):
        # Encode the prompt once
        inputs = self.tokenizer(prompt, return_tensors="pt")
        input_ids = inputs["input_ids"]
        generated = input_ids
        past_key_values = None

        # Greedy decoding with a persistent KV cache: after the first step
        # only the newest token is fed to the model
        for _ in range(max_tokens):
            outputs = self.model(
                input_ids=input_ids,
                past_key_values=past_key_values,
                use_cache=True
            )
            next_token = outputs.logits[:, -1:].argmax(dim=-1)
            generated = torch.cat([generated, next_token], dim=-1)
            input_ids = next_token
            past_key_values = outputs.past_key_values

        return self.tokenizer.decode(generated[0], skip_special_tokens=True)
Memory-Efficient Batching
import torch

def batch_evaluate(items, batch_size=4):
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]

        # Process batch together
        batch_results = evaluate_batch(batch)
        results.extend(batch_results)

        # Clear cache between batches
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
    return results
3. Inference Speed Optimization
Async Processing
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def evaluate_async(question, context, model_name):
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor() as executor:
        result = await loop.run_in_executor(
            executor,
            evaluate_sync,
            question, context, model_name
        )
    return result

def evaluate_sync(question, context, model_name):
    # Synchronous evaluation logic
    return generate_and_evaluate(question, context, {}, model_name)
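A short usage sketch that fans several evaluations out over the thread pool (the question and model name below are placeholders, not the project's actual identifiers):

async def evaluate_many(questions, context, model_name):
    # Run the evaluations concurrently; each one occupies a worker thread
    tasks = [evaluate_async(q, context, model_name) for q in questions]
    return await asyncio.gather(*tasks)

# results = asyncio.run(evaluate_many(["What is BLEU?"], "", "gpt-4o-mini"))  # placeholder names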
Parallel Metrics Computation
from concurrent.futures import ThreadPoolExecutor, as_completed

def compute_metrics_parallel(response_text, question, options):
    metrics_to_compute = ["bleu", "rouge_l", "sem_sim"]

    # Add optional metrics based on options
    if options.get("enable_bert_score"):
        metrics_to_compute.append("bert_score")

    # Compute metrics in parallel
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {
            executor.submit(compute_single_metric, metric, response_text, question): metric
            for metric in metrics_to_compute
        }

        results = {}
        for future in as_completed(futures):
            metric_name = futures[future]
            results[metric_name] = future.result()

    return results
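compute_single_metric is not shown above; the dispatcher below is a minimal sketch, not the project's actual implementation. It assumes the evaluate library and the load_sentence_transformer helper referenced earlier in this guide, and treats the third positional argument (the question in the call above) as the reference text.

import evaluate
from models.loader import load_sentence_transformer  # helper referenced earlier in this guide

def compute_single_metric(metric, response_text, reference_text):
    # Dispatch a single metric; response_text is scored against reference_text
    if metric == "bleu":
        return evaluate.load("bleu").compute(
            predictions=[response_text], references=[[reference_text]]
        )["bleu"]
    if metric == "rouge_l":
        return evaluate.load("rouge").compute(
            predictions=[response_text], references=[reference_text]
        )["rougeL"]
    if metric == "sem_sim":
        from sentence_transformers import util
        model = load_sentence_transformer("sentence-transformers/all-mpnet-base-v2")
        emb = model.encode([response_text, reference_text])
        return float(util.cos_sim(emb[0], emb[1]))
    if metric == "bert_score":
        import bert_score
        _, _, f1 = bert_score.score([response_text], [reference_text],
                                    model_type="microsoft/deberta-base-mnli")
        return float(f1.mean())
    raise ValueError(f"Unknown metric: {metric}")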
4. Caching Strategy
Hugging Face Cache Management
import os

def _set_cache_env_defaults():
    # Centralized cache directory
    cache_dir = os.path.join(os.getcwd(), "hf_cache")
    os.makedirs(cache_dir, exist_ok=True)

    # Set environment variables
    os.environ.setdefault("HF_HOME", cache_dir)
    os.environ.setdefault("TRANSFORMERS_CACHE", cache_dir)
    os.environ.setdefault("HF_DATASETS_CACHE", cache_dir)

    # Configure offline mode only if the cache is already populated
    if os.listdir(cache_dir):
        os.environ.setdefault("TRANSFORMERS_OFFLINE", "1")
In-Memory Model Caching
import time

class ModelCache:
    def __init__(self, max_models=5):
        self.cache = {}
        self.max_models = max_models

    def get_model(self, model_name):
        # Return the cached model and refresh its timestamp
        if model_name in self.cache:
            entry = self.cache[model_name]
            entry['last_used'] = time.time()
            return entry['model']

        # Load model if not cached
        model = load_model(model_name)

        # LRU eviction: drop the least recently used entry when full
        if len(self.cache) >= self.max_models:
            oldest_model = min(self.cache.keys(), key=lambda k: self.cache[k]['last_used'])
            del self.cache[oldest_model]

        self.cache[model_name] = {
            'model': model,
            'last_used': time.time()
        }
        return model
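The cache is typically exposed as a single module-level instance shared by request handlers; note that this simple version is not thread-safe, so a lock around get_model would be needed under a multi-threaded server. A usage sketch (the model identifier is illustrative):

model_cache = ModelCache(max_models=5)

def get_evaluation_model(model_name):
    # All handlers share one cache, so checkpoints stay resident between requests
    return model_cache.get_model(model_name)

# model = get_evaluation_model("google/flan-t5-base")  # illustrative identifier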
5. Network Optimization
API Rate Limit Management
import time
from collections import defaultdict

class RateLimitManager:
    def __init__(self):
        self.request_times = defaultdict(list)
        self.limits = {
            'openai': 60,      # requests per minute
            'gemini': 60,
            'openrouter': 120
        }

    def can_make_request(self, provider):
        now = time.time()
        limit = self.limits.get(provider, 60)

        # Clean old requests (outside 1-minute window)
        self.request_times[provider] = [
            t for t in self.request_times[provider]
            if now - t < 60
        ]

        # Check if under limit
        return len(self.request_times[provider]) < limit

    def record_request(self, provider):
        self.request_times[provider].append(time.time())
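A sketch of how the manager might wrap an outgoing call, waiting briefly when the local budget is exhausted (call_provider is a placeholder for the real API call, not part of the project):

rate_limiter = RateLimitManager()

def call_with_rate_limit(provider, payload, max_wait=30.0):
    # Poll until the local request budget allows another call, then record it
    waited = 0.0
    while not rate_limiter.can_make_request(provider):
        time.sleep(1.0)
        waited += 1.0
        if waited >= max_wait:
            raise RuntimeError(f"Rate limit window for {provider} did not clear in {max_wait}s")
    rate_limiter.record_request(provider)
    return call_provider(provider, payload)  # placeholder for the real API call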
Request Batching for API Models
import asyncio
import time

class BatchedAPIClient:
    def __init__(self, provider, batch_size=10, max_wait=5.0):
        self.provider = provider
        self.batch_size = batch_size
        self.max_wait = max_wait
        self.pending_requests = []
        self.last_flush = time.time()

    async def make_request(self, messages, **kwargs):
        # Add to pending batch
        future = asyncio.get_running_loop().create_future()
        self.pending_requests.append((messages, kwargs, future))

        # Flush batch if full or time exceeded
        if (len(self.pending_requests) >= self.batch_size or
                time.time() - self.last_flush >= self.max_wait):
            await self._flush_batch()

        return await future

    async def _flush_batch(self):
        if not self.pending_requests:
            return

        # Group requests by model and parameters
        batch_data = self._group_requests()

        # Make API calls in parallel
        responses = await self._make_batch_api_calls(batch_data)

        # Distribute responses to futures
        self._distribute_responses(responses)

        self.pending_requests = []
        self.last_flush = time.time()
Monitoring and Observability
1. Health Check Endpoints
System Health
@app.route("/health", methods=["GET"])
def health():
return jsonify({
"status": "ok",
"timestamp": time.time(),
"version": "1.0.0"
})
Model Health
@app.route("/health/models", methods=["GET"])
def health_models():
try:
from models.qa import qa_pipelines
return jsonify({
"supported_models": list(SUPPORTED_MODELS.keys()),
"loaded_pipelines": list(qa_pipelines.keys()),
"cache_dir": os.environ.get("HF_HOME"),
"memory_usage": get_memory_usage(),
"cache_stats": get_cache_stats()
})
except Exception as e:
return jsonify({"error": str(e)}), 500
2. Performance Monitoring
Metrics Collection
def collect_performance_metrics():
    metrics = {
        "model_loading_time": measure_model_loading_time(),
        "inference_latency": measure_inference_latency(),
        "memory_usage": get_memory_usage(),
        "cache_hit_rate": calculate_cache_hit_rate(),
        "api_rate_limits": get_api_rate_limit_status()
    }

    # Send to monitoring system
    send_metrics_to_monitoring(metrics)
Logging Configuration
import logging.config

LOGGING_CONFIG = {
    'version': 1,
    'formatters': {
        'detailed': {
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        },
        'json': {
            'format': '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "module": "%(name)s", "message": "%(message)s"}'
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'detailed',
            'level': 'INFO'
        },
        'file': {
            'class': 'logging.FileHandler',
            'filename': 'chatbot_eval.log',
            'formatter': 'json',
            'level': 'DEBUG'
        }
    },
    'loggers': {
        '': {
            'handlers': ['console', 'file'],
            'level': 'DEBUG'
        }
    }
}

logging.config.dictConfig(LOGGING_CONFIG)
3. Error Tracking and Alerting
Error Classification
from typing import Dict

def classify_error(error: Exception, context: Dict) -> str:
    error_type = type(error).__name__

    if "API" in error_type or "Connection" in error_type:
        return "API_ERROR"
    elif "CUDA" in error_type or "Memory" in error_type:
        return "RESOURCE_ERROR"
    elif "Model" in error_type or "Pipeline" in error_type:
        return "MODEL_ERROR"
    else:
        return "UNKNOWN_ERROR"
Alert Configuration
ALERT_RULES = {
    "error_rate": {
        "threshold": 0.05,   # 5% error rate
        "window": 300,       # 5 minutes
        "severity": "HIGH"
    },
    "latency": {
        "threshold": 10.0,   # 10 seconds
        "window": 60,        # 1 minute
        "severity": "MEDIUM"
    },
    "memory_usage": {
        "threshold": 0.9,    # 90% memory usage
        "window": 300,       # 5 minutes
        "severity": "HIGH"
    }
}
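ALERT_RULES is only a declaration; the evaluator below is a minimal sketch of how windowed metric values could be compared against it (send_alert is a placeholder for whatever notification channel is used):

def check_alerts(metric_values):
    # metric_values: mapping of metric name -> value measured over that metric's window,
    # e.g. {"error_rate": 0.07, "latency": 3.2, "memory_usage": 0.85}
    alerts = []
    for name, rule in ALERT_RULES.items():
        value = metric_values.get(name)
        if value is not None and value >= rule["threshold"]:
            alerts.append({
                "metric": name,
                "value": value,
                "threshold": rule["threshold"],
                "severity": rule["severity"],
            })
    for alert in alerts:
        send_alert(alert)  # placeholder notification hook
    return alerts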
Scaling Strategies
1. Horizontal Scaling
Load Balancer Configuration
# nginx.conf
upstream chatbot_eval_backend {
    server 127.0.0.1:5001;
    server 127.0.0.1:5002;
    server 127.0.0.1:5003;
}

server {
    listen 80;

    location / {
        proxy_pass http://chatbot_eval_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}
Session Management
The application is stateless by design: there are no server-side sessions, and all state travels in the request/response cycle, so additional instances can be added behind the load balancer without session affinity.
2. Vertical Scaling
Resource Optimization
# GPU memory management
import torch
from torch.cuda.amp import autocast

def optimize_gpu_memory():
    # Clear unused tensors
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # Use mixed precision for inference
    with autocast():
        # Model inference goes here
        pass
Model Sharding
# Distribute model across multiple GPUs
from transformers import AutoModelForCausalLM

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto"  # Automatic sharding across available devices
)
Security Considerations
1. API Key Security
Secure Key Storage
# Use environment variables or secret management
import os

def get_secure_api_key(provider: str) -> str:
    # Method 1: Environment variables
    key = os.getenv(f"{provider.upper()}_API_KEY")
    if key:
        return key

    # Method 2: Secret files mounted into the container (Docker/Kubernetes secrets)
    secret_path = f"/run/secrets/{provider}_api_key"
    if os.path.exists(secret_path):
        with open(secret_path, 'r') as f:
            return f.read().strip()

    raise ValueError(f"No API key found for {provider}")
Request Security
# Validate requests
from typing import Dict

def validate_request(request_data: Dict) -> bool:
    # Check for required fields
    required = ["question", "model_name"]
    for field in required:
        if field not in request_data:
            return False

    # Validate field types and lengths
    if not isinstance(request_data["question"], str):
        return False
    if len(request_data["question"]) > 10000:  # Max length
        return False

    return True
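A sketch of how this check might sit at the top of an evaluation endpoint, assuming the Flask app object from app.py (the route name and payload handling are illustrative, not the project's actual API):

from flask import request, jsonify

@app.route("/evaluate", methods=["POST"])
def evaluate_endpoint():
    data = request.get_json(silent=True) or {}
    if not validate_request(data):
        return jsonify({"error": "Invalid request payload"}), 400
    # ... hand off to the evaluation pipeline ...
    return jsonify({"status": "accepted"})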
2. Content Security
Response Sanitization
def sanitize_response(response: str) -> str:
    sanitized = response
    # Steps applied in the full implementation:
    # - remove potentially harmful content
    # - enforce length limits
    # - validate against safety guidelines
    # - apply medical disclaimers
    return sanitized
Input Validation
from typing import Tuple

def validate_input(question: str, context: str) -> Tuple[bool, str]:
    # Check for malicious content
    if contains_malicious_patterns(question):
        return False, "Invalid input detected"

    # Check length constraints
    if len(question) > 1000 or len(context) > 10000:
        return False, "Input too long"

    # Check for appropriate content
    if contains_inappropriate_content(question):
        return False, "Inappropriate content detected"

    return True, "Valid"
Backup and Recovery
1. Model Cache Backup
# Backup model cache
tar -czf hf_cache_backup_$(date +%Y%m%d).tar.gz hf_cache/
# Restore model cache
tar -xzf hf_cache_backup_20231201.tar.gz
2. Configuration Backup
# Backup configuration
cp config.json config_backup_$(date +%Y%m%d_%H%M%S).json
3. Log Rotation
# Log rotation configuration
import logging.handlers

handler = logging.handlers.RotatingFileHandler(
    'chatbot_eval.log',
    maxBytes=100*1024*1024,  # 100MB
    backupCount=5
)
Troubleshooting Guide
Common Issues
1. Model Loading Failures
# Clear cache and retry
import os
import shutil

def troubleshoot_model_loading(model_name: str):
    # Clear Hugging Face cache
    cache_dir = os.environ.get("HF_HOME")
    if cache_dir and os.path.exists(cache_dir):
        shutil.rmtree(cache_dir)

    # Reset environment
    os.environ.pop("HF_HOME", None)
    os.environ.pop("TRANSFORMERS_CACHE", None)

    # Retry loading
    return load_model_with_retry(model_name)
2. Memory Issues
import torch

def troubleshoot_memory_issues():
    if torch.cuda.is_available():
        # Check GPU memory usage
        print(f"GPU Memory: {torch.cuda.memory_allocated()/1024**3:.1f}GB")
        # Clear caches
        torch.cuda.empty_cache()

    # Reduce batch size
    set_batch_size(get_optimal_batch_size())
3. API Rate Limits
import time

def handle_api_rate_limits(provider: str):
    # Retry with exponential backoff
    backoff_times = [1, 2, 4, 8, 16, 32]

    for attempt, backoff in enumerate(backoff_times):
        try:
            return make_api_request(provider)
        except RateLimitError:
            if attempt < len(backoff_times) - 1:
                time.sleep(backoff)
                continue
            else:
                raise
This deployment and optimization guide provides a roadmap for deploying and maintaining the chatbot evaluation system in production while preserving performance, reliability, and security.