| import os |
| import json |
| import time |
| from datetime import datetime |
| from multiprocessing import Pool, cpu_count |
| from functools import partial |
| from dotenv import load_dotenv |
| from config_loader import cfg |
|
|
| from data.vector_db import get_pinecone_index, refresh_pinecone_index |
| from retriever.retriever import HybridRetriever |
| from retriever.generator import RAGGenerator |
| from retriever.processor import ChunkProcessor |
| from retriever.evaluator import RAGEvaluator |
| from data.data_loader import load_cbt_book, get_book_stats |
| from data.ingest import ingest_data, CHUNKING_TECHNIQUES |
|
|
| |
| from models.llama_3_8b import Llama3_8B |
| from models.mistral_7b import Mistral_7b |
| from models.qwen_2_5 import Qwen2_5 |
| from models.deepseek_v3 import DeepSeek_V3 |
| from models.tiny_aya import TinyAya |
|
|
# Registry mapping the human-readable model names used in cfg.model_list
# to their wrapper classes. Insertion order matches the original literal.
MODEL_MAP = dict([
    ("Llama-3-8B", Llama3_8B),
    ("Mistral-7B", Mistral_7b),
    ("Qwen-2.5", Qwen2_5),
    ("DeepSeek-V3", DeepSeek_V3),
    ("TinyAya", TinyAya),
])


# Load API keys/tokens from a local .env file into the process environment.
load_dotenv()
|
|
|
|
def run_rag_for_technique(technique_name, query, index, encoder, models, evaluator, rag_engine, reranker=None):
    """Run the RAG pipeline (retrieve -> rerank -> generate -> evaluate) for one chunking technique.

    Args:
        technique_name: Chunking technique whose vectors are queried (metadata filter).
        query: User question to answer.
        index: Pinecone index handle used for dense retrieval.
        encoder: Sentence encoder exposing `.encode(text)` whose result has `.tolist()`.
        models: Mapping of model name -> model instance to run the tournament over.
        evaluator: Evaluator providing `evaluate_faithfulness` and `evaluate_relevancy`.
        rag_engine: Generator providing `get_answer(model, query, chunks, ...)`.
        reranker: Optional cross-encoder with `.predict(pairs)`. When omitted, falls
            back to the per-process global `_worker_reranker` set by `init_worker`
            (the worker-pool path), preserving the original behavior.

    Returns:
        Dict mapping model name -> result dict (answer, Faithfulness, Relevancy,
        Claims, context), or {} when no chunks exist for the technique.
    """
    print(f"\n{'='*80}")
    print(f"TECHNIQUE: {technique_name.upper()}")
    print(f"{'='*80}")

    # Dense retrieval: embed the query, pull top-25 candidates for this technique only.
    query_vector = encoder.encode(query).tolist()
    res = index.query(
        vector=query_vector,
        top_k=25,
        include_metadata=True,
        filter={"technique": {"$eq": technique_name}}
    )

    all_candidates = []
    chunk_urls = []
    for match in res['matches']:
        all_candidates.append(match['metadata']['text'])
        chunk_urls.append(match['metadata'].get('url', ''))

    print(f"\nRetrieved {len(all_candidates)} candidate chunks for technique '{technique_name}'")

    if not all_candidates:
        print(f"WARNING: No chunks found for technique '{technique_name}'")
        return {}

    # Rerank with a cross-encoder and keep the 5 best-scoring chunks.
    # Fall back to the per-worker global when no reranker was passed explicitly.
    active_reranker = reranker if reranker is not None else _worker_reranker
    pairs = [[query, chunk] for chunk in all_candidates]
    scores = active_reranker.predict(pairs)
    ranked = sorted(zip(all_candidates, chunk_urls, scores), key=lambda x: x[2], reverse=True)
    context_chunks = [chunk for chunk, _, _ in ranked[:5]]
    context_urls = [url for _, url, _ in ranked[:5]]

    print(f"After reranking: {len(context_chunks)} chunks (top 5)")

    # Dump the final context so the exact input each model saw can be inspected.
    print(f"\n{'='*80}")
    print(f"📚 FINAL RAG CONTEXT FOR TECHNIQUE '{technique_name.upper()}'")
    print(f"{'='*80}")
    for i, chunk in enumerate(context_chunks, 1):
        print(f"\n[Chunk {i}] ({len(chunk)} chars):")
        print(f"{'─'*60}")
        print(chunk)
        print(f"{'─'*60}")
    print(f"\n{'='*80}")

    # Tournament: every model answers from the same context and is scored.
    tournament_results = {}

    for name, model_inst in models.items():
        print(f"\n{'-'*60}")
        print(f"Model: {name}")
        print(f"{'-'*60}")
        try:
            answer = rag_engine.get_answer(
                model_inst, query, context_chunks,
                context_urls=context_urls,
                temperature=cfg.gen['temperature']
            )

            print(f"\n{'─'*60}")
            print(f"📝 FULL ANSWER from {name}:")
            print(f"{'─'*60}")
            print(answer)
            print(f"{'─'*60}")

            # Faithfulness: is the answer grounded in the retrieved context?
            faith = evaluator.evaluate_faithfulness(answer, context_chunks, strict=False)
            # Relevancy: does the answer actually address the query?
            rel = evaluator.evaluate_relevancy(query, answer)

            tournament_results[name] = {
                "answer": answer,
                "Faithfulness": faith['score'],
                "Relevancy": rel['score'],
                "Claims": faith['details'],
                "context_chunks": context_chunks,
                "context_urls": context_urls
            }

            print(f"\n📊 EVALUATION SCORES:")
            print(f" Faithfulness: {faith['score']:.1f}%")
            print(f" Relevancy: {rel['score']:.3f}")
            # NOTE(review): "Combined" adds a 0-100 percentage to what is printed
            # as a 0-1 score, so faithfulness dominates — confirm this weighting
            # is intentional.
            print(f" Combined: {faith['score'] + rel['score']:.3f}")

        except Exception as e:
            # One failing model must not abort the whole technique; record the
            # error with zero scores so downstream aggregation still works.
            print(f" Error evaluating {name}: {e}")
            tournament_results[name] = {
                "answer": "",
                "Faithfulness": 0,
                "Relevancy": 0,
                "Claims": [],
                "error": str(e),
                "context_chunks": context_chunks,
                "context_urls": context_urls
            }

    return tournament_results
|
|
|
|
def _avg(values):
    """Arithmetic mean of a numeric list; 0 when the list is empty."""
    return sum(values) / len(values) if values else 0


def generate_findings_document(all_query_results, queries, output_file="rag_ablation_findings.md"):
    """Generate detailed markdown document with findings from all techniques across all queries.

    Args:
        all_query_results: Dict mapping query index to results dict
            (technique name -> model name -> per-run result dict with
            'Faithfulness', 'Relevancy', 'answer', 'context_chunks', 'context_urls').
        queries: List of all test queries.
        output_file: Path to output file.

    Returns:
        The path of the written findings document (same as ``output_file``).
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    content = f"""# RAG Ablation Study Findings

*Generated:* {timestamp}

## Overview

This document presents findings from a comparative analysis of 6 different chunking techniques
applied to a Cognitive Behavioral Therapy (CBT) book. Each technique was evaluated using
multiple LLM models with RAG (Retrieval-Augmented Generation) pipeline.

## Test Queries

"""

    for i, query in enumerate(queries, 1):
        content += f"{i}. {query}\n"

    content += """
## Chunking Techniques Evaluated

1. *Fixed* - Fixed-size chunking (1000 chars, 100 overlap)
2. *Sentence* - Sentence-level chunking (NLTK)
3. *Paragraph* - Paragraph-level chunking (\\n\\n boundaries)
4. *Semantic* - Semantic chunking (embedding similarity)
5. *Recursive* - Recursive chunking (hierarchical separators)
6. *Page* - Page-level chunking (--- Page markers)

## Results by Technique (Aggregated Across All Queries)

"""

    # Pivot per-query results into technique -> model -> accumulated score lists.
    aggregated_results = {}
    for query_idx, query_results in all_query_results.items():
        for technique_name, model_results in query_results.items():
            technique_bucket = aggregated_results.setdefault(technique_name, {})
            for model_name, results in model_results.items():
                if model_name not in technique_bucket:
                    technique_bucket[model_name] = {
                        'Faithfulness': [],
                        'Relevancy': [],
                        'answers': [],
                        # Context is technique-specific, so the first query's
                        # chunks are representative for the report section.
                        'context_chunks': results.get('context_chunks', []),
                        'context_urls': results.get('context_urls', [])
                    }
                technique_bucket[model_name]['Faithfulness'].append(results.get('Faithfulness', 0))
                technique_bucket[model_name]['Relevancy'].append(results.get('Relevancy', 0))
                technique_bucket[model_name]['answers'].append(results.get('answer', ''))

    # One section per technique: averages table, best model, context, answers.
    for technique_name, model_results in aggregated_results.items():
        content += f"### {technique_name.upper()} Chunking\n\n"

        if not model_results:
            content += "No results available for this technique.\n\n"
            continue

        content += "| Model | Avg Faithfulness | Avg Relevancy | Avg Combined |\n"
        content += "|-------|------------------|---------------|--------------|\n"

        for model_name, results in model_results.items():
            avg_faith = _avg(results['Faithfulness'])
            avg_rel = _avg(results['Relevancy'])
            avg_combined = avg_faith + avg_rel
            content += f"| {model_name} | {avg_faith:.1f}% | {avg_rel:.3f} | {avg_combined:.3f} |\n"

        # NOTE(review): "combined" adds a 0-100 faithfulness percentage to a
        # relevancy score, so faithfulness dominates the ranking.
        best_model = max(
            model_results.items(),
            key=lambda x: _avg(x[1]['Faithfulness']) + _avg(x[1]['Relevancy'])
        )
        best_name = best_model[0]
        best_faith = _avg(best_model[1]['Faithfulness'])
        best_rel = _avg(best_model[1]['Relevancy'])

        content += f"\n*Best Model:* {best_name} (Avg Faithfulness: {best_faith:.1f}%, Avg Relevancy: {best_rel:.3f})\n\n"

        # Show the context chunks from the first model that recorded any.
        context_chunks = None
        context_urls = None
        for model_name, results in model_results.items():
            if results.get('context_chunks'):
                context_chunks = results['context_chunks']
                context_urls = results.get('context_urls', [])
                break

        if context_chunks:
            content += "#### Context Chunks Used\n\n"
            for i, chunk in enumerate(context_chunks, 1):
                url = context_urls[i-1] if context_urls and i-1 < len(context_urls) else ""
                if url:
                    content += f"*Chunk {i}* ([Source]({url})):\n"
                else:
                    content += f"*Chunk {i}*:\n"
                content += f"\n{chunk}\n\n\n"

        content += "#### Detailed RAG Results\n\n"

        for model_name, results in model_results.items():
            answers = results.get('answers', [])
            avg_faith = _avg(results['Faithfulness'])
            avg_rel = _avg(results['Relevancy'])

            content += f"*{model_name}* (Avg Faithfulness: {avg_faith:.1f}%, Avg Relevancy: {avg_rel:.3f})\n\n"

            for q_idx, answer in enumerate(answers):
                content += f"📝 *Answer for Query {q_idx + 1}:*\n\n"
                content += f"\n{answer}\n\n\n"

        content += "---\n\n"

    content += """## Comparative Analysis

### Overall Performance Ranking (Across All Queries)

| Rank | Technique | Avg Faithfulness | Avg Relevancy | Avg Combined |
|------|-----------|------------------|---------------|--------------|
"""

    # Average across every model and every query for each technique.
    technique_averages = {}
    for technique_name, model_results in aggregated_results.items():
        if model_results:
            all_faith = []
            all_rel = []
            for model_name, results in model_results.items():
                all_faith.extend(results['Faithfulness'])
                all_rel.extend(results['Relevancy'])

            avg_faith = _avg(all_faith)
            avg_rel = _avg(all_rel)
            technique_averages[technique_name] = {
                'faith': avg_faith,
                'rel': avg_rel,
                'combined': avg_faith + avg_rel
            }

    sorted_techniques = sorted(
        technique_averages.items(),
        key=lambda x: x[1]['combined'],
        reverse=True
    )

    for rank, (technique_name, averages) in enumerate(sorted_techniques, 1):
        content += f"| {rank} | {technique_name} | {averages['faith']:.1f}% | {averages['rel']:.3f} | {averages['combined']:.3f} |\n"

    content += """
### Key Findings

"""

    if sorted_techniques:
        best_technique = sorted_techniques[0][0]
        worst_technique = sorted_techniques[-1][0]

        content += f"""
1. *Best Performing Technique:* {best_technique}
- Achieved highest combined score across all models and queries
- Recommended for production RAG applications

2. *Worst Performing Technique:* {worst_technique}
- Lower combined scores across models and queries
- May need optimization or different configuration

3. *Model Consistency:*
- Analyzed which models perform consistently across techniques
- Identified technique-specific model preferences

"""

    content += """## Recommendations

Based on the ablation study results:

1. *Primary Recommendation:* Use the best-performing chunking technique for your specific use case
2. *Hybrid Approach:* Consider combining techniques for different types of queries
3. *Model Selection:* Choose models that perform well across multiple techniques
4. *Parameter Tuning:* Optimize chunk sizes and overlaps based on document characteristics

## Technical Details

- *Embedding Model:* Jina embeddings (512 dimensions)
- *Vector Database:* Pinecone (serverless, AWS us-east-1)
- *Judge Model:* Openrouter Free models
- *Retrieval:* Top 5 chunks per technique
- *Evaluation Metrics:* Faithfulness (context grounding), Relevancy (query addressing)

---

This report was automatically generated by the RAG Ablation Study Pipeline.
"""

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(content)

    print(f"\nFindings document saved to: {output_file}")
    return output_file
|
|
|
|
| |
# Per-process singletons, populated exactly once by init_worker() in each Pool
# worker so that heavyweight components (encoder, evaluator, LLMs, reranker)
# are loaded per process rather than per task.
_worker_proc = None        # ChunkProcessor (owns the shared encoder)
_worker_evaluator = None   # RAGEvaluator (judge model)
_worker_models = None      # dict: model name -> model instance
_worker_rag_engine = None  # RAGGenerator (answer generation)
_worker_reranker = None    # CrossEncoder used for chunk reranking
|
|
def init_worker(model_name, evaluator_config):
    """Initialize heavyweight components once per worker process.

    Populates the module-level ``_worker_*`` singletons read by
    ``run_rag_for_technique_wrapper`` for every task, so each Pool worker
    loads its models a single time.

    Args:
        model_name: Embedding model name passed to ChunkProcessor.
        evaluator_config: Plain dict (picklable) with 'judge_model',
            'api_key' and 'model_list' keys.
    """
    global _worker_proc, _worker_evaluator, _worker_models, _worker_rag_engine, _worker_reranker

    # Imported lazily: sentence_transformers is heavy and only needed here.
    # ChunkProcessor, RAGEvaluator, RAGGenerator, MODEL_MAP and the model
    # classes come from module scope — the module is imported in every worker
    # process, so the previous duplicate imports/local MODEL_MAP were redundant.
    from sentence_transformers import CrossEncoder

    # Embedding/processing pipeline (also provides the shared encoder).
    _worker_proc = ChunkProcessor(model_name=model_name, verbose=False)

    # LLM-as-judge evaluator, reusing the worker's encoder.
    _worker_evaluator = RAGEvaluator(
        judge_model=evaluator_config['judge_model'],
        embedding_model=_worker_proc.encoder,
        api_key=evaluator_config['api_key']
    )

    # Answer models for the tournament.
    hf_token = os.getenv("HF_TOKEN")
    _worker_models = {name: MODEL_MAP[name](token=hf_token) for name in evaluator_config['model_list']}

    _worker_rag_engine = RAGGenerator()

    # Cross-encoder used to rerank retrieved chunks.
    _worker_reranker = CrossEncoder('jinaai/jina-reranker-v1-tiny-en')
|
|
|
|
def run_rag_for_technique_wrapper(args):
    """Unpack a Pool task and run one technique's RAG pipeline in a worker.

    The per-process ``_worker_*`` globals set by ``init_worker`` are only
    read here, so no ``global`` declaration is needed.

    Args:
        args: Tuple of (technique dict, query string, Pinecone index name,
            Pinecone API key). The index handle is re-created inside the
            worker rather than passed in, since connection objects do not
            pickle across processes.

    Returns:
        Tuple of (technique name, results dict); the dict is empty on failure
        so one failing technique does not abort the whole study.
    """
    technique, query, index_name, pinecone_key = args
    try:
        # Each worker opens its own connection to the shared index.
        from data.vector_db import get_index_by_name
        index = get_index_by_name(pinecone_key, index_name)

        return technique['name'], run_rag_for_technique(
            technique_name=technique['name'],
            query=query,
            index=index,
            encoder=_worker_proc.encoder,
            models=_worker_models,
            evaluator=_worker_evaluator,
            rag_engine=_worker_rag_engine
        )
    except Exception as e:
        # Best-effort isolation: log the full traceback and return an empty
        # result so the study continues with the remaining techniques.
        import traceback
        print(f"\n✗ Error processing technique {technique['name']}: {e}")
        print("Full traceback:")
        traceback.print_exc()
        return technique['name'], {}
|
|
|
|
def main():
    """Run the RAG ablation study across all 6 chunking techniques.

    Pipeline: validate env vars -> ensure data is ingested into Pinecone ->
    initialize components -> run each test query against every chunking
    technique in parallel worker processes -> write a markdown findings report.

    Returns:
        Dict mapping query index -> {technique name -> {model name -> results}}.

    Raises:
        RuntimeError: If any required API token is missing from the environment.
    """
    hf_token = os.getenv("HF_TOKEN")
    pinecone_key = os.getenv("PINECONE_API_KEY")
    openrouter_key = os.getenv("OPENROUTER_API_KEY")

    # Fail fast on missing credentials before any expensive work starts.
    if not hf_token:
        raise RuntimeError("HF_TOKEN not found in environment variables")
    if not pinecone_key:
        raise RuntimeError("PINECONE_API_KEY not found in environment variables")
    if not openrouter_key:
        raise RuntimeError("OPENROUTER_API_KEY not found in environment variables")

    test_queries = [
        "What is cognitive behavior therapy and how does it work?",
        "What are the common cognitive distortions in CBT?",
        "How does CBT help with anxiety and depression?"
    ]

    print("=" * 80)
    print("RAG ABLATION STUDY - 6 CHUNKING TECHNIQUES")
    print("=" * 80)
    print(f"\nTest Queries:")
    for i, q in enumerate(test_queries, 1):
        print(f" {i}. {q}")

    # --- STEP 1: ensure vectors for all techniques exist in Pinecone ---
    print("\n" + "=" * 80)
    print("STEP 1: CHECKING/INGESTING DATA WITH ALL 6 TECHNIQUES")
    print("=" * 80)

    from data.vector_db import get_index_by_name
    index_name = f"{cfg.db['base_index_name']}-{cfg.processing['technique']}"

    print(f"\nChecking for existing index: {index_name}")

    try:
        print("Connecting to Pinecone...")
        existing_index = get_index_by_name(pinecone_key, index_name)
        print("Getting index stats...")
        stats = existing_index.describe_index_stats()
        existing_count = stats.get('total_vector_count', 0)

        if existing_count > 0:
            # Reuse existing vectors; only the encoder is needed for retrieval.
            print(f"\n✓ Found existing index with {existing_count} vectors")
            print("Skipping ingestion - using existing data")

            print("Loading embedding model for retrieval...")
            from retriever.processor import ChunkProcessor
            proc = ChunkProcessor(model_name=cfg.processing['embedding_model'], verbose=False)
            index = existing_index
            all_chunks = []
            final_chunks = []
            print("✓ Processor initialized")
        else:
            print("\nIndex exists but is empty. Running full ingestion...")
            all_chunks, final_chunks, proc, index = ingest_data()
    except Exception as e:
        # Deliberate best-effort: any connectivity/missing-index error falls
        # back to a full ingest rather than aborting the study.
        print(f"\nIndex check failed: {e}")
        print("Running full ingestion...")
        all_chunks, final_chunks, proc, index = ingest_data()

    print(f"\nTechniques to evaluate: {[tech['name'] for tech in CHUNKING_TECHNIQUES]}")

    # --- STEP 2: initialize components ---
    # NOTE(review): models/evaluator/rag_engine built here are never used by
    # the parallel path below (each worker builds its own in init_worker);
    # they do act as a fail-fast check before spawning workers — confirm the
    # duplicate initialization cost is intended.
    print("\n" + "=" * 80)
    print("STEP 2: INITIALIZING COMPONENTS")
    print("=" * 80)

    print("\nInitializing models...")
    rag_engine = RAGGenerator()
    models = {name: MODEL_MAP[name](token=hf_token) for name in cfg.model_list}

    print("Initializing evaluator...")
    # (openrouter_key was already validated at function entry.)
    evaluator = RAGEvaluator(
        judge_model=cfg.gen['judge_model'],
        embedding_model=proc.encoder,
        api_key=openrouter_key
    )

    # --- STEP 3: run every query against every technique in parallel ---
    print("\n" + "=" * 80)
    print("STEP 3: RUNNING RAG FOR ALL 6 TECHNIQUES (IN PARALLEL)")
    print("=" * 80)

    num_processes = min(cpu_count(), len(CHUNKING_TECHNIQUES))
    print(f"\nUsing {num_processes} parallel processes for {len(CHUNKING_TECHNIQUES)} techniques")

    # Plain dict so it pickles cleanly into worker processes.
    evaluator_config = {
        'judge_model': cfg.gen['judge_model'],
        'api_key': openrouter_key,
        'model_list': cfg.model_list
    }

    all_query_results = {}

    for query_idx, query in enumerate(test_queries):
        print(f"\n{'='*80}")
        print(f"PROCESSING QUERY {query_idx + 1}/{len(test_queries)}")
        print(f"Query: {query}")
        print(f"{'='*80}")

        # Fresh pool per query; init_worker loads models once per worker.
        with Pool(
            processes=num_processes,
            initializer=init_worker,
            initargs=(cfg.processing['embedding_model'], evaluator_config)
        ) as pool:
            args_list = [
                (technique, query, index_name, pinecone_key)
                for technique in CHUNKING_TECHNIQUES
            ]
            results_list = pool.map(run_rag_for_technique_wrapper, args_list)

        query_results = {name: results for name, results in results_list}
        all_query_results[query_idx] = query_results

        # Per-query summary table.
        print(f"\n{'='*80}")
        print(f"QUERY {query_idx + 1} SUMMARY")
        print(f"{'='*80}")
        print(f"\n{'Technique':<15} {'Avg Faith':>12} {'Avg Rel':>12} {'Best Model':<20}")
        print("-" * 60)

        for technique_name, model_results in query_results.items():
            if model_results:
                avg_faith = sum(r.get('Faithfulness', 0) for r in model_results.values()) / len(model_results)
                avg_rel = sum(r.get('Relevancy', 0) for r in model_results.values()) / len(model_results)

                # "Best" = highest faithfulness% + relevancy (faithfulness dominates).
                best_model = max(
                    model_results.items(),
                    key=lambda x: x[1].get('Faithfulness', 0) + x[1].get('Relevancy', 0)
                )
                best_name = best_model[0]

                print(f"{technique_name:<15} {avg_faith:>11.1f}% {avg_rel:>12.3f} {best_name:<20}")
            else:
                print(f"{technique_name:<15} {'N/A':>12} {'N/A':>12} {'N/A':<20}")

        print("-" * 60)

    # --- STEP 4: write the markdown findings report ---
    print("\n" + "=" * 80)
    print("STEP 4: GENERATING FINDINGS DOCUMENT")
    print("=" * 80)

    findings_file = generate_findings_document(all_query_results, test_queries)

    # --- Final cross-query summary ---
    print("\n" + "=" * 80)
    print("ABLATION STUDY COMPLETE - SUMMARY")
    print("=" * 80)

    print(f"\nQueries processed: {len(test_queries)}")
    print(f"Techniques evaluated: {len(CHUNKING_TECHNIQUES)}")
    print(f"Models tested: {len(cfg.model_list)}")
    print(f"\nFindings document: {findings_file}")

    print("\n" + "-" * 60)
    print(f"{'Technique':<15} {'Avg Faith':>12} {'Avg Rel':>12} {'Best Model':<20}")
    print("-" * 60)

    for tech_config in CHUNKING_TECHNIQUES:
        tech_name = tech_config['name']
        all_faith = []
        all_rel = []
        best_model_name = None
        best_combined = 0

        for query_idx, query_results in all_query_results.items():
            if tech_name in query_results and query_results[tech_name]:
                model_results = query_results[tech_name]
                for model_name, results in model_results.items():
                    faith = results.get('Faithfulness', 0)
                    rel = results.get('Relevancy', 0)
                    combined = faith + rel
                    all_faith.append(faith)
                    all_rel.append(rel)

                    # Track best single (model, query) combined score overall.
                    if combined > best_combined:
                        best_combined = combined
                        best_model_name = model_name

        if all_faith:
            avg_faith = sum(all_faith) / len(all_faith)
            avg_rel = sum(all_rel) / len(all_rel)
            print(f"{tech_name:<15} {avg_faith:>11.1f}% {avg_rel:>12.3f} {best_model_name or 'N/A':<20}")
        else:
            print(f"{tech_name:<15} {'N/A':>12} {'N/A':>12} {'N/A':<20}")

    print("-" * 60)

    print("\n✓ Ablation study complete!")
    print(f"✓ Results saved to: {findings_file}")
    print("\nYou can now analyze the findings document to compare chunking techniques.")

    return all_query_results
|
|
|
|
# Script entry point: run the full ablation study when executed directly.
if __name__ == "__main__":
    main()
|
|