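"""Build the static JSON data files behind the dashboard.

Reads a snapshot (parquet tables plus optional analysis, contributor, and
PR-scope reports) from a local directory or a Hugging Face repo and writes
summary.json, clusters.json, pr_scope_clusters.json, prs.json, and
contributors.json to the configured output directory.
"""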
from __future__ import annotations
import json
from collections import Counter, defaultdict
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from slop_farmer.config import DashboardDataOptions
from slop_farmer.data.parquet_io import read_json, read_parquet_rows
from slop_farmer.data.snapshot_paths import (
ResolvedAnalysisReportPath,
resolve_default_dashboard_analysis_report,
)
from slop_farmer.data.snapshot_source import resolve_snapshot_source_dir
def run_dashboard_data(options: DashboardDataOptions) -> Path:
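    """Build the dashboard JSON artifacts for one snapshot.

    Loads the snapshot tables and optional analysis, contributor, and PR-scope
    reports, restricts pull requests to the configured window, and writes
    summary, cluster, PR, and contributor JSON files to the output directory.
    """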
snapshot_dir = _resolve_snapshot_dir(options)
manifest = _read_optional_json(snapshot_dir / "manifest.json")
issues = read_parquet_rows(snapshot_dir / "issues.parquet")
pull_requests = read_parquet_rows(snapshot_dir / "pull_requests.parquet")
analysis_path = _resolve_analysis_input(snapshot_dir, options.analysis_input)
analysis = _read_optional_json(analysis_path.path) if analysis_path is not None else {}
contributor_report = _read_optional_json(
options.contributors_input or snapshot_dir / "new-contributors-report.json"
)
pr_scope_report = _read_optional_json(
options.pr_scope_input or snapshot_dir / "pr-scope-clusters.json"
)
repo = (
manifest.get("repo")
or (pull_requests[0]["repo"] if pull_requests else None)
or (issues[0]["repo"] if issues else None)
or ""
)
snapshot_id = manifest.get("snapshot_id") or snapshot_dir.name
reference_time = _reference_time(snapshot_id, pull_requests)
cutoff = reference_time - timedelta(days=options.window_days)
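    # Index issues and PRs by number for quick lookups while building rows.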
issue_map = {int(row["number"]): row for row in issues if row.get("number") is not None}
pr_map = {int(row["number"]): row for row in pull_requests if row.get("number") is not None}
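    # Keep only pull requests created inside the reporting window, newest first.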
recent_pull_requests = []
for row in pull_requests:
created_at = _coerce_datetime(row.get("created_at"))
if created_at is not None and created_at >= cutoff:
recent_pull_requests.append(row)
recent_pull_requests.sort(key=lambda row: row.get("created_at") or "", reverse=True)
recent_numbers = {
int(row["number"]) for row in recent_pull_requests if row.get("number") is not None
}
clusters, memberships = _cluster_rows(analysis, issue_map, pr_map, recent_numbers)
pr_scope_clusters = _pr_scope_cluster_rows(pr_scope_report, pr_map, recent_numbers)
contributors = _contributor_rows(contributor_report, recent_pull_requests, memberships)
prs = _pr_rows(recent_pull_requests, memberships)
summary = {
"repo": repo,
"snapshot_id": snapshot_id,
"generated_at": datetime.now(tz=UTC)
.replace(microsecond=0)
.isoformat()
.replace("+00:00", "Z"),
"window_days": options.window_days,
"reference_time": reference_time.isoformat().replace("+00:00", "Z"),
"pr_count": len(prs),
"open_pr_count": sum(1 for row in prs if row["state"] == "open"),
"merged_pr_count": sum(1 for row in prs if row["merged"]),
"cluster_count": len(clusters),
"clustered_pr_count": sum(1 for row in prs if row["cluster_id"]),
"contributor_count": len(contributors),
"analysis_available": bool(analysis),
"analysis_source": None if analysis_path is None else analysis_path.source,
"analysis_variant": None if analysis_path is None else analysis_path.variant,
"analysis_snapshot_id": (
None
if analysis_path is None
else (
analysis_path.snapshot_id
or (
str(analysis.get("snapshot_id")).strip()
if analysis.get("snapshot_id") is not None
else None
)
)
),
"analysis_id": None if analysis_path is None else analysis_path.analysis_id,
"contributors_available": bool(contributor_report),
"pr_scope_available": bool(pr_scope_report),
"pr_scope_cluster_count": len(pr_scope_clusters),
}
output_dir = options.output_dir.resolve()
output_dir.mkdir(parents=True, exist_ok=True)
_write_json(summary, output_dir / "summary.json")
_write_json(clusters, output_dir / "clusters.json")
_write_json(pr_scope_clusters, output_dir / "pr_scope_clusters.json")
_write_json(prs, output_dir / "prs.json")
_write_json(contributors, output_dir / "contributors.json")
return output_dir
def _resolve_snapshot_dir(options: DashboardDataOptions) -> Path:
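    """Resolve the snapshot source directory from a local path or a Hugging Face repo."""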
snapshots_root = (
options.snapshot_root.resolve()
if options.snapshot_root is not None
else (Path("data") / "snapshots").resolve()
)
return resolve_snapshot_source_dir(
snapshot_dir=options.snapshot_dir,
local_snapshots_root=snapshots_root,
hf_repo_id=options.hf_repo_id,
hf_revision=options.hf_revision,
hf_materialize_dir=options.hf_materialize_dir,
hf_output_dir=snapshots_root.parent,
)
def _resolve_analysis_input(
snapshot_dir: Path, override_path: Path | None
) -> ResolvedAnalysisReportPath | None:
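    """Use the explicit analysis report when given, else the snapshot's default dashboard report."""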
if override_path is not None:
resolved = override_path.resolve()
if not resolved.exists():
raise FileNotFoundError(f"Dashboard analysis input not found: {resolved}")
return ResolvedAnalysisReportPath(
path=resolved,
variant=_analysis_variant_for_path(resolved),
source="override",
)
return resolve_default_dashboard_analysis_report(snapshot_dir)
def _read_optional_json(path: Path) -> dict[str, Any]:
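    """Read a JSON file if it exists, otherwise return an empty dict."""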
if path.exists():
return read_json(path)
return {}
def _write_json(payload: Any, path: Path) -> None:
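    """Write the payload as indented, key-sorted JSON with a trailing newline."""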
path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
def _reference_time(snapshot_id: str, pull_requests: list[dict[str, Any]]) -> datetime:
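    """Prefer the parsed snapshot id, then the newest PR timestamp, then the current UTC time."""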
parsed = _parse_snapshot_id(snapshot_id)
if parsed is not None:
return parsed
timestamps = [
timestamp
for row in pull_requests
for timestamp in (
_coerce_datetime(row.get("updated_at")),
_coerce_datetime(row.get("created_at")),
)
if timestamp is not None
]
if timestamps:
return max(timestamps)
return datetime.now(tz=UTC)
def _parse_snapshot_id(value: str) -> datetime | None:
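    """Parse a snapshot id of the form YYYYMMDDTHHMMSSZ into a UTC datetime, or None."""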
try:
return datetime.strptime(value, "%Y%m%dT%H%M%SZ").replace(tzinfo=UTC)
    except (TypeError, ValueError):
return None
def _coerce_datetime(value: Any) -> datetime | None:
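    """Parse an ISO-8601 timestamp string (a trailing Z is allowed) into a datetime, or None."""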
if not value or not isinstance(value, str):
return None
try:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
return None
def _coerce_int(value: Any) -> int | None:
if value is None:
return None
try:
return int(value)
except (TypeError, ValueError):
return None
def _excerpt(value: Any, limit: int = 240) -> str | None:
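    """Collapse whitespace and cap at `limit` characters, appending an ellipsis when truncated."""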
if not value or not isinstance(value, str):
return None
compact = " ".join(value.split())
if len(compact) <= limit:
return compact
return compact[: limit - 1].rstrip() + "…"
def _analysis_variant_for_path(path: Path) -> str:
if path.name == "analysis-report-hybrid.json":
return "hybrid"
if path.name == "analysis-report.json":
return "deterministic"
return "override"
def _cluster_rows(
analysis: dict[str, Any],
issue_map: dict[int, dict[str, Any]],
pr_map: dict[int, dict[str, Any]],
recent_numbers: set[int],
) -> tuple[list[dict[str, Any]], dict[int, list[dict[str, str]]]]:
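    """Build meta-bug cluster rows restricted to PRs in the reporting window.

    Returns the cluster rows together with a mapping from PR number to its
    cluster memberships (cluster id plus canonical/member role).
    """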
rows: list[dict[str, Any]] = []
memberships: dict[int, list[dict[str, str]]] = defaultdict(list)
for cluster in analysis.get("meta_bugs", []):
pr_numbers = [_coerce_int(value) for value in cluster.get("pr_numbers", [])]
pr_numbers = [value for value in pr_numbers if value is not None]
recent_pr_numbers = [number for number in pr_numbers if number in recent_numbers]
outside_window_pr_numbers = [
number for number in pr_numbers if number not in recent_numbers
]
if not recent_pr_numbers:
continue
canonical_pr_number = _coerce_int(cluster.get("canonical_pr_number"))
canonical_issue_number = _coerce_int(cluster.get("canonical_issue_number"))
cluster_id = str(cluster.get("cluster_id") or f"cluster-{recent_pr_numbers[0]}")
title = _cluster_title(
cluster, issue_map, pr_map, canonical_issue_number, canonical_pr_number
)
recent_authors = sorted(
{
str(pr_map[number].get("author_login"))
for number in recent_pr_numbers
if number in pr_map and pr_map[number].get("author_login")
}
)
last_activity_at = max(
(
pr_map[number].get("updated_at") or pr_map[number].get("created_at")
for number in recent_pr_numbers
if number in pr_map
),
default=None,
)
row = {
"cluster_id": cluster_id,
"title": title,
"summary": cluster.get("summary"),
"status": cluster.get("status"),
"confidence": cluster.get("confidence"),
"canonical_issue_number": canonical_issue_number,
"canonical_pr_number": canonical_pr_number,
"issue_numbers": [
_coerce_int(value)
for value in cluster.get("issue_numbers", [])
if _coerce_int(value) is not None
],
"pr_numbers": pr_numbers,
"recent_pr_numbers": recent_pr_numbers,
"pr_count": len(pr_numbers),
"recent_pr_count": len(recent_pr_numbers),
"outside_window_prs": [
_pr_member_stub(number, pr_map.get(number, {}))
for number in outside_window_pr_numbers
],
"authors": recent_authors,
"last_activity_at": last_activity_at,
"evidence_types": list(cluster.get("evidence_types", [])),
"pr_similarity": _cluster_similarity_map(cluster, canonical_pr_number),
"pairwise_similarity": _cluster_pairwise_similarity(cluster),
"github_url": _cluster_github_url(
issue_map, pr_map, canonical_issue_number, canonical_pr_number
),
}
rows.append(row)
for number in recent_pr_numbers:
role = "canonical" if canonical_pr_number == number else "member"
memberships[number].append({"cluster_id": cluster_id, "role": role})
rows.sort(
key=lambda row: (
-int(row["recent_pr_count"]),
-int(row["pr_count"]),
-(float(row["confidence"]) if row["confidence"] is not None else 0.0),
row["last_activity_at"] or "",
        )
    )
return rows, memberships
def _cluster_title(
cluster: dict[str, Any],
issue_map: dict[int, dict[str, Any]],
pr_map: dict[int, dict[str, Any]],
canonical_issue_number: int | None,
canonical_pr_number: int | None,
) -> str:
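    """Prefer the canonical issue or PR title, then the summary, then the cluster id."""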
if canonical_issue_number is not None and canonical_issue_number in issue_map:
return str(
issue_map[canonical_issue_number].get("title") or f"Issue #{canonical_issue_number}"
)
if canonical_pr_number is not None and canonical_pr_number in pr_map:
return str(pr_map[canonical_pr_number].get("title") or f"PR #{canonical_pr_number}")
summary = cluster.get("summary")
if summary:
return str(summary)
cluster_id = cluster.get("cluster_id") or "cluster"
return str(cluster_id)
def _cluster_github_url(
issue_map: dict[int, dict[str, Any]],
pr_map: dict[int, dict[str, Any]],
canonical_issue_number: int | None,
canonical_pr_number: int | None,
) -> str | None:
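    """Return the canonical issue or PR URL for the cluster, if one is known."""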
if canonical_issue_number is not None and canonical_issue_number in issue_map:
return issue_map[canonical_issue_number].get("html_url")
if canonical_pr_number is not None and canonical_pr_number in pr_map:
return pr_map[canonical_pr_number].get("html_url")
return None
def _cluster_similarity_map(
cluster: dict[str, Any], canonical_pr_number: int | None
) -> dict[str, dict[str, float]]:
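    """Map each other PR (keyed by number as a string) to its scores against the canonical PR."""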
if canonical_pr_number is None:
return {}
scores: dict[str, dict[str, float]] = {}
for comparison in cluster.get("pr_comparisons", []):
left = _coerce_int(comparison.get("left_pr_number"))
right = _coerce_int(comparison.get("right_pr_number"))
if left != canonical_pr_number and right != canonical_pr_number:
continue
other = right if left == canonical_pr_number else left
if other is None:
continue
scores[str(other)] = {
"patch_similarity": float(comparison.get("patch_similarity") or 0.0),
"code_similarity": float(comparison.get("code_similarity") or 0.0),
"size_similarity": float(comparison.get("size_similarity") or 0.0),
"file_overlap": float(comparison.get("file_overlap") or 0.0),
"area_overlap": float(comparison.get("area_overlap") or 0.0),
}
return scores
def _cluster_pairwise_similarity(cluster: dict[str, Any]) -> list[dict[str, Any]]:
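    """Flatten the cluster's PR comparisons into pairwise similarity rows."""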
rows: list[dict[str, Any]] = []
for comparison in cluster.get("pr_comparisons", []):
left = _coerce_int(comparison.get("left_pr_number"))
right = _coerce_int(comparison.get("right_pr_number"))
if left is None or right is None:
continue
rows.append(
{
"left_pr_number": left,
"right_pr_number": right,
"patch_similarity": float(comparison.get("patch_similarity") or 0.0),
"code_similarity": float(comparison.get("code_similarity") or 0.0),
"size_similarity": float(comparison.get("size_similarity") or 0.0),
"file_overlap": float(comparison.get("file_overlap") or 0.0),
"area_overlap": float(comparison.get("area_overlap") or 0.0),
}
)
return rows
def _pr_scope_cluster_rows(
pr_scope_report: dict[str, Any],
pr_map: dict[int, dict[str, Any]],
recent_numbers: set[int],
) -> list[dict[str, Any]]:
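    """Build PR-scope cluster rows from the scope report, restricted to the reporting window."""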
rows: list[dict[str, Any]] = []
for cluster in pr_scope_report.get("pr_scope_clusters", []):
pr_numbers = [_coerce_int(value) for value in cluster.get("pr_numbers", [])]
pr_numbers = [value for value in pr_numbers if value is not None]
recent_pr_numbers = [number for number in pr_numbers if number in recent_numbers]
outside_window_pr_numbers = [
number for number in pr_numbers if number not in recent_numbers
]
if not recent_pr_numbers:
continue
representative_pr_number = _coerce_int(cluster.get("representative_pr_number"))
recent_authors = sorted(
{
str(pr_map[number].get("author_login"))
for number in recent_pr_numbers
if number in pr_map and pr_map[number].get("author_login")
}
)
last_activity_at = max(
(
pr_map[number].get("updated_at") or pr_map[number].get("created_at")
for number in recent_pr_numbers
if number in pr_map
),
default=None,
)
representative = pr_map.get(representative_pr_number or -1, {})
rows.append(
{
"kind": "pr_scope",
"cluster_id": str(cluster.get("cluster_id") or f"pr-scope-{recent_pr_numbers[0]}"),
"title": _pr_scope_title(cluster, pr_map, representative_pr_number),
"summary": cluster.get("summary"),
"representative_pr_number": representative_pr_number,
"representative_title": representative.get("title"),
"representative_url": representative.get("html_url"),
"pr_numbers": pr_numbers,
"recent_pr_numbers": recent_pr_numbers,
"pr_count": len(pr_numbers),
"recent_pr_count": len(recent_pr_numbers),
"outside_window_prs": [
_pr_member_stub(number, pr_map.get(number, {}))
for number in outside_window_pr_numbers
],
"authors": recent_authors,
"last_activity_at": last_activity_at,
"average_similarity": float(cluster.get("average_similarity") or 0.0),
"shared_filenames": list(cluster.get("shared_filenames") or []),
"shared_directories": list(cluster.get("shared_directories") or []),
"pairwise": _pr_scope_pairwise_rows(cluster),
}
)
rows.sort(
key=lambda row: (
-int(row["recent_pr_count"]),
-int(row["pr_count"]),
-(float(row["average_similarity"]) if row["average_similarity"] is not None else 0.0),
row["last_activity_at"] or "",
str(row["cluster_id"]),
)
)
return rows
def _pr_scope_title(
cluster: dict[str, Any],
pr_map: dict[int, dict[str, Any]],
representative_pr_number: int | None,
) -> str:
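    """Derive a title from the representative PR, shared paths, summary, or cluster id."""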
if representative_pr_number is not None and representative_pr_number in pr_map:
title = pr_map[representative_pr_number].get("title")
if title:
return f"Scope: {title}"
shared_filenames = [str(value) for value in (cluster.get("shared_filenames") or []) if value]
if shared_filenames:
return f"Scope: {shared_filenames[0]}"
shared_directories = [
str(value) for value in (cluster.get("shared_directories") or []) if value
]
if shared_directories:
return f"Scope: {shared_directories[0]}"
summary = cluster.get("summary")
if summary:
return str(summary)
return str(cluster.get("cluster_id") or "pr-scope")
def _pr_scope_pairwise_rows(cluster: dict[str, Any]) -> list[dict[str, Any]]:
rows: list[dict[str, Any]] = []
for comparison in cluster.get("pairwise", []):
left = _coerce_int(comparison.get("left_pr_number"))
right = _coerce_int(comparison.get("right_pr_number"))
if left is None or right is None:
continue
rows.append(
{
"left_pr_number": left,
"right_pr_number": right,
"similarity": float(comparison.get("similarity") or 0.0),
"content_similarity": float(comparison.get("content_similarity") or 0.0),
"size_similarity": float(comparison.get("size_similarity") or 0.0),
"breadth_similarity": float(comparison.get("breadth_similarity") or 0.0),
"concentration_similarity": float(
comparison.get("concentration_similarity") or 0.0
),
"shared_filenames": list(comparison.get("shared_filenames") or []),
"shared_directories": list(comparison.get("shared_directories") or []),
}
)
return rows
def _pr_member_stub(number: int, row: dict[str, Any]) -> dict[str, Any]:
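    """Build a compact PR stub, used for cluster members outside the reporting window."""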
html_url = row.get("html_url")
return {
"number": number,
"title": row.get("title"),
"author": row.get("author_login"),
"state": row.get("state"),
"merged": bool(row.get("merged")),
"draft": bool(row.get("draft")),
"created_at": row.get("created_at"),
"updated_at": row.get("updated_at"),
"changed_files": _coerce_int(row.get("changed_files")),
"additions": _coerce_int(row.get("additions")),
"deletions": _coerce_int(row.get("deletions")),
"html_url": html_url,
"files_url": f"{html_url}/files" if html_url else None,
}
def _pr_rows(
pull_requests: list[dict[str, Any]],
memberships: dict[int, list[dict[str, str]]],
) -> list[dict[str, Any]]:
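    """Build per-PR rows for the window, annotated with their cluster memberships."""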
rows = []
for row in pull_requests:
number = _coerce_int(row.get("number"))
if number is None:
continue
cluster_memberships = memberships.get(number, [])
primary_membership = cluster_memberships[0] if cluster_memberships else None
html_url = row.get("html_url")
rows.append(
{
"number": number,
"title": row.get("title"),
"author": row.get("author_login"),
"state": row.get("state"),
"author_association": row.get("author_association"),
"merged": bool(row.get("merged")),
"draft": bool(row.get("draft")),
"created_at": row.get("created_at"),
"updated_at": row.get("updated_at"),
"changed_files": _coerce_int(row.get("changed_files")),
"additions": _coerce_int(row.get("additions")),
"deletions": _coerce_int(row.get("deletions")),
"comments_count": _coerce_int(row.get("comments_count")),
"review_comments_count": _coerce_int(row.get("review_comments_count")),
"labels": list(row.get("labels") or []),
"body_excerpt": _excerpt(row.get("body")),
"cluster_id": primary_membership["cluster_id"] if primary_membership else None,
"cluster_role": primary_membership["role"] if primary_membership else None,
"cluster_ids": [membership["cluster_id"] for membership in cluster_memberships],
"html_url": html_url,
"files_url": f"{html_url}/files" if html_url else None,
"conversation_url": html_url,
}
)
return rows
def _contributor_rows(
contributor_report: dict[str, Any],
pull_requests: list[dict[str, Any]],
memberships: dict[int, list[dict[str, str]]],
) -> list[dict[str, Any]]:
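    """Merge the contributor report with recent PR activity into contributor rows.

    When no report is available, rows are derived from the recent PR authors alone.
    """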
recent_pr_counts = Counter(
str(row.get("author_login")) for row in pull_requests if row.get("author_login")
)
recent_associations = _recent_repo_associations(pull_requests)
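    # Count one entry per (recent PR, cluster membership) pair for each author.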
recent_cluster_counts = Counter(
str(row.get("author_login"))
for row in pull_requests
if row.get("author_login")
for _membership in memberships.get(_coerce_int(row.get("number")) or -1, [])
)
report_rows = contributor_report.get("contributors", [])
if not report_rows:
rows = [
{
"author": author,
"name": None,
"profile_url": f"https://github.com/{author}",
"repo_pull_requests_url": None,
"repo_issues_url": None,
"snapshot_pr_count": count,
"snapshot_issue_count": 0,
"recent_pr_count": count,
"cluster_count": recent_cluster_counts.get(author, 0),
"repo_association": recent_associations.get(author),
"new_to_repo": None,
"first_seen_in_snapshot": None,
"report_reason": None,
"known_contributor": _is_known_repo_association(recent_associations.get(author)),
"follow_through_score": None,
"breadth_score": None,
"automation_risk_signal": None,
"heuristic_note": None,
"account_age_days": None,
"quality_score": None,
"public_pr_count_42d": None,
"public_repo_count_42d": None,
}
for author, count in recent_pr_counts.items()
]
rows.sort(key=lambda row: (-int(row["recent_pr_count"]), row["author"]))
return rows
rows = []
for contributor in report_rows:
author = contributor.get("author_login")
if not author:
continue
recent_pr_count = recent_pr_counts.get(str(author), 0)
if recent_pr_count == 0 and not contributor.get("snapshot_pr_count"):
continue
rows.append(
{
"author": author,
"name": contributor.get("name"),
"profile_url": contributor.get("profile_url"),
"repo_pull_requests_url": contributor.get("repo_pull_requests_url"),
"repo_issues_url": contributor.get("repo_issues_url"),
"snapshot_pr_count": _coerce_int(contributor.get("snapshot_pr_count")) or 0,
"snapshot_issue_count": _coerce_int(contributor.get("snapshot_issue_count")) or 0,
"recent_pr_count": recent_pr_count,
"cluster_count": recent_cluster_counts.get(str(author), 0),
"repo_association": contributor.get("repo_association")
or recent_associations.get(str(author)),
"new_to_repo": contributor.get("new_to_repo"),
"first_seen_in_snapshot": contributor.get("first_seen_in_snapshot"),
"report_reason": contributor.get("report_reason"),
"known_contributor": _known_contributor(contributor),
"follow_through_score": contributor.get("follow_through_score"),
"breadth_score": contributor.get("breadth_score"),
"automation_risk_signal": contributor.get("automation_risk_signal"),
"heuristic_note": contributor.get("heuristic_note"),
"account_age_days": _coerce_int(contributor.get("account_age_days")),
"quality_score": None,
"public_pr_count_42d": _coerce_int(
(contributor.get("activity") or {}).get("visible_authored_pr_count")
),
"public_repo_count_42d": _coerce_int(
(contributor.get("activity") or {}).get("distinct_repos_with_authored_prs")
),
}
)
rows.sort(
key=lambda row: (
-int(row["recent_pr_count"]),
-int(row["snapshot_pr_count"]),
-int(row["cluster_count"]),
str(row["author"]),
)
)
return rows
def _known_contributor(contributor: dict[str, Any]) -> bool:
return _is_known_repo_association(contributor.get("repo_association"))
def _recent_repo_associations(pull_requests: list[dict[str, Any]]) -> dict[str, str | None]:
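    """Map each recent PR author to the strongest repo association seen on their PRs."""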
grouped: dict[str, set[str]] = defaultdict(set)
for row in pull_requests:
login = str(row.get("author_login") or "").strip()
association = str(row.get("author_association") or "").strip()
if not login or not association:
continue
grouped[login].add(association)
return {login: _select_repo_association(sorted(values)) for login, values in grouped.items()}
def _select_repo_association(values: list[str]) -> str | None:
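    """Pick the highest-priority association (OWNER > MEMBER > COLLABORATOR > ... > NONE)."""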
if not values:
return None
priority = {
"OWNER": 70,
"MEMBER": 60,
"COLLABORATOR": 50,
"CONTRIBUTOR": 40,
"FIRST_TIME_CONTRIBUTOR": 30,
"FIRST_TIMER": 20,
"NONE": 10,
}
return max(values, key=lambda value: (priority.get(value, 0), value))
def _is_known_repo_association(value: Any) -> bool:
return str(value or "") in {"OWNER", "MEMBER", "COLLABORATOR"}