"""Read-only queries over analysis reports attached to the active snapshot.

Every public function resolves the active run from the PR search database,
selects an analysis report (an archived published run, the current published
manifest, or a report file stored directly in the snapshot directory) and
returns a plain ``dict`` payload built from that report.
"""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any

from slop_farmer.data.parquet_io import read_json
from slop_farmer.data.search_duckdb import connect_pr_search_db, resolve_active_run
from slop_farmer.data.snapshot_paths import (
    ANALYSIS_REPORT_FILENAME_BY_VARIANT,
    CURRENT_ANALYSIS_MANIFEST_PATH,
    analysis_run_manifest_path,
    load_archived_analysis_run_manifest,
    load_current_analysis_manifest,
    repo_relative_path_to_local,
)

# Variant names accepted by the public functions (compared after strip + lower).
ANALYSIS_VARIANTS = {"auto", "deterministic", "hybrid"}


@dataclass(frozen=True, slots=True)
class ActiveSnapshotContext:
    """The active run row together with its resolved snapshot directory."""

    # Active run mapping with every key coerced to ``str``.
    active_run: dict[str, Any]
    # Absolute form of the run's ``snapshot_dir`` value.
    snapshot_dir: Path


@dataclass(frozen=True, slots=True)
class AnalysisContext:
    """Everything needed to answer a query against one loaded analysis report."""

    # Active run mapping with every key coerced to ``str``.
    active_run: dict[str, Any]
    # Parsed report JSON object.
    report: dict[str, Any]
    # File the report was loaded from.
    report_path: Path
    # Where the report was found: "archived", "current" or "snapshot".
    report_source: str
    # Normalized variant the caller asked for.
    variant_requested: str
    # Variant of the report that was actually selected.
    variant_used: str
    # Published analysis id; ``None`` for snapshot-local reports.
    analysis_id: str | None


def get_analysis_status(
    db_path: Path,
    *,
    repo: str | None = None,
    variant: str = "auto",
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """Describe whether an analysis report is available and summarize it.

    Unlike the other public functions, a report that cannot be located is not
    an error: the returned payload then carries ``"available": False`` and
    only the active-run fields.

    Args:
        db_path: Path of the PR search database.
        repo: Optional repo passed to the active-run lookup.
        variant: One of ``auto``, ``hybrid`` or ``deterministic``.
        snapshot_id: Snapshot of an archived run; requires ``analysis_id``.
        analysis_id: Id of an archived run; requires ``snapshot_id``.

    Returns:
        Status payload; when a report is available it also includes the
        variant used, the report source and per-category counts.

    Raises:
        ValueError: For an unsupported variant, a lone ``snapshot_id`` or
            ``analysis_id``, or a published manifest that cannot serve the
            requested artifact.
    """
    active = _resolve_active_snapshot_context(db_path, repo=repo)
    report_path, variant_used, report_source, resolved_analysis_id = _resolve_analysis_report_path(
        active.snapshot_dir,
        variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
        required=False,
    )
    payload = {
        "repo": str(active.active_run["repo"]),
        "active_snapshot_id": str(active.active_run["snapshot_id"]),
        "run_id": str(active.active_run["id"]),
        "variant_requested": _normalize_analysis_variant(variant),
        "available": report_path is not None,
    }
    if report_path is None or variant_used is None or report_source is None:
        return payload
    report = _load_report(report_path)
    status = {
        **payload,
        # Prefer the snapshot id recorded in the report over the active run's.
        "snapshot_id": str(report.get("snapshot_id") or active.active_run["snapshot_id"]),
        "variant_used": variant_used,
        "analysis_source": report_source,
        "llm_enrichment": bool(report.get("llm_enrichment")),
        "generated_at": report.get("generated_at"),
        "counts": _analysis_counts(report),
    }
    if resolved_analysis_id is not None:
        status["analysis_id"] = resolved_analysis_id
    return status


def get_pr_analysis(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    variant: str = "auto",
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """Return the meta-bug cluster and duplicate-PR entry that mention a PR.

    Returns:
        Base payload plus ``pr_number``, ``found`` (true when either lookup
        matched), ``meta_bug`` (with its 1-based ``rank``) and ``duplicate_pr``;
        the last two are ``None`` when nothing matched.

    Raises:
        ValueError: When no analysis report can be resolved.
    """
    context = _load_analysis_context(
        db_path,
        repo=repo,
        variant=variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
    )
    meta_bug, rank = _find_meta_bug_for_pr(context.report, pr_number)
    duplicate_pr = _find_duplicate_pr_for_pr(context.report, pr_number)
    return {
        **_analysis_base_payload(context),
        "pr_number": pr_number,
        "found": meta_bug is not None or duplicate_pr is not None,
        "meta_bug": None if meta_bug is None else _meta_bug_payload(meta_bug, rank=rank),
        "duplicate_pr": duplicate_pr,
    }


def list_analysis_meta_bugs(
    db_path: Path,
    *,
    repo: str | None = None,
    variant: str = "auto",
    limit: int = 50,
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """List the first ``limit`` meta-bug clusters of the report, ranked from 1.

    ``limit`` is applied as a slice bound, so a negative value trims entries
    from the end of the list instead of being rejected.

    Raises:
        ValueError: When no analysis report can be resolved.
    """
    context = _load_analysis_context(
        db_path,
        repo=repo,
        variant=variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
    )
    meta_bugs = [
        _meta_bug_payload(cluster, rank=index)
        for index, cluster in enumerate(_list_field(context.report, "meta_bugs")[:limit], start=1)
    ]
    return {
        **_analysis_base_payload(context),
        "meta_bugs": meta_bugs,
        "meta_bug_count": len(meta_bugs),
    }


def get_analysis_meta_bug(
    db_path: Path,
    *,
    cluster_id: str,
    repo: str | None = None,
    variant: str = "auto",
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """Return one meta-bug cluster by id, with the duplicate-PR entry sharing that id.

    Raises:
        ValueError: When no report can be resolved or no cluster has
            ``cluster_id``.
    """
    context = _load_analysis_context(
        db_path,
        repo=repo,
        variant=variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
    )
    for index, cluster in enumerate(_list_field(context.report, "meta_bugs"), start=1):
        # Compare as strings so the id's stored JSON type does not matter.
        if str(cluster.get("cluster_id")) != cluster_id:
            continue
        return {
            **_analysis_base_payload(context),
            "meta_bug": _meta_bug_payload(cluster, rank=index),
            "duplicate_pr": _find_duplicate_pr_by_cluster_id(context.report, cluster_id),
        }
    raise ValueError(f"Analysis cluster {cluster_id!r} was not found in the active analysis view.")


def list_analysis_duplicate_prs(
    db_path: Path,
    *,
    repo: str | None = None,
    variant: str = "auto",
    limit: int = 50,
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """List the first ``limit`` duplicate-PR entries of the report, ranked from 1.

    ``limit`` is applied as a slice bound, so a negative value trims entries
    from the end of the list instead of being rejected.

    Raises:
        ValueError: When no analysis report can be resolved.
    """
    context = _load_analysis_context(
        db_path,
        repo=repo,
        variant=variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
    )
    duplicate_prs = [
        # The entry is spread last, so a "rank" key stored in it wins.
        {"rank": index, **dict(entry)}
        for index, entry in enumerate(
            _list_field(context.report, "duplicate_prs")[:limit], start=1
        )
    ]
    return {
        **_analysis_base_payload(context),
        "duplicate_prs": duplicate_prs,
        "duplicate_pr_count": len(duplicate_prs),
    }


def get_analysis_best(
    db_path: Path,
    *,
    repo: str | None = None,
    variant: str = "auto",
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """Return the report's ``best_issue`` and ``best_pr`` entries.

    Each entry gains a ``cluster_id`` when a meta-bug cluster lists its number.

    Raises:
        ValueError: When no analysis report can be resolved.
    """
    context = _load_analysis_context(
        db_path,
        repo=repo,
        variant=variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
    )
    return {
        **_analysis_base_payload(context),
        "best_issue": _best_entry_with_cluster_id(
            context.report,
            context.report.get("best_issue"),
            number_key="issue_number",
            numbers_key="issue_numbers",
        ),
        "best_pr": _best_entry_with_cluster_id(
            context.report,
            context.report.get("best_pr"),
            number_key="pr_number",
            numbers_key="pr_numbers",
        ),
    }


def _resolve_active_snapshot_context(
    db_path: Path,
    *,
    repo: str | None,
) -> ActiveSnapshotContext:
    """Look up the active run in the search DB and resolve its snapshot directory."""
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
    finally:
        # Close the connection whether or not the lookup succeeded.
        connection.close()
    return ActiveSnapshotContext(
        active_run={str(key): value for key, value in active_run.items()},
        snapshot_dir=Path(str(active_run["snapshot_dir"])).resolve(),
    )


def _load_analysis_context(
    db_path: Path,
    *,
    repo: str | None,
    variant: str,
    snapshot_id: str | None,
    analysis_id: str | None,
) -> AnalysisContext:
    """Resolve and load the analysis report, failing when none is available.

    Raises:
        ValueError: When the report cannot be resolved or is not a JSON object.
    """
    active = _resolve_active_snapshot_context(db_path, repo=repo)
    report_path, variant_used, report_source, resolved_analysis_id = _resolve_analysis_report_path(
        active.snapshot_dir,
        variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
        required=True,
    )
    # With required=True the resolver raises instead of returning None, so
    # these only narrow the optional types for type checkers.
    assert report_path is not None
    assert variant_used is not None
    assert report_source is not None
    return AnalysisContext(
        active_run=active.active_run,
        report=_load_report(report_path),
        report_path=report_path,
        report_source=report_source,
        variant_requested=_normalize_analysis_variant(variant),
        variant_used=variant_used,
        analysis_id=resolved_analysis_id,
    )


def _resolve_analysis_report_path(
    snapshot_dir: Path,
    variant: str,
    *,
    snapshot_id: str | None,
    analysis_id: str | None,
    required: bool,
) -> tuple[Path | None, str | None, str | None, str | None]:
    """Pick the report file to serve for the requested variant.

    Lookup order:

    1. When ``snapshot_id`` and ``analysis_id`` are both given, only the
       archived manifest of that run is consulted.
    2. For ``deterministic``, a report stored in the snapshot directory is
       tried first.
    3. The current published manifest, when its file exists.
    4. A report stored in the snapshot directory.

    Returns:
        ``(report_path, variant_used, report_source, analysis_id)``, or four
        ``None`` values when nothing was found and ``required`` is false.

    Raises:
        ValueError: For an unsupported variant, when only one of
            ``snapshot_id``/``analysis_id`` is given, when nothing is found and
            ``required`` is true, or (regardless of ``required``) when an
            existing manifest cannot serve the requested artifact.
    """
    normalized = _normalize_analysis_variant(variant)
    if (snapshot_id is None) != (analysis_id is None):
        raise ValueError("snapshot_id and analysis_id must be provided together.")
    if snapshot_id is not None and analysis_id is not None:
        selection = _resolve_archived_analysis_report_path(
            snapshot_dir,
            snapshot_id=snapshot_id,
            analysis_id=analysis_id,
            variant=normalized,
        )
        if selection is not None:
            return (*selection, analysis_id)
        if not required:
            return None, None, None, None
        raise ValueError(
            f"Published analysis run {analysis_id!r} for snapshot {snapshot_id!r} was not found."
        )
    current_manifest_path = repo_relative_path_to_local(
        snapshot_dir, CURRENT_ANALYSIS_MANIFEST_PATH
    )
    if normalized == "deterministic":
        # Manifest resolution rejects "deterministic", so try the
        # snapshot-local file before looking at the manifest.
        selection = _resolve_snapshot_local_report_path(snapshot_dir, variant=normalized)
        if selection is not None:
            return (*selection, None)
    if current_manifest_path.exists():
        # Load once so the report path and the analysis id come from the same
        # manifest contents.
        manifest = load_current_analysis_manifest(current_manifest_path)
        report_path, variant_used = _resolve_manifest_report_path(
            snapshot_dir,
            manifest,
            variant=normalized,
            manifest_kind="current",
        )
        return (
            report_path,
            variant_used,
            "current",
            str(manifest["analysis_id"]),
        )
    selection = _resolve_snapshot_local_report_path(snapshot_dir, variant=normalized)
    if selection is not None:
        return (*selection, None)
    if not required:
        return None, None, None, None
    raise ValueError(
        "No analysis report was found for the current analysis view or active snapshot."
    )


def _resolve_archived_analysis_report_path(
    snapshot_dir: Path,
    *,
    snapshot_id: str,
    analysis_id: str,
    variant: str,
) -> tuple[Path, str, str] | None:
    """Resolve the report of one archived run, or ``None`` if its manifest file is absent.

    Raises:
        ValueError: When the manifest exists but cannot serve ``variant``.
    """
    manifest_path = repo_relative_path_to_local(
        snapshot_dir,
        analysis_run_manifest_path(snapshot_id, analysis_id),
    )
    if not manifest_path.exists():
        return None
    report_path, variant_used = _resolve_manifest_report_path(
        snapshot_dir,
        load_archived_analysis_run_manifest(manifest_path),
        variant=variant,
        manifest_kind="archived",
    )
    return report_path, variant_used, "archived"


def _resolve_manifest_report_path(
    snapshot_dir: Path,
    manifest: dict[str, Any],
    *,
    variant: str,
    manifest_kind: str,
) -> tuple[Path, str]:
    """Map a manifest's artifact entry to an existing local report file.

    Args:
        snapshot_dir: Directory that artifact paths are localized against.
        manifest: Loaded manifest; its ``artifacts`` mapping is consulted.
        variant: Normalized variant requested by the caller.
        manifest_kind: Label used in error messages ("current" or "archived").

    Returns:
        ``(report_path, variant_used)``.

    Raises:
        ValueError: When the variant is not served by manifests, the artifact
            entry is missing or not a non-empty string, or the file is absent.
    """
    artifact_key = _artifact_key_for_variant(variant, manifest_kind=manifest_kind)
    artifacts = manifest.get("artifacts") or {}
    artifact_path = artifacts.get(artifact_key)
    if not isinstance(artifact_path, str) or not artifact_path:
        message = (
            f"Published {manifest_kind} analysis manifest does not provide the {variant} artifact."
            if variant != "auto"
            else f"Published {manifest_kind} analysis manifest does not provide the canonical hybrid artifact."
        )
        raise ValueError(message)
    report_path = repo_relative_path_to_local(snapshot_dir, artifact_path)
    if not report_path.exists():
        raise ValueError(
            f"Published {manifest_kind} analysis artifact {artifact_path!r} is missing from the materialized snapshot."
        )
    # "auto" resolves to the hybrid artifact, so report "hybrid" as the variant used.
    variant_used = "hybrid" if artifact_key == "hybrid" else variant
    return report_path, variant_used


def _artifact_key_for_variant(variant: str, *, manifest_kind: str) -> str:
    """Return the manifest artifact key for ``variant``.

    Raises:
        ValueError: For any variant other than ``auto`` or ``hybrid``.
    """
    if variant in ("auto", "hybrid"):
        return "hybrid"
    raise ValueError(
        f"Published {manifest_kind} analysis only serves canonical hybrid artifacts; requested {variant!r}."
    )


def _resolve_snapshot_local_report_path(
    snapshot_dir: Path,
    *,
    variant: str,
) -> tuple[Path, str, str] | None:
    """Find a report file stored directly in ``snapshot_dir``.

    ``auto`` prefers the hybrid report and falls back to the deterministic one.

    Returns:
        ``(report_path, variant_used, "snapshot")``, or ``None`` when the file
        does not exist.
    """
    if variant == "auto":
        hybrid_path = snapshot_dir / ANALYSIS_REPORT_FILENAME_BY_VARIANT["hybrid"]
        if hybrid_path.exists():
            return hybrid_path, "hybrid", "snapshot"
        deterministic_path = snapshot_dir / ANALYSIS_REPORT_FILENAME_BY_VARIANT["deterministic"]
        if deterministic_path.exists():
            return deterministic_path, "deterministic", "snapshot"
        return None
    report_path = snapshot_dir / ANALYSIS_REPORT_FILENAME_BY_VARIANT[variant]
    if not report_path.exists():
        return None
    return report_path, variant, "snapshot"


def _normalize_analysis_variant(variant: str) -> str:
    """Strip and lower-case ``variant``, rejecting names outside ``ANALYSIS_VARIANTS``.

    Raises:
        ValueError: When the normalized name is not a supported variant.
    """
    normalized = variant.strip().lower()
    if normalized not in ANALYSIS_VARIANTS:
        raise ValueError(
            f"Unsupported analysis variant {variant!r}; expected auto, hybrid, or deterministic."
        )
    return normalized


def _analysis_base_payload(context: AnalysisContext) -> dict[str, Any]:
    """Build the fields shared by every report-backed response."""
    active_snapshot_id = str(context.active_run["snapshot_id"])
    # Prefer the snapshot id recorded in the report over the active run's.
    snapshot_id = str(context.report.get("snapshot_id") or active_snapshot_id)
    payload = {
        "repo": str(context.active_run["repo"]),
        "snapshot_id": snapshot_id,
        "active_snapshot_id": active_snapshot_id,
        "run_id": str(context.active_run["id"]),
        "variant_requested": context.variant_requested,
        "variant_used": context.variant_used,
        "analysis_source": context.report_source,
        "llm_enrichment": bool(context.report.get("llm_enrichment")),
        "generated_at": context.report.get("generated_at"),
    }
    if context.analysis_id is not None:
        payload["analysis_id"] = context.analysis_id
    return payload


def _list_field(mapping: dict[str, Any], key: str) -> list[Any]:
    """Return ``mapping[key]``, treating a missing, null or empty value as ``[]``."""
    return mapping.get(key) or []


def _analysis_counts(report: dict[str, Any]) -> dict[str, int]:
    """Count the entries of the report's three list sections."""
    return {
        "meta_bugs": len(_list_field(report, "meta_bugs")),
        "duplicate_issues": len(_list_field(report, "duplicate_issues")),
        "duplicate_prs": len(_list_field(report, "duplicate_prs")),
    }


def _meta_bug_payload(cluster: dict[str, Any], *, rank: int | None = None) -> dict[str, Any]:
    """Copy ``cluster`` and, when ``rank`` is given, set its ``rank`` key."""
    payload = dict(cluster)
    if rank is not None:
        payload["rank"] = rank
    return payload


def _find_meta_bug_for_pr(
    report: dict[str, Any],
    pr_number: int,
) -> tuple[dict[str, Any] | None, int | None]:
    """Return a copy of the first cluster listing ``pr_number`` and its 1-based rank.

    Returns ``(None, None)`` when no cluster lists the PR.
    """
    for index, cluster in enumerate(_list_field(report, "meta_bugs"), start=1):
        pr_numbers = {int(number) for number in _list_field(cluster, "pr_numbers")}
        if pr_number in pr_numbers:
            return dict(cluster), index
    return None, None


def _find_duplicate_pr_for_pr(report: dict[str, Any], pr_number: int) -> dict[str, Any] | None:
    """Return a copy of the first duplicate-PR entry naming ``pr_number``.

    The PR matches as either the canonical PR or one of the duplicates.
    """
    for entry in _list_field(report, "duplicate_prs"):
        numbers = {
            int(entry["canonical_pr_number"]),
            *(int(number) for number in _list_field(entry, "duplicate_pr_numbers")),
        }
        if pr_number in numbers:
            return dict(entry)
    return None


def _find_duplicate_pr_by_cluster_id(
    report: dict[str, Any],
    cluster_id: str,
) -> dict[str, Any] | None:
    """Return a copy of the first duplicate-PR entry whose ``cluster_id`` matches."""
    for entry in _list_field(report, "duplicate_prs"):
        if str(entry.get("cluster_id")) == cluster_id:
            return dict(entry)
    return None


def _best_entry_with_cluster_id(
    report: dict[str, Any],
    entry: Any,
    *,
    number_key: str,
    numbers_key: str,
) -> dict[str, Any] | None:
    """Copy a "best" entry, adding the id of the cluster that lists its number.

    Args:
        report: Parsed report whose ``meta_bugs`` are searched.
        entry: The raw ``best_issue``/``best_pr`` value from the report.
        number_key: Key in ``entry`` holding the issue or PR number.
        numbers_key: Key in each cluster holding its list of such numbers.

    Returns:
        ``None`` when ``entry`` is not a dict; otherwise a copy of it, with
        ``cluster_id`` added when a cluster lists the number.
    """
    if not isinstance(entry, dict):
        return None
    number = entry.get(number_key)
    if number is None:
        return dict(entry)
    for cluster in _list_field(report, "meta_bugs"):
        numbers = {int(value) for value in _list_field(cluster, numbers_key)}
        if int(number) in numbers:
            # The entry is spread last, so a "cluster_id" already in it wins.
            return {"cluster_id": cluster.get("cluster_id"), **dict(entry)}
    return dict(entry)


def _load_report(path: Path) -> dict[str, Any]:
    """Read the report at ``path`` and return it with every key coerced to ``str``.

    Raises:
        ValueError: When the file's top-level JSON value is not an object.
    """
    payload = read_json(path)
    if not isinstance(payload, dict):
        raise ValueError(f"Analysis report at {path} must contain a JSON object.")
    return {str(key): value for key, value in payload.items()}