from __future__ import annotations

import json
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Protocol, cast

from huggingface_hub import CommitOperationAdd, HfApi, hf_hub_download

from slop_farmer.app.save_cache import _save_analysis_cache_api
from slop_farmer.config import PublishAnalysisArtifactsOptions
from slop_farmer.data.parquet_io import read_json
from slop_farmer.data.snapshot_paths import (
    ANALYSIS_REPORT_FILENAME_BY_VARIANT,
    HYBRID_ANALYSIS_REVIEWS_FILENAME,
    ROOT_MANIFEST_FILENAME,
    analysis_run_artifact_path,
    analysis_run_manifest_path,
    archived_snapshot_manifest_path,
    build_archived_analysis_run_manifest,
    build_current_analysis_manifest,
    current_analysis_artifact_path,
    resolve_snapshot_dir_from_output,
)


class HubApiLike(Protocol):
    """Structural type for the Hub client object used by the publish helpers.

    A real ``HfApi`` instance is cast to this protocol in
    ``publish_analysis_artifacts``; any object offering these three methods
    with compatible signatures can be passed to
    ``_publish_analysis_artifacts_api`` instead.
    """

    def create_repo(
        self,
        repo_id: str,
        *,
        repo_type: str,
        private: bool,
        exist_ok: bool,
    ) -> None:
        """Create the repository ``repo_id`` of the given ``repo_type``."""
        ...

    def create_commit(
        self,
        repo_id: str,
        operations: Iterable[CommitOperationAdd],
        *,
        commit_message: str,
        repo_type: str,
    ) -> Any:
        """Apply ``operations`` to ``repo_id`` as one commit."""
        ...

    def upload_folder(
        self,
        *,
        repo_id: str,
        folder_path: Path,
        path_in_repo: str,
        repo_type: str,
        commit_message: str,
    ) -> None:
        """Upload the contents of ``folder_path`` under ``path_in_repo``.

        NOTE(review): not called directly in this module; presumably needed by
        ``_save_analysis_cache_api``, which receives the same client - confirm.
        """
        ...
@dataclass(frozen=True, slots=True) class PublishableAnalysisArtifacts: repo: str snapshot_id: str model: str | None report_path: Path reviews_path: Path | None report_payload: dict[str, Any] def run_publish_analysis_artifacts(options: PublishAnalysisArtifactsOptions) -> dict[str, Any]: snapshot_dir = resolve_snapshot_dir_from_output(options.output_dir, options.snapshot_dir) return publish_analysis_artifacts( snapshot_dir=snapshot_dir, analysis_input=options.analysis_input, hf_repo_id=options.hf_repo_id, analysis_id=options.analysis_id, canonical=options.canonical, save_cache=options.save_cache, private=options.private_hf_repo, ) def publish_analysis_artifacts( *, snapshot_dir: Path, analysis_input: Path | None, hf_repo_id: str, analysis_id: str, canonical: bool, private: bool, save_cache: bool = False, log: Callable[[str], None] | None = None, ) -> dict[str, Any]: return _publish_analysis_artifacts_api( cast("HubApiLike", HfApi()), snapshot_dir=snapshot_dir, analysis_input=analysis_input, hf_repo_id=hf_repo_id, analysis_id=analysis_id, canonical=canonical, private=private, save_cache=save_cache, log=log, ) def _publish_analysis_artifacts_api( api: HubApiLike, *, snapshot_dir: Path, analysis_input: Path | None = None, hf_repo_id: str, analysis_id: str, canonical: bool, private: bool, save_cache: bool = False, log: Callable[[str], None] | None = None, ) -> dict[str, Any]: artifacts = _discover_publishable_analysis(snapshot_dir, analysis_input=analysis_input) published_at = _iso_now() channel = "canonical" if canonical else "comparison" archived_manifest = build_archived_analysis_run_manifest( repo=artifacts.repo, snapshot_id=artifacts.snapshot_id, analysis_id=analysis_id, variant="hybrid", channel=channel, model=artifacts.model, published_at=published_at, include_hybrid_reviews=artifacts.reviews_path is not None, ) current_manifest = ( build_current_analysis_manifest( repo=artifacts.repo, snapshot_id=artifacts.snapshot_id, analysis_id=analysis_id, variant="hybrid", 
channel=channel, model=artifacts.model, published_at=published_at, include_hybrid_reviews=artifacts.reviews_path is not None, ) if canonical else None ) snapshot_manifest = _updated_snapshot_manifest( snapshot_dir=snapshot_dir, hf_repo_id=hf_repo_id, snapshot_id=artifacts.snapshot_id, analysis_id=analysis_id, archived_manifest=archived_manifest, canonical=canonical, ) operations = _commit_operations( artifacts=artifacts, analysis_id=analysis_id, archived_manifest=archived_manifest, current_manifest=current_manifest, snapshot_manifest=snapshot_manifest, ) if log: log(f"Ensuring Hub dataset repo exists: {hf_repo_id}") api.create_repo(hf_repo_id, repo_type="dataset", private=private, exist_ok=True) if log: log(f"Publishing analysis {analysis_id} for snapshot {artifacts.snapshot_id}") api.create_commit( hf_repo_id, operations, commit_message=f"Publish analysis {analysis_id} for snapshot {artifacts.snapshot_id}", repo_type="dataset", ) cache_result = ( _save_analysis_cache_api( api, snapshot_dir=snapshot_dir, hf_repo_id=hf_repo_id, private=private, log=log, ) if save_cache else None ) result: dict[str, Any] = { "repo": artifacts.repo, "dataset_id": hf_repo_id, "snapshot_id": artifacts.snapshot_id, "analysis_id": analysis_id, "canonical": canonical, "save_cache": save_cache, "published_at": published_at, "artifact_paths": [operation.path_in_repo for operation in operations], } if cache_result is not None: result["cache"] = cache_result if log: log(f"Published analysis artifacts to {hf_repo_id}") return result def _discover_publishable_analysis( snapshot_dir: Path, *, analysis_input: Path | None ) -> PublishableAnalysisArtifacts: manifest_path = snapshot_dir / ROOT_MANIFEST_FILENAME if not manifest_path.exists(): raise FileNotFoundError(f"Snapshot manifest is missing: {manifest_path}") manifest = read_json(manifest_path) if not isinstance(manifest, dict): raise ValueError(f"Snapshot manifest at {manifest_path} must contain a JSON object.") snapshot_id = 
str(manifest.get("snapshot_id") or snapshot_dir.name).strip() repo = str(manifest.get("repo") or "").strip() if not repo: raise ValueError(f"Snapshot manifest at {manifest_path} does not define repo.") report_path = ( analysis_input.resolve() if analysis_input is not None else snapshot_dir / ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"] ) if not report_path.exists(): raise FileNotFoundError(f"Hybrid analysis report is missing: {report_path}") report_payload = read_json(report_path) if not isinstance(report_payload, dict): raise ValueError(f"Hybrid analysis report at {report_path} must contain a JSON object.") report_snapshot_id = str(report_payload.get("snapshot_id") or snapshot_id).strip() if report_snapshot_id != snapshot_id: raise ValueError( f"Hybrid analysis report snapshot_id {report_snapshot_id!r} does not match manifest snapshot_id {snapshot_id!r}." ) report_repo = str(report_payload.get("repo") or repo).strip() if report_repo != repo: raise ValueError( f"Hybrid analysis report repo {report_repo!r} does not match manifest repo {repo!r}." 
) model = report_payload.get("model") if model is not None: model = str(model) reviews_path = report_path.with_name(f"{report_path.stem}.llm-reviews.json") return PublishableAnalysisArtifacts( repo=repo, snapshot_id=snapshot_id, model=model, report_path=report_path, reviews_path=reviews_path if reviews_path.exists() else None, report_payload={str(key): value for key, value in report_payload.items()}, ) def _updated_snapshot_manifest( *, snapshot_dir: Path, hf_repo_id: str, snapshot_id: str, analysis_id: str, archived_manifest: dict[str, Any], canonical: bool, ) -> dict[str, Any]: manifest = _load_remote_snapshot_manifest(hf_repo_id, snapshot_id) or read_json( snapshot_dir / ROOT_MANIFEST_FILENAME ) if not isinstance(manifest, dict): raise ValueError("Archived snapshot manifest must contain a JSON object.") updated = {str(key): value for key, value in manifest.items()} published_analysis: dict[str, Any] | Any = updated.get("published_analysis") if not isinstance(published_analysis, dict): published_analysis = {"schema_version": 1, "runs": {}} runs: dict[str, Any] | Any = published_analysis.get("runs") if not isinstance(runs, dict): runs = {} runs[analysis_id] = { "analysis_id": analysis_id, "variant": archived_manifest["variant"], "channel": archived_manifest["channel"], "model": archived_manifest.get("model"), "published_at": archived_manifest["published_at"], "manifest_path": analysis_run_manifest_path(snapshot_id, analysis_id), "artifacts": archived_manifest["artifacts"], } published_analysis["schema_version"] = 1 published_analysis["runs"] = runs if canonical: published_analysis["canonical_analysis_id"] = analysis_id updated["published_analysis"] = published_analysis return updated def _load_remote_snapshot_manifest(hf_repo_id: str, snapshot_id: str) -> dict[str, Any] | None: try: downloaded = hf_hub_download( repo_id=hf_repo_id, repo_type="dataset", filename=archived_snapshot_manifest_path(snapshot_id), ) except Exception: return None payload = 
json.loads(Path(downloaded).read_text(encoding="utf-8")) return payload if isinstance(payload, dict) else None def _commit_operations( *, artifacts: PublishableAnalysisArtifacts, analysis_id: str, archived_manifest: dict[str, Any], current_manifest: dict[str, Any] | None, snapshot_manifest: dict[str, Any], ) -> list[CommitOperationAdd]: report_filename = ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"] operations = [ CommitOperationAdd( path_in_repo=analysis_run_artifact_path( artifacts.snapshot_id, analysis_id, report_filename, ), path_or_fileobj=artifacts.report_path, ), CommitOperationAdd( path_in_repo=analysis_run_manifest_path(artifacts.snapshot_id, analysis_id), path_or_fileobj=_json_bytes(archived_manifest), ), CommitOperationAdd( path_in_repo=archived_snapshot_manifest_path(artifacts.snapshot_id), path_or_fileobj=_json_bytes(snapshot_manifest), ), ] if artifacts.reviews_path is not None: operations.append( CommitOperationAdd( path_in_repo=analysis_run_artifact_path( artifacts.snapshot_id, analysis_id, HYBRID_ANALYSIS_REVIEWS_FILENAME, ), path_or_fileobj=artifacts.reviews_path, ) ) if current_manifest is not None: operations.extend( [ CommitOperationAdd( path_in_repo=current_analysis_artifact_path(report_filename), path_or_fileobj=artifacts.report_path, ), CommitOperationAdd( path_in_repo=current_analysis_artifact_path(ROOT_MANIFEST_FILENAME), path_or_fileobj=_json_bytes(current_manifest), ), ] ) if artifacts.reviews_path is not None: operations.append( CommitOperationAdd( path_in_repo=current_analysis_artifact_path(HYBRID_ANALYSIS_REVIEWS_FILENAME), path_or_fileobj=artifacts.reviews_path, ) ) return operations def _json_bytes(payload: dict[str, Any]) -> bytes: return (json.dumps(payload, indent=2, sort_keys=True) + "\n").encode("utf-8") def _iso_now() -> str: return datetime.now(tz=UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")