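"""Read-only helpers for exposing snapshot analysis artifacts.

Each function takes a snapshot directory containing parquet artifacts
(issues.parquet, pull_requests.parquet) plus optional JSON reports
(analysis-report*.json, new-contributors-report.json) and returns plain
dictionaries describing issue clusters, duplicate PRs, and new contributors.

A minimal usage sketch; the snapshot path layout shown here is an assumption,
not something this module enforces::

    from pathlib import Path

    snapshot_dir = Path("snapshots/example-owner/example-repo/2024-01-01")
    surfaces = get_snapshot_surfaces(snapshot_dir)
    clusters = list_issue_clusters(snapshot_dir, limit=10, variant="auto")
"""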
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal

from slop_farmer.data.parquet_io import read_json, read_parquet_rows
from slop_farmer.data.snapshot_paths import (
    CURRENT_ANALYSIS_MANIFEST_PATH,
    load_current_analysis_manifest,
    repo_relative_path_to_local,
)

AnalysisVariant = Literal["auto", "hybrid", "deterministic"]

# Internal records built with keyword arguments by _snapshot_metadata and
# _select_analysis_report below.
@dataclass
class _SnapshotMetadata:
    repo: str
    snapshot_id: str


@dataclass
class _AnalysisSelection:
    path: Path
    payload: dict[str, Any]
    variant_used: str
    llm_enrichment: bool

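# Public surface functions. Every entry point below accepts the snapshot directory,
# reads whatever artifacts exist, and returns an "unavailable" payload rather than
# raising when a report file is missing.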
def get_snapshot_surfaces(snapshot_dir: Path) -> dict[str, Any]:
    """Summarise which analysis surfaces (issue clusters, contributors) a snapshot exposes."""
    issue_status = get_issue_cluster_status(snapshot_dir, variant="auto")
    contributor_status = get_contributor_status(snapshot_dir)
    return {
        "issues": {
            "available": issue_status["available"],
            "variant_used": issue_status.get("variant_used"),
            "llm_enrichment": issue_status.get("llm_enrichment"),
            "generated_at": issue_status.get("generated_at"),
            "cluster_count": (issue_status.get("counts") or {}).get("meta_bugs", 0),
            "duplicate_pr_count": (issue_status.get("counts") or {}).get("duplicate_prs", 0),
            "available_variants": issue_status.get("available_variants") or [],
        },
        "contributors": {
            "available": contributor_status["available"],
            "generated_at": contributor_status.get("generated_at"),
            "contributor_count": contributor_status.get("contributor_count", 0),
        },
    }

def get_issue_cluster_status(snapshot_dir: Path, *, variant: AnalysisVariant) -> dict[str, Any]:
    """Report availability, the variant used, and headline counts for the analysis report."""
    metadata = _snapshot_metadata(snapshot_dir)
    candidates = _analysis_candidates(snapshot_dir)
    selection = _select_analysis_report(candidates, variant=variant)
    status = {
        "repo": metadata.repo,
        "snapshot_id": metadata.snapshot_id,
        "variant_requested": variant,
        "available": selection is not None,
        "available_variants": sorted({candidate["variant"] for candidate in candidates}),
    }
    if selection is None:
        return {
            **status,
            "variant_used": None,
            "llm_enrichment": False,
            "generated_at": None,
            "report_path": None,
            "counts": {"meta_bugs": 0, "duplicate_issues": 0, "duplicate_prs": 0},
        }
    payload = selection.payload
    return {
        **status,
        "variant_used": selection.variant_used,
        "llm_enrichment": selection.llm_enrichment,
        "generated_at": payload.get("generated_at"),
        "report_path": selection.path.name,
        "counts": _analysis_counts(payload),
    }

def list_issue_clusters(
    snapshot_dir: Path,
    *,
    limit: int | None,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    """List meta-bug clusters from the selected report, optionally truncated to `limit`."""
    metadata, selection, issue_map, pr_map = _analysis_context(snapshot_dir, variant=variant)
    base = _analysis_base_payload(metadata, selection, variant=variant)
    if selection is None:
        return {**base, "clusters": [], "cluster_count": 0}
    clusters = [
        _issue_cluster_summary(cluster, issue_map, pr_map, rank=index)
        for index, cluster in enumerate(selection.payload.get("meta_bugs") or [], start=1)
    ]
    total = len(clusters)
    return {
        **base,
        "clusters": clusters[:limit] if limit is not None else clusters,
        "cluster_count": total,
    }

def get_issue_cluster(
    snapshot_dir: Path,
    *,
    cluster_id: str,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    """Return a single cluster by id, with per-issue and per-PR member rows."""
    metadata, selection, issue_map, pr_map = _analysis_context(snapshot_dir, variant=variant)
    base = _analysis_base_payload(metadata, selection, variant=variant)
    if selection is None:
        return {
            **base,
            "cluster_id": cluster_id,
            "found": False,
            "cluster": None,
            "issues": [],
            "pull_requests": [],
        }
    cluster = next(
        (
            row
            for row in selection.payload.get("meta_bugs") or []
            if str(row.get("cluster_id") or "") == cluster_id
        ),
        None,
    )
    if cluster is None:
        return {
            **base,
            "cluster_id": cluster_id,
            "found": False,
            "cluster": None,
            "issues": [],
            "pull_requests": [],
        }
    issue_numbers = _ordered_ints(cluster.get("issue_numbers"))
    pr_numbers = _ordered_ints(cluster.get("pr_numbers"))
    canonical_pr_number = _coerce_int(cluster.get("canonical_pr_number"))
    return {
        **base,
        "cluster_id": cluster_id,
        "found": True,
        "cluster": {
            **_issue_cluster_summary(cluster, issue_map, pr_map),
            "canonical_issue_reason": cluster.get("canonical_issue_reason"),
            "canonical_pr_reason": cluster.get("canonical_pr_reason"),
            "best_issue_reason": cluster.get("best_issue_reason"),
            "best_pr_reason": cluster.get("best_pr_reason"),
        },
        "issues": [_issue_member_row(number, issue_map.get(number)) for number in issue_numbers],
        "pull_requests": [
            _pr_member_row(
                number,
                pr_map.get(number),
                role="canonical" if canonical_pr_number == number else "member",
            )
            for number in pr_numbers
        ],
    }

def get_issue_clusters_for_pr(
    snapshot_dir: Path,
    *,
    pr_number: int,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    """Find every cluster whose PR membership includes `pr_number`."""
    metadata, selection, issue_map, pr_map = _analysis_context(snapshot_dir, variant=variant)
    base = _analysis_base_payload(metadata, selection, variant=variant)
    if selection is None:
        return {**base, "pr_number": pr_number, "found": False, "clusters": [], "cluster_count": 0}
    matches = []
    for index, cluster in enumerate(selection.payload.get("meta_bugs") or [], start=1):
        pr_numbers = _ordered_ints(cluster.get("pr_numbers"))
        if pr_number not in pr_numbers:
            continue
        canonical_pr_number = _coerce_int(cluster.get("canonical_pr_number"))
        matches.append(
            {
                **_issue_cluster_summary(cluster, issue_map, pr_map, rank=index),
                "membership_role": "canonical" if canonical_pr_number == pr_number else "member",
            }
        )
    return {
        **base,
        "pr_number": pr_number,
        "found": bool(matches),
        "clusters": matches,
        "cluster_count": len(matches),
    }

def check_issue_cluster_membership(
    snapshot_dir: Path,
    *,
    pr_number: int,
    cluster_id: str | None,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    """Check whether `pr_number` belongs to `cluster_id`, or to any cluster when it is None."""
    lookup = get_issue_clusters_for_pr(snapshot_dir, pr_number=pr_number, variant=variant)
    matches = list(lookup.get("clusters") or [])
    matching_cluster_ids = [str(row.get("cluster_id")) for row in matches if row.get("cluster_id")]
    if cluster_id is None:
        return {
            **lookup,
            "cluster_id": None,
            "matched": bool(matching_cluster_ids),
            "matching_cluster_ids": matching_cluster_ids,
        }
    match = next((row for row in matches if row.get("cluster_id") == cluster_id), None)
    return {
        **lookup,
        "cluster_id": cluster_id,
        "matched": match is not None,
        "matching_cluster_ids": matching_cluster_ids,
        "membership": match,
    }

def list_issue_duplicate_prs(
    snapshot_dir: Path,
    *,
    limit: int | None,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    """List duplicate-PR groups from the selected report, optionally truncated to `limit`."""
    metadata, selection, issue_map, pr_map = _analysis_context(snapshot_dir, variant=variant)
    base = _analysis_base_payload(metadata, selection, variant=variant)
    if selection is None:
        return {**base, "duplicate_prs": [], "duplicate_pr_count": 0}
    rows = [
        _duplicate_pr_summary(entry, issue_map, pr_map, rank=index)
        for index, entry in enumerate(selection.payload.get("duplicate_prs") or [], start=1)
    ]
    total = len(rows)
    return {
        **base,
        "duplicate_prs": rows[:limit] if limit is not None else rows,
        "duplicate_pr_count": total,
    }

def get_issue_best(snapshot_dir: Path, *, variant: AnalysisVariant) -> dict[str, Any]:
    """Return the report's best-issue and best-PR picks, enriched with titles and URLs."""
    metadata, selection, issue_map, pr_map = _analysis_context(snapshot_dir, variant=variant)
    base = _analysis_base_payload(metadata, selection, variant=variant)
    if selection is None:
        return {**base, "best_issue": None, "best_pr": None}
    return {
        **base,
        "best_issue": _best_issue_summary(selection.payload.get("best_issue"), issue_map),
        "best_pr": _best_pr_summary(selection.payload.get("best_pr"), pr_map),
    }

def get_contributor_status(snapshot_dir: Path) -> dict[str, Any]:
    """Report availability and headline counts for the new-contributors report."""
    metadata = _snapshot_metadata(snapshot_dir)
    report = _read_optional_json(snapshot_dir / "new-contributors-report.json")
    raw_contributors = report.get("contributors")
    contributors: list[Any] = raw_contributors if isinstance(raw_contributors, list) else []
    return {
        "repo": str(report.get("repo") or metadata.repo),
        "snapshot_id": str(report.get("snapshot_id") or metadata.snapshot_id),
        "available": bool(report),
        "generated_at": report.get("generated_at"),
        "window_days": _coerce_int(report.get("window_days")),
        "contributor_count": len(contributors),
    }

def list_contributors(snapshot_dir: Path, *, limit: int | None) -> dict[str, Any]:
    """List contributors from the new-contributors report, optionally truncated to `limit`."""
    status = get_contributor_status(snapshot_dir)
    report = _read_optional_json(snapshot_dir / "new-contributors-report.json")
    rows = [
        _contributor_summary(entry, rank=index)
        for index, entry in enumerate(report.get("contributors") or [], start=1)
        if isinstance(entry, dict)
    ]
    total = len(rows)
    return {
        **status,
        "contributors": rows[:limit] if limit is not None else rows,
        "contributor_count": total,
    }

def get_contributor(snapshot_dir: Path, *, author_login: str) -> dict[str, Any]:
    """Look up a contributor by login (case-insensitive) in the new-contributors report."""
    status = get_contributor_status(snapshot_dir)
    report = _read_optional_json(snapshot_dir / "new-contributors-report.json")
    contributor = _find_contributor(report.get("contributors") or [], author_login)
    if contributor is None:
        return {
            **status,
            "author_login": author_login,
            "found": False,
            "summary": None,
            "risk": None,
            "contributor": None,
        }
    return {
        **status,
        "author_login": str(contributor.get("author_login") or author_login),
        "found": True,
        "summary": _contributor_summary(contributor),
        "risk": _contributor_risk(contributor),
        "contributor": contributor,
    }

def get_contributor_risk(snapshot_dir: Path, *, author_login: str) -> dict[str, Any]:
    """Return only the risk-related fields for a contributor."""
    contributor = get_contributor(snapshot_dir, author_login=author_login)
    risk = contributor.get("risk")
    return {
        "repo": contributor.get("repo"),
        "snapshot_id": contributor.get("snapshot_id"),
        "available": contributor.get("available"),
        "generated_at": contributor.get("generated_at"),
        "author_login": contributor.get("author_login"),
        "found": contributor.get("found"),
        "risk_available": risk is not None,
        "risk": risk,
    }

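# Internal helpers: analysis-report discovery and selection, parquet-backed lookup
# maps, and row/summary shaping shared by the public functions above.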
def _analysis_context(
    snapshot_dir: Path,
    *,
    variant: AnalysisVariant,
) -> tuple[
    _SnapshotMetadata,
    _AnalysisSelection | None,
    dict[int, dict[str, Any]],
    dict[int, dict[str, Any]],
]:
    metadata = _snapshot_metadata(snapshot_dir)
    selection = _select_analysis_report(_analysis_candidates(snapshot_dir), variant=variant)
    issue_map, pr_map = _artifact_maps(snapshot_dir)
    return metadata, selection, issue_map, pr_map


def _analysis_base_payload(
    metadata: _SnapshotMetadata,
    selection: _AnalysisSelection | None,
    *,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    base = {
        "repo": metadata.repo,
        "snapshot_id": metadata.snapshot_id,
        "variant_requested": variant,
        "available": selection is not None,
        "variant_used": None,
        "llm_enrichment": False,
        "generated_at": None,
    }
    if selection is None:
        return base
    return {
        **base,
        "variant_used": selection.variant_used,
        "llm_enrichment": selection.llm_enrichment,
        "generated_at": selection.payload.get("generated_at"),
    }


def _analysis_candidates(snapshot_dir: Path) -> list[dict[str, Any]]:
    candidates: list[dict[str, Any]] = []
    for path in _analysis_report_paths(snapshot_dir):
        payload = _read_optional_json(path)
        if not payload:
            continue
        llm_enrichment = bool(payload.get("llm_enrichment"))
        candidates.append(
            {
                "path": path,
                "payload": payload,
                "variant": _analysis_variant(path.name, payload, llm_enrichment=llm_enrichment),
                "llm_enrichment": llm_enrichment,
            }
        )
    return candidates


def _select_analysis_report(
    candidates: list[dict[str, Any]],
    *,
    variant: AnalysisVariant,
) -> _AnalysisSelection | None:
    if not candidates:
        return None
    if variant == "auto":
        ordered = sorted(candidates, key=_analysis_auto_priority)
    else:
        ordered = [candidate for candidate in candidates if candidate["variant"] == variant]
        ordered.sort(key=_analysis_specific_priority)
    if not ordered:
        return None
    winner = ordered[0]
    return _AnalysisSelection(
        path=Path(winner["path"]),
        payload=dict(winner["payload"]),
        variant_used=str(winner["variant"]),
        llm_enrichment=bool(winner["llm_enrichment"]),
    )


def _analysis_report_paths(snapshot_dir: Path) -> list[Path]:
    ordered: list[Path] = []
    current_manifest_path = repo_relative_path_to_local(
        snapshot_dir, CURRENT_ANALYSIS_MANIFEST_PATH
    )
    if current_manifest_path.exists():
        try:
            current_manifest = load_current_analysis_manifest(current_manifest_path)
        except ValueError:
            current_manifest = None
        if current_manifest is not None:
            for artifact_path in (current_manifest.get("artifacts") or {}).values():
                if not isinstance(artifact_path, str):
                    continue
                ordered.append(repo_relative_path_to_local(snapshot_dir, artifact_path))
    ordered.extend(
        [
            snapshot_dir / "analysis-report-hybrid.json",
            snapshot_dir / "analysis-report-deterministic.json",
            snapshot_dir / "analysis-report.json",
        ]
    )
    seen: set[Path] = set()
    deduped: list[Path] = []
    for path in ordered:
        if path in seen:
            continue
        seen.add(path)
        deduped.append(path)
    deduped.extend(
        path for path in sorted(snapshot_dir.glob("analysis-report*.json")) if path not in seen
    )
    return [path for path in deduped if path.exists()]

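# Selection order: in "auto" mode the hybrid report wins, then any report flagged as
# LLM-enriched, then the bare analysis-report.json, then everything else. When a
# specific variant is requested, a file named for that variant beats the generic
# report, which beats any remaining candidate.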
def _analysis_auto_priority(candidate: dict[str, Any]) -> tuple[int, str]:
    path = Path(candidate["path"])
    if path.name == "analysis-report-hybrid.json":
        return (0, path.name)
    if bool(candidate.get("llm_enrichment")):
        return (1, path.name)
    if path.name == "analysis-report.json":
        return (2, path.name)
    return (3, path.name)


def _analysis_specific_priority(candidate: dict[str, Any]) -> tuple[int, str]:
    path = Path(candidate["path"])
    if path.name.endswith(f"-{candidate['variant']}.json"):
        return (0, path.name)
    if path.name == "analysis-report.json":
        return (1, path.name)
    return (2, path.name)


def _analysis_variant(path_name: str, payload: dict[str, Any], *, llm_enrichment: bool) -> str:
    lowered = path_name.lower()
    if "hybrid" in lowered:
        return "hybrid"
    if "deterministic" in lowered:
        return "deterministic"
    if isinstance(payload.get("variant_used"), str):
        variant_used = str(payload["variant_used"]).strip().lower()
        if variant_used in {"hybrid", "deterministic"}:
            return variant_used
    return "hybrid" if llm_enrichment else "deterministic"


def _analysis_counts(payload: dict[str, Any]) -> dict[str, int]:
    return {
        "meta_bugs": len(payload.get("meta_bugs") or []),
        "duplicate_issues": len(payload.get("duplicate_issues") or []),
        "duplicate_prs": len(payload.get("duplicate_prs") or []),
    }

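# Issue and PR rows are loaded from the snapshot's parquet artifacts and keyed by
# number so cluster summaries can be enriched with titles and html_url values.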
def _artifact_maps(
    snapshot_dir: Path,
) -> tuple[dict[int, dict[str, Any]], dict[int, dict[str, Any]]]:
    issue_rows = (
        read_parquet_rows(snapshot_dir / "issues.parquet")
        if (snapshot_dir / "issues.parquet").exists()
        else []
    )
    pr_rows = (
        read_parquet_rows(snapshot_dir / "pull_requests.parquet")
        if (snapshot_dir / "pull_requests.parquet").exists()
        else []
    )
    issue_map = {
        int(row["number"]): row for row in issue_rows if _coerce_int(row.get("number")) is not None
    }
    pr_map = {
        int(row["number"]): row for row in pr_rows if _coerce_int(row.get("number")) is not None
    }
    return issue_map, pr_map

def _issue_cluster_summary(
    cluster: dict[str, Any],
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
    *,
    rank: int | None = None,
) -> dict[str, Any]:
    canonical_issue_number = _coerce_int(cluster.get("canonical_issue_number"))
    canonical_pr_number = _coerce_int(cluster.get("canonical_pr_number"))
    issue_numbers = _ordered_ints(cluster.get("issue_numbers"))
    pr_numbers = _ordered_ints(cluster.get("pr_numbers"))
    return {
        "rank": rank,
        "cluster_id": str(cluster.get("cluster_id") or f"cluster-{rank or 0}"),
        "title": _cluster_title(
            cluster, issue_map, pr_map, canonical_issue_number, canonical_pr_number
        ),
        "summary": cluster.get("summary"),
        "status": cluster.get("status"),
        "confidence": _coerce_float(cluster.get("confidence")),
        "canonical_issue_number": canonical_issue_number,
        "canonical_issue_title": _title_for_issue(canonical_issue_number, issue_map),
        "canonical_issue_url": _url_for_issue(canonical_issue_number, issue_map),
        "canonical_pr_number": canonical_pr_number,
        "canonical_pr_title": _title_for_pr(canonical_pr_number, pr_map),
        "canonical_pr_url": _url_for_pr(canonical_pr_number, pr_map),
        "issue_numbers": issue_numbers,
        "issue_count": len(issue_numbers),
        "pr_numbers": pr_numbers,
        "pr_count": len(pr_numbers),
        "evidence_types": [str(value) for value in (cluster.get("evidence_types") or []) if value],
        "github_url": _cluster_url(canonical_issue_number, canonical_pr_number, issue_map, pr_map),
    }


def _cluster_title(
    cluster: dict[str, Any],
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
    canonical_issue_number: int | None,
    canonical_pr_number: int | None,
) -> str:
    issue_title = _title_for_issue(canonical_issue_number, issue_map)
    if issue_title:
        return issue_title
    pr_title = _title_for_pr(canonical_pr_number, pr_map)
    if pr_title:
        return pr_title
    summary = str(cluster.get("summary") or "").strip()
    if summary:
        return summary
    return str(cluster.get("cluster_id") or "cluster")


def _cluster_url(
    canonical_issue_number: int | None,
    canonical_pr_number: int | None,
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
) -> str | None:
    return _url_for_issue(canonical_issue_number, issue_map) or _url_for_pr(
        canonical_pr_number, pr_map
    )

def _duplicate_pr_summary(
    entry: dict[str, Any],
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
    *,
    rank: int,
) -> dict[str, Any]:
    canonical_pr_number = _coerce_int(entry.get("canonical_pr_number"))
    target_issue_number = _coerce_int(entry.get("target_issue_number"))
    duplicates = _ordered_ints(entry.get("duplicate_pr_numbers"))
    return {
        "rank": rank,
        "cluster_id": str(entry.get("cluster_id") or f"duplicate-pr-{rank}"),
        "canonical_pr_number": canonical_pr_number,
        "canonical_pr_title": _title_for_pr(canonical_pr_number, pr_map),
        "canonical_pr_url": _url_for_pr(canonical_pr_number, pr_map),
        "target_issue_number": target_issue_number,
        "target_issue_title": _title_for_issue(target_issue_number, issue_map),
        "target_issue_url": _url_for_issue(target_issue_number, issue_map),
        "duplicate_pr_numbers": duplicates,
        "duplicate_pr_count": len(duplicates),
        "reason": entry.get("reason"),
    }


def _best_issue_summary(entry: Any, issue_map: dict[int, dict[str, Any]]) -> dict[str, Any] | None:
    if not isinstance(entry, dict):
        return None
    issue_number = _coerce_int(entry.get("issue_number"))
    return {
        "cluster_id": entry.get("cluster_id"),
        "issue_number": issue_number,
        "title": _title_for_issue(issue_number, issue_map),
        "url": _url_for_issue(issue_number, issue_map),
        "reason": entry.get("reason"),
        "score": _coerce_float(entry.get("score")),
    }


def _best_pr_summary(entry: Any, pr_map: dict[int, dict[str, Any]]) -> dict[str, Any] | None:
    if not isinstance(entry, dict):
        return None
    pr_number = _coerce_int(entry.get("pr_number"))
    return {
        "cluster_id": entry.get("cluster_id"),
        "pr_number": pr_number,
        "title": _title_for_pr(pr_number, pr_map),
        "url": _url_for_pr(pr_number, pr_map),
        "reason": entry.get("reason"),
        "score": _coerce_float(entry.get("score")),
    }

def _issue_member_row(number: int, row: dict[str, Any] | None) -> dict[str, Any]:
    row = row or {}
    return {
        "number": number,
        "title": row.get("title"),
        "state": row.get("state"),
        "author_login": row.get("author_login"),
        "created_at": row.get("created_at"),
        "updated_at": row.get("updated_at"),
        "html_url": row.get("html_url"),
    }


def _pr_member_row(number: int, row: dict[str, Any] | None, *, role: str) -> dict[str, Any]:
    row = row or {}
    return {
        "number": number,
        "role": role,
        "title": row.get("title"),
        "author_login": row.get("author_login"),
        "state": row.get("state"),
        "draft": bool(row.get("draft")),
        "merged": bool(row.get("merged")),
        "author_association": row.get("author_association"),
        "created_at": row.get("created_at"),
        "updated_at": row.get("updated_at"),
        "html_url": row.get("html_url"),
    }

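# Contributor summaries flatten the per-contributor entries of
# new-contributors-report.json; nested activity counts are read from the optional
# "activity" mapping when present.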
def _contributor_summary(contributor: dict[str, Any], *, rank: int | None = None) -> dict[str, Any]:
    raw_activity = contributor.get("activity")
    activity: dict[str, Any] = raw_activity if isinstance(raw_activity, dict) else {}
    return {
        "rank": rank,
        "author_login": contributor.get("author_login"),
        "name": contributor.get("name"),
        "profile_url": contributor.get("profile_url"),
        "repo_association": contributor.get("repo_association"),
        "first_seen_in_snapshot": contributor.get("first_seen_in_snapshot"),
        "new_to_repo": contributor.get("new_to_repo"),
        "snapshot_pr_count": _coerce_int(contributor.get("snapshot_pr_count")) or 0,
        "snapshot_issue_count": _coerce_int(contributor.get("snapshot_issue_count")) or 0,
        "follow_through_score": contributor.get("follow_through_score"),
        "breadth_score": contributor.get("breadth_score"),
        "automation_risk_signal": contributor.get("automation_risk_signal"),
        "heuristic_note": contributor.get("heuristic_note"),
        "account_age_days": _coerce_int(contributor.get("account_age_days")),
        "public_pr_count_42d": _coerce_int(activity.get("visible_authored_pr_count")),
        "public_repo_count_42d": _coerce_int(activity.get("distinct_repos_with_authored_prs")),
        "repo_pull_requests_url": contributor.get("repo_pull_requests_url"),
        "repo_issues_url": contributor.get("repo_issues_url"),
    }


def _contributor_risk(contributor: dict[str, Any]) -> dict[str, Any]:
    raw_activity = contributor.get("activity")
    activity: dict[str, Any] = raw_activity if isinstance(raw_activity, dict) else {}
    return {
        "automation_risk_signal": contributor.get("automation_risk_signal"),
        "heuristic_note": contributor.get("heuristic_note"),
        "follow_through_score": contributor.get("follow_through_score"),
        "breadth_score": contributor.get("breadth_score"),
        "account_age_days": _coerce_int(contributor.get("account_age_days")),
        "public_pr_count_42d": _coerce_int(activity.get("visible_authored_pr_count")),
        "public_repo_count_42d": _coerce_int(activity.get("distinct_repos_with_authored_prs")),
        "report_reason": contributor.get("report_reason"),
    }


def _find_contributor(entries: list[Any], author_login: str) -> dict[str, Any] | None:
    lowered = author_login.casefold()
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        login = str(entry.get("author_login") or "")
        if login.casefold() == lowered:
            return entry
    return None

def _snapshot_metadata(snapshot_dir: Path) -> _SnapshotMetadata:
    manifest = _read_optional_json(snapshot_dir / "manifest.json")
    repo = str(manifest.get("repo") or _infer_repo(snapshot_dir) or "")
    snapshot_id = str(manifest.get("snapshot_id") or snapshot_dir.name)
    return _SnapshotMetadata(repo=repo, snapshot_id=snapshot_id)


def _infer_repo(snapshot_dir: Path) -> str | None:
    for filename in ("pull_requests.parquet", "issues.parquet"):
        path = snapshot_dir / filename
        if not path.exists():
            continue
        rows = read_parquet_rows(path)
        if rows and rows[0].get("repo"):
            return str(rows[0]["repo"])
    for filename in _analysis_report_paths(snapshot_dir):
        payload = _read_optional_json(filename)
        if payload.get("repo"):
            return str(payload["repo"])
    report = _read_optional_json(snapshot_dir / "new-contributors-report.json")
    if report.get("repo"):
        return str(report["repo"])
    return None

def _title_for_issue(number: int | None, issue_map: dict[int, dict[str, Any]]) -> str | None:
    if number is None or number not in issue_map:
        return None
    title = issue_map[number].get("title")
    return str(title) if title else None


def _url_for_issue(number: int | None, issue_map: dict[int, dict[str, Any]]) -> str | None:
    if number is None or number not in issue_map:
        return None
    value = issue_map[number].get("html_url")
    return str(value) if value else None


def _title_for_pr(number: int | None, pr_map: dict[int, dict[str, Any]]) -> str | None:
    if number is None or number not in pr_map:
        return None
    title = pr_map[number].get("title")
    return str(title) if title else None


def _url_for_pr(number: int | None, pr_map: dict[int, dict[str, Any]]) -> str | None:
    if number is None or number not in pr_map:
        return None
    value = pr_map[number].get("html_url")
    return str(value) if value else None

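# Coercion helpers: the reports are externally produced JSON, so numeric fields are
# converted defensively and malformed values become None (or are dropped from lists).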
def _ordered_ints(values: Any) -> list[int]:
    if not isinstance(values, list):
        return []
    ordered: list[int] = []
    for value in values:
        number = _coerce_int(value)
        if number is not None:
            ordered.append(number)
    return ordered


def _coerce_int(value: Any) -> int | None:
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def _coerce_float(value: Any) -> float | None:
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _read_optional_json(path: Path) -> dict[str, Any]:
    if not path.exists():
        return {}
    payload = read_json(path)
    return payload if isinstance(payload, dict) else {}