from __future__ import annotations

import argparse
import json
import sys
from collections.abc import Callable
from pathlib import Path
from typing import Any

from slop_farmer.app.duplicate_prs import DEFAULT_FILE_POLICY, FILE_POLICY_CHOICES
from slop_farmer.app_config import command_defaults, extract_cli_config_path
from slop_farmer.config import (
    AnalysisOptions,
    CheckpointImportOptions,
    DashboardDataOptions,
    DatasetRefreshOptions,
    DatasetStatusOptions,
    DeployDashboardOptions,
    MarkdownReportOptions,
    NewContributorReportOptions,
    PipelineOptions,
    PrScopeOptions,
    PrSearchRefreshOptions,
    PublishAnalysisArtifactsOptions,
    RepoRef,
    SaveCacheOptions,
    SnapshotAdoptOptions,
)
from slop_farmer.reports.duplicate_prs import DEFAULT_DUPLICATE_PR_MODEL

CommandHandler = Callable[[argparse.Namespace, Path | None], None]


def _int_at_least(minimum: int) -> Callable[[str], int]:
    def parse(raw: str) -> int:
        value = int(raw)
        if value < minimum:
            raise argparse.ArgumentTypeError(f"expected integer >= {minimum}")
        return value

    return parse


def build_parser(*, config_path: Path | None = None) -> argparse.ArgumentParser:
    defaults = _load_parser_defaults(config_path)
    parser = argparse.ArgumentParser(prog="slop-farmer")
    parser.add_argument(
        "--config",
        type=Path,
        help="YAML config file with shared repo/workspace/dashboard defaults.",
    )
    subparsers = parser.add_subparsers(dest="command", required=True)
    _add_scrape_parser(subparsers, defaults["scrape"])
    _add_refresh_dataset_parser(subparsers, defaults["refresh-dataset"])
    _add_analyze_parser(subparsers, defaults["analyze"])
    _add_pr_scope_parser(subparsers, defaults["pr-scope"])
    _add_checkpoint_import_parser(subparsers, defaults["import-hf-checkpoint"])
    _add_adopt_snapshot_parser(subparsers, defaults["adopt-snapshot"])
    _add_markdown_report_parser(subparsers)
    _add_duplicate_prs_parser(subparsers)
    _add_pr_search_parser(subparsers, defaults["pr-search"])
    _add_new_contributor_report_parser(subparsers, defaults["new-contributor-report"])
    _add_dashboard_data_parser(subparsers, defaults["dashboard-data"])
    _add_publish_analysis_artifacts_parser(subparsers, defaults["publish-analysis-artifacts"])
    _add_save_cache_parser(subparsers, defaults["save-cache"])
    _add_deploy_dashboard_parser(subparsers, defaults["deploy-dashboard"])
    _add_dataset_status_parser(subparsers, defaults["dataset-status"])
    return parser


def _load_parser_defaults(config_path: Path | None) -> dict[str, dict[str, Any]]:
    commands = (
        "scrape",
        "refresh-dataset",
        "analyze",
        "import-hf-checkpoint",
        "pr-scope",
        "pr-search",
        "adopt-snapshot",
        "new-contributor-report",
        "dashboard-data",
        "publish-analysis-artifacts",
        "save-cache",
        "deploy-dashboard",
        "dataset-status",
    )
    return {command: command_defaults(command, config_path=config_path) for command in commands}
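
# Illustrative sketch of the YAML file passed via --config. The real schema is
# owned by slop_farmer.app_config.command_defaults, so the per-command layout
# and the values below are assumptions rather than a documented contract:
#
#   scrape:
#     repo: huggingface/transformers
#     output-dir: data
#   analyze:
#     ranking-backend: deterministic
#
# Keys mirror the kebab-case option names the parser builders read from
# `defaults`, e.g. defaults.get("output-dir", "data").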


# Parser builders


def _add_scrape_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
    scrape = subparsers.add_parser("scrape", help="Scrape GitHub and write a snapshot dataset.")
    scrape.add_argument(
        "--repo",
        default=defaults.get("repo", "huggingface/transformers"),
        help="GitHub repository in owner/name form.",
    )
    scrape.add_argument("--output-dir", type=Path, default=Path(defaults.get("output-dir", "data")))
    scrape.add_argument("--since", help="Incremental sync lower bound in ISO 8601 format.")
    scrape.add_argument(
        "--resume",
        dest="resume",
        action="store_true",
        default=True,
        help="Resume from the last successful local watermark when --since is not provided.",
    )
    scrape.add_argument(
        "--no-resume",
        dest="resume",
        action="store_false",
        help="Ignore local watermark state and run from scratch unless --since is set.",
    )
    scrape.add_argument(
        "--http-timeout", type=int, default=180, help="Per-request timeout in seconds."
    )
    scrape.add_argument(
        "--http-max-retries", type=int, default=5, help="Retries for transient network failures."
    )
    scrape.add_argument(
        "--max-issues", type=int, default=None, help="Limit total issue endpoint items read."
    )
    scrape.add_argument(
        "--max-prs", type=int, default=None, help="Limit pull requests to hydrate in detail."
    )
    scrape.add_argument(
        "--issue-max-age-days",
        type=int,
        default=defaults.get("issue-max-age-days"),
        help="Optional created_at age cap for issues included in the snapshot.",
    )
    scrape.add_argument(
        "--pr-max-age-days",
        type=int,
        default=defaults.get("pr-max-age-days"),
        help="Optional created_at age cap for pull requests included in the snapshot.",
    )
    scrape.add_argument(
        "--max-issue-comments", type=int, default=None, help="Limit issue comment rows."
    )
    scrape.add_argument(
        "--max-reviews-per-pr", type=int, default=None, help="Limit review rows per PR."
    )
    scrape.add_argument(
        "--max-review-comments-per-pr",
        type=int,
        default=None,
        help="Limit inline review comment rows per PR.",
    )
    scrape.add_argument(
        "--fetch-timeline",
        action="store_true",
        default=bool(defaults.get("fetch-timeline", False)),
        help="Fetch issue timeline events for linkage rows.",
    )
    scrape.add_argument(
        "--new-contributor-report",
        dest="new_contributor_report",
        action="store_true",
        default=defaults.get("new-contributor-report"),
        help="Generate new contributor dataset/report artifacts for the local snapshot.",
    )
    scrape.add_argument(
        "--no-new-contributor-report",
        dest="new_contributor_report",
        action="store_false",
        help="Skip new contributor dataset/report generation.",
    )
    scrape.add_argument(
        "--new-contributor-window-days",
        type=int,
        default=int(defaults.get("new-contributor-window-days", 42)),
        help="Recent public activity window for contributor enrichment.",
    )
    scrape.add_argument(
        "--new-contributor-max-authors",
        type=int,
        default=int(defaults.get("new-contributor-max-authors", 25)),
        help="Maximum number of contributors to include in the new contributor report. Use 0 for no cap.",
    )
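
# Example invocations of the scrape command defined above (illustrative only;
# the repository, timestamp, and config filename are placeholders):
#   slop-farmer scrape --repo huggingface/transformers --since 2024-01-01T00:00:00Z --no-resume
#   slop-farmer --config slop-farmer.yaml scrape --max-prs 50 --fetch-timeline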


def _add_refresh_dataset_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
    refresh = subparsers.add_parser(
        "refresh-dataset",
        help="Refresh the canonical Hugging Face dataset repo from remote watermark state.",
    )
    refresh.add_argument(
        "--repo",
        default=defaults.get("repo", "huggingface/transformers"),
        help="GitHub repository in owner/name form.",
    )
    refresh.add_argument(
        "--hf-repo-id",
        default=defaults.get("hf-repo-id"),
        required=defaults.get("hf-repo-id") is None,
        help="Canonical Hugging Face dataset repo id to refresh.",
    )
    refresh.add_argument("--max-issues", type=int, default=defaults.get("max-issues"))
    refresh.add_argument("--max-prs", type=int, default=defaults.get("max-prs"))
    refresh.add_argument(
        "--max-issue-comments", type=int, default=defaults.get("max-issue-comments")
    )
    refresh.add_argument(
        "--max-reviews-per-pr", type=int, default=defaults.get("max-reviews-per-pr")
    )
    refresh.add_argument(
        "--max-review-comments-per-pr",
        type=int,
        default=defaults.get("max-review-comments-per-pr"),
    )
    refresh.add_argument(
        "--fetch-timeline",
        action="store_true",
        default=bool(defaults.get("fetch-timeline", False)),
    )
    refresh.add_argument(
        "--new-contributor-report",
        dest="new_contributor_report",
        action="store_true",
        default=bool(defaults.get("new-contributor-report", True)),
    )
    refresh.add_argument(
        "--no-new-contributor-report",
        dest="new_contributor_report",
        action="store_false",
    )
    refresh.add_argument(
        "--new-contributor-window-days",
        type=int,
        default=int(defaults.get("new-contributor-window-days", 42)),
    )
    refresh.add_argument(
        "--new-contributor-max-authors",
        type=int,
        default=int(defaults.get("new-contributor-max-authors", 25)),
    )
    refresh.add_argument("--http-timeout", type=int, default=300)
    refresh.add_argument("--http-max-retries", type=int, default=8)
    refresh.add_argument("--checkpoint-every-comments", type=int, default=1000)
    refresh.add_argument("--checkpoint-every-prs", type=int, default=25)
    refresh.add_argument(
        "--private-hf-repo",
        dest="private_hf_repo",
        action="store_true",
        default=bool(defaults.get("private-hf-repo", False)),
        help="Create the target dataset repo as private if needed.",
    )
    refresh.add_argument(
        "--private",
        dest="private_hf_repo",
        action="store_true",
        help=argparse.SUPPRESS,
    )


def _add_analyze_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
    analyze = subparsers.add_parser(
        "analyze",
        help="Analyze a snapshot and write a local JSON report. Canonical publication is separate.",
    )
    analyze.add_argument(
        "--snapshot-dir",
        type=Path,
        help="Snapshot directory to analyze. Defaults to the latest local snapshot.",
    )
    analyze.add_argument(
        "--output-dir", type=Path, default=Path(defaults.get("output-dir", "data"))
    )
    analyze.add_argument("--output", type=Path, help="Output path for the analysis JSON.")
    analyze.add_argument(
        "--hf-repo-id",
        default=defaults.get("hf-repo-id"),
        help="Analyze a canonical Hugging Face dataset repo by materializing a self-consistent published snapshot locally.",
    )
    analyze.add_argument(
        "--hf-revision",
        default=defaults.get("hf-revision"),
        help="Optional Hub revision for metadata and README download.",
    )
    analyze.add_argument(
        "--hf-materialize-dir",
        type=Path,
        default=Path(defaults["hf-materialize-dir"]) if defaults.get("hf-materialize-dir") else None,
        help="Optional local directory used when materializing an HF dataset snapshot.",
    )
    analyze.add_argument(
        "--ranking-backend",
        choices=("hybrid", "deterministic"),
        default=defaults.get("ranking-backend", "hybrid"),
        help="Whether to use deterministic-only ranking or optional fast-agent enrichment.",
    )
    analyze.add_argument(
        "--model",
        default=defaults.get("model", "gpt-5.4-mini?service_tier=flex"),
        help="Model string used by fast-agent when enabled.",
    )
    analyze.add_argument(
        "--max-clusters",
        type=int,
        default=int(defaults.get("max-clusters", 10)),
        help="Maximum number of meta clusters to include in the report.",
    )
    analyze.add_argument(
        "--hybrid-llm-concurrency",
        type=_int_at_least(1),
        default=int(defaults.get("hybrid-llm-concurrency", 1)),
        help=(
            "Maximum number of hybrid LLM review units to run at once. "
            "Use 1 to minimize provider pressure."
        ),
    )
    analyze.add_argument(
        "--open-prs-only",
        action="store_true",
        default=bool(defaults.get("open-prs-only", False)),
        help="Restrict PR analysis/clustering to open PRs only. Draft PRs are still included.",
    )


def _add_pr_scope_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
    pr_scope = subparsers.add_parser(
        "pr-scope", help="Cluster open PRs by holistic file/scope overlap."
    )
    pr_scope.add_argument(
        "--snapshot-dir",
        type=Path,
        help="Snapshot directory to analyze. Defaults to the latest local snapshot.",
    )
    pr_scope.add_argument(
        "--output-dir", type=Path, default=Path(defaults.get("output-dir", "data"))
    )
    pr_scope.add_argument(
        "--output",
        type=Path,
        help="Output path for the PR scope JSON. Defaults next to the snapshot.",
    )
    pr_scope.add_argument(
        "--hf-repo-id",
        default=defaults.get("hf-repo-id"),
        help="Analyze a Hugging Face dataset repo by materializing its parquet export locally.",
    )
    pr_scope.add_argument(
        "--hf-revision",
        default=defaults.get("hf-revision"),
        help="Optional Hub revision for metadata and README download.",
    )
    pr_scope.add_argument(
        "--hf-materialize-dir",
        type=Path,
        default=Path(defaults["hf-materialize-dir"]) if defaults.get("hf-materialize-dir") else None,
        help="Optional local directory used when materializing an HF dataset snapshot.",
    )


def _add_checkpoint_import_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
    checkpoint_import = subparsers.add_parser(
        "import-hf-checkpoint",
        help="Import a checkpoint snapshot from an HF dataset repo into a clean local snapshot.",
    )
    checkpoint_import.add_argument(
        "--source-repo-id",
        default=defaults.get("source-repo-id", "burtenshaw/transformers-pr-slop-dataset"),
        help="Source Hugging Face dataset repo id containing checkpoint folders.",
    )
    checkpoint_import.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "eval_data")),
        help="Local root directory where the imported snapshot should be written.",
    )
    checkpoint_import.add_argument(
        "--checkpoint-id",
        help="Optional checkpoint snapshot id. Defaults to the latest viable checkpoint.",
    )
    checkpoint_import.add_argument(
        "--checkpoint-root",
        choices=("checkpoints", "_checkpoints"),
        help="Optional checkpoint root directory. Defaults to auto-detect.",
    )
    checkpoint_import.add_argument(
        "--publish-repo-id",
        help="Optional HF dataset repo id to publish the imported clean snapshot to.",
    )
    checkpoint_import.add_argument(
        "--private-hf-repo",
        action="store_true",
        help="Create the publish target as private when --publish-repo-id is used.",
    )
    checkpoint_import.add_argument(
        "--force",
        action="store_true",
        help="Overwrite an existing imported snapshot directory if present.",
    )


def _add_adopt_snapshot_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
    adopt_snapshot = subparsers.add_parser(
        "adopt-snapshot",
        help="Mark an existing snapshot as the current pipeline base so the next scrape resumes from it.",
    )
    adopt_snapshot.add_argument(
        "--snapshot-dir", type=Path, required=True, help="Existing local snapshot directory."
    )
    adopt_snapshot.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "data")),
        help="Pipeline workspace root where state/ and snapshots/latest.json should be written.",
    )
    adopt_snapshot.add_argument(
        "--next-since",
        help="Optional explicit watermark timestamp. Defaults to snapshot watermark.next_since, crawl_started_at, or extracted_at.",
    )


def _add_markdown_report_parser(subparsers: Any) -> None:
    markdown = subparsers.add_parser(
        "markdown-report", help="Render a markdown report from an analysis JSON file."
    )
    markdown.add_argument(
        "--input", type=Path, required=True, help="Path to an existing analysis JSON report."
    )
    markdown.add_argument(
        "--output",
        type=Path,
        help="Output path for the markdown report. Defaults next to the input JSON.",
    )
    markdown.add_argument(
        "--snapshot-dir",
        type=Path,
        help="Optional snapshot directory containing issues.parquet and pull_requests.parquet. Defaults to the input JSON parent directory.",
    )


def _add_duplicate_prs_parser(subparsers: Any) -> None:
    duplicate_prs = subparsers.add_parser(
        "duplicate-prs",
        help="List or merge mergeable duplicate PR clusters from hybrid-enriched analysis.",
    )
    duplicate_prs_subparsers = duplicate_prs.add_subparsers(
        dest="duplicate_prs_command", required=True
    )
    duplicate_list = duplicate_prs_subparsers.add_parser(
        "list",
        help="List mergeable duplicate PR clusters from a hybrid-enriched analysis report.",
    )
    duplicate_list_source = duplicate_list.add_mutually_exclusive_group(required=True)
    duplicate_list_source.add_argument(
        "--report", type=Path, help="Path to an analysis JSON report."
    )
    duplicate_list_source.add_argument(
        "--snapshot-dir", type=Path, help="Snapshot directory to analyze."
    )
    duplicate_list.add_argument(
        "--limit", type=int, default=10, help="Maximum number of mergeable clusters to print."
    )
    duplicate_list.add_argument(
        "--model",
        default=DEFAULT_DUPLICATE_PR_MODEL,
        help="Model string used for hybrid analysis and duplicate-PR mergeability gating.",
    )
    duplicate_merge = duplicate_prs_subparsers.add_parser(
        "merge",
        help="Use Codex to synthesize and publish a minimal upstream PR for a mergeable duplicate cluster.",
    )
    duplicate_merge_source = duplicate_merge.add_mutually_exclusive_group(required=True)
    duplicate_merge_source.add_argument(
        "--report", type=Path, help="Path to an analysis JSON report."
    )
    duplicate_merge_source.add_argument(
        "--snapshot-dir", type=Path, help="Snapshot directory to analyze."
    )
    duplicate_merge.add_argument(
        "--repo-dir",
        type=Path,
        required=True,
        help="Local upstream repository checkout used for the synthesis worktree.",
    )
    duplicate_merge.add_argument(
        "--upstream-repo",
        help="Optional owner/name override for the upstream target repository.",
    )
    duplicate_merge.add_argument(
        "--upstream-remote",
        default="origin",
        help="Remote in --repo-dir that points at the upstream repository. Defaults to origin.",
    )
    duplicate_merge.add_argument(
        "--fork-remote",
        default="fork",
        help="Remote in the synthesis worktree used for pushing the branch. Defaults to fork.",
    )
    duplicate_merge.add_argument("--cluster-id", help="Optional cluster override.")
    duplicate_merge.add_argument(
        "--fork-repo",
        help="Optional owner/name override for the fork push target. Overrides --fork-owner when both are set.",
    )
    duplicate_merge.add_argument(
        "--fork-owner",
        help="Optional GitHub fork owner override. Defaults to the authenticated user.",
    )
    duplicate_merge.add_argument(
        "--file-policy",
        choices=FILE_POLICY_CHOICES,
        default=DEFAULT_FILE_POLICY,
        help="Changed-file policy enforced on the synthesized branch.",
    )
    duplicate_merge.add_argument(
        "--model",
        default=DEFAULT_DUPLICATE_PR_MODEL,
        help="Model string used for hybrid analysis, mergeability gating, and Codex synthesis.",
    )


def _add_pr_search_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
    pr_search = subparsers.add_parser(
        "pr-search",
        help="Refresh and query the DuckDB-backed PR code-similarity index.",
    )
    pr_search_subparsers = pr_search.add_subparsers(dest="pr_search_command", required=True)
    refresh = pr_search_subparsers.add_parser(
        "refresh",
        help="Refresh the PR code-similarity index from a local snapshot or HF dataset repo.",
    )
    refresh_source = refresh.add_mutually_exclusive_group()
    refresh_source.add_argument(
        "--snapshot-dir",
        type=Path,
        help="Snapshot directory to index. Defaults to the latest local snapshot.",
    )
    refresh_source.add_argument(
        "--hf-repo-id",
        default=defaults.get("hf-repo-id"),
        help="Hugging Face dataset repo id to materialize before indexing.",
    )
    refresh.add_argument(
        "--hf-revision",
        default=defaults.get("hf-revision"),
        help="Optional Hub revision for metadata and README download.",
    )
    refresh.add_argument(
        "--hf-materialize-dir",
        type=Path,
        default=Path(defaults["hf-materialize-dir"]) if defaults.get("hf-materialize-dir") else None,
        help="Optional local directory used when materializing an HF dataset snapshot.",
    )
    refresh.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "data")),
        help="Workspace root used for latest snapshot resolution and default DB placement.",
    )
    refresh.add_argument(
        "--db",
        type=Path,
        default=Path(defaults["db"]) if defaults.get("db") else None,
        help="DuckDB file path. Defaults to /state/pr-search.duckdb.",
    )
    refresh.add_argument("--limit-prs", type=int, help="Optional cap on indexed PRs.")
    refresh.add_argument(
        "--include-drafts",
        action="store_true",
        default=bool(defaults.get("include-drafts", False)),
        help="Include draft PRs in the indexed universe.",
    )
    refresh.add_argument(
        "--include-closed",
        action="store_true",
        default=bool(defaults.get("include-closed", False)),
        help="Include closed PRs in the indexed universe.",
    )
    refresh.add_argument(
        "--replace-active",
        dest="replace_active",
        action="store_true",
        default=True,
        help="Activate the new run on success. Enabled by default.",
    )
    refresh.add_argument(
        "--no-replace-active",
        dest="replace_active",
        action="store_false",
        help="Write the new run without switching the active run pointer.",
    )
    similar = pr_search_subparsers.add_parser(
        "similar", help="Show similar PRs for one indexed pull request."
    )
    similar.add_argument("pr_number", type=int, help="Pull request number to query.")
    similar.add_argument(
        "--db",
        type=Path,
        default=Path(defaults["db"]) if defaults.get("db") else None,
        help="DuckDB file path. Defaults to /state/pr-search.duckdb.",
    )
    similar.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "data")),
    )
    similar.add_argument("--repo", help="Optional repo override when the DB holds multiple repos.")
    similar.add_argument("--limit", type=int, default=10, help="Maximum number of rows to show.")
    similar.add_argument("--json", action="store_true", help="Emit machine-readable JSON.")
    probe_github = pr_search_subparsers.add_parser(
        "probe-github",
        help="Fetch one live GitHub PR and compare it against the active indexed scope features.",
    )
    probe_github.add_argument("pr_number", type=int, help="Pull request number to probe.")
    probe_github.add_argument(
        "--repo",
        help="GitHub repository in owner/name form. Defaults to the active repo in the DB.",
    )
    probe_github.add_argument(
        "--db",
        type=Path,
        default=Path(defaults["db"]) if defaults.get("db") else None,
        help="DuckDB file path. Defaults to /state/pr-search.duckdb.",
    )
    probe_github.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "data")),
    )
    probe_github.add_argument(
        "--limit",
        type=int,
        default=10,
        help="Maximum number of similar PR rows to show.",
    )
    probe_github.add_argument("--json", action="store_true", help="Emit machine-readable JSON.")
    candidate_clusters = pr_search_subparsers.add_parser(
        "candidate-clusters",
        help="Show candidate scope clusters for one indexed pull request.",
    )
    candidate_clusters.add_argument("pr_number", type=int, help="Pull request number to query.")
    candidate_clusters.add_argument(
        "--db",
        type=Path,
        default=Path(defaults["db"]) if defaults.get("db") else None,
        help="DuckDB file path. Defaults to /state/pr-search.duckdb.",
    )
    candidate_clusters.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "data")),
    )
    candidate_clusters.add_argument(
        "--repo", help="Optional repo override when the DB holds multiple repos."
    )
    candidate_clusters.add_argument(
        "--limit", type=int, default=5, help="Maximum number of rows to show."
    )
    candidate_clusters.add_argument("--json", action="store_true", help="Emit JSON.")
    cluster = pr_search_subparsers.add_parser("cluster", help="Inspect one scope cluster.")
    cluster_subparsers = cluster.add_subparsers(dest="pr_search_cluster_command", required=True)
    cluster_show = cluster_subparsers.add_parser("show", help="Show cluster details.")
    cluster_show.add_argument("cluster_id", help="Cluster identifier.")
    cluster_show.add_argument(
        "--db",
        type=Path,
        default=Path(defaults["db"]) if defaults.get("db") else None,
        help="DuckDB file path. Defaults to /state/pr-search.duckdb.",
    )
    cluster_show.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "data")),
    )
    cluster_show.add_argument("--repo", help="Optional repo override.")
    cluster_show.add_argument("--json", action="store_true", help="Emit JSON.")
    explain_pair = pr_search_subparsers.add_parser(
        "explain-pair",
        help="Explain one PR pair, falling back to on-demand scoring when needed.",
    )
    explain_pair.add_argument("left_pr_number", type=int)
    explain_pair.add_argument("right_pr_number", type=int)
    explain_pair.add_argument(
        "--db",
        type=Path,
        default=Path(defaults["db"]) if defaults.get("db") else None,
        help="DuckDB file path. Defaults to /state/pr-search.duckdb.",
    )
    explain_pair.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "data")),
    )
    explain_pair.add_argument("--repo", help="Optional repo override.")
    explain_pair.add_argument("--json", action="store_true", help="Emit JSON.")
    status = pr_search_subparsers.add_parser("status", help="Show the active PR search run.")
    status.add_argument(
        "--db",
        type=Path,
        default=Path(defaults["db"]) if defaults.get("db") else None,
        help="DuckDB file path. Defaults to /state/pr-search.duckdb.",
    )
    status.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "data")),
    )
    status.add_argument("--repo", help="Optional repo override.")
    status.add_argument("--json", action="store_true", help="Emit JSON.")
    contributor = pr_search_subparsers.add_parser(
        "contributor", help="Show indexed contributor summary for one author login."
    )
    contributor.add_argument("login", help="GitHub author login to query.")
    contributor.add_argument(
        "--db",
        type=Path,
        default=Path(defaults["db"]) if defaults.get("db") else None,
        help="DuckDB file path. Defaults to /state/pr-search.duckdb.",
    )
    contributor.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "data")),
    )
    contributor.add_argument("--repo", help="Optional repo override.")
    contributor.add_argument("--json", action="store_true", help="Emit JSON.")
    contributor_prs = pr_search_subparsers.add_parser(
        "contributor-prs", help="List indexed PRs for one contributor login."
    )
    contributor_prs.add_argument("login", help="GitHub author login to query.")
    contributor_prs.add_argument(
        "--db",
        type=Path,
        default=Path(defaults["db"]) if defaults.get("db") else None,
        help="DuckDB file path. Defaults to /state/pr-search.duckdb.",
    )
    contributor_prs.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "data")),
    )
    contributor_prs.add_argument("--repo", help="Optional repo override.")
    contributor_prs.add_argument("--limit", type=int, default=20, help="Maximum rows to show.")
    contributor_prs.add_argument("--json", action="store_true", help="Emit JSON.")
    pr_contributor = pr_search_subparsers.add_parser(
        "pr-contributor", help="Show contributor summary for the author of one indexed PR."
    )
    pr_contributor.add_argument("pr_number", type=int, help="Pull request number to query.")
    pr_contributor.add_argument(
        "--db",
        type=Path,
        default=Path(defaults["db"]) if defaults.get("db") else None,
        help="DuckDB file path. Defaults to /state/pr-search.duckdb.",
    )
    pr_contributor.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "data")),
    )
    pr_contributor.add_argument("--repo", help="Optional repo override.")
    pr_contributor.add_argument("--json", action="store_true", help="Emit JSON.")


def _add_new_contributor_report_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
    new_contributor = subparsers.add_parser(
        "new-contributor-report",
        help="Render a markdown report for newly observed contributors in a snapshot.",
    )
    new_contributor.add_argument(
        "--snapshot-dir",
        type=Path,
        help="Snapshot directory to inspect. Defaults to the latest local snapshot.",
    )
    new_contributor.add_argument(
        "--output-dir", type=Path, default=Path(defaults.get("output-dir", "data"))
    )
    new_contributor.add_argument(
        "--output",
        type=Path,
        help="Output path for the markdown report. Defaults next to the snapshot.",
    )
    new_contributor.add_argument(
        "--json-output", type=Path, help="Optional JSON output path. Defaults next to the snapshot."
    )
    new_contributor.add_argument(
        "--hf-repo-id",
        default=defaults.get("hf-repo-id"),
        help="Analyze a Hugging Face dataset repo by materializing its parquet export locally.",
    )
    new_contributor.add_argument(
        "--hf-revision",
        default=defaults.get("hf-revision"),
        help="Optional Hub revision for metadata and README download.",
    )
    new_contributor.add_argument(
        "--hf-materialize-dir",
        type=Path,
        default=Path(defaults["hf-materialize-dir"]) if defaults.get("hf-materialize-dir") else None,
        help="Optional local directory used when materializing an HF dataset snapshot.",
    )
    new_contributor.add_argument(
        "--window-days",
        type=int,
        default=int(defaults.get("window-days", 42)),
        help="Recent public activity window for contributor enrichment.",
    )
    new_contributor.add_argument(
        "--max-authors",
        type=int,
        default=int(defaults.get("max-authors", 25)),
        help="Maximum number of contributors to include. Use 0 for no cap.",
    )


def _add_dashboard_data_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
    dashboard = subparsers.add_parser(
        "dashboard-data", help="Export frontend-ready JSON for the static dashboard."
    )
    dashboard.add_argument(
        "--snapshot-dir",
        type=Path,
        help="Snapshot directory to export. Defaults to the latest local snapshot.",
    )
    dashboard.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "web/public/data")),
    )
    dashboard.add_argument(
        "--analysis-input",
        type=Path,
        help="Optional analysis report JSON override. Defaults to canonical published current analysis when available, otherwise falls back to snapshot-local analysis files.",
    )
    dashboard.add_argument(
        "--contributors-input",
        type=Path,
        help="Optional contributor report JSON override. Defaults to the materialized snapshot's new-contributors-report.json.",
    )
    dashboard.add_argument(
        "--pr-scope-input",
        type=Path,
        help="Optional PR scope cluster JSON override. Defaults to the materialized snapshot's pr-scope-clusters.json.",
    )
    dashboard.add_argument(
        "--hf-repo-id",
        default=defaults.get("hf-repo-id"),
        help="Materialize the canonical Hugging Face dataset repo instead of using the latest local snapshot.",
    )
    dashboard.add_argument(
        "--hf-revision",
        default=defaults.get("hf-revision"),
        help="Optional Hub revision for metadata and README download.",
    )
    dashboard.add_argument(
        "--hf-materialize-dir",
        type=Path,
        default=Path(defaults["hf-materialize-dir"]) if defaults.get("hf-materialize-dir") else None,
        help="Optional local directory used when materializing an HF dataset snapshot.",
    )
    dashboard.add_argument(
        "--window-days",
        type=int,
        default=int(defaults.get("window-days", 14)),
        help="Recent PR window to expose in the dashboard.",
    )


def _add_publish_analysis_artifacts_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
    publish_analysis = subparsers.add_parser(
        "publish-analysis-artifacts",
        help="Publish archived and optional canonical hybrid analysis artifacts to a dataset repo.",
    )
    publish_analysis.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "data")),
        help="Pipeline workspace root containing snapshots/latest.json.",
    )
    publish_analysis.add_argument(
        "--snapshot-dir",
        type=Path,
        help="Optional explicit snapshot directory containing analysis-report-hybrid.json.",
    )
    publish_analysis.add_argument(
        "--analysis-input",
        type=Path,
        help="Optional explicit hybrid analysis report JSON to publish instead of snapshot-dir discovery.",
    )
    publish_analysis.add_argument(
        "--hf-repo-id",
        default=defaults.get("hf-repo-id"),
        required=defaults.get("hf-repo-id") is None,
        help="Target Hugging Face dataset repo id.",
    )
    publish_analysis.add_argument("--analysis-id", required=True, help="Immutable analysis run id.")
    publish_analysis.add_argument(
        "--canonical",
        action="store_true",
        default=bool(defaults.get("canonical", False)),
        help="Also update the stable analysis/current canonical alias.",
    )
    publish_analysis.add_argument(
        "--save-cache",
        action="store_true",
        default=bool(defaults.get("save-cache", False)),
        help="Also upload snapshot-local analysis-state/ as mutable operational cache at repo-root analysis-state/.",
    )
    publish_analysis.add_argument(
        "--private-hf-repo",
        action="store_true",
        default=bool(defaults.get("private-hf-repo", False)),
        help="Create the target dataset repo as private if needed.",
    )


def _add_save_cache_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
    save_cache = subparsers.add_parser(
        "save-cache",
        help="Upload snapshot-local analysis-state/ as mutable operational cache to a dataset repo.",
    )
    save_cache.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "data")),
        help="Pipeline workspace root containing snapshots/latest.json.",
    )
    save_cache.add_argument(
        "--snapshot-dir",
        type=Path,
        help="Optional explicit snapshot directory containing analysis-state/.",
    )
    save_cache.add_argument(
        "--hf-repo-id",
        default=defaults.get("hf-repo-id"),
        required=defaults.get("hf-repo-id") is None,
        help="Target Hugging Face dataset repo id.",
    )
    save_cache.add_argument(
        "--private-hf-repo",
        action="store_true",
        default=bool(defaults.get("private-hf-repo", False)),
        help="Create the target dataset repo as private if needed.",
    )


def _add_deploy_dashboard_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
    deploy_dashboard = subparsers.add_parser(
        "deploy-dashboard",
        help="Build and publish the static dashboard to a Hugging Face Space from a materialized dataset view.",
    )
    deploy_dashboard.add_argument(
        "--pipeline-data-dir",
        type=Path,
        default=Path(defaults.get("pipeline-data-dir", "data")),
    )
    deploy_dashboard.add_argument(
        "--web-dir", type=Path, default=Path(defaults.get("web-dir", "web"))
    )
    deploy_dashboard.add_argument(
        "--snapshot-dir",
        type=Path,
        help="Optional snapshot directory to publish. Defaults to the latest snapshot in --pipeline-data-dir.",
    )
    deploy_dashboard.add_argument(
        "--analysis-input",
        type=Path,
        help="Optional analysis report JSON override. Omit to prefer canonical published current analysis when available.",
    )
    deploy_dashboard.add_argument(
        "--contributors-input",
        type=Path,
        help="Optional contributor report JSON override.",
    )
    deploy_dashboard.add_argument(
        "--pr-scope-input",
        type=Path,
        help="Optional PR scope cluster JSON override.",
    )
    deploy_dashboard.add_argument(
        "--hf-repo-id",
        default=defaults.get("hf-repo-id"),
        help="Materialize the canonical Hugging Face dataset repo instead of using the latest local snapshot.",
    )
    deploy_dashboard.add_argument(
        "--hf-revision",
        default=defaults.get("hf-revision"),
        help="Optional Hub revision for metadata and README download.",
    )
    deploy_dashboard.add_argument(
        "--hf-materialize-dir",
        type=Path,
        default=Path(defaults["hf-materialize-dir"]) if defaults.get("hf-materialize-dir") else None,
        help="Optional local directory used when materializing an HF dataset snapshot.",
    )
    deploy_dashboard.add_argument(
        "--refresh-contributors",
        action="store_true",
        default=bool(defaults.get("refresh-contributors", False)),
    )
    deploy_dashboard.add_argument(
        "--dashboard-window-days",
        type=int,
        default=int(defaults.get("dashboard-window-days", 14)),
    )
    deploy_dashboard.add_argument(
        "--contributor-window-days",
        type=int,
        default=int(
            defaults.get("contributor-window-days", defaults.get("dashboard-window-days", 14))
        ),
    )
    deploy_dashboard.add_argument(
        "--contributor-max-authors",
        type=int,
        default=int(defaults.get("contributor-max-authors", 0)),
    )
    deploy_dashboard.add_argument(
        "--private-space",
        action="store_true",
        default=bool(defaults.get("private-space", False)),
    )
    deploy_dashboard.add_argument(
        "--commit-message",
        default=defaults.get("commit-message", "Deploy dashboard"),
    )
    deploy_dashboard.add_argument(
        "--space-id",
        default=defaults.get("space-id"),
        help="Hugging Face Space repo id.",
    )
    deploy_dashboard.add_argument("--space-title", default=defaults.get("space-title"))
    deploy_dashboard.add_argument("--space-emoji", default=defaults.get("space-emoji", "📊"))
    deploy_dashboard.add_argument(
        "--space-color-from", default=defaults.get("space-color-from", "indigo")
    )
    deploy_dashboard.add_argument(
        "--space-color-to", default=defaults.get("space-color-to", "blue")
    )
    deploy_dashboard.add_argument(
        "--space-short-description",
        default=defaults.get(
            "space-short-description",
            "Static dashboard for the slop-farmer PR analysis pipeline.",
        ),
    )
    deploy_dashboard.add_argument("--dataset-id", default=defaults.get("dataset-id"))
    deploy_dashboard.add_argument(
        "--space-tags", default=defaults.get("space-tags", "dashboard,static")
    )


def _add_dataset_status_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
    dataset_status = subparsers.add_parser(
        "dataset-status",
        help="Inspect canonical dataset freshness and the local latest pointer.",
    )
    dataset_status.add_argument("--repo", default=defaults.get("repo"))
    dataset_status.add_argument(
        "--output-dir",
        type=Path,
        default=Path(defaults.get("output-dir", "data")),
        help="Local workspace root containing snapshots/latest.json.",
    )
    dataset_status.add_argument(
        "--hf-repo-id",
        default=defaults.get("hf-repo-id"),
        help="Canonical Hugging Face dataset repo id to inspect.",
    )
    dataset_status.add_argument(
        "--hf-revision",
        default=defaults.get("hf-revision"),
        help="Optional Hub revision for metadata and README download.",
    )
    dataset_status.add_argument("--json", action="store_true", help="Emit machine-readable JSON.")


# Dispatch helpers


def _explicit_flag_present(flag: str) -> bool:
    return any(arg == flag or arg.startswith(f"{flag}=") for arg in sys.argv[1:])


def _resolve_hf_inputs(args: argparse.Namespace) -> tuple[str | None, str | None, Path | None]:
    hf_repo_id = args.hf_repo_id
    hf_revision = args.hf_revision
    hf_materialize_dir = args.hf_materialize_dir
    if args.snapshot_dir is not None and not _explicit_flag_present("--hf-repo-id"):
        hf_repo_id = None
        hf_revision = None
        hf_materialize_dir = None
    return hf_repo_id, hf_revision, hf_materialize_dir


def _run_scrape(args: argparse.Namespace, config_path: Path | None) -> None:
    from slop_farmer.app.pipeline import run_pipeline

    new_contributor_report = bool(args.new_contributor_report)
    options = PipelineOptions(
        repo=RepoRef.parse(args.repo),
        output_dir=args.output_dir,
        since=args.since,
        resume=args.resume,
        http_timeout=args.http_timeout,
        http_max_retries=args.http_max_retries,
        max_issues=args.max_issues,
        max_prs=args.max_prs,
        max_issue_comments=args.max_issue_comments,
        max_reviews_per_pr=args.max_reviews_per_pr,
        max_review_comments_per_pr=args.max_review_comments_per_pr,
        fetch_timeline=args.fetch_timeline,
        new_contributor_report=new_contributor_report,
        new_contributor_window_days=args.new_contributor_window_days,
        new_contributor_max_authors=args.new_contributor_max_authors,
        issue_max_age_days=args.issue_max_age_days,
        pr_max_age_days=args.pr_max_age_days,
    )
    print(run_pipeline(options))


def _run_refresh_dataset(args: argparse.Namespace, config_path: Path | None) -> None:
    from slop_farmer.app.dataset_refresh import run_dataset_refresh

    refresh_defaults = command_defaults("refresh-dataset", config_path=config_path)
    result = run_dataset_refresh(
        DatasetRefreshOptions(
            repo=RepoRef.parse(args.repo),
            hf_repo_id=args.hf_repo_id,
            private_hf_repo=args.private_hf_repo,
            max_issues=args.max_issues,
            max_prs=args.max_prs,
            max_issue_comments=args.max_issue_comments,
            max_reviews_per_pr=args.max_reviews_per_pr,
            max_review_comments_per_pr=args.max_review_comments_per_pr,
            fetch_timeline=args.fetch_timeline,
            new_contributor_report=args.new_contributor_report,
            new_contributor_window_days=args.new_contributor_window_days,
            new_contributor_max_authors=args.new_contributor_max_authors,
            http_timeout=args.http_timeout,
            http_max_retries=args.http_max_retries,
            checkpoint_every_comments=args.checkpoint_every_comments,
            checkpoint_every_prs=args.checkpoint_every_prs,
            cluster_suppression_rules=tuple(refresh_defaults.get("cluster-suppression-rules", ())),
        )
    )
    print(json.dumps(result, indent=2))


def _run_analyze(args: argparse.Namespace, config_path: Path | None) -> None:
    from slop_farmer.reports.analysis import run_analysis

    analyze_defaults = command_defaults("analyze", config_path=config_path)
    hf_repo_id, hf_revision, hf_materialize_dir = _resolve_hf_inputs(args)
    options = AnalysisOptions(
        snapshot_dir=args.snapshot_dir,
        output_dir=args.output_dir,
        output=args.output,
        hf_repo_id=hf_repo_id,
        hf_revision=hf_revision,
        hf_materialize_dir=hf_materialize_dir,
        ranking_backend=args.ranking_backend,
        model=args.model,
        max_clusters=args.max_clusters,
        hybrid_llm_concurrency=args.hybrid_llm_concurrency,
        open_prs_only=args.open_prs_only,
        cached_analysis=bool(analyze_defaults.get("cached_analysis", False)),
        pr_template_cleanup_mode=str(
            analyze_defaults.get("pr-template-cleanup-mode", "merge_defaults")
        ),
        pr_template_strip_html_comments=bool(
            analyze_defaults.get("pr-template-strip-html-comments", True)
        ),
        pr_template_trim_closing_reference_prefix=bool(
            analyze_defaults.get("pr-template-trim-closing-reference-prefix", True)
        ),
        pr_template_section_patterns=tuple(
            analyze_defaults.get("pr-template-section-patterns", ())
        ),
        pr_template_line_patterns=tuple(analyze_defaults.get("pr-template-line-patterns", ())),
        cluster_suppression_rules=tuple(analyze_defaults.get("cluster-suppression-rules", ())),
    )
    print(run_analysis(options))


def _run_markdown_report(args: argparse.Namespace, config_path: Path | None) -> None:
    del config_path
    from slop_farmer.reports.analysis import render_markdown_report

    print(
        render_markdown_report(
            MarkdownReportOptions(
                input=args.input,
                output=args.output,
                snapshot_dir=args.snapshot_dir,
            )
        )
    )


def _run_duplicate_prs(args: argparse.Namespace, config_path: Path | None) -> None:
    del config_path
    from slop_farmer.app.duplicate_prs import run_duplicate_pr_merge
    from slop_farmer.reports.duplicate_prs import list_mergeable_duplicate_pr_clusters

    if args.duplicate_prs_command == "list":
        clusters = list_mergeable_duplicate_pr_clusters(
            report_path=args.report,
            snapshot_dir=args.snapshot_dir,
            limit=args.limit,
            model=args.model,
        )
        print(json.dumps(clusters, indent=2))
        return
    result = run_duplicate_pr_merge(
        report_path=args.report,
        snapshot_dir=args.snapshot_dir,
        repo_dir=args.repo_dir,
        upstream_repo=args.upstream_repo,
        upstream_remote=args.upstream_remote,
        fork_remote=args.fork_remote,
        cluster_id=args.cluster_id,
        fork_repo=args.fork_repo,
        fork_owner=args.fork_owner,
        file_policy=args.file_policy,
        model=args.model,
    )
    print(json.dumps(result, indent=2))


def _run_pr_scope(args: argparse.Namespace, config_path: Path | None) -> None:
    from slop_farmer.reports.pr_scope import run_pr_scope_report

    pr_scope_defaults = command_defaults("pr-scope", config_path=config_path)
    hf_repo_id, hf_revision, hf_materialize_dir = _resolve_hf_inputs(args)
    print(
        run_pr_scope_report(
            PrScopeOptions(
                snapshot_dir=args.snapshot_dir,
                output_dir=args.output_dir,
                output=args.output,
                hf_repo_id=hf_repo_id,
                hf_revision=hf_revision,
                hf_materialize_dir=hf_materialize_dir,
                cluster_suppression_rules=tuple(
                    pr_scope_defaults.get("cluster-suppression-rules", ())
                ),
            )
        )
    )


def _run_pr_search(args: argparse.Namespace, config_path: Path | None) -> None:
    from slop_farmer.app.pr_search import (
        explain_pr_search_pair,
        format_pr_search_candidate_clusters,
        format_pr_search_cluster,
        format_pr_search_contributor,
        format_pr_search_contributor_pulls,
        format_pr_search_pair,
        format_pr_search_probe,
        format_pr_search_pull_contributor,
        format_pr_search_similar,
        format_pr_search_status,
        get_pr_search_candidate_clusters,
        get_pr_search_cluster,
        get_pr_search_contributor,
        get_pr_search_contributor_pulls,
        get_pr_search_pull_contributor,
        get_pr_search_similar,
        get_pr_search_status,
        probe_pr_search_github,
        resolve_pr_search_db_path,
        run_pr_search_refresh,
    )

    pr_search_defaults = command_defaults("pr-search", config_path=config_path)
    if args.pr_search_command == "refresh":
        hf_repo_id, hf_revision, hf_materialize_dir = _resolve_hf_inputs(args)
        result = run_pr_search_refresh(
            PrSearchRefreshOptions(
                snapshot_dir=args.snapshot_dir,
                output_dir=args.output_dir,
                db=args.db,
                hf_repo_id=hf_repo_id,
                hf_revision=hf_revision,
                hf_materialize_dir=hf_materialize_dir,
                include_drafts=args.include_drafts,
                include_closed=args.include_closed,
                limit_prs=args.limit_prs,
                replace_active=args.replace_active,
                cluster_suppression_rules=tuple(
                    pr_search_defaults.get("cluster-suppression-rules", ())
                ),
            )
        )
        print(json.dumps(result, indent=2))
        return
    db_path = resolve_pr_search_db_path(args.db, output_dir=args.output_dir)
    if args.pr_search_command == "similar":
        result = get_pr_search_similar(
            db_path,
            pr_number=args.pr_number,
            repo=args.repo,
            limit=args.limit,
        )
        print(json.dumps(result, indent=2) if args.json else format_pr_search_similar(result))
        return
    if args.pr_search_command == "probe-github":
        result = probe_pr_search_github(
            db_path,
            pr_number=args.pr_number,
            repo=args.repo,
            limit=args.limit,
        )
        print(json.dumps(result, indent=2) if args.json else format_pr_search_probe(result))
        return
    if args.pr_search_command == "candidate-clusters":
        result = get_pr_search_candidate_clusters(
            db_path,
            pr_number=args.pr_number,
            repo=args.repo,
            limit=args.limit,
        )
        print(
            json.dumps(result, indent=2)
            if args.json
            else format_pr_search_candidate_clusters(result)
        )
        return
    if args.pr_search_command == "cluster":
        if args.pr_search_cluster_command != "show":
            raise ValueError(
                f"Unsupported pr-search cluster command: {args.pr_search_cluster_command}"
            )
        result = get_pr_search_cluster(
            db_path,
            cluster_id=args.cluster_id,
            repo=args.repo,
        )
        print(json.dumps(result, indent=2) if args.json else format_pr_search_cluster(result))
        return
    if args.pr_search_command == "explain-pair":
        result = explain_pr_search_pair(
            db_path,
            left_pr_number=args.left_pr_number,
            right_pr_number=args.right_pr_number,
            repo=args.repo,
        )
        print(json.dumps(result, indent=2) if args.json else format_pr_search_pair(result))
        return
    if args.pr_search_command == "status":
        result = get_pr_search_status(db_path, repo=args.repo)
        print(json.dumps(result, indent=2) if args.json else format_pr_search_status(result))
        return
    if args.pr_search_command == "contributor":
        result = get_pr_search_contributor(db_path, author_login=args.login, repo=args.repo)
        print(json.dumps(result, indent=2) if args.json else format_pr_search_contributor(result))
        return
    if args.pr_search_command == "contributor-prs":
        result = get_pr_search_contributor_pulls(
            db_path,
            author_login=args.login,
            repo=args.repo,
            limit=args.limit,
        )
        print(
            json.dumps(result, indent=2)
            if args.json
            else format_pr_search_contributor_pulls(result)
        )
        return
    if args.pr_search_command == "pr-contributor":
        result = get_pr_search_pull_contributor(
            db_path,
            pr_number=args.pr_number,
            repo=args.repo,
        )
        print(
            json.dumps(result, indent=2)
            if args.json
            else format_pr_search_pull_contributor(result)
        )
        return
    raise ValueError(f"Unsupported pr-search command: {args.pr_search_command}")


def _run_import_hf_checkpoint(args: argparse.Namespace, config_path: Path | None) -> None:
    del config_path
    from slop_farmer.app.hf_checkpoint_import import import_hf_checkpoint

    print(
        import_hf_checkpoint(
            CheckpointImportOptions(
                source_repo_id=args.source_repo_id,
                output_dir=args.output_dir,
                checkpoint_id=args.checkpoint_id,
                checkpoint_root=args.checkpoint_root,
                publish_repo_id=args.publish_repo_id,
                private_hf_repo=args.private_hf_repo,
                force=args.force,
            )
        )
    )


def _run_adopt_snapshot(args: argparse.Namespace, config_path: Path | None) -> None:
    del config_path
    from slop_farmer.app.snapshot_state import adopt_snapshot_for_pipeline

    print(
        adopt_snapshot_for_pipeline(
            SnapshotAdoptOptions(
                snapshot_dir=args.snapshot_dir,
                output_dir=args.output_dir,
                next_since=args.next_since,
            )
        )
    )


def _run_new_contributor_report(args: argparse.Namespace, config_path: Path | None) -> None:
    del config_path
    from slop_farmer.reports.new_contributor_report import run_new_contributor_report

    hf_repo_id, hf_revision, hf_materialize_dir = _resolve_hf_inputs(args)
    print(
        run_new_contributor_report(
            NewContributorReportOptions(
                snapshot_dir=args.snapshot_dir,
                output_dir=args.output_dir,
                output=args.output,
                json_output=args.json_output,
                hf_repo_id=hf_repo_id,
                hf_revision=hf_revision,
                hf_materialize_dir=hf_materialize_dir,
                window_days=args.window_days,
                max_authors=args.max_authors,
            )
        )
    )


def _run_dashboard_data(args: argparse.Namespace, config_path: Path | None) -> None:
    from slop_farmer.reports.dashboard import run_dashboard_data

    dashboard_defaults = command_defaults("dashboard-data", config_path=config_path)
    hf_repo_id, hf_revision, hf_materialize_dir = _resolve_hf_inputs(args)
    print(
        run_dashboard_data(
            DashboardDataOptions(
                snapshot_dir=args.snapshot_dir,
                output_dir=args.output_dir,
                analysis_input=args.analysis_input,
                contributors_input=args.contributors_input,
                pr_scope_input=args.pr_scope_input,
                hf_repo_id=hf_repo_id,
                hf_revision=hf_revision,
                hf_materialize_dir=hf_materialize_dir,
                window_days=args.window_days,
                snapshot_root=(
                    Path(dashboard_defaults["snapshot-root"])
                    if dashboard_defaults.get("snapshot-root")
                    else None
                ),
            )
        )
    )


def _run_deploy_dashboard(args: argparse.Namespace, config_path: Path | None) -> None:
    del config_path
    from slop_farmer.app.deploy import run_deploy_dashboard

    hf_repo_id, hf_revision, hf_materialize_dir = _resolve_hf_inputs(args)
    run_deploy_dashboard(
        DeployDashboardOptions(
            pipeline_data_dir=args.pipeline_data_dir,
            web_dir=args.web_dir,
            snapshot_dir=args.snapshot_dir,
            analysis_input=args.analysis_input,
            contributors_input=args.contributors_input,
            pr_scope_input=args.pr_scope_input,
            hf_repo_id=hf_repo_id,
            hf_revision=hf_revision,
            hf_materialize_dir=hf_materialize_dir,
            refresh_contributors=args.refresh_contributors,
            dashboard_window_days=args.dashboard_window_days,
            contributor_window_days=args.contributor_window_days,
            contributor_max_authors=args.contributor_max_authors,
            private_space=args.private_space,
            commit_message=args.commit_message,
            space_id=args.space_id,
            space_title=args.space_title,
            space_emoji=args.space_emoji,
            space_color_from=args.space_color_from,
            space_color_to=args.space_color_to,
            space_short_description=args.space_short_description,
            dataset_id=args.dataset_id,
            space_tags=args.space_tags,
        )
    )


def _run_dataset_status(args: argparse.Namespace, config_path: Path | None) -> None:
    del config_path
    from slop_farmer.app.dataset_status import format_dataset_status, get_dataset_status

    result = get_dataset_status(
        DatasetStatusOptions(
            repo=args.repo,
            output_dir=args.output_dir,
            hf_repo_id=args.hf_repo_id,
            hf_revision=args.hf_revision,
            json_output=args.json,
        )
    )
    print(json.dumps(result, indent=2) if args.json else format_dataset_status(result))


def _run_publish_analysis_artifacts(args: argparse.Namespace, config_path: Path | None) -> None:
    del config_path
    from slop_farmer.app.publish_analysis import run_publish_analysis_artifacts

    print(
        json.dumps(
            run_publish_analysis_artifacts(
                PublishAnalysisArtifactsOptions(
                    output_dir=args.output_dir,
                    snapshot_dir=args.snapshot_dir,
                    analysis_input=args.analysis_input,
                    hf_repo_id=args.hf_repo_id,
                    analysis_id=args.analysis_id,
                    canonical=args.canonical,
                    save_cache=args.save_cache,
                    private_hf_repo=args.private_hf_repo,
                )
            ),
            indent=2,
        )
    )


def _run_save_cache(args: argparse.Namespace, config_path: Path | None) -> None:
    del config_path
    from slop_farmer.app.save_cache import run_save_cache

    print(
        json.dumps(
            run_save_cache(
                SaveCacheOptions(
                    output_dir=args.output_dir,
                    snapshot_dir=args.snapshot_dir,
                    hf_repo_id=args.hf_repo_id,
                    private_hf_repo=args.private_hf_repo,
                )
            ),
            indent=2,
        )
    )


def main() -> None:
    config_path = extract_cli_config_path()
    parser = build_parser(config_path=config_path)
    args = parser.parse_args()
    handlers: dict[str, CommandHandler] = {
        "scrape": _run_scrape,
        "refresh-dataset": _run_refresh_dataset,
        "analyze": _run_analyze,
        "markdown-report": _run_markdown_report,
        "duplicate-prs": _run_duplicate_prs,
        "pr-scope": _run_pr_scope,
        "pr-search": _run_pr_search,
        "import-hf-checkpoint": _run_import_hf_checkpoint,
        "adopt-snapshot": _run_adopt_snapshot,
        "new-contributor-report": _run_new_contributor_report,
        "dashboard-data": _run_dashboard_data,
        "deploy-dashboard": _run_deploy_dashboard,
        "dataset-status": _run_dataset_status,
        "publish-analysis-artifacts": _run_publish_analysis_artifacts,
        "save-cache": _run_save_cache,
    }
    handler = handlers.get(args.command)
    if handler is None:
        parser.error(f"Unknown command: {args.command}")
    handler(args, config_path)


if __name__ == "__main__":
    main()
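
# Illustrative invocations of other subcommands (numbers and paths below are
# placeholders, not values required or defaulted by this module):
#   slop-farmer dataset-status --json
#   slop-farmer analyze --snapshot-dir data/snapshots/<snapshot-id> --ranking-backend deterministic
#   slop-farmer pr-search similar 12345 --limit 5 --json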