Spaces:
Running
Running
| from __future__ import annotations | |
| from collections.abc import Mapping | |
| from typing import Any | |
| from slop_farmer.reports import pr_search_service | |
# Bind selected ``pr_search_service`` attributes to module-level names with the
# same spelling. The bindings are independent of one another, so they are
# grouped by purpose rather than listed in one flat run.

# Index refresh, status, and database path resolution.
run_pr_search_refresh = pr_search_service.run_pr_search_refresh
get_pr_search_status = pr_search_service.get_pr_search_status
resolve_pr_search_db_path = pr_search_service.resolve_pr_search_db_path

# Similarity lookups and pairwise explanation.
get_pr_search_similar = pr_search_service.get_pr_search_similar
get_pr_search_similar_lookup = pr_search_service.get_pr_search_similar_lookup
explain_pr_search_pair = pr_search_service.explain_pr_search_pair

# Cluster lookups.
get_pr_search_candidate_clusters = pr_search_service.get_pr_search_candidate_clusters
get_pr_search_clusters = pr_search_service.get_pr_search_clusters
list_pr_search_clusters = pr_search_service.list_pr_search_clusters
get_pr_search_cluster = pr_search_service.get_pr_search_cluster

# Contributor lookups.
get_pr_search_contributor = pr_search_service.get_pr_search_contributor
get_pr_search_contributor_pulls = pr_search_service.get_pr_search_contributor_pulls
get_pr_search_pull_contributor = pr_search_service.get_pr_search_pull_contributor

# Probes.
probe_pr_search_live = pr_search_service.probe_pr_search_live
probe_pr_search_github = pr_search_service.probe_pr_search_github
def format_pr_search_status(result: Mapping[str, Any]) -> str:
    """Render a PR-search status payload as a newline-joined text summary.

    Required keys of ``result``: ``row_counts``, ``repo``, ``id``,
    ``snapshot_id`` and ``source_type``; a missing one raises ``KeyError``.
    ``finished_at`` is optional, and a missing or falsy value is printed as
    ``running``. Inside ``row_counts`` only ``contributors`` is optional
    (defaults to ``0``); the other counters are required.
    """
    row_counts = result["row_counts"]
    report = [
        f"Repo: {result['repo']}",
        f"Active run: {result['id']}",
        f"Snapshot: {result['snapshot_id']}",
        f"Source: {result['source_type']}",
    ]

    finished = result.get("finished_at") or "running"
    report.append(f"Finished: {finished}")

    # All counters share one space-separated line.
    row_parts = [
        "Rows:",
        f"documents={row_counts['documents']}",
        f"contributors={row_counts.get('contributors', 0)}",
        f"features={row_counts['features']}",
        f"neighbors={row_counts['neighbors']}",
        f"clusters={row_counts['clusters']}",
        f"candidates={row_counts['cluster_candidates']}",
    ]
    report.append(" ".join(row_parts))
    return "\n".join(report)
def format_pr_search_similar(result: Mapping[str, Any]) -> str:
    """Render the similar-PR listing for one pull request as text.

    The header shows the PR number and title, the snapshot id, how the lookup
    was performed (``query.mode_used`` / ``query.source``, defaulting to
    ``indexed`` / ``active_index``) and the match count (``similar_count`` if
    present, otherwise the length of ``similar_prs``). Each neighbour then
    gets a score line, a component-score line, at most one "shared" line
    (files take precedence over directories, first five entries only) and,
    when it has cluster ids, the first of them.
    """
    lookup = result.get("query") or {}
    mode_used = str(lookup.get("mode_used") or "indexed")
    source = str(lookup.get("source") or "active_index")

    pr = result["pr"]
    out = [f"PR #{pr['pr_number']}: {pr['title']}", ""]
    out.append(f"Active snapshot: {result['snapshot_id']}")
    out.append(f"Lookup: {mode_used} via {source}")

    neighbors = result["similar_prs"]
    match_count = result.get("similar_count", len(neighbors))
    out.append(f"Matches: {match_count}")
    out.append("")

    if not neighbors:
        out.append("No similar PRs found in the active run.")
        return "\n".join(out)

    for rank, neighbor in enumerate(neighbors, start=1):
        out.append(f"{rank}. PR #{neighbor['neighbor_pr_number']} score={neighbor['similarity']:.2f}")
        components = (
            f"content={neighbor['content_similarity']:.2f}",
            f"size={neighbor['size_similarity']:.2f}",
            f"breadth={neighbor['breadth_similarity']:.2f}",
            f"concentration={neighbor['concentration_similarity']:.2f}",
        )
        out.append(" " + " ".join(components))

        # Directories are only reported when no individual files are shared.
        shared_files = neighbor["shared_filenames"]
        if shared_files:
            out.append(f" shared files: {', '.join(shared_files[:5])}")
        else:
            shared_dirs = neighbor["shared_directories"]
            if shared_dirs:
                out.append(f" shared directories: {', '.join(shared_dirs[:5])}")

        cluster_ids = neighbor["cluster_ids"]
        if cluster_ids:
            out.append(f" cluster: {cluster_ids[0]}")
    return "\n".join(out)
def format_pr_search_candidate_clusters(result: Mapping[str, Any]) -> str:
    """Render the candidate scope clusters for one pull request as text.

    Each entry of ``result["candidate_clusters"]`` gets a numbered line with
    its cluster id, two-decimal candidate score and assigned flag, followed by
    its representative PR. The matched-member list and the reason are added
    only when present and non-empty. An empty candidate list yields a single
    explanatory line instead.
    """
    out = [f"PR #{result['pr']['pr_number']}: candidate scope clusters", ""]

    candidates = result["candidate_clusters"]
    if not candidates:
        out.append("No candidate clusters found in the active run.")
        return "\n".join(out)

    for rank, candidate in enumerate(candidates, start=1):
        heading = f"{rank}. {candidate['cluster_id']} score={candidate['candidate_score']:.2f}"
        verdict = "yes" if candidate["assigned"] else "no"
        out.append(f"{heading} assigned={verdict}")
        out.append(f" representative: PR #{candidate['representative_pr_number']}")

        member_numbers = candidate.get("matched_member_pr_numbers") or []
        if member_numbers:
            tags = [f"#{number}" for number in member_numbers]
            out.append(f" matched members: {', '.join(tags)}")

        if candidate.get("reason"):
            out.append(f" reason: {candidate['reason']}")
    return "\n".join(out)
def format_pr_search_clusters(result: Mapping[str, Any]) -> str:
    """Render the assigned and candidate clusters for one pull request.

    The header shows the lookup mode/source (defaulting to ``indexed`` /
    ``active_index``) and both cluster counts; each count falls back to the
    length of the corresponding list when the explicit ``*_count`` key is
    absent. Two sections follow, "Assigned clusters:" and
    "Candidate clusters:", each printing ``- none`` when its list is missing
    or empty.
    """
    lookup = result.get("query") or {}
    mode_used = str(lookup.get("mode_used") or "indexed")
    source = str(lookup.get("source") or "active_index")

    out = [f"PR #{result['pr']['pr_number']}: cluster context", ""]
    out.append(f"Lookup: {mode_used} via {source}")

    assigned = result.get("assigned_clusters") or []
    candidates = result.get("candidate_clusters") or []
    out.append(f"Assigned: {result.get('assigned_cluster_count', len(assigned))}")
    out.append(f"Candidates: {result.get('candidate_cluster_count', len(candidates))}")
    out += ["", "Assigned clusters:"]

    if assigned:
        for entry in assigned:
            out.append(
                f"- {entry['cluster_id']} representative=PR #{entry['representative_pr_number']} "
                f"size={entry['cluster_size']}"
            )
            if entry.get("summary"):
                out.append(f" {entry['summary']}")
    else:
        out.append("- none")

    out += ["", "Candidate clusters:"]
    if not candidates:
        out.append("- none")
        return "\n".join(out)

    for rank, candidate in enumerate(candidates, start=1):
        heading = f"{rank}. {candidate['cluster_id']} score={candidate['candidate_score']:.2f}"
        verdict = "yes" if candidate["assigned"] else "no"
        out.append(f"{heading} assigned={verdict}")
        out.append(f" representative: PR #{candidate['representative_pr_number']}")

        member_numbers = candidate.get("matched_member_pr_numbers") or []
        if member_numbers:
            tags = [f"#{number}" for number in member_numbers]
            out.append(f" matched members: {', '.join(tags)}")

        if candidate.get("reason"):
            out.append(f" reason: {candidate['reason']}")
    return "\n".join(out)
def format_pr_search_cluster(result: Mapping[str, Any]) -> str:
    """Render a single cluster and its member PRs as text.

    Required: ``result["cluster"]`` with ``cluster_id``,
    ``representative_pr_number`` and ``average_similarity``, and
    ``result["members"]`` whose entries carry ``pr_number`` and
    ``member_role``. ``member_count`` falls back to ``len(members)``.

    ``summary`` is treated as optional: a missing or ``None`` summary is
    rendered as an empty line. Previously it was inserted into the joined
    list unchecked, so a missing key raised ``KeyError`` and ``None`` raised
    ``TypeError`` inside ``str.join``. Output for any string summary
    (including ``""``) is unchanged.
    """
    cluster = result["cluster"]
    lines = [
        f"Cluster {cluster['cluster_id']}",
        f"Representative PR: #{cluster['representative_pr_number']}",
        f"Members: {result.get('member_count', len(result['members']))}",
        f"Average similarity: {cluster['average_similarity']:.2f}",
        # Keep the summary slot even when empty so the layout stays stable.
        cluster.get("summary") or "",
        "",
        "Members:",
    ]
    for member in result["members"]:
        suffix = " (representative)" if member["member_role"] == "representative" else ""
        title = member.get("title") or ""
        lines.append(f"- PR #{member['pr_number']}{suffix}: {title}")
    return "\n".join(lines)
def format_pr_search_cluster_list(result: Mapping[str, Any]) -> str:
    """Render a list of clusters for a repository snapshot as text.

    ``cluster_count`` falls back to the number of entries in ``clusters``.
    Each cluster line is numbered with the cluster's own ``rank`` when it has
    one, otherwise with its 1-based position in the list. The representative
    title and summary are appended on their own lines only when non-empty.
    A missing or empty ``clusters`` list prints ``- none``.
    """
    entries = result.get("clusters") or []
    out = [
        f"Repo: {result['repo']}",
        f"Active snapshot: {result['snapshot_id']}",
        f"Clusters returned: {result.get('cluster_count', len(entries))}",
        "",
        "Clusters:",
    ]
    if not entries:
        out.append("- none")
        return "\n".join(out)

    for position, entry in enumerate(entries, start=1):
        label = entry.get("rank", position)
        out.append(
            f"{label}. {entry['cluster_id']} representative=PR #{entry['representative_pr_number']} "
            f"size={entry['cluster_size']} avg={entry['average_similarity']:.2f}"
        )
        for optional_key in ("representative_title", "summary"):
            if entry.get(optional_key):
                out.append(f" {entry[optional_key]}")
    return "\n".join(out)
def format_pr_search_pair(result: Mapping[str, Any]) -> str:
    """Render the similarity explanation for a pair of pull requests.

    Output is a title line naming both PR numbers, a materialized yes/no line
    and a single line of two-decimal scores. Shared files, shared directories
    (first five entries each) and shared cluster ids follow, each only when
    non-empty. Unlike the per-neighbour listings, files and directories are
    both shown when both are present.
    """
    comparison = result["pair"]

    left_number = result["left_pr"]["pr_number"]
    right_number = result["right_pr"]["pr_number"]
    materialized = "yes" if result["materialized"] else "no"
    score_parts = [
        "Scores:",
        f"similarity={comparison['similarity']:.2f}",
        f"content={comparison['content_similarity']:.2f}",
        f"size={comparison['size_similarity']:.2f}",
        f"breadth={comparison['breadth_similarity']:.2f}",
        f"concentration={comparison['concentration_similarity']:.2f}",
    ]
    out = [
        f"PR pair #{left_number} vs #{right_number}",
        f"Materialized: {materialized}",
        " ".join(score_parts),
    ]

    shared_files = comparison["shared_filenames"]
    if shared_files:
        out.append(f"Shared files: {', '.join(shared_files[:5])}")

    shared_dirs = comparison["shared_directories"]
    if shared_dirs:
        out.append(f"Shared directories: {', '.join(shared_dirs[:5])}")

    shared_clusters = result["shared_cluster_ids"]
    if shared_clusters:
        out.append(f"Shared clusters: {', '.join(shared_clusters)}")
    return "\n".join(out)
def format_pr_search_probe(result: Mapping[str, Any]) -> str:
    """Render a GitHub probe result: similar PRs, then candidate clusters.

    The header names the probed PR and the snapshot it was compared against.
    The "Similar PRs:" section lists each neighbour with its scores, at most
    one shared-files/shared-directories line (files win, first five entries)
    and its first cluster id, or a single "none above threshold" line when the
    list is empty. The "Candidate clusters:" section is always printed, with
    ``- none`` when there are no candidates.
    """
    probed = result["probe_pr"]
    out = [
        f"GitHub probe PR #{probed['pr_number']}: {probed['title']}",
        "",
        f"Compared against active snapshot: {result['snapshot_id']}",
        "",
        "Similar PRs:",
    ]

    neighbors = result["similar_prs"]
    if neighbors:
        for rank, neighbor in enumerate(neighbors, start=1):
            out.append(f"{rank}. PR #{neighbor['neighbor_pr_number']} score={neighbor['similarity']:.2f}")
            components = (
                f"content={neighbor['content_similarity']:.2f}",
                f"size={neighbor['size_similarity']:.2f}",
                f"breadth={neighbor['breadth_similarity']:.2f}",
                f"concentration={neighbor['concentration_similarity']:.2f}",
            )
            out.append(" " + " ".join(components))

            # Directories are only reported when no individual files are shared.
            shared_files = neighbor["shared_filenames"]
            if shared_files:
                out.append(f" shared files: {', '.join(shared_files[:5])}")
            else:
                shared_dirs = neighbor["shared_directories"]
                if shared_dirs:
                    out.append(f" shared directories: {', '.join(shared_dirs[:5])}")

            cluster_ids = neighbor["cluster_ids"]
            if cluster_ids:
                out.append(f" cluster: {cluster_ids[0]}")
    else:
        out.append("- none above the current similarity threshold")

    out += ["", "Candidate clusters:"]
    candidates = result["candidate_clusters"]
    if candidates:
        for rank, candidate in enumerate(candidates, start=1):
            heading = f"{rank}. {candidate['cluster_id']} score={candidate['candidate_score']:.2f}"
            verdict = "yes" if candidate["assigned"] else "no"
            out.append(f"{heading} assigned={verdict}")
            out.append(f" representative: PR #{candidate['representative_pr_number']}")

            member_numbers = candidate.get("matched_member_pr_numbers") or []
            if member_numbers:
                tags = [f"#{number}" for number in member_numbers]
                out.append(f" matched members: {', '.join(tags)}")

            if candidate.get("reason"):
                out.append(f" reason: {candidate['reason']}")
    else:
        out.append("- none")
    return "\n".join(out)
def format_pr_search_contributor(result: Mapping[str, Any]) -> str:
    """Render a contributor profile and their recent indexed PRs as text.

    Required: ``result["contributor"]["author_login"]``, ``result["repo"]``
    and ``result["snapshot_id"]``. Every other contributor field is optional
    and printed as ``-`` when missing or empty. ``pulls`` is optional; a
    missing or empty list prints ``- none``.

    The two ``*_score`` fields use ``-`` only for ``None`` or ``""``. A score
    of ``0`` / ``0.0`` is printed as-is instead of being collapsed to ``-``,
    which the earlier ``value or '-'`` form did and which made a zero score
    look the same as a missing one.
    """
    contributor = result["contributor"]

    def score(key: str) -> Any:
        # Zero is a legitimate score; only None / "" mean "not available".
        value = contributor.get(key)
        return "-" if value is None or value == "" else value

    lines = [
        f"Contributor {contributor['author_login']}",
        f"Repo: {result['repo']}",
        f"Snapshot: {result['snapshot_id']}",
        f"Name: {contributor.get('name') or '-'}",
        f"Profile: {contributor.get('profile_url') or '-'}",
        f"Association: {contributor.get('repo_association') or '-'}",
        f"First seen in snapshot: {'yes' if contributor.get('first_seen_in_snapshot') else 'no'}",
        (
            "Scores: "
            f"follow-through={score('follow_through_score')} "
            f"breadth={score('breadth_score')} "
            f"risk={contributor.get('automation_risk_signal') or '-'}"
        ),
        f"Heuristic: {contributor.get('heuristic_note') or '-'}",
        f"Public orgs: {', '.join(contributor.get('public_orgs') or []) or '-'}",
        "",
        "Recent indexed PRs:",
    ]
    pulls = result.get("pulls") or []
    if not pulls:
        lines.append("- none")
        return "\n".join(lines)
    for row in pulls:
        lines.append(
            f"- PR #{row['pr_number']}: {row.get('title') or ''} "
            f"[state={row.get('state') or '-'} merged={'yes' if row.get('merged') else 'no'}]"
        )
    return "\n".join(lines)
def format_pr_search_contributor_pulls(result: Mapping[str, Any]) -> str:
    """Render the indexed pull requests of one contributor as text.

    The header shows the contributor login, repo, snapshot id and the pull
    count (``pull_count`` if present, otherwise the length of ``pulls``).
    Each pull then gets one line with its number, title, update timestamp and
    state; missing titles print as empty and missing timestamp/state as ``-``.
    A missing or empty ``pulls`` list produces a single explanatory line.
    """
    author = result["contributor"]
    out = [
        f"Contributor PRs: {author['author_login']}",
        f"Repo: {result['repo']}",
        f"Snapshot: {result['snapshot_id']}",
    ]

    listed = result.get("pulls") or []
    out.append(f"Pull requests: {result.get('pull_count', len(listed))}")
    out.append("")

    if not listed:
        out.append("No indexed PRs found for that contributor.")
        return "\n".join(out)

    out.extend(
        f"- PR #{pull['pr_number']}: {pull.get('title') or ''} "
        f"(updated={pull.get('updated_at') or '-'}, state={pull.get('state') or '-'})"
        for pull in listed
    )
    return "\n".join(out)
def format_pr_search_pull_contributor(result: Mapping[str, Any]) -> str:
    """Render the contributor signals attached to a single pull request.

    Required: ``result["pr"]["pr_number"]`` and
    ``result["contributor"]["author_login"]``. The PR title prints as empty
    when missing; the remaining contributor fields print as ``-`` when
    missing or empty.

    The two ``*_score`` fields use ``-`` only for ``None`` or ``""``. A score
    of ``0`` / ``0.0`` is printed as-is instead of being collapsed to ``-``,
    which the earlier ``value or '-'`` form did and which made a zero score
    look the same as a missing one.
    """
    pr = result["pr"]
    contributor = result["contributor"]

    def score(key: str) -> Any:
        # Zero is a legitimate score; only None / "" mean "not available".
        value = contributor.get(key)
        return "-" if value is None or value == "" else value

    return "\n".join(
        [
            f"PR #{pr['pr_number']}: {pr.get('title') or ''}",
            f"Author: {contributor['author_login']}",
            f"Risk: {contributor.get('automation_risk_signal') or '-'}",
            f"Follow-through: {score('follow_through_score')}",
            f"Breadth: {score('breadth_score')}",
            f"Heuristic: {contributor.get('heuristic_note') or '-'}",
            f"Profile: {contributor.get('profile_url') or '-'}",
        ]
    )