"""PatchJudge — Main runner script.

Runs the full PatchJudge pipeline:
1. Load SWE-bench Verified + agent patches
2. Extract features
3. Judge patches with LLM
4. Validate results
5. Save everything
"""
|
|
import argparse
import json
import logging
import os
import statistics
import sys
import time
from collections import defaultdict
from pathlib import Path

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("patchjudge")
|
|
|
|
def run_data_loading(args):
    """Task 1: Load and prepare the dataset."""
    from patchjudge.data_loader import SWEBenchLoader, get_diff_stats

    print("\n" + "=" * 70)
    print(" Task 1: Data Loading & SWE-bench Setup")
    print("=" * 70)

    loader = SWEBenchLoader(cache_dir=args.data_dir)

    # Gold (reference) patches from SWE-bench Verified
    gold = loader.load_gold_data()
    print(f"\n✅ Loaded {len(gold)} SWE-bench Verified instances")

    # Agent-generated patches from the requested sources
    sources = args.sources.split(",") if args.sources else ["coderforge", "o1"]
    examples = loader.build_dataset(sources=sources)

    # Basic dataset summary
    passed = sum(1 for e in examples if e.test_passed)
    failed = len(examples) - passed
    repos = set(e.repo for e in examples)
    agents = set(e.agent_name for e in examples)
    instances = set(e.instance_id for e in examples)

    print("\n📊 Dataset Summary:")
    print(f"  Total examples: {len(examples)}")
    print(f"  Test passed: {passed} ({passed/len(examples):.1%})")
    print(f"  Test failed: {failed} ({failed/len(examples):.1%})")
    print(f"  Unique instances: {len(instances)}")
    print(f"  Unique repos: {len(repos)}")
    print(f"  Agent sources: {agents}")

    # Difficulty distribution
    diff_counts = defaultdict(int)
    for e in examples:
        diff_counts[e.difficulty or "unknown"] += 1
    print("\n  Difficulty:")
    for d, c in sorted(diff_counts.items()):
        print(f"    {d}: {c}")

    # Most common repositories
    repo_counts = defaultdict(int)
    for e in examples:
        repo_counts[e.repo] += 1
    print("\n  Top repos:")
    for repo, c in sorted(repo_counts.items(), key=lambda x: -x[1])[:10]:
        print(f"    {repo}: {c}")

    # Patch size statistics for the agent patches
    print("\n  Patch size stats (agent patches):")
    all_stats = [get_diff_stats(e.agent_patch) for e in examples]
    for key in ["lines_added", "lines_removed", "files_changed", "hunks"]:
        values = [s[key] for s in all_stats]
        if values:
            print(f"    {key}: mean={statistics.mean(values):.1f}, "
                  f"median={statistics.median(values):.0f}, "
                  f"max={max(values)}")

    path = loader.save_dataset(examples)
    print(f"\n💾 Saved to: {path}")

    return examples, gold
|
|
|
|
def run_feature_extraction(examples, args):
    """Task 2: Extract features from all patches."""
    from patchjudge.feature_extractor import extract_features_batch

    print("\n" + "=" * 70)
    print(" Task 2: Feature Extraction")
    print("=" * 70)

    results = extract_features_batch(examples, show_progress=True)
    features_list = [f for _, f in results]

    # How often each boolean feature fires across the dataset
    print(f"\n📐 Feature Summary ({len(features_list)} patches):")

    bool_features = [
        'has_error_handling', 'has_edge_case_handling', 'has_todos',
        'has_hardcoded_values', 'has_debug_statements', 'modifies_core_files',
        'has_imports_added', 'touches_tests',
    ]

    for feat in bool_features:
        count = sum(1 for f in features_list if getattr(f, feat))
        print(f"  {feat:>30}: {count}/{len(features_list)} ({count/len(features_list):.1%})")

    # Distribution of change scope
    scope_counts = defaultdict(int)
    for f in features_list:
        scope_counts[f.change_scope] += 1
    print("\n  Change scope:")
    for scope, c in sorted(scope_counts.items()):
        print(f"    {scope}: {c}")

    # Keyword coverage ratio statistics
    coverages = [f.keyword_coverage_ratio for f in features_list]
    if coverages:
        print(f"\n  Keyword coverage: "
              f"mean={statistics.mean(coverages):.2f}, "
              f"median={statistics.median(coverages):.2f}")

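    # Each line of features.jsonl is one JSON object per (instance, agent) pair,
    # roughly of this shape (the keys inside "features" come from the feature
    # object's to_dict() and are illustrative, not exhaustive):
    #
    #   {"instance_id": "django__django-12345",
    #    "agent_name": "coderforge",
    #    "features": {"has_error_handling": true, "change_scope": "...", ...}}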
    features_path = Path(args.data_dir) / "features.jsonl"
    with open(features_path, 'w') as f:
        for ex, feat in results:
            f.write(json.dumps({
                "instance_id": ex.instance_id,
                "agent_name": ex.agent_name,
                "features": feat.to_dict(),
            }) + "\n")
    print(f"\n💾 Features saved to: {features_path}")

    return features_list
|
|
|
|
def run_judging(examples, features_list, args):
    """Task 3: LLM Judge evaluation."""
    from patchjudge.judge import PatchJudge

    print("\n" + "=" * 70)
    print(" Task 3: LLM Judge Evaluation")
    print("=" * 70)

    # Judge at most --judge-count patches.
    n = min(args.judge_count, len(examples))

    # Stratify the sample by test outcome so the judge sees both kinds.
    passed = [i for i, e in enumerate(examples) if e.test_passed]
    failed = [i for i, e in enumerate(examples) if not e.test_passed]

    # Aim for roughly 60% test-passing patches; if one pool is too small,
    # fill the remaining quota from the other.
    n_passed = min(len(passed), int(n * 0.6))
    n_failed = min(len(failed), n - n_passed)
    n_passed = n - n_failed
    selected_idx = passed[:n_passed] + failed[:n_failed]
    selected_examples = [examples[i] for i in selected_idx]
    selected_features = [features_list[i] for i in selected_idx] if features_list else None

    print(f"\n🔍 Judging {len(selected_examples)} patches "
          f"({n_passed} passed, {n_failed} failed)")
    print(f"  Model: {args.model_id}")

    judge = PatchJudge(
        model_id=args.model_id,
        temperature=0.1,
        max_tokens=2000,
    )

    start = time.time()
    results = judge.judge_batch(
        selected_examples,
        selected_features,
        show_progress=True,
    )
    elapsed = time.time() - start

    print(f"\n⏱️ Judging complete in {elapsed:.1f}s "
          f"({elapsed/len(selected_examples):.1f}s per patch)")

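    # Each line of judge_results.jsonl pairs the judge's verdict with the known
    # test outcome, roughly (values illustrative):
    #
    #   {"instance_id": "...", "agent_name": "o1", "test_passed": true,
    #    "merge_score": 7.5, "dimension_scores": {...},
    #    "model_used": "Qwen/Qwen2.5-Coder-32B-Instruct"}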
    results_path = Path(args.data_dir) / "judge_results.jsonl"
    with open(results_path, 'w') as f:
        for ex, r in zip(selected_examples, results):
            f.write(json.dumps({
                "instance_id": ex.instance_id,
                "agent_name": ex.agent_name,
                "test_passed": ex.test_passed,
                "merge_score": r.merge_score,
                "dimension_scores": r.dimension_scores,
                "model_used": r.model_used,
            }) + "\n")
    print(f"💾 Results saved to: {results_path}")

    return selected_examples, results, judge
|
|
|
|
def run_validation(examples, results, gold_data, judge, args):
    """Task 4: Validate PatchJudge against ground truth."""
    from patchjudge.validation import run_full_validation

    print("\n" + "=" * 70)
    print(" Task 4: Validation")
    print("=" * 70)

    # Compare against at most 50 gold instances.
    gold_list = list(gold_data.values())[:50] if gold_data else None

    vr, report = run_full_validation(
        examples=examples,
        results=results,
        gold_data=gold_list,
        judge=judge if args.validate_known_bad else None,
    )

    print(report)

    # Machine-readable results
    val_path = Path(args.data_dir) / "validation_results.json"
    with open(val_path, 'w') as f:
        json.dump(vr.to_dict(), f, indent=2)
    print(f"\n💾 Validation results saved to: {val_path}")

    # Human-readable report
    report_path = Path(args.data_dir) / "validation_report.txt"
    with open(report_path, 'w') as f:
        f.write(report)
    print(f"💾 Report saved to: {report_path}")

    return vr
|
|
|
|
def main():
    parser = argparse.ArgumentParser(description="PatchJudge - Post-Test Code Quality Scorer")
    parser.add_argument("--data-dir", default="data", help="Data directory")
    parser.add_argument("--sources", default="coderforge,o1",
                        help="Comma-separated data sources: coderforge,o1,s3")
    parser.add_argument("--model-id", default="Qwen/Qwen2.5-Coder-32B-Instruct",
                        help="LLM model for judging")
    parser.add_argument("--judge-count", type=int, default=50,
                        help="Number of patches to judge")
    parser.add_argument("--validate-known-bad", action="store_true",
                        help="Also generate and judge known-bad patches for validation")
    parser.add_argument("--tasks", default="1,2,3,4",
                        help="Comma-separated task numbers to run (1=load, 2=features, 3=judge, 4=validate)")
    parser.add_argument("--load-cached", action="store_true",
                        help="Load previously saved dataset instead of re-downloading")

    args = parser.parse_args()
    tasks = [int(t) for t in args.tasks.split(",")]

    os.makedirs(args.data_dir, exist_ok=True)

    examples = None
    features_list = None
    results = None
    gold_data = None
    judge = None

    # Task 1: load (or reload) the dataset.
    if 1 in tasks:
        if args.load_cached:
            from patchjudge.data_loader import SWEBenchLoader
            loader = SWEBenchLoader(cache_dir=args.data_dir)
            examples = loader.load_saved_dataset()
            gold_data = loader.load_gold_data()
        else:
            examples, gold_data = run_data_loading(args)

    # Task 2: feature extraction.
    if 2 in tasks and examples:
        features_list = run_feature_extraction(examples, args)

    # Task 3: LLM judging (recomputes features if Task 2 was skipped).
    if 3 in tasks and examples:
        if features_list is None:
            features_list = run_feature_extraction(examples, args)
        examples, results, judge = run_judging(examples, features_list, args)

    # Task 4: validation against ground truth.
    if 4 in tasks and results:
        run_validation(examples, results, gold_data, judge, args)

    print("\n✅ PatchJudge pipeline complete!")
|
|
|
if __name__ == "__main__":
    main()
|
|