# Page header residue: Spaces: Sleeping; File size: 5,574 Bytes; revision dbf7313
from __future__ import annotations
import json
import urllib.error
import urllib.request
from collections.abc import Callable, Iterable
from typing import Any
from slop_farmer.data.http import urlopen_with_retry
class GhReplicaApiRequestError(RuntimeError):
    """Raised when ghreplica returns a non-recoverable HTTP response.

    Attributes:
        status_code: HTTP status code reported for the failed request.
        path: Request path the failure belongs to.
        detail: Free-form detail text supplied by the raiser.
    """

    def __init__(self, status_code: int, path: str, detail: str):
        # Build the human-readable message once and hand it to RuntimeError.
        message = f"ghreplica API request failed: {status_code} {path} {detail}"
        super().__init__(message)
        # Keep the individual pieces so handlers can branch on them.
        self.status_code = status_code
        self.path = path
        self.detail = detail
class GhReplicaProbeUnavailableError(RuntimeError):
    """Raised when ghreplica cannot yet serve a live probe payload.

    Attributes:
        status_code: Status code associated with the failure; 503 unless the
            raiser passes another value.
    """

    def __init__(self, detail: str, *, status_code: int = 503):
        # ``detail`` becomes the exception message verbatim.
        super().__init__(detail)
        self.status_code = status_code
class GhrProbeClient:
    """Small JSON client for the ghreplica HTTP API.

    Each call builds a ``urllib.request.Request`` from ``base_url`` plus a
    path and sends it through ``urlopen_with_retry``.
    """

    provider = "ghreplica"

    def __init__(
        self,
        *,
        base_url: str,
        timeout: int = 180,
        max_retries: int = 5,
        log: Callable[[str], None] | None = None,
    ):
        """Store the connection settings.

        Args:
            base_url: Root URL of the service; trailing slashes are stripped
                so paths (which start with ``/``) can be appended directly.
            timeout: Forwarded to ``urlopen_with_retry`` as ``timeout``.
            max_retries: Forwarded to ``urlopen_with_retry`` as ``max_retries``.
            log: Optional callable forwarded to ``urlopen_with_retry`` as ``log``.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.max_retries = max_retries
        self.log = log

    def _request_json(self, path: str) -> Any:
        """Request ``path`` under ``base_url`` and return the decoded JSON body.

        Raises:
            GhReplicaApiRequestError: If the request surfaces a
                ``urllib.error.HTTPError``; the error carries the status code,
                the path, and the response body decoded as text.
        """
        request = urllib.request.Request(f"{self.base_url}{path}")
        request.add_header("Accept", "application/json")
        try:
            with urlopen_with_retry(
                request,
                timeout=self.timeout,
                max_retries=self.max_retries,
                log=self.log,
                label=path,
            ) as response:
                payload = response.read().decode("utf-8")
        except urllib.error.HTTPError as exc:
            # Error bodies are not guaranteed to be valid UTF-8; never let
            # decoding mask the original HTTP failure.
            detail = exc.read().decode("utf-8", errors="replace")
            raise GhReplicaApiRequestError(exc.code, path, detail) from exc
        return json.loads(payload)

    def _request_json_or_none(self, path: str) -> Any | None:
        """Like ``_request_json`` but return ``None`` for a 404 response.

        Any other ``GhReplicaApiRequestError`` is re-raised unchanged.
        """
        try:
            return self._request_json(path)
        except GhReplicaApiRequestError as exc:
            if exc.status_code == 404:
                return None
            raise

    def get_pull_request(self, owner: str, repo: str, number: int) -> dict[str, Any]:
        """Return the pull request payload for ``owner/repo#number``.

        Raises:
            GhReplicaProbeUnavailableError: With ``status_code=404`` when the
                API answers 404 for this pull request.
            GhReplicaApiRequestError: For any other HTTP error.
            RuntimeError: If the decoded payload is not a dict.
        """
        try:
            payload = self._request_json(f"/v1/github/repos/{owner}/{repo}/pulls/{number}")
        except GhReplicaApiRequestError as exc:
            if exc.status_code == 404:
                raise GhReplicaProbeUnavailableError(
                    f"PR #{number} was not found in ghreplica.",
                    status_code=404,
                ) from exc
            raise
        if not isinstance(payload, dict):
            raise RuntimeError(f"Expected dict payload for pull request, got {type(payload)!r}")
        return payload

    def iter_pull_files(self, owner: str, repo: str, number: int) -> Iterable[dict[str, Any]]:
        """Yield one normalized dict per changed file of a pull request.

        This is a generator: the HTTP request and every error below happen
        when iteration starts, not when the method is called.

        The payload may be a JSON list of rows or a JSON object holding the
        rows under ``"files"``. Rows that are not dicts are skipped.

        Raises:
            GhReplicaProbeUnavailableError: When the files endpoint answers
                404. ``status_code`` is 503 if the status endpoint still
                returns a payload for the pull request, otherwise 404.
            GhReplicaApiRequestError: For any other HTTP error.
            RuntimeError: If no list of rows can be found in the payload.
        """
        try:
            payload = self._request_json(f"/v1/changes/repos/{owner}/{repo}/pulls/{number}/files")
        except GhReplicaApiRequestError as exc:
            if exc.status_code != 404:
                raise
            # A 404 on the files endpoint is ambiguous, so consult the status
            # endpoint to tell "status known, files missing" from "unknown".
            status = self.get_pull_request_status(owner, repo, number)
            if isinstance(status, dict):
                detail_bits = [
                    f"{key}={status[key]}"
                    for key in (
                        "indexed",
                        "backfill_in_progress",
                        "changed_files",
                        "indexed_file_count",
                    )
                    if key in status
                ]
                suffix = f" ({', '.join(detail_bits)})" if detail_bits else ""
                raise GhReplicaProbeUnavailableError(
                    f"PR #{number} is not available in ghreplica yet{suffix}.",
                    status_code=503,
                ) from exc
            raise GhReplicaProbeUnavailableError(
                f"PR #{number} was not found in ghreplica changed-file replica.",
                status_code=404,
            ) from exc

        # Only dicts have ``.get``; any other non-list payload (``null``, a
        # string, a number) must fall through to the RuntimeError below
        # instead of failing with AttributeError.
        if isinstance(payload, list):
            rows = payload
        elif isinstance(payload, dict):
            rows = payload.get("files")
        else:
            rows = None
        if not isinstance(rows, list):
            raise RuntimeError(
                f"Expected list payload for pull request files, got {type(payload)!r}"
            )

        for row in rows:
            if not isinstance(row, dict):
                continue
            # Missing or null counters are treated as zero.
            additions = int(row.get("additions") or 0)
            deletions = int(row.get("deletions") or 0)
            yield {
                "sha": row.get("sha"),
                # Rows may name the file under either "filename" or "path".
                "filename": row.get("filename") or row.get("path"),
                "status": row.get("status"),
                "additions": additions,
                "deletions": deletions,
                # Fall back to the sum when "changes" is absent or falsy.
                "changes": row.get("changes") or additions + deletions,
                "blob_url": row.get("blob_url"),
                "raw_url": row.get("raw_url"),
                "contents_url": row.get("contents_url"),
                "previous_filename": row.get("previous_filename"),
                "patch": row.get("patch"),
            }

    def get_pull_request_status(self, owner: str, repo: str, number: int) -> dict[str, Any] | None:
        """Return the status payload for a pull request, or ``None`` on 404.

        Raises:
            GhReplicaApiRequestError: For HTTP errors other than 404.
            RuntimeError: If the decoded payload is not a dict.
        """
        payload = self._request_json_or_none(
            f"/v1/changes/repos/{owner}/{repo}/pulls/{number}/status"
        )
        if payload is None:
            return None
        if not isinstance(payload, dict):
            raise RuntimeError(
                f"Expected dict payload for pull request status, got {type(payload)!r}"
            )
        return payload
|