Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import urllib.error | |
| import urllib.request | |
| from collections.abc import Callable, Iterable | |
| from typing import Any | |
| from slop_farmer.data.http import urlopen_with_retry | |
class GhReplicaApiRequestError(RuntimeError):
    """Error for an HTTP failure response received from the ghreplica API.

    The status code, request path and response detail are stored as
    attributes and are also combined into the exception message.
    """

    def __init__(self, status_code: int, path: str, detail: str):
        message = f"ghreplica API request failed: {status_code} {path} {detail}"
        super().__init__(message)
        # Keep the individual pieces so callers can branch on them
        # (e.g. on ``status_code``) without parsing the message.
        self.status_code, self.path, self.detail = status_code, path, detail
class GhReplicaProbeUnavailableError(RuntimeError):
    """Error signalling that ghreplica cannot serve a live probe payload yet.

    ``detail`` becomes the exception message. ``status_code`` is keyword-only,
    defaults to 503, and is stored on the instance.
    """

    def __init__(self, detail: str, *, status_code: int = 503):
        super().__init__(detail)
        self.status_code = status_code
class GhrProbeClient:
    """JSON client for the ghreplica HTTP API.

    Every request is a GET built from ``base_url`` plus a path, sent with an
    ``Accept: application/json`` header through ``urlopen_with_retry``.
    """

    provider = "ghreplica"

    def __init__(
        self,
        *,
        base_url: str,
        timeout: int = 180,
        max_retries: int = 5,
        log: Callable[[str], None] | None = None,
    ):
        """Store the connection settings.

        Args:
            base_url: Root URL of the ghreplica service; trailing slashes are
                stripped so paths can be appended directly.
            timeout: Forwarded to ``urlopen_with_retry`` as ``timeout``.
            max_retries: Forwarded to ``urlopen_with_retry`` as ``max_retries``.
            log: Optional callable forwarded to ``urlopen_with_retry`` as ``log``.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.max_retries = max_retries
        self.log = log

    def _request_json(self, path: str) -> Any:
        """Fetch ``path`` relative to ``base_url`` and return the decoded JSON.

        Raises:
            GhReplicaApiRequestError: If the request ends in an
                ``urllib.error.HTTPError``; the error carries the HTTP status,
                the path and the response body.
        """
        request = urllib.request.Request(f"{self.base_url}{path}")
        request.add_header("Accept", "application/json")
        try:
            with urlopen_with_retry(
                request,
                timeout=self.timeout,
                max_retries=self.max_retries,
                log=self.log,
                label=path,
            ) as response:
                payload = response.read().decode("utf-8")
        except urllib.error.HTTPError as exc:
            # The error body may not be valid UTF-8; never let decoding mask
            # the underlying HTTP failure.
            detail = exc.read().decode("utf-8", errors="replace")
            raise GhReplicaApiRequestError(exc.code, path, detail) from exc
        return json.loads(payload)

    def _request_json_or_none(self, path: str) -> Any | None:
        """Like ``_request_json`` but return ``None`` for an HTTP 404.

        Any other ``GhReplicaApiRequestError`` is re-raised unchanged.
        """
        try:
            return self._request_json(path)
        except GhReplicaApiRequestError as exc:
            if exc.status_code == 404:
                return None
            raise

    def get_pull_request(self, owner: str, repo: str, number: int) -> dict[str, Any]:
        """Return the pull request payload for ``owner/repo#number``.

        Raises:
            GhReplicaProbeUnavailableError: With ``status_code=404`` when the
                API answers 404 for the pull request.
            GhReplicaApiRequestError: For any other HTTP error response.
            RuntimeError: If the decoded payload is not a dict.
        """
        try:
            payload = self._request_json(f"/v1/github/repos/{owner}/{repo}/pulls/{number}")
        except GhReplicaApiRequestError as exc:
            if exc.status_code == 404:
                raise GhReplicaProbeUnavailableError(
                    f"PR #{number} was not found in ghreplica.",
                    status_code=404,
                ) from exc
            raise
        if not isinstance(payload, dict):
            raise RuntimeError(f"Expected dict payload for pull request, got {type(payload)!r}")
        return payload

    def iter_pull_files(self, owner: str, repo: str, number: int) -> Iterable[dict[str, Any]]:
        """Yield one normalized dict per changed file of a pull request.

        This is a generator: the HTTP request and every error below happen
        when iteration starts, not when the method is called.

        The response may be a JSON list of rows or a JSON object with a
        ``"files"`` list. Rows that are not dicts are skipped. In each yielded
        dict ``additions`` and ``deletions`` are coerced to ``int`` (missing or
        falsy values become 0), ``filename`` falls back to the row's ``path``,
        and ``changes`` falls back to ``additions + deletions``.

        Raises:
            GhReplicaProbeUnavailableError: When the files endpoint answers
                404. ``status_code`` is 503 if the status endpoint returns a
                payload for the pull request, otherwise 404.
            GhReplicaApiRequestError: For any other HTTP error response.
            RuntimeError: If the payload does not contain a list of rows.
        """
        try:
            payload = self._request_json(f"/v1/changes/repos/{owner}/{repo}/pulls/{number}/files")
        except GhReplicaApiRequestError as exc:
            if exc.status_code != 404:
                raise
            # A 404 on the files endpoint is ambiguous; consult the status
            # endpoint to distinguish "known but not served yet" from "unknown".
            status = self.get_pull_request_status(owner, repo, number)
            if isinstance(status, dict):
                detail_bits = [
                    f"{key}={status[key]}"
                    for key in (
                        "indexed",
                        "backfill_in_progress",
                        "changed_files",
                        "indexed_file_count",
                    )
                    if key in status
                ]
                suffix = f" ({', '.join(detail_bits)})" if detail_bits else ""
                raise GhReplicaProbeUnavailableError(
                    f"PR #{number} is not available in ghreplica yet{suffix}.",
                    status_code=503,
                ) from exc
            raise GhReplicaProbeUnavailableError(
                f"PR #{number} was not found in ghreplica changed-file replica.",
                status_code=404,
            ) from exc

        # Only dicts support ``.get``; any other non-list payload (null,
        # string, number) must fall through to the RuntimeError below instead
        # of failing with AttributeError.
        if isinstance(payload, list):
            rows = payload
        elif isinstance(payload, dict):
            rows = payload.get("files")
        else:
            rows = None
        if not isinstance(rows, list):
            raise RuntimeError(
                f"Expected list payload for pull request files, got {type(payload)!r}"
            )

        for row in rows:
            if not isinstance(row, dict):
                continue
            additions = int(row.get("additions") or 0)
            deletions = int(row.get("deletions") or 0)
            yield {
                "sha": row.get("sha"),
                "filename": row.get("filename") or row.get("path"),
                "status": row.get("status"),
                "additions": additions,
                "deletions": deletions,
                "changes": row.get("changes") or additions + deletions,
                "blob_url": row.get("blob_url"),
                "raw_url": row.get("raw_url"),
                "contents_url": row.get("contents_url"),
                "previous_filename": row.get("previous_filename"),
                "patch": row.get("patch"),
            }

    def get_pull_request_status(self, owner: str, repo: str, number: int) -> dict[str, Any] | None:
        """Return the status payload for a pull request, or ``None`` on 404.

        Raises:
            GhReplicaApiRequestError: For HTTP error responses other than 404.
            RuntimeError: If a payload is returned but is not a dict.
        """
        payload = self._request_json_or_none(
            f"/v1/changes/repos/{owner}/{repo}/pulls/{number}/status"
        )
        if payload is None:
            return None
        if not isinstance(payload, dict):
            raise RuntimeError(
                f"Expected dict payload for pull request status, got {type(payload)!r}"
            )
        return payload