First_agent_template / eval /rag_eval.py
mathidot's picture
build option trading agent modules
8f1601b
Raw
History Blame Contribute Delete
26.4 kB
from __future__ import annotations
import argparse
import csv
import json
import math
import re
import shutil
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable
import chromadb
import requests
from llama_index.core import StorageContext, VectorStoreIndex
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import Document
from llama_index.core.schema import NodeWithScore, TextNode
from llama_index.vector_stores.chroma import ChromaVectorStore
from tools.query_knowledge import (
BM25Retriever,
EMBED_MODEL_NAME,
RERANKER_MODEL_NAME,
CrossEncoderReranker,
configure_model_cache,
resolve_embed_model_name,
)
# Two levels up from this file: the directory that contains this file's parent directory.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Working directories for evaluation artifacts; created on demand by ensure_dirs().
EVAL_DIR = PROJECT_ROOT / "eval"
DATA_DIR = EVAL_DIR / "data"
INDEX_DIR = EVAL_DIR / "indexes"
REPORT_DIR = EVAL_DIR / "reports"
# Archive URL for each BEIR dataset that prepare_beir_dataset() can download.
BEIR_URLS = {
"scifact": "https://public.ukp.informatik.tu-darmstadt.de/thakur/BEIR/datasets/scifact.zip",
"fiqa": "https://public.ukp.informatik.tu-darmstadt.de/thakur/BEIR/datasets/fiqa.zip",
}
# Accepted --dataset spellings mapped to the canonical names load_eval_corpus() dispatches on.
DATASET_ALIASES = {
"beir/scifact": "scifact",
"beir/fiqa": "fiqa",
"open-ragbench": "open_ragbench",
"open_ragbench": "open_ragbench",
"t2-ragbench": "t2_ragbench",
"t2_ragbench": "t2_ragbench",
"local-options": "local_options",
"local_options": "local_options",
}
@dataclass
class EvalCorpus:
    """Documents, queries and relevance judgements for one retrieval evaluation."""

    # Dataset label; used to name the on-disk index and the report files.
    name: str
    # One dict per document, built by the loaders with the keys
    # "doc_id", "title", "text" and "metadata".
    documents: list[dict[str, Any]]
    # One dict per query, built by the loaders with the keys
    # "query_id", "question" and "relevant_doc_ids".
    queries: list[dict[str, Any]]
    # Query id -> ids of the documents judged relevant for that query.
    qrels: dict[str, set[str]]
def ensure_dirs() -> None:
    """Create the data, index and report directories if they do not exist yet."""
    for directory in (DATA_DIR, INDEX_DIR, REPORT_DIR):
        directory.mkdir(parents=True, exist_ok=True)
def download_file(url: str, destination: Path) -> None:
    """Stream the resource at ``url`` into ``destination``.

    The payload is first written to a sibling ``<name>.part`` file and only
    moved onto ``destination`` once the whole response has been received.
    Callers in this module treat an existing ``destination`` as a finished
    download, so a truncated file must never be left at that path.

    Args:
        url: HTTP(S) address to fetch.
        destination: Final path of the downloaded file; parent directories
            are created as needed.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    destination.parent.mkdir(parents=True, exist_ok=True)
    partial_path = destination.with_name(destination.name + ".part")
    try:
        with requests.get(url, stream=True, timeout=60) as response:
            response.raise_for_status()
            with partial_path.open("wb") as file:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        file.write(chunk)
        # Rename only after every chunk was written successfully.
        partial_path.replace(destination)
    finally:
        # No-op after a successful rename; removes the leftover on failure.
        partial_path.unlink(missing_ok=True)
def read_jsonl(path: Path) -> Iterable[dict[str, Any]]:
    """Lazily yield the parsed JSON value of every non-blank line in ``path``.

    Lines are stripped of surrounding whitespace before parsing; lines that
    are empty after stripping are skipped. The file is read as UTF-8.
    """
    with path.open("r", encoding="utf-8") as handle:
        for stripped in map(str.strip, handle):
            if not stripped:
                continue
            yield json.loads(stripped)
def prepare_beir_dataset(dataset_name: str) -> Path:
    """Make sure a BEIR dataset is available locally and return its directory.

    If ``corpus.jsonl`` is already present the directory is returned as is.
    Otherwise the archive is downloaded (unless the zip file already exists)
    and extracted under ``DATA_DIR / "beir"``.

    Args:
        dataset_name: Key of ``BEIR_URLS``.

    Returns:
        The directory ``DATA_DIR / "beir" / dataset_name``.

    Raises:
        ValueError: If ``dataset_name`` has no entry in ``BEIR_URLS``.
        FileNotFoundError: If extraction did not produce ``corpus.jsonl``.
    """
    ensure_dirs()
    archive_url = BEIR_URLS.get(dataset_name)
    if archive_url is None:
        raise ValueError(f"Unsupported BEIR dataset: {dataset_name}")

    beir_root = DATA_DIR / "beir"
    target_dir = beir_root / dataset_name
    corpus_path = target_dir / "corpus.jsonl"
    if not corpus_path.exists():
        zip_path = DATA_DIR / "downloads" / f"{dataset_name}.zip"
        if not zip_path.exists():
            download_file(archive_url, zip_path)
        beir_root.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(zip_path) as archive:
            archive.extractall(beir_root)
        # The archive is expected to unpack into a folder named after the dataset.
        if not corpus_path.exists():
            raise FileNotFoundError(f"BEIR extraction did not create {corpus_path}")
    return target_dir
def load_beir_dataset(
    dataset_name: str,
    split: str,
    max_corpus_docs: int | None,
    max_queries: int | None,
) -> EvalCorpus:
    """Load (a sample of) a BEIR dataset as an ``EvalCorpus``.

    Queries are taken in qrels order. When ``max_corpus_docs`` is set, a query
    is only accepted if all of its relevant documents still fit into the
    document budget; the corpus is then filled with those required documents
    plus as many other documents as the budget allows.

    Args:
        dataset_name: BEIR dataset key understood by ``prepare_beir_dataset``.
        split: Name of the qrels split (``qrels/<split>.tsv``). If that file
            is missing, the alphabetically first ``*.tsv`` is used instead.
        max_corpus_docs: Upper bound on the number of documents, or ``None``
            / ``0`` for no bound.
        max_queries: Upper bound on the number of queries, or ``None`` / ``0``
            for no bound.

    Returns:
        The sampled corpus, named ``beir_<dataset_name>``.

    Raises:
        FileNotFoundError: If no qrels file exists for the dataset.
        ValueError: If the sample contains no documents or no queries.
    """
    dataset_dir = prepare_beir_dataset(dataset_name)
    all_queries = {
        str(row["_id"]): row.get("text", "")
        for row in read_jsonl(dataset_dir / "queries.jsonl")
    }

    qrels_path = dataset_dir / "qrels" / f"{split}.tsv"
    if not qrels_path.exists():
        candidates = sorted((dataset_dir / "qrels").glob("*.tsv"))
        if not candidates:
            raise FileNotFoundError(f"No qrels found under {dataset_dir / 'qrels'}")
        qrels_path = candidates[0]

    all_qrels: dict[str, set[str]] = {}
    with qrels_path.open("r", encoding="utf-8") as file:
        for row in csv.DictReader(file, delimiter="\t"):
            query_id = row.get("query-id") or row.get("query_id")
            corpus_id = row.get("corpus-id") or row.get("corpus_id")
            if not query_id or not corpus_id:
                # Malformed/short row: skip it instead of recording the
                # literal string "None" as an id.
                continue
            raw_score = row.get("score")
            # A missing or blank score counts as relevant, matching the
            # default used when the column is absent.
            score = 1 if raw_score in (None, "") else int(raw_score)
            if score <= 0:
                continue
            all_qrels.setdefault(str(query_id), set()).add(str(corpus_id))

    queries = []
    required_doc_ids: set[str] = set()
    for query_id, relevant_docs in all_qrels.items():
        if query_id not in all_queries:
            continue
        # Skip queries whose relevant documents would overflow the budget.
        if max_corpus_docs and len(required_doc_ids | relevant_docs) > max_corpus_docs:
            continue
        required_doc_ids.update(relevant_docs)
        queries.append(
            {
                "query_id": query_id,
                "question": all_queries[query_id],
                "relevant_doc_ids": sorted(relevant_docs),
            }
        )
        if max_queries and len(queries) >= max_queries:
            break

    documents = []
    # Required documents not yet encountered in corpus.jsonl; its size is the
    # number of budget slots that must stay reserved.
    pending_required = set(required_doc_ids)
    for row in read_jsonl(dataset_dir / "corpus.jsonl"):
        doc_id = str(row["_id"])
        if required_doc_ids and doc_id not in required_doc_ids:
            # Filler documents may only use slots not reserved for required ones.
            if max_corpus_docs and len(documents) + len(pending_required) >= max_corpus_docs:
                continue
        title = row.get("title") or ""
        text = row.get("text") or ""
        documents.append(
            {
                "doc_id": doc_id,
                "title": title,
                "text": f"{title}\n{text}".strip(),
                "metadata": {"source_dataset": f"beir/{dataset_name}"},
            }
        )
        pending_required.discard(doc_id)
        if max_corpus_docs and len(documents) >= max_corpus_docs and not pending_required:
            break

    if not documents or not queries:
        raise ValueError(
            f"Dataset beir/{dataset_name} has no evaluable documents/queries. "
            "Increase --max-corpus-docs or use a larger sample."
        )
    return EvalCorpus(
        name=f"beir_{dataset_name}",
        documents=documents,
        queries=queries,
        qrels={query["query_id"]: set(query["relevant_doc_ids"]) for query in queries},
    )
def snapshot_hf_dataset(repo_id: str, local_name: str) -> Path:
    """Return a local copy of a Hugging Face dataset repo, downloading if needed.

    Args:
        repo_id: Dataset repository id on the Hugging Face Hub.
        local_name: Directory name used under ``DATA_DIR / "hf"``.

    Returns:
        The local directory holding the snapshot.
    """
    from huggingface_hub import snapshot_download

    ensure_dirs()
    target_dir = DATA_DIR / "hf" / local_name
    # NOTE(review): only the directory's existence is checked, so a directory
    # left behind by an interrupted download is reused as if it were complete.
    if not target_dir.exists():
        snapshot_download(
            repo_id=repo_id,
            repo_type="dataset",
            local_dir=str(target_dir),
            local_dir_use_symlinks=False,
        )
    return target_dir
def flatten_open_ragbench_section(section: dict[str, Any]) -> str:
    """Render a section's text followed by its tables as newline-separated text.

    The ``"text"`` entry comes first (if non-empty). When ``"tables"`` is a
    dict, each of its values is appended as ``str(value)``; empty pieces are
    dropped. A ``"tables"`` entry of any other type is ignored.
    """
    pieces = []
    body = section.get("text") or ""
    if body:
        pieces.append(body)

    tables = section.get("tables") or {}
    if isinstance(tables, dict):
        for table in tables.values():
            rendered = str(table)
            if rendered:
                pieces.append(rendered)
    return "\n".join(pieces)
def load_open_ragbench(
    max_corpus_docs: int | None,
    max_queries: int | None,
) -> EvalCorpus:
    """Load (a sample of) the Open RAGBench arXiv subset as an ``EvalCorpus``.

    Each qrels entry names one relevant paper. Papers are read from the
    ``corpus`` directory in sorted file order; every paper becomes a single
    document made of title, abstract and flattened sections. When
    ``max_corpus_docs`` is set, papers required by the selected queries take
    priority over other papers, and loading stops once the budget is reached.

    Args:
        max_corpus_docs: Upper bound on the number of documents, or ``None``
            / ``0`` for no bound.
        max_queries: Upper bound on the number of queries, or ``None`` / ``0``
            for no bound.

    Returns:
        The sampled corpus, named ``open_ragbench``.

    Raises:
        FileNotFoundError: If neither known dataset root directory exists.
        ValueError: If the sample contains no documents or no queries.
    """
    dataset_dir = snapshot_hf_dataset("vectara/open_ragbench", "open_ragbench")
    root = dataset_dir / "pdf" / "arxiv"
    if not root.exists():
        root = dataset_dir / "official" / "pdf" / "arxiv"
    if not root.exists():
        raise FileNotFoundError(f"Open RAGBench root not found: {root}")
    queries_data = json.loads((root / "queries.json").read_text(encoding="utf-8"))
    qrels_data = json.loads((root / "qrels.json").read_text(encoding="utf-8"))

    required_doc_ids: set[str] = set()
    selected_query_ids = []
    for query_id, qrel in qrels_data.items():
        doc_id = str(qrel.get("doc_id"))
        if not doc_id or doc_id == "None":
            continue
        selected_query_ids.append(str(query_id))
        required_doc_ids.add(doc_id)
        if max_queries and len(selected_query_ids) >= max_queries:
            break

    documents = []
    allowed_doc_ids: set[str] = set()
    # Required papers not loaded yet; its size is the number of budget slots
    # that must stay reserved for them.
    pending_required = set(required_doc_ids)
    for corpus_file in sorted((root / "corpus").glob("*.json")):
        paper = json.loads(corpus_file.read_text(encoding="utf-8"))
        paper_id = str(paper.get("id") or corpus_file.stem)
        if max_corpus_docs and paper_id not in required_doc_ids:
            # Filler papers may only use slots not reserved for required ones.
            if len(documents) + len(pending_required) >= max_corpus_docs:
                continue
        allowed_doc_ids.add(paper_id)
        pending_required.discard(paper_id)

        section_texts = []
        for section_index, section in enumerate(paper.get("sections") or []):
            section_text = flatten_open_ragbench_section(section)
            if section_text:
                section_texts.append(f"[section {section_index}]\n{section_text}")
        text = "\n\n".join(
            part
            for part in [paper.get("title") or "", paper.get("abstract") or "", *section_texts]
            if part
        )
        documents.append(
            {
                "doc_id": paper_id,
                "title": paper.get("title") or paper_id,
                "text": text,
                "metadata": {
                    "source_dataset": "open_ragbench",
                    "categories": ",".join(paper.get("categories") or []),
                },
            }
        )
        if max_corpus_docs and len(documents) >= max_corpus_docs:
            break

    queries = []
    qrels: dict[str, set[str]] = {}
    # selected_query_ids is already capped at max_queries, so no further
    # limit is needed while filtering.
    for query_id in selected_query_ids:
        doc_id = str(qrels_data[query_id].get("doc_id"))
        if doc_id not in allowed_doc_ids:
            continue
        query_payload = queries_data.get(query_id) or {}
        question = query_payload.get("query") if isinstance(query_payload, dict) else str(query_payload)
        if not question:
            # No usable question text: a None/empty query cannot be evaluated.
            continue
        qrels[query_id] = {doc_id}
        queries.append(
            {
                "query_id": query_id,
                "question": question,
                "relevant_doc_ids": [doc_id],
            }
        )

    if not documents or not queries:
        raise ValueError("Open RAGBench produced no evaluable sample.")
    return EvalCorpus("open_ragbench", documents, queries, qrels)
def load_t2_ragbench(
    max_corpus_docs: int | None,
    max_queries: int | None,
) -> EvalCorpus:
    """Load (a sample of) T2-RAGBench as an ``EvalCorpus``.

    Rows are read from the snapshot's parquet files, or from its JSONL files
    when no parquet file exists. Column names are probed through
    ``first_present`` because the row schema is not fixed. Each row with a
    question and a context yields one query; the first row seen for a
    document id defines that document.

    Args:
        max_corpus_docs: Keep only the first N documents (and the queries
            pointing at them); ``None`` / ``0`` keeps everything.
        max_queries: Stop after N queries; ``None`` / ``0`` for no bound.

    Returns:
        The sampled corpus, named ``t2_ragbench``.

    Raises:
        FileNotFoundError: If the snapshot has neither parquet nor JSONL files.
        ValueError: If the sample contains no documents or no queries.
    """
    dataset_dir = snapshot_hf_dataset("G4KMU/t2-ragbench", "t2_ragbench")
    parquet_files = sorted(dataset_dir.rglob("*.parquet"))
    jsonl_files = sorted(dataset_dir.rglob("*.jsonl"))
    if not (parquet_files or jsonl_files):
        raise FileNotFoundError(f"No parquet/jsonl files found in {dataset_dir}")

    # Stop reading further files once roughly five rows per requested query
    # have been collected.
    row_budget = max_queries * 5 if max_queries else None
    rows: list[dict[str, Any]] = []
    if parquet_files:
        import pandas as pd

        for parquet_file in parquet_files:
            rows.extend(pd.read_parquet(parquet_file).to_dict(orient="records"))
            if row_budget is not None and len(rows) >= row_budget:
                break
    else:
        for jsonl_file in jsonl_files:
            rows.extend(read_jsonl(jsonl_file))
            if row_budget is not None and len(rows) >= row_budget:
                break

    documents_by_id: dict[str, dict[str, Any]] = {}
    queries = []
    for index, row in enumerate(rows):
        question = first_present(row, ["question", "query", "Question"])
        answer = first_present(row, ["answer", "Answer", "response"])
        context = first_present(row, ["context", "evidence", "gold_context", "text", "document"])
        table = first_present(row, ["table", "Table", "markdown_table"])
        source_id = first_present(row, ["doc_id", "document_id", "filename", "pdf_path", "source"])
        doc_id = str(source_id or f"row-{index}")
        if not (question and context):
            continue

        if doc_id not in documents_by_id:
            context_text = str(context)
            table_text = str(table or "")
            title = first_present(row, ["company", "ticker", "title", "Title"])
            # NOTE(review): the gold answer is stored in the document metadata,
            # which build_index copies onto the indexed Document - confirm it
            # does not end up in the embedded text.
            documents_by_id[doc_id] = {
                "doc_id": doc_id,
                "title": str(title or doc_id),
                "text": "\n".join(part for part in [context_text, table_text] if part),
                "metadata": {"source_dataset": "t2_ragbench", "answer": str(answer or "")},
            }

        explicit_query_id = first_present(row, ["qid", "query_id", "id"])
        queries.append(
            {
                "query_id": str(explicit_query_id or f"q-{index}"),
                "question": str(question),
                "relevant_doc_ids": [doc_id],
            }
        )
        if max_queries and len(queries) >= max_queries:
            break

    documents = list(documents_by_id.values())
    if max_corpus_docs:
        documents = documents[:max_corpus_docs]
    # Drop queries whose document fell outside the (possibly truncated) corpus.
    allowed = {document["doc_id"] for document in documents}
    queries = [query for query in queries if query["relevant_doc_ids"][0] in allowed]
    qrels = {query["query_id"]: set(query["relevant_doc_ids"]) for query in queries}

    if not documents or not queries:
        raise ValueError("T2-RAGBench produced no evaluable sample.")
    return EvalCorpus("t2_ragbench", documents, queries, qrels)
def first_present(row: dict[str, Any], keys: list[str]) -> Any:
    """Return the value of the first key in ``keys`` that holds real data.

    A value counts as present when it is neither ``None`` nor the empty
    string; other falsy values such as ``0`` are returned. ``None`` is
    returned when no key qualifies.
    """
    candidates = (row.get(key) for key in keys)
    return next(
        (value for value in candidates if value is not None and value != ""),
        None,
    )
def load_local_options_eval(max_queries: int | None) -> EvalCorpus:
    """Build an ``EvalCorpus`` from the local PDFs and hand-written eval cases.

    Every item returned by ``load_pdf_file`` becomes one document. For each
    case in ``local_options_eval.jsonl`` a document is relevant when its
    ``page_number`` metadata is among the case's ``expected_pages`` or its
    text contains any of the case's ``expected_keywords``.

    Args:
        max_queries: Stop after N cases; ``None`` / ``0`` for no bound.

    Returns:
        The corpus, named ``local_options``.

    Raises:
        FileNotFoundError: If the eval case file does not exist.
        ValueError: If no documents or no cases were loaded.
    """
    cases_path = EVAL_DIR / "local_options_eval.jsonl"
    if not cases_path.exists():
        raise FileNotFoundError(
            f"Local options eval set not found: {cases_path}. "
            "Create JSONL cases with question, expected_pages, expected_keywords."
        )
    from tools.query_knowledge import load_pdf_file

    # Prefer the top-level knowledge base; fall back to the copy under tools/.
    pdf_files = sorted((PROJECT_ROOT / "knowledge_base" / "raw").rglob("*.pdf")) or sorted(
        (PROJECT_ROOT / "tools" / "knowledge_base" / "raw").rglob("*.pdf")
    )
    documents = [
        {
            "doc_id": f"{pdf_file.name}:{document.metadata.get('page_number')}:{doc_index}",
            "title": document.metadata.get("section_path") or pdf_file.name,
            "text": document.text,
            "metadata": document.metadata,
        }
        for pdf_file in pdf_files
        for doc_index, document in enumerate(load_pdf_file(pdf_file))
    ]

    queries = []
    qrels: dict[str, set[str]] = {}
    for case_index, case in enumerate(read_jsonl(cases_path)):
        query_id = str(case.get("id") or f"local-{case_index}")
        expected_pages = set(case.get("expected_pages") or [])
        expected_keywords = case.get("expected_keywords") or []

        relevant_ids = []
        for document in documents:
            page_number = (document.get("metadata") or {}).get("page_number")
            on_expected_page = page_number in expected_pages
            mentions_keyword = any(keyword in document["text"] for keyword in expected_keywords)
            if on_expected_page or mentions_keyword:
                relevant_ids.append(document["doc_id"])

        queries.append(
            {
                "query_id": query_id,
                "question": case["question"],
                "relevant_doc_ids": relevant_ids,
            }
        )
        qrels[query_id] = set(relevant_ids)
        if max_queries and len(queries) >= max_queries:
            break

    if not documents or not queries:
        raise ValueError("Local options eval set produced no evaluable sample.")
    return EvalCorpus("local_options", documents, queries, qrels)
def load_eval_corpus(args: argparse.Namespace) -> EvalCorpus:
    """Load the corpus selected by ``args.dataset``.

    The dataset name is first normalised through ``DATASET_ALIASES``; names
    without an alias are used unchanged.

    Raises:
        ValueError: If the (normalised) dataset name is not supported.
    """
    dataset = DATASET_ALIASES.get(args.dataset, args.dataset)

    def load_beir() -> EvalCorpus:
        return load_beir_dataset(dataset, args.split, args.max_corpus_docs, args.max_queries)

    # Loaders are wrapped in callables so only the chosen one touches ``args``.
    loaders = {
        "scifact": load_beir,
        "fiqa": load_beir,
        "open_ragbench": lambda: load_open_ragbench(args.max_corpus_docs, args.max_queries),
        "t2_ragbench": lambda: load_t2_ragbench(args.max_corpus_docs, args.max_queries),
        "local_options": lambda: load_local_options_eval(args.max_queries),
    }
    loader = loaders.get(dataset)
    if loader is None:
        raise ValueError(f"Unknown dataset: {args.dataset}")
    return loader()
def collection_safe_name(value: str) -> str:
    """Turn ``value`` into a slug of letters, digits, ``_`` and ``-``.

    Each run of other characters becomes a single underscore, and leading and
    trailing underscores are removed. ``"default"`` is returned when nothing
    remains.
    """
    collapsed = re.sub(r"[^A-Za-z0-9_-]+", "_", value)
    trimmed = collapsed.strip("_")
    if trimmed:
        return trimmed
    return "default"
def build_index(corpus: EvalCorpus, chunk_size: int, chunk_overlap: int, rebuild: bool) -> VectorStoreIndex:
    """Return a Chroma-backed vector index for ``corpus``, building it if empty.

    The index is persisted under ``INDEX_DIR / corpus.name``. Documents are
    split and embedded only when the collection holds no entries; otherwise
    the persisted vectors are reused.

    Args:
        corpus: Corpus whose documents are indexed.
        chunk_size: Chunk size handed to ``SentenceSplitter``.
        chunk_overlap: Chunk overlap handed to ``SentenceSplitter``.
        rebuild: Delete any persisted index first and build from scratch.

    Returns:
        An index view over the Chroma collection.
    """
    # Must run before the embedding class is imported below.
    configure_model_cache()
    from llama_index.embeddings.huggingface import HuggingFaceEmbedding

    index_path = INDEX_DIR / corpus.name
    if rebuild and index_path.exists():
        shutil.rmtree(index_path)
    index_path.mkdir(parents=True, exist_ok=True)

    db = chromadb.PersistentClient(path=str(index_path))
    # NOTE(review): the collection name is derived from EMBED_MODEL_NAME while
    # the model actually loaded comes from resolve_embed_model_name() - confirm
    # the two always refer to the same model.
    collection_name = f"{corpus.name}_{collection_safe_name(EMBED_MODEL_NAME)}_eval"
    if rebuild:
        # Best effort: the collection may simply not exist.
        try:
            db.delete_collection(collection_name)
        except Exception:
            pass
    collection = db.get_or_create_collection(collection_name)
    vector_store = ChromaVectorStore(chroma_collection=collection)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    embed_model = HuggingFaceEmbedding(
        model_name=resolve_embed_model_name(),
        cache_folder=str(PROJECT_ROOT / "hf_cache" / "sentence_transformers"),
    )

    # NOTE(review): a non-empty collection is reused as is, regardless of the
    # chunk settings or corpus sample it was built with; pass rebuild=True
    # after changing either.
    if collection.count() == 0:
        documents = []
        for document in corpus.documents:
            metadata = {
                "doc_id": document["doc_id"],
                "title": document.get("title", ""),
                **(document.get("metadata") or {}),
            }
            documents.append(Document(text=document["text"], metadata=metadata))
        splitter = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        nodes = splitter.get_nodes_from_documents(documents)
        # Constructed for its side effect of embedding the nodes into the store.
        VectorStoreIndex(
            nodes,
            storage_context=storage_context,
            embed_model=embed_model,
            show_progress=True,
        )
    return VectorStoreIndex.from_vector_store(vector_store, embed_model=embed_model)
def build_bm25_retriever(corpus: EvalCorpus, chunk_size: int, chunk_overlap: int) -> BM25Retriever:
    """Build a BM25 retriever over the chunked documents of ``corpus``.

    Documents are split with a ``SentenceSplitter`` using the given chunk
    settings, and each chunk is handed to ``BM25Retriever`` as a ``TextNode``
    carrying the document's ``doc_id``, ``title`` and extra metadata.
    """
    documents = []
    for document in corpus.documents:
        metadata = {
            "doc_id": document["doc_id"],
            "title": document.get("title", ""),
            **(document.get("metadata") or {}),
        }
        documents.append(Document(text=document["text"], metadata=metadata))

    splitter = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    text_nodes = []
    for node in splitter.get_nodes_from_documents(documents):
        text_nodes.append(TextNode(id_=node.node_id, text=node.get_content(), metadata=node.metadata))
    return BM25Retriever(text_nodes)
def merge_eval_results(
    vector_results: list[NodeWithScore],
    bm25_results: list[NodeWithScore],
    top_k: int,
) -> list[NodeWithScore]:
    """Fuse vector and BM25 rankings by summing reciprocal ranks.

    A result at zero-based position ``rank`` contributes ``1 / (rank + 1)``;
    a chunk found by both retrievers receives the sum of both contributions.

    Chunks are matched on ``(doc_id, content)`` rather than on ``node_id``.
    The vector index and the BM25 retriever are produced by separate splitter
    runs (and the index may have been persisted by an earlier process), so
    the same chunk is not expected to carry the same node id in both lists
    (presumably ids are generated per run - confirm against the installed
    llama-index version). Results without a ``doc_id`` fall back to
    ``node_id``.

    Args:
        vector_results: Vector retriever hits, best first.
        bm25_results: BM25 retriever hits, best first.
        top_k: Maximum number of fused results to return.

    Returns:
        New ``NodeWithScore`` objects sorted by fused score, best first; the
        input objects are not modified.
    """

    def chunk_key(result: NodeWithScore) -> tuple:
        node = result.node
        doc_id = node.metadata.get("doc_id")
        if doc_id is None:
            return ("node", node.node_id)
        return ("chunk", str(doc_id), node.get_content())

    merged: dict[tuple, NodeWithScore] = {}
    for ranked_list in (vector_results, bm25_results):
        # Within one list only the best-ranked occurrence of a chunk counts.
        seen_in_list: set[tuple] = set()
        for rank, result in enumerate(ranked_list):
            key = chunk_key(result)
            if key in seen_in_list:
                continue
            seen_in_list.add(key)
            reciprocal_rank_score = 1.0 / (rank + 1)
            existing = merged.get(key)
            if existing is None:
                merged[key] = NodeWithScore(node=result.node, score=reciprocal_rank_score)
            else:
                existing.score = (existing.score or 0.0) + reciprocal_rank_score

    results = list(merged.values())
    results.sort(key=lambda item: item.score or float("-inf"), reverse=True)
    return results[:top_k]
def _score_ranked_results(
    results: list[NodeWithScore],
    relevant_doc_ids: set[str],
    top_k: int,
) -> tuple[list[dict[str, Any]], int | None, float]:
    """Collapse chunk results to documents and score the first ``top_k`` of them.

    Returns the per-document entries (rank, doc_id, score, hit, title), the
    rank of the first relevant document (``None`` if there is none) and the
    binary-gain nDCG over those entries.
    """
    retrieved: list[dict[str, Any]] = []
    seen_doc_ids: set[str] = set()
    first_hit_rank = None
    dcg = 0.0
    for result in results:
        metadata = result.node.metadata
        doc_id = str(metadata.get("doc_id", ""))
        # Several chunks of one document count once, at the best chunk's rank.
        if doc_id in seen_doc_ids:
            continue
        seen_doc_ids.add(doc_id)
        rank = len(retrieved) + 1
        hit = doc_id in relevant_doc_ids
        if hit:
            if first_hit_rank is None:
                first_hit_rank = rank
            dcg += 1 / math.log2(rank + 1)
        retrieved.append(
            {
                "rank": rank,
                "doc_id": doc_id,
                "score": result.score,
                "hit": hit,
                "title": metadata.get("title", ""),
            }
        )
        if len(retrieved) >= top_k:
            break

    # Ideal ranking: every relevant document (up to top_k) at the top.
    ideal_hits = min(len(relevant_doc_ids), top_k)
    idcg = sum(1 / math.log2(rank + 1) for rank in range(1, ideal_hits + 1))
    ndcg = dcg / idcg if idcg else 0.0
    return retrieved, first_hit_rank, ndcg


def evaluate_retrieval(
    corpus: EvalCorpus,
    index: VectorStoreIndex,
    top_k: int,
    use_reranker: bool = False,
    use_hybrid: bool = False,
    chunk_size: int = 512,
    chunk_overlap: int = 64,
    reranker_model_name: str = RERANKER_MODEL_NAME,
    reranker_candidates: int = 25,
) -> dict[str, Any]:
    """Run every query of ``corpus`` against ``index`` and compute metrics.

    Chunk-level results are de-duplicated to document level before scoring.
    Reported metrics are MRR, nDCG@top_k (binary gain) and hit rate at the
    cutoffs 1, 3, 5 and ``top_k``. Cutoffs larger than ``top_k`` are omitted,
    because only ``top_k`` documents are ranked per query and such a value
    would silently describe a shorter list.

    Args:
        corpus: Queries, documents and relevance judgements.
        index: Vector index built over ``corpus``.
        top_k: Number of distinct documents scored per query (at least 1).
        use_reranker: Re-rank candidates with a cross-encoder.
        use_hybrid: Fuse vector results with BM25 results.
        chunk_size: Chunk size for the BM25 retriever (hybrid mode only).
        chunk_overlap: Chunk overlap for the BM25 retriever (hybrid mode only).
        reranker_model_name: Cross-encoder model used when re-ranking.
        reranker_candidates: Candidates fetched per retriever when re-ranking.

    Returns:
        A dict with the keys ``"dataset"``, ``"metrics"`` and ``"cases"``
        (one entry per query, including its ranked documents).

    Raises:
        ValueError: If ``top_k`` is smaller than 1.
    """
    if top_k < 1:
        raise ValueError("top_k must be at least 1")

    # Over-fetch chunks: several may belong to the same document.
    retrieve_top_k = max(reranker_candidates, top_k) if use_reranker else max(top_k * 5, top_k)
    retriever = index.as_retriever(similarity_top_k=retrieve_top_k)
    bm25_retriever = (
        build_bm25_retriever(corpus, chunk_size, chunk_overlap)
        if use_hybrid
        else None
    )
    reranker = CrossEncoderReranker(reranker_model_name) if use_reranker else None

    cases = []
    hit_cutoffs = sorted({k for k in (1, 3, 5) if k <= top_k} | {top_k})
    hit_counts = dict.fromkeys(hit_cutoffs, 0)
    reciprocal_ranks = []
    ndcg_scores = []
    for query in corpus.queries:
        relevant_doc_ids = corpus.qrels.get(query["query_id"], set())
        results = retriever.retrieve(query["question"])
        if bm25_retriever is not None:
            bm25_results = bm25_retriever.retrieve(query["question"], retrieve_top_k)
            results = merge_eval_results(results, bm25_results, retrieve_top_k)
        if reranker is not None:
            results = reranker.rerank(
                query["question"],
                results,
                top_n=max(top_k * 5, top_k),
            )

        retrieved, first_hit_rank, ndcg = _score_ranked_results(results, relevant_doc_ids, top_k)
        ndcg_scores.append(ndcg)
        reciprocal_ranks.append(1 / first_hit_rank if first_hit_rank else 0.0)
        for k in hit_counts:
            if any(item["hit"] for item in retrieved[:k]):
                hit_counts[k] += 1
        cases.append(
            {
                "query_id": query["query_id"],
                "question": query["question"],
                "relevant_doc_ids": sorted(relevant_doc_ids),
                "first_hit_rank": first_hit_rank,
                "retrieved": retrieved,
            }
        )

    total = len(corpus.queries)
    metrics = {
        "queries": total,
        "documents": len(corpus.documents),
        "top_k": top_k,
        "mrr": sum(reciprocal_ranks) / total if total else 0.0,
        "ndcg_at_k": sum(ndcg_scores) / total if total else 0.0,
        "reranker_enabled": use_reranker,
        "hybrid_enabled": use_hybrid,
    }
    for k, count in sorted(hit_counts.items()):
        metrics[f"hit_at_{k}"] = count / total if total else 0.0
    return {"dataset": corpus.name, "metrics": metrics, "cases": cases}
def write_reports(report: dict[str, Any]) -> tuple[Path, Path]:
    """Write ``report`` as JSON and as a Markdown summary under ``REPORT_DIR``.

    The JSON file holds the full report. The Markdown file lists all metrics
    and, for the first ten cases, the first five retrieved documents.

    Returns:
        The paths of the JSON file and the Markdown file, in that order.
    """
    ensure_dirs()
    dataset_name = report["dataset"]
    json_path = REPORT_DIR / f"{dataset_name}_retrieval_eval.json"
    md_path = REPORT_DIR / f"{dataset_name}_retrieval_eval.md"
    json_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")

    lines = [f"# Retrieval Eval: {dataset_name}", "", "## Metrics", ""]
    for key, value in report["metrics"].items():
        # Floats are rounded to four decimals; everything else is shown as is.
        if isinstance(value, float):
            lines.append(f"- `{key}`: {value:.4f}")
        else:
            lines.append(f"- `{key}`: {value}")

    lines += ["", "## Sample Cases", ""]
    for case in report["cases"][:10]:
        lines += [
            f"### {case['query_id']}",
            "",
            case["question"],
            "",
            f"- first_hit_rank: `{case['first_hit_rank']}`",
        ]
        lines.extend(
            f"- rank {item['rank']}: hit={item['hit']} doc_id=`{item['doc_id']}` score={item['score']}"
            for item in case["retrieved"][:5]
        )
        lines.append("")

    md_path.write_text("\n".join(lines), encoding="utf-8")
    return json_path, md_path
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the retrieval evaluation script."""
    parser = argparse.ArgumentParser(description="Run retrieval eval for RAG datasets.")
    parser.add_argument(
        "--dataset",
        required=True,
        help="beir/scifact, beir/fiqa, open-ragbench, t2-ragbench, or local-options",
    )
    parser.add_argument("--split", default="test")

    # Integer options, registered in the order they appear in --help.
    integer_options = (
        ("--top-k", 5),
        ("--chunk-size", 512),
        ("--chunk-overlap", 64),
        ("--max-corpus-docs", None),
        ("--max-queries", None),
    )
    for flag, default in integer_options:
        parser.add_argument(flag, type=int, default=default)

    for switch in ("--rebuild", "--use-hybrid", "--use-reranker"):
        parser.add_argument(switch, action="store_true")

    parser.add_argument("--reranker-model", default=RERANKER_MODEL_NAME)
    parser.add_argument("--reranker-candidates", type=int, default=25)
    return parser.parse_args()
def main() -> None:
    """Run the evaluation end to end: load, index, evaluate, report, print."""
    args = parse_args()
    corpus = load_eval_corpus(args)
    index = build_index(corpus, args.chunk_size, args.chunk_overlap, args.rebuild)

    eval_options = {
        "use_reranker": args.use_reranker,
        "use_hybrid": args.use_hybrid,
        "chunk_size": args.chunk_size,
        "chunk_overlap": args.chunk_overlap,
        "reranker_model_name": args.reranker_model,
        "reranker_candidates": args.reranker_candidates,
    }
    report = evaluate_retrieval(corpus, index, args.top_k, **eval_options)

    json_path, md_path = write_reports(report)
    print(json.dumps(report["metrics"], ensure_ascii=False, indent=2))
    print(f"JSON report: {json_path}")
    print(f"Markdown report: {md_path}")


if __name__ == "__main__":
    main()