Spaces:
Running
Running
File size: 7,738 Bytes
"""Walk a Harbor dataset directory + load each task's spec files.

A "Harbor task" is a directory containing `task.toml` at its root, plus
any of: `instruction.md`, `solution/patch.diff`, `solution/solve.sh`,
`tests/test.sh`, `environment/Dockerfile`. We tolerate missing files —
not every task ships every artifact.

Two dataset layouts are accepted:

    flat:    <root>/<task-id>/task.toml
    nested:  <root>/tasks/<task-id>/task.toml

We discover both transparently.
"""
from __future__ import annotations
import logging
import tomllib
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
@dataclass(slots=True)
class HarborTask:
    """The full parsed view of a single Harbor task directory.

    Instances are built by `load_task()`. Only `id` and `root` are required;
    every other field defaults to "absent" (`None`, an empty list/dict, or an
    empty string), so a task that ships only a `task.toml` still loads.
    Field order is part of the generated `__init__` signature — append new
    fields at the end rather than reordering.
    """

    # Identity
    id: str  # the directory name, e.g. "pallets__click-3373"
    root: Path  # path to the task dir, as resolved from the dataset root

    # Descriptive fields from task.toml. As populated by load_task(): `name`
    # and `org` are read from the [task] table, `version` from the top level.
    name: str | None = None
    org: str | None = None
    version: str | None = None

    # [task]
    description: str | None = None
    instruction_inline: str | None = None  # if task.toml has `instruction =`

    # [metadata]
    difficulty: str | None = None
    category: str | None = None
    keywords: list[str] = field(default_factory=list)

    # [agent] / [verifier] timeouts (if set)
    agent_timeout_sec: float | None = None
    verifier_timeout_sec: float | None = None

    # Repo2RLEnv extension — [metadata.repo2env] block, opaque dict
    repo2env: dict[str, Any] | None = None

    # File contents (None = no entry for that path in `files`; load_task()
    # fills these from the discovered-files mapping)
    instruction_md: str | None = None  # instruction.md
    oracle_patch: str | None = None  # solution/patch.diff
    solve_sh: str | None = None  # solution/solve.sh
    test_sh: str | None = None  # tests/test.sh
    dockerfile: str | None = None  # environment/Dockerfile
    task_toml_raw: str = ""  # unparsed text of task.toml

    # Generic discovery: every readable text file under the task dir,
    # keyed by path relative to the task root. e.g. "tests/grader.py",
    # "environment/pull_bucket.py", "solution/patch.diff". Populated by
    # load_task() walking the directory tree — the file viewer surfaces
    # everything in here so the dataset is shown faithfully, not via
    # a hardcoded allowlist.
    files: dict[str, str] = field(default_factory=dict)
def _read_text(path: Path) -> str | None:
"""Read a text file, return None if it doesn't exist."""
try:
return path.read_text(encoding="utf-8")
except FileNotFoundError:
return None
except Exception as exc:
logger.warning("could not read %s: %s", path, exc)
return None
# Path components that exclude a file from discovery: any file that has one
# of these names anywhere in its task-relative path is never surfaced in the
# file viewer (VCS metadata, caches, vendored dependencies, virtualenvs).
_SKIP_DIRS: set[str] = {
    ".git",
    "__pycache__",
    ".cache",
    "node_modules",
    ".venv",
    ".pytest_cache",
    ".mypy_cache",
}

# File-name prefixes that are skipped regardless of where the file lives.
_SKIP_NAME_PREFIXES: tuple[str, ...] = (".DS_Store",)

# Hard cap on file size — anything bigger we treat as non-displayable.
_MAX_FILE_BYTES = 512 * 1024  # 512 KiB; viewer's code panel chokes well below this
def _discover_task_files(task_dir: Path) -> dict[str, str]:
    """Walk `task_dir` recursively and return every readable text file.

    Args:
        task_dir: Root directory of a single task.

    Returns:
        A mapping of {relative_path: content}. Keys are paths relative to
        `task_dir` using forward slashes (POSIX-style) so file ids are stable
        across OSes. Entries are inserted in sorted path order.

    Skipped silently: anything that is not a regular file, any path with a
    component listed in `_SKIP_DIRS`, names starting with one of
    `_SKIP_NAME_PREFIXES`, files larger than `_MAX_FILE_BYTES`, and files
    that cannot be stat'ed, read, or decoded as UTF-8.
    """
    import os  # local import: this helper is the module's only os.walk user

    # Phase 1: collect candidate paths. Excluded directories are pruned from
    # the walk itself, so large trees such as node_modules/ or .git/ are never
    # enumerated just to be thrown away afterwards.
    candidates: list[Path] = []
    for dirpath, dirnames, filenames in os.walk(task_dir):
        # In-place slice assignment is how os.walk is told not to descend.
        dirnames[:] = [d for d in dirnames if d not in _SKIP_DIRS]
        base = Path(dirpath)
        for name in filenames:
            # A *file* whose own name is in _SKIP_DIRS (e.g. a `.git`
            # gitlink file) is excluded too, matching the per-component rule.
            if name in _SKIP_DIRS or name.startswith(_SKIP_NAME_PREFIXES):
                continue
            candidates.append(base / name)

    # Phase 2: read each candidate in sorted order for deterministic output.
    out: dict[str, str] = {}
    for path in sorted(candidates):
        # os.walk lists broken symlinks, sockets, FIFOs, ... as "files";
        # only regular files (or symlinks to them) are safe to read.
        if not path.is_file():
            continue
        try:
            size = path.stat().st_size
        except OSError:
            continue
        if size > _MAX_FILE_BYTES:
            continue
        try:
            content = path.read_text(encoding="utf-8")
        except (UnicodeDecodeError, OSError):
            # Binary or unreadable — not displayable as text.
            continue
        out[path.relative_to(task_dir).as_posix()] = content
    return out
def _discover_task_roots(dataset_root: Path) -> list[Path]:
    """Find every directory under `dataset_root` that contains a `task.toml`.

    Resolution order:

    1. `dataset_root` itself holds `task.toml` — the dataset is a single
       task, returned as a one-element list.
    2. Nested layout (`<root>/tasks/<id>/task.toml`) — preferred whenever
       `<root>/tasks/` holds at least one task (that's what
       `repo2rlenv push` stages on HF).
    3. Flat layout (`<root>/<id>/task.toml`) — immediate subdirectories of
       the root whose name does not start with ".".

    If `<root>/tasks/` exists but contains no task directory, discovery falls
    back to the flat layout instead of reporting an empty dataset, so tasks
    that can be loaded by id are also listed.

    Args:
        dataset_root: Directory to scan.

    Returns:
        Task directories, sorted by path.

    Raises:
        OSError: If `dataset_root` cannot be listed (e.g. it does not exist)
            and is not itself a task root.
    """

    def _is_task_dir(candidate: Path) -> bool:
        # A task root is a directory with a task.toml directly inside it.
        return candidate.is_dir() and (candidate / "task.toml").exists()

    if (dataset_root / "task.toml").exists():
        return [dataset_root]

    tasks_dir = dataset_root / "tasks"
    if tasks_dir.is_dir():
        nested = sorted(p for p in tasks_dir.iterdir() if _is_task_dir(p))
        if nested:
            return nested
        # Empty/unrelated `tasks/` directory: fall through to the flat scan.

    # Flat layout — every immediate, non-hidden subdir that has task.toml.
    return sorted(
        p
        for p in dataset_root.iterdir()
        if not p.name.startswith(".") and _is_task_dir(p)
    )
def list_tasks(dataset_root: Path) -> list[str]:
    """List the ids of all tasks found under `dataset_root`.

    A task id is the name of the task's directory; ids come back in the
    order in which the task directories were discovered.
    """
    task_roots = _discover_task_roots(dataset_root)
    return [task_root.name for task_root in task_roots]
def _resolve_task_dir(dataset_root: Path, task_id: str) -> Path:
    """Find the on-disk directory for a given task id.

    Candidates are tried in the same order task discovery uses, so the
    directory returned for an id is the one that was listed for it:

    1. `dataset_root` itself, when it holds `task.toml` and its directory
       name equals `task_id` (single-task dataset).
    2. Nested layout: `<root>/tasks/<task_id>/`.
    3. Flat layout: `<root>/<task_id>/`.

    Args:
        dataset_root: Root directory of the dataset.
        task_id: Directory name of the task. Must be a single path component;
            ids containing separators, `.`/`..`, or an absolute path are
            treated as "not found" so a lookup can never leave `dataset_root`.

    Returns:
        The directory that contains the task's `task.toml`.

    Raises:
        FileNotFoundError: If no matching task directory exists, or if
            `task_id` is not a plain directory name.
    """
    not_found = FileNotFoundError(f"no task {task_id!r} under {dataset_root}")

    # Single-task dataset. Checked before validation because the root's own
    # name is compared verbatim and is never joined onto a path.
    if (dataset_root / "task.toml").exists() and dataset_root.name == task_id:
        return dataset_root

    # Refuse anything that is not exactly one path component: joining such a
    # value onto dataset_root could resolve outside of it ("../x", "/abs").
    if not task_id or task_id in {".", ".."} or Path(task_id).name != task_id:
        raise not_found

    # Nested first, then flat — mirrors the preference used by discovery.
    for candidate in (dataset_root / "tasks" / task_id, dataset_root / task_id):
        if (candidate / "task.toml").exists():
            return candidate

    raise not_found
def _as_table(value: Any) -> dict[str, Any]:
    """Return `value` if it is a TOML table (dict); otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def load_task(dataset_root: Path, task_id: str) -> HarborTask:
    """Load every spec file for a single Harbor task. Tolerant of missing pieces.

    Args:
        dataset_root: Root directory of the dataset.
        task_id: Directory name of the task to load.

    Returns:
        A populated `HarborTask`. Fields whose table/key is absent from
        `task.toml`, or whose file is absent from the task directory, keep
        their "absent" value (`None` / empty list).

    Raises:
        FileNotFoundError: If the task directory cannot be resolved.
        OSError: If `task.toml` cannot be read.
        UnicodeDecodeError: If `task.toml` is not valid UTF-8.
        tomllib.TOMLDecodeError: If `task.toml` is not valid TOML.
    """
    task_dir = _resolve_task_dir(dataset_root, task_id)
    raw = (task_dir / "task.toml").read_text(encoding="utf-8")
    data = tomllib.loads(raw)

    # A key such as `task = "oops"` (scalar instead of a table) is treated the
    # same as a missing table, rather than crashing on `.get` further down.
    task_block = _as_table(data.get("task"))
    metadata_block = _as_table(data.get("metadata"))
    agent_block = _as_table(data.get("agent"))
    verifier_block = _as_table(data.get("verifier"))

    # [metadata.repo2env] is the Repo2RLEnv extension — surfaced as a separate
    # opaque dict so the UI can render it specially if present.
    repo2env = metadata_block.get("repo2env")
    if not isinstance(repo2env, dict):
        repo2env = None

    # `keywords = "foo"` must become ["foo"], not ["f", "o", "o"]; anything
    # that is neither a string nor an array counts as "no keywords".
    raw_keywords = metadata_block.get("keywords")
    if isinstance(raw_keywords, str):
        keywords = [raw_keywords]
    elif isinstance(raw_keywords, (list, tuple)):
        keywords = list(raw_keywords)
    else:
        keywords = []

    # Walk the whole task dir so any present file (grader.py, pull_bucket.py,
    # helper modules, multiple test scripts, ...) ends up in the viewer.
    files = _discover_task_files(task_dir)

    return HarborTask(
        id=task_id,
        root=task_dir,
        name=task_block.get("name"),
        org=task_block.get("org"),
        version=data.get("version"),
        description=task_block.get("description"),
        instruction_inline=task_block.get("instruction"),
        difficulty=metadata_block.get("difficulty"),
        category=metadata_block.get("category"),
        keywords=keywords,
        agent_timeout_sec=agent_block.get("timeout_sec"),
        verifier_timeout_sec=verifier_block.get("timeout_sec"),
        repo2env=repo2env,
        # Convenience fields — kept populated for backwards compat with callers
        # that reach in by name. Source of truth for the viewer is `files`.
        # NOTE(review): these mirror `files`, so a file that discovery skipped
        # (e.g. over the size cap or not UTF-8) shows up here as None as well.
        instruction_md=files.get("instruction.md"),
        oracle_patch=files.get("solution/patch.diff"),
        solve_sh=files.get("solution/solve.sh"),
        test_sh=files.get("tests/test.sh"),
        dockerfile=files.get("environment/Dockerfile"),
        task_toml_raw=raw,
        files=files,
    )
|