"""One-shot patch script: swap ``start_auto_ingest_thread`` in ``app.py``.

The script reads ``app.py`` from the current working directory, replaces the
existing ``start_auto_ingest_thread`` definition (everything from its ``def``
line through the closing ``threading.Thread(...).start()`` call) with the
implementation stored in ``new_func``, and writes the file back.

The replacement text itself ends with the same ``threading.Thread(...)``
call the pattern looks for, so running the script twice leaves the file
unchanged after the first run.
"""
import re

# Path is relative to the current working directory, exactly as before.
TARGET_FILE = 'app.py'

# The unified function
# NOTE: this is source text that gets written verbatim into app.py. Names such
# as log, config, threading, _allowed, process_document_pipeline and
# trigger_kv_cache_update are not defined here; they are presumably provided
# by app.py itself -- confirm before changing the payload.
new_func = """def start_auto_ingest_thread():
    def _auto_ingest_worker():
        global _auto_ingest_status
        import requests, time, shutil, os
        from huggingface_hub import snapshot_download, hf_hub_download
        from pathlib import Path

        token = os.environ.get("HF_PRIVATE_TOKEN") or os.environ.get("HF_TOKEN")

        # --- Wait for LLM services to boot before doing anything ---
        log.info("Auto-ingest: waiting for LLM services to boot...")
        for _ in range(30):
            try:
                r1 = requests.get(f"{config.EMBED_BASE_URL}/health", timeout=2)
                r2 = requests.get(f"{config.LLM_BASE_URL}/health", timeout=2)
                if r1.status_code == 200 and r2.status_code == 200:
                    break
            except Exception:
                pass
            time.sleep(2)
        else:
            log.warning("Auto-ingest aborted: LLM services not online.")
            _auto_ingest_status["error"] = "LLM services not online within 60s"
            _auto_ingest_status["done"] = True
            return

        if not token:
            log.error("HF_PRIVATE_TOKEN or HF_TOKEN environment variable is not set. Dataset synchronization will be skipped.")
            _auto_ingest_status["error"] = "HF Token missing"
            _auto_ingest_status["done"] = True
            return

        # --- 2-Way Log Sync on Startup ---
        log_dir = Path(__file__).parent / "app" / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        try:
            for log_file in ["nitdaa_sessions.json", "nitdaa_summary.json"]:
                local_p = log_dir / log_file
                try:
                    dl_path = hf_hub_download(repo_id="Sam-max1/mat_data", filename=log_file, repo_type="dataset", token=token)
                    if os.path.exists(dl_path):
                        remote_lines = set(open(dl_path).readlines())
                        if local_p.exists():
                            for line in open(local_p).readlines():
                                if line not in remote_lines:
                                    remote_lines.add(line)
                        with open(local_p, "w") as f:
                            for line in sorted(list(remote_lines)):
                                f.write(line)
                        log.info(f"Successfully merged {log_file} from mat_data.")
                except Exception as e:
                    log.warning(f"Could not download {log_file} from mat_data (it may not exist yet): {e}")
        except Exception as e:
            log.warning(f"Log sync failed: {e}")
        # ---------------------------------

        kbdocs_dir = Path(__file__).parent / "kbdocs"
        kbdocs_dir.mkdir(parents=True, exist_ok=True)

        tmp_sync_dir = Path("/tmp/he_data_sync")
        if tmp_sync_dir.exists():
            shutil.rmtree(tmp_sync_dir)
        tmp_sync_dir.mkdir(exist_ok=True)

        log.info("Syncing fresh files from Sam-max1/he-data to local /tmp...")
        try:
            snapshot_download(
                repo_id="Sam-max1/he-data",
                repo_type="dataset",
                local_dir=str(tmp_sync_dir),
                token=token,
                ignore_patterns=[".git*"]
            )
        except Exception as e:
            log.error(f"Failed to download he-data dataset: {e}")
            _auto_ingest_status["error"] = f"Download failed: {e}"
            _auto_ingest_status["done"] = True
            return

        from pipeline import vector_store, graph_store

        local_files = {f.name: f.stat().st_size for f in kbdocs_dir.glob("*.*") if f.is_file()}
        remote_files = {f.name: f.stat().st_size for f in tmp_sync_dir.glob("*.*") if f.is_file()}

        is_different = False
        if set(local_files.keys()) != set(remote_files.keys()):
            is_different = True
        else:
            for k in local_files:
                if local_files[k] != remote_files[k]:
                    is_different = True
                    break

        if is_different:
            log.info("Detected changes in Sam-max1/he-data! Purging databases and re-syncing kbdocs.")
            vector_store.purge()
            if graph_store.is_available():
                graph_store.purge()

            shutil.rmtree(kbdocs_dir)
            shutil.copytree(tmp_sync_dir, kbdocs_dir)

            files_to_ingest = [f for f in kbdocs_dir.glob("*.*") if f.is_file() and _allowed(f.name)]
            if not files_to_ingest:
                log.info("No valid files to ingest in he-data.")
                _auto_ingest_status["done"] = True
                return

            config.current_session.set("admin")
            _auto_ingest_status["running"] = True
            _auto_ingest_status["total"] = len(files_to_ingest)
            _auto_ingest_status["completed"] = 0
            _auto_ingest_status["results"] = []
            _auto_ingest_status["done"] = False

            for path in files_to_ingest:
                _auto_ingest_status["current_file"] = path.name
                log.info(f"Auto-ingesting file: {path.name}")
                res = process_document_pipeline(str(path), path.name, "foundation", "admin", delete_after=False)
                _auto_ingest_status["completed"] += 1
                _auto_ingest_status["results"].append({
                    "file": path.name,
                    "ok": res["ok"],
                    "result": res["result"],
                })
                if res["ok"]:
                    log.info("Auto-ingest successful for %s", path.name)
                else:
                    log.error("Auto-ingest failed for %s: %s", path.name, res["result"])

            _auto_ingest_status["running"] = False
            _auto_ingest_status["done"] = True
            _auto_ingest_status["current_file"] = None
            trigger_kv_cache_update("admin")
            log.info("=== Full Data Re-Ingestion Complete ===")
        else:
            log.info("kbdocs is completely up to date with he-data. No ingestion needed.")
            _auto_ingest_status["done"] = True

        log.info(f"Vector DB Chunks: {vector_store.count()}")
        if graph_store.is_available():
            stats = graph_store.get_stats()
            log.info(f"Kuzu DB Nodes: {stats.get('nodes', 0)}, Edges: {stats.get('edges', 0)}")

    threading.Thread(target=_auto_ingest_worker, daemon=True).start()"""

# Matches from the function's ``def`` line through the first following
# ``threading.Thread(...).start()`` call. DOTALL lets ``.*?`` cross newlines;
# the lazy quantifier stops at the nearest closing call, not the last one.
_FUNC_PATTERN = re.compile(
    r'def start_auto_ingest_thread\(\):.*? threading\.Thread\(target=_auto_ingest_worker, daemon=True\)\.start\(\)',
    flags=re.DOTALL,
)


def replace_start_auto_ingest_thread(content: str, replacement: str = new_func):
    """Replace every ``start_auto_ingest_thread`` definition in *content*.

    Args:
        content: Full text of the module to patch.
        replacement: Source text inserted in place of each match. Defaults to
            the ``new_func`` payload defined in this script.

    Returns:
        A ``(new_content, count)`` tuple, where ``count`` is the number of
        definitions that were replaced (``0`` means *content* is unchanged).
    """
    # A callable replacement is inserted literally. Passing the string itself
    # would make ``re`` interpret any backslash in the payload as a template
    # escape or group reference.
    return _FUNC_PATTERN.subn(lambda _match: replacement, content)


def main() -> None:
    """Patch ``app.py`` in place.

    Raises:
        SystemExit: If no ``start_auto_ingest_thread`` definition is found;
            the file is left untouched in that case.
    """
    with open(TARGET_FILE, 'r', encoding='utf-8') as f:
        content = f.read()

    # Replace start_auto_ingest_thread
    content, replaced = replace_start_auto_ingest_thread(content)
    if not replaced:
        # Fail loudly so a pattern mismatch cannot pass for a successful patch.
        raise SystemExit(
            f"start_auto_ingest_thread not found in {TARGET_FILE}; file left unchanged."
        )

    with open(TARGET_FILE, 'w', encoding='utf-8') as f:
        f.write(content)


if __name__ == "__main__":
    main()