from __future__ import annotations import sys import tomllib from pathlib import Path from typing import Any import yaml PATH_LIKE_DEFAULT_KEYS = { "db", "output-dir", "workspace-root", "workspace", "pipeline-data-dir", "web-dir", "hf-materialize-dir", "snapshot-dir", "snapshot-root", "analysis-input", "contributors-input", "pr-scope-input", } def _string_tuple(value: Any) -> tuple[str, ...]: if not isinstance(value, list): return () return tuple(str(item) for item in value if str(item).strip()) def _dict_tuple(value: Any) -> tuple[dict[str, Any], ...]: if not isinstance(value, list): return () return tuple(item for item in value if isinstance(item, dict)) def _bool_value(value: Any, *, field_name: str, config_path: Path) -> bool: if isinstance(value, bool): return value raise ValueError(f"Expected boolean for {field_name} in config file: {config_path}") def _find_project_root(start: Path) -> Path: for directory in (start, *start.parents): if (directory / "pyproject.toml").exists(): return directory return start def _find_pyproject() -> Path | None: for directory in (Path.cwd(), *Path.cwd().parents): path = directory / "pyproject.toml" if path.exists(): return path return None def _pyproject_cli_defaults() -> dict[str, Any]: path = _find_pyproject() if path is None: return {} data = tomllib.loads(path.read_text(encoding="utf-8")) tool = data.get("tool") if not isinstance(tool, dict): return {} slop_farmer = tool.get("slop-farmer") if not isinstance(slop_farmer, dict): return {} return slop_farmer def _extract_command_config(raw: dict[str, Any], command: str) -> dict[str, Any]: value = raw.get(command) return value if isinstance(value, dict) else {} def _config_base_dir(config_path: Path) -> Path: return _find_project_root(config_path.parent.resolve()) def _resolve_config_path(config_path: Path, raw: str) -> str: path = Path(raw) if path.is_absolute(): return str(path) return str((_config_base_dir(config_path) / path).resolve()) def _resolve_command_paths(config_path: Path, values: dict[str, Any]) -> dict[str, Any]: resolved: dict[str, Any] = {} for key, value in values.items(): if key in PATH_LIKE_DEFAULT_KEYS and isinstance(value, str) and value: resolved[key] = _resolve_config_path(config_path, value) else: resolved[key] = value return resolved def _dashboard_config_defaults(config_path: Path) -> dict[str, dict[str, Any]]: if yaml is None: raise RuntimeError("PyYAML is required for --config support") payload = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} if not isinstance(payload, dict): raise ValueError(f"Expected mapping in config file: {config_path}") repo = payload.get("repo") workspace_raw = payload.get("workspace") dataset_id = payload.get("dataset_id") dashboard = payload.get("dashboard") analysis = payload.get("analysis") scrape = payload.get("scrape") pull_requests = payload.get("pull-requests") if dashboard is None: dashboard = {} if analysis is None: analysis = {} if scrape is None: scrape = {} if pull_requests is None: pull_requests = {} if not isinstance(dashboard, dict): raise ValueError(f"Expected dashboard mapping in config file: {config_path}") if not isinstance(analysis, dict): raise ValueError(f"Expected analysis mapping in config file: {config_path}") if not isinstance(scrape, dict): raise ValueError(f"Expected scrape mapping in config file: {config_path}") if not isinstance(pull_requests, dict): raise ValueError(f"Expected pull-requests mapping in config file: {config_path}") workspace_path = ( Path(_resolve_config_path(config_path, workspace_raw)) if isinstance(workspace_raw, str) and workspace_raw else None ) data_dir = workspace_path / "data" if workspace_path else None web_dir = workspace_path / "web" if workspace_path else None dashboard_dir = web_dir / "public" / "data" if web_dir else None dashboard_window_days = int(dashboard.get("window_days", 14)) contributor_window_days = int(dashboard.get("contributor_window_days", dashboard_window_days)) contributor_max_authors = int(dashboard.get("contributor_max_authors", 0)) template_cleanup = pull_requests.get("template_cleanup") or {} if not isinstance(template_cleanup, dict): raise ValueError( f"Expected pull-requests.template_cleanup mapping in config file: {config_path}" ) legacy_section_patterns = _string_tuple(pull_requests.get("template_strip_headings")) legacy_line_patterns = _string_tuple(pull_requests.get("template_strip_line_patterns")) pr_template_cleanup_mode = str( template_cleanup.get("mode", pull_requests.get("template_cleanup_mode", "merge_defaults")) ) pr_template_strip_html_comments = _bool_value( template_cleanup.get("strip_html_comments", True), field_name="pull-requests.template_cleanup.strip_html_comments", config_path=config_path, ) pr_template_trim_closing_reference_prefix = _bool_value( template_cleanup.get("trim_closing_reference_prefix", True), field_name="pull-requests.template_cleanup.trim_closing_reference_prefix", config_path=config_path, ) pr_template_section_patterns = ( _string_tuple(template_cleanup.get("section_patterns")) + legacy_section_patterns ) pr_template_line_patterns = ( _string_tuple(template_cleanup.get("line_patterns")) + legacy_line_patterns ) cluster_suppression_rules = _dict_tuple(pull_requests.get("cluster_suppression_rules")) tags = dashboard.get("tags") if isinstance(tags, list): tags_value = ",".join(str(tag).strip() for tag in tags if str(tag).strip()) else: tags_value = tags defaults: dict[str, dict[str, Any]] = { "scrape": { "repo": repo, "output-dir": str(data_dir) if data_dir else None, "hf-repo-id": dataset_id, "new-contributor-window-days": contributor_window_days, "new-contributor-max-authors": contributor_max_authors, }, "refresh-dataset": { "repo": repo, "hf-repo-id": dataset_id, "fetch-timeline": scrape.get("fetch-timeline"), "max-issues": scrape.get("max-issues"), "max-prs": scrape.get("max-prs"), "max-issue-comments": scrape.get("max-issue-comments"), "max-reviews-per-pr": scrape.get("max-reviews-per-pr"), "max-review-comments-per-pr": scrape.get("max-review-comments-per-pr"), "new-contributor-window-days": contributor_window_days, "new-contributor-max-authors": contributor_max_authors, "cluster-suppression-rules": cluster_suppression_rules, }, "analyze": { "output-dir": str(data_dir) if data_dir else None, "hf-repo-id": analysis.get("hf-repo-id", dataset_id), "model": analysis.get("model"), "ranking-backend": analysis.get("ranking_backend"), "max-clusters": analysis.get("max_clusters"), "hybrid-llm-concurrency": analysis.get("hybrid_llm_concurrency"), "cached_analysis": analysis.get("cached_analysis"), "open-prs-only": analysis.get("open_prs_only"), "pr-template-cleanup-mode": pr_template_cleanup_mode, "pr-template-strip-html-comments": pr_template_strip_html_comments, "pr-template-trim-closing-reference-prefix": pr_template_trim_closing_reference_prefix, "pr-template-section-patterns": pr_template_section_patterns, "pr-template-line-patterns": pr_template_line_patterns, "cluster-suppression-rules": cluster_suppression_rules, }, "pr-scope": { "output-dir": str(data_dir) if data_dir else None, "hf-repo-id": dataset_id, "cluster-suppression-rules": cluster_suppression_rules, }, "pr-search": { "output-dir": str(data_dir) if data_dir else None, "hf-repo-id": dataset_id, "cluster-suppression-rules": cluster_suppression_rules, }, "new-contributor-report": { "output-dir": str(data_dir) if data_dir else None, "hf-repo-id": dataset_id, "window-days": contributor_window_days, "max-authors": contributor_max_authors, }, "dashboard-data": { "output-dir": str(dashboard_dir) if dashboard_dir else None, "snapshot-root": str(data_dir / "snapshots") if data_dir else None, "hf-repo-id": dataset_id, "window-days": dashboard_window_days, }, "publish-analysis-artifacts": { "output-dir": str(data_dir) if data_dir else None, "hf-repo-id": dataset_id, }, "save-cache": { "output-dir": str(data_dir) if data_dir else None, "hf-repo-id": dataset_id, }, "deploy-dashboard": { "pipeline-data-dir": str(data_dir) if data_dir else None, "web-dir": str(web_dir) if web_dir else None, "hf-repo-id": dataset_id, "dashboard-window-days": dashboard_window_days, "contributor-window-days": contributor_window_days, "contributor-max-authors": contributor_max_authors, "space-id": dashboard.get("space_id"), "space-title": dashboard.get("title"), "space-emoji": dashboard.get("emoji"), "space-color-from": dashboard.get("color_from"), "space-color-to": dashboard.get("color_to"), "space-short-description": dashboard.get("short_description"), "dataset-id": dataset_id, "space-tags": tags_value, }, "dataset-status": { "repo": repo, "output-dir": str(data_dir) if data_dir else None, "hf-repo-id": dataset_id, }, } for command, values in defaults.items(): defaults[command] = {key: value for key, value in values.items() if value is not None} explicit_sections = {command: _extract_command_config(payload, command) for command in defaults} for command, values in explicit_sections.items(): if not values: continue defaults[command].update(_resolve_command_paths(config_path, values)) defaults["scrape"].update(_resolve_command_paths(config_path, scrape)) defaults["refresh-dataset"].update(_resolve_command_paths(config_path, scrape)) defaults["analyze"].update(_resolve_command_paths(config_path, analysis)) return defaults def project_cli_defaults(config_path: Path | None = None) -> dict[str, Any]: defaults = _pyproject_cli_defaults() if config_path is None: return defaults merged = dict(defaults) for command, values in _dashboard_config_defaults(config_path).items(): current = merged.get(command) if isinstance(current, dict): updated = dict(current) updated.update(values) merged[command] = updated else: merged[command] = dict(values) return merged def command_defaults(command: str, *, config_path: Path | None = None) -> dict[str, Any]: defaults = project_cli_defaults(config_path=config_path).get(command) if not isinstance(defaults, dict): return {} return defaults def extract_cli_config_path(argv: list[str] | None = None) -> Path | None: args = list(sys.argv[1:] if argv is None else argv) for index, arg in enumerate(args): if arg == "--config" and index + 1 < len(args): return Path(args[index + 1]).resolve() if arg.startswith("--config="): return Path(arg.split("=", 1)[1]).resolve() return None