diffusers-pr-api / src /slop_farmer /reports /duplicate_prs.py
evalstate's picture
evalstate HF Staff
Deploy Diffusers PR API
dbf7313 verified
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any
from pydantic import BaseModel
from slop_farmer.config import AnalysisOptions
from slop_farmer.data.parquet_io import read_json
from slop_farmer.reports.analysis import LLM_PROVIDER_ENV_VARS, run_analysis
from slop_farmer.reports.canonical_duplicate_pr import (
SnapshotBundle,
load_snapshot_bundle,
select_ranked_duplicate_pr_cluster,
select_ranked_duplicate_pr_clusters,
)
DEFAULT_DUPLICATE_PR_MODEL = "gpt-5.4-mini?service_tier=flex"
HYBRID_REPORT_FILENAME = "analysis-report-hybrid.json"
class DuplicatePrClusterMergeabilityResponse(BaseModel):
    """Structured verdict returned by the LLM mergeability gate."""

    # True when the PR cluster should be synthesized into one canonical PR.
    accept: bool
    # Model-reported confidence in the verdict (callers round to 3 decimals;
    # no range validation is applied here).
    confidence: float
    # Short natural-language justification for the verdict.
    reason: str
def ensure_hybrid_report(
    *,
    report_path: Path | None,
    snapshot_dir: Path | None,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
) -> Path:
    """Return the path to an LLM-enriched ("hybrid") analysis report.

    Resolution order:
    1. An explicit *report_path* that already carries ``llm_enrichment``.
    2. A cached ``analysis-report-hybrid.json`` inside the snapshot directory.
    3. A fresh ``run_analysis`` pass using the "hybrid" ranking backend.

    Raises:
        ValueError: unless exactly one of *report_path* / *snapshot_dir* is
            given (via ``_resolve_duplicate_pr_inputs``).
        RuntimeError: when hybrid prerequisites are missing, or when the
            freshly generated report still lacks ``llm_enrichment``.
    """
    resolved_report, resolved_snapshot_dir = _resolve_duplicate_pr_inputs(
        report_path=report_path,
        snapshot_dir=snapshot_dir,
    )
    if resolved_report is not None and _report_has_llm_enrichment(resolved_report):
        return resolved_report
    # Reuse a previously generated hybrid report when it is already enriched.
    cached_hybrid_report = resolved_snapshot_dir / HYBRID_REPORT_FILENAME
    if cached_hybrid_report.exists() and _report_has_llm_enrichment(cached_hybrid_report):
        return cached_hybrid_report.resolve()
    # Fail fast (fast_agent importable + provider key set) before paying
    # for a full analysis run.
    assert_hybrid_analysis_prerequisites()
    output_path = cached_hybrid_report.resolve()
    generated_report = run_analysis(
        AnalysisOptions(
            snapshot_dir=resolved_snapshot_dir,
            output_dir=resolved_snapshot_dir.parent,
            output=output_path,
            hf_repo_id=None,
            hf_revision=None,
            hf_materialize_dir=None,
            ranking_backend="hybrid",
            model=model,
            max_clusters=10,
        )
    ).resolve()
    # A run can "succeed" without enrichment (e.g. LLM backend silently
    # unavailable); treat that as a hard failure rather than caching junk.
    if not _report_has_llm_enrichment(generated_report):
        raise RuntimeError(
            f"Hybrid analysis for {resolved_snapshot_dir} completed without LLM enrichment. "
            "Install the optional fast-agent dependency, configure a provider API key, and retry."
        )
    return generated_report
def assert_hybrid_analysis_prerequisites() -> None:
    """Raise ``RuntimeError`` unless hybrid LLM analysis can actually run.

    Two prerequisites are checked:
    1. The optional ``fast_agent`` package is importable.
    2. At least one provider API key named in ``LLM_PROVIDER_ENV_VARS`` is set
       to a non-empty value in the environment.

    All missing prerequisites are collected and reported in a single error so
    an operator can fix everything in one pass.
    """
    problems: list[str] = []
    try:
        import fast_agent  # noqa: F401
    except Exception:
        problems.append(
            "Install `slop-farmer[llm]` or `fast-agent-mcp` so hybrid duplicate-PR gating can run."
        )
    if not any(os.environ.get(name) for name in LLM_PROVIDER_ENV_VARS):
        # Build the hint from LLM_PROVIDER_ENV_VARS itself so the message can
        # never drift out of sync with the variables the check inspects
        # (the previous hardcoded list could go stale).
        problems.append("Set one of " + ", ".join(LLM_PROVIDER_ENV_VARS) + ".")
    if problems:
        raise RuntimeError(
            "Hybrid duplicate-PR analysis prerequisites are missing. " + " ".join(problems)
        )
def load_duplicate_pr_bundle(
    *,
    report_path: Path | None,
    snapshot_dir: Path | None,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
) -> SnapshotBundle:
    """Load the snapshot bundle backing an LLM-enriched hybrid report.

    Ensures the hybrid report exists (generating it if needed), then loads
    the corresponding snapshot bundle from disk.
    """
    return load_snapshot_bundle(
        ensure_hybrid_report(
            report_path=report_path,
            snapshot_dir=snapshot_dir,
            model=model,
        )
    )
def list_mergeable_duplicate_pr_clusters(
    *,
    report_path: Path | None,
    snapshot_dir: Path | None,
    limit: int | None,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
) -> list[dict[str, Any]]:
    """Return ranked duplicate-PR clusters that pass the LLM mergeability gate.

    Each accepted cluster is annotated with bundle provenance plus the gate's
    confidence and reasoning. Iteration stops once *limit* clusters have been
    accepted.

    Raises:
        ValueError: when *limit* is given and smaller than 1.
    """
    if limit is not None and limit < 1:
        raise ValueError("--limit must be at least 1")
    bundle = load_duplicate_pr_bundle(
        report_path=report_path,
        snapshot_dir=snapshot_dir,
        model=model,
    )
    assert_hybrid_analysis_prerequisites()
    accepted: list[dict[str, Any]] = []
    for ranked in select_ranked_duplicate_pr_clusters(bundle):
        verdict = assess_duplicate_pr_cluster_mergeability(bundle, ranked, model=model)
        if not verdict.accept:
            continue
        # Copy the ranked payload and annotate it with provenance + verdict.
        entry = dict(ranked)
        entry.update(
            repo=bundle.repo,
            snapshot_id=bundle.snapshot_id,
            report_path=str(bundle.report_path),
            mergeability_confidence=round(float(verdict.confidence), 3),
            mergeability_reason=verdict.reason,
        )
        accepted.append(entry)
        if limit is not None and len(accepted) >= limit:
            break
    return accepted
def select_mergeable_duplicate_pr_cluster(
    bundle: SnapshotBundle,
    *,
    cluster_id: str | None,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
) -> dict[str, Any]:
    """Pick a single duplicate-PR cluster that passes the LLM mergeability gate.

    With *cluster_id* set, only that cluster is considered and a failing gate
    raises ``ValueError``. Otherwise ranked clusters are tried in order and
    the first accepted one wins.

    Raises:
        ValueError: when the named cluster fails the gate, or when no ranked
            cluster passes it.
    """
    assert_hybrid_analysis_prerequisites()

    def _annotated(candidate: dict[str, Any], verdict: DuplicatePrClusterMergeabilityResponse) -> dict[str, Any]:
        # Merge the gate verdict into a copy of the ranked-cluster payload.
        return {
            **candidate,
            "mergeability_confidence": round(float(verdict.confidence), 3),
            "mergeability_reason": verdict.reason,
        }

    if cluster_id is not None:
        candidate = select_ranked_duplicate_pr_cluster(bundle, cluster_id=cluster_id)
        verdict = assess_duplicate_pr_cluster_mergeability(bundle, candidate, model=model)
        if not verdict.accept:
            raise ValueError(
                f"Cluster {cluster_id} did not pass the mergeability gate: {verdict.reason}"
            )
        return _annotated(candidate, verdict)

    for candidate in select_ranked_duplicate_pr_clusters(bundle):
        verdict = assess_duplicate_pr_cluster_mergeability(bundle, candidate, model=model)
        if verdict.accept:
            return _annotated(candidate, verdict)
    raise ValueError("No duplicate PR cluster passed the mergeability gate.")
def assess_duplicate_pr_cluster_mergeability(
    bundle: SnapshotBundle,
    candidate: dict[str, Any],
    *,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
) -> DuplicatePrClusterMergeabilityResponse:
    """Run the LLM gate over *candidate*; raise if no verdict comes back."""
    verdict = _run_duplicate_pr_cluster_gate(
        _duplicate_pr_cluster_packet(bundle, candidate), model=model
    )
    if verdict is None:
        # The gate swallows its own errors and returns None; surface that here.
        raise RuntimeError("Hybrid duplicate-PR mergeability gate failed to return a result.")
    return verdict
def _resolve_duplicate_pr_inputs(
*,
report_path: Path | None,
snapshot_dir: Path | None,
) -> tuple[Path | None, Path]:
if (report_path is None) == (snapshot_dir is None):
raise ValueError("Provide exactly one of --report or --snapshot-dir.")
if report_path is not None:
resolved_report = report_path.resolve()
return resolved_report, resolved_report.parent.resolve()
assert snapshot_dir is not None
return None, snapshot_dir.resolve()
def _report_has_llm_enrichment(report_path: Path) -> bool:
    """Return True when *report_path* holds a JSON object with a truthy
    ``llm_enrichment`` key.

    Any read/parse failure — and a payload that is not a JSON object — counts
    as "not enriched" instead of raising, so callers can treat a broken or
    foreign report the same as a missing one.
    """
    if not report_path.exists():
        return False
    try:
        payload = read_json(report_path)
    except Exception:
        return False
    # Guard against non-dict payloads (e.g. a top-level JSON array): the
    # previous unguarded `payload.get(...)` raised AttributeError for those.
    if not isinstance(payload, dict):
        return False
    return bool(payload.get("llm_enrichment"))
def _duplicate_pr_cluster_packet(
    bundle: SnapshotBundle, candidate: dict[str, Any]
) -> dict[str, Any]:
    """Assemble the compact, JSON-serializable context packet for the LLM gate.

    For every source PR in *candidate*, gathers title, a truncated body,
    filenames, a diff preview, and excerpts of the first two discussion
    comments, reviews, and review comments; when the candidate's target issue
    is present in the snapshot, a similar issue packet is included. The
    excerpt limits and ``[:2]`` / ``[:20]`` caps keep the prompt small.
    """
    # Index PRs/issues by number for O(1) lookup; rows without a number are skipped.
    pr_rows = {
        int(row["number"]): row for row in bundle.pull_requests if row.get("number") is not None
    }
    issue_rows = {int(row["number"]): row for row in bundle.issues if row.get("number") is not None}
    pull_request_packets: list[dict[str, Any]] = []
    for pr_number in candidate["source_pr_numbers"]:
        pull_request = pr_rows.get(int(pr_number))
        if pull_request is None:
            # PR referenced by the cluster but absent from the snapshot; skip it.
            continue
        files = [
            row
            for row in bundle.pr_files
            if _coerce_int(row.get("pull_request_number")) == int(pr_number)
        ]
        # Take the first matching diff row, if any (None otherwise).
        diff_row = next(
            (
                row
                for row in bundle.pr_diffs
                if _coerce_int(row.get("pull_request_number")) == int(pr_number)
            ),
            None,
        )
        # Top-level PR discussion comments (not inline review comments).
        comments = [
            row
            for row in bundle.comments
            if row.get("parent_kind") == "pull_request"
            and _coerce_int(row.get("parent_number")) == int(pr_number)
        ]
        reviews = [
            row
            for row in bundle.reviews
            if _coerce_int(row.get("pull_request_number")) == int(pr_number)
        ]
        review_comments = [
            row
            for row in bundle.review_comments
            if _coerce_int(row.get("pull_request_number")) == int(pr_number)
        ]
        pull_request_packets.append(
            {
                "number": int(pr_number),
                "title": pull_request.get("title"),
                "body_excerpt": _excerpt(pull_request.get("body"), 600),
                # Deduplicated, sorted filenames, capped at 20 entries.
                "filenames": sorted(
                    {str(row.get("filename")) for row in files if row.get("filename")}
                )[:20],
                "diff_preview": _excerpt((diff_row or {}).get("diff"), 900),
                "discussion_comments": [
                    _excerpt(row.get("body"), 180) for row in comments[:2] if row.get("body")
                ],
                "reviews": [
                    {
                        "state": row.get("state"),
                        "body_excerpt": _excerpt(row.get("body"), 180),
                    }
                    for row in reviews[:2]
                ],
                "review_comments": [
                    {
                        "path": row.get("path"),
                        "body_excerpt": _excerpt(row.get("body"), 180),
                    }
                    for row in review_comments[:2]
                ],
            }
        )
    target_issue_packet: dict[str, Any] | None = None
    target_issue_number = _coerce_int(candidate.get("target_issue_number"))
    if target_issue_number is not None and target_issue_number in issue_rows:
        issue = issue_rows[target_issue_number]
        issue_comments = [
            row
            for row in bundle.comments
            if row.get("parent_kind") == "issue"
            and _coerce_int(row.get("parent_number")) == target_issue_number
        ]
        target_issue_packet = {
            "number": target_issue_number,
            "title": issue.get("title"),
            "body_excerpt": _excerpt(issue.get("body"), 500),
            "comments": [
                _excerpt(row.get("body"), 180) for row in issue_comments[:2] if row.get("body")
            ],
        }
    return {
        "repo": bundle.repo,
        "snapshot_id": bundle.snapshot_id,
        "cluster_id": candidate["cluster_id"],
        "summary": candidate.get("summary"),
        "canonical_issue_number": _coerce_int(candidate.get("canonical_issue_number")),
        "canonical_pr_number": _coerce_int(candidate.get("canonical_pr_number")),
        # None when the target issue is unknown or missing from the snapshot.
        "target_issue": target_issue_packet,
        "source_pr_numbers": candidate["source_pr_numbers"],
        "pull_requests": pull_request_packets,
    }
def _run_duplicate_pr_cluster_gate(
    packet: dict[str, Any],
    *,
    model: str,
) -> DuplicatePrClusterMergeabilityResponse | None:
    """Run the fast-agent LLM gate over *packet*; return None on any failure.

    ``None`` covers every failure mode here (fast_agent not importable,
    provider/transport errors, event-loop errors); the caller converts it
    into a ``RuntimeError``.
    """
    try:
        from fast_agent import FastAgent
    except Exception:
        # Optional dependency missing or broken; signal "no verdict".
        return None
    instruction = (
        "You decide whether a cluster of open GitHub pull requests should be synthesized into one "
        "canonical pull request. Accept only when the PRs appear to implement the same concrete "
        "code-path fix and one small patch could replace them. Reject when the root cause, scope, "
        "or implementation strategy diverges, or when the overlap is only docs/tests/chatter."
    )
    fast = FastAgent("slop-farmer-duplicate-pr-mergeability")

    # Register the gate agent. The stub body is intentionally empty: the agent
    # is driven through `structured(...)` below, not by calling this function.
    @fast.agent(name="mergeability_gate", instruction=instruction, model=model, use_history=False)
    async def mergeability_gate_stub() -> None:
        return None

    prompt = json.dumps(packet, indent=2, sort_keys=True)
    try:
        import asyncio

        async def _run() -> DuplicatePrClusterMergeabilityResponse | None:
            async with fast.run() as agent:
                # Ask for a structured (pydantic-validated) verdict.
                result, _ = await agent.mergeability_gate.structured(
                    prompt,
                    DuplicatePrClusterMergeabilityResponse,
                )
                return result

        # NOTE(review): asyncio.run() raises if an event loop is already
        # running in this thread; that case currently degrades to None too.
        return asyncio.run(_run())
    except Exception:
        # Best-effort gate: any runtime error yields "no verdict".
        return None
def _excerpt(value: Any, limit: int) -> str | None:
text = str(value or "").strip()
if not text:
return None
if len(text) <= limit:
return text
return text[: limit - 1].rstrip() + "…"
def _coerce_int(value: Any) -> int | None:
if value is None:
return None
try:
return int(value)
except (TypeError, ValueError):
return None