from __future__ import annotations

import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any


def _read_gh_token() -> str | None:
    """Return the token from the `gh` CLI session, or None if unavailable."""
    try:
        result = subprocess.run(
            ["gh", "auth", "token"],
            check=True,
            capture_output=True,
            text=True,
        )
    except (OSError, subprocess.CalledProcessError):
        return None
    token = result.stdout.strip()
    return token or None


def _read_dotenv_token() -> str | None:
    """Search .env files from the current directory upward for a token."""
    for directory in (Path.cwd(), *Path.cwd().parents):
        path = directory / ".env"
        if not path.exists():
            continue
        values: dict[str, str] = {}
        for line in path.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            values[key.strip()] = value.strip().strip("'").strip('"')
        for key in ("GITHUB_TOKEN", "GRAPHQL_TOKEN", "GH_TOKEN"):
            token = values.get(key)
            if token:
                return token
    return None
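
# Illustrative .env layout the parser above accepts; blank lines, `#` comments,
# and single or double quotes around values are all tolerated (values here are
# placeholders, not real tokens):
#
#     GITHUB_TOKEN="ghp_xxxx"
#     GH_TOKEN=ghp_xxxx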


def resolve_github_token() -> str | None:
    """Resolve a token from the environment, then a .env file, then the gh CLI."""
    return (
        os.environ.get("GITHUB_TOKEN")
        or os.environ.get("GRAPHQL_TOKEN")
        or os.environ.get("GH_TOKEN")
        or _read_dotenv_token()
        or _read_gh_token()
    )
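
# Usage sketch (illustrative, not part of this module): callers can fail fast
# when no credential source yields a token.
#
#     token = resolve_github_token()
#     if token is None:
#         raise SystemExit("Set GITHUB_TOKEN or run `gh auth login`.")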


@dataclass(slots=True)
class RepoRef:
    """A GitHub repository reference in owner/name form."""

    owner: str
    name: str

    @classmethod
    def parse(cls, raw: str) -> RepoRef:
        owner, sep, name = raw.partition("/")
        if not sep or not owner or not name:
            raise ValueError(f"Expected REPO in owner/name form, got: {raw!r}")
        return cls(owner=owner, name=name)

    @property
    def slug(self) -> str:
        return f"{self.owner}/{self.name}"
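
# Example (illustrative): RepoRef.parse("octocat/hello-world") returns
# RepoRef(owner="octocat", name="hello-world"), and .slug round-trips it to
# "octocat/hello-world"; any input without a "/" raises ValueError.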


@dataclass(slots=True)
class PipelineOptions:
    repo: RepoRef
    output_dir: Path
    since: str | None
    resume: bool
    http_timeout: int
    http_max_retries: int
    max_issues: int | None
    max_prs: int | None
    max_issue_comments: int | None
    max_reviews_per_pr: int | None
    max_review_comments_per_pr: int | None
    fetch_timeline: bool
    new_contributor_report: bool
    new_contributor_window_days: int
    new_contributor_max_authors: int
    issue_max_age_days: int | None
    pr_max_age_days: int | None


@dataclass(slots=True)
class AnalysisOptions:
    snapshot_dir: Path | None
    output_dir: Path
    output: Path | None
    hf_repo_id: str | None
    hf_revision: str | None
    hf_materialize_dir: Path | None
    ranking_backend: str
    model: str
    max_clusters: int
    hybrid_llm_concurrency: int = 1
    open_prs_only: bool = False
    cached_analysis: bool = False
    pr_template_cleanup_mode: str = "merge_defaults"
    pr_template_strip_html_comments: bool = True
    pr_template_trim_closing_reference_prefix: bool = True
    pr_template_section_patterns: tuple[str, ...] = ()
    pr_template_line_patterns: tuple[str, ...] = ()
    cluster_suppression_rules: tuple[dict[str, Any], ...] = ()

    def __post_init__(self) -> None:
        if self.hybrid_llm_concurrency < 1:
            raise ValueError("hybrid_llm_concurrency must be >= 1")
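
# Construction sketch (field names come from the class above; the values are
# illustrative placeholders, not documented defaults):
#
#     AnalysisOptions(
#         snapshot_dir=Path("snapshots/latest"),
#         output_dir=Path("analysis"),
#         output=None,
#         hf_repo_id=None,
#         hf_revision=None,
#         hf_materialize_dir=None,
#         ranking_backend="hybrid",
#         model="<model-id>",
#         max_clusters=20,
#     )
#
# __post_init__ rejects hybrid_llm_concurrency values below 1 at construction time.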


@dataclass(slots=True)
class MarkdownReportOptions:
    input: Path
    output: Path | None
    snapshot_dir: Path | None


@dataclass(slots=True)
class NewContributorReportOptions:
    snapshot_dir: Path | None
    output_dir: Path
    output: Path | None
    json_output: Path | None
    window_days: int
    max_authors: int
    hf_repo_id: str | None = None
    hf_revision: str | None = None
    hf_materialize_dir: Path | None = None


@dataclass(slots=True)
class DashboardDataOptions:
    snapshot_dir: Path | None
    output_dir: Path
    analysis_input: Path | None
    contributors_input: Path | None
    pr_scope_input: Path | None
    window_days: int
    hf_repo_id: str | None = None
    hf_revision: str | None = None
    hf_materialize_dir: Path | None = None
    snapshot_root: Path | None = None


@dataclass(slots=True)
class DeployDashboardOptions:
    pipeline_data_dir: Path
    web_dir: Path
    snapshot_dir: Path | None
    analysis_input: Path | None
    contributors_input: Path | None
    pr_scope_input: Path | None
    hf_repo_id: str | None
    hf_revision: str | None
    hf_materialize_dir: Path | None
    refresh_contributors: bool
    dashboard_window_days: int
    contributor_window_days: int
    contributor_max_authors: int
    private_space: bool
    commit_message: str
    space_id: str
    space_title: str | None
    space_emoji: str
    space_color_from: str
    space_color_to: str
    space_short_description: str
    dataset_id: str | None
    space_tags: str | None


@dataclass(slots=True)
class PrScopeOptions:
    snapshot_dir: Path | None
    output_dir: Path
    output: Path | None
    hf_repo_id: str | None
    hf_revision: str | None
    hf_materialize_dir: Path | None
    cluster_suppression_rules: tuple[dict[str, Any], ...] = ()


@dataclass(slots=True)
class PrSearchRefreshOptions:
    snapshot_dir: Path | None
    output_dir: Path
    db: Path | None
    hf_repo_id: str | None
    hf_revision: str | None
    hf_materialize_dir: Path | None
    include_drafts: bool = False
    include_closed: bool = False
    limit_prs: int | None = None
    replace_active: bool = True
    cluster_suppression_rules: tuple[dict[str, Any], ...] = ()


@dataclass(slots=True)
class CheckpointImportOptions:
    source_repo_id: str
    output_dir: Path
    checkpoint_id: str | None
    checkpoint_root: str | None
    publish_repo_id: str | None
    private_hf_repo: bool
    force: bool


@dataclass(slots=True)
class SnapshotAdoptOptions:
    snapshot_dir: Path
    output_dir: Path
    next_since: str | None


@dataclass(slots=True)
class DatasetRefreshOptions:
    repo: RepoRef
    hf_repo_id: str
    private_hf_repo: bool
    max_issues: int | None
    max_prs: int | None
    max_issue_comments: int | None
    max_reviews_per_pr: int | None
    max_review_comments_per_pr: int | None
    fetch_timeline: bool
    new_contributor_report: bool
    new_contributor_window_days: int
    new_contributor_max_authors: int
    http_timeout: int
    http_max_retries: int
    checkpoint_every_comments: int
    checkpoint_every_prs: int
    cluster_suppression_rules: tuple[dict[str, Any], ...] = ()


@dataclass(slots=True)
class PublishAnalysisArtifactsOptions:
    output_dir: Path
    snapshot_dir: Path | None
    analysis_input: Path | None
    hf_repo_id: str
    analysis_id: str
    canonical: bool = False
    save_cache: bool = False
    private_hf_repo: bool = False


@dataclass(slots=True)
class SaveCacheOptions:
    output_dir: Path
    snapshot_dir: Path | None
    hf_repo_id: str
    private_hf_repo: bool = False


@dataclass(slots=True)
class DatasetStatusOptions:
    output_dir: Path
    hf_repo_id: str | None
    hf_revision: str | None
    repo: str | None = None
    json_output: bool = False