# Source: diffusers-pr-api/src/slop_farmer/reports/analysis_service.py
# Last commit dbf7313 "Deploy Diffusers PR API" by evalstate (HF Staff), verified.
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from slop_farmer.data.parquet_io import read_json
from slop_farmer.data.search_duckdb import connect_pr_search_db, resolve_active_run
from slop_farmer.data.snapshot_paths import (
ANALYSIS_REPORT_FILENAME_BY_VARIANT,
CURRENT_ANALYSIS_MANIFEST_PATH,
analysis_run_manifest_path,
load_archived_analysis_run_manifest,
load_current_analysis_manifest,
repo_relative_path_to_local,
)
# Variant names accepted by the analysis endpoints; "auto" lets the resolver
# choose (the hybrid report is preferred wherever both could apply).
ANALYSIS_VARIANTS = {"hybrid", "deterministic", "auto"}
@dataclass(frozen=True, slots=True)
class ActiveSnapshotContext:
active_run: dict[str, Any]
snapshot_dir: Path
@dataclass(frozen=True, slots=True)
class AnalysisContext:
active_run: dict[str, Any]
report: dict[str, Any]
report_path: Path
report_source: str
variant_requested: str
variant_used: str
analysis_id: str | None
def get_analysis_status(
    db_path: Path,
    *,
    repo: str | None = None,
    variant: str = "auto",
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """Describe which analysis report, if any, would serve the given selection.

    Unlike the other public entry points, a missing report is not an error.
    The result then carries ``available: False`` and only the run-level
    fields. When a report is found, these are added: its snapshot id, the
    variant served, the report source, the LLM-enrichment flag, the
    generation time, the per-section counts and, when known, the analysis id.

    Raises:
        ValueError: for an unsupported variant, a half-specified
            ``snapshot_id``/``analysis_id`` pair, or a published manifest
            that cannot serve the requested variant.
    """
    snapshot = _resolve_active_snapshot_context(db_path, repo=repo)
    run = snapshot.active_run
    resolved = _resolve_analysis_report_path(
        snapshot.snapshot_dir,
        variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
        required=False,
    )
    report_path, variant_used, report_source, resolved_analysis_id = resolved

    status: dict[str, Any] = {
        "repo": str(run["repo"]),
        "active_snapshot_id": str(run["snapshot_id"]),
        "run_id": str(run["id"]),
        "variant_requested": _normalize_analysis_variant(variant),
        "available": report_path is not None,
    }
    if any(part is None for part in (report_path, variant_used, report_source)):
        return status

    report = _load_report(report_path)
    status.update(
        snapshot_id=str(report.get("snapshot_id") or run["snapshot_id"]),
        variant_used=variant_used,
        analysis_source=report_source,
        llm_enrichment=bool(report.get("llm_enrichment")),
        generated_at=report.get("generated_at"),
        counts=_analysis_counts(report),
    )
    if resolved_analysis_id is not None:
        status["analysis_id"] = resolved_analysis_id
    return status
def get_pr_analysis(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    variant: str = "auto",
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """Locate a pull request inside the selected analysis report.

    The PR is looked up among meta-bug clusters (via their ``pr_numbers``)
    and among duplicate-PR groups (as canonical or duplicate member).
    ``found`` is true when either lookup matches.

    Raises:
        ValueError: when no analysis report can be resolved for the selection.
    """
    context = _load_analysis_context(
        db_path,
        repo=repo,
        variant=variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
    )
    report = context.report
    cluster, cluster_rank = _find_meta_bug_for_pr(report, pr_number)
    duplicate_group = _find_duplicate_pr_for_pr(report, pr_number)
    meta_bug_entry = (
        _meta_bug_payload(cluster, rank=cluster_rank) if cluster is not None else None
    )
    return {
        **_analysis_base_payload(context),
        "pr_number": pr_number,
        "found": not (cluster is None and duplicate_group is None),
        "meta_bug": meta_bug_entry,
        "duplicate_pr": duplicate_group,
    }
def list_analysis_meta_bugs(
    db_path: Path,
    *,
    repo: str | None = None,
    variant: str = "auto",
    limit: int = 50,
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """Return up to ``limit`` meta-bug clusters from the selected analysis report.

    Clusters keep the report's order and receive a 1-based ``rank``.

    Args:
        limit: maximum number of clusters to return; must be non-negative.

    Raises:
        ValueError: if ``limit`` is negative, or no analysis report can be
            resolved for the selection.
    """
    if limit < 0:
        # A negative slice bound would silently drop clusters from the end
        # instead of limiting the result.
        raise ValueError(f"limit must be non-negative; got {limit}.")
    context = _load_analysis_context(
        db_path,
        repo=repo,
        variant=variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
    )
    # ``or []`` tolerates a report that stores ``null`` for the section,
    # matching how _analysis_counts reads it.
    clusters = context.report.get("meta_bugs") or []
    meta_bugs = [
        _meta_bug_payload(cluster, rank=index)
        for index, cluster in enumerate(clusters[:limit], start=1)
    ]
    return {
        **_analysis_base_payload(context),
        "meta_bugs": meta_bugs,
        "meta_bug_count": len(meta_bugs),
    }
def get_analysis_meta_bug(
    db_path: Path,
    *,
    cluster_id: str,
    repo: str | None = None,
    variant: str = "auto",
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """Return one meta-bug cluster (with its rank) and any matching duplicate-PR group.

    Clusters are matched by comparing ``str(cluster["cluster_id"])`` with
    ``cluster_id``. The rank is the cluster's 1-based position in the report.

    Raises:
        ValueError: if no analysis report can be resolved, or no cluster
            has the requested id.
    """
    context = _load_analysis_context(
        db_path,
        repo=repo,
        variant=variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
    )
    # ``or []`` tolerates a ``null`` section, as _analysis_counts does.
    clusters = context.report.get("meta_bugs") or []
    for index, cluster in enumerate(clusters, start=1):
        if str(cluster.get("cluster_id")) != cluster_id:
            continue
        return {
            **_analysis_base_payload(context),
            "meta_bug": _meta_bug_payload(cluster, rank=index),
            "duplicate_pr": _find_duplicate_pr_by_cluster_id(context.report, cluster_id),
        }
    raise ValueError(f"Analysis cluster {cluster_id!r} was not found in the active analysis view.")
def list_analysis_duplicate_prs(
    db_path: Path,
    *,
    repo: str | None = None,
    variant: str = "auto",
    limit: int = 50,
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """Return up to ``limit`` duplicate-PR groups from the selected analysis report.

    Each group is copied with a 1-based ``rank`` placed first. An entry's
    own ``rank`` key, if present, takes precedence over the computed one.

    Args:
        limit: maximum number of groups to return; must be non-negative.

    Raises:
        ValueError: if ``limit`` is negative, or no analysis report can be
            resolved for the selection.
    """
    if limit < 0:
        # A negative slice bound would silently drop groups from the end.
        raise ValueError(f"limit must be non-negative; got {limit}.")
    context = _load_analysis_context(
        db_path,
        repo=repo,
        variant=variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
    )
    # ``or []`` tolerates a ``null`` section, as _analysis_counts does.
    entries = context.report.get("duplicate_prs") or []
    # NOTE(review): unlike _meta_bug_payload, the entry's keys override
    # ``rank`` here; kept as-is to preserve the existing response shape.
    duplicate_prs = [
        {"rank": index, **dict(entry)}
        for index, entry in enumerate(entries[:limit], start=1)
    ]
    return {
        **_analysis_base_payload(context),
        "duplicate_prs": duplicate_prs,
        "duplicate_pr_count": len(duplicate_prs),
    }
def get_analysis_best(
    db_path: Path,
    *,
    repo: str | None = None,
    variant: str = "auto",
    snapshot_id: str | None = None,
    analysis_id: str | None = None,
) -> dict[str, Any]:
    """Return the report's ``best_issue`` and ``best_pr`` picks.

    Each pick that is a dict is copied. Where a meta-bug cluster lists the
    pick's number, the copy is annotated with that cluster's ``cluster_id``;
    a ``cluster_id`` already on the pick takes precedence.

    Raises:
        ValueError: when no analysis report can be resolved for the selection.
    """
    context = _load_analysis_context(
        db_path,
        repo=repo,
        variant=variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
    )
    report = context.report
    best_issue = _best_entry_with_cluster_id(
        report,
        report.get("best_issue"),
        number_key="issue_number",
        numbers_key="issue_numbers",
    )
    best_pr = _best_entry_with_cluster_id(
        report,
        report.get("best_pr"),
        number_key="pr_number",
        numbers_key="pr_numbers",
    )
    return {**_analysis_base_payload(context), "best_issue": best_issue, "best_pr": best_pr}
def _resolve_active_snapshot_context(
    db_path: Path,
    *,
    repo: str | None,
) -> ActiveSnapshotContext:
    """Look up the active run in the PR search database and locate its snapshot.

    The database is opened read-only and closed again even when the lookup
    raises. The run row's keys are coerced to ``str``, and its
    ``snapshot_dir`` is resolved to an absolute path.
    """
    db = connect_pr_search_db(db_path, read_only=True)
    try:
        run_row = resolve_active_run(db, repo=repo)
    finally:
        db.close()
    normalized_run = {str(key): value for key, value in run_row.items()}
    snapshot_root = Path(str(run_row["snapshot_dir"])).resolve()
    return ActiveSnapshotContext(active_run=normalized_run, snapshot_dir=snapshot_root)
def _load_analysis_context(
    db_path: Path,
    *,
    repo: str | None,
    variant: str,
    snapshot_id: str | None,
    analysis_id: str | None,
) -> AnalysisContext:
    """Resolve and load the analysis report for a request, failing if none exists.

    Raises:
        ValueError: if the variant is unsupported, the snapshot/analysis
            selection is inconsistent, or no report can be found.
    """
    snapshot = _resolve_active_snapshot_context(db_path, repo=repo)
    path, used_variant, source, resolved_id = _resolve_analysis_report_path(
        snapshot.snapshot_dir,
        variant,
        snapshot_id=snapshot_id,
        analysis_id=analysis_id,
        required=True,
    )
    # With required=True the resolver raises rather than returning empties;
    # the assert only narrows the Optional types.
    assert path is not None and used_variant is not None and source is not None
    report = _load_report(path)
    return AnalysisContext(
        active_run=snapshot.active_run,
        report=report,
        report_path=path,
        report_source=source,
        variant_requested=_normalize_analysis_variant(variant),
        variant_used=used_variant,
        analysis_id=resolved_id,
    )
def _resolve_analysis_report_path(
    snapshot_dir: Path,
    variant: str,
    *,
    snapshot_id: str | None,
    analysis_id: str | None,
    required: bool,
) -> tuple[Path | None, str | None, str | None, str | None]:
    """Pick the analysis report that should serve a request.

    Resolution order:

    1. An explicit ``snapshot_id``/``analysis_id`` pair selects an archived
       published run. Both must be given, or neither.
    2. ``deterministic`` first tries the snapshot-local deterministic report.
    3. If a current-analysis manifest exists, its artifact is used.
    4. Otherwise a snapshot-local report for the variant is used.

    Returns:
        ``(report_path, variant_used, report_source, analysis_id)``. When
        nothing is found and ``required`` is false, all four are ``None``.

    Raises:
        ValueError: for an unsupported variant, a half-specified selection,
            a missing report when ``required`` is true, or a published
            manifest that cannot serve the requested variant.
    """
    normalized = _normalize_analysis_variant(variant)
    if (snapshot_id is None) != (analysis_id is None):
        raise ValueError("snapshot_id and analysis_id must be provided together.")
    if snapshot_id is not None and analysis_id is not None:
        selection = _resolve_archived_analysis_report_path(
            snapshot_dir,
            snapshot_id=snapshot_id,
            analysis_id=analysis_id,
            variant=normalized,
        )
        if selection is not None:
            return (*selection, analysis_id)
        if not required:
            return None, None, None, None
        raise ValueError(
            f"Published analysis run {analysis_id!r} for snapshot {snapshot_id!r} was not found."
        )
    current_manifest_path = repo_relative_path_to_local(
        snapshot_dir, CURRENT_ANALYSIS_MANIFEST_PATH
    )
    if normalized == "deterministic":
        selection = _resolve_snapshot_local_report_path(snapshot_dir, variant=normalized)
        if selection is not None:
            return (*selection, None)
    if current_manifest_path.exists():
        # Load the manifest once so the artifact path and the analysis_id come
        # from the same read. Previously the file was parsed twice, and the
        # two reads could disagree if the file changed in between.
        current_manifest = load_current_analysis_manifest(current_manifest_path)
        report_path, variant_used = _resolve_manifest_report_path(
            snapshot_dir,
            current_manifest,
            variant=normalized,
            manifest_kind="current",
        )
        return report_path, variant_used, "current", str(current_manifest["analysis_id"])
    selection = _resolve_snapshot_local_report_path(snapshot_dir, variant=normalized)
    if selection is not None:
        return (*selection, None)
    if not required:
        return None, None, None, None
    raise ValueError(
        "No analysis report was found for the current analysis view or active snapshot."
    )
def _resolve_archived_analysis_report_path(
    snapshot_dir: Path,
    *,
    snapshot_id: str,
    analysis_id: str,
    variant: str,
) -> tuple[Path, str, str] | None:
    """Resolve the report of an archived published analysis run.

    Returns ``(report_path, variant_used, "archived")``, or ``None`` when the
    run's manifest is not present in the materialized snapshot.
    """
    manifest_path = repo_relative_path_to_local(
        snapshot_dir,
        analysis_run_manifest_path(snapshot_id, analysis_id),
    )
    if manifest_path.exists():
        manifest = load_archived_analysis_run_manifest(manifest_path)
        resolved_path, resolved_variant = _resolve_manifest_report_path(
            snapshot_dir,
            manifest,
            variant=variant,
            manifest_kind="archived",
        )
        return resolved_path, resolved_variant, "archived"
    return None
def _resolve_manifest_report_path(
    snapshot_dir: Path,
    manifest: dict[str, Any],
    *,
    variant: str,
    manifest_kind: str,
) -> tuple[Path, str]:
    """Map a published manifest's artifact entry to a local report path.

    Args:
        snapshot_dir: materialized snapshot root used to localize
            repo-relative artifact paths.
        manifest: parsed current or archived analysis manifest.
        variant: normalized variant requested by the caller.
        manifest_kind: ``"current"`` or ``"archived"``; used in error messages.

    Returns:
        ``(report_path, variant_used)``.

    Raises:
        ValueError: if the variant cannot be served from a manifest, the
            manifest's ``artifacts`` entry is malformed or lacks the artifact,
            or the artifact file is missing from the snapshot.
    """
    artifact_key = _artifact_key_for_variant(variant, manifest_kind=manifest_kind)
    artifacts = manifest.get("artifacts") or {}
    if not isinstance(artifacts, dict):
        # Report a schema problem as the module's usual ValueError rather than
        # an AttributeError from calling ``.get`` on a non-mapping.
        raise ValueError(
            f"Published {manifest_kind} analysis manifest has a malformed 'artifacts' entry."
        )
    artifact_path = artifacts.get(artifact_key)
    if not isinstance(artifact_path, str) or not artifact_path:
        message = (
            f"Published {manifest_kind} analysis manifest does not provide the {variant} artifact."
            if variant != "auto"
            else f"Published {manifest_kind} analysis manifest does not provide the canonical hybrid artifact."
        )
        raise ValueError(message)
    report_path = repo_relative_path_to_local(snapshot_dir, artifact_path)
    if not report_path.exists():
        raise ValueError(
            f"Published {manifest_kind} analysis artifact {artifact_path!r} is missing from the materialized snapshot."
        )
    variant_used = "hybrid" if artifact_key == "hybrid" else variant
    return report_path, variant_used
def _artifact_key_for_variant(variant: str, *, manifest_kind: str) -> str:
if variant == "auto":
return "hybrid"
if variant == "hybrid":
return "hybrid"
raise ValueError(
f"Published {manifest_kind} analysis only serves canonical hybrid artifacts; requested {variant!r}."
)
def _resolve_snapshot_local_report_path(
    snapshot_dir: Path,
    *,
    variant: str,
) -> tuple[Path, str, str] | None:
    """Find an analysis report stored directly inside the snapshot directory.

    ``auto`` tries the hybrid report first and then the deterministic one.
    Any other variant looks only for its own file. Returns
    ``(path, variant_used, "snapshot")``, or ``None`` when no candidate exists.
    """
    candidates = ("hybrid", "deterministic") if variant == "auto" else (variant,)
    for candidate in candidates:
        candidate_path = snapshot_dir / ANALYSIS_REPORT_FILENAME_BY_VARIANT[candidate]
        if candidate_path.exists():
            return candidate_path, candidate, "snapshot"
    return None
def _normalize_analysis_variant(variant: str) -> str:
    """Canonicalize a caller-supplied variant name (whitespace trimmed, lower-cased).

    Raises:
        ValueError: if the canonical name is not in ``ANALYSIS_VARIANTS``.
    """
    candidate = variant.strip().lower()
    if candidate in ANALYSIS_VARIANTS:
        return candidate
    raise ValueError(
        f"Unsupported analysis variant {variant!r}; expected auto, hybrid, or deterministic."
    )
def _analysis_base_payload(context: AnalysisContext) -> dict[str, Any]:
active_snapshot_id = str(context.active_run["snapshot_id"])
snapshot_id = str(context.report.get("snapshot_id") or active_snapshot_id)
payload = {
"repo": str(context.active_run["repo"]),
"snapshot_id": snapshot_id,
"active_snapshot_id": active_snapshot_id,
"run_id": str(context.active_run["id"]),
"variant_requested": context.variant_requested,
"variant_used": context.variant_used,
"analysis_source": context.report_source,
"llm_enrichment": bool(context.report.get("llm_enrichment")),
"generated_at": context.report.get("generated_at"),
}
if context.analysis_id is not None:
payload["analysis_id"] = context.analysis_id
return payload
def _analysis_counts(report: dict[str, Any]) -> dict[str, int]:
return {
"meta_bugs": len(report.get("meta_bugs") or []),
"duplicate_issues": len(report.get("duplicate_issues") or []),
"duplicate_prs": len(report.get("duplicate_prs") or []),
}
def _meta_bug_payload(cluster: dict[str, Any], *, rank: int | None = None) -> dict[str, Any]:
payload = dict(cluster)
if rank is not None:
payload["rank"] = rank
return payload
def _find_meta_bug_for_pr(
report: dict[str, Any],
pr_number: int,
) -> tuple[dict[str, Any] | None, int | None]:
for index, cluster in enumerate(report.get("meta_bugs", []), start=1):
pr_numbers = {int(number) for number in cluster.get("pr_numbers", [])}
if pr_number in pr_numbers:
return dict(cluster), index
return None, None
def _find_duplicate_pr_for_pr(report: dict[str, Any], pr_number: int) -> dict[str, Any] | None:
for entry in report.get("duplicate_prs", []):
numbers = {
int(entry["canonical_pr_number"]),
*(int(number) for number in entry.get("duplicate_pr_numbers", [])),
}
if pr_number in numbers:
return dict(entry)
return None
def _find_duplicate_pr_by_cluster_id(
report: dict[str, Any],
cluster_id: str,
) -> dict[str, Any] | None:
for entry in report.get("duplicate_prs", []):
if str(entry.get("cluster_id")) == cluster_id:
return dict(entry)
return None
def _best_entry_with_cluster_id(
report: dict[str, Any],
entry: Any,
*,
number_key: str,
numbers_key: str,
) -> dict[str, Any] | None:
if not isinstance(entry, dict):
return None
number = entry.get(number_key)
if number is None:
return dict(entry)
for cluster in report.get("meta_bugs", []):
numbers = {int(value) for value in cluster.get(numbers_key, [])}
if int(number) in numbers:
return {"cluster_id": cluster.get("cluster_id"), **dict(entry)}
return dict(entry)
def _load_report(path: Path) -> dict[str, Any]:
    """Read the analysis report at ``path`` and return it with ``str`` keys.

    Raises:
        ValueError: if the file's top-level value is not a JSON object.
    """
    loaded = read_json(path)
    if isinstance(loaded, dict):
        return dict(zip(map(str, loaded), loaded.values()))
    raise ValueError(f"Analysis report at {path} must contain a JSON object.")