"""Plain-text formatters for PR-search results.

The module also re-exports the entry points of
``slop_farmer.reports.pr_search_service`` under their original names so that
callers can import both the queries and their formatters from one place.

Every ``format_pr_search_*`` function takes the mapping produced by the
matching service call and returns a newline-joined, human-readable string.
"""

from __future__ import annotations

from collections.abc import Iterable, Mapping
from typing import Any

from slop_farmer.reports import pr_search_service

# Re-exported service entry points (plain aliases, no wrapping).
run_pr_search_refresh = pr_search_service.run_pr_search_refresh
get_pr_search_status = pr_search_service.get_pr_search_status
get_pr_search_similar = pr_search_service.get_pr_search_similar
get_pr_search_similar_lookup = pr_search_service.get_pr_search_similar_lookup
get_pr_search_candidate_clusters = pr_search_service.get_pr_search_candidate_clusters
get_pr_search_contributor = pr_search_service.get_pr_search_contributor
get_pr_search_contributor_pulls = pr_search_service.get_pr_search_contributor_pulls
get_pr_search_clusters = pr_search_service.get_pr_search_clusters
list_pr_search_clusters = pr_search_service.list_pr_search_clusters
get_pr_search_cluster = pr_search_service.get_pr_search_cluster
get_pr_search_pull_contributor = pr_search_service.get_pr_search_pull_contributor
explain_pr_search_pair = pr_search_service.explain_pr_search_pair
probe_pr_search_live = pr_search_service.probe_pr_search_live
probe_pr_search_github = pr_search_service.probe_pr_search_github
resolve_pr_search_db_path = pr_search_service.resolve_pr_search_db_path


def _display(value: Any, placeholder: str = "-") -> str:
    """Return *value* as text, or *placeholder* when it is ``None`` or ``""``.

    Unlike ``value or "-"``, this keeps legitimate falsy values such as a
    numeric ``0`` visible instead of rendering them as missing.
    """
    if value is None or value == "":
        return placeholder
    return str(value)


def _yes_no(flag: Any) -> str:
    """Render a truthy/falsy value as ``"yes"`` or ``"no"``."""
    return "yes" if flag else "no"


def _lookup_labels(result: Mapping[str, Any]) -> tuple[str, str]:
    """Return ``(mode_used, source)`` from ``result["query"]`` with defaults.

    A missing or empty ``query`` falls back to ``"indexed"`` and
    ``"active_index"`` respectively.
    """
    query = result.get("query") or {}
    mode_used = str(query.get("mode_used") or "indexed")
    source = str(query.get("source") or "active_index")
    return mode_used, source


def _component_scores(row: Mapping[str, Any]) -> str:
    """Render the four component similarity scores of *row* on one line."""
    return (
        f"content={row['content_similarity']:.2f} "
        f"size={row['size_similarity']:.2f} "
        f"breadth={row['breadth_similarity']:.2f} "
        f"concentration={row['concentration_similarity']:.2f}"
    )


def _similar_pr_lines(rows: Iterable[Mapping[str, Any]]) -> list[str]:
    """Render numbered similar-PR rows (shared by the similar and probe views)."""
    lines: list[str] = []
    for index, row in enumerate(rows, start=1):
        lines.append(f"{index}. PR #{row['neighbor_pr_number']} score={row['similarity']:.2f}")
        lines.append(" " + _component_scores(row))
        # Shared directories are only shown when there are no shared files.
        if row["shared_filenames"]:
            lines.append(f" shared files: {', '.join(row['shared_filenames'][:5])}")
        elif row["shared_directories"]:
            lines.append(f" shared directories: {', '.join(row['shared_directories'][:5])}")
        if row["cluster_ids"]:
            # Only the first cluster id is displayed.
            lines.append(f" cluster: {row['cluster_ids'][0]}")
    return lines


def _candidate_cluster_lines(rows: Iterable[Mapping[str, Any]]) -> list[str]:
    """Render numbered candidate-cluster rows (shared by three views)."""
    lines: list[str] = []
    for index, row in enumerate(rows, start=1):
        lines.append(
            f"{index}. {row['cluster_id']} score={row['candidate_score']:.2f} "
            f"assigned={_yes_no(row['assigned'])}"
        )
        lines.append(f" representative: PR #{row['representative_pr_number']}")
        matched = row.get("matched_member_pr_numbers") or []
        if matched:
            members = ", ".join(f"#{number}" for number in matched)
            lines.append(f" matched members: {members}")
        if row.get("reason"):
            lines.append(f" reason: {row['reason']}")
    return lines


def format_pr_search_status(result: Mapping[str, Any]) -> str:
    """Format the active-run status: identifiers, finish time and row counts.

    ``finished_at`` is shown as ``running`` when absent or empty, and the
    ``contributors`` row count defaults to ``0`` when missing.
    """
    counts = result["row_counts"]
    return "\n".join(
        [
            f"Repo: {result['repo']}",
            f"Active run: {result['id']}",
            f"Snapshot: {result['snapshot_id']}",
            f"Source: {result['source_type']}",
            f"Finished: {result.get('finished_at') or 'running'}",
            (
                "Rows: "
                f"documents={counts['documents']} "
                f"contributors={counts.get('contributors', 0)} "
                f"features={counts['features']} "
                f"neighbors={counts['neighbors']} "
                f"clusters={counts['clusters']} "
                f"candidates={counts['cluster_candidates']}"
            ),
        ]
    )


def format_pr_search_similar(result: Mapping[str, Any]) -> str:
    """Format the PRs similar to ``result["pr"]`` with their score breakdown."""
    mode_used, source = _lookup_labels(result)
    lines = [
        f"PR #{result['pr']['pr_number']}: {result['pr']['title']}",
        "",
        f"Active snapshot: {result['snapshot_id']}",
        f"Lookup: {mode_used} via {source}",
        f"Matches: {result.get('similar_count', len(result['similar_prs']))}",
        "",
    ]
    similar_prs = result["similar_prs"]
    if not similar_prs:
        lines.append("No similar PRs found in the active run.")
    else:
        lines.extend(_similar_pr_lines(similar_prs))
    return "\n".join(lines)


def format_pr_search_candidate_clusters(result: Mapping[str, Any]) -> str:
    """Format the candidate scope clusters for ``result["pr"]``."""
    lines = [
        f"PR #{result['pr']['pr_number']}: candidate scope clusters",
        "",
    ]
    candidate_clusters = result["candidate_clusters"]
    if not candidate_clusters:
        lines.append("No candidate clusters found in the active run.")
    else:
        lines.extend(_candidate_cluster_lines(candidate_clusters))
    return "\n".join(lines)


def format_pr_search_clusters(result: Mapping[str, Any]) -> str:
    """Format a PR's cluster context: assigned clusters, then candidates.

    The counts fall back to the length of the corresponding list when the
    explicit ``*_count`` keys are absent.
    """
    mode_used, source = _lookup_labels(result)
    assigned_clusters = result.get("assigned_clusters") or []
    candidate_clusters = result.get("candidate_clusters") or []
    lines = [
        f"PR #{result['pr']['pr_number']}: cluster context",
        "",
        f"Lookup: {mode_used} via {source}",
        f"Assigned: {result.get('assigned_cluster_count', len(assigned_clusters))}",
        f"Candidates: {result.get('candidate_cluster_count', len(candidate_clusters))}",
        "",
        "Assigned clusters:",
    ]
    if not assigned_clusters:
        lines.append("- none")
    else:
        for cluster in assigned_clusters:
            lines.append(
                f"- {cluster['cluster_id']} "
                f"representative=PR #{cluster['representative_pr_number']} "
                f"size={cluster['cluster_size']}"
            )
            if cluster.get("summary"):
                lines.append(f" {cluster['summary']}")
    lines.extend(["", "Candidate clusters:"])
    if not candidate_clusters:
        lines.append("- none")
    else:
        lines.extend(_candidate_cluster_lines(candidate_clusters))
    return "\n".join(lines)


def format_pr_search_cluster(result: Mapping[str, Any]) -> str:
    """Format one cluster: header, summary and member list.

    A missing or ``None`` summary is rendered as an empty line rather than
    raising, matching how the other cluster formatters treat the summary as
    optional.
    """
    cluster = result["cluster"]
    summary = cluster.get("summary")
    lines = [
        f"Cluster {cluster['cluster_id']}",
        f"Representative PR: #{cluster['representative_pr_number']}",
        f"Members: {result.get('member_count', len(result['members']))}",
        f"Average similarity: {cluster['average_similarity']:.2f}",
        # str.join rejects non-str items, so never pass None through.
        "" if summary is None else str(summary),
        "",
        "Members:",
    ]
    for member in result["members"]:
        suffix = " (representative)" if member["member_role"] == "representative" else ""
        title = member.get("title") or ""
        lines.append(f"- PR #{member['pr_number']}{suffix}: {title}")
    return "\n".join(lines)


def format_pr_search_cluster_list(result: Mapping[str, Any]) -> str:
    """Format the list of clusters in the active snapshot.

    Each entry is numbered with its ``rank`` when present, otherwise with its
    1-based position in the list.
    """
    clusters = result.get("clusters") or []
    lines = [
        f"Repo: {result['repo']}",
        f"Active snapshot: {result['snapshot_id']}",
        f"Clusters returned: {result.get('cluster_count', len(clusters))}",
        "",
        "Clusters:",
    ]
    if not clusters:
        lines.append("- none")
        return "\n".join(lines)
    for index, cluster in enumerate(clusters, start=1):
        lines.append(
            f"{cluster.get('rank', index)}. {cluster['cluster_id']} "
            f"representative=PR #{cluster['representative_pr_number']} "
            f"size={cluster['cluster_size']} avg={cluster['average_similarity']:.2f}"
        )
        if cluster.get("representative_title"):
            lines.append(f" {cluster['representative_title']}")
        if cluster.get("summary"):
            lines.append(f" {cluster['summary']}")
    return "\n".join(lines)


def format_pr_search_pair(result: Mapping[str, Any]) -> str:
    """Format the similarity explanation for a pair of PRs.

    Unlike the similar-PR rows, shared files and shared directories are both
    listed when both are present.
    """
    pair = result["pair"]
    lines = [
        f"PR pair #{result['left_pr']['pr_number']} vs #{result['right_pr']['pr_number']}",
        f"Materialized: {_yes_no(result['materialized'])}",
        f"Scores: similarity={pair['similarity']:.2f} {_component_scores(pair)}",
    ]
    if pair["shared_filenames"]:
        lines.append(f"Shared files: {', '.join(pair['shared_filenames'][:5])}")
    if pair["shared_directories"]:
        lines.append(f"Shared directories: {', '.join(pair['shared_directories'][:5])}")
    if result["shared_cluster_ids"]:
        lines.append(f"Shared clusters: {', '.join(result['shared_cluster_ids'])}")
    return "\n".join(lines)


def format_pr_search_probe(result: Mapping[str, Any]) -> str:
    """Format a GitHub probe result: similar PRs, then candidate clusters."""
    lines = [
        f"GitHub probe PR #{result['probe_pr']['pr_number']}: {result['probe_pr']['title']}",
        "",
        f"Compared against active snapshot: {result['snapshot_id']}",
        "",
        "Similar PRs:",
    ]
    similar_prs = result["similar_prs"]
    if not similar_prs:
        lines.append("- none above the current similarity threshold")
    else:
        lines.extend(_similar_pr_lines(similar_prs))
    lines.extend(["", "Candidate clusters:"])
    candidate_clusters = result["candidate_clusters"]
    if not candidate_clusters:
        lines.append("- none")
    else:
        lines.extend(_candidate_cluster_lines(candidate_clusters))
    return "\n".join(lines)


def format_pr_search_contributor(result: Mapping[str, Any]) -> str:
    """Format a contributor profile followed by their recent indexed PRs.

    Optional profile fields render as ``-`` when ``None`` or empty; a numeric
    ``0`` score is shown as ``0`` rather than being treated as missing.
    """
    contributor = result["contributor"]
    lines = [
        f"Contributor {contributor['author_login']}",
        f"Repo: {result['repo']}",
        f"Snapshot: {result['snapshot_id']}",
        f"Name: {_display(contributor.get('name'))}",
        f"Profile: {_display(contributor.get('profile_url'))}",
        f"Association: {_display(contributor.get('repo_association'))}",
        f"First seen in snapshot: {_yes_no(contributor.get('first_seen_in_snapshot'))}",
        (
            "Scores: "
            f"follow-through={_display(contributor.get('follow_through_score'))} "
            f"breadth={_display(contributor.get('breadth_score'))} "
            f"risk={_display(contributor.get('automation_risk_signal'))}"
        ),
        f"Heuristic: {_display(contributor.get('heuristic_note'))}",
        f"Public orgs: {', '.join(contributor.get('public_orgs') or []) or '-'}",
        "",
        "Recent indexed PRs:",
    ]
    pulls = result.get("pulls") or []
    if not pulls:
        lines.append("- none")
        return "\n".join(lines)
    for row in pulls:
        lines.append(
            f"- PR #{row['pr_number']}: {row.get('title') or ''} "
            f"[state={_display(row.get('state'))} merged={_yes_no(row.get('merged'))}]"
        )
    return "\n".join(lines)


def format_pr_search_contributor_pulls(result: Mapping[str, Any]) -> str:
    """Format the list of indexed PRs authored by a contributor."""
    contributor = result["contributor"]
    pulls = result.get("pulls") or []
    lines = [
        f"Contributor PRs: {contributor['author_login']}",
        f"Repo: {result['repo']}",
        f"Snapshot: {result['snapshot_id']}",
        f"Pull requests: {result.get('pull_count', len(pulls))}",
        "",
    ]
    if not pulls:
        lines.append("No indexed PRs found for that contributor.")
        return "\n".join(lines)
    for row in pulls:
        lines.append(
            f"- PR #{row['pr_number']}: {row.get('title') or ''} "
            f"(updated={_display(row.get('updated_at'))}, state={_display(row.get('state'))})"
        )
    return "\n".join(lines)


def format_pr_search_pull_contributor(result: Mapping[str, Any]) -> str:
    """Format the contributor summary attached to a single PR.

    Optional fields render as ``-`` when ``None`` or empty; a numeric ``0``
    score is shown as ``0`` rather than being treated as missing.
    """
    pr = result["pr"]
    contributor = result["contributor"]
    return "\n".join(
        [
            f"PR #{pr['pr_number']}: {pr.get('title') or ''}",
            f"Author: {contributor['author_login']}",
            f"Risk: {_display(contributor.get('automation_risk_signal'))}",
            f"Follow-through: {_display(contributor.get('follow_through_score'))}",
            f"Breadth: {_display(contributor.get('breadth_score'))}",
            f"Heuristic: {_display(contributor.get('heuristic_note'))}",
            f"Profile: {_display(contributor.get('profile_url'))}",
        ]
    )