"""LLM-gated selection of mergeable duplicate-PR clusters from hybrid analysis reports."""

from __future__ import annotations

import json
import os
from pathlib import Path
from typing import Any

from pydantic import BaseModel

from slop_farmer.config import AnalysisOptions
from slop_farmer.data.parquet_io import read_json
from slop_farmer.reports.analysis import LLM_PROVIDER_ENV_VARS, run_analysis
from slop_farmer.reports.canonical_duplicate_pr import (
    SnapshotBundle,
    load_snapshot_bundle,
    select_ranked_duplicate_pr_cluster,
    select_ranked_duplicate_pr_clusters,
)

DEFAULT_DUPLICATE_PR_MODEL = "gpt-5.4-mini?service_tier=flex"
HYBRID_REPORT_FILENAME = "analysis-report-hybrid.json"


class DuplicatePrClusterMergeabilityResponse(BaseModel):
    """Structured verdict returned by the LLM mergeability gate."""

    accept: bool
    confidence: float
    reason: str


def ensure_hybrid_report(
    *,
    report_path: Path | None,
    snapshot_dir: Path | None,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
) -> Path:
    """Return an LLM-enriched hybrid report, reusing a cached one when possible."""
    resolved_report, resolved_snapshot_dir = _resolve_duplicate_pr_inputs(
        report_path=report_path,
        snapshot_dir=snapshot_dir,
    )
    if resolved_report is not None and _report_has_llm_enrichment(resolved_report):
        return resolved_report

    cached_hybrid_report = resolved_snapshot_dir / HYBRID_REPORT_FILENAME
    if cached_hybrid_report.exists() and _report_has_llm_enrichment(cached_hybrid_report):
        return cached_hybrid_report.resolve()

    assert_hybrid_analysis_prerequisites()
    output_path = cached_hybrid_report.resolve()
    generated_report = run_analysis(
        AnalysisOptions(
            snapshot_dir=resolved_snapshot_dir,
            output_dir=resolved_snapshot_dir.parent,
            output=output_path,
            hf_repo_id=None,
            hf_revision=None,
            hf_materialize_dir=None,
            ranking_backend="hybrid",
            model=model,
            max_clusters=10,
        )
    ).resolve()
    if not _report_has_llm_enrichment(generated_report):
        raise RuntimeError(
            f"Hybrid analysis for {resolved_snapshot_dir} completed without LLM enrichment. "
            "Install the optional fast-agent dependency, configure a provider API key, and retry."
        )
    return generated_report


def assert_hybrid_analysis_prerequisites() -> None:
    """Raise RuntimeError unless the fast-agent dependency and a provider key are available."""
    problems: list[str] = []
    try:
        import fast_agent  # noqa: F401
    except Exception:
        problems.append(
            "Install `slop-farmer[llm]` or `fast-agent-mcp` so hybrid duplicate-PR gating can run."
        )
    if not any(bool(os.environ.get(name)) for name in LLM_PROVIDER_ENV_VARS):
        problems.append(
            "Set one of OPENAI_API_KEY, ANTHROPIC_API_KEY, GOOGLE_API_KEY, or DEEPSEEK_API_KEY."
        )
    if problems:
        raise RuntimeError(
            "Hybrid duplicate-PR analysis prerequisites are missing. " + " ".join(problems)
        )


def load_duplicate_pr_bundle(
    *,
    report_path: Path | None,
    snapshot_dir: Path | None,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
) -> SnapshotBundle:
    """Load the snapshot bundle behind an ensured LLM-enriched hybrid report."""
    hybrid_report_path = ensure_hybrid_report(
        report_path=report_path,
        snapshot_dir=snapshot_dir,
        model=model,
    )
    return load_snapshot_bundle(hybrid_report_path)


def list_mergeable_duplicate_pr_clusters(
    *,
    report_path: Path | None,
    snapshot_dir: Path | None,
    limit: int | None,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
) -> list[dict[str, Any]]:
    """Return ranked duplicate-PR clusters that pass the LLM mergeability gate."""
    if limit is not None and limit < 1:
        raise ValueError("--limit must be at least 1")
    bundle = load_duplicate_pr_bundle(
        report_path=report_path,
        snapshot_dir=snapshot_dir,
        model=model,
    )
    # Re-assert here: a cached enriched report lets ensure_hybrid_report() skip
    # the prerequisite check, but the gate calls below still need a live provider.
    assert_hybrid_analysis_prerequisites()
    mergeable_clusters: list[dict[str, Any]] = []
    for candidate in select_ranked_duplicate_pr_clusters(bundle):
        gate_result = assess_duplicate_pr_cluster_mergeability(bundle, candidate, model=model)
        if not gate_result.accept:
            continue
        mergeable_clusters.append(
            {
                **candidate,
                "repo": bundle.repo,
                "snapshot_id": bundle.snapshot_id,
                "report_path": str(bundle.report_path),
                "mergeability_confidence": round(float(gate_result.confidence), 3),
                "mergeability_reason": gate_result.reason,
            }
        )
        if limit is not None and len(mergeable_clusters) >= limit:
            break
    return mergeable_clusters


def select_mergeable_duplicate_pr_cluster(
    bundle: SnapshotBundle,
    *,
    cluster_id: str | None,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
) -> dict[str, Any]:
    """Return one gate-approved cluster: the requested id, or the first ranked one that passes."""
    assert_hybrid_analysis_prerequisites()
    if cluster_id is not None:
        candidate = select_ranked_duplicate_pr_cluster(bundle, cluster_id=cluster_id)
        gate_result = assess_duplicate_pr_cluster_mergeability(bundle, candidate, model=model)
        if not gate_result.accept:
            raise ValueError(
                f"Cluster {cluster_id} did not pass the mergeability gate: {gate_result.reason}"
            )
        return {
            **candidate,
            "mergeability_confidence": round(float(gate_result.confidence), 3),
            "mergeability_reason": gate_result.reason,
        }
    for candidate in select_ranked_duplicate_pr_clusters(bundle):
        gate_result = assess_duplicate_pr_cluster_mergeability(bundle, candidate, model=model)
        if gate_result.accept:
            return {
                **candidate,
                "mergeability_confidence": round(float(gate_result.confidence), 3),
                "mergeability_reason": gate_result.reason,
            }
    raise ValueError("No duplicate PR cluster passed the mergeability gate.")


def assess_duplicate_pr_cluster_mergeability(
    bundle: SnapshotBundle,
    candidate: dict[str, Any],
    *,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
) -> DuplicatePrClusterMergeabilityResponse:
    """Run the LLM gate on a single candidate cluster."""
    packet = _duplicate_pr_cluster_packet(bundle, candidate)
    result = _run_duplicate_pr_cluster_gate(packet, model=model)
    if result is None:
        raise RuntimeError("Hybrid duplicate-PR mergeability gate failed to return a result.")
    return result


def _resolve_duplicate_pr_inputs(
    *,
    report_path: Path | None,
    snapshot_dir: Path | None,
) -> tuple[Path | None, Path]:
    """Accept exactly one of report/snapshot-dir; a report implies its parent directory."""
    if (report_path is None) == (snapshot_dir is None):
        raise ValueError("Provide exactly one of --report or --snapshot-dir.")
    if report_path is not None:
        resolved_report = report_path.resolve()
        return resolved_report, resolved_report.parent.resolve()
    assert snapshot_dir is not None
    return None, snapshot_dir.resolve()


def _report_has_llm_enrichment(report_path: Path) -> bool:
    """True when the report exists, parses, and carries an `llm_enrichment` payload."""
    if not report_path.exists():
        return False
    try:
        payload = read_json(report_path)
    except Exception:
        return False
    return bool(payload.get("llm_enrichment"))


def _duplicate_pr_cluster_packet(
    bundle: SnapshotBundle, candidate: dict[str, Any]
) -> dict[str, Any]:
    """Build the compact JSON packet the gate model sees for one cluster."""
    pr_rows = {
        int(row["number"]): row for row in bundle.pull_requests if row.get("number") is not None
    }
    issue_rows = {int(row["number"]): row for row in bundle.issues if row.get("number") is not None}
    pull_request_packets: list[dict[str, Any]] = []
    for pr_number in candidate["source_pr_numbers"]:
        pull_request = pr_rows.get(int(pr_number))
        if pull_request is None:
            continue
        files = [
            row
            for row in bundle.pr_files
            if _coerce_int(row.get("pull_request_number")) == int(pr_number)
        ]
        diff_row = next(
            (
                row
                for row in bundle.pr_diffs
                if _coerce_int(row.get("pull_request_number")) == int(pr_number)
            ),
            None,
        )
        comments = [
            row
            for row in bundle.comments
            if row.get("parent_kind") == "pull_request"
            and _coerce_int(row.get("parent_number")) == int(pr_number)
        ]
        reviews = [
            row
            for row in bundle.reviews
            if _coerce_int(row.get("pull_request_number")) == int(pr_number)
        ]
        review_comments = [
            row
            for row in bundle.review_comments
            if _coerce_int(row.get("pull_request_number")) == int(pr_number)
        ]
        # Excerpts and slices keep the packet small enough for a single prompt.
        pull_request_packets.append(
            {
                "number": int(pr_number),
                "title": pull_request.get("title"),
                "body_excerpt": _excerpt(pull_request.get("body"), 600),
                "filenames": sorted(
                    {str(row.get("filename")) for row in files if row.get("filename")}
                )[:20],
                "diff_preview": _excerpt((diff_row or {}).get("diff"), 900),
                "discussion_comments": [
                    _excerpt(row.get("body"), 180) for row in comments[:2] if row.get("body")
                ],
                "reviews": [
                    {
                        "state": row.get("state"),
                        "body_excerpt": _excerpt(row.get("body"), 180),
                    }
                    for row in reviews[:2]
                ],
                "review_comments": [
                    {
                        "path": row.get("path"),
                        "body_excerpt": _excerpt(row.get("body"), 180),
                    }
                    for row in review_comments[:2]
                ],
            }
        )
    target_issue_packet: dict[str, Any] | None = None
    target_issue_number = _coerce_int(candidate.get("target_issue_number"))
    if target_issue_number is not None and target_issue_number in issue_rows:
        issue = issue_rows[target_issue_number]
        issue_comments = [
            row
            for row in bundle.comments
            if row.get("parent_kind") == "issue"
            and _coerce_int(row.get("parent_number")) == target_issue_number
        ]
        target_issue_packet = {
            "number": target_issue_number,
            "title": issue.get("title"),
            "body_excerpt": _excerpt(issue.get("body"), 500),
            "comments": [
                _excerpt(row.get("body"), 180) for row in issue_comments[:2] if row.get("body")
            ],
        }
    return {
        "repo": bundle.repo,
        "snapshot_id": bundle.snapshot_id,
        "cluster_id": candidate["cluster_id"],
        "summary": candidate.get("summary"),
        "canonical_issue_number": _coerce_int(candidate.get("canonical_issue_number")),
        "canonical_pr_number": _coerce_int(candidate.get("canonical_pr_number")),
        "target_issue": target_issue_packet,
        "source_pr_numbers": candidate["source_pr_numbers"],
        "pull_requests": pull_request_packets,
    }
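

# The gate below returns the DuplicatePrClusterMergeabilityResponse contract.
# A minimal round-trip sketch (illustrative values only, assuming pydantic v2's
# model_validate); not called by the module.
def _example_gate_response() -> DuplicatePrClusterMergeabilityResponse:  # pragma: no cover
    return DuplicatePrClusterMergeabilityResponse.model_validate(
        {"accept": True, "confidence": 0.82, "reason": "Both PRs add the same null check."}
    )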


def _run_duplicate_pr_cluster_gate(
    packet: dict[str, Any],
    *,
    model: str,
) -> DuplicatePrClusterMergeabilityResponse | None:
    """Ask the gate model for a structured verdict; return None if the LLM stack is unavailable."""
    try:
        from fast_agent import FastAgent
    except Exception:
        return None
    instruction = (
        "You decide whether a cluster of open GitHub pull requests should be synthesized into one "
        "canonical pull request. Accept only when the PRs appear to implement the same concrete "
        "code-path fix and one small patch could replace them. Reject when the root cause, scope, "
        "or implementation strategy diverges, or when the overlap is only docs/tests/chatter."
    )
    fast = FastAgent("slop-farmer-duplicate-pr-mergeability")

    @fast.agent(name="mergeability_gate", instruction=instruction, model=model, use_history=False)
    async def mergeability_gate_stub() -> None:
        return None

    prompt = json.dumps(packet, indent=2, sort_keys=True)
    try:
        import asyncio

        async def _run() -> DuplicatePrClusterMergeabilityResponse | None:
            async with fast.run() as agent:
                result, _ = await agent.mergeability_gate.structured(
                    prompt,
                    DuplicatePrClusterMergeabilityResponse,
                )
                return result

        return asyncio.run(_run())
    except Exception:
        return None


def _excerpt(value: Any, limit: int) -> str | None:
    """Trim text to `limit` characters, marking truncation with an ellipsis."""
    text = str(value or "").strip()
    if not text:
        return None
    if len(text) <= limit:
        return text
    return text[: limit - 1].rstrip() + "…"


def _coerce_int(value: Any) -> int | None:
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None
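

# Manual smoke-check (a sketch, not shipped tooling): list gate-approved
# clusters for one snapshot. The directory and limit are hypothetical
# placeholders; the printed keys match the dicts built above.
if __name__ == "__main__":  # pragma: no cover - illustration
    for cluster in list_mergeable_duplicate_pr_clusters(
        report_path=None,
        snapshot_dir=Path("snapshots/demo"),  # hypothetical snapshot directory
        limit=3,
    ):
        print(cluster["cluster_id"], cluster["mergeability_confidence"])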