Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import tempfile | |
| from datetime import UTC, datetime | |
| from pathlib import Path | |
| from typing import Any | |
| from huggingface_hub import HfApi | |
| from slop_farmer.config import DatasetStatusOptions | |
| from slop_farmer.data.hf_dataset_repo import ( | |
| list_remote_paths, | |
| load_remote_file, | |
| load_remote_json_file, | |
| stable_snapshot_candidates, | |
| ) | |
| from slop_farmer.data.parquet_io import read_json | |
| from slop_farmer.data.snapshot_paths import ( | |
| CONTRIBUTOR_ARTIFACT_FILENAMES, | |
| CURRENT_ANALYSIS_MANIFEST_PATH, | |
| PR_SCOPE_CLUSTERS_FILENAME, | |
| SNAPSHOTS_LATEST_PATH, | |
| load_current_analysis_manifest, | |
| repo_relative_path_to_local, | |
| ) | |
| def _coerce_datetime(value: Any) -> datetime | None: | |
| if not isinstance(value, str) or not value: | |
| return None | |
| try: | |
| return datetime.fromisoformat(value.replace("Z", "+00:00")) | |
| except ValueError: | |
| return None | |
def _age_summary(value: str | None) -> dict[str, Any]:
    """Describe the age of an ISO timestamp: raw seconds, a short human label,
    and a coarse staleness bucket (fresh <= 6h, aging <= 24h, else stale).
    """
    parsed = _coerce_datetime(value)
    if parsed is None:
        return {"seconds": None, "summary": "unknown", "staleness": "unknown"}
    hour, day = 3600, 86400
    # Clamp at zero so clock skew never yields a negative age.
    seconds = max(int((datetime.now(tz=UTC) - parsed).total_seconds()), 0)
    if seconds <= 6 * hour:
        staleness = "fresh"
    elif seconds <= day:
        staleness = "aging"
    else:
        staleness = "stale"
    if seconds < hour:
        summary = f"{seconds // 60}m"
    elif seconds < day:
        summary = f"{seconds // hour}h"
    else:
        summary = f"{seconds // day}d"
    return {"seconds": seconds, "summary": summary, "staleness": staleness}
def _local_status(output_dir: Path) -> dict[str, Any] | None:
    """Summarize the locally materialized dataset, if any.

    Reads ``<output_dir>/snapshots/latest.json`` and, when the pointer names a
    snapshot directory, that directory's ``manifest.json``. Returns None when
    no local pointer file exists.
    """
    pointer_path = output_dir.resolve() / "snapshots" / "latest.json"
    if not pointer_path.exists():
        return None
    pointer = read_json(pointer_path)
    raw_dir = pointer.get("snapshot_dir")
    snapshot_dir: Path | None = None
    manifest: dict[str, Any] = {}
    if isinstance(raw_dir, str) and raw_dir:
        snapshot_dir = Path(raw_dir).resolve()
        manifest_file = snapshot_dir / "manifest.json"
        if manifest_file.exists():
            manifest = read_json(manifest_file)
    return {
        "latest_path": str(pointer_path),
        "latest_pointer": pointer,
        "snapshot_dir": raw_dir,
        # Prefer the manifest's id; fall back to the pointer's record.
        "snapshot_id": manifest.get("snapshot_id") or pointer.get("latest_snapshot_id"),
        "current_analysis": _local_current_analysis(snapshot_dir),
    }
def _local_current_analysis(snapshot_dir: Path | None) -> dict[str, Any]:
    """Summarize the current-analysis manifest stored under a local snapshot.

    Returns {"present": False} when there is no snapshot dir or no manifest,
    a valid=False payload when the manifest fails validation, and otherwise
    the manifest's key identifying fields.
    """
    if snapshot_dir is None:
        return {"present": False}
    manifest_file = repo_relative_path_to_local(snapshot_dir, CURRENT_ANALYSIS_MANIFEST_PATH)
    if not manifest_file.exists():
        return {"present": False}
    try:
        manifest = load_current_analysis_manifest(manifest_file)
    except ValueError as exc:
        # Present but unusable: surface the validation error verbatim.
        return {"present": True, "valid": False, "detail": str(exc)}
    summary: dict[str, Any] = {"present": True, "valid": True}
    for key in ("snapshot_id", "analysis_id", "variant", "published_at"):
        summary[key] = manifest[key]
    return summary
def _remote_status(repo_id: str, revision: str | None) -> dict[str, Any]:
    """Inspect the remote HF dataset repo and summarize its publish state.

    Downloads the latest-snapshot pointer, the ingestion watermark, and the
    snapshot manifest into a temporary directory, then reports cheap-artifact
    presence, current-analysis state, archived analysis-run counts, and the
    freshness of the newest extraction.

    Args:
        repo_id: Hugging Face dataset repo id (``owner/name``).
        revision: optional git revision to inspect; None means default branch.
    """
    api = HfApi()
    with tempfile.TemporaryDirectory(prefix="slop-farmer-dataset-status-") as tmp:
        root = Path(tmp)
        remote_paths = list_remote_paths(api, repo_id, revision=revision)
        latest_pointer = load_remote_json_file(
            api,
            repo_id,
            SNAPSHOTS_LATEST_PATH,
            root,
            revision=revision,
        )
        watermark = load_remote_json_file(
            api,
            repo_id,
            "state/watermark.json",
            root,
            revision=revision,
        )
        # The manifest may live at several stable candidate paths; use the
        # first one that actually exists in the repo.
        manifest = None
        if latest_pointer is not None:
            for candidate in stable_snapshot_candidates(latest_pointer, "manifest.json"):
                downloaded = load_remote_file(
                    api,
                    repo_id,
                    candidate,
                    root,
                    revision=revision,
                )
                if downloaded is None:
                    continue
                manifest = read_json(downloaded)
                break
        current_analysis = _remote_current_analysis(
            api,
            repo_id,
            root,
            revision=revision,
            remote_paths=remote_paths,
            latest_pointer=latest_pointer,
        )
        # Fix: str() was previously applied unconditionally, so a pointer
        # missing "latest_snapshot_id" yielded the literal (truthy!) string
        # "None" and triggered a pointless "snapshots/None/..." prefix scan.
        raw_latest = (
            latest_pointer.get("latest_snapshot_id")
            if isinstance(latest_pointer, dict)
            else None
        )
        latest_snapshot_id = str(raw_latest) if raw_latest is not None else None
        archived_run_manifests = sorted(
            path
            for path in remote_paths
            if path.startswith("snapshots/")
            and "/analysis-runs/" in path
            and path.endswith("/manifest.json")
        )
        current_snapshot_run_count = 0
        if latest_snapshot_id:
            # Hoist the loop-invariant prefix out of the comprehension.
            prefix = f"snapshots/{latest_snapshot_id}/analysis-runs/"
            current_snapshot_run_count = sum(
                1 for path in archived_run_manifests if path.startswith(prefix)
            )
        extracted_at = manifest.get("extracted_at") if manifest else None
        return {
            "dataset_id": repo_id,
            "revision": revision,
            "latest_pointer": latest_pointer,
            "watermark": watermark,
            "manifest": manifest,
            "cheap_artifacts": {
                "pr_scope_clusters": _remote_has_latest_artifact(
                    remote_paths,
                    latest_pointer,
                    PR_SCOPE_CLUSTERS_FILENAME,
                ),
                "contributors": all(
                    _remote_has_latest_artifact(remote_paths, latest_pointer, filename)
                    for filename in CONTRIBUTOR_ARTIFACT_FILENAMES
                ),
            },
            "current_analysis": current_analysis,
            "archived_analysis_runs": {
                "count": len(archived_run_manifests),
                "current_snapshot_count": current_snapshot_run_count,
            },
            "remote_path_count": len(remote_paths),
            "age": _age_summary(extracted_at),
        }
def _remote_current_analysis(
    api: HfApi,
    repo_id: str,
    root: Path,
    *,
    revision: str | None,
    remote_paths: set[str],
    latest_pointer: dict[str, Any] | None,
) -> dict[str, Any]:
    """Download and summarize the remote current-analysis manifest.

    Returns {"present": False} when the manifest is absent or fails to
    download, a valid=False payload when it fails validation, and otherwise
    a summary including whether it targets the latest snapshot.
    """
    if CURRENT_ANALYSIS_MANIFEST_PATH not in remote_paths:
        return {"present": False}
    downloaded = load_remote_file(
        api,
        repo_id,
        CURRENT_ANALYSIS_MANIFEST_PATH,
        root,
        revision=revision,
    )
    if downloaded is None:
        return {"present": False}
    try:
        manifest = load_current_analysis_manifest(downloaded)
    except ValueError as exc:
        return {"present": True, "valid": False, "detail": str(exc)}
    # Fix: str() was previously applied unconditionally, so a pointer missing
    # "latest_snapshot_id" became the literal string "None", making the
    # matches_latest_snapshot comparison below compare against "None".
    raw_latest = (
        latest_pointer.get("latest_snapshot_id") if isinstance(latest_pointer, dict) else None
    )
    latest_snapshot_id = str(raw_latest) if raw_latest is not None else None
    return {
        "present": True,
        "valid": True,
        "snapshot_id": manifest["snapshot_id"],
        "analysis_id": manifest["analysis_id"],
        "variant": manifest["variant"],
        "published_at": manifest["published_at"],
        "matches_latest_snapshot": manifest["snapshot_id"] == latest_snapshot_id,
        "artifact_count": len(manifest["artifacts"]),
    }
def _remote_has_latest_artifact(
    remote_paths: set[str],
    latest_pointer: dict[str, Any] | None,
    filename: str,
) -> bool:
    """Return True if any stable-path candidate for *filename* exists remotely."""
    for candidate in stable_snapshot_candidates(latest_pointer, filename):
        if candidate in remote_paths:
            return True
    return False
def get_dataset_status(options: DatasetStatusOptions) -> dict[str, Any]:
    """Assemble remote and local dataset status for the configured repo.

    The "repo" field is taken from the options when set, otherwise inferred
    from the remote manifest, then from the local latest pointer.
    """
    remote = None
    if options.hf_repo_id:
        remote = _remote_status(options.hf_repo_id, options.hf_revision)
    local = _local_status(options.output_dir)

    repo = options.repo
    if repo is None and remote:
        manifest = remote.get("manifest")
        if manifest:
            repo = manifest.get("repo")
    if repo is None and local:
        pointer = local.get("latest_pointer")
        if isinstance(pointer, dict):
            repo = pointer.get("repo")

    return {
        "repo": repo,
        "dataset_id": options.hf_repo_id,
        "remote": remote,
        "local": local,
    }
def format_dataset_status(status: dict[str, Any]) -> str:
    """Render the mapping from get_dataset_status as a human-readable report."""
    remote = status.get("remote") or {}
    local = status.get("local") or {}
    out: list[str] = [
        f"Repo: {status.get('repo') or '?'}",
        f"Dataset: {status.get('dataset_id') or 'not configured'}",
    ]

    if remote:
        manifest = remote.get("manifest") or {}
        watermark = remote.get("watermark") or {}
        pointer = remote.get("latest_pointer") or {}
        artifacts = remote.get("cheap_artifacts") or {}
        out.append(
            f"Remote latest snapshot: {manifest.get('snapshot_id') or pointer.get('latest_snapshot_id') or '?'}"
        )
        out.append(f"Remote extracted at: {manifest.get('extracted_at') or '?'}")
        out.append(
            f"Remote next_since: {watermark.get('next_since') or pointer.get('next_since') or '?'}"
        )
        out.append(f"PR scope artifact: {'yes' if artifacts.get('pr_scope_clusters') else 'no'}")
        out.append(f"Contributor artifacts: {'yes' if artifacts.get('contributors') else 'no'}")

        analysis = remote.get("current_analysis") or {}
        if not analysis.get("present"):
            out.append("Current analysis: none")
        elif analysis.get("valid") is False:
            out.append(f"Current analysis: invalid ({analysis.get('detail')})")
        else:
            out.append(
                "Current analysis: "
                f"snapshot={analysis.get('snapshot_id')} "
                f"analysis_id={analysis.get('analysis_id')}"
            )
            out.append(
                "Current analysis matches latest snapshot: "
                f"{'yes' if analysis.get('matches_latest_snapshot') else 'no'}"
            )

        runs = remote.get("archived_analysis_runs") or {}
        out.append(
            "Archived analysis runs: "
            f"{runs.get('count', 0)} total, {runs.get('current_snapshot_count', 0)} for latest snapshot"
        )
        age = remote.get("age") or {}
        out.append(
            f"Freshness: {age.get('summary') or 'unknown'} ({age.get('staleness') or 'unknown'})"
        )

    if not local:
        out.append("Local latest pointer: none")
    else:
        out.append(f"Local latest pointer: {local.get('latest_path')}")
        out.append(f"Local snapshot id: {local.get('snapshot_id') or '?'}")
        local_analysis = local.get("current_analysis") or {}
        if local_analysis.get("present"):
            out.append(
                "Local current analysis: "
                f"snapshot={local_analysis.get('snapshot_id')} "
                f"analysis_id={local_analysis.get('analysis_id')}"
            )
        else:
            out.append("Local current analysis: none")

    return "\n".join(out)