from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal

from slop_farmer.data.parquet_io import read_json, read_parquet_rows
from slop_farmer.data.snapshot_paths import (
    CURRENT_ANALYSIS_MANIFEST_PATH,
    load_current_analysis_manifest,
    repo_relative_path_to_local,
)

# Which analysis report a caller wants; "auto" picks the best available one.
AnalysisVariant = Literal["auto", "hybrid", "deterministic"]


@dataclass(slots=True, frozen=True)
class _SnapshotMetadata:
    # Repo slug and the snapshot directory's identifier, read from manifest.json
    # (with fallbacks inferred from the snapshot's artifacts).
    repo: str
    snapshot_id: str


@dataclass(slots=True, frozen=True)
class _AnalysisSelection:
    # The analysis-report file that won selection, its parsed payload, and the
    # variant/enrichment flags it carries.
    path: Path
    payload: dict[str, Any]
    variant_used: str
    llm_enrichment: bool


def get_snapshot_surfaces(snapshot_dir: Path) -> dict[str, Any]:
    """Summarize the snapshot's issue-analysis and contributor surfaces."""
    issue_status = get_issue_cluster_status(snapshot_dir, variant="auto")
    contributor_status = get_contributor_status(snapshot_dir)
    counts = issue_status.get("counts") or {}
    return {
        "issues": {
            "available": issue_status["available"],
            "variant_used": issue_status.get("variant_used"),
            "llm_enrichment": issue_status.get("llm_enrichment"),
            "generated_at": issue_status.get("generated_at"),
            "cluster_count": counts.get("meta_bugs", 0),
            "duplicate_pr_count": counts.get("duplicate_prs", 0),
            "available_variants": issue_status.get("available_variants") or [],
        },
        "contributors": {
            "available": contributor_status["available"],
            "generated_at": contributor_status.get("generated_at"),
            "contributor_count": contributor_status.get("contributor_count", 0),
        },
    }


def get_issue_cluster_status(snapshot_dir: Path, *, variant: AnalysisVariant) -> dict[str, Any]:
    """Report whether an analysis report exists for *variant* plus headline counts."""
    metadata = _snapshot_metadata(snapshot_dir)
    candidates = _analysis_candidates(snapshot_dir)
    selection = _select_analysis_report(candidates, variant=variant)
    status = {
        "repo": metadata.repo,
        "snapshot_id": metadata.snapshot_id,
        "variant_requested": variant,
        "available": selection is not None,
        "available_variants": sorted({candidate["variant"] for candidate in candidates}),
    }
    if selection is None:
        return {
            **status,
            "variant_used": None,
            "llm_enrichment": False,
            "generated_at": None,
            "report_path": None,
            "counts": {"meta_bugs": 0, "duplicate_issues": 0, "duplicate_prs": 0},
        }
    payload = selection.payload
    return {
        **status,
        "variant_used": selection.variant_used,
        "llm_enrichment": selection.llm_enrichment,
        "generated_at": payload.get("generated_at"),
        "report_path": selection.path.name,
        "counts": _analysis_counts(payload),
    }


def list_issue_clusters(
    snapshot_dir: Path,
    *,
    limit: int | None,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    """List meta-bug clusters (rank order preserved), optionally truncated to *limit*."""
    metadata, selection, issue_map, pr_map = _analysis_context(snapshot_dir, variant=variant)
    base = _analysis_base_payload(metadata, selection, variant=variant)
    if selection is None:
        return {**base, "clusters": [], "cluster_count": 0}
    clusters = [
        _issue_cluster_summary(cluster, issue_map, pr_map, rank=rank)
        for rank, cluster in enumerate(selection.payload.get("meta_bugs") or [], start=1)
    ]
    return {
        **base,
        # cluster_count reflects the full set even when the list is truncated.
        "clusters": clusters if limit is None else clusters[:limit],
        "cluster_count": len(clusters),
    }


def get_issue_cluster(
    snapshot_dir: Path,
    *,
    cluster_id: str,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    """Return one cluster with member issue/PR rows, or a found=False payload."""
    metadata, selection, issue_map, pr_map = _analysis_context(snapshot_dir, variant=variant)
    base = _analysis_base_payload(metadata, selection, variant=variant)
    missing = {
        **base,
        "cluster_id": cluster_id,
        "found": False,
        "cluster": None,
        "issues": [],
        "pull_requests": [],
    }
    if selection is None:
        return missing
    cluster = next(
        (
            row
            for row in selection.payload.get("meta_bugs") or []
            if str(row.get("cluster_id") or "") == cluster_id
        ),
        None,
    )
    if cluster is None:
        return missing
    issue_numbers = _ordered_ints(cluster.get("issue_numbers"))
    pr_numbers = _ordered_ints(cluster.get("pr_numbers"))
    canonical_pr_number = _coerce_int(cluster.get("canonical_pr_number"))
    return {
        **base,
        "cluster_id": cluster_id,
        "found": True,
        "cluster": {
            **_issue_cluster_summary(cluster, issue_map, pr_map),
            "canonical_issue_reason": cluster.get("canonical_issue_reason"),
            "canonical_pr_reason": cluster.get("canonical_pr_reason"),
            "best_issue_reason": cluster.get("best_issue_reason"),
            "best_pr_reason": cluster.get("best_pr_reason"),
        },
        "issues": [_issue_member_row(number, issue_map.get(number)) for number in issue_numbers],
        "pull_requests": [
            _pr_member_row(
                number,
                pr_map.get(number),
                role="canonical" if canonical_pr_number == number else "member",
            )
            for number in pr_numbers
        ],
    }


def get_issue_clusters_for_pr(
    snapshot_dir: Path,
    *,
    pr_number: int,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    """Find every cluster that lists *pr_number* among its PRs, tagging its role."""
    metadata, selection, issue_map, pr_map = _analysis_context(snapshot_dir, variant=variant)
    base = _analysis_base_payload(metadata, selection, variant=variant)
    if selection is None:
        return {**base, "pr_number": pr_number, "found": False, "clusters": [], "cluster_count": 0}
    matches = []
    for rank, cluster in enumerate(selection.payload.get("meta_bugs") or [], start=1):
        if pr_number not in _ordered_ints(cluster.get("pr_numbers")):
            continue
        canonical_pr_number = _coerce_int(cluster.get("canonical_pr_number"))
        matches.append(
            {
                **_issue_cluster_summary(cluster, issue_map, pr_map, rank=rank),
                "membership_role": "canonical" if canonical_pr_number == pr_number else "member",
            }
        )
    return {
        **base,
        "pr_number": pr_number,
        "found": bool(matches),
        "clusters": matches,
        "cluster_count": len(matches),
    }


def check_issue_cluster_membership(
    snapshot_dir: Path,
    *,
    pr_number: int,
    cluster_id: str | None,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    """Check whether a PR belongs to a specific cluster (or to any, if id is None)."""
    lookup = get_issue_clusters_for_pr(snapshot_dir, pr_number=pr_number, variant=variant)
    matches = list(lookup.get("clusters") or [])
    matching_cluster_ids = [str(row.get("cluster_id")) for row in matches if row.get("cluster_id")]
    if cluster_id is None:
        # No target cluster: "matched" just means the PR is in at least one.
        return {
            **lookup,
            "cluster_id": None,
            "matched": bool(matching_cluster_ids),
            "matching_cluster_ids": matching_cluster_ids,
        }
    match = next((row for row in matches if row.get("cluster_id") == cluster_id), None)
    return {
        **lookup,
        "cluster_id": cluster_id,
        "matched": match is not None,
        "matching_cluster_ids": matching_cluster_ids,
        "membership": match,
    }


def list_issue_duplicate_prs(
    snapshot_dir: Path,
    *,
    limit: int | None,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    """List duplicate-PR groups from the selected report, optionally truncated."""
    metadata, selection, issue_map, pr_map = _analysis_context(snapshot_dir, variant=variant)
    base = _analysis_base_payload(metadata, selection, variant=variant)
    if selection is None:
        return {**base, "duplicate_prs": [], "duplicate_pr_count": 0}
    rows = [
        _duplicate_pr_summary(entry, issue_map, pr_map, rank=rank)
        for rank, entry in enumerate(selection.payload.get("duplicate_prs") or [], start=1)
    ]
    return {
        **base,
        "duplicate_prs": rows if limit is None else rows[:limit],
        "duplicate_pr_count": len(rows),
    }


def get_issue_best(snapshot_dir: Path, *, variant: AnalysisVariant) -> dict[str, Any]:
    """Return the report's "best issue" and "best PR" picks, enriched with titles/URLs."""
    metadata, selection, issue_map, pr_map = _analysis_context(snapshot_dir, variant=variant)
    base = _analysis_base_payload(metadata, selection, variant=variant)
    if selection is None:
        return {**base, "best_issue": None, "best_pr": None}
    return {
        **base,
        "best_issue": _best_issue_summary(selection.payload.get("best_issue"), issue_map),
        "best_pr": _best_pr_summary(selection.payload.get("best_pr"), pr_map),
    }


def get_contributor_status(snapshot_dir: Path) -> dict[str, Any]:
    """Report availability and headline counts of the new-contributors report."""
    metadata = _snapshot_metadata(snapshot_dir)
    report = _read_optional_json(snapshot_dir / "new-contributors-report.json")
    raw_contributors = report.get("contributors")
    contributors: list[Any] = raw_contributors if isinstance(raw_contributors, list) else []
    return {
        # Prefer the report's own repo/snapshot fields, falling back to the manifest.
        "repo": str(report.get("repo") or metadata.repo),
        "snapshot_id": str(report.get("snapshot_id") or metadata.snapshot_id),
        "available": bool(report),
        "generated_at": report.get("generated_at"),
        "window_days": _coerce_int(report.get("window_days")),
        "contributor_count": len(contributors),
    }


def list_contributors(snapshot_dir: Path, *, limit: int | None) -> dict[str, Any]:
    """List contributor summaries in report order, optionally truncated to *limit*."""
    status = get_contributor_status(snapshot_dir)
    report = _read_optional_json(snapshot_dir / "new-contributors-report.json")
    rows = [
        _contributor_summary(entry, rank=rank)
        for rank, entry in enumerate(report.get("contributors") or [], start=1)
        if isinstance(entry, dict)
    ]
    return {
        **status,
        "contributors": rows if limit is None else rows[:limit],
        "contributor_count": len(rows),
    }


def get_contributor(snapshot_dir: Path, *, author_login: str) -> dict[str, Any]:
    """Look up one contributor by login (case-insensitive) with summary and risk views."""
    status = get_contributor_status(snapshot_dir)
    report = _read_optional_json(snapshot_dir / "new-contributors-report.json")
    contributor = _find_contributor(report.get("contributors") or [], author_login)
    if contributor is None:
        return {
            **status,
            "author_login": author_login,
            "found": False,
            "summary": None,
            "risk": None,
            "contributor": None,
        }
    return {
        **status,
        # Echo back the report's canonical casing of the login when present.
        "author_login": str(contributor.get("author_login") or author_login),
        "found": True,
        "summary": _contributor_summary(contributor),
        "risk": _contributor_risk(contributor),
        "contributor": contributor,
    }


def get_contributor_risk(snapshot_dir: Path, *, author_login: str) -> dict[str, Any]:
    """Return only the risk slice of a contributor lookup."""
    contributor = get_contributor(snapshot_dir, author_login=author_login)
    risk = contributor.get("risk")
    return {
        "repo": contributor.get("repo"),
        "snapshot_id": contributor.get("snapshot_id"),
        "available": contributor.get("available"),
        "generated_at": contributor.get("generated_at"),
        "author_login": contributor.get("author_login"),
        "found": contributor.get("found"),
        "risk_available": risk is not None,
        "risk": risk,
    }


def _analysis_context(
    snapshot_dir: Path,
    *,
    variant: AnalysisVariant,
) -> tuple[
    _SnapshotMetadata,
    _AnalysisSelection | None,
    dict[int, dict[str, Any]],
    dict[int, dict[str, Any]],
]:
    """Load everything the issue endpoints need: metadata, report, artifact maps."""
    metadata = _snapshot_metadata(snapshot_dir)
    selection = _select_analysis_report(_analysis_candidates(snapshot_dir), variant=variant)
    issue_map, pr_map = _artifact_maps(snapshot_dir)
    return metadata, selection, issue_map, pr_map
def _analysis_base_payload( metadata: _SnapshotMetadata, selection: _AnalysisSelection | None, *, variant: AnalysisVariant, ) -> dict[str, Any]: base = { "repo": metadata.repo, "snapshot_id": metadata.snapshot_id, "variant_requested": variant, "available": selection is not None, "variant_used": None, "llm_enrichment": False, "generated_at": None, } if selection is None: return base return { **base, "variant_used": selection.variant_used, "llm_enrichment": selection.llm_enrichment, "generated_at": selection.payload.get("generated_at"), } def _analysis_candidates(snapshot_dir: Path) -> list[dict[str, Any]]: candidates: list[dict[str, Any]] = [] for path in _analysis_report_paths(snapshot_dir): payload = _read_optional_json(path) if not payload: continue llm_enrichment = bool(payload.get("llm_enrichment")) candidates.append( { "path": path, "payload": payload, "variant": _analysis_variant(path.name, payload, llm_enrichment=llm_enrichment), "llm_enrichment": llm_enrichment, } ) return candidates def _select_analysis_report( candidates: list[dict[str, Any]], *, variant: AnalysisVariant, ) -> _AnalysisSelection | None: if not candidates: return None if variant == "auto": ordered = sorted(candidates, key=_analysis_auto_priority) else: ordered = [candidate for candidate in candidates if candidate["variant"] == variant] ordered.sort(key=_analysis_specific_priority) if not ordered: return None winner = ordered[0] return _AnalysisSelection( path=Path(winner["path"]), payload=dict(winner["payload"]), variant_used=str(winner["variant"]), llm_enrichment=bool(winner["llm_enrichment"]), ) def _analysis_report_paths(snapshot_dir: Path) -> list[Path]: ordered: list[Path] = [] current_manifest_path = repo_relative_path_to_local( snapshot_dir, CURRENT_ANALYSIS_MANIFEST_PATH ) if current_manifest_path.exists(): try: current_manifest = load_current_analysis_manifest(current_manifest_path) except ValueError: current_manifest = None if current_manifest is not None: for artifact_path in 
(current_manifest.get("artifacts") or {}).values(): if not isinstance(artifact_path, str): continue ordered.append(repo_relative_path_to_local(snapshot_dir, artifact_path)) ordered.extend( [ snapshot_dir / "analysis-report-hybrid.json", snapshot_dir / "analysis-report-deterministic.json", snapshot_dir / "analysis-report.json", ] ) seen: set[Path] = set() deduped: list[Path] = [] for path in ordered: if path in seen: continue seen.add(path) deduped.append(path) deduped.extend( path for path in sorted(snapshot_dir.glob("analysis-report*.json")) if path not in seen ) return [path for path in deduped if path.exists()] def _analysis_auto_priority(candidate: dict[str, Any]) -> tuple[int, str]: path = Path(candidate["path"]) if path.name == "analysis-report-hybrid.json": return (0, path.name) if bool(candidate.get("llm_enrichment")): return (1, path.name) if path.name == "analysis-report.json": return (2, path.name) return (3, path.name) def _analysis_specific_priority(candidate: dict[str, Any]) -> tuple[int, str]: path = Path(candidate["path"]) if path.name.endswith(f"-{candidate['variant']}.json"): return (0, path.name) if path.name == "analysis-report.json": return (1, path.name) return (2, path.name) def _analysis_variant(path_name: str, payload: dict[str, Any], *, llm_enrichment: bool) -> str: lowered = path_name.lower() if "hybrid" in lowered: return "hybrid" if "deterministic" in lowered: return "deterministic" if isinstance(payload.get("variant_used"), str): variant_used = str(payload["variant_used"]).strip().lower() if variant_used in {"hybrid", "deterministic"}: return variant_used return "hybrid" if llm_enrichment else "deterministic" def _analysis_counts(payload: dict[str, Any]) -> dict[str, int]: return { "meta_bugs": len(payload.get("meta_bugs") or []), "duplicate_issues": len(payload.get("duplicate_issues") or []), "duplicate_prs": len(payload.get("duplicate_prs") or []), } def _artifact_maps( snapshot_dir: Path, ) -> tuple[dict[int, dict[str, Any]], 
dict[int, dict[str, Any]]]: issue_rows = ( read_parquet_rows(snapshot_dir / "issues.parquet") if (snapshot_dir / "issues.parquet").exists() else [] ) pr_rows = ( read_parquet_rows(snapshot_dir / "pull_requests.parquet") if (snapshot_dir / "pull_requests.parquet").exists() else [] ) issue_map = { int(row["number"]): row for row in issue_rows if _coerce_int(row.get("number")) is not None } pr_map = { int(row["number"]): row for row in pr_rows if _coerce_int(row.get("number")) is not None } return issue_map, pr_map def _issue_cluster_summary( cluster: dict[str, Any], issue_map: dict[int, dict[str, Any]], pr_map: dict[int, dict[str, Any]], *, rank: int | None = None, ) -> dict[str, Any]: canonical_issue_number = _coerce_int(cluster.get("canonical_issue_number")) canonical_pr_number = _coerce_int(cluster.get("canonical_pr_number")) issue_numbers = _ordered_ints(cluster.get("issue_numbers")) pr_numbers = _ordered_ints(cluster.get("pr_numbers")) return { "rank": rank, "cluster_id": str(cluster.get("cluster_id") or f"cluster-{rank or 0}"), "title": _cluster_title( cluster, issue_map, pr_map, canonical_issue_number, canonical_pr_number ), "summary": cluster.get("summary"), "status": cluster.get("status"), "confidence": _coerce_float(cluster.get("confidence")), "canonical_issue_number": canonical_issue_number, "canonical_issue_title": _title_for_issue(canonical_issue_number, issue_map), "canonical_issue_url": _url_for_issue(canonical_issue_number, issue_map), "canonical_pr_number": canonical_pr_number, "canonical_pr_title": _title_for_pr(canonical_pr_number, pr_map), "canonical_pr_url": _url_for_pr(canonical_pr_number, pr_map), "issue_numbers": issue_numbers, "issue_count": len(issue_numbers), "pr_numbers": pr_numbers, "pr_count": len(pr_numbers), "evidence_types": [str(value) for value in (cluster.get("evidence_types") or []) if value], "github_url": _cluster_url(canonical_issue_number, canonical_pr_number, issue_map, pr_map), } def _cluster_title( cluster: dict[str, Any], 
issue_map: dict[int, dict[str, Any]], pr_map: dict[int, dict[str, Any]], canonical_issue_number: int | None, canonical_pr_number: int | None, ) -> str: issue_title = _title_for_issue(canonical_issue_number, issue_map) if issue_title: return issue_title pr_title = _title_for_pr(canonical_pr_number, pr_map) if pr_title: return pr_title summary = str(cluster.get("summary") or "").strip() if summary: return summary return str(cluster.get("cluster_id") or "cluster") def _cluster_url( canonical_issue_number: int | None, canonical_pr_number: int | None, issue_map: dict[int, dict[str, Any]], pr_map: dict[int, dict[str, Any]], ) -> str | None: return _url_for_issue(canonical_issue_number, issue_map) or _url_for_pr( canonical_pr_number, pr_map ) def _duplicate_pr_summary( entry: dict[str, Any], issue_map: dict[int, dict[str, Any]], pr_map: dict[int, dict[str, Any]], *, rank: int, ) -> dict[str, Any]: canonical_pr_number = _coerce_int(entry.get("canonical_pr_number")) target_issue_number = _coerce_int(entry.get("target_issue_number")) duplicates = _ordered_ints(entry.get("duplicate_pr_numbers")) return { "rank": rank, "cluster_id": str(entry.get("cluster_id") or f"duplicate-pr-{rank}"), "canonical_pr_number": canonical_pr_number, "canonical_pr_title": _title_for_pr(canonical_pr_number, pr_map), "canonical_pr_url": _url_for_pr(canonical_pr_number, pr_map), "target_issue_number": target_issue_number, "target_issue_title": _title_for_issue(target_issue_number, issue_map), "target_issue_url": _url_for_issue(target_issue_number, issue_map), "duplicate_pr_numbers": duplicates, "duplicate_pr_count": len(duplicates), "reason": entry.get("reason"), } def _best_issue_summary(entry: Any, issue_map: dict[int, dict[str, Any]]) -> dict[str, Any] | None: if not isinstance(entry, dict): return None issue_number = _coerce_int(entry.get("issue_number")) return { "cluster_id": entry.get("cluster_id"), "issue_number": issue_number, "title": _title_for_issue(issue_number, issue_map), "url": 
_url_for_issue(issue_number, issue_map), "reason": entry.get("reason"), "score": _coerce_float(entry.get("score")), } def _best_pr_summary(entry: Any, pr_map: dict[int, dict[str, Any]]) -> dict[str, Any] | None: if not isinstance(entry, dict): return None pr_number = _coerce_int(entry.get("pr_number")) return { "cluster_id": entry.get("cluster_id"), "pr_number": pr_number, "title": _title_for_pr(pr_number, pr_map), "url": _url_for_pr(pr_number, pr_map), "reason": entry.get("reason"), "score": _coerce_float(entry.get("score")), } def _issue_member_row(number: int, row: dict[str, Any] | None) -> dict[str, Any]: row = row or {} return { "number": number, "title": row.get("title"), "state": row.get("state"), "author_login": row.get("author_login"), "created_at": row.get("created_at"), "updated_at": row.get("updated_at"), "html_url": row.get("html_url"), } def _pr_member_row(number: int, row: dict[str, Any] | None, *, role: str) -> dict[str, Any]: row = row or {} return { "number": number, "role": role, "title": row.get("title"), "author_login": row.get("author_login"), "state": row.get("state"), "draft": bool(row.get("draft")), "merged": bool(row.get("merged")), "author_association": row.get("author_association"), "created_at": row.get("created_at"), "updated_at": row.get("updated_at"), "html_url": row.get("html_url"), } def _contributor_summary(contributor: dict[str, Any], *, rank: int | None = None) -> dict[str, Any]: raw_activity = contributor.get("activity") activity: dict[str, Any] = raw_activity if isinstance(raw_activity, dict) else {} return { "rank": rank, "author_login": contributor.get("author_login"), "name": contributor.get("name"), "profile_url": contributor.get("profile_url"), "repo_association": contributor.get("repo_association"), "first_seen_in_snapshot": contributor.get("first_seen_in_snapshot"), "new_to_repo": contributor.get("new_to_repo"), "snapshot_pr_count": _coerce_int(contributor.get("snapshot_pr_count")) or 0, "snapshot_issue_count": 
_coerce_int(contributor.get("snapshot_issue_count")) or 0, "follow_through_score": contributor.get("follow_through_score"), "breadth_score": contributor.get("breadth_score"), "automation_risk_signal": contributor.get("automation_risk_signal"), "heuristic_note": contributor.get("heuristic_note"), "account_age_days": _coerce_int(contributor.get("account_age_days")), "public_pr_count_42d": _coerce_int(activity.get("visible_authored_pr_count")), "public_repo_count_42d": _coerce_int(activity.get("distinct_repos_with_authored_prs")), "repo_pull_requests_url": contributor.get("repo_pull_requests_url"), "repo_issues_url": contributor.get("repo_issues_url"), } def _contributor_risk(contributor: dict[str, Any]) -> dict[str, Any]: raw_activity = contributor.get("activity") activity: dict[str, Any] = raw_activity if isinstance(raw_activity, dict) else {} return { "automation_risk_signal": contributor.get("automation_risk_signal"), "heuristic_note": contributor.get("heuristic_note"), "follow_through_score": contributor.get("follow_through_score"), "breadth_score": contributor.get("breadth_score"), "account_age_days": _coerce_int(contributor.get("account_age_days")), "public_pr_count_42d": _coerce_int(activity.get("visible_authored_pr_count")), "public_repo_count_42d": _coerce_int(activity.get("distinct_repos_with_authored_prs")), "report_reason": contributor.get("report_reason"), } def _find_contributor(entries: list[Any], author_login: str) -> dict[str, Any] | None: lowered = author_login.casefold() for entry in entries: if not isinstance(entry, dict): continue login = str(entry.get("author_login") or "") if login.casefold() == lowered: return entry return None def _snapshot_metadata(snapshot_dir: Path) -> _SnapshotMetadata: manifest = _read_optional_json(snapshot_dir / "manifest.json") repo = str(manifest.get("repo") or _infer_repo(snapshot_dir) or "") snapshot_id = str(manifest.get("snapshot_id") or snapshot_dir.name) return _SnapshotMetadata(repo=repo, 
snapshot_id=snapshot_id) def _infer_repo(snapshot_dir: Path) -> str | None: for filename in ("pull_requests.parquet", "issues.parquet"): path = snapshot_dir / filename if not path.exists(): continue rows = read_parquet_rows(path) if rows and rows[0].get("repo"): return str(rows[0]["repo"]) for filename in _analysis_report_paths(snapshot_dir): payload = _read_optional_json(filename) if payload.get("repo"): return str(payload["repo"]) report = _read_optional_json(snapshot_dir / "new-contributors-report.json") if report.get("repo"): return str(report["repo"]) return None def _title_for_issue(number: int | None, issue_map: dict[int, dict[str, Any]]) -> str | None: if number is None or number not in issue_map: return None title = issue_map[number].get("title") return str(title) if title else None def _url_for_issue(number: int | None, issue_map: dict[int, dict[str, Any]]) -> str | None: if number is None or number not in issue_map: return None value = issue_map[number].get("html_url") return str(value) if value else None def _title_for_pr(number: int | None, pr_map: dict[int, dict[str, Any]]) -> str | None: if number is None or number not in pr_map: return None title = pr_map[number].get("title") return str(title) if title else None def _url_for_pr(number: int | None, pr_map: dict[int, dict[str, Any]]) -> str | None: if number is None or number not in pr_map: return None value = pr_map[number].get("html_url") return str(value) if value else None def _ordered_ints(values: Any) -> list[int]: if not isinstance(values, list): return [] ordered: list[int] = [] for value in values: number = _coerce_int(value) if number is not None: ordered.append(number) return ordered def _coerce_int(value: Any) -> int | None: if value is None: return None try: return int(value) except (TypeError, ValueError): return None def _coerce_float(value: Any) -> float | None: if value is None: return None try: return float(value) except (TypeError, ValueError): return None def 
_read_optional_json(path: Path) -> dict[str, Any]: if not path.exists(): return {} payload = read_json(path) return payload if isinstance(payload, dict) else {}