"""Collect and format status for the remote dataset repo and local snapshots."""

from __future__ import annotations

import tempfile
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

from huggingface_hub import HfApi

from slop_farmer.config import DatasetStatusOptions
from slop_farmer.data.hf_dataset_repo import (
    list_remote_paths,
    load_remote_file,
    load_remote_json_file,
    stable_snapshot_candidates,
)
from slop_farmer.data.parquet_io import read_json
from slop_farmer.data.snapshot_paths import (
    CONTRIBUTOR_ARTIFACT_FILENAMES,
    CURRENT_ANALYSIS_MANIFEST_PATH,
    PR_SCOPE_CLUSTERS_FILENAME,
    SNAPSHOTS_LATEST_PATH,
    load_current_analysis_manifest,
    repo_relative_path_to_local,
)


def _coerce_datetime(value: Any) -> datetime | None:
    """Parse an ISO-8601 string into a datetime, or return None.

    A trailing ``Z`` is rewritten to ``+00:00`` before parsing. Non-string,
    empty, or unparseable values yield ``None``. The result is naive when the
    input carries no UTC offset.
    """
    if not isinstance(value, str) or not value:
        return None
    try:
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None


def _age_summary(value: str | None) -> dict[str, Any]:
    """Describe how old the timestamp ``value`` is relative to now (UTC).

    Returns a dict with:
      * ``seconds``: non-negative age in whole seconds, or ``None`` if unknown;
      * ``summary``: compact age in minutes (``"12m"``), hours, or days;
      * ``staleness``: ``"fresh"`` (<= 6h), ``"aging"`` (<= 24h), ``"stale"``,
        or ``"unknown"`` when the timestamp cannot be parsed.
    """
    timestamp = _coerce_datetime(value)
    if timestamp is None:
        return {"seconds": None, "summary": "unknown", "staleness": "unknown"}
    if timestamp.utcoffset() is None:
        # Subtracting a naive datetime from an aware one raises TypeError;
        # timestamps without an explicit offset are treated as UTC.
        timestamp = timestamp.replace(tzinfo=UTC)
    # Clamp at zero so a timestamp slightly in the future is not a negative age.
    age_seconds = max(int((datetime.now(tz=UTC) - timestamp).total_seconds()), 0)
    if age_seconds <= 6 * 3600:
        staleness = "fresh"
    elif age_seconds <= 24 * 3600:
        staleness = "aging"
    else:
        staleness = "stale"
    if age_seconds < 3600:
        summary = f"{age_seconds // 60}m"
    elif age_seconds < 24 * 3600:
        summary = f"{age_seconds // 3600}h"
    else:
        summary = f"{age_seconds // 86400}d"
    return {"seconds": age_seconds, "summary": summary, "staleness": staleness}


def _latest_snapshot_id(latest_pointer: Any) -> str | None:
    """Return the pointer's ``latest_snapshot_id`` as a string, or None.

    ``None`` is returned when the pointer is not a dict or the id is missing
    or empty, so callers never see the literal string ``"None"``.
    """
    if not isinstance(latest_pointer, dict):
        return None
    snapshot_id = latest_pointer.get("latest_snapshot_id")
    if snapshot_id is None or snapshot_id == "":
        return None
    return str(snapshot_id)


def _local_status(output_dir: Path) -> dict[str, Any] | None:
    """Summarise the local snapshot referenced by ``snapshots/latest.json``.

    Returns ``None`` when ``<output_dir>/snapshots/latest.json`` does not
    exist. Otherwise returns the pointer path and payload, the raw
    ``snapshot_dir`` value, the snapshot id (from the snapshot's
    ``manifest.json`` when present, else from the pointer), and the local
    current-analysis summary.
    """
    latest_path = output_dir.resolve() / "snapshots" / "latest.json"
    if not latest_path.exists():
        return None
    payload = read_json(latest_path)
    snapshot_dir_raw = payload.get("snapshot_dir")
    manifest: dict[str, Any] = {}
    snapshot_dir: Path | None = None
    if isinstance(snapshot_dir_raw, str) and snapshot_dir_raw:
        snapshot_dir = Path(snapshot_dir_raw).resolve()
        manifest_path = snapshot_dir / "manifest.json"
        if manifest_path.exists():
            manifest = read_json(manifest_path)
    current_analysis = _local_current_analysis(snapshot_dir)
    return {
        "latest_path": str(latest_path),
        "latest_pointer": payload,
        "snapshot_dir": snapshot_dir_raw,
        "snapshot_id": manifest.get("snapshot_id") or payload.get("latest_snapshot_id"),
        "current_analysis": current_analysis,
    }


def _local_current_analysis(snapshot_dir: Path | None) -> dict[str, Any]:
    """Summarise the current-analysis manifest stored under ``snapshot_dir``.

    Returns ``{"present": False}`` when there is no snapshot directory or no
    manifest file, ``{"present": True, "valid": False, "detail": ...}`` when
    loading the manifest raises ``ValueError``, and otherwise the manifest's
    snapshot id, analysis id, variant and publication time.
    """
    if snapshot_dir is None:
        return {"present": False}
    manifest_path = repo_relative_path_to_local(snapshot_dir, CURRENT_ANALYSIS_MANIFEST_PATH)
    if not manifest_path.exists():
        return {"present": False}
    try:
        manifest = load_current_analysis_manifest(manifest_path)
    except ValueError as exc:
        return {"present": True, "valid": False, "detail": str(exc)}
    return {
        "present": True,
        "valid": True,
        "snapshot_id": manifest["snapshot_id"],
        "analysis_id": manifest["analysis_id"],
        "variant": manifest["variant"],
        "published_at": manifest["published_at"],
    }


def _remote_status(repo_id: str, revision: str | None) -> dict[str, Any]:
    """Collect status information for the remote repo ``repo_id``.

    Lists the remote paths, downloads the latest pointer, the watermark and
    the first available snapshot manifest into a temporary directory, and
    returns them together with artifact presence flags, the current-analysis
    summary, archived analysis-run counts and the age of the manifest's
    ``extracted_at`` timestamp.
    """
    api = HfApi()
    with tempfile.TemporaryDirectory(prefix="slop-farmer-dataset-status-") as tmp:
        root = Path(tmp)
        remote_paths = list_remote_paths(api, repo_id, revision=revision)
        latest_pointer = load_remote_json_file(
            api,
            repo_id,
            SNAPSHOTS_LATEST_PATH,
            root,
            revision=revision,
        )
        watermark = load_remote_json_file(
            api,
            repo_id,
            "state/watermark.json",
            root,
            revision=revision,
        )
        manifest = None
        if latest_pointer is not None:
            # Use the first candidate manifest path that can be downloaded.
            for candidate in stable_snapshot_candidates(latest_pointer, "manifest.json"):
                downloaded = load_remote_file(
                    api,
                    repo_id,
                    candidate,
                    root,
                    revision=revision,
                )
                if downloaded is None:
                    continue
                manifest = read_json(downloaded)
                break
        current_analysis = _remote_current_analysis(
            api,
            repo_id,
            root,
            revision=revision,
            remote_paths=remote_paths,
            latest_pointer=latest_pointer,
        )

    # Everything below works on already-loaded data; no temp files needed.
    latest_snapshot_id = _latest_snapshot_id(latest_pointer)
    archived_run_manifests = sorted(
        path
        for path in remote_paths
        if path.startswith("snapshots/")
        and "/analysis-runs/" in path
        and path.endswith("/manifest.json")
    )
    current_snapshot_run_count = 0
    if latest_snapshot_id:
        run_prefix = f"snapshots/{latest_snapshot_id}/analysis-runs/"
        current_snapshot_run_count = sum(
            1 for path in archived_run_manifests if path.startswith(run_prefix)
        )
    extracted_at = manifest.get("extracted_at") if manifest else None
    return {
        "dataset_id": repo_id,
        "revision": revision,
        "latest_pointer": latest_pointer,
        "watermark": watermark,
        "manifest": manifest,
        "cheap_artifacts": {
            "pr_scope_clusters": _remote_has_latest_artifact(
                remote_paths,
                latest_pointer,
                PR_SCOPE_CLUSTERS_FILENAME,
            ),
            "contributors": all(
                _remote_has_latest_artifact(remote_paths, latest_pointer, filename)
                for filename in CONTRIBUTOR_ARTIFACT_FILENAMES
            ),
        },
        "current_analysis": current_analysis,
        "archived_analysis_runs": {
            "count": len(archived_run_manifests),
            "current_snapshot_count": current_snapshot_run_count,
        },
        "remote_path_count": len(remote_paths),
        "age": _age_summary(extracted_at),
    }


def _remote_current_analysis(
    api: HfApi,
    repo_id: str,
    root: Path,
    *,
    revision: str | None,
    remote_paths: set[str],
    latest_pointer: dict[str, Any] | None,
) -> dict[str, Any]:
    """Summarise the remote current-analysis manifest.

    Returns ``{"present": False}`` when the manifest path is not among
    ``remote_paths`` or cannot be downloaded, an ``invalid`` summary with the
    error text when loading raises ``ValueError``, and otherwise the manifest
    fields plus whether its snapshot id equals the latest pointer's snapshot
    id and how many artifacts it lists.
    """
    if CURRENT_ANALYSIS_MANIFEST_PATH not in remote_paths:
        return {"present": False}
    downloaded = load_remote_file(
        api,
        repo_id,
        CURRENT_ANALYSIS_MANIFEST_PATH,
        root,
        revision=revision,
    )
    if downloaded is None:
        return {"present": False}
    try:
        manifest = load_current_analysis_manifest(downloaded)
    except ValueError as exc:
        return {"present": True, "valid": False, "detail": str(exc)}
    latest_snapshot_id = _latest_snapshot_id(latest_pointer)
    return {
        "present": True,
        "valid": True,
        "snapshot_id": manifest["snapshot_id"],
        "analysis_id": manifest["analysis_id"],
        "variant": manifest["variant"],
        "published_at": manifest["published_at"],
        # An unknown latest snapshot id never counts as a match.
        "matches_latest_snapshot": (
            latest_snapshot_id is not None and manifest["snapshot_id"] == latest_snapshot_id
        ),
        "artifact_count": len(manifest["artifacts"]),
    }


def _remote_has_latest_artifact(
    remote_paths: set[str],
    latest_pointer: dict[str, Any] | None,
    filename: str,
) -> bool:
    """Return True if any candidate path for ``filename`` is in ``remote_paths``.

    Candidate paths come from ``stable_snapshot_candidates`` for the given
    latest pointer.
    """
    candidates = stable_snapshot_candidates(latest_pointer, filename)
    return any(candidate in remote_paths for candidate in candidates)


def get_dataset_status(options: DatasetStatusOptions) -> dict[str, Any]:
    """Build the combined remote and local dataset status.

    The remote part is only collected when ``options.hf_repo_id`` is set. The
    reported ``repo`` is ``options.repo`` when given, otherwise the ``repo``
    field of the remote manifest, otherwise that of the local latest pointer.
    """
    remote = (
        _remote_status(options.hf_repo_id, options.hf_revision) if options.hf_repo_id else None
    )
    local = _local_status(options.output_dir)
    repo = options.repo
    if repo is None and remote and remote.get("manifest"):
        repo = remote["manifest"].get("repo")
    if repo is None and local and isinstance(local.get("latest_pointer"), dict):
        repo = local["latest_pointer"].get("repo")
    return {
        "repo": repo,
        "dataset_id": options.hf_repo_id,
        "remote": remote,
        "local": local,
    }


def format_dataset_status(status: dict[str, Any]) -> str:
    """Render a status dict from ``get_dataset_status`` as multi-line text.

    Missing sections are tolerated: absent values are shown as ``?``,
    ``none`` or ``unknown``. Remote lines are emitted only when a remote
    status is present; local lines fall back to ``Local latest pointer: none``.
    """
    remote = status.get("remote") or {}
    local = status.get("local") or {}
    manifest = remote.get("manifest") or {}
    watermark = remote.get("watermark") or {}
    latest_pointer = remote.get("latest_pointer") or {}
    age = remote.get("age") or {}
    current_analysis = remote.get("current_analysis") or {}
    cheap_artifacts = remote.get("cheap_artifacts") or {}
    archived_runs = remote.get("archived_analysis_runs") or {}
    lines = [
        f"Repo: {status.get('repo') or '?'}",
        f"Dataset: {status.get('dataset_id') or 'not configured'}",
    ]
    if remote:
        lines.extend(
            [
                f"Remote latest snapshot: {manifest.get('snapshot_id') or latest_pointer.get('latest_snapshot_id') or '?'}",
                f"Remote extracted at: {manifest.get('extracted_at') or '?'}",
                f"Remote next_since: {watermark.get('next_since') or latest_pointer.get('next_since') or '?'}",
                f"PR scope artifact: {'yes' if cheap_artifacts.get('pr_scope_clusters') else 'no'}",
                f"Contributor artifacts: {'yes' if cheap_artifacts.get('contributors') else 'no'}",
            ]
        )
        if current_analysis.get("present"):
            if current_analysis.get("valid") is False:
                lines.append(f"Current analysis: invalid ({current_analysis.get('detail')})")
            else:
                lines.append(
                    "Current analysis: "
                    f"snapshot={current_analysis.get('snapshot_id')} "
                    f"analysis_id={current_analysis.get('analysis_id')}"
                )
                lines.append(
                    "Current analysis matches latest snapshot: "
                    f"{'yes' if current_analysis.get('matches_latest_snapshot') else 'no'}"
                )
        else:
            lines.append("Current analysis: none")
        lines.append(
            "Archived analysis runs: "
            f"{archived_runs.get('count', 0)} total, {archived_runs.get('current_snapshot_count', 0)} for latest snapshot"
        )
        lines.append(
            f"Freshness: {age.get('summary') or 'unknown'} ({age.get('staleness') or 'unknown'})"
        )
    if local:
        lines.extend(
            [
                f"Local latest pointer: {local.get('latest_path')}",
                f"Local snapshot id: {local.get('snapshot_id') or '?'}",
            ]
        )
        local_current_analysis = local.get("current_analysis") or {}
        if local_current_analysis.get("present"):
            if local_current_analysis.get("valid") is False:
                # Mirror the remote branch: an unreadable manifest has no
                # snapshot/analysis ids worth printing, only the error detail.
                lines.append(
                    f"Local current analysis: invalid ({local_current_analysis.get('detail')})"
                )
            else:
                lines.append(
                    "Local current analysis: "
                    f"snapshot={local_current_analysis.get('snapshot_id')} "
                    f"analysis_id={local_current_analysis.get('analysis_id')}"
                )
        else:
            lines.append("Local current analysis: none")
    else:
        lines.append("Local latest pointer: none")
    return "\n".join(lines)