# nitdaa / update_sync.py
# Uploaded by Sam-max1 using huggingface_hub (commit 2f1cebb, verified), 6.96 kB.
import re
# Load the current text of app.py; it is patched in memory further down
# and then written back to the same path.
with open('app.py', 'r') as source_file:
    content = source_file.read()
# The unified function
#
# Replacement source for ``start_auto_ingest_thread``.  The text is spliced
# verbatim into app.py, so it has to be valid, correctly indented Python on
# its own.  It is kept free of backslashes (they would be interpreted as
# escapes if this text is used as an ``re.sub`` replacement template) and of
# triple double quotes (they would end this literal).
#
# The generated code refers to ``log``, ``config``, ``threading``,
# ``_auto_ingest_status``, ``_allowed``, ``process_document_pipeline`` and
# ``trigger_kv_cache_update``; these are expected to exist at module level in
# app.py and are not defined here.
new_func = """def start_auto_ingest_thread():
    # Start the startup sync/ingest worker on a daemon thread and return immediately.
    def _auto_ingest_worker():
        # Progress and failures are reported through the shared _auto_ingest_status dict.
        global _auto_ingest_status
        import requests, time, shutil, os
        from huggingface_hub import snapshot_download, hf_hub_download
        from pathlib import Path
        token = os.environ.get("HF_PRIVATE_TOKEN") or os.environ.get("HF_TOKEN")
        # --- Wait for LLM services to boot before doing anything ---
        log.info("Auto-ingest: waiting for LLM services to boot...")
        for _ in range(30):
            try:
                r1 = requests.get(f"{config.EMBED_BASE_URL}/health", timeout=2)
                r2 = requests.get(f"{config.LLM_BASE_URL}/health", timeout=2)
                if r1.status_code == 200 and r2.status_code == 200:
                    break
            except Exception:
                # Service not reachable yet; fall through to the sleep and retry.
                pass
            time.sleep(2)
        else:
            # for/else: only reached when all 30 polls finished without a break.
            log.warning("Auto-ingest aborted: LLM services not online.")
            _auto_ingest_status["error"] = "LLM services not online within 60s"
            _auto_ingest_status["done"] = True
            return
        if not token:
            log.error("HF_PRIVATE_TOKEN or HF_TOKEN environment variable is not set. Dataset synchronization will be skipped.")
            _auto_ingest_status["error"] = "HF Token missing"
            _auto_ingest_status["done"] = True
            return
        # --- 2-Way Log Sync on Startup ---
        log_dir = Path(__file__).parent / "app" / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        try:
            for log_file in ["nitdaa_sessions.json", "nitdaa_summary.json"]:
                local_p = log_dir / log_file
                try:
                    dl_path = hf_hub_download(repo_id="Sam-max1/mat_data", filename=log_file, repo_type="dataset", token=token)
                    if os.path.exists(dl_path):
                        # Merge as a set of lines.  The trailing newline is stripped on
                        # read and re-added on write, so a final line that lacks one is
                        # neither duplicated nor glued to its neighbour after sorting.
                        # chr(10) is the newline character; it is spelled this way to
                        # keep this source text free of backslash escapes.
                        with open(dl_path, encoding="utf-8") as remote_fh:
                            merged_lines = {ln.rstrip(chr(10)) for ln in remote_fh}
                        if local_p.exists():
                            with open(local_p, encoding="utf-8") as local_fh:
                                merged_lines.update(ln.rstrip(chr(10)) for ln in local_fh)
                        with open(local_p, "w", encoding="utf-8") as f:
                            for line in sorted(merged_lines):
                                print(line, file=f)
                        log.info(f"Successfully merged {log_file} from mat_data.")
                except Exception as e:
                    # Best effort per file: a failure here must not stop the other file.
                    log.warning(f"Could not download {log_file} from mat_data (it may not exist yet): {e}")
        except Exception as e:
            log.warning(f"Log sync failed: {e}")
        # ---------------------------------
        kbdocs_dir = Path(__file__).parent / "kbdocs"
        kbdocs_dir.mkdir(parents=True, exist_ok=True)
        # Always download into an empty scratch directory so stale files cannot linger.
        tmp_sync_dir = Path("/tmp/he_data_sync")
        if tmp_sync_dir.exists():
            shutil.rmtree(tmp_sync_dir)
        tmp_sync_dir.mkdir(exist_ok=True)
        log.info("Syncing fresh files from Sam-max1/he-data to local /tmp...")
        try:
            snapshot_download(
                repo_id="Sam-max1/he-data",
                repo_type="dataset",
                local_dir=str(tmp_sync_dir),
                token=token,
                ignore_patterns=[".git*"]
            )
        except Exception as e:
            log.error(f"Failed to download he-data dataset: {e}")
            _auto_ingest_status["error"] = f"Download failed: {e}"
            _auto_ingest_status["done"] = True
            return
        from pipeline import vector_store, graph_store
        # Change detection compares top-level file names and sizes only.
        local_files = {f.name: f.stat().st_size for f in kbdocs_dir.glob("*.*") if f.is_file()}
        remote_files = {f.name: f.stat().st_size for f in tmp_sync_dir.glob("*.*") if f.is_file()}
        is_different = local_files != remote_files
        if is_different:
            log.info("Detected changes in Sam-max1/he-data! Purging databases and re-syncing kbdocs.")
            vector_store.purge()
            if graph_store.is_available():
                graph_store.purge()
            # copytree needs a destination that does not exist yet.
            shutil.rmtree(kbdocs_dir)
            shutil.copytree(tmp_sync_dir, kbdocs_dir)
            files_to_ingest = [f for f in kbdocs_dir.glob("*.*") if f.is_file() and _allowed(f.name)]
            if not files_to_ingest:
                log.info("No valid files to ingest in he-data.")
                _auto_ingest_status["done"] = True
                return
            config.current_session.set("admin")
            _auto_ingest_status["running"] = True
            _auto_ingest_status["total"] = len(files_to_ingest)
            _auto_ingest_status["completed"] = 0
            _auto_ingest_status["results"] = []
            _auto_ingest_status["done"] = False
            for path in files_to_ingest:
                _auto_ingest_status["current_file"] = path.name
                log.info(f"Auto-ingesting file: {path.name}")
                res = process_document_pipeline(str(path), path.name, "foundation", "admin", delete_after=False)
                _auto_ingest_status["completed"] += 1
                _auto_ingest_status["results"].append({
                    "file": path.name,
                    "ok": res["ok"],
                    "result": res["result"],
                })
                if res["ok"]:
                    log.info("Auto-ingest successful for %s", path.name)
                else:
                    log.error("Auto-ingest failed for %s: %s", path.name, res["result"])
            _auto_ingest_status["running"] = False
            _auto_ingest_status["done"] = True
            _auto_ingest_status["current_file"] = None
            trigger_kv_cache_update("admin")
            log.info("=== Full Data Re-Ingestion Complete ===")
        else:
            log.info("kbdocs is completely up to date with he-data. No ingestion needed.")
            _auto_ingest_status["done"] = True
        log.info(f"Vector DB Chunks: {vector_store.count()}")
        if graph_store.is_available():
            stats = graph_store.get_stats()
            log.info(f"Kuzu DB Nodes: {stats.get('nodes', 0)}, Edges: {stats.get('edges', 0)}")
    threading.Thread(target=_auto_ingest_worker, daemon=True).start()"""
# Replace start_auto_ingest_thread
#
# The pattern runs from the function's ``def`` line to the first
# ``threading.Thread(...).start()`` call after it; DOTALL lets ``.`` cross
# newlines.  The replacement is passed as a callable so ``new_func`` is
# inserted literally: with a plain string, ``re`` would process backslash
# escapes and group references inside the new source.
content, replaced = re.subn(
    r'def start_auto_ingest_thread\(\):.*? threading\.Thread\(target=_auto_ingest_worker, daemon=True\)\.start\(\)',
    lambda _match: new_func,
    content,
    flags=re.DOTALL,
)
if not replaced:
    # Nothing matched: leave app.py untouched and fail loudly rather than
    # rewriting an unchanged file and appearing to succeed.
    raise SystemExit("update_sync: start_auto_ingest_thread() not found in app.py; nothing was replaced")
with open('app.py', 'w') as f:
    f.write(content)