# diffusers-pr-api / src/slop_farmer/app/snapshot_state.py
# Deployed as part of the "Deploy Diffusers PR API" Hugging Face Space (commit dbf7313).
from __future__ import annotations
from pathlib import Path
from slop_farmer.config import SnapshotAdoptOptions
from slop_farmer.data.parquet_io import read_json, read_parquet_rows, write_json
__all__ = ["adopt_snapshot_for_pipeline"]
def adopt_snapshot_for_pipeline(options: SnapshotAdoptOptions) -> Path:
    """Point the pipeline's state files at an existing snapshot.

    Reads the snapshot's ``manifest.json``, resolves the incremental-crawl
    watermark (``next_since``) from a prioritized chain of candidates, and
    writes two JSON state files under ``options.output_dir``:
    ``snapshots/latest.json`` and ``state/watermark.json``.

    Args:
        options: Adoption parameters (snapshot directory, output directory,
            optional explicit ``next_since`` override).

    Returns:
        Path to the written ``snapshots/latest.json``.

    Raises:
        ValueError: If the manifest has no ``repo`` entry, or no usable
            ``next_since`` timestamp string can be determined.
    """
    snap_root = options.snapshot_dir.resolve()
    manifest_file = snap_root / "manifest.json"
    manifest = read_json(manifest_file)

    repo_name = str(manifest.get("repo") or "")
    if not repo_name:
        raise ValueError(f"Snapshot manifest has no repo: {manifest_file}")
    snap_id = str(manifest.get("snapshot_id") or snap_root.name)

    raw_watermark = manifest.get("watermark")
    watermark = raw_watermark if isinstance(raw_watermark, dict) else {}

    # Lazy, short-circuiting fallback chain: the snapshot scan in
    # _infer_snapshot_next_since only runs when every earlier candidate
    # is missing/falsy.
    next_since = (
        options.next_since
        or watermark.get("next_since")
        or manifest.get("crawl_started_at")
        or _infer_snapshot_next_since(snap_root, manifest)
        or manifest.get("extracted_at")
    )
    if not isinstance(next_since, str) or not next_since:
        raise ValueError(f"Could not determine next_since from {manifest_file}")

    out_root = options.output_dir.resolve()
    latest_path = out_root / "snapshots" / "latest.json"
    watermark_path = out_root / "state" / "watermark.json"

    latest_payload = {
        "repo": repo_name,
        "latest_snapshot_id": snap_id,
        "snapshot_dir": str(snap_root),
        "manifest_path": str(manifest_file),
        "next_since": next_since,
    }
    watermark_payload = {
        "repo": repo_name,
        "last_successful_snapshot_id": snap_id,
        "snapshot_dir": str(snap_root),
        "effective_since": watermark.get("effective_since"),
        "next_since": next_since,
        # Best available "when" for this state write, falling back to the
        # watermark itself if the manifest records no timestamps.
        "updated_at": manifest.get("imported_at") or manifest.get("extracted_at") or next_since,
    }

    write_json(latest_payload, latest_path)
    write_json(watermark_payload, watermark_path)
    return latest_path
def _infer_snapshot_next_since(snapshot_dir: Path, manifest: dict[str, object]) -> str | None:
    """Best-effort watermark inference for HF checkpoint imports.

    Only applies when the manifest marks the snapshot as an
    ``hf_checkpoint_import``; otherwise returns ``None`` immediately.
    Scans the snapshot's parquet tables in a fixed priority order and
    returns the maximum timestamp string from the FIRST table that
    yields any — later tables are not consulted once one matches.
    Returns ``None`` when no table produces a timestamp.
    """
    if manifest.get("source_type") != "hf_checkpoint_import":
        return None

    # (table filename, per-row timestamp fields in preference order)
    specs = (
        ("pull_requests.parquet", ("updated_at", "created_at")),
        ("issues.parquet", ("updated_at", "created_at")),
        ("comments.parquet", ("updated_at", "created_at")),
        ("reviews.parquet", ("submitted_at",)),
        ("review_comments.parquet", ("updated_at", "created_at")),
        ("events.parquet", ("created_at",)),
    )
    for table_name, timestamp_fields in specs:
        stamps: list[str] = []
        for record in read_parquet_rows(snapshot_dir / table_name):
            # Take the first non-empty string field per row.
            for field_name in timestamp_fields:
                stamp = record.get(field_name)
                if isinstance(stamp, str) and stamp:
                    stamps.append(stamp)
                    break
        if stamps:
            # ISO-8601 strings compare chronologically, so max() works
            # lexicographically — presumably these are ISO timestamps;
            # TODO confirm against the parquet writer.
            return max(stamps)
    return None