| | """ |
| | Ingest ONLY PDFs from hackathon_data folder |
| | Parallel processing with 4 workers using ThreadPoolExecutor (better for I/O-bound tasks) |
| | """ |
| |
|
| | import os |
| | import sys |
| | import time |
| | import json |
| | from pathlib import Path |
| | from concurrent.futures import ThreadPoolExecutor, as_completed |
| | from dotenv import load_dotenv |
| |
|
| | |
# Load variables from a local .env file (if present) into os.environ before
# anything below or in main() reads them.
load_dotenv()

# Resolve __file__ to an absolute path first: when the script is started via a
# relative path on older Pythons, Path("script.py").parent.parent collapses to
# "." and the project layout would be looked up relative to the wrong place.
_SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = _SCRIPT_DIR.parent
PDFS_DIR = PROJECT_ROOT / "data" / "hackathon_data"
OUTPUT_DIR = PROJECT_ROOT / "output" / "ingestion"

# Put this script's directory first on the import path so the sibling module
# `ingest_pdfs` (imported lazily by worker_ingest) can be found regardless of
# the current working directory.
sys.path.insert(0, str(_SCRIPT_DIR))
| |
|
| |
|
def worker_ingest(pdf_path: str):
    """Ingest a single PDF and always hand back a result dict.

    ``ingest_pdfs`` is imported inside the call, within the ``try``, so a
    failure to import it is reported as a per-PDF error result instead of
    aborting the whole run.

    Args:
        pdf_path: Path of the PDF to ingest (converted to ``str`` before use).

    Returns:
        On success, whatever ``ingest_pdfs.ingest_pdf`` returns. On any
        exception, a dict with ``pdf_name``, ``status`` (``"error"``),
        ``error`` (the exception message) and ``traceback``.
    """
    try:
        import ingest_pdfs as ingestor

        return ingestor.ingest_pdf(str(pdf_path))
    except Exception as exc:
        import traceback

        failure = {
            "pdf_name": Path(pdf_path).name,
            "status": "error",
            "error": str(exc),
        }
        # Still inside the except clause, so format_exc() sees this exception.
        failure["traceback"] = traceback.format_exc()
        return failure
| |
|
| |
|
def _stats_field(stats, key, default):
    """Read *key* from a Pinecone ``describe_index_stats`` response.

    The response is probed both as a dict and via attributes, as the original
    code did (which form a given client version returns is not visible here).
    A present-but-falsy value such as ``0`` or ``{}`` is returned as-is instead
    of triggering a second ``.get`` lookup that an object may not support.
    """
    if isinstance(stats, dict):
        return stats.get(key, default)
    value = getattr(stats, key, None)
    if value is not None:
        return value
    getter = getattr(stats, "get", None)
    return getter(key, default) if callable(getter) else default


def _print_progress(result, pdf, completed, total, start_time):
    """Print the per-PDF progress line, plus vector count and ETA on success."""
    if result.get("status") != "success":
        print(f"❌ [{completed}/{total}] {pdf.name} - {result.get('error', 'Unknown error')}\n")
        return

    elapsed = time.time() - start_time
    # elapsed / completed is the observed wall-clock cost per finished PDF with
    # the pool running, so multiplying by the remaining count gives an ETA.
    eta = (total - completed) * (elapsed / completed) / 60
    # Defaults so a success record missing these fields cannot crash the display.
    num_vectors = result.get("num_vectors", 0)
    time_total = result.get("time_total") or 0.0
    print(f"✅ [{completed}/{total}] {pdf.name}")
    print(f" 📊 {num_vectors} vectors, {time_total:.1f}s")
    print(f" ⏱️ ETA: {eta:.1f} minutes remaining\n")


def _run_ingestion(pdfs, max_workers):
    """Ingest *pdfs* on a thread pool; return ``(results, elapsed_seconds)``."""
    results = []
    total = len(pdfs)
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_pdf = {executor.submit(worker_ingest, str(pdf)): pdf for pdf in pdfs}

        # as_completed yields futures in completion order, not submission order.
        for completed, future in enumerate(as_completed(future_to_pdf), start=1):
            pdf = future_to_pdf[future]
            # Guard only future.result(): if the progress printing were inside
            # the try as well, a display error would append a second "error"
            # record for a PDF whose result was already recorded.
            try:
                result = future.result()
            except Exception as e:
                print(f"❌ [{completed}/{total}] {pdf.name} - Error: {e}\n")
                results.append({"pdf_name": pdf.name, "status": "error", "error": str(e)})
                continue

            if not isinstance(result, dict):
                # The summary below calls .get() on every record; normalise
                # anything unexpected into an error record instead of crashing.
                result = {
                    "pdf_name": pdf.name,
                    "status": "error",
                    "error": f"Unexpected worker result: {result!r}",
                }
            results.append(result)
            _print_progress(result, pdf, completed, total, start_time)

    return results, time.time() - start_time


def _save_report(results, total_pdfs, successful, failed, total_time):
    """Write the JSON run report into OUTPUT_DIR and return the file path."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    results_file = OUTPUT_DIR / "hackathon_data_ingestion.json"
    with open(results_file, "w", encoding="utf-8") as f:
        json.dump(
            {
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "source_folder": "hackathon_data",
                "total_pdfs": total_pdfs,
                "successful": len(successful),
                "failed": len(failed),
                "total_time_seconds": round(total_time, 2),
                "results": results,
            },
            f,
            indent=2,
            ensure_ascii=False,
        )
    return results_file


def _print_pinecone_stats():
    """Best-effort print of final Pinecone index stats; errors are reported, not raised."""
    try:
        from pinecone import Pinecone

        pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
        index = pc.Index(os.getenv("PINECONE_INDEX_NAME", "hackathon"))
        stats = index.describe_index_stats()

        print("\n📊 Final Pinecone Stats:")
        print(f" Total Vectors: {_stats_field(stats, 'total_vector_count', 0)}")
        print(f" Dimensions: {_stats_field(stats, 'dimension', 0)}")

        namespaces = _stats_field(stats, "namespaces", {})
        if namespaces:
            print(f" Namespaces: {len(namespaces)}")
    except Exception as e:
        print(f"\n⚠️ Could not fetch Pinecone stats: {e}")
        print(" (This is non-fatal - the ingestion results above are unaffected)")


def main(max_workers: int = 4):
    """Run the parallel ingestion pipeline over every PDF in ``PDFS_DIR``.

    Steps:
    1. Validate the required environment variables.
    2. Find the ``.pdf`` files directly inside ``PDFS_DIR`` (extension matched
       case-insensitively, sorted by path).
    3. Ingest them concurrently via ``worker_ingest`` on a thread pool.
    4. Print a summary and write a JSON report to
       ``OUTPUT_DIR / "hackathon_data_ingestion.json"``.
    5. Print best-effort Pinecone index statistics.

    Args:
        max_workers: Number of PDFs ingested at once. Defaults to 4, the
            value this script previously hard-coded.

    Returns:
        Exit status: 0 if every PDF was ingested successfully, 1 on missing
        configuration/input or if any PDF failed.

    Raises:
        ValueError: If ``max_workers`` is less than 1.
    """
    if max_workers < 1:
        raise ValueError(f"max_workers must be >= 1, got {max_workers}")

    banner = "=" * 70
    print("\n" + banner)
    print(f"🚀 HACKATHON DATA INGESTION ({max_workers}x PARALLEL)")
    print(banner)
    print(f"📁 PDF Directory: {PDFS_DIR}")
    print(f"⚡ Workers: {max_workers} PDFs at once")
    print(f"🎯 Vector Database: Pinecone ({os.getenv('PINECONE_INDEX_NAME', 'hackathon')})")
    print(banner)

    required_env_vars = [
        "AZURE_OPENAI_API_KEY",
        "AZURE_OPENAI_ENDPOINT",
        "PINECONE_API_KEY",
        "PINECONE_INDEX_NAME",
    ]
    # An empty value counts as missing (os.getenv returns "" -> falsy).
    missing_vars = [var for var in required_env_vars if not os.getenv(var)]
    if missing_vars:
        print("\n❌ Missing required environment variables:")
        for var in missing_vars:
            print(f" - {var}")
        print("\nPlease set these in your .env file.")
        return 1

    if not PDFS_DIR.is_dir():
        print(f"\n❌ Directory not found: {PDFS_DIR}")
        print(" Please create the directory and add PDFs to it.")
        return 1

    # Case-insensitive extension match so "REPORT.PDF" is not skipped on
    # case-sensitive file systems; is_file() excludes directories named *.pdf.
    all_pdfs = sorted(
        p for p in PDFS_DIR.iterdir() if p.is_file() and p.suffix.lower() == ".pdf"
    )
    print(f"\n📄 Found {len(all_pdfs)} PDFs in hackathon_data folder")

    if not all_pdfs:
        print("\n❌ No PDFs found in hackathon_data folder!")
        print(f" Please add PDF files to: {PDFS_DIR}")
        return 1

    for pdf in all_pdfs:
        print(f" ✓ {pdf.name}")

    print(f"\n⚡ Starting parallel processing with {max_workers} workers...")
    # Rough estimate assuming ~80 s per PDF. PDFs run in waves of max_workers,
    # so a partial final wave still costs a full PDF's time (ceiling division).
    waves = -(-len(all_pdfs) // max_workers)
    print(f"⏱️ Estimated time: ~{waves * 80 / 60:.1f} minutes\n")

    results, total_time = _run_ingestion(all_pdfs, max_workers)

    print("\n" + banner)
    print("📊 INGESTION COMPLETE")
    print(banner)

    successful = [r for r in results if r.get("status") == "success"]
    # Every non-success record is a failure, matching the ❌ progress output,
    # so successful + failed always equals the number of PDFs processed.
    failed = [r for r in results if r.get("status") != "success"]

    print(f"\n✅ Successful: {len(successful)}/{len(all_pdfs)}")
    print(f"❌ Failed: {len(failed)}")
    print(f"⏱️ Total Time: {total_time / 60:.1f} minutes")

    if successful:
        total_vectors = sum(r.get("num_vectors") or 0 for r in successful)
        avg_time = sum(r.get("time_total") or 0.0 for r in successful) / len(successful)
        print(f"\n📦 Total Vectors Uploaded: {total_vectors}")
        print(f"⏱️ Average Time per PDF: {avg_time:.1f}s")

    results_file = _save_report(results, len(all_pdfs), successful, failed, total_time)
    print(f"\n📝 Results saved to: {results_file}")

    _print_pinecone_stats()

    print("\n" + banner)
    print("🎉 HACKATHON DATA INGESTION COMPLETE!")
    print(banner)

    return 1 if failed else 0
| |
|
| |
|
if __name__ == "__main__":
    # Pass main()'s return value on as the process exit status (None -> 0) so
    # shell scripts and CI can detect a failed or misconfigured run.
    sys.exit(main())
| |
|