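"""Command-line interface for slop-farmer.

Builds the argparse tree for every subcommand and dispatches parsed arguments
to the matching command handler, e.g. (assuming the ``slop-farmer`` console
script is installed)::

    slop-farmer scrape --repo huggingface/transformers --output-dir data
    slop-farmer analyze --ranking-backend deterministic
"""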
from __future__ import annotations
import argparse
import json
import sys
from collections.abc import Callable
from pathlib import Path
from typing import Any
from slop_farmer.app.duplicate_prs import DEFAULT_FILE_POLICY, FILE_POLICY_CHOICES
from slop_farmer.app_config import command_defaults, extract_cli_config_path
from slop_farmer.config import (
AnalysisOptions,
CheckpointImportOptions,
DashboardDataOptions,
DatasetRefreshOptions,
DatasetStatusOptions,
DeployDashboardOptions,
MarkdownReportOptions,
NewContributorReportOptions,
PipelineOptions,
PrScopeOptions,
PrSearchRefreshOptions,
PublishAnalysisArtifactsOptions,
RepoRef,
SaveCacheOptions,
SnapshotAdoptOptions,
)
from slop_farmer.reports.duplicate_prs import DEFAULT_DUPLICATE_PR_MODEL
CommandHandler = Callable[[argparse.Namespace, Path | None], None]
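# argparse "type" factory: parse an int and reject values below `minimum`.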
def _int_at_least(minimum: int) -> Callable[[str], int]:
def parse(raw: str) -> int:
value = int(raw)
if value < minimum:
raise argparse.ArgumentTypeError(f"expected integer >= {minimum}")
return value
return parse
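# Build the full CLI parser, seeding each subcommand with defaults loaded from
# the optional YAML config file.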
def build_parser(*, config_path: Path | None = None) -> argparse.ArgumentParser:
defaults = _load_parser_defaults(config_path)
parser = argparse.ArgumentParser(prog="slop-farmer")
parser.add_argument(
"--config",
type=Path,
help="YAML config file with shared repo/workspace/dashboard defaults.",
)
subparsers = parser.add_subparsers(dest="command", required=True)
_add_scrape_parser(subparsers, defaults["scrape"])
_add_refresh_dataset_parser(subparsers, defaults["refresh-dataset"])
_add_analyze_parser(subparsers, defaults["analyze"])
_add_pr_scope_parser(subparsers, defaults["pr-scope"])
_add_checkpoint_import_parser(subparsers, defaults["import-hf-checkpoint"])
_add_adopt_snapshot_parser(subparsers, defaults["adopt-snapshot"])
_add_markdown_report_parser(subparsers)
_add_duplicate_prs_parser(subparsers)
_add_pr_search_parser(subparsers, defaults["pr-search"])
_add_new_contributor_report_parser(subparsers, defaults["new-contributor-report"])
_add_dashboard_data_parser(subparsers, defaults["dashboard-data"])
_add_publish_analysis_artifacts_parser(subparsers, defaults["publish-analysis-artifacts"])
_add_save_cache_parser(subparsers, defaults["save-cache"])
_add_deploy_dashboard_parser(subparsers, defaults["deploy-dashboard"])
_add_dataset_status_parser(subparsers, defaults["dataset-status"])
return parser
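# Resolve config-file defaults for every subcommand that accepts them
# (markdown-report and duplicate-prs take no config defaults).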
def _load_parser_defaults(config_path: Path | None) -> dict[str, dict[str, Any]]:
commands = (
"scrape",
"refresh-dataset",
"analyze",
"import-hf-checkpoint",
"pr-scope",
"pr-search",
"adopt-snapshot",
"new-contributor-report",
"dashboard-data",
"publish-analysis-artifacts",
"save-cache",
"deploy-dashboard",
"dataset-status",
)
return {command: command_defaults(command, config_path=config_path) for command in commands}
# Parser builders
def _add_scrape_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
scrape = subparsers.add_parser("scrape", help="Scrape GitHub and write a snapshot dataset.")
scrape.add_argument(
"--repo",
default=defaults.get("repo", "huggingface/transformers"),
help="GitHub repository in owner/name form.",
)
scrape.add_argument("--output-dir", type=Path, default=Path(defaults.get("output-dir", "data")))
scrape.add_argument("--since", help="Incremental sync lower bound in ISO 8601 format.")
scrape.add_argument(
"--resume",
dest="resume",
action="store_true",
default=True,
help="Resume from the last successful local watermark when --since is not provided.",
)
scrape.add_argument(
"--no-resume",
dest="resume",
action="store_false",
help="Ignore local watermark state and run from scratch unless --since is set.",
)
scrape.add_argument(
"--http-timeout", type=int, default=180, help="Per-request timeout in seconds."
)
scrape.add_argument(
"--http-max-retries", type=int, default=5, help="Retries for transient network failures."
)
scrape.add_argument(
"--max-issues", type=int, default=None, help="Limit total issue endpoint items read."
)
scrape.add_argument(
"--max-prs", type=int, default=None, help="Limit pull requests to hydrate in detail."
)
scrape.add_argument(
"--issue-max-age-days",
type=int,
default=defaults.get("issue-max-age-days"),
help="Optional created_at age cap for issues included in the snapshot.",
)
scrape.add_argument(
"--pr-max-age-days",
type=int,
default=defaults.get("pr-max-age-days"),
help="Optional created_at age cap for pull requests included in the snapshot.",
)
scrape.add_argument(
"--max-issue-comments", type=int, default=None, help="Limit issue comment rows."
)
scrape.add_argument(
"--max-reviews-per-pr", type=int, default=None, help="Limit review rows per PR."
)
scrape.add_argument(
"--max-review-comments-per-pr",
type=int,
default=None,
help="Limit inline review comment rows per PR.",
)
scrape.add_argument(
"--fetch-timeline",
action="store_true",
default=bool(defaults.get("fetch-timeline", False)),
help="Fetch issue timeline events for linkage rows.",
)
scrape.add_argument(
"--new-contributor-report",
dest="new_contributor_report",
action="store_true",
default=defaults.get("new-contributor-report"),
help="Generate new contributor dataset/report artifacts for the local snapshot.",
)
scrape.add_argument(
"--no-new-contributor-report",
dest="new_contributor_report",
action="store_false",
help="Skip new contributor dataset/report generation.",
)
scrape.add_argument(
"--new-contributor-window-days",
type=int,
default=int(defaults.get("new-contributor-window-days", 42)),
help="Recent public activity window for contributor enrichment.",
)
scrape.add_argument(
"--new-contributor-max-authors",
type=int,
default=int(defaults.get("new-contributor-max-authors", 25)),
help="Maximum number of contributors to include in the new contributor report. Use 0 for no cap.",
)
def _add_refresh_dataset_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
refresh = subparsers.add_parser(
"refresh-dataset",
help="Refresh the canonical Hugging Face dataset repo from remote watermark state.",
)
refresh.add_argument(
"--repo",
default=defaults.get("repo", "huggingface/transformers"),
help="GitHub repository in owner/name form.",
)
refresh.add_argument(
"--hf-repo-id",
default=defaults.get("hf-repo-id"),
required=defaults.get("hf-repo-id") is None,
help="Canonical Hugging Face dataset repo id to refresh.",
)
refresh.add_argument("--max-issues", type=int, default=defaults.get("max-issues"))
refresh.add_argument("--max-prs", type=int, default=defaults.get("max-prs"))
refresh.add_argument(
"--max-issue-comments", type=int, default=defaults.get("max-issue-comments")
)
refresh.add_argument(
"--max-reviews-per-pr", type=int, default=defaults.get("max-reviews-per-pr")
)
refresh.add_argument(
"--max-review-comments-per-pr",
type=int,
default=defaults.get("max-review-comments-per-pr"),
)
refresh.add_argument(
"--fetch-timeline",
action="store_true",
default=bool(defaults.get("fetch-timeline", False)),
)
refresh.add_argument(
"--new-contributor-report",
dest="new_contributor_report",
action="store_true",
default=bool(defaults.get("new-contributor-report", True)),
)
refresh.add_argument(
"--no-new-contributor-report",
dest="new_contributor_report",
action="store_false",
)
refresh.add_argument(
"--new-contributor-window-days",
type=int,
default=int(defaults.get("new-contributor-window-days", 42)),
)
refresh.add_argument(
"--new-contributor-max-authors",
type=int,
default=int(defaults.get("new-contributor-max-authors", 25)),
)
refresh.add_argument("--http-timeout", type=int, default=300)
refresh.add_argument("--http-max-retries", type=int, default=8)
refresh.add_argument("--checkpoint-every-comments", type=int, default=1000)
refresh.add_argument("--checkpoint-every-prs", type=int, default=25)
refresh.add_argument(
"--private-hf-repo",
dest="private_hf_repo",
action="store_true",
default=bool(defaults.get("private-hf-repo", False)),
help="Create the target dataset repo as private if needed.",
)
refresh.add_argument(
"--private",
dest="private_hf_repo",
action="store_true",
help=argparse.SUPPRESS,
)
def _add_analyze_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
analyze = subparsers.add_parser(
"analyze",
help="Analyze a snapshot and write a local JSON report. Canonical publication is separate.",
)
analyze.add_argument(
"--snapshot-dir",
type=Path,
help="Snapshot directory to analyze. Defaults to the latest local snapshot.",
)
analyze.add_argument(
"--output-dir", type=Path, default=Path(defaults.get("output-dir", "data"))
)
analyze.add_argument("--output", type=Path, help="Output path for the analysis JSON.")
analyze.add_argument(
"--hf-repo-id",
default=defaults.get("hf-repo-id"),
help="Analyze a canonical Hugging Face dataset repo by materializing a self-consistent published snapshot locally.",
)
analyze.add_argument(
"--hf-revision",
default=defaults.get("hf-revision"),
help="Optional Hub revision for metadata and README download.",
)
analyze.add_argument(
"--hf-materialize-dir",
type=Path,
default=Path(defaults["hf-materialize-dir"])
if defaults.get("hf-materialize-dir")
else None,
help="Optional local directory used when materializing an HF dataset snapshot.",
)
analyze.add_argument(
"--ranking-backend",
choices=("hybrid", "deterministic"),
default=defaults.get("ranking-backend", "hybrid"),
help="Whether to use deterministic-only ranking or optional fast-agent enrichment.",
)
analyze.add_argument(
"--model",
default=defaults.get("model", "gpt-5.4-mini?service_tier=flex"),
help="Model string used by fast-agent when enabled.",
)
analyze.add_argument(
"--max-clusters",
type=int,
default=int(defaults.get("max-clusters", 10)),
help="Maximum number of meta clusters to include in the report.",
)
analyze.add_argument(
"--hybrid-llm-concurrency",
type=_int_at_least(1),
default=int(defaults.get("hybrid-llm-concurrency", 1)),
help=(
"Maximum number of hybrid LLM review units to run at once. "
"Use 1 to minimize provider pressure."
),
)
analyze.add_argument(
"--open-prs-only",
action="store_true",
default=bool(defaults.get("open-prs-only", False)),
help="Restrict PR analysis/clustering to open PRs only. Draft PRs are still included.",
)
def _add_pr_scope_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
pr_scope = subparsers.add_parser(
"pr-scope", help="Cluster open PRs by holistic file/scope overlap."
)
pr_scope.add_argument(
"--snapshot-dir",
type=Path,
help="Snapshot directory to analyze. Defaults to the latest local snapshot.",
)
pr_scope.add_argument(
"--output-dir", type=Path, default=Path(defaults.get("output-dir", "data"))
)
pr_scope.add_argument(
"--output",
type=Path,
help="Output path for the PR scope JSON. Defaults next to the snapshot.",
)
pr_scope.add_argument(
"--hf-repo-id",
default=defaults.get("hf-repo-id"),
help="Analyze a Hugging Face dataset repo by materializing its parquet export locally.",
)
pr_scope.add_argument(
"--hf-revision",
default=defaults.get("hf-revision"),
help="Optional Hub revision for metadata and README download.",
)
pr_scope.add_argument(
"--hf-materialize-dir",
type=Path,
default=Path(defaults["hf-materialize-dir"])
if defaults.get("hf-materialize-dir")
else None,
help="Optional local directory used when materializing an HF dataset snapshot.",
)
def _add_checkpoint_import_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
checkpoint_import = subparsers.add_parser(
"import-hf-checkpoint",
help="Import a checkpoint snapshot from an HF dataset repo into a clean local snapshot.",
)
checkpoint_import.add_argument(
"--source-repo-id",
default=defaults.get("source-repo-id", "burtenshaw/transformers-pr-slop-dataset"),
help="Source Hugging Face dataset repo id containing checkpoint folders.",
)
checkpoint_import.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "eval_data")),
help="Local root directory where the imported snapshot should be written.",
)
checkpoint_import.add_argument(
"--checkpoint-id",
help="Optional checkpoint snapshot id. Defaults to the latest viable checkpoint.",
)
checkpoint_import.add_argument(
"--checkpoint-root",
choices=("checkpoints", "_checkpoints"),
help="Optional checkpoint root directory. Defaults to auto-detect.",
)
checkpoint_import.add_argument(
"--publish-repo-id",
help="Optional HF dataset repo id to publish the imported clean snapshot to.",
)
checkpoint_import.add_argument(
"--private-hf-repo",
action="store_true",
help="Create the publish target as private when --publish-repo-id is used.",
)
checkpoint_import.add_argument(
"--force",
action="store_true",
help="Overwrite an existing imported snapshot directory if present.",
)
def _add_adopt_snapshot_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
adopt_snapshot = subparsers.add_parser(
"adopt-snapshot",
help="Mark an existing snapshot as the current pipeline base so the next scrape resumes from it.",
)
adopt_snapshot.add_argument(
"--snapshot-dir", type=Path, required=True, help="Existing local snapshot directory."
)
adopt_snapshot.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "data")),
help="Pipeline workspace root where state/ and snapshots/latest.json should be written.",
)
adopt_snapshot.add_argument(
"--next-since",
help="Optional explicit watermark timestamp. Defaults to snapshot watermark.next_since, crawl_started_at, or extracted_at.",
)
def _add_markdown_report_parser(subparsers: Any) -> None:
markdown = subparsers.add_parser(
"markdown-report", help="Render a markdown report from an analysis JSON file."
)
markdown.add_argument(
"--input", type=Path, required=True, help="Path to an existing analysis JSON report."
)
markdown.add_argument(
"--output",
type=Path,
help="Output path for the markdown report. Defaults next to the input JSON.",
)
markdown.add_argument(
"--snapshot-dir",
type=Path,
help="Optional snapshot directory containing issues.parquet and pull_requests.parquet. Defaults to the input JSON parent directory.",
)
def _add_duplicate_prs_parser(subparsers: Any) -> None:
duplicate_prs = subparsers.add_parser(
"duplicate-prs",
help="List or merge mergeable duplicate PR clusters from hybrid-enriched analysis.",
)
duplicate_prs_subparsers = duplicate_prs.add_subparsers(
dest="duplicate_prs_command", required=True
)
duplicate_list = duplicate_prs_subparsers.add_parser(
"list",
help="List mergeable duplicate PR clusters from a hybrid-enriched analysis report.",
)
duplicate_list_source = duplicate_list.add_mutually_exclusive_group(required=True)
duplicate_list_source.add_argument(
"--report", type=Path, help="Path to an analysis JSON report."
)
duplicate_list_source.add_argument(
"--snapshot-dir", type=Path, help="Snapshot directory to analyze."
)
duplicate_list.add_argument(
"--limit", type=int, default=10, help="Maximum number of mergeable clusters to print."
)
duplicate_list.add_argument(
"--model",
default=DEFAULT_DUPLICATE_PR_MODEL,
help="Model string used for hybrid analysis and duplicate-PR mergeability gating.",
)
duplicate_merge = duplicate_prs_subparsers.add_parser(
"merge",
help="Use Codex to synthesize and publish a minimal upstream PR for a mergeable duplicate cluster.",
)
duplicate_merge_source = duplicate_merge.add_mutually_exclusive_group(required=True)
duplicate_merge_source.add_argument(
"--report", type=Path, help="Path to an analysis JSON report."
)
duplicate_merge_source.add_argument(
"--snapshot-dir", type=Path, help="Snapshot directory to analyze."
)
duplicate_merge.add_argument(
"--repo-dir",
type=Path,
required=True,
help="Local upstream repository checkout used for the synthesis worktree.",
)
duplicate_merge.add_argument(
"--upstream-repo",
help="Optional owner/name override for the upstream target repository.",
)
duplicate_merge.add_argument(
"--upstream-remote",
default="origin",
help="Remote in --repo-dir that points at the upstream repository. Defaults to origin.",
)
duplicate_merge.add_argument(
"--fork-remote",
default="fork",
help="Remote in the synthesis worktree used for pushing the branch. Defaults to fork.",
)
duplicate_merge.add_argument("--cluster-id", help="Optional cluster override.")
duplicate_merge.add_argument(
"--fork-repo",
help="Optional owner/name override for the fork push target. Overrides --fork-owner when both are set.",
)
duplicate_merge.add_argument(
"--fork-owner",
help="Optional GitHub fork owner override. Defaults to the authenticated user.",
)
duplicate_merge.add_argument(
"--file-policy",
choices=FILE_POLICY_CHOICES,
default=DEFAULT_FILE_POLICY,
help="Changed-file policy enforced on the synthesized branch.",
)
duplicate_merge.add_argument(
"--model",
default=DEFAULT_DUPLICATE_PR_MODEL,
help="Model string used for hybrid analysis, mergeability gating, and Codex synthesis.",
)
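# pr-search exposes a nested subcommand tree: "refresh" rebuilds the DuckDB
# index, the remaining subcommands are read-only queries against it.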
def _add_pr_search_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
pr_search = subparsers.add_parser(
"pr-search",
help="Refresh and query the DuckDB-backed PR code-similarity index.",
)
pr_search_subparsers = pr_search.add_subparsers(dest="pr_search_command", required=True)
refresh = pr_search_subparsers.add_parser(
"refresh",
help="Refresh the PR code-similarity index from a local snapshot or HF dataset repo.",
)
refresh_source = refresh.add_mutually_exclusive_group()
refresh_source.add_argument(
"--snapshot-dir",
type=Path,
help="Snapshot directory to index. Defaults to the latest local snapshot.",
)
refresh_source.add_argument(
"--hf-repo-id",
default=defaults.get("hf-repo-id"),
help="Hugging Face dataset repo id to materialize before indexing.",
)
refresh.add_argument(
"--hf-revision",
default=defaults.get("hf-revision"),
help="Optional Hub revision for metadata and README download.",
)
refresh.add_argument(
"--hf-materialize-dir",
type=Path,
default=Path(defaults["hf-materialize-dir"])
if defaults.get("hf-materialize-dir")
else None,
help="Optional local directory used when materializing an HF dataset snapshot.",
)
refresh.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "data")),
help="Workspace root used for latest snapshot resolution and default DB placement.",
)
refresh.add_argument(
"--db",
type=Path,
default=Path(defaults["db"]) if defaults.get("db") else None,
help="DuckDB file path. Defaults to <output-dir>/state/pr-search.duckdb.",
)
refresh.add_argument("--limit-prs", type=int, help="Optional cap on indexed PRs.")
refresh.add_argument(
"--include-drafts",
action="store_true",
default=bool(defaults.get("include-drafts", False)),
help="Include draft PRs in the indexed universe.",
)
refresh.add_argument(
"--include-closed",
action="store_true",
default=bool(defaults.get("include-closed", False)),
help="Include closed PRs in the indexed universe.",
)
refresh.add_argument(
"--replace-active",
dest="replace_active",
action="store_true",
default=True,
help="Activate the new run on success. Enabled by default.",
)
refresh.add_argument(
"--no-replace-active",
dest="replace_active",
action="store_false",
help="Write the new run without switching the active run pointer.",
)
similar = pr_search_subparsers.add_parser(
"similar", help="Show similar PRs for one indexed pull request."
)
similar.add_argument("pr_number", type=int, help="Pull request number to query.")
similar.add_argument(
"--db",
type=Path,
default=Path(defaults["db"]) if defaults.get("db") else None,
help="DuckDB file path. Defaults to <output-dir>/state/pr-search.duckdb.",
)
similar.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "data")),
)
similar.add_argument("--repo", help="Optional repo override when the DB holds multiple repos.")
similar.add_argument("--limit", type=int, default=10, help="Maximum number of rows to show.")
similar.add_argument("--json", action="store_true", help="Emit machine-readable JSON.")
probe_github = pr_search_subparsers.add_parser(
"probe-github",
help="Fetch one live GitHub PR and compare it against the active indexed scope features.",
)
probe_github.add_argument("pr_number", type=int, help="Pull request number to probe.")
probe_github.add_argument(
"--repo",
help="GitHub repository in owner/name form. Defaults to the active repo in the DB.",
)
probe_github.add_argument(
"--db",
type=Path,
default=Path(defaults["db"]) if defaults.get("db") else None,
help="DuckDB file path. Defaults to <output-dir>/state/pr-search.duckdb.",
)
probe_github.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "data")),
)
probe_github.add_argument(
"--limit",
type=int,
default=10,
help="Maximum number of similar PR rows to show.",
)
probe_github.add_argument("--json", action="store_true", help="Emit machine-readable JSON.")
candidate_clusters = pr_search_subparsers.add_parser(
"candidate-clusters",
help="Show candidate scope clusters for one indexed pull request.",
)
candidate_clusters.add_argument("pr_number", type=int, help="Pull request number to query.")
candidate_clusters.add_argument(
"--db",
type=Path,
default=Path(defaults["db"]) if defaults.get("db") else None,
help="DuckDB file path. Defaults to <output-dir>/state/pr-search.duckdb.",
)
candidate_clusters.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "data")),
)
candidate_clusters.add_argument(
"--repo", help="Optional repo override when the DB holds multiple repos."
)
candidate_clusters.add_argument(
"--limit", type=int, default=5, help="Maximum number of rows to show."
)
candidate_clusters.add_argument("--json", action="store_true", help="Emit JSON.")
cluster = pr_search_subparsers.add_parser("cluster", help="Inspect one scope cluster.")
cluster_subparsers = cluster.add_subparsers(dest="pr_search_cluster_command", required=True)
cluster_show = cluster_subparsers.add_parser("show", help="Show cluster details.")
cluster_show.add_argument("cluster_id", help="Cluster identifier.")
cluster_show.add_argument(
"--db",
type=Path,
default=Path(defaults["db"]) if defaults.get("db") else None,
help="DuckDB file path. Defaults to <output-dir>/state/pr-search.duckdb.",
)
cluster_show.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "data")),
)
cluster_show.add_argument("--repo", help="Optional repo override.")
cluster_show.add_argument("--json", action="store_true", help="Emit JSON.")
explain_pair = pr_search_subparsers.add_parser(
"explain-pair",
help="Explain one PR pair, falling back to on-demand scoring when needed.",
)
explain_pair.add_argument("left_pr_number", type=int)
explain_pair.add_argument("right_pr_number", type=int)
explain_pair.add_argument(
"--db",
type=Path,
default=Path(defaults["db"]) if defaults.get("db") else None,
help="DuckDB file path. Defaults to <output-dir>/state/pr-search.duckdb.",
)
explain_pair.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "data")),
)
explain_pair.add_argument("--repo", help="Optional repo override.")
explain_pair.add_argument("--json", action="store_true", help="Emit JSON.")
status = pr_search_subparsers.add_parser("status", help="Show the active PR search run.")
status.add_argument(
"--db",
type=Path,
default=Path(defaults["db"]) if defaults.get("db") else None,
help="DuckDB file path. Defaults to <output-dir>/state/pr-search.duckdb.",
)
status.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "data")),
)
status.add_argument("--repo", help="Optional repo override.")
status.add_argument("--json", action="store_true", help="Emit JSON.")
contributor = pr_search_subparsers.add_parser(
"contributor", help="Show indexed contributor summary for one author login."
)
contributor.add_argument("login", help="GitHub author login to query.")
contributor.add_argument(
"--db",
type=Path,
default=Path(defaults["db"]) if defaults.get("db") else None,
help="DuckDB file path. Defaults to <output-dir>/state/pr-search.duckdb.",
)
contributor.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "data")),
)
contributor.add_argument("--repo", help="Optional repo override.")
contributor.add_argument("--json", action="store_true", help="Emit JSON.")
contributor_prs = pr_search_subparsers.add_parser(
"contributor-prs", help="List indexed PRs for one contributor login."
)
contributor_prs.add_argument("login", help="GitHub author login to query.")
contributor_prs.add_argument(
"--db",
type=Path,
default=Path(defaults["db"]) if defaults.get("db") else None,
help="DuckDB file path. Defaults to <output-dir>/state/pr-search.duckdb.",
)
contributor_prs.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "data")),
)
contributor_prs.add_argument("--repo", help="Optional repo override.")
contributor_prs.add_argument("--limit", type=int, default=20, help="Maximum rows to show.")
contributor_prs.add_argument("--json", action="store_true", help="Emit JSON.")
pr_contributor = pr_search_subparsers.add_parser(
"pr-contributor", help="Show contributor summary for the author of one indexed PR."
)
pr_contributor.add_argument("pr_number", type=int, help="Pull request number to query.")
pr_contributor.add_argument(
"--db",
type=Path,
default=Path(defaults["db"]) if defaults.get("db") else None,
help="DuckDB file path. Defaults to <output-dir>/state/pr-search.duckdb.",
)
pr_contributor.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "data")),
)
pr_contributor.add_argument("--repo", help="Optional repo override.")
pr_contributor.add_argument("--json", action="store_true", help="Emit JSON.")
def _add_new_contributor_report_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
new_contributor = subparsers.add_parser(
"new-contributor-report",
help="Render a markdown report for newly observed contributors in a snapshot.",
)
new_contributor.add_argument(
"--snapshot-dir",
type=Path,
help="Snapshot directory to inspect. Defaults to the latest local snapshot.",
)
new_contributor.add_argument(
"--output-dir", type=Path, default=Path(defaults.get("output-dir", "data"))
)
new_contributor.add_argument(
"--output",
type=Path,
help="Output path for the markdown report. Defaults next to the snapshot.",
)
new_contributor.add_argument(
"--json-output", type=Path, help="Optional JSON output path. Defaults next to the snapshot."
)
new_contributor.add_argument(
"--hf-repo-id",
default=defaults.get("hf-repo-id"),
help="Analyze a Hugging Face dataset repo by materializing its parquet export locally.",
)
new_contributor.add_argument(
"--hf-revision",
default=defaults.get("hf-revision"),
help="Optional Hub revision for metadata and README download.",
)
new_contributor.add_argument(
"--hf-materialize-dir",
type=Path,
default=Path(defaults["hf-materialize-dir"])
if defaults.get("hf-materialize-dir")
else None,
help="Optional local directory used when materializing an HF dataset snapshot.",
)
new_contributor.add_argument(
"--window-days",
type=int,
default=int(defaults.get("window-days", 42)),
help="Recent public activity window for contributor enrichment.",
)
new_contributor.add_argument(
"--max-authors",
type=int,
default=int(defaults.get("max-authors", 25)),
help="Maximum number of contributors to include. Use 0 for no cap.",
)
def _add_dashboard_data_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
dashboard = subparsers.add_parser(
"dashboard-data", help="Export frontend-ready JSON for the static dashboard."
)
dashboard.add_argument(
"--snapshot-dir",
type=Path,
help="Snapshot directory to export. Defaults to the latest local snapshot.",
)
dashboard.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "web/public/data")),
)
dashboard.add_argument(
"--analysis-input",
type=Path,
help="Optional analysis report JSON override. Defaults to canonical published current analysis when available, otherwise falls back to snapshot-local analysis files.",
)
dashboard.add_argument(
"--contributors-input",
type=Path,
help="Optional contributor report JSON override. Defaults to the materialized snapshot's new-contributors-report.json.",
)
dashboard.add_argument(
"--pr-scope-input",
type=Path,
help="Optional PR scope cluster JSON override. Defaults to the materialized snapshot's pr-scope-clusters.json.",
)
dashboard.add_argument(
"--hf-repo-id",
default=defaults.get("hf-repo-id"),
help="Materialize the canonical Hugging Face dataset repo instead of using the latest local snapshot.",
)
dashboard.add_argument(
"--hf-revision",
default=defaults.get("hf-revision"),
help="Optional Hub revision for metadata and README download.",
)
dashboard.add_argument(
"--hf-materialize-dir",
type=Path,
default=Path(defaults["hf-materialize-dir"])
if defaults.get("hf-materialize-dir")
else None,
help="Optional local directory used when materializing an HF dataset snapshot.",
)
dashboard.add_argument(
"--window-days",
type=int,
default=int(defaults.get("window-days", 14)),
help="Recent PR window to expose in the dashboard.",
)
def _add_publish_analysis_artifacts_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
publish_analysis = subparsers.add_parser(
"publish-analysis-artifacts",
help="Publish archived and optional canonical hybrid analysis artifacts to a dataset repo.",
)
publish_analysis.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "data")),
help="Pipeline workspace root containing snapshots/latest.json.",
)
publish_analysis.add_argument(
"--snapshot-dir",
type=Path,
help="Optional explicit snapshot directory containing analysis-report-hybrid.json.",
)
publish_analysis.add_argument(
"--analysis-input",
type=Path,
help="Optional explicit hybrid analysis report JSON to publish instead of snapshot-dir discovery.",
)
publish_analysis.add_argument(
"--hf-repo-id",
default=defaults.get("hf-repo-id"),
required=defaults.get("hf-repo-id") is None,
help="Target Hugging Face dataset repo id.",
)
publish_analysis.add_argument("--analysis-id", required=True, help="Immutable analysis run id.")
publish_analysis.add_argument(
"--canonical",
action="store_true",
default=bool(defaults.get("canonical", False)),
help="Also update the stable analysis/current canonical alias.",
)
publish_analysis.add_argument(
"--save-cache",
action="store_true",
default=bool(defaults.get("save-cache", False)),
help="Also upload snapshot-local analysis-state/ as mutable operational cache at repo-root analysis-state/.",
)
publish_analysis.add_argument(
"--private-hf-repo",
action="store_true",
default=bool(defaults.get("private-hf-repo", False)),
help="Create the target dataset repo as private if needed.",
)
def _add_save_cache_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
save_cache = subparsers.add_parser(
"save-cache",
help="Upload snapshot-local analysis-state/ as mutable operational cache to a dataset repo.",
)
save_cache.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "data")),
help="Pipeline workspace root containing snapshots/latest.json.",
)
save_cache.add_argument(
"--snapshot-dir",
type=Path,
help="Optional explicit snapshot directory containing analysis-state/.",
)
save_cache.add_argument(
"--hf-repo-id",
default=defaults.get("hf-repo-id"),
required=defaults.get("hf-repo-id") is None,
help="Target Hugging Face dataset repo id.",
)
save_cache.add_argument(
"--private-hf-repo",
action="store_true",
default=bool(defaults.get("private-hf-repo", False)),
help="Create the target dataset repo as private if needed.",
)
def _add_deploy_dashboard_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
deploy_dashboard = subparsers.add_parser(
"deploy-dashboard",
help="Build and publish the static dashboard to a Hugging Face Space from a materialized dataset view.",
)
deploy_dashboard.add_argument(
"--pipeline-data-dir",
type=Path,
default=Path(defaults.get("pipeline-data-dir", "data")),
)
deploy_dashboard.add_argument(
"--web-dir", type=Path, default=Path(defaults.get("web-dir", "web"))
)
deploy_dashboard.add_argument(
"--snapshot-dir",
type=Path,
help="Optional snapshot directory to publish. Defaults to the latest snapshot in --pipeline-data-dir.",
)
deploy_dashboard.add_argument(
"--analysis-input",
type=Path,
help="Optional analysis report JSON override. Omit to prefer canonical published current analysis when available.",
)
deploy_dashboard.add_argument(
"--contributors-input",
type=Path,
help="Optional contributor report JSON override.",
)
deploy_dashboard.add_argument(
"--pr-scope-input",
type=Path,
help="Optional PR scope cluster JSON override.",
)
deploy_dashboard.add_argument(
"--hf-repo-id",
default=defaults.get("hf-repo-id"),
help="Materialize the canonical Hugging Face dataset repo instead of using the latest local snapshot.",
)
deploy_dashboard.add_argument(
"--hf-revision",
default=defaults.get("hf-revision"),
help="Optional Hub revision for metadata and README download.",
)
deploy_dashboard.add_argument(
"--hf-materialize-dir",
type=Path,
default=Path(defaults["hf-materialize-dir"])
if defaults.get("hf-materialize-dir")
else None,
help="Optional local directory used when materializing an HF dataset snapshot.",
)
deploy_dashboard.add_argument(
"--refresh-contributors",
action="store_true",
default=bool(defaults.get("refresh-contributors", False)),
)
deploy_dashboard.add_argument(
"--dashboard-window-days",
type=int,
default=int(defaults.get("dashboard-window-days", 14)),
)
deploy_dashboard.add_argument(
"--contributor-window-days",
type=int,
default=int(
defaults.get("contributor-window-days", defaults.get("dashboard-window-days", 14))
),
)
deploy_dashboard.add_argument(
"--contributor-max-authors",
type=int,
default=int(defaults.get("contributor-max-authors", 0)),
)
deploy_dashboard.add_argument(
"--private-space",
action="store_true",
default=bool(defaults.get("private-space", False)),
)
deploy_dashboard.add_argument(
"--commit-message",
default=defaults.get("commit-message", "Deploy dashboard"),
)
deploy_dashboard.add_argument(
"--space-id",
default=defaults.get("space-id"),
help="Hugging Face Space repo id.",
)
deploy_dashboard.add_argument("--space-title", default=defaults.get("space-title"))
deploy_dashboard.add_argument("--space-emoji", default=defaults.get("space-emoji", "📊"))
deploy_dashboard.add_argument(
"--space-color-from", default=defaults.get("space-color-from", "indigo")
)
deploy_dashboard.add_argument(
"--space-color-to", default=defaults.get("space-color-to", "blue")
)
deploy_dashboard.add_argument(
"--space-short-description",
default=defaults.get(
"space-short-description", "Static dashboard for the slop-farmer PR analysis pipeline."
),
)
deploy_dashboard.add_argument("--dataset-id", default=defaults.get("dataset-id"))
deploy_dashboard.add_argument(
"--space-tags", default=defaults.get("space-tags", "dashboard,static")
)
def _add_dataset_status_parser(subparsers: Any, defaults: dict[str, Any]) -> None:
dataset_status = subparsers.add_parser(
"dataset-status",
help="Inspect canonical dataset freshness and the local latest pointer.",
)
dataset_status.add_argument("--repo", default=defaults.get("repo"))
dataset_status.add_argument(
"--output-dir",
type=Path,
default=Path(defaults.get("output-dir", "data")),
help="Local workspace root containing snapshots/latest.json.",
)
dataset_status.add_argument(
"--hf-repo-id",
default=defaults.get("hf-repo-id"),
help="Canonical Hugging Face dataset repo id to inspect.",
)
dataset_status.add_argument(
"--hf-revision",
default=defaults.get("hf-revision"),
help="Optional Hub revision for metadata and README download.",
)
dataset_status.add_argument("--json", action="store_true", help="Emit machine-readable JSON.")
# Dispatch helpers
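# True when the flag appears verbatim on the command line, either as
# "--flag value" or "--flag=value".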
def _explicit_flag_present(flag: str) -> bool:
return any(arg == flag or arg.startswith(f"{flag}=") for arg in sys.argv[1:])
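# An explicitly passed --snapshot-dir overrides config-supplied HF dataset
# defaults unless --hf-repo-id was also given on the command line.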
def _resolve_hf_inputs(args: argparse.Namespace) -> tuple[str | None, str | None, Path | None]:
hf_repo_id = args.hf_repo_id
hf_revision = args.hf_revision
hf_materialize_dir = args.hf_materialize_dir
if args.snapshot_dir is not None and not _explicit_flag_present("--hf-repo-id"):
hf_repo_id = None
hf_revision = None
hf_materialize_dir = None
return hf_repo_id, hf_revision, hf_materialize_dir
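# Command handlers. Each handler imports its implementation module lazily so
# that building the parser and printing --help stay free of heavy imports.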
def _run_scrape(args: argparse.Namespace, config_path: Path | None) -> None:
from slop_farmer.app.pipeline import run_pipeline
new_contributor_report = bool(args.new_contributor_report)
options = PipelineOptions(
repo=RepoRef.parse(args.repo),
output_dir=args.output_dir,
since=args.since,
resume=args.resume,
http_timeout=args.http_timeout,
http_max_retries=args.http_max_retries,
max_issues=args.max_issues,
max_prs=args.max_prs,
max_issue_comments=args.max_issue_comments,
max_reviews_per_pr=args.max_reviews_per_pr,
max_review_comments_per_pr=args.max_review_comments_per_pr,
fetch_timeline=args.fetch_timeline,
new_contributor_report=new_contributor_report,
new_contributor_window_days=args.new_contributor_window_days,
new_contributor_max_authors=args.new_contributor_max_authors,
issue_max_age_days=args.issue_max_age_days,
pr_max_age_days=args.pr_max_age_days,
)
print(run_pipeline(options))
def _run_refresh_dataset(args: argparse.Namespace, config_path: Path | None) -> None:
from slop_farmer.app.dataset_refresh import run_dataset_refresh
refresh_defaults = command_defaults("refresh-dataset", config_path=config_path)
result = run_dataset_refresh(
DatasetRefreshOptions(
repo=RepoRef.parse(args.repo),
hf_repo_id=args.hf_repo_id,
private_hf_repo=args.private_hf_repo,
max_issues=args.max_issues,
max_prs=args.max_prs,
max_issue_comments=args.max_issue_comments,
max_reviews_per_pr=args.max_reviews_per_pr,
max_review_comments_per_pr=args.max_review_comments_per_pr,
fetch_timeline=args.fetch_timeline,
new_contributor_report=args.new_contributor_report,
new_contributor_window_days=args.new_contributor_window_days,
new_contributor_max_authors=args.new_contributor_max_authors,
http_timeout=args.http_timeout,
http_max_retries=args.http_max_retries,
checkpoint_every_comments=args.checkpoint_every_comments,
checkpoint_every_prs=args.checkpoint_every_prs,
cluster_suppression_rules=tuple(refresh_defaults.get("cluster-suppression-rules", ())),
)
)
print(json.dumps(result, indent=2))
def _run_analyze(args: argparse.Namespace, config_path: Path | None) -> None:
from slop_farmer.reports.analysis import run_analysis
analyze_defaults = command_defaults("analyze", config_path=config_path)
hf_repo_id, hf_revision, hf_materialize_dir = _resolve_hf_inputs(args)
options = AnalysisOptions(
snapshot_dir=args.snapshot_dir,
output_dir=args.output_dir,
output=args.output,
hf_repo_id=hf_repo_id,
hf_revision=hf_revision,
hf_materialize_dir=hf_materialize_dir,
ranking_backend=args.ranking_backend,
model=args.model,
max_clusters=args.max_clusters,
hybrid_llm_concurrency=args.hybrid_llm_concurrency,
open_prs_only=args.open_prs_only,
cached_analysis=bool(analyze_defaults.get("cached_analysis", False)),
pr_template_cleanup_mode=str(
analyze_defaults.get("pr-template-cleanup-mode", "merge_defaults")
),
pr_template_strip_html_comments=bool(
analyze_defaults.get("pr-template-strip-html-comments", True)
),
pr_template_trim_closing_reference_prefix=bool(
analyze_defaults.get("pr-template-trim-closing-reference-prefix", True)
),
pr_template_section_patterns=tuple(
analyze_defaults.get("pr-template-section-patterns", ())
),
pr_template_line_patterns=tuple(analyze_defaults.get("pr-template-line-patterns", ())),
cluster_suppression_rules=tuple(analyze_defaults.get("cluster-suppression-rules", ())),
)
print(run_analysis(options))
def _run_markdown_report(args: argparse.Namespace, config_path: Path | None) -> None:
del config_path
from slop_farmer.reports.analysis import render_markdown_report
print(
render_markdown_report(
MarkdownReportOptions(
input=args.input,
output=args.output,
snapshot_dir=args.snapshot_dir,
)
)
)
def _run_duplicate_prs(args: argparse.Namespace, config_path: Path | None) -> None:
del config_path
from slop_farmer.app.duplicate_prs import run_duplicate_pr_merge
from slop_farmer.reports.duplicate_prs import list_mergeable_duplicate_pr_clusters
if args.duplicate_prs_command == "list":
clusters = list_mergeable_duplicate_pr_clusters(
report_path=args.report,
snapshot_dir=args.snapshot_dir,
limit=args.limit,
model=args.model,
)
print(json.dumps(clusters, indent=2))
return
result = run_duplicate_pr_merge(
report_path=args.report,
snapshot_dir=args.snapshot_dir,
repo_dir=args.repo_dir,
upstream_repo=args.upstream_repo,
upstream_remote=args.upstream_remote,
fork_remote=args.fork_remote,
cluster_id=args.cluster_id,
fork_repo=args.fork_repo,
fork_owner=args.fork_owner,
file_policy=args.file_policy,
model=args.model,
)
print(json.dumps(result, indent=2))
def _run_pr_scope(args: argparse.Namespace, config_path: Path | None) -> None:
from slop_farmer.reports.pr_scope import run_pr_scope_report
pr_scope_defaults = command_defaults("pr-scope", config_path=config_path)
hf_repo_id, hf_revision, hf_materialize_dir = _resolve_hf_inputs(args)
print(
run_pr_scope_report(
PrScopeOptions(
snapshot_dir=args.snapshot_dir,
output_dir=args.output_dir,
output=args.output,
hf_repo_id=hf_repo_id,
hf_revision=hf_revision,
hf_materialize_dir=hf_materialize_dir,
cluster_suppression_rules=tuple(
pr_scope_defaults.get("cluster-suppression-rules", ())
),
)
)
)
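# Dispatches the nested pr-search subcommands: "refresh" rebuilds the index,
# everything else resolves the DuckDB path and runs a read-only query.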
def _run_pr_search(args: argparse.Namespace, config_path: Path | None) -> None:
from slop_farmer.app.pr_search import (
explain_pr_search_pair,
format_pr_search_candidate_clusters,
format_pr_search_cluster,
format_pr_search_contributor,
format_pr_search_contributor_pulls,
format_pr_search_pair,
format_pr_search_probe,
format_pr_search_pull_contributor,
format_pr_search_similar,
format_pr_search_status,
get_pr_search_candidate_clusters,
get_pr_search_cluster,
get_pr_search_contributor,
get_pr_search_contributor_pulls,
get_pr_search_pull_contributor,
get_pr_search_similar,
get_pr_search_status,
probe_pr_search_github,
resolve_pr_search_db_path,
run_pr_search_refresh,
)
pr_search_defaults = command_defaults("pr-search", config_path=config_path)
if args.pr_search_command == "refresh":
hf_repo_id, hf_revision, hf_materialize_dir = _resolve_hf_inputs(args)
result = run_pr_search_refresh(
PrSearchRefreshOptions(
snapshot_dir=args.snapshot_dir,
output_dir=args.output_dir,
db=args.db,
hf_repo_id=hf_repo_id,
hf_revision=hf_revision,
hf_materialize_dir=hf_materialize_dir,
include_drafts=args.include_drafts,
include_closed=args.include_closed,
limit_prs=args.limit_prs,
replace_active=args.replace_active,
cluster_suppression_rules=tuple(
pr_search_defaults.get("cluster-suppression-rules", ())
),
)
)
print(json.dumps(result, indent=2))
return
db_path = resolve_pr_search_db_path(args.db, output_dir=args.output_dir)
if args.pr_search_command == "similar":
result = get_pr_search_similar(
db_path,
pr_number=args.pr_number,
repo=args.repo,
limit=args.limit,
)
print(json.dumps(result, indent=2) if args.json else format_pr_search_similar(result))
return
if args.pr_search_command == "probe-github":
result = probe_pr_search_github(
db_path,
pr_number=args.pr_number,
repo=args.repo,
limit=args.limit,
)
print(json.dumps(result, indent=2) if args.json else format_pr_search_probe(result))
return
if args.pr_search_command == "candidate-clusters":
result = get_pr_search_candidate_clusters(
db_path,
pr_number=args.pr_number,
repo=args.repo,
limit=args.limit,
)
print(
json.dumps(result, indent=2)
if args.json
else format_pr_search_candidate_clusters(result)
)
return
if args.pr_search_command == "cluster":
if args.pr_search_cluster_command != "show":
raise ValueError(
f"Unsupported pr-search cluster command: {args.pr_search_cluster_command}"
)
result = get_pr_search_cluster(
db_path,
cluster_id=args.cluster_id,
repo=args.repo,
)
print(json.dumps(result, indent=2) if args.json else format_pr_search_cluster(result))
return
if args.pr_search_command == "explain-pair":
result = explain_pr_search_pair(
db_path,
left_pr_number=args.left_pr_number,
right_pr_number=args.right_pr_number,
repo=args.repo,
)
print(json.dumps(result, indent=2) if args.json else format_pr_search_pair(result))
return
if args.pr_search_command == "status":
result = get_pr_search_status(db_path, repo=args.repo)
print(json.dumps(result, indent=2) if args.json else format_pr_search_status(result))
return
if args.pr_search_command == "contributor":
result = get_pr_search_contributor(db_path, author_login=args.login, repo=args.repo)
print(json.dumps(result, indent=2) if args.json else format_pr_search_contributor(result))
return
if args.pr_search_command == "contributor-prs":
result = get_pr_search_contributor_pulls(
db_path,
author_login=args.login,
repo=args.repo,
limit=args.limit,
)
print(
json.dumps(result, indent=2)
if args.json
else format_pr_search_contributor_pulls(result)
)
return
if args.pr_search_command == "pr-contributor":
result = get_pr_search_pull_contributor(
db_path,
pr_number=args.pr_number,
repo=args.repo,
)
print(
json.dumps(result, indent=2) if args.json else format_pr_search_pull_contributor(result)
)
return
raise ValueError(f"Unsupported pr-search command: {args.pr_search_command}")
def _run_import_hf_checkpoint(args: argparse.Namespace, config_path: Path | None) -> None:
del config_path
from slop_farmer.app.hf_checkpoint_import import import_hf_checkpoint
print(
import_hf_checkpoint(
CheckpointImportOptions(
source_repo_id=args.source_repo_id,
output_dir=args.output_dir,
checkpoint_id=args.checkpoint_id,
checkpoint_root=args.checkpoint_root,
publish_repo_id=args.publish_repo_id,
private_hf_repo=args.private_hf_repo,
force=args.force,
)
)
)
def _run_adopt_snapshot(args: argparse.Namespace, config_path: Path | None) -> None:
del config_path
from slop_farmer.app.snapshot_state import adopt_snapshot_for_pipeline
print(
adopt_snapshot_for_pipeline(
SnapshotAdoptOptions(
snapshot_dir=args.snapshot_dir,
output_dir=args.output_dir,
next_since=args.next_since,
)
)
)
def _run_new_contributor_report(args: argparse.Namespace, config_path: Path | None) -> None:
del config_path
from slop_farmer.reports.new_contributor_report import run_new_contributor_report
hf_repo_id, hf_revision, hf_materialize_dir = _resolve_hf_inputs(args)
print(
run_new_contributor_report(
NewContributorReportOptions(
snapshot_dir=args.snapshot_dir,
output_dir=args.output_dir,
output=args.output,
json_output=args.json_output,
hf_repo_id=hf_repo_id,
hf_revision=hf_revision,
hf_materialize_dir=hf_materialize_dir,
window_days=args.window_days,
max_authors=args.max_authors,
)
)
)
def _run_dashboard_data(args: argparse.Namespace, config_path: Path | None) -> None:
from slop_farmer.reports.dashboard import run_dashboard_data
dashboard_defaults = command_defaults("dashboard-data", config_path=config_path)
hf_repo_id, hf_revision, hf_materialize_dir = _resolve_hf_inputs(args)
print(
run_dashboard_data(
DashboardDataOptions(
snapshot_dir=args.snapshot_dir,
output_dir=args.output_dir,
analysis_input=args.analysis_input,
contributors_input=args.contributors_input,
pr_scope_input=args.pr_scope_input,
hf_repo_id=hf_repo_id,
hf_revision=hf_revision,
hf_materialize_dir=hf_materialize_dir,
window_days=args.window_days,
snapshot_root=(
Path(dashboard_defaults["snapshot-root"])
if dashboard_defaults.get("snapshot-root")
else None
),
)
)
)
def _run_deploy_dashboard(args: argparse.Namespace, config_path: Path | None) -> None:
del config_path
from slop_farmer.app.deploy import run_deploy_dashboard
hf_repo_id, hf_revision, hf_materialize_dir = _resolve_hf_inputs(args)
run_deploy_dashboard(
DeployDashboardOptions(
pipeline_data_dir=args.pipeline_data_dir,
web_dir=args.web_dir,
snapshot_dir=args.snapshot_dir,
analysis_input=args.analysis_input,
contributors_input=args.contributors_input,
pr_scope_input=args.pr_scope_input,
hf_repo_id=hf_repo_id,
hf_revision=hf_revision,
hf_materialize_dir=hf_materialize_dir,
refresh_contributors=args.refresh_contributors,
dashboard_window_days=args.dashboard_window_days,
contributor_window_days=args.contributor_window_days,
contributor_max_authors=args.contributor_max_authors,
private_space=args.private_space,
commit_message=args.commit_message,
space_id=args.space_id,
space_title=args.space_title,
space_emoji=args.space_emoji,
space_color_from=args.space_color_from,
space_color_to=args.space_color_to,
space_short_description=args.space_short_description,
dataset_id=args.dataset_id,
space_tags=args.space_tags,
)
)
def _run_dataset_status(args: argparse.Namespace, config_path: Path | None) -> None:
del config_path
from slop_farmer.app.dataset_status import format_dataset_status, get_dataset_status
result = get_dataset_status(
DatasetStatusOptions(
repo=args.repo,
output_dir=args.output_dir,
hf_repo_id=args.hf_repo_id,
hf_revision=args.hf_revision,
json_output=args.json,
)
)
print(json.dumps(result, indent=2) if args.json else format_dataset_status(result))
def _run_publish_analysis_artifacts(args: argparse.Namespace, config_path: Path | None) -> None:
del config_path
from slop_farmer.app.publish_analysis import run_publish_analysis_artifacts
print(
json.dumps(
run_publish_analysis_artifacts(
PublishAnalysisArtifactsOptions(
output_dir=args.output_dir,
snapshot_dir=args.snapshot_dir,
analysis_input=args.analysis_input,
hf_repo_id=args.hf_repo_id,
analysis_id=args.analysis_id,
canonical=args.canonical,
save_cache=args.save_cache,
private_hf_repo=args.private_hf_repo,
)
),
indent=2,
)
)
def _run_save_cache(args: argparse.Namespace, config_path: Path | None) -> None:
del config_path
from slop_farmer.app.save_cache import run_save_cache
print(
json.dumps(
run_save_cache(
SaveCacheOptions(
output_dir=args.output_dir,
snapshot_dir=args.snapshot_dir,
hf_repo_id=args.hf_repo_id,
private_hf_repo=args.private_hf_repo,
)
),
indent=2,
)
)
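# Entry point: pull --config out of argv first so it can seed parser defaults,
# then dispatch the parsed command to its handler.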
def main() -> None:
config_path = extract_cli_config_path()
parser = build_parser(config_path=config_path)
args = parser.parse_args()
handlers: dict[str, CommandHandler] = {
"scrape": _run_scrape,
"refresh-dataset": _run_refresh_dataset,
"analyze": _run_analyze,
"markdown-report": _run_markdown_report,
"duplicate-prs": _run_duplicate_prs,
"pr-scope": _run_pr_scope,
"pr-search": _run_pr_search,
"import-hf-checkpoint": _run_import_hf_checkpoint,
"adopt-snapshot": _run_adopt_snapshot,
"new-contributor-report": _run_new_contributor_report,
"dashboard-data": _run_dashboard_data,
"deploy-dashboard": _run_deploy_dashboard,
"dataset-status": _run_dataset_status,
"publish-analysis-artifacts": _run_publish_analysis_artifacts,
"save-cache": _run_save_cache,
}
handler = handlers.get(args.command)
if handler is None:
parser.error(f"Unknown command: {args.command}")
handler(args, config_path)
if __name__ == "__main__":
main()