diffusers-pr-api / src /slop_farmer /reports /pr_heuristics.py
evalstate's picture
evalstate HF Staff
Deploy Diffusers PR API
dbf7313 verified
from __future__ import annotations
import re
from collections import defaultdict
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from typing import Any
# Matches one HTML comment. DOTALL lets the comment span several lines and the
# non-greedy quantifier stops at the first closing marker.
HTML_COMMENT_PATTERN = re.compile(r"<!--.*?-->", re.DOTALL)

# Defaults for build_template_cleanup_settings().
DEFAULT_TEMPLATE_CLEANUP_MODE = "merge_defaults"
DEFAULT_STRIP_HTML_COMMENTS = True
DEFAULT_TRIM_CLOSING_REFERENCE_PREFIX = True

# Headings that open a boilerplate section. strip_pull_request_template() drops
# the heading and every following line until the next line starting with "#".
# The sources are compiled case-insensitively by compile_casefold_patterns().
DEFAULT_TEMPLATE_SECTION_PATTERNS = (
    r"^#{1,6}\s*code agent policy\s*$",
    r"^#{1,6}\s*before submitting\s*$",
    r"^#{1,6}\s*who can review\?\s*$",
)

# Single boilerplate lines that are dropped on their own: the
# "What does this PR do?" heading and the unfilled "Fixes # (issue)" placeholder.
DEFAULT_TEMPLATE_LINE_PATTERNS = (
    r"^#{1,6}\s*what does this pr do\?\s*$",
    r"^(?:fix(?:e[sd])?|close[sd]?|resolve[sd]?)\s*#\s*\(?issue\)?\s*$",
)

# Matches a line that starts with a closing keyword (fix/fixes/fixed,
# close/closes/closed, resolve/resolves/resolved) followed by whitespace and one
# or more issue references ("#123" or "owner/repo#123") joined by "," or "and".
# An optional ":", "-", en dash or em dash separator may follow; whatever comes
# after it is captured in the "rest" group. Compiled with VERBOSE, so the
# whitespace inside the pattern string is not significant.
PR_TEMPLATE_CLOSING_REFERENCE_PREFIX_PATTERN = re.compile(
    r"""
^
(?P<prefix>\s*(?:fix(?:e[sd])?|close[sd]?|resolve[sd]?)\s+)
(?:
(?:[a-z0-9_.-]+/[a-z0-9_.-]+)?\#\s*\d+
(?:\s*(?:,|and)\s*(?:[a-z0-9_.-]+/[a-z0-9_.-]+)?\#\s*\d+)*
)
\s*(?:[:\-\u2013\u2014]\s*)?
(?P<rest>.*)
$
""",
    re.IGNORECASE | re.VERBOSE,
)
def compile_casefold_patterns(patterns: Sequence[str]) -> tuple[re.Pattern[str], ...]:
    """Compile regex sources case-insensitively, ignoring blank entries.

    Args:
        patterns: Regular-expression source strings.

    Returns:
        A tuple of patterns compiled with ``re.IGNORECASE``, in input order.
        Sources that are empty or whitespace-only are left out.

    Raises:
        re.error: If a non-blank source is not a valid regular expression.
    """
    compiled: list[re.Pattern[str]] = []
    for source in patterns:
        if not source.strip():
            # Blank sources would match everything; drop them.
            continue
        compiled.append(re.compile(source, re.IGNORECASE))
    return tuple(compiled)
@dataclass(slots=True, frozen=True)
class TemplateCleanupSettings:
strip_html_comments: bool
trim_closing_reference_prefix: bool
section_patterns: tuple[re.Pattern[str], ...]
line_patterns: tuple[re.Pattern[str], ...]
def build_template_cleanup_settings(
*,
mode: str = DEFAULT_TEMPLATE_CLEANUP_MODE,
strip_html_comments: bool = DEFAULT_STRIP_HTML_COMMENTS,
trim_closing_reference_prefix: bool = DEFAULT_TRIM_CLOSING_REFERENCE_PREFIX,
section_patterns: Sequence[str] = (),
line_patterns: Sequence[str] = (),
) -> TemplateCleanupSettings:
if mode == "off":
return TemplateCleanupSettings(
strip_html_comments=False,
trim_closing_reference_prefix=False,
section_patterns=(),
line_patterns=(),
)
if mode == "merge_defaults":
section_sources = (*DEFAULT_TEMPLATE_SECTION_PATTERNS, *section_patterns)
line_sources = (*DEFAULT_TEMPLATE_LINE_PATTERNS, *line_patterns)
elif mode == "replace_defaults":
section_sources = tuple(section_patterns)
line_sources = tuple(line_patterns)
else:
raise ValueError(f"Unknown PR template cleanup mode: {mode}")
return TemplateCleanupSettings(
strip_html_comments=strip_html_comments,
trim_closing_reference_prefix=trim_closing_reference_prefix,
section_patterns=compile_casefold_patterns(section_sources),
line_patterns=compile_casefold_patterns(line_sources),
)
def strip_pull_request_template(
    body: str | None,
    *,
    settings: TemplateCleanupSettings | None = None,
) -> str:
    """Remove pull-request template boilerplate from a PR body.

    Line endings are normalised to LF, HTML comments are optionally removed,
    and the text is then filtered line by line:

    * a line matching a line pattern is dropped;
    * a line matching a section pattern is dropped together with everything
      up to the next line that starts with ``#``;
    * optionally, a leading closing reference (for example ``Fixes #12:``) is
      trimmed and only the remaining text is kept.

    Patterns are applied with ``match`` to the whitespace-stripped line.

    Args:
        body: Raw PR body; ``None`` is treated as an empty string.
        settings: Cleanup options. When omitted, the defaults from
            ``build_template_cleanup_settings()`` are used.

    Returns:
        The cleaned text with every line stripped, runs of blank lines
        collapsed, and no leading or trailing blank lines.
    """
    text = (body or "").replace("\r\n", "\n").replace("\r", "\n")
    if not text:
        return ""
    cleanup = settings or build_template_cleanup_settings()
    if cleanup.strip_html_comments:
        # Replace with a newline so text on either side of a comment does not
        # get glued onto one line.
        text = HTML_COMMENT_PATTERN.sub("\n", text)
    cleaned_lines: list[str] = []
    skip_section = False
    for raw_line in text.splitlines():
        line = raw_line.rstrip()
        normalized = line.strip()
        if any(pattern.match(normalized) for pattern in cleanup.line_patterns):
            # A removed boilerplate heading still opens a new section, so it
            # must end any section currently being skipped. Otherwise real
            # content that follows it would be dropped as well.
            if normalized.startswith("#"):
                skip_section = False
            continue
        if any(pattern.match(normalized) for pattern in cleanup.section_patterns):
            skip_section = True
            continue
        if skip_section:
            if normalized.startswith("#"):
                # The next heading ends the skipped section; the heading
                # itself is processed normally below.
                skip_section = False
            else:
                continue
        if cleanup.trim_closing_reference_prefix:
            trimmed_reference = _trim_closing_reference_prefix(normalized)
            if trimmed_reference == "":
                # The line held nothing but the closing reference.
                continue
            if trimmed_reference is not None:
                cleaned_lines.append(trimmed_reference)
                continue
        cleaned_lines.append(line)
    return collapse_blank_lines(cleaned_lines)
def collapse_blank_lines(lines: list[str]) -> str:
    """Join lines after stripping them and squeezing blank runs.

    Every line is stripped of surrounding whitespace. Consecutive blank lines
    become a single empty line, and blank lines at the very start or end are
    removed.

    Args:
        lines: Lines to normalise.

    Returns:
        The normalised lines joined with ``"\\n"``.
    """
    result: list[str] = []
    for text in (entry.strip() for entry in lines):
        if text:
            result.append(text)
        elif result and result[-1]:
            # Keep one separator, and only after real content.
            result.append("")
    # At most one blank can trail because blank runs were already squeezed.
    if result and not result[-1]:
        result.pop()
    return "\n".join(result)
def _trim_closing_reference_prefix(line: str) -> str | None:
    """Return the text that follows a leading closing reference.

    Args:
        line: A single line of PR body text.

    Returns:
        ``None`` when the line does not start with a closing reference;
        otherwise the stripped remainder, which is ``""`` when the line holds
        nothing else.
    """
    found = PR_TEMPLATE_CLOSING_REFERENCE_PREFIX_PATTERN.match(line)
    return found.group("rest").strip() if found is not None else None
@dataclass(slots=True, frozen=True)
class ClusterSuppressionRule:
id: str
title_patterns: tuple[re.Pattern[str], ...] = ()
body_patterns: tuple[re.Pattern[str], ...] = ()
path_patterns: tuple[re.Pattern[str], ...] = ()
def matches(self, *, title: str, body: str, paths: Sequence[str]) -> bool:
if not (self.title_patterns or self.body_patterns or self.path_patterns):
return False
if self.title_patterns and not any(
pattern.search(title) for pattern in self.title_patterns
):
return False
if self.body_patterns and not any(pattern.search(body) for pattern in self.body_patterns):
return False
return not self.path_patterns or any(
pattern.search(path) for pattern in self.path_patterns for path in paths
)
def compile_cluster_suppression_rules(
    payload: Sequence[Mapping[str, Any]],
) -> tuple[ClusterSuppressionRule, ...]:
    """Turn raw rule mappings into compiled suppression rules.

    The rule id is taken from ``"id"``, then ``"name"``; when both are missing
    or blank it falls back to ``rule-<position>`` (1-based). Pattern lists are
    read from ``"title_patterns"``, ``"body_patterns"`` and
    ``"path_patterns"`` and compiled case-insensitively.

    Args:
        payload: Raw rule definitions.

    Returns:
        One compiled rule per payload entry, in payload order.
    """

    def _compiled(entry: Mapping[str, Any], key: str) -> tuple[re.Pattern[str], ...]:
        return compile_casefold_patterns(_string_list(entry.get(key)))

    compiled_rules: list[ClusterSuppressionRule] = []
    for position, entry in enumerate(payload, start=1):
        fallback_id = f"rule-{position}"
        candidate = entry.get("id") or entry.get("name") or fallback_id
        # A whitespace-only id is truthy above but empty once stripped.
        identifier = str(candidate).strip() or fallback_id
        compiled_rules.append(
            ClusterSuppressionRule(
                id=identifier,
                title_patterns=_compiled(entry, "title_patterns"),
                body_patterns=_compiled(entry, "body_patterns"),
                path_patterns=_compiled(entry, "path_patterns"),
            )
        )
    return tuple(compiled_rules)
def suppressed_pull_request_reasons(
    pull_requests: Sequence[Mapping[str, Any]],
    pr_files: Sequence[Mapping[str, Any]],
    rules: Sequence[ClusterSuppressionRule],
) -> dict[int, list[str]]:
    """Map each suppressed pull request number to the ids of the rules it hit.

    Args:
        pull_requests: Rows read through the ``"number"``, ``"title"`` and
            ``"body"`` keys. Rows without a number are ignored.
        pr_files: Rows read through the ``"pull_request_number"`` and
            ``"filename"`` keys. Rows missing either value are ignored.
        rules: Suppression rules to evaluate, in order.

    Returns:
        A dict keyed by pull request number holding the ids of all matching
        rules in rule order. Pull requests that match no rule are absent.
    """
    if not rules:
        return {}

    # Group the changed file paths by pull request number.
    files_index: defaultdict[int, list[str]] = defaultdict(list)
    for file_row in pr_files:
        owner = file_row.get("pull_request_number")
        path = str(file_row.get("filename") or "").strip()
        if owner is not None and path:
            files_index[int(owner)].append(path)

    reasons: dict[int, list[str]] = {}
    for pr in pull_requests:
        raw_number = pr.get("number")
        if raw_number is None:
            continue
        key = int(raw_number)
        pr_title = str(pr.get("title") or "")
        pr_body = str(pr.get("body") or "")
        # .get() keeps the defaultdict from growing entries for PRs without files.
        changed = files_index.get(key, [])
        hit_ids = [
            rule.id
            for rule in rules
            if rule.matches(title=pr_title, body=pr_body, paths=changed)
        ]
        if hit_ids:
            reasons[key] = hit_ids
    return reasons
def _string_list(value: Any) -> tuple[str, ...]:
    """Coerce a list into a tuple of its non-blank string forms.

    Args:
        value: Any object; only ``list`` instances are read.

    Returns:
        ``str(item)`` for each item whose string form is not blank, in order.
        An empty tuple when ``value`` is not a list.
    """
    if isinstance(value, list):
        texts = (str(entry) for entry in value)
        return tuple(text for text in texts if text.strip())
    return ()