Spaces:
Running
Running
| """URI parsing + dataset fetching for the Harbor Visualiser. | |
| Accepts the same URI shapes as `repo2rlenv pull`: | |
| owner/name → HF Hub (default) | |
| owner/name@<rev> → HF Hub, revision pinned | |
| hf://owner/name[@rev] → HF Hub, explicit prefix | |
| gh://owner/repo[@ref] → GitHub, optional branch/tag/sha | |
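| https://github.com/owner/repo[.git] → GitHub, full URL | |
| harbor://[org/]name[@tag] → Harbor registry, optional tag | |
| /abs/path, ./rel, ../rel → local directory, used in place | |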
| All datasets land in a local cache dir; subsequent loads in the same Space | |
| process are free. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import os | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Root directory under which downloads are cached for the lifetime of a single
# Space process. Set HARBOR_VIEWER_CACHE to relocate it. The default sits under
# /tmp because, in Spaces, /tmp survives across requests for the same instance.
CACHE_ROOT = Path(os.getenv("HARBOR_VIEWER_CACHE", "/tmp/.harbor-viewer-cache"))
@dataclass(frozen=True)
class DatasetSource:
    """A parsed dataset URI, ready to be fetched.

    Instances are built with keyword arguments (`kind=`, `ident=`,
    `revision=`), so the class must be a dataclass; it is frozen because it
    is a plain value object (which also makes it hashable).

    Attributes:
        kind: One of "hf", "gh", "harbor" or "local".
        ident: Canonical id — `owner/name` for HF and GitHub, `name` or
            `org/name` for Harbor, an absolute path for local sources.
        revision: Optional @-suffix (tag / branch / commit / Hub revision);
            None when the URI did not pin one.
    """

    kind: str
    ident: str
    revision: str | None = None

    # NOTE(review): kept as a regular method because nothing visible in this
    # module shows how callers use it — confirm whether call sites expect a
    # property before changing it.
    def display(self) -> str:
        """Return the canonical, human-readable URI for this source.

        Remote kinds are rendered as `<scheme>://<ident>[@<revision>]`; any
        other kind (i.e. local) is rendered as the bare identifier.
        """
        rev = f"@{self.revision}" if self.revision else ""
        if self.kind in ("hf", "gh", "harbor"):
            return f"{self.kind}://{self.ident}{rev}"
        return str(self.ident)

    # A property, not a method: every fetcher in this module reads it as an
    # attribute (`CACHE_ROOT / source.cache_key`).
    @property
    def cache_key(self) -> str:
        """Return a single path component identifying this source in the cache.

        Slashes in the identifier and the revision are replaced so the key
        never introduces nested directories; an unpinned source uses "head"
        as its revision.
        """
        rev = (self.revision or "head").replace("/", "_")
        return f"{self.kind}__{self.ident.replace('/', '__')}__{rev}"
| # --------------------------------------------------------------------------- | |
| # URI parsing | |
| # --------------------------------------------------------------------------- | |
| def _split_revision(s: str) -> tuple[str, str | None]: | |
| """Split `name@rev` into (name, rev). Returns (name, None) if no `@`.""" | |
| if "@" in s: | |
| name, _, rev = s.rpartition("@") | |
| return name, (rev or None) | |
| return s, None | |
def parse_dataset_uri(uri: str) -> DatasetSource:
    """Classify a URI string into a DatasetSource.

    Accepted shapes:
        /abs, ./rel, ../rel, ~/rel, ".", "..", "~"   → local directory (must exist)
        https://github.com/owner/repo[.git][@ref]    → GitHub (also http:// and git@github.com:)
        gh://owner/repo[.git][@ref]                  → GitHub
        harbor://[org/]name[@tag]                    → Harbor registry
        hf://owner/name[@rev] or owner/name[@rev]    → HF Hub (the default)

    Args:
        uri: The user-supplied dataset reference; surrounding whitespace is ignored.

    Returns:
        The parsed DatasetSource. For local sources `ident` is the resolved
        absolute path and `revision` is None.

    Raises:
        ValueError: If the input is empty, a local directory does not exist,
            or the string matches none of the accepted shapes.
    """
    s = (uri or "").strip()
    if not s:
        raise ValueError("empty dataset URI")

    # Local path. Home-relative forms are recognised explicitly: without this,
    # "~/data" would fall through and be misread as the HF dataset "~/data".
    if s in (".", "..", "~") or s.startswith(("/", "./", "../", "~/")):
        path = Path(s).expanduser().resolve()
        if not path.is_dir():
            raise ValueError(f"local dataset directory not found: {path}")
        return DatasetSource(kind="local", ident=str(path), revision=None)

    # GitHub full URL (https, http, or scp-style ssh).
    for prefix in ("https://github.com/", "http://github.com/", "git@github.com:"):
        if s.startswith(prefix):
            tail = s.removeprefix(prefix).removesuffix(".git")
            base, rev = _split_revision(tail)
            parts = [p for p in base.split("/") if p]
            if len(parts) < 2:
                raise ValueError(f"GitHub URL needs owner/repo, got {uri!r}")
            # Strip ".git" from the repo name as well: in "owner/repo.git@v1"
            # or "owner/repo.git/" the suffix is not at the end of the string,
            # and the fetcher appends ".git" to the identifier itself.
            repo = parts[1].removesuffix(".git")
            if not repo:
                raise ValueError(f"GitHub URL needs owner/repo, got {uri!r}")
            return DatasetSource(kind="gh", ident=f"{parts[0]}/{repo}", revision=rev)

    if s.startswith("gh://"):
        base, rev = _split_revision(s.removeprefix("gh://"))
        parts = [p for p in base.split("/") if p]
        if len(parts) != 2:
            raise ValueError(f"gh:// expects owner/repo, got {uri!r}")
        repo = parts[1].removesuffix(".git")  # same reasoning as for full URLs
        if not repo:
            raise ValueError(f"gh:// expects owner/repo, got {uri!r}")
        return DatasetSource(kind="gh", ident=f"{parts[0]}/{repo}", revision=rev)

    if s.startswith("harbor://"):
        base, rev = _split_revision(s.removeprefix("harbor://"))
        parts = [p for p in base.split("/") if p]
        # Harbor accepts both bare-name and org/name
        if len(parts) == 2 and base.count("/") == 1:
            return DatasetSource(kind="harbor", ident=f"{parts[0]}/{parts[1]}", revision=rev)
        if len(parts) == 1 and "/" not in base:
            return DatasetSource(kind="harbor", ident=parts[0], revision=rev)
        raise ValueError(
            f"harbor:// expects 'name' or 'org/name' (optionally @tag), got {uri!r}"
        )

    # Explicit hf:// prefix is optional; everything left is parsed as HF Hub.
    if s.startswith("hf://"):
        s = s.removeprefix("hf://")

    base, rev = _split_revision(s)
    parts = [p for p in base.split("/") if p]
    if len(parts) == 2 and base.count("/") == 1:
        return DatasetSource(kind="hf", ident=f"{parts[0]}/{parts[1]}", revision=rev)

    raise ValueError(
        f"unrecognized dataset URI {uri!r}. "
        f"Accepted: owner/name, hf://owner/name[@rev], gh://owner/repo[@ref], "
        f"https://github.com/owner/repo, harbor://[org/]name[@tag], "
        f"or a local path (/abs, ./rel, ../rel, ~/rel)."
    )
| # --------------------------------------------------------------------------- | |
| # Fetching | |
| # --------------------------------------------------------------------------- | |
def _fetch_hf(source: DatasetSource, force: bool) -> Path:
    """Materialize an HF Hub dataset snapshot in the cache and return its directory.

    A pinned revision that is already cached (non-empty directory) is returned
    as-is unless `force` is set. An unpinned source is always re-synced.
    """
    from huggingface_hub import snapshot_download

    cache_dir = CACHE_ROOT / source.cache_key
    revision = source.revision
    is_pinned = revision is not None

    # Pinned revisions (tag/commit) are immutable, so serving them from cache
    # is always safe. Unpinned ("head") datasets must be re-synced on every
    # load so stale data is never shown; snapshot_download is etag-aware, so
    # the re-sync only pulls files that actually changed.
    if is_pinned and not force and cache_dir.exists() and any(cache_dir.iterdir()):
        logger.info("hf cache hit (pinned %s): %s", revision, cache_dir)
        return cache_dir

    cache_dir.mkdir(parents=True, exist_ok=True)

    # Public datasets need no token; private ones rely on $HF_TOKEN being set
    # in the Space's secrets. An empty value is treated as "no token".
    hub_token = os.environ.get("HF_TOKEN") or None

    action = "fetch" if is_pinned else "re-sync"
    logger.info("hf %s: %s@%s", action, source.ident, revision or "head")

    snapshot_download(
        repo_id=source.ident,
        repo_type="dataset",
        revision=revision,
        local_dir=str(cache_dir),
        token=hub_token,
    )
    return cache_dir
def fetch_hf_task(source: DatasetSource, task_id: str, *, force: bool = False) -> Path:
    """Download ONLY one task's files from an HF dataset (not the whole repo).

    Rather than snapshotting the full repository, this lists just the task's
    subtree and downloads each file individually with `hf_hub_download`. Files
    accumulate under one per-dataset cache directory, so revisiting a task
    reuses what is already there.

    Args:
        source: The HF dataset to read from.
        task_id: Name of the task directory; looked up as `tasks/<task_id>`
            first (nested layout), then as `<task_id>` (flat layout).
        force: When true, re-download the task's files even if cached copies
            exist.

    Returns:
        The per-dataset cache root; the task's files sit beneath it at their
        repo-relative paths, so `load_task(root, task_id)` is expected to
        resolve either layout.

    Raises:
        FileNotFoundError: If `task_id` is empty or no files were found for it
            in either layout. The last listing error, if any, is attached as
            the cause.
    """
    from huggingface_hub import HfApi, hf_hub_download

    # An empty id would turn the flat-layout prefix into the repo root and
    # download the entire dataset — exactly what this function exists to avoid.
    if not task_id or not task_id.strip("/"):
        raise FileNotFoundError(f"task {task_id!r} not found in {source.ident}")

    target = CACHE_ROOT / f"{source.cache_key}__bytask"
    target.mkdir(parents=True, exist_ok=True)
    token = os.environ.get("HF_TOKEN") or None
    api = HfApi(token=token)
    logger.info("hf per-task fetch: %s :: %s", source.ident, task_id)

    # Resolve the task's directory in the repo: nested (`tasks/<id>`) first, then flat.
    # `list_repo_tree` is a generator, so the 404 for a non-existent prefix only
    # fires while iterating — force it inside the try (via list()) so we fall
    # through to the other layout instead of bubbling the error up.
    files: list[str] = []
    last_error: Exception | None = None
    for prefix in (f"tasks/{task_id}", task_id):
        try:
            entries = list(api.list_repo_tree(
                source.ident, prefix, repo_type="dataset",
                revision=source.revision, recursive=True,
            ))
        except Exception as exc:  # noqa: BLE001 — path doesn't exist in this layout
            # Keep the error: if both layouts fail, the cause (404, auth,
            # network, ...) is chained onto the FileNotFoundError below.
            last_error = exc
            logger.debug("hf list failed for %s :: %s: %s", source.ident, prefix, exc)
            continue
        # Only entries that carry a size are kept (files, not folders).
        files = [e.path for e in entries if getattr(e, "size", None) is not None]
        if files:
            break

    if not files:
        raise FileNotFoundError(
            f"task {task_id!r} not found in {source.ident}"
        ) from last_error

    for f in files:
        hf_hub_download(
            repo_id=source.ident, repo_type="dataset", revision=source.revision,
            filename=f, local_dir=str(target), token=token,
            force_download=force,  # honor the caller's `force` flag
        )
    return target
def _fetch_harbor(source: DatasetSource, force: bool) -> Path:
    """Shell out to `harbor datasets download` to fetch a Harbor-registry dataset.

    Harbor handles its own registry resolution, auth, and tag pinning via
    `<org>/<name>@<tag>`. We just orchestrate + flatten the result into a
    standard dataset layout.

    The download goes to a temporary directory first; the cache directory is
    replaced only after the CLI succeeded, so a failed refresh never destroys
    a previously good cache.

    Args:
        source: The Harbor dataset to fetch.
        force: When true, re-download even if a non-empty cache exists.

    Returns:
        The cache directory holding the dataset.

    Raises:
        ValueError: If the dataset selector would be parsed as a CLI option.
        RuntimeError: If the `harbor` CLI is missing or exits non-zero.
        subprocess.TimeoutExpired: If the download exceeds 600 seconds.
    """
    target = CACHE_ROOT / source.cache_key
    if not force and target.exists() and any(target.iterdir()):
        logger.info("harbor cache hit: %s", target)
        return target

    if not shutil.which("harbor"):
        raise RuntimeError(
            "`harbor` CLI not on PATH. "
            "It's listed in `requirements.txt` — on a Hugging Face Space it "
            "installs automatically. Locally: `pip install harbor`."
        )

    selector = source.ident + (f"@{source.revision}" if source.revision else "")
    # The selector comes from user input; never let it be read as an option.
    if selector.startswith("-"):
        raise ValueError(f"invalid harbor dataset selector: {selector!r}")

    target.parent.mkdir(parents=True, exist_ok=True)
    with tempfile.TemporaryDirectory(prefix="harbor-viewer-harbor-") as tmp:
        args = ["harbor", "datasets", "download", selector, "-o", tmp]
        logger.info("running: %s", " ".join(args))
        proc = subprocess.run(args, capture_output=True, text=True, timeout=600, check=False)
        if proc.returncode != 0:
            raise RuntimeError(
                f"harbor download failed (exit {proc.returncode}): "
                f"{proc.stderr.strip()[:400] or proc.stdout.strip()[:400]}"
            )

        # If the downloaded tree has exactly one subdirectory and no task.toml at the
        # top, recurse one level — that's harbor's typical layout (`<tmp>/<name>/...`).
        downloaded = Path(tmp)
        children = [c for c in downloaded.iterdir() if c.is_dir()]
        if len(children) == 1 and not (downloaded / "task.toml").exists():
            downloaded = children[0]

        # Only now drop the old cache and move the fresh tree into place.
        if target.exists():
            shutil.rmtree(target)
        target.mkdir(parents=True, exist_ok=True)
        try:
            for child in downloaded.iterdir():
                shutil.move(str(child), str(target / child.name))
        except BaseException:
            # A half-moved tree would look like a valid cache hit next time.
            shutil.rmtree(target, ignore_errors=True)
            raise
    return target
def _fetch_github(source: DatasetSource, force: bool) -> Path:
    """Shallow-fetch a GitHub repo into the cache.

    Branches and tags are fetched with `git clone --depth 1 --branch <ref>`.
    `--branch` does not accept commit SHAs, so when that clone fails and a
    revision was given, the revision is fetched directly
    (`git fetch --depth 1 <url> <rev>`), which works for a full commit SHA.

    The clone is built in a temporary directory; the cache directory is
    replaced only after git succeeded, so a failed refresh keeps the old cache.

    Args:
        source: The GitHub repo (`owner/repo`) and optional branch/tag/SHA.
        force: When true, re-clone even if a non-empty cache exists.

    Returns:
        The cache directory holding the checkout.

    Raises:
        ValueError: If the revision would be parsed as a git option.
        RuntimeError: If `git` is missing or the repo/revision cannot be fetched.
        subprocess.TimeoutExpired: If a git command exceeds 300 seconds.
    """
    target = CACHE_ROOT / source.cache_key
    if not force and target.exists() and any(target.iterdir()):
        logger.info("gh cache hit: %s", target)
        return target

    if not shutil.which("git"):
        raise RuntimeError(
            "`git` not found on PATH. Install git (or rely on the HF Space sandbox which ships it)."
        )

    revision = source.revision
    # The revision comes from user input; never let it be read as an option.
    if revision and revision.startswith("-"):
        raise ValueError(f"invalid git revision: {revision!r}")

    url = f"https://github.com/{source.ident}.git"
    # Fail fast instead of waiting on a credential prompt for private or
    # non-existent repos.
    git_env = {**os.environ, "GIT_TERMINAL_PROMPT": "0"}

    def run_git(*git_args: str) -> subprocess.CompletedProcess:
        """Run one git command, capturing its output; never raises on non-zero exit."""
        return subprocess.run(
            ["git", *git_args], env=git_env,
            capture_output=True, text=True, timeout=300, check=False,
        )

    target.parent.mkdir(parents=True, exist_ok=True)
    with tempfile.TemporaryDirectory(prefix="harbor-viewer-clone-") as tmp:
        tmp_clone = Path(tmp) / "clone"
        clone_args = ["clone", "--depth", "1"]
        if revision:
            clone_args += ["--branch", revision]
        logger.info("running: git clone --depth 1 [...] %s", source.ident)
        proc = run_git(*clone_args, url, str(tmp_clone))
        ok = proc.returncode == 0

        if not ok and revision:
            # Probably a commit SHA: start from an empty repo and fetch it
            # directly. all() stops at the first failing step.
            shutil.rmtree(tmp_clone, ignore_errors=True)
            steps = (
                ("init", "--quiet", str(tmp_clone)),
                ("-C", str(tmp_clone), "fetch", "--depth", "1", url, revision),
                ("-C", str(tmp_clone), "checkout", "--detach", "FETCH_HEAD"),
            )
            ok = all(run_git(*step).returncode == 0 for step in steps)

        if not ok:
            # Report the original clone failure; it names the repo/ref problem.
            raise RuntimeError(
                f"git clone failed (exit {proc.returncode}): {proc.stderr.strip()[:400]}"
            )

        # Only now drop the old cache and move the fresh checkout into place.
        if target.exists():
            shutil.rmtree(target)
        shutil.move(str(tmp_clone), str(target))
    return target
def fetch_dataset(source: DatasetSource, *, force: bool = False) -> Path:
    """Make a dataset available on disk and return its root directory.

    Remote kinds ("hf", "gh", "harbor") are delegated to their fetcher, with
    `force` passed through. A "local" source is returned as-is, without any
    copying. Any other kind raises ValueError.
    """
    kind = source.kind

    # Local datasets are used in place; there is nothing to fetch.
    if kind == "local":
        return Path(source.ident)

    if kind == "hf":
        fetcher = _fetch_hf
    elif kind == "gh":
        fetcher = _fetch_github
    elif kind == "harbor":
        fetcher = _fetch_harbor
    else:
        raise ValueError(f"unknown source kind: {source.kind!r}")

    return fetcher(source, force=force)