"""Build the JSON data files consumed by the dashboard from one snapshot.

Reads the snapshot's issue / pull-request parquet tables plus optional
analysis, new-contributor and PR-scope reports, restricts everything to pull
requests created inside a recent window, and writes ``summary.json``,
``clusters.json``, ``pr_scope_clusters.json``, ``prs.json`` and
``contributors.json`` into the configured output directory.
"""

from __future__ import annotations

import json
from collections import Counter, defaultdict
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any

from slop_farmer.config import DashboardDataOptions
from slop_farmer.data.parquet_io import read_json, read_parquet_rows
from slop_farmer.data.snapshot_paths import (
    ResolvedAnalysisReportPath,
    resolve_default_dashboard_analysis_report,
)
from slop_farmer.data.snapshot_source import resolve_snapshot_source_dir

# Similarity fields copied (as floats, missing -> 0.0) from each PR comparison
# entry of an analysis meta-bug cluster.
_PR_COMPARISON_FIELDS = (
    "patch_similarity",
    "code_similarity",
    "size_similarity",
    "file_overlap",
    "area_overlap",
)

# Ranking used to pick one association when an author has several; higher wins.
_REPO_ASSOCIATION_PRIORITY = {
    "OWNER": 70,
    "MEMBER": 60,
    "COLLABORATOR": 50,
    "CONTRIBUTOR": 40,
    "FIRST_TIME_CONTRIBUTOR": 30,
    "FIRST_TIMER": 20,
    "NONE": 10,
}

# Associations for which an author is reported as ``known_contributor``.
_KNOWN_REPO_ASSOCIATIONS = frozenset({"OWNER", "MEMBER", "COLLABORATOR"})


def run_dashboard_data(options: DashboardDataOptions) -> Path:
    """Generate the dashboard JSON files for one snapshot.

    Args:
        options: Dashboard configuration: snapshot location, optional report
            overrides (``analysis_input``, ``contributors_input``,
            ``pr_scope_input``), the recency ``window_days`` and ``output_dir``.

    Returns:
        The resolved output directory that the JSON files were written to.

    Raises:
        FileNotFoundError: If ``options.analysis_input`` is set but missing.
    """
    snapshot_dir = _resolve_snapshot_dir(options)
    manifest = _read_optional_json(snapshot_dir / "manifest.json")
    issues = read_parquet_rows(snapshot_dir / "issues.parquet")
    pull_requests = read_parquet_rows(snapshot_dir / "pull_requests.parquet")

    analysis_path = _resolve_analysis_input(snapshot_dir, options.analysis_input)
    analysis = _read_optional_json(analysis_path.path) if analysis_path is not None else {}
    contributor_report = _read_optional_json(
        options.contributors_input or snapshot_dir / "new-contributors-report.json"
    )
    pr_scope_report = _read_optional_json(
        options.pr_scope_input or snapshot_dir / "pr-scope-clusters.json"
    )

    # Repo name: manifest first, then the first PR / issue row, else "".
    repo = (
        manifest.get("repo")
        or (pull_requests[0].get("repo") if pull_requests else None)
        or (issues[0].get("repo") if issues else None)
        or ""
    )
    snapshot_id = manifest.get("snapshot_id") or snapshot_dir.name
    reference_time = _reference_time(snapshot_id, pull_requests)
    cutoff = reference_time - timedelta(days=options.window_days)

    issue_map = _rows_by_number(issues)
    pr_map = _rows_by_number(pull_requests)

    recent_pull_requests = _recent_pull_requests(pull_requests, cutoff)
    recent_numbers = {
        number
        for number in (_coerce_int(row.get("number")) for row in recent_pull_requests)
        if number is not None
    }

    clusters, memberships = _cluster_rows(analysis, issue_map, pr_map, recent_numbers)
    pr_scope_clusters = _pr_scope_cluster_rows(pr_scope_report, pr_map, recent_numbers)
    contributors = _contributor_rows(contributor_report, recent_pull_requests, memberships)
    prs = _pr_rows(recent_pull_requests, memberships)

    summary = {
        "repo": repo,
        "snapshot_id": snapshot_id,
        "generated_at": _format_utc(datetime.now(tz=UTC).replace(microsecond=0)),
        "window_days": options.window_days,
        "reference_time": _format_utc(reference_time),
        "pr_count": len(prs),
        "open_pr_count": sum(1 for row in prs if row["state"] == "open"),
        "merged_pr_count": sum(1 for row in prs if row["merged"]),
        "cluster_count": len(clusters),
        "clustered_pr_count": sum(1 for row in prs if row["cluster_id"]),
        "contributor_count": len(contributors),
        "analysis_available": bool(analysis),
        "analysis_source": None if analysis_path is None else analysis_path.source,
        "analysis_variant": None if analysis_path is None else analysis_path.variant,
        "analysis_snapshot_id": _analysis_snapshot_id(analysis_path, analysis),
        "analysis_id": None if analysis_path is None else analysis_path.analysis_id,
        "contributors_available": bool(contributor_report),
        "pr_scope_available": bool(pr_scope_report),
        "pr_scope_cluster_count": len(pr_scope_clusters),
    }

    output_dir = options.output_dir.resolve()
    output_dir.mkdir(parents=True, exist_ok=True)
    _write_json(summary, output_dir / "summary.json")
    _write_json(clusters, output_dir / "clusters.json")
    _write_json(pr_scope_clusters, output_dir / "pr_scope_clusters.json")
    _write_json(prs, output_dir / "prs.json")
    _write_json(contributors, output_dir / "contributors.json")
    return output_dir


def _format_utc(value: datetime) -> str:
    """Render ``value`` as ISO-8601, spelling a ``+00:00`` offset as ``Z``."""
    return value.isoformat().replace("+00:00", "Z")


def _analysis_snapshot_id(
    analysis_path: ResolvedAnalysisReportPath | None, analysis: dict[str, Any]
) -> str | None:
    """Snapshot id of the analysis report: from the resolved path, else the report body."""
    if analysis_path is None:
        return None
    if analysis_path.snapshot_id:
        return analysis_path.snapshot_id
    raw = analysis.get("snapshot_id")
    return str(raw).strip() if raw is not None else None


def _rows_by_number(rows: list[dict[str, Any]]) -> dict[int, dict[str, Any]]:
    """Index rows by integer ``number``; rows without a usable number are skipped.

    Later rows win if two rows share a number.
    """
    indexed: dict[int, dict[str, Any]] = {}
    for row in rows:
        number = _coerce_int(row.get("number"))
        if number is not None:
            indexed[number] = row
    return indexed


def _recent_pull_requests(
    pull_requests: list[dict[str, Any]], cutoff: datetime
) -> list[dict[str, Any]]:
    """Return the PRs whose ``created_at`` is at or after ``cutoff``, newest first."""
    dated: list[tuple[datetime, dict[str, Any]]] = []
    for row in pull_requests:
        created_at = _coerce_datetime(row.get("created_at"))
        if created_at is not None and created_at >= cutoff:
            dated.append((created_at, row))
    # Order on the parsed timestamp, not the raw string: string order breaks for
    # fractional seconds and mixed offsets. list.sort is stable under reverse=True.
    dated.sort(key=lambda item: item[0], reverse=True)
    return [row for _, row in dated]


def _resolve_snapshot_dir(options: DashboardDataOptions) -> Path:
    """Locate the snapshot directory to read, delegating to ``resolve_snapshot_source_dir``.

    The local snapshots root defaults to ``data/snapshots`` (resolved against the
    current working directory) when ``options.snapshot_root`` is not set.
    """
    snapshots_root = (
        options.snapshot_root.resolve()
        if options.snapshot_root is not None
        else (Path("data") / "snapshots").resolve()
    )
    return resolve_snapshot_source_dir(
        snapshot_dir=options.snapshot_dir,
        local_snapshots_root=snapshots_root,
        hf_repo_id=options.hf_repo_id,
        hf_revision=options.hf_revision,
        hf_materialize_dir=options.hf_materialize_dir,
        hf_output_dir=snapshots_root.parent,
    )


def _resolve_analysis_input(
    snapshot_dir: Path, override_path: Path | None
) -> ResolvedAnalysisReportPath | None:
    """Pick the analysis report: an explicit override, else the snapshot default.

    Raises:
        FileNotFoundError: If ``override_path`` is given but does not exist.
    """
    if override_path is not None:
        resolved = override_path.resolve()
        if not resolved.exists():
            raise FileNotFoundError(f"Dashboard analysis input not found: {resolved}")
        return ResolvedAnalysisReportPath(
            path=resolved,
            variant=_analysis_variant_for_path(resolved),
            source="override",
        )
    return resolve_default_dashboard_analysis_report(snapshot_dir)


def _read_optional_json(path: Path) -> dict[str, Any]:
    """Read JSON from ``path`` via ``read_json``, or return ``{}`` if the file is absent."""
    if path.exists():
        return read_json(path)
    return {}


def _write_json(payload: Any, path: Path) -> None:
    """Write ``payload`` as indented, key-sorted UTF-8 JSON with a trailing newline."""
    path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")


def _reference_time(snapshot_id: str, pull_requests: list[dict[str, Any]]) -> datetime:
    """Choose the instant the recency window is measured back from.

    Uses the timestamp encoded in ``snapshot_id`` when it parses; otherwise the
    latest ``updated_at`` / ``created_at`` across the PRs; otherwise the current
    UTC time. The result is always timezone-aware.
    """
    parsed = _parse_snapshot_id(snapshot_id)
    if parsed is not None:
        return parsed
    timestamps = [
        timestamp
        for row in pull_requests
        for timestamp in (
            _coerce_datetime(row.get("updated_at")),
            _coerce_datetime(row.get("created_at")),
        )
        if timestamp is not None
    ]
    if timestamps:
        return max(timestamps)
    return datetime.now(tz=UTC)


def _parse_snapshot_id(value: str) -> datetime | None:
    """Parse a ``YYYYMMDDTHHMMSSZ`` snapshot id as UTC; ``None`` if it is not one."""
    try:
        return datetime.strptime(value, "%Y%m%dT%H%M%SZ").replace(tzinfo=UTC)
    except (TypeError, ValueError):
        # TypeError covers non-string ids (e.g. a numeric manifest value).
        return None


def _coerce_datetime(value: Any) -> datetime | None:
    """Parse an ISO-8601 string into an aware datetime; ``None`` if not parseable.

    A trailing ``Z`` is accepted. Strings without an offset are treated as UTC so
    every result can be compared with the aware cutoff / reference time
    (assumption: upstream timestamps without an offset are UTC — confirm).
    """
    if not value or not isinstance(value, str):
        return None
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # Comparing naive and aware datetimes raises TypeError.
        return parsed.replace(tzinfo=UTC)
    return parsed


def _coerce_int(value: Any) -> int | None:
    """Convert ``value`` with ``int()``; ``None`` for ``None`` or unconvertible values."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def _int_list(values: Any) -> list[int]:
    """Coerce each entry of ``values`` (``None`` -> empty) to int, dropping failures."""
    coerced = (_coerce_int(value) for value in values or [])
    return [value for value in coerced if value is not None]


def _excerpt(value: Any, limit: int = 240) -> str | None:
    """Collapse whitespace in ``value`` and cap it at ``limit`` chars with an ellipsis.

    Returns ``None`` for empty or non-string input.
    """
    if not value or not isinstance(value, str):
        return None
    compact = " ".join(value.split())
    if len(compact) <= limit:
        return compact
    return compact[: limit - 1].rstrip() + "…"


def _analysis_variant_for_path(path: Path) -> str:
    """Map a known analysis report filename to its variant label, else ``"override"``."""
    if path.name == "analysis-report-hybrid.json":
        return "hybrid"
    if path.name == "analysis-report.json":
        return "deterministic"
    return "override"


def _recent_authors(pr_map: dict[int, dict[str, Any]], numbers: list[int]) -> list[str]:
    """Sorted, de-duplicated ``author_login`` values of the given PRs that are in ``pr_map``."""
    return sorted(
        {
            str(pr_map[number].get("author_login"))
            for number in numbers
            if number in pr_map and pr_map[number].get("author_login")
        }
    )


def _last_activity_at(pr_map: dict[int, dict[str, Any]], numbers: list[int]) -> Any:
    """Latest ``updated_at`` (falling back to ``created_at``) among the given PRs.

    Values are compared as stored (presumably ISO strings). PRs missing both
    fields are ignored; ``None`` when nothing is available.
    """
    candidates = (
        pr_map[number].get("updated_at") or pr_map[number].get("created_at")
        for number in numbers
        if number in pr_map
    )
    return max((value for value in candidates if value is not None), default=None)


def _pr_comparison_scores(comparison: dict[str, Any]) -> dict[str, float]:
    """Extract the similarity fields of one PR comparison as floats (missing -> 0.0)."""
    return {field: float(comparison.get(field) or 0.0) for field in _PR_COMPARISON_FIELDS}


def _cluster_rows(
    analysis: dict[str, Any],
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
    recent_numbers: set[int],
) -> tuple[list[dict[str, Any]], dict[int, list[dict[str, str]]]]:
    """Build dashboard rows for the analysis report's ``meta_bugs`` clusters.

    Clusters with no PR inside the recent window are dropped.

    Returns:
        ``(rows, memberships)``: the cluster rows sorted by recent PR count,
        total PR count and confidence (all descending), and a map from recent
        PR number to the ``{"cluster_id", "role"}`` entries it belongs to.
    """
    rows: list[dict[str, Any]] = []
    memberships: dict[int, list[dict[str, str]]] = defaultdict(list)
    for cluster in analysis.get("meta_bugs") or []:
        pr_numbers = _int_list(cluster.get("pr_numbers"))
        recent_pr_numbers = [number for number in pr_numbers if number in recent_numbers]
        if not recent_pr_numbers:
            continue
        outside_window_pr_numbers = [
            number for number in pr_numbers if number not in recent_numbers
        ]
        canonical_pr_number = _coerce_int(cluster.get("canonical_pr_number"))
        canonical_issue_number = _coerce_int(cluster.get("canonical_issue_number"))
        cluster_id = str(cluster.get("cluster_id") or f"cluster-{recent_pr_numbers[0]}")
        rows.append(
            {
                "cluster_id": cluster_id,
                "title": _cluster_title(
                    cluster, issue_map, pr_map, canonical_issue_number, canonical_pr_number
                ),
                "summary": cluster.get("summary"),
                "status": cluster.get("status"),
                "confidence": cluster.get("confidence"),
                "canonical_issue_number": canonical_issue_number,
                "canonical_pr_number": canonical_pr_number,
                "issue_numbers": _int_list(cluster.get("issue_numbers")),
                "pr_numbers": pr_numbers,
                "recent_pr_numbers": recent_pr_numbers,
                "pr_count": len(pr_numbers),
                "recent_pr_count": len(recent_pr_numbers),
                "outside_window_prs": [
                    _pr_member_stub(number, pr_map.get(number, {}))
                    for number in outside_window_pr_numbers
                ],
                "authors": _recent_authors(pr_map, recent_pr_numbers),
                "last_activity_at": _last_activity_at(pr_map, recent_pr_numbers),
                "evidence_types": list(cluster.get("evidence_types") or []),
                "pr_similarity": _cluster_similarity_map(cluster, canonical_pr_number),
                "pairwise_similarity": _cluster_pairwise_similarity(cluster),
                "github_url": _cluster_github_url(
                    issue_map, pr_map, canonical_issue_number, canonical_pr_number
                ),
            }
        )
        for number in recent_pr_numbers:
            role = "canonical" if number == canonical_pr_number else "member"
            memberships[number].append({"cluster_id": cluster_id, "role": role})
    rows.sort(
        key=lambda row: (
            -int(row["recent_pr_count"]),
            -int(row["pr_count"]),
            -(float(row["confidence"]) if row["confidence"] is not None else 0.0),
            # NOTE(review): ascending, so among otherwise-equal clusters the one
            # with the *oldest* last activity comes first — confirm intended.
            row["last_activity_at"] or "",
        )
    )
    return rows, memberships


def _cluster_title(
    cluster: dict[str, Any],
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
    canonical_issue_number: int | None,
    canonical_pr_number: int | None,
) -> str:
    """Title for a cluster: canonical issue, then canonical PR, then summary, then id."""
    if canonical_issue_number is not None and canonical_issue_number in issue_map:
        return str(
            issue_map[canonical_issue_number].get("title") or f"Issue #{canonical_issue_number}"
        )
    if canonical_pr_number is not None and canonical_pr_number in pr_map:
        return str(pr_map[canonical_pr_number].get("title") or f"PR #{canonical_pr_number}")
    summary = cluster.get("summary")
    if summary:
        return str(summary)
    return str(cluster.get("cluster_id") or "cluster")


def _cluster_github_url(
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
    canonical_issue_number: int | None,
    canonical_pr_number: int | None,
) -> str | None:
    """``html_url`` of the canonical issue if known, else of the canonical PR, else ``None``."""
    if canonical_issue_number is not None and canonical_issue_number in issue_map:
        return issue_map[canonical_issue_number].get("html_url")
    if canonical_pr_number is not None and canonical_pr_number in pr_map:
        return pr_map[canonical_pr_number].get("html_url")
    return None


def _cluster_similarity_map(
    cluster: dict[str, Any], canonical_pr_number: int | None
) -> dict[str, dict[str, float]]:
    """Similarity scores of each PR compared against the canonical PR.

    Keyed by the other PR's number as a string; empty when there is no canonical PR.
    """
    if canonical_pr_number is None:
        return {}
    scores: dict[str, dict[str, float]] = {}
    for comparison in cluster.get("pr_comparisons") or []:
        left = _coerce_int(comparison.get("left_pr_number"))
        right = _coerce_int(comparison.get("right_pr_number"))
        if left != canonical_pr_number and right != canonical_pr_number:
            continue
        other = right if left == canonical_pr_number else left
        if other is None:
            continue
        scores[str(other)] = _pr_comparison_scores(comparison)
    return scores


def _cluster_pairwise_similarity(cluster: dict[str, Any]) -> list[dict[str, Any]]:
    """Every PR comparison in the cluster that names both PRs, with its scores."""
    rows: list[dict[str, Any]] = []
    for comparison in cluster.get("pr_comparisons") or []:
        left = _coerce_int(comparison.get("left_pr_number"))
        right = _coerce_int(comparison.get("right_pr_number"))
        if left is None or right is None:
            continue
        rows.append(
            {
                "left_pr_number": left,
                "right_pr_number": right,
                **_pr_comparison_scores(comparison),
            }
        )
    return rows


def _pr_scope_cluster_rows(
    pr_scope_report: dict[str, Any],
    pr_map: dict[int, dict[str, Any]],
    recent_numbers: set[int],
) -> list[dict[str, Any]]:
    """Build dashboard rows for the PR-scope report's ``pr_scope_clusters``.

    Clusters with no PR inside the recent window are dropped. Rows are sorted by
    recent PR count, total PR count and average similarity (descending), then by
    last activity and cluster id (ascending).
    """
    rows: list[dict[str, Any]] = []
    for cluster in pr_scope_report.get("pr_scope_clusters") or []:
        pr_numbers = _int_list(cluster.get("pr_numbers"))
        recent_pr_numbers = [number for number in pr_numbers if number in recent_numbers]
        if not recent_pr_numbers:
            continue
        outside_window_pr_numbers = [
            number for number in pr_numbers if number not in recent_numbers
        ]
        representative_pr_number = _coerce_int(cluster.get("representative_pr_number"))
        # pr_map keys are ints, so a None representative simply yields {}.
        representative = pr_map.get(representative_pr_number, {})
        rows.append(
            {
                "kind": "pr_scope",
                "cluster_id": str(cluster.get("cluster_id") or f"pr-scope-{recent_pr_numbers[0]}"),
                "title": _pr_scope_title(cluster, pr_map, representative_pr_number),
                "summary": cluster.get("summary"),
                "representative_pr_number": representative_pr_number,
                "representative_title": representative.get("title"),
                "representative_url": representative.get("html_url"),
                "pr_numbers": pr_numbers,
                "recent_pr_numbers": recent_pr_numbers,
                "pr_count": len(pr_numbers),
                "recent_pr_count": len(recent_pr_numbers),
                "outside_window_prs": [
                    _pr_member_stub(number, pr_map.get(number, {}))
                    for number in outside_window_pr_numbers
                ],
                "authors": _recent_authors(pr_map, recent_pr_numbers),
                "last_activity_at": _last_activity_at(pr_map, recent_pr_numbers),
                "average_similarity": float(cluster.get("average_similarity") or 0.0),
                "shared_filenames": list(cluster.get("shared_filenames") or []),
                "shared_directories": list(cluster.get("shared_directories") or []),
                "pairwise": _pr_scope_pairwise_rows(cluster),
            }
        )
    rows.sort(
        key=lambda row: (
            -int(row["recent_pr_count"]),
            -int(row["pr_count"]),
            -(float(row["average_similarity"]) if row["average_similarity"] is not None else 0.0),
            row["last_activity_at"] or "",
            str(row["cluster_id"]),
        )
    )
    return rows


def _pr_scope_title(
    cluster: dict[str, Any],
    pr_map: dict[int, dict[str, Any]],
    representative_pr_number: int | None,
) -> str:
    """Title for a PR-scope cluster.

    Tries, in order: the representative PR's title, the first shared filename,
    the first shared directory, the summary, then the cluster id.
    """
    if representative_pr_number is not None and representative_pr_number in pr_map:
        title = pr_map[representative_pr_number].get("title")
        if title:
            return f"Scope: {title}"
    shared_filenames = [str(value) for value in (cluster.get("shared_filenames") or []) if value]
    if shared_filenames:
        return f"Scope: {shared_filenames[0]}"
    shared_directories = [
        str(value) for value in (cluster.get("shared_directories") or []) if value
    ]
    if shared_directories:
        return f"Scope: {shared_directories[0]}"
    summary = cluster.get("summary")
    if summary:
        return str(summary)
    return str(cluster.get("cluster_id") or "pr-scope")


def _pr_scope_pairwise_rows(cluster: dict[str, Any]) -> list[dict[str, Any]]:
    """Every pairwise entry of a PR-scope cluster that names both PRs, normalized."""
    rows: list[dict[str, Any]] = []
    for comparison in cluster.get("pairwise") or []:
        left = _coerce_int(comparison.get("left_pr_number"))
        right = _coerce_int(comparison.get("right_pr_number"))
        if left is None or right is None:
            continue
        rows.append(
            {
                "left_pr_number": left,
                "right_pr_number": right,
                "similarity": float(comparison.get("similarity") or 0.0),
                "content_similarity": float(comparison.get("content_similarity") or 0.0),
                "size_similarity": float(comparison.get("size_similarity") or 0.0),
                "breadth_similarity": float(comparison.get("breadth_similarity") or 0.0),
                "concentration_similarity": float(
                    comparison.get("concentration_similarity") or 0.0
                ),
                "shared_filenames": list(comparison.get("shared_filenames") or []),
                "shared_directories": list(comparison.get("shared_directories") or []),
            }
        )
    return rows


def _pr_member_stub(number: int, row: dict[str, Any]) -> dict[str, Any]:
    """Compact PR summary used for cluster members (``row`` may be ``{}`` if unknown)."""
    html_url = row.get("html_url")
    return {
        "number": number,
        "title": row.get("title"),
        "author": row.get("author_login"),
        "state": row.get("state"),
        "merged": bool(row.get("merged")),
        "draft": bool(row.get("draft")),
        "created_at": row.get("created_at"),
        "updated_at": row.get("updated_at"),
        "changed_files": _coerce_int(row.get("changed_files")),
        "additions": _coerce_int(row.get("additions")),
        "deletions": _coerce_int(row.get("deletions")),
        "html_url": html_url,
        "files_url": f"{html_url}/files" if html_url else None,
    }


def _pr_rows(
    pull_requests: list[dict[str, Any]],
    memberships: dict[int, list[dict[str, str]]],
) -> list[dict[str, Any]]:
    """Full dashboard rows for the given PRs, annotated with cluster membership.

    The first membership recorded for a PR is reported as its primary
    ``cluster_id`` / ``cluster_role``; PRs without a usable number are skipped.
    """
    rows: list[dict[str, Any]] = []
    for row in pull_requests:
        number = _coerce_int(row.get("number"))
        if number is None:
            continue
        cluster_memberships = memberships.get(number, [])
        primary_membership = cluster_memberships[0] if cluster_memberships else None
        rows.append(
            {
                **_pr_member_stub(number, row),
                "author_association": row.get("author_association"),
                "comments_count": _coerce_int(row.get("comments_count")),
                "review_comments_count": _coerce_int(row.get("review_comments_count")),
                "labels": list(row.get("labels") or []),
                "body_excerpt": _excerpt(row.get("body")),
                "cluster_id": primary_membership["cluster_id"] if primary_membership else None,
                "cluster_role": primary_membership["role"] if primary_membership else None,
                "cluster_ids": [membership["cluster_id"] for membership in cluster_memberships],
                "conversation_url": row.get("html_url"),
            }
        )
    return rows


def _contributor_rows(
    contributor_report: dict[str, Any],
    pull_requests: list[dict[str, Any]],
    memberships: dict[int, list[dict[str, str]]],
) -> list[dict[str, Any]]:
    """Build contributor rows for the recent PRs.

    When the contributor report lists contributors, those entries are used,
    keeping only authors with recent PRs or a non-zero ``snapshot_pr_count``.
    Otherwise rows are synthesized from the recent PRs alone.
    """
    recent_pr_counts = Counter(
        str(row.get("author_login")) for row in pull_requests if row.get("author_login")
    )
    recent_associations = _recent_repo_associations(pull_requests)
    # One count per (PR, cluster) membership: a PR in two clusters counts twice.
    recent_cluster_counts = Counter(
        str(row.get("author_login"))
        for row in pull_requests
        if row.get("author_login")
        for _membership in memberships.get(_coerce_int(row.get("number")), [])
    )

    report_rows = contributor_report.get("contributors") or []
    if not report_rows:
        return _fallback_contributor_rows(
            recent_pr_counts, recent_cluster_counts, recent_associations
        )

    rows: list[dict[str, Any]] = []
    for contributor in report_rows:
        author = contributor.get("author_login")
        if not author:
            continue
        recent_pr_count = recent_pr_counts.get(str(author), 0)
        if recent_pr_count == 0 and not contributor.get("snapshot_pr_count"):
            continue
        activity = contributor.get("activity") or {}
        repo_association = contributor.get("repo_association") or recent_associations.get(
            str(author)
        )
        rows.append(
            {
                "author": author,
                "name": contributor.get("name"),
                "profile_url": contributor.get("profile_url"),
                "repo_pull_requests_url": contributor.get("repo_pull_requests_url"),
                "repo_issues_url": contributor.get("repo_issues_url"),
                "snapshot_pr_count": _coerce_int(contributor.get("snapshot_pr_count")) or 0,
                "snapshot_issue_count": _coerce_int(contributor.get("snapshot_issue_count")) or 0,
                "recent_pr_count": recent_pr_count,
                "cluster_count": recent_cluster_counts.get(str(author), 0),
                "repo_association": repo_association,
                "new_to_repo": contributor.get("new_to_repo"),
                "first_seen_in_snapshot": contributor.get("first_seen_in_snapshot"),
                "report_reason": contributor.get("report_reason"),
                # Derived from the same association shown above so the two
                # fields of a row can never disagree.
                "known_contributor": _is_known_repo_association(repo_association),
                "follow_through_score": contributor.get("follow_through_score"),
                "breadth_score": contributor.get("breadth_score"),
                "automation_risk_signal": contributor.get("automation_risk_signal"),
                "heuristic_note": contributor.get("heuristic_note"),
                "account_age_days": _coerce_int(contributor.get("account_age_days")),
                "quality_score": None,
                "public_pr_count_42d": _coerce_int(activity.get("visible_authored_pr_count")),
                "public_repo_count_42d": _coerce_int(
                    activity.get("distinct_repos_with_authored_prs")
                ),
            }
        )
    rows.sort(
        key=lambda row: (
            -int(row["recent_pr_count"]),
            -int(row["snapshot_pr_count"]),
            -int(row["cluster_count"]),
            str(row["author"]),
        )
    )
    return rows


def _fallback_contributor_rows(
    recent_pr_counts: Counter[str],
    recent_cluster_counts: Counter[str],
    recent_associations: dict[str, str | None],
) -> list[dict[str, Any]]:
    """Contributor rows built only from recent PRs, used when no report is available.

    Sorted by recent PR count (descending), then author login.
    """
    rows = [
        {
            "author": author,
            "name": None,
            "profile_url": f"https://github.com/{author}",
            "repo_pull_requests_url": None,
            "repo_issues_url": None,
            "snapshot_pr_count": count,
            "snapshot_issue_count": 0,
            "recent_pr_count": count,
            "cluster_count": recent_cluster_counts.get(author, 0),
            "repo_association": recent_associations.get(author),
            "new_to_repo": None,
            "first_seen_in_snapshot": None,
            "report_reason": None,
            "known_contributor": _is_known_repo_association(recent_associations.get(author)),
            "follow_through_score": None,
            "breadth_score": None,
            "automation_risk_signal": None,
            "heuristic_note": None,
            "account_age_days": None,
            "quality_score": None,
            "public_pr_count_42d": None,
            "public_repo_count_42d": None,
        }
        for author, count in recent_pr_counts.items()
    ]
    rows.sort(key=lambda row: (-int(row["recent_pr_count"]), row["author"]))
    return rows


def _known_contributor(contributor: dict[str, Any]) -> bool:
    """Whether a contributor-report entry's own ``repo_association`` marks a known contributor."""
    return _is_known_repo_association(contributor.get("repo_association"))


def _recent_repo_associations(pull_requests: list[dict[str, Any]]) -> dict[str, str | None]:
    """Map each PR author to the highest-priority ``author_association`` seen on their PRs."""
    grouped: dict[str, set[str]] = defaultdict(set)
    for row in pull_requests:
        login = str(row.get("author_login") or "").strip()
        association = str(row.get("author_association") or "").strip()
        if not login or not association:
            continue
        grouped[login].add(association)
    return {login: _select_repo_association(sorted(values)) for login, values in grouped.items()}


def _select_repo_association(values: list[str]) -> str | None:
    """Pick the highest-priority association; unknown values rank lowest, ties by name."""
    if not values:
        return None
    return max(values, key=lambda value: (_REPO_ASSOCIATION_PRIORITY.get(value, 0), value))


def _is_known_repo_association(value: Any) -> bool:
    """True for ``OWNER``, ``MEMBER`` or ``COLLABORATOR``."""
    return str(value or "") in _KNOWN_REPO_ASSOCIATIONS