"""URI parsing + dataset fetching for the Harbor Visualiser. Accepts the same URI shapes as `repo2rlenv pull`: owner/name → HF Hub (default) owner/name@ → HF Hub, revision pinned hf://owner/name[@rev] → HF Hub, explicit prefix gh://owner/repo[@ref] → GitHub, optional branch/tag/sha https://github.com/owner/repo[.git] → GitHub, full URL All datasets land in a local cache dir; subsequent loads in the same Space process are free. """ from __future__ import annotations import logging import os import shutil import subprocess import tempfile from dataclasses import dataclass from pathlib import Path logger = logging.getLogger(__name__) # Where downloads are cached during a single Space-process lifetime. # In Spaces, /tmp survives across requests for the same instance. CACHE_ROOT = Path(os.environ.get("HARBOR_VIEWER_CACHE", "/tmp/.harbor-viewer-cache")) @dataclass(slots=True, frozen=True) class DatasetSource: """A parsed dataset URI ready for fetch. `kind` is one of "hf" | "gh" | "local". `ident` is the canonical id (`owner/name` for HF/GH, absolute path for local). `revision` is the optional @-suffix (tag / branch / commit / Hub revision). """ kind: str ident: str revision: str | None @property def display(self) -> str: rev = f"@{self.revision}" if self.revision else "" if self.kind == "hf": return f"hf://{self.ident}{rev}" if self.kind == "gh": return f"gh://{self.ident}{rev}" if self.kind == "harbor": return f"harbor://{self.ident}{rev}" return str(self.ident) @property def cache_key(self) -> str: rev = (self.revision or "head").replace("/", "_") return f"{self.kind}__{self.ident.replace('/', '__')}__{rev}" # --------------------------------------------------------------------------- # URI parsing # --------------------------------------------------------------------------- def _split_revision(s: str) -> tuple[str, str | None]: """Split `name@rev` into (name, rev). Returns (name, None) if no `@`.""" if "@" in s: name, _, rev = s.rpartition("@") return name, (rev or None) return s, None def parse_dataset_uri(uri: str) -> DatasetSource: """Classify a URI string into a DatasetSource. Raises ValueError on malformed input.""" s = (uri or "").strip() if not s: raise ValueError("empty dataset URI") # Absolute / relative path → local if s.startswith("/") or s.startswith("./") or s.startswith("../"): path = Path(s).expanduser().resolve() if not path.is_dir(): raise ValueError(f"local dataset directory not found: {path}") return DatasetSource(kind="local", ident=str(path), revision=None) # GitHub full URL for prefix in ("https://github.com/", "http://github.com/", "git@github.com:"): if s.startswith(prefix): tail = s.removeprefix(prefix).removesuffix(".git") base, rev = _split_revision(tail) parts = [p for p in base.split("/") if p] if len(parts) < 2: raise ValueError(f"GitHub URL needs owner/repo, got {uri!r}") return DatasetSource(kind="gh", ident=f"{parts[0]}/{parts[1]}", revision=rev) if s.startswith("gh://"): base, rev = _split_revision(s.removeprefix("gh://")) parts = [p for p in base.split("/") if p] if len(parts) != 2 or not all(parts): raise ValueError(f"gh:// expects owner/repo, got {uri!r}") return DatasetSource(kind="gh", ident=f"{parts[0]}/{parts[1]}", revision=rev) if s.startswith("harbor://"): base, rev = _split_revision(s.removeprefix("harbor://")) parts = [p for p in base.split("/") if p] # Harbor accepts both bare-name and org/name if len(parts) == 2 and base.count("/") == 1: return DatasetSource(kind="harbor", ident=f"{parts[0]}/{parts[1]}", revision=rev) if len(parts) == 1 and "/" not in base: return DatasetSource(kind="harbor", ident=parts[0], revision=rev) raise ValueError( f"harbor:// expects 'name' or 'org/name' (optionally @tag), got {uri!r}" ) if s.startswith("hf://"): s = s.removeprefix("hf://") # fall through to HF parsing base, rev = _split_revision(s) parts = [p for p in base.split("/") if p] if len(parts) == 2 and base.count("/") == 1: return DatasetSource(kind="hf", ident=f"{parts[0]}/{parts[1]}", revision=rev) raise ValueError( f"unrecognized dataset URI {uri!r}. " f"Accepted: owner/name, hf://owner/name[@rev], gh://owner/repo[@ref], " f"https://github.com/owner/repo, or an absolute local path." ) # --------------------------------------------------------------------------- # Fetching # --------------------------------------------------------------------------- def _fetch_hf(source: DatasetSource, force: bool) -> Path: """Snapshot-download an HF Hub dataset into the cache.""" from huggingface_hub import snapshot_download target = CACHE_ROOT / source.cache_key # Pinned revisions (tag/commit) are immutable → caching is always safe. # Unpinned ("head") datasets MUST re-sync every load so we never show stale # data — snapshot_download is etag-aware, so re-syncing only pulls files # that actually changed (cheap). This is the fix for "doesn't show latest". pinned = source.revision is not None if not force and pinned and target.exists() and any(target.iterdir()): logger.info("hf cache hit (pinned %s): %s", source.revision, target) return target target.mkdir(parents=True, exist_ok=True) # Public datasets work without a token; private ones rely on $HF_TOKEN # being set in the Space's secrets. token = os.environ.get("HF_TOKEN") or None logger.info("hf %s: %s@%s", "fetch" if pinned else "re-sync", source.ident, source.revision or "head") snapshot_download( repo_id=source.ident, repo_type="dataset", revision=source.revision, local_dir=str(target), token=token, ) return target def fetch_hf_task(source: DatasetSource, task_id: str, *, force: bool = False) -> Path: """Download ONLY one task's files from an HF dataset (not the whole repo). Snapshot-downloading a 2k-task dataset just to open one task is the slowness the user hit; even `snapshot_download(allow_patterns=...)` still walks the entire repo tree first. Instead we list just this task's subtree (one shallow API call) and `hf_hub_download` each file. A handful of small files, no full-repo walk. Files accumulate under one per-dataset cache dir so revisiting is free. Returns a root that `load_task(root, task_id)` resolves for either flat or nested layout. """ from huggingface_hub import HfApi, hf_hub_download target = CACHE_ROOT / f"{source.cache_key}__bytask" target.mkdir(parents=True, exist_ok=True) token = os.environ.get("HF_TOKEN") or None api = HfApi(token=token) logger.info("hf per-task fetch: %s :: %s", source.ident, task_id) # Resolve the task's directory in the repo: nested (`tasks/`) first, then flat. # `list_repo_tree` is a generator, so the 404 for a non-existent prefix only # fires while iterating — force it inside the try (via list()) so we fall # through to the other layout instead of bubbling the error up. files: list[str] = [] for prefix in (f"tasks/{task_id}", task_id): try: entries = list(api.list_repo_tree( source.ident, prefix, repo_type="dataset", revision=source.revision, recursive=True, )) except Exception: # noqa: BLE001 — path doesn't exist in this layout continue files = [e.path for e in entries if getattr(e, "size", None) is not None] if files: break if not files: raise FileNotFoundError(f"task {task_id!r} not found in {source.ident}") for f in files: hf_hub_download( repo_id=source.ident, repo_type="dataset", revision=source.revision, filename=f, local_dir=str(target), token=token, ) return target def _fetch_harbor(source: DatasetSource, force: bool) -> Path: """Shell out to `harbor datasets download` to fetch a Harbor-registry dataset. Harbor handles its own registry resolution, auth, and tag pinning via `/@`. We just orchestrate + flatten the result into a standard dataset layout. """ target = CACHE_ROOT / source.cache_key if not force and target.exists() and any(target.iterdir()): logger.info("harbor cache hit: %s", target) return target if not shutil.which("harbor"): raise RuntimeError( "`harbor` CLI not on PATH. " "It's listed in `requirements.txt` — on a Hugging Face Space it " "installs automatically. Locally: `pip install harbor`." ) if target.exists(): shutil.rmtree(target) target.mkdir(parents=True, exist_ok=True) selector = source.ident + (f"@{source.revision}" if source.revision else "") with tempfile.TemporaryDirectory(prefix="harbor-viewer-harbor-") as tmp: args = ["harbor", "datasets", "download", selector, "-o", tmp] logger.info("running: %s", " ".join(args)) proc = subprocess.run(args, capture_output=True, text=True, timeout=600, check=False) if proc.returncode != 0: shutil.rmtree(target, ignore_errors=True) raise RuntimeError( f"harbor download failed (exit {proc.returncode}): " f"{proc.stderr.strip()[:400] or proc.stdout.strip()[:400]}" ) # If the downloaded tree has exactly one subdirectory and no task.toml at the # top, recurse one level — that's harbor's typical layout (`//...`). downloaded = Path(tmp) children = [c for c in downloaded.iterdir() if c.is_dir()] if len(children) == 1 and not (downloaded / "task.toml").exists(): downloaded = children[0] # Move everything under the cache target for child in downloaded.iterdir(): shutil.move(str(child), str(target / child.name)) return target def _fetch_github(source: DatasetSource, force: bool) -> Path: """git clone --depth 1 a GitHub repo into the cache.""" target = CACHE_ROOT / source.cache_key if not force and target.exists() and any(target.iterdir()): logger.info("gh cache hit: %s", target) return target if target.exists(): shutil.rmtree(target) target.parent.mkdir(parents=True, exist_ok=True) if not shutil.which("git"): raise RuntimeError( "`git` not found on PATH. Install git (or rely on the HF Space sandbox which ships it)." ) with tempfile.TemporaryDirectory(prefix="harbor-viewer-clone-") as tmp: tmp_clone = Path(tmp) / "clone" args = ["git", "clone", "--depth", "1"] if source.revision: args += ["--branch", source.revision] args += [f"https://github.com/{source.ident}.git", str(tmp_clone)] logger.info("running: git clone --depth 1 [...] %s", source.ident) proc = subprocess.run(args, capture_output=True, text=True, timeout=300, check=False) if proc.returncode != 0: raise RuntimeError( f"git clone failed (exit {proc.returncode}): {proc.stderr.strip()[:400]}" ) shutil.move(str(tmp_clone), str(target)) return target def fetch_dataset(source: DatasetSource, *, force: bool = False) -> Path: """Materialize a dataset into the local cache. Returns the on-disk root.""" if source.kind == "hf": return _fetch_hf(source, force=force) if source.kind == "gh": return _fetch_github(source, force=force) if source.kind == "harbor": return _fetch_harbor(source, force=force) if source.kind == "local": return Path(source.ident) raise ValueError(f"unknown source kind: {source.kind!r}")