diffusers-pr-api / src /slop_farmer /app /pr_search.py
evalstate's picture
evalstate HF Staff
Deploy Diffusers PR API
dbf7313 verified
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from slop_farmer.reports import pr_search_service
# Re-export the PR-search service callables as attributes of this module.
# Every name below is bound directly to the identically named attribute of
# ``pr_search_service``; nothing is wrapped or altered here.
# NOTE(review): presumably this exists so callers can depend on this module
# instead of importing from ``slop_farmer.reports`` directly — confirm
# against the call sites.
run_pr_search_refresh = pr_search_service.run_pr_search_refresh
get_pr_search_status = pr_search_service.get_pr_search_status
get_pr_search_similar = pr_search_service.get_pr_search_similar
get_pr_search_similar_lookup = pr_search_service.get_pr_search_similar_lookup
get_pr_search_candidate_clusters = pr_search_service.get_pr_search_candidate_clusters
get_pr_search_contributor = pr_search_service.get_pr_search_contributor
get_pr_search_contributor_pulls = pr_search_service.get_pr_search_contributor_pulls
get_pr_search_clusters = pr_search_service.get_pr_search_clusters
list_pr_search_clusters = pr_search_service.list_pr_search_clusters
get_pr_search_cluster = pr_search_service.get_pr_search_cluster
get_pr_search_pull_contributor = pr_search_service.get_pr_search_pull_contributor
explain_pr_search_pair = pr_search_service.explain_pr_search_pair
probe_pr_search_live = pr_search_service.probe_pr_search_live
probe_pr_search_github = pr_search_service.probe_pr_search_github
resolve_pr_search_db_path = pr_search_service.resolve_pr_search_db_path
def format_pr_search_status(result: Mapping[str, Any]) -> str:
    """Render a PR-search status payload as a multi-line text summary.

    Args:
        result: Mapping providing ``repo``, ``id``, ``snapshot_id``,
            ``source_type`` and ``row_counts``. ``finished_at`` is optional
            and is shown as ``running`` when missing or falsy.

    Returns:
        The report lines joined with newlines; the last line lists the row
        counts.

    Raises:
        KeyError: If a required key is missing from ``result`` or from
            ``row_counts`` (only ``contributors`` falls back to ``0``).
    """
    row_counts = result["row_counts"]
    finished = result.get("finished_at") or "running"
    report = [
        f"Repo: {result['repo']}",
        f"Active run: {result['id']}",
        f"Snapshot: {result['snapshot_id']}",
        f"Source: {result['source_type']}",
        f"Finished: {finished}",
    ]
    tallies = (
        f"documents={row_counts['documents']}",
        f"contributors={row_counts.get('contributors', 0)}",
        f"features={row_counts['features']}",
        f"neighbors={row_counts['neighbors']}",
        f"clusters={row_counts['clusters']}",
        f"candidates={row_counts['cluster_candidates']}",
    )
    report.append("Rows: " + " ".join(tallies))
    return "\n".join(report)
def format_pr_search_similar(result: Mapping[str, Any]) -> str:
    """Render the similar-PR lookup for one pull request as text.

    Args:
        result: Mapping providing ``pr`` (with ``pr_number`` and ``title``),
            ``snapshot_id`` and ``similar_prs``. Optional ``query`` supplies
            ``mode_used`` / ``source`` (defaulting to ``indexed`` /
            ``active_index``); optional ``similar_count`` overrides the
            displayed match count.

    Returns:
        A newline-joined report: a header, then one numbered entry per
        neighbor, or a single "no similar PRs" line when the list is empty.
    """
    lookup = result.get("query") or {}
    mode_used = str(lookup.get("mode_used") or "indexed")
    source = str(lookup.get("source") or "active_index")

    target = result["pr"]
    out = [f"PR #{target['pr_number']}: {target['title']}", ""]
    out.append(f"Active snapshot: {result['snapshot_id']}")
    out.append(f"Lookup: {mode_used} via {source}")
    neighbors = result["similar_prs"]
    out.append(f"Matches: {result.get('similar_count', len(neighbors))}")
    out.append("")

    if not neighbors:
        out.append("No similar PRs found in the active run.")
        return "\n".join(out)

    # (label shown to the user, key in the neighbor row)
    components = (
        ("content", "content_similarity"),
        ("size", "size_similarity"),
        ("breadth", "breadth_similarity"),
        ("concentration", "concentration_similarity"),
    )
    for rank, neighbor in enumerate(neighbors, start=1):
        out.append(f"{rank}. PR #{neighbor['neighbor_pr_number']} score={neighbor['similarity']:.2f}")
        out.append(" " + " ".join(f"{label}={neighbor[key]:.2f}" for label, key in components))
        # Shared directories are only shown when there are no shared files.
        shared_files = neighbor["shared_filenames"]
        if shared_files:
            out.append(f" shared files: {', '.join(shared_files[:5])}")
        else:
            shared_dirs = neighbor["shared_directories"]
            if shared_dirs:
                out.append(f" shared directories: {', '.join(shared_dirs[:5])}")
        cluster_ids = neighbor["cluster_ids"]
        if cluster_ids:
            out.append(f" cluster: {cluster_ids[0]}")
    return "\n".join(out)
def format_pr_search_candidate_clusters(result: Mapping[str, Any]) -> str:
    """Render the candidate clusters for one pull request as text.

    Args:
        result: Mapping providing ``pr`` (with ``pr_number``) and
            ``candidate_clusters``. Each candidate row must have
            ``cluster_id``, ``candidate_score``, ``assigned`` and
            ``representative_pr_number``; ``matched_member_pr_numbers`` and
            ``reason`` are optional.

    Returns:
        A newline-joined report with one numbered entry per candidate, or a
        single "no candidate clusters" line when the list is empty.
    """
    out = [f"PR #{result['pr']['pr_number']}: candidate scope clusters", ""]
    candidates = result["candidate_clusters"]
    if not candidates:
        return "\n".join([*out, "No candidate clusters found in the active run."])

    for position, candidate in enumerate(candidates, start=1):
        head = f"{position}. {candidate['cluster_id']} score={candidate['candidate_score']:.2f}"
        flag = "yes" if candidate["assigned"] else "no"
        out.append(f"{head} assigned={flag}")
        out.append(f" representative: PR #{candidate['representative_pr_number']}")
        members = candidate.get("matched_member_pr_numbers") or []
        if members:
            tags = ", ".join(f"#{number}" for number in members)
            out.append(f" matched members: {tags}")
        reason = candidate.get("reason")
        if reason:
            out.append(f" reason: {reason}")
    return "\n".join(out)
def format_pr_search_clusters(result: Mapping[str, Any]) -> str:
    """Render assigned and candidate clusters for one pull request as text.

    Args:
        result: Mapping providing ``pr`` (with ``pr_number``). Optional keys:
            ``query`` (``mode_used`` / ``source``), ``assigned_clusters``,
            ``candidate_clusters`` and the matching ``*_cluster_count``
            overrides; missing or falsy lists are treated as empty.

    Returns:
        A newline-joined report with an "Assigned clusters:" section followed
        by a "Candidate clusters:" section; an empty section prints
        ``- none``.
    """
    lookup = result.get("query") or {}
    mode_used = str(lookup.get("mode_used") or "indexed")
    source = str(lookup.get("source") or "active_index")
    assigned = result.get("assigned_clusters") or []
    candidates = result.get("candidate_clusters") or []

    out = [
        f"PR #{result['pr']['pr_number']}: cluster context",
        "",
        f"Lookup: {mode_used} via {source}",
        f"Assigned: {result.get('assigned_cluster_count', len(assigned))}",
        f"Candidates: {result.get('candidate_cluster_count', len(candidates))}",
        "",
        "Assigned clusters:",
    ]

    if assigned:
        for entry in assigned:
            out.append(
                f"- {entry['cluster_id']} representative=PR #{entry['representative_pr_number']} "
                f"size={entry['cluster_size']}"
            )
            summary = entry.get("summary")
            if summary:
                out.append(f" {summary}")
    else:
        out.append("- none")

    out.append("")
    out.append("Candidate clusters:")
    if not candidates:
        out.append("- none")
    for position, candidate in enumerate(candidates, start=1):
        head = f"{position}. {candidate['cluster_id']} score={candidate['candidate_score']:.2f}"
        flag = "yes" if candidate["assigned"] else "no"
        out.append(f"{head} assigned={flag}")
        out.append(f" representative: PR #{candidate['representative_pr_number']}")
        members = candidate.get("matched_member_pr_numbers") or []
        if members:
            tags = ", ".join(f"#{number}" for number in members)
            out.append(f" matched members: {tags}")
        reason = candidate.get("reason")
        if reason:
            out.append(f" reason: {reason}")
    return "\n".join(out)
def format_pr_search_cluster(result: Mapping[str, Any]) -> str:
    """Render a single cluster and its members as text.

    Args:
        result: Mapping providing ``cluster`` (with ``cluster_id``,
            ``representative_pr_number`` and ``average_similarity``) and
            ``members`` (rows with ``pr_number`` and ``member_role``; ``title``
            is optional). Optional ``member_count`` overrides the displayed
            member count. The cluster ``summary`` is optional.

    Returns:
        A newline-joined report: cluster header, the summary line (blank when
        there is no summary), then one bullet per member.

    Raises:
        KeyError: If a required key is missing.
    """
    cluster = result["cluster"]
    members = result["members"]
    # A missing or ``None`` summary used to reach ``str.join`` unguarded and
    # raise; render it as an empty line so the layout stays the same.
    summary = str(cluster.get("summary") or "")
    lines = [
        f"Cluster {cluster['cluster_id']}",
        f"Representative PR: #{cluster['representative_pr_number']}",
        f"Members: {result.get('member_count', len(members))}",
        f"Average similarity: {cluster['average_similarity']:.2f}",
        summary,
        "",
        "Members:",
    ]
    for member in members:
        suffix = " (representative)" if member["member_role"] == "representative" else ""
        title = member.get("title") or ""
        lines.append(f"- PR #{member['pr_number']}{suffix}: {title}")
    return "\n".join(lines)
def format_pr_search_cluster_list(result: Mapping[str, Any]) -> str:
    """Render a list of clusters for a repository snapshot as text.

    Args:
        result: Mapping providing ``repo`` and ``snapshot_id``. ``clusters``
            is optional (missing or falsy means empty) and ``cluster_count``
            optionally overrides the displayed count. Each cluster row must
            have ``cluster_id``, ``representative_pr_number``,
            ``cluster_size`` and ``average_similarity``; ``rank``,
            ``representative_title`` and ``summary`` are optional.

    Returns:
        A newline-joined report with one entry per cluster, numbered by the
        row's ``rank`` or, when absent, its 1-based position; ``- none`` when
        there are no clusters.
    """
    entries = result.get("clusters") or []
    out = [
        f"Repo: {result['repo']}",
        f"Active snapshot: {result['snapshot_id']}",
        f"Clusters returned: {result.get('cluster_count', len(entries))}",
        "",
        "Clusters:",
    ]
    if not entries:
        out.append("- none")
        return "\n".join(out)

    for position, entry in enumerate(entries, start=1):
        label = entry.get("rank", position)
        out.append(
            f"{label}. {entry['cluster_id']} representative=PR #{entry['representative_pr_number']} "
            f"size={entry['cluster_size']} avg={entry['average_similarity']:.2f}"
        )
        # Optional detail lines, in this order, each only when non-empty.
        for detail_key in ("representative_title", "summary"):
            detail = entry.get(detail_key)
            if detail:
                out.append(f" {detail}")
    return "\n".join(out)
def format_pr_search_pair(result: Mapping[str, Any]) -> str:
    """Render the similarity explanation for a pair of pull requests.

    Args:
        result: Mapping providing ``pair`` (the five similarity scores plus
            ``shared_filenames`` and ``shared_directories``), ``left_pr`` and
            ``right_pr`` (each with ``pr_number``), ``materialized`` and
            ``shared_cluster_ids``.

    Returns:
        A newline-joined report: the pair header, a yes/no materialized line,
        one scores line, then optional shared files (first five), shared
        directories (first five) and shared clusters lines.
    """
    scores = result["pair"]
    left_number = result["left_pr"]["pr_number"]
    right_number = result["right_pr"]["pr_number"]
    materialized = "yes" if result["materialized"] else "no"

    # (label shown to the user, key in the pair mapping)
    components = (
        ("similarity", "similarity"),
        ("content", "content_similarity"),
        ("size", "size_similarity"),
        ("breadth", "breadth_similarity"),
        ("concentration", "concentration_similarity"),
    )
    score_text = " ".join(f"{label}={scores[key]:.2f}" for label, key in components)

    out = [
        f"PR pair #{left_number} vs #{right_number}",
        f"Materialized: {materialized}",
        "Scores: " + score_text,
    ]
    shared_files = scores["shared_filenames"]
    if shared_files:
        out.append(f"Shared files: {', '.join(shared_files[:5])}")
    shared_dirs = scores["shared_directories"]
    if shared_dirs:
        out.append(f"Shared directories: {', '.join(shared_dirs[:5])}")
    shared_clusters = result["shared_cluster_ids"]
    if shared_clusters:
        out.append(f"Shared clusters: {', '.join(shared_clusters)}")
    return "\n".join(out)
def format_pr_search_probe(result: Mapping[str, Any]) -> str:
    """Render the result of probing a GitHub PR against the active snapshot.

    Args:
        result: Mapping providing ``probe_pr`` (with ``pr_number`` and
            ``title``), ``snapshot_id``, ``similar_prs`` and
            ``candidate_clusters``.

    Returns:
        A newline-joined report with a "Similar PRs:" section followed by a
        "Candidate clusters:" section; each section prints a ``- none`` line
        when its list is empty.
    """
    probe = result["probe_pr"]
    out = [
        f"GitHub probe PR #{probe['pr_number']}: {probe['title']}",
        "",
        f"Compared against active snapshot: {result['snapshot_id']}",
        "",
        "Similar PRs:",
    ]

    # (label shown to the user, key in the neighbor row)
    components = (
        ("content", "content_similarity"),
        ("size", "size_similarity"),
        ("breadth", "breadth_similarity"),
        ("concentration", "concentration_similarity"),
    )
    neighbors = result["similar_prs"]
    if neighbors:
        for rank, neighbor in enumerate(neighbors, start=1):
            out.append(f"{rank}. PR #{neighbor['neighbor_pr_number']} score={neighbor['similarity']:.2f}")
            out.append(" " + " ".join(f"{label}={neighbor[key]:.2f}" for label, key in components))
            # Shared directories are only shown when there are no shared files.
            shared_files = neighbor["shared_filenames"]
            if shared_files:
                out.append(f" shared files: {', '.join(shared_files[:5])}")
            else:
                shared_dirs = neighbor["shared_directories"]
                if shared_dirs:
                    out.append(f" shared directories: {', '.join(shared_dirs[:5])}")
            cluster_ids = neighbor["cluster_ids"]
            if cluster_ids:
                out.append(f" cluster: {cluster_ids[0]}")
    else:
        out.append("- none above the current similarity threshold")

    out.append("")
    out.append("Candidate clusters:")
    candidates = result["candidate_clusters"]
    if candidates:
        for position, candidate in enumerate(candidates, start=1):
            head = f"{position}. {candidate['cluster_id']} score={candidate['candidate_score']:.2f}"
            flag = "yes" if candidate["assigned"] else "no"
            out.append(f"{head} assigned={flag}")
            out.append(f" representative: PR #{candidate['representative_pr_number']}")
            members = candidate.get("matched_member_pr_numbers") or []
            if members:
                tags = ", ".join(f"#{number}" for number in members)
                out.append(f" matched members: {tags}")
            reason = candidate.get("reason")
            if reason:
                out.append(f" reason: {reason}")
    else:
        out.append("- none")
    return "\n".join(out)
def format_pr_search_contributor(result: Mapping[str, Any]) -> str:
    """Render a contributor profile and their indexed PRs as text.

    Args:
        result: Mapping providing ``contributor`` (with ``author_login``; all
            other contributor fields are optional), ``repo`` and
            ``snapshot_id``. ``pulls`` is optional; each pull row must have
            ``pr_number`` and may have ``title``, ``state`` and ``merged``.

    Returns:
        A newline-joined report. Missing optional text fields render as
        ``-``. Score fields render as ``-`` only when absent, ``None`` or an
        empty string, so a score of ``0`` is shown as ``0``.

    Raises:
        KeyError: If a required key is missing.
    """
    contributor = result["contributor"]

    def score(key: str) -> Any:
        # ``value or '-'`` would hide a real score of 0 / 0.0; only treat
        # absent values as missing.
        value = contributor.get(key)
        return "-" if value is None or value == "" else value

    lines = [
        f"Contributor {contributor['author_login']}",
        f"Repo: {result['repo']}",
        f"Snapshot: {result['snapshot_id']}",
        f"Name: {contributor.get('name') or '-'}",
        f"Profile: {contributor.get('profile_url') or '-'}",
        f"Association: {contributor.get('repo_association') or '-'}",
        f"First seen in snapshot: {'yes' if contributor.get('first_seen_in_snapshot') else 'no'}",
        (
            "Scores: "
            f"follow-through={score('follow_through_score')} "
            f"breadth={score('breadth_score')} "
            f"risk={score('automation_risk_signal')}"
        ),
        f"Heuristic: {contributor.get('heuristic_note') or '-'}",
        f"Public orgs: {', '.join(contributor.get('public_orgs') or []) or '-'}",
        "",
        "Recent indexed PRs:",
    ]
    pulls = result.get("pulls") or []
    if not pulls:
        lines.append("- none")
        return "\n".join(lines)
    for row in pulls:
        lines.append(
            f"- PR #{row['pr_number']}: {row.get('title') or ''} "
            f"[state={row.get('state') or '-'} merged={'yes' if row.get('merged') else 'no'}]"
        )
    return "\n".join(lines)
def format_pr_search_contributor_pulls(result: Mapping[str, Any]) -> str:
    """Render the indexed pull requests of one contributor as text.

    Args:
        result: Mapping providing ``contributor`` (with ``author_login``),
            ``repo`` and ``snapshot_id``. ``pulls`` is optional (missing or
            falsy means empty) and ``pull_count`` optionally overrides the
            displayed count. Each pull row must have ``pr_number`` and may
            have ``title``, ``updated_at`` and ``state``.

    Returns:
        A newline-joined report with one bullet per PR, or a single
        "no indexed PRs" line when there are none.
    """
    author = result["contributor"]
    pull_rows = result.get("pulls") or []
    out = [
        f"Contributor PRs: {author['author_login']}",
        f"Repo: {result['repo']}",
        f"Snapshot: {result['snapshot_id']}",
        f"Pull requests: {result.get('pull_count', len(pull_rows))}",
        "",
    ]
    if pull_rows:
        for pull in pull_rows:
            title = pull.get("title") or ""
            updated = pull.get("updated_at") or "-"
            state = pull.get("state") or "-"
            out.append(f"- PR #{pull['pr_number']}: {title} (updated={updated}, state={state})")
    else:
        out.append("No indexed PRs found for that contributor.")
    return "\n".join(out)
def format_pr_search_pull_contributor(result: Mapping[str, Any]) -> str:
    """Render the contributor summary attached to a single pull request.

    Args:
        result: Mapping providing ``pr`` (with ``pr_number``; ``title`` is
            optional) and ``contributor`` (with ``author_login``; all other
            contributor fields are optional).

    Returns:
        A newline-joined report. Missing text fields render as ``-``. The
        risk, follow-through and breadth fields render as ``-`` only when
        absent, ``None`` or an empty string, so a score of ``0`` is shown as
        ``0``.

    Raises:
        KeyError: If a required key is missing.
    """
    pr = result["pr"]
    contributor = result["contributor"]

    def score(key: str) -> Any:
        # ``value or '-'`` would hide a real score of 0 / 0.0; only treat
        # absent values as missing.
        value = contributor.get(key)
        return "-" if value is None or value == "" else value

    return "\n".join(
        [
            f"PR #{pr['pr_number']}: {pr.get('title') or ''}",
            f"Author: {contributor['author_login']}",
            f"Risk: {score('automation_risk_signal')}",
            f"Follow-through: {score('follow_through_score')}",
            f"Breadth: {score('breadth_score')}",
            f"Heuristic: {contributor.get('heuristic_note') or '-'}",
            f"Profile: {contributor.get('profile_url') or '-'}",
        ]
    )