Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import asyncio | |
| import copy | |
| import json | |
| import os | |
| import re | |
| import shutil | |
| import sys | |
| from collections import Counter, defaultdict | |
| from dataclasses import dataclass | |
| from datetime import UTC, datetime | |
| from pathlib import Path | |
| from typing import Any | |
| from pydantic import BaseModel, Field | |
| from rank_bm25 import BM25Okapi | |
| from slop_farmer.config import AnalysisOptions, MarkdownReportOptions | |
| from slop_farmer.data.links import build_text_link_rows | |
| from slop_farmer.data.parquet_io import read_json, read_parquet_rows, write_text | |
| from slop_farmer.data.snapshot_source import resolve_snapshot_source_dir | |
| from slop_farmer.reports.analysis_cache import ( | |
| HYBRID_REVIEW_CACHE_SCHEMA_VERSION, | |
| PREPARED_REVIEW_UNIT_SCHEMA_VERSION, | |
| HybridReviewCacheEntry, | |
| HybridReviewCacheKey, | |
| HybridReviewCacheManifest, | |
| HybridReviewCacheStore, | |
| HybridReviewSettingsFingerprint, | |
| build_hybrid_review_cache_key, | |
| hybrid_review_cache_dir, | |
| ) | |
| from slop_farmer.reports.pr_heuristics import ( | |
| build_template_cleanup_settings, | |
| compile_cluster_suppression_rules, | |
| strip_pull_request_template, | |
| suppressed_pull_request_reasons, | |
| ) | |
# Column names that, taken together, identify one link row.
# NOTE(review): presumably used as a de-duplication key when link rows are
# merged -- the use site is outside this view, confirm there.
LINK_KEY_FIELDS = (
    "repo",
    "source_type",
    "source_number",
    "source_github_id",
    "target_owner",
    "target_repo",
    "target_number",
    "link_type",
    "link_origin",
)
# Common English function words.
# NOTE(review): presumably filtered out of token lists before similarity
# scoring -- the tokenizer is outside this view, confirm there.
STOPWORDS = {
    "a",
    "an",
    "and",
    "are",
    "as",
    "at",
    "be",
    "by",
    "for",
    "from",
    "how",
    "if",
    "in",
    "into",
    "is",
    "it",
    "of",
    "on",
    "or",
    "that",
    "the",
    "this",
    "to",
    "was",
    "were",
    "with",
}
# Matches one run of lowercase ASCII letters, digits and underscores.
TOKEN_PATTERN = re.compile(r"[a-z0-9_]+")
# Matches a unified-diff hunk header ("@@ -a,b +c,d @@") at the start of a
# string and captures the new-file side: ``start`` (c) and the optional
# ``count`` (d).
HUNK_HEADER_PATTERN = re.compile(r"^@@ -\d+(?:,\d+)? \+(?P<start>\d+)(?:,(?P<count>\d+))? @@")
# Environment variable names listed in the hybrid-fallback warning as the
# provider API keys a user can set to enable LLM enrichment.
LLM_PROVIDER_ENV_VARS = (
    "OPENAI_API_KEY",
    "ANTHROPIC_API_KEY",
    "GOOGLE_API_KEY",
    "DEEPSEEK_API_KEY",
)
# Limits applied when preparing LLM review packets. The first seven limits
# below are also recorded in the hybrid-review cache manifest fingerprint.
# NOTE(review): the exact enforcement of each limit lives in packet-building
# code outside this view.
LLM_PACKET_CHARS_PER_TOKEN = 4
LLM_MAX_INPUT_TOKENS = 60_000
LLM_MAX_NODES_PER_PACKET = 48
LLM_MAX_SOFT_PAIRS_PER_PACKET = 72
LLM_MAX_DIFF_CHARS_PER_ITEM = 1_200
LLM_MAX_FILENAMES_PER_ITEM = 16
LLM_SKIP_EVALUATOR_ABOVE_TOKENS = 60_000
LLM_OVERFLOW_POLICY = "truncate_then_skip"
LLM_SHARED_TARGET_MAX_NEIGHBORS_PER_PR = 3
LLM_SHARED_TARGET_MAX_EXTRA_PAIRS_PER_TARGET = 18
LLM_SHARED_TARGET_MIN_TEXT_JACCARD = 0.1
# Prompt versions; both are stored in the cache manifest.
CLUSTER_ANALYST_PROMPT_VERSION = "1.0"
CLUSTER_EVALUATOR_PROMPT_VERSION = "1.0"
# Instruction text for the cluster analyst.
# NOTE(review): when this wording changes, CLUSTER_ANALYST_PROMPT_VERSION
# presumably needs a bump, since the version is part of the cache manifest.
CLUSTER_ANALYST_INSTRUCTION = (
    "You analyze clustered GitHub issues and pull requests for duplicate triage. "
    "Return a short summary, confidence between 0 and 1, concise reasons for canonical issue/PR choices, "
    "concise reasons for global best issue/PR suitability, and accept/reject verdicts for each soft edge candidate. "
    "Only accept a soft edge when the two artifacts look like the same underlying bug or change. "
    "Use titles, descriptions, explicit issue targets, changed filenames, and diff previews when available. "
    "For pull requests, be strict: accept only when the PRs appear to fix the same concrete code-path problem and could plausibly be merged into one PR. "
    "Do not merge PRs just because they mention the same tracking issue, touch the same broad subsystem, or both change documentation/tests."
)
# Instruction text for the evaluator that reviews the analyst output.
CLUSTER_EVALUATOR_INSTRUCTION = (
    "You review the analyst output for precision. Accept only when the summary is grounded in the packet "
    "and every soft-edge verdict is conservative. Reject if the analyst overstates evidence. "
    "For pull-request pairs, reject if the two changes do not look mergeable into a single PR for the same bugfix."
)
class SoftEdgeVerdict(BaseModel):
    """Accept/reject decision for one soft-edge candidate pair."""

    # Identifiers of the two artifacts in the pair.
    # NOTE(review): presumably node ids as used in the packet -- confirm.
    left: str
    right: str
    accept: bool
    reason: str
class ClusterAnalystResponse(BaseModel):
    """Structured analyst output for one cluster review."""

    summary: str
    # The analyst instruction asks for a value between 0 and 1; the range is
    # not validated here.
    confidence: float
    canonical_issue_reason: str | None = None
    canonical_pr_reason: str | None = None
    best_issue_reason: str | None = None
    best_pr_reason: str | None = None
    soft_edge_verdicts: list[SoftEdgeVerdict] = Field(default_factory=list)
class ClusterEvaluatorResponse(BaseModel):
    """Structured evaluator output: accept flag plus optional feedback."""

    accept: bool
    feedback: str = ""
class PrFileAreaEntry(BaseModel):
    """Line ranges for one file that appears in both PRs of a comparison."""

    filename: str
    # NOTE(review): each inner list is presumably a [start, end] line range --
    # confirm against the code that builds these entries.
    left_ranges: list[list[int]]
    right_ranges: list[list[int]]
class PrComparisonEntry(BaseModel):
    """Pairwise similarity figures for two pull requests in one cluster.

    The markdown report prints every score with three decimals.
    """

    left_pr_number: int
    right_pr_number: int
    code_similarity: float
    size_similarity: float
    file_overlap: float
    area_overlap: float
    patch_similarity: float
    shared_filenames: list[str]
    shared_file_areas: list[PrFileAreaEntry]
class MetaBugEntry(BaseModel):
    """One cluster of related issues and pull requests in the analysis report."""

    cluster_id: str
    # Rendered as the section heading in the markdown report.
    summary: str
    status: str
    confidence: float
    canonical_issue_number: int | None
    canonical_pr_number: int | None
    issue_numbers: list[int]
    pr_numbers: list[int]
    evidence_types: list[str]
    pr_comparisons: list[PrComparisonEntry] = Field(default_factory=list)
class DuplicateIssuesEntry(BaseModel):
    """Report entry naming a canonical issue and its duplicate issues."""

    cluster_id: str
    canonical_issue_number: int
    duplicate_issue_numbers: list[int]
    reason: str
class DuplicatePrsEntry(BaseModel):
    """Report entry naming a canonical PR and its duplicate PRs."""

    cluster_id: str
    canonical_pr_number: int
    duplicate_pr_numbers: list[int]
    # May be None.
    target_issue_number: int | None
    reason: str
class BestIssueEntry(BaseModel):
    """Report entry for the single issue selected as the overall best."""

    cluster_id: str
    issue_number: int
    reason: str
    score: float
class BestPrEntry(BaseModel):
    """Report entry for the single pull request selected as the overall best."""

    cluster_id: str
    pr_number: int
    reason: str
    score: float
class AnalysisReport(BaseModel):
    """Top-level analysis report.

    ``run_analysis`` writes it as JSON and ``render_markdown_report``
    validates it back from that JSON.
    """

    schema_version: str
    repo: str
    snapshot_id: str
    generated_at: str
    evidence_quality: str
    llm_enrichment: bool
    meta_bugs: list[MetaBugEntry]
    duplicate_issues: list[DuplicateIssuesEntry]
    duplicate_prs: list[DuplicatePrsEntry]
    best_issue: BestIssueEntry | None
    best_pr: BestPrEntry | None
# NOTE(review): the decorator was missing from the extracted text; plain
# ``@dataclass`` is the minimal form that supports the keyword construction
# used by ``_load_snapshot``. Confirm whether the original passed options
# such as ``slots=True`` or ``frozen=True``.
@dataclass
class SnapshotData:
    """All tables of one snapshot directory, loaded into memory."""

    repo: str
    snapshot_id: str
    snapshot_dir: Path
    # Parsed manifest.json, or an empty dict when the file is absent.
    manifest: dict[str, Any]
    # Each list below holds the rows of the matching ``<name>.parquet`` file.
    issues: list[dict[str, Any]]
    pull_requests: list[dict[str, Any]]
    comments: list[dict[str, Any]]
    reviews: list[dict[str, Any]]
    review_comments: list[dict[str, Any]]
    pr_files: list[dict[str, Any]]
    pr_diffs: list[dict[str, Any]]
    links: list[dict[str, Any]]
    events: list[dict[str, Any]]
    # "full" when events.parquet exists and has rows, otherwise "partial".
    evidence_quality: str
| class ArtifactFeature: | |
| node_id: str | |
| kind: str | |
| number: int | |
| row: dict[str, Any] | |
| tokens: list[str] | |
| title_tokens: set[str] | |
| title_length: int | |
| body_length: int | |
| discussion_activity: int | |
| review_activity: int | |
| inbound_references: int | |
| explicit_issue_links: int | |
| explicit_issue_targets: list[int] | |
| diff_size: int | |
| filenames: list[str] | |
| diff_preview: str | None | |
| file_ranges_by_name: dict[str, list[tuple[int, int]]] | |
| patch_tokens: list[str] | |
| class ClusterRecord: | |
| cluster_id: str | |
| nodes: list[str] | |
| issue_numbers: list[int] | |
| pr_numbers: list[int] | |
| evidence_types: list[str] | |
| canonical_issue_number: int | None | |
| canonical_pr_number: int | None | |
| target_issue_number: int | None | |
| summary: str | |
| status: str | |
| confidence: float | |
| canonical_issue_reason: str | None | |
| canonical_pr_reason: str | None | |
| best_issue_reason: str | None | |
| best_pr_reason: str | None | |
| cluster_score: float | |
| best_issue_score: float | None | |
| best_pr_score: float | None | |
# NOTE(review): the decorator was missing from the extracted text; confirm
# whether the original used options such as ``slots=True`` or ``frozen=True``.
@dataclass
class PacketBudget:
    """Size figures for one LLM packet (counts, characters, token estimates)."""

    node_count: int
    item_count: int
    soft_pair_count: int
    serialized_chars: int
    estimated_input_tokens: int
    estimated_eval_tokens: int
# NOTE(review): the decorator was missing from the extracted text; confirm
# whether the original used options such as ``slots=True``.
@dataclass
class PreparedLlmPacket:
    """A packet ready for LLM review, with its budget and trimming flags."""

    packet: dict[str, Any]
    budget: PacketBudget
    # NOTE(review): presumably the budget measured before any trimming.
    original_budget: PacketBudget
    trimmed: bool
    aggressively_trimmed: bool
    split: bool
# NOTE(review): the decorator was missing from the extracted text; plain
# ``@dataclass`` supports the keyword construction used when a cached payload
# is turned back into a result. Confirm the original decorator options.
@dataclass
class ClusterAnalysisCallResult:
    """Outcome of one analyst (and optional evaluator) call."""

    analyst_result: ClusterAnalystResponse | None
    evaluator_result: ClusterEvaluatorResponse | None
    # Both error fields are None on success.
    error_kind: str | None
    error_message: str | None
    evaluator_used: bool
    retried: bool
# NOTE(review): the decorator was missing from the extracted text; confirm
# whether the original used options such as ``slots=True``.
@dataclass
class AnalysisBuildResult:
    """Report plus the per-unit LLM review records produced alongside it."""

    report: AnalysisReport
    llm_reviews: list[dict[str, Any]]
# NOTE(review): the decorator was missing from the extracted text; the tuple
# fields suggest the original may have been ``frozen=True`` -- confirm.
@dataclass
class SoftPairReviewUnitMeta:
    """Identity and sizing metadata for one soft-pair review unit."""

    label: str
    component_index: int
    component_count: int
    review_unit_index: int
    review_unit_count: int
    cluster_id: str
    prefix: str
    nodes: tuple[str, ...]
    soft_pairs: tuple[str, ...]
    component_budget: PacketBudget
    budget: PacketBudget
    prepared_review_unit_hash: str | None
    trimmed: bool
    aggressively_trimmed: bool
    split: bool
# NOTE(review): the decorator was missing from the extracted text; confirm
# whether the original used options such as ``slots=True``.
@dataclass
class PendingSoftPairReview:
    """A review unit awaiting review: metadata, prepared packet and cache key."""

    meta: SoftPairReviewUnitMeta
    prepared: PreparedLlmPacket
    cache_key: HybridReviewCacheKey
# NOTE(review): the decorator was missing from the extracted text; confirm
# whether the original used options such as ``slots=True``.
@dataclass
class CompletedSoftPairReview:
    """A finished review unit with its result, status and provenance."""

    meta: SoftPairReviewUnitMeta
    result: ClusterAnalysisCallResult | None
    # The cache summary counts units whose status is "reviewed" and treats
    # everything else as skipped.
    status: str
    reason: str | None
    # The cache summary distinguishes the values "cache" and "llm".
    source: str | None
    cache_hit: bool
def _hybrid_review_cache_manifest() -> HybridReviewCacheManifest:
    """Build the cache manifest for the current schema, prompts and LLM limits.

    Every value comes from a module-level constant.

    Returns:
        A manifest carrying both schema versions, both prompt versions and a
        fingerprint of the packet-size settings.
    """
    settings_fingerprint = HybridReviewSettingsFingerprint(
        llm_max_input_tokens=LLM_MAX_INPUT_TOKENS,
        llm_max_nodes_per_packet=LLM_MAX_NODES_PER_PACKET,
        llm_max_soft_pairs_per_packet=LLM_MAX_SOFT_PAIRS_PER_PACKET,
        llm_max_diff_chars_per_item=LLM_MAX_DIFF_CHARS_PER_ITEM,
        llm_max_filenames_per_item=LLM_MAX_FILENAMES_PER_ITEM,
        llm_skip_evaluator_above_tokens=LLM_SKIP_EVALUATOR_ABOVE_TOKENS,
        llm_overflow_policy=LLM_OVERFLOW_POLICY,
    )
    return HybridReviewCacheManifest(
        cache_schema_version=HYBRID_REVIEW_CACHE_SCHEMA_VERSION,
        prepared_review_unit_schema_version=PREPARED_REVIEW_UNIT_SCHEMA_VERSION,
        analyst_prompt_version=CLUSTER_ANALYST_PROMPT_VERSION,
        evaluator_prompt_version=CLUSTER_EVALUATOR_PROMPT_VERSION,
        hybrid_review_settings=settings_fingerprint,
    )
def _prepared_review_unit_payload(prepared: PreparedLlmPacket) -> dict[str, Any]:
    """Convert a prepared packet into a plain dict.

    The packet is deep-copied so the returned payload shares no mutable
    state with ``prepared``.

    Args:
        prepared: The prepared packet to convert.

    Returns:
        A dict with keys ``packet``, ``budget``, ``original_budget``,
        ``trimmed``, ``aggressively_trimmed`` and ``split``.
    """
    packet_copy = copy.deepcopy(prepared.packet)
    return {
        "packet": packet_copy,
        "budget": _packet_budget_json(prepared.budget),
        "original_budget": _packet_budget_json(prepared.original_budget),
        "trimmed": prepared.trimmed,
        "aggressively_trimmed": prepared.aggressively_trimmed,
        "split": prepared.split,
    }
def _cluster_analysis_call_result_payload(result: ClusterAnalysisCallResult) -> dict[str, Any]:
    """Convert a call result into a JSON-compatible dict.

    Args:
        result: The call result to convert.

    Returns:
        A dict in which the analyst and evaluator responses are dumped with
        ``model_dump(mode="json")`` (or are ``None`` when absent) and the
        remaining fields are copied through unchanged.
    """

    def _dump(model: Any) -> Any:
        # Absent responses stay None instead of being dumped.
        return None if model is None else model.model_dump(mode="json")

    return {
        "analyst_result": _dump(result.analyst_result),
        "evaluator_result": _dump(result.evaluator_result),
        "error_kind": result.error_kind,
        "error_message": result.error_message,
        "evaluator_used": result.evaluator_used,
        "retried": result.retried,
    }
def _cluster_analysis_call_result_from_payload(
    payload: dict[str, Any],
) -> ClusterAnalysisCallResult:
    """Rebuild a call result from its dict form.

    Args:
        payload: A dict shaped like the output of
            ``_cluster_analysis_call_result_payload``. Missing keys fall back
            to ``None`` (or ``False`` for the two boolean flags).

    Returns:
        The reconstructed result; present analyst/evaluator payloads are
        validated through their pydantic models.
    """
    analyst_payload = payload.get("analyst_result")
    evaluator_payload = payload.get("evaluator_result")
    analyst_result = (
        ClusterAnalystResponse.model_validate(analyst_payload)
        if analyst_payload is not None
        else None
    )
    evaluator_result = (
        ClusterEvaluatorResponse.model_validate(evaluator_payload)
        if evaluator_payload is not None
        else None
    )
    return ClusterAnalysisCallResult(
        analyst_result=analyst_result,
        evaluator_result=evaluator_result,
        error_kind=payload.get("error_kind"),
        error_message=payload.get("error_message"),
        evaluator_used=bool(payload.get("evaluator_used", False)),
        retried=bool(payload.get("retried", False)),
    )
def _cacheable_cluster_analysis_result(result: ClusterAnalysisCallResult) -> bool:
    """Return True when ``result`` has an analyst response and no error kind."""
    if result.analyst_result is None:
        return False
    return result.error_kind is None
def run_analysis(options: AnalysisOptions) -> Path:
    """Run the analysis for one snapshot and write the JSON report.

    Args:
        options: Analysis options. ``snapshot_dir`` and ``hf_repo_id`` must
            not both be set.

    Returns:
        Path of the written report: ``options.output`` when given, otherwise
        ``analysis-report.json`` inside the snapshot directory.

    Raises:
        ValueError: If both ``snapshot_dir`` and ``hf_repo_id`` are supplied.
    """
    if options.snapshot_dir is not None and options.hf_repo_id:
        raise ValueError("--snapshot-dir and --hf-repo-id are mutually exclusive")

    fallback_warning = _llm_fallback_warning(options)
    if fallback_warning:
        _analysis_log(fallback_warning)

    snapshot_dir = _resolve_snapshot_dir(options)
    snapshot = _load_snapshot(snapshot_dir)
    _maybe_carry_forward_hybrid_review_cache(snapshot, enabled=options.cached_analysis)
    build = asyncio.run(_build_report(snapshot, options))

    output_path = options.output or (snapshot_dir / "analysis-report.json")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    report_json = json.dumps(build.report.model_dump(mode="json"), indent=2)
    write_text(report_json + "\n", output_path)

    llm_reviews_path = _llm_reviews_output_path(output_path)
    if build.llm_reviews:
        reviews_document = {
            "schema_version": "1.0",
            "repo": build.report.repo,
            "snapshot_id": build.report.snapshot_id,
            "generated_at": build.report.generated_at,
            "model": options.model,
            "reviews": build.llm_reviews,
        }
        write_text(json.dumps(reviews_document, indent=2) + "\n", llm_reviews_path)
    elif llm_reviews_path.exists():
        # Remove a stale reviews file left by an earlier run so it cannot be
        # mistaken for output of this one.
        llm_reviews_path.unlink()

    _log_hybrid_review_cache_summary(build.llm_reviews, enabled=options.cached_analysis)
    return output_path
| def _analysis_log(message: str) -> None: | |
| stamp = datetime.now(tz=UTC).strftime("%H:%M:%SZ") | |
| print(f"[{stamp}] {message}", file=sys.stderr, flush=True) | |
def _llm_reviews_output_path(output_path: Path) -> Path:
    """Return the sibling path ``<stem>.llm-reviews.json`` next to ``output_path``."""
    reviews_name = f"{output_path.stem}.llm-reviews.json"
    return output_path.with_name(reviews_name)
def _llm_fallback_warning(options: AnalysisOptions) -> str | None:
    """Return a warning when the hybrid backend is requested but the LLM is unavailable.

    Args:
        options: Analysis options; only ``ranking_backend`` is read.

    Returns:
        ``None`` when the backend is not ``"hybrid"`` or when
        ``_can_use_fast_agent()`` is true; otherwise the warning text, which
        lists the provider environment variables.
    """
    if options.ranking_backend != "hybrid" or _can_use_fast_agent():
        return None
    provider_vars = ", ".join(LLM_PROVIDER_ENV_VARS)
    return (
        "Analyze requested ranking-backend=hybrid but fast-agent LLM enrichment is unavailable; "
        "reusing cached hybrid review results when available and falling back to deterministic-only clustering "
        "for cache misses. "
        "Install the llm extra and set one of "
        f"{provider_vars}."
    )
def _maybe_carry_forward_hybrid_review_cache(snapshot: SnapshotData, *, enabled: bool) -> None:
    """Seed this snapshot's review cache from the previous snapshot, if possible.

    Nothing happens when ``enabled`` is false. An existing cache directory is
    kept as is. Otherwise the previous snapshot directory is read from
    ``manifest["watermark"]["previous_snapshot_dir"]`` and its cache
    directory is copied over when it exists. Each outcome is logged.

    Args:
        snapshot: The loaded snapshot; ``snapshot_dir`` and ``manifest`` are read.
        enabled: Whether cached analysis was requested.
    """
    if not enabled:
        return

    current_cache_dir = hybrid_review_cache_dir(snapshot.snapshot_dir)
    if current_cache_dir.exists():
        _analysis_log(
            f"Cached analysis enabled: using existing analysis-state in {current_cache_dir}"
        )
        return

    # A missing or malformed watermark and a missing or empty previous
    # directory produce the same log line, so both are handled together.
    watermark = snapshot.manifest.get("watermark")
    previous_snapshot_dir = (
        watermark.get("previous_snapshot_dir") if isinstance(watermark, dict) else None
    )
    if not isinstance(previous_snapshot_dir, str) or not previous_snapshot_dir:
        _analysis_log("Cached analysis enabled: no previous snapshot recorded; starting fresh")
        return

    previous_cache_dir = hybrid_review_cache_dir(Path(previous_snapshot_dir))
    if not previous_cache_dir.exists():
        _analysis_log(
            "Cached analysis enabled: previous snapshot has no analysis-state; starting fresh"
        )
        return

    shutil.copytree(previous_cache_dir, current_cache_dir)
    _analysis_log(
        f"Cached analysis enabled: copied analysis-state from {previous_cache_dir} to {current_cache_dir}"
    )
def _log_hybrid_review_cache_summary(llm_reviews: list[dict[str, Any]], *, enabled: bool) -> None:
    """Log how many review units were served from cache versus the LLM.

    Args:
        llm_reviews: Review records; the keys ``status``, ``cache_hit``,
            ``source`` and ``reason`` are read.
        enabled: Whether cached analysis was requested; nothing is logged
            when false.
    """
    if not enabled:
        return
    if not llm_reviews:
        _analysis_log("Hybrid review cache summary: no LLM review units were produced")
        return

    reviewed: list[dict[str, Any]] = []
    skipped: list[dict[str, Any]] = []
    for review in llm_reviews:
        (reviewed if review.get("status") == "reviewed" else skipped).append(review)

    cache_hit_count = sum(1 for review in reviewed if review.get("cache_hit"))
    cache_sourced_count = sum(1 for review in reviewed if review.get("source") == "cache")
    llm_sourced_count = sum(1 for review in reviewed if review.get("source") == "llm")
    # Guard against division by zero when every unit was skipped.
    hit_rate = 100.0 * cache_hit_count / len(reviewed) if reviewed else 0.0
    _analysis_log(
        "Hybrid review cache summary: "
        f"{cache_hit_count}/{len(reviewed)} reviewed units reused from cache "
        f"({hit_rate:.1f}%); "
        f"source_cache={cache_sourced_count}, source_llm={llm_sourced_count}, skipped={len(skipped)}"
    )

    if not skipped:
        return
    # Only the five most frequent skip reasons are reported.
    reasons = Counter(str(review.get("reason")) for review in skipped if review.get("reason"))
    if reasons:
        formatted = ", ".join(f"{reason}={count}" for reason, count in reasons.most_common(5))
        _analysis_log(f"Hybrid review cache skipped reasons: {formatted}")
def render_markdown_report(options: MarkdownReportOptions) -> Path:
    """Render a JSON analysis report as markdown.

    Args:
        options: ``input`` is the JSON report path. ``snapshot_dir``
            optionally points at the snapshot used to look up titles and
            URLs. ``output`` optionally overrides the destination.

    Returns:
        The resolved path of the written markdown file. By default this is
        the input path with a ``.md`` suffix.
    """
    input_path = options.input.resolve()
    report = AnalysisReport.model_validate(read_json(input_path))
    snapshot_dir = _resolve_markdown_snapshot_dir(input_path, options.snapshot_dir)
    issue_map, pr_map = _report_artifact_maps(snapshot_dir)
    destination = options.output or input_path.with_suffix(".md")
    output_path = destination.resolve()
    markdown = _markdown_report_text(report=report, issue_map=issue_map, pr_map=pr_map)
    write_text(markdown, output_path)
    return output_path
def _resolve_markdown_snapshot_dir(input_path: Path, snapshot_dir: Path | None) -> Path | None:
    """Choose the snapshot directory used to enrich the markdown report.

    Args:
        input_path: Path of the JSON report.
        snapshot_dir: Explicit snapshot directory, or ``None``.

    Returns:
        The resolved explicit directory when one is given. Otherwise the
        report's parent directory if it holds ``issues.parquet`` or
        ``pull_requests.parquet``, else ``None``.
    """
    if snapshot_dir is not None:
        return snapshot_dir.resolve()
    candidate = input_path.parent.resolve()
    table_names = ("issues.parquet", "pull_requests.parquet")
    if any((candidate / name).exists() for name in table_names):
        return candidate
    return None
def _report_artifact_maps(
    snapshot_dir: Path | None,
) -> tuple[dict[int, dict[str, Any]], dict[int, dict[str, Any]]]:
    """Load issue and pull-request rows keyed by number.

    Args:
        snapshot_dir: Snapshot directory, or ``None`` when none is available.

    Returns:
        ``(issues_by_number, pull_requests_by_number)``. Both are empty when
        ``snapshot_dir`` is ``None``. Rows without a ``number`` are dropped.
    """
    if snapshot_dir is None:
        return {}, {}

    def _rows_by_number(table_name: str) -> dict[int, dict[str, Any]]:
        # Index one parquet table by its integer "number" column.
        return {
            int(row["number"]): row
            for row in read_parquet_rows(snapshot_dir / table_name)
            if row.get("number") is not None
        }

    issues = _rows_by_number("issues.parquet")
    pull_requests = _rows_by_number("pull_requests.parquet")
    return issues, pull_requests
def _markdown_report_text(
    *,
    report: AnalysisReport,
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
) -> str:
    """Render the full markdown document for ``report``.

    Args:
        report: The analysis report to render.
        issue_map: Issue rows keyed by number, used for titles and URLs.
        pr_map: Pull-request rows keyed by number, used the same way.

    Returns:
        Markdown text ending in a single newline. It holds a header block,
        then one section per meta bug ordered by ``_meta_bug_sort_key``, or
        the line "No meta bugs found." when there are none.
    """
    lines = [
        f"# Analysis Report: {report.repo}",
        "",
        f"- Snapshot: `{report.snapshot_id}`",
        f"- Generated: `{report.generated_at}`",
        f"- Evidence quality: `{report.evidence_quality}`",
        f"- LLM enrichment: `{str(report.llm_enrichment).lower()}`",
        f"- Meta bugs: `{len(report.meta_bugs)}`",
    ]
    best_issue = report.best_issue
    if best_issue is not None:
        issue_link = _artifact_markdown_link(
            report.repo, "issue", best_issue.issue_number, issue_map.get(best_issue.issue_number)
        )
        lines.append(f"- Best issue: {issue_link}")
    best_pr = report.best_pr
    if best_pr is not None:
        pr_link = _artifact_markdown_link(
            report.repo, "pull_request", best_pr.pr_number, pr_map.get(best_pr.pr_number)
        )
        lines.append(f"- Best PR: {pr_link}")
    lines.append("")

    ordered_meta_bugs = sorted(
        report.meta_bugs,
        key=lambda entry: _meta_bug_sort_key(entry, issue_map, pr_map),
    )
    if not ordered_meta_bugs:
        # The trailing empty entry supplies the final newline after joining.
        lines.extend(["No meta bugs found.", ""])
        return "\n".join(lines)

    for meta_bug in ordered_meta_bugs:
        lines.extend(_meta_bug_markdown_lines(report.repo, meta_bug, issue_map, pr_map))
    # Collapse trailing blank lines to exactly one newline.
    return "\n".join(lines).rstrip() + "\n"
def _meta_bug_markdown_lines(
    repo: str,
    meta_bug: MetaBugEntry,
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
) -> list[str]:
    """Render one meta bug as markdown lines.

    Args:
        repo: ``owner/name`` of the repository, used for fallback URLs.
        meta_bug: The cluster entry to render.
        issue_map: Issue rows keyed by number.
        pr_map: Pull-request rows keyed by number.

    Returns:
        Lines for a heading, a bullet summary and optional "Issues", "PRs"
        and "PR comparison" subsections, each followed by a blank line.
    """
    artifact_count = len(meta_bug.issue_numbers) + len(meta_bug.pr_numbers)
    latest_activity = _meta_bug_latest_activity(meta_bug, issue_map, pr_map)
    canonical_issue = meta_bug.canonical_issue_number
    canonical_pr = meta_bug.canonical_pr_number
    # The canonical issue already has its own bullet, so it is left out of the
    # "Issues" list. The "PRs" list keeps the canonical PR.
    other_issue_numbers = [number for number in meta_bug.issue_numbers if number != canonical_issue]

    lines = [
        f"## {meta_bug.summary}",
        "",
        f"- Cluster: `{meta_bug.cluster_id}`",
        f"- Status: `{meta_bug.status}`",
        f"- Confidence: `{meta_bug.confidence:.3f}`",
        f"- Artifacts: `{artifact_count}`",
        f"- Latest activity: `{latest_activity}`",
    ]
    if canonical_issue is not None:
        issue_link = _artifact_markdown_link(
            repo, "issue", canonical_issue, issue_map.get(canonical_issue)
        )
        lines.append(f"- Canonical issue: {issue_link}")
    if canonical_pr is not None:
        pr_link = _artifact_markdown_link(repo, "pull_request", canonical_pr, pr_map.get(canonical_pr))
        lines.append(f"- Canonical PR: {pr_link}")
    if meta_bug.evidence_types:
        lines.append(f"- Evidence: `{', '.join(meta_bug.evidence_types)}`")
    lines.append("")

    if other_issue_numbers:
        lines.extend(["### Issues", ""])
        for number in _sorted_artifact_numbers(other_issue_numbers, issue_map):
            row = issue_map.get(number)
            link = _artifact_markdown_link(repo, "issue", number, row)
            lines.append(f"- {link}{_artifact_suffix(row, 'issue')}")
        lines.append("")

    if meta_bug.pr_numbers:
        lines.extend(["### PRs", ""])
        for number in _sorted_artifact_numbers(meta_bug.pr_numbers, pr_map):
            row = pr_map.get(number)
            link = _artifact_markdown_link(repo, "pull_request", number, row)
            lines.append(f"- {link}{_artifact_suffix(row, 'pull_request')}")
        lines.append("")

    if meta_bug.pr_comparisons:
        lines.extend(["### PR comparison", ""])
        for comparison in meta_bug.pr_comparisons:
            shared_files = ", ".join(f"`{name}`" for name in comparison.shared_filenames) or "none"
            lines.append(
                f"- PR #{comparison.left_pr_number} vs PR #{comparison.right_pr_number}: "
                f"code `{comparison.code_similarity:.3f}`, "
                f"size `{comparison.size_similarity:.3f}`, "
                f"files `{comparison.file_overlap:.3f}`, "
                f"areas `{comparison.area_overlap:.3f}`, "
                f"patch `{comparison.patch_similarity:.3f}`; "
                f"shared files: {shared_files}"
            )
        lines.append("")
    return lines
def _meta_bug_sort_key(
    meta_bug: MetaBugEntry,
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
) -> tuple[int, float, int, str]:
    """Return the ordering key for meta bugs in the markdown report.

    Under an ascending sort, clusters with more artifacts come first. Ties
    are broken by more recent activity, then by the largest artifact number,
    then by cluster id.
    """
    artifact_numbers = [*meta_bug.issue_numbers, *meta_bug.pr_numbers]
    latest_activity = _meta_bug_latest_activity_dt(meta_bug, issue_map, pr_map).timestamp()
    largest_number = max(artifact_numbers, default=0)
    # Numeric parts are negated so larger values sort earlier.
    return (-len(artifact_numbers), -latest_activity, -largest_number, meta_bug.cluster_id)
def _meta_bug_latest_activity(
    meta_bug: MetaBugEntry, issue_map: dict[int, dict[str, Any]], pr_map: dict[int, dict[str, Any]]
) -> str:
    """Return the latest-activity timestamp of a meta bug as display text.

    The most recently active row is picked by ``_meta_bug_latest_row``. The
    first non-empty value of ``updated_at``, ``created_at``, ``closed_at``,
    ``merged_at`` is returned as a string. This is the same field order
    ``_row_activity_dt`` uses, so a row selected through its ``merged_at``
    no longer shows as "unknown".

    Returns:
        The raw timestamp value as a string, or ``"unknown"`` when no row or
        no timestamp is available.
    """
    latest_row = _meta_bug_latest_row(meta_bug, issue_map, pr_map)
    if latest_row is None:
        return "unknown"
    for field in ("updated_at", "created_at", "closed_at", "merged_at"):
        value = latest_row.get(field)
        if value:
            return str(value)
    return "unknown"
def _meta_bug_latest_activity_dt(
    meta_bug: MetaBugEntry,
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
) -> datetime:
    """Return the latest activity time of a meta bug.

    Returns:
        The activity datetime of the most recently active member row, or the
        Unix epoch (UTC) when no member has a row in the maps.
    """
    latest_row = _meta_bug_latest_row(meta_bug, issue_map, pr_map)
    if latest_row is not None:
        return _row_activity_dt(latest_row)
    return datetime(1970, 1, 1, tzinfo=UTC)
def _meta_bug_latest_row(
    meta_bug: MetaBugEntry,
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
) -> dict[str, Any] | None:
    """Return the member row with the most recent activity.

    Only members present in ``issue_map`` or ``pr_map`` are considered.

    Returns:
        The row with the largest ``_row_activity_dt``, or ``None`` when no
        member has a row.
    """
    candidate_rows = [
        *(issue_map[number] for number in meta_bug.issue_numbers if number in issue_map),
        *(pr_map[number] for number in meta_bug.pr_numbers if number in pr_map),
    ]
    if not candidate_rows:
        return None
    return max(candidate_rows, key=_row_activity_dt)
def _sorted_artifact_numbers(numbers: list[int], row_map: dict[int, dict[str, Any]]) -> list[int]:
    """Return ``numbers`` ordered by most recent activity, then by highest number.

    Numbers without a row in ``row_map`` sort as if active at the Unix epoch,
    because ``_row_activity_dt`` returns the epoch for a missing row.
    """

    def _recency_key(number: int) -> tuple[float, int]:
        # Both parts are negated so an ascending sort gives newest-first.
        return (-_row_activity_dt(row_map.get(number)).timestamp(), -number)

    return sorted(numbers, key=_recency_key)
| def _row_activity_dt(row: dict[str, Any] | None) -> datetime: | |
| if not row: | |
| return datetime(1970, 1, 1, tzinfo=UTC) | |
| for field in ("updated_at", "created_at", "closed_at", "merged_at"): | |
| value = row.get(field) | |
| if not value: | |
| continue | |
| try: | |
| return _parse_dt(str(value)) | |
| except ValueError: | |
| continue | |
| return datetime(1970, 1, 1, tzinfo=UTC) | |
def _artifact_markdown_link(repo: str, kind: str, number: int, row: dict[str, Any] | None) -> str:
    """Return a markdown link ``[title](url)`` for one issue or pull request.

    The title comes from ``_artifact_title`` and the URL from ``_artifact_url``.
    """
    link_text = _artifact_title(kind, number, row)
    link_target = _artifact_url(repo, kind, number, row)
    return f"[{link_text}]({link_target})"
def _artifact_title(kind: str, number: int, row: dict[str, Any] | None) -> str:
    """Return the display title for an issue or pull request.

    Args:
        kind: ``"pull_request"`` selects the "PR" prefix; anything else gets
            "Issue".
        number: The artifact number.
        row: The artifact row, or ``None`` when unknown.

    Returns:
        ``"<prefix> #<number>: <title>"`` when a title is available,
        otherwise ``"<prefix> #<number>"``. A pull request without a title
        falls back to the first line of its body, cut to 120 characters.
    """
    data = row or {}
    is_pull_request = kind == "pull_request"
    prefix = "PR" if is_pull_request else "Issue"
    title = str(data.get("title") or "").strip()
    if not title and is_pull_request:
        body = str(data.get("body") or "").strip()
        if body:
            # The body is stripped and non-empty, so a first line exists.
            title = body.splitlines()[0].strip()[:120]
    return f"{prefix} #{number}: {title}" if title else f"{prefix} #{number}"
def _artifact_url(repo: str, kind: str, number: int, row: dict[str, Any] | None) -> str:
    """Return the URL for an issue or pull request.

    Returns:
        The row's ``html_url`` when present. Otherwise a github.com URL built
        from ``repo`` (``/pull/`` for pull requests, ``/issues/`` for
        everything else), or ``"#"`` when ``repo`` is empty.
    """
    html_url = str((row or {}).get("html_url") or "").strip()
    if html_url:
        return html_url
    if not repo:
        return "#"
    segment = "pull" if kind == "pull_request" else "issues"
    return f"https://github.com/{repo}/{segment}/{number}"
def _artifact_suffix(row: dict[str, Any] | None, kind: str) -> str:
    """Return a parenthesised detail suffix for an artifact list entry.

    The details are, in order: the state, then for pull requests "merged"
    and "draft" when those flags are truthy, then ``updated_at`` (or
    ``created_at`` as a fallback).

    Returns:
        ``" (detail, detail, ...)"``, or an empty string when the row is
        missing or yields no details.
    """
    if not row:
        return ""
    details: list[str] = []
    state = str(row.get("state") or "").strip()
    if state:
        details.append(state)
    if kind == "pull_request":
        details.extend(flag for flag in ("merged", "draft") if row.get(flag))
    timestamp = row.get("updated_at") or row.get("created_at")
    if timestamp:
        details.append(str(timestamp))
    return f" ({', '.join(details)})" if details else ""
def _resolve_snapshot_dir(options: AnalysisOptions) -> Path:
    """Resolve the snapshot directory to analyze.

    Delegates to ``resolve_snapshot_source_dir``. The local snapshots root is
    ``<output_dir>/snapshots``, resolved to an absolute path.
    """
    snapshots_root = options.output_dir.resolve() / "snapshots"
    return resolve_snapshot_source_dir(
        snapshot_dir=options.snapshot_dir,
        local_snapshots_root=snapshots_root,
        hf_repo_id=options.hf_repo_id,
        hf_revision=options.hf_revision,
        hf_materialize_dir=options.hf_materialize_dir,
        hf_output_dir=options.output_dir,
    )
def _load_snapshot(snapshot_dir: Path) -> SnapshotData:
    """Load every table of a snapshot directory into a ``SnapshotData``.

    Args:
        snapshot_dir: Directory holding ``manifest.json`` (optional) and the
            ``*.parquet`` tables.

    Returns:
        The loaded snapshot. ``repo`` comes from the manifest, falling back
        to the first issue, pull-request or comment row. ``snapshot_id``
        falls back to the directory name. ``evidence_quality`` is "full"
        only when ``events.parquet`` exists and has rows.

    Raises:
        FileNotFoundError: If every table is empty.
    """
    manifest_path = snapshot_dir / "manifest.json"
    manifest = read_json(manifest_path) if manifest_path.exists() else {}

    issues = read_parquet_rows(snapshot_dir / "issues.parquet")
    pull_requests = read_parquet_rows(snapshot_dir / "pull_requests.parquet")
    comments = read_parquet_rows(snapshot_dir / "comments.parquet")
    reviews = read_parquet_rows(snapshot_dir / "reviews.parquet")
    review_comments = read_parquet_rows(snapshot_dir / "review_comments.parquet")
    pr_files = read_parquet_rows(snapshot_dir / "pr_files.parquet")
    pr_diffs = read_parquet_rows(snapshot_dir / "pr_diffs.parquet")
    links = read_parquet_rows(snapshot_dir / "links.parquet")
    events_path = snapshot_dir / "events.parquet"
    events = read_parquet_rows(events_path)

    tables = (
        issues,
        pull_requests,
        comments,
        reviews,
        review_comments,
        pr_files,
        pr_diffs,
        links,
        events,
    )
    if not any(tables):
        parquet_files = sorted(str(path.name) for path in snapshot_dir.glob("*.parquet"))
        raise FileNotFoundError(
            f"No analysis tables found in {snapshot_dir}. "
            f"Expected local files like issues.parquet/pull_requests.parquet. "
            f"Found parquet files: {parquet_files or 'none'}. "
            "Use --hf-repo-id for Hugging Face datasets or point --snapshot-dir at a local slop-farmer snapshot."
        )

    # Prefer the manifest. Otherwise take "repo" from the first row of the
    # first table that has one. ``.get`` is used so a row without a "repo"
    # key falls through to the next table instead of raising KeyError.
    repo = manifest.get("repo")
    if not repo:
        for rows in (issues, pull_requests, comments):
            repo = rows[0].get("repo") if rows else None
            if repo:
                break
    repo = repo or ""

    snapshot_id = manifest.get("snapshot_id") or snapshot_dir.name
    evidence_quality = "full" if events_path.exists() and events else "partial"
    return SnapshotData(
        repo=repo,
        snapshot_id=snapshot_id,
        snapshot_dir=snapshot_dir,
        manifest=manifest,
        issues=issues,
        pull_requests=pull_requests,
        comments=comments,
        reviews=reviews,
        review_comments=review_comments,
        pr_files=pr_files,
        pr_diffs=pr_diffs,
        links=links,
        events=events,
        evidence_quality=evidence_quality,
    )
| async def _build_report(snapshot: SnapshotData, options: AnalysisOptions) -> AnalysisBuildResult: | |
| combined_links = _combined_links(snapshot) | |
| llm_available = _can_use_fast_agent() | |
| hybrid_review_cache = HybridReviewCacheStore( | |
| hybrid_review_cache_dir(snapshot.snapshot_dir), | |
| _hybrid_review_cache_manifest(), | |
| enabled=options.ranking_backend == "hybrid", | |
| ) | |
| if hybrid_review_cache.invalidation_reason is not None: | |
| _analysis_log( | |
| "Hybrid review cache invalidated; ignoring cached entries " | |
| f"({hybrid_review_cache.invalidation_reason})" | |
| ) | |
| issue_map = {int(row["number"]): row for row in snapshot.issues} | |
| pr_map = {int(row["number"]): row for row in snapshot.pull_requests} | |
| suppressed_pr_reasons = suppressed_pull_request_reasons( | |
| snapshot.pull_requests, | |
| snapshot.pr_files, | |
| compile_cluster_suppression_rules(options.cluster_suppression_rules), | |
| ) | |
| if suppressed_pr_reasons: | |
| original_pr_count = len(pr_map) | |
| pr_map = { | |
| number: row for number, row in pr_map.items() if number not in suppressed_pr_reasons | |
| } | |
| _analysis_log( | |
| f"Suppressing {len(suppressed_pr_reasons)} routine PRs from clustering: " | |
| f"{len(pr_map)}/{original_pr_count} PRs kept" | |
| ) | |
| if options.open_prs_only: | |
| original_pr_count = len(pr_map) | |
| pr_map = { | |
| number: row | |
| for number, row in pr_map.items() | |
| if str(row.get("state") or "").lower() == "open" | |
| } | |
| _analysis_log( | |
| f"Restricting PR analysis to open PRs only: {len(pr_map)}/{original_pr_count} PRs kept " | |
| "(draft PRs remain eligible)" | |
| ) | |
| comment_map = { | |
| int(row["github_id"]): row for row in snapshot.comments if row.get("github_id") is not None | |
| } | |
| review_map = { | |
| int(row["github_id"]): row for row in snapshot.reviews if row.get("github_id") is not None | |
| } | |
| review_comment_map = { | |
| int(row["github_id"]): row | |
| for row in snapshot.review_comments | |
| if row.get("github_id") is not None | |
| } | |
| inbound_references, _ = _reference_counts( | |
| snapshot.repo, | |
| combined_links, | |
| issue_map, | |
| pr_map, | |
| comment_map, | |
| review_map, | |
| review_comment_map, | |
| ) | |
| explicit_issue_link_targets = _explicit_pr_issue_targets( | |
| repo=snapshot.repo, | |
| combined_links=combined_links, | |
| issue_map=issue_map, | |
| pr_map=pr_map, | |
| ) | |
| features = _artifact_features( | |
| snapshot, | |
| options=options, | |
| issue_map=issue_map, | |
| pr_map=pr_map, | |
| inbound_references=inbound_references, | |
| explicit_issue_link_targets=explicit_issue_link_targets, | |
| ) | |
| issue_hard_pairs = _issue_hard_pairs( | |
| repo=snapshot.repo, | |
| combined_links=combined_links, | |
| issue_map=issue_map, | |
| pr_map=pr_map, | |
| comment_map=comment_map, | |
| review_map=review_map, | |
| review_comment_map=review_comment_map, | |
| ) | |
| issue_soft_candidates = _issue_soft_candidates(issue_map, features, issue_hard_pairs) | |
| pr_soft_candidates, pr_pair_target_issues = _pr_duplicate_candidates( | |
| options=options, | |
| snapshot=snapshot, | |
| issue_map=issue_map, | |
| pr_map=pr_map, | |
| features=features, | |
| ) | |
| review_semaphore = asyncio.Semaphore(options.hybrid_llm_concurrency) | |
| ( | |
| (accepted_issue_pairs, issue_llm_enabled, issue_llm_reviews), | |
| (accepted_pr_pairs, pr_llm_enabled, pr_llm_reviews), | |
| ) = await asyncio.gather( | |
| _accepted_soft_pairs( | |
| options=options, | |
| snapshot=snapshot, | |
| features=features, | |
| hard_pairs=issue_hard_pairs, | |
| soft_candidates=issue_soft_candidates, | |
| label="issue", | |
| hybrid_review_cache=hybrid_review_cache, | |
| llm_available=llm_available, | |
| review_semaphore=review_semaphore, | |
| ), | |
| _accepted_soft_pairs( | |
| options=options, | |
| snapshot=snapshot, | |
| features=features, | |
| hard_pairs={}, | |
| soft_candidates=pr_soft_candidates, | |
| label="pull_request", | |
| hybrid_review_cache=hybrid_review_cache, | |
| llm_available=llm_available, | |
| review_semaphore=review_semaphore, | |
| ), | |
| ) | |
| issue_pairs = dict(issue_hard_pairs) | |
| for pair, detail in accepted_issue_pairs.items(): | |
| issue_pairs.setdefault(pair, set()).update( | |
| detail.get("evidence_types") or {"soft_similarity"} | |
| ) | |
| pr_pairs: dict[tuple[str, str], set[str]] = {} | |
| for pair, detail in accepted_pr_pairs.items(): | |
| pr_pairs.setdefault(pair, set()).update(detail.get("evidence_types") or {"soft_similarity"}) | |
| issue_clusters = _clusters( | |
| snapshot=snapshot, | |
| features=features, | |
| final_pairs=issue_pairs, | |
| pair_target_issues=defaultdict(set), | |
| llm_cluster_payloads={}, | |
| ) | |
| pr_clusters = _clusters( | |
| snapshot=snapshot, | |
| features=features, | |
| final_pairs=pr_pairs, | |
| pair_target_issues=pr_pair_target_issues, | |
| llm_cluster_payloads={}, | |
| ) | |
| clusters = _meta_bug_clusters( | |
| features=features, | |
| issue_clusters=issue_clusters, | |
| pr_clusters=pr_clusters, | |
| explicit_issue_link_targets=explicit_issue_link_targets, | |
| issue_map=issue_map, | |
| pr_map=pr_map, | |
| ) | |
| meta_clusters = sorted( | |
| clusters, key=lambda cluster: (-cluster.cluster_score, cluster.cluster_id) | |
| )[: options.max_clusters] | |
| duplicate_issues = [ | |
| DuplicateIssuesEntry( | |
| cluster_id=cluster.cluster_id, | |
| canonical_issue_number=cluster.canonical_issue_number, | |
| duplicate_issue_numbers=[ | |
| number | |
| for number in cluster.issue_numbers | |
| if number != cluster.canonical_issue_number | |
| ], | |
| reason=_duplicate_issue_reason(cluster), | |
| ) | |
| for cluster in clusters | |
| if cluster.canonical_issue_number is not None and len(cluster.issue_numbers) >= 2 | |
| ] | |
| duplicate_prs = [ | |
| DuplicatePrsEntry( | |
| cluster_id=cluster.cluster_id, | |
| canonical_pr_number=cluster.canonical_pr_number, | |
| duplicate_pr_numbers=[ | |
| number for number in cluster.pr_numbers if number != cluster.canonical_pr_number | |
| ], | |
| target_issue_number=cluster.target_issue_number, | |
| reason=_duplicate_pr_reason(cluster), | |
| ) | |
| for cluster in clusters | |
| if cluster.canonical_pr_number is not None and len(cluster.pr_numbers) >= 2 | |
| ] | |
| best_issue = _best_issue(meta_clusters, features) | |
| best_pr = _best_pr(meta_clusters, features) | |
| return AnalysisBuildResult( | |
| report=AnalysisReport( | |
| schema_version="1.0", | |
| repo=snapshot.repo, | |
| snapshot_id=snapshot.snapshot_id, | |
| generated_at=_iso_now(), | |
| evidence_quality=snapshot.evidence_quality, | |
| llm_enrichment=issue_llm_enabled or pr_llm_enabled, | |
| meta_bugs=[ | |
| MetaBugEntry( | |
| cluster_id=cluster.cluster_id, | |
| summary=cluster.summary, | |
| status=cluster.status, | |
| confidence=round(cluster.confidence, 3), | |
| canonical_issue_number=cluster.canonical_issue_number, | |
| canonical_pr_number=cluster.canonical_pr_number, | |
| issue_numbers=cluster.issue_numbers, | |
| pr_numbers=cluster.pr_numbers, | |
| evidence_types=cluster.evidence_types, | |
| pr_comparisons=_cluster_pr_comparisons(cluster, features), | |
| ) | |
| for cluster in meta_clusters | |
| ], | |
| duplicate_issues=duplicate_issues, | |
| duplicate_prs=duplicate_prs, | |
| best_issue=best_issue, | |
| best_pr=best_pr, | |
| ), | |
| llm_reviews=issue_llm_reviews + pr_llm_reviews, | |
| ) | |
| def _iso_now() -> str: | |
| return datetime.now(tz=UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z") | |
def _combined_links(snapshot: SnapshotData) -> list[dict[str, Any]]:
    """Return the snapshot's stored link rows plus links re-extracted from every text body.

    Bodies are scanned in a fixed order: issues, pull requests, comments (only those with a
    ``parent_number``), reviews, then review comments. Rows are de-duplicated on
    ``LINK_KEY_FIELDS``; when several rows share a key the last one seen is kept.
    """
    repo_owner, repo_short_name = snapshot.repo.split("/", 1)
    # Fall back to "now" when the manifest does not record an extraction time.
    stamp = snapshot.manifest.get("extracted_at") or _iso_now()
    collected = list(snapshot.links)

    def extract(source_type: str, number: Any, record: dict[str, Any]) -> None:
        """Append the link rows found in ``record``'s body, attributed to artifact ``number``."""
        collected.extend(
            build_text_link_rows(
                repo=snapshot.repo,
                owner=repo_owner,
                repo_name=repo_short_name,
                source_type=source_type,
                source_number=int(number),
                source_id=record.get("github_id"),
                body=record.get("body"),
                snapshot_id=snapshot.snapshot_id,
                extracted_at=stamp,
            )
        )

    for issue in snapshot.issues:
        extract("issue", issue["number"], issue)
    for pull_request in snapshot.pull_requests:
        extract("pull_request", pull_request["number"], pull_request)
    for comment in snapshot.comments:
        # Comments without a parent cannot be attributed to an issue or pull request.
        if comment.get("parent_number") is not None:
            extract("comment", comment["parent_number"], comment)
    for review in snapshot.reviews:
        extract("review", review["pull_request_number"], review)
    for review_comment in snapshot.review_comments:
        extract("review_comment", review_comment["pull_request_number"], review_comment)

    # Later rows overwrite earlier ones that share the same identity key.
    unique: dict[tuple[Any, ...], dict[str, Any]] = {
        tuple(row.get(field) for field in LINK_KEY_FIELDS): row for row in collected
    }
    return list(unique.values())
def _reference_counts(
    repo: str,
    links: list[dict[str, Any]],
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
    comment_map: dict[int, dict[str, Any]],
    review_map: dict[int, dict[str, Any]],
    review_comment_map: dict[int, dict[str, Any]],
) -> tuple[Counter[str], defaultdict[int, set[int]]]:
    """Tally inbound references per node and collect the issues each pull request links to.

    Returns a ``(inbound_references, explicit_issue_link_targets)`` tuple. The first counts,
    per target node id, the links whose source and target both resolve. The second maps a
    pull request number to the issue numbers it links to, for any link type.
    """
    inbound: Counter[str] = Counter()
    pr_issue_targets: defaultdict[int, set[int]] = defaultdict(set)
    for link in links:
        source = _resolve_source_node(
            link, issue_map, pr_map, comment_map, review_map, review_comment_map
        )
        target = _resolve_target_node(repo, link, issue_map, pr_map)
        if source is not None and target is not None:
            inbound[target] += 1
        if not source or not target:
            continue
        if source.startswith("pull_request:") and target.startswith("issue:"):
            pr_number = int(source.split(":", 1)[1])
            pr_issue_targets[pr_number].add(int(target.split(":", 1)[1]))
    return inbound, pr_issue_targets
def _artifact_features(
    snapshot: SnapshotData,
    *,
    options: AnalysisOptions,
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
    inbound_references: Counter[str],
    explicit_issue_link_targets: defaultdict[int, set[int]],
) -> dict[str, ArtifactFeature]:
    """Build one ``ArtifactFeature`` per issue in ``issue_map`` and per PR in ``pr_map``.

    The returned mapping is keyed by node id (``issue:<number>`` or ``pull_request:<number>``).
    Activity counts take the larger of the count stored on the row and the number of matching
    rows in the snapshot. Pull request text tokens are computed from the title plus the body
    after template cleanup, while ``body_length`` still measures the raw body.

    NOTE: ``explicit_issue_link_targets`` is indexed directly, so (being a ``defaultdict``)
    it gains an empty entry for every pull request that had none.
    """
    cleanup_settings = build_template_cleanup_settings(
        mode=options.pr_template_cleanup_mode,
        strip_html_comments=options.pr_template_strip_html_comments,
        trim_closing_reference_prefix=options.pr_template_trim_closing_reference_prefix,
        section_patterns=options.pr_template_section_patterns,
        line_patterns=options.pr_template_line_patterns,
    )

    # Per-artifact tallies gathered from the snapshot tables.
    comment_totals: defaultdict[int, int] = defaultdict(int)
    review_totals: defaultdict[int, int] = defaultdict(int)
    review_comment_totals: defaultdict[int, int] = defaultdict(int)
    changed_files: defaultdict[int, set[str]] = defaultdict(set)
    changed_ranges: defaultdict[int, dict[str, list[tuple[int, int]]]] = defaultdict(
        lambda: defaultdict(list)
    )
    changed_tokens: defaultdict[int, list[str]] = defaultdict(list)
    previews: dict[int, str] = {}

    for comment in snapshot.comments:
        parent = comment.get("parent_number")
        if parent is None:
            continue
        comment_totals[int(parent)] += 1
    for review in snapshot.reviews:
        review_totals[int(review["pull_request_number"])] += 1
    for review_comment in snapshot.review_comments:
        review_comment_totals[int(review_comment["pull_request_number"])] += 1

    for file_row in snapshot.pr_files:
        raw_number = file_row.get("pull_request_number")
        name = file_row.get("filename")
        if raw_number is None or not name:
            continue
        pr_key = int(raw_number)
        path = str(name)
        changed_files[pr_key].add(path)
        patch = file_row.get("patch")
        if not patch:
            continue
        patch_text = str(patch)
        changed_ranges[pr_key][path].extend(_patch_ranges(patch_text))
        changed_tokens[pr_key].extend(_patch_content_tokens(patch_text))

    for diff_row in snapshot.pr_diffs:
        raw_number = diff_row.get("pull_request_number")
        diff_text = diff_row.get("diff")
        if raw_number is not None and diff_text:
            # Only the first 1200 characters are kept; a later row for the same PR overwrites.
            previews[int(raw_number)] = str(diff_text)[:1200]

    built: dict[str, ArtifactFeature] = {}

    for number, issue in issue_map.items():
        issue_title = issue.get("title") or ""
        issue_body = issue.get("body") or ""
        issue_node = f"issue:{number}"
        issue_title_tokens = set(_tokenize(issue_title, remove_stopwords=True))
        reported_comments = int(issue.get("comments_count") or 0)
        built[issue_node] = ArtifactFeature(
            node_id=issue_node,
            kind="issue",
            number=number,
            row=issue,
            tokens=_tokenize(f"{issue_title} {issue_body}", remove_stopwords=True),
            title_tokens=issue_title_tokens,
            title_length=len(issue_title),
            body_length=len(issue_body),
            discussion_activity=max(reported_comments, comment_totals[number]),
            review_activity=0,
            inbound_references=inbound_references[issue_node],
            explicit_issue_links=0,
            explicit_issue_targets=[],
            diff_size=0,
            filenames=[],
            diff_preview=None,
            file_ranges_by_name={},
            patch_tokens=[],
        )

    for number, pr in pr_map.items():
        pr_title = pr.get("title") or ""
        pr_body = pr.get("body") or ""
        cleaned_body = _strip_pull_request_template(pr_body, settings=cleanup_settings)
        pr_node = f"pull_request:{number}"
        discussion = max(int(pr.get("comments_count") or 0), comment_totals[number])
        inline_comments = max(
            int(pr.get("review_comments_count") or 0), review_comment_totals[number]
        )
        reviewing = review_totals[number] + inline_comments
        # Each changed file weighs as much as ten changed lines.
        size = (
            int(pr.get("additions") or 0)
            + int(pr.get("deletions") or 0)
            + int(pr.get("changed_files") or 0) * 10
        )
        linked_issues = explicit_issue_link_targets[number]
        built[pr_node] = ArtifactFeature(
            node_id=pr_node,
            kind="pull_request",
            number=number,
            row=pr,
            tokens=_tokenize(f"{pr_title} {cleaned_body}", remove_stopwords=True),
            title_tokens=set(_tokenize(pr_title, remove_stopwords=True)),
            title_length=len(pr_title),
            body_length=len(pr_body),
            discussion_activity=discussion,
            review_activity=reviewing,
            inbound_references=inbound_references[pr_node],
            explicit_issue_links=len(linked_issues),
            explicit_issue_targets=sorted(linked_issues),
            diff_size=size,
            filenames=sorted(changed_files[number]),
            diff_preview=previews.get(number),
            file_ranges_by_name={
                path: sorted(ranges) for path, ranges in changed_ranges[number].items()
            },
            patch_tokens=changed_tokens[number],
        )

    return built
| def _explicit_pr_issue_targets( | |
| *, | |
| repo: str, | |
| combined_links: list[dict[str, Any]], | |
| issue_map: dict[int, dict[str, Any]], | |
| pr_map: dict[int, dict[str, Any]], | |
| ) -> defaultdict[int, set[int]]: | |
| targets: defaultdict[int, set[int]] = defaultdict(set) | |
| owner, repo_name = repo.split("/", 1) | |
| for row in combined_links: | |
| if row.get("source_type") != "pull_request": | |
| continue | |
| if row.get("link_type") != "closing_reference": | |
| continue | |
| if row.get("target_owner") != owner or row.get("target_repo") != repo_name: | |
| continue | |
| source_number = row.get("source_number") | |
| target_number = row.get("target_number") | |
| if source_number is None or target_number is None: | |
| continue | |
| pr_number = int(source_number) | |
| issue_number = int(target_number) | |
| if pr_number not in pr_map or issue_number not in issue_map: | |
| continue | |
| targets[pr_number].add(issue_number) | |
| return targets | |
def _issue_hard_pairs(
    *,
    repo: str,
    combined_links: list[dict[str, Any]],
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
    comment_map: dict[int, dict[str, Any]],
    review_map: dict[int, dict[str, Any]],
    review_comment_map: dict[int, dict[str, Any]],
) -> dict[tuple[str, str], set[str]]:
    """Collect issue-to-issue pairs connected by an explicit ``duplicate_reference`` link.

    Links whose source or target cannot be resolved, or that point at themselves, are
    ignored. Every returned pair carries the evidence type ``"duplicate_reference"``.
    """
    duplicates: dict[tuple[str, str], set[str]] = defaultdict(set)
    for link in combined_links:
        source = _resolve_source_node(
            link, issue_map, pr_map, comment_map, review_map, review_comment_map
        )
        target = _resolve_target_node(repo, link, issue_map, pr_map)
        if source is None or target is None or source == target:
            continue
        if link["link_type"] != "duplicate_reference":
            continue
        if source.startswith("issue:") and target.startswith("issue:"):
            duplicates[_pair_key(source, target)].add("duplicate_reference")
    return duplicates
def _issue_soft_candidates(
    issue_map: dict[int, dict[str, Any]],
    features: dict[str, ArtifactFeature],
    hard_pairs: dict[tuple[str, str], set[str]],
) -> dict[tuple[str, str], dict[str, Any]]:
    """Propose text-similar issue pairs that are not already connected by a hard pair.

    Each returned detail is tagged with ``evidence_types={"soft_similarity"}`` and
    ``deterministic_accept=False``.
    """
    # Symmetric adjacency of the hard pairs, so they are not proposed again as soft candidates.
    already_linked: defaultdict[str, set[str]] = defaultdict(set)
    for first, second in hard_pairs:
        already_linked[first].add(second)
        already_linked[second].add(first)

    soft = _bm25_candidates(
        numbers=sorted(issue_map),
        kind="issue",
        features=features,
        hard_neighbors=already_linked,
        max_candidates=5,
        extra_filter=_issue_soft_filter,
    )
    for entry in soft.values():
        entry.update(evidence_types={"soft_similarity"}, deterministic_accept=False)
    return soft
def _pr_duplicate_candidates(
    *,
    options: AnalysisOptions,
    snapshot: SnapshotData | None,
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
    features: dict[str, ArtifactFeature],
) -> tuple[dict[tuple[str, str], dict[str, Any]], dict[tuple[str, str], set[int]]]:
    """Propose duplicate pull request pairs from text similarity and shared issue targets.

    Two sources are merged into one candidate mapping:

    * text-similar pairs from ``_bm25_candidates``, accepted deterministically when their
      Jaccard overlap is at least 0.35;
    * pairs of pull requests that explicitly target the same issue, bounded per issue by
      ``_bounded_shared_target_pairs`` and accepted deterministically at Jaccard >= 0.2.

    Returns ``(candidates, pair_target_issues)``, where the second maps a pair to the issue
    numbers both pull requests target.
    """
    del options, snapshot  # part of the signature but not used by this implementation
    merged: dict[tuple[str, str], dict[str, Any]] = {}
    target_issues_by_pair: dict[tuple[str, str], set[int]] = defaultdict(set)

    # Pull requests that explicitly target at least one issue.
    closing_targets: dict[int, set[int]] = {}
    for number in pr_map:
        linked = features[f"pull_request:{number}"].explicit_issue_targets
        if linked:
            closing_targets[number] = set(linked)

    text_matches = _bm25_candidates(
        numbers=sorted(pr_map),
        kind="pull_request",
        features=features,
        hard_neighbors=defaultdict(set),
        max_candidates=5,
        extra_filter=_pr_soft_filter,
    )
    for pair, match in text_matches.items():
        first = features[pair[0]]
        second = features[pair[1]]
        common_files = _shared_filenames(first, second)
        common_targets = closing_targets.get(first.number, set()) & closing_targets.get(
            second.number, set()
        )
        enriched = dict(match)
        enriched.update(
            evidence_types={"soft_similarity"},
            shared_filenames=common_files,
            shared_targets=sorted(common_targets),
            deterministic_accept=match["jaccard"] >= 0.35,
        )
        _merge_candidate_detail(merged, pair, enriched)

    # Invert the mapping: issue number -> pull request nodes that target it.
    prs_by_issue: defaultdict[int, set[str]] = defaultdict(set)
    for pr_number, linked in closing_targets.items():
        for issue_number in linked:
            prs_by_issue[issue_number].add(f"pull_request:{pr_number}")

    for issue_number, pr_nodes in prs_by_issue.items():
        ordered_nodes = sorted(pr_nodes)
        if len(ordered_nodes) < 2:
            continue
        ranked: list[tuple[tuple[Any, ...], tuple[str, str], dict[str, Any]]] = []
        for position, first_node in enumerate(ordered_nodes):
            for second_node in ordered_nodes[position + 1 :]:
                pair = _pair_key(first_node, second_node)
                first = features[first_node]
                second = features[second_node]
                if not _pr_soft_filter(first, second):
                    continue
                common_files = _shared_filenames(first, second)
                overlap = _jaccard_sets(set(first.tokens), set(second.tokens))
                # Sharing a target alone is not enough: require some text or file overlap too.
                if not common_files and overlap < LLM_SHARED_TARGET_MIN_TEXT_JACCARD:
                    continue
                score = max(5.0, overlap * 10.0 + len(common_files))
                # Sorts ascending: most shared files first, then highest overlap and score.
                sort_key = (-len(common_files), -overlap, -score, pair[0], pair[1])
                payload = {
                    "left": pair[0],
                    "right": pair[1],
                    "kind": "pull_request",
                    "score": score,
                    "jaccard": overlap,
                    "evidence_types": {"shared_issue_target"},
                    "shared_targets": [issue_number],
                    "shared_filenames": common_files,
                    "deterministic_accept": overlap >= 0.2,
                }
                ranked.append((sort_key, pair, payload))
        for pair, payload in _bounded_shared_target_pairs(ranked):
            _merge_candidate_detail(merged, pair, payload)
            target_issues_by_pair[pair].add(issue_number)

    return merged, target_issues_by_pair
| def _bounded_shared_target_pairs( | |
| scored_pairs: list[tuple[tuple[Any, ...], tuple[str, str], dict[str, Any]]], | |
| ) -> list[tuple[tuple[str, str], dict[str, Any]]]: | |
| ordered = sorted(scored_pairs, key=lambda item: item[0]) | |
| if not ordered: | |
| return [] | |
| parent = {node: node for _, pair, _ in ordered for node in pair} | |
| def find(node: str) -> str: | |
| root = node | |
| while parent[root] != root: | |
| root = parent[root] | |
| while parent[node] != node: | |
| next_node = parent[node] | |
| parent[node] = root | |
| node = next_node | |
| return root | |
| def union(left: str, right: str) -> None: | |
| left_root = find(left) | |
| right_root = find(right) | |
| if left_root != right_root: | |
| parent[right_root] = left_root | |
| neighbor_counts: Counter[str] = Counter() | |
| selected_pairs: set[tuple[str, str]] = set() | |
| selected: list[tuple[tuple[str, str], dict[str, Any]]] = [] | |
| def record(pair: tuple[str, str], detail: dict[str, Any]) -> None: | |
| selected_pairs.add(pair) | |
| selected.append((pair, detail)) | |
| neighbor_counts[pair[0]] += 1 | |
| neighbor_counts[pair[1]] += 1 | |
| extra_pairs_added = 0 | |
| def can_add_redundant_pair(pair: tuple[str, str]) -> bool: | |
| return ( | |
| extra_pairs_added < LLM_SHARED_TARGET_MAX_EXTRA_PAIRS_PER_TARGET | |
| and neighbor_counts[pair[0]] < LLM_SHARED_TARGET_MAX_NEIGHBORS_PER_PR | |
| and neighbor_counts[pair[1]] < LLM_SHARED_TARGET_MAX_NEIGHBORS_PER_PR | |
| ) | |
| for _, pair, detail in ordered: | |
| if pair in selected_pairs: | |
| continue | |
| if find(pair[0]) == find(pair[1]): | |
| continue | |
| record(pair, detail) | |
| union(pair[0], pair[1]) | |
| for _, pair, detail in ordered: | |
| if pair in selected_pairs or not can_add_redundant_pair(pair): | |
| continue | |
| record(pair, detail) | |
| extra_pairs_added += 1 | |
| return selected | |
| def _merge_candidate_detail( | |
| candidates: dict[tuple[str, str], dict[str, Any]], | |
| pair: tuple[str, str], | |
| detail: dict[str, Any], | |
| ) -> None: | |
| current = candidates.get(pair) | |
| if current is None: | |
| copied = dict(detail) | |
| copied["evidence_types"] = set(detail.get("evidence_types") or []) | |
| copied["shared_targets"] = list(detail.get("shared_targets") or []) | |
| copied["shared_filenames"] = list(detail.get("shared_filenames") or []) | |
| copied["deterministic_accept"] = bool(detail.get("deterministic_accept")) | |
| candidates[pair] = copied | |
| return | |
| current["score"] = max(float(current.get("score") or 0.0), float(detail.get("score") or 0.0)) | |
| current["jaccard"] = max( | |
| float(current.get("jaccard") or 0.0), float(detail.get("jaccard") or 0.0) | |
| ) | |
| current["evidence_types"] = set(current.get("evidence_types") or []) | set( | |
| detail.get("evidence_types") or [] | |
| ) | |
| current["shared_targets"] = sorted( | |
| set(current.get("shared_targets") or []) | set(detail.get("shared_targets") or []) | |
| ) | |
| current["shared_filenames"] = sorted( | |
| set(current.get("shared_filenames") or []) | set(detail.get("shared_filenames") or []) | |
| )[:10] | |
| current["deterministic_accept"] = bool(current.get("deterministic_accept")) or bool( | |
| detail.get("deterministic_accept") | |
| ) | |
| def _shared_filenames(left: ArtifactFeature, right: ArtifactFeature) -> list[str]: | |
| return sorted(set(left.filenames) & set(right.filenames))[:10] | |
| def _hard_pairs( | |
| snapshot: SnapshotData, | |
| *, | |
| combined_links: list[dict[str, Any]], | |
| issue_map: dict[int, dict[str, Any]], | |
| pr_map: dict[int, dict[str, Any]], | |
| comment_map: dict[int, dict[str, Any]], | |
| review_map: dict[int, dict[str, Any]], | |
| review_comment_map: dict[int, dict[str, Any]], | |
| ) -> tuple[dict[tuple[str, str], set[str]], dict[tuple[str, str], set[int]]]: | |
| hard_pairs: dict[tuple[str, str], set[str]] = defaultdict(set) | |
| pair_target_issues: dict[tuple[str, str], set[int]] = defaultdict(set) | |
| for row in combined_links: | |
| source_node = _resolve_source_node( | |
| row, issue_map, pr_map, comment_map, review_map, review_comment_map | |
| ) | |
| target_node = _resolve_target_node(snapshot.repo, row, issue_map, pr_map) | |
| if source_node is None or target_node is None or source_node == target_node: | |
| continue | |
| if ( | |
| row["link_type"] == "duplicate_reference" | |
| and source_node.startswith("issue:") | |
| and target_node.startswith("issue:") | |
| ): | |
| hard_pairs[_pair_key(source_node, target_node)].add("duplicate_reference") | |
| if ( | |
| row["link_type"] == "closing_reference" | |
| and source_node.startswith("pull_request:") | |
| and target_node.startswith("issue:") | |
| ): | |
| hard_pairs[_pair_key(source_node, target_node)].add("closing_reference") | |
| prs_by_target: defaultdict[int, set[str]] = defaultdict(set) | |
| for row in combined_links: | |
| source_node = _resolve_source_node( | |
| row, issue_map, pr_map, comment_map, review_map, review_comment_map | |
| ) | |
| target_node = _resolve_target_node(snapshot.repo, row, issue_map, pr_map) | |
| if ( | |
| source_node | |
| and target_node | |
| and source_node.startswith("pull_request:") | |
| and target_node.startswith("issue:") | |
| ): | |
| prs_by_target[int(target_node.split(":", 1)[1])].add(source_node) | |
| for target_issue, pr_nodes in prs_by_target.items(): | |
| pr_nodes_list = sorted(pr_nodes) | |
| if len(pr_nodes_list) < 2: | |
| continue | |
| for index, left in enumerate(pr_nodes_list): | |
| for right in pr_nodes_list[index + 1 :]: | |
| pair = _pair_key(left, right) | |
| hard_pairs[pair].add("shared_issue_target") | |
| pair_target_issues[pair].add(target_issue) | |
| for event in snapshot.events: | |
| source_number = event.get("source_issue_number") | |
| if event.get("event") != "cross-referenced" or source_number is None: | |
| continue | |
| parent_kind = event.get("parent_kind") | |
| parent_number = event.get("parent_number") | |
| if parent_kind not in {"issue", "pull_request"} or parent_number is None: | |
| continue | |
| parent_node = f"{parent_kind}:{int(parent_number)}" | |
| if parent_node not in features_from_maps(issue_map, pr_map): | |
| continue | |
| target_node = _node_from_number(int(source_number), issue_map, pr_map) | |
| if target_node is None or target_node == parent_node: | |
| continue | |
| hard_pairs[_pair_key(parent_node, target_node)].add("timeline:cross-referenced") | |
| return hard_pairs, pair_target_issues | |
def features_from_maps(
    issue_map: dict[int, dict[str, Any]], pr_map: dict[int, dict[str, Any]]
) -> set[str]:
    """Return the node ids (``issue:<n>`` / ``pull_request:<n>``) for every key of both maps."""
    node_ids = {f"issue:{number}" for number in issue_map}
    node_ids.update(f"pull_request:{number}" for number in pr_map)
    return node_ids
def _soft_candidates(
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
    features: dict[str, ArtifactFeature],
    hard_pairs: dict[tuple[str, str], set[str]],
) -> dict[tuple[str, str], dict[str, Any]]:
    """Propose text-similar issue pairs and pull request pairs not already hard-linked.

    Only hard pairs whose two nodes are of the same kind are used to exclude candidates.
    Issue candidates are added to the result first, then pull request candidates.
    """
    same_kind_neighbors: defaultdict[str, set[str]] = defaultdict(set)
    for first, second in hard_pairs:
        if first.partition(":")[0] != second.partition(":")[0]:
            continue
        same_kind_neighbors[first].add(second)
        same_kind_neighbors[second].add(first)

    issue_matches = _bm25_candidates(
        numbers=sorted(issue_map),
        kind="issue",
        features=features,
        hard_neighbors=same_kind_neighbors,
        max_candidates=5,
        extra_filter=_issue_soft_filter,
    )
    pr_matches = _bm25_candidates(
        numbers=sorted(pr_map),
        kind="pull_request",
        features=features,
        hard_neighbors=same_kind_neighbors,
        max_candidates=5,
        extra_filter=_pr_soft_filter,
    )
    combined: dict[tuple[str, str], dict[str, Any]] = {}
    for matches in (issue_matches, pr_matches):
        combined.update(matches)
    return combined
| def _bm25_candidates( | |
| *, | |
| numbers: list[int], | |
| kind: str, | |
| features: dict[str, ArtifactFeature], | |
| hard_neighbors: defaultdict[str, set[str]], | |
| max_candidates: int, | |
| extra_filter: Any, | |
| ) -> dict[tuple[str, str], dict[str, Any]]: | |
| if not numbers: | |
| return {} | |
| nodes = [f"{kind}:{number}" for number in numbers] | |
| token_sets = [set(features[node].tokens) for node in nodes] | |
| if len(nodes) > 4000: | |
| return _sparse_token_candidates( | |
| nodes=nodes, | |
| kind=kind, | |
| features=features, | |
| token_sets=token_sets, | |
| hard_neighbors=hard_neighbors, | |
| max_candidates=max_candidates, | |
| extra_filter=extra_filter, | |
| ) | |
| corpus = [features[node].tokens or ["empty"] for node in nodes] | |
| bm25 = BM25Okapi(corpus) | |
| candidates: dict[tuple[str, str], dict[str, Any]] = {} | |
| for index, node in enumerate(nodes): | |
| feature = features[node] | |
| if not feature.tokens: | |
| continue | |
| scores = bm25.get_scores(feature.tokens) | |
| ranked = sorted(range(len(nodes)), key=lambda position: scores[position], reverse=True) | |
| accepted = 0 | |
| for candidate_index in ranked: | |
| if candidate_index == index: | |
| continue | |
| candidate_node = nodes[candidate_index] | |
| if candidate_node in hard_neighbors[node]: | |
| continue | |
| score = float(scores[candidate_index]) | |
| if score <= 0: | |
| continue | |
| candidate_feature = features[candidate_node] | |
| jaccard = _jaccard_sets(token_sets[index], token_sets[candidate_index]) | |
| if jaccard < 0.2: | |
| continue | |
| if not extra_filter(feature, candidate_feature): | |
| continue | |
| pair = _pair_key(node, candidate_node) | |
| current = candidates.get(pair) | |
| if current is None or score > current["score"]: | |
| candidates[pair] = { | |
| "left": pair[0], | |
| "right": pair[1], | |
| "kind": kind, | |
| "score": score, | |
| "jaccard": jaccard, | |
| } | |
| accepted += 1 | |
| if accepted >= max_candidates: | |
| break | |
| return candidates | |
| def _sparse_token_candidates( | |
| *, | |
| nodes: list[str], | |
| kind: str, | |
| features: dict[str, ArtifactFeature], | |
| token_sets: list[set[str]], | |
| hard_neighbors: defaultdict[str, set[str]], | |
| max_candidates: int, | |
| extra_filter: Any, | |
| ) -> dict[tuple[str, str], dict[str, Any]]: | |
| anchor_tokens: list[set[str]] = [] | |
| inverted: defaultdict[str, list[int]] = defaultdict(list) | |
| for index, node in enumerate(nodes): | |
| tokens = features[node].title_tokens or token_sets[index] | |
| anchor_tokens.append(tokens) | |
| for token in tokens: | |
| inverted[token].append(index) | |
| candidates: dict[tuple[str, str], dict[str, Any]] = {} | |
| for index, node in enumerate(nodes): | |
| feature = features[node] | |
| if not token_sets[index]: | |
| continue | |
| probe_tokens = sorted(anchor_tokens[index], key=lambda token: len(inverted[token]))[:8] | |
| overlap_scores: Counter[int] = Counter() | |
| for token in probe_tokens: | |
| for candidate_index in inverted[token]: | |
| if candidate_index != index: | |
| overlap_scores[candidate_index] += 1 | |
| accepted = 0 | |
| for candidate_index, overlap in overlap_scores.most_common(): | |
| candidate_node = nodes[candidate_index] | |
| if candidate_node in hard_neighbors[node]: | |
| continue | |
| candidate_feature = features[candidate_node] | |
| jaccard = _jaccard_sets(token_sets[index], token_sets[candidate_index]) | |
| if jaccard < 0.2: | |
| continue | |
| if not extra_filter(feature, candidate_feature): | |
| continue | |
| pair = _pair_key(node, candidate_node) | |
| score = float(overlap) | |
| current = candidates.get(pair) | |
| if current is None or score > current["score"]: | |
| candidates[pair] = { | |
| "left": pair[0], | |
| "right": pair[1], | |
| "kind": kind, | |
| "score": score, | |
| "jaccard": jaccard, | |
| } | |
| accepted += 1 | |
| if accepted >= max_candidates: | |
| break | |
| return candidates | |
def _issue_soft_filter(left: ArtifactFeature, right: ArtifactFeature) -> bool:
    """Accept issue pairs created within 365 days, or sharing at least three title tokens."""
    age_gap = _days_between(left.row.get("created_at"), right.row.get("created_at"))
    if age_gap > 365:
        # Issues created far apart need strong title overlap to stay candidates.
        common_title_tokens = left.title_tokens & right.title_tokens
        return len(common_title_tokens) >= 3
    return True
| def _pr_soft_filter(left: ArtifactFeature, right: ArtifactFeature) -> bool: | |
| if not left.row.get("base_ref") or left.row.get("base_ref") != right.row.get("base_ref"): | |
| return False | |
| return _days_between(left.row.get("created_at"), right.row.get("created_at")) <= 180 | |
def _estimate_packet_size(packet: dict[str, Any], model: str) -> PacketBudget:
    """Measure a review packet and estimate the tokens it will cost.

    The packet is rendered as indented, key-sorted JSON; the character count of
    that rendering is converted to input tokens using
    ``LLM_PACKET_CHARS_PER_TOKEN``. ``model`` is accepted but not used.
    """
    del model
    rendered = json.dumps(packet, indent=2, sort_keys=True)
    rendered_chars = len(rendered)
    # Round the chars-per-token division up, and never report fewer than one token.
    rounded_up = (rendered_chars + LLM_PACKET_CHARS_PER_TOKEN - 1) // LLM_PACKET_CHARS_PER_TOKEN
    input_tokens = max(1, rounded_up)
    return PacketBudget(
        node_count=len(packet["nodes"]),
        item_count=len(packet["items"]),
        soft_pair_count=len(packet["soft_pairs"]),
        serialized_chars=rendered_chars,
        estimated_input_tokens=input_tokens,
        estimated_eval_tokens=input_tokens * 2 + 256,
    )
def _packet_budget_json(budget: PacketBudget) -> dict[str, int]:
    """Return the budget's counters as a plain dict, keyed by attribute name."""
    exported_fields = (
        "node_count",
        "item_count",
        "soft_pair_count",
        "serialized_chars",
        "estimated_input_tokens",
        "estimated_eval_tokens",
    )
    return {field_name: getattr(budget, field_name) for field_name in exported_fields}
def _packet_over_budget(budget: PacketBudget) -> bool:
    """Report whether a packet exceeds any of the per-packet LLM limits.

    The limits checked are the node count, the soft-pair count and the
    estimated input tokens.
    """
    if budget.node_count > LLM_MAX_NODES_PER_PACKET:
        return True
    if budget.soft_pair_count > LLM_MAX_SOFT_PAIRS_PER_PACKET:
        return True
    return budget.estimated_input_tokens > LLM_MAX_INPUT_TOKENS
def _soft_pair_review_sort_key(pair: dict[str, Any]) -> tuple[Any, ...]:
    """Build the sort key that orders soft pairs for review batching.

    Pairs that are not deterministic accepts sort first, then pairs with more
    shared targets, then higher score, then higher Jaccard; the node ids break
    any remaining ties.
    """
    # 0 for pairs that still need a decision, 1 for deterministic accepts
    # (a missing flag counts as a deterministic accept).
    deterministic_rank = int(bool(pair.get("deterministic_accept", True)))
    shared_targets = pair.get("shared_targets") or []
    score = float(pair.get("score") or 0.0)
    jaccard = float(pair.get("jaccard") or 0.0)
    return (
        deterministic_rank,
        -len(shared_targets),
        -score,
        -jaccard,
        str(pair["left"]),
        str(pair["right"]),
    )
def _review_subpacket(packet: dict[str, Any], soft_pairs: list[dict[str, Any]]) -> dict[str, Any]:
    """Build a smaller review packet limited to the given soft pairs.

    Args:
        packet: Component packet with ``nodes``, ``items``, ``pair_evidence``
            (keyed ``"left|right"``) and ``soft_pairs``.
        soft_pairs: The subset of soft pairs this sub-packet should review.

    Returns:
        A packet containing only the nodes referenced by ``soft_pairs``, their
        items, and the evidence for pairs whose endpoints are both retained.
        Pairs under review keep their full evidence; every other pair keeps
        only its non-``soft_similarity`` evidence and is dropped when nothing
        remains.

    Raises:
        KeyError: If a soft pair references a node with no entry in
            ``packet["items"]``.
    """
    node_ids = {
        node_id for pair in soft_pairs for node_id in (str(pair["left"]), str(pair["right"]))
    }
    soft_pair_keys = {_pair_key(str(pair["left"]), str(pair["right"])) for pair in soft_pairs}
    items_by_node = {str(item["node_id"]): item for item in packet["items"]}
    pair_evidence: dict[str, list[str]] = {}
    for key, evidence in packet["pair_evidence"].items():
        left, right = key.split("|", 1)
        if left not in node_ids or right not in node_ids:
            continue
        # `soft_pair_keys` holds normalised pair tuples while `key` is the
        # "left|right" string, so the lookup must go through `_pair_key`;
        # testing the raw string against the tuple set can never match.
        if _pair_key(left, right) in soft_pair_keys:
            pair_evidence[key] = sorted(evidence)
            continue
        hard_evidence = sorted(value for value in evidence if value != "soft_similarity")
        if hard_evidence:
            pair_evidence[key] = hard_evidence
    nodes = sorted(node_ids)
    return {
        "nodes": nodes,
        "items": [dict(items_by_node[node]) for node in nodes],
        "pair_evidence": pair_evidence,
        "soft_pairs": [dict(pair) for pair in soft_pairs],
    }
def _split_packet_for_review(packet: dict[str, Any], model: str) -> list[dict[str, Any]]:
    """Split an over-budget packet into sub-packets that each review a batch of soft pairs.

    A packet with no soft pairs, or one already within budget, is returned
    unchanged as a single-element list. Otherwise soft pairs are taken in
    review-priority order and packed greedily: a batch is closed as soon as
    adding the next pair would push its sub-packet over budget. A batch always
    holds at least one pair, even if that single pair is itself over budget.
    """
    all_pairs = packet["soft_pairs"]
    if not all_pairs or not _packet_over_budget(_estimate_packet_size(packet, model)):
        return [packet]

    groups: list[list[dict[str, Any]]] = []
    open_group: list[dict[str, Any]] = []
    for pair in sorted(all_pairs, key=_soft_pair_review_sort_key):
        trial_group = open_group + [pair]
        trial_packet = _review_subpacket(packet, trial_group)
        # An empty group always accepts the pair, so the budget is only
        # estimated once the group already has members.
        overflows = bool(open_group) and _packet_over_budget(
            _estimate_packet_size(trial_packet, model)
        )
        if overflows:
            groups.append(open_group)
            open_group = [pair]
        else:
            open_group = trial_group
    if open_group:
        groups.append(open_group)
    return [_review_subpacket(packet, group) for group in groups]
def _trim_packet_for_llm(
    packet: dict[str, Any], *, max_diff_chars: int, max_filenames: int
) -> dict[str, Any]:
    """Return a copy of ``packet`` with each item's bulky fields truncated.

    Every item keeps at most ``max_filenames`` filenames and at most
    ``max_diff_chars`` characters of ``diff_preview`` (a missing or ``None``
    preview stays ``None``). Nodes, pair evidence and soft pairs are copied
    without modification; the input packet is not mutated.
    """
    trimmed_items: list[dict[str, Any]] = []
    for item in packet["items"]:
        slim = dict(item)
        slim["filenames"] = list(item.get("filenames") or [])[:max_filenames]
        if item.get("diff_preview") is None:
            slim["diff_preview"] = None
        else:
            slim["diff_preview"] = str(item["diff_preview"])[:max_diff_chars]
        trimmed_items.append(slim)

    evidence_copy = {key: list(values) for key, values in packet["pair_evidence"].items()}
    soft_pair_copy = [dict(pair) for pair in packet["soft_pairs"]]
    return {
        "nodes": list(packet["nodes"]),
        "items": trimmed_items,
        "pair_evidence": evidence_copy,
        "soft_pairs": soft_pair_copy,
    }
def _prepare_packet_for_llm(
    packet: dict[str, Any], model: str, *, split: bool
) -> PreparedLlmPacket | None:
    """Fit a review packet into the LLM budget, trimming it if necessary.

    A packet already within budget is returned untouched. Otherwise two trim
    levels are tried in order: the standard per-item limits, then halved limits
    (never below 120 diff characters or 2 filenames), the latter flagged as
    aggressive. Returns ``None`` when even the aggressive trim is over budget.
    ``split`` is passed through to the result unchanged.
    """
    untrimmed_budget = _estimate_packet_size(packet, model)
    if not _packet_over_budget(untrimmed_budget):
        return PreparedLlmPacket(
            packet=packet,
            budget=untrimmed_budget,
            original_budget=untrimmed_budget,
            trimmed=False,
            aggressively_trimmed=False,
            split=split,
        )

    standard_limits = (LLM_MAX_DIFF_CHARS_PER_ITEM, LLM_MAX_FILENAMES_PER_ITEM)
    halved_limits = (
        max(120, LLM_MAX_DIFF_CHARS_PER_ITEM // 2),
        max(2, LLM_MAX_FILENAMES_PER_ITEM // 2),
    )
    attempts = ((standard_limits, False), (halved_limits, True))
    for (diff_limit, filename_limit), is_aggressive in attempts:
        candidate = _trim_packet_for_llm(
            packet,
            max_diff_chars=diff_limit,
            max_filenames=filename_limit,
        )
        candidate_budget = _estimate_packet_size(candidate, model)
        if _packet_over_budget(candidate_budget):
            continue
        return PreparedLlmPacket(
            packet=candidate,
            budget=candidate_budget,
            original_budget=untrimmed_budget,
            trimmed=True,
            aggressively_trimmed=is_aggressive,
            split=split,
        )
    return None
def _accepted_nontrivial_soft_edge(
    packet: dict[str, Any], analyst_result: ClusterAnalystResponse
) -> bool:
    """Report whether the analyst accepted a soft pair that was not a deterministic accept.

    A pair without a ``deterministic_accept`` flag counts as a deterministic
    accept and therefore never qualifies.
    """
    accepted_keys = set()
    for verdict in analyst_result.soft_edge_verdicts:
        if verdict.accept:
            accepted_keys.add(_pair_key(verdict.left, verdict.right))

    for pair in packet["soft_pairs"]:
        if bool(pair.get("deterministic_accept", True)):
            continue
        if _pair_key(str(pair["left"]), str(pair["right"])) in accepted_keys:
            return True
    return False
def _should_run_evaluator(
    packet: dict[str, Any],
    budget: PacketBudget,
    *,
    split: bool,
    aggressively_trimmed: bool,
    analyst_result: ClusterAnalystResponse,
) -> bool:
    """Decide whether the evaluator pass should run after the analyst.

    The evaluator is skipped for aggressively trimmed packets and for packets
    whose estimated evaluation tokens exceed
    ``LLM_SKIP_EVALUATOR_ABOVE_TOKENS``. Otherwise it runs only when the
    analyst accepted at least one soft pair that was not a deterministic
    accept. ``split`` is accepted but not used.
    """
    del split
    too_costly = (
        aggressively_trimmed or budget.estimated_eval_tokens > LLM_SKIP_EVALUATOR_ABOVE_TOKENS
    )
    if too_costly:
        return False
    return _accepted_nontrivial_soft_edge(packet, analyst_result)
def _classify_llm_error(exc: Exception) -> str:
    """Map an LLM call failure onto a coarse error-kind label.

    Classification is by case-insensitive substring search over
    ``"<ExceptionType>: <message>"``; the first matching category wins, in the
    order context-window, timeout, auth, structured-parse. Anything else is
    ``"unknown_provider_error"``.
    """
    type_name = type(exc).__name__.lower()
    message = f"{type(exc).__name__}: {exc}".lower()

    def mentions(*needles: str) -> bool:
        return any(needle in message for needle in needles)

    if mentions("context window", "maximum context length", "exceeds the context"):
        return "context_window_exceeded"
    if mentions("timeout", "timed out"):
        return "provider_timeout"
    if mentions("auth", "api key", "unauthorized", "forbidden"):
        return "provider_auth_error"
    # Parse failures are recognised by exception type name as well as by message.
    typed_parse_failure = any(term in type_name for term in ("validation", "decode"))
    if typed_parse_failure or mentions("parse"):
        return "structured_parse_error"
    return "unknown_provider_error"
def _summarize_llm_error(exc: Exception) -> str:
    """Return the exception message on one line, capped at 300 characters.

    Runs of whitespace are collapsed to a single space and the ends are
    stripped before the length cap is applied.
    """
    raw_text = str(exc)
    single_spaced = re.sub(r"\s+", " ", raw_text)
    return single_spaced.strip()[:300]
def _packet_soft_pair_ids(packet: dict[str, Any]) -> list[str]:
    """List the packet's soft pairs as ``"left|right"`` ids built from their normalised pair keys."""
    pair_ids: list[str] = []
    for pair in packet["soft_pairs"]:
        normalized = _pair_key(str(pair["left"]), str(pair["right"]))
        pair_ids.append("|".join(normalized))
    return pair_ids
def _soft_pair_review_meta(
    *,
    label: str,
    component_index: int,
    component_count: int,
    review_unit_index: int,
    review_unit_count: int,
    cluster_id: str,
    component_budget: PacketBudget,
    budget: PacketBudget,
    prepared_review_unit_hash: str | None,
    trimmed: bool,
    aggressively_trimmed: bool,
    split: bool,
    packet: dict[str, Any],
) -> SoftPairReviewUnitMeta:
    """Assemble the metadata describing one soft-pair review unit.

    Besides passing the arguments through, this derives the log prefix for the
    unit and snapshots the packet's node ids and soft-pair ids as tuples.
    """
    node_ids = tuple(str(node) for node in packet["nodes"])
    pair_ids = tuple(_packet_soft_pair_ids(packet))
    component_part = f"LLM {label} soft-edge review {component_index}/{component_count}"
    unit_part = f" unit {review_unit_index}/{review_unit_count}"
    return SoftPairReviewUnitMeta(
        label=label,
        component_index=component_index,
        component_count=component_count,
        review_unit_index=review_unit_index,
        review_unit_count=review_unit_count,
        cluster_id=cluster_id,
        prefix=component_part + unit_part,
        nodes=node_ids,
        soft_pairs=pair_ids,
        component_budget=component_budget,
        budget=budget,
        prepared_review_unit_hash=prepared_review_unit_hash,
        trimmed=trimmed,
        aggressively_trimmed=aggressively_trimmed,
        split=split,
    )
def _completed_soft_pair_review_sort_key(review: CompletedSoftPairReview) -> tuple[int, int]:
    """Order completed reviews by component index, then by review-unit index."""
    meta = review.meta
    return meta.component_index, meta.review_unit_index
def _soft_pair_review_record(
    *,
    review: CompletedSoftPairReview,
    model: str,
    accepted_nontrivial_soft_edge: bool,
) -> dict[str, Any]:
    """Flatten a completed review into a plain-dict record.

    Result-dependent fields fall back to ``None`` (or ``False`` for the
    ``evaluator_used`` / ``retried`` flags) when the review has no result, as
    is the case for skipped units.
    """
    meta = review.meta
    outcome = review.result

    analyst_json = None
    evaluator_json = None
    evaluator_used = False
    retried = False
    error_kind = None
    error_message = None
    if outcome is not None:
        if outcome.analyst_result is not None:
            analyst_json = outcome.analyst_result.model_dump(mode="json")
        if outcome.evaluator_result is not None:
            evaluator_json = outcome.evaluator_result.model_dump(mode="json")
        evaluator_used = outcome.evaluator_used
        retried = outcome.retried
        error_kind = outcome.error_kind
        error_message = outcome.error_message

    return {
        "label": meta.label,
        "component_index": meta.component_index,
        "component_count": meta.component_count,
        "review_unit_index": meta.review_unit_index,
        "review_unit_count": meta.review_unit_count,
        "status": review.status,
        "reason": review.reason,
        "source": review.source,
        "cache_hit": review.cache_hit,
        "model": model,
        "cluster_id": meta.cluster_id,
        "nodes": list(meta.nodes),
        "soft_pairs": list(meta.soft_pairs),
        "prepared_review_unit_hash": meta.prepared_review_unit_hash,
        "component_budget": _packet_budget_json(meta.component_budget),
        "budget": _packet_budget_json(meta.budget),
        "overflow_policy": LLM_OVERFLOW_POLICY,
        "trimmed": meta.trimmed,
        "aggressively_trimmed": meta.aggressively_trimmed,
        "split": meta.split,
        "analyst_result": analyst_json,
        "evaluator_result": evaluator_json,
        "evaluator_used": evaluator_used,
        "retried": retried,
        "accepted_nontrivial_soft_edge": accepted_nontrivial_soft_edge,
        "error_kind": error_kind,
        "error_message": error_message,
    }
def _completed_soft_pair_review_from_result(
    pending: PendingSoftPairReview,
    result: ClusterAnalysisCallResult,
) -> CompletedSoftPairReview:
    """Wrap a fresh LLM call result as a completed review for the pending unit.

    The review is marked ``"reviewed"`` when an analyst result is present and
    ``"error"`` otherwise; it is always attributed to the ``"llm"`` source and
    never counted as a cache hit.
    """
    if result.analyst_result is not None:
        status = "reviewed"
    else:
        status = "error"
    return CompletedSoftPairReview(
        meta=pending.meta,
        result=result,
        status=status,
        reason=None,
        source="llm",
        cache_hit=False,
    )
async def _run_pending_soft_pair_review(
    pending: PendingSoftPairReview,
    *,
    model: str,
    review_semaphore: asyncio.Semaphore,
) -> CompletedSoftPairReview:
    """Run one pending review through the LLM while holding the review semaphore.

    Any exception raised by the LLM call is converted into an error result
    (classified kind plus a condensed message) instead of propagating, so a
    completed review is always returned.
    """
    async with review_semaphore:
        try:
            outcome = await _fast_agent_cluster_analysis(pending.prepared, model)
        except Exception as exc:
            # Best-effort by design: one failing unit must not abort the batch.
            failure_kind = _classify_llm_error(exc)
            failure_text = _summarize_llm_error(exc)
            outcome = ClusterAnalysisCallResult(
                analyst_result=None,
                evaluator_result=None,
                error_kind=failure_kind,
                error_message=failure_text,
                evaluator_used=False,
                retried=False,
            )
    return _completed_soft_pair_review_from_result(pending, outcome)
async def _run_pending_soft_pair_reviews(
    pending_reviews: list[PendingSoftPairReview],
    *,
    concurrency: int,
    model: str,
    review_semaphore: asyncio.Semaphore,
) -> list[CompletedSoftPairReview]:
    """Run every pending review and return the results in input order.

    With ``concurrency <= 1`` the reviews are awaited one after another;
    otherwise one task per review is created and all are gathered. Each review
    acquires ``review_semaphore`` itself.
    """
    if not pending_reviews:
        return []

    def start(pending: PendingSoftPairReview):
        # Creates the coroutine only; the caller decides how to schedule it.
        return _run_pending_soft_pair_review(
            pending,
            model=model,
            review_semaphore=review_semaphore,
        )

    if concurrency <= 1:
        return [await start(pending) for pending in pending_reviews]

    tasks = [asyncio.create_task(start(pending)) for pending in pending_reviews]
    return await asyncio.gather(*tasks)
# Resolve which soft-similarity candidate pairs are accepted for one group of
# artifacts. `label` is only used in log messages and review metadata.
#
# Returns a 3-tuple:
#   * the accepted pairs, keyed like `soft_candidates` and mapping to the same
#     detail dicts;
#   * a flag that is set once any review unit (fresh or served from cache)
#     carries an analyst result;
#   * one record per review unit, built by `_soft_pair_review_record`.
#
# Early exits: no candidates -> empty result; `options.ranking_backend` other
# than "hybrid", or no LLM and an empty review cache -> deterministic accepts only.
| async def _accepted_soft_pairs( | |
| *, | |
| options: AnalysisOptions, | |
| snapshot: SnapshotData, | |
| features: dict[str, ArtifactFeature], | |
| hard_pairs: dict[tuple[str, str], set[str]], | |
| soft_candidates: dict[tuple[str, str], dict[str, Any]], | |
| label: str, | |
| hybrid_review_cache: HybridReviewCacheStore, | |
| llm_available: bool, | |
| review_semaphore: asyncio.Semaphore, | |
| ) -> tuple[dict[tuple[str, str], dict[str, Any]], bool, list[dict[str, Any]]]: | |
# `snapshot` is accepted but not used by this function.
| del snapshot | |
| if not soft_candidates: | |
| return {}, False, [] | |
# Candidates whose `deterministic_accept` flag is truthy (a missing flag counts
# as True) are accepted up front, before any review verdict is applied.
| deterministic_accepts = { | |
| pair: detail | |
| for pair, detail in soft_candidates.items() | |
| if bool(detail.get("deterministic_accept", True)) | |
| } | |
| if options.ranking_backend != "hybrid": | |
| return deterministic_accepts, False, [] | |
| if not llm_available and not hybrid_review_cache.has_entries: | |
| return deterministic_accepts, False, [] | |
# Review graph = hard pairs plus every soft candidate tagged "soft_similarity".
# NOTE(review): `dict(hard_pairs)` is a shallow copy, so the `.add(...)` below
# mutates the caller's evidence set whenever a soft candidate pair is also a
# key of `hard_pairs` - confirm that this is intended.
| candidate_graph = dict(hard_pairs) | |
| for pair in soft_candidates: | |
| candidate_graph.setdefault(pair, set()).add("soft_similarity") | |
| component_payloads = _component_packets(features, candidate_graph, soft_candidates) | |
| pending_reviews: list[PendingSoftPairReview] = [] | |
| completed_reviews: list[CompletedSoftPairReview] = [] | |
| accepted: dict[tuple[str, str], dict[str, Any]] = dict(deterministic_accepts) | |
| llm_used = False | |
| review_records: list[dict[str, Any]] = [] | |
| total_components = len(component_payloads) | |
# Phase 1: split each component into review units and sort every unit into
# one of three outcomes: skipped, answered from cache, or pending an LLM call.
| for index, payload in enumerate(component_payloads, start=1): | |
| component_budget = _estimate_packet_size(payload, options.model) | |
| cluster_id = _cluster_id_from_nodes(payload["nodes"]) | |
| review_units = _split_packet_for_review(payload, options.model) | |
| if len(review_units) > 1: | |
| _analysis_log( | |
| f"LLM {label} soft-edge review {index}/{total_components}: " | |
| f"split oversized component into {len(review_units)} review units " | |
| f"(nodes={component_budget.node_count}, soft_pairs={component_budget.soft_pair_count}, " | |
| f"est_tokens={component_budget.estimated_input_tokens})" | |
| ) | |
| for unit_index, review_unit in enumerate(review_units, start=1): | |
| prepared = _prepare_packet_for_llm( | |
| review_unit, | |
| options.model, | |
| split=len(review_units) > 1, | |
| ) | |
# `None` means the unit is still over budget after trimming: record it as
# skipped, using the budget of the untrimmed unit.
| if prepared is None: | |
| unit_budget = _estimate_packet_size(review_unit, options.model) | |
| completed_reviews.append( | |
| CompletedSoftPairReview( | |
| meta=_soft_pair_review_meta( | |
| label=label, | |
| component_index=index, | |
| component_count=total_components, | |
| review_unit_index=unit_index, | |
| review_unit_count=len(review_units), | |
| cluster_id=cluster_id, | |
| component_budget=component_budget, | |
| budget=unit_budget, | |
| prepared_review_unit_hash=None, | |
| trimmed=True, | |
| aggressively_trimmed=True, | |
| split=len(review_units) > 1, | |
| packet=review_unit, | |
| ), | |
| result=None, | |
| status="skipped", | |
| reason="over_budget_after_truncate", | |
| source=None, | |
| cache_hit=False, | |
| ) | |
| ) | |
| continue | |
# The cache key is derived from the cache manifest, the model and the
# prepared (possibly trimmed) review unit.
| prepared_review_unit = _prepared_review_unit_payload(prepared) | |
| cache_key = build_hybrid_review_cache_key( | |
| manifest=hybrid_review_cache.manifest, | |
| model=options.model, | |
| prepared_review_unit=prepared_review_unit, | |
| ) | |
| meta = _soft_pair_review_meta( | |
| label=label, | |
| component_index=index, | |
| component_count=total_components, | |
| review_unit_index=unit_index, | |
| review_unit_count=len(review_units), | |
| cluster_id=cluster_id, | |
| component_budget=component_budget, | |
| budget=prepared.budget, | |
| prepared_review_unit_hash=cache_key.prepared_review_unit_hash, | |
| trimmed=prepared.trimmed, | |
| aggressively_trimmed=prepared.aggressively_trimmed, | |
| split=prepared.split, | |
| packet=prepared.packet, | |
| ) | |
# Cache hit: reuse the stored result; a stored result without an analyst
# payload is surfaced with status "error".
| cached_entry = hybrid_review_cache.get(cache_key) | |
| if cached_entry is not None: | |
| completed_reviews.append( | |
| CompletedSoftPairReview( | |
| meta=meta, | |
| result=_cluster_analysis_call_result_from_payload(cached_entry.result), | |
| status=( | |
| "reviewed" | |
| if cached_entry.result.get("analyst_result") is not None | |
| else "error" | |
| ), | |
| reason=None, | |
| source="cache", | |
| cache_hit=True, | |
| ) | |
| ) | |
| continue | |
# Cache miss while the LLM is unavailable: the unit is skipped.
| if not llm_available: | |
| completed_reviews.append( | |
| CompletedSoftPairReview( | |
| meta=meta, | |
| result=None, | |
| status="skipped", | |
| reason="llm_unavailable_cache_miss", | |
| source=None, | |
| cache_hit=False, | |
| ) | |
| ) | |
| continue | |
| pending_reviews.append( | |
| PendingSoftPairReview( | |
| meta=meta, | |
| prepared=prepared, | |
| cache_key=cache_key, | |
| ) | |
| ) | |
| reviewed_from_cache = sum(1 for review in completed_reviews if review.cache_hit) | |
| skipped_reviews = sum(1 for review in completed_reviews if review.status == "skipped") | |
| _analysis_log( | |
| f"LLM {label} soft-edge review scheduling: " | |
| f"units={len(pending_reviews) + len(completed_reviews)}, " | |
| f"cache_hits={reviewed_from_cache}, " | |
| f"cache_misses={len(pending_reviews)}, " | |
| f"skipped={skipped_reviews}, " | |
| f"concurrency={options.hybrid_llm_concurrency}" | |
| ) | |
# Phase 2: run every cache-miss unit through the LLM.
| completed_reviews.extend( | |
| await _run_pending_soft_pair_reviews( | |
| pending_reviews, | |
| concurrency=options.hybrid_llm_concurrency, | |
| model=options.model, | |
| review_semaphore=review_semaphore, | |
| ) | |
| ) | |
# Pending units are looked up again by (component index, unit index) so that
# fresh results can be written back to the cache under their key.
| pending_by_position = { | |
| (pending.meta.component_index, pending.meta.review_unit_index): pending | |
| for pending in pending_reviews | |
| } | |
# Phase 3: visit all reviews in (component, unit) order, log each outcome,
# apply the verdicts and emit one record per unit.
| for review in sorted(completed_reviews, key=_completed_soft_pair_review_sort_key): | |
| accepted_nontrivial = False | |
| pending = pending_by_position.get( | |
| (review.meta.component_index, review.meta.review_unit_index) | |
| ) | |
| result = review.result | |
| if review.reason == "over_budget_after_truncate": | |
| _analysis_log( | |
| f"{review.meta.prefix}: skipped over-budget packet " | |
| f"(nodes={review.meta.budget.node_count}, soft_pairs={review.meta.budget.soft_pair_count}, " | |
| f"est_tokens={review.meta.budget.estimated_input_tokens}, overflow_policy={LLM_OVERFLOW_POLICY})" | |
| ) | |
| elif review.reason == "llm_unavailable_cache_miss": | |
| _analysis_log( | |
| f"{review.meta.prefix}: cache miss with fast-agent unavailable; " | |
| "keeping deterministic-only soft edges" | |
| ) | |
| else: | |
| if review.cache_hit: | |
| _analysis_log( | |
| f"{review.meta.prefix}: cache hit " | |
| f"(nodes={review.meta.budget.node_count}, soft_pairs={review.meta.budget.soft_pair_count}, " | |
| f"est_tokens={review.meta.budget.estimated_input_tokens}, model={options.model})" | |
| ) | |
# No analyst result: only log; `accepted` is left untouched for this unit.
| if result is None or result.analyst_result is None: | |
| if result is not None and result.error_kind is not None: | |
| _analysis_log( | |
| f"{review.meta.prefix}: {result.error_kind}" | |
| f" (nodes={review.meta.budget.node_count}, soft_pairs={review.meta.budget.soft_pair_count}, " | |
| f"est_tokens={review.meta.budget.estimated_input_tokens}, " | |
| f"overflow_policy={LLM_OVERFLOW_POLICY})" | |
| ) | |
| else: | |
| _analysis_log(f"{review.meta.prefix}: no result") | |
| else: | |
| llm_used = True | |
# Verdicts are indexed by normalised pair key.
| verdicts = { | |
| _pair_key(verdict.left, verdict.right): verdict | |
| for verdict in result.analyst_result.soft_edge_verdicts | |
| } | |
| accepted_count = sum(1 for verdict in verdicts.values() if verdict.accept) | |
| rejected_count = sum(1 for verdict in verdicts.values() if not verdict.accept) | |
# True when this unit accepted at least one of its soft pairs that was not a
# deterministic accept.
| accepted_nontrivial = any( | |
| verdicts.get(_pair_key(*pair_id.split("|", 1))) is not None | |
| and verdicts[_pair_key(*pair_id.split("|", 1))].accept | |
| and not bool( | |
| soft_candidates[_pair_key(*pair_id.split("|", 1))].get( | |
| "deterministic_accept", | |
| True, | |
| ) | |
| ) | |
| for pair_id in review.meta.soft_pairs | |
| ) | |
| evaluator_status = "used" if result.evaluator_used else "skipped" | |
| _analysis_log( | |
| f"{review.meta.prefix}: {accepted_count} accepted, {rejected_count} rejected, " | |
| f"evaluator={evaluator_status}, source={review.source}" | |
| ) | |
| if result.error_kind is not None: | |
| _analysis_log( | |
| f"{review.meta.prefix}: {result.error_kind}; keeping analyst result" | |
| ) | |
# Apply verdicts to this unit's pairs: an accept adds the pair, a reject
# removes it even if it was a deterministic accept, and a pair without a
# verdict keeps its current state.
| for pair_id in review.meta.soft_pairs: | |
| normalized_pair = _pair_key(*pair_id.split("|", 1)) | |
| verdict = verdicts.get(normalized_pair) | |
| if verdict is None: | |
| continue | |
| if verdict.accept: | |
| accepted[normalized_pair] = soft_candidates[normalized_pair] | |
| else: | |
| accepted.pop(normalized_pair, None) | |
# Write fresh LLM results to the cache when
# `_cacheable_cluster_analysis_result` approves them.
| if ( | |
| pending is not None | |
| and review.source == "llm" | |
| and _cacheable_cluster_analysis_result(result) | |
| ): | |
| hybrid_review_cache.put( | |
| HybridReviewCacheEntry( | |
| key=pending.cache_key, | |
| result=_cluster_analysis_call_result_payload(result), | |
| cached_at=_iso_now(), | |
| nodes=tuple(pending.prepared.packet["nodes"]), | |
| soft_pairs=tuple(_packet_soft_pair_ids(pending.prepared.packet)), | |
| budget=_packet_budget_json(pending.prepared.budget), | |
| split=pending.prepared.split, | |
| trimmed=pending.prepared.trimmed, | |
| aggressively_trimmed=pending.prepared.aggressively_trimmed, | |
| ) | |
| ) | |
| review_records.append( | |
| _soft_pair_review_record( | |
| review=review, | |
| model=options.model, | |
| accepted_nontrivial_soft_edge=accepted_nontrivial, | |
| ) | |
| ) | |
| return accepted, llm_used, review_records | |
def _component_packets(
    features: dict[str, ArtifactFeature],
    pairs: dict[tuple[str, str], set[str]],
    soft_candidates: dict[tuple[str, str], dict[str, Any]],
) -> list[dict[str, Any]]:
    """Build one review packet per connected component of the pair graph.

    Args:
        features: Artifact features keyed by node id.
        pairs: Evidence labels for every edge of the graph, keyed by node pair.
        soft_candidates: Detail dicts for the soft-similarity candidate pairs.

    Returns:
        One dict per component with ``nodes``, ``items`` (one cluster item per
        node), ``pair_evidence`` (sorted evidence keyed ``"left|right"``) and
        ``soft_pairs`` (the candidates whose endpoints both lie in the
        component, with optional fields defaulted).
    """
    packets: list[dict[str, Any]] = []
    for nodes in _connected_components(features, pairs):
        # Every pair is tested against every component, so use a set for
        # membership instead of rescanning the node sequence each time.
        members = set(nodes)
        pair_evidence = {
            f"{left}|{right}": sorted(evidence)
            for (left, right), evidence in pairs.items()
            if left in members and right in members
        }
        soft_pairs = [
            {
                "left": pair[0],
                "right": pair[1],
                "score": detail["score"],
                "jaccard": detail["jaccard"],
                "evidence_types": sorted(detail.get("evidence_types") or []),
                "shared_targets": detail.get("shared_targets") or [],
                "shared_filenames": detail.get("shared_filenames") or [],
                "deterministic_accept": bool(detail.get("deterministic_accept", True)),
            }
            for pair, detail in soft_candidates.items()
            if pair[0] in members and pair[1] in members
        ]
        packets.append(
            {
                "nodes": nodes,
                "items": [_cluster_item(features[node]) for node in nodes],
                "pair_evidence": pair_evidence,
                "soft_pairs": soft_pairs,
            }
        )
    return packets
def _clusters(
    *,
    snapshot: SnapshotData,
    features: dict[str, ArtifactFeature],
    final_pairs: dict[tuple[str, str], set[str]],
    pair_target_issues: dict[tuple[str, str], set[int]],
    llm_cluster_payloads: dict[str, ClusterAnalystResponse],
) -> list[ClusterRecord]:
    """Turn each connected component of ``final_pairs`` into a cluster record.

    Args:
        snapshot: Accepted for interface compatibility; not used.
        features: Artifact features keyed by node id (``"issue:N"`` or
            ``"pull_request:N"``).
        final_pairs: Evidence labels for every accepted edge.
        pair_target_issues: Issue numbers targeted by each pair.
        llm_cluster_payloads: Optional analyst responses keyed by cluster id.

    Returns:
        One record per component, in component order. The record's target
        issue is the issue number most often targeted by the component's pairs
        (``None`` when no pair has targets).
    """
    del snapshot
    clusters: list[ClusterRecord] = []
    for nodes in _connected_components(features, final_pairs):
        # Every pair is tested against every component, so use a set for
        # membership instead of rescanning the node sequence each time.
        members = set(nodes)
        issue_numbers = sorted(
            int(node.split(":", 1)[1]) for node in nodes if node.startswith("issue:")
        )
        pr_numbers = sorted(
            int(node.split(":", 1)[1]) for node in nodes if node.startswith("pull_request:")
        )
        evidence_types = sorted(
            {
                evidence
                for (left, right), evidences in final_pairs.items()
                if left in members and right in members
                for evidence in evidences
            }
        )
        target_counter: Counter[int] = Counter()
        for (left, right), targets in pair_target_issues.items():
            if left in members and right in members:
                target_counter.update(targets)
        target_issue_number = target_counter.most_common(1)[0][0] if target_counter else None
        llm_payload = llm_cluster_payloads.get(_cluster_id_from_nodes(nodes))
        clusters.append(
            _cluster_record_from_members(
                features=features,
                issue_numbers=issue_numbers,
                pr_numbers=pr_numbers,
                evidence_types=evidence_types,
                target_issue_number=target_issue_number,
                llm_payload=llm_payload,
            )
        )
    return clusters
def _cluster_record_from_members(
    *,
    features: dict[str, ArtifactFeature],
    issue_numbers: list[int],
    pr_numbers: list[int],
    evidence_types: list[str],
    target_issue_number: int | None,
    llm_payload: ClusterAnalystResponse | None = None,
) -> ClusterRecord:
    """Assemble a ``ClusterRecord`` from its member issue and PR numbers.

    Canonical members, status, confidence, summary, reasons and scores are
    computed by the module's helper functions. When ``llm_payload`` is given,
    its non-empty summary and reasons replace the computed ones and its
    confidence, clamped to ``[0.0, 1.0]``, replaces the computed confidence.
    """
    member_nodes = sorted(
        [
            *(f"issue:{number}" for number in issue_numbers),
            *(f"pull_request:{number}" for number in pr_numbers),
        ]
    )
    member_count = len(member_nodes)
    cluster_id = _cluster_id_from_nodes(member_nodes)
    lead_issue = _canonical_issue(issue_numbers, features)
    lead_pr = _canonical_pr(pr_numbers, features)
    has_lead_issue = lead_issue is not None
    has_lead_pr = lead_pr is not None
    status = _cluster_status(issue_numbers, pr_numbers, features)
    confidence = _cluster_confidence(evidence_types)
    summary = _cluster_summary(issue_numbers, pr_numbers, target_issue_number, evidence_types)

    # Reasons exist only for a side (issue / PR) that has a canonical member.
    lead_issue_reason = None
    if has_lead_issue:
        lead_issue_reason = _canonical_issue_reason(lead_issue, features, issue_numbers)
    lead_pr_reason = None
    if has_lead_pr:
        lead_pr_reason = _canonical_pr_reason(lead_pr, features, pr_numbers)
    top_issue_reason = None
    if has_lead_issue:
        top_issue_reason = _best_issue_reason(lead_issue, features, member_count)
    top_pr_reason = None
    if has_lead_pr:
        top_pr_reason = _best_pr_reason(lead_pr, features, member_count)

    if llm_payload is not None:
        # Empty LLM fields fall back to the computed values.
        summary = llm_payload.summary or summary
        confidence = max(0.0, min(1.0, llm_payload.confidence))
        lead_issue_reason = llm_payload.canonical_issue_reason or lead_issue_reason
        lead_pr_reason = llm_payload.canonical_pr_reason or lead_pr_reason
        top_issue_reason = llm_payload.best_issue_reason or top_issue_reason
        top_pr_reason = llm_payload.best_pr_reason or top_pr_reason

    cluster_score = _cluster_score(issue_numbers, pr_numbers, features, status)
    top_issue_score = None
    if has_lead_issue:
        top_issue_score = _issue_score(lead_issue, features, member_count)
    top_pr_score = None
    if has_lead_pr:
        top_pr_score = _pr_score(lead_pr, features, member_count)

    return ClusterRecord(
        cluster_id=cluster_id,
        nodes=member_nodes,
        issue_numbers=issue_numbers,
        pr_numbers=pr_numbers,
        evidence_types=evidence_types,
        canonical_issue_number=lead_issue,
        canonical_pr_number=lead_pr,
        target_issue_number=target_issue_number,
        summary=summary,
        status=status,
        confidence=confidence,
        canonical_issue_reason=lead_issue_reason,
        canonical_pr_reason=lead_pr_reason,
        best_issue_reason=top_issue_reason,
        best_pr_reason=top_pr_reason,
        cluster_score=cluster_score,
        best_issue_score=top_issue_score,
        best_pr_score=top_pr_score,
    )
def _meta_bug_clusters(
    *,
    features: dict[str, ArtifactFeature],
    issue_clusters: list[ClusterRecord],
    pr_clusters: list[ClusterRecord],
    explicit_issue_link_targets: defaultdict[int, set[int]],
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
) -> list[ClusterRecord]:
    """Combine issue clusters with the PR clusters that explicitly target them.

    Issue clusters (plus singleton records for targeted issues outside any
    cluster) act as anchors. Each PR cluster (plus singleton records for linked
    PRs outside any cluster) is attached to the anchor chosen by
    ``_select_issue_anchor_for_pr_cluster``. An anchor yields a meta cluster
    once it has at least two PRs; an unattached PR cluster with at least two
    PRs yields a PR-only meta cluster. The result is de-duplicated by cluster
    id and sorted by it.
    """
    # Index the existing clusters by member number (a later cluster wins on overlap).
    cluster_of_issue: dict[int, ClusterRecord] = {
        number: cluster for cluster in issue_clusters for number in cluster.issue_numbers
    }
    cluster_of_pr: dict[int, ClusterRecord] = {
        number: cluster for cluster in pr_clusters for number in cluster.pr_numbers
    }

    # Invert the explicit links: known issue -> PRs that reference it.
    linked_prs_by_issue: defaultdict[int, set[int]] = defaultdict(set)
    for pr_number, targets in explicit_issue_link_targets.items():
        for target in targets:
            if target in issue_map:
                linked_prs_by_issue[target].add(pr_number)

    # Targeted issues that belong to no issue cluster become singleton anchors.
    issue_anchors: list[ClusterRecord] = list(issue_clusters)
    for issue_number in sorted(linked_prs_by_issue):
        if issue_number in cluster_of_issue:
            continue
        lone_issue = _cluster_record_from_members(
            features=features,
            issue_numbers=[issue_number],
            pr_numbers=[],
            evidence_types=["closing_reference"],
            target_issue_number=issue_number,
        )
        issue_anchors.append(lone_issue)
        cluster_of_issue[issue_number] = lone_issue

    # Linked PRs that are known but belong to no PR cluster become singleton groups.
    pr_groups: list[ClusterRecord] = list(pr_clusters)
    for pr_number, targets in explicit_issue_link_targets.items():
        if pr_number in cluster_of_pr or pr_number not in pr_map:
            continue
        lone_pr = _cluster_record_from_members(
            features=features,
            issue_numbers=[],
            pr_numbers=[pr_number],
            evidence_types=["closing_reference"],
            target_issue_number=min(targets) if targets else None,
        )
        pr_groups.append(lone_pr)
        cluster_of_pr[pr_number] = lone_pr

    # Only anchors with several issues, or with at least one linked PR, get a bucket.
    anchor_buckets: dict[str, dict[str, Any]] = {}
    issue_anchor_for_issue: dict[int, str] = {}
    for anchor in issue_anchors:
        has_linked_prs = any(linked_prs_by_issue.get(number) for number in anchor.issue_numbers)
        if len(anchor.issue_numbers) < 2 and not has_linked_prs:
            continue
        first_issue = anchor.issue_numbers[0] if anchor.issue_numbers else None
        anchor_buckets[anchor.cluster_id] = {
            "issue_numbers": set(anchor.issue_numbers),
            "pr_numbers": set(),
            "evidence_types": set(anchor.evidence_types),
            "target_issue_number": anchor.canonical_issue_number or first_issue,
        }
        for number in anchor.issue_numbers:
            issue_anchor_for_issue[number] = anchor.cluster_id

    # Attach each PR group to the anchor it most clearly targets, if any.
    attached_group_ids: set[str] = set()
    for group in pr_groups:
        anchor_id = _select_issue_anchor_for_pr_cluster(
            cluster=group,
            explicit_issue_link_targets=explicit_issue_link_targets,
            issue_map=issue_map,
            issue_anchor_for_issue=issue_anchor_for_issue,
            anchor_buckets=anchor_buckets,
        )
        if anchor_id is None:
            continue
        bucket = anchor_buckets[anchor_id]
        bucket["pr_numbers"].update(group.pr_numbers)
        bucket["evidence_types"].update(group.evidence_types)
        bucket["evidence_types"].add("closing_reference")
        attached_group_ids.add(group.cluster_id)

    # Emit anchored meta clusters first, then unattached multi-PR groups; a
    # later record replaces an earlier one that has the same cluster id.
    records_by_id: dict[str, ClusterRecord] = {}
    for bucket in anchor_buckets.values():
        if len(bucket["pr_numbers"]) < 2:
            continue
        anchored = _cluster_record_from_members(
            features=features,
            issue_numbers=sorted(bucket["issue_numbers"]),
            pr_numbers=sorted(bucket["pr_numbers"]),
            evidence_types=sorted(bucket["evidence_types"]),
            target_issue_number=bucket["target_issue_number"],
        )
        records_by_id[anchored.cluster_id] = anchored
    for group in pr_groups:
        if group.cluster_id in attached_group_ids or len(group.pr_numbers) < 2:
            continue
        pr_only = _cluster_record_from_members(
            features=features,
            issue_numbers=[],
            pr_numbers=group.pr_numbers,
            evidence_types=group.evidence_types,
            target_issue_number=group.target_issue_number,
        )
        records_by_id[pr_only.cluster_id] = pr_only
    return [records_by_id[cluster_id] for cluster_id in sorted(records_by_id)]
| def _select_issue_anchor_for_pr_cluster( | |
| *, | |
| cluster: ClusterRecord, | |
| explicit_issue_link_targets: defaultdict[int, set[int]], | |
| issue_map: dict[int, dict[str, Any]], | |
| issue_anchor_for_issue: dict[int, str], | |
| anchor_buckets: dict[str, dict[str, Any]], | |
| ) -> str | None: | |
| anchor_counts: Counter[str] = Counter() | |
| targeted_pr_count = 0 | |
| for pr_number in cluster.pr_numbers: | |
| anchor_ids = { | |
| issue_anchor_for_issue[target] | |
| for target in explicit_issue_link_targets.get(pr_number, set()) | |
| if target in issue_map and target in issue_anchor_for_issue | |
| } | |
| if not anchor_ids: | |
| continue | |
| targeted_pr_count += 1 | |
| anchor_counts.update(anchor_ids) | |
| if not anchor_counts or targeted_pr_count <= 0: | |
| return None | |
| ranked = sorted( | |
| anchor_counts.items(), | |
| key=lambda item: ( | |
| -item[1], | |
| min(anchor_buckets[item[0]]["issue_numbers"]), | |
| ), | |
| ) | |
| winner_id, winner_count = ranked[0] | |
| runner_up_count = ranked[1][1] if len(ranked) > 1 else 0 | |
| if winner_count <= runner_up_count: | |
| return None | |
| if winner_count * 2 < targeted_pr_count: | |
| return None | |
| return winner_id | |
| def _best_issue( | |
| clusters: list[ClusterRecord], features: dict[str, ArtifactFeature] | |
| ) -> BestIssueEntry | None: | |
| candidates = [cluster for cluster in clusters if cluster.canonical_issue_number is not None] | |
| if not candidates: | |
| return None | |
| winner = min( | |
| candidates, | |
| key=lambda cluster: ( | |
| 0 | |
| if features[f"issue:{cluster.canonical_issue_number}"].row.get("state") == "open" | |
| else 1, | |
| -len(cluster.nodes), | |
| -features[f"issue:{cluster.canonical_issue_number}"].discussion_activity, | |
| -features[f"issue:{cluster.canonical_issue_number}"].inbound_references, | |
| _sort_timestamp( | |
| features[f"issue:{cluster.canonical_issue_number}"].row.get("created_at") | |
| ), | |
| cluster.canonical_issue_number, | |
| ), | |
| ) | |
| issue_number = winner.canonical_issue_number | |
| assert issue_number is not None | |
| issue_reason = winner.best_issue_reason | |
| if issue_reason is None: | |
| issue_reason = _best_issue_reason(issue_number, features, len(winner.nodes)) | |
| assert issue_reason is not None | |
| return BestIssueEntry( | |
| cluster_id=winner.cluster_id, | |
| issue_number=issue_number, | |
| reason=issue_reason, | |
| score=round(float(winner.best_issue_score or 0.0), 3), | |
| ) | |
| def _best_pr( | |
| clusters: list[ClusterRecord], features: dict[str, ArtifactFeature] | |
| ) -> BestPrEntry | None: | |
| candidates = [cluster for cluster in clusters if cluster.canonical_pr_number is not None] | |
| if not candidates: | |
| return None | |
| open_candidates = [ | |
| cluster | |
| for cluster in candidates | |
| if features[f"pull_request:{cluster.canonical_pr_number}"].row.get("state") == "open" | |
| and not bool(features[f"pull_request:{cluster.canonical_pr_number}"].row.get("draft")) | |
| ] | |
| pool = ( | |
| open_candidates | |
| or [ | |
| cluster | |
| for cluster in candidates | |
| if bool(features[f"pull_request:{cluster.canonical_pr_number}"].row.get("merged")) | |
| ] | |
| or candidates | |
| ) | |
| winner = min( | |
| pool, | |
| key=lambda cluster: ( | |
| 0 | |
| if features[f"pull_request:{cluster.canonical_pr_number}"].row.get("state") == "open" | |
| and not bool(features[f"pull_request:{cluster.canonical_pr_number}"].row.get("draft")) | |
| else 1, | |
| -len(cluster.nodes), | |
| -features[f"pull_request:{cluster.canonical_pr_number}"].explicit_issue_links, | |
| -( | |
| features[f"pull_request:{cluster.canonical_pr_number}"].discussion_activity | |
| + features[f"pull_request:{cluster.canonical_pr_number}"].review_activity | |
| ), | |
| features[f"pull_request:{cluster.canonical_pr_number}"].diff_size, | |
| _sort_timestamp( | |
| features[f"pull_request:{cluster.canonical_pr_number}"].row.get("created_at") | |
| ), | |
| cluster.canonical_pr_number, | |
| ), | |
| ) | |
| pr_number = winner.canonical_pr_number | |
| assert pr_number is not None | |
| pr_reason = winner.best_pr_reason | |
| if pr_reason is None: | |
| pr_reason = _best_pr_reason(pr_number, features, len(winner.nodes)) | |
| assert pr_reason is not None | |
| return BestPrEntry( | |
| cluster_id=winner.cluster_id, | |
| pr_number=pr_number, | |
| reason=pr_reason, | |
| score=round(float(winner.best_pr_score or 0.0), 3), | |
| ) | |
| def _resolve_source_node( | |
| row: dict[str, Any], | |
| issue_map: dict[int, dict[str, Any]], | |
| pr_map: dict[int, dict[str, Any]], | |
| comment_map: dict[int, dict[str, Any]], | |
| review_map: dict[int, dict[str, Any]], | |
| review_comment_map: dict[int, dict[str, Any]], | |
| ) -> str | None: | |
| source_type = row.get("source_type") | |
| source_number = row.get("source_number") | |
| if source_type in {"issue", "pull_request"} and source_number is not None: | |
| return _node_from_number(int(source_number), issue_map, pr_map) | |
| source_id = row.get("source_github_id") | |
| if source_type == "comment" and source_id is not None: | |
| comment = comment_map.get(int(source_id)) | |
| if comment and comment.get("parent_number") is not None: | |
| parent_kind = comment.get("parent_kind") | |
| if parent_kind in {"issue", "pull_request"}: | |
| return _node_from_number(int(comment["parent_number"]), issue_map, pr_map) | |
| if source_type == "review" and source_id is not None: | |
| review = review_map.get(int(source_id)) | |
| if review: | |
| return _node_from_number(int(review["pull_request_number"]), issue_map, pr_map) | |
| if source_type == "review_comment" and source_id is not None: | |
| review_comment = review_comment_map.get(int(source_id)) | |
| if review_comment: | |
| return _node_from_number(int(review_comment["pull_request_number"]), issue_map, pr_map) | |
| if source_number is None: | |
| return None | |
| return _node_from_number(int(source_number), issue_map, pr_map) | |
| def _resolve_target_node( | |
| repo: str, | |
| row: dict[str, Any], | |
| issue_map: dict[int, dict[str, Any]], | |
| pr_map: dict[int, dict[str, Any]], | |
| ) -> str | None: | |
| if ( | |
| row.get("target_owner") != repo.split("/", 1)[0] | |
| or row.get("target_repo") != repo.split("/", 1)[1] | |
| ): | |
| return None | |
| target_number = row.get("target_number") | |
| if target_number is None: | |
| return None | |
| return _node_from_number(int(target_number), issue_map, pr_map) | |
| def _node_from_number( | |
| number: int, issue_map: dict[int, dict[str, Any]], pr_map: dict[int, dict[str, Any]] | |
| ) -> str | None: | |
| if number in issue_map: | |
| return f"issue:{number}" | |
| if number in pr_map: | |
| return f"pull_request:{number}" | |
| return None | |
| def _connected_components( | |
| features: dict[str, ArtifactFeature], | |
| pairs: dict[tuple[str, str], set[str]], | |
| ) -> list[list[str]]: | |
| adjacency: defaultdict[str, set[str]] = defaultdict(set) | |
| for left, right in pairs: | |
| adjacency[left].add(right) | |
| adjacency[right].add(left) | |
| visited: set[str] = set() | |
| components: list[list[str]] = [] | |
| for node in sorted(adjacency): | |
| if node in visited: | |
| continue | |
| stack = [node] | |
| component: list[str] = [] | |
| while stack: | |
| current = stack.pop() | |
| if current in visited: | |
| continue | |
| visited.add(current) | |
| component.append(current) | |
| stack.extend(sorted(adjacency[current] - visited)) | |
| components.append(sorted(component)) | |
| return components | |
| def _canonical_issue(issue_numbers: list[int], features: dict[str, ArtifactFeature]) -> int | None: | |
| if not issue_numbers: | |
| return None | |
| return min( | |
| issue_numbers, | |
| key=lambda number: ( | |
| 0 if features[f"issue:{number}"].row.get("state") == "open" else 1, | |
| -features[f"issue:{number}"].inbound_references, | |
| -features[f"issue:{number}"].discussion_activity, | |
| _sort_timestamp(features[f"issue:{number}"].row.get("created_at")), | |
| number, | |
| ), | |
| ) | |
| def _canonical_pr(pr_numbers: list[int], features: dict[str, ArtifactFeature]) -> int | None: | |
| if not pr_numbers: | |
| return None | |
| return min( | |
| pr_numbers, | |
| key=lambda number: ( | |
| 0 if bool(features[f"pull_request:{number}"].row.get("merged")) else 1, | |
| 0 | |
| if features[f"pull_request:{number}"].row.get("state") == "open" | |
| and not bool(features[f"pull_request:{number}"].row.get("draft")) | |
| else 1, | |
| -features[f"pull_request:{number}"].explicit_issue_links, | |
| -( | |
| features[f"pull_request:{number}"].discussion_activity | |
| + features[f"pull_request:{number}"].review_activity | |
| ), | |
| features[f"pull_request:{number}"].diff_size, | |
| _sort_timestamp(features[f"pull_request:{number}"].row.get("created_at")), | |
| number, | |
| ), | |
| ) | |
| def _cluster_status( | |
| issue_numbers: list[int], pr_numbers: list[int], features: dict[str, ArtifactFeature] | |
| ) -> str: | |
| if any(features[f"issue:{number}"].row.get("state") == "open" for number in issue_numbers): | |
| return "open" | |
| if any( | |
| features[f"pull_request:{number}"].row.get("state") == "open" | |
| and not bool(features[f"pull_request:{number}"].row.get("draft")) | |
| for number in pr_numbers | |
| ): | |
| return "open" | |
| if any(bool(features[f"pull_request:{number}"].row.get("merged")) for number in pr_numbers): | |
| return "merged" | |
| return "closed" | |
| def _cluster_confidence(evidence_types: list[str]) -> float: | |
| confidence = 0.45 | |
| if "duplicate_reference" in evidence_types: | |
| confidence += 0.25 | |
| if "shared_issue_target" in evidence_types: | |
| confidence += 0.2 | |
| if "closing_reference" in evidence_types: | |
| confidence += 0.1 | |
| if "timeline:cross-referenced" in evidence_types: | |
| confidence += 0.1 | |
| if "soft_similarity" in evidence_types: | |
| confidence += 0.05 | |
| return min(confidence, 0.99) | |
| def _cluster_summary( | |
| issue_numbers: list[int], | |
| pr_numbers: list[int], | |
| target_issue_number: int | None, | |
| evidence_types: list[str], | |
| ) -> str: | |
| if issue_numbers and pr_numbers and target_issue_number is not None: | |
| return f"Cluster of {len(issue_numbers)} issues and {len(pr_numbers)} PRs centered on issue #{target_issue_number}." | |
| if pr_numbers and target_issue_number is not None: | |
| return f"Cluster of {len(pr_numbers)} PRs targeting issue #{target_issue_number}." | |
| if issue_numbers: | |
| return f"Cluster of {len(issue_numbers)} related issues linked by {', '.join(evidence_types[:2]) or 'duplicate evidence'}." | |
| return f"Cluster of {len(pr_numbers)} related pull requests linked by {', '.join(evidence_types[:2]) or 'shared evidence'}." | |
| def _cluster_score( | |
| issue_numbers: list[int], | |
| pr_numbers: list[int], | |
| features: dict[str, ArtifactFeature], | |
| status: str, | |
| ) -> float: | |
| cluster_size = len(issue_numbers) + len(pr_numbers) | |
| has_mixed = 1 if issue_numbers and pr_numbers else 0 | |
| duplicate_pressure = max(len(issue_numbers) - 1, 0) + max(len(pr_numbers) - 1, 0) | |
| open_bonus = 1 if status == "open" else 0 | |
| discussion = sum( | |
| features[f"issue:{number}"].discussion_activity for number in issue_numbers | |
| ) + sum( | |
| features[f"pull_request:{number}"].discussion_activity | |
| + features[f"pull_request:{number}"].review_activity | |
| for number in pr_numbers | |
| ) | |
| return float( | |
| cluster_size * 100 + has_mixed * 50 + duplicate_pressure * 25 + open_bonus * 20 + discussion | |
| ) | |
| def _issue_score( | |
| number: int | None, features: dict[str, ArtifactFeature], cluster_size: int | |
| ) -> float | None: | |
| if number is None: | |
| return None | |
| feature = features[f"issue:{number}"] | |
| score = 0.0 | |
| if feature.row.get("state") == "open": | |
| score += 100.0 | |
| score += cluster_size * 10.0 | |
| score += feature.discussion_activity * 2.0 | |
| score += feature.inbound_references | |
| return score | |
| def _pr_score( | |
| number: int | None, features: dict[str, ArtifactFeature], cluster_size: int | |
| ) -> float | None: | |
| if number is None: | |
| return None | |
| feature = features[f"pull_request:{number}"] | |
| score = 0.0 | |
| if feature.row.get("state") == "open" and not bool(feature.row.get("draft")): | |
| score += 120.0 | |
| elif bool(feature.row.get("merged")): | |
| score += 60.0 | |
| score += cluster_size * 10.0 | |
| score += feature.explicit_issue_links * 5.0 | |
| score += (feature.discussion_activity + feature.review_activity) * 2.0 | |
| score -= feature.diff_size / 1000.0 | |
| return score | |
| def _canonical_issue_reason( | |
| number: int | None, features: dict[str, ArtifactFeature], issue_numbers: list[int] | |
| ) -> str | None: | |
| if number is None: | |
| return None | |
| feature = features[f"issue:{number}"] | |
| return ( | |
| f"Issue #{number} is canonical because it is {'open' if feature.row.get('state') == 'open' else 'closed'}, " | |
| f"has {feature.inbound_references} inbound references, and has the strongest discussion signal in a cluster of {len(issue_numbers)} issues." | |
| ) | |
| def _canonical_pr_reason( | |
| number: int | None, features: dict[str, ArtifactFeature], pr_numbers: list[int] | |
| ) -> str | None: | |
| if number is None: | |
| return None | |
| feature = features[f"pull_request:{number}"] | |
| review_signal = feature.discussion_activity + feature.review_activity | |
| return ( | |
| f"PR #{number} is canonical because it is {'merged' if feature.row.get('merged') else 'open' if feature.row.get('state') == 'open' else 'closed'}, " | |
| f"links to {feature.explicit_issue_links} issues, and has {review_signal} review/discussion events across {len(pr_numbers)} related PRs." | |
| ) | |
| def _best_issue_reason( | |
| number: int | None, features: dict[str, ArtifactFeature], cluster_size: int | |
| ) -> str | None: | |
| if number is None: | |
| return None | |
| feature = features[f"issue:{number}"] | |
| return ( | |
| f"Issue #{number} is the strongest global issue candidate because it is {'open' if feature.row.get('state') == 'open' else 'closed'}, " | |
| f"belongs to a cluster with {cluster_size} artifacts, and carries {feature.discussion_activity} discussion comments plus {feature.inbound_references} inbound references." | |
| ) | |
| def _best_pr_reason( | |
| number: int | None, features: dict[str, ArtifactFeature], cluster_size: int | |
| ) -> str | None: | |
| if number is None: | |
| return None | |
| feature = features[f"pull_request:{number}"] | |
| return ( | |
| f"PR #{number} is the strongest global PR candidate because it is {'open' if feature.row.get('state') == 'open' else 'merged' if feature.row.get('merged') else 'closed'}, " | |
| f"belongs to a cluster with {cluster_size} artifacts, links to {feature.explicit_issue_links} issues, and carries {feature.discussion_activity + feature.review_activity} review/discussion events." | |
| ) | |
| def _duplicate_issue_reason(cluster: ClusterRecord) -> str: | |
| return f"Issues in {cluster.cluster_id} are treated as duplicates because they share {', '.join(cluster.evidence_types)} evidence." | |
| def _duplicate_pr_reason(cluster: ClusterRecord) -> str: | |
| if cluster.target_issue_number is not None: | |
| return f"PRs in {cluster.cluster_id} are treated as duplicates because they converge on issue #{cluster.target_issue_number} with {', '.join(cluster.evidence_types)} evidence." | |
| return f"PRs in {cluster.cluster_id} are treated as duplicates because they share {', '.join(cluster.evidence_types)} evidence." | |
| def _cluster_pr_comparisons( | |
| cluster: ClusterRecord, features: dict[str, ArtifactFeature] | |
| ) -> list[PrComparisonEntry]: | |
| comparisons: list[PrComparisonEntry] = [] | |
| numbers = sorted(cluster.pr_numbers) | |
| for index, left_number in enumerate(numbers): | |
| left = features[f"pull_request:{left_number}"] | |
| for right_number in numbers[index + 1 :]: | |
| right = features[f"pull_request:{right_number}"] | |
| comparisons.append(_pr_comparison(left, right)) | |
| return comparisons | |
def _pr_comparison(left: ArtifactFeature, right: ArtifactFeature) -> PrComparisonEntry:
    """Measure how similar two pull requests' code changes are.

    Combines diff-size similarity (20%), filename Jaccard overlap (30%),
    changed-line area overlap (35%) and patch-token Jaccard similarity (15%)
    into ``code_similarity``. All ratios are rounded to three decimals.
    """
    left_files = set(left.filenames)
    right_files = set(right.filenames)

    size_similarity = _size_similarity(left.diff_size, right.diff_size)
    file_overlap = _jaccard_sets(left_files, right_files)
    area_overlap, shared_file_areas = _file_area_overlap(
        left.file_ranges_by_name, right.file_ranges_by_name
    )
    patch_similarity = _jaccard_sets(set(left.patch_tokens), set(right.patch_tokens))
    code_similarity = (
        size_similarity * 0.20 + file_overlap * 0.30 + area_overlap * 0.35 + patch_similarity * 0.15
    )

    return PrComparisonEntry(
        left_pr_number=left.number,
        right_pr_number=right.number,
        code_similarity=round(code_similarity, 3),
        size_similarity=round(size_similarity, 3),
        file_overlap=round(file_overlap, 3),
        area_overlap=round(area_overlap, 3),
        patch_similarity=round(patch_similarity, 3),
        shared_filenames=sorted(left_files & right_files),
        shared_file_areas=shared_file_areas,
    )
| def _cluster_item(feature: ArtifactFeature) -> dict[str, Any]: | |
| return { | |
| "node_id": feature.node_id, | |
| "kind": feature.kind, | |
| "number": feature.number, | |
| "title": feature.row.get("title"), | |
| "state": feature.row.get("state"), | |
| "draft": feature.row.get("draft"), | |
| "merged": feature.row.get("merged"), | |
| "created_at": feature.row.get("created_at"), | |
| "body_length": feature.body_length, | |
| "discussion_activity": feature.discussion_activity, | |
| "review_activity": feature.review_activity, | |
| "inbound_references": feature.inbound_references, | |
| "explicit_issue_links": feature.explicit_issue_links, | |
| "explicit_issue_targets": feature.explicit_issue_targets, | |
| "diff_size": feature.diff_size, | |
| "filenames": feature.filenames[:20], | |
| "diff_preview": feature.diff_preview, | |
| } | |
async def _fast_agent_cluster_analysis(
    prepared: PreparedLlmPacket, model: str
) -> ClusterAnalysisCallResult:
    """Run the analyst / evaluator / retry flow for one prepared packet.

    Flow:
      1. Ask ``cluster_analyst`` for a structured analysis of the packet.
      2. If ``_should_run_evaluator`` says so, ask ``cluster_evaluator`` to
         judge that analysis.
      3. If the evaluator rejects it, ask the analyst once more, passing the
         previous result and the evaluator's feedback.

    Failures (including a failed ``fast_agent`` import) are reported through
    ``error_kind`` / ``error_message`` on the returned result rather than
    raised. When the evaluator or the retry call fails, the first analyst
    result is still returned alongside the error.
    """

    def _outcome(analyst=None, evaluator=None, *, error=None, evaluator_used=False, retried=False):
        # Single place that assembles the result record.
        return ClusterAnalysisCallResult(
            analyst_result=analyst,
            evaluator_result=evaluator,
            error_kind=None if error is None else _classify_llm_error(error),
            error_message=None if error is None else _summarize_llm_error(error),
            evaluator_used=evaluator_used,
            retried=retried,
        )

    def _as_prompt(payload):
        return json.dumps(payload, indent=2, sort_keys=True)

    try:
        from fast_agent import FastAgent
    except Exception as exc:
        return _outcome(error=exc)

    fast = FastAgent("slop-farmer-analysis")

    # NOTE(review): these stubs and the ``model`` argument are not referenced
    # anywhere in the visible code; presumably they are meant to be registered
    # as the ``cluster_analyst`` / ``cluster_evaluator`` agents - confirm.
    async def analyst_stub() -> None:
        return None

    async def evaluator_stub() -> None:
        return None

    packet = prepared.packet
    prompt = _as_prompt(packet)
    try:
        async with fast.run() as agent:
            first_pass, _ = await agent.cluster_analyst.structured(
                prompt, ClusterAnalystResponse
            )
            if first_pass is None:
                return _outcome()

            needs_evaluation = _should_run_evaluator(
                packet,
                prepared.budget,
                split=prepared.split,
                aggressively_trimmed=prepared.aggressively_trimmed,
                analyst_result=first_pass,
            )
            if not needs_evaluation:
                return _outcome(first_pass)

            evaluation_prompt = _as_prompt(
                {"packet": packet, "analyst_result": first_pass.model_dump(mode="json")}
            )
            try:
                verdict, _ = await agent.cluster_evaluator.structured(
                    evaluation_prompt, ClusterEvaluatorResponse
                )
            except Exception as exc:
                return _outcome(first_pass, error=exc, evaluator_used=True)

            if verdict is None or verdict.accept:
                return _outcome(first_pass, verdict, evaluator_used=True)

            retry_prompt = _as_prompt(
                {
                    "packet": packet,
                    "previous_result": first_pass.model_dump(mode="json"),
                    "feedback": verdict.feedback,
                }
            )
            try:
                second_pass, _ = await agent.cluster_analyst.structured(
                    retry_prompt, ClusterAnalystResponse
                )
            except Exception as exc:
                return _outcome(
                    first_pass, verdict, error=exc, evaluator_used=True, retried=True
                )

            # Keep the first analysis when the retry produced nothing.
            return _outcome(
                second_pass or first_pass, verdict, evaluator_used=True, retried=True
            )
    except Exception as exc:
        return _outcome(error=exc)
def _can_use_fast_agent() -> bool:
    """Report whether the fast-agent path is usable.

    True only when ``fast_agent`` imports successfully and at least one
    environment variable named in ``LLM_PROVIDER_ENV_VARS`` has a non-empty
    value.
    """
    try:
        import fast_agent  # noqa: F401
    except Exception:
        return False
    for name in LLM_PROVIDER_ENV_VARS:
        if os.environ.get(name):
            return True
    return False
def _tokenize(text: str | None, *, remove_stopwords: bool) -> list[str]:
    """Lower-case ``text`` and split it into ``TOKEN_PATTERN`` matches.

    ``None`` is treated as empty text. When ``remove_stopwords`` is true,
    tokens listed in ``STOPWORDS`` are dropped.
    """
    words = TOKEN_PATTERN.findall((text or "").lower())
    if remove_stopwords:
        words = [word for word in words if word not in STOPWORDS]
    return words
def _strip_pull_request_template(
    body: str | None,
    *,
    settings: Any | None = None,
) -> str:
    """Delegate to ``strip_pull_request_template`` with the given settings."""
    cleaned = strip_pull_request_template(body, settings=settings)
    return cleaned
def _patch_ranges(patch: str) -> list[tuple[int, int]]:
    """Extract inclusive ``(start, end)`` line ranges from a patch's hunk headers.

    Ranges come from the ``+start,count`` side of each header matched by
    ``HUNK_HEADER_PATTERN``. A missing count means one line, and a zero count
    yields the single-line range ``(start, start)``.
    """
    spans: list[tuple[int, int]] = []
    for header in map(HUNK_HEADER_PATTERN.match, patch.splitlines()):
        if header is None:
            continue
        first_line = int(header["start"])
        length = int(header["count"] or "1")
        # max(..., 1) makes a zero-length hunk collapse onto its start line.
        spans.append((first_line, first_line + max(length, 1) - 1))
    return spans
def _patch_content_tokens(patch: str) -> list[str]:
    """Tokenize the added and removed lines of a patch, without stopwords.

    Lines starting with ``+++`` or ``---`` are skipped; for the remaining
    ``+`` / ``-`` lines the leading marker is removed before tokenizing.
    """
    changed = [
        line[1:]
        for line in patch.splitlines()
        if line.startswith(("+", "-")) and not line.startswith(("+++", "---"))
    ]
    return _tokenize("\n".join(changed), remove_stopwords=True)
| def _size_similarity(left: int, right: int) -> float: | |
| largest = max(left, right) | |
| if largest <= 0: | |
| return 1.0 | |
| return min(left, right) / largest | |
| def _file_area_overlap( | |
| left_ranges_by_name: dict[str, list[tuple[int, int]]], | |
| right_ranges_by_name: dict[str, list[tuple[int, int]]], | |
| ) -> tuple[float, list[PrFileAreaEntry]]: | |
| shared_names = sorted(set(left_ranges_by_name) & set(right_ranges_by_name)) | |
| if not shared_names: | |
| return 0.0, [] | |
| total_overlap = 0 | |
| total_union = 0 | |
| entries: list[PrFileAreaEntry] = [] | |
| for filename in shared_names: | |
| left_ranges = _merge_ranges(left_ranges_by_name.get(filename) or []) | |
| right_ranges = _merge_ranges(right_ranges_by_name.get(filename) or []) | |
| overlap = _ranges_overlap_size(left_ranges, right_ranges) | |
| union = _ranges_size(_merge_ranges([*left_ranges, *right_ranges])) | |
| total_overlap += overlap | |
| total_union += union | |
| entries.append( | |
| PrFileAreaEntry( | |
| filename=filename, | |
| left_ranges=[[start, end] for start, end in left_ranges], | |
| right_ranges=[[start, end] for start, end in right_ranges], | |
| ) | |
| ) | |
| if total_union == 0: | |
| return 0.0, entries | |
| return total_overlap / total_union, entries | |
| def _merge_ranges(ranges: list[tuple[int, int]]) -> list[tuple[int, int]]: | |
| if not ranges: | |
| return [] | |
| merged: list[tuple[int, int]] = [] | |
| for start, end in sorted(ranges): | |
| if not merged or start > merged[-1][1] + 1: | |
| merged.append((start, end)) | |
| continue | |
| merged[-1] = (merged[-1][0], max(merged[-1][1], end)) | |
| return merged | |
| def _ranges_size(ranges: list[tuple[int, int]]) -> int: | |
| return sum(end - start + 1 for start, end in ranges) | |
| def _ranges_overlap_size(left: list[tuple[int, int]], right: list[tuple[int, int]]) -> int: | |
| overlap = 0 | |
| left_index = 0 | |
| right_index = 0 | |
| while left_index < len(left) and right_index < len(right): | |
| left_start, left_end = left[left_index] | |
| right_start, right_end = right[right_index] | |
| overlap_start = max(left_start, right_start) | |
| overlap_end = min(left_end, right_end) | |
| if overlap_start <= overlap_end: | |
| overlap += overlap_end - overlap_start + 1 | |
| if left_end <= right_end: | |
| left_index += 1 | |
| else: | |
| right_index += 1 | |
| return overlap | |
| def _days_between(left: str | None, right: str | None) -> int: | |
| if not left or not right: | |
| return 10**9 | |
| return abs((_parse_dt(left) - _parse_dt(right)).days) | |
| def _parse_dt(value: str) -> datetime: | |
| return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(UTC) | |
| def _sort_timestamp(value: str | None) -> str: | |
| return value or "9999-99-99T99:99:99Z" | |
| def _pair_key(left: str, right: str) -> tuple[str, str]: | |
| return (left, right) if left <= right else (right, left) | |
def _jaccard(left: list[str], right: list[str]) -> float:
    """Return the Jaccard similarity of two token lists, ignoring duplicates."""
    left_unique = set(left)
    right_unique = set(right)
    return _jaccard_sets(left_unique, right_unique)
| def _jaccard_sets(left_set: set[str], right_set: set[str]) -> float: | |
| if not left_set or not right_set: | |
| return 0.0 | |
| return len(left_set & right_set) / len(left_set | right_set) | |
| def _cluster_id_from_nodes(nodes: list[str]) -> str: | |
| numbers = sorted(int(node.split(":", 1)[1]) for node in nodes) | |
| return f"cluster-{numbers[0]}-{len(nodes)}" | |