Technical Implementation

Chatbot Evaluation System - Technical Implementation Details

Core Implementation Architecture

Application Structure (app.py)

Flask Application Configuration:

import logging
import os
import time
from threading import Lock

from flask import Flask, jsonify, request, render_template

from models.qa import preload_qa_models, SUPPORTED_MODELS
from runner import evaluate

# Module-level logger; the route handlers and the warm-up helper log through it.
logger = logging.getLogger(__name__)

# Flask 3.x compatible application
app = Flask(__name__, template_folder="templates", static_folder="static")

# Global state management for warm-up:
#   _warm_once_done - set to True after the first successful warm-up
#   _warm_lock      - held while a warm-up attempt is running
_warm_once_done = False
_warm_lock = Lock()

Route Handlers:

  1. Health Check Endpoint:
@app.route("/health", methods=["GET"])
def health():
    """Liveness probe: respond with a constant ``{"status": "ok"}`` JSON body."""
    status_body = {"status": "ok"}
    return jsonify(status_body)

@app.route("/health/models", methods=["GET"])
def health_models():
    """Report the model registry, the loaded QA pipelines and the HF cache dir.

    Returns:
        A JSON body with ``supported_models``, ``loaded_pipelines`` and
        ``cache_dir`` (the value of ``HF_HOME``, or null when unset). Any
        exception is converted into ``{"error": ...}`` with HTTP status 500.
    """
    try:
        # Imported inside the handler so an import failure is reported as a
        # 500 response instead of breaking module import.
        from models.qa import qa_pipelines

        report = {
            "supported_models": list(SUPPORTED_MODELS.keys()),
            "loaded_pipelines": list(qa_pipelines.keys()),
            "cache_dir": os.environ.get("HF_HOME"),
        }
        return jsonify(report)
    except Exception as exc:
        failure = {"error": str(exc)}
        return jsonify(failure), 500
  2. Main Interface:
@app.route("/", methods=["GET"])
def index():
    """Render the main page, exposing the model registry as ``models_meta``."""
    template_name = "index.html"
    return render_template(template_name, models_meta=SUPPORTED_MODELS)
  3. Evaluation Endpoint:
@app.route("/evaluate", methods=["POST"])
def evaluate_route():
    """Evaluate the posted JSON payload and return the result as JSON.

    The request body must be a JSON object. A body that cannot be parsed, or
    that parses to something other than an object (list, string, number,
    null), is rejected with HTTP 400 and an ``{"error": ...}`` body rather
    than failing later with an unhandled exception.

    Returns:
        The JSON-encoded value returned by ``evaluate(payload)``, or a
        ``(response, 400)`` tuple for an invalid body.
    """
    start = time.perf_counter()

    # Request validation and logging.
    # silent=True makes get_json() return None on a parse failure, so both
    # "not JSON" and "JSON but not an object" take the same 400 path.
    payload = request.get_json(force=True, silent=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    # The question may be missing, null or a non-string; coerce before slicing
    # so building the log preview can never raise.
    question = payload.get("question")
    question_text = "" if question is None else str(question)
    logger.info("/evaluate payload: %s", {
        "model_name": payload.get("model_name"),
        "question_preview": question_text[:120]
    })

    # Core evaluation logic
    result = evaluate(payload)

    # Performance tracking
    elapsed_ms = (time.perf_counter() - start) * 1000
    logger.info("/evaluate result: elapsed_ms=%.1f", elapsed_ms)

    return jsonify(result)

Evaluation Engine (runner.py)

Core Evaluation Function:

def generate_and_evaluate(question: str, context: str, hyperparams: Dict[str, Any], model_name: str) -> Dict[str, Any]:
    """Outline of the core evaluation pipeline (implementation elided here).

    The full function in runner.py performs these stages in order:

    1. Small talk detection and routing
    2. Context augmentation with symptom KB
    3. Model selection and pipeline retrieval
    4. QA inference with error handling
    5. Response sanitization
    6. Fallback to LLM if QA confidence low
    7. Parallel metrics computation
    8. Results aggregation and formatting
    """
    # A def whose body holds only comments is a SyntaxError; the docstring plus
    # this placeholder keep the excerpt parseable while marking the body as
    # intentionally omitted.
    ...

Small Talk Detection:

SMALL_TALK_REPLIES = {
    "hi": "Hello! How can I help you today?",
    "hello": "Hi there! What can I do for you?",
    "how are you": "I'm just a bot, but I'm doing great!",
    "thanks": "You're welcome!"
}

# Openers that count as a greeting even when more text follows them.
_GREETING_PREFIXES = ("hi", "hello", "hey")


def _opens_with_word(text: str, word: str) -> bool:
    """Return True when *text* starts with *word* as a whole word."""
    if not text.startswith(word):
        return False
    # The character right after the prefix must not be a letter. For an exact
    # match the slice is "" and "".isalpha() is False, so that passes too.
    return not text[len(word):len(word) + 1].isalpha()


def handle_small_talk(question: str) -> Optional[str]:
    """Return a canned reply for greetings and small talk, else None.

    An exact (case-insensitive, whitespace-trimmed) match in
    ``SMALL_TALK_REPLIES`` returns that entry. Otherwise a message that opens
    with the word "hi", "hello" or "hey" gets the generic "hi" reply.

    The opener is matched as a whole word: a plain prefix test would treat
    real questions such as "high blood pressure symptoms" or
    "history of asthma" as greetings and swallow them.

    Args:
        question: Raw user input.

    Returns:
        The reply text, or None when the input is not small talk.
    """
    ql = question.strip().lower()
    if ql in SMALL_TALK_REPLIES:
        return SMALL_TALK_REPLIES[ql]
    if any(_opens_with_word(ql, greeting) for greeting in _GREETING_PREFIXES):
        return SMALL_TALK_REPLIES["hi"]
    return None

Context Augmentation:

def augment_context(question: str, base_context: str) -> str:
    """Append knowledge-base text for the first symptom named in the question.

    Each ``SYMPTOM_KB`` key is tested as a substring of the lower-cased
    question, in the mapping's iteration order. The first hit wins and its
    info text is appended to ``base_context`` after a blank line.

    Args:
        question: The user's question.
        base_context: Context supplied with the request.

    Returns:
        The augmented context, or ``base_context`` unchanged when no symptom
        key occurs in the question.
    """
    lowered_question = question.lower()

    for symptom, info in SYMPTOM_KB.items():
        if symptom in lowered_question:
            return f"{base_context}\n\n{info}"

    return base_context

Model Provider Integration

Hugging Face Integration (models/qa.py)

Model Registry:

# Registry of selectable models, keyed by display name. Every entry carries
# the same four fields: "provider" (the wrapper family), "model" (the model
# identifier handed to that provider), "provider_name" and "hint" (short
# descriptive strings).
SUPPORTED_MODELS = {
    # Extractive QA models
    "DistilBERT QA": dict(
        provider="hf_qa",
        model="distilbert-base-uncased-distilled-squad",
        provider_name="Hugging Face",
        hint="HF free; extractive QA; fast on CPU",
    ),

    # API-based models
    "OpenAI GPT-4o-mini": dict(
        provider="openai",
        model="gpt-4o-mini",
        provider_name="OpenAI",
        hint="Requires OPENAI API key",
    ),

    # Local chat models
    "TinyLlama 1.1B Chat": dict(
        provider="hf_chat",
        model="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
        provider_name="TinyLlama (HF)",
        hint="HF free; very small; fast on CPU",
    ),
}

Provider-Specific Wrappers:

  1. Extractive QA Wrapper:
def _build_hf_qa_wrapper(model_name: str) -> Callable:
    """Build a callable that runs extractive QA with a Hugging Face model.

    The tokenizer, model and question-answering pipeline are created once,
    when the wrapper is built, and reused by every call.

    Args:
        model_name: Model identifier passed to ``from_pretrained``.

    Returns:
        ``wrapper(**kwargs)``, which reads ``question``, ``context`` and the
        optional ``top_k`` (default 1) from its keyword arguments and returns
        the pipeline output unchanged.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForQuestionAnswering.from_pretrained(model_name)
    # Must NOT be named `pipeline`: assigning to that name anywhere in this
    # function makes it a local, so the call on the right-hand side would
    # raise UnboundLocalError instead of reaching the pipeline factory.
    qa_pipe = pipeline("question-answering", model=model, tokenizer=tokenizer)

    def wrapper(**kwargs):
        return qa_pipe(
            question=kwargs.get("question"),
            context=kwargs.get("context"),
            top_k=kwargs.get("top_k", 1),
            handle_impossible_answer=True
        )
    return wrapper
  2. OpenAI Wrapper:
def _build_openai_wrapper(model_name: str) -> Callable:
    """Build a callable that answers a question via OpenAI chat completions.

    The client is created once, using the key from ``get_api_key("openai")``.

    Args:
        model_name: Chat model identifier sent with every request.

    Returns:
        ``wrapper(**kwargs)``, which requires ``context`` and ``question``
        keyword arguments and returns ``{"answer": str, "score": 0.99}``.
        The score is a fixed placeholder: the API response is not used to
        derive a confidence value.
    """
    client = OpenAI(api_key=get_api_key("openai"))

    def wrapper(**kwargs):
        response = client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "system", "content": MEDICAL_SYSTEM_PROMPT},
                {"role": "user", "content": f"Context: {kwargs['context']}\nQuestion: {kwargs['question']}"}
            ],
            temperature=0.6,
            max_tokens=256
        )
        # message.content is optional in the API response (e.g. a refusal);
        # normalise None to "" so callers always receive a string answer.
        content = response.choices[0].message.content
        return {
            "answer": content if content is not None else "",
            "score": 0.99
        }
    return wrapper
  3. Local Chat Wrapper:
def _build_hf_chat_wrapper(model_name: str) -> Callable:
    """Build a callable that answers a question with a local causal LM.

    Args:
        model_name: Model identifier passed to the generation-model loader.

    Returns:
        ``wrapper(**kwargs)``, which requires ``context`` and ``question`` and
        honours the optional ``max_new_tokens`` (default 96) and
        ``temperature`` (default 0.6). It returns
        ``{"answer": str, "score": 0.99}``; the score is a fixed placeholder.
    """
    def wrapper(**kwargs):
        # Lazy loading for memory efficiency: the model is fetched on first
        # use rather than when the wrapper is built.
        model, tokenizer, device = load_transformer_model_for_generation(model_name)

        prompt = f"Context: {kwargs['context']}\nQuestion: {kwargs['question']}\nAnswer:"
        inputs = tokenizer(prompt, return_tensors="pt").to(device)

        outputs = model.generate(
            **inputs,
            max_new_tokens=kwargs.get("max_new_tokens", 96),
            temperature=kwargs.get("temperature", 0.6),
            do_sample=True
        )

        # A causal LM returns the prompt tokens followed by the continuation.
        # Decode only the continuation: splitting the decoded text on
        # "Answer:" picks the wrong segment whenever the context or question
        # itself contains that marker.
        prompt_length = inputs["input_ids"].shape[1]
        generated_tokens = outputs[0][prompt_length:]
        answer = tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()

        return {"answer": answer, "score": 0.99}
    return wrapper

Model Loading Strategy (models/loader.py)

Lazy Loading Implementation:

def load_transformer_model_for_generation(model_name: str, device: str = "cpu"):
    """Load a causal-LM model and tokenizer once and reuse them afterwards.

    Results are cached in ``_loaded_models`` keyed by ``model_name`` only, so
    the device of the first load wins; the returned tuple always names the
    device the cached model actually lives on.

    Args:
        model_name: Model identifier passed to ``from_pretrained``.
        device: Target device for a fresh load. float16 weights are used for
            "cuda", float32 otherwise.

    Returns:
        ``(model, tokenizer, device)``.
    """
    if model_name not in _loaded_models:
        print(f"Loading model {model_name}...")

        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32,
            low_cpu_mem_usage=True
        )
        # Callers move their input tensors to the returned device, so the
        # model has to be there as well; otherwise generation on "cuda" fails
        # with a device mismatch.
        model.to(device)

        # Store loaded model
        _loaded_models[model_name] = (model, tokenizer, device)

    return _loaded_models[model_name]

Device Management:

def set_device_policy(policy: str) -> None:
    """Store *policy* in the module-level ``_device_policy`` setting.

    The value is stored as given, without validation.
    """
    global _device_policy
    _device_policy = policy

def get_device() -> str:
    """Resolve the current device policy to a concrete device name.

    Returns:
        "cuda" only when the policy is "gpu" and CUDA is available;
        "cpu" in every other case.
    """
    if _device_policy != "gpu":
        return "cpu"
    return "cuda" if torch.cuda.is_available() else "cpu"

Metrics Computation Engine

Metrics Registry (metrics/__init__.py)

Available Metrics:

# Metric name -> dotted path of the module that implements it. All metric
# modules live in the ``metrics`` package; bleu and rouge_l share one module.
METRIC_REGISTRY = {
    metric: f"metrics.{module}"
    for metric, module in (
        ("bleu", "ngram"),
        ("rouge_l", "ngram"),
        ("sem_sim", "semsim"),
        ("diversity", "diversity"),
        ("bert_score", "bert_score"),
        ("bleurt", "bleurt"),
        ("comet", "comet"),
        ("toxicity", "toxicity"),
        ("latency", "latency"),
    )
}

Core Metrics Implementation

  1. BLEU/ROUGE (metrics/ngram.py):
def compute_ngram_metrics(predictions: List[str], references: List[List[str]], options: Dict) -> Dict:
    """Compute per-item and mean BLEU and ROUGE-L scores.

    Args:
        predictions: One generated answer per item.
        references: For each item, its list of reference answers. BLEU uses
            all of them; ROUGE-L uses only the first.
        options: Accepted for interface parity with the other metrics; unused.

    Returns:
        ``{"aggregate": {"BLEU", "ROUGE_L"}, "per_item": [...]}``. With no
        items to score, both aggregates are 0.0 and ``per_item`` is empty.
    """
    pairs = list(zip(predictions, references))
    if not pairs:
        # Nothing to score: avoid dividing by zero when averaging.
        return {"aggregate": {"BLEU": 0.0, "ROUGE_L": 0.0}, "per_item": []}

    # Building the scorer is loop-invariant, so do it once.
    scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)

    bleu_scores = []
    rouge_scores = []
    for pred, refs in pairs:
        # corpus_bleu takes a list of reference *streams*, each aligned with
        # the hypotheses. For a single hypothesis every reference is its own
        # one-element stream. Passing [refs] is only valid for one reference.
        bleu = sacrebleu.corpus_bleu([pred], [[ref] for ref in refs])
        bleu_scores.append(bleu.score)

        # RougeScorer.score takes (target, prediction). The F-measure is
        # symmetric, so the corrected order leaves the reported value as is.
        scores = scorer.score(refs[0], pred)
        rouge_scores.append(scores['rougeL'].fmeasure)

    item_count = len(pairs)
    return {
        "aggregate": {
            "BLEU": sum(bleu_scores) / item_count,
            "ROUGE_L": sum(rouge_scores) / item_count
        },
        "per_item": [{"BLEU": bleu, "ROUGE_L": rouge} for bleu, rouge in zip(bleu_scores, rouge_scores)]
    }
  2. Semantic Similarity (metrics/semsim.py):
def compute_semantic_similarity(predictions: List[str], references: List[List[str]], options: Dict) -> Dict:
    """Compute cosine similarity between prediction and reference embeddings.

    Only the first reference of each item is used.

    Args:
        predictions: One generated answer per item.
        references: For each item, its list of reference answers.
        options: Accepted for interface parity with the other metrics; unused.

    Returns:
        ``{"aggregate": {"SemSim"}, "per_item": [...]}`` holding plain Python
        floats. With no items to score, the aggregate is 0.0 and ``per_item``
        is empty.
    """
    if not predictions or not references:
        # Nothing to score: skip the model load and avoid dividing by zero.
        return {"aggregate": {"SemSim": 0.0}, "per_item": []}

    # Load sentence transformer model.
    # NOTE(review): this constructs the model object on every call; confirm
    # whether a shared, cached loader should be used here instead.
    model = SentenceTransformer('sentence-transformers/all-mpnet-base-v2')

    # Compute embeddings
    pred_embeddings = model.encode(predictions)
    ref_embeddings = model.encode([ref[0] for ref in references])

    # Calculate cosine similarities. cosine_similarity yields numpy scalars,
    # which the JSON encoder rejects, so convert each one to a built-in float.
    similarities = [
        float(cosine_similarity([pred_emb], [ref_emb])[0][0])
        for pred_emb, ref_emb in zip(pred_embeddings, ref_embeddings)
    ]

    return {
        "aggregate": {"SemSim": sum(similarities) / len(similarities)},
        "per_item": [{"SemSim": sim} for sim in similarities]
    }
  3. BERTScore (metrics/bert_score.py):
def compute_bert_score(predictions: List[str], references: List[List[str]], options: Dict) -> Dict:
    """Compute BERTScore F1 per item and on average.

    Only the first reference of each item is used.

    Args:
        predictions: One generated answer per item.
        references: For each item, its list of reference answers.
        options: Optional overrides: ``model_type`` (default
            "microsoft/deberta-base-mnli"), ``device`` (default "cpu") and
            ``batch_size`` (default 1).

    Returns:
        ``{"aggregate": {"BERTScore_F1"}, "per_item": [...]}``. With no items
        to score, the aggregate is 0.0 and ``per_item`` is empty.
    """
    if not predictions or not references:
        # Nothing to score: skip the model load and return the same shape.
        return {"aggregate": {"BERTScore_F1": 0.0}, "per_item": []}

    # Use smaller model for efficiency
    model_type = options.get("model_type", "microsoft/deberta-base-mnli")

    # Compute BERTScore; only the F1 tensor is reported.
    _precision, _recall, F1 = bertscore.score(
        predictions,
        [ref[0] for ref in references],
        model_type=model_type,
        device=options.get("device", "cpu"),
        batch_size=options.get("batch_size", 1)
    )

    return {
        "aggregate": {"BERTScore_F1": F1.mean().item()},
        "per_item": [{"BERTScore_F1": f1.item()} for f1 in F1]
    }

Heavy Metrics (Optional)

BLEURT (metrics/bleurt.py):

def compute_bleurt(predictions: List[str], references: List[List[str]], options: Dict) -> Dict:
    """Compute BLEURT per item and on average.

    Only the first reference of each item is used.

    Args:
        predictions: One generated answer per item.
        references: For each item, its list of reference answers.
        options: Accepted for interface parity with the other metrics; unused.

    Returns:
        ``{"aggregate": {"BLEURT"}, "per_item": [...]}``. With no items to
        score, the aggregate is 0.0 and ``per_item`` is empty.
    """
    pairs = list(zip(predictions, references))
    if not pairs:
        # Nothing to score: skip the (large) model load and avoid dividing
        # by zero when averaging.
        return {"aggregate": {"BLEURT": 0.0}, "per_item": []}

    # Load BLEURT model (large download)
    bleurt = evaluate.load("bleurt", module_type="metric")

    # Score every pair in one call instead of one compute() call per item.
    result = bleurt.compute(
        predictions=[pred for pred, _ in pairs],
        references=[refs[0] for _, refs in pairs],
    )
    scores = list(result["scores"])

    return {
        "aggregate": {"BLEURT": sum(scores) / len(scores)},
        "per_item": [{"BLEURT": score} for score in scores]
    }

Configuration Management

Environment Configuration (utils/config.py)

API Key Management:

def get_api_key(provider: str) -> Optional[str]:
    """Look up the API key for *provider*.

    Resolution order:
      1. the environment variable ``<PROVIDER>_API_KEY`` (provider
         upper-cased), when set and non-empty;
      2. the ``<provider>_api_key`` entry of ``config.json`` in the current
         working directory.

    A config file that is missing, unreadable, not valid JSON or not a JSON
    object is treated as "no key configured" rather than raising, so one bad
    file cannot take down every key lookup.

    Args:
        provider: Provider name, e.g. "openai".

    Returns:
        The key, or None when neither source provides one.
    """
    import logging  # function-scope: needed only on the error path below

    # Check environment variables first
    env_key = os.getenv(f"{provider.upper()}_API_KEY")
    if env_key:
        return env_key

    # Check config file
    config_path = os.path.join(os.getcwd(), "config.json")
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config = json.load(f)
    except FileNotFoundError:
        return None
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logging.getLogger(__name__).warning(
            "Ignoring unreadable config file %s: %s", config_path, exc
        )
        return None

    if not isinstance(config, dict):
        # Valid JSON, but not an object, so there is nothing to look up in.
        return None
    return config.get(f"{provider}_api_key")

def get_openrouter_headers() -> Optional[Dict[str, str]]:
    """Build the additional headers sent with OpenRouter requests.

    Returns:
        A dict with "HTTP-Referer" taken from ``OPENROUTER_SITE_URL`` and
        "X-Title" taken from ``OPENROUTER_SITE_NAME``; an unset variable
        yields an empty string.
    """
    site_url = os.getenv("OPENROUTER_SITE_URL", "")
    site_name = os.getenv("OPENROUTER_SITE_NAME", "")
    return {"HTTP-Referer": site_url, "X-Title": site_name}

Configuration Validation

Startup Validation:

def validate_configuration():
    """Prepare the cache directory and warn about missing provider API keys.

    Creates ``hf_cache`` under the current working directory, points
    ``HF_HOME`` and ``TRANSFORMERS_CACHE`` at it unless they are already set,
    then prints a warning naming every registered model whose provider
    ("openai", "gemini" or "openrouter") has no API key available.
    """
    # Check required directories
    cache_dir = os.path.join(os.getcwd(), "hf_cache")
    os.makedirs(cache_dir, exist_ok=True)

    # Set environment defaults; values that are already set are kept.
    for env_name in ("HF_HOME", "TRANSFORMERS_CACHE"):
        os.environ.setdefault(env_name, cache_dir)

    # Validate API keys for registered models
    key_based_providers = ("openai", "gemini", "openrouter")
    missing_keys = []
    for model_name, config in SUPPORTED_MODELS.items():
        provider = config.get("provider")
        if provider not in key_based_providers:
            continue
        if get_api_key(provider):
            continue
        missing_keys.append(f"{provider.upper()}_API_KEY for {model_name}")

    if missing_keys:
        print(f"Warning: Missing API keys: {missing_keys}")

Error Handling and Fallbacks

Multi-Level Error Handling

  1. Model-Level Fallbacks:
def get_qa_pipeline_with_fallback(model_name: str):
    """Return a QA pipeline for *model_name*, degrading step by step.

    Order of attempts:
      1. the pipeline registered for ``model_name``;
      2. the same lookup after re-running ``preload_qa_models()`` (a failure
         in this step is logged and otherwise ignored);
      3. the first entry of ``qa_pipelines``, whichever model that is.

    Returns:
        A pipeline, or None when nothing is available.
    """
    # Try primary model
    primary = get_qa_pipeline(model_name)
    if primary:
        return primary

    # Try to reload models
    try:
        preload_qa_models()
        reloaded = get_qa_pipeline(model_name)
        if reloaded:
            return reloaded
    except Exception as e:
        logger.warning(f"Preload retry failed: {e}")

    # Fallback to any available model
    from models.qa import qa_pipelines
    if not qa_pipelines:
        return None

    fallback_name, fallback_pipeline = next(iter(qa_pipelines.items()))
    logger.warning(f"Falling back to {fallback_name}")
    return fallback_pipeline
  2. LLM Fallback for Low-Confidence QA:
def qa_with_llm_fallback(question, context, qa_result):
    """Replace a missing or low-confidence QA answer with a generated one.

    The fallback runs when the QA answer is exactly "No answer found." or its
    score is below 0.05 (a missing score counts as 0).

    Args:
        question: The user's question.
        context: Context text the answer should be grounded in.
        qa_result: Result dict from the extractive QA step.

    Returns:
        ``qa_result`` unchanged when it is confident enough, otherwise
        ``{"answer": str, "fallback_used": True}`` produced by distilgpt2.
    """
    needs_fallback = (
        qa_result.get("answer") == "No answer found."
        or qa_result.get("score", 0) < 0.05
    )
    if not needs_fallback:
        return qa_result

    # Use local LLM as fallback
    model, tokenizer, device = load_transformer_model_for_generation("distilgpt2")

    prompt = f"Context: {context}\nQuestion: {question}\nAnswer:"
    inputs = tokenizer(prompt, return_tensors="pt").to(device)

    outputs = model.generate(
        **inputs,
        max_new_tokens=48,
        temperature=0.6,
        do_sample=True
    )

    # A causal LM returns the prompt tokens followed by the continuation.
    # Decode only the continuation: splitting the decoded text on "Answer:"
    # picks the wrong segment whenever the context or question itself
    # contains that marker.
    prompt_length = inputs["input_ids"].shape[1]
    generated_tokens = outputs[0][prompt_length:]
    answer = tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()

    return {"answer": answer, "fallback_used": True}

Comprehensive Error Reporting

Structured Error Responses:

def format_error_response(error: Exception, context: Dict) -> Dict[str, Any]:
    """Package an exception into a structured error dict.

    Args:
        error: The exception being reported.
        context: Caller-supplied details, echoed back unchanged.

    Returns:
        A dict with the error message, the exception class name, the given
        context, the current ``time.time()`` value and whatever
        ``generate_error_suggestions(error, context)`` returns.
    """
    report = {
        "error": str(error),
        "error_type": type(error).__name__,
        "context": context,
        "timestamp": time.time(),
        "suggestions": generate_error_suggestions(error, context),
    }
    return report

Performance Optimizations

Model Preloading Strategy

Startup Preloading (app.py):

def _maybe_warm_once():
    """Run the warm-up steps until they have succeeded once.

    ``_warm_once_done`` is checked before taking ``_warm_lock`` and again
    while holding it, so a caller that waited on the lock does not repeat a
    warm-up that another caller just completed. The flag is set only after
    every step succeeded; a failure is logged as a warning and leaves the
    flag False, so a later call runs the steps again.
    """
    global _warm_once_done
    # Cheap check without the lock for the common already-warmed case.
    if _warm_once_done:
        return

    with _warm_lock:
        # Re-check under the lock: the flag may have been set while waiting.
        if _warm_once_done:
            return

        try:
            # Set cache defaults
            _set_cache_env_defaults()

            # Preload QA models
            preload_qa_models()

            # Warm core caches
            _warm_core_caches()

            # Reached only when all steps above completed without raising.
            _warm_once_done = True
            logger.info("Warm-up complete")
        except Exception as e:
            # Best effort: the failure is logged, not propagated to the caller.
            logger.warning(f"Warm-up failed: {e}")

Selective Warming:

def _warm_core_caches():
    """Warm the sentence-transformer model and, if enabled, heavier metrics.

    The sentence transformer is always loaded. BERTScore warming runs only
    when ``PRELOAD_OPTIONAL_METRICS`` equals "1", and heavy-metric warming
    only when ``PRELOAD_HEAVY_METRICS`` equals "1".
    """
    # Always warm sentence transformers
    from models.loader import load_sentence_transformer
    load_sentence_transformer("sentence-transformers/all-mpnet-base-v2")

    # Optionally warm heavy metrics based on environment flags
    if os.environ.get("PRELOAD_OPTIONAL_METRICS") == "1":
        _warm_bert_score()

    if os.environ.get("PRELOAD_HEAVY_METRICS") == "1":
        _warm_heavy_metrics()

Caching Strategy

Hugging Face Cache Management:

def _set_cache_env_defaults():
    """Create ``./hf_cache`` and use it as the default Hugging Face cache.

    ``HF_HOME``, ``TRANSFORMERS_CACHE`` and ``HF_DATASETS_CACHE`` are each set
    to that directory only when not already present in the environment.
    """
    default_cache = os.path.join(os.getcwd(), "hf_cache")
    os.makedirs(default_cache, exist_ok=True)

    # Set environment variables for consistent caching; setdefault keeps any
    # value that was configured beforehand.
    for env_name in ("HF_HOME", "TRANSFORMERS_CACHE", "HF_DATASETS_CACHE"):
        os.environ.setdefault(env_name, default_cache)

In-Memory Caching:

# In-process caches for frequently used components, keyed by model name.
_model_cache = {}
_tokenizer_cache = {}

def get_cached_model(model_name: str):
    """Return the model for *model_name*, loading it on first request.

    The first call for a given name stores the result of
    ``load_model(model_name)`` in ``_model_cache``; later calls return the
    stored object.
    """
    try:
        return _model_cache[model_name]
    except KeyError:
        # Cache miss: fall through to load and remember the model.
        pass
    _model_cache[model_name] = load_model(model_name)
    return _model_cache[model_name]

Monitoring and Logging

Structured Logging

Request Tracing:

def log_evaluation_request(payload: Dict, start_time: float):
    """Log a summary of an incoming /evaluate request at INFO level.

    Args:
        payload: Request payload; ``model_name``, ``question`` (first 120
            characters only) and ``hyperparams`` are logged.
        start_time: Accepted but not used by this function.
    """
    summary = {
        "model_name": payload.get("model_name"),
        "question_preview": payload.get("question", "")[:120],
        "hyperparams": payload.get("hyperparams", {}),
        "timestamp": time.time(),
    }
    logger.info("/evaluate payload: %s", summary)

def log_evaluation_result(result: Dict, elapsed_ms: float):
    """Log a summary of an /evaluate result at INFO level.

    Args:
        result: Result dict; its keys and the length of ``model_response``
            (empty string when absent) are logged, not the content itself.
        elapsed_ms: Request duration in milliseconds, logged as given.
    """
    summary = {
        "keys": list(result.keys()),
        "model_response_len": len(result.get("model_response", "")),
        "elapsed_ms": elapsed_ms,
        "timestamp": time.time(),
    }
    logger.info("/evaluate result: %s", summary)

Performance Monitoring:

def track_performance_metrics():
    """Placeholder for performance tracking; currently does nothing.

    Planned measurements:
      - model loading times
      - inference latency
      - memory usage
      - cache hit rates
    """
    return None

This comprehensive technical implementation provides a robust, scalable foundation for the chatbot evaluation system with proper error handling, performance optimization, and monitoring capabilities.