File size: 4,734 Bytes
dbf7313
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from __future__ import annotations

import re
from collections import defaultdict
from typing import Any

# Characters accepted in an owner or repository name.
_SLUG = r"[A-Za-z0-9_.-]+"
# Shared tail of every pattern: an optional ``owner/repo`` qualifier followed
# by the mandatory ``#<digits>``. Exposes the named groups ``owner``, ``repo``
# and ``number``; ``owner`` and ``repo`` are None when the qualifier is absent.
_TARGET = rf"(?:(?P<owner>{_SLUG})/(?P<repo>{_SLUG}))?#(?P<number>\d+)"

# Any ``#123`` or ``owner/repo#123`` occurrence (case-sensitive, no word
# boundary required on either side).
REFERENCE_PATTERN = re.compile(_TARGET)

# A close/fix/resolve verb (base, ``-s``/``-es`` or ``-d``/``-ed`` form),
# whitespace, then a reference ending on a word boundary. The verb is captured
# in the ``verb`` group; matching is case-insensitive.
CLOSING_KEYWORD_PATTERN = re.compile(
    r"\b(?P<verb>close[sd]?|fix(?:e[sd])?|resolve[sd]?)\s+" + _TARGET + r"\b",
    re.IGNORECASE,
)

# The phrase "duplicate of" followed by a reference ending on a word boundary;
# matching is case-insensitive.
DUPLICATE_REFERENCE_PATTERN = re.compile(
    r"\bduplicate\s+of\s+" + _TARGET + r"\b",
    re.IGNORECASE,
)


def extract_references(
    text: str | None, default_owner: str, default_repo: str
) -> list[dict[str, Any]]:
    """Collect every issue/PR reference found in *text*.

    The text is scanned three times, once per module-level pattern, and the
    results are concatenated in this order: plain mentions, closing-keyword
    references, then "duplicate of" references. Within each pass, matches
    appear in the order they occur in the text. No de-duplication happens
    here, so a reference that satisfies several patterns is reported once per
    pattern.

    Args:
        text: Body to scan. ``None`` or an empty string yields an empty list.
        default_owner: Owner used when a reference has no ``owner/repo``
            qualifier.
        default_repo: Repository name used when a reference has no
            ``owner/repo`` qualifier.

    Returns:
        A list of dicts with the keys ``target_owner``, ``target_repo``,
        ``target_number`` (int) and ``reference_kind`` (``"mention"``,
        ``"closing_reference"`` or ``"duplicate_reference"``). Closing
        references additionally carry ``verb``, the lower-cased keyword that
        introduced them.
    """
    found: list[dict[str, Any]] = []
    if text:
        # (pattern, reference kind, whether the pattern captures a verb)
        passes = (
            (REFERENCE_PATTERN, "mention", False),
            (CLOSING_KEYWORD_PATTERN, "closing_reference", True),
            (DUPLICATE_REFERENCE_PATTERN, "duplicate_reference", False),
        )
        for pattern, kind, has_verb in passes:
            for hit in pattern.finditer(text):
                owner, repo_name, number = hit.group("owner", "repo", "number")
                entry: dict[str, Any] = {
                    # Unqualified references point at the default repository.
                    "target_owner": owner or default_owner,
                    "target_repo": repo_name or default_repo,
                    "target_number": int(number),
                    "reference_kind": kind,
                }
                if has_verb:
                    entry["verb"] = hit.group("verb").lower()
                found.append(entry)
    return found


def build_text_link_rows(
    *,
    repo: str,
    owner: str,
    repo_name: str,
    source_type: str,
    source_number: int,
    source_id: int | None,
    body: str | None,
    snapshot_id: str,
    extracted_at: str,
) -> list[dict[str, Any]]:
    """Turn the references found in *body* into link rows.

    References are obtained from ``extract_references`` with *owner* and
    *repo_name* as the defaults for unqualified references. Only the first
    occurrence of each (target owner, target repo, target number, reference
    kind) combination is kept, so the same target may still appear once per
    kind.

    Args:
        repo: Value copied into each row's ``repo`` field.
        owner: Default target owner for unqualified references.
        repo_name: Default target repository for unqualified references.
        source_type: Value copied into each row's ``source_type`` field.
        source_number: Value copied into each row's ``source_number`` field.
        source_id: Value copied into each row's ``source_github_id`` field.
        body: Text to scan; ``None`` or empty produces no rows.
        snapshot_id: Value copied into each row's ``snapshot_id`` field.
        extracted_at: Value copied into each row's ``extracted_at`` field.

    Returns:
        One dict per distinct reference, in order of first appearance, with
        ``link_type`` set to the reference kind and ``link_origin`` set to
        ``"text"``.
    """
    # Fields that are identical on every row; kept as two fragments so the
    # per-reference fields land between them and key order is unchanged.
    leading = {
        "repo": repo,
        "source_type": source_type,
        "source_number": source_number,
        "source_github_id": source_id,
    }
    trailing = {
        "link_origin": "text",
        "snapshot_id": snapshot_id,
        "extracted_at": extracted_at,
    }

    emitted: set[tuple[str, str, int, str]] = set()
    result: list[dict[str, Any]] = []
    for reference in extract_references(body, owner, repo_name):
        target_owner = reference["target_owner"]
        target_repo = reference["target_repo"]
        target_number = reference["target_number"]
        kind = reference["reference_kind"]

        identity = (target_owner, target_repo, target_number, kind)
        if identity not in emitted:
            emitted.add(identity)
            result.append(
                {
                    **leading,
                    "target_owner": target_owner,
                    "target_repo": target_repo,
                    "target_number": target_number,
                    "link_type": kind,
                    **trailing,
                }
            )
    return result


def build_pr_duplicate_candidate_rows(
    *,
    repo: str,
    pull_requests: list[dict[str, Any]],
    link_rows: list[dict[str, Any]],
    snapshot_id: str,
    extracted_at: str,
) -> list[dict[str, Any]]:
    """Derive link rows for pull requests that reference the same target.

    Every link row whose ``source_type`` is ``"pull_request"`` and whose
    target lies in *repo* contributes its target number to the source PR's
    target set, regardless of link type. A target number referenced by at
    least two distinct PRs listed in *pull_requests* produces one derived row
    per referencing PR.

    Args:
        repo: Repository in ``owner/name`` form. It is split on the first
            ``/``; a value without a slash yields an empty name, which
            normally matches no link row.
        pull_requests: Dicts carrying a ``number`` key. Only PRs listed here
            are considered, in the order given; a repeated number is counted
            once so a PR can never be paired with itself.
        link_rows: Rows carrying ``source_type``, ``source_number``,
            ``target_owner``, ``target_repo`` and ``target_number``.
        snapshot_id: Value copied into each row's ``snapshot_id`` field.
        extracted_at: Value copied into each row's ``extracted_at`` field.

    Returns:
        Rows with ``link_type`` ``"shared_issue_target"`` and ``link_origin``
        ``"derived"``; ``source_github_id`` is always ``None``. Rows are
        grouped by target in the order targets were first encountered.
    """
    # Split once instead of per row. partition() never raises, so a repo
    # without a slash simply fails to match instead of raising IndexError.
    repo_owner, _, repo_name = repo.partition("/")

    pr_targets: dict[int, set[int]] = defaultdict(set)
    for row in link_rows:
        if row["source_type"] != "pull_request":
            continue
        # Only references that stay inside this repository are comparable.
        if row["target_owner"] != repo_owner or row["target_repo"] != repo_name:
            continue
        pr_targets[row["source_number"]].add(row["target_number"])

    target_to_prs: dict[int, list[int]] = defaultdict(list)
    counted_prs: set[int] = set()
    for pr_row in pull_requests:
        number = pr_row["number"]
        # A PR listed twice must not count as two PRs sharing a target.
        if number in counted_prs:
            continue
        counted_prs.add(number)
        for target in sorted(pr_targets.get(number, ())):
            target_to_prs[target].append(number)

    rows: list[dict[str, Any]] = []
    for target_number, prs in target_to_prs.items():
        if len(prs) < 2:
            continue
        rows.extend(
            {
                "repo": repo,
                "source_type": "pull_request",
                "source_number": pr_number,
                "source_github_id": None,
                "target_owner": repo_owner,
                "target_repo": repo_name,
                "target_number": target_number,
                "link_type": "shared_issue_target",
                "link_origin": "derived",
                "snapshot_id": snapshot_id,
                "extracted_at": extracted_at,
            }
            for pr_number in prs
        )
    return rows