# Source header (scraped page chrome, preserved as a comment so the module parses):
# evalstate (HF Staff) — "Deploy Diffusers PR API" — commit dbf7313 (verified)
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal
from slop_farmer.data.parquet_io import read_json, read_parquet_rows
from slop_farmer.data.snapshot_paths import (
CURRENT_ANALYSIS_MANIFEST_PATH,
load_current_analysis_manifest,
repo_relative_path_to_local,
)
AnalysisVariant = Literal["auto", "hybrid", "deterministic"]
@dataclass(slots=True, frozen=True)
class _SnapshotMetadata:
    """Identity of a snapshot: the owning repository plus the snapshot id."""

    # Full repository name, e.g. "owner/name" (empty string when unknown).
    repo: str
    # Snapshot identifier (falls back to the snapshot directory name).
    snapshot_id: str
@dataclass(slots=True, frozen=True)
class _AnalysisSelection:
    """The analysis report chosen for a request, with its parsed payload."""

    # Local path of the winning analysis-report JSON file.
    path: Path
    # Parsed JSON payload of that report.
    payload: dict[str, Any]
    # Variant label resolved for the report ("hybrid" or "deterministic").
    variant_used: str
    # Whether the report was produced with LLM enrichment.
    llm_enrichment: bool
def get_snapshot_surfaces(snapshot_dir: Path) -> dict[str, Any]:
    """Top-level availability summary for both analysis surfaces of a snapshot."""
    issues = get_issue_cluster_status(snapshot_dir, variant="auto")
    contributors = get_contributor_status(snapshot_dir)
    counts = issues.get("counts") or {}
    issue_surface = {
        "available": issues["available"],
        "variant_used": issues.get("variant_used"),
        "llm_enrichment": issues.get("llm_enrichment"),
        "generated_at": issues.get("generated_at"),
        "cluster_count": counts.get("meta_bugs", 0),
        "duplicate_pr_count": counts.get("duplicate_prs", 0),
        "available_variants": issues.get("available_variants") or [],
    }
    contributor_surface = {
        "available": contributors["available"],
        "generated_at": contributors.get("generated_at"),
        "contributor_count": contributors.get("contributor_count", 0),
    }
    return {"issues": issue_surface, "contributors": contributor_surface}
def get_issue_cluster_status(snapshot_dir: Path, *, variant: AnalysisVariant) -> dict[str, Any]:
    """Report whether an issue-cluster analysis is available, plus headline counts.

    Selects the best matching report for ``variant`` ("auto" prefers hybrid /
    LLM-enriched output); returns zeroed counts when no report is found.
    """
    metadata = _snapshot_metadata(snapshot_dir)
    candidates = _analysis_candidates(snapshot_dir)
    selection = _select_analysis_report(candidates, variant=variant)
    status = {
        "repo": metadata.repo,
        "snapshot_id": metadata.snapshot_id,
        "variant_requested": variant,
        "available": selection is not None,
        # Every distinct variant seen among the on-disk candidate reports.
        "available_variants": sorted({candidate["variant"] for candidate in candidates}),
    }
    if selection is None:
        # No usable report: keep the response shape stable with zeroed counts.
        return {
            **status,
            "variant_used": None,
            "llm_enrichment": False,
            "generated_at": None,
            "report_path": None,
            "counts": {"meta_bugs": 0, "duplicate_issues": 0, "duplicate_prs": 0},
        }
    payload = selection.payload
    return {
        **status,
        "variant_used": selection.variant_used,
        "llm_enrichment": selection.llm_enrichment,
        "generated_at": payload.get("generated_at"),
        # Only the file name is exposed, not the absolute local path.
        "report_path": selection.path.name,
        "counts": _analysis_counts(payload),
    }
def list_issue_clusters(
    snapshot_dir: Path,
    *,
    limit: int | None,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    """List meta-bug clusters from the selected analysis report, ranked by position."""
    metadata, selection, issue_map, pr_map = _analysis_context(snapshot_dir, variant=variant)
    base = _analysis_base_payload(metadata, selection, variant=variant)
    if selection is None:
        return {**base, "clusters": [], "cluster_count": 0}
    summaries: list[dict[str, Any]] = []
    for rank, cluster in enumerate(selection.payload.get("meta_bugs") or [], start=1):
        summaries.append(_issue_cluster_summary(cluster, issue_map, pr_map, rank=rank))
    visible = summaries if limit is None else summaries[:limit]
    # cluster_count always reflects the full list, even when truncated by limit.
    return {**base, "clusters": visible, "cluster_count": len(summaries)}
def get_issue_cluster(
    snapshot_dir: Path,
    *,
    cluster_id: str,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    """Return one meta-bug cluster by id, with expanded issue/PR member rows.

    The cluster payload adds the *_reason fields on top of the standard
    summary; issues and pull_requests list member rows in report order.
    """
    metadata, selection, issue_map, pr_map = _analysis_context(snapshot_dir, variant=variant)
    base = _analysis_base_payload(metadata, selection, variant=variant)
    # Shared shape for both "no report" and "cluster id not present" outcomes
    # (previously duplicated verbatim in two branches).
    not_found = {
        **base,
        "cluster_id": cluster_id,
        "found": False,
        "cluster": None,
        "issues": [],
        "pull_requests": [],
    }
    if selection is None:
        return not_found
    cluster = next(
        (
            row
            for row in selection.payload.get("meta_bugs") or []
            if str(row.get("cluster_id") or "") == cluster_id
        ),
        None,
    )
    if cluster is None:
        return not_found
    issue_numbers = _ordered_ints(cluster.get("issue_numbers"))
    pr_numbers = _ordered_ints(cluster.get("pr_numbers"))
    canonical_pr_number = _coerce_int(cluster.get("canonical_pr_number"))
    return {
        **base,
        "cluster_id": cluster_id,
        "found": True,
        "cluster": {
            **_issue_cluster_summary(cluster, issue_map, pr_map),
            "canonical_issue_reason": cluster.get("canonical_issue_reason"),
            "canonical_pr_reason": cluster.get("canonical_pr_reason"),
            "best_issue_reason": cluster.get("best_issue_reason"),
            "best_pr_reason": cluster.get("best_pr_reason"),
        },
        "issues": [_issue_member_row(number, issue_map.get(number)) for number in issue_numbers],
        "pull_requests": [
            _pr_member_row(
                number,
                pr_map.get(number),
                # The canonical PR is tagged so clients can highlight it.
                role="canonical" if canonical_pr_number == number else "member",
            )
            for number in pr_numbers
        ],
    }
def get_issue_clusters_for_pr(
    snapshot_dir: Path,
    *,
    pr_number: int,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    """Find every meta-bug cluster that lists ``pr_number`` among its PRs."""
    metadata, selection, issue_map, pr_map = _analysis_context(snapshot_dir, variant=variant)
    base = _analysis_base_payload(metadata, selection, variant=variant)
    if selection is None:
        return {**base, "pr_number": pr_number, "found": False, "clusters": [], "cluster_count": 0}
    matches: list[dict[str, Any]] = []
    for rank, cluster in enumerate(selection.payload.get("meta_bugs") or [], start=1):
        if pr_number not in _ordered_ints(cluster.get("pr_numbers")):
            continue
        role = (
            "canonical"
            if _coerce_int(cluster.get("canonical_pr_number")) == pr_number
            else "member"
        )
        summary = _issue_cluster_summary(cluster, issue_map, pr_map, rank=rank)
        matches.append({**summary, "membership_role": role})
    return {
        **base,
        "pr_number": pr_number,
        "found": bool(matches),
        "clusters": matches,
        "cluster_count": len(matches),
    }
def check_issue_cluster_membership(
    snapshot_dir: Path,
    *,
    pr_number: int,
    cluster_id: str | None,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    """Check whether a PR belongs to any cluster (cluster_id=None) or a specific one.

    Returns the full get_issue_clusters_for_pr payload plus matched /
    matching_cluster_ids / membership. Previously the cluster_id=None branch
    omitted the "membership" key; both branches now share one response shape.
    """
    lookup = get_issue_clusters_for_pr(snapshot_dir, pr_number=pr_number, variant=variant)
    matches = list(lookup.get("clusters") or [])
    matching_cluster_ids = [str(row.get("cluster_id")) for row in matches if row.get("cluster_id")]
    if cluster_id is None:
        # Membership in *any* cluster counts as a match.
        return {
            **lookup,
            "cluster_id": None,
            "matched": bool(matching_cluster_ids),
            "matching_cluster_ids": matching_cluster_ids,
            "membership": None,
        }
    match = next((row for row in matches if row.get("cluster_id") == cluster_id), None)
    return {
        **lookup,
        "cluster_id": cluster_id,
        "matched": match is not None,
        "matching_cluster_ids": matching_cluster_ids,
        "membership": match,
    }
def list_issue_duplicate_prs(
    snapshot_dir: Path,
    *,
    limit: int | None,
    variant: AnalysisVariant,
) -> dict[str, Any]:
    """List duplicate-PR groups from the selected analysis report, ranked by position."""
    metadata, selection, issue_map, pr_map = _analysis_context(snapshot_dir, variant=variant)
    base = _analysis_base_payload(metadata, selection, variant=variant)
    if selection is None:
        return {**base, "duplicate_prs": [], "duplicate_pr_count": 0}
    summaries: list[dict[str, Any]] = []
    for rank, entry in enumerate(selection.payload.get("duplicate_prs") or [], start=1):
        summaries.append(_duplicate_pr_summary(entry, issue_map, pr_map, rank=rank))
    visible = summaries if limit is None else summaries[:limit]
    # The count reflects the full list even when limit truncates the rows.
    return {**base, "duplicate_prs": visible, "duplicate_pr_count": len(summaries)}
def get_issue_best(snapshot_dir: Path, *, variant: AnalysisVariant) -> dict[str, Any]:
    """Return the report's "best issue" and "best PR" picks, enriched with titles/URLs."""
    metadata, selection, issue_map, pr_map = _analysis_context(snapshot_dir, variant=variant)
    base = _analysis_base_payload(metadata, selection, variant=variant)
    if selection is None:
        return {**base, "best_issue": None, "best_pr": None}
    payload = selection.payload
    best_issue = _best_issue_summary(payload.get("best_issue"), issue_map)
    best_pr = _best_pr_summary(payload.get("best_pr"), pr_map)
    return {**base, "best_issue": best_issue, "best_pr": best_pr}
def get_contributor_status(snapshot_dir: Path) -> dict[str, Any]:
    """Availability summary for the new-contributors report of a snapshot.

    A missing or malformed report yields available=False with zeroed counts.
    """
    metadata = _snapshot_metadata(snapshot_dir)
    report = _read_optional_json(snapshot_dir / "new-contributors-report.json")
    raw_contributors = report.get("contributors")
    # Guard against a malformed report where "contributors" is not a list.
    contributors: list[Any] = raw_contributors if isinstance(raw_contributors, list) else []
    return {
        # Prefer the report's own repo/snapshot fields, fall back to the manifest.
        "repo": str(report.get("repo") or metadata.repo),
        "snapshot_id": str(report.get("snapshot_id") or metadata.snapshot_id),
        "available": bool(report),
        "generated_at": report.get("generated_at"),
        "window_days": _coerce_int(report.get("window_days")),
        "contributor_count": len(contributors),
    }
def list_contributors(snapshot_dir: Path, *, limit: int | None) -> dict[str, Any]:
    """List ranked contributor summaries from the new-contributors report."""
    status = get_contributor_status(snapshot_dir)
    report = _read_optional_json(snapshot_dir / "new-contributors-report.json")
    rows: list[dict[str, Any]] = []
    for rank, entry in enumerate(report.get("contributors") or [], start=1):
        # Skip malformed (non-dict) entries; ranks still count them, matching
        # the original enumerate-with-filter behavior.
        if isinstance(entry, dict):
            rows.append(_contributor_summary(entry, rank=rank))
    visible = rows if limit is None else rows[:limit]
    return {**status, "contributors": visible, "contributor_count": len(rows)}
def get_contributor(snapshot_dir: Path, *, author_login: str) -> dict[str, Any]:
    """Look up a single contributor by login (case-insensitive).

    Returns the status envelope plus found/summary/risk/contributor; the
    three payload fields are None when the login is not in the report.
    """
    status = get_contributor_status(snapshot_dir)
    report = _read_optional_json(snapshot_dir / "new-contributors-report.json")
    contributor = _find_contributor(report.get("contributors") or [], author_login)
    if contributor is None:
        return {
            **status,
            "author_login": author_login,
            "found": False,
            "summary": None,
            "risk": None,
            "contributor": None,
        }
    return {
        **status,
        # Echo the report's canonical casing of the login when available.
        "author_login": str(contributor.get("author_login") or author_login),
        "found": True,
        "summary": _contributor_summary(contributor),
        "risk": _contributor_risk(contributor),
        "contributor": contributor,
    }
def get_contributor_risk(snapshot_dir: Path, *, author_login: str) -> dict[str, Any]:
    """Risk-focused view of a contributor lookup, trimmed to the risk envelope."""
    lookup = get_contributor(snapshot_dir, author_login=author_login)
    risk = lookup.get("risk")
    passthrough = (
        "repo",
        "snapshot_id",
        "available",
        "generated_at",
        "author_login",
        "found",
    )
    result: dict[str, Any] = {field: lookup.get(field) for field in passthrough}
    result["risk_available"] = risk is not None
    result["risk"] = risk
    return result
def _analysis_context(
    snapshot_dir: Path,
    *,
    variant: AnalysisVariant,
) -> tuple[
    _SnapshotMetadata,
    _AnalysisSelection | None,
    dict[int, dict[str, Any]],
    dict[int, dict[str, Any]],
]:
    """Load everything the issue endpoints need in one call.

    Returns the snapshot metadata, the selected report (None when no report
    matches ``variant``), and issue/PR row maps keyed by artifact number.
    """
    metadata = _snapshot_metadata(snapshot_dir)
    selection = _select_analysis_report(_analysis_candidates(snapshot_dir), variant=variant)
    issue_map, pr_map = _artifact_maps(snapshot_dir)
    return metadata, selection, issue_map, pr_map
def _analysis_base_payload(
metadata: _SnapshotMetadata,
selection: _AnalysisSelection | None,
*,
variant: AnalysisVariant,
) -> dict[str, Any]:
base = {
"repo": metadata.repo,
"snapshot_id": metadata.snapshot_id,
"variant_requested": variant,
"available": selection is not None,
"variant_used": None,
"llm_enrichment": False,
"generated_at": None,
}
if selection is None:
return base
return {
**base,
"variant_used": selection.variant_used,
"llm_enrichment": selection.llm_enrichment,
"generated_at": selection.payload.get("generated_at"),
}
def _analysis_candidates(snapshot_dir: Path) -> list[dict[str, Any]]:
    """Parse every readable analysis report on disk into candidate records."""
    records: list[dict[str, Any]] = []
    for path in _analysis_report_paths(snapshot_dir):
        payload = _read_optional_json(path)
        if not payload:
            # Unreadable or empty report files are not candidates.
            continue
        enriched = bool(payload.get("llm_enrichment"))
        variant = _analysis_variant(path.name, payload, llm_enrichment=enriched)
        records.append(
            {"path": path, "payload": payload, "variant": variant, "llm_enrichment": enriched}
        )
    return records
def _select_analysis_report(
    candidates: list[dict[str, Any]],
    *,
    variant: AnalysisVariant,
) -> _AnalysisSelection | None:
    """Pick the best candidate report for ``variant``; None when nothing matches.

    "auto" ranks every candidate by the auto priority; a concrete variant
    first filters to that variant, then ranks by the specific priority.
    """
    if not candidates:
        return None
    if variant == "auto":
        pool = sorted(candidates, key=_analysis_auto_priority)
    else:
        pool = sorted(
            (candidate for candidate in candidates if candidate["variant"] == variant),
            key=_analysis_specific_priority,
        )
    if not pool:
        return None
    best = pool[0]
    return _AnalysisSelection(
        path=Path(best["path"]),
        payload=dict(best["payload"]),
        variant_used=str(best["variant"]),
        llm_enrichment=bool(best["llm_enrichment"]),
    )
def _analysis_report_paths(snapshot_dir: Path) -> list[Path]:
    """Candidate analysis-report paths in priority order, deduplicated, and
    restricted to files that actually exist.

    Priority: artifacts named by the current-analysis manifest (when readable),
    then the well-known report filenames, then any other analysis-report*.json
    found by globbing (name-sorted).
    """
    ordered: list[Path] = []
    current_manifest_path = repo_relative_path_to_local(
        snapshot_dir, CURRENT_ANALYSIS_MANIFEST_PATH
    )
    if current_manifest_path.exists():
        try:
            current_manifest = load_current_analysis_manifest(current_manifest_path)
        except ValueError:
            # A corrupt manifest is ignored; fall through to the defaults below.
            current_manifest = None
        if current_manifest is not None:
            for artifact_path in (current_manifest.get("artifacts") or {}).values():
                # Skip malformed (non-string) artifact entries.
                if not isinstance(artifact_path, str):
                    continue
                ordered.append(repo_relative_path_to_local(snapshot_dir, artifact_path))
    ordered.extend(
        [
            snapshot_dir / "analysis-report-hybrid.json",
            snapshot_dir / "analysis-report-deterministic.json",
            snapshot_dir / "analysis-report.json",
        ]
    )
    # Deduplicate while preserving the priority order established above.
    seen: set[Path] = set()
    deduped: list[Path] = []
    for path in ordered:
        if path in seen:
            continue
        seen.add(path)
        deduped.append(path)
    # Append any remaining reports discovered by globbing.
    deduped.extend(
        path for path in sorted(snapshot_dir.glob("analysis-report*.json")) if path not in seen
    )
    return [path for path in deduped if path.exists()]
def _analysis_auto_priority(candidate: dict[str, Any]) -> tuple[int, str]:
path = Path(candidate["path"])
if path.name == "analysis-report-hybrid.json":
return (0, path.name)
if bool(candidate.get("llm_enrichment")):
return (1, path.name)
if path.name == "analysis-report.json":
return (2, path.name)
return (3, path.name)
def _analysis_specific_priority(candidate: dict[str, Any]) -> tuple[int, str]:
path = Path(candidate["path"])
if path.name.endswith(f"-{candidate['variant']}.json"):
return (0, path.name)
if path.name == "analysis-report.json":
return (1, path.name)
return (2, path.name)
def _analysis_variant(path_name: str, payload: dict[str, Any], *, llm_enrichment: bool) -> str:
lowered = path_name.lower()
if "hybrid" in lowered:
return "hybrid"
if "deterministic" in lowered:
return "deterministic"
if isinstance(payload.get("variant_used"), str):
variant_used = str(payload["variant_used"]).strip().lower()
if variant_used in {"hybrid", "deterministic"}:
return variant_used
return "hybrid" if llm_enrichment else "deterministic"
def _analysis_counts(payload: dict[str, Any]) -> dict[str, int]:
return {
"meta_bugs": len(payload.get("meta_bugs") or []),
"duplicate_issues": len(payload.get("duplicate_issues") or []),
"duplicate_prs": len(payload.get("duplicate_prs") or []),
}
def _artifact_maps(
snapshot_dir: Path,
) -> tuple[dict[int, dict[str, Any]], dict[int, dict[str, Any]]]:
issue_rows = (
read_parquet_rows(snapshot_dir / "issues.parquet")
if (snapshot_dir / "issues.parquet").exists()
else []
)
pr_rows = (
read_parquet_rows(snapshot_dir / "pull_requests.parquet")
if (snapshot_dir / "pull_requests.parquet").exists()
else []
)
issue_map = {
int(row["number"]): row for row in issue_rows if _coerce_int(row.get("number")) is not None
}
pr_map = {
int(row["number"]): row for row in pr_rows if _coerce_int(row.get("number")) is not None
}
return issue_map, pr_map
def _issue_cluster_summary(
    cluster: dict[str, Any],
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
    *,
    rank: int | None = None,
) -> dict[str, Any]:
    """Flatten a meta-bug cluster entry into the listing row shape.

    Canonical issue/PR numbers are enriched with titles and URLs from the
    artifact maps; ``rank`` is the 1-based position in the report (None when
    the caller is not listing).
    """
    canonical_issue_number = _coerce_int(cluster.get("canonical_issue_number"))
    canonical_pr_number = _coerce_int(cluster.get("canonical_pr_number"))
    issue_numbers = _ordered_ints(cluster.get("issue_numbers"))
    pr_numbers = _ordered_ints(cluster.get("pr_numbers"))
    return {
        "rank": rank,
        # Synthesize an id when the report omits one ("cluster-0" when unranked).
        "cluster_id": str(cluster.get("cluster_id") or f"cluster-{rank or 0}"),
        "title": _cluster_title(
            cluster, issue_map, pr_map, canonical_issue_number, canonical_pr_number
        ),
        "summary": cluster.get("summary"),
        "status": cluster.get("status"),
        "confidence": _coerce_float(cluster.get("confidence")),
        "canonical_issue_number": canonical_issue_number,
        "canonical_issue_title": _title_for_issue(canonical_issue_number, issue_map),
        "canonical_issue_url": _url_for_issue(canonical_issue_number, issue_map),
        "canonical_pr_number": canonical_pr_number,
        "canonical_pr_title": _title_for_pr(canonical_pr_number, pr_map),
        "canonical_pr_url": _url_for_pr(canonical_pr_number, pr_map),
        "issue_numbers": issue_numbers,
        "issue_count": len(issue_numbers),
        "pr_numbers": pr_numbers,
        "pr_count": len(pr_numbers),
        # Falsy evidence entries are dropped; the rest stringified.
        "evidence_types": [str(value) for value in (cluster.get("evidence_types") or []) if value],
        "github_url": _cluster_url(canonical_issue_number, canonical_pr_number, issue_map, pr_map),
    }
def _cluster_title(
    cluster: dict[str, Any],
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
    canonical_issue_number: int | None,
    canonical_pr_number: int | None,
) -> str:
    """Human-readable cluster title: canonical issue title, else canonical PR
    title, else the summary text, else the cluster id (or "cluster")."""
    options = (
        _title_for_issue(canonical_issue_number, issue_map),
        _title_for_pr(canonical_pr_number, pr_map),
        str(cluster.get("summary") or "").strip(),
    )
    for option in options:
        if option:
            return option
    return str(cluster.get("cluster_id") or "cluster")
def _cluster_url(
    canonical_issue_number: int | None,
    canonical_pr_number: int | None,
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
) -> str | None:
    """GitHub URL for a cluster: the canonical issue's URL, else the canonical PR's."""
    issue_url = _url_for_issue(canonical_issue_number, issue_map)
    if issue_url:
        return issue_url
    return _url_for_pr(canonical_pr_number, pr_map)
def _duplicate_pr_summary(
    entry: dict[str, Any],
    issue_map: dict[int, dict[str, Any]],
    pr_map: dict[int, dict[str, Any]],
    *,
    rank: int,
) -> dict[str, Any]:
    """Flatten a duplicate-PR report entry, enriching numbers with titles/URLs."""
    canonical_pr_number = _coerce_int(entry.get("canonical_pr_number"))
    target_issue_number = _coerce_int(entry.get("target_issue_number"))
    duplicates = _ordered_ints(entry.get("duplicate_pr_numbers"))
    return {
        "rank": rank,
        # Synthesize an id when the report omits one.
        "cluster_id": str(entry.get("cluster_id") or f"duplicate-pr-{rank}"),
        "canonical_pr_number": canonical_pr_number,
        "canonical_pr_title": _title_for_pr(canonical_pr_number, pr_map),
        "canonical_pr_url": _url_for_pr(canonical_pr_number, pr_map),
        "target_issue_number": target_issue_number,
        "target_issue_title": _title_for_issue(target_issue_number, issue_map),
        "target_issue_url": _url_for_issue(target_issue_number, issue_map),
        "duplicate_pr_numbers": duplicates,
        "duplicate_pr_count": len(duplicates),
        "reason": entry.get("reason"),
    }
def _best_issue_summary(entry: Any, issue_map: dict[int, dict[str, Any]]) -> dict[str, Any] | None:
    """Summarize the report's "best issue" pick; None when the entry is not a dict."""
    if not isinstance(entry, dict):
        return None
    number = _coerce_int(entry.get("issue_number"))
    summary: dict[str, Any] = {"cluster_id": entry.get("cluster_id")}
    summary["issue_number"] = number
    summary["title"] = _title_for_issue(number, issue_map)
    summary["url"] = _url_for_issue(number, issue_map)
    summary["reason"] = entry.get("reason")
    summary["score"] = _coerce_float(entry.get("score"))
    return summary
def _best_pr_summary(entry: Any, pr_map: dict[int, dict[str, Any]]) -> dict[str, Any] | None:
    """Summarize the report's "best PR" pick; None when the entry is not a dict."""
    if not isinstance(entry, dict):
        return None
    number = _coerce_int(entry.get("pr_number"))
    summary: dict[str, Any] = {"cluster_id": entry.get("cluster_id")}
    summary["pr_number"] = number
    summary["title"] = _title_for_pr(number, pr_map)
    summary["url"] = _url_for_pr(number, pr_map)
    summary["reason"] = entry.get("reason")
    summary["score"] = _coerce_float(entry.get("score"))
    return summary
def _issue_member_row(number: int, row: dict[str, Any] | None) -> dict[str, Any]:
row = row or {}
return {
"number": number,
"title": row.get("title"),
"state": row.get("state"),
"author_login": row.get("author_login"),
"created_at": row.get("created_at"),
"updated_at": row.get("updated_at"),
"html_url": row.get("html_url"),
}
def _pr_member_row(number: int, row: dict[str, Any] | None, *, role: str) -> dict[str, Any]:
row = row or {}
return {
"number": number,
"role": role,
"title": row.get("title"),
"author_login": row.get("author_login"),
"state": row.get("state"),
"draft": bool(row.get("draft")),
"merged": bool(row.get("merged")),
"author_association": row.get("author_association"),
"created_at": row.get("created_at"),
"updated_at": row.get("updated_at"),
"html_url": row.get("html_url"),
}
def _contributor_summary(contributor: dict[str, Any], *, rank: int | None = None) -> dict[str, Any]:
    """Flatten a raw contributor report entry into the listing row shape."""
    raw_activity = contributor.get("activity")
    # Guard against a malformed entry where "activity" is not a dict.
    activity: dict[str, Any] = raw_activity if isinstance(raw_activity, dict) else {}
    return {
        "rank": rank,
        "author_login": contributor.get("author_login"),
        "name": contributor.get("name"),
        "profile_url": contributor.get("profile_url"),
        "repo_association": contributor.get("repo_association"),
        "first_seen_in_snapshot": contributor.get("first_seen_in_snapshot"),
        "new_to_repo": contributor.get("new_to_repo"),
        # Counts default to 0 when missing or non-numeric.
        "snapshot_pr_count": _coerce_int(contributor.get("snapshot_pr_count")) or 0,
        "snapshot_issue_count": _coerce_int(contributor.get("snapshot_issue_count")) or 0,
        "follow_through_score": contributor.get("follow_through_score"),
        "breadth_score": contributor.get("breadth_score"),
        "automation_risk_signal": contributor.get("automation_risk_signal"),
        "heuristic_note": contributor.get("heuristic_note"),
        "account_age_days": _coerce_int(contributor.get("account_age_days")),
        # NOTE(review): the "_42d" suffix presumably means a 42-day activity
        # window — confirm against the report generator before relying on it.
        "public_pr_count_42d": _coerce_int(activity.get("visible_authored_pr_count")),
        "public_repo_count_42d": _coerce_int(activity.get("distinct_repos_with_authored_prs")),
        "repo_pull_requests_url": contributor.get("repo_pull_requests_url"),
        "repo_issues_url": contributor.get("repo_issues_url"),
    }
def _contributor_risk(contributor: dict[str, Any]) -> dict[str, Any]:
    """Extract only the risk-related fields from a raw contributor entry."""
    raw_activity = contributor.get("activity")
    # Guard against a malformed entry where "activity" is not a dict.
    activity: dict[str, Any] = raw_activity if isinstance(raw_activity, dict) else {}
    return {
        "automation_risk_signal": contributor.get("automation_risk_signal"),
        "heuristic_note": contributor.get("heuristic_note"),
        "follow_through_score": contributor.get("follow_through_score"),
        "breadth_score": contributor.get("breadth_score"),
        "account_age_days": _coerce_int(contributor.get("account_age_days")),
        # NOTE(review): the "_42d" suffix presumably means a 42-day activity
        # window — confirm against the report generator before relying on it.
        "public_pr_count_42d": _coerce_int(activity.get("visible_authored_pr_count")),
        "public_repo_count_42d": _coerce_int(activity.get("distinct_repos_with_authored_prs")),
        "report_reason": contributor.get("report_reason"),
    }
def _find_contributor(entries: list[Any], author_login: str) -> dict[str, Any] | None:
lowered = author_login.casefold()
for entry in entries:
if not isinstance(entry, dict):
continue
login = str(entry.get("author_login") or "")
if login.casefold() == lowered:
return entry
return None
def _snapshot_metadata(snapshot_dir: Path) -> _SnapshotMetadata:
    """Resolve repo and snapshot id from manifest.json, with fallbacks."""
    manifest = _read_optional_json(snapshot_dir / "manifest.json")
    repo_value = manifest.get("repo") or _infer_repo(snapshot_dir) or ""
    # The directory name doubles as the snapshot id when the manifest lacks one.
    snapshot_value = manifest.get("snapshot_id") or snapshot_dir.name
    return _SnapshotMetadata(repo=str(repo_value), snapshot_id=str(snapshot_value))
def _infer_repo(snapshot_dir: Path) -> str | None:
    """Best-effort recovery of the repo name when manifest.json lacks it.

    Tries, in order: a "repo" column in the parquet artifacts, a "repo" field
    in any analysis report, then the new-contributors report. None otherwise.
    """
    for filename in ("pull_requests.parquet", "issues.parquet"):
        path = snapshot_dir / filename
        if not path.exists():
            continue
        # Only the first row is inspected; the column is uniform per snapshot.
        rows = read_parquet_rows(path)
        if rows and rows[0].get("repo"):
            return str(rows[0]["repo"])
    for filename in _analysis_report_paths(snapshot_dir):
        payload = _read_optional_json(filename)
        if payload.get("repo"):
            return str(payload["repo"])
    report = _read_optional_json(snapshot_dir / "new-contributors-report.json")
    if report.get("repo"):
        return str(report["repo"])
    return None
def _title_for_issue(number: int | None, issue_map: dict[int, dict[str, Any]]) -> str | None:
if number is None or number not in issue_map:
return None
title = issue_map[number].get("title")
return str(title) if title else None
def _url_for_issue(number: int | None, issue_map: dict[int, dict[str, Any]]) -> str | None:
if number is None or number not in issue_map:
return None
value = issue_map[number].get("html_url")
return str(value) if value else None
def _title_for_pr(number: int | None, pr_map: dict[int, dict[str, Any]]) -> str | None:
if number is None or number not in pr_map:
return None
title = pr_map[number].get("title")
return str(title) if title else None
def _url_for_pr(number: int | None, pr_map: dict[int, dict[str, Any]]) -> str | None:
if number is None or number not in pr_map:
return None
value = pr_map[number].get("html_url")
return str(value) if value else None
def _ordered_ints(values: Any) -> list[int]:
    """Coerce a JSON array to ints, keeping order and dropping non-numeric entries.

    Non-list inputs (None, scalars, dicts) yield an empty list.
    """
    if not isinstance(values, list):
        return []
    coerced = (_coerce_int(value) for value in values)
    return [number for number in coerced if number is not None]
def _coerce_int(value: Any) -> int | None:
if value is None:
return None
try:
return int(value)
except (TypeError, ValueError):
return None
def _coerce_float(value: Any) -> float | None:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def _read_optional_json(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
payload = read_json(path)
return payload if isinstance(payload, dict) else {}