"""Repo-relative path conventions and manifest validation for snapshot/analysis artifacts."""

from __future__ import annotations

import re
from dataclasses import dataclass
from pathlib import Path, PurePosixPath
from typing import Any

from slop_farmer.data.parquet_io import read_json

RAW_TABLE_FILENAMES: tuple[str, ...] = (
    "issues.parquet",
    "pull_requests.parquet",
    "comments.parquet",
    "reviews.parquet",
    "review_comments.parquet",
    "pr_files.parquet",
    "pr_diffs.parquet",
    "links.parquet",
    "events.parquet",
)
VIEWER_SPLIT_FILENAMES: tuple[str, ...] = (
    "issue_comments.parquet",
    "pr_comments.parquet",
)
ROOT_MANIFEST_FILENAME = "manifest.json"
README_FILENAME = "README.md"
STATE_WATERMARK_PATH = "state/watermark.json"
SNAPSHOTS_LATEST_PATH = "snapshots/latest.json"
PR_SCOPE_CLUSTERS_FILENAME = "pr-scope-clusters.json"
NEW_CONTRIBUTORS_PARQUET_FILENAME = "new_contributors.parquet"
NEW_CONTRIBUTORS_REPORT_JSON_FILENAME = "new-contributors-report.json"
NEW_CONTRIBUTORS_REPORT_MARKDOWN_FILENAME = "new-contributors-report.md"
CONTRIBUTOR_ARTIFACT_FILENAMES: tuple[str, ...] = (
    NEW_CONTRIBUTORS_PARQUET_FILENAME,
    NEW_CONTRIBUTORS_REPORT_JSON_FILENAME,
    NEW_CONTRIBUTORS_REPORT_MARKDOWN_FILENAME,
)
ANALYSIS_REPORT_FILENAME_BY_VARIANT: dict[str, str] = {
    "deterministic": "analysis-report.json",
    "hybrid": "analysis-report-hybrid.json",
}
HYBRID_ANALYSIS_REVIEWS_FILENAME = "analysis-report-hybrid.llm-reviews.json"
LEGACY_ANALYSIS_FILENAMES: tuple[str, ...] = (
    ANALYSIS_REPORT_FILENAME_BY_VARIANT["deterministic"],
    ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"],
    HYBRID_ANALYSIS_REVIEWS_FILENAME,
)
CURRENT_ANALYSIS_DIR = PurePosixPath("analysis/current")
CURRENT_ANALYSIS_MANIFEST_PATH = str(CURRENT_ANALYSIS_DIR / ROOT_MANIFEST_FILENAME)
ANALYSIS_MANIFEST_SCHEMA_VERSION = 1


@dataclass(frozen=True, slots=True)
class ResolvedAnalysisReportPath:
    """Location of an analysis report on disk plus where it was resolved from."""

    # Local filesystem path of the report file.
    path: Path
    # Variant of the resolved report ("hybrid" or "deterministic").
    variant: str
    # "current" when resolved through the current-analysis manifest, "snapshot" when
    # found directly inside the snapshot directory.
    source: str
    # Only populated by the manifest-based ("current") resolution.
    snapshot_id: str | None = None
    analysis_id: str | None = None


def default_hf_materialize_dir(output_dir: Path, repo_id: str, revision: str | None) -> Path:
    """Return ``<output_dir>/snapshots/hf-<repo>[--<revision>]`` with ``/`` flattened to ``--``."""
    suffix = repo_id.replace("/", "--")
    if revision:
        suffix = f"{suffix}--{revision.replace('/', '--')}"
    return output_dir.resolve() / "snapshots" / f"hf-{suffix}"


def repo_relative_path_to_local(base_dir: Path, repo_relative_path: str) -> Path:
    """Join a POSIX-style repo-relative path onto ``base_dir`` using native separators."""
    return base_dir.joinpath(*PurePosixPath(repo_relative_path).parts)


def snapshot_artifact_path(snapshot_id: str, filename: str) -> str:
    """Return the repo-relative path ``snapshots/<snapshot_id>/<filename>``."""
    return str(PurePosixPath("snapshots") / snapshot_id / filename)


def archived_snapshot_manifest_path(snapshot_id: str) -> str:
    """Return the repo-relative path of a snapshot's root manifest."""
    return snapshot_artifact_path(snapshot_id, ROOT_MANIFEST_FILENAME)


def analysis_run_artifact_path(snapshot_id: str, analysis_id: str, filename: str) -> str:
    """Return ``snapshots/<snapshot_id>/analysis-runs/<analysis_id>/<filename>``.

    An empty ``filename`` yields the run directory itself, without a trailing slash.
    """
    return str(PurePosixPath("snapshots") / snapshot_id / "analysis-runs" / analysis_id / filename)


def analysis_run_manifest_path(snapshot_id: str, analysis_id: str) -> str:
    """Return the repo-relative path of an archived analysis run's manifest."""
    return analysis_run_artifact_path(snapshot_id, analysis_id, ROOT_MANIFEST_FILENAME)


def current_analysis_artifact_path(filename: str) -> str:
    """Return ``analysis/current/<filename>``.

    An empty ``filename`` yields the directory itself, without a trailing slash.
    """
    return str(CURRENT_ANALYSIS_DIR / filename)


def repo_key(repo_slug: str) -> str:
    """Return a path-safe key derived from a repository slug."""
    return _path_key(repo_slug)


def model_key(model: str) -> str:
    """Return a path-safe key derived from a model name."""
    return _path_key(model)


def build_current_analysis_manifest(
    *,
    repo: str,
    snapshot_id: str,
    analysis_id: str,
    variant: str,
    channel: str,
    model: str | None,
    published_at: str,
    include_hybrid_reviews: bool,
) -> dict[str, Any]:
    """Build and validate the manifest describing the currently published analysis.

    ``artifacts`` point into ``analysis/current``; ``archived_artifacts`` point at the
    same files inside the archived analysis-run directory. The hybrid reviews entry is
    added to both maps only when ``include_hybrid_reviews`` is true.

    Raises:
        ValueError: If the assembled payload fails manifest validation.
    """
    hybrid_report_filename = ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"]
    artifacts = {
        "hybrid": current_analysis_artifact_path(hybrid_report_filename),
    }
    archived_artifacts = {
        "hybrid": analysis_run_artifact_path(snapshot_id, analysis_id, hybrid_report_filename),
    }
    if include_hybrid_reviews:
        artifacts["hybrid_reviews"] = current_analysis_artifact_path(
            HYBRID_ANALYSIS_REVIEWS_FILENAME
        )
        archived_artifacts["hybrid_reviews"] = analysis_run_artifact_path(
            snapshot_id,
            analysis_id,
            HYBRID_ANALYSIS_REVIEWS_FILENAME,
        )
    payload = {
        "schema_version": ANALYSIS_MANIFEST_SCHEMA_VERSION,
        "repo": repo,
        "snapshot_id": snapshot_id,
        "analysis_id": analysis_id,
        "variant": variant,
        "channel": channel,
        "model": model,
        "published_at": published_at,
        "artifacts": artifacts,
        "archived_artifacts": archived_artifacts,
    }
    return validate_current_analysis_manifest(payload)


def build_archived_analysis_run_manifest(
    *,
    repo: str,
    snapshot_id: str,
    analysis_id: str,
    variant: str,
    channel: str,
    model: str | None,
    published_at: str,
    include_hybrid_reviews: bool,
) -> dict[str, Any]:
    """Build and validate the manifest stored alongside an archived analysis run.

    All artifact paths point inside the analysis-run directory for
    ``snapshot_id``/``analysis_id``.

    Raises:
        ValueError: If the assembled payload fails manifest validation.
    """
    artifacts = {
        "hybrid": analysis_run_artifact_path(
            snapshot_id,
            analysis_id,
            ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"],
        )
    }
    if include_hybrid_reviews:
        artifacts["hybrid_reviews"] = analysis_run_artifact_path(
            snapshot_id,
            analysis_id,
            HYBRID_ANALYSIS_REVIEWS_FILENAME,
        )
    payload = {
        "schema_version": ANALYSIS_MANIFEST_SCHEMA_VERSION,
        "repo": repo,
        "snapshot_id": snapshot_id,
        "analysis_id": analysis_id,
        "variant": variant,
        "channel": channel,
        "model": model,
        "published_at": published_at,
        "artifacts": artifacts,
    }
    return validate_archived_analysis_run_manifest(payload)


def load_current_analysis_manifest(path: Path) -> dict[str, Any]:
    """Read and validate a current-analysis manifest from ``path``.

    Raises:
        ValueError: If the file is not a JSON object or fails validation.
    """
    payload = read_json(path)
    if not isinstance(payload, dict):
        raise ValueError(f"Current analysis manifest at {path} must contain a JSON object.")
    return validate_current_analysis_manifest(payload)


def load_archived_analysis_run_manifest(path: Path) -> dict[str, Any]:
    """Read and validate an archived analysis-run manifest from ``path``.

    Raises:
        ValueError: If the file is not a JSON object or fails validation.
    """
    payload = read_json(path)
    if not isinstance(payload, dict):
        raise ValueError(f"Archived analysis manifest at {path} must contain a JSON object.")
    return validate_archived_analysis_run_manifest(payload)


def resolve_default_dashboard_analysis_report(
    snapshot_dir: Path,
) -> ResolvedAnalysisReportPath | None:
    """Pick the analysis report to use by default for ``snapshot_dir``.

    The published current analysis is used only when its snapshot id matches this
    snapshot; otherwise the lookup falls back to report files stored directly in the
    snapshot directory.
    """
    current = resolve_current_analysis_report(snapshot_dir)
    if current is not None and _analysis_matches_snapshot(snapshot_dir, current):
        return current
    return resolve_snapshot_local_analysis_report(snapshot_dir, variant="auto")


def resolve_current_analysis_report(
    snapshot_dir: Path,
    *,
    variant: str = "auto",
) -> ResolvedAnalysisReportPath | None:
    """Resolve the report referenced by the current-analysis manifest, if one exists.

    Returns:
        ``None`` when no current-analysis manifest is present under ``snapshot_dir``.

    Raises:
        ValueError: If ``variant`` is unsupported or not served by the manifest, if the
            manifest is invalid, or if the referenced artifact file is missing.
    """
    normalized = _normalize_analysis_variant(variant)
    manifest_path = repo_relative_path_to_local(snapshot_dir, CURRENT_ANALYSIS_MANIFEST_PATH)
    if not manifest_path.exists():
        return None
    manifest = load_current_analysis_manifest(manifest_path)
    artifact_key = _analysis_artifact_key_for_variant(normalized, manifest_kind="current")
    artifact_path = manifest.get("artifacts", {}).get(artifact_key)
    if not isinstance(artifact_path, str) or not artifact_path:
        message = (
            f"Published current analysis manifest does not provide the {normalized} artifact."
            if normalized != "auto"
            else "Published current analysis manifest does not provide the canonical hybrid artifact."
        )
        raise ValueError(message)
    report_path = repo_relative_path_to_local(snapshot_dir, artifact_path)
    if not report_path.exists():
        raise ValueError(
            f"Published current analysis artifact {artifact_path!r} "
            "is missing from the materialized snapshot."
        )
    return ResolvedAnalysisReportPath(
        path=report_path,
        variant="hybrid" if artifact_key == "hybrid" else normalized,
        source="current",
        snapshot_id=str(manifest["snapshot_id"]),
        analysis_id=str(manifest["analysis_id"]),
    )


def resolve_snapshot_local_analysis_report(
    snapshot_dir: Path,
    *,
    variant: str = "auto",
) -> ResolvedAnalysisReportPath | None:
    """Resolve a report file stored directly inside ``snapshot_dir``.

    With ``variant="auto"`` the hybrid report is preferred and the deterministic report
    is the fallback. Returns ``None`` when no matching file exists.

    Raises:
        ValueError: If ``variant`` is not one of auto, hybrid, or deterministic.
    """
    normalized = _normalize_analysis_variant(variant)
    if normalized == "auto":
        hybrid_path = snapshot_dir / ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"]
        if hybrid_path.exists():
            return ResolvedAnalysisReportPath(
                path=hybrid_path,
                variant="hybrid",
                source="snapshot",
            )
        deterministic_path = snapshot_dir / ANALYSIS_REPORT_FILENAME_BY_VARIANT["deterministic"]
        if deterministic_path.exists():
            return ResolvedAnalysisReportPath(
                path=deterministic_path,
                variant="deterministic",
                source="snapshot",
            )
        return None
    report_path = snapshot_dir / ANALYSIS_REPORT_FILENAME_BY_VARIANT[normalized]
    if not report_path.exists():
        return None
    return ResolvedAnalysisReportPath(
        path=report_path,
        variant=normalized,
        source="snapshot",
    )


def validate_current_analysis_manifest(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate a current-analysis manifest and return a normalized copy.

    Beyond the shared checks, ``archived_artifacts`` must live in the analysis-run
    directory for the manifest's snapshot/analysis ids and must carry exactly the same
    keys as ``artifacts``.

    Raises:
        ValueError: On any validation failure.
    """
    validated = _validate_analysis_manifest(payload, require_archived_artifacts=True)
    archived_artifacts = _validate_artifacts(
        dict(validated["archived_artifacts"]),
        expected_prefix=analysis_run_artifact_path(
            str(validated["snapshot_id"]),
            str(validated["analysis_id"]),
            "",
        ),
    )
    if set(archived_artifacts) != set(validated["artifacts"]):
        raise ValueError("Current analysis manifest artifacts and archived_artifacts must match.")
    validated["archived_artifacts"] = archived_artifacts
    return validated


def validate_archived_analysis_run_manifest(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate an archived analysis-run manifest and return a normalized copy.

    Raises:
        ValueError: On any validation failure.
    """
    return _validate_analysis_manifest(payload, require_archived_artifacts=False)


def load_latest_snapshot_pointer(snapshots_root: Path) -> Path | None:
    """Return the snapshot directory named by ``<snapshots_root>/latest.json``.

    Relative ``snapshot_dir`` values are resolved against the parent of the snapshots
    root. Returns ``None`` when the pointer file is absent, is not a JSON object, or
    does not carry a non-empty string ``snapshot_dir``.
    """
    resolved_snapshots_root = snapshots_root.resolve()
    latest_path = resolved_snapshots_root / "latest.json"
    if not latest_path.exists():
        return None
    payload = read_json(latest_path)
    # A pointer file that is not a JSON object is treated like one without a usable
    # ``snapshot_dir`` instead of failing with AttributeError on ``.get``.
    snapshot_dir = payload.get("snapshot_dir") if isinstance(payload, dict) else None
    if not isinstance(snapshot_dir, str) or not snapshot_dir:
        return None
    path = Path(snapshot_dir)
    if path.is_absolute():
        return path.resolve()
    return (resolved_snapshots_root.parent / path).resolve()


def resolve_snapshot_dir_from_output(output_dir: Path, snapshot_dir: Path | None) -> Path:
    """Resolve a snapshot directory using ``<output_dir>/snapshots`` as the snapshots root."""
    return resolve_snapshot_dir_from_snapshots_root(
        output_dir.resolve() / "snapshots", snapshot_dir
    )


def resolve_snapshot_dir_from_snapshots_root(
    snapshots_root: Path,
    snapshot_dir: Path | None,
) -> Path:
    """Resolve the snapshot directory to operate on.

    Order of preference: the explicit ``snapshot_dir``; the directory named by
    ``latest.json``; the last sub-directory of ``snapshots_root`` in sorted order.

    Raises:
        FileNotFoundError: If none of the above yields a directory.
    """
    if snapshot_dir is not None:
        return snapshot_dir.resolve()
    resolved_snapshots_root = snapshots_root.resolve()
    latest_path = resolved_snapshots_root / "latest.json"
    latest_snapshot_dir = load_latest_snapshot_pointer(resolved_snapshots_root)
    if latest_snapshot_dir is not None:
        return latest_snapshot_dir
    snapshot_dirs = sorted(path for path in resolved_snapshots_root.glob("*") if path.is_dir())
    if snapshot_dirs:
        return snapshot_dirs[-1].resolve()
    raise FileNotFoundError(f"Could not resolve a snapshot directory from {latest_path}")


def _validate_analysis_manifest(
    payload: dict[str, Any],
    *,
    require_archived_artifacts: bool,
) -> dict[str, Any]:
    """Run the checks shared by current and archived analysis manifests.

    Checks the schema version, the required non-empty string fields (stripped in the
    returned copy), the optional ``model`` field, and that ``artifacts`` live under the
    directory matching the manifest kind. When ``require_archived_artifacts`` is true,
    ``archived_artifacts`` must be an object; its contents are validated by the caller.

    Raises:
        ValueError: On any validation failure.
    """
    validated = {str(key): value for key, value in payload.items()}
    if validated.get("schema_version") != ANALYSIS_MANIFEST_SCHEMA_VERSION:
        raise ValueError(
            f"Unsupported analysis manifest schema version: {validated.get('schema_version')!r}"
        )
    for field in ("repo", "snapshot_id", "analysis_id", "variant", "channel", "published_at"):
        if not isinstance(validated.get(field), str) or not str(validated[field]).strip():
            raise ValueError(f"Analysis manifest field {field!r} must be a non-empty string.")
        validated[field] = str(validated[field]).strip()
    model = validated.get("model")
    if model is not None and not isinstance(model, str):
        raise ValueError("Analysis manifest field 'model' must be a string when present.")
    artifacts = validated.get("artifacts")
    if not isinstance(artifacts, dict):
        raise ValueError("Analysis manifest field 'artifacts' must be an object.")
    # Current manifests reference analysis/current; archived ones reference their own
    # analysis-run directory.
    expected_prefix = (
        current_analysis_artifact_path("")
        if require_archived_artifacts
        else analysis_run_artifact_path(
            str(validated["snapshot_id"]),
            str(validated["analysis_id"]),
            "",
        )
    )
    validated["artifacts"] = _validate_artifacts(dict(artifacts), expected_prefix=expected_prefix)
    if require_archived_artifacts:
        archived_artifacts = validated.get("archived_artifacts")
        if not isinstance(archived_artifacts, dict):
            raise ValueError(
                "Current analysis manifest field 'archived_artifacts' must be an object."
            )
        validated["archived_artifacts"] = {
            str(key): value for key, value in archived_artifacts.items()
        }
    return validated


def _is_within_prefix(artifact_path: str, expected_prefix: str) -> bool:
    """Return whether ``artifact_path`` names something strictly inside ``expected_prefix``.

    The comparison is made on path components rather than raw characters, so a sibling
    directory that merely shares a string prefix (``analysis/current-old``) does not
    match, and any ``..`` component is rejected outright.
    """
    prefix_parts = PurePosixPath(expected_prefix).parts
    artifact_parts = PurePosixPath(artifact_path).parts
    if ".." in artifact_parts:
        return False
    return (
        len(artifact_parts) > len(prefix_parts)
        and artifact_parts[: len(prefix_parts)] == prefix_parts
    )


def _validate_artifacts(artifacts: dict[str, Any], *, expected_prefix: str) -> dict[str, str]:
    """Validate an artifacts mapping and return only the recognized entries.

    ``hybrid`` is required and ``hybrid_reviews`` is optional; other keys are dropped.
    Every retained path must be located inside the ``expected_prefix`` directory.

    Raises:
        ValueError: If a required entry is missing, an entry is not a non-empty string,
            or a path falls outside ``expected_prefix``.
    """
    normalized = {str(key): value for key, value in artifacts.items()}
    hybrid_path = normalized.get("hybrid")
    if not isinstance(hybrid_path, str) or not hybrid_path:
        raise ValueError("Analysis manifest must include artifacts.hybrid.")
    validated = {"hybrid": hybrid_path}
    hybrid_reviews_path = normalized.get("hybrid_reviews")
    if hybrid_reviews_path is not None:
        if not isinstance(hybrid_reviews_path, str) or not hybrid_reviews_path:
            raise ValueError(
                "Analysis manifest artifacts.hybrid_reviews must be a non-empty string."
            )
        validated["hybrid_reviews"] = hybrid_reviews_path
    for key, value in validated.items():
        # The expected prefix has no trailing slash, so a plain ``startswith`` would
        # also accept sibling directories and ``..`` escapes.
        if not _is_within_prefix(value, expected_prefix):
            raise ValueError(
                f"Analysis manifest artifact {key!r} must live under {expected_prefix!r}, "
                f"got {value!r}."
            )
    return validated


def _path_key(value: str) -> str:
    """Lower-case ``value`` and collapse every run of non ``[a-z0-9]`` characters to ``-``.

    Leading and trailing dashes are removed.

    Raises:
        ValueError: If nothing remains after normalization.
    """
    # One substitution is enough: each run of non-alphanumerics becomes a single dash,
    # so no consecutive dashes can remain afterwards.
    normalized = re.sub(r"[^a-z0-9]+", "-", value.strip().lower()).strip("-")
    if not normalized:
        raise ValueError("Expected a non-empty path key value.")
    return normalized


def _analysis_matches_snapshot(
    snapshot_dir: Path,
    analysis_path: ResolvedAnalysisReportPath,
) -> bool:
    """Return whether ``analysis_path`` was produced for the snapshot in ``snapshot_dir``.

    The snapshot id is read from the snapshot's root manifest when that file exists and
    provides one; otherwise the directory name is used as the snapshot id.
    """
    snapshot_manifest_path = snapshot_dir / ROOT_MANIFEST_FILENAME
    if snapshot_manifest_path.exists():
        snapshot_manifest = read_json(snapshot_manifest_path)
        # A manifest that is not a JSON object carries no snapshot id; fall through to
        # the directory-name comparison instead of failing on ``.get``.
        snapshot_id = (
            snapshot_manifest.get("snapshot_id") if isinstance(snapshot_manifest, dict) else None
        )
        if snapshot_id is not None:
            return str(snapshot_id) == str(analysis_path.snapshot_id)
    return snapshot_dir.name == str(analysis_path.snapshot_id)


def _normalize_analysis_variant(variant: str) -> str:
    """Strip and lower-case ``variant``, rejecting anything but auto/deterministic/hybrid.

    Raises:
        ValueError: If the normalized value is not a supported variant.
    """
    normalized = variant.strip().lower()
    if normalized not in {"auto", "deterministic", "hybrid"}:
        raise ValueError(
            f"Unsupported analysis variant {variant!r}; expected auto, hybrid, or deterministic."
        )
    return normalized


def _analysis_artifact_key_for_variant(variant: str, *, manifest_kind: str) -> str:
    """Map a normalized variant to the manifest artifact key that serves it.

    Raises:
        ValueError: For any variant other than ``auto`` or ``hybrid``; ``manifest_kind``
            is only used in the error message.
    """
    if variant in {"auto", "hybrid"}:
        return "hybrid"
    raise ValueError(
        f"Published {manifest_kind} analysis only serves canonical hybrid artifacts; "
        f"requested {variant!r}."
    )