AdithyaSK's picture
AdithyaSK HF Staff
Port Harbor Visualiser from Gradio to FastAPI + Hugging Face theme
a301de7
"""URI parsing + dataset fetching for the Harbor Visualiser.
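
Accepts the same URI shapes as `repo2rlenv pull`:

    owner/name                            → HF Hub (default)
    owner/name@<rev>                      → HF Hub, revision pinned
    hf://owner/name[@rev]                 → HF Hub, explicit prefix
    gh://owner/repo[@ref]                 → GitHub, optional branch/tag/sha
    https://github.com/owner/repo[.git]   → GitHub, full URL

`parse_dataset_uri` additionally recognises:

    harbor://name[@tag], harbor://org/name[@tag]  → Harbor registry
    /abs/path, ./rel/path, ../rel/path            → existing local directory

Remote datasets land in a local cache dir, so subsequent loads in the same
Space process are cheap; local directories are used in place.
"""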
from __future__ import annotations
import logging
import os
import shutil
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Root directory for cached downloads during a single Space-process lifetime.
# Overridable through the HARBOR_VIEWER_CACHE environment variable; the default
# lives under /tmp, which in Spaces survives across requests for the same
# instance.
CACHE_ROOT = Path(os.getenv("HARBOR_VIEWER_CACHE", "/tmp/.harbor-viewer-cache"))
@dataclass(slots=True, frozen=True)
class DatasetSource:
"""A parsed dataset URI ready for fetch.
`kind` is one of "hf" | "gh" | "local". `ident` is the canonical id
(`owner/name` for HF/GH, absolute path for local). `revision` is the
optional @-suffix (tag / branch / commit / Hub revision).
"""
kind: str
ident: str
revision: str | None
@property
def display(self) -> str:
rev = f"@{self.revision}" if self.revision else ""
if self.kind == "hf":
return f"hf://{self.ident}{rev}"
if self.kind == "gh":
return f"gh://{self.ident}{rev}"
if self.kind == "harbor":
return f"harbor://{self.ident}{rev}"
return str(self.ident)
@property
def cache_key(self) -> str:
rev = (self.revision or "head").replace("/", "_")
return f"{self.kind}__{self.ident.replace('/', '__')}__{rev}"
# ---------------------------------------------------------------------------
# URI parsing
# ---------------------------------------------------------------------------
def _split_revision(s: str) -> tuple[str, str | None]:
    """Separate a trailing `@rev` from `s`.

    The split happens at the LAST `@`. Returns `(name, rev)`; `rev` is None
    when there is no `@` at all or when nothing follows it.
    """
    head, sep, tail = s.rpartition("@")
    if not sep:
        # No "@" present: rpartition leaves the whole input in `tail`.
        return s, None
    return head, (tail if tail else None)
def parse_dataset_uri(uri: str) -> DatasetSource:
    """Classify a URI string into a DatasetSource.

    Recognised shapes, checked in this order:

    * `/abs`, `./rel`, `../rel`, `~/rel` → "local" (must be an existing directory)
    * `https://github.com/owner/repo[.git][@ref]`, the `http://` variant, and
      `git@github.com:owner/repo[.git][@ref]` → "gh"
    * `gh://owner/repo[@ref]` → "gh"
    * `harbor://name[@tag]` or `harbor://org/name[@tag]` → "harbor"
    * `hf://owner/name[@rev]` or bare `owner/name[@rev]` → "hf"

    Raises ValueError on empty input, on a local path that is not a directory,
    and on anything that matches none of the shapes above.
    """
    s = (uri or "").strip()
    if not s:
        raise ValueError("empty dataset URI")

    # Filesystem path → local. `~/` is accepted so that expanduser() below has
    # something to expand; without it a home-relative path would fall through
    # and be misread as an HF `owner/name`.
    if s.startswith(("/", "./", "../", "~/")):
        path = Path(s).expanduser().resolve()
        if not path.is_dir():
            raise ValueError(f"local dataset directory not found: {path}")
        return DatasetSource(kind="local", ident=str(path), revision=None)

    # GitHub full URL (https, http, or scp-style ssh).
    for prefix in ("https://github.com/", "http://github.com/", "git@github.com:"):
        if not s.startswith(prefix):
            continue
        # Split off @ref FIRST, then drop `.git` from the repo segment only.
        # Stripping the suffix from the whole tail misses `repo.git@ref` and
        # leaves `.git` inside the ident.
        base, rev = _split_revision(s.removeprefix(prefix))
        parts = [p for p in base.split("/") if p]
        repo = parts[1].removesuffix(".git") if len(parts) >= 2 else ""
        if not repo:
            raise ValueError(f"GitHub URL needs owner/repo, got {uri!r}")
        # Any path segments after owner/repo are ignored.
        return DatasetSource(kind="gh", ident=f"{parts[0]}/{repo}", revision=rev)

    if s.startswith("gh://"):
        base, rev = _split_revision(s.removeprefix("gh://"))
        parts = [p for p in base.split("/") if p]
        if len(parts) != 2:
            raise ValueError(f"gh:// expects owner/repo, got {uri!r}")
        return DatasetSource(kind="gh", ident=f"{parts[0]}/{parts[1]}", revision=rev)

    if s.startswith("harbor://"):
        base, rev = _split_revision(s.removeprefix("harbor://"))
        parts = [p for p in base.split("/") if p]
        # Harbor accepts both bare-name and org/name
        if len(parts) == 2 and base.count("/") == 1:
            return DatasetSource(kind="harbor", ident=f"{parts[0]}/{parts[1]}", revision=rev)
        if len(parts) == 1 and "/" not in base:
            return DatasetSource(kind="harbor", ident=parts[0], revision=rev)
        raise ValueError(
            f"harbor:// expects 'name' or 'org/name' (optionally @tag), got {uri!r}"
        )

    # An explicit hf:// prefix and a bare owner/name share the same parsing.
    if s.startswith("hf://"):
        s = s.removeprefix("hf://")

    base, rev = _split_revision(s)
    parts = [p for p in base.split("/") if p]
    # Exactly one slash with a non-empty segment on each side.
    if len(parts) == 2 and base.count("/") == 1:
        return DatasetSource(kind="hf", ident=f"{parts[0]}/{parts[1]}", revision=rev)

    raise ValueError(
        f"unrecognized dataset URI {uri!r}. "
        f"Accepted: owner/name, hf://owner/name[@rev], gh://owner/repo[@ref], "
        f"https://github.com/owner/repo, or an absolute local path."
    )
# ---------------------------------------------------------------------------
# Fetching
# ---------------------------------------------------------------------------
def _fetch_hf(source: DatasetSource, force: bool) -> Path:
    """Snapshot-download an HF Hub dataset into the cache and return its root.

    A pinned revision with a non-empty cache directory is served from cache
    unless `force` is set. An unpinned source always goes through
    `snapshot_download` again so the viewer does not show stale data.
    """
    from huggingface_hub import snapshot_download

    target = CACHE_ROOT / source.cache_key
    is_pinned = source.revision is not None

    # Only pinned revisions are trusted as cacheable; "head" must re-sync on
    # every load. The re-sync is expected to be cheap because snapshot_download
    # only pulls files that changed (upstream behaviour — see huggingface_hub docs).
    if is_pinned and not force and target.exists() and any(target.iterdir()):
        logger.info("hf cache hit (pinned %s): %s", source.revision, target)
        return target

    target.mkdir(parents=True, exist_ok=True)

    action = "fetch" if is_pinned else "re-sync"
    shown_revision = source.revision or "head"
    logger.info("hf %s: %s@%s", action, source.ident, shown_revision)

    snapshot_download(
        repo_id=source.ident,
        repo_type="dataset",
        revision=source.revision,
        local_dir=str(target),
        # Public datasets work without a token; private ones rely on $HF_TOKEN
        # being set in the Space's secrets. An empty value is treated as unset.
        token=os.environ.get("HF_TOKEN") or None,
    )
    return target
def fetch_hf_task(source: DatasetSource, task_id: str, *, force: bool = False) -> Path:
    """Download ONLY one task's files from an HF dataset (not the whole repo).

    Instead of snapshot-downloading the full repository, this lists just the
    task's subtree and calls `hf_hub_download` for each file found there.
    Files accumulate under one per-dataset cache dir (`<cache_key>__bytask`).

    The task directory is looked up as `tasks/<task_id>` first, then as
    `<task_id>` at the repo root. The returned path is the cache root, not the
    task directory itself; the caller resolves the layout.

    `force=True` is forwarded as `force_download`, so every file is fetched
    again even if a copy is already present locally.

    Raises FileNotFoundError when neither layout yields any files. If a listing
    call failed, the last such error is attached as the exception's cause.
    """
    from huggingface_hub import HfApi, hf_hub_download

    target = CACHE_ROOT / f"{source.cache_key}__bytask"
    target.mkdir(parents=True, exist_ok=True)
    # An empty HF_TOKEN is treated the same as an unset one.
    token = os.environ.get("HF_TOKEN") or None
    api = HfApi(token=token)
    logger.info("hf per-task fetch: %s :: %s", source.ident, task_id)

    # `list_repo_tree` is a generator, so the error for a non-existent prefix
    # only fires while iterating — force it inside the try (via list()) so we
    # fall through to the other layout instead of bubbling the error up.
    files: list[str] = []
    last_error: Exception | None = None
    for prefix in (f"tasks/{task_id}", task_id):
        try:
            entries = list(api.list_repo_tree(
                source.ident, prefix, repo_type="dataset",
                revision=source.revision, recursive=True,
            ))
        except Exception as exc:  # noqa: BLE001 — path doesn't exist in this layout
            # Still best-effort, but keep the reason: an auth or network
            # failure would otherwise look exactly like a missing task.
            logger.debug("hf per-task fetch: listing %r failed: %s", prefix, exc)
            last_error = exc
            continue
        # Keep only entries that expose a size (skips folder entries).
        files = [e.path for e in entries if getattr(e, "size", None) is not None]
        if files:
            break

    if not files:
        raise FileNotFoundError(
            f"task {task_id!r} not found in {source.ident}"
        ) from last_error

    for f in files:
        hf_hub_download(
            repo_id=source.ident, repo_type="dataset", revision=source.revision,
            filename=f, local_dir=str(target), token=token,
            force_download=force,
        )
    return target
def _fetch_harbor(source: DatasetSource, force: bool) -> Path:
    """Shell out to `harbor datasets download` to fetch a Harbor-registry dataset.

    The selector passed to the CLI is `<ident>` or `<ident>@<revision>`;
    registry resolution, auth and tag pinning are left to the CLI. The download
    goes into a temporary directory and is then moved under the cache target.

    A non-empty cache directory is reused unless `force` is set. The existing
    cache is replaced only after the download has succeeded, so a failed
    refresh does not destroy a previously good copy.

    Raises RuntimeError when the `harbor` CLI is missing or exits non-zero.
    """
    target = CACHE_ROOT / source.cache_key
    if not force and target.exists() and any(target.iterdir()):
        logger.info("harbor cache hit: %s", target)
        return target

    if not shutil.which("harbor"):
        raise RuntimeError(
            "`harbor` CLI not on PATH. "
            "It's listed in `requirements.txt` — on a Hugging Face Space it "
            "installs automatically. Locally: `pip install harbor`."
        )

    selector = source.ident + (f"@{source.revision}" if source.revision else "")
    with tempfile.TemporaryDirectory(prefix="harbor-viewer-harbor-") as tmp:
        args = ["harbor", "datasets", "download", selector, "-o", tmp]
        logger.info("running: %s", " ".join(args))
        proc = subprocess.run(args, capture_output=True, text=True, timeout=600, check=False)
        if proc.returncode != 0:
            # Nothing under `target` has been touched yet, so there is nothing
            # to clean up; the temp dir is removed by the context manager.
            raise RuntimeError(
                f"harbor download failed (exit {proc.returncode}): "
                f"{proc.stderr.strip()[:400] or proc.stdout.strip()[:400]}"
            )

        # If the downloaded tree has exactly one subdirectory and no task.toml at the
        # top, descend one level — that's harbor's typical layout (`<tmp>/<name>/...`).
        downloaded = Path(tmp)
        children = [c for c in downloaded.iterdir() if c.is_dir()]
        if len(children) == 1 and not (downloaded / "task.toml").exists():
            downloaded = children[0]

        # Download succeeded: only now drop the stale cache and repopulate it.
        if target.exists():
            shutil.rmtree(target)
        target.mkdir(parents=True, exist_ok=True)

        # Snapshot the listing first; entries are moved out of the directory
        # while we loop.
        for child in list(downloaded.iterdir()):
            shutil.move(str(child), str(target / child.name))
    return target
def _fetch_github(source: DatasetSource, force: bool) -> Path:
    """Shallow-clone (`git clone --depth 1`) a GitHub repo into the cache.

    A non-empty cache directory is reused unless `force` is set. The clone goes
    into a temporary directory first; the existing cache entry is removed only
    after the clone has succeeded, so a missing `git` binary or a failed clone
    leaves a previously good copy in place.

    Raises RuntimeError when `git` is not on PATH or the clone exits non-zero.
    """
    target = CACHE_ROOT / source.cache_key
    if not force and target.exists() and any(target.iterdir()):
        logger.info("gh cache hit: %s", target)
        return target

    # Verify the tool before touching anything on disk.
    if not shutil.which("git"):
        raise RuntimeError(
            "`git` not found on PATH. Install git (or rely on the HF Space sandbox which ships it)."
        )

    with tempfile.TemporaryDirectory(prefix="harbor-viewer-clone-") as tmp:
        tmp_clone = Path(tmp) / "clone"
        args = ["git", "clone", "--depth", "1"]
        if source.revision:
            # NOTE(review): per the git-clone docs `--branch` takes a branch or
            # tag name; a raw commit SHA is expected to fail here — confirm
            # whether SHA pinning needs a fetch-based fallback.
            args += ["--branch", source.revision]
        args += [f"https://github.com/{source.ident}.git", str(tmp_clone)]
        logger.info("running: git clone --depth 1 [...] %s", source.ident)
        proc = subprocess.run(args, capture_output=True, text=True, timeout=300, check=False)
        if proc.returncode != 0:
            raise RuntimeError(
                f"git clone failed (exit {proc.returncode}): {proc.stderr.strip()[:400]}"
            )

        # Clone succeeded: swap it in for whatever was cached before.
        if target.exists():
            shutil.rmtree(target)
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(tmp_clone), str(target))
    return target
def fetch_dataset(source: DatasetSource, *, force: bool = False) -> Path:
"""Materialize a dataset into the local cache. Returns the on-disk root."""
if source.kind == "hf":
return _fetch_hf(source, force=force)
if source.kind == "gh":
return _fetch_github(source, force=force)
if source.kind == "harbor":
return _fetch_harbor(source, force=force)
if source.kind == "local":
return Path(source.ident)
raise ValueError(f"unknown source kind: {source.kind!r}")