File size: 7,022 Bytes
dbf7313
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
from __future__ import annotations

import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any


def _read_gh_token() -> str | None:
    """Ask the GitHub CLI (``gh auth token``) for a token.

    Returns ``None`` when ``gh`` is not installed, exits non-zero, or
    prints an empty token.
    """
    command = ["gh", "auth", "token"]
    try:
        completed = subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True,
        )
    except (OSError, subprocess.CalledProcessError):
        # gh missing (OSError) or not authenticated (non-zero exit).
        return None
    output = completed.stdout.strip()
    if not output:
        return None
    return output


def _read_dotenv_token() -> str | None:
    for directory in (Path.cwd(), *Path.cwd().parents):
        path = directory / ".env"
        if not path.exists():
            continue
        values: dict[str, str] = {}
        for line in path.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            values[key.strip()] = value.strip().strip("'").strip('"')
        for key in ("GITHUB_TOKEN", "GRAPHQL_TOKEN", "GH_TOKEN"):
            token = values.get(key)
            if token:
                return token
    return None


def resolve_github_token() -> str | None:
    """Resolve a GitHub token from the first available source.

    Precedence: process environment (``GITHUB_TOKEN``, ``GRAPHQL_TOKEN``,
    ``GH_TOKEN``), then the nearest ``.env`` file, then the ``gh`` CLI.
    Returns ``None`` when no source yields a non-empty token.
    """
    for env_key in ("GITHUB_TOKEN", "GRAPHQL_TOKEN", "GH_TOKEN"):
        token = os.environ.get(env_key)
        if token:
            return token
    return _read_dotenv_token() or _read_gh_token()


@dataclass(slots=True)
class RepoRef:
    owner: str
    name: str

    @classmethod
    def parse(cls, raw: str) -> RepoRef:
        owner, sep, name = raw.partition("/")
        if not sep or not owner or not name:
            raise ValueError(f"Expected REPO in owner/name form, got: {raw!r}")
        return cls(owner=owner, name=name)

    @property
    def slug(self) -> str:
        return f"{self.owner}/{self.name}"


@dataclass(slots=True)
class PipelineOptions:
    """Options for a full fetch-pipeline run against one repository."""

    repo: RepoRef
    output_dir: Path
    # Incremental-fetch cursor; None presumably means fetch everything — confirm against the fetcher.
    since: str | None
    resume: bool
    http_timeout: int
    http_max_retries: int
    # Per-entity fetch caps; None means no cap.
    max_issues: int | None
    max_prs: int | None
    max_issue_comments: int | None
    max_reviews_per_pr: int | None
    max_review_comments_per_pr: int | None
    fetch_timeline: bool
    # Optional new-contributor report and its parameters.
    new_contributor_report: bool
    new_contributor_window_days: int
    new_contributor_max_authors: int
    # Age filters in days; None disables the filter.
    issue_max_age_days: int | None
    pr_max_age_days: int | None


@dataclass(slots=True)
class AnalysisOptions:
    """Options for the analysis stage (clustering / ranking of snapshot data).

    Raises:
        ValueError: from ``__post_init__`` when ``hybrid_llm_concurrency < 1``.
    """

    snapshot_dir: Path | None
    output_dir: Path
    output: Path | None
    # Hugging Face dataset source; all three are optional overrides.
    hf_repo_id: str | None
    hf_revision: str | None
    hf_materialize_dir: Path | None
    ranking_backend: str
    model: str
    max_clusters: int
    hybrid_llm_concurrency: int = 1
    open_prs_only: bool = False
    cached_analysis: bool = False
    # PR-template cleanup knobs; defaults mirror the "merge_defaults" mode.
    pr_template_cleanup_mode: str = "merge_defaults"
    pr_template_strip_html_comments: bool = True
    pr_template_trim_closing_reference_prefix: bool = True
    pr_template_section_patterns: tuple[str, ...] = ()
    pr_template_line_patterns: tuple[str, ...] = ()
    cluster_suppression_rules: tuple[dict[str, Any], ...] = ()

    def __post_init__(self) -> None:
        # Guard against a zero/negative worker count before any work starts.
        if self.hybrid_llm_concurrency < 1:
            raise ValueError("hybrid_llm_concurrency must be >= 1")


@dataclass(slots=True)
class MarkdownReportOptions:
    """Options for rendering an analysis result as a Markdown report."""

    input: Path
    output: Path | None
    snapshot_dir: Path | None


@dataclass(slots=True)
class NewContributorReportOptions:
    """Options for generating the new-contributor report."""

    snapshot_dir: Path | None
    output_dir: Path
    output: Path | None
    json_output: Path | None
    # Lookback window and cap on the number of authors reported.
    window_days: int
    max_authors: int
    # Optional Hugging Face dataset source.
    hf_repo_id: str | None = None
    hf_revision: str | None = None
    hf_materialize_dir: Path | None = None


@dataclass(slots=True)
class DashboardDataOptions:
    """Options for building the dashboard's data artifacts."""

    snapshot_dir: Path | None
    output_dir: Path
    # Pre-computed inputs; None presumably means compute/locate on the fly — confirm against the builder.
    analysis_input: Path | None
    contributors_input: Path | None
    pr_scope_input: Path | None
    window_days: int
    # Optional Hugging Face dataset source.
    hf_repo_id: str | None = None
    hf_revision: str | None = None
    hf_materialize_dir: Path | None = None
    snapshot_root: Path | None = None


@dataclass(slots=True)
class DeployDashboardOptions:
    """Options for deploying the dashboard to a Hugging Face Space."""

    pipeline_data_dir: Path
    web_dir: Path
    snapshot_dir: Path | None
    # Pre-computed inputs reused from earlier stages when present.
    analysis_input: Path | None
    contributors_input: Path | None
    pr_scope_input: Path | None
    # Hugging Face dataset source for snapshot data.
    hf_repo_id: str | None
    hf_revision: str | None
    hf_materialize_dir: Path | None
    refresh_contributors: bool
    dashboard_window_days: int
    contributor_window_days: int
    contributor_max_authors: int
    private_space: bool
    commit_message: str
    # Space metadata pushed with the deploy (card front-matter style fields).
    space_id: str
    space_title: str | None
    space_emoji: str
    space_color_from: str
    space_color_to: str
    space_short_description: str
    dataset_id: str | None
    space_tags: str | None


@dataclass(slots=True)
class PrScopeOptions:
    """Options for the PR-scope computation stage."""

    snapshot_dir: Path | None
    output_dir: Path
    output: Path | None
    # Optional Hugging Face dataset source.
    hf_repo_id: str | None
    hf_revision: str | None
    hf_materialize_dir: Path | None
    cluster_suppression_rules: tuple[dict[str, Any], ...] = ()


@dataclass(slots=True)
class PrSearchRefreshOptions:
    """Options for refreshing the PR search index/database."""

    snapshot_dir: Path | None
    output_dir: Path
    # Target database file; None presumably means a default location — confirm against the caller.
    db: Path | None
    # Optional Hugging Face dataset source.
    hf_repo_id: str | None
    hf_revision: str | None
    hf_materialize_dir: Path | None
    include_drafts: bool = False
    include_closed: bool = False
    limit_prs: int | None = None
    replace_active: bool = True
    cluster_suppression_rules: tuple[dict[str, Any], ...] = ()


@dataclass(slots=True)
class CheckpointImportOptions:
    """Options for importing a checkpoint from a source HF repo."""

    source_repo_id: str
    output_dir: Path
    # Specific checkpoint selection; None presumably picks a default — confirm against the importer.
    checkpoint_id: str | None
    checkpoint_root: str | None
    # Optional re-publish target after import.
    publish_repo_id: str | None
    private_hf_repo: bool
    force: bool


@dataclass(slots=True)
class SnapshotAdoptOptions:
    """Options for adopting an existing snapshot directory into the pipeline."""

    snapshot_dir: Path
    output_dir: Path
    # Cursor recorded for the next incremental fetch; None leaves it unset.
    next_since: str | None


@dataclass(slots=True)
class DatasetRefreshOptions:
    """Options for the end-to-end dataset refresh (fetch + publish to HF)."""

    repo: RepoRef
    hf_repo_id: str
    private_hf_repo: bool
    # Per-entity fetch caps; None means no cap.
    max_issues: int | None
    max_prs: int | None
    max_issue_comments: int | None
    max_reviews_per_pr: int | None
    max_review_comments_per_pr: int | None
    fetch_timeline: bool
    # Optional new-contributor report and its parameters.
    new_contributor_report: bool
    new_contributor_window_days: int
    new_contributor_max_authors: int
    http_timeout: int
    http_max_retries: int
    # Checkpoint cadence: persist progress every N comments / N PRs.
    checkpoint_every_comments: int
    checkpoint_every_prs: int
    cluster_suppression_rules: tuple[dict[str, Any], ...] = ()


@dataclass(slots=True)
class PublishAnalysisArtifactsOptions:
    """Options for publishing analysis artifacts to a Hugging Face repo."""

    output_dir: Path
    snapshot_dir: Path | None
    analysis_input: Path | None
    hf_repo_id: str
    analysis_id: str
    # Whether this publish becomes the canonical analysis — semantics defined by the publisher.
    canonical: bool = False
    save_cache: bool = False
    private_hf_repo: bool = False


@dataclass(slots=True)
class SaveCacheOptions:
    """Options for saving the local cache to a Hugging Face repo."""

    output_dir: Path
    snapshot_dir: Path | None
    hf_repo_id: str
    private_hf_repo: bool = False


@dataclass(slots=True)
class DatasetStatusOptions:
    """Options for reporting dataset status."""

    output_dir: Path
    # Optional Hugging Face dataset source to inspect.
    hf_repo_id: str | None
    hf_revision: str | None
    # Repo filter as a raw string here (not RepoRef); None means no filter.
    repo: str | None = None
    # Emit machine-readable JSON instead of human-readable output.
    json_output: bool = False