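"""RAG ablation study entry point.

Ingests a CBT book under six chunking techniques, runs retrieval, reranking,
generation, and evaluation for each technique across a set of test queries
(in parallel, one worker process per technique), and writes the aggregated
findings to a markdown report.
"""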
import os
from datetime import datetime
from multiprocessing import Pool, cpu_count

from dotenv import load_dotenv

from config_loader import cfg
from retriever.generator import RAGGenerator
from retriever.evaluator import RAGEvaluator
from data.ingest import ingest_data, CHUNKING_TECHNIQUES

# Import model fleet
from models.llama_3_8b import Llama3_8B
from models.mistral_7b import Mistral_7b
from models.qwen_2_5 import Qwen2_5
from models.deepseek_v3 import DeepSeek_V3
from models.tiny_aya import TinyAya
MODEL_MAP = {
"Llama-3-8B": Llama3_8B,
"Mistral-7B": Mistral_7b,
"Qwen-2.5": Qwen2_5,
"DeepSeek-V3": DeepSeek_V3,
"TinyAya": TinyAya
}
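# Each wrapper class takes a token=... constructor argument; the generation
# interface they expose is whatever RAGGenerator.get_answer expects.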
load_dotenv()
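# main() fails fast if HF_TOKEN, PINECONE_API_KEY, or OPENROUTER_API_KEY is
# missing from the environment / .env file.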
def run_rag_for_technique(technique_name, query, index, encoder, models, evaluator, rag_engine):
"""Run RAG pipeline for a specific chunking technique."""
print(f"\n{'='*80}")
print(f"TECHNIQUE: {technique_name.upper()}")
print(f"{'='*80}")
# Filter chunks by technique metadata
query_vector = encoder.encode(query).tolist()
# Query with metadata filter for this technique - get more candidates for reranking
res = index.query(
vector=query_vector,
top_k=25,
include_metadata=True,
filter={"technique": {"$eq": technique_name}}
)
# Extract context chunks with URLs
all_candidates = []
chunk_urls = []
for match in res['matches']:
all_candidates.append(match['metadata']['text'])
chunk_urls.append(match['metadata'].get('url', ''))
print(f"\nRetrieved {len(all_candidates)} candidate chunks for technique '{technique_name}'")
if not all_candidates:
print(f"WARNING: No chunks found for technique '{technique_name}'")
return {}
# Apply cross-encoder reranking to get top 5
# Use global reranker loaded once per worker
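    # (Retrieve-then-rerank: the bi-encoder recall set of 25 candidates is
    # re-scored pairwise by the cross-encoder, which is slower per pair but
    # typically more precise than embedding similarity alone.)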
global _worker_reranker
pairs = [[query, chunk] for chunk in all_candidates]
scores = _worker_reranker.predict(pairs)
ranked = sorted(zip(all_candidates, chunk_urls, scores), key=lambda x: x[2], reverse=True)
context_chunks = [chunk for chunk, _, _ in ranked[:5]]
context_urls = [url for _, url, _ in ranked[:5]]
print(f"After reranking: {len(context_chunks)} chunks (top 5)")
# Print the final RAG context being passed to models (only once)
print(f"\n{'='*80}")
print(f"📚 FINAL RAG CONTEXT FOR TECHNIQUE '{technique_name.upper()}'")
print(f"{'='*80}")
for i, chunk in enumerate(context_chunks, 1):
print(f"\n[Chunk {i}] ({len(chunk)} chars):")
print(f"{'─'*60}")
print(chunk)
print(f"{'─'*60}")
print(f"\n{'='*80}")
# Run model tournament for this technique
tournament_results = {}
for name, model_inst in models.items():
print(f"\n{'-'*60}")
print(f"Model: {name}")
print(f"{'-'*60}")
try:
# Generation
answer = rag_engine.get_answer(
model_inst, query, context_chunks,
context_urls=context_urls,
temperature=cfg.gen['temperature']
)
print(f"\n{'─'*60}")
print(f"📝 FULL ANSWER from {name}:")
print(f"{'─'*60}")
print(answer)
print(f"{'─'*60}")
# Faithfulness Evaluation (strict=False reduces API calls from ~22 to ~3 per eval)
faith = evaluator.evaluate_faithfulness(answer, context_chunks, strict=False)
# Relevancy Evaluation
rel = evaluator.evaluate_relevancy(query, answer)
tournament_results[name] = {
"answer": answer,
"Faithfulness": faith['score'],
"Relevancy": rel['score'],
"Claims": faith['details'],
"context_chunks": context_chunks,
"context_urls": context_urls
}
print(f"\n📊 EVALUATION SCORES:")
print(f" Faithfulness: {faith['score']:.1f}%")
print(f" Relevancy: {rel['score']:.3f}")
print(f" Combined: {faith['score'] + rel['score']:.3f}")
except Exception as e:
print(f" Error evaluating {name}: {e}")
tournament_results[name] = {
"answer": "",
"Faithfulness": 0,
"Relevancy": 0,
"Claims": [],
"error": str(e),
"context_chunks": context_chunks,
"context_urls": context_urls
}
return tournament_results
def generate_findings_document(all_query_results, queries, output_file="rag_ablation_findings.md"):
"""Generate detailed markdown document with findings from all techniques across all queries.
Args:
all_query_results: Dict mapping query index to results dict
queries: List of all test queries
output_file: Path to output file
"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
content = f"""# RAG Ablation Study Findings
*Generated:* {timestamp}
## Overview
This document presents findings from a comparative analysis of six chunking techniques
applied to a Cognitive Behavioral Therapy (CBT) book. Each technique was evaluated with
multiple LLM models in a RAG (Retrieval-Augmented Generation) pipeline.
## Test Queries
"""
for i, query in enumerate(queries, 1):
content += f"{i}. {query}\n"
content += """
## Chunking Techniques Evaluated
1. *Fixed* - Fixed-size chunking (1000 chars, 100 overlap)
2. *Sentence* - Sentence-level chunking (NLTK)
3. *Paragraph* - Paragraph-level chunking (\\n\\n boundaries)
4. *Semantic* - Semantic chunking (embedding similarity)
5. *Recursive* - Recursive chunking (hierarchical separators)
6. *Page* - Page-level chunking (--- Page markers)
## Results by Technique (Aggregated Across All Queries)
"""
# Aggregate results across all queries
aggregated_results = {}
for query_idx, query_results in all_query_results.items():
for technique_name, model_results in query_results.items():
if technique_name not in aggregated_results:
aggregated_results[technique_name] = {}
for model_name, results in model_results.items():
if model_name not in aggregated_results[technique_name]:
aggregated_results[technique_name][model_name] = {
'Faithfulness': [],
'Relevancy': [],
'answers': [],
'context_chunks': results.get('context_chunks', []),
'context_urls': results.get('context_urls', [])
}
aggregated_results[technique_name][model_name]['Faithfulness'].append(results.get('Faithfulness', 0))
aggregated_results[technique_name][model_name]['Relevancy'].append(results.get('Relevancy', 0))
aggregated_results[technique_name][model_name]['answers'].append(results.get('answer', ''))
# Add results for each technique
for technique_name, model_results in aggregated_results.items():
content += f"### {technique_name.upper()} Chunking\n\n"
if not model_results:
content += "No results available for this technique.\n\n"
continue
# Create results table with averaged scores
content += "| Model | Avg Faithfulness | Avg Relevancy | Avg Combined |\n"
content += "|-------|------------------|---------------|--------------|\n"
for model_name, results in model_results.items():
avg_faith = sum(results['Faithfulness']) / len(results['Faithfulness']) if results['Faithfulness'] else 0
avg_rel = sum(results['Relevancy']) / len(results['Relevancy']) if results['Relevancy'] else 0
avg_combined = avg_faith + avg_rel
content += f"| {model_name} | {avg_faith:.1f}% | {avg_rel:.3f} | {avg_combined:.3f} |\n"
# Find best model for this technique
if model_results:
best_model = max(
model_results.items(),
key=lambda x: (sum(x[1]['Faithfulness']) / len(x[1]['Faithfulness']) if x[1]['Faithfulness'] else 0) +
(sum(x[1]['Relevancy']) / len(x[1]['Relevancy']) if x[1]['Relevancy'] else 0)
)
best_name = best_model[0]
best_faith = sum(best_model[1]['Faithfulness']) / len(best_model[1]['Faithfulness']) if best_model[1]['Faithfulness'] else 0
best_rel = sum(best_model[1]['Relevancy']) / len(best_model[1]['Relevancy']) if best_model[1]['Relevancy'] else 0
content += f"\n*Best Model:* {best_name} (Avg Faithfulness: {best_faith:.1f}%, Avg Relevancy: {best_rel:.3f})\n\n"
# Show context chunks once per technique (not per model)
context_chunks = None
context_urls = None
for model_name, results in model_results.items():
if results.get('context_chunks'):
context_chunks = results['context_chunks']
context_urls = results.get('context_urls', [])
break
if context_chunks:
content += "#### Context Chunks Used\n\n"
for i, chunk in enumerate(context_chunks, 1):
url = context_urls[i-1] if context_urls and i-1 < len(context_urls) else ""
if url:
content += f"*Chunk {i}* ([Source]({url})):\n"
else:
content += f"*Chunk {i}*:\n"
content += f"\n{chunk}\n\n\n"
# Add detailed RAG results for each model
content += "#### Detailed RAG Results\n\n"
for model_name, results in model_results.items():
answers = results.get('answers', [])
avg_faith = sum(results['Faithfulness']) / len(results['Faithfulness']) if results['Faithfulness'] else 0
avg_rel = sum(results['Relevancy']) / len(results['Relevancy']) if results['Relevancy'] else 0
content += f"*{model_name}* (Avg Faithfulness: {avg_faith:.1f}%, Avg Relevancy: {avg_rel:.3f})\n\n"
# Show answers from each query
for q_idx, answer in enumerate(answers):
content += f"📝 *Answer for Query {q_idx + 1}:*\n\n"
content += f"\n{answer}\n\n\n"
content += "---\n\n"
# Add comparative analysis
content += """## Comparative Analysis
### Overall Performance Ranking (Across All Queries)
| Rank | Technique | Avg Faithfulness | Avg Relevancy | Avg Combined |
|------|-----------|------------------|---------------|--------------|
"""
# Calculate averages for each technique across all queries
technique_averages = {}
for technique_name, model_results in aggregated_results.items():
if model_results:
all_faith = []
all_rel = []
for model_name, results in model_results.items():
all_faith.extend(results['Faithfulness'])
all_rel.extend(results['Relevancy'])
avg_faith = sum(all_faith) / len(all_faith) if all_faith else 0
avg_rel = sum(all_rel) / len(all_rel) if all_rel else 0
avg_combined = avg_faith + avg_rel
technique_averages[technique_name] = {
'faith': avg_faith,
'rel': avg_rel,
'combined': avg_combined
}
# Sort by combined score
sorted_techniques = sorted(
technique_averages.items(),
key=lambda x: x[1]['combined'],
reverse=True
)
for rank, (technique_name, averages) in enumerate(sorted_techniques, 1):
content += f"| {rank} | {technique_name} | {averages['faith']:.1f}% | {averages['rel']:.3f} | {averages['combined']:.3f} |\n"
content += """
### Key Findings
"""
if sorted_techniques:
best_technique = sorted_techniques[0][0]
worst_technique = sorted_techniques[-1][0]
content += f"""
1. *Best Performing Technique:* {best_technique}
- Achieved highest combined score across all models and queries
- Recommended for production RAG applications
2. *Worst Performing Technique:* {worst_technique}
- Lower combined scores across models and queries
- May need optimization or different configuration
3. *Model Consistency:*
- Compare the per-technique best models reported above to see which models hold up across techniques and where technique-specific preferences emerge
"""
content += """## Recommendations
Based on the ablation study results:
1. *Primary Recommendation:* Use the best-performing chunking technique for your specific use case
2. *Hybrid Approach:* Consider combining techniques for different types of queries
3. *Model Selection:* Choose models that perform well across multiple techniques
4. *Parameter Tuning:* Optimize chunk sizes and overlaps based on document characteristics
## Technical Details
- *Embedding Model:* Jina embeddings (512 dimensions)
- *Vector Database:* Pinecone (serverless, AWS us-east-1)
- *Judge Model:* OpenRouter free models
- *Retrieval:* Top 25 candidates per technique, cross-encoder reranked to top 5
- *Evaluation Metrics:* Faithfulness (context grounding), Relevancy (query addressing)
---
This report was automatically generated by the RAG Ablation Study Pipeline.
"""
# Write to file
with open(output_file, 'w', encoding='utf-8') as f:
f.write(content)
print(f"\nFindings document saved to: {output_file}")
return output_file
# Per-process globals populated by init_worker(). The heavyweight objects
# (embedding encoder, generator models, cross-encoder reranker) are loaded
# once per worker rather than passed through Pool.map, since such objects
# generally do not pickle cleanly across process boundaries.
_worker_proc = None
_worker_evaluator = None
_worker_models = None
_worker_rag_engine = None
_worker_reranker = None
def init_worker(model_name, evaluator_config):
"""Initialize models once per worker process."""
global _worker_proc, _worker_evaluator, _worker_models, _worker_rag_engine, _worker_reranker
from retriever.processor import ChunkProcessor
from retriever.evaluator import RAGEvaluator
from retriever.generator import RAGGenerator
from sentence_transformers import CrossEncoder
from models.llama_3_8b import Llama3_8B
from models.mistral_7b import Mistral_7b
from models.qwen_2_5 import Qwen2_5
from models.deepseek_v3 import DeepSeek_V3
from models.tiny_aya import TinyAya
MODEL_MAP = {
"Llama-3-8B": Llama3_8B,
"Mistral-7B": Mistral_7b,
"Qwen-2.5": Qwen2_5,
"DeepSeek-V3": DeepSeek_V3,
"TinyAya": TinyAya
}
# Load embedding model once
_worker_proc = ChunkProcessor(model_name=model_name, verbose=False)
# Initialize evaluator
_worker_evaluator = RAGEvaluator(
judge_model=evaluator_config['judge_model'],
embedding_model=_worker_proc.encoder,
api_key=evaluator_config['api_key']
)
# Initialize models
hf_token = os.getenv("HF_TOKEN")
_worker_models = {name: MODEL_MAP[name](token=hf_token) for name in evaluator_config['model_list']}
# Initialize RAG engine
_worker_rag_engine = RAGGenerator()
# Load reranker once per worker
_worker_reranker = CrossEncoder('jinaai/jina-reranker-v1-tiny-en')
def run_rag_for_technique_wrapper(args):
"""Wrapper function for parallel execution."""
global _worker_proc, _worker_evaluator, _worker_models, _worker_rag_engine
technique, query, index_name, pinecone_key = args
try:
# Create new connection in worker process
from data.vector_db import get_index_by_name
index = get_index_by_name(pinecone_key, index_name)
return technique['name'], run_rag_for_technique(
technique_name=technique['name'],
query=query,
index=index,
encoder=_worker_proc.encoder,
models=_worker_models,
evaluator=_worker_evaluator,
rag_engine=_worker_rag_engine
)
except Exception as e:
import traceback
print(f"\n✗ Error processing technique {technique['name']}: {e}")
print(f"Full traceback:")
traceback.print_exc()
return technique['name'], {}
def main():
"""Main function to run RAG ablation study across all 6 chunking techniques."""
hf_token = os.getenv("HF_TOKEN")
pinecone_key = os.getenv("PINECONE_API_KEY")
openrouter_key = os.getenv("OPENROUTER_API_KEY")
# Verify environment variables
if not hf_token:
raise RuntimeError("HF_TOKEN not found in environment variables")
if not pinecone_key:
raise RuntimeError("PINECONE_API_KEY not found in environment variables")
if not openrouter_key:
raise RuntimeError("OPENROUTER_API_KEY not found in environment variables")
# Test queries
test_queries = [
"What is cognitive behavior therapy and how does it work?",
"What are the common cognitive distortions in CBT?",
"How does CBT help with anxiety and depression?"
]
print("=" * 80)
print("RAG ABLATION STUDY - 6 CHUNKING TECHNIQUES")
print("=" * 80)
print(f"\nTest Queries:")
for i, q in enumerate(test_queries, 1):
print(f" {i}. {q}")
# Step 1: Check if data already exists, skip ingestion if so
print("\n" + "=" * 80)
print("STEP 1: CHECKING/INGESTING DATA WITH ALL 6 TECHNIQUES")
print("=" * 80)
# Check if index already has data
from data.vector_db import get_index_by_name
index_name = f"{cfg.db['base_index_name']}-{cfg.processing['technique']}"
print(f"\nChecking for existing index: {index_name}")
try:
# Try to connect to existing index
print("Connecting to Pinecone...")
existing_index = get_index_by_name(pinecone_key, index_name)
print("Getting index stats...")
stats = existing_index.describe_index_stats()
existing_count = stats.get('total_vector_count', 0)
if existing_count > 0:
print(f"\n✓ Found existing index with {existing_count} vectors")
print("Skipping ingestion - using existing data")
# Initialize processor (this loads the embedding model)
print("Loading embedding model for retrieval...")
from retriever.processor import ChunkProcessor
proc = ChunkProcessor(model_name=cfg.processing['embedding_model'], verbose=False)
index = existing_index
all_chunks = [] # Empty since we're using existing data
final_chunks = []
print("✓ Processor initialized")
else:
print("\nIndex exists but is empty. Running full ingestion...")
all_chunks, final_chunks, proc, index = ingest_data()
except Exception as e:
print(f"\nIndex check failed: {e}")
print("Running full ingestion...")
all_chunks, final_chunks, proc, index = ingest_data()
print(f"\nTechniques to evaluate: {[tech['name'] for tech in CHUNKING_TECHNIQUES]}")
# Step 2: Initialize components
print("\n" + "=" * 80)
print("STEP 2: INITIALIZING COMPONENTS")
print("=" * 80)
    # Initialize models (note: the Step 3 workers build their own copies in
    # init_worker(), so these main-process instances are not used by the
    # parallel runs)
print("\nInitializing models...")
rag_engine = RAGGenerator()
models = {name: MODEL_MAP[name](token=hf_token) for name in cfg.model_list}
    # Initialize evaluator (openrouter_key was already validated above)
    print("Initializing evaluator...")
evaluator = RAGEvaluator(
judge_model=cfg.gen['judge_model'],
embedding_model=proc.encoder,
api_key=openrouter_key
)
# Step 3: Run RAG for all techniques in parallel for all queries
print("\n" + "=" * 80)
print("STEP 3: RUNNING RAG FOR ALL 6 TECHNIQUES (IN PARALLEL)")
print("=" * 80)
# Prepare arguments for parallel execution
num_processes = min(cpu_count(), len(CHUNKING_TECHNIQUES))
print(f"\nUsing {num_processes} parallel processes for {len(CHUNKING_TECHNIQUES)} techniques")
# Run techniques in parallel for all queries
evaluator_config = {
'judge_model': cfg.gen['judge_model'],
'api_key': openrouter_key,
'model_list': cfg.model_list
}
all_query_results = {}
for query_idx, query in enumerate(test_queries):
print(f"\n{'='*80}")
print(f"PROCESSING QUERY {query_idx + 1}/{len(test_queries)}")
print(f"Query: {query}")
print(f"{'='*80}")
with Pool(
processes=num_processes,
initializer=init_worker,
initargs=(cfg.processing['embedding_model'], evaluator_config)
) as pool:
args_list = [
(technique, query, index_name, pinecone_key)
for technique in CHUNKING_TECHNIQUES
]
results_list = pool.map(run_rag_for_technique_wrapper, args_list)
# Convert results to dictionary and store
query_results = {name: results for name, results in results_list}
all_query_results[query_idx] = query_results
# Print quick summary for this query
print(f"\n{'='*80}")
print(f"QUERY {query_idx + 1} SUMMARY")
print(f"{'='*80}")
print(f"\n{'Technique':<15} {'Avg Faith':>12} {'Avg Rel':>12} {'Best Model':<20}")
print("-" * 60)
for technique_name, model_results in query_results.items():
if model_results:
avg_faith = sum(r.get('Faithfulness', 0) for r in model_results.values()) / len(model_results)
avg_rel = sum(r.get('Relevancy', 0) for r in model_results.values()) / len(model_results)
# Find best model
best_model = max(
model_results.items(),
key=lambda x: x[1].get('Faithfulness', 0) + x[1].get('Relevancy', 0)
)
best_name = best_model[0]
print(f"{technique_name:<15} {avg_faith:>11.1f}% {avg_rel:>12.3f} {best_name:<20}")
else:
print(f"{technique_name:<15} {'N/A':>12} {'N/A':>12} {'N/A':<20}")
print("-" * 60)
# Step 4: Generate findings document from all queries
print("\n" + "=" * 80)
print("STEP 4: GENERATING FINDINGS DOCUMENT")
print("=" * 80)
findings_file = generate_findings_document(all_query_results, test_queries)
# Step 5: Final summary
print("\n" + "=" * 80)
print("ABLATION STUDY COMPLETE - SUMMARY")
print("=" * 80)
print(f"\nQueries processed: {len(test_queries)}")
print(f"Techniques evaluated: {len(CHUNKING_TECHNIQUES)}")
print(f"Models tested: {len(cfg.model_list)}")
print(f"\nFindings document: {findings_file}")
# Print final summary across all queries
print("\n" + "-" * 60)
print(f"{'Technique':<15} {'Avg Faith':>12} {'Avg Rel':>12} {'Best Model':<20}")
print("-" * 60)
# Calculate averages across all queries
for tech_config in CHUNKING_TECHNIQUES:
tech_name = tech_config['name']
all_faith = []
all_rel = []
best_model_name = None
best_combined = 0
for query_idx, query_results in all_query_results.items():
if tech_name in query_results and query_results[tech_name]:
model_results = query_results[tech_name]
for model_name, results in model_results.items():
faith = results.get('Faithfulness', 0)
rel = results.get('Relevancy', 0)
combined = faith + rel
all_faith.append(faith)
all_rel.append(rel)
if combined > best_combined:
best_combined = combined
best_model_name = model_name
if all_faith:
avg_faith = sum(all_faith) / len(all_faith)
avg_rel = sum(all_rel) / len(all_rel)
print(f"{tech_name:<15} {avg_faith:>11.1f}% {avg_rel:>12.3f} {best_model_name or 'N/A':<20}")
else:
print(f"{tech_name:<15} {'N/A':>12} {'N/A':>12} {'N/A':<20}")
print("-" * 60)
print("\n✓ Ablation study complete!")
print(f"✓ Results saved to: {findings_file}")
print("\nYou can now analyze the findings document to compare chunking techniques.")
return all_query_results
if __name__ == "__main__":
main()