Retrieval Metrics
# Copyright (c) 2025 Microsoft Corporation.
import sys
# Make modules located three directories above the working directory importable
# (inserted at index 1 of the module search path).
sys.path.insert(1, "../../../")
import nest_asyncio
# Patch asyncio via nest_asyncio. The cells below call asyncio.run() directly;
# presumably this is needed because the notebook kernel already runs an event
# loop — TODO confirm.
nest_asyncio.apply()
# IPython magics: load the dotenv extension and invoke it. Presumably this reads
# a local .env file so that OPENAI_API_KEY (read with os.getenv in the config
# cell) is set — TODO confirm.
%load_ext dotenv
%dotenv
import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Any
import pandas as pd
import tiktoken
from pydantic import SecretStr
from benchmark_qed.config.llm_config import LLMConfig, LLMProvider
# Emit INFO-and-above records through the root logger.
logging.basicConfig(level=logging.INFO)
# Only let errors through from the "httpx" logger. logging.getLogger() always
# returns a Logger instance (creating it on first use) and never None, so no
# existence check is needed before configuring it.
logging.getLogger("httpx").setLevel(logging.ERROR)
# DATA CONFIGS
# The defaults below target the pre-built example data that ships with the
# repo; substitute your own locations to evaluate a different dataset.
INPUT_DATA_PATH = Path("./example_answers")
# Retrieval references live in a sub-folder of the example answers.
REFERENCE_DATA_PATH = INPUT_DATA_PATH / "retrieval_reference"
OUTPUT_DATA_PATH = Path("./output") / "retrieval_scores"
# Relevance-assessment cache, stored alongside the references.
CACHE_DIR = REFERENCE_DATA_PATH / "cache"
CACHE_ENABLED = True
# MODEL CONFIGS
# The API key is taken from the OPENAI_API_KEY environment variable; an empty
# string is used when the variable is not set.
API_KEY = SecretStr(os.environ.get("OPENAI_API_KEY", ""))
ENCODING_MODEL = "o200k_base"
EMBEDDING_MODEL = "text-embedding-3-large"

# NOTE: The relevance-assessment cache is only reused when the LLM model and
# parameters below are the same as those used when the retrieval references
# (Part 1) were originally generated. Any change to the model or parameters
# leads to cache misses and therefore to new LLM calls.
LLM_MODEL = "gpt-5.2"
LLM_PARAMS = {"temperature": 0.0, "seed": 42}

# Embedding model configuration (used to embed the text units in Part 1).
EMBEDDING_LLM_CONFIG = LLMConfig(
    llm_provider=LLMProvider.OpenAIEmbedding,
    model=EMBEDDING_MODEL,
    api_key=API_KEY,
)

# Chat model configuration (used by the relevance rater in Parts 1 and 2).
COMPLETION_LLM_CONFIG = LLMConfig(
    llm_provider=LLMProvider.OpenAIChat,
    model=LLM_MODEL,
    api_key=API_KEY,
    call_args=LLM_PARAMS,
)
# METRICS CONFIGS
# RAG methods to score; each name is also the sub-folder holding its results.
RAG_METHODS = [
    "vector_rag_short_context",
    "vector_rag_long_context",
]
# Question sets evaluated for every RAG method.
QUESTION_SETS = [
    "data_linked",
    "data_global",
]
# Number of clusters. Set to None to enable auto tuning, which will be slow.
NUM_CLUSTERS = 40
# Set to None to compute for all questions, or to an integer value to select a
# subset of questions.
NUM_QUESTIONS = None
# Increase either value below to reduce classification error when generating
# the reference context used for comparison.
SEMANTIC_REPRESENTATIVES = 50
CENTROID_REPRESENTATIVES = 10
# Passed as `relevance_threshold` to the precision, recall and fidelity scorers.
RELEVANCE_THRESHOLD = 2
Extract Example Data¶
The example dataset is bundled as compressed archives under example_answers/. The text units corpus is split across two parquet files (text_units_part1.parquet and text_units_part2.parquet) for easier distribution. The cell below recombines them into a single text_units.parquet and extracts the precomputed retrieval references.
import py7zr
example_answers_path = Path("./example_answers")

# --- Recombine split parquet files ---
# GitHub rejects files larger than 100 MB, so the text-units corpus ships as
# two parquet parts. Stitch them back together into one parquet file.
combined_parquet = example_answers_path / "text_units.parquet"
part1 = example_answers_path / "text_units_part1.parquet"
part2 = example_answers_path / "text_units_part2.parquet"

if combined_parquet.exists():
    # A combined file is already present: never overwrite it.
    print(f"{combined_parquet} already exists, skipping recombination")
elif part1.exists() and part2.exists():
    print("Combining text_units_part1.parquet + text_units_part2.parquet ...")
    part_frames = [pd.read_parquet(part1), pd.read_parquet(part2)]
    text_units_combined = pd.concat(part_frames, ignore_index=True)
    text_units_combined.to_parquet(combined_parquet)
    print(f"Created {combined_parquet} ({len(text_units_combined)} rows)")
else:
    # At least one of the two parts is missing, so there is nothing to combine.
    print("Split parquet files not found — provide your own text_units.parquet")
# --- Extract retrieval reference archive ---
retrieval_reference_archive = example_answers_path / "retrieval_reference.7z"
retrieval_reference_dest = example_answers_path / "retrieval_reference"

if retrieval_reference_dest.exists():
    # The destination folder is already there: leave it untouched.
    print(f"{retrieval_reference_dest} already exists, skipping extraction")
elif retrieval_reference_archive.exists():
    print(f"Extracting {retrieval_reference_archive}...")
    # The archive is unpacked into the example-answers folder itself.
    with py7zr.SevenZipFile(retrieval_reference_archive, mode="r") as archive:
        archive.extractall(path=example_answers_path)
    print(f"Extracted to {retrieval_reference_dest}")
else:
    print(f"Archive not found: {retrieval_reference_archive}")
Part 1: Reference Generation¶
Generate cluster references for each query.
Important Note: This only needs to be run once per dataset/queryset combination.
Tip: For this example dataset, pre-computed references are already provided in
`example_answers/retrieval_reference.7z` and will be used in Part 2. You can skip Part 1 entirely and jump straight to Part 2 to compute retrieval scores using the bundled references.
For each query, retrieve relevant clusters by testing a subset of representative chunks per cluster, to be used to evaluate RAG's cluster-based recall and fidelity measures.
from benchmark_qed.autod.data_processor.embedding import TextEmbedder
from benchmark_qed.autod.io.text_unit import load_text_units
from benchmark_qed.autoe.retrieval_metrics.reference_gen.cluster_relevance import (
ClusterRelevanceRater,
)
from benchmark_qed.autoe.retrieval_metrics.relevance_assessment.bing_rater import (
BingRelevanceRater,
)
from benchmark_qed.autoq.data_model.question import Question
from benchmark_qed.llm.factory import ModelFactory
# Tokenizer, embedder and chat model shared by the Part 1 cells.
token_encoder = tiktoken.get_encoding(ENCODING_MODEL)
text_embedder = TextEmbedder(
    ModelFactory.create_embedding_model(EMBEDDING_LLM_CONFIG)
)
llm = ModelFactory.create_chat_model(model_config=COMPLETION_LLM_CONFIG)

# LLM-backed relevance rater; the cache location and switch come from the
# config cell.
relevance_rater = BingRelevanceRater(
    llm_client=llm,
    llm_config=COMPLETION_LLM_CONFIG,
    concurrent_requests=32,
    cache_dir=CACHE_DIR,
    cache_enabled=CACHE_ENABLED,
)

print("✅ Part 1 setup complete")
print(f" BingRelevanceRater caching: {relevance_rater.cache_enabled}")
if relevance_rater.cache_enabled:
    # Report where the cache lives and how much it currently holds.
    print(f" Cache directory: {relevance_rater.cache_dir}")
    stats = relevance_rater.get_cache_stats()
    cache_files, cache_size_mb = stats["cache_files"], stats["cache_size_mb"]
    print(f" Cache files: {cache_files}, Size: {cache_size_mb} MB")
from benchmark_qed.autoe.retrieval import save_cluster_references_to_json
# Load text units from parquet file.
# The reference folder is checked first. The "Extract Example Data" cell,
# however, writes the recombined corpus to INPUT_DATA_PATH / "text_units.parquet",
# so that location is used as a fallback instead of failing when the reference
# folder does not contain its own copy.
text_units_candidates = [
    REFERENCE_DATA_PATH / "text_units.parquet",
    INPUT_DATA_PATH / "text_units.parquet",
]
text_units_path = next(
    (candidate for candidate in text_units_candidates if candidate.exists()), None
)
if text_units_path is None:
    searched = ", ".join(str(candidate) for candidate in text_units_candidates)
    raise FileNotFoundError(f"text_units.parquet not found; looked in: {searched}")

text_df = pd.read_parquet(text_units_path)
if "short_id" not in text_df.columns:
    # No short_id column: fall back to the row index rendered as a string.
    text_df["short_id"] = text_df.index.astype(str)
corpus = load_text_units(text_df)
print(f"Loaded {len(corpus)} text units")

# embed text units if needed
# skip this if you already have embeddings in your corpus
print(f"Embedding {len(corpus)} text units")
corpus = asyncio.run(
    text_embedder.embed_batch(
        text_units=corpus,
        batch_size=32,
    )
)
print(f"Embedded {len(corpus)} text units")

# Create cluster relevance rater with text units data
cluster_rater = ClusterRelevanceRater(
    text_embedder=text_embedder,
    relevance_rater=relevance_rater,
    corpus=corpus,  # Will perform clustering once and reuse for all queries
    semantic_neighbors=SEMANTIC_REPRESENTATIVES,
    centroid_neighbors=CENTROID_REPRESENTATIVES,
    num_clusters=NUM_CLUSTERS,  # set to None to tune number of clusters, but might be slow
)
print(
    f"Cluster relevance rater initialized with {len(cluster_rater.clusters)} clusters"
)
for question_set in QUESTION_SETS:
    print(f"\nGenerating cluster references for question set: {question_set}")

    # The questions are read from the short-context vector RAG retrieval
    # results of this question set.
    context_path = (
        INPUT_DATA_PATH
        / "vector_rag_short_context"
        / f"{question_set}_retrieval_results.json"
    )
    with context_path.open(encoding="utf-8") as results_file:
        retrieval_result_dicts = json.load(results_file)

    # One Question per retrieval result, optionally limited to the first
    # NUM_QUESTIONS entries.
    questions = [
        Question(id=entry["question_id"], text=entry["question_text"])
        for entry in retrieval_result_dicts
    ]
    if NUM_QUESTIONS is not None:
        questions = questions[:NUM_QUESTIONS]
    print(f"Loaded {len(questions)} questions")

    # Assess all questions of the set in a single batch.
    batch_results = asyncio.run(cluster_rater.assess_batch(questions))
    print(f"Generated cluster references for {len(batch_results)} questions")

    # Persist the references together with the clusters they refer to.
    reference_dir = (
        REFERENCE_DATA_PATH / f"{question_set}_questions" / f"clusters_{NUM_CLUSTERS}"
    )
    output_path = reference_dir / "reference.json"
    save_cluster_references_to_json(
        batch_results,
        output_path,
        include_clusters=True,
        clusters=cluster_rater.clusters,
    )
    print(f"Saved cluster references to {output_path}")

print("\n✓ Cluster reference generation completed for all question sets")
Part 2: Retrieval Scoring¶
Run this section to compute retrieval metrics using pre-generated reference clusters. This section can run independently of Part 1 — just make sure the configuration cell and the "Extract Example Data" cell above have been run so the reference files exist under `example_answers/retrieval_reference` (`REFERENCE_DATA_PATH`).
Cache note: The precomputed references were generated with gpt-5.2 using
`temperature=0.0` and `seed=42`. To reuse the relevance-assessment cache shipped with these references, keep the same LLM model and parameters in the config cell above. If you change the model or parameters, the cache entries will not match and new LLM calls will be made.
from benchmark_qed.autoe.data_model.retrieval_result import (
load_retrieval_results_from_dicts,
)
from benchmark_qed.autoe.retrieval_metrics.reference_gen.cluster_relevance import (
load_cluster_references_from_json,
)
from benchmark_qed.autoe.retrieval_metrics.relevance_assessment.bing_rater import (
BingRelevanceRater,
)
from benchmark_qed.autoe.retrieval_metrics.scoring.fidelity import (
FidelityMetric,
calculate_fidelity,
)
from benchmark_qed.autoe.retrieval_metrics.scoring.precision import (
get_precision_summary,
)
from benchmark_qed.autoe.retrieval_metrics.scoring.recall import calculate_recall
from benchmark_qed.autoe.retrieval_metrics.scoring.retrieval_relevance import (
BatchRelevanceResult,
assess_batch_relevance,
)
from benchmark_qed.llm.factory import ModelFactory
# Part 2 builds its own tokenizer, chat model and relevance rater so it does
# not depend on the Part 1 cells having been run.
token_encoder = tiktoken.get_encoding(ENCODING_MODEL)
llm = ModelFactory.create_chat_model(model_config=COMPLETION_LLM_CONFIG)

relevance_rater = BingRelevanceRater(
    llm_client=llm,
    llm_config=COMPLETION_LLM_CONFIG,
    concurrent_requests=32,
    cache_dir=CACHE_DIR,
    cache_enabled=CACHE_ENABLED,
)

print("✅ Part 2 setup complete")
print(f" BingRelevanceRater caching: {relevance_rater.cache_enabled}")
if relevance_rater.cache_enabled:
    # Show the cache location and its current contents.
    print(f" Cache directory: {relevance_rater.cache_dir}")
    stats = relevance_rater.get_cache_stats()
    cache_files, cache_size_mb = stats["cache_files"], stats["cache_size_mb"]
    print(f" Cache files: {cache_files}, Size: {cache_size_mb} MB")
Retrieval Relevance Assessment¶
For each query, use an LLM to assess which text units in the RAG's retrieved context are relevant.
# For every RAG method and question set: load the retrieved context, let the
# relevance rater assess it, and store the assessments as
# <OUTPUT_DATA_PATH>/<rag_method>/<question_set>_relevance.json.
for rag_method in RAG_METHODS:
    print(f"Evaluating RAG method: {rag_method}")
    for question_set in QUESTION_SETS:
        print(f" Evaluating question set: {question_set}")

        # load context from json file
        context_path = (
            INPUT_DATA_PATH / rag_method / f"{question_set}_retrieval_results.json"
        )
        with context_path.open(encoding="utf-8") as f:
            retrieval_result_dicts = json.load(f)

        retrieval_results = load_retrieval_results_from_dicts(
            data=retrieval_result_dicts,
            context_id_key="chunk_id",  # key for chunk ID in context items
            context_text_key="text",  # key for chunk text in context items
            question_id_key="question_id",
            # Question text field in the top-level dict. Part 1 reads the
            # question text of these same files from "question_text", so the
            # same key is used here ("text" is the chunk-text key inside the
            # context items).
            question_text_key="question_text",
            auto_transform_context=True,
        )
        if NUM_QUESTIONS is not None:
            retrieval_results = retrieval_results[:NUM_QUESTIONS]

        relevance_results = asyncio.run(
            assess_batch_relevance(
                retrieval_results=retrieval_results, relevance_rater=relevance_rater
            )
        )

        # save relevance results to json file
        output_path = OUTPUT_DATA_PATH / rag_method / f"{question_set}_relevance.json"
        # Make sure the per-method output folder exists before writing, as the
        # recall and fidelity cells do for their outputs.
        output_path.parent.mkdir(parents=True, exist_ok=True)
        relevance_results.save_to_json(output_path)

# Print cache usage summary
if relevance_rater.cache_enabled:
    stats = relevance_rater.get_cache_stats()
    print(
        f"\n📦 Cache usage: {stats['cache_hits']} hits, {stats['cache_misses']} misses, {stats['hit_rate_percent']:.1f}% hit rate"
    )
Calculate Retrieval Scores¶
Precision¶
for rag_method in RAG_METHODS:
    # All score files of a RAG method live in the same folder.
    method_dir = OUTPUT_DATA_PATH / rag_method
    for question_set in QUESTION_SETS:
        # Relevance assessments stored by the relevance-assessment cell.
        relevance_path = method_dir / f"{question_set}_relevance.json"
        relevance_results = BatchRelevanceResult.load_from_json(relevance_path)

        # Summarize precision at the configured relevance threshold.
        precision_summary = get_precision_summary(
            relevance_results, relevance_threshold=RELEVANCE_THRESHOLD
        )

        # Write the summary next to the relevance file.
        output_path = method_dir / f"{question_set}_precision.json"
        serialized_summary = json.dumps(precision_summary)
        output_path.write_text(serialized_summary)
Recall¶
for rag_method in RAG_METHODS:
    # All score files of a RAG method live in the same folder.
    method_dir = OUTPUT_DATA_PATH / rag_method
    for question_set in QUESTION_SETS:
        print(f"Calculating recall for {rag_method} - {question_set}")

        # Relevance assessments (QueryRelevanceResult objects) for this pair.
        relevance_path = method_dir / f"{question_set}_relevance.json"
        relevance_results = BatchRelevanceResult.load_from_json(relevance_path)

        # Reference clusters generated for this question set.
        reference_dir = (
            REFERENCE_DATA_PATH
            / f"{question_set}_questions"
            / f"clusters_{NUM_CLUSTERS}"
        )
        cluster_references_path = reference_dir / "reference.json"
        cluster_references, clusters = load_cluster_references_from_json(
            cluster_references_path
        )

        print(f" Loaded {len(relevance_results.results)} relevance results")
        print(f" Loaded {len(cluster_references)} cluster references")

        # Recall, including cluster classification error statistics.
        recall_results = calculate_recall(
            query_relevance_results=relevance_results.results,
            retrieval_references=cluster_references,
            relevance_threshold=RELEVANCE_THRESHOLD,
            clusters=clusters,
        )

        # Store the result as indented JSON; values that are not JSON
        # serializable are written as their str() form.
        output_path = method_dir / f"{question_set}_recall.json"
        output_path.parent.mkdir(parents=True, exist_ok=True)
        serialized_recall = json.dumps(recall_results, indent=2, default=str)
        output_path.write_text(serialized_recall)
        print(f" ✅ Saved recall results to {output_path}")
Fidelity¶
Fidelity measures how similar the distribution of relevant text units is between reference clusters and query relevance results.
- Higher fidelity = More similar distributions = Better retrieval performance
- Lower fidelity = Different distributions = Retrieval may be missing key clusters or focusing on wrong areas
- Measures: Jensen-Shannon divergence (default) or Total Variation Distance (TVD)
# For every RAG method and question set: compare the retrieved relevance
# results with the reference clusters and store the fidelity scores as
# <OUTPUT_DATA_PATH>/<rag_method>/<question_set>_fidelity.json.
for rag_method in RAG_METHODS:
    for question_set in QUESTION_SETS:
        print(f"Calculating fidelity for {rag_method} - {question_set}")

        # Load relevance results (QueryRelevanceResult objects)
        relevance_results = BatchRelevanceResult.load_from_json(
            OUTPUT_DATA_PATH / rag_method / f"{question_set}_relevance.json"
        )

        # Load cluster references and clusters
        cluster_references_path = (
            REFERENCE_DATA_PATH
            / f"{question_set}_questions"
            / f"clusters_{NUM_CLUSTERS}"
            / "reference.json"
        )
        cluster_references, clusters = load_cluster_references_from_json(
            cluster_references_path
        )

        print(f" Loaded {len(relevance_results.results)} relevance results")
        print(f" Loaded {len(cluster_references)} cluster references")

        # Calculate fidelity metrics using Jensen-Shannon divergence (the
        # metric selected below).
        fidelity_results = calculate_fidelity(
            query_relevance_results=relevance_results.results,
            retrieval_references=cluster_references,
            relevance_threshold=RELEVANCE_THRESHOLD,
            clusters=clusters,
            metric=FidelityMetric.JENSEN_SHANNON,
        )

        # Save fidelity results to JSON file. Values that are not JSON
        # serializable are written as their str() form (default=str). The
        # encoding matches the one the summary cell uses to read the file back.
        output_path = OUTPUT_DATA_PATH / rag_method / f"{question_set}_fidelity.json"
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(
            json.dumps(fidelity_results, indent=2, default=str), encoding="utf-8"
        )
        print(f" ✅ Saved fidelity results to {output_path}")

print("\n✓ Fidelity calculation completed for all RAG methods and question sets")
Summary Comparison¶
Compare precision, recall, and fidelity metrics across all RAG methods and question sets.
# Create comprehensive comparison across all metrics
import pandas as pd


def _load_metric_file(path: Path) -> dict[str, Any]:
    """Return the parsed JSON content of *path*, or an empty dict if it is missing."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return {}


comparison_data = []
for rag_method in RAG_METHODS:
    for question_set in QUESTION_SETS:
        scores_dir = OUTPUT_DATA_PATH / rag_method

        # A missing score file yields an empty dict, so every column derived
        # from it falls back to 0 below. This keeps the precision columns
        # consistent with the recall and fidelity columns instead of leaving
        # the Avg_* columns unset.
        precision_data = _load_metric_file(
            scores_dir / f"{question_set}_precision.json"
        ).get("binary_precision", {})
        recall_data = _load_metric_file(scores_dir / f"{question_set}_recall.json")
        fidelity_data = _load_metric_file(scores_dir / f"{question_set}_fidelity.json")

        row_data: dict[str, Any] = {
            "Question_Set": question_set,
            "RAG_Method": rag_method,
            # Precision summary
            "Binary_Precision": precision_data.get("macro_averaged_precision", 0),
            "Avg_Relevant_Chunks": precision_data.get("average_relevant_chunks", 0),
            "Avg_Retrieved_Chunks": precision_data.get("average_retrieved_chunks", 0),
            # Recall summary
            "Recall": recall_data.get("macro_averaged_recall", 0),
            "Cluster_Classification_Error": recall_data.get(
                "macro_averaged_classification_error", 0
            ),
            # Fidelity summary
            "Fidelity": fidelity_data.get("macro_averaged_fidelity", 0),
        }
        comparison_data.append(row_data)

# Create comparison DataFrame
comparison_df = pd.DataFrame(comparison_data).sort_values(
    by=["Question_Set", "RAG_Method"]
)

print("📊 Metrics Comparison")
print("=" * 100)
print(comparison_df.round(4).to_string(index=False))

# Save comparison to CSV for further analysis. The output folder is created
# first so the export also works when no score file has been written yet.
comparison_output_path = OUTPUT_DATA_PATH / "metrics_comparison.csv"
comparison_output_path.parent.mkdir(parents=True, exist_ok=True)
comparison_df.to_csv(comparison_output_path, index=False)
print(f"\n💾 Saved comparison to {comparison_output_path}")