File size: 6,986 Bytes
291c0bb
 
bcf20be
291c0bb
 
 
 
 
 
 
bcf20be
291c0bb
 
bcf20be
291c0bb
 
bcf20be
291c0bb
 
 
 
bcf20be
 
 
291c0bb
 
bcf20be
 
 
 
291c0bb
bcf20be
 
 
 
291c0bb
 
 
bcf20be
291c0bb
 
 
bcf20be
 
291c0bb
 
 
 
 
 
 
 
 
 
bcf20be
291c0bb
 
bcf20be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
291c0bb
 
 
 
 
 
bcf20be
291c0bb
 
 
 
 
 
 
 
bcf20be
 
291c0bb
 
 
 
bcf20be
291c0bb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bcf20be
 
 
 
 
 
 
 
 
 
291c0bb
bcf20be
 
291c0bb
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""
Ingest ONLY PDFs from hackathon_data folder
Parallel processing with 4 workers using ThreadPoolExecutor (better for I/O-bound tasks)
"""

import os
import sys
import time
import json
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from dotenv import load_dotenv

# Load environment first (before any imports that need env vars)
load_dotenv()

# Project paths
PROJECT_ROOT = Path(__file__).parent.parent
PDFS_DIR = PROJECT_ROOT / "data" / "hackathon_data"  # Changed to hackathon_data
OUTPUT_DIR = PROJECT_ROOT / "output" / "ingestion"

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))


def worker_ingest(pdf_path: str):
    """
    Ingest a single PDF and return the result dict from ``ingest_pdfs``.

    Never raises: any failure (including a failed lazy import) is converted
    into an error dict so the parallel driver can keep processing the
    remaining PDFs and report per-file outcomes.
    """
    try:
        # Lazy import: each worker resolves the module at call time, which
        # sidesteps global-state issues under threaded/multiprocess execution.
        import ingest_pdfs

        return ingest_pdfs.ingest_pdf(str(pdf_path))
    except Exception as exc:
        import traceback

        return {
            "pdf_name": Path(pdf_path).name,
            "status": "error",
            "error": str(exc),
            "traceback": traceback.format_exc(),
        }


def _get_stat(stats, key: str, default):
    """Read *key* from a Pinecone index-stats response, which may expose
    values as object attributes or as a dict-like mapping.

    Fixes the ``getattr(...) or stats.get(...)`` pattern, which misreads
    legitimate falsy values (e.g. a vector count of 0) and then calls
    ``.get`` on attribute-style objects that do not have it.
    """
    value = getattr(stats, key, None)
    if value is not None:
        return value
    getter = getattr(stats, "get", None)
    if callable(getter):
        return getter(key, default)
    return default


def main():
    """Main parallel ingestion pipeline.

    Steps:
      1. Validate required environment variables (Azure OpenAI + Pinecone).
      2. Discover all ``*.pdf`` files in ``PDFS_DIR``.
      3. Ingest them with 4 concurrent threads via ``worker_ingest``.
      4. Print a summary, save a JSON report under ``OUTPUT_DIR``, and
         best-effort print final Pinecone index statistics.

    Returns ``None``; progress and results are reported via stdout and the
    JSON results file.
    """
    print("\n" + "="*70)
    print("πŸš€ HACKATHON DATA INGESTION (4x PARALLEL)")
    print("="*70)
    print(f"πŸ“‚ PDF Directory: {PDFS_DIR}")
    print("⚑ Workers: 4 PDFs at once")
    print(f"🎯 Vector Database: Pinecone ({os.getenv('PINECONE_INDEX_NAME', 'hackathon')})")
    print("="*70)

    # Fail fast if any credential the ingestion workers need is absent.
    required_env_vars = [
        "AZURE_OPENAI_API_KEY",
        "AZURE_OPENAI_ENDPOINT",
        "PINECONE_API_KEY",
        "PINECONE_INDEX_NAME"
    ]

    missing_vars = [var for var in required_env_vars if not os.getenv(var)]
    if missing_vars:
        print("\n❌ Missing required environment variables:")
        for var in missing_vars:
            print(f"   - {var}")
        print("\nPlease set these in your .env file.")
        return

    # Check if directory exists
    if not PDFS_DIR.exists():
        print(f"\n❌ Directory not found: {PDFS_DIR}")
        print(f"   Please create the directory and add PDFs to it.")
        return

    # Get all PDFs (sorted for deterministic submission order)
    all_pdfs = sorted(PDFS_DIR.glob("*.pdf"))
    print(f"\nπŸ“š Found {len(all_pdfs)} PDFs in hackathon_data folder")

    if not all_pdfs:
        print("\n❌ No PDFs found in hackathon_data folder!")
        print(f"   Please add PDF files to: {PDFS_DIR}")
        return

    for pdf in all_pdfs:
        print(f"   β†’ {pdf.name}")

    print("\n⚑ Starting parallel processing with 4 workers...")
    # Rough estimate: ~80 s per PDF, divided across 4 workers.
    print(f"⏱️  Estimated time: ~{len(all_pdfs) * 80 / 4 / 60:.1f} minutes\n")

    # Process in parallel using ThreadPoolExecutor
    # (Better for I/O-bound tasks like API calls to Azure and Pinecone)
    results = []
    completed = 0
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=4) as executor:
        # Submit all jobs; keep a mapping so we can name the PDF on completion.
        future_to_pdf = {
            executor.submit(worker_ingest, str(pdf)): pdf
            for pdf in all_pdfs
        }

        # Collect results as they complete (not in submission order).
        for future in as_completed(future_to_pdf):
            pdf = future_to_pdf[future]
            completed += 1

            try:
                result = future.result()
                results.append(result)

                if result.get("status") == "success":
                    # ETA is a moving average over the PDFs finished so far.
                    elapsed = time.time() - start_time
                    avg_time = elapsed / completed
                    remaining = len(all_pdfs) - completed
                    eta = remaining * avg_time / 60

                    print(f"βœ… [{completed}/{len(all_pdfs)}] {pdf.name}")
                    print(f"   πŸ“Š {result['num_vectors']} vectors, {result['time_total']:.1f}s")
                    print(f"   ⏱️  ETA: {eta:.1f} minutes remaining\n")
                else:
                    print(f"❌ [{completed}/{len(all_pdfs)}] {pdf.name} - {result.get('error', 'Unknown error')}\n")

            except Exception as e:
                # worker_ingest catches its own errors, so this only fires on
                # unexpected executor-level failures; record them the same way.
                print(f"❌ [{completed}/{len(all_pdfs)}] {pdf.name} - Error: {e}\n")
                results.append({
                    "pdf_name": pdf.name,
                    "status": "error",
                    "error": str(e)
                })

    total_time = time.time() - start_time

    # Summary
    print("\n" + "="*70)
    print("πŸ“Š INGESTION COMPLETE")
    print("="*70)

    successful = [r for r in results if r.get("status") == "success"]
    failed = [r for r in results if r.get("status") == "error"]

    print(f"\nβœ… Successful: {len(successful)}/{len(all_pdfs)}")
    print(f"❌ Failed: {len(failed)}")
    print(f"⏱️  Total Time: {total_time/60:.1f} minutes")

    if successful:
        total_vectors = sum(r["num_vectors"] for r in successful)
        avg_time = sum(r["time_total"] for r in successful) / len(successful)
        print(f"\nπŸ“¦ Total Vectors Uploaded: {total_vectors}")
        print(f"⏱️  Average Time per PDF: {avg_time:.1f}s")

    # Save a machine-readable report next to other ingestion outputs.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    results_file = OUTPUT_DIR / "hackathon_data_ingestion.json"

    with open(results_file, 'w', encoding='utf-8') as f:
        json.dump({
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "source_folder": "hackathon_data",
            "total_pdfs": len(all_pdfs),
            "successful": len(successful),
            "failed": len(failed),
            "total_time_seconds": round(total_time, 2),
            "results": results
        }, f, indent=2, ensure_ascii=False)

    print(f"\nπŸ“„ Results saved to: {results_file}")

    # Final Pinecone stats (best-effort; failures here are non-fatal).
    try:
        from pinecone import Pinecone
        pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
        index = pc.Index(os.getenv("PINECONE_INDEX_NAME", "hackathon"))
        stats = index.describe_index_stats()

        print("\nπŸ“Š Final Pinecone Stats:")
        # Handle both dict-like and object attribute access without the
        # falsy-value pitfall of `getattr(...) or stats.get(...)`.
        total_vectors = _get_stat(stats, 'total_vector_count', 0)
        dimension = _get_stat(stats, 'dimension', 0)
        print(f"   Total Vectors: {total_vectors}")
        print(f"   Dimensions: {dimension}")

        # Show namespaces if available
        namespaces = _get_stat(stats, 'namespaces', {})
        if namespaces:
            print(f"   Namespaces: {len(namespaces)}")
    except Exception as e:
        print(f"\n⚠️  Could not fetch Pinecone stats: {e}")
        print("   (This is non-fatal - ingestion was still successful)")

    print("\n" + "="*70)
    print("πŸŽ‰ HACKATHON DATA INGESTION COMPLETE!")
    print("="*70)


if __name__ == "__main__":
    main()