from __future__ import annotations import os import subprocess from dataclasses import dataclass from pathlib import Path from typing import Any def _read_gh_token() -> str | None: try: result = subprocess.run( ["gh", "auth", "token"], check=True, capture_output=True, text=True, ) except (OSError, subprocess.CalledProcessError): return None token = result.stdout.strip() return token or None def _read_dotenv_token() -> str | None: for directory in (Path.cwd(), *Path.cwd().parents): path = directory / ".env" if not path.exists(): continue values: dict[str, str] = {} for line in path.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) values[key.strip()] = value.strip().strip("'").strip('"') for key in ("GITHUB_TOKEN", "GRAPHQL_TOKEN", "GH_TOKEN"): token = values.get(key) if token: return token return None def resolve_github_token() -> str | None: return ( os.environ.get("GITHUB_TOKEN") or os.environ.get("GRAPHQL_TOKEN") or os.environ.get("GH_TOKEN") or _read_dotenv_token() or _read_gh_token() ) @dataclass(slots=True) class RepoRef: owner: str name: str @classmethod def parse(cls, raw: str) -> RepoRef: owner, sep, name = raw.partition("/") if not sep or not owner or not name: raise ValueError(f"Expected REPO in owner/name form, got: {raw!r}") return cls(owner=owner, name=name) @property def slug(self) -> str: return f"{self.owner}/{self.name}" @dataclass(slots=True) class PipelineOptions: repo: RepoRef output_dir: Path since: str | None resume: bool http_timeout: int http_max_retries: int max_issues: int | None max_prs: int | None max_issue_comments: int | None max_reviews_per_pr: int | None max_review_comments_per_pr: int | None fetch_timeline: bool new_contributor_report: bool new_contributor_window_days: int new_contributor_max_authors: int issue_max_age_days: int | None pr_max_age_days: int | None @dataclass(slots=True) class AnalysisOptions: snapshot_dir: Path | None output_dir: Path output: Path | None hf_repo_id: str | None hf_revision: str | None hf_materialize_dir: Path | None ranking_backend: str model: str max_clusters: int hybrid_llm_concurrency: int = 1 open_prs_only: bool = False cached_analysis: bool = False pr_template_cleanup_mode: str = "merge_defaults" pr_template_strip_html_comments: bool = True pr_template_trim_closing_reference_prefix: bool = True pr_template_section_patterns: tuple[str, ...] = () pr_template_line_patterns: tuple[str, ...] = () cluster_suppression_rules: tuple[dict[str, Any], ...] = () def __post_init__(self) -> None: if self.hybrid_llm_concurrency < 1: raise ValueError("hybrid_llm_concurrency must be >= 1") @dataclass(slots=True) class MarkdownReportOptions: input: Path output: Path | None snapshot_dir: Path | None @dataclass(slots=True) class NewContributorReportOptions: snapshot_dir: Path | None output_dir: Path output: Path | None json_output: Path | None window_days: int max_authors: int hf_repo_id: str | None = None hf_revision: str | None = None hf_materialize_dir: Path | None = None @dataclass(slots=True) class DashboardDataOptions: snapshot_dir: Path | None output_dir: Path analysis_input: Path | None contributors_input: Path | None pr_scope_input: Path | None window_days: int hf_repo_id: str | None = None hf_revision: str | None = None hf_materialize_dir: Path | None = None snapshot_root: Path | None = None @dataclass(slots=True) class DeployDashboardOptions: pipeline_data_dir: Path web_dir: Path snapshot_dir: Path | None analysis_input: Path | None contributors_input: Path | None pr_scope_input: Path | None hf_repo_id: str | None hf_revision: str | None hf_materialize_dir: Path | None refresh_contributors: bool dashboard_window_days: int contributor_window_days: int contributor_max_authors: int private_space: bool commit_message: str space_id: str space_title: str | None space_emoji: str space_color_from: str space_color_to: str space_short_description: str dataset_id: str | None space_tags: str | None @dataclass(slots=True) class PrScopeOptions: snapshot_dir: Path | None output_dir: Path output: Path | None hf_repo_id: str | None hf_revision: str | None hf_materialize_dir: Path | None cluster_suppression_rules: tuple[dict[str, Any], ...] = () @dataclass(slots=True) class PrSearchRefreshOptions: snapshot_dir: Path | None output_dir: Path db: Path | None hf_repo_id: str | None hf_revision: str | None hf_materialize_dir: Path | None include_drafts: bool = False include_closed: bool = False limit_prs: int | None = None replace_active: bool = True cluster_suppression_rules: tuple[dict[str, Any], ...] = () @dataclass(slots=True) class CheckpointImportOptions: source_repo_id: str output_dir: Path checkpoint_id: str | None checkpoint_root: str | None publish_repo_id: str | None private_hf_repo: bool force: bool @dataclass(slots=True) class SnapshotAdoptOptions: snapshot_dir: Path output_dir: Path next_since: str | None @dataclass(slots=True) class DatasetRefreshOptions: repo: RepoRef hf_repo_id: str private_hf_repo: bool max_issues: int | None max_prs: int | None max_issue_comments: int | None max_reviews_per_pr: int | None max_review_comments_per_pr: int | None fetch_timeline: bool new_contributor_report: bool new_contributor_window_days: int new_contributor_max_authors: int http_timeout: int http_max_retries: int checkpoint_every_comments: int checkpoint_every_prs: int cluster_suppression_rules: tuple[dict[str, Any], ...] = () @dataclass(slots=True) class PublishAnalysisArtifactsOptions: output_dir: Path snapshot_dir: Path | None analysis_input: Path | None hf_repo_id: str analysis_id: str canonical: bool = False save_cache: bool = False private_hf_repo: bool = False @dataclass(slots=True) class SaveCacheOptions: output_dir: Path snapshot_dir: Path | None hf_repo_id: str private_hf_repo: bool = False @dataclass(slots=True) class DatasetStatusOptions: output_dir: Path hf_repo_id: str | None hf_revision: str | None repo: str | None = None json_output: bool = False