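"""Resolve per-command CLI defaults for the slop-farmer commands.

Defaults are merged from two sources: the [tool.slop-farmer] table of the
nearest pyproject.toml, and an optional YAML config file passed via --config
whose path-like values are resolved against the project root containing it.
"""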

from __future__ import annotations

import sys
import tomllib
from pathlib import Path
from typing import Any

try:
    import yaml
except ImportError:  # PyYAML is optional; a clear error is raised when --config needs it.
    yaml = None
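
# Option keys whose string values name filesystem paths; relative values are
# resolved against the project root that contains the config file.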
PATH_LIKE_DEFAULT_KEYS = {
    "db",
    "output-dir",
    "workspace-root",
    "workspace",
    "pipeline-data-dir",
    "web-dir",
    "hf-materialize-dir",
    "snapshot-dir",
    "snapshot-root",
    "analysis-input",
    "contributors-input",
    "pr-scope-input",
}


def _string_tuple(value: Any) -> tuple[str, ...]:
    if not isinstance(value, list):
        return ()
    return tuple(str(item) for item in value if str(item).strip())


def _dict_tuple(value: Any) -> tuple[dict[str, Any], ...]:
    if not isinstance(value, list):
        return ()
    return tuple(item for item in value if isinstance(item, dict))


def _bool_value(value: Any, *, field_name: str, config_path: Path) -> bool:
    if isinstance(value, bool):
        return value
    raise ValueError(f"Expected boolean for {field_name} in config file: {config_path}")


def _find_project_root(start: Path) -> Path:
    for directory in (start, *start.parents):
        if (directory / "pyproject.toml").exists():
            return directory
    return start


def _find_pyproject() -> Path | None:
    for directory in (Path.cwd(), *Path.cwd().parents):
        path = directory / "pyproject.toml"
        if path.exists():
            return path
    return None
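

# Project-wide defaults live in the [tool.slop-farmer] table of the nearest pyproject.toml.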
def _pyproject_cli_defaults() -> dict[str, Any]:
    path = _find_pyproject()
    if path is None:
        return {}
    data = tomllib.loads(path.read_text(encoding="utf-8"))
    tool = data.get("tool")
    if not isinstance(tool, dict):
        return {}
    slop_farmer = tool.get("slop-farmer")
    if not isinstance(slop_farmer, dict):
        return {}
    return slop_farmer


def _extract_command_config(raw: dict[str, Any], command: str) -> dict[str, Any]:
    value = raw.get(command)
    return value if isinstance(value, dict) else {}


def _config_base_dir(config_path: Path) -> Path:
    return _find_project_root(config_path.parent.resolve())


def _resolve_config_path(config_path: Path, raw: str) -> str:
    path = Path(raw)
    if path.is_absolute():
        return str(path)
    return str((_config_base_dir(config_path) / path).resolve())


def _resolve_command_paths(config_path: Path, values: dict[str, Any]) -> dict[str, Any]:
    resolved: dict[str, Any] = {}
    for key, value in values.items():
        if key in PATH_LIKE_DEFAULT_KEYS and isinstance(value, str) and value:
            resolved[key] = _resolve_config_path(config_path, value)
        else:
            resolved[key] = value
    return resolved
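

# Translate the YAML config file (repo, workspace, dashboard, analysis, scrape, and
# pull-requests sections) into per-command CLI defaults.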
def _dashboard_config_defaults(config_path: Path) -> dict[str, dict[str, Any]]:
    if yaml is None:
        raise RuntimeError("PyYAML is required for --config support")
    payload = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
    if not isinstance(payload, dict):
        raise ValueError(f"Expected mapping in config file: {config_path}")
    repo = payload.get("repo")
    workspace_raw = payload.get("workspace")
    dataset_id = payload.get("dataset_id")
    dashboard = payload.get("dashboard")
    analysis = payload.get("analysis")
    scrape = payload.get("scrape")
    pull_requests = payload.get("pull-requests")
    if dashboard is None:
        dashboard = {}
    if analysis is None:
        analysis = {}
    if scrape is None:
        scrape = {}
    if pull_requests is None:
        pull_requests = {}
    if not isinstance(dashboard, dict):
        raise ValueError(f"Expected dashboard mapping in config file: {config_path}")
    if not isinstance(analysis, dict):
        raise ValueError(f"Expected analysis mapping in config file: {config_path}")
    if not isinstance(scrape, dict):
        raise ValueError(f"Expected scrape mapping in config file: {config_path}")
    if not isinstance(pull_requests, dict):
        raise ValueError(f"Expected pull-requests mapping in config file: {config_path}")
    workspace_path = (
        Path(_resolve_config_path(config_path, workspace_raw))
        if isinstance(workspace_raw, str) and workspace_raw
        else None
    )
    data_dir = workspace_path / "data" if workspace_path else None
    web_dir = workspace_path / "web" if workspace_path else None
    dashboard_dir = web_dir / "public" / "data" if web_dir else None
    dashboard_window_days = int(dashboard.get("window_days", 14))
    contributor_window_days = int(dashboard.get("contributor_window_days", dashboard_window_days))
    contributor_max_authors = int(dashboard.get("contributor_max_authors", 0))
    template_cleanup = pull_requests.get("template_cleanup") or {}
    if not isinstance(template_cleanup, dict):
        raise ValueError(
            f"Expected pull-requests.template_cleanup mapping in config file: {config_path}"
        )
    legacy_section_patterns = _string_tuple(pull_requests.get("template_strip_headings"))
    legacy_line_patterns = _string_tuple(pull_requests.get("template_strip_line_patterns"))
    pr_template_cleanup_mode = str(
        template_cleanup.get("mode", pull_requests.get("template_cleanup_mode", "merge_defaults"))
    )
    pr_template_strip_html_comments = _bool_value(
        template_cleanup.get("strip_html_comments", True),
        field_name="pull-requests.template_cleanup.strip_html_comments",
        config_path=config_path,
    )
    pr_template_trim_closing_reference_prefix = _bool_value(
        template_cleanup.get("trim_closing_reference_prefix", True),
        field_name="pull-requests.template_cleanup.trim_closing_reference_prefix",
        config_path=config_path,
    )
    pr_template_section_patterns = (
        _string_tuple(template_cleanup.get("section_patterns")) + legacy_section_patterns
    )
    pr_template_line_patterns = (
        _string_tuple(template_cleanup.get("line_patterns")) + legacy_line_patterns
    )
    cluster_suppression_rules = _dict_tuple(pull_requests.get("cluster_suppression_rules"))
    tags = dashboard.get("tags")
    if isinstance(tags, list):
        tags_value = ",".join(str(tag).strip() for tag in tags if str(tag).strip())
    else:
        tags_value = tags
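    # Seed each command's defaults from the config sections; unset values are pruned below.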
    defaults: dict[str, dict[str, Any]] = {
        "scrape": {
            "repo": repo,
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": dataset_id,
            "new-contributor-window-days": contributor_window_days,
            "new-contributor-max-authors": contributor_max_authors,
        },
        "refresh-dataset": {
            "repo": repo,
            "hf-repo-id": dataset_id,
            "fetch-timeline": scrape.get("fetch-timeline"),
            "max-issues": scrape.get("max-issues"),
            "max-prs": scrape.get("max-prs"),
            "max-issue-comments": scrape.get("max-issue-comments"),
            "max-reviews-per-pr": scrape.get("max-reviews-per-pr"),
            "max-review-comments-per-pr": scrape.get("max-review-comments-per-pr"),
            "new-contributor-window-days": contributor_window_days,
            "new-contributor-max-authors": contributor_max_authors,
            "cluster-suppression-rules": cluster_suppression_rules,
        },
        "analyze": {
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": analysis.get("hf-repo-id", dataset_id),
            "model": analysis.get("model"),
            "ranking-backend": analysis.get("ranking_backend"),
            "max-clusters": analysis.get("max_clusters"),
            "hybrid-llm-concurrency": analysis.get("hybrid_llm_concurrency"),
            "cached_analysis": analysis.get("cached_analysis"),
            "open-prs-only": analysis.get("open_prs_only"),
            "pr-template-cleanup-mode": pr_template_cleanup_mode,
            "pr-template-strip-html-comments": pr_template_strip_html_comments,
            "pr-template-trim-closing-reference-prefix": pr_template_trim_closing_reference_prefix,
            "pr-template-section-patterns": pr_template_section_patterns,
            "pr-template-line-patterns": pr_template_line_patterns,
            "cluster-suppression-rules": cluster_suppression_rules,
        },
        "pr-scope": {
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": dataset_id,
            "cluster-suppression-rules": cluster_suppression_rules,
        },
        "pr-search": {
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": dataset_id,
            "cluster-suppression-rules": cluster_suppression_rules,
        },
        "new-contributor-report": {
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": dataset_id,
            "window-days": contributor_window_days,
            "max-authors": contributor_max_authors,
        },
        "dashboard-data": {
            "output-dir": str(dashboard_dir) if dashboard_dir else None,
            "snapshot-root": str(data_dir / "snapshots") if data_dir else None,
            "hf-repo-id": dataset_id,
            "window-days": dashboard_window_days,
        },
        "publish-analysis-artifacts": {
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": dataset_id,
        },
        "save-cache": {
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": dataset_id,
        },
        "deploy-dashboard": {
            "pipeline-data-dir": str(data_dir) if data_dir else None,
            "web-dir": str(web_dir) if web_dir else None,
            "hf-repo-id": dataset_id,
            "dashboard-window-days": dashboard_window_days,
            "contributor-window-days": contributor_window_days,
            "contributor-max-authors": contributor_max_authors,
            "space-id": dashboard.get("space_id"),
            "space-title": dashboard.get("title"),
            "space-emoji": dashboard.get("emoji"),
            "space-color-from": dashboard.get("color_from"),
            "space-color-to": dashboard.get("color_to"),
            "space-short-description": dashboard.get("short_description"),
            "dataset-id": dataset_id,
            "space-tags": tags_value,
        },
        "dataset-status": {
            "repo": repo,
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": dataset_id,
        },
    }
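    # Prune unset (None) values, then let explicit per-command and section overrides win.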
    for command, values in defaults.items():
        defaults[command] = {key: value for key, value in values.items() if value is not None}
    explicit_sections = {command: _extract_command_config(payload, command) for command in defaults}
    for command, values in explicit_sections.items():
        if not values:
            continue
        defaults[command].update(_resolve_command_paths(config_path, values))
    defaults["scrape"].update(_resolve_command_paths(config_path, scrape))
    defaults["refresh-dataset"].update(_resolve_command_paths(config_path, scrape))
    defaults["analyze"].update(_resolve_command_paths(config_path, analysis))
    return defaults
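

# Public entry points: pyproject defaults first, then config-file defaults on top.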
def project_cli_defaults(config_path: Path | None = None) -> dict[str, Any]:
    defaults = _pyproject_cli_defaults()
    if config_path is None:
        return defaults
    merged = dict(defaults)
    for command, values in _dashboard_config_defaults(config_path).items():
        current = merged.get(command)
        if isinstance(current, dict):
            updated = dict(current)
            updated.update(values)
            merged[command] = updated
        else:
            merged[command] = dict(values)
    return merged


def command_defaults(command: str, *, config_path: Path | None = None) -> dict[str, Any]:
    defaults = project_cli_defaults(config_path=config_path).get(command)
    if not isinstance(defaults, dict):
        return {}
    return defaults
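

# Scan raw argv for "--config PATH" or "--config=PATH" without running the full
# argument parser, so config-file defaults can be loaded up front.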
def extract_cli_config_path(argv: list[str] | None = None) -> Path | None:
    args = list(sys.argv[1:] if argv is None else argv)
    for index, arg in enumerate(args):
        if arg == "--config" and index + 1 < len(args):
            return Path(args[index + 1]).resolve()
        if arg.startswith("--config="):
            return Path(arg.split("=", 1)[1]).resolve()
    return None
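

# Typical usage (paths are illustrative):
#   config = extract_cli_config_path(["analyze", "--config", "configs/dashboard.yaml"])
#   defaults = command_defaults("analyze", config_path=config)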