from __future__ import annotations

import json
import shutil
import urllib.parse
import urllib.request
from datetime import UTC, datetime
from pathlib import Path, PurePosixPath
from typing import Any

from huggingface_hub import HfApi, hf_hub_download

from slop_farmer.data.http import urlopen_with_retry
from slop_farmer.data.parquet_io import read_json, write_text
from slop_farmer.data.snapshot_paths import (
    CONTRIBUTOR_ARTIFACT_FILENAMES,
    CURRENT_ANALYSIS_MANIFEST_PATH,
    LEGACY_ANALYSIS_FILENAMES,
    PR_SCOPE_CLUSTERS_FILENAME,
    RAW_TABLE_FILENAMES,
    README_FILENAME,
    ROOT_MANIFEST_FILENAME,
    SNAPSHOTS_LATEST_PATH,
    STATE_WATERMARK_PATH,
    load_archived_analysis_run_manifest,
    load_current_analysis_manifest,
    repo_relative_path_to_local,
)


def materialize_hf_dataset_snapshot(
    *,
    repo_id: str,
    local_dir: Path,
    revision: str | None = None,
) -> Path:
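    """Materialize an HF dataset snapshot into ``local_dir``.

    Prefers the snapshot-repo layout (the ``SNAPSHOTS_LATEST_PATH`` pointer file
    is present), then a root-level layout exposing ``issues.parquet`` and
    ``pull_requests.parquet``, and finally falls back to the dataset viewer
    parquet export.
    """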
    info = _hf_dataset_info(repo_id=repo_id, revision=revision, files_metadata=True)
    remote_paths = {sibling.rfilename for sibling in info.siblings}
    resolved_revision = str(info.sha or revision or "main")
    if SNAPSHOTS_LATEST_PATH in remote_paths:
        return _materialize_hf_snapshot_repo_snapshot(
            repo_id=repo_id,
            local_dir=local_dir,
            revision=resolved_revision,
            requested_revision=revision,
            hf_sha=info.sha,
            remote_paths=remote_paths,
        )
    if {"issues.parquet", "pull_requests.parquet"} <= remote_paths:
        return _materialize_hf_root_snapshot(
            repo_id=repo_id,
            local_dir=local_dir,
            revision=resolved_revision,
            requested_revision=revision,
            hf_sha=info.sha,
            remote_paths=remote_paths,
        )
    return _materialize_hf_dataset_viewer_snapshot(
        repo_id=repo_id,
        local_dir=local_dir,
        revision=resolved_revision,
        requested_revision=revision,
        hf_sha=info.sha,
    )


def _materialize_hf_snapshot_repo_snapshot(
    *,
    repo_id: str,
    local_dir: Path,
    revision: str,
    requested_revision: str | None,
    hf_sha: str | None,
    remote_paths: set[str],
) -> Path:
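    """Materialize a repo that publishes the snapshot layout with a latest-snapshot pointer.

    Downloads the pointer file, the raw tables and analysis artifacts it points at,
    analysis-state and published-analysis files, and the README, then writes an
    updated root manifest describing the materialization.
    """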
    local_dir.mkdir(parents=True, exist_ok=True)
    latest_download = Path(
        hf_hub_download(
            repo_id=repo_id,
            repo_type="dataset",
            filename=SNAPSHOTS_LATEST_PATH,
            revision=revision,
        )
    )
    latest_payload = json.loads(latest_download.read_text(encoding="utf-8"))
    downloaded_files: set[str] = set()
    _copy_downloaded_file(
        latest_download, repo_relative_path_to_local(local_dir, SNAPSHOTS_LATEST_PATH)
    )
    downloaded_files.add(SNAPSHOTS_LATEST_PATH)
    for filename in (
        *RAW_TABLE_FILENAMES,
        ROOT_MANIFEST_FILENAME,
        PR_SCOPE_CLUSTERS_FILENAME,
        *CONTRIBUTOR_ARTIFACT_FILENAMES,
        *LEGACY_ANALYSIS_FILENAMES,
    ):
        downloaded = _download_first_available_hf_file(
            repo_id=repo_id,
            revision=revision,
            filenames=_hf_latest_snapshot_candidates(latest_payload, filename),
        )
        if downloaded is None:
            continue
        _copy_downloaded_file(downloaded, local_dir / filename)
        downloaded_files.add(filename)
    if STATE_WATERMARK_PATH in remote_paths:
        _download_repo_file(
            repo_id=repo_id,
            revision=revision,
            local_dir=local_dir,
            repo_path=STATE_WATERMARK_PATH,
            downloaded_files=downloaded_files,
        )
    _download_analysis_state_files(
        repo_id=repo_id,
        revision=revision,
        local_dir=local_dir,
        remote_paths=remote_paths,
        downloaded_files=downloaded_files,
    )
    _download_published_analysis_files(
        repo_id=repo_id,
        revision=revision,
        local_dir=local_dir,
        remote_paths=remote_paths,
        downloaded_files=downloaded_files,
    )
    _download_repo_file(
        repo_id=repo_id,
        revision=revision,
        local_dir=local_dir,
        repo_path=README_FILENAME,
        downloaded_files=downloaded_files,
        required=False,
    )
    manifest = (
        read_json(local_dir / ROOT_MANIFEST_FILENAME)
        if (local_dir / ROOT_MANIFEST_FILENAME).exists()
        else {}
    )
    manifest.setdefault("repo", _infer_repo_from_materialized_snapshot(local_dir))
    manifest.setdefault(
        "snapshot_id",
        str(latest_payload.get("latest_snapshot_id") or hf_sha or local_dir.name),
    )
    manifest.update(
        {
            "source_type": "hf_snapshot_repo",
            "hf_repo_id": repo_id,
            "hf_revision": requested_revision,
            "hf_resolved_revision": revision,
            "hf_sha": hf_sha,
            "materialized_at": _iso_now(),
            "downloaded_files": sorted(downloaded_files),
            "hf_latest_pointer": latest_payload,
        }
    )
    write_text(json.dumps(manifest, indent=2) + "\n", local_dir / ROOT_MANIFEST_FILENAME)
    return local_dir


def _materialize_hf_root_snapshot(
    *,
    repo_id: str,
    local_dir: Path,
    revision: str,
    requested_revision: str | None,
    hf_sha: str | None,
    remote_paths: set[str],
) -> Path:
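    """Materialize a repo that publishes the raw tables and artifacts at the repository root."""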
    local_dir.mkdir(parents=True, exist_ok=True)
    downloaded_files: set[str] = set()
    for repo_path in (
        *RAW_TABLE_FILENAMES,
        ROOT_MANIFEST_FILENAME,
        PR_SCOPE_CLUSTERS_FILENAME,
        *CONTRIBUTOR_ARTIFACT_FILENAMES,
        *LEGACY_ANALYSIS_FILENAMES,
        SNAPSHOTS_LATEST_PATH,
        STATE_WATERMARK_PATH,
        README_FILENAME,
    ):
        if repo_path not in remote_paths:
            continue
        _download_repo_file(
            repo_id=repo_id,
            revision=revision,
            local_dir=local_dir,
            repo_path=repo_path,
            downloaded_files=downloaded_files,
        )
    _download_analysis_state_files(
        repo_id=repo_id,
        revision=revision,
        local_dir=local_dir,
        remote_paths=remote_paths,
        downloaded_files=downloaded_files,
    )
    _download_published_analysis_files(
        repo_id=repo_id,
        revision=revision,
        local_dir=local_dir,
        remote_paths=remote_paths,
        downloaded_files=downloaded_files,
    )
    manifest = (
        read_json(local_dir / ROOT_MANIFEST_FILENAME)
        if (local_dir / ROOT_MANIFEST_FILENAME).exists()
        else {}
    )
    manifest.setdefault("repo", _infer_repo_from_materialized_snapshot(local_dir))
    manifest.setdefault("snapshot_id", hf_sha or local_dir.name)
    manifest.update(
        {
            "source_type": "hf_root_snapshot",
            "hf_repo_id": repo_id,
            "hf_revision": requested_revision,
            "hf_resolved_revision": revision,
            "hf_sha": hf_sha,
            "materialized_at": _iso_now(),
            "downloaded_files": sorted(downloaded_files),
        }
    )
    write_text(json.dumps(manifest, indent=2) + "\n", local_dir / ROOT_MANIFEST_FILENAME)
    return local_dir


def _materialize_hf_dataset_viewer_snapshot(
    *,
    repo_id: str,
    local_dir: Path,
    revision: str,
    requested_revision: str | None,
    hf_sha: str | None,
) -> Path:
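    """Materialize a snapshot from the dataset viewer's auto-converted parquet export.

    Downloads each exported parquet file, infers its table name from its schema,
    copies the README, and writes a fresh root manifest.
    """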
    local_dir.mkdir(parents=True, exist_ok=True)
    downloaded_files: set[str] = set()
    for index, url in enumerate(_hf_dataset_parquet_urls(repo_id, revision)):
        temporary_path = local_dir / f"tmp-{index:04d}.parquet"
        _download_url_to_path(url, temporary_path)
        table_name = _parquet_table_name(temporary_path)
        temporary_path.replace(local_dir / table_name)
        downloaded_files.add(table_name)
    readme_path = hf_hub_download(
        repo_id=repo_id,
        repo_type="dataset",
        filename=README_FILENAME,
        revision=revision,
    )
    shutil.copy2(readme_path, local_dir / README_FILENAME)
    downloaded_files.add(README_FILENAME)
    manifest = {
        "repo": _infer_repo_from_materialized_snapshot(local_dir),
        "snapshot_id": hf_sha or local_dir.name,
        "source_type": "hf_dataset_viewer",
        "hf_repo_id": repo_id,
        "hf_revision": requested_revision,
        "hf_resolved_revision": revision,
        "hf_sha": hf_sha,
        "materialized_at": _iso_now(),
        "downloaded_files": sorted(downloaded_files),
    }
    write_text(json.dumps(manifest, indent=2) + "\n", local_dir / ROOT_MANIFEST_FILENAME)
    return local_dir


def _download_published_analysis_files(
    *,
    repo_id: str,
    revision: str,
    local_dir: Path,
    remote_paths: set[str],
    downloaded_files: set[str],
) -> None:
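    """Download the current analysis manifest, archived run manifests, and the artifacts they reference."""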
    if CURRENT_ANALYSIS_MANIFEST_PATH in remote_paths:
        manifest_path = _download_repo_file(
            repo_id=repo_id,
            revision=revision,
            local_dir=local_dir,
            repo_path=CURRENT_ANALYSIS_MANIFEST_PATH,
            downloaded_files=downloaded_files,
        )
        current_manifest = load_current_analysis_manifest(manifest_path)
        for repo_path in _manifest_artifact_paths(current_manifest, include_archived=True):
            if repo_path not in remote_paths:
                continue
            _download_repo_file(
                repo_id=repo_id,
                revision=revision,
                local_dir=local_dir,
                repo_path=repo_path,
                downloaded_files=downloaded_files,
            )
    for repo_path in sorted(
        path for path in remote_paths if _is_archived_analysis_manifest_path(path)
    ):
        manifest_path = _download_repo_file(
            repo_id=repo_id,
            revision=revision,
            local_dir=local_dir,
            repo_path=repo_path,
            downloaded_files=downloaded_files,
        )
        archived_manifest = load_archived_analysis_run_manifest(manifest_path)
        for artifact_path in _manifest_artifact_paths(archived_manifest, include_archived=False):
            if artifact_path not in remote_paths:
                continue
            _download_repo_file(
                repo_id=repo_id,
                revision=revision,
                local_dir=local_dir,
                repo_path=artifact_path,
                downloaded_files=downloaded_files,
            )


def _download_analysis_state_files(
    *,
    repo_id: str,
    revision: str,
    local_dir: Path,
    remote_paths: set[str],
    downloaded_files: set[str],
) -> None:
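    """Download every remote file under the top-level ``analysis-state/`` directory."""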
    for repo_path in sorted(
        path for path in remote_paths if PurePosixPath(path).parts[:1] == ("analysis-state",)
    ):
        _download_repo_file(
            repo_id=repo_id,
            revision=revision,
            local_dir=local_dir,
            repo_path=repo_path,
            downloaded_files=downloaded_files,
        )


def _manifest_artifact_paths(
    payload: dict[str, Any],
    *,
    include_archived: bool,
) -> list[str]:
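    """Collect artifact paths referenced by a manifest payload, normalized and de-duplicated in order."""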
    paths = [
        str(value) for value in (payload.get("artifacts") or {}).values() if isinstance(value, str)
    ]
    if include_archived:
        paths.extend(
            str(value)
            for value in (payload.get("archived_artifacts") or {}).values()
            if isinstance(value, str)
        )
    deduped: list[str] = []
    seen: set[str] = set()
    for repo_path in paths:
        normalized = repo_path.lstrip("./")
        if not normalized or normalized in seen:
            continue
        seen.add(normalized)
        deduped.append(normalized)
    return deduped


def _is_archived_analysis_manifest_path(repo_path: str) -> bool:
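    """Return True for ``snapshots/<snapshot_id>/analysis-runs/<run_id>/`` paths ending in the root manifest."""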
    parts = PurePosixPath(repo_path).parts
    return (
        len(parts) == 5
        and parts[0] == "snapshots"
        and parts[2] == "analysis-runs"
        and parts[4] == ROOT_MANIFEST_FILENAME
    )


def _download_repo_file(
    *,
    repo_id: str,
    revision: str,
    local_dir: Path,
    repo_path: str,
    downloaded_files: set[str],
    required: bool = True,
) -> Path:
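    """Download one repo file into ``local_dir`` under its repo-relative path.

    When ``required`` is False, download failures are swallowed and the expected
    local path is returned even though nothing was written there.
    """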
    try:
        downloaded = Path(
            hf_hub_download(
                repo_id=repo_id,
                repo_type="dataset",
                filename=repo_path,
                revision=revision,
            )
        )
    except Exception:
        if required:
            raise
        return local_dir / repo_path
    destination = repo_relative_path_to_local(local_dir, repo_path)
    _copy_downloaded_file(downloaded, destination)
    downloaded_files.add(repo_path)
    return destination


def _copy_downloaded_file(downloaded_path: Path, destination: Path) -> None:
    destination.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(downloaded_path, destination)


def _hf_dataset_info(repo_id: str, revision: str | None, *, files_metadata: bool) -> Any:
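    """Fetch dataset info via ``HfApi``, retrying without ``files_metadata`` when the client rejects it."""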
    api = HfApi()
    try:
        return api.dataset_info(repo_id=repo_id, revision=revision, files_metadata=files_metadata)
    except TypeError:
        return api.dataset_info(repo_id=repo_id, revision=revision)


def _hf_dataset_parquet_urls(repo_id: str, revision: str | None = None) -> list[str]:
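    """Resolve the dataset's parquet export URLs (default config, train split) from the HF parquet API."""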
    query = urllib.parse.urlencode({"revision": revision}) if revision else ""
    api_url = (
        f"https://huggingface.co/api/datasets/{urllib.parse.quote(repo_id, safe='')}/parquet"
        f"{f'?{query}' if query else ''}"
    )
    with urlopen_with_retry(api_url, timeout=120, label=api_url) as response:
        payload = json.loads(response.read().decode("utf-8"))
    urls = payload.get("default", {}).get("train", [])
    if not isinstance(urls, list) or not urls:
        raise FileNotFoundError(
            f"No parquet export URLs found for HF dataset {repo_id} at {api_url}"
        )
    return [str(url) for url in urls]


def _download_first_available_hf_file(
    *,
    repo_id: str,
    revision: str,
    filenames: list[str],
) -> Path | None:
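    """Try each candidate filename in order and return the first successful download, or None."""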
    for filename in filenames:
        try:
            downloaded = Path(
                hf_hub_download(
                    repo_id=repo_id,
                    repo_type="dataset",
                    filename=filename,
                    revision=revision,
                )
            )
        except Exception:
            continue
        if downloaded.exists():
            return downloaded
    return None


def _hf_latest_snapshot_candidates(latest_payload: dict[str, Any], filename: str) -> list[str]:
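    """List candidate repo paths for ``filename`` from the latest-snapshot pointer payload, most specific first."""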
    candidates: list[str] = []
    manifest_path = str(latest_payload.get("manifest_path") or "").strip("/")
    snapshot_dir = str(latest_payload.get("snapshot_dir") or "").strip("/")
    latest_snapshot_id = str(latest_payload.get("latest_snapshot_id") or "").strip()
    archived_manifest_path = str(latest_payload.get("archived_manifest_path") or "").strip("/")
    if filename == ROOT_MANIFEST_FILENAME and manifest_path:
        candidates.append(manifest_path)
    if snapshot_dir and snapshot_dir not in {".", "/"}:
        candidates.append(f"{snapshot_dir}/{filename}")
    if filename == ROOT_MANIFEST_FILENAME and archived_manifest_path:
        candidates.append(archived_manifest_path)
    if manifest_path and "/" in manifest_path:
        manifest_dir = manifest_path.rsplit("/", 1)[0]
        candidates.append(f"{manifest_dir}/{filename}")
    if latest_snapshot_id:
        candidates.append(str(PurePosixPath("snapshots") / latest_snapshot_id / filename))
    candidates.append(filename)
    seen: set[str] = set()
    deduped: list[str] = []
    for candidate in candidates:
        normalized = candidate.lstrip("./")
        if not normalized or normalized in seen:
            continue
        seen.add(normalized)
        deduped.append(normalized)
    return deduped


def _download_url_to_path(url: str, destination: Path) -> None:
    destination.parent.mkdir(parents=True, exist_ok=True)
    urllib.request.urlretrieve(url, destination)


def _parquet_table_name(path: Path) -> str:
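    """Infer the raw table filename for a dataset-viewer parquet file by inspecting its column names."""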
    import pyarrow.parquet as pq

    columns = set(pq.read_table(path).column_names)
    if {"parent_kind", "issue_api_url", "body"} <= columns:
        return "comments.parquet"
    if {"event", "source_issue_number", "source_issue_url"} <= columns:
        return "events.parquet"
    if {"milestone_title", "comments_count"} <= columns and "merged_at" not in columns:
        return "issues.parquet"
    if {"link_type", "link_origin", "target_number"} <= columns:
        return "links.parquet"
    if {"pull_request_number", "filename", "blob_url", "patch"} <= columns:
        return "pr_files.parquet"
    if {"pull_request_number", "diff", "html_url", "api_url"} <= columns:
        return "pr_diffs.parquet"
    if {"merged_at", "head_ref", "base_ref"} <= columns:
        return "pull_requests.parquet"
    if {"review_id", "pull_request_api_url", "path"} <= columns:
        return "review_comments.parquet"
    if {"pull_request_number", "submitted_at"} <= columns and "review_id" not in columns:
        return "reviews.parquet"
    raise ValueError(f"Unrecognized HF parquet schema for {path.name}: {sorted(columns)}")


def _infer_repo_from_materialized_snapshot(local_dir: Path) -> str:
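    """Infer the source repository from the ``repo`` column of the first available raw table row."""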
    import pyarrow.parquet as pq

    for table_filename in RAW_TABLE_FILENAMES:
        path = local_dir / table_filename
        if not path.exists():
            continue
        rows = pq.read_table(path).slice(0, 1).to_pylist()
        if rows and rows[0].get("repo"):
            return str(rows[0]["repo"])
    raise FileNotFoundError(f"Could not infer repo from materialized snapshot in {local_dir}")


def _iso_now() -> str:
    return datetime.now(tz=UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")