Spaces:
Running
Running
| from __future__ import annotations | |
| from pathlib import Path | |
| from slop_farmer.config import SnapshotAdoptOptions | |
| from slop_farmer.data.parquet_io import read_json, read_parquet_rows, write_json | |
| __all__ = ["adopt_snapshot_for_pipeline"] | |
def adopt_snapshot_for_pipeline(options: SnapshotAdoptOptions) -> Path:
    """Register an existing snapshot directory as the pipeline's latest snapshot.

    Reads ``manifest.json`` from ``options.snapshot_dir`` and writes two JSON files
    under ``options.output_dir``: ``snapshots/latest.json`` (pointing at the adopted
    snapshot) and ``state/watermark.json`` (watermark state).

    ``next_since`` is taken from ``options.next_since`` when it is set. Otherwise the
    first non-empty string among the manifest-derived fallbacks is used: the manifest
    watermark's ``next_since``, ``crawl_started_at``, a value inferred from the
    snapshot tables, then ``extracted_at``.

    Args:
        options: Provides ``snapshot_dir``, ``output_dir`` and an optional
            ``next_since`` override.

    Returns:
        Path of the written ``snapshots/latest.json`` file.

    Raises:
        ValueError: If the manifest has no ``repo``, or no usable ``next_since``
            value can be determined.
    """
    snapshot_dir = options.snapshot_dir.resolve()
    manifest_path = snapshot_dir / "manifest.json"
    manifest = read_json(manifest_path)

    repo = str(manifest.get("repo") or "")
    if not repo:
        raise ValueError(f"Snapshot manifest has no repo: {manifest_path}")
    snapshot_id = str(manifest.get("snapshot_id") or snapshot_dir.name)

    raw_watermark = manifest.get("watermark")
    manifest_watermark = raw_watermark if isinstance(raw_watermark, dict) else {}

    # An explicit override wins outright; it is still validated below.
    next_since = options.next_since or _resolve_manifest_next_since(
        snapshot_dir, manifest, manifest_watermark
    )
    if not isinstance(next_since, str) or not next_since:
        raise ValueError(f"Could not determine next_since from {manifest_path}")

    output_dir = options.output_dir.resolve()
    latest_path = output_dir / "snapshots" / "latest.json"
    watermark_path = output_dir / "state" / "watermark.json"
    write_json(
        {
            "repo": repo,
            "latest_snapshot_id": snapshot_id,
            "snapshot_dir": str(snapshot_dir),
            "manifest_path": str(manifest_path),
            "next_since": next_since,
        },
        latest_path,
    )
    write_json(
        {
            "repo": repo,
            "last_successful_snapshot_id": snapshot_id,
            "snapshot_dir": str(snapshot_dir),
            "effective_since": manifest_watermark.get("effective_since"),
            "next_since": next_since,
            "updated_at": manifest.get("imported_at") or manifest.get("extracted_at") or next_since,
        },
        watermark_path,
    )
    return latest_path


def _resolve_manifest_next_since(
    snapshot_dir: Path,
    manifest: dict[str, object],
    manifest_watermark: dict[str, object],
) -> str | None:
    """Return the first non-empty string ``next_since`` fallback for a snapshot.

    Candidates are tried in priority order:
    1. ``watermark.next_since``
    2. ``crawl_started_at``
    3. a value inferred from the snapshot's parquet tables
    4. ``extracted_at``

    Missing, empty, or non-string values are skipped, so a malformed manifest field
    does not hide a valid fallback further down the list. Returns ``None`` when no
    candidate is usable.
    """

    def candidates():
        yield manifest_watermark.get("next_since")
        yield manifest.get("crawl_started_at")
        # Inference reads parquet tables; because this is a generator it only runs
        # when the cheaper manifest fields above produced nothing usable.
        yield _infer_snapshot_next_since(snapshot_dir, manifest)
        yield manifest.get("extracted_at")

    return next(
        (value for value in candidates() if isinstance(value, str) and value),
        None,
    )
def _infer_snapshot_next_since(snapshot_dir: Path, manifest: dict[str, object]) -> str | None:
    """Infer a ``next_since`` value from the tables of an HF checkpoint import.

    Only manifests whose ``source_type`` is ``"hf_checkpoint_import"`` are handled.
    Any other snapshot yields ``None``.

    Tables are consulted in a fixed priority order. For each row, the first non-empty
    string among that table's timestamp columns is taken. The greatest such value from
    the first table that has any is returned, and later tables are only read when every
    earlier table produced no timestamp. Returns ``None`` if no table yields one.
    """
    # (parquet file name, timestamp columns in per-row preference order)
    timestamp_columns_by_table = (
        ("pull_requests.parquet", ("updated_at", "created_at")),
        ("issues.parquet", ("updated_at", "created_at")),
        ("comments.parquet", ("updated_at", "created_at")),
        ("reviews.parquet", ("submitted_at",)),
        ("review_comments.parquet", ("updated_at", "created_at")),
        ("events.parquet", ("created_at",)),
    )

    def row_timestamp(row, columns):
        # First non-empty string value among ``columns``, or None if there is none.
        for column in columns:
            stamp = row.get(column)
            if isinstance(stamp, str) and stamp:
                return stamp
        return None

    if manifest.get("source_type") == "hf_checkpoint_import":
        for table_file, columns in timestamp_columns_by_table:
            row_stamps = (
                row_timestamp(row, columns)
                for row in read_parquet_rows(snapshot_dir / table_file)
            )
            # NOTE(review): max() compares raw strings; correct ordering assumes all
            # timestamps share one ISO-8601 format/offset — confirm upstream.
            newest = max(
                (stamp for stamp in row_stamps if stamp is not None),
                default=None,
            )
            if newest is not None:
                return newest
    return None