Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any | |
| from slop_farmer.data.parquet_io import read_json | |
| from slop_farmer.data.search_duckdb import connect_pr_search_db, resolve_active_run | |
| from slop_farmer.data.snapshot_paths import ( | |
| ANALYSIS_REPORT_FILENAME_BY_VARIANT, | |
| CURRENT_ANALYSIS_MANIFEST_PATH, | |
| analysis_run_manifest_path, | |
| load_archived_analysis_run_manifest, | |
| load_current_analysis_manifest, | |
| repo_relative_path_to_local, | |
| ) | |
# Variant names accepted by the public query helpers.  "auto" resolves to the
# hybrid report, falling back to the deterministic one for snapshot-local
# reports.
ANALYSIS_VARIANTS = {"deterministic", "hybrid", "auto"}
@dataclass
class ActiveSnapshotContext:
    """The active search-DB run together with its resolved snapshot directory.

    Constructed with keyword arguments by ``_resolve_active_snapshot_context``,
    which is why this must be a dataclass (a plain class has no such
    ``__init__``).
    """

    # Row of the active run with string keys (e.g. "repo", "snapshot_id", "id").
    active_run: dict[str, Any]
    # Absolute, resolved path of the run's materialized snapshot directory.
    snapshot_dir: Path
@dataclass
class AnalysisContext:
    """Everything a query helper needs about the analysis report it serves.

    Built with keyword arguments by ``_load_analysis_context``, so it must be a
    dataclass to get the matching generated ``__init__``.
    """

    # Row of the active search-DB run (string keys).
    active_run: dict[str, Any]
    # Parsed report JSON object.
    report: dict[str, Any]
    # Location of the report file that was loaded.
    report_path: Path
    # Where the report came from: "archived", "current" or "snapshot".
    report_source: str
    # Normalized variant the caller asked for ("auto", "hybrid", "deterministic").
    variant_requested: str
    # Variant of the report actually served ("hybrid" or "deterministic").
    variant_used: str
    # Published analysis id; None for snapshot-local reports.
    analysis_id: str | None
def get_analysis_status(
    db_path: Path,
    *,
    repo: str | None = None,
    variant: str = "auto",
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """Describe which analysis report, if any, backs the active snapshot.

    When no report can be located the result has ``available: False`` and
    only the run identifiers (``repo``, ``active_snapshot_id``, ``run_id``,
    ``variant_requested``).  Otherwise the report metadata and per-section
    ``counts`` are added, plus ``analysis_id`` when the report is published.

    Raises:
        ValueError: propagated from report resolution (e.g. an unsupported
            variant, or only one of ``snapshot_id`` / ``analysis_id`` given).
    """
    active = _resolve_active_snapshot_context(db_path, repo=repo)
    run = active.active_run
    path, used_variant, source, resolved_id = _resolve_analysis_report_path(
        active.snapshot_dir,
        variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
        required=False,
    )
    base: dict[str, Any] = {
        "repo": str(run["repo"]),
        "active_snapshot_id": str(run["snapshot_id"]),
        "run_id": str(run["id"]),
        "variant_requested": _normalize_analysis_variant(variant),
        "available": path is not None,
    }
    # required=False: the resolver signals "nothing found" with Nones.
    if any(part is None for part in (path, used_variant, source)):
        return base

    report = _load_report(path)
    status = dict(base)
    status.update(
        {
            "snapshot_id": str(report.get("snapshot_id") or run["snapshot_id"]),
            "variant_used": used_variant,
            "analysis_source": source,
            "llm_enrichment": bool(report.get("llm_enrichment")),
            "generated_at": report.get("generated_at"),
            "counts": _analysis_counts(report),
        }
    )
    if resolved_id is not None:
        status["analysis_id"] = resolved_id
    return status
def get_pr_analysis(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    variant: str = "auto",
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """Report how one pull request is classified by the selected analysis.

    ``found`` is true when the PR is listed in a meta-bug cluster or in a
    duplicate-PR group.  ``meta_bug`` is the first matching cluster with its
    1-based ``rank``; ``duplicate_pr`` is the first matching group.
    """
    context = _load_analysis_context(
        db_path,
        repo=repo,
        variant=variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
    )
    report = context.report
    cluster, cluster_rank = _find_meta_bug_for_pr(report, pr_number)
    duplicate_group = _find_duplicate_pr_for_pr(report, pr_number)

    response = _analysis_base_payload(context)
    response["pr_number"] = pr_number
    response["found"] = not (cluster is None and duplicate_group is None)
    response["meta_bug"] = (
        _meta_bug_payload(cluster, rank=cluster_rank) if cluster is not None else None
    )
    response["duplicate_pr"] = duplicate_group
    return response
def list_analysis_meta_bugs(
    db_path: Path,
    *,
    repo: str | None = None,
    variant: str = "auto",
    limit: int = 50,
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """List the leading meta-bug clusters of the selected analysis report.

    Args:
        limit: Maximum number of clusters returned, in report order.  Values
            below zero are treated as zero.

    Returns:
        The shared analysis metadata plus ``meta_bugs`` (copies of the
        clusters, each with a 1-based ``rank``) and ``meta_bug_count`` (how
        many were returned, not the report total).
    """
    context = _load_analysis_context(
        db_path,
        repo=repo,
        variant=variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
    )
    # `or []` also covers an explicit null section, which _analysis_counts
    # already tolerates; `.get(key, [])` would return None and crash here.
    clusters = context.report.get("meta_bugs") or []
    # A negative slice bound would silently drop clusters from the *end*;
    # clamp so `limit` always means "at most this many".
    meta_bugs = [
        _meta_bug_payload(cluster, rank=index)
        for index, cluster in enumerate(clusters[: max(limit, 0)], start=1)
    ]
    return {
        **_analysis_base_payload(context),
        "meta_bugs": meta_bugs,
        "meta_bug_count": len(meta_bugs),
    }
def get_analysis_meta_bug(
    db_path: Path,
    *,
    cluster_id: str,
    repo: str | None = None,
    variant: str = "auto",
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """Fetch one meta-bug cluster, and its duplicate-PR group if any, by id.

    Cluster ids are compared as strings on both sides, so a numeric id stored
    in the report matches its string form.

    Returns:
        The shared analysis metadata plus ``meta_bug`` (with its 1-based
        ``rank``) and ``duplicate_pr`` (the group with the same cluster id, or
        None).

    Raises:
        ValueError: if no cluster with ``cluster_id`` exists in the report.
    """
    context = _load_analysis_context(
        db_path,
        repo=repo,
        variant=variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
    )
    wanted = str(cluster_id)
    # `or []` tolerates an explicit null section in the report.
    clusters = context.report.get("meta_bugs") or []
    for index, cluster in enumerate(clusters, start=1):
        if str(cluster.get("cluster_id")) != wanted:
            continue
        return {
            **_analysis_base_payload(context),
            "meta_bug": _meta_bug_payload(cluster, rank=index),
            "duplicate_pr": _find_duplicate_pr_by_cluster_id(context.report, wanted),
        }
    raise ValueError(f"Analysis cluster {cluster_id!r} was not found in the active analysis view.")
def list_analysis_duplicate_prs(
    db_path: Path,
    *,
    repo: str | None = None,
    variant: str = "auto",
    limit: int = 50,
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """List the leading duplicate-PR groups of the selected analysis report.

    Args:
        limit: Maximum number of groups returned, in report order.  Values
            below zero are treated as zero.

    Returns:
        The shared analysis metadata plus ``duplicate_prs`` (copies of the
        groups with a 1-based ``rank`` key placed first; a ``rank`` already
        present in an entry takes precedence) and ``duplicate_pr_count``.
    """
    context = _load_analysis_context(
        db_path,
        repo=repo,
        variant=variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
    )
    # `or []` tolerates an explicit null section in the report.
    entries = context.report.get("duplicate_prs") or []
    # Clamp so a negative limit cannot drop entries from the end.
    duplicate_prs = [
        {"rank": index, **dict(entry)}
        for index, entry in enumerate(entries[: max(limit, 0)], start=1)
    ]
    return {
        **_analysis_base_payload(context),
        "duplicate_prs": duplicate_prs,
        "duplicate_pr_count": len(duplicate_prs),
    }
def get_analysis_best(
    db_path: Path,
    *,
    repo: str | None = None,
    variant: str = "auto",
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """Return the report's ``best_issue`` and ``best_pr`` picks.

    Each pick is copied and, when its number belongs to a meta-bug cluster,
    annotated with that cluster's ``cluster_id``.  A missing or non-dict pick
    is returned as None.
    """
    context = _load_analysis_context(
        db_path,
        repo=repo,
        variant=variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
    )
    report = context.report
    # report key -> (number field in the pick, numbers field in clusters)
    lookup_keys = {
        "best_issue": ("issue_number", "issue_numbers"),
        "best_pr": ("pr_number", "pr_numbers"),
    }
    response = _analysis_base_payload(context)
    for report_key, (single_key, plural_key) in lookup_keys.items():
        response[report_key] = _best_entry_with_cluster_id(
            report,
            report.get(report_key),
            number_key=single_key,
            numbers_key=plural_key,
        )
    return response
def _resolve_active_snapshot_context(
    db_path: Path,
    *,
    repo: str | None,
) -> ActiveSnapshotContext:
    """Look up the active run in the search DB and resolve its snapshot dir.

    The DB is opened read-only and closed even when run resolution raises.
    """
    conn = connect_pr_search_db(db_path, read_only=True)
    try:
        run_row = resolve_active_run(conn, repo=repo)
    finally:
        conn.close()
    normalized_row = {str(column): cell for column, cell in run_row.items()}
    resolved_dir = Path(str(run_row["snapshot_dir"])).resolve()
    return ActiveSnapshotContext(active_run=normalized_row, snapshot_dir=resolved_dir)
def _load_analysis_context(
    db_path: Path,
    *,
    repo: str | None,
    variant: str,
    snapshot_id: str | None,
    analysis_id: str | None,
) -> AnalysisContext:
    """Resolve, load and bundle the analysis report a query should read.

    Raises:
        ValueError: propagated from report resolution when no matching report
            exists or the request is invalid.
    """
    active = _resolve_active_snapshot_context(db_path, repo=repo)
    resolved = _resolve_analysis_report_path(
        active.snapshot_dir,
        variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
        required=True,
    )
    report_path, variant_used, report_source, resolved_analysis_id = resolved
    # With required=True the resolver raises instead of returning Nones; the
    # assertion records that invariant (and narrows the types).
    assert report_path is not None and variant_used is not None and report_source is not None
    report = _load_report(report_path)
    return AnalysisContext(
        active_run=active.active_run,
        report=report,
        report_path=report_path,
        report_source=report_source,
        variant_requested=_normalize_analysis_variant(variant),
        variant_used=variant_used,
        analysis_id=resolved_analysis_id,
    )
def _resolve_analysis_report_path(
    snapshot_dir: Path,
    variant: str,
    *,
    snapshot_id: str | None,
    analysis_id: str | None,
    required: bool,
) -> tuple[Path | None, str | None, str | None, str | None]:
    """Pick the analysis report file a query should read.

    Resolution order:

    1. ``snapshot_id`` + ``analysis_id`` (must be given together) select an
       archived published run.
    2. For ``deterministic``, a snapshot-local deterministic report wins.
    3. Otherwise a current-analysis manifest, if present, is authoritative.
    4. Finally, snapshot-local report files are used.

    Returns:
        ``(report_path, variant_used, report_source, analysis_id)``, where
        ``analysis_id`` is None for snapshot-local reports.  When nothing is
        found and ``required`` is false, all four items are None.

    Raises:
        ValueError: on an unsupported variant, a lone ``snapshot_id`` /
            ``analysis_id``, a missing archived run or report when
            ``required``, or a published manifest that cannot serve the
            request.
    """
    normalized = _normalize_analysis_variant(variant)
    if (snapshot_id is None) != (analysis_id is None):
        raise ValueError("snapshot_id and analysis_id must be provided together.")
    if snapshot_id is not None and analysis_id is not None:
        selection = _resolve_archived_analysis_report_path(
            snapshot_dir,
            snapshot_id=snapshot_id,
            analysis_id=analysis_id,
            variant=normalized,
        )
        if selection is not None:
            return (*selection, analysis_id)
        if not required:
            return None, None, None, None
        raise ValueError(
            f"Published analysis run {analysis_id!r} for snapshot {snapshot_id!r} was not found."
        )

    current_manifest_path = repo_relative_path_to_local(
        snapshot_dir, CURRENT_ANALYSIS_MANIFEST_PATH
    )
    if normalized == "deterministic":
        selection = _resolve_snapshot_local_report_path(snapshot_dir, variant=normalized)
        if selection is not None:
            return (*selection, None)
    if current_manifest_path.exists():
        # Parse the manifest once: the artifact path and the analysis_id must
        # come from the same read, and re-parsing the file is wasted I/O.
        manifest = load_current_analysis_manifest(current_manifest_path)
        report_path, variant_used = _resolve_manifest_report_path(
            snapshot_dir,
            manifest,
            variant=normalized,
            manifest_kind="current",
        )
        return report_path, variant_used, "current", str(manifest["analysis_id"])
    selection = _resolve_snapshot_local_report_path(snapshot_dir, variant=normalized)
    if selection is not None:
        return (*selection, None)
    if not required:
        return None, None, None, None
    raise ValueError(
        "No analysis report was found for the current analysis view or active snapshot."
    )
def _resolve_archived_analysis_report_path(
    snapshot_dir: Path,
    *,
    snapshot_id: str,
    analysis_id: str,
    variant: str,
) -> tuple[Path, str, str] | None:
    """Locate the report of a published (archived) analysis run.

    Returns:
        ``(report_path, variant_used, "archived")``, or None when the run's
        manifest file does not exist under ``snapshot_dir``.
    """
    run_manifest = repo_relative_path_to_local(
        snapshot_dir,
        analysis_run_manifest_path(snapshot_id, analysis_id),
    )
    if run_manifest.exists():
        found_path, found_variant = _resolve_manifest_report_path(
            snapshot_dir,
            load_archived_analysis_run_manifest(run_manifest),
            variant=variant,
            manifest_kind="archived",
        )
        return found_path, found_variant, "archived"
    return None
def _resolve_manifest_report_path(
    snapshot_dir: Path,
    manifest: dict[str, Any],
    *,
    variant: str,
    manifest_kind: str,
) -> tuple[Path, str]:
    """Map a published manifest to the on-disk report for ``variant``.

    Returns:
        ``(report_path, variant_used)``.

    Raises:
        ValueError: if the variant cannot be served from a manifest, the
            manifest's ``artifacts`` lacks a non-empty path for it, or the
            referenced file does not exist under ``snapshot_dir``.
    """
    key = _artifact_key_for_variant(variant, manifest_kind=manifest_kind)
    relative_path = (manifest.get("artifacts") or {}).get(key)

    if isinstance(relative_path, str) and relative_path:
        local_path = repo_relative_path_to_local(snapshot_dir, relative_path)
        if local_path.exists():
            return local_path, ("hybrid" if key == "hybrid" else variant)
        raise ValueError(
            f"Published {manifest_kind} analysis artifact {relative_path!r} is missing from the materialized snapshot."
        )

    if variant == "auto":
        raise ValueError(
            f"Published {manifest_kind} analysis manifest does not provide the canonical hybrid artifact."
        )
    raise ValueError(
        f"Published {manifest_kind} analysis manifest does not provide the {variant} artifact."
    )
| def _artifact_key_for_variant(variant: str, *, manifest_kind: str) -> str: | |
| if variant == "auto": | |
| return "hybrid" | |
| if variant == "hybrid": | |
| return "hybrid" | |
| raise ValueError( | |
| f"Published {manifest_kind} analysis only serves canonical hybrid artifacts; requested {variant!r}." | |
| ) | |
def _resolve_snapshot_local_report_path(
    snapshot_dir: Path,
    *,
    variant: str,
) -> tuple[Path, str, str] | None:
    """Look for a report file stored directly in ``snapshot_dir``.

    For ``auto`` the hybrid report is preferred over the deterministic one.

    Returns:
        ``(report_path, variant_used, "snapshot")`` for the first existing
        candidate, or None if none exists.
    """
    candidates = ("hybrid", "deterministic") if variant == "auto" else (variant,)
    for candidate in candidates:
        candidate_path = snapshot_dir / ANALYSIS_REPORT_FILENAME_BY_VARIANT[candidate]
        if candidate_path.exists():
            return candidate_path, candidate, "snapshot"
    return None
def _normalize_analysis_variant(variant: str) -> str:
    """Trim and lowercase ``variant`` and check it is a known variant.

    Raises:
        ValueError: if the normalized value is not in ``ANALYSIS_VARIANTS``.
    """
    candidate = variant.strip().lower()
    if candidate in ANALYSIS_VARIANTS:
        return candidate
    raise ValueError(
        f"Unsupported analysis variant {variant!r}; expected auto, hybrid, or deterministic."
    )
| def _analysis_base_payload(context: AnalysisContext) -> dict[str, Any]: | |
| active_snapshot_id = str(context.active_run["snapshot_id"]) | |
| snapshot_id = str(context.report.get("snapshot_id") or active_snapshot_id) | |
| payload = { | |
| "repo": str(context.active_run["repo"]), | |
| "snapshot_id": snapshot_id, | |
| "active_snapshot_id": active_snapshot_id, | |
| "run_id": str(context.active_run["id"]), | |
| "variant_requested": context.variant_requested, | |
| "variant_used": context.variant_used, | |
| "analysis_source": context.report_source, | |
| "llm_enrichment": bool(context.report.get("llm_enrichment")), | |
| "generated_at": context.report.get("generated_at"), | |
| } | |
| if context.analysis_id is not None: | |
| payload["analysis_id"] = context.analysis_id | |
| return payload | |
| def _analysis_counts(report: dict[str, Any]) -> dict[str, int]: | |
| return { | |
| "meta_bugs": len(report.get("meta_bugs") or []), | |
| "duplicate_issues": len(report.get("duplicate_issues") or []), | |
| "duplicate_prs": len(report.get("duplicate_prs") or []), | |
| } | |
| def _meta_bug_payload(cluster: dict[str, Any], *, rank: int | None = None) -> dict[str, Any]: | |
| payload = dict(cluster) | |
| if rank is not None: | |
| payload["rank"] = rank | |
| return payload | |
| def _find_meta_bug_for_pr( | |
| report: dict[str, Any], | |
| pr_number: int, | |
| ) -> tuple[dict[str, Any] | None, int | None]: | |
| for index, cluster in enumerate(report.get("meta_bugs", []), start=1): | |
| pr_numbers = {int(number) for number in cluster.get("pr_numbers", [])} | |
| if pr_number in pr_numbers: | |
| return dict(cluster), index | |
| return None, None | |
| def _find_duplicate_pr_for_pr(report: dict[str, Any], pr_number: int) -> dict[str, Any] | None: | |
| for entry in report.get("duplicate_prs", []): | |
| numbers = { | |
| int(entry["canonical_pr_number"]), | |
| *(int(number) for number in entry.get("duplicate_pr_numbers", [])), | |
| } | |
| if pr_number in numbers: | |
| return dict(entry) | |
| return None | |
| def _find_duplicate_pr_by_cluster_id( | |
| report: dict[str, Any], | |
| cluster_id: str, | |
| ) -> dict[str, Any] | None: | |
| for entry in report.get("duplicate_prs", []): | |
| if str(entry.get("cluster_id")) == cluster_id: | |
| return dict(entry) | |
| return None | |
| def _best_entry_with_cluster_id( | |
| report: dict[str, Any], | |
| entry: Any, | |
| *, | |
| number_key: str, | |
| numbers_key: str, | |
| ) -> dict[str, Any] | None: | |
| if not isinstance(entry, dict): | |
| return None | |
| number = entry.get(number_key) | |
| if number is None: | |
| return dict(entry) | |
| for cluster in report.get("meta_bugs", []): | |
| numbers = {int(value) for value in cluster.get(numbers_key, [])} | |
| if int(number) in numbers: | |
| return {"cluster_id": cluster.get("cluster_id"), **dict(entry)} | |
| return dict(entry) | |
def _load_report(path: Path) -> dict[str, Any]:
    """Read the analysis report at ``path`` as a dict with string keys.

    Raises:
        ValueError: if ``read_json`` does not yield a dict (JSON object).
    """
    data = read_json(path)
    if isinstance(data, dict):
        return {str(name): item for name, item in data.items()}
    raise ValueError(f"Analysis report at {path} must contain a JSON object.")