File size: 5,574 Bytes
dbf7313
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from __future__ import annotations

import json
import urllib.error
import urllib.request
from collections.abc import Callable, Iterable
from typing import Any

from slop_farmer.data.http import urlopen_with_retry


class GhReplicaApiRequestError(RuntimeError):
    """Error carrying the details of a failed ghreplica HTTP request.

    Attributes:
        status_code: Status code reported for the failed request.
        path: Request path that was being fetched.
        detail: Error detail text supplied by the caller (e.g. response body).
    """

    def __init__(self, status_code: int, path: str, detail: str):
        # Build the human-readable message once, then keep the individual
        # parts on the instance so callers can branch on them.
        message = f"ghreplica API request failed: {status_code} {path} {detail}"
        super().__init__(message)
        self.status_code, self.path, self.detail = status_code, path, detail


class GhReplicaProbeUnavailableError(RuntimeError):
    """Error signalling that ghreplica cannot serve a live probe payload yet.

    Attributes:
        status_code: Status code associated with the failure; keyword-only
            at construction time and 503 unless overridden.
    """

    def __init__(self, detail: str, *, status_code: int = 503):
        # ``detail`` becomes the exception message verbatim.
        super().__init__(detail)
        self.status_code = status_code


class GhrProbeClient:
    """Small JSON client for a ghreplica service's pull-request endpoints.

    The client fetches three resources relative to ``base_url``: a pull
    request, its changed files, and the changed-file indexing status.

    Attributes:
        provider: Constant provider label, ``"ghreplica"``.
        base_url: Service root with any trailing ``/`` removed.
        timeout: Forwarded to ``urlopen_with_retry`` as ``timeout``.
        max_retries: Forwarded to ``urlopen_with_retry`` as ``max_retries``.
        log: Optional callable forwarded to ``urlopen_with_retry`` as ``log``.
    """

    provider = "ghreplica"

    def __init__(
        self,
        *,
        base_url: str,
        timeout: int = 180,
        max_retries: int = 5,
        log: Callable[[str], None] | None = None,
    ):
        """Store connection settings; no network access happens here."""
        # Request paths always start with "/", so drop trailing slashes to
        # avoid producing "//" when the two are concatenated.
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.max_retries = max_retries
        self.log = log

    def _request_json(self, path: str) -> Any:
        """Fetch ``base_url + path`` and return the decoded JSON body.

        Raises:
            GhReplicaApiRequestError: When the request fails with
                ``urllib.error.HTTPError``; carries the status code, the
                path and the (lossily decoded) error body.

        Only ``HTTPError`` is translated here; any other exception raised
        while fetching or decoding propagates unchanged.
        """
        request = urllib.request.Request(f"{self.base_url}{path}")
        request.add_header("Accept", "application/json")
        try:
            with urlopen_with_retry(
                request,
                timeout=self.timeout,
                max_retries=self.max_retries,
                log=self.log,
                label=path,
            ) as response:
                payload = response.read().decode("utf-8")
        except urllib.error.HTTPError as exc:
            # Error bodies are diagnostic only, so never fail on bad bytes.
            detail = exc.read().decode("utf-8", errors="replace")
            raise GhReplicaApiRequestError(exc.code, path, detail) from exc
        return json.loads(payload)

    def _request_json_or_none(self, path: str) -> Any | None:
        """Like ``_request_json`` but map a 404 response to ``None``.

        Raises:
            GhReplicaApiRequestError: For any failure status other than 404.
        """
        try:
            return self._request_json(path)
        except GhReplicaApiRequestError as exc:
            if exc.status_code == 404:
                return None
            raise

    def get_pull_request(self, owner: str, repo: str, number: int) -> dict[str, Any]:
        """Return the pull request payload for ``owner/repo#number``.

        Raises:
            GhReplicaProbeUnavailableError: With ``status_code=404`` when the
                service answers 404 for the pull request.
            GhReplicaApiRequestError: For any other HTTP failure.
            RuntimeError: When the response body is not a JSON object.
        """
        try:
            payload = self._request_json(f"/v1/github/repos/{owner}/{repo}/pulls/{number}")
        except GhReplicaApiRequestError as exc:
            if exc.status_code == 404:
                raise GhReplicaProbeUnavailableError(
                    f"PR #{number} was not found in ghreplica.",
                    status_code=404,
                ) from exc
            raise
        if not isinstance(payload, dict):
            raise RuntimeError(f"Expected dict payload for pull request, got {type(payload)!r}")
        return payload

    def iter_pull_files(self, owner: str, repo: str, number: int) -> Iterable[dict[str, Any]]:
        """Yield one normalized dict per changed file of a pull request.

        The endpoint may answer with either a bare JSON list or an object
        whose ``"files"`` key holds the list. Entries that are not dicts are
        skipped. Each yielded dict has the keys ``sha``, ``filename``
        (falling back to the row's ``path``), ``status``, ``additions``,
        ``deletions``, ``changes`` (falling back to additions + deletions),
        ``blob_url``, ``raw_url``, ``contents_url``, ``previous_filename``
        and ``patch``.

        This is a generator: the request is issued, and every exception
        below is raised, on first iteration rather than at call time.

        Raises:
            GhReplicaProbeUnavailableError: On a 404 from the files endpoint.
                ``status_code`` is 503 when the status endpoint still returns
                a payload for the pull request, and 404 when it does not.
            GhReplicaApiRequestError: For any other HTTP failure.
            RuntimeError: When the response is neither a list nor an object
                with a list under ``"files"``.
        """
        try:
            payload = self._request_json(f"/v1/changes/repos/{owner}/{repo}/pulls/{number}/files")
        except GhReplicaApiRequestError as exc:
            if exc.status_code != 404:
                raise
            # A 404 is ambiguous on its own; the status endpoint tells a
            # known-but-unserved pull request apart from an unknown one.
            status = self.get_pull_request_status(owner, repo, number)
            if isinstance(status, dict):
                detail_bits = [
                    f"{key}={status[key]}"
                    for key in (
                        "indexed",
                        "backfill_in_progress",
                        "changed_files",
                        "indexed_file_count",
                    )
                    if key in status
                ]
                suffix = f" ({', '.join(detail_bits)})" if detail_bits else ""
                raise GhReplicaProbeUnavailableError(
                    f"PR #{number} is not available in ghreplica yet{suffix}.",
                    status_code=503,
                ) from exc
            raise GhReplicaProbeUnavailableError(
                f"PR #{number} was not found in ghreplica changed-file replica.",
                status_code=404,
            ) from exc
        # Only call .get() on an actual dict: a scalar or null JSON body must
        # surface as the RuntimeError below, not as an AttributeError.
        if isinstance(payload, list):
            rows = payload
        elif isinstance(payload, dict):
            rows = payload.get("files")
        else:
            rows = None
        if not isinstance(rows, list):
            raise RuntimeError(
                f"Expected list payload for pull request files, got {type(payload)!r}"
            )
        for row in rows:
            if not isinstance(row, dict):
                continue
            # Missing or null counters are treated as zero.
            additions = int(row.get("additions") or 0)
            deletions = int(row.get("deletions") or 0)
            yield {
                "sha": row.get("sha"),
                "filename": row.get("filename") or row.get("path"),
                "status": row.get("status"),
                "additions": additions,
                "deletions": deletions,
                "changes": row.get("changes") or additions + deletions,
                "blob_url": row.get("blob_url"),
                "raw_url": row.get("raw_url"),
                "contents_url": row.get("contents_url"),
                "previous_filename": row.get("previous_filename"),
                "patch": row.get("patch"),
            }

    def get_pull_request_status(self, owner: str, repo: str, number: int) -> dict[str, Any] | None:
        """Return the changed-file status payload, or ``None`` on a 404.

        Raises:
            GhReplicaApiRequestError: For any HTTP failure other than 404.
            RuntimeError: When the response body is not a JSON object.
        """
        payload = self._request_json_or_none(
            f"/v1/changes/repos/{owner}/{repo}/pulls/{number}/status"
        )
        if payload is None:
            return None
        if not isinstance(payload, dict):
            raise RuntimeError(
                f"Expected dict payload for pull request status, got {type(payload)!r}"
            )
        return payload