"""PatchJudge batch evaluation runner.

Judges up to 160 patches (a mix of test-passing and test-failing patches
from two agents) plus up to 50 known-bad patches, then runs full validation.
"""
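
# Output artifacts, all written under data/ (paths taken from the code below):
#   judge_results.jsonl      - per-patch judge scores (checkpointed during the run)
#   known_bad_results.jsonl  - judge scores for the known-bad patches
#   validation_results.json  - structured validation metrics
#   validation_report.txt    - human-readable validation report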

import json
import logging
import statistics
import time
from collections import defaultdict
from pathlib import Path

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("patchjudge-batch")


def main():
    # patchjudge is imported inside main() so a missing install fails here,
    # at run time, with a clear error.
    from patchjudge.data_loader import SWEBenchLoader
    from patchjudge.feature_extractor import FeatureExtractor, extract_features_batch
    from patchjudge.judge import PatchJudge
    from patchjudge.models import JudgeResult
    from patchjudge.validation import KnownBadPatchGenerator, PatchJudgeValidator

    data_dir = Path("data")
    data_dir.mkdir(exist_ok=True)

| print("=" * 70) |
| print(" STEP 1: Loading Data") |
| print("=" * 70) |
| |
| loader = SWEBenchLoader(cache_dir="data") |
| gold = loader.load_gold_data() |
| examples = loader.build_dataset(sources=["coderforge", "o1"]) |
| |
| passed_examples = [e for e in examples if e.test_passed] |
| failed_examples = [e for e in examples if not e.test_passed] |
| |
| print(f"\nTotal examples: {len(examples)}") |
| print(f" Passed: {len(passed_examples)}") |
| print(f" Failed: {len(failed_examples)}") |
    coderforge_passed = [e for e in passed_examples if e.agent_name == "CoderForge-Qwen3-32B"][:50]
    o1_passed = [e for e in passed_examples if e.agent_name == "OpenHands-O1-reasoning-high"][:50]
    coderforge_failed = [e for e in failed_examples if e.agent_name == "CoderForge-Qwen3-32B"][:30]
    o1_failed = [e for e in failed_examples if e.agent_name == "OpenHands-O1-reasoning-high"][:30]

    judge_examples = coderforge_passed + o1_passed + coderforge_failed + o1_failed
    print(f"\nSelected {len(judge_examples)} examples for judging:")
    print(f"  CoderForge passed: {len(coderforge_passed)}")
    print(f"  O1 passed: {len(o1_passed)}")
    print(f"  CoderForge failed: {len(coderforge_failed)}")
    print(f"  O1 failed: {len(o1_failed)}")

    print("\n" + "=" * 70)
    print(" STEP 2: Feature Extraction")
    print("=" * 70)

    feat_results = extract_features_batch(judge_examples, show_progress=True)
    features_list = [f for _, f in feat_results]
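
    # Prevalence audit: count how many of the selected patches set each
    # boolean code-quality flag.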
    bool_features = [
        'has_error_handling', 'has_edge_case_handling', 'has_todos',
        'has_hardcoded_values', 'has_debug_statements',
    ]
    for feat_name in bool_features:
        count = sum(1 for f in features_list if getattr(f, feat_name))
        print(f"  {feat_name:>30}: {count}/{len(features_list)} ({count/len(features_list):.1%})")

    print("\n" + "=" * 70)
    print(" STEP 3: LLM Judge Evaluation")
    print("=" * 70)

    model_id = "Qwen/Qwen2.5-Coder-32B-Instruct"
    print(f"\nModel: {model_id}")
    judge = PatchJudge(
        model_id=model_id,
        temperature=0.1,
        max_tokens=2000,
        max_retries=3,
    )

    start_time = time.time()
    results = []

    for i, (ex, feat) in enumerate(zip(judge_examples, features_list)):
        print(f"\n  [{i+1}/{len(judge_examples)}] {ex.instance_id} ({ex.agent_name})")
        print(f"    Test: {'PASS' if ex.test_passed else 'FAIL'}, "
              f"Files: {feat.num_files_changed}, "
              f"Lines: +{feat.num_lines_added}/-{feat.num_lines_removed}")

        try:
            result = judge.judge(ex, feat)
            results.append(result)

            print(f"    MergeScore: {result.merge_score:.1f}/100")
            for dim in ["correctness", "completeness", "code_quality",
                        "non_regression_risk", "merge_readiness"]:
                score = result.dimension_scores.get(dim, {}).get("score", "?")
                print(f"      {dim}: {score}/10")

        except Exception as e:
            logger.error(f"    ERROR: {e}")
            # Record a zero-score placeholder so results stay aligned
            # one-to-one with judge_examples.
            results.append(JudgeResult(
                merge_score=0.0,
                dimension_scores={
                    dim: {"score": 0, "reasoning": f"Error: {e}", "flags": ["ERROR"]}
                    for dim in judge.DIMENSIONS
                },
                raw_output=f"ERROR: {e}",
                model_used=model_id,
            ))

        # Brief pause between calls to stay under API rate limits.
        time.sleep(0.3)

        # Checkpoint every 10 patches so a crash loses at most 10 results.
        if (i + 1) % 10 == 0:
            _save_results(data_dir, judge_examples[:i+1], results)
            elapsed = time.time() - start_time
            rate = (i + 1) / elapsed * 60  # patches per minute
            remaining = (len(judge_examples) - i - 1) / (rate / 60)  # seconds
            print(f"\n  --- Progress: {i+1}/{len(judge_examples)} | "
                  f"{rate:.1f}/min | ETA: {remaining:.0f}s ---")

    elapsed = time.time() - start_time
    print(f"\n\nJudging complete: {len(results)} patches in {elapsed:.0f}s "
          f"({elapsed/len(results):.1f}s avg)")

    # Final write covering every judged patch.
    _save_results(data_dir, judge_examples, results)

    print("\n" + "=" * 70)
    print(" STEP 4: Known-Bad Patch Detection")
    print("=" * 70)
    gold_list = list(gold.values())[:30]
    bad_patches = KnownBadPatchGenerator.generate_all(gold_list)

    bad_to_judge = bad_patches[:50]
    print(f"\nJudging {len(bad_to_judge)} known-bad patches...")

    extractor = FeatureExtractor()
    bad_features = [extractor.extract(bp) for bp in bad_to_judge]
    bad_results = []

    for i, (bp, bf) in enumerate(zip(bad_to_judge, bad_features)):
        print(f"  [{i+1}/{len(bad_to_judge)}] {bp.agent_name}: {bp.instance_id}")
        try:
            result = judge.judge(bp, bf)
            bad_results.append(result)
            print(f"    MergeScore: {result.merge_score:.1f}/100")
        except Exception as e:
            logger.error(f"    ERROR: {e}")
            # Same zero-score placeholder convention as the main loop.
            bad_results.append(JudgeResult(
                merge_score=0.0,
                dimension_scores={
                    dim: {"score": 0, "reasoning": f"Error: {e}", "flags": ["ERROR"]}
                    for dim in judge.DIMENSIONS
                },
                raw_output=f"ERROR: {e}",
                model_used=model_id,
            ))
        time.sleep(0.3)

    known_bad_pairs = list(zip(bad_to_judge, bad_results))

    with open(data_dir / "known_bad_results.jsonl", 'w') as f:
        for bp, br in known_bad_pairs:
            f.write(json.dumps({
                "instance_id": bp.instance_id,
                "agent_name": bp.agent_name,
                "merge_score": br.merge_score,
                "dimension_scores": br.dimension_scores,
            }) + "\n")

    print("\n" + "=" * 70)
    print(" STEP 5: Validation Report")
    print("=" * 70)

    validator = PatchJudgeValidator()
    vr = validator.validate(judge_examples, results, known_bad_pairs)
    report = validator.print_report(vr, judge_examples, results)

    print(report)

    with open(data_dir / "validation_results.json", 'w') as f:
        json.dump(vr.to_dict(), f, indent=2)

    with open(data_dir / "validation_report.txt", 'w') as f:
        f.write(report)

    print("\n" + "=" * 70)
    print(" FINAL SUMMARY")
    print("=" * 70)

    scores = [r.merge_score for r in results]
    passed_scores = [r.merge_score for ex, r in zip(judge_examples, results) if ex.test_passed]
    failed_scores = [r.merge_score for ex, r in zip(judge_examples, results) if not ex.test_passed]

    print(f"\nAll patches ({len(scores)}):")
    print(f"  Mean MergeScore: {statistics.mean(scores):.1f}")
    print(f"  Median: {statistics.median(scores):.1f}")
    if len(scores) > 1:  # stdev requires at least two samples
        print(f"  Std: {statistics.stdev(scores):.1f}")

    if passed_scores:
        below = sum(1 for s in passed_scores if s < 50)
        print(f"\nTest-passing patches ({len(passed_scores)}):")
        print(f"  Mean: {statistics.mean(passed_scores):.1f}")
        print(f"  Below 50: {below}/{len(passed_scores)} ({below/len(passed_scores):.1%})")

    if failed_scores:
        below = sum(1 for s in failed_scores if s < 50)
        print(f"\nTest-failing patches ({len(failed_scores)}):")
        print(f"  Mean: {statistics.mean(failed_scores):.1f}")
        print(f"  Below 50: {below}/{len(failed_scores)} ({below/len(failed_scores):.1%})")

    print("\nPer-agent scores:")
    agent_scores = defaultdict(list)
    for ex, r in zip(judge_examples, results):
        agent_scores[ex.agent_name].append(r.merge_score)
    for agent, scores_a in sorted(agent_scores.items()):
        print(f"  {agent}: mean={statistics.mean(scores_a):.1f}, "
              f"median={statistics.median(scores_a):.1f}")

    if bad_results:
        bad_scores = [r.merge_score for r in bad_results]
        below = sum(1 for s in bad_scores if s < 50)
        print(f"\nKnown-bad patches ({len(bad_scores)}):")
        print(f"  Mean: {statistics.mean(bad_scores):.1f}")
        print(f"  Below 50: {below}/{len(bad_scores)} ({below/len(bad_scores):.1%})")

        bad_agent_scores = defaultdict(list)
        for bp, br in known_bad_pairs:
            bad_agent_scores[bp.agent_name].append(br.merge_score)
        for agent, scores_b in sorted(bad_agent_scores.items()):
            print(f"  {agent}: mean={statistics.mean(scores_b):.1f}")

    print("\n✅ PatchJudge batch evaluation complete!")
    print(f"  Results saved to: {data_dir}/")


def _save_results(data_dir, examples, results):
    """Write one JSONL record per judged patch to judge_results.jsonl.

    Used both for periodic checkpoints and for the final save; each call
    rewrites the file from scratch.
    """
    path = data_dir / "judge_results.jsonl"
    with open(path, 'w') as f:
        for ex, r in zip(examples, results):
            f.write(json.dumps({
                "instance_id": ex.instance_id,
                "agent_name": ex.agent_name,
                "test_passed": ex.test_passed,
                "merge_score": r.merge_score,
                "dimension_scores": r.dimension_scores,
                "model_used": r.model_used,
            }) + "\n")
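

# A small convenience sketch (hypothetical helper, not used by the pipeline)
# for loading saved records back for ad-hoc analysis; it assumes the JSONL
# schema written by _save_results above.
def _load_results(path="data/judge_results.jsonl"):
    with open(path) as f:
        return [json.loads(line) for line in f]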


if __name__ == "__main__":
    main()