Codeseys's picture
Wave 13: serverless DiLoCo + replaysim normalization + 3 distillation losses + PRIME-RL + Monarch
b266c31
"""HuggingFace Jobs executor — skeleton for v0.
Per ADR-005, HF Jobs is one of two v0 target executors. This file is a
STUB. The full integration uses `huggingface_hub.run_job` (added in
huggingface_hub >= 0.27, ~2026 era) which spins up a containerized job
backed by HF's compute pool.
Pricing reference (2026-05-26): A100 ≈ $4.18/hr, H100 ≈ $9.50/hr. Cold
start ≈ 60s. NO inter-job networking — must use object-store rendezvous.
Status: SKELETON. Real implementation pending v0 polish wave.
"""
from __future__ import annotations
from typing import Any, Callable, Mapping
from composer_replication.diloco.serverless.executor import (
ReplicaHandle,
ServerlessExecutor,
)
class HFJobsExecutor(ServerlessExecutor):
    """Run replicas as HuggingFace Jobs in parallel.

    Reference implementation pattern (sketch only; nothing below is wired up
    yet)::

        from huggingface_hub import run_job

        jobs = []
        for rank in range(N):
            job = run_job(
                image="...",  # container with composer_replication installed
                command=[
                    "python", "-m",
                    "composer_replication.diloco.serverless.replica_entrypoint",
                    "--rank", str(rank),
                    "--rendezvous", "hf://datasets/myuser/run42/",
                ],
                env={"REPLICA_RANK": str(rank), "WORLD_SIZE": str(N)},
                # HF Jobs selects hardware via a `flavor` string, not a
                # `gpu=` kwarg. "a100-large" is an example -- verify the
                # exact flavor names against the huggingface_hub docs.
                flavor="a100-large",
            )
            jobs.append(job)
        return [
            ReplicaHandle(rank=rank, backend_name="hf_jobs",
                          metadata={"job_id": job.id})
            for rank, job in enumerate(jobs)
        ]

    ``run_job`` only exists in recent huggingface_hub releases; confirm the
    minimum required version before relying on it.

    Object-store rendezvous works naturally with the HF Datasets-as-storage
    pattern -- ``hf://datasets/{user}/{run_id}/`` is fsspec-compatible via
    ``huggingface_hub``'s fsspec integration. Replicas cannot reach each other
    over the network (``supports_inter_replica_network = False``), so all
    coordination must go through that store.

    Status: SKELETON. ``__init__`` always raises, so the class cannot be
    instantiated yet; every other method raises ``NotImplementedError``.
    """

    backend_name = "hf_jobs"
    # HF Jobs has no job-to-job networking; replicas rendezvous through an
    # object store instead.
    supports_inter_replica_network = False

    def __init__(self) -> None:
        """Check that huggingface_hub is importable, then refuse to build.

        Raises:
            RuntimeError: If ``huggingface_hub`` cannot be imported. The
                original ``ImportError`` is chained as ``__cause__``.
            NotImplementedError: Always otherwise -- this executor is a v0
                skeleton.
        """
        try:
            from huggingface_hub import HfApi  # noqa: F401
        except ImportError as e:
            # Chain explicitly so the traceback shows the real import failure
            # (missing package vs. broken install) as the direct cause.
            raise RuntimeError(
                "HFJobsExecutor requires huggingface_hub. Got: " + repr(e)
            ) from e
        # TODO: instantiate HfApi, validate the token, etc.
        raise NotImplementedError(
            "HFJobsExecutor is a v0 skeleton; full implementation pending. "
            "Use LocalProcessExecutor for testing."
        )

    @staticmethod
    def _not_implemented(method: str) -> NotImplementedError:
        """Build the error raised by every not-yet-implemented method."""
        return NotImplementedError(
            f"HFJobsExecutor.{method} is not implemented yet (v0 skeleton). "
            "Use LocalProcessExecutor for testing."
        )

    def launch_replicas(
        self,
        n_replicas: int,
        entrypoint: str | Callable[..., Any],
        entrypoint_args: Mapping[str, Any],
        *,
        gpu: str | None = "a100",
        timeout: int = 3600,
    ) -> list[ReplicaHandle]:
        """Launch ``n_replicas`` HF Jobs and return one handle per replica.

        Args:
            n_replicas: Number of replica jobs to start.
            entrypoint: What each replica runs -- either a string or a
                callable (presumably a module path; confirm against the
                ServerlessExecutor contract).
            entrypoint_args: Arguments for the entrypoint.
            gpu: Requested accelerator. HF Jobs expresses hardware as a
                ``flavor`` string, so this will need translating once
                implemented.
            timeout: Per-job timeout -- presumably seconds; TODO confirm.

        Raises:
            NotImplementedError: Always, until the implementation lands.
        """
        raise self._not_implemented("launch_replicas")

    def poll(self, handle: ReplicaHandle) -> str:
        """Return the current status string for ``handle``'s job.

        The set of valid status strings is defined by ServerlessExecutor
        (not visible here).

        Raises:
            NotImplementedError: Always, until the implementation lands.
        """
        raise self._not_implemented("poll")

    def stream_logs(self, handle: ReplicaHandle, *, n_lines: int = 200) -> str:
        """Return up to the last ``n_lines`` of the replica's job logs.

        Raises:
            NotImplementedError: Always, until the implementation lands.
        """
        raise self._not_implemented("stream_logs")

    def cancel(self, handle: ReplicaHandle) -> None:
        """Cancel the job behind ``handle``.

        Raises:
            NotImplementedError: Always, until the implementation lands.
        """
        raise self._not_implemented("cancel")

    def collect(
        self,
        handles: list[ReplicaHandle],
        *,
        timeout: int | None = None,
    ) -> list[dict[str, Any]]:
        """Wait for ``handles`` to finish and return one result dict per handle.

        Args:
            handles: Handles previously returned by ``launch_replicas``.
            timeout: Optional overall wait limit (``None`` presumably means
                wait indefinitely -- TODO confirm).

        Raises:
            NotImplementedError: Always, until the implementation lands.
        """
        raise self._not_implemented("collect")
# Public API of this module: only the executor class is exported.
__all__ = ["HFJobsExecutor"]