"""
Ingest ONLY PDFs from the hackathon_data folder.

Parallel processing with 4 workers using ThreadPoolExecutor
(better suited than processes for I/O-bound tasks such as API calls).
"""
import os
import sys
import time
import json
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from dotenv import load_dotenv
# Load environment first (before any imports that need env vars)
load_dotenv()
# Project paths: PDFs are read from <repo>/data/hackathon_data,
# run summaries are written to <repo>/output/ingestion.
PROJECT_ROOT = Path(__file__).parent.parent  # this script lives one level below the repo root
PDFS_DIR = PROJECT_ROOT / "data" / "hackathon_data" # Changed to hackathon_data
OUTPUT_DIR = PROJECT_ROOT / "output" / "ingestion"
# Add this script's directory to sys.path so workers can `import ingest_pdfs` lazily.
sys.path.insert(0, str(Path(__file__).parent))
def worker_ingest(pdf_path: str):
    """Ingest a single PDF and return the result dict from ingest_pdfs.

    Imports ``ingest_pdfs`` lazily inside the worker to avoid shared global
    state when running under a thread/process pool. Any exception (including
    a failed import) is converted into an error-result dict rather than
    propagating, so one bad PDF never kills the whole run.

    Args:
        pdf_path: filesystem path to the PDF to ingest.

    Returns:
        The dict produced by ``ingest_pdfs.ingest_pdf`` on success, or an
        error dict with keys ``pdf_name``, ``status``, ``error`` and
        ``traceback`` on failure.
    """
    try:
        # Lazy import: keeps module import side effects out of parallel workers.
        import ingest_pdfs
        return ingest_pdfs.ingest_pdf(str(pdf_path))
    except Exception as exc:
        import traceback
        failure = {
            "pdf_name": Path(pdf_path).name,
            "status": "error",
            "error": str(exc),
            "traceback": traceback.format_exc(),
        }
        return failure
def main(max_workers: int = 4):
    """Run the parallel ingestion pipeline over every PDF in PDFS_DIR.

    Steps:
      1. Validate required environment variables (fail fast if missing).
      2. Discover ``*.pdf`` files in PDFS_DIR.
      3. Ingest them concurrently with a ThreadPoolExecutor.
      4. Write a JSON run summary to OUTPUT_DIR.
      5. Best-effort: print final Pinecone index stats.

    Args:
        max_workers: number of concurrent ingestion workers (default 4,
            matching the original hard-coded behavior).

    NOTE(review): the original source had mojibake-corrupted emoji in its
    status strings and two print() calls whose f-strings were broken across
    lines (syntax errors); those are replaced with plain ASCII markers here.
    """
    banner = "=" * 70
    print("\n" + banner)
    print(f"HACKATHON DATA INGESTION ({max_workers}x PARALLEL)")
    print(banner)
    print(f"PDF Directory: {PDFS_DIR}")
    print(f"Workers: {max_workers} PDFs at once")
    print(f"Vector Database: Pinecone ({os.getenv('PINECONE_INDEX_NAME', 'hackathon')})")
    print(banner)

    # Validate required environment variables before doing any work.
    required_env_vars = [
        "AZURE_OPENAI_API_KEY",
        "AZURE_OPENAI_ENDPOINT",
        "PINECONE_API_KEY",
        "PINECONE_INDEX_NAME",
    ]
    missing_vars = [var for var in required_env_vars if not os.getenv(var)]
    if missing_vars:
        print("\n[ERROR] Missing required environment variables:")
        for var in missing_vars:
            print(f"   - {var}")
        print("\nPlease set these in your .env file.")
        return

    # Check that the input directory exists.
    if not PDFS_DIR.exists():
        print(f"\n[ERROR] Directory not found: {PDFS_DIR}")
        print("   Please create the directory and add PDFs to it.")
        return

    # Collect all PDFs (sorted for deterministic ordering of the run).
    all_pdfs = sorted(PDFS_DIR.glob("*.pdf"))
    print(f"\nFound {len(all_pdfs)} PDFs in hackathon_data folder")
    if not all_pdfs:
        print("\n[ERROR] No PDFs found in hackathon_data folder!")
        print(f"   Please add PDF files to: {PDFS_DIR}")
        return
    for pdf in all_pdfs:
        print(f"   - {pdf.name}")

    print(f"\nStarting parallel processing with {max_workers} workers...")
    # Rough ETA: assumes ~80s per PDF, divided across the workers.
    print(f"Estimated time: ~{len(all_pdfs) * 80 / max_workers / 60:.1f} minutes\n")

    results = []
    completed = 0
    start_time = time.time()

    # ThreadPoolExecutor over processes: ingestion is I/O-bound
    # (network calls to Azure OpenAI and Pinecone release the GIL).
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all jobs, keeping a future -> pdf map for progress reporting.
        future_to_pdf = {
            executor.submit(worker_ingest, str(pdf)): pdf
            for pdf in all_pdfs
        }
        # Collect results as they complete (not in submission order).
        for future in as_completed(future_to_pdf):
            pdf = future_to_pdf[future]
            completed += 1
            try:
                result = future.result()
                results.append(result)
                if result.get("status") == "success":
                    # Running-average ETA based on elapsed wall time.
                    elapsed = time.time() - start_time
                    avg_time = elapsed / completed
                    remaining = len(all_pdfs) - completed
                    eta = remaining * avg_time / 60
                    print(f"[OK] [{completed}/{len(all_pdfs)}] {pdf.name}")
                    print(f"   {result['num_vectors']} vectors, {result['time_total']:.1f}s")
                    print(f"   ETA: {eta:.1f} minutes remaining\n")
                else:
                    print(f"[FAIL] [{completed}/{len(all_pdfs)}] {pdf.name} - {result.get('error', 'Unknown error')}\n")
            except Exception as e:
                # Worker raised outside worker_ingest's own try/except
                # (e.g. the future itself was cancelled or crashed).
                print(f"[FAIL] [{completed}/{len(all_pdfs)}] {pdf.name} - Error: {e}\n")
                results.append({
                    "pdf_name": pdf.name,
                    "status": "error",
                    "error": str(e),
                })

    total_time = time.time() - start_time

    # Summary
    print("\n" + banner)
    print("INGESTION COMPLETE")
    print(banner)
    successful = [r for r in results if r.get("status") == "success"]
    failed = [r for r in results if r.get("status") == "error"]
    print(f"\nSuccessful: {len(successful)}/{len(all_pdfs)}")
    print(f"Failed: {len(failed)}")
    print(f"Total Time: {total_time / 60:.1f} minutes")
    if successful:
        total_vectors = sum(r["num_vectors"] for r in successful)
        avg_time = sum(r["time_total"] for r in successful) / len(successful)
        print(f"\nTotal Vectors Uploaded: {total_vectors}")
        print(f"Average Time per PDF: {avg_time:.1f}s")

    # Persist a machine-readable summary of the run.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    results_file = OUTPUT_DIR / "hackathon_data_ingestion.json"
    with open(results_file, 'w', encoding='utf-8') as f:
        json.dump({
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "source_folder": "hackathon_data",
            "total_pdfs": len(all_pdfs),
            "successful": len(successful),
            "failed": len(failed),
            "total_time_seconds": round(total_time, 2),
            "results": results,
        }, f, indent=2, ensure_ascii=False)
    print(f"\nResults saved to: {results_file}")

    # Final Pinecone stats — best effort only; failure here is non-fatal.
    try:
        from pinecone import Pinecone
        pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
        index = pc.Index(os.getenv("PINECONE_INDEX_NAME", "hackathon"))
        stats = index.describe_index_stats()
        print("\nFinal Pinecone Stats:")
        # Pinecone client versions differ: stats may expose attributes
        # or behave like a dict, so try both access styles.
        total_vectors = getattr(stats, 'total_vector_count', None) or stats.get('total_vector_count', 0)
        dimension = getattr(stats, 'dimension', None) or stats.get('dimension', 0)
        print(f"   Total Vectors: {total_vectors}")
        print(f"   Dimensions: {dimension}")
        namespaces = getattr(stats, 'namespaces', None) or stats.get('namespaces', {})
        if namespaces:
            print(f"   Namespaces: {len(namespaces)}")
    except Exception as e:
        print(f"\n[WARN] Could not fetch Pinecone stats: {e}")
        print("   (This is non-fatal - ingestion was still successful)")

    print("\n" + banner)
    print("HACKATHON DATA INGESTION COMPLETE!")
    print(banner)
# Script entry point: run the pipeline only when executed directly.
if __name__ == "__main__":
    main()
|