# diffusers-pr-api / src/slop_farmer/data/search_duckdb.py
# Deployed via "Deploy Diffusers PR API" (commit dbf7313, verified) by evalstate (HF Staff).
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import duckdb
# Column order for every table this module writes. insert_rows() builds its
# INSERT statements from these tuples and pulls values out of row dicts in
# this exact order, so each tuple must stay in sync with the corresponding
# CREATE TABLE statement in SCHEMA_SQL.
TABLE_COLUMNS: dict[str, tuple[str, ...]] = {
    # One row per ingest/search run (id is the run identifier).
    "pr_search_runs": (
        "id",
        "repo",
        "snapshot_id",
        "snapshot_dir",
        "source_type",
        "hf_repo_id",
        "hf_revision",
        "started_at",
        "finished_at",
        "status",
        "settings_json",
        "notes",
    ),
    # Pointer table: which run is currently active for a repo.
    "pr_search_active_run": (
        "repo",
        "run_id",
        "activated_at",
    ),
    # One row per pull request captured in a run.
    "pr_search_documents": (
        "run_id",
        "repo",
        "pr_number",
        "github_id",
        "author_login",
        "state",
        "draft",
        "merged",
        "title",
        "base_ref",
        "created_at",
        "updated_at",
        "merged_at",
        "additions",
        "deletions",
        "changed_files",
        "comments_count",
        "review_comments_count",
        "html_url",
    ),
    # One row per reported contributor, with heuristic scores and rates.
    "pr_search_contributors": (
        "run_id",
        "repo",
        "snapshot_id",
        "report_generated_at",
        "window_days",
        "author_login",
        "name",
        "profile_url",
        "repo_pull_requests_url",
        "repo_issues_url",
        "repo_first_seen_at",
        "repo_last_seen_at",
        "repo_primary_artifact_count",
        "repo_artifact_count",
        "snapshot_issue_count",
        "snapshot_pr_count",
        "snapshot_comment_count",
        "snapshot_review_count",
        "snapshot_review_comment_count",
        "repo_association",
        "new_to_repo",
        "first_seen_in_snapshot",
        "report_reason",
        "account_age_days",
        "young_account",
        "follow_through_score",
        "breadth_score",
        "automation_risk_signal",
        "heuristic_note",
        "public_orgs_json",
        "visible_authored_pr_count",
        "merged_pr_count",
        "closed_unmerged_pr_count",
        "open_pr_count",
        "merged_pr_rate",
        "closed_unmerged_pr_rate",
        "still_open_pr_rate",
        "distinct_repos_with_authored_prs",
        "distinct_repos_with_open_prs",
        "fetch_error",
    ),
    # Per-PR "scope" feature vectors (files/directories touched, sizes).
    "pr_scope_features": (
        "run_id",
        "repo",
        "pr_number",
        "feature_version",
        "total_changed_lines",
        "file_count",
        "directory_count",
        "dominant_dir_share",
        "filenames_json",
        "directories_json",
        "vector_json",
        "computed_at",
    ),
    # Run-level feature artifacts (e.g. IDF weights as JSON).
    "pr_scope_run_artifacts": (
        "run_id",
        "repo",
        "feature_version",
        "idf_json",
        "computed_at",
    ),
    # Pairwise PR similarity edges with per-component similarity scores.
    "pr_scope_neighbors": (
        "run_id",
        "repo",
        "left_pr_number",
        "right_pr_number",
        "rank_from_left",
        "rank_from_right",
        "similarity",
        "content_similarity",
        "size_similarity",
        "breadth_similarity",
        "concentration_similarity",
        "shared_filenames_json",
        "shared_directories_json",
        "created_at",
    ),
    # Cluster summaries (one row per cluster, keyed by cluster_id).
    "pr_scope_clusters": (
        "run_id",
        "repo",
        "cluster_id",
        "representative_pr_number",
        "cluster_size",
        "average_similarity",
        "summary",
        "shared_filenames_json",
        "shared_directories_json",
        "created_at",
    ),
    # Cluster membership rows (PR -> cluster, with a role).
    "pr_scope_cluster_members": (
        "run_id",
        "repo",
        "cluster_id",
        "pr_number",
        "member_role",
    ),
    # Ranked cluster-assignment candidates for each PR.
    "pr_scope_cluster_candidates": (
        "run_id",
        "repo",
        "pr_number",
        "cluster_id",
        "candidate_rank",
        "candidate_score",
        "matched_member_count",
        "best_member_pr_number",
        "max_member_similarity",
        "avg_top_member_similarity",
        "evidence_json",
        "assigned",
    ),
}
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS pr_search_runs (
id VARCHAR,
repo VARCHAR,
snapshot_id VARCHAR,
snapshot_dir VARCHAR,
source_type VARCHAR,
hf_repo_id VARCHAR,
hf_revision VARCHAR,
started_at VARCHAR,
finished_at VARCHAR,
status VARCHAR,
settings_json VARCHAR,
notes VARCHAR
);
CREATE TABLE IF NOT EXISTS pr_search_active_run (
repo VARCHAR,
run_id VARCHAR,
activated_at VARCHAR
);
CREATE TABLE IF NOT EXISTS pr_search_documents (
run_id VARCHAR,
repo VARCHAR,
pr_number BIGINT,
github_id BIGINT,
author_login VARCHAR,
state VARCHAR,
draft BOOLEAN,
merged BOOLEAN,
title VARCHAR,
base_ref VARCHAR,
created_at VARCHAR,
updated_at VARCHAR,
merged_at VARCHAR,
additions BIGINT,
deletions BIGINT,
changed_files BIGINT,
comments_count BIGINT,
review_comments_count BIGINT,
html_url VARCHAR
);
CREATE TABLE IF NOT EXISTS pr_search_contributors (
run_id VARCHAR,
repo VARCHAR,
snapshot_id VARCHAR,
report_generated_at VARCHAR,
window_days BIGINT,
author_login VARCHAR,
name VARCHAR,
profile_url VARCHAR,
repo_pull_requests_url VARCHAR,
repo_issues_url VARCHAR,
repo_first_seen_at VARCHAR,
repo_last_seen_at VARCHAR,
repo_primary_artifact_count BIGINT,
repo_artifact_count BIGINT,
snapshot_issue_count BIGINT,
snapshot_pr_count BIGINT,
snapshot_comment_count BIGINT,
snapshot_review_count BIGINT,
snapshot_review_comment_count BIGINT,
repo_association VARCHAR,
new_to_repo BOOLEAN,
first_seen_in_snapshot BOOLEAN,
report_reason VARCHAR,
account_age_days BIGINT,
young_account BOOLEAN,
follow_through_score VARCHAR,
breadth_score VARCHAR,
automation_risk_signal VARCHAR,
heuristic_note VARCHAR,
public_orgs_json VARCHAR,
visible_authored_pr_count BIGINT,
merged_pr_count BIGINT,
closed_unmerged_pr_count BIGINT,
open_pr_count BIGINT,
merged_pr_rate DOUBLE,
closed_unmerged_pr_rate DOUBLE,
still_open_pr_rate DOUBLE,
distinct_repos_with_authored_prs BIGINT,
distinct_repos_with_open_prs BIGINT,
fetch_error VARCHAR
);
CREATE TABLE IF NOT EXISTS pr_scope_features (
run_id VARCHAR,
repo VARCHAR,
pr_number BIGINT,
feature_version VARCHAR,
total_changed_lines BIGINT,
file_count BIGINT,
directory_count BIGINT,
dominant_dir_share DOUBLE,
filenames_json VARCHAR,
directories_json VARCHAR,
vector_json VARCHAR,
computed_at VARCHAR
);
CREATE TABLE IF NOT EXISTS pr_scope_run_artifacts (
run_id VARCHAR,
repo VARCHAR,
feature_version VARCHAR,
idf_json VARCHAR,
computed_at VARCHAR
);
CREATE TABLE IF NOT EXISTS pr_scope_neighbors (
run_id VARCHAR,
repo VARCHAR,
left_pr_number BIGINT,
right_pr_number BIGINT,
rank_from_left BIGINT,
rank_from_right BIGINT,
similarity DOUBLE,
content_similarity DOUBLE,
size_similarity DOUBLE,
breadth_similarity DOUBLE,
concentration_similarity DOUBLE,
shared_filenames_json VARCHAR,
shared_directories_json VARCHAR,
created_at VARCHAR
);
CREATE TABLE IF NOT EXISTS pr_scope_clusters (
run_id VARCHAR,
repo VARCHAR,
cluster_id VARCHAR,
representative_pr_number BIGINT,
cluster_size BIGINT,
average_similarity DOUBLE,
summary VARCHAR,
shared_filenames_json VARCHAR,
shared_directories_json VARCHAR,
created_at VARCHAR
);
CREATE TABLE IF NOT EXISTS pr_scope_cluster_members (
run_id VARCHAR,
repo VARCHAR,
cluster_id VARCHAR,
pr_number BIGINT,
member_role VARCHAR
);
CREATE TABLE IF NOT EXISTS pr_scope_cluster_candidates (
run_id VARCHAR,
repo VARCHAR,
pr_number BIGINT,
cluster_id VARCHAR,
candidate_rank BIGINT,
candidate_score DOUBLE,
matched_member_count BIGINT,
best_member_pr_number BIGINT,
max_member_similarity DOUBLE,
avg_top_member_similarity DOUBLE,
evidence_json VARCHAR,
assigned BOOLEAN
);
CREATE INDEX IF NOT EXISTS idx_pr_search_active_run_repo ON pr_search_active_run (repo);
CREATE INDEX IF NOT EXISTS idx_pr_search_runs_repo_status ON pr_search_runs (repo, status);
CREATE INDEX IF NOT EXISTS idx_pr_search_documents_run_pr ON pr_search_documents (run_id, pr_number);
CREATE INDEX IF NOT EXISTS idx_pr_search_documents_run_author ON pr_search_documents (run_id, author_login);
CREATE INDEX IF NOT EXISTS idx_pr_search_contributors_run_author ON pr_search_contributors (run_id, author_login);
CREATE INDEX IF NOT EXISTS idx_pr_scope_features_run_pr ON pr_scope_features (run_id, pr_number);
CREATE INDEX IF NOT EXISTS idx_pr_scope_run_artifacts_run ON pr_scope_run_artifacts (run_id);
CREATE INDEX IF NOT EXISTS idx_pr_scope_neighbors_run_left ON pr_scope_neighbors (run_id, left_pr_number);
CREATE INDEX IF NOT EXISTS idx_pr_scope_neighbors_run_right ON pr_scope_neighbors (run_id, right_pr_number);
CREATE INDEX IF NOT EXISTS idx_pr_scope_clusters_run_cluster ON pr_scope_clusters (run_id, cluster_id);
CREATE INDEX IF NOT EXISTS idx_pr_scope_cluster_members_run_pr ON pr_scope_cluster_members (run_id, pr_number);
CREATE INDEX IF NOT EXISTS idx_pr_scope_cluster_candidates_run_pr ON pr_scope_cluster_candidates (run_id, pr_number);
"""
def connect_pr_search_db(path: Path, *, read_only: bool = False) -> duckdb.DuckDBPyConnection:
    """Open the PR-search DuckDB file at *path*.

    Read-only connections require the file to already exist; writable
    connections create the parent directory and ensure the schema.

    Raises:
        FileNotFoundError: if *read_only* is set and the file is missing.
    """
    target = path.resolve()
    if read_only:
        if not target.exists():
            raise FileNotFoundError(f"PR search database does not exist: {target}")
        return duckdb.connect(str(target), read_only=True)
    target.parent.mkdir(parents=True, exist_ok=True)
    connection = duckdb.connect(str(target), read_only=False)
    ensure_pr_search_schema(connection)
    return connection
def ensure_pr_search_schema(connection: duckdb.DuckDBPyConnection) -> None:
    """Create all tables/indexes and apply additive migrations (idempotent)."""
    statements = (
        SCHEMA_SQL,
        # Migration for databases created before author_login was added.
        "ALTER TABLE pr_search_documents ADD COLUMN IF NOT EXISTS author_login VARCHAR",
    )
    for statement in statements:
        connection.execute(statement)
def insert_rows(
    connection: duckdb.DuckDBPyConnection,
    table_name: str,
    rows: list[dict[str, Any]],
) -> None:
    """Bulk-insert dict rows into *table_name* using the declared column order.

    Missing keys bind as NULL; dict/list values are JSON-encoded via
    _db_value(). No-op when *rows* is empty. Raises KeyError for a table
    name not declared in TABLE_COLUMNS (which also keeps the f-string
    interpolation below restricted to known table names).
    """
    if not rows:
        return
    columns = TABLE_COLUMNS[table_name]
    column_sql = ", ".join(columns)
    placeholders = ", ".join(["?"] * len(columns))
    statement = f"INSERT INTO {table_name} ({column_sql}) VALUES ({placeholders})"
    payload = [tuple(_db_value(item.get(name)) for name in columns) for item in rows]
    connection.executemany(statement, payload)
def update_run_status(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    status: str,
    finished_at: str | None = None,
    notes: str | None = None,
) -> None:
    """Set a run's status; finished_at/notes are only overwritten when given."""
    sql = """
        UPDATE pr_search_runs
        SET status = ?, finished_at = COALESCE(?, finished_at), notes = COALESCE(?, notes)
        WHERE id = ?
        """
    connection.execute(sql, [status, finished_at, notes, run_id])
def replace_active_run(
    connection: duckdb.DuckDBPyConnection,
    *,
    repo: str,
    run_id: str,
    activated_at: str,
) -> str | None:
    """Point *repo*'s active-run marker at *run_id*.

    A displaced run that had status 'succeeded' is demoted to 'superseded'.
    Returns the previously active run id, or None if there was none.
    """
    row = fetch_one(
        connection,
        "SELECT run_id FROM pr_search_active_run WHERE repo = ?",
        [repo],
    )
    connection.execute("DELETE FROM pr_search_active_run WHERE repo = ?", [repo])
    connection.execute(
        "INSERT INTO pr_search_active_run (repo, run_id, activated_at) VALUES (?, ?, ?)",
        [repo, run_id, activated_at],
    )
    if row is None:
        return None
    previous_run_id = str(row["run_id"])
    if previous_run_id != run_id:
        # Only successful runs are demoted; failed/running rows keep their status.
        connection.execute(
            "UPDATE pr_search_runs SET status = 'superseded' WHERE id = ? AND status = 'succeeded'",
            [previous_run_id],
        )
    return previous_run_id
def resolve_active_run(
    connection: duckdb.DuckDBPyConnection,
    *,
    repo: str | None = None,
) -> dict[str, Any]:
    """Return the active run row for *repo*, inferring the repo when unique.

    Raises:
        ValueError: no active run exists, or *repo* is None while multiple
            repos have active runs, or the active marker points at a missing run.
    """
    if repo is None:
        candidates = fetch_rows(
            connection,
            "SELECT repo FROM pr_search_active_run ORDER BY repo",
        )
        if not candidates:
            raise ValueError("No active PR search run found.")
        if len(candidates) > 1:
            raise ValueError("Multiple active repos found; pass --repo.")
        repo = str(candidates[0]["repo"])
    sql = """
        SELECT r.*
        FROM pr_search_runs AS r
        JOIN pr_search_active_run AS a
            ON a.run_id = r.id AND a.repo = r.repo
        WHERE a.repo = ?
        """
    run_row = fetch_one(connection, sql, [repo])
    if run_row is None:
        raise ValueError(f"No active PR search run found for repo {repo!r}.")
    return run_row
def get_run_counts(connection: duckdb.DuckDBPyConnection, *, run_id: str) -> dict[str, int]:
    """Return per-table row counts for a single run, keyed by a short label."""
    tables = {
        "documents": "pr_search_documents",
        "contributors": "pr_search_contributors",
        "features": "pr_scope_features",
        "run_artifacts": "pr_scope_run_artifacts",
        "neighbors": "pr_scope_neighbors",
        "clusters": "pr_scope_clusters",
        "cluster_members": "pr_scope_cluster_members",
        "cluster_candidates": "pr_scope_cluster_candidates",
    }
    return {label: _count(connection, table, run_id) for label, table in tables.items()}
def get_document(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    pr_number: int,
) -> dict[str, Any] | None:
    """Fetch one PR document row for (run_id, pr_number), or None if absent."""
    sql = "SELECT * FROM pr_search_documents WHERE run_id = ? AND pr_number = ?"
    return fetch_one(connection, sql, [run_id, pr_number])
def get_contributor(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    author_login: str,
) -> dict[str, Any] | None:
    """Fetch a contributor row by login (case-insensitive), or None."""
    sql = """
        SELECT *
        FROM pr_search_contributors
        WHERE run_id = ? AND lower(author_login) = lower(?)
        """
    return fetch_one(connection, sql, [run_id, author_login])
def get_contributor_pulls(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    author_login: str,
    limit: int,
) -> list[dict[str, Any]]:
    """List up to *limit* PRs authored by *author_login* (case-insensitive).

    Ordered by most recently updated first, then by descending PR number.
    """
    sql = """
        SELECT
            pr_number,
            github_id,
            author_login,
            state,
            draft,
            merged,
            title,
            base_ref,
            created_at,
            updated_at,
            merged_at,
            additions,
            deletions,
            changed_files,
            comments_count,
            review_comments_count,
            html_url
        FROM pr_search_documents
        WHERE run_id = ? AND lower(author_login) = lower(?)
        ORDER BY updated_at DESC NULLS LAST, pr_number DESC
        LIMIT ?
        """
    return fetch_rows(connection, sql, [run_id, author_login, limit])
def get_feature(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    pr_number: int,
) -> dict[str, Any] | None:
    """Fetch the scope-feature row for (run_id, pr_number), or None."""
    sql = "SELECT * FROM pr_scope_features WHERE run_id = ? AND pr_number = ?"
    return fetch_one(connection, sql, [run_id, pr_number])
def get_scope_run_artifact(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
) -> dict[str, Any] | None:
    """Fetch the run-level scope artifact row, or None.

    Deliberately best-effort: any duckdb.Error (e.g. the table not existing
    in an older database — TODO confirm) also yields None.
    """
    sql = """
        SELECT *
        FROM pr_scope_run_artifacts
        WHERE run_id = ?
        """
    try:
        return fetch_one(connection, sql, [run_id])
    except duckdb.Error:
        return None
def get_similar_pr_rows(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    pr_number: int,
    limit: int,
) -> list[dict[str, Any]]:
    """List up to *limit* neighbor rows for *pr_number*, nearest-ranked first.

    Edges are matched from either side of the pair; the CASE expressions
    project the *other* PR's number and the rank as seen from this PR.
    """
    sql = """
        SELECT
            CASE WHEN left_pr_number = ? THEN right_pr_number ELSE left_pr_number END AS neighbor_pr_number,
            CASE WHEN left_pr_number = ? THEN rank_from_left ELSE rank_from_right END AS neighbor_rank,
            similarity,
            content_similarity,
            size_similarity,
            breadth_similarity,
            concentration_similarity,
            shared_filenames_json,
            shared_directories_json
        FROM pr_scope_neighbors
        WHERE run_id = ? AND (? = left_pr_number OR ? = right_pr_number)
        ORDER BY neighbor_rank IS NULL, neighbor_rank, similarity DESC, neighbor_pr_number
        LIMIT ?
        """
    # Same pr_number bound four times: twice in the CASEs, twice in WHERE.
    parameters = [pr_number, pr_number, run_id, pr_number, pr_number, limit]
    return fetch_rows(connection, sql, parameters)
def get_candidate_cluster_rows(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    pr_number: int,
    limit: int,
) -> list[dict[str, Any]]:
    """List up to *limit* candidate-cluster rows for *pr_number*, best rank first.

    Joins cluster metadata and (via LEFT JOIN, so it may be NULL) the title
    of each cluster's representative PR.
    """
    sql = """
        SELECT
            c.cluster_id,
            c.candidate_rank,
            c.candidate_score,
            c.matched_member_count,
            c.best_member_pr_number,
            c.max_member_similarity,
            c.avg_top_member_similarity,
            c.evidence_json,
            c.assigned,
            cl.representative_pr_number,
            cl.cluster_size,
            cl.average_similarity,
            cl.summary,
            cl.shared_filenames_json,
            cl.shared_directories_json,
            d.title AS representative_title
        FROM pr_scope_cluster_candidates AS c
        JOIN pr_scope_clusters AS cl
            ON cl.run_id = c.run_id AND cl.cluster_id = c.cluster_id
        LEFT JOIN pr_search_documents AS d
            ON d.run_id = cl.run_id AND d.pr_number = cl.representative_pr_number
        WHERE c.run_id = ? AND c.pr_number = ?
        ORDER BY c.candidate_rank, c.candidate_score DESC, c.cluster_id
        LIMIT ?
        """
    return fetch_rows(connection, sql, [run_id, pr_number, limit])
def get_cluster(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    cluster_id: str,
) -> dict[str, Any] | None:
    """Fetch one cluster row for (run_id, cluster_id), or None if absent."""
    sql = "SELECT * FROM pr_scope_clusters WHERE run_id = ? AND cluster_id = ?"
    return fetch_one(connection, sql, [run_id, cluster_id])
def get_cluster_members(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    cluster_id: str,
) -> list[dict[str, Any]]:
    """List a cluster's member PRs with document details where available.

    The representative member sorts first (boolean ORDER BY trick), then
    members by ascending PR number. Document columns may be NULL (LEFT JOIN).
    """
    sql = """
        SELECT
            m.pr_number,
            m.member_role,
            d.title,
            d.html_url,
            d.state,
            d.draft
        FROM pr_scope_cluster_members AS m
        LEFT JOIN pr_search_documents AS d
            ON d.run_id = m.run_id AND d.pr_number = m.pr_number
        WHERE m.run_id = ? AND m.cluster_id = ?
        ORDER BY m.member_role != 'representative', m.pr_number
        """
    return fetch_rows(connection, sql, [run_id, cluster_id])
def get_cluster_ids_for_prs(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    pr_numbers: list[int],
) -> dict[int, list[str]]:
    """Map each PR number to the sorted cluster ids it belongs to.

    PRs with no membership are simply absent from the result. Returns {}
    for an empty input without touching the database.
    """
    if not pr_numbers:
        return {}
    marks = ", ".join("?" for _ in pr_numbers)
    sql = f"""
        SELECT pr_number, cluster_id
        FROM pr_scope_cluster_members
        WHERE run_id = ? AND pr_number IN ({marks})
        ORDER BY pr_number, cluster_id
        """
    memberships: dict[int, list[str]] = {}
    for record in fetch_rows(connection, sql, [run_id, *pr_numbers]):
        number = int(record["pr_number"])
        memberships.setdefault(number, []).append(str(record["cluster_id"]))
    return memberships
def get_shared_cluster_ids(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    left_pr_number: int,
    right_pr_number: int,
) -> list[str]:
    """Return the sorted cluster ids that contain both PRs (self-join)."""
    sql = """
        SELECT left_members.cluster_id
        FROM pr_scope_cluster_members AS left_members
        JOIN pr_scope_cluster_members AS right_members
            ON right_members.run_id = left_members.run_id
            AND right_members.cluster_id = left_members.cluster_id
        WHERE left_members.run_id = ?
            AND left_members.pr_number = ?
            AND right_members.pr_number = ?
        ORDER BY left_members.cluster_id
        """
    matches = fetch_rows(connection, sql, [run_id, left_pr_number, right_pr_number])
    return [str(match["cluster_id"]) for match in matches]
def get_pair_neighbor_row(
    connection: duckdb.DuckDBPyConnection,
    *,
    run_id: str,
    left_pr_number: int,
    right_pr_number: int,
) -> dict[str, Any] | None:
    """Fetch the neighbor edge for an unordered PR pair, or None.

    The pair is canonicalized to (low, high) before lookup — this presumes
    the neighbor builder stores edges with left_pr_number <= right_pr_number;
    NOTE(review): verify against the writer of pr_scope_neighbors.
    """
    low, high = sorted((left_pr_number, right_pr_number))
    sql = """
        SELECT *
        FROM pr_scope_neighbors
        WHERE run_id = ? AND left_pr_number = ? AND right_pr_number = ?
        """
    return fetch_one(connection, sql, [run_id, low, high])
def fetch_rows(
    connection: duckdb.DuckDBPyConnection,
    sql: str,
    parameters: list[Any] | tuple[Any, ...] | None = None,
) -> list[dict[str, Any]]:
    """Execute *sql* and return every result row as a column-name -> value dict."""
    cursor = connection.execute(sql, parameters or [])
    names = [description[0] for description in cursor.description]
    return [dict(zip(names, record, strict=False)) for record in cursor.fetchall()]
def fetch_one(
    connection: duckdb.DuckDBPyConnection,
    sql: str,
    parameters: list[Any] | tuple[Any, ...] | None = None,
) -> dict[str, Any] | None:
    """Return the first row of *sql* as a dict, or None for an empty result."""
    for record in fetch_rows(connection, sql, parameters):
        return record
    return None
def _count(connection: duckdb.DuckDBPyConnection, table_name: str, run_id: str) -> int:
    """COUNT(*) of *table_name* rows belonging to *run_id* (0 when no row returns)."""
    sql = f"SELECT COUNT(*) AS row_count FROM {table_name} WHERE run_id = ?"
    row = fetch_one(connection, sql, [run_id])
    if row is None:
        return 0
    return int(row["row_count"])
def _db_value(value: Any) -> Any:
if isinstance(value, (dict, list)):
return json.dumps(value, sort_keys=True)
return value