# diffusers-pr-api / src/slop_farmer/app_config.py
# Author: evalstate (HF Staff) — "Deploy Diffusers PR API" (commit dbf7313, verified)
from __future__ import annotations
import sys
import tomllib
from pathlib import Path
from typing import Any
import yaml
# CLI option names whose string values are filesystem paths.  When one of
# these keys appears in a config file, a relative value is resolved against
# the config's project root (see _resolve_command_paths / _resolve_config_path)
# so commands behave the same regardless of the current working directory.
PATH_LIKE_DEFAULT_KEYS = {
    "db",
    "output-dir",
    "workspace-root",
    "workspace",
    "pipeline-data-dir",
    "web-dir",
    "hf-materialize-dir",
    "snapshot-dir",
    "snapshot-root",
    "analysis-input",
    "contributors-input",
    "pr-scope-input",
}
def _string_tuple(value: Any) -> tuple[str, ...]:
if not isinstance(value, list):
return ()
return tuple(str(item) for item in value if str(item).strip())
def _dict_tuple(value: Any) -> tuple[dict[str, Any], ...]:
if not isinstance(value, list):
return ()
return tuple(item for item in value if isinstance(item, dict))
def _bool_value(value: Any, *, field_name: str, config_path: Path) -> bool:
if isinstance(value, bool):
return value
raise ValueError(f"Expected boolean for {field_name} in config file: {config_path}")
def _find_project_root(start: Path) -> Path:
for directory in (start, *start.parents):
if (directory / "pyproject.toml").exists():
return directory
return start
def _find_pyproject() -> Path | None:
for directory in (Path.cwd(), *Path.cwd().parents):
path = directory / "pyproject.toml"
if path.exists():
return path
return None
def _pyproject_cli_defaults() -> dict[str, Any]:
    """Read the ``[tool.slop-farmer]`` table from the nearest pyproject.toml.

    Returns an empty dict when no pyproject.toml is found, when it has no
    ``tool`` table, or when ``tool.slop-farmer`` is missing or not a mapping.
    """
    pyproject = _find_pyproject()
    if pyproject is None:
        return {}
    parsed = tomllib.loads(pyproject.read_text(encoding="utf-8"))
    tool_table = parsed.get("tool")
    if not isinstance(tool_table, dict):
        return {}
    section = tool_table.get("slop-farmer")
    return section if isinstance(section, dict) else {}
def _extract_command_config(raw: dict[str, Any], command: str) -> dict[str, Any]:
value = raw.get(command)
return value if isinstance(value, dict) else {}
def _config_base_dir(config_path: Path) -> Path:
    """Return the project root used to anchor relative paths in a config file."""
    config_parent = config_path.parent.resolve()
    return _find_project_root(config_parent)
def _resolve_config_path(config_path: Path, raw: str) -> str:
    """Resolve *raw* to an absolute path string.

    Absolute inputs pass through unchanged; relative inputs are resolved
    against the config file's project root (not the CWD), so results are
    stable no matter where the CLI is invoked from.
    """
    candidate = Path(raw)
    if not candidate.is_absolute():
        candidate = (_config_base_dir(config_path) / candidate).resolve()
    return str(candidate)
def _resolve_command_paths(config_path: Path, values: dict[str, Any]) -> dict[str, Any]:
    """Copy *values*, resolving path-like string options against the config root.

    Only non-empty string values whose key is in ``PATH_LIKE_DEFAULT_KEYS``
    are rewritten; everything else is passed through unchanged.
    """
    return {
        option: (
            _resolve_config_path(config_path, setting)
            if option in PATH_LIKE_DEFAULT_KEYS and isinstance(setting, str) and setting
            else setting
        )
        for option, setting in values.items()
    }
def _dashboard_config_defaults(config_path: Path) -> dict[str, dict[str, Any]]:
    """Translate a dashboard YAML config file into per-command CLI defaults.

    Loads *config_path* and maps its top-level sections (``dashboard``,
    ``analysis``, ``scrape``, ``pull-requests``) plus any per-command
    override sections onto dash-cased CLI option names, keyed by command
    name.  Relative path-like values are resolved against the config's
    project root.

    Args:
        config_path: Path to the YAML config file (must parse to a mapping).

    Returns:
        Mapping of command name -> option-name -> default value, with
        unset (None) defaults removed.

    Raises:
        ValueError: if the file or a required section is not a mapping, or
            a boolean field holds a non-boolean value.
    """
    # `yaml` is imported unconditionally at module top, so the former
    # `if yaml is None` guard was unreachable and has been removed.
    payload = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
    if not isinstance(payload, dict):
        raise ValueError(f"Expected mapping in config file: {config_path}")

    repo = payload.get("repo")
    workspace_raw = payload.get("workspace")
    dataset_id = payload.get("dataset_id")
    dashboard = payload.get("dashboard")
    analysis = payload.get("analysis")
    scrape = payload.get("scrape")
    pull_requests = payload.get("pull-requests")
    # Missing sections default to empty mappings; present-but-mistyped
    # sections are an error rather than being silently ignored.
    if dashboard is None:
        dashboard = {}
    if analysis is None:
        analysis = {}
    if scrape is None:
        scrape = {}
    if pull_requests is None:
        pull_requests = {}
    if not isinstance(dashboard, dict):
        raise ValueError(f"Expected dashboard mapping in config file: {config_path}")
    if not isinstance(analysis, dict):
        raise ValueError(f"Expected analysis mapping in config file: {config_path}")
    if not isinstance(scrape, dict):
        raise ValueError(f"Expected scrape mapping in config file: {config_path}")
    if not isinstance(pull_requests, dict):
        raise ValueError(f"Expected pull-requests mapping in config file: {config_path}")

    # Derive the workspace layout (data/, web/, web/public/data/) when the
    # config names a workspace directory; otherwise the path defaults stay unset.
    workspace_path = (
        Path(_resolve_config_path(config_path, workspace_raw))
        if isinstance(workspace_raw, str) and workspace_raw
        else None
    )
    data_dir = workspace_path / "data" if workspace_path else None
    web_dir = workspace_path / "web" if workspace_path else None
    dashboard_dir = web_dir / "public" / "data" if web_dir else None

    dashboard_window_days = int(dashboard.get("window_days", 14))
    contributor_window_days = int(dashboard.get("contributor_window_days", dashboard_window_days))
    contributor_max_authors = int(dashboard.get("contributor_max_authors", 0))

    template_cleanup = pull_requests.get("template_cleanup") or {}
    if not isinstance(template_cleanup, dict):
        raise ValueError(
            f"Expected pull-requests.template_cleanup mapping in config file: {config_path}"
        )
    # Legacy flat keys are still honored and appended after the nested
    # template_cleanup patterns.
    legacy_section_patterns = _string_tuple(pull_requests.get("template_strip_headings"))
    legacy_line_patterns = _string_tuple(pull_requests.get("template_strip_line_patterns"))
    pr_template_cleanup_mode = str(
        template_cleanup.get("mode", pull_requests.get("template_cleanup_mode", "merge_defaults"))
    )
    pr_template_strip_html_comments = _bool_value(
        template_cleanup.get("strip_html_comments", True),
        field_name="pull-requests.template_cleanup.strip_html_comments",
        config_path=config_path,
    )
    pr_template_trim_closing_reference_prefix = _bool_value(
        template_cleanup.get("trim_closing_reference_prefix", True),
        field_name="pull-requests.template_cleanup.trim_closing_reference_prefix",
        config_path=config_path,
    )
    pr_template_section_patterns = (
        _string_tuple(template_cleanup.get("section_patterns")) + legacy_section_patterns
    )
    pr_template_line_patterns = (
        _string_tuple(template_cleanup.get("line_patterns")) + legacy_line_patterns
    )
    cluster_suppression_rules = _dict_tuple(pull_requests.get("cluster_suppression_rules"))

    # Tags may be written as a YAML list; collapse to the comma-separated
    # string form the CLI expects, dropping blank entries.
    tags = dashboard.get("tags")
    if isinstance(tags, list):
        tags_value = ",".join(str(tag).strip() for tag in tags if str(tag).strip())
    else:
        tags_value = tags

    defaults: dict[str, dict[str, Any]] = {
        "scrape": {
            "repo": repo,
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": dataset_id,
            "new-contributor-window-days": contributor_window_days,
            "new-contributor-max-authors": contributor_max_authors,
        },
        "refresh-dataset": {
            "repo": repo,
            "hf-repo-id": dataset_id,
            "fetch-timeline": scrape.get("fetch-timeline"),
            "max-issues": scrape.get("max-issues"),
            "max-prs": scrape.get("max-prs"),
            "max-issue-comments": scrape.get("max-issue-comments"),
            "max-reviews-per-pr": scrape.get("max-reviews-per-pr"),
            "max-review-comments-per-pr": scrape.get("max-review-comments-per-pr"),
            "new-contributor-window-days": contributor_window_days,
            "new-contributor-max-authors": contributor_max_authors,
            "cluster-suppression-rules": cluster_suppression_rules,
        },
        "analyze": {
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": analysis.get("hf-repo-id", dataset_id),
            "model": analysis.get("model"),
            "ranking-backend": analysis.get("ranking_backend"),
            "max-clusters": analysis.get("max_clusters"),
            "hybrid-llm-concurrency": analysis.get("hybrid_llm_concurrency"),
            "cached_analysis": analysis.get("cached_analysis"),
            "open-prs-only": analysis.get("open_prs_only"),
            "pr-template-cleanup-mode": pr_template_cleanup_mode,
            "pr-template-strip-html-comments": pr_template_strip_html_comments,
            "pr-template-trim-closing-reference-prefix": pr_template_trim_closing_reference_prefix,
            "pr-template-section-patterns": pr_template_section_patterns,
            "pr-template-line-patterns": pr_template_line_patterns,
            "cluster-suppression-rules": cluster_suppression_rules,
        },
        "pr-scope": {
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": dataset_id,
            "cluster-suppression-rules": cluster_suppression_rules,
        },
        "pr-search": {
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": dataset_id,
            "cluster-suppression-rules": cluster_suppression_rules,
        },
        "new-contributor-report": {
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": dataset_id,
            "window-days": contributor_window_days,
            "max-authors": contributor_max_authors,
        },
        "dashboard-data": {
            "output-dir": str(dashboard_dir) if dashboard_dir else None,
            "snapshot-root": str(data_dir / "snapshots") if data_dir else None,
            "hf-repo-id": dataset_id,
            "window-days": dashboard_window_days,
        },
        "publish-analysis-artifacts": {
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": dataset_id,
        },
        "save-cache": {
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": dataset_id,
        },
        "deploy-dashboard": {
            "pipeline-data-dir": str(data_dir) if data_dir else None,
            "web-dir": str(web_dir) if web_dir else None,
            "hf-repo-id": dataset_id,
            "dashboard-window-days": dashboard_window_days,
            "contributor-window-days": contributor_window_days,
            "contributor-max-authors": contributor_max_authors,
            "space-id": dashboard.get("space_id"),
            "space-title": dashboard.get("title"),
            "space-emoji": dashboard.get("emoji"),
            "space-color-from": dashboard.get("color_from"),
            "space-color-to": dashboard.get("color_to"),
            "space-short-description": dashboard.get("short_description"),
            "dataset-id": dataset_id,
            "space-tags": tags_value,
        },
        "dataset-status": {
            "repo": repo,
            "output-dir": str(data_dir) if data_dir else None,
            "hf-repo-id": dataset_id,
        },
    }
    # Drop unset values so they do not shadow pyproject- or CLI-level settings.
    # Rebuilt with a comprehension instead of mutating `defaults` while
    # iterating over it.
    defaults = {
        command: {key: value for key, value in values.items() if value is not None}
        for command, values in defaults.items()
    }
    # Explicit per-command sections in the config override the derived defaults.
    explicit_sections = {command: _extract_command_config(payload, command) for command in defaults}
    for command, values in explicit_sections.items():
        if not values:
            continue
        defaults[command].update(_resolve_command_paths(config_path, values))
    # Section-level scrape/analysis settings apply last to their commands.
    defaults["scrape"].update(_resolve_command_paths(config_path, scrape))
    defaults["refresh-dataset"].update(_resolve_command_paths(config_path, scrape))
    defaults["analyze"].update(_resolve_command_paths(config_path, analysis))
    return defaults
def project_cli_defaults(config_path: Path | None = None) -> dict[str, Any]:
    """Merge pyproject ``[tool.slop-farmer]`` defaults with --config defaults.

    Config-file values win over pyproject values for the same command and
    option; commands present only in one source are kept as-is.
    """
    merged = dict(_pyproject_cli_defaults())
    if config_path is None:
        return merged
    for command, overrides in _dashboard_config_defaults(config_path).items():
        existing = merged.get(command)
        if isinstance(existing, dict):
            merged[command] = {**existing, **overrides}
        else:
            merged[command] = dict(overrides)
    return merged
def command_defaults(command: str, *, config_path: Path | None = None) -> dict[str, Any]:
    """Return the merged default option mapping for *command* (empty if none)."""
    section = project_cli_defaults(config_path=config_path).get(command)
    return section if isinstance(section, dict) else {}
def extract_cli_config_path(argv: list[str] | None = None) -> Path | None:
args = list(sys.argv[1:] if argv is None else argv)
for index, arg in enumerate(args):
if arg == "--config" and index + 1 < len(args):
return Path(args[index + 1]).resolve()
if arg.startswith("--config="):
return Path(arg.split("=", 1)[1]).resolve()
return None