Spaces:
Running
Running
| """Walk a Harbor dataset directory + load each task's spec files. | |
| A "Harbor task" is a directory containing `task.toml` at its root, plus | |
| any of: `instruction.md`, `solution/patch.diff`, `solution/solve.sh`, | |
| `tests/test.sh`, `environment/Dockerfile`. We tolerate missing files — | |
| not every task ships every artifact. | |
| Two dataset layouts are accepted: | |
| flat: <root>/<task-id>/task.toml | |
| nested: <root>/tasks/<task-id>/task.toml | |
| We discover both transparently. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import tomllib | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any | |
| logger = logging.getLogger(__name__) | |
| class HarborTask: | |
| """The full parsed view of a single Harbor task directory.""" | |
| # Identity | |
| id: str # the directory name, e.g. "pallets__click-3373" | |
| root: Path # absolute path to the task dir | |
| # task.toml top-level fields | |
| name: str | None = None | |
| org: str | None = None | |
| version: str | None = None | |
| # [task] | |
| description: str | None = None | |
| instruction_inline: str | None = None # if task.toml has `instruction =` | |
| # [metadata] | |
| difficulty: str | None = None | |
| category: str | None = None | |
| keywords: list[str] = field(default_factory=list) | |
| # [agent] / [verifier] timeouts (if set) | |
| agent_timeout_sec: float | None = None | |
| verifier_timeout_sec: float | None = None | |
| # Repo2RLEnv extension — [metadata.repo2env] block, opaque dict | |
| repo2env: dict[str, Any] | None = None | |
| # File contents (None = file not present) | |
| instruction_md: str | None = None | |
| oracle_patch: str | None = None | |
| solve_sh: str | None = None | |
| test_sh: str | None = None | |
| dockerfile: str | None = None | |
| task_toml_raw: str = "" | |
| # Generic discovery: every readable text file under the task dir, | |
| # keyed by path relative to the task root. e.g. "tests/grader.py", | |
| # "environment/pull_bucket.py", "solution/patch.diff". Populated by | |
| # load_task() walking the directory tree — the file viewer surfaces | |
| # everything in here so the dataset is shown faithfully, not via | |
| # a hardcoded allowlist. | |
| files: dict[str, str] = field(default_factory=dict) | |
| def _read_text(path: Path) -> str | None: | |
| """Read a text file, return None if it doesn't exist.""" | |
| try: | |
| return path.read_text(encoding="utf-8") | |
| except FileNotFoundError: | |
| return None | |
| except Exception as exc: | |
| logger.warning("could not read %s: %s", path, exc) | |
| return None | |
# Names we never surface in the file viewer: VCS metadata, dependency trees
# and tool caches. Matched against every path component below the task root.
_SKIP_DIRS: set[str] = {".git", "__pycache__", ".cache", "node_modules", ".venv", ".pytest_cache", ".mypy_cache"}
# File-name prefixes that are always ignored (passed to str.startswith).
_SKIP_NAME_PREFIXES: tuple[str, ...] = (".DS_Store",)
# Hard cap on file size — anything bigger we treat as non-displayable.
_MAX_FILE_BYTES = 512 * 1024  # 512 KiB; viewer's code panel chokes well below this


def _discover_task_files(task_dir: Path) -> dict[str, str]:
    """Walk `task_dir` recursively and return every readable text file.

    Returns a dict of {relative_path: content}, inserted in sorted path order.
    Keys use forward slashes (POSIX-style) for stable file_ids across OSes.

    A file is left out when:
      * any component of its relative path (including its own name) is in
        `_SKIP_DIRS`,
      * its name starts with one of `_SKIP_NAME_PREFIXES`,
      * it is larger than `_MAX_FILE_BYTES`,
      * it cannot be stat'ed/read, or does not decode as UTF-8.

    Excluded directories are pruned during the walk, so their contents are
    never enumerated (a `node_modules` or `.git` tree can hold a very large
    number of entries).
    """
    import os  # function-scope import: the walk below is the only user

    candidates: list[Path] = []
    for dirpath, dirnames, filenames in os.walk(task_dir):
        # In-place slice assignment is what tells os.walk not to descend.
        dirnames[:] = [d for d in dirnames if d not in _SKIP_DIRS]
        parent = Path(dirpath)
        for name in filenames:
            # A *file* whose name is in _SKIP_DIRS is skipped too, matching
            # the per-path-component check this walk replaces.
            if name in _SKIP_DIRS or name.startswith(_SKIP_NAME_PREFIXES):
                continue
            candidates.append(parent / name)

    out: dict[str, str] = {}
    for path in sorted(candidates):
        # Filters out broken symlinks and special files (sockets, FIFOs, ...).
        if not path.is_file():
            continue
        try:
            size = path.stat().st_size
        except OSError:
            continue
        if size > _MAX_FILE_BYTES:
            continue
        try:
            content = path.read_text(encoding="utf-8")
        except (UnicodeDecodeError, OSError):
            # Not valid UTF-8 (treated as binary) or unreadable: not displayable.
            continue
        out[path.relative_to(task_dir).as_posix()] = content
    return out
def _discover_task_roots(dataset_root: Path) -> list[Path]:
    """Find every directory under `dataset_root` that contains a `task.toml`.

    Resolution order:
      1. `dataset_root` itself holds `task.toml` -> it is a single task root.
      2. Nested layout (`<root>/tasks/<id>/task.toml`), preferred when
         `<root>/tasks/` exists (that's what `repo2rlenv push` stages on HF).
      3. Flat layout (`<root>/<id>/task.toml`), ignoring dot-directories.

    If `<root>/tasks/` exists but contains no task directories, discovery
    falls back to the flat layout instead of reporting an empty dataset, so
    an unrelated `tasks` folder cannot hide flat-layout tasks.

    Returns the task directories sorted by path. Errors from listing
    `dataset_root` (e.g. it does not exist) propagate to the caller.
    """

    def _is_task_dir(candidate: Path) -> bool:
        # A task is a directory with task.toml at its root.
        return candidate.is_dir() and (candidate / "task.toml").exists()

    if (dataset_root / "task.toml").exists():
        return [dataset_root]

    tasks_dir = dataset_root / "tasks"
    if tasks_dir.is_dir():
        nested = sorted(p for p in tasks_dir.iterdir() if _is_task_dir(p))
        if nested:
            return nested
        # Nothing usable under tasks/ — fall through to the flat scan.

    # Flat layout — every immediate, non-hidden subdir that has task.toml is a task
    return sorted(
        p
        for p in dataset_root.iterdir()
        if not p.name.startswith(".") and _is_task_dir(p)
    )
def list_tasks(dataset_root: Path) -> list[str]:
    """List the ids of all tasks found under `dataset_root`.

    A task id is the name of the task's directory; ids come back in the
    same order in which the task directories are discovered.
    """
    task_ids: list[str] = []
    for task_root in _discover_task_roots(dataset_root):
        task_ids.append(task_root.name)
    return task_ids
def _resolve_task_dir(dataset_root: Path, task_id: str) -> Path:
    """Find the on-disk directory for a given task id.

    `task_id` must be a single directory name. Empty ids, ".", "..", ids
    containing a path separator and absolute paths are rejected, so an id
    taken from an untrusted source cannot resolve outside `dataset_root`.

    Candidates are tried in the same order discovery prefers them:
      1. `dataset_root` itself, when it holds `task.toml` and is named `task_id`
      2. nested layout: `<root>/tasks/<task_id>/`
      3. flat layout:   `<root>/<task_id>/`

    Raises:
        FileNotFoundError: if the id is invalid or no candidate directory
            contains a `task.toml`.
    """
    not_found = FileNotFoundError(f"no task {task_id!r} under {dataset_root}")

    # Path(...).name differs from the input whenever it has more than one
    # component or is absolute; "", "." and ".." need an explicit check.
    if task_id in ("", ".", "..") or Path(task_id).name != task_id:
        raise not_found

    if dataset_root.name == task_id and (dataset_root / "task.toml").exists():
        return dataset_root

    for candidate in (dataset_root / "tasks" / task_id, dataset_root / task_id):
        if (candidate / "task.toml").exists():
            return candidate

    raise not_found
def _as_table(value: Any) -> dict[str, Any]:
    """Return `value` when it is a TOML table (dict), otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_keyword_list(value: Any) -> list[str]:
    """Normalise a `keywords` value: a list stays a list, a bare string becomes one keyword."""
    if isinstance(value, str):
        # list("abc") would split the string into characters.
        return [value] if value else []
    if isinstance(value, (list, tuple)):
        return list(value)
    return []


def load_task(dataset_root: Path, task_id: str) -> HarborTask:
    """Load every spec file for a single Harbor task. Tolerant of missing pieces.

    `task.toml` is parsed for the metadata fields, and the whole task
    directory is walked so every displayable text file ends up in
    `HarborTask.files`. Missing sections, missing keys, and sections that
    are not TOML tables are treated as absent rather than raising.

    Args:
        dataset_root: Root directory of the dataset.
        task_id: Directory name of the task to load.

    Returns:
        The populated `HarborTask`.

    Raises:
        FileNotFoundError: if no task directory for `task_id` can be resolved.
        tomllib.TOMLDecodeError: if `task.toml` is not valid TOML.
    """
    task_dir = _resolve_task_dir(dataset_root, task_id)
    raw = (task_dir / "task.toml").read_text(encoding="utf-8")
    data = tomllib.loads(raw)

    # Guard each section the same way repo2env is guarded below: a scalar or
    # array where a table is expected must not crash the loader.
    task_block = _as_table(data.get("task"))
    metadata_block = _as_table(data.get("metadata"))
    agent_block = _as_table(data.get("agent"))
    verifier_block = _as_table(data.get("verifier"))

    # [metadata.repo2env] is the Repo2RLEnv extension — surfaced as a separate
    # opaque dict so the UI can render it specially if present.
    repo2env = metadata_block.get("repo2env")
    if not isinstance(repo2env, dict):
        repo2env = None

    # Walk the whole task dir so any present file (grader.py, pull_bucket.py,
    # helper modules, multiple test scripts, ...) ends up in the viewer.
    files = _discover_task_files(task_dir)

    return HarborTask(
        id=task_id,
        root=task_dir,
        name=task_block.get("name"),
        org=task_block.get("org"),
        version=data.get("version"),
        description=task_block.get("description"),
        instruction_inline=task_block.get("instruction"),
        difficulty=metadata_block.get("difficulty"),
        category=metadata_block.get("category"),
        keywords=_as_keyword_list(metadata_block.get("keywords")),
        agent_timeout_sec=agent_block.get("timeout_sec"),
        verifier_timeout_sec=verifier_block.get("timeout_sec"),
        repo2env=repo2env,
        # Convenience fields — kept populated for backwards compat with callers
        # that reach in by name. Source of truth for the viewer is `files`.
        instruction_md=files.get("instruction.md"),
        oracle_patch=files.get("solution/patch.diff"),
        solve_sh=files.get("solution/solve.sh"),
        test_sh=files.get("tests/test.sh"),
        dockerfile=files.get("environment/Dockerfile"),
        task_toml_raw=raw,
        files=files,
    )