"""HTTP client for reading pull-request data from a ghreplica service."""

from __future__ import annotations

import json
import urllib.error
import urllib.request
from collections.abc import Callable, Iterable
from typing import Any

from slop_farmer.data.http import urlopen_with_retry


class GhReplicaApiRequestError(RuntimeError):
    """Raised when ghreplica returns a non-recoverable HTTP response.

    Attributes:
        status_code: HTTP status code of the failed response.
        path: Request path (relative to the client's base URL).
        detail: Decoded response body of the failed request.
    """

    def __init__(self, status_code: int, path: str, detail: str):
        self.status_code = status_code
        self.path = path
        self.detail = detail
        super().__init__(f"ghreplica API request failed: {status_code} {path} {detail}")


class GhReplicaProbeUnavailableError(RuntimeError):
    """Raised when ghreplica cannot yet serve a live probe payload.

    Attributes:
        status_code: Status code associated with the failure; 503 unless the
            raiser supplies another value (this module also uses 404).
    """

    def __init__(self, detail: str, *, status_code: int = 503):
        self.status_code = status_code
        super().__init__(detail)


class GhrProbeClient:
    """Read pull requests, their changed files and index status from ghreplica."""

    provider = "ghreplica"

    def __init__(
        self,
        *,
        base_url: str,
        timeout: int = 180,
        max_retries: int = 5,
        log: Callable[[str], None] | None = None,
    ):
        """Store connection settings; no request is made here.

        Args:
            base_url: Root URL of the service. Trailing slashes are stripped so
                request paths (which start with "/") can be appended directly.
            timeout: Forwarded to ``urlopen_with_retry`` as ``timeout``.
            max_retries: Forwarded to ``urlopen_with_retry`` as ``max_retries``.
            log: Optional callable forwarded to ``urlopen_with_retry`` as ``log``.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.max_retries = max_retries
        self.log = log

    def _request_json(self, path: str) -> Any:
        """GET ``base_url + path`` and return the decoded JSON body.

        Raises:
            GhReplicaApiRequestError: If an ``urllib.error.HTTPError`` escapes
                ``urlopen_with_retry``; carries the status code, the path and
                the response body.
        """
        request = urllib.request.Request(f"{self.base_url}{path}")
        request.add_header("Accept", "application/json")
        try:
            with urlopen_with_retry(
                request,
                timeout=self.timeout,
                max_retries=self.max_retries,
                log=self.log,
                label=path,
            ) as response:
                payload = response.read().decode("utf-8")
        except urllib.error.HTTPError as exc:
            # Error bodies are decoded leniently so a malformed body cannot
            # mask the HTTP failure with a UnicodeDecodeError.
            detail = exc.read().decode("utf-8", errors="replace")
            raise GhReplicaApiRequestError(exc.code, path, detail) from exc
        return json.loads(payload)

    def _request_json_or_none(self, path: str) -> Any | None:
        """Like ``_request_json`` but return ``None`` for a 404 response."""
        try:
            return self._request_json(path)
        except GhReplicaApiRequestError as exc:
            if exc.status_code == 404:
                return None
            raise

    def get_pull_request(self, owner: str, repo: str, number: int) -> dict[str, Any]:
        """Return the pull request payload for ``owner/repo#number``.

        Raises:
            GhReplicaProbeUnavailableError: With ``status_code=404`` when the
                service answers 404 for the pull request.
            GhReplicaApiRequestError: For any other HTTP failure.
            RuntimeError: If the response body is not a JSON object.
        """
        try:
            payload = self._request_json(f"/v1/github/repos/{owner}/{repo}/pulls/{number}")
        except GhReplicaApiRequestError as exc:
            if exc.status_code == 404:
                raise GhReplicaProbeUnavailableError(
                    f"PR #{number} was not found in ghreplica.",
                    status_code=404,
                ) from exc
            raise
        if not isinstance(payload, dict):
            raise RuntimeError(f"Expected dict payload for pull request, got {type(payload)!r}")
        return payload

    def iter_pull_files(self, owner: str, repo: str, number: int) -> Iterable[dict[str, Any]]:
        """Yield one normalized dict per changed file of ``owner/repo#number``.

        This is a generator function: the HTTP request and every error below
        happen when iteration starts, not when the method is called.

        The response may be a JSON list of rows or a JSON object holding the
        rows under ``"files"``. Rows that are not dicts are skipped.

        Raises:
            GhReplicaProbeUnavailableError: When the files endpoint answers 404.
                ``status_code`` is 503 if the status endpoint still returns a
                dict for the pull request, otherwise 404.
            GhReplicaApiRequestError: For any other HTTP failure.
            RuntimeError: If no list of rows can be found in the response.
        """
        try:
            payload = self._request_json(f"/v1/changes/repos/{owner}/{repo}/pulls/{number}/files")
        except GhReplicaApiRequestError as exc:
            if exc.status_code != 404:
                raise
            # Use the status endpoint to tell "status known, files not served"
            # (503) apart from "no status either" (404).
            status = self.get_pull_request_status(owner, repo, number)
            if isinstance(status, dict):
                detail_bits = [
                    f"{key}={status[key]}"
                    for key in (
                        "indexed",
                        "backfill_in_progress",
                        "changed_files",
                        "indexed_file_count",
                    )
                    if key in status
                ]
                suffix = f" ({', '.join(detail_bits)})" if detail_bits else ""
                raise GhReplicaProbeUnavailableError(
                    f"PR #{number} is not available in ghreplica yet{suffix}.",
                    status_code=503,
                ) from exc
            raise GhReplicaProbeUnavailableError(
                f"PR #{number} was not found in ghreplica changed-file replica.",
                status_code=404,
            ) from exc

        if isinstance(payload, list):
            rows = payload
        elif isinstance(payload, dict):
            rows = payload.get("files")
        else:
            # A JSON null, string or number has no ".get"; leave rows unset so
            # the RuntimeError below is raised instead of an AttributeError.
            rows = None
        if not isinstance(rows, list):
            raise RuntimeError(
                f"Expected list payload for pull request files, got {type(payload)!r}"
            )

        for row in rows:
            if not isinstance(row, dict):
                continue
            # Missing or null counters are treated as 0.
            additions = int(row.get("additions") or 0)
            deletions = int(row.get("deletions") or 0)
            yield {
                "sha": row.get("sha"),
                # Either "filename" or "path" may carry the file name.
                "filename": row.get("filename") or row.get("path"),
                "status": row.get("status"),
                "additions": additions,
                "deletions": deletions,
                # Derived from the counters when "changes" is missing or falsy.
                "changes": row.get("changes") or additions + deletions,
                "blob_url": row.get("blob_url"),
                "raw_url": row.get("raw_url"),
                "contents_url": row.get("contents_url"),
                "previous_filename": row.get("previous_filename"),
                "patch": row.get("patch"),
            }

    def get_pull_request_status(self, owner: str, repo: str, number: int) -> dict[str, Any] | None:
        """Return the status payload for ``owner/repo#number``, or ``None`` on 404.

        Raises:
            GhReplicaApiRequestError: For HTTP failures other than 404.
            RuntimeError: If the response body is not a JSON object.
        """
        payload = self._request_json_or_none(
            f"/v1/changes/repos/{owner}/{repo}/pulls/{number}/status"
        )
        if payload is None:
            return None
        if not isinstance(payload, dict):
            raise RuntimeError(
                f"Expected dict payload for pull request status, got {type(payload)!r}"
            )
        return payload