Deployment Guide

Chatbot Evaluation System - Deployment Guide and Performance Optimization

Deployment Strategies

1. Development Deployment

Local Development Setup

# 1. Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# 2. Install dependencies
pip install -r requirements.txt

# 3. Configure API keys (choose one method)

# Method A: Environment variables (recommended)
# Replace the placeholder values with your real keys.
export OPENAI_API_KEY="sk-your-key"
export GEMINI_API_KEY="your-gemini-key"
export OPENROUTER_API_KEY="sk-or-your-key"

# Method B: Config file
# Start from the provided template rather than writing config.json by hand.
cp config.example.json config.json
# Edit config.json with your API keys

# 4. Run development server
python app.py
# Application available at http://localhost:5000

Development Configuration

# app.py development settings
if __name__ == "__main__":
    import os

    # Set cache environment defaults, preload the QA models and warm the
    # core caches before the server starts accepting requests.
    _set_cache_env_defaults()
    preload_qa_models()
    _warm_core_caches()

    # Debug mode enables Werkzeug's interactive debugger, which allows
    # arbitrary code execution for anyone who can reach the port. Keep it
    # opt-in (FLASK_DEBUG=1) and bind to loopback only while it is enabled;
    # without debug the server listens on all interfaces as before.
    debug_enabled = os.getenv("FLASK_DEBUG") == "1"

    # Development server configuration
    app.run(
        host="127.0.0.1" if debug_enabled else "0.0.0.0",
        port=5000,
        debug=debug_enabled,
        use_reloader=False,  # Disable for model caching
    )

2. Production Deployment

Docker Deployment

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Copy and install dependencies before the source so this layer is reused
# when only application code changes. --no-cache-dir keeps pip's download
# cache out of the image.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# NOTE: this copies everything in the build context; use a .dockerignore to
# keep config.json (API keys) and any local hf_cache/ out of the image.
COPY . .
RUN mkdir -p hf_cache

# Set cache directory
ENV HF_HOME=/app/hf_cache
ENV TRANSFORMERS_CACHE=/app/hf_cache

# Expose port
EXPOSE 5000

# Run application
CMD ["python", "app.py"]
# docker-compose.yml
version: '3.8'
services:
  chatbot-eval:
    build: .
    ports:
      - "5000:5000"
    # API keys are taken from the host environment (or an .env file).
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - GEMINI_API_KEY=${GEMINI_API_KEY}
      - OPENROUTER_API_KEY=${OPENROUTER_API_KEY}
    # Persist the model cache on the host so models survive container restarts.
    volumes:
      - ./hf_cache:/app/hf_cache
    restart: unless-stopped

Kubernetes Deployment

# deployment.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: chatbot-evaluation
spec:
  replicas: 3
  selector:
    matchLabels:
      app: chatbot-evaluation
  template:
    metadata:
      labels:
        app: chatbot-evaluation
    spec:
      containers:
      - name: chatbot-evaluation
        image: chatbot-evaluation:latest
        ports:
        - containerPort: 5000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: openai
        # Other providers are optional so the pod still starts when the
        # secret does not define them.
        - name: GEMINI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: gemini
              optional: true
        - name: OPENROUTER_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: openrouter
              optional: true
        # Only route traffic to a replica once its /health endpoint answers;
        # the delay leaves time for model loading at startup (tune as needed).
        readinessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 30
          periodSeconds: 10
          timeoutSeconds: 5
        resources:
          requests:
            memory: "4Gi"
            cpu: "1000m"
          limits:
            memory: "8Gi"
            cpu: "2000m"

3. Cloud Deployment Options

AWS Deployment

# Using Elastic Beanstalk
eb init chatbot-evaluation
eb create production-env

# Using ECS Fargate
aws ecs create-cluster --cluster-name chatbot-eval-cluster
# NOTE: register-task-definition also needs the container definitions
# (--container-definitions or --cli-input-json); they are omitted here.
aws ecs register-task-definition --family chatbot-eval --task-role-arn chatbot-eval-role
# NOTE: Fargate services additionally require --network-configuration
# (subnets / security groups) for your VPC.
aws ecs create-service --cluster chatbot-eval-cluster --service-name chatbot-eval-service \
  --task-definition chatbot-eval --desired-count 1 --launch-type FARGATE

Google Cloud Deployment

# Using Cloud Run
# The app listens on port 5000, while Cloud Run routes to 8080 by default,
# so the container port must be stated explicitly.
# NOTE: --allow-unauthenticated makes the service publicly reachable.
gcloud run deploy chatbot-evaluation \
  --source . \
  --platform managed \
  --region us-central1 \
  --port 5000 \
  --allow-unauthenticated

# Using GKE
gcloud container clusters create chatbot-eval-cluster
kubectl apply -f kubernetes/

Azure Deployment

# Using Container Instances
# The API key is passed as a *secure* environment variable so its value is
# not shown in the container group's properties.
az container create \
  --resource-group chatbot-eval-rg \
  --name chatbot-evaluation \
  --image chatbot-evaluation:latest \
  --ports 5000 \
  --secure-environment-variables OPENAI_API_KEY="$OPENAI_API_KEY"

Performance Optimization

1. Model Loading Optimization

Preloading Strategy

# app.py - Model preloading on startup
@app.before_request
def _maybe_warm_once():
    """Warm models and caches exactly once, on the first incoming request.

    Uses the module-level ``_warm_once_done`` flag guarded by ``_warm_lock``
    (presumably a ``threading.Lock`` defined elsewhere in app.py).
    """
    global _warm_once_done

    # Fast path: skip the lock entirely once warming has completed.
    if _warm_once_done:
        return

    with _warm_lock:
        # Re-check under the lock: another request may have finished warming
        # while this one was waiting, and the models must not load twice.
        if _warm_once_done:
            return

        _set_cache_env_defaults()
        preload_qa_models()                    # Load QA models
        _warm_core_caches()                    # Warm sentence transformers
        _warm_once_done = True

Selective Warming Based on Environment

def _warm_core_caches():
    """Load the sentence-transformer model and any opted-in metric models.

    The sentence transformer is always loaded. BERTScore is warmed when
    ``PRELOAD_OPTIONAL_METRICS`` is ``"1"`` and the heavy metrics when
    ``PRELOAD_HEAVY_METRICS`` is ``"1"``.
    """
    from models.loader import load_sentence_transformer

    def _enabled(flag_name):
        # A flag counts as set only when its value is exactly "1".
        return os.getenv(flag_name) == "1"

    # Essential component: always warmed.
    load_sentence_transformer("sentence-transformers/all-mpnet-base-v2")

    # Medium-sized optional metric.
    if _enabled("PRELOAD_OPTIONAL_METRICS"):
        _warm_bert_score()

    # Large optional models.
    if _enabled("PRELOAD_HEAVY_METRICS"):
        _warm_heavy_metrics()

def _warm_bert_score():
    """Run one tiny BERTScore computation on CPU so its model gets loaded."""
    from bert_score import score as run_bert_score

    warmup_args = {
        "cands": ["test"],
        "refs": [["test"]],
        "model_type": "microsoft/deberta-base-mnli",
        "verbose": False,
        "device": "cpu",
        "batch_size": 1,
    }
    run_bert_score(**warmup_args)

def _warm_heavy_metrics():
    """Instantiate Detoxify and load the BLEURT and COMET metrics once."""
    # Detoxify: constructing the "original" model triggers its load.
    from detoxify import Detoxify
    Detoxify("original")

    # BLEURT first, then COMET, both through the `evaluate` library.
    import evaluate
    for metric_id in ("bleurt", "comet"):
        evaluate.load(metric_id, module_type="metric")

2. Memory Optimization

Model Quantization

# 8-bit quantization for memory reduction
# NOTE(review): load_in_8bit presumably relies on the bitsandbytes backend
# being installed — confirm against the transformers version in use.
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    load_in_8bit=True,
    device_map="auto"
)

# Benefits (the guide's estimates, not measured in this snippet):
# - 50-60% memory reduction
# - Minimal quality loss (1-2%)
# - Faster loading times

KV-Cache Optimization

class OptimizedGeneration:
    """Greedy token-by-token generation that reuses the model's KV cache.

    The prompt is encoded once; after the first forward pass only the newly
    chosen token is fed back, together with the cached keys/values.
    """

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
        # Keys/values from the most recent forward pass; None means that
        # nothing is cached yet.
        self.kv_cache = None

    def generate_with_cache(self, prompt, max_tokens=100):
        """Generate up to ``max_tokens`` tokens greedily for ``prompt``.

        Args:
            prompt: Text to continue.
            max_tokens: Maximum number of new tokens to produce.

        Returns:
            The decoded continuation (newly generated tokens only).
        """
        # Encode prompt once; only its input ids are passed to the model.
        encoded = self.tokenizer(prompt, return_tensors="pt")
        step_input = encoded["input_ids"]
        eos_token_id = getattr(self.tokenizer, "eos_token_id", None)

        # Start every call from an empty cache so keys/values computed for
        # a previous prompt never leak into this generation.
        self.kv_cache = None
        generated_ids = []

        for _ in range(max_tokens):
            outputs = self.model(
                input_ids=step_input,
                past_key_values=self.kv_cache,
                use_cache=True,
            )
            # Greedy choice from the logits at the last position.
            next_token = outputs.logits[:, -1:].argmax(dim=-1)
            self.kv_cache = outputs.past_key_values

            token_id = int(next_token[0, 0])
            if eos_token_id is not None and token_id == eos_token_id:
                break
            generated_ids.append(token_id)

            # With the cache holding the earlier positions, only the new
            # token is needed as input for the next step.
            step_input = next_token

        # Decode every generated token, not just the last one.
        return self.tokenizer.decode(generated_ids)

Memory-Efficient Batching

def batch_evaluate(items, batch_size=4):
    """Evaluate ``items`` in slices of ``batch_size`` and return all results.

    Each slice is handed to ``evaluate_batch``; when CUDA is available, the
    CUDA cache is emptied after every slice.
    """
    collected = []
    for start in range(0, len(items), batch_size):
        chunk = items[start:start + batch_size]

        # One evaluate_batch call per chunk; its results are appended in order.
        collected.extend(evaluate_batch(chunk))

        # Release cached GPU memory before the next chunk.
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    return collected

3. Inference Speed Optimization

Async Processing

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def evaluate_async(question, context, model_name):
    """Run ``evaluate_sync`` in a worker thread without blocking the loop.

    Args:
        question: Question passed through to ``evaluate_sync``.
        context: Context passed through to ``evaluate_sync``.
        model_name: Model name passed through to ``evaluate_sync``.

    Returns:
        Whatever ``evaluate_sync`` returns.
    """
    # get_running_loop() is the supported way to obtain the loop from inside
    # a coroutine.
    loop = asyncio.get_running_loop()

    # Passing None selects the loop's shared default executor. A per-call
    # ``with ThreadPoolExecutor()`` would build a new pool for every request
    # and block the event loop in its exit if the await were cancelled.
    return await loop.run_in_executor(
        None,
        evaluate_sync,
        question, context, model_name
    )

def evaluate_sync(question, context, model_name):
    """Blocking evaluation: call ``generate_and_evaluate`` with empty options."""
    no_options = {}
    return generate_and_evaluate(question, context, no_options, model_name)

Parallel Metrics Computation

def compute_metrics_parallel(response_text, question, options):
    """Compute the selected metrics concurrently in a small thread pool.

    Args:
        response_text: Generated response to score.
        question: Question forwarded to ``compute_single_metric`` with it.
        options: Mapping of feature flags; ``enable_bert_score`` adds the
            ``bert_score`` metric. ``None`` is treated as no options.

    Returns:
        Dict mapping each metric name to its computed value.

    Raises:
        Any exception raised by ``compute_single_metric`` is re-raised by
        ``future.result()``.
    """
    # Imported locally so the name is guaranteed to be in scope: the
    # surrounding snippet only imports ThreadPoolExecutor, not the
    # ``concurrent`` package itself.
    from concurrent.futures import ThreadPoolExecutor, as_completed

    metrics_to_compute = ["bleu", "rouge_l", "sem_sim"]

    # Add optional metrics based on options
    if (options or {}).get("enable_bert_score"):
        metrics_to_compute.append("bert_score")

    results = {}

    # Compute metrics in parallel
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {
            executor.submit(compute_single_metric, metric, response_text, question): metric
            for metric in metrics_to_compute
        }

        # Collect results as they finish; completion order does not matter
        # because each result is stored under its metric name.
        for future in as_completed(futures):
            results[futures[future]] = future.result()

    return results

4. Caching Strategy

Hugging Face Cache Management

def _set_cache_env_defaults():
    # Centralized cache directory
    cache_dir = os.path.join(os.getcwd(), "hf_cache")
    os.makedirs(cache_dir, exist_ok=True)

    # Set environment variables
    os.environ.setdefault("HF_HOME", cache_dir)
    os.environ.setdefault("TRANSFORMERS_CACHE", cache_dir)
    os.environ.setdefault("HF_DATASETS_CACHE", cache_dir)

    # Configure offline mode if cache exists
    if os.path.exists(cache_dir):
        os.environ.setdefault("TRANSFORMERS_OFFLINE", "1")

In-Memory Model Caching

class ModelCache:
    """In-memory cache holding at most ``max_models`` loaded models (LRU).

    ``self.cache`` maps a model name to ``{'model': ..., 'last_used': ...}``.
    Entries are kept in least-recently-used-first order, so the first key is
    always the eviction candidate.
    """

    def __init__(self, max_models=5, loader=None):
        """Create an empty cache.

        Args:
            max_models: Maximum number of models kept at once (at least 1).
            loader: Optional callable ``loader(model_name) -> model``. When
                omitted, the module-level ``load_model`` is used.

        Raises:
            ValueError: If ``max_models`` is smaller than 1.
        """
        if max_models < 1:
            raise ValueError("max_models must be at least 1")
        self.cache = {}
        self.max_models = max_models
        self._loader = loader

    def get_model(self, model_name):
        """Return the model for ``model_name``, loading it on a cache miss."""
        entry = self.cache.pop(model_name, None)
        if entry is not None:
            # Cache hit: refresh recency and re-insert at the end so this
            # entry becomes the most recently used one. Return the model
            # itself, not the bookkeeping dict around it.
            entry['last_used'] = time.time()
            self.cache[model_name] = entry
            return entry['model']

        # Load model if not cached
        loader = self._loader if self._loader is not None else load_model
        model = loader(model_name)

        # Evict least recently used entries until there is room.
        while len(self.cache) >= self.max_models:
            del self.cache[next(iter(self.cache))]

        self.cache[model_name] = {
            'model': model,
            'last_used': time.time()
        }

        return model

5. Network Optimization

API Rate Limit Management

class RateLimitManager:
    """Sliding one-minute request counter per API provider."""

    def __init__(self):
        # provider -> timestamps of recorded requests
        self.request_times = defaultdict(list)
        # Allowed requests per minute for each known provider.
        self.limits = dict(openai=60, gemini=60, openrouter=120)

    def can_make_request(self, provider):
        """Return True when ``provider`` is below its per-minute limit.

        Timestamps older than 60 seconds are dropped as a side effect.
        Providers without a configured limit are allowed 60 per minute.
        """
        now = time.time()
        allowed = self.limits.get(provider, 60)

        # Keep only the requests that still fall inside the 1-minute window.
        still_recent = [
            stamp for stamp in self.request_times[provider]
            if now - stamp < 60
        ]
        self.request_times[provider] = still_recent

        return len(still_recent) < allowed

    def record_request(self, provider):
        """Remember that a request to ``provider`` was made just now."""
        self.request_times[provider].append(time.time())

Request Batching for API Models

class BatchedAPIClient:
    """Collects API requests and sends them in batches.

    Requests are queued in ``pending_requests`` and flushed when the queue
    reaches ``batch_size`` or ``max_wait`` seconds have passed since the
    last flush.

    NOTE(review): ``_group_requests``, ``_make_batch_api_calls`` and
    ``_distribute_responses`` are not defined in the visible code; they
    must be provided elsewhere.
    """

    def __init__(self, provider, batch_size=10, max_wait=5.0):
        self.provider = provider
        self.batch_size = batch_size
        self.max_wait = max_wait
        # Queue of (messages, kwargs, future) tuples awaiting a flush.
        self.pending_requests = []
        self.last_flush = time.time()

    async def make_request(self, messages, **kwargs):
        """Queue one request and wait for its response.

        NOTE(review): the flush is only triggered from inside this method.
        When neither condition below is met, nothing visible here schedules
        a later flush, so the awaited future appears to stay pending until
        another ``make_request`` call flushes the batch — confirm.
        """
        # Add to pending batch
        future = asyncio.Future()
        self.pending_requests.append((messages, kwargs, future))

        # Flush batch if full or time exceeded
        if (len(self.pending_requests) >= self.batch_size or
            time.time() - self.last_flush >= self.max_wait):
            await self._flush_batch()

        return await future

    async def _flush_batch(self):
        """Send all pending requests and resolve their futures.

        NOTE(review): ``pending_requests`` is reset only after the awaited
        API calls return; requests appended while those calls are in flight
        look like they would be discarded by the reset — confirm.
        """
        if not self.pending_requests:
            return

        # Group requests by model and parameters
        batch_data = self._group_requests()

        # Make API calls in parallel
        responses = await self._make_batch_api_calls(batch_data)

        # Distribute responses to futures
        self._distribute_responses(responses)

        self.pending_requests = []
        self.last_flush = time.time()

Monitoring and Observability

1. Health Check Endpoints

System Health

@app.route("/health", methods=["GET"])
def health():
    """Liveness endpoint: report status, current time and version as JSON."""
    payload = dict(status="ok", timestamp=time.time(), version="1.0.0")
    return jsonify(payload)

Model Health

@app.route("/health/models", methods=["GET"])
def health_models():
    """Report model, cache and memory state as JSON.

    Any exception while gathering the report is returned as
    ``{"error": ...}`` with HTTP status 500.
    """
    try:
        from models.qa import qa_pipelines

        report = {
            "supported_models": list(SUPPORTED_MODELS.keys()),
            "loaded_pipelines": list(qa_pipelines.keys()),
            "cache_dir": os.environ.get("HF_HOME"),
            "memory_usage": get_memory_usage(),
            "cache_stats": get_cache_stats(),
        }
        return jsonify(report)
    except Exception as exc:
        failure = {"error": str(exc)}
        return jsonify(failure), 500

2. Performance Monitoring

Metrics Collection

def collect_performance_metrics():
    """Gather the current performance figures and forward them to monitoring."""
    # Keyword arguments are evaluated left to right, so the measurement
    # helpers run in the order listed.
    snapshot = dict(
        model_loading_time=measure_model_loading_time(),
        inference_latency=measure_inference_latency(),
        memory_usage=get_memory_usage(),
        cache_hit_rate=calculate_cache_hit_rate(),
        api_rate_limits=get_api_rate_limit_status(),
    )

    # Hand the snapshot to the monitoring system.
    send_metrics_to_monitoring(snapshot)

Logging Configuration

import logging.config

# Logging setup: INFO and above to the console in a readable format,
# DEBUG and above to chatbot_eval.log in a JSON-like format.
LOGGING_CONFIG = {
    'version': 1,
    # dictConfig disables every logger created before it runs unless this
    # is False, which would silence module-level loggers in code imported
    # earlier.
    'disable_existing_loggers': False,
    'formatters': {
        'detailed': {
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        },
        # NOTE: this is a plain format string; a message containing double
        # quotes or newlines will not produce valid JSON.
        'json': {
            'format': '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "module": "%(name)s", "message": "%(message)s"}'
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'detailed',
            'level': 'INFO'
        },
        'file': {
            'class': 'logging.FileHandler',
            'filename': 'chatbot_eval.log',
            # Explicit encoding so non-ASCII model output is written the
            # same way on every platform; the file is opened on first use.
            'encoding': 'utf-8',
            'delay': True,
            'formatter': 'json',
            'level': 'DEBUG'
        }
    },
    'loggers': {
        # The empty name configures the root logger.
        '': {
            'handlers': ['console', 'file'],
            'level': 'DEBUG'
        }
    }
}

logging.config.dictConfig(LOGGING_CONFIG)

3. Error Tracking and Alerting

Error Classification

def classify_error(error: Exception, context: Dict) -> str:
    """Map an exception to a coarse category based on its class name.

    Args:
        error: The exception to classify.
        context: Accepted for the caller's convenience; not used here.

    Returns:
        ``"API_ERROR"``, ``"RESOURCE_ERROR"``, ``"MODEL_ERROR"`` or
        ``"UNKNOWN_ERROR"``. The first matching category wins.
    """
    class_name = type(error).__name__

    # (substrings looked for in the class name, resulting category),
    # checked in this order.
    category_rules = (
        (("API", "Connection"), "API_ERROR"),
        (("CUDA", "Memory"), "RESOURCE_ERROR"),
        (("Model", "Pipeline"), "MODEL_ERROR"),
    )

    for fragments, category in category_rules:
        if any(fragment in class_name for fragment in fragments):
            return category

    return "UNKNOWN_ERROR"

Alert Configuration

# Alerting thresholds: each rule has a threshold, a window in seconds and
# a severity label.
ALERT_RULES = {
    # 5% error rate over 5 minutes
    "error_rate": dict(threshold=0.05, window=300, severity="HIGH"),
    # 10 seconds latency over 1 minute
    "latency": dict(threshold=10.0, window=60, severity="MEDIUM"),
    # 90% memory usage over 5 minutes
    "memory_usage": dict(threshold=0.9, window=300, severity="HIGH"),
}

Scaling Strategies

1. Horizontal Scaling

Load Balancer Configuration

# nginx.conf
# Pool of three application instances on local ports 5001-5003.
upstream chatbot_eval_backend {
    server 127.0.0.1:5001;
    server 127.0.0.1:5002;
    server 127.0.0.1:5003;
}

# Public entry point: forwards every request on port 80 to the pool above.
server {
    listen 80;
    location / {
        proxy_pass http://chatbot_eval_backend;
        # Pass the original Host header and client address to the backend.
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}

Session Management

# Stateless application design
# No server-side sessions
# All state managed in request/response
# Horizontal scaling ready

2. Vertical Scaling

Resource Optimization

# GPU memory management
def optimize_gpu_memory():
    """Empty the CUDA cache (when CUDA is available) and enter autocast.

    The autocast block is a placeholder: model inference is meant to be
    placed inside it.
    """
    cuda_ready = torch.cuda.is_available()
    if cuda_ready:
        # Clear unused tensors
        torch.cuda.empty_cache()

    # Use mixed precision
    from torch.cuda.amp import autocast as mixed_precision
    with mixed_precision():
        # Model inference
        pass

Model Sharding

# Distribute model across multiple GPUs
# device_map="auto" leaves the placement of the model's layers to the
# library instead of naming a device explicitly.
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto"  # Automatic sharding
)

Security Considerations

1. API Key Security

Secure Key Storage

# Use environment variables or secret management
def get_secure_api_key(provider: str, secrets_dir: str = "/run/secrets") -> str:
    """Return the API key for ``provider`` from the environment or a secret file.

    Lookup order:
      1. Environment variable ``<PROVIDER>_API_KEY`` (provider upper-cased).
      2. File ``<secrets_dir>/<provider>_api_key`` (mounted secret).

    Args:
        provider: Provider name, e.g. ``"openai"``. Only letters, digits,
            ``_`` and ``-`` are accepted.
        secrets_dir: Directory holding mounted secret files.

    Returns:
        The non-empty API key with surrounding whitespace removed.

    Raises:
        ValueError: If the provider name is invalid or no key is found.
    """
    import re

    # The provider becomes part of a file path below; reject anything that
    # could escape the secrets directory (path separators, "..").
    if not isinstance(provider, str) or not re.fullmatch(r"[A-Za-z0-9_-]+", provider):
        raise ValueError(f"Invalid provider name: {provider!r}")

    # Method 1: Environment variables
    key = os.getenv(f"{provider.upper()}_API_KEY")
    if key:
        return key

    # Method 2: Mounted secret file (Docker / Kubernetes secret volume)
    secret_path = os.path.join(secrets_dir, f"{provider}_api_key")
    try:
        with open(secret_path, "r", encoding="utf-8") as f:
            key = f.read().strip()
    except FileNotFoundError:
        key = ""

    # An empty secret file counts as "no key" rather than returning "".
    if key:
        return key

    raise ValueError(f"No API key found for {provider}")

Request Security

# Validate requests
def validate_request(request_data: Dict) -> bool:
    """Check that a request body has the required fields in usable form.

    Args:
        request_data: Parsed request body. Anything that is not a dict
            (for example ``None`` from a missing or invalid JSON body) is
            rejected.

    Returns:
        True when ``question`` is a string of at most 10000 characters and
        ``model_name`` is a string; False otherwise.
    """
    # A non-dict body cannot contain the fields; report it as invalid
    # instead of raising TypeError on the membership test.
    if not isinstance(request_data, dict):
        return False

    # Check for required fields
    required = ("question", "model_name")
    if any(field not in request_data for field in required):
        return False

    # Validate field types and lengths
    question = request_data["question"]
    if not isinstance(question, str):
        return False
    if len(question) > 10000:  # Max length
        return False
    if not isinstance(request_data["model_name"], str):
        return False

    return True

2. Content Security

Response Sanitization

def sanitize_response(response: str) -> str:
    """Return the response after sanitization.

    The individual sanitization steps are not implemented yet, so the
    response is currently returned unchanged. Each step listed below is
    expected to transform ``sanitized_response`` in turn.
    """
    # Start from the original text so the function always returns a defined
    # value, even while the steps below are still placeholders.
    sanitized_response = response

    # TODO: Remove potentially harmful content
    # TODO: Check length limits
    # TODO: Validate against safety guidelines
    # TODO: Apply medical disclaimers

    return sanitized_response

Input Validation

def validate_input(question: str, context: str) -> Tuple[bool, str]:
    """Validate a question and its optional context before evaluation.

    Args:
        question: User question; limited to 1000 characters.
        context: Supporting context; limited to 10000 characters. ``None``
            is treated as an empty context.

    Returns:
        ``(True, "Valid")`` on success, otherwise ``(False, reason)``.
    """
    # Context is not a required request field, so it may arrive as None;
    # treat that as empty rather than failing on len(None).
    context_text = "" if context is None else context

    # Check for malicious content
    if contains_malicious_patterns(question):
        return False, "Invalid input detected"

    # Check length constraints
    if len(question) > 1000 or len(context_text) > 10000:
        return False, "Input too long"

    # Check for appropriate content
    if contains_inappropriate_content(question):
        return False, "Inappropriate content detected"

    return True, "Valid"

Backup and Recovery

1. Model Cache Backup

# Backup model cache (archive name carries today's date, YYYYMMDD)
tar -czf hf_cache_backup_$(date +%Y%m%d).tar.gz hf_cache/

# Restore model cache (replace 20231201 with the date of the backup to restore)
tar -xzf hf_cache_backup_20231201.tar.gz

2. Configuration Backup

# Backup configuration (timestamped copy: YYYYMMDD_HHMMSS)
cp config.json config_backup_$(date +%Y%m%d_%H%M%S).json

3. Log Rotation

# Log rotation configuration
import logging.handlers

# Rotating handler for chatbot_eval.log: the maxBytes/backupCount values
# below cap each file at 100MB and keep 5 rotated backups.
# NOTE(review): this snippet only creates the handler; it is not attached to
# any logger here, and it targets the same file name as the plain
# FileHandler in the logging configuration — confirm which one is intended.
handler = logging.handlers.RotatingFileHandler(
    'chatbot_eval.log',
    maxBytes=100*1024*1024,  # 100MB
    backupCount=5
)

Troubleshooting Guide

Common Issues

1. Model Loading Failures

# Clear cache and retry
def troubleshoot_model_loading(model_name: str):
    """Wipe the Hugging Face cache, reset cache variables and retry loading.

    WARNING: this deletes the whole directory named by ``HF_HOME``, not
    just the files of ``model_name``.

    Args:
        model_name: Model to load again after the cache was cleared.

    Returns:
        Whatever ``load_model_with_retry`` returns.
    """
    # Clear Hugging Face cache. HF_HOME may be unset, in which case there is
    # nothing to delete (os.path.exists(None) would raise TypeError).
    cache_dir = os.environ.get("HF_HOME")
    if cache_dir and os.path.isdir(cache_dir):
        shutil.rmtree(cache_dir)

    # Reset environment
    for variable in ("HF_HOME", "TRANSFORMERS_CACHE"):
        os.environ.pop(variable, None)

    # Retry loading
    return load_model_with_retry(model_name)

2. Memory Issues

def troubleshoot_memory_issues():
    """Print GPU memory in use, empty the CUDA cache and reset the batch size."""
    # Report how much GPU memory is currently allocated, in GB.
    if torch.cuda.is_available():
        allocated_gb = torch.cuda.memory_allocated() / 1024**3
        print(f"GPU Memory: {allocated_gb:.1f}GB")

    # Free cached GPU memory.
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # Apply whatever batch size get_optimal_batch_size() reports.
    optimal_size = get_optimal_batch_size()
    set_batch_size(optimal_size)

3. API Rate Limits

def handle_api_rate_limits(provider: str):
    """Call ``make_api_request`` with exponential backoff on rate limits.

    Up to six attempts are made. After a ``RateLimitError`` the function
    sleeps 1, 2, 4, 8 and 16 seconds before the next attempt; the error
    from the sixth attempt is re-raised (the final 32 is never slept).

    Returns:
        The result of the first successful ``make_api_request`` call.
    """
    delays = [1, 2, 4, 8, 16, 32]
    final_attempt = len(delays) - 1

    for attempt_index, delay in enumerate(delays):
        try:
            return make_api_request(provider)
        except RateLimitError:
            # Out of retries: let the caller see the rate-limit error.
            if attempt_index >= final_attempt:
                raise
            time.sleep(delay)

This comprehensive deployment and optimization guide provides a complete roadmap for deploying and maintaining the chatbot evaluation system in production environments while ensuring optimal performance, reliability, and security.