Buckets:
| from __future__ import annotations | |
| import os | |
| from typing import Any | |
| from n21.types import DeploymentHandle, TrainHandle, ValidationResult | |
| class BaseProvider: | |
| name = "base" | |
| required_env: list[str] = [] | |
| def validate_config(self, config: dict[str, Any], *, live: bool = False) -> ValidationResult: | |
| errors: list[str] = [] | |
| warnings: list[str] = [] | |
| if config.get("provider") != self.name: | |
| errors.append(f"provider config mismatch: expected {self.name}") | |
| if live: | |
| for env_var in self.required_env: | |
| if not os.environ.get(env_var): | |
| errors.append(f"missing required environment variable for live mode: {env_var}") | |
| else: | |
| warnings.append("dry-run mode: live provider calls disabled") | |
| return {"ok": not errors, "errors": errors, "warnings": warnings} | |
| def stage_dataset(self, dataset_manifest: dict[str, Any]) -> dict[str, Any]: | |
| return {"status": "planned", "provider": self.name, "dataset_manifest_id": dataset_manifest.get("dataset_manifest_id")} | |
| def start_train(self, run_manifest: dict[str, Any]) -> TrainHandle: | |
| return { | |
| "run_id": run_manifest["run_id"], | |
| "provider_job_id": f"dryrun-{self.name}-{run_manifest['run_id']}", | |
| "status": "planned", | |
| "logs_uri": None, | |
| } | |
| def resume_train(self, run_id: str, checkpoint_ptr: dict[str, Any]) -> TrainHandle: | |
| return {"run_id": run_id, "provider_job_id": f"dryrun-resume-{self.name}-{run_id}", "status": "planned", "logs_uri": None} | |
| def cancel_train(self, run_id: str) -> dict[str, Any]: | |
| return {"run_id": run_id, "status": "cancel_planned", "provider": self.name} | |
| def get_run_status(self, run_id: str) -> dict[str, Any]: | |
| return {"run_id": run_id, "status": "planned", "provider": self.name} | |
| def stream_logs(self, run_id: str, cursor: str | None = None) -> dict[str, Any]: | |
| return {"run_id": run_id, "cursor": cursor, "lines": [], "provider": self.name} | |
| def export_artifacts(self, run_id: str, export_policy: dict[str, Any]) -> dict[str, Any]: | |
| return {"run_id": run_id, "status": "export_planned", "provider": self.name, "export_policy": export_policy} | |
| def deploy_endpoint(self, model_manifest: dict[str, Any], env: str) -> DeploymentHandle: | |
| return { | |
| "deployment_id": f"dryrun-{self.name}-{env}-{model_manifest['run_id']}", | |
| "endpoint_uri": None, | |
| "status": "planned", | |
| } | |
| def invoke(self, deployment_id: str, request: dict[str, Any]) -> dict[str, Any]: | |
| return {"deployment_id": deployment_id, "response": {"choices": []}, "latency_ms": 0, "provider": self.name} | |
| def healthcheck(self, deployment_id: str) -> dict[str, Any]: | |
| return {"deployment_id": deployment_id, "status": "healthy_planned", "provider": self.name} | |
| def promote(self, deployment_id: str, env: str) -> dict[str, Any]: | |
| return {"deployment_id": deployment_id, "env": env, "status": "promotion_planned", "provider": self.name} | |
| def rollback(self, env: str, target: dict[str, Any]) -> dict[str, Any]: | |
| return {"env": env, "target": target, "status": "rollback_planned", "provider": self.name} | |
| def delete_endpoint(self, deployment_id: str) -> dict[str, Any]: | |
| return {"deployment_id": deployment_id, "status": "delete_planned", "provider": self.name} | |
| def estimate_cost(self, plan: dict[str, Any]) -> dict[str, Any]: | |
| return {"provider": self.name, "gpu_hours": 0.0, "cost_usd": 0.0, "estimation_method": "dry_run_zero_cost"} | |
Xet Storage Details
- Size:
- 3.62 kB
- Xet hash:
- b4d01304b6bae12b61a85f2fb8027b4cf6b5e1f97ceef7c476bae0b11a8b6794
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.