linvest21's picture
download
raw
3.62 kB
from __future__ import annotations
import os
from typing import Any
from n21.types import DeploymentHandle, TrainHandle, ValidationResult
class BaseProvider:
    """Provider whose operations only return placeholder ("planned") records.

    Every method builds a plain dict from its arguments and returns it; the
    only external state read anywhere in this class is ``os.environ`` (inside
    ``validate_config``).

    NOTE(review): presumably concrete providers subclass this and override
    ``name``, ``required_env`` and the individual operations -- confirm
    against the rest of the package.
    """

    # Identifier compared against config["provider"] and embedded in results.
    name = "base"
    # Environment variables that must be set (non-empty) in live mode.
    required_env: list[str] = []

    def validate_config(self, config: dict[str, Any], *, live: bool = False) -> ValidationResult:
        """Check *config* against this provider and report problems.

        Args:
            config: Mapping whose ``"provider"`` entry must equal ``self.name``.
            live: When true, every name in ``required_env`` must be present
                and non-empty in ``os.environ``; when false, a dry-run
                warning is recorded instead and the environment is not read.

        Returns:
            A dict with ``"ok"`` (true when no errors were collected),
            ``"errors"`` and ``"warnings"``.
        """
        errors: list[str] = []
        warnings: list[str] = []

        if config.get("provider") != self.name:
            errors.append(f"provider config mismatch: expected {self.name}")

        if not live:
            warnings.append("dry-run mode: live provider calls disabled")
        else:
            # An empty value counts as missing, same as an unset variable.
            errors.extend(
                f"missing required environment variable for live mode: {env_var}"
                for env_var in self.required_env
                if not os.environ.get(env_var)
            )

        return {"ok": not errors, "errors": errors, "warnings": warnings}

    def stage_dataset(self, dataset_manifest: dict[str, Any]) -> dict[str, Any]:
        """Return a "planned" staging record for *dataset_manifest*.

        The ``"dataset_manifest_id"`` entry is ``None`` when the manifest
        does not carry that key.
        """
        manifest_id = dataset_manifest.get("dataset_manifest_id")
        return {
            "status": "planned",
            "provider": self.name,
            "dataset_manifest_id": manifest_id,
        }

    def start_train(self, run_manifest: dict[str, Any]) -> TrainHandle:
        """Return a "planned" training handle for ``run_manifest["run_id"]``.

        Raises:
            KeyError: If *run_manifest* has no ``"run_id"`` entry.
        """
        run_id = run_manifest["run_id"]
        job_id = f"dryrun-{self.name}-{run_manifest['run_id']}"
        return {
            "run_id": run_id,
            "provider_job_id": job_id,
            "status": "planned",
            "logs_uri": None,
        }

    def resume_train(self, run_id: str, checkpoint_ptr: dict[str, Any]) -> TrainHandle:
        """Return a "planned" resume handle for *run_id*.

        *checkpoint_ptr* is accepted but not used by this implementation.
        """
        job_id = f"dryrun-resume-{self.name}-{run_id}"
        return {
            "run_id": run_id,
            "provider_job_id": job_id,
            "status": "planned",
            "logs_uri": None,
        }

    def cancel_train(self, run_id: str) -> dict[str, Any]:
        """Return a ``"cancel_planned"`` record for *run_id*."""
        record = {"run_id": run_id, "status": "cancel_planned", "provider": self.name}
        return record

    def get_run_status(self, run_id: str) -> dict[str, Any]:
        """Return a status record for *run_id*; the status is always ``"planned"``."""
        record = {"run_id": run_id, "status": "planned", "provider": self.name}
        return record

    def stream_logs(self, run_id: str, cursor: str | None = None) -> dict[str, Any]:
        """Return an empty page of log lines, echoing *cursor* back unchanged."""
        return {
            "run_id": run_id,
            "cursor": cursor,
            "lines": [],
            "provider": self.name,
        }

    def export_artifacts(self, run_id: str, export_policy: dict[str, Any]) -> dict[str, Any]:
        """Return an ``"export_planned"`` record that echoes *export_policy*."""
        return {
            "run_id": run_id,
            "status": "export_planned",
            "provider": self.name,
            "export_policy": export_policy,
        }

    def deploy_endpoint(self, model_manifest: dict[str, Any], env: str) -> DeploymentHandle:
        """Return a "planned" deployment handle with no endpoint URI.

        Raises:
            KeyError: If *model_manifest* has no ``"run_id"`` entry.
        """
        deployment_id = f"dryrun-{self.name}-{env}-{model_manifest['run_id']}"
        return {
            "deployment_id": deployment_id,
            "endpoint_uri": None,
            "status": "planned",
        }

    def invoke(self, deployment_id: str, request: dict[str, Any]) -> dict[str, Any]:
        """Return a response with no choices and zero latency.

        *request* is accepted but not used by this implementation.
        """
        empty_response = {"choices": []}
        return {
            "deployment_id": deployment_id,
            "response": empty_response,
            "latency_ms": 0,
            "provider": self.name,
        }

    def healthcheck(self, deployment_id: str) -> dict[str, Any]:
        """Return a ``"healthy_planned"`` record for *deployment_id*."""
        record = {"deployment_id": deployment_id, "status": "healthy_planned", "provider": self.name}
        return record

    def promote(self, deployment_id: str, env: str) -> dict[str, Any]:
        """Return a ``"promotion_planned"`` record for *deployment_id* in *env*."""
        return {
            "deployment_id": deployment_id,
            "env": env,
            "status": "promotion_planned",
            "provider": self.name,
        }

    def rollback(self, env: str, target: dict[str, Any]) -> dict[str, Any]:
        """Return a ``"rollback_planned"`` record that echoes *target*."""
        return {
            "env": env,
            "target": target,
            "status": "rollback_planned",
            "provider": self.name,
        }

    def delete_endpoint(self, deployment_id: str) -> dict[str, Any]:
        """Return a ``"delete_planned"`` record for *deployment_id*."""
        record = {"deployment_id": deployment_id, "status": "delete_planned", "provider": self.name}
        return record

    def estimate_cost(self, plan: dict[str, Any]) -> dict[str, Any]:
        """Return a zero-cost estimate.

        *plan* is accepted but not used by this implementation.
        """
        return {
            "provider": self.name,
            "gpu_hours": 0.0,
            "cost_usd": 0.0,
            "estimation_method": "dry_run_zero_cost",
        }

Xet Storage Details

Size:
3.62 kB
·
Xet hash:
b4d01304b6bae12b61a85f2fb8027b4cf6b5e1f97ceef7c476bae0b11a8b6794

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.