# Uploaded by evalstate (HF Staff).
# Commit dbf7313 (verified): "Deploy Diffusers PR API".
from __future__ import annotations
import re
from collections import defaultdict
from typing import Any
# Any "#<digits>" reference, optionally qualified as "owner/repo#<digits>".
# The owner/repo prefix is a single optional group, so both names are either
# captured together or both left as None.
REFERENCE_PATTERN = re.compile(
    r"(?:(?P<owner>[A-Za-z0-9_.-]+)/(?P<repo>[A-Za-z0-9_.-]+))?#(?P<number>\d+)"
)

# A closing verb (close/closes/closed, fix/fixes/fixed, resolve/resolves/resolved)
# followed by whitespace and a reference. Matching is case-insensitive and the
# verb itself is captured in the "verb" group.
CLOSING_KEYWORD_PATTERN = re.compile(
    r"\b(?P<verb>close[sd]?|fix(?:e[sd])?|resolve[sd]?)\s+(?:(?P<owner>[A-Za-z0-9_.-]+)/(?P<repo>[A-Za-z0-9_.-]+))?#(?P<number>\d+)\b",
    re.IGNORECASE,
)

# The phrase "duplicate of" followed by a reference; case-insensitive.
DUPLICATE_REFERENCE_PATTERN = re.compile(
    r"\bduplicate\s+of\s+(?:(?P<owner>[A-Za-z0-9_.-]+)/(?P<repo>[A-Za-z0-9_.-]+))?#(?P<number>\d+)\b",
    re.IGNORECASE,
)
def extract_references(
    text: str | None, default_owner: str, default_repo: str
) -> list[dict[str, Any]]:
    """Collect every issue/PR reference found in *text*.

    The text is scanned three times, once per module-level pattern, and the
    results are returned in this order: all plain mentions, then all closing
    references, then all duplicate references. Within each group entries
    follow their position in the text. Nothing is de-duplicated here.

    Because the closing and duplicate patterns end with the same "#<number>"
    syntax that the plain pattern matches, such a phrase also produces a
    "mention" entry for the same target.

    Args:
        text: Body to scan. ``None`` or an empty string yields an empty list.
        default_owner: Owner used when a reference has no "owner/repo" prefix.
        default_repo: Repository name used when a reference has no prefix.

    Returns:
        A list of dicts with keys ``target_owner``, ``target_repo``,
        ``target_number`` (int) and ``reference_kind``; closing references
        additionally carry the lower-cased ``verb``.
    """
    if not text:
        return []

    def describe(found: re.Match[str], kind: str) -> dict[str, Any]:
        # Unqualified references fall back to the caller-supplied repository.
        return {
            "target_owner": found.group("owner") or default_owner,
            "target_repo": found.group("repo") or default_repo,
            "target_number": int(found.group("number")),
            "reference_kind": kind,
        }

    mentions = [
        describe(found, "mention") for found in REFERENCE_PATTERN.finditer(text)
    ]
    closings = [
        {
            **describe(found, "closing_reference"),
            "verb": found.group("verb").lower(),
        }
        for found in CLOSING_KEYWORD_PATTERN.finditer(text)
    ]
    duplicates = [
        describe(found, "duplicate_reference")
        for found in DUPLICATE_REFERENCE_PATTERN.finditer(text)
    ]
    return mentions + closings + duplicates
def build_text_link_rows(
    *,
    repo: str,
    owner: str,
    repo_name: str,
    source_type: str,
    source_number: int,
    source_id: int | None,
    body: str | None,
    snapshot_id: str,
    extracted_at: str,
) -> list[dict[str, Any]]:
    """Turn the references found in *body* into link rows.

    References are obtained from ``extract_references`` with *owner* and
    *repo_name* as the defaults for unqualified references. Rows are unique
    per (target owner, target repo, target number, reference kind); when the
    same combination occurs more than once only the first occurrence is kept,
    and the output preserves first-occurrence order.

    Args:
        repo: Value copied into each row's ``repo`` field.
        owner: Default target owner for unqualified references.
        repo_name: Default target repository for unqualified references.
        source_type: Value copied into each row's ``source_type`` field.
        source_number: Value copied into each row's ``source_number`` field.
        source_id: Value copied into each row's ``source_github_id`` field.
        body: Text to scan; ``None`` or empty produces no rows.
        snapshot_id: Value copied into each row's ``snapshot_id`` field.
        extracted_at: Value copied into each row's ``extracted_at`` field.

    Returns:
        One dict per unique reference, with ``link_origin`` set to ``"text"``
        and ``link_type`` set to the reference kind.
    """
    # An insertion-ordered dict gives "first occurrence wins" de-duplication.
    first_by_key: dict[tuple[str, str, int, str], dict[str, Any]] = {}
    for found in extract_references(body, owner, repo_name):
        identity = (
            found["target_owner"],
            found["target_repo"],
            found["target_number"],
            found["reference_kind"],
        )
        first_by_key.setdefault(identity, found)

    return [
        {
            "repo": repo,
            "source_type": source_type,
            "source_number": source_number,
            "source_github_id": source_id,
            "target_owner": found["target_owner"],
            "target_repo": found["target_repo"],
            "target_number": found["target_number"],
            "link_type": found["reference_kind"],
            "link_origin": "text",
            "snapshot_id": snapshot_id,
            "extracted_at": extracted_at,
        }
        for found in first_by_key.values()
    ]
def build_pr_duplicate_candidate_rows(
    *,
    repo: str,
    pull_requests: list[dict[str, Any]],
    link_rows: list[dict[str, Any]],
    snapshot_id: str,
    extracted_at: str,
) -> list[dict[str, Any]]:
    """Derive "shared_issue_target" rows for PRs that reference the same target.

    Only link rows whose ``source_type`` is ``"pull_request"`` and whose
    target owner/repo equal the two halves of *repo* are considered. A target
    number referenced by at least two distinct PRs from *pull_requests*
    produces one row per such PR.

    Args:
        repo: Repository as "owner/name"; it is split at the first "/". If it
            contains no "/", the name half is the empty string, so only link
            rows whose ``target_repo`` is also empty can match.
        pull_requests: Dicts with a ``number`` key. They set which PRs are
            eligible and the order of PRs within each target. A number that
            appears more than once is counted once.
        link_rows: Dicts with ``source_type``, ``source_number``,
            ``target_owner``, ``target_repo`` and ``target_number`` keys.
        snapshot_id: Value copied into each row's ``snapshot_id`` field.
        extracted_at: Value copied into each row's ``extracted_at`` field.

    Returns:
        Rows with ``link_type`` ``"shared_issue_target"``, ``link_origin``
        ``"derived"`` and ``source_github_id`` ``None``. Targets appear in the
        order they are first reached while walking *pull_requests*.
    """
    # Split once up front instead of re-splitting for every row.
    repo_owner, _, repo_name = repo.partition("/")

    pr_targets: dict[int, set[int]] = defaultdict(set)
    for row in link_rows:
        if row["source_type"] != "pull_request":
            continue
        if row["target_owner"] != repo_owner or row["target_repo"] != repo_name:
            continue
        pr_targets[row["source_number"]].add(row["target_number"])

    target_to_prs: dict[int, list[int]] = defaultdict(list)
    counted_prs: set[int] = set()
    for pr_row in pull_requests:
        number = pr_row["number"]
        # A PR listed twice must not count as two PRs sharing a target.
        if number in counted_prs:
            continue
        counted_prs.add(number)
        for target in sorted(pr_targets.get(number, ())):
            target_to_prs[target].append(number)

    rows: list[dict[str, Any]] = []
    for target_number, prs in target_to_prs.items():
        if len(prs) < 2:
            continue
        for pr_number in prs:
            rows.append(
                {
                    "repo": repo,
                    "source_type": "pull_request",
                    "source_number": pr_number,
                    "source_github_id": None,
                    "target_owner": repo_owner,
                    "target_repo": repo_name,
                    "target_number": target_number,
                    "link_type": "shared_issue_target",
                    "link_origin": "derived",
                    "snapshot_id": snapshot_id,
                    "extracted_at": extracted_at,
                }
            )
    return rows