"""HuggingFace Jobs executor — skeleton for v0.

Per ADR-005, HF Jobs is one of two v0 target executors. This file is a
STUB. The full integration uses `huggingface_hub.run_job` (added in
huggingface_hub >= 0.27, ~2026 era) which spins up a containerized job
backed by HF's compute pool.

NOTE(review): confirm the minimum `huggingface_hub` version that ships
`run_job` against the library's release notes before pinning it.

Pricing reference (2026-05-26): A100 ≈ $4.18/hr, H100 ≈ $9.50/hr.
Cold start ≈ 60s. NO inter-job networking — must use object-store
rendezvous.

Status: SKELETON. Real implementation pending v0 polish wave.
"""
from __future__ import annotations

from typing import Any, Callable, Mapping

from composer_replication.diloco.serverless.executor import (
    ReplicaHandle,
    ServerlessExecutor,
)


class HFJobsExecutor(ServerlessExecutor):
    """Run replicas as HuggingFace Jobs in parallel.

    Reference implementation pattern:

        from huggingface_hub import run_job

        jobs = []
        for rank in range(N):
            job = run_job(
                image="...",  # container with composer_replication installed
                command=[
                    "python", "-m",
                    "composer_replication.diloco.serverless.replica_entrypoint",
                    "--rank", str(rank),
                    "--rendezvous", "hf://datasets/myuser/run42/",
                ],
                env={"REPLICA_RANK": str(rank), "WORLD_SIZE": str(N)},
                gpu="a100",
            )
            jobs.append(job)
        return [
            ReplicaHandle(
                rank=rank,
                backend_name="hf_jobs",
                metadata={"job_id": job.id},
            )
            for rank, job in enumerate(jobs)
        ]

    NOTE(review): the keyword names in the sketch above (e.g. ``gpu=``) are
    illustrative — confirm them against the real ``run_job`` signature
    before implementing.

    Object-store rendezvous works naturally with the HF Datasets-as-storage
    pattern — `hf://datasets/{user}/{run_id}/` is fsspec-compatible via
    `huggingface_hub`'s fsspec integration.

    Status: SKELETON. Constructing this class always raises; every method
    raises ``NotImplementedError``.
    """

    # Identifier for this backend (the docstring sketch passes the same
    # string as ``ReplicaHandle.backend_name``).
    backend_name = "hf_jobs"
    # HF Jobs offers no inter-job networking (see module docstring), so
    # replicas must rendezvous through an object store.
    supports_inter_replica_network = False

    def __init__(self) -> None:
        """Fail fast: this backend is not implemented yet.

        The optional dependency is probed first so that a missing
        ``huggingface_hub`` is reported as such rather than being masked
        by the generic "skeleton" error.

        Raises:
            RuntimeError: If ``huggingface_hub`` cannot be imported. The
                original ``ImportError`` is attached as ``__cause__``.
            NotImplementedError: Always, when the import succeeds.
        """
        try:
            from huggingface_hub import HfApi  # noqa: F401
        except ImportError as e:
            # Chain explicitly so the traceback shows the import failure as
            # the direct cause instead of "during handling of the above
            # exception, another exception occurred".
            raise RuntimeError(
                "HFJobsExecutor requires huggingface_hub. Got: " + repr(e)
            ) from e
        # Real implementation: instantiate HfApi, validate token, etc.
        raise NotImplementedError(
            "HFJobsExecutor is a v0 skeleton; full implementation pending. "
            "Use LocalProcessExecutor for testing."
        )

    def launch_replicas(
        self,
        n_replicas: int,
        entrypoint: str | Callable[..., Any],
        entrypoint_args: Mapping[str, Any],
        *,
        gpu: str | None = "a100",
        timeout: int = 3600,
    ) -> list[ReplicaHandle]:
        """Launch ``n_replicas`` jobs. Not implemented in the skeleton.

        Args:
            n_replicas: Number of replicas to start.
            entrypoint: Either a string or a callable identifying what each
                replica runs.
            entrypoint_args: Mapping of arguments for the entrypoint.
            gpu: Requested accelerator; defaults to ``"a100"``. ``None`` is
                accepted by the signature.
            timeout: Defaults to 3600 — presumably seconds; TODO confirm
                against the base-class contract.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def poll(self, handle: ReplicaHandle) -> str:
        """Return the status of one replica. Not implemented in the skeleton.

        Args:
            handle: Handle of the replica to query.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def stream_logs(self, handle: ReplicaHandle, *, n_lines: int = 200) -> str:
        """Return log output for one replica. Not implemented in the skeleton.

        Args:
            handle: Handle of the replica whose logs are requested.
            n_lines: Defaults to 200 — presumably the number of trailing
                log lines to return; TODO confirm.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def cancel(self, handle: ReplicaHandle) -> None:
        """Cancel one replica. Not implemented in the skeleton.

        Args:
            handle: Handle of the replica to cancel.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def collect(
        self,
        handles: list[ReplicaHandle],
        *,
        timeout: int | None = None,
    ) -> list[dict[str, Any]]:
        """Gather results from replicas. Not implemented in the skeleton.

        Args:
            handles: Handles of the replicas to collect from.
            timeout: Optional limit; ``None`` by default — presumably
                seconds, with ``None`` meaning no limit; TODO confirm.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError


__all__ = ["HFJobsExecutor"]