"""Walk a Harbor dataset directory + load each task's spec files. A "Harbor task" is a directory containing `task.toml` at its root, plus any of: `instruction.md`, `solution/patch.diff`, `solution/solve.sh`, `tests/test.sh`, `environment/Dockerfile`. We tolerate missing files — not every task ships every artifact. Two dataset layouts are accepted: flat: //task.toml nested: /tasks//task.toml We discover both transparently. """ from __future__ import annotations import logging import tomllib from dataclasses import dataclass, field from pathlib import Path from typing import Any logger = logging.getLogger(__name__) @dataclass(slots=True) class HarborTask: """The full parsed view of a single Harbor task directory.""" # Identity id: str # the directory name, e.g. "pallets__click-3373" root: Path # absolute path to the task dir # task.toml top-level fields name: str | None = None org: str | None = None version: str | None = None # [task] description: str | None = None instruction_inline: str | None = None # if task.toml has `instruction =` # [metadata] difficulty: str | None = None category: str | None = None keywords: list[str] = field(default_factory=list) # [agent] / [verifier] timeouts (if set) agent_timeout_sec: float | None = None verifier_timeout_sec: float | None = None # Repo2RLEnv extension — [metadata.repo2env] block, opaque dict repo2env: dict[str, Any] | None = None # File contents (None = file not present) instruction_md: str | None = None oracle_patch: str | None = None solve_sh: str | None = None test_sh: str | None = None dockerfile: str | None = None task_toml_raw: str = "" # Generic discovery: every readable text file under the task dir, # keyed by path relative to the task root. e.g. "tests/grader.py", # "environment/pull_bucket.py", "solution/patch.diff". Populated by # load_task() walking the directory tree — the file viewer surfaces # everything in here so the dataset is shown faithfully, not via # a hardcoded allowlist. files: dict[str, str] = field(default_factory=dict) def _read_text(path: Path) -> str | None: """Read a text file, return None if it doesn't exist.""" try: return path.read_text(encoding="utf-8") except FileNotFoundError: return None except Exception as exc: logger.warning("could not read %s: %s", path, exc) return None # Files we never surface in the file viewer (binaries, caches, secrets). _SKIP_DIRS: set[str] = {".git", "__pycache__", ".cache", "node_modules", ".venv", ".pytest_cache", ".mypy_cache"} _SKIP_NAME_PREFIXES: tuple[str, ...] = (".DS_Store",) # Hard cap on file size — anything bigger we treat as non-displayable. _MAX_FILE_BYTES = 512 * 1024 # 512 KiB; viewer's code panel chokes well below this def _discover_task_files(task_dir: Path) -> dict[str, str]: """Walk `task_dir` recursively and return every readable text file as {relative_path: content}. Skips binaries, hidden noise, and oversized files. Paths use forward slashes (POSIX-style) for stable file_ids across OSes. """ out: dict[str, str] = {} for path in sorted(task_dir.rglob("*")): if not path.is_file(): continue # Skip files inside excluded directories (any level) if any(part in _SKIP_DIRS for part in path.relative_to(task_dir).parts): continue if path.name.startswith(_SKIP_NAME_PREFIXES): continue try: size = path.stat().st_size except OSError: continue if size > _MAX_FILE_BYTES: continue try: content = path.read_text(encoding="utf-8") except (UnicodeDecodeError, OSError): continue rel = path.relative_to(task_dir).as_posix() out[rel] = content return out def _discover_task_roots(dataset_root: Path) -> list[Path]: """Find every directory under `dataset_root` that contains a `task.toml`. Handles both flat (`//task.toml`) and nested (`/tasks//task.toml`) Harbor layouts. Prefers nested when `/tasks/` exists (that's what `repo2rlenv push` stages on HF). Also handles the case where the dataset is itself a single task root. """ if (dataset_root / "task.toml").exists(): return [dataset_root] tasks_dir = dataset_root / "tasks" if tasks_dir.is_dir(): return sorted( [p for p in tasks_dir.iterdir() if p.is_dir() and (p / "task.toml").exists()] ) # Flat layout — every immediate subdir that has task.toml is a task return sorted( [ p for p in dataset_root.iterdir() if p.is_dir() and not p.name.startswith(".") and (p / "task.toml").exists() ] ) def list_tasks(dataset_root: Path) -> list[str]: """Return the task-id (directory name) of every task under `dataset_root`.""" return [p.name for p in _discover_task_roots(dataset_root)] def _resolve_task_dir(dataset_root: Path, task_id: str) -> Path: """Find the on-disk directory for a given task id (handles flat + nested).""" flat = dataset_root / task_id if (flat / "task.toml").exists(): return flat nested = dataset_root / "tasks" / task_id if (nested / "task.toml").exists(): return nested if (dataset_root / "task.toml").exists() and dataset_root.name == task_id: return dataset_root raise FileNotFoundError(f"no task {task_id!r} under {dataset_root}") def load_task(dataset_root: Path, task_id: str) -> HarborTask: """Load every spec file for a single Harbor task. Tolerant of missing pieces.""" task_dir = _resolve_task_dir(dataset_root, task_id) toml_path = task_dir / "task.toml" raw = toml_path.read_text(encoding="utf-8") data = tomllib.loads(raw) task_block = data.get("task") or {} metadata_block = data.get("metadata") or {} agent_block = data.get("agent") or {} verifier_block = data.get("verifier") or {} # [metadata.repo2env] is the Repo2RLEnv extension — surfaced as a separate # opaque dict so the UI can render it specially if present. repo2env = metadata_block.get("repo2env") if repo2env is not None and not isinstance(repo2env, dict): repo2env = None # Walk the whole task dir so any present file (grader.py, pull_bucket.py, # helper modules, multiple test scripts, ...) ends up in the viewer. files = _discover_task_files(task_dir) return HarborTask( id=task_id, root=task_dir, name=task_block.get("name"), org=task_block.get("org"), version=data.get("version"), description=task_block.get("description"), instruction_inline=task_block.get("instruction"), difficulty=metadata_block.get("difficulty"), category=metadata_block.get("category"), keywords=list(metadata_block.get("keywords") or []), agent_timeout_sec=agent_block.get("timeout_sec"), verifier_timeout_sec=verifier_block.get("timeout_sec"), repo2env=repo2env, # Convenience fields — kept populated for backwards compat with callers # that reach in by name. Source of truth for the viewer is `files`. instruction_md=files.get("instruction.md"), oracle_patch=files.get("solution/patch.diff"), solve_sh=files.get("solution/solve.sh"), test_sh=files.get("tests/test.sh"), dockerfile=files.get("environment/Dockerfile"), task_toml_raw=raw, files=files, )