"""One-shot patch script: swap ``start_auto_ingest_thread`` in ``app.py``.

The script reads ``app.py`` from the current working directory, replaces the
existing ``start_auto_ingest_thread`` definition (from its ``def`` line through
the closing ``threading.Thread(...).start()`` call) with the source text held
in ``new_func``, and writes the file back in place.

The replacement function refers to names it expects ``app.py`` to provide
(``log``, ``config``, ``threading``, ``_auto_ingest_status``, ``_allowed``,
``process_document_pipeline``, ``trigger_kv_cache_update``); this script does
not define or check them.
"""
import re


# Replacement source for ``start_auto_ingest_thread``.  Kept as a *raw* string
# so the backslash escapes inside it (e.g. "\n") reach app.py verbatim rather
# than being expanded here.
new_func = r"""def start_auto_ingest_thread():
    def _auto_ingest_worker():
        global _auto_ingest_status
        import requests, time, shutil, os
        from huggingface_hub import snapshot_download, hf_hub_download
        from pathlib import Path

        token = os.environ.get("HF_PRIVATE_TOKEN") or os.environ.get("HF_TOKEN")

        # --- Wait for LLM services to boot before doing anything ---
        log.info("Auto-ingest: waiting for LLM services to boot...")
        for _ in range(30):
            try:
                r1 = requests.get(f"{config.EMBED_BASE_URL}/health", timeout=2)
                r2 = requests.get(f"{config.LLM_BASE_URL}/health", timeout=2)
                if r1.status_code == 200 and r2.status_code == 200:
                    break
            except Exception:
                pass
            time.sleep(2)
        else:
            log.warning("Auto-ingest aborted: LLM services not online.")
            _auto_ingest_status["error"] = "LLM services not online within 60s"
            _auto_ingest_status["done"] = True
            return

        if not token:
            log.error("HF_PRIVATE_TOKEN or HF_TOKEN environment variable is not set. Dataset synchronization will be skipped.")
            _auto_ingest_status["error"] = "HF Token missing"
            _auto_ingest_status["done"] = True
            return

        # --- 2-Way Log Sync on Startup ---
        log_dir = Path(__file__).parent / "app" / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        try:
            for log_file in ["nitdaa_sessions.json", "nitdaa_summary.json"]:
                local_p = log_dir / log_file
                try:
                    dl_path = hf_hub_download(repo_id="Sam-max1/mat_data", filename=log_file, repo_type="dataset", token=token)
                    if os.path.exists(dl_path):
                        # Strip line terminators before merging so a final line
                        # without "\n" cannot fuse with its neighbour once the
                        # merged set is sorted and written back.
                        with open(dl_path) as remote_fh:
                            merged_lines = {line.rstrip("\n") for line in remote_fh}
                        if local_p.exists():
                            with open(local_p) as local_fh:
                                merged_lines.update(line.rstrip("\n") for line in local_fh)
                        with open(local_p, "w") as f:
                            f.writelines(line + "\n" for line in sorted(merged_lines))
                        log.info(f"Successfully merged {log_file} from mat_data.")
                except Exception as e:
                    log.warning(f"Could not download {log_file} from mat_data (it may not exist yet): {e}")
        except Exception as e:
            log.warning(f"Log sync failed: {e}")
        # ---------------------------------

        kbdocs_dir = Path(__file__).parent / "kbdocs"
        kbdocs_dir.mkdir(parents=True, exist_ok=True)

        tmp_sync_dir = Path("/tmp/he_data_sync")
        if tmp_sync_dir.exists():
            shutil.rmtree(tmp_sync_dir)
        tmp_sync_dir.mkdir(exist_ok=True)

        log.info("Syncing fresh files from Sam-max1/he-data to local /tmp...")
        try:
            snapshot_download(
                repo_id="Sam-max1/he-data",
                repo_type="dataset",
                local_dir=str(tmp_sync_dir),
                token=token,
                ignore_patterns=[".git*"]
            )
        except Exception as e:
            log.error(f"Failed to download he-data dataset: {e}")
            _auto_ingest_status["error"] = f"Download failed: {e}"
            _auto_ingest_status["done"] = True
            return

        from pipeline import vector_store, graph_store

        local_files = {f.name: f.stat().st_size for f in kbdocs_dir.glob("*.*") if f.is_file()}
        remote_files = {f.name: f.stat().st_size for f in tmp_sync_dir.glob("*.*") if f.is_file()}

        is_different = False
        if set(local_files.keys()) != set(remote_files.keys()):
            is_different = True
        else:
            for k in local_files:
                if local_files[k] != remote_files[k]:
                    is_different = True
                    break

        if is_different:
            log.info("Detected changes in Sam-max1/he-data! Purging databases and re-syncing kbdocs.")
            vector_store.purge()
            if graph_store.is_available():
                graph_store.purge()

            shutil.rmtree(kbdocs_dir)
            shutil.copytree(tmp_sync_dir, kbdocs_dir)

            files_to_ingest = [f for f in kbdocs_dir.glob("*.*") if f.is_file() and _allowed(f.name)]
            if not files_to_ingest:
                log.info("No valid files to ingest in he-data.")
                _auto_ingest_status["done"] = True
                return

            config.current_session.set("admin")
            _auto_ingest_status["running"] = True
            _auto_ingest_status["total"] = len(files_to_ingest)
            _auto_ingest_status["completed"] = 0
            _auto_ingest_status["results"] = []
            _auto_ingest_status["done"] = False

            for path in files_to_ingest:
                _auto_ingest_status["current_file"] = path.name
                log.info(f"Auto-ingesting file: {path.name}")
                res = process_document_pipeline(str(path), path.name, "foundation", "admin", delete_after=False)
                _auto_ingest_status["completed"] += 1
                _auto_ingest_status["results"].append({
                    "file": path.name,
                    "ok": res["ok"],
                    "result": res["result"],
                })
                if res["ok"]:
                    log.info("Auto-ingest successful for %s", path.name)
                else:
                    log.error("Auto-ingest failed for %s: %s", path.name, res["result"])

            _auto_ingest_status["running"] = False
            _auto_ingest_status["done"] = True
            _auto_ingest_status["current_file"] = None
            trigger_kv_cache_update("admin")

            log.info("=== Full Data Re-Ingestion Complete ===")
        else:
            log.info("kbdocs is completely up to date with he-data. No ingestion needed.")
            _auto_ingest_status["done"] = True

        log.info(f"Vector DB Chunks: {vector_store.count()}")
        if graph_store.is_available():
            stats = graph_store.get_stats()
            log.info(f"Kuzu DB Nodes: {stats.get('nodes', 0)}, Edges: {stats.get('edges', 0)}")

    threading.Thread(target=_auto_ingest_worker, daemon=True).start()"""


# Spans the old definition: from its ``def`` line, lazily across any body
# (DOTALL lets ``.`` cross newlines), up to and including the first
# ``threading.Thread(...).start()`` call that is preceded by a space.
_TARGET_PATTERN = re.compile(
    r'def start_auto_ingest_thread\(\):.*? threading\.Thread\(target=_auto_ingest_worker, daemon=True\)\.start\(\)',
    flags=re.DOTALL,
)


def apply_patch(content: str) -> str:
    """Return *content* with ``start_auto_ingest_thread`` replaced by ``new_func``.

    Args:
        content: Full text of the module to patch.

    Returns:
        The patched text; every match of the target pattern is replaced.

    Raises:
        ValueError: If the existing function definition is not found, so a
            failed patch is reported instead of silently doing nothing.
    """
    # A callable replacement is inserted verbatim; a plain string would have
    # its backslash escapes and group references interpreted by ``re``.
    patched, hits = _TARGET_PATTERN.subn(lambda _match: new_func, content)
    if hits == 0:
        raise ValueError("start_auto_ingest_thread() definition not found; nothing patched")
    return patched


def main(path: str = "app.py") -> None:
    """Patch the file at *path* in place.

    Args:
        path: File to rewrite; defaults to ``app.py`` in the working directory.

    Raises:
        SystemExit: If the target function is not present. The file is left
            untouched in that case.
    """
    with open(path, "r", encoding="utf-8") as f:
        content = f.read()

    try:
        patched = apply_patch(content)
    except ValueError as err:
        raise SystemExit(f"{path}: {err}") from err

    with open(path, "w", encoding="utf-8") as f:
        f.write(patched)


if __name__ == "__main__":
    main()