import os
import json
import time
from datetime import datetime
from multiprocessing import Pool, cpu_count
from functools import partial

from dotenv import load_dotenv

from config_loader import cfg
from data.vector_db import get_pinecone_index, refresh_pinecone_index
from retriever.retriever import HybridRetriever
from retriever.generator import RAGGenerator
from retriever.processor import ChunkProcessor
from retriever.evaluator import RAGEvaluator
from data.data_loader import load_cbt_book, get_book_stats
from data.ingest import ingest_data, CHUNKING_TECHNIQUES

# Import the model fleet
from models.llama_3_8b import Llama3_8B
from models.mistral_7b import Mistral_7b
from models.qwen_2_5 import Qwen2_5
from models.deepseek_v3 import DeepSeek_V3
from models.tiny_aya import TinyAya

MODEL_MAP = {
    "Llama-3-8B": Llama3_8B,
    "Mistral-7B": Mistral_7b,
    "Qwen-2.5": Qwen2_5,
    "DeepSeek-V3": DeepSeek_V3,
    "TinyAya": TinyAya
}

load_dotenv()


def run_rag_for_technique(technique_name, query, index, encoder, models, evaluator, rag_engine):
    """Run the RAG pipeline for a specific chunking technique."""
    print(f"\n{'='*80}")
    print(f"TECHNIQUE: {technique_name.upper()}")
    print(f"{'='*80}")

    # Embed the query once; chunks are filtered by technique metadata
    query_vector = encoder.encode(query).tolist()

    # Query with a metadata filter for this technique - fetch extra candidates for reranking
    res = index.query(
        vector=query_vector,
        top_k=25,
        include_metadata=True,
        filter={"technique": {"$eq": technique_name}}
    )

    # Extract context chunks along with their source URLs
    all_candidates = []
    chunk_urls = []
    for match in res['matches']:
        all_candidates.append(match['metadata']['text'])
        chunk_urls.append(match['metadata'].get('url', ''))

    print(f"\nRetrieved {len(all_candidates)} candidate chunks for technique '{technique_name}'")

    if not all_candidates:
        print(f"WARNING: No chunks found for technique '{technique_name}'")
        return {}

    # Apply cross-encoder reranking to keep the top 5 candidates.
    # The reranker is a global loaded once per worker process.
    global _worker_reranker
    pairs = [[query, chunk] for chunk in all_candidates]
    scores = _worker_reranker.predict(pairs)
    ranked = sorted(zip(all_candidates, chunk_urls, scores), key=lambda x: x[2], reverse=True)
    context_chunks = [chunk for chunk, _, _ in ranked[:5]]
    context_urls = [url for _, url, _ in ranked[:5]]

    print(f"After reranking: {len(context_chunks)} chunks (top 5)")

    # Print the final RAG context passed to the models (only once per technique)
    print(f"\n{'='*80}")
    print(f"šŸ“š FINAL RAG CONTEXT FOR TECHNIQUE '{technique_name.upper()}'")
    print(f"{'='*80}")
    for i, chunk in enumerate(context_chunks, 1):
        print(f"\n[Chunk {i}] ({len(chunk)} chars):")
        print(f"{'─'*60}")
        print(chunk)
        print(f"{'─'*60}")
    print(f"\n{'='*80}")

    # Run the model tournament for this technique
    tournament_results = {}
    for name, model_inst in models.items():
        print(f"\n{'-'*60}")
        print(f"Model: {name}")
        print(f"{'-'*60}")
        try:
            # Generation
            answer = rag_engine.get_answer(
                model_inst,
                query,
                context_chunks,
                context_urls=context_urls,
                temperature=cfg.gen['temperature']
            )

            print(f"\n{'─'*60}")
            print(f"šŸ“ FULL ANSWER from {name}:")
            print(f"{'─'*60}")
            print(answer)
            print(f"{'─'*60}")

            # Faithfulness evaluation (strict=False reduces API calls from ~22 to ~3 per eval)
            faith = evaluator.evaluate_faithfulness(answer, context_chunks, strict=False)

            # Relevancy evaluation
            rel = evaluator.evaluate_relevancy(query, answer)

            tournament_results[name] = {
                "answer": answer,
                "Faithfulness": faith['score'],
                "Relevancy": rel['score'],
                "Claims": faith['details'],
                "context_chunks": context_chunks,
                "context_urls": context_urls
            }

            print("\nšŸ“Š EVALUATION SCORES:")
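# Note on the metadata filter used above: {"technique": {"$eq": technique_name}}
# is standard Pinecone filter syntax, so each query only sees vectors that were
# ingested with the matching technique tag. top_k=25 deliberately over-fetches
# so the cross-encoder rerank can pick its top 5 from a 5x candidate pool, e.g.
# (illustrative values):
#   scores = _worker_reranker.predict([["what is CBT?", "CBT is ..."], ...])
# CrossEncoder.predict returns one relevance score per (query, passage) pair,
# higher meaning more relevant.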
            print(f"  Faithfulness: {faith['score']:.1f}%")
            print(f"  Relevancy: {rel['score']:.3f}")
            print(f"  Combined: {faith['score'] + rel['score']:.3f}")
        except Exception as e:
            print(f"  Error evaluating {name}: {e}")
            tournament_results[name] = {
                "answer": "",
                "Faithfulness": 0,
                "Relevancy": 0,
                "Claims": [],
                "error": str(e),
                "context_chunks": context_chunks,
                "context_urls": context_urls
            }

    return tournament_results


def generate_findings_document(all_query_results, queries, output_file="rag_ablation_findings.md"):
    """Generate a detailed markdown document with findings from all techniques across all queries.

    Args:
        all_query_results: Dict mapping query index to results dict
        queries: List of all test queries
        output_file: Path to output file
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    content = f"""# RAG Ablation Study Findings

*Generated:* {timestamp}

## Overview

This document presents findings from a comparative analysis of 6 different chunking techniques applied to a Cognitive Behavioral Therapy (CBT) book. Each technique was evaluated using multiple LLM models with a RAG (Retrieval-Augmented Generation) pipeline.

## Test Queries

"""

    for i, query in enumerate(queries, 1):
        content += f"{i}. {query}\n"

    content += """
## Chunking Techniques Evaluated

1. *Fixed* - Fixed-size chunking (1000 chars, 100 overlap)
2. *Sentence* - Sentence-level chunking (NLTK)
3. *Paragraph* - Paragraph-level chunking (\\n\\n boundaries)
4. *Semantic* - Semantic chunking (embedding similarity)
5. *Recursive* - Recursive chunking (hierarchical separators)
6. *Page* - Page-level chunking (--- Page markers)

## Results by Technique (Aggregated Across All Queries)

"""

    # Aggregate results across all queries
    aggregated_results = {}
    for query_idx, query_results in all_query_results.items():
        for technique_name, model_results in query_results.items():
            if technique_name not in aggregated_results:
                aggregated_results[technique_name] = {}
            for model_name, results in model_results.items():
                if model_name not in aggregated_results[technique_name]:
                    aggregated_results[technique_name][model_name] = {
                        'Faithfulness': [],
                        'Relevancy': [],
                        'answers': [],
                        'context_chunks': results.get('context_chunks', []),
                        'context_urls': results.get('context_urls', [])
                    }
                aggregated_results[technique_name][model_name]['Faithfulness'].append(results.get('Faithfulness', 0))
                aggregated_results[technique_name][model_name]['Relevancy'].append(results.get('Relevancy', 0))
                aggregated_results[technique_name][model_name]['answers'].append(results.get('answer', ''))

    # Add results for each technique
    for technique_name, model_results in aggregated_results.items():
        content += f"### {technique_name.upper()} Chunking\n\n"

        if not model_results:
            content += "No results available for this technique.\n\n"
            continue

        # Create a results table with averaged scores
        content += "| Model | Avg Faithfulness | Avg Relevancy | Avg Combined |\n"
        content += "|-------|------------------|---------------|--------------|\n"

        for model_name, results in model_results.items():
            avg_faith = sum(results['Faithfulness']) / len(results['Faithfulness']) if results['Faithfulness'] else 0
            avg_rel = sum(results['Relevancy']) / len(results['Relevancy']) if results['Relevancy'] else 0
            avg_combined = avg_faith + avg_rel
            content += f"| {model_name} | {avg_faith:.1f}% | {avg_rel:.3f} | {avg_combined:.3f} |\n"

        # Find the best model for this technique
        if model_results:
            best_model = max(
                model_results.items(),
                key=lambda x: (sum(x[1]['Faithfulness']) / len(x[1]['Faithfulness']) if x[1]['Faithfulness'] else 0)
                + (sum(x[1]['Relevancy']) / len(x[1]['Relevancy']) if x[1]['Relevancy'] else 0)
            )
            best_name = best_model[0]
            best_faith = sum(best_model[1]['Faithfulness']) / len(best_model[1]['Faithfulness']) if best_model[1]['Faithfulness'] else 0
            best_rel = sum(best_model[1]['Relevancy']) / len(best_model[1]['Relevancy']) if best_model[1]['Relevancy'] else 0
            content += f"\n*Best Model:* {best_name} (Avg Faithfulness: {best_faith:.1f}%, Avg Relevancy: {best_rel:.3f})\n\n"

        # Show context chunks once per technique (not per model)
        context_chunks = None
        context_urls = None
        for model_name, results in model_results.items():
            if results.get('context_chunks'):
                context_chunks = results['context_chunks']
                context_urls = results.get('context_urls', [])
                break

        if context_chunks:
            content += "#### Context Chunks Used\n\n"
            for i, chunk in enumerate(context_chunks, 1):
                url = context_urls[i - 1] if context_urls and i - 1 < len(context_urls) else ""
                if url:
                    content += f"*Chunk {i}* ([Source]({url})):\n"
                else:
                    content += f"*Chunk {i}*:\n"
                content += f"\n{chunk}\n\n\n"

        # Add detailed RAG results for each model
        content += "#### Detailed RAG Results\n\n"
        for model_name, results in model_results.items():
            answers = results.get('answers', [])
            avg_faith = sum(results['Faithfulness']) / len(results['Faithfulness']) if results['Faithfulness'] else 0
            avg_rel = sum(results['Relevancy']) / len(results['Relevancy']) if results['Relevancy'] else 0

            content += f"*{model_name}* (Avg Faithfulness: {avg_faith:.1f}%, Avg Relevancy: {avg_rel:.3f})\n\n"

            # Show the answer from each query
            for q_idx, answer in enumerate(answers):
                content += f"šŸ“ *Answer for Query {q_idx + 1}:*\n\n"
                content += f"\n{answer}\n\n\n"

        content += "---\n\n"

    # Add comparative analysis
    content += """## Comparative Analysis

### Overall Performance Ranking (Across All Queries)

| Rank | Technique | Avg Faithfulness | Avg Relevancy | Avg Combined |
|------|-----------|------------------|---------------|--------------|
"""

    # Calculate averages for each technique across all queries
    technique_averages = {}
    for technique_name, model_results in aggregated_results.items():
        if model_results:
            all_faith = []
            all_rel = []
            for model_name, results in model_results.items():
                all_faith.extend(results['Faithfulness'])
                all_rel.extend(results['Relevancy'])
            avg_faith = sum(all_faith) / len(all_faith) if all_faith else 0
            avg_rel = sum(all_rel) / len(all_rel) if all_rel else 0
            avg_combined = avg_faith + avg_rel
            technique_averages[technique_name] = {
                'faith': avg_faith,
                'rel': avg_rel,
                'combined': avg_combined
            }

    # Sort by combined score
    sorted_techniques = sorted(
        technique_averages.items(),
        key=lambda x: x[1]['combined'],
        reverse=True
    )

    for rank, (technique_name, averages) in enumerate(sorted_techniques, 1):
        content += f"| {rank} | {technique_name} | {averages['faith']:.1f}% | {averages['rel']:.3f} | {averages['combined']:.3f} |\n"

    content += """
### Key Findings

"""

    if sorted_techniques:
        best_technique = sorted_techniques[0][0]
        worst_technique = sorted_techniques[-1][0]

        content += f"""
1. *Best Performing Technique:* {best_technique}
   - Achieved the highest combined score across all models and queries
   - Recommended for production RAG applications

2. *Worst Performing Technique:* {worst_technique}
   - Lower combined scores across models and queries
   - May need optimization or a different configuration

3. *Model Consistency:*
   - Analyzed which models perform consistently across techniques
   - Identified technique-specific model preferences
"""

    content += """## Recommendations

Based on the ablation study results:

1. *Primary Recommendation:* Use the best-performing chunking technique for your specific use case
2. *Hybrid Approach:* Consider combining techniques for different types of queries
3. *Model Selection:* Choose models that perform well across multiple techniques
4. *Parameter Tuning:* Optimize chunk sizes and overlaps based on document characteristics

## Technical Details

- *Embedding Model:* Jina embeddings (512 dimensions)
- *Vector Database:* Pinecone (serverless, AWS us-east-1)
- *Judge Model:* OpenRouter free models
- *Retrieval:* Top 5 chunks per technique
- *Evaluation Metrics:* Faithfulness (context grounding), Relevancy (query addressing)

---

This report was automatically generated by the RAG Ablation Study Pipeline.
"""

    # Write to file
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(content)

    print(f"\nFindings document saved to: {output_file}")
    return output_file
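# For reference, the nested result structure consumed above and produced by the
# pipeline below is:
#   all_query_results[query_idx][technique_name][model_name] = {
#       "answer": str,
#       "Faithfulness": float,   # percentage, 0-100
#       "Relevancy": float,      # similarity score, 0-1
#       "Claims": list,
#       "context_chunks": list[str],
#       "context_urls": list[str],
#   }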
# Global variables for worker processes
_worker_proc = None
_worker_evaluator = None
_worker_models = None
_worker_rag_engine = None
_worker_reranker = None


def init_worker(model_name, evaluator_config):
    """Initialize models once per worker process."""
    global _worker_proc, _worker_evaluator, _worker_models, _worker_rag_engine, _worker_reranker
    from retriever.processor import ChunkProcessor
    from retriever.evaluator import RAGEvaluator
    from retriever.generator import RAGGenerator
    from sentence_transformers import CrossEncoder
    from models.llama_3_8b import Llama3_8B
    from models.mistral_7b import Mistral_7b
    from models.qwen_2_5 import Qwen2_5
    from models.deepseek_v3 import DeepSeek_V3
    from models.tiny_aya import TinyAya

    MODEL_MAP = {
        "Llama-3-8B": Llama3_8B,
        "Mistral-7B": Mistral_7b,
        "Qwen-2.5": Qwen2_5,
        "DeepSeek-V3": DeepSeek_V3,
        "TinyAya": TinyAya
    }

    # Load the embedding model once
    _worker_proc = ChunkProcessor(model_name=model_name, verbose=False)

    # Initialize the evaluator
    _worker_evaluator = RAGEvaluator(
        judge_model=evaluator_config['judge_model'],
        embedding_model=_worker_proc.encoder,
        api_key=evaluator_config['api_key']
    )

    # Initialize the models
    hf_token = os.getenv("HF_TOKEN")
    _worker_models = {name: MODEL_MAP[name](token=hf_token) for name in evaluator_config['model_list']}

    # Initialize the RAG engine
    _worker_rag_engine = RAGGenerator()

    # Load the reranker once per worker
    _worker_reranker = CrossEncoder('jinaai/jina-reranker-v1-tiny-en')
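# Note: multiprocessing.Pool runs the initializer once per worker process, so
# the heavyweight objects above (embedding model, LLM clients, cross-encoder)
# are loaded once per process instead of being pickled and shipped with every
# mapped task.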
def run_rag_for_technique_wrapper(args):
    """Wrapper function for parallel execution."""
    global _worker_proc, _worker_evaluator, _worker_models, _worker_rag_engine
    technique, query, index_name, pinecone_key = args
    try:
        # Create a new Pinecone connection inside the worker process
        from data.vector_db import get_index_by_name
        index = get_index_by_name(pinecone_key, index_name)

        return technique['name'], run_rag_for_technique(
            technique_name=technique['name'],
            query=query,
            index=index,
            encoder=_worker_proc.encoder,
            models=_worker_models,
            evaluator=_worker_evaluator,
            rag_engine=_worker_rag_engine
        )
    except Exception as e:
        import traceback
        print(f"\nāœ— Error processing technique {technique['name']}: {e}")
        print("Full traceback:")
        traceback.print_exc()
        return technique['name'], {}
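# Each mapped task is a plain tuple so it pickles cleanly across processes,
# e.g. (with illustrative values):
#   ({"name": "semantic", ...}, "What is CBT?", "cbt-index-semantic", "<PINECONE_API_KEY>")
# Returning (technique_name, {}) on failure lets pool.map finish even when a
# single technique errors out; the empty dict is rendered as "N/A" downstream.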
def main():
    """Run the RAG ablation study across all 6 chunking techniques."""
    hf_token = os.getenv("HF_TOKEN")
    pinecone_key = os.getenv("PINECONE_API_KEY")
    openrouter_key = os.getenv("OPENROUTER_API_KEY")

    # Verify environment variables
    if not hf_token:
        raise RuntimeError("HF_TOKEN not found in environment variables")
    if not pinecone_key:
        raise RuntimeError("PINECONE_API_KEY not found in environment variables")
    if not openrouter_key:
        raise RuntimeError("OPENROUTER_API_KEY not found in environment variables")

    # Test queries
    test_queries = [
        "What is cognitive behavior therapy and how does it work?",
        "What are the common cognitive distortions in CBT?",
        "How does CBT help with anxiety and depression?"
    ]

    print("=" * 80)
    print("RAG ABLATION STUDY - 6 CHUNKING TECHNIQUES")
    print("=" * 80)
    print("\nTest Queries:")
    for i, q in enumerate(test_queries, 1):
        print(f"  {i}. {q}")

    # Step 1: Check whether the data already exists and skip ingestion if so
    print("\n" + "=" * 80)
    print("STEP 1: CHECKING/INGESTING DATA WITH ALL 6 TECHNIQUES")
    print("=" * 80)

    # Check if the index already has data
    from data.vector_db import get_index_by_name
    index_name = f"{cfg.db['base_index_name']}-{cfg.processing['technique']}"
    print(f"\nChecking for existing index: {index_name}")

    try:
        # Try to connect to an existing index
        print("Connecting to Pinecone...")
        existing_index = get_index_by_name(pinecone_key, index_name)
        print("Getting index stats...")
        stats = existing_index.describe_index_stats()
        existing_count = stats.get('total_vector_count', 0)

        if existing_count > 0:
            print(f"\nāœ“ Found existing index with {existing_count} vectors")
            print("Skipping ingestion - using existing data")

            # Initialize the processor (this loads the embedding model)
            print("Loading embedding model for retrieval...")
            from retriever.processor import ChunkProcessor
            proc = ChunkProcessor(model_name=cfg.processing['embedding_model'], verbose=False)
            index = existing_index
            all_chunks = []  # Empty since we're using existing data
            final_chunks = []
            print("āœ“ Processor initialized")
        else:
            print("\nIndex exists but is empty. Running full ingestion...")
            all_chunks, final_chunks, proc, index = ingest_data()
    except Exception as e:
        print(f"\nIndex check failed: {e}")
        print("Running full ingestion...")
        all_chunks, final_chunks, proc, index = ingest_data()
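    # Note: describe_index_stats() returns a dict-like response from the
    # Pinecone client; 'total_vector_count' spans all techniques stored in the
    # index, so any non-zero count is treated as "ingestion already done".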
    print(f"\nTechniques to evaluate: {[tech['name'] for tech in CHUNKING_TECHNIQUES]}")

    # Step 2: Initialize components
    print("\n" + "=" * 80)
    print("STEP 2: INITIALIZING COMPONENTS")
    print("=" * 80)

    # Initialize the models
    print("\nInitializing models...")
    rag_engine = RAGGenerator()
    models = {name: MODEL_MAP[name](token=hf_token) for name in cfg.model_list}

    # Initialize the evaluator (openrouter_key was already verified above)
    print("Initializing evaluator...")
    evaluator = RAGEvaluator(
        judge_model=cfg.gen['judge_model'],
        embedding_model=proc.encoder,
        api_key=openrouter_key
    )

    # Step 3: Run RAG for all techniques in parallel, for all queries
    print("\n" + "=" * 80)
    print("STEP 3: RUNNING RAG FOR ALL 6 TECHNIQUES (IN PARALLEL)")
    print("=" * 80)

    # Prepare arguments for parallel execution
    num_processes = min(cpu_count(), len(CHUNKING_TECHNIQUES))
    print(f"\nUsing {num_processes} parallel processes for {len(CHUNKING_TECHNIQUES)} techniques")

    # Run techniques in parallel for all queries
    evaluator_config = {
        'judge_model': cfg.gen['judge_model'],
        'api_key': openrouter_key,
        'model_list': cfg.model_list
    }

    all_query_results = {}

    for query_idx, query in enumerate(test_queries):
        print(f"\n{'='*80}")
        print(f"PROCESSING QUERY {query_idx + 1}/{len(test_queries)}")
        print(f"Query: {query}")
        print(f"{'='*80}")

        with Pool(
            processes=num_processes,
            initializer=init_worker,
            initargs=(cfg.processing['embedding_model'], evaluator_config)
        ) as pool:
            args_list = [
                (technique, query, index_name, pinecone_key)
                for technique in CHUNKING_TECHNIQUES
            ]
            results_list = pool.map(run_rag_for_technique_wrapper, args_list)

        # Convert the results to a dictionary and store them
        query_results = {name: results for name, results in results_list}
        all_query_results[query_idx] = query_results

        # Print a quick summary for this query
        print(f"\n{'='*80}")
        print(f"QUERY {query_idx + 1} SUMMARY")
        print(f"{'='*80}")
        print(f"\n{'Technique':<15} {'Avg Faith':>12} {'Avg Rel':>12} {'Best Model':<20}")
        print("-" * 60)
        for technique_name, model_results in query_results.items():
            if model_results:
                avg_faith = sum(r.get('Faithfulness', 0) for r in model_results.values()) / len(model_results)
                avg_rel = sum(r.get('Relevancy', 0) for r in model_results.values()) / len(model_results)
                # Find the best model
                best_model = max(
                    model_results.items(),
                    key=lambda x: x[1].get('Faithfulness', 0) + x[1].get('Relevancy', 0)
                )
                best_name = best_model[0]
                print(f"{technique_name:<15} {avg_faith:>11.1f}% {avg_rel:>12.3f} {best_name:<20}")
            else:
                print(f"{technique_name:<15} {'N/A':>12} {'N/A':>12} {'N/A':<20}")
        print("-" * 60)
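    # Scale note: Faithfulness is a percentage (0-100) while Relevancy is a
    # 0-1 score, so the combined sums used for ranking are dominated by
    # Faithfulness; the raw scores are reported alongside for that reason.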
    # Step 4: Generate the findings document from all queries
    print("\n" + "=" * 80)
    print("STEP 4: GENERATING FINDINGS DOCUMENT")
    print("=" * 80)
    findings_file = generate_findings_document(all_query_results, test_queries)

    # Step 5: Final summary
    print("\n" + "=" * 80)
    print("ABLATION STUDY COMPLETE - SUMMARY")
    print("=" * 80)
    print(f"\nQueries processed: {len(test_queries)}")
    print(f"Techniques evaluated: {len(CHUNKING_TECHNIQUES)}")
    print(f"Models tested: {len(cfg.model_list)}")
    print(f"\nFindings document: {findings_file}")

    # Print a final summary across all queries
    print("\n" + "-" * 60)
    print(f"{'Technique':<15} {'Avg Faith':>12} {'Avg Rel':>12} {'Best Model':<20}")
    print("-" * 60)

    # Calculate averages across all queries
    for tech_config in CHUNKING_TECHNIQUES:
        tech_name = tech_config['name']
        all_faith = []
        all_rel = []
        best_model_name = None
        best_combined = 0

        for query_idx, query_results in all_query_results.items():
            if tech_name in query_results and query_results[tech_name]:
                model_results = query_results[tech_name]
                for model_name, results in model_results.items():
                    faith = results.get('Faithfulness', 0)
                    rel = results.get('Relevancy', 0)
                    combined = faith + rel
                    all_faith.append(faith)
                    all_rel.append(rel)
                    if combined > best_combined:
                        best_combined = combined
                        best_model_name = model_name

        if all_faith:
            avg_faith = sum(all_faith) / len(all_faith)
            avg_rel = sum(all_rel) / len(all_rel)
            print(f"{tech_name:<15} {avg_faith:>11.1f}% {avg_rel:>12.3f} {best_model_name or 'N/A':<20}")
        else:
            print(f"{tech_name:<15} {'N/A':>12} {'N/A':>12} {'N/A':<20}")

    print("-" * 60)
    print("\nāœ“ Ablation study complete!")
    print(f"āœ“ Results saved to: {findings_file}")
    print("\nYou can now analyze the findings document to compare chunking techniques.")

    return all_query_results


if __name__ == "__main__":
    main()