from __future__ import annotations

# Ingest DepEd SHS curriculum JSON/JSONL files into the ChromaDB collection
# returned by ``rag.vectorstore_loader.get_vectorstore_components``.
#
# Command-line options:
#   --data-dir DIR   directory containing .json/.jsonl files
#   --reset          delete the collection's existing ids before ingesting

import argparse
import hashlib
import json
import logging
import os
import sys
from pathlib import Path
from typing import Any, Dict, Iterator, List, Tuple

# Put the directory one level above this file's folder on sys.path so that
# the ``rag`` package import below resolves when this file is run directly.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

from rag.vectorstore_loader import (  # noqa: E402  (must follow the sys.path tweak)
    get_vectorstore_components,
    reset_vectorstore_singleton,
)

logger = logging.getLogger(__name__)


def _resolve_data_dir(raw: str | None) -> Path:
    """Return the directory to ingest from.

    Args:
        raw: Value of ``--data-dir``; ``None`` or empty selects the default.

    Returns:
        * an absolute ``raw`` unchanged (its existence is not checked);
        * a relative ``raw`` resolved against the current working directory
          when that path exists;
        * otherwise ``<parent of this file's folder>/datasets``.
    """
    default = Path(__file__).resolve().parents[1] / "datasets"
    if not raw:
        return default

    candidate = Path(raw)
    if candidate.is_absolute():
        return candidate

    candidate = Path.cwd() / raw
    if candidate.exists():
        return candidate

    # The caller asked for a specific directory; make the fallback visible
    # instead of quietly ingesting a different data set.
    logger.warning(
        "Data dir %s not found; falling back to %s", candidate, default
    )
    return default


def _iter_json_files(data_dir: Path) -> Iterator[Path]:
    """Yield every ``.json`` / ``.jsonl`` file under *data_dir*, sorted by path.

    The search is recursive. Directories are skipped even when their name
    carries one of the accepted suffixes.
    """
    for path in sorted(data_dir.rglob("*")):
        if path.suffix in {".json", ".jsonl"} and path.is_file():
            yield path


def _load_records(file_path: Path) -> List[Dict[str, Any]]:
    """Parse *file_path* and return the JSON objects it contains.

    ``.jsonl`` files are read one JSON value per line; malformed lines are
    logged and skipped. Any other file is parsed as a single JSON document
    that may be an object or a list of objects.

    Values that are not JSON objects (numbers, strings, nested lists, ...)
    are logged and dropped so that callers can rely on ``dict`` records.

    Returns:
        The parsed records. Read or parse failures are logged as warnings
        and yield whatever was collected before the failure (possibly an
        empty list); no exception is propagated.
    """
    records: List[Dict[str, Any]] = []

    def _keep(item: Any, where: str) -> None:
        # Downstream code calls ``record.get(...)``; only dicts are usable.
        if isinstance(item, dict):
            records.append(item)
        else:
            logger.warning(
                "Skipping non-object record (%s) in %s",
                type(item).__name__,
                where,
            )

    try:
        # "utf-8-sig" reads plain UTF-8 too, and strips a leading BOM that
        # json.loads would otherwise reject.
        text = file_path.read_text(encoding="utf-8-sig")

        if file_path.suffix == ".jsonl":
            # Split on "\n" only: str.splitlines() also breaks on characters
            # such as U+2028 that may appear unescaped inside JSON strings.
            # The text is not stripped first, so line numbers match the file.
            for lineno, line in enumerate(text.split("\n"), start=1):
                line = line.strip()
                if not line:
                    continue
                try:
                    item = json.loads(line)
                except json.JSONDecodeError:
                    logger.warning(
                        "Skipping malformed JSONL line %s:%d",
                        file_path.name,
                        lineno,
                    )
                    continue
                _keep(item, f"{file_path.name}:{lineno}")
        else:
            parsed = json.loads(text)
            items = parsed if isinstance(parsed, list) else [parsed]
            for item in items:
                _keep(item, file_path.name)
    except Exception as exc:
        # Per-file boundary: one unreadable file must not stop the run.
        logger.warning("Failed to parse %s: %s", file_path.name, exc)

    return records


def _build_id(source_file: str, page: int, content: str) -> str:
    """Return a deterministic 40-hex-character id for a chunk.

    The id is the truncated SHA-256 of the source file name, the page and the
    first 120 characters of *content*. Chunks that share all three therefore
    receive the same id.
    """
    key = f"{source_file}::{page}::{content[:120]}"
    return hashlib.sha256(key.encode()).hexdigest()[:40]


def _ingest_file(
    file_path: Path, collection: Any, embedder: Any
) -> Tuple[int, int, int]:
    """Embed and upsert every usable record of one file.

    Args:
        file_path: JSON/JSONL file to ingest.
        collection: Object exposing ``upsert(ids=, documents=, metadatas=,
            embeddings=)``.
        embedder: Object whose ``encode(text)`` result has ``.tolist()``.

    Returns:
        ``(processed, upserted, errors)`` counts for this file.
    """
    processed = 0
    upserted = 0
    errors = 0

    documents: List[str] = []
    metadatas: List[Dict[str, Any]] = []
    ids: List[str] = []
    embeddings_list: List[List[float]] = []
    slot_by_id: Dict[str, int] = {}

    for record in _load_records(file_path):
        processed += 1

        content = str(record.get("content") or "").strip()
        if not content:
            logger.debug("Skipping empty content in %s", file_path.name)
            continue

        try:
            subject = str(record.get("subject") or "unknown")
            quarter = int(record.get("quarter") or 0)
            page = int(record.get("page") or 0)
            content_domain = str(record.get("content_domain") or "unknown")
            chunk_type = str(record.get("chunk_type") or "unknown")
            source_file = str(record.get("source_file") or file_path.name)
            embedding = embedder.encode(content).tolist()
            chunk_id = _build_id(source_file, page, content)
        except Exception as exc:
            errors += 1
            logger.warning(
                "Error processing record in %s: %s", file_path.name, exc
            )
            continue

        metadata = {
            "subject": subject,
            "quarter": quarter,
            "content_domain": content_domain,
            "chunk_type": chunk_type,
            "source_file": source_file,
            "page": page,
        }

        slot = slot_by_id.get(chunk_id)
        if slot is None:
            slot_by_id[chunk_id] = len(ids)
            ids.append(chunk_id)
            documents.append(content)
            metadatas.append(metadata)
            embeddings_list.append(embedding)
        else:
            # NOTE(review): Chroma is expected to reject a batch containing
            # repeated ids -- confirm against the installed version. Keeping
            # the later record mirrors upsert's overwrite semantics and stops
            # one collision from failing the whole file.
            logger.warning(
                "Duplicate chunk id in %s; keeping the later record",
                file_path.name,
            )
            documents[slot] = content
            metadatas[slot] = metadata
            embeddings_list[slot] = embedding

    if not documents:
        return processed, upserted, errors

    try:
        collection.upsert(
            ids=ids,
            documents=documents,
            metadatas=metadatas,
            embeddings=embeddings_list,
        )
    except Exception as exc:
        errors += len(documents)
        logger.warning(
            "Failed to upsert batch from %s: %s", file_path.name, exc
        )
    else:
        upserted = len(documents)
        logger.info(
            "Upserted %d chunks from %s", len(documents), file_path.name
        )

    return processed, upserted, errors


def main() -> None:
    """Parse the command line, ingest every data file and print a summary."""
    parser = argparse.ArgumentParser(
        description="Ingest DepEd SHS curriculum JSON/JSONL into ChromaDB"
    )
    parser.add_argument(
        "--data-dir",
        default=None,
        help="Directory containing .json/.jsonl files",
    )
    parser.add_argument(
        "--reset",
        action="store_true",
        help="Reset the vectorstore singleton before ingestion",
    )
    args = parser.parse_args()

    data_dir = _resolve_data_dir(args.data_dir)
    logger.info("Ingesting from: %s", data_dir)
    if not data_dir.is_dir():
        # Without this the run would finish "successfully" with all-zero
        # counters and no hint as to why.
        logger.warning(
            "Data directory %s does not exist or is not a directory", data_dir
        )

    if args.reset:
        reset_vectorstore_singleton()
        _, collection, _ = get_vectorstore_components()
        try:
            existing_ids = collection.get(include=[])["ids"]
            # Nothing to delete on an empty collection; skip the call rather
            # than pass an empty id list.
            if existing_ids:
                collection.delete(ids=existing_ids)
        except Exception as exc:
            # Best effort: ingestion still proceeds, but the failure is no
            # longer silent.
            logger.warning("Could not clear existing collection: %s", exc)
        reset_vectorstore_singleton()

    total_processed = 0
    total_upserted = 0
    total_errors = 0

    _, collection, embedder = get_vectorstore_components()

    for file_path in _iter_json_files(data_dir):
        processed, upserted, errors = _ingest_file(
            file_path, collection, embedder
        )
        total_processed += processed
        total_upserted += upserted
        total_errors += errors

    print("=== Ingestion Summary ===")
    print(f"Total records processed: {total_processed}")
    print(f"Total chunks upserted: {total_upserted}")
    print(f"Total errors: {total_errors}")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()