Spaces:
Running
Running
File size: 12,343 Bytes
f718aea a301de7 f718aea a301de7 f718aea a301de7 f718aea | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 | """URI parsing + dataset fetching for the Harbor Visualiser.
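Accepts the same URI shapes as `repo2rlenv pull`, plus Harbor and local paths:
    owner/name                           → HF Hub (default)
    owner/name@<rev>                     → HF Hub, revision pinned
    hf://owner/name[@rev]                → HF Hub, explicit prefix
    gh://owner/repo[@ref]                → GitHub, optional branch/tag/sha
    https://github.com/owner/repo[.git]  → GitHub, full URL
    harbor://[org/]name[@tag]            → Harbor registry, via the `harbor` CLI
    /abs/path, ./rel/path, ../rel/path   → local directory, used in place
Remote datasets are cached under CACHE_ROOT. Pinned HF revisions, GitHub
clones and Harbor downloads are reused by later loads in the same Space
process; unpinned HF datasets are re-synced on every load, which only
transfers files that changed.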
"""
from __future__ import annotations
import logging
import os
import shutil
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path
logger = logging.getLogger(__name__)
# Where downloads are cached during a single Space-process lifetime.
# In Spaces, /tmp survives across requests for the same instance.
CACHE_ROOT = Path(os.environ.get("HARBOR_VIEWER_CACHE", "/tmp/.harbor-viewer-cache"))
@dataclass(slots=True, frozen=True)
class DatasetSource:
"""A parsed dataset URI ready for fetch.
`kind` is one of "hf" | "gh" | "local". `ident` is the canonical id
(`owner/name` for HF/GH, absolute path for local). `revision` is the
optional @-suffix (tag / branch / commit / Hub revision).
"""
kind: str
ident: str
revision: str | None
@property
def display(self) -> str:
rev = f"@{self.revision}" if self.revision else ""
if self.kind == "hf":
return f"hf://{self.ident}{rev}"
if self.kind == "gh":
return f"gh://{self.ident}{rev}"
if self.kind == "harbor":
return f"harbor://{self.ident}{rev}"
return str(self.ident)
@property
def cache_key(self) -> str:
rev = (self.revision or "head").replace("/", "_")
return f"{self.kind}__{self.ident.replace('/', '__')}__{rev}"
# ---------------------------------------------------------------------------
# URI parsing
# ---------------------------------------------------------------------------
def _split_revision(s: str) -> tuple[str, str | None]:
"""Split `name@rev` into (name, rev). Returns (name, None) if no `@`."""
if "@" in s:
name, _, rev = s.rpartition("@")
return name, (rev or None)
return s, None
def parse_dataset_uri(uri: str) -> DatasetSource:
    """Classify a URI string into a DatasetSource.

    Shapes are tried in this order:

    * local path — `.`, `..`, or anything starting with `/`, `./`, `../` or
      `~`; resolved to an absolute path that must be an existing directory.
    * GitHub URL — `https://github.com/owner/repo[.git][@ref]`, the `http://`
      form, or `git@github.com:owner/repo[.git][@ref]`. Path segments after
      `owner/repo` are ignored.
    * `gh://owner/repo[@ref]`
    * `harbor://name[@tag]` or `harbor://org/name[@tag]`
    * `[hf://]owner/name[@rev]` — Hugging Face Hub dataset (the default).

    Raises:
        ValueError: on empty input, a missing local directory, or a string
            matching none of the shapes above.
    """
    s = (uri or "").strip()
    if not s:
        raise ValueError("empty dataset URI")

    # Absolute / relative / home-relative path → local. `~` must be listed here,
    # otherwise `expanduser()` below never sees it and `~/x` is misread as HF.
    if s in (".", "..") or s.startswith(("/", "./", "../", "~")):
        path = Path(s).expanduser().resolve()
        if not path.is_dir():
            raise ValueError(f"local dataset directory not found: {path}")
        return DatasetSource(kind="local", ident=str(path), revision=None)

    # GitHub full URL
    for prefix in ("https://github.com/", "http://github.com/", "git@github.com:"):
        if s.startswith(prefix):
            # Split off `@ref` BEFORE stripping `.git`: otherwise `repo.git@v1`
            # keeps its suffix and the clone URL becomes `repo.git.git`.
            base, rev = _split_revision(s.removeprefix(prefix))
            parts = [p for p in base.split("/") if p]
            repo = parts[1].removesuffix(".git") if len(parts) >= 2 else ""
            if not repo:
                raise ValueError(f"GitHub URL needs owner/repo, got {uri!r}")
            return DatasetSource(kind="gh", ident=f"{parts[0]}/{repo}", revision=rev)

    if s.startswith("gh://"):
        base, rev = _split_revision(s.removeprefix("gh://"))
        parts = [p for p in base.split("/") if p]
        if len(parts) != 2:
            raise ValueError(f"gh:// expects owner/repo, got {uri!r}")
        return DatasetSource(kind="gh", ident=f"{parts[0]}/{parts[1]}", revision=rev)

    if s.startswith("harbor://"):
        base, rev = _split_revision(s.removeprefix("harbor://"))
        parts = [p for p in base.split("/") if p]
        # Harbor accepts both bare-name and org/name
        if len(parts) == 2 and base.count("/") == 1:
            return DatasetSource(kind="harbor", ident=f"{parts[0]}/{parts[1]}", revision=rev)
        if len(parts) == 1 and "/" not in base:
            return DatasetSource(kind="harbor", ident=parts[0], revision=rev)
        raise ValueError(
            f"harbor:// expects 'name' or 'org/name' (optionally @tag), got {uri!r}"
        )

    # `hf://` is optional; both forms share the HF parsing below.
    s = s.removeprefix("hf://")
    base, rev = _split_revision(s)
    parts = [p for p in base.split("/") if p]
    if len(parts) == 2 and base.count("/") == 1:
        return DatasetSource(kind="hf", ident=f"{parts[0]}/{parts[1]}", revision=rev)
    raise ValueError(
        f"unrecognized dataset URI {uri!r}. "
        f"Accepted: owner/name, hf://owner/name[@rev], gh://owner/repo[@ref], "
        f"https://github.com/owner/repo, harbor://[org/]name[@tag], "
        f"or a local directory path."
    )
# ---------------------------------------------------------------------------
# Fetching
# ---------------------------------------------------------------------------
def _fetch_hf(source: DatasetSource, force: bool) -> Path:
    """Snapshot-download an HF Hub dataset into the cache and return its root.

    With an explicit revision, a non-empty cache directory is returned as-is
    unless `force` is true. Without one ("head"), the dataset is re-synced on
    every call.

    Raises whatever `huggingface_hub.snapshot_download` raises. On failure of
    a pinned download the target directory is removed, so a partial download
    is never mistaken for a complete cache entry on the next call.
    """
    from huggingface_hub import snapshot_download

    target = CACHE_ROOT / source.cache_key
    # An explicit @rev is treated as immutable (true for tags/commits; a branch
    # name will look stale until force=True), so caching it is safe.
    # Unpinned ("head") datasets MUST re-sync every load so we never show stale
    # data — snapshot_download is etag-aware, so re-syncing only pulls files
    # that actually changed (cheap). This is the fix for "doesn't show latest".
    pinned = source.revision is not None
    if not force and pinned and target.exists() and any(target.iterdir()):
        logger.info("hf cache hit (pinned %s): %s", source.revision, target)
        return target
    target.mkdir(parents=True, exist_ok=True)
    # Public datasets work without a token; private ones rely on $HF_TOKEN
    # being set in the Space's secrets.
    token = os.environ.get("HF_TOKEN") or None
    logger.info("hf %s: %s@%s", "fetch" if pinned else "re-sync",
                source.ident, source.revision or "head")
    try:
        snapshot_download(
            repo_id=source.ident,
            repo_type="dataset",
            revision=source.revision,
            local_dir=str(target),
            token=token,
        )
    except BaseException:
        # A non-empty pinned dir is served as a cache hit above, so a partial
        # download must not survive. Unpinned dirs are always re-synced anyway.
        if pinned:
            shutil.rmtree(target, ignore_errors=True)
        raise
    return target
def fetch_hf_task(source: DatasetSource, task_id: str, *, force: bool = False) -> Path:
    """Download ONLY one task's files from an HF dataset (not the whole repo).

    Snapshot-downloading a 2k-task dataset just to open one task is the slowness
    the user hit; even `snapshot_download(allow_patterns=...)` still walks the
    entire repo tree first. Instead we list just this task's subtree (one shallow
    API call) and `hf_hub_download` each file. A handful of small files, no
    full-repo walk. Files accumulate under one per-dataset cache dir so
    revisiting is free. Returns a root that `load_task(root, task_id)` resolves
    for either flat or nested layout.

    Args:
        source: HF dataset to read (`ident` = repo id, `revision` optional).
        task_id: task directory; looked up as `tasks/<task_id>` first, then
            `<task_id>` at the repo root.
        force: re-download every file of the task even if it is cached.

    Raises:
        FileNotFoundError: if `task_id` is empty or neither layout contains
            any files for it. When a listing call failed, that error is
            chained as the cause.
    """
    from huggingface_hub import HfApi, hf_hub_download

    # An empty id would make the flat-layout lookup list the WHOLE repo and
    # download all of it, which is exactly what this function exists to avoid.
    if not task_id or not task_id.strip("/"):
        raise FileNotFoundError(f"task {task_id!r} not found in {source.ident}")

    target = CACHE_ROOT / f"{source.cache_key}__bytask"
    target.mkdir(parents=True, exist_ok=True)
    token = os.environ.get("HF_TOKEN") or None
    api = HfApi(token=token)
    logger.info("hf per-task fetch: %s :: %s", source.ident, task_id)

    # Resolve the task's directory in the repo: nested (`tasks/<id>`) first, then flat.
    # `list_repo_tree` is a generator, so the 404 for a non-existent prefix only
    # fires while iterating — force it inside the try (via list()) so we fall
    # through to the other layout instead of bubbling the error up.
    files: list[str] = []
    last_error: Exception | None = None
    for prefix in (f"tasks/{task_id}", task_id):
        try:
            entries = list(api.list_repo_tree(
                source.ident, prefix, repo_type="dataset",
                revision=source.revision, recursive=True,
            ))
        except Exception as exc:  # noqa: BLE001 — path doesn't exist in this layout
            # Keep the reason so an auth/network failure isn't reported as a
            # bare "not found" with no cause.
            logger.debug("list_repo_tree(%s, %s) failed: %s", source.ident, prefix, exc)
            last_error = exc
            continue
        # Folder entries have no `size`; keep files only.
        files = [e.path for e in entries if getattr(e, "size", None) is not None]
        if files:
            break
    if not files:
        raise FileNotFoundError(
            f"task {task_id!r} not found in {source.ident}"
        ) from last_error

    for path_in_repo in files:
        hf_hub_download(
            repo_id=source.ident, repo_type="dataset", revision=source.revision,
            filename=path_in_repo, local_dir=str(target), token=token,
            force_download=force,
        )
    return target
def _fetch_harbor(source: DatasetSource, force: bool) -> Path:
    """Shell out to `harbor datasets download` to fetch a Harbor-registry dataset.

    Harbor handles its own registry resolution, auth, and tag pinning via
    `<org>/<name>@<tag>`. We just orchestrate + flatten the result into a
    standard dataset layout. A non-empty cache directory is reused unless
    `force` is true.

    Raises:
        RuntimeError: if the `harbor` CLI is not on PATH, exits non-zero, or
            does not finish within 600 seconds. On any failure the target
            directory is removed, so a partial download is never served as a
            cache hit later.
    """
    target = CACHE_ROOT / source.cache_key
    if not force and target.exists() and any(target.iterdir()):
        logger.info("harbor cache hit: %s", target)
        return target
    if not shutil.which("harbor"):
        raise RuntimeError(
            "`harbor` CLI not on PATH. "
            "It's listed in `requirements.txt` — on a Hugging Face Space it "
            "installs automatically. Locally: `pip install harbor`."
        )
    if target.exists():
        shutil.rmtree(target)
    target.mkdir(parents=True, exist_ok=True)
    selector = source.ident + (f"@{source.revision}" if source.revision else "")
    try:
        with tempfile.TemporaryDirectory(prefix="harbor-viewer-harbor-") as tmp:
            args = ["harbor", "datasets", "download", selector, "-o", tmp]
            logger.info("running: %s", " ".join(args))
            try:
                # errors="replace": a stray non-UTF-8 byte in CLI output must not
                # mask the real result with a UnicodeDecodeError.
                proc = subprocess.run(
                    args, capture_output=True, text=True, errors="replace",
                    timeout=600, check=False,
                )
            except subprocess.TimeoutExpired as exc:
                raise RuntimeError(
                    f"harbor download timed out after {exc.timeout:g}s"
                ) from exc
            if proc.returncode != 0:
                raise RuntimeError(
                    f"harbor download failed (exit {proc.returncode}): "
                    f"{proc.stderr.strip()[:400] or proc.stdout.strip()[:400]}"
                )
            # If the downloaded tree has exactly one subdirectory and no task.toml at
            # the top, recurse one level — that's harbor's typical layout
            # (`<tmp>/<name>/...`).
            downloaded = Path(tmp)
            children = [c for c in downloaded.iterdir() if c.is_dir()]
            if len(children) == 1 and not (downloaded / "task.toml").exists():
                downloaded = children[0]
            # Move everything under the cache target
            for child in downloaded.iterdir():
                shutil.move(str(child), str(target / child.name))
    except BaseException:
        # A non-empty target counts as a cache hit, so never leave a
        # half-populated one behind (failed run, timeout, interrupted move).
        shutil.rmtree(target, ignore_errors=True)
        raise
    return target
def _git_fetch_ref(url: str, ref: str, dest: Path) -> subprocess.CompletedProcess[str]:
    """Create a fresh repo at `dest` and check out `ref` from `url` at depth 1.

    Fallback for refs that `git clone --branch` rejects. `--branch` only
    accepts branch and tag names, so commit SHAs need an explicit fetch.
    Returns the result of the first failing git step, or of the final
    checkout on success.
    """
    steps = (
        ["git", "init", "--quiet", str(dest)],
        ["git", "-C", str(dest), "fetch", "--quiet", "--depth", "1", url, ref],
        ["git", "-C", str(dest), "checkout", "--quiet", "--detach", "FETCH_HEAD"],
    )
    for step in steps:
        proc = subprocess.run(step, capture_output=True, text=True, timeout=300, check=False)
        if proc.returncode != 0:
            break
    return proc


def _fetch_github(source: DatasetSource, force: bool) -> Path:
    """Shallow-clone a GitHub repo (optionally at a branch, tag or commit) into the cache.

    A non-empty cache directory is reused unless `force` is true. A revision
    is first tried with `git clone --branch`. If that fails, it is retried as
    a depth-1 fetch of the ref, which is how full commit SHAs are supported.

    Raises:
        RuntimeError: if `git` is not on PATH or the clone/fetch fails.
    """
    target = CACHE_ROOT / source.cache_key
    if not force and target.exists() and any(target.iterdir()):
        logger.info("gh cache hit: %s", target)
        return target
    # Check for git BEFORE touching the cache, so a missing binary can't wipe
    # an existing copy (e.g. on force=True).
    if not shutil.which("git"):
        raise RuntimeError(
            "`git` not found on PATH. Install git (or rely on the HF Space sandbox which ships it)."
        )
    if target.exists():
        shutil.rmtree(target)
    target.parent.mkdir(parents=True, exist_ok=True)
    url = f"https://github.com/{source.ident}.git"
    with tempfile.TemporaryDirectory(prefix="harbor-viewer-clone-") as tmp:
        tmp_clone = Path(tmp) / "clone"
        args = ["git", "clone", "--depth", "1"]
        if source.revision:
            args += ["--branch", source.revision]
        args += [url, str(tmp_clone)]
        logger.info("running: git clone --depth 1 [...] %s", source.ident)
        proc = subprocess.run(args, capture_output=True, text=True, timeout=300, check=False)
        if proc.returncode != 0 and source.revision:
            logger.info("clone --branch %s failed; retrying as a ref fetch", source.revision)
            shutil.rmtree(tmp_clone, ignore_errors=True)
            proc = _git_fetch_ref(url, source.revision, tmp_clone)
        if proc.returncode != 0:
            raise RuntimeError(
                f"git clone failed (exit {proc.returncode}): {proc.stderr.strip()[:400]}"
            )
        shutil.move(str(tmp_clone), str(target))
    return target
def fetch_dataset(source: DatasetSource, *, force: bool = False) -> Path:
"""Materialize a dataset into the local cache. Returns the on-disk root."""
if source.kind == "hf":
return _fetch_hf(source, force=force)
if source.kind == "gh":
return _fetch_github(source, force=force)
if source.kind == "harbor":
return _fetch_harbor(source, force=force)
if source.kind == "local":
return Path(source.ident)
raise ValueError(f"unknown source kind: {source.kind!r}")
|