from __future__ import annotations

from pathlib import Path

from slop_farmer.config import SnapshotAdoptOptions
from slop_farmer.data.parquet_io import read_json, read_parquet_rows, write_json

__all__ = ["adopt_snapshot_for_pipeline"]


def adopt_snapshot_for_pipeline(options: SnapshotAdoptOptions) -> Path:
    """Register an existing snapshot as the pipeline's latest state.

    Reads ``manifest.json`` from the resolved ``options.snapshot_dir`` and
    writes two JSON files under the resolved ``options.output_dir``:
    ``snapshots/latest.json`` and ``state/watermark.json``.

    The ``next_since`` value is the first truthy entry of, in order:

    1. ``options.next_since``
    2. ``manifest["watermark"]["next_since"]`` (when ``watermark`` is a dict)
    3. ``manifest["crawl_started_at"]``
    4. a value inferred from the snapshot's parquet tables
    5. ``manifest["extracted_at"]``

    Returns:
        The path of the ``latest.json`` file that was written.

    Raises:
        ValueError: If the manifest has no ``repo`` value, or if no non-empty
            string ``next_since`` could be determined.
    """
    snapshot_dir = options.snapshot_dir.resolve()
    manifest_path = snapshot_dir / "manifest.json"
    # presumably read_json returns a dict here; verify against parquet_io.
    manifest = read_json(manifest_path)

    repo = str(manifest.get("repo") or "")
    if not repo:
        raise ValueError(f"Snapshot manifest has no repo: {manifest_path}")

    # Fall back to the directory name when the manifest carries no id.
    snapshot_id = str(manifest.get("snapshot_id") or snapshot_dir.name)

    # Anything other than a dict under "watermark" is treated as absent.
    watermark_raw = manifest.get("watermark")
    watermark_info = watermark_raw if isinstance(watermark_raw, dict) else {}

    # Cheap sources first; the parquet-based inference only runs when none of
    # them produced a value.
    next_since = (
        options.next_since
        or watermark_info.get("next_since")
        or manifest.get("crawl_started_at")
    )
    if not next_since:
        next_since = (
            _infer_snapshot_next_since(snapshot_dir, manifest)
            or manifest.get("extracted_at")
        )
    if not (isinstance(next_since, str) and next_since):
        raise ValueError(f"Could not determine next_since from {manifest_path}")

    output_dir = options.output_dir.resolve()
    latest_path = output_dir / "snapshots" / "latest.json"
    watermark_path = output_dir / "state" / "watermark.json"

    latest_payload = {
        "repo": repo,
        "latest_snapshot_id": snapshot_id,
        "snapshot_dir": str(snapshot_dir),
        "manifest_path": str(manifest_path),
        "next_since": next_since,
    }
    watermark_payload = {
        "repo": repo,
        "last_successful_snapshot_id": snapshot_id,
        "snapshot_dir": str(snapshot_dir),
        "effective_since": watermark_info.get("effective_since"),
        "next_since": next_since,
        "updated_at": (
            manifest.get("imported_at")
            or manifest.get("extracted_at")
            or next_since
        ),
    }

    write_json(latest_payload, latest_path)
    write_json(watermark_payload, watermark_path)
    return latest_path


def _infer_snapshot_next_since(snapshot_dir: Path, manifest: dict[str, object]) -> str | None:
    """Derive a ``next_since`` value from the snapshot's parquet tables.

    Only applies when ``manifest["source_type"]`` is ``"hf_checkpoint_import"``;
    any other manifest yields ``None``.

    Tables are scanned in a fixed order. For each row, the first listed column
    holding a non-empty string is taken as that row's timestamp. The largest
    timestamp of the first table that yields any is returned; later tables are
    not consulted once one table has produced a value.

    NOTE(review): timestamps are compared as plain strings, so this presumably
    relies on a uniform, lexically sortable format (e.g. ISO-8601) - confirm.

    Returns:
        The greatest timestamp string found, or ``None`` when the manifest is
        of a different source type or no table yields a timestamp.
    """
    if manifest.get("source_type") != "hf_checkpoint_import":
        return None

    # (parquet filename, timestamp columns in order of preference)
    timestamp_columns_by_table = (
        ("pull_requests.parquet", ("updated_at", "created_at")),
        ("issues.parquet", ("updated_at", "created_at")),
        ("comments.parquet", ("updated_at", "created_at")),
        ("reviews.parquet", ("submitted_at",)),
        ("review_comments.parquet", ("updated_at", "created_at")),
        ("events.parquet", ("created_at",)),
    )

    for filename, fields in timestamp_columns_by_table:
        newest: str | None = None
        for row in read_parquet_rows(snapshot_dir / filename):
            # First preferred column with a usable value wins for this row.
            stamp = next(
                (
                    value
                    for value in map(row.get, fields)
                    if isinstance(value, str) and value
                ),
                None,
            )
            if stamp is not None and (newest is None or stamp > newest):
                newest = stamp
        if newest is not None:
            return newest
    return None