Technical Implementation
Chatbot Evaluation System - Technical Implementation Details
Core Implementation Architecture
Application Structure (app.py)
Flask Application Configuration:
from flask import Flask, jsonify, request, render_template
from runner import evaluate
from models.qa import preload_qa_models, SUPPORTED_MODELS
import os
import time
import logging
from threading import Lock

# Flask 3.x compatible application
app = Flask(__name__, template_folder="templates", static_folder="static")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global state management for the one-time model warm-up
_warm_once_done = False
_warm_lock = Lock()
Route Handlers:
- Health Check Endpoint:
@app.route("/health", methods=["GET"])
def health():
return jsonify({"status": "ok"})
@app.route("/health/models", methods=["GET"])
def health_models():
try:
from models.qa import qa_pipelines
return jsonify({
"supported_models": list(SUPPORTED_MODELS.keys()),
"loaded_pipelines": list(qa_pipelines.keys()),
"cache_dir": os.environ.get("HF_HOME")
})
except Exception as e:
return jsonify({"error": str(e)}), 500
- Main Interface:
@app.route("/", methods=["GET"])
def index():
return render_template("index.html", models_meta=SUPPORTED_MODELS)
- Evaluation Endpoint:
@app.route("/evaluate", methods=["POST"])
def evaluate_route():
start = time.perf_counter()
# Request validation and logging
payload = request.get_json(force=True)
logger.info("/evaluate payload: %s", {
"model_name": payload.get("model_name"),
"question_preview": payload.get("question", "")[:120]
})
# Core evaluation logic
result = evaluate(payload)
# Performance tracking
elapsed_ms = (time.perf_counter() - start) * 1000
logger.info("/evaluate result: elapsed_ms=%.1f", elapsed_ms)
return jsonify(result)
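For reference, a minimal client call against the /evaluate endpoint might look like the sketch below. It assumes the app runs locally on Flask's default development port 5000; the payload fields shown (model_name, question, context, hyperparams) follow the handlers and wrappers in this document, and the sample values are placeholders:
import requests

payload = {
    "model_name": "DistilBERT QA",
    "question": "What are common symptoms of the flu?",
    "context": "Influenza typically causes fever, cough, sore throat and fatigue.",
    "hyperparams": {"temperature": 0.6, "max_new_tokens": 96},
}

resp = requests.post("http://localhost:5000/evaluate", json=payload, timeout=120)
resp.raise_for_status()
print(resp.json())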
Evaluation Engine (runner.py)
Core Evaluation Function:
def generate_and_evaluate(question: str, context: str, hyperparams: Dict[str, Any], model_name: str) -> Dict[str, Any]:
# 1. Small talk detection and routing
# 2. Context augmentation with symptom KB
# 3. Model selection and pipeline retrieval
# 4. QA inference with error handling
# 5. Response sanitization
# 6. Fallback to LLM if QA confidence low
# 7. Parallel metrics computation
# 8. Results aggregation and formatting
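A condensed sketch of how these eight steps can be wired together is shown below. The helpers handle_small_talk, augment_context, get_qa_pipeline_with_fallback and qa_with_llm_fallback are the ones defined elsewhere in this document; compute_metrics and the exact keys of the returned dictionary (other than model_response, which the logging code references) are assumptions:
from typing import Any, Dict
import time

def generate_and_evaluate(question: str, context: str,
                          hyperparams: Dict[str, Any], model_name: str) -> Dict[str, Any]:
    # 1. Small talk short-circuits the full pipeline
    small_talk = handle_small_talk(question)
    if small_talk is not None:
        return {"model_response": small_talk, "metrics": {}}

    # 2. Enrich the context with symptom knowledge-base entries
    aug_context = augment_context(question, context)

    # 3./4. Resolve a pipeline for the selected model and run inference
    start = time.perf_counter()
    qa_pipeline = get_qa_pipeline_with_fallback(model_name)
    qa_result = qa_pipeline(question=question, context=aug_context, **hyperparams)

    # 5./6. Sanitize the answer; fall back to a local LLM if QA confidence is low
    qa_result = qa_with_llm_fallback(question, aug_context, qa_result)
    elapsed_ms = (time.perf_counter() - start) * 1000

    # 7./8. Metrics computation and aggregation (compute_metrics is a placeholder
    #       name; the actual metric wiring is described in the metrics section)
    metrics = compute_metrics(qa_result["answer"], hyperparams)

    return {
        "model_response": qa_result["answer"],
        "metrics": metrics,
        "latency_ms": elapsed_ms,
        "fallback_used": qa_result.get("fallback_used", False),
    }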
Small Talk Detection:
SMALL_TALK_REPLIES = {
"hi": "Hello! How can I help you today?",
"hello": "Hi there! What can I do for you?",
"how are you": "I'm just a bot, but I'm doing great!",
"thanks": "You're welcome!"
}
def handle_small_talk(question: str) -> Optional[str]:
    ql = question.strip().lower()
    first_word = ql.split()[0] if ql else ""
    # Match whole greeting words only, so e.g. "high fever" is not mistaken for small talk
    if ql in SMALL_TALK_REPLIES or first_word in ("hi", "hello", "hey"):
        return SMALL_TALK_REPLIES.get(ql, SMALL_TALK_REPLIES["hi"])
    return None
Context Augmentation:
def augment_context(question: str, base_context: str) -> str:
aug_context = base_context
ql = question.lower()
# Check for symptom-related questions
for symptom, info in SYMPTOM_KB.items():
if symptom in ql:
aug_context = f"{base_context}\n\n{info}"
break
return aug_context
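The SYMPTOM_KB referenced above is not shown in this document; an illustrative shape with hypothetical entries, mapping a keyword to supplementary context text, might look like:
# Hypothetical knowledge-base entries: keyword -> supplementary context text
SYMPTOM_KB = {
    "fever": "Fever is a temporary rise in body temperature, often due to infection. "
             "Seek care if it exceeds 39.4 C (103 F) or persists for several days.",
    "headache": "Headaches range from tension-type to migraine; a sudden, severe "
                "headache warrants prompt medical attention.",
}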
Model Provider Integration
Hugging Face Integration (models/qa.py)
Model Registry:
SUPPORTED_MODELS = {
# Extractive QA models
"DistilBERT QA": {
"provider": "hf_qa",
"model": "distilbert-base-uncased-distilled-squad",
"provider_name": "Hugging Face",
"hint": "HF free; extractive QA; fast on CPU"
},
# API-based models
"OpenAI GPT-4o-mini": {
"provider": "openai",
"model": "gpt-4o-mini",
"provider_name": "OpenAI",
"hint": "Requires OPENAI API key"
},
# Local chat models
"TinyLlama 1.1B Chat": {
"provider": "hf_chat",
"model": "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
"provider_name": "TinyLlama (HF)",
"hint": "HF free; very small; fast on CPU"
}
}
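One way the registry entries can be resolved into callables is a small provider-to-factory dispatch, sketched below. The builder functions match those defined in the next subsection; the _PROVIDER_BUILDERS table and build_wrapper itself are assumptions, not the project's actual resolution code:
# Hypothetical dispatch from registry metadata to the provider-specific builders
_PROVIDER_BUILDERS = {
    "hf_qa": _build_hf_qa_wrapper,
    "openai": _build_openai_wrapper,
    "hf_chat": _build_hf_chat_wrapper,
}

def build_wrapper(display_name: str):
    meta = SUPPORTED_MODELS[display_name]
    builder = _PROVIDER_BUILDERS[meta["provider"]]
    return builder(meta["model"])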
Provider-Specific Wrappers:
- Extractive QA Wrapper:
from transformers import AutoModelForQuestionAnswering, AutoTokenizer, pipeline

def _build_hf_qa_wrapper(model_name: str) -> Callable:
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForQuestionAnswering.from_pretrained(model_name)
    # Bind to a distinct name so the imported pipeline() factory is not shadowed
    qa_pipeline = pipeline("question-answering", model=model, tokenizer=tokenizer)
    def wrapper(**kwargs):
        result = qa_pipeline(
            question=kwargs.get("question"),
            context=kwargs.get("context"),
            top_k=kwargs.get("top_k", 1),
            handle_impossible_answer=True
        )
        return result
    return wrapper
- OpenAI Wrapper:
from openai import OpenAI

def _build_openai_wrapper(model_name: str) -> Callable:
    client = OpenAI(api_key=get_api_key("openai"))
    def wrapper(**kwargs):
        response = client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "system", "content": MEDICAL_SYSTEM_PROMPT},
                {"role": "user", "content": f"Context: {kwargs['context']}\nQuestion: {kwargs['question']}"}
            ],
            temperature=0.6,
            max_tokens=256
        )
        return {
            "answer": response.choices[0].message.content,
            # API answers carry no extractive confidence, so a fixed high score is reported
            "score": 0.99
        }
    return wrapper
- Local Chat Wrapper:
from models.loader import load_transformer_model_for_generation

def _build_hf_chat_wrapper(model_name: str) -> Callable:
    def wrapper(**kwargs):
        # Lazy loading for memory efficiency
        model, tokenizer, device = load_transformer_model_for_generation(model_name)
        prompt = f"Context: {kwargs['context']}\nQuestion: {kwargs['question']}\nAnswer:"
        inputs = tokenizer(prompt, return_tensors="pt").to(device)
        outputs = model.generate(
            **inputs,
            max_new_tokens=kwargs.get("max_new_tokens", 96),
            temperature=kwargs.get("temperature", 0.6),
            do_sample=True
        )
        response = tokenizer.decode(outputs[0], skip_special_tokens=True)
        # The decoded text includes the prompt, so keep only what follows "Answer:"
        answer = response.split("Answer:", 1)[-1].strip()
        return {"answer": answer, "score": 0.99}
    return wrapper
Model Loading Strategy (models/loader.py)
Lazy Loading Implementation:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

# Module-level cache of already-initialized models
_loaded_models = {}

def load_transformer_model_for_generation(model_name: str, device: str = "cpu"):
    if model_name not in _loaded_models:
        print(f"Loading model {model_name}...")
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32,
            low_cpu_mem_usage=True
        ).to(device)
        # Store the loaded model alongside its tokenizer and target device
        _loaded_models[model_name] = (model, tokenizer, device)
    return _loaded_models[model_name]
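Because Flask can serve concurrent requests, two requests asking for the same unloaded model can race and load it twice. A minimal thread-safe variant, assuming a module-level threading.Lock and reusing the loader above (the _safe name is hypothetical), is sketched here:
import threading

_load_lock = threading.Lock()

def load_transformer_model_for_generation_safe(model_name: str, device: str = "cpu"):
    # Double-checked locking: only one thread performs the expensive load
    if model_name in _loaded_models:
        return _loaded_models[model_name]
    with _load_lock:
        if model_name not in _loaded_models:
            load_transformer_model_for_generation(model_name, device)
        return _loaded_models[model_name]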
Device Management:
import torch

# Default policy; "gpu" switches to CUDA when it is available
_device_policy = "cpu"

def set_device_policy(policy: str):
    global _device_policy
    _device_policy = policy

def get_device() -> str:
    if _device_policy == "gpu" and torch.cuda.is_available():
        return "cuda"
    return "cpu"
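A hedged usage sketch tying the policy to the loader: the policy is set once at startup and get_device() feeds the loader's device argument. The DEVICE_POLICY environment variable shown here is an assumption, not a documented setting:
import os

set_device_policy(os.getenv("DEVICE_POLICY", "cpu"))
device = get_device()
model, tokenizer, device = load_transformer_model_for_generation(
    "TinyLlama/TinyLlama-1.1B-Chat-v1.0", device=device
)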
Metrics Computation Engine
Metrics Registry (metrics/__init__.py)
Available Metrics:
METRIC_REGISTRY = {
"bleu": "metrics.ngram",
"rouge_l": "metrics.ngram",
"sem_sim": "metrics.semsim",
"diversity": "metrics.diversity",
"bert_score": "metrics.bert_score",
"bleurt": "metrics.bleurt",
"comet": "metrics.comet",
"toxicity": "metrics.toxicity",
"latency": "metrics.latency"
}
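The registry maps metric keys to module paths. A sketch of how those modules could be resolved and invoked dynamically follows; the run_metric helper and the module-level compute() convention are assumptions (the real modules use more specific names such as compute_ngram_metrics and compute_bert_score, shown below):
import importlib
from typing import Any, Dict, List

def run_metric(metric_key: str, predictions: List[str],
               references: List[List[str]], options: Dict[str, Any]) -> Dict[str, Any]:
    # Resolve the module path registered for this metric and call its entry point
    module_path = METRIC_REGISTRY[metric_key]
    module = importlib.import_module(module_path)
    return module.compute(predictions, references, options)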
Core Metrics Implementation
- BLEU/ROUGE (metrics/ngram.py):
import sacrebleu
from rouge_score import rouge_scorer

def compute_ngram_metrics(predictions: List[str], references: List[List[str]], options: Dict) -> Dict:
    # Compute sentence-level BLEU against all references for each prediction
    bleu_scores = []
    for pred, refs in zip(predictions, references):
        bleu = sacrebleu.sentence_bleu(pred, refs)
        bleu_scores.append(bleu.score)
    # Compute ROUGE-L F-measure (scorer built once and reused per item)
    scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
    rouge_scores = []
    for pred, refs in zip(predictions, references):
        scores = scorer.score(refs[0], pred)
        rouge_scores.append(scores['rougeL'].fmeasure)
    return {
        "aggregate": {
            "BLEU": sum(bleu_scores) / len(bleu_scores),
            "ROUGE_L": sum(rouge_scores) / len(rouge_scores)
        },
        "per_item": [{"BLEU": bleu, "ROUGE_L": rouge} for bleu, rouge in zip(bleu_scores, rouge_scores)]
    }
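Expected call shape, for reference: predictions is a flat list of answers and references is a parallel list of reference lists. The sample strings below are placeholders:
result = compute_ngram_metrics(
    predictions=["Rest and fluids help most mild fevers."],
    references=[["Mild fevers are usually managed with rest and plenty of fluids."]],
    options={},
)
print(result["aggregate"])    # {"BLEU": ..., "ROUGE_L": ...}
print(result["per_item"][0])  # per-answer scores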
- Semantic Similarity (metrics/semsim.py):
from sklearn.metrics.pairwise import cosine_similarity
from models.loader import load_sentence_transformer

def compute_semantic_similarity(predictions: List[str], references: List[List[str]], options: Dict) -> Dict:
    # Sentence transformer is served from the shared model cache (see models/loader.py)
    model = load_sentence_transformer('sentence-transformers/all-mpnet-base-v2')
    # Compute embeddings
    pred_embeddings = model.encode(predictions)
    ref_embeddings = model.encode([ref[0] for ref in references])
    # Calculate cosine similarities (cast to float so results are JSON-serializable)
    similarities = []
    for pred_emb, ref_emb in zip(pred_embeddings, ref_embeddings):
        similarity = float(cosine_similarity([pred_emb], [ref_emb])[0][0])
        similarities.append(similarity)
    return {
        "aggregate": {"SemSim": sum(similarities) / len(similarities)},
        "per_item": [{"SemSim": sim} for sim in similarities]
    }
- BERTScore (metrics/bert_score.py):
import bert_score as bertscore

def compute_bert_score(predictions: List[str], references: List[List[str]], options: Dict) -> Dict:
    # Use a smaller model by default for efficiency
    model_type = options.get("model_type", "microsoft/deberta-base-mnli")
    # Compute precision, recall and F1
    P, R, F1 = bertscore.score(
        predictions,
        [ref[0] for ref in references],
        model_type=model_type,
        device=options.get("device", "cpu"),
        batch_size=options.get("batch_size", 1)
    )
    return {
        "aggregate": {"BERTScore_F1": F1.mean().item()},
        "per_item": [{"BERTScore_F1": f1.item()} for f1 in F1]
    }
Heavy Metrics (Optional)
BLEURT (metrics/bleurt.py):
import evaluate  # Hugging Face `evaluate` library, distinct from runner.evaluate

def compute_bleurt(predictions: List[str], references: List[List[str]], options: Dict) -> Dict:
    # Load the BLEURT metric (triggers a large checkpoint download on first use)
    bleurt = evaluate.load("bleurt", module_type="metric")
    # Compute per-item scores against the first reference
    scores = []
    for pred, refs in zip(predictions, references):
        score = bleurt.compute(predictions=[pred], references=[refs[0]])
        scores.append(score["scores"][0])
    return {
        "aggregate": {"BLEURT": sum(scores) / len(scores)},
        "per_item": [{"BLEURT": score} for score in scores]
    }
Configuration Management
Environment Configuration (utils/config.py)
API Key Management:
import os
import json
from typing import Dict, Optional

def get_api_key(provider: str) -> Optional[str]:
    # Check environment variables first
    env_key = os.getenv(f"{provider.upper()}_API_KEY")
    if env_key:
        return env_key
    # Fall back to a config.json in the working directory
    config_path = os.path.join(os.getcwd(), "config.json")
    if os.path.exists(config_path):
        with open(config_path, 'r') as f:
            config = json.load(f)
        return config.get(f"{provider}_api_key")
    return None
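Given the f"{provider}_api_key" lookup above and the providers referenced later (openai, gemini, openrouter), a config.json kept in the working directory would follow the shape below; the values are placeholders, and the snippet merely writes an example file:
import json

example_config = {
    "openai_api_key": "sk-...",
    "gemini_api_key": "...",
    "openrouter_api_key": "...",
}
with open("config.json", "w") as f:
    json.dump(example_config, f, indent=2)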
def get_openrouter_headers() -> Optional[Dict[str, str]]:
# Additional headers for OpenRouter requests
return {
"HTTP-Referer": os.getenv("OPENROUTER_SITE_URL", ""),
"X-Title": os.getenv("OPENROUTER_SITE_NAME", "")
}
Configuration Validation
Startup Validation:
def validate_configuration():
# Check required directories
cache_dir = os.path.join(os.getcwd(), "hf_cache")
os.makedirs(cache_dir, exist_ok=True)
# Set environment defaults
os.environ.setdefault("HF_HOME", cache_dir)
os.environ.setdefault("TRANSFORMERS_CACHE", cache_dir)
# Validate API keys for registered models
missing_keys = []
for model_name, config in SUPPORTED_MODELS.items():
provider = config.get("provider")
if provider in ["openai", "gemini", "openrouter"]:
if not get_api_key(provider):
missing_keys.append(f"{provider.upper()}_API_KEY for {model_name}")
if missing_keys:
print(f"Warning: Missing API keys: {missing_keys}")
Error Handling and Fallbacks
Multi-Level Error Handling
- Model-Level Fallbacks:
def get_qa_pipeline_with_fallback(model_name: str):
# Try primary model
pipeline = get_qa_pipeline(model_name)
if pipeline:
return pipeline
# Try to reload models
try:
preload_qa_models()
pipeline = get_qa_pipeline(model_name)
if pipeline:
return pipeline
except Exception as e:
logger.warning(f"Preload retry failed: {e}")
# Fallback to any available model
from models.qa import qa_pipelines
if qa_pipelines:
fallback_name, pipeline = next(iter(qa_pipelines.items()))
logger.warning(f"Falling back to {fallback_name}")
return pipeline
return None
- LLM Fallback for Low-Confidence QA:
def qa_with_llm_fallback(question, context, qa_result):
# Check if QA confidence is too low
if (qa_result.get("answer") == "No answer found." or
qa_result.get("score", 0) < 0.05):
# Use local LLM as fallback
model, tokenizer, device = load_transformer_model_for_generation("distilgpt2")
prompt = f"Context: {context}\nQuestion: {question}\nAnswer:"
inputs = tokenizer(prompt, return_tensors="pt").to(device)
outputs = model.generate(
**inputs,
max_new_tokens=48,
temperature=0.6,
do_sample=True
)
response = tokenizer.decode(outputs[0], skip_special_tokens=True)
answer = response.split("Answer:")[1].strip()
return {"answer": answer, "fallback_used": True}
return qa_result
Comprehensive Error Reporting
Structured Error Responses:
def format_error_response(error: Exception, context: Dict) -> Dict[str, Any]:
return {
"error": str(error),
"error_type": type(error).__name__,
"context": context,
"timestamp": time.time(),
"suggestions": generate_error_suggestions(error, context)
}
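generate_error_suggestions is referenced above but not shown; a plausible sketch that maps common failure modes to operator hints follows. The specific checks and hint strings are assumptions:
from typing import Any, Dict, List

def generate_error_suggestions(error: Exception, context: Dict[str, Any]) -> List[str]:
    # Hypothetical mapping from failure modes to actionable hints
    suggestions = []
    message = str(error).lower()
    if isinstance(error, KeyError) or "model_name" in message:
        suggestions.append("Check that model_name is one of SUPPORTED_MODELS.")
    if "api key" in message or "unauthorized" in message:
        suggestions.append("Verify the provider API key via environment or config.json.")
    if "cuda" in message or "memory" in message:
        suggestions.append("Retry on CPU or reduce batch_size / max_new_tokens.")
    if not suggestions:
        suggestions.append("See server logs for the full traceback.")
    return suggestions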
Performance Optimizations
Model Preloading Strategy
Startup Preloading (app.py):
def _maybe_warm_once():
global _warm_once_done
if _warm_once_done:
return
with _warm_lock:
if _warm_once_done:
return
try:
# Set cache defaults
_set_cache_env_defaults()
# Preload QA models
preload_qa_models()
# Warm core caches
_warm_core_caches()
_warm_once_done = True
logger.info("Warm-up complete")
except Exception as e:
logger.warning(f"Warm-up failed: {e}")
Selective Warming:
def _warm_core_caches():
# Always warm sentence transformers
from models.loader import load_sentence_transformer
load_sentence_transformer("sentence-transformers/all-mpnet-base-v2")
# Optionally warm heavy metrics based on environment flags
if os.getenv("PRELOAD_OPTIONAL_METRICS") == "1":
_warm_bert_score()
if os.getenv("PRELOAD_HEAVY_METRICS") == "1":
_warm_heavy_metrics()
Caching Strategy
Hugging Face Cache Management:
def _set_cache_env_defaults():
default_cache = os.path.join(os.getcwd(), "hf_cache")
os.makedirs(default_cache, exist_ok=True)
# Set environment variables for consistent caching
os.environ.setdefault("HF_HOME", default_cache)
os.environ.setdefault("TRANSFORMERS_CACHE", default_cache)
os.environ.setdefault("HF_DATASETS_CACHE", default_cache)
In-Memory Caching:
# Cache for frequently used components
_model_cache = {}
_tokenizer_cache = {}
def get_cached_model(model_name: str):
if model_name not in _model_cache:
_model_cache[model_name] = load_model(model_name)
return _model_cache[model_name]
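An alternative sketch of the same idea using functools, which also handles the load-once bookkeeping; whether the project prefers this over the explicit dict is not specified here:
from functools import lru_cache

@lru_cache(maxsize=8)
def get_cached_model(model_name: str):
    # lru_cache keeps at most 8 models resident and evicts the least recently used
    return load_model(model_name)  # load_model: provider-specific loader, as above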
Monitoring and Logging
Structured Logging
Request Tracing:
def log_evaluation_request(payload: Dict, start_time: float):
logger.info("/evaluate payload: %s", {
"model_name": payload.get("model_name"),
"question_preview": payload.get("question", "")[:120],
"hyperparams": payload.get("hyperparams", {}),
"timestamp": time.time()
})
def log_evaluation_result(result: Dict, elapsed_ms: float):
logger.info("/evaluate result: %s", {
"keys": list(result.keys()),
"model_response_len": len(result.get("model_response", "")),
"elapsed_ms": elapsed_ms,
"timestamp": time.time()
})
Performance Monitoring:
def track_performance_metrics():
# Track model loading times
# Track inference latency
# Track memory usage
# Track cache hit rates
pass
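The stub above lists what should be tracked; a minimal in-process sketch covering the latency portion is given below. The track_latency and latency_summary names and the in-memory store are assumptions, not the project's actual monitoring backend:
import time
from collections import defaultdict
from contextlib import contextmanager

# Simple in-memory timing store: name -> list of elapsed milliseconds
_timings = defaultdict(list)

@contextmanager
def track_latency(name: str):
    start = time.perf_counter()
    try:
        yield
    finally:
        _timings[name].append((time.perf_counter() - start) * 1000)

def latency_summary() -> dict:
    return {
        name: {"count": len(values), "avg_ms": sum(values) / len(values)}
        for name, values in _timings.items() if values
    }

# Usage sketch:
# with track_latency("qa_inference"):
#     result = qa_pipeline(question=question, context=context)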
Together, these components give the chatbot evaluation system a solid foundation: layered error handling with model and LLM fallbacks, cache- and preload-based performance optimizations, and structured logging for monitoring.