"""DuckDB storage helpers for PR search runs and PR scope analysis tables."""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import duckdb

# Column order used by ``insert_rows`` for each table. The names mirror the
# column definitions in ``SCHEMA_SQL`` below; keep the two in sync.
TABLE_COLUMNS: dict[str, tuple[str, ...]] = {
    "pr_search_runs": (
        "id",
        "repo",
        "snapshot_id",
        "snapshot_dir",
        "source_type",
        "hf_repo_id",
        "hf_revision",
        "started_at",
        "finished_at",
        "status",
        "settings_json",
        "notes",
    ),
    "pr_search_active_run": (
        "repo",
        "run_id",
        "activated_at",
    ),
    "pr_search_documents": (
        "run_id",
        "repo",
        "pr_number",
        "github_id",
        "author_login",
        "state",
        "draft",
        "merged",
        "title",
        "base_ref",
        "created_at",
        "updated_at",
        "merged_at",
        "additions",
        "deletions",
        "changed_files",
        "comments_count",
        "review_comments_count",
        "html_url",
    ),
    "pr_search_contributors": (
        "run_id",
        "repo",
        "snapshot_id",
        "report_generated_at",
        "window_days",
        "author_login",
        "name",
        "profile_url",
        "repo_pull_requests_url",
        "repo_issues_url",
        "repo_first_seen_at",
        "repo_last_seen_at",
        "repo_primary_artifact_count",
        "repo_artifact_count",
        "snapshot_issue_count",
        "snapshot_pr_count",
        "snapshot_comment_count",
        "snapshot_review_count",
        "snapshot_review_comment_count",
        "repo_association",
        "new_to_repo",
        "first_seen_in_snapshot",
        "report_reason",
        "account_age_days",
        "young_account",
        "follow_through_score",
        "breadth_score",
        "automation_risk_signal",
        "heuristic_note",
        "public_orgs_json",
        "visible_authored_pr_count",
        "merged_pr_count",
        "closed_unmerged_pr_count",
        "open_pr_count",
        "merged_pr_rate",
        "closed_unmerged_pr_rate",
        "still_open_pr_rate",
        "distinct_repos_with_authored_prs",
        "distinct_repos_with_open_prs",
        "fetch_error",
    ),
    "pr_scope_features": (
        "run_id",
        "repo",
        "pr_number",
        "feature_version",
        "total_changed_lines",
        "file_count",
        "directory_count",
        "dominant_dir_share",
        "filenames_json",
        "directories_json",
        "vector_json",
        "computed_at",
    ),
    "pr_scope_run_artifacts": (
        "run_id",
        "repo",
        "feature_version",
        "idf_json",
        "computed_at",
    ),
    "pr_scope_neighbors": (
        "run_id",
        "repo",
        "left_pr_number",
        "right_pr_number",
        "rank_from_left",
        "rank_from_right",
        "similarity",
        "content_similarity",
        "size_similarity",
        "breadth_similarity",
        "concentration_similarity",
        "shared_filenames_json",
        "shared_directories_json",
        "created_at",
    ),
    "pr_scope_clusters": (
        "run_id",
        "repo",
        "cluster_id",
        "representative_pr_number",
        "cluster_size",
        "average_similarity",
        "summary",
        "shared_filenames_json",
        "shared_directories_json",
        "created_at",
    ),
    "pr_scope_cluster_members": (
        "run_id",
        "repo",
        "cluster_id",
        "pr_number",
        "member_role",
    ),
    "pr_scope_cluster_candidates": (
        "run_id",
        "repo",
        "pr_number",
        "cluster_id",
        "candidate_rank",
        "candidate_score",
        "matched_member_count",
        "best_member_pr_number",
        "max_member_similarity",
        "avg_top_member_similarity",
        "evidence_json",
        "assigned",
    ),
}

# Idempotent DDL: every statement uses IF NOT EXISTS, so it can be executed on
# each writable connection. None of the tables declares a uniqueness
# constraint and all indexes are plain (non-unique) indexes.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS pr_search_runs (
    id VARCHAR,
    repo VARCHAR,
    snapshot_id VARCHAR,
    snapshot_dir VARCHAR,
    source_type VARCHAR,
    hf_repo_id VARCHAR,
    hf_revision VARCHAR,
    started_at VARCHAR,
    finished_at VARCHAR,
    status VARCHAR,
    settings_json VARCHAR,
    notes VARCHAR
);

CREATE TABLE IF NOT EXISTS pr_search_active_run (
    repo VARCHAR,
    run_id VARCHAR,
    activated_at VARCHAR
);

CREATE TABLE IF NOT EXISTS pr_search_documents (
    run_id VARCHAR,
    repo VARCHAR,
    pr_number BIGINT,
    github_id BIGINT,
    author_login VARCHAR,
    state VARCHAR,
    draft BOOLEAN,
    merged BOOLEAN,
    title VARCHAR,
    base_ref VARCHAR,
    created_at VARCHAR,
    updated_at VARCHAR,
    merged_at VARCHAR,
    additions BIGINT,
    deletions BIGINT,
    changed_files BIGINT,
    comments_count BIGINT,
    review_comments_count BIGINT,
    html_url VARCHAR
);

CREATE TABLE IF NOT EXISTS pr_search_contributors (
    run_id VARCHAR,
    repo VARCHAR,
    snapshot_id VARCHAR,
    report_generated_at VARCHAR,
    window_days BIGINT,
    author_login VARCHAR,
    name VARCHAR,
    profile_url VARCHAR,
    repo_pull_requests_url VARCHAR,
    repo_issues_url VARCHAR,
    repo_first_seen_at VARCHAR,
    repo_last_seen_at VARCHAR,
    repo_primary_artifact_count BIGINT,
    repo_artifact_count BIGINT,
    snapshot_issue_count BIGINT,
    snapshot_pr_count BIGINT,
    snapshot_comment_count BIGINT,
    snapshot_review_count BIGINT,
    snapshot_review_comment_count BIGINT,
    repo_association VARCHAR,
    new_to_repo BOOLEAN,
    first_seen_in_snapshot BOOLEAN,
    report_reason VARCHAR,
    account_age_days BIGINT,
    young_account BOOLEAN,
    follow_through_score VARCHAR,
    breadth_score VARCHAR,
    automation_risk_signal VARCHAR,
    heuristic_note VARCHAR,
    public_orgs_json VARCHAR,
    visible_authored_pr_count BIGINT,
    merged_pr_count BIGINT,
    closed_unmerged_pr_count BIGINT,
    open_pr_count BIGINT,
    merged_pr_rate DOUBLE,
    closed_unmerged_pr_rate DOUBLE,
    still_open_pr_rate DOUBLE,
    distinct_repos_with_authored_prs BIGINT,
    distinct_repos_with_open_prs BIGINT,
    fetch_error VARCHAR
);

CREATE TABLE IF NOT EXISTS pr_scope_features (
    run_id VARCHAR,
    repo VARCHAR,
    pr_number BIGINT,
    feature_version VARCHAR,
    total_changed_lines BIGINT,
    file_count BIGINT,
    directory_count BIGINT,
    dominant_dir_share DOUBLE,
    filenames_json VARCHAR,
    directories_json VARCHAR,
    vector_json VARCHAR,
    computed_at VARCHAR
);

CREATE TABLE IF NOT EXISTS pr_scope_run_artifacts (
    run_id VARCHAR,
    repo VARCHAR,
    feature_version VARCHAR,
    idf_json VARCHAR,
    computed_at VARCHAR
);

CREATE TABLE IF NOT EXISTS pr_scope_neighbors (
    run_id VARCHAR,
    repo VARCHAR,
    left_pr_number BIGINT,
    right_pr_number BIGINT,
    rank_from_left BIGINT,
    rank_from_right BIGINT,
    similarity DOUBLE,
    content_similarity DOUBLE,
    size_similarity DOUBLE,
    breadth_similarity DOUBLE,
    concentration_similarity DOUBLE,
    shared_filenames_json VARCHAR,
    shared_directories_json VARCHAR,
    created_at VARCHAR
);

CREATE TABLE IF NOT EXISTS pr_scope_clusters (
    run_id VARCHAR,
    repo VARCHAR,
    cluster_id VARCHAR,
    representative_pr_number BIGINT,
    cluster_size BIGINT,
    average_similarity DOUBLE,
    summary VARCHAR,
    shared_filenames_json VARCHAR,
    shared_directories_json VARCHAR,
    created_at VARCHAR
);

CREATE TABLE IF NOT EXISTS pr_scope_cluster_members (
    run_id VARCHAR,
    repo VARCHAR,
    cluster_id VARCHAR,
    pr_number BIGINT,
    member_role VARCHAR
);

CREATE TABLE IF NOT EXISTS pr_scope_cluster_candidates (
    run_id VARCHAR,
    repo VARCHAR,
    pr_number BIGINT,
    cluster_id VARCHAR,
    candidate_rank BIGINT,
    candidate_score DOUBLE,
    matched_member_count BIGINT,
    best_member_pr_number BIGINT,
    max_member_similarity DOUBLE,
    avg_top_member_similarity DOUBLE,
    evidence_json VARCHAR,
    assigned BOOLEAN
);

CREATE INDEX IF NOT EXISTS idx_pr_search_active_run_repo
    ON pr_search_active_run (repo);
CREATE INDEX IF NOT EXISTS idx_pr_search_runs_repo_status
    ON pr_search_runs (repo, status);
CREATE INDEX IF NOT EXISTS idx_pr_search_documents_run_pr
    ON pr_search_documents (run_id, pr_number);
CREATE INDEX IF NOT EXISTS idx_pr_search_documents_run_author
    ON pr_search_documents (run_id, author_login);
CREATE INDEX IF NOT EXISTS idx_pr_search_contributors_run_author
    ON pr_search_contributors (run_id, author_login);
CREATE INDEX IF NOT EXISTS idx_pr_scope_features_run_pr
    ON pr_scope_features (run_id, pr_number);
CREATE INDEX IF NOT EXISTS idx_pr_scope_run_artifacts_run
    ON pr_scope_run_artifacts (run_id);
CREATE INDEX IF NOT EXISTS idx_pr_scope_neighbors_run_left
    ON pr_scope_neighbors (run_id, left_pr_number);
CREATE INDEX IF NOT EXISTS idx_pr_scope_neighbors_run_right
    ON pr_scope_neighbors (run_id, right_pr_number);
CREATE INDEX IF NOT EXISTS idx_pr_scope_clusters_run_cluster
    ON pr_scope_clusters (run_id, cluster_id);
CREATE INDEX IF NOT EXISTS idx_pr_scope_cluster_members_run_pr
    ON pr_scope_cluster_members (run_id, pr_number);
CREATE INDEX IF NOT EXISTS idx_pr_scope_cluster_candidates_run_pr
    ON pr_scope_cluster_candidates (run_id, pr_number);
"""


def connect_pr_search_db(path: Path, *, read_only: bool = False) -> duckdb.DuckDBPyConnection:
    """Open the PR search database at *path* and return the connection.

    In writable mode the parent directory is created if needed and the schema
    is ensured before returning. In read-only mode nothing is created.

    Raises:
        FileNotFoundError: if ``read_only`` is true and the file does not exist.
    """
    resolved = path.resolve()
    if read_only and not resolved.exists():
        raise FileNotFoundError(f"PR search database does not exist: {resolved}")
    if not read_only:
        resolved.parent.mkdir(parents=True, exist_ok=True)
    connection = duckdb.connect(str(resolved), read_only=read_only)
    if not read_only:
        try:
            ensure_pr_search_schema(connection)
        except Exception:
            # Do not leak the open connection when schema setup fails; the
            # caller never receives it and so could not close it.
            connection.close()
            raise
    return connection


def ensure_pr_search_schema(connection: duckdb.DuckDBPyConnection) -> None:
    """Create all tables and indexes if they are missing."""
    connection.execute(SCHEMA_SQL)
    # CREATE TABLE IF NOT EXISTS leaves an existing table untouched, so add the
    # column explicitly (presumably for databases created before
    # ``author_login`` existed on documents — confirm).
    connection.execute(
        "ALTER TABLE pr_search_documents ADD COLUMN IF NOT EXISTS author_login VARCHAR"
    )


def insert_rows(
    connection: duckdb.DuckDBPyConnection,
    table_name: str,
    rows: list[dict[str, Any]],
) -> None:
    """Bulk-insert *rows* into *table_name*.

    Each row is a dict; keys missing from a row are inserted as NULL, keys not
    listed in ``TABLE_COLUMNS[table_name]`` are ignored, and dict/list values
    are stored as JSON text. An empty *rows* list is a no-op.

    Raises:
        KeyError: if *table_name* is not a key of ``TABLE_COLUMNS``.
    """
    if not rows:
        return
    # The lookup also acts as an allow-list: only known table and column names
    # are ever interpolated into the SQL text below.
    columns = TABLE_COLUMNS[table_name]
    placeholders = ", ".join("?" for _ in columns)
    column_sql = ", ".join(columns)
    values = [tuple(_db_value(row.get(column)) for column in columns) for row in rows]
    connection.executemany(
        f"INSERT INTO {table_name} ({column_sql}) VALUES ({placeholders})",
        values,
    )


def update_run_status(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    status: str,
    finished_at: str | None = None,
    notes: str | None = None,
) -> None:
    """Set the status of run *run_id*.

    ``finished_at`` and ``notes`` are only overwritten when a non-None value
    is passed; otherwise the stored values are kept (via COALESCE).
    """
    connection.execute(
        """
        UPDATE pr_search_runs
        SET status = ?,
            finished_at = COALESCE(?, finished_at),
            notes = COALESCE(?, notes)
        WHERE id = ?
        """,
        [status, finished_at, notes, run_id],
    )


def replace_active_run(
    connection: duckdb.DuckDBPyConnection,
    *,
    repo: str,
    run_id: str,
    activated_at: str,
) -> str | None:
    """Make *run_id* the active run for *repo*.

    Any existing active-run rows for the repo are deleted and a new one is
    inserted. If a different run was previously active and its status is
    ``'succeeded'``, that run is marked ``'superseded'``.

    Returns:
        The previously active run id, or None if there was none.
    """
    # NOTE(review): the statements below are not wrapped in a transaction by
    # this function; atomicity is presumably left to the caller — confirm.
    previous = fetch_one(
        connection,
        "SELECT run_id FROM pr_search_active_run WHERE repo = ?",
        [repo],
    )
    connection.execute("DELETE FROM pr_search_active_run WHERE repo = ?", [repo])
    connection.execute(
        "INSERT INTO pr_search_active_run (repo, run_id, activated_at) VALUES (?, ?, ?)",
        [repo, run_id, activated_at],
    )
    previous_run_id = None if previous is None else str(previous["run_id"])
    if previous_run_id is not None and previous_run_id != run_id:
        connection.execute(
            "UPDATE pr_search_runs SET status = 'superseded' WHERE id = ? AND status = 'succeeded'",
            [previous_run_id],
        )
    return previous_run_id


def resolve_active_run(
    connection: duckdb.DuckDBPyConnection,
    *,
    repo: str | None = None,
) -> dict[str, Any]:
    """Return the ``pr_search_runs`` row of the active run.

    When *repo* is None the database must contain exactly one active repo,
    which is then used.

    Raises:
        ValueError: if no active run exists, if *repo* is None and more than
            one repo has an active run, or if the active run for *repo* has
            no matching row in ``pr_search_runs``.
    """
    if repo is None:
        # DISTINCT: pr_search_active_run has no uniqueness constraint, so
        # duplicate rows for one repo must not be mistaken for several repos.
        active_repos = fetch_rows(
            connection,
            "SELECT DISTINCT repo FROM pr_search_active_run ORDER BY repo",
        )
        if not active_repos:
            raise ValueError("No active PR search run found.")
        if len(active_repos) > 1:
            raise ValueError("Multiple active repos found; pass --repo.")
        repo = str(active_repos[0]["repo"])
    row = fetch_one(
        connection,
        """
        SELECT r.*
        FROM pr_search_runs AS r
        JOIN pr_search_active_run AS a
          ON a.run_id = r.id AND a.repo = r.repo
        WHERE a.repo = ?
        """,
        [repo],
    )
    if row is None:
        raise ValueError(f"No active PR search run found for repo {repo!r}.")
    return row


def get_run_counts(connection: duckdb.DuckDBPyConnection, *, run_id: str) -> dict[str, int]:
    """Return the number of rows stored for *run_id* in each per-run table."""
    return {
        "documents": _count(connection, "pr_search_documents", run_id),
        "contributors": _count(connection, "pr_search_contributors", run_id),
        "features": _count(connection, "pr_scope_features", run_id),
        "run_artifacts": _count(connection, "pr_scope_run_artifacts", run_id),
        "neighbors": _count(connection, "pr_scope_neighbors", run_id),
        "clusters": _count(connection, "pr_scope_clusters", run_id),
        "cluster_members": _count(connection, "pr_scope_cluster_members", run_id),
        "cluster_candidates": _count(connection, "pr_scope_cluster_candidates", run_id),
    }


def get_document(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    pr_number: int,
) -> dict[str, Any] | None:
    """Return the document row for *pr_number* in *run_id*, or None."""
    return fetch_one(
        connection,
        "SELECT * FROM pr_search_documents WHERE run_id = ? AND pr_number = ?",
        [run_id, pr_number],
    )


def get_contributor(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    author_login: str,
) -> dict[str, Any] | None:
    """Return the contributor row for *author_login* (case-insensitive), or None."""
    return fetch_one(
        connection,
        """
        SELECT *
        FROM pr_search_contributors
        WHERE run_id = ? AND lower(author_login) = lower(?)
        """,
        [run_id, author_login],
    )


def get_contributor_pulls(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    author_login: str,
    limit: int,
) -> list[dict[str, Any]]:
    """Return up to *limit* PR documents authored by *author_login*.

    The login match is case-insensitive. Rows are ordered by ``updated_at``
    descending (NULLs last), then by ``pr_number`` descending.
    """
    return fetch_rows(
        connection,
        """
        SELECT
            pr_number,
            github_id,
            author_login,
            state,
            draft,
            merged,
            title,
            base_ref,
            created_at,
            updated_at,
            merged_at,
            additions,
            deletions,
            changed_files,
            comments_count,
            review_comments_count,
            html_url
        FROM pr_search_documents
        WHERE run_id = ? AND lower(author_login) = lower(?)
        ORDER BY updated_at DESC NULLS LAST, pr_number DESC
        LIMIT ?
        """,
        [run_id, author_login, limit],
    )


def get_feature(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    pr_number: int,
) -> dict[str, Any] | None:
    """Return the scope-feature row for *pr_number* in *run_id*, or None."""
    return fetch_one(
        connection,
        "SELECT * FROM pr_scope_features WHERE run_id = ? AND pr_number = ?",
        [run_id, pr_number],
    )


def get_scope_run_artifact(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
) -> dict[str, Any] | None:
    """Return the scope run-artifact row for *run_id*, or None.

    Any ``duckdb.Error`` raised by the query is treated as "no artifact" and
    yields None (presumably to tolerate databases that lack the table —
    confirm).
    """
    try:
        return fetch_one(
            connection,
            """
            SELECT *
            FROM pr_scope_run_artifacts
            WHERE run_id = ?
            """,
            [run_id],
        )
    except duckdb.Error:
        return None


def get_similar_pr_rows(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    pr_number: int,
    limit: int,
) -> list[dict[str, Any]]:
    """Return up to *limit* neighbor rows of *pr_number* in *run_id*.

    A neighbor row matches when *pr_number* is on either side of the pair;
    the other side is reported as ``neighbor_pr_number`` and the rank seen
    from *pr_number*'s side as ``neighbor_rank``. Rows with a NULL rank sort
    last, then by rank, similarity (descending) and neighbor PR number.
    """
    return fetch_rows(
        connection,
        """
        SELECT
            CASE WHEN left_pr_number = ? THEN right_pr_number ELSE left_pr_number END
                AS neighbor_pr_number,
            CASE WHEN left_pr_number = ? THEN rank_from_left ELSE rank_from_right END
                AS neighbor_rank,
            similarity,
            content_similarity,
            size_similarity,
            breadth_similarity,
            concentration_similarity,
            shared_filenames_json,
            shared_directories_json
        FROM pr_scope_neighbors
        WHERE run_id = ?
          AND (? = left_pr_number OR ? = right_pr_number)
        ORDER BY neighbor_rank IS NULL, neighbor_rank, similarity DESC, neighbor_pr_number
        LIMIT ?
        """,
        # Positional order follows the placeholders: two CASE expressions,
        # the run filter, the two-sided match, then the limit.
        [pr_number, pr_number, run_id, pr_number, pr_number, limit],
    )


def get_candidate_cluster_rows(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    pr_number: int,
    limit: int,
) -> list[dict[str, Any]]:
    """Return up to *limit* candidate clusters for *pr_number* in *run_id*.

    Each row combines the candidate record with its cluster's summary columns
    and the title of the cluster's representative PR (NULL when that document
    is missing). Rows are ordered by candidate rank, score (descending) and
    cluster id.
    """
    return fetch_rows(
        connection,
        """
        SELECT
            c.cluster_id,
            c.candidate_rank,
            c.candidate_score,
            c.matched_member_count,
            c.best_member_pr_number,
            c.max_member_similarity,
            c.avg_top_member_similarity,
            c.evidence_json,
            c.assigned,
            cl.representative_pr_number,
            cl.cluster_size,
            cl.average_similarity,
            cl.summary,
            cl.shared_filenames_json,
            cl.shared_directories_json,
            d.title AS representative_title
        FROM pr_scope_cluster_candidates AS c
        JOIN pr_scope_clusters AS cl
          ON cl.run_id = c.run_id AND cl.cluster_id = c.cluster_id
        LEFT JOIN pr_search_documents AS d
          ON d.run_id = cl.run_id AND d.pr_number = cl.representative_pr_number
        WHERE c.run_id = ? AND c.pr_number = ?
        ORDER BY c.candidate_rank, c.candidate_score DESC, c.cluster_id
        LIMIT ?
        """,
        [run_id, pr_number, limit],
    )


def get_cluster(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    cluster_id: str,
) -> dict[str, Any] | None:
    """Return the cluster row for *cluster_id* in *run_id*, or None."""
    return fetch_one(
        connection,
        "SELECT * FROM pr_scope_clusters WHERE run_id = ? AND cluster_id = ?",
        [run_id, cluster_id],
    )


def get_cluster_members(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    cluster_id: str,
) -> list[dict[str, Any]]:
    """Return the members of *cluster_id* with basic document details.

    Document columns are NULL for members without a document row. Members
    whose role is ``'representative'`` come first, then by PR number.
    """
    return fetch_rows(
        connection,
        """
        SELECT
            m.pr_number,
            m.member_role,
            d.title,
            d.html_url,
            d.state,
            d.draft
        FROM pr_scope_cluster_members AS m
        LEFT JOIN pr_search_documents AS d
          ON d.run_id = m.run_id AND d.pr_number = m.pr_number
        WHERE m.run_id = ? AND m.cluster_id = ?
        ORDER BY m.member_role != 'representative', m.pr_number
        """,
        [run_id, cluster_id],
    )


def get_cluster_ids_for_prs(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    pr_numbers: list[int],
) -> dict[int, list[str]]:
    """Map each PR number in *pr_numbers* to the cluster ids it belongs to.

    PRs without any membership are absent from the result. An empty
    *pr_numbers* list returns an empty dict without querying.
    """
    if not pr_numbers:
        return {}
    # Only "?" markers are interpolated; the values themselves are bound.
    placeholders = ", ".join("?" for _ in pr_numbers)
    rows = fetch_rows(
        connection,
        f"""
        SELECT pr_number, cluster_id
        FROM pr_scope_cluster_members
        WHERE run_id = ? AND pr_number IN ({placeholders})
        ORDER BY pr_number, cluster_id
        """,
        [run_id, *pr_numbers],
    )
    result: dict[int, list[str]] = {}
    for row in rows:
        result.setdefault(int(row["pr_number"]), []).append(str(row["cluster_id"]))
    return result


def get_shared_cluster_ids(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    left_pr_number: int,
    right_pr_number: int,
) -> list[str]:
    """Return the ids of clusters in *run_id* containing both PRs, sorted."""
    rows = fetch_rows(
        connection,
        """
        SELECT left_members.cluster_id
        FROM pr_scope_cluster_members AS left_members
        JOIN pr_scope_cluster_members AS right_members
          ON right_members.run_id = left_members.run_id
         AND right_members.cluster_id = left_members.cluster_id
        WHERE left_members.run_id = ?
          AND left_members.pr_number = ?
          AND right_members.pr_number = ?
        ORDER BY left_members.cluster_id
        """,
        [run_id, left_pr_number, right_pr_number],
    )
    return [str(row["cluster_id"]) for row in rows]


def get_pair_neighbor_row(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    left_pr_number: int,
    right_pr_number: int,
) -> dict[str, Any] | None:
    """Return the neighbor row for a pair of PRs, or None.

    The arguments may be given in either order: the lookup always uses the
    smaller number as ``left_pr_number`` and the larger as
    ``right_pr_number`` (presumably matching how pairs are stored — verify
    against the writer).
    """
    canonical_left = min(left_pr_number, right_pr_number)
    canonical_right = max(left_pr_number, right_pr_number)
    return fetch_one(
        connection,
        """
        SELECT *
        FROM pr_scope_neighbors
        WHERE run_id = ? AND left_pr_number = ? AND right_pr_number = ?
        """,
        [run_id, canonical_left, canonical_right],
    )


def fetch_rows(
    connection: duckdb.DuckDBPyConnection,
    sql: str,
    parameters: list[Any] | tuple[Any, ...] | None = None,
) -> list[dict[str, Any]]:
    """Execute *sql* and return all result rows as column-name -> value dicts."""
    cursor = connection.execute(sql, parameters or [])
    columns = [column[0] for column in cursor.description]
    return [dict(zip(columns, row, strict=False)) for row in cursor.fetchall()]


def fetch_one(
    connection: duckdb.DuckDBPyConnection,
    sql: str,
    parameters: list[Any] | tuple[Any, ...] | None = None,
) -> dict[str, Any] | None:
    """Execute *sql* and return the first result row as a dict, or None."""
    rows = fetch_rows(connection, sql, parameters)
    return rows[0] if rows else None


def _count(connection: duckdb.DuckDBPyConnection, table_name: str, run_id: str) -> int:
    """Return the number of rows in *table_name* belonging to *run_id*."""
    # table_name is interpolated into the SQL text; within this module it is
    # only ever called with literal table names (see get_run_counts).
    row = fetch_one(
        connection,
        f"SELECT COUNT(*) AS row_count FROM {table_name} WHERE run_id = ?",
        [run_id],
    )
    return 0 if row is None else int(row["row_count"])


def _db_value(value: Any) -> Any:
    """Convert *value* for storage: dicts and lists become JSON text.

    Keys are sorted so equal structures serialize identically. All other
    values are passed through unchanged.
    """
    if isinstance(value, (dict, list)):
        return json.dumps(value, sort_keys=True)
    return value