from __future__ import annotations import re from collections import defaultdict from typing import Any REFERENCE_PATTERN = re.compile( r"(?:(?P[A-Za-z0-9_.-]+)/(?P[A-Za-z0-9_.-]+))?#(?P\d+)" ) CLOSING_KEYWORD_PATTERN = re.compile( r"\b(?Pclose[sd]?|fix(?:e[sd])?|resolve[sd]?)\s+(?:(?P[A-Za-z0-9_.-]+)/(?P[A-Za-z0-9_.-]+))?#(?P\d+)\b", flags=re.IGNORECASE, ) DUPLICATE_REFERENCE_PATTERN = re.compile( r"\bduplicate\s+of\s+(?:(?P[A-Za-z0-9_.-]+)/(?P[A-Za-z0-9_.-]+))?#(?P\d+)\b", flags=re.IGNORECASE, ) def extract_references( text: str | None, default_owner: str, default_repo: str ) -> list[dict[str, Any]]: if not text: return [] refs: list[dict[str, Any]] = [] for match in REFERENCE_PATTERN.finditer(text): refs.append( { "target_owner": match.group("owner") or default_owner, "target_repo": match.group("repo") or default_repo, "target_number": int(match.group("number")), "reference_kind": "mention", } ) for match in CLOSING_KEYWORD_PATTERN.finditer(text): refs.append( { "target_owner": match.group("owner") or default_owner, "target_repo": match.group("repo") or default_repo, "target_number": int(match.group("number")), "reference_kind": "closing_reference", "verb": match.group("verb").lower(), } ) for match in DUPLICATE_REFERENCE_PATTERN.finditer(text): refs.append( { "target_owner": match.group("owner") or default_owner, "target_repo": match.group("repo") or default_repo, "target_number": int(match.group("number")), "reference_kind": "duplicate_reference", } ) return refs def build_text_link_rows( *, repo: str, owner: str, repo_name: str, source_type: str, source_number: int, source_id: int | None, body: str | None, snapshot_id: str, extracted_at: str, ) -> list[dict[str, Any]]: rows: list[dict[str, Any]] = [] seen: set[tuple[str, str, int, str]] = set() for ref in extract_references(body, owner, repo_name): key = (ref["target_owner"], ref["target_repo"], ref["target_number"], ref["reference_kind"]) if key in seen: continue seen.add(key) rows.append( { "repo": repo, "source_type": source_type, "source_number": source_number, "source_github_id": source_id, "target_owner": ref["target_owner"], "target_repo": ref["target_repo"], "target_number": ref["target_number"], "link_type": ref["reference_kind"], "link_origin": "text", "snapshot_id": snapshot_id, "extracted_at": extracted_at, } ) return rows def build_pr_duplicate_candidate_rows( *, repo: str, pull_requests: list[dict[str, Any]], link_rows: list[dict[str, Any]], snapshot_id: str, extracted_at: str, ) -> list[dict[str, Any]]: pr_targets: dict[int, set[int]] = defaultdict(set) for row in link_rows: if row["source_type"] != "pull_request": continue if ( row["target_owner"] != repo.split("/", 1)[0] or row["target_repo"] != repo.split("/", 1)[1] ): continue pr_targets[row["source_number"]].add(row["target_number"]) target_to_prs: dict[int, list[int]] = defaultdict(list) for pr_row in pull_requests: number = pr_row["number"] for target in sorted(pr_targets.get(number, set())): target_to_prs[target].append(number) rows: list[dict[str, Any]] = [] for target_number, prs in target_to_prs.items(): if len(prs) < 2: continue for pr_number in prs: rows.append( { "repo": repo, "source_type": "pull_request", "source_number": pr_number, "source_github_id": None, "target_owner": repo.split("/", 1)[0], "target_repo": repo.split("/", 1)[1], "target_number": target_number, "link_type": "shared_issue_target", "link_origin": "derived", "snapshot_id": snapshot_id, "extracted_at": extracted_at, } ) return rows