from __future__ import annotations
import re
from collections import defaultdict
from typing import Any
REFERENCE_PATTERN = re.compile(
r"(?:(?P<owner>[A-Za-z0-9_.-]+)/(?P<repo>[A-Za-z0-9_.-]+))?#(?P<number>\d+)"
)
CLOSING_KEYWORD_PATTERN = re.compile(
r"\b(?P<verb>close[sd]?|fix(?:e[sd])?|resolve[sd]?)\s+(?:(?P<owner>[A-Za-z0-9_.-]+)/(?P<repo>[A-Za-z0-9_.-]+))?#(?P<number>\d+)\b",
flags=re.IGNORECASE,
)
DUPLICATE_REFERENCE_PATTERN = re.compile(
r"\bduplicate\s+of\s+(?:(?P<owner>[A-Za-z0-9_.-]+)/(?P<repo>[A-Za-z0-9_.-]+))?#(?P<number>\d+)\b",
flags=re.IGNORECASE,
)
def extract_references(
text: str | None, default_owner: str, default_repo: str
) -> list[dict[str, Any]]:
if not text:
return []
refs: list[dict[str, Any]] = []
for match in REFERENCE_PATTERN.finditer(text):
refs.append(
{
"target_owner": match.group("owner") or default_owner,
"target_repo": match.group("repo") or default_repo,
"target_number": int(match.group("number")),
"reference_kind": "mention",
}
)
for match in CLOSING_KEYWORD_PATTERN.finditer(text):
refs.append(
{
"target_owner": match.group("owner") or default_owner,
"target_repo": match.group("repo") or default_repo,
"target_number": int(match.group("number")),
"reference_kind": "closing_reference",
"verb": match.group("verb").lower(),
}
)
for match in DUPLICATE_REFERENCE_PATTERN.finditer(text):
refs.append(
{
"target_owner": match.group("owner") or default_owner,
"target_repo": match.group("repo") or default_repo,
"target_number": int(match.group("number")),
"reference_kind": "duplicate_reference",
}
)
return refs
def build_text_link_rows(
    *,
    repo: str,
    owner: str,
    repo_name: str,
    source_type: str,
    source_number: int,
    source_id: int | None,
    body: str | None,
    snapshot_id: str,
    extracted_at: str,
) -> list[dict[str, Any]]:
    """Turn textual references found in *body* into link table rows.

    References are extracted via :func:`extract_references` (bare numbers
    default to *owner*/*repo_name*) and deduplicated on the
    (owner, repo, number, kind) tuple, keeping the first occurrence.
    Every emitted row carries ``link_origin="text"`` plus the provenance
    fields *snapshot_id* and *extracted_at*.
    """
    emitted: set[tuple[str, str, int, str]] = set()
    out: list[dict[str, Any]] = []
    for ref in extract_references(body, owner, repo_name):
        key = (
            ref["target_owner"],
            ref["target_repo"],
            ref["target_number"],
            ref["reference_kind"],
        )
        if key in emitted:
            # Same target already recorded with the same kind; skip repeats.
            continue
        emitted.add(key)
        out.append(
            {
                "repo": repo,
                "source_type": source_type,
                "source_number": source_number,
                "source_github_id": source_id,
                "target_owner": ref["target_owner"],
                "target_repo": ref["target_repo"],
                "target_number": ref["target_number"],
                "link_type": ref["reference_kind"],
                "link_origin": "text",
                "snapshot_id": snapshot_id,
                "extracted_at": extracted_at,
            }
        )
    return out
def build_pr_duplicate_candidate_rows(
    *,
    repo: str,
    pull_requests: list[dict[str, Any]],
    link_rows: list[dict[str, Any]],
    snapshot_id: str,
    extracted_at: str,
) -> list[dict[str, Any]]:
    """Derive duplicate-PR candidate rows from shared issue targets.

    Two (or more) pull requests that both reference the same in-repo issue
    in their text are duplicate candidates: one derived row is emitted per
    (PR, shared issue) pair with ``link_type="shared_issue_target"`` and
    ``link_origin="derived"``.

    Args:
        repo: ``"owner/name"`` slug; only link rows targeting this repo count.
        pull_requests: PR dicts with at least a ``"number"`` key; also fixes
            the ordering of PR numbers within each candidate group.
        link_rows: rows shaped like :func:`build_text_link_rows` output.
        snapshot_id / extracted_at: provenance fields copied onto each row.

    Returns:
        Derived rows; empty when no issue is shared by at least two PRs.
    """
    # Hoisted: previously repo.split("/", 1) was recomputed for every link
    # row and every emitted row.
    owner, repo_name = repo.split("/", 1)

    # PR number -> set of in-repo issue numbers it references in text.
    pr_targets: dict[int, set[int]] = defaultdict(set)
    for row in link_rows:
        if row["source_type"] != "pull_request":
            continue
        if row["target_owner"] != owner or row["target_repo"] != repo_name:
            continue
        pr_targets[row["source_number"]].add(row["target_number"])

    # Invert: issue number -> PR numbers (in pull_requests order) citing it.
    # Iterating pull_requests also drops link rows from unknown PR numbers.
    target_to_prs: dict[int, list[int]] = defaultdict(list)
    for pr_row in pull_requests:
        number = pr_row["number"]
        for target in sorted(pr_targets.get(number, ())):
            target_to_prs[target].append(number)

    rows: list[dict[str, Any]] = []
    for target_number, prs in target_to_prs.items():
        if len(prs) < 2:
            # Only one PR references this issue -> not a duplicate signal.
            continue
        for pr_number in prs:
            rows.append(
                {
                    "repo": repo,
                    "source_type": "pull_request",
                    "source_number": pr_number,
                    "source_github_id": None,
                    "target_owner": owner,
                    "target_repo": repo_name,
                    "target_number": target_number,
                    "link_type": "shared_issue_target",
                    "link_origin": "derived",
                    "snapshot_id": snapshot_id,
                    "extracted_at": extracted_at,
                }
            )
    return rows
|