Spaces:
Sleeping
Sleeping
# --- Standard library ---------------------------------------------------
import argparse
import json
import logging
import os
import uuid
from datetime import datetime, timezone

# Disable Chroma telemetry to avoid opentelemetry compatibility errors during
# ingestion. NOTE: this must run before chromadb is pulled in by the langchain
# Chroma wrapper below, which is why it sits between the import groups.
os.environ.setdefault("CHROMA_TELEMETRY_ENABLED", "false")

# --- Third party --------------------------------------------------------
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings

# --- Project-local (KG integration) -------------------------------------
# Imported unconditionally so errors propagate if dependencies are missing.
from src.kg.extract import extract_triples_with_llm
from src.kg.store import KGStore

# Module-level logger for the ingestion pipeline.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def load_documents(data_dir: str):
    """Recursively load ``.txt`` and ``.csv`` files under *data_dir*.

    Files with any other extension are skipped silently.

    Args:
        data_dir: Root directory to scan recursively.

    Returns:
        A list of LangChain ``Document`` objects (possibly empty).
    """
    from pathlib import Path
    from langchain_community.document_loaders import CSVLoader, TextLoader

    docs = []
    for path in Path(data_dir).rglob("*"):
        if not path.is_file():
            continue
        suffix = path.suffix.lower()
        if suffix == ".txt":
            loader = TextLoader(str(path))
        elif suffix == ".csv":
            loader = CSVLoader(file_path=str(path))
        else:
            # Unsupported file type: skip.
            continue
        docs.extend(loader.load())
    # Lazy %-style args (consistent with the rest of the module) so the
    # message is only formatted when the log level is enabled.
    logger.info("Loaded %d documents from %s", len(docs), data_dir)
    logger.debug(
        "Documents ingested: %s",
        [(d.metadata or {}).get("source") for d in docs],
    )
    return docs
def _attach_ingest_file_handler(persist_dir: str) -> None:
    """Attach a DEBUG file handler writing to ``persist_dir/ingest.log`` (idempotent, best-effort)."""
    try:
        fh = logging.FileHandler(os.path.join(persist_dir, 'ingest.log'))
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        # Avoid adding multiple file handlers for the same file on repeated calls.
        already_attached = any(
            isinstance(h, logging.FileHandler)
            and getattr(h, 'baseFilename', None) == fh.baseFilename
            for h in logger.handlers
        )
        if not already_attached:
            logger.addHandler(fh)
    except Exception:
        logger.exception('Failed to add file handler for ingest log')


def _flush_log_handlers() -> None:
    """Best-effort flush of every attached handler (used before a hard exit)."""
    for h in logger.handlers:
        try:
            h.flush()
        except Exception:
            pass


def ingest(data_dir: str, persist_dir: str, chunk_size: int, chunk_overlap: int, openai_api_key: str = None):
    """Run the full ingestion pipeline: chunk, embed, and build the KG.

    Pipeline:
      1. Load ``.txt``/``.csv`` documents from *data_dir* and split into chunks.
      2. Assign each chunk a stable ``chunk_id``, extract KG triples via the
         LLM (non-fatal per chunk), and record chunk text for runtime lookup.
      3. Persist the Chroma vectorstore, ``chunks_index.json``, and the KG
         store into *persist_dir*.

    Args:
        data_dir: Directory containing the source documents.
        persist_dir: Output directory (created if missing) for the
            vectorstore, ingest log, chunk index, and KG store.
        chunk_size: Maximum characters per chunk.
        chunk_overlap: Characters of overlap between consecutive chunks.
        openai_api_key: Optional OpenAI API key; when given it overrides
            ``OPENAI_API_KEY`` in the environment.

    Raises:
        ValueError: If *data_dir* does not exist or yields no documents.
        SystemExit: If the vectorstore or KG cannot be persisted to disk.
    """
    logger.info(
        "Starting ingest: data_dir=%s persist_dir=%s chunk_size=%s chunk_overlap=%s",
        data_dir, persist_dir, chunk_size, chunk_overlap,
    )
    if not os.path.exists(data_dir):
        logger.error("Data directory does not exist: %s", data_dir)
        raise ValueError(f"Data directory does not exist: {data_dir}")
    docs = load_documents(data_dir)
    if not docs:
        logger.error("No documents found in %s", data_dir)
        # Message fixed: the loader accepts .csv as well as .txt.
        raise ValueError(f"No .txt or .csv documents found in {data_dir}")
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    split_docs = splitter.split_documents(docs)
    logger.info("Split into %d chunks", len(split_docs))
    # Ensure the persist dir exists, then mirror detailed logs into it.
    os.makedirs(persist_dir, exist_ok=True)
    _attach_ingest_file_handler(persist_dir)
    # Local chunk index (chunk_id -> text/metadata) and KG output path.
    chunks_index = {}
    kg_path = os.path.join(persist_dir, "kg_store.ttl")
    # Initialize embeddings; give a clear error if the OpenAI API key is missing.
    try:
        logger.info('Initializing embeddings')
        # If an API key was provided on the CLI, inject it into the environment
        # so OpenAIEmbeddings picks it up.
        if openai_api_key:
            os.environ['OPENAI_API_KEY'] = openai_api_key
            logger.debug('Set OPENAI_API_KEY from CLI flag')
        embeddings = OpenAIEmbeddings()
        logger.info('Embeddings initialized')
    except Exception:
        logger.exception("Failed to initialize OpenAI embeddings. Ensure OPENAI_API_KEY is set in the environment or pass --openai-api-key.")
        raise
    # Initialize the KG store unconditionally so errors are visible.
    try:
        logger.info('Initializing KG store at %s', kg_path)
        kg = KGStore(path=kg_path)
        logger.info('KG store initialized')
    except Exception:
        logger.exception('Failed to initialize KGStore')
        # Re-raise so the caller sees the failure.
        raise
    # Annotate chunks with a stable chunk_id and extract/link KG triples.
    start_time = datetime.now(timezone.utc)
    logger.info('Beginning per-chunk processing at %s UTC', start_time.isoformat())
    for i, d in enumerate(split_docs, start=1):
        # (Stray debug `print(i, d)` removed; progress is logged below instead.)
        if not d.metadata:
            d.metadata = {}
        # Reuse an existing chunk_id if present; otherwise mint a fresh UUID.
        chunk_id = d.metadata.get("chunk_id") or str(uuid.uuid4())
        d.metadata["chunk_id"] = chunk_id
        # Save a minimal chunk index for runtime retrieval (text + source metadata).
        chunks_index[chunk_id] = {
            "text": getattr(d, "page_content", "") or getattr(d, "content", ""),
            "metadata": d.metadata,
        }
        # Log progress for the first few chunks and then at intervals.
        if i % 50 == 0 or i <= 5:
            logger.debug('Processing chunk %d/%d (id=%s)', i, len(split_docs), chunk_id)
        # Extract triples and link the chunk; extraction errors are non-fatal.
        try:
            triples = extract_triples_with_llm(chunks_index[chunk_id]["text"], max_triples=4)
            if triples:
                logger.debug('Extracted %d triples for chunk %s', len(triples), chunk_id)
            for t in triples:
                try:
                    kg.add_triple(
                        t.get("subject"),
                        t.get("predicate"),
                        t.get("object"),
                        provenance={"sentence": t.get("sentence"), "confidence": t.get("confidence")},
                    )
                    kg.link_chunk_to_entity(
                        chunk_id,
                        t.get("subject"),
                        sentence=t.get("sentence"),
                        confidence=t.get("confidence"),
                    )
                except Exception:
                    logger.exception('Non-fatal error while adding triple or linking chunk %s', chunk_id)
                    continue
        except Exception:
            # LLM extraction failed or not configured; skip KG extraction for this chunk.
            logger.exception('KG extraction failed for chunk %s (continuing)', chunk_id)
    end_time = datetime.now(timezone.utc)
    logger.info('Finished per-chunk processing at %s UTC (duration %s)', end_time.isoformat(), end_time - start_time)
    # Persist the Chroma vectorstore.
    try:
        logger.info('Persisting Chroma vectorstore to %s', persist_dir)
        Chroma.from_documents(
            split_docs,
            embedding=embeddings,
            persist_directory=persist_dir,
        )
        logger.info('Vectorstore built and persisted to %s', persist_dir)
    except Exception:
        import sys
        logger.exception('Chroma.from_documents failed to write the vectorstore:')
        # Ensure the log is flushed to file before exiting.
        _flush_log_handlers()
        sys.exit(1)
    # Persist the chunk index for runtime use (simple JSON mapping).
    try:
        idx_path = os.path.join(persist_dir, "chunks_index.json")
        with open(idx_path, "w", encoding="utf-8") as fh:
            json.dump(chunks_index, fh)
        logger.info('Wrote chunks_index.json (%d entries)', len(chunks_index))
    except Exception:
        logger.exception('Failed to write chunks_index.json')
    # Persist the KG.
    try:
        kg.save()
        logger.info('KG persisted to %s', kg_path)
    except Exception:
        import sys
        logger.exception('Failed to persist KG to disk:')
        # Ensure the log is flushed to file before exiting.
        _flush_log_handlers()
        sys.exit(1)
def main():
    """CLI entry point: parse command-line flags and run the ingestion pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-dir", type=str, default="./data")
    parser.add_argument("--persist-dir", type=str, default="./vectorstore")
    parser.add_argument("--chunk-size", type=int, default=200)
    parser.add_argument("--chunk-overlap", type=int, default=50)
    parser.add_argument(
        "--openai-api-key",
        type=str,
        default=None,
        help="Optional OpenAI API key to use for embeddings (overrides env var)",
    )
    opts = vars(parser.parse_args())
    ingest(
        data_dir=opts["data_dir"],
        persist_dir=opts["persist_dir"],
        chunk_size=opts["chunk_size"],
        chunk_overlap=opts["chunk_overlap"],
        openai_api_key=opts["openai_api_key"],
    )


if __name__ == "__main__":
    main()