| """Validation module for PatchJudge. |
| |
| Validates that PatchJudge scores correlate with actual code quality: |
| 1. METR alignment: ~50% of test-passing patches should score below 50 |
| 2. Known-bad pattern detection: deliberately bad patches should score low |
| 3. Score distribution analysis |
| 4. Resolved vs unresolved separation |
| """ |
|
|
import logging
import statistics
from collections import defaultdict
from typing import Optional

from patchjudge.models import (
    PatchExample, JudgeResult, ValidationResult
)

logger = logging.getLogger(__name__)


class KnownBadPatchGenerator:
    """Generate deliberately bad patches that pass tests but are low quality."""

    @staticmethod
    def generate_all(gold_examples: list[dict]) -> list[PatchExample]:
        """Generate known-bad variants for a set of gold examples.

        Args:
            gold_examples: List of dicts with keys:
                instance_id, repo, problem_statement, gold_patch, base_commit

        Returns:
            List of PatchExample with known-bad patches.
        """
        bad_patches = []
        generator = KnownBadPatchGenerator()

        # Only the first 50 gold examples are used, to bound the number of
        # variants that later need to be judged.
        for ex in gold_examples[:50]:
            variants = generator._generate_variants(ex)
            bad_patches.extend(variants)

        logger.info(f"Generated {len(bad_patches)} known-bad patch variants")
        return bad_patches

    def _generate_variants(self, ex: dict) -> list[PatchExample]:
        """Generate known-bad variants of a gold patch."""
        variants = []
        gold = ex.get("gold_patch", "")

        if not gold:
            return variants

        # Hardcoded return value instead of a real fix
        hardcoded = self._make_hardcoded_variant(ex)
        if hardcoded:
            variants.append(hardcoded)

        # No-op patch that only adds a 'pass' placeholder
        noop = self._make_noop_variant(ex)
        if noop:
            variants.append(noop)

        # Comments added instead of code changes
        comment_only = self._make_comment_only_variant(ex)
        if comment_only:
            variants.append(comment_only)

        # Added code wrapped in a broad try/except that swallows errors
        broad_except = self._make_broad_except_variant(ex)
        if broad_except:
            variants.append(broad_except)

        # The failing test skipped instead of the bug fixed
        test_disable = self._make_test_disable_variant(ex)
        if test_disable:
            variants.append(test_disable)

        return variants

    def _make_hardcoded_variant(self, ex: dict) -> Optional[PatchExample]:
        """Create a patch with hardcoded return values."""
        gold = ex["gold_patch"]
        lines = gold.split('\n')

        # Replace the first added 'return ...' line with a hardcoded return,
        # leaving the rest of the gold patch intact.
        new_lines = []
        modified = False
        for line in lines:
            if line.startswith('+') and not line.startswith('+++'):
                content = line[1:]
                if 'return' in content and not modified:
                    # Preserve the indentation of the replaced line
                    indent = len(content) - len(content.lstrip())
                    new_lines.append('+' + ' ' * indent + 'return True  # HARDCODED')
                    modified = True
                    continue
            new_lines.append(line)

        if not modified:
            return None

        return PatchExample(
            instance_id=ex["instance_id"] + "__hardcoded",
            repo=ex["repo"],
            problem_statement=ex["problem_statement"],
            gold_patch=ex["gold_patch"],
            agent_patch='\n'.join(new_lines),
            agent_name="known-bad:hardcoded",
            test_passed=True,
            base_commit=ex.get("base_commit", ""),
            difficulty=ex.get("difficulty", ""),
        )

    def _make_noop_variant(self, ex: dict) -> Optional[PatchExample]:
        """Create a minimal no-op patch (just adds 'pass')."""
        gold = ex["gold_patch"]
        lines = gold.split('\n')

        # Keep deletions and context, but collapse every added line into a
        # single 'pass' placeholder.
        new_lines = []
        in_hunk = False
        added_pass = False

        for line in lines:
            if line.startswith('@@'):
                in_hunk = True
                new_lines.append(line)
                continue

            if in_hunk:
                if line.startswith('+') and not line.startswith('+++'):
                    if not added_pass:
                        content = line[1:]
                        indent = len(content) - len(content.lstrip())
                        new_lines.append('+' + ' ' * indent + 'pass  # TODO: implement')
                        added_pass = True
                    # Drop every other added line
                    continue
                elif line.startswith('-') and not line.startswith('---'):
                    new_lines.append(line)
                else:
                    new_lines.append(line)
            else:
                new_lines.append(line)

        if not added_pass:
            return None

        return PatchExample(
            instance_id=ex["instance_id"] + "__noop",
            repo=ex["repo"],
            problem_statement=ex["problem_statement"],
            gold_patch=ex["gold_patch"],
            agent_patch='\n'.join(new_lines),
            agent_name="known-bad:noop",
            test_passed=False,
            base_commit=ex.get("base_commit", ""),
            difficulty=ex.get("difficulty", ""),
        )

    def _make_comment_only_variant(self, ex: dict) -> Optional[PatchExample]:
        """Create a patch that only adds comments, no real code changes."""
        gold = ex["gold_patch"]
        lines = gold.split('\n')

        new_lines = []
        modified = False

        for line in lines:
            if line.startswith('+') and not line.startswith('+++'):
                content = line[1:]
                indent = len(content) - len(content.lstrip())
                # Turn each added code line into a comment describing the "fix"
                new_lines.append('+' + ' ' * indent + '# Fixed: ' + content.strip())
                modified = True
            elif line.startswith('-') and not line.startswith('---'):
                # Keep deletions, so the original code is removed but never replaced
                new_lines.append(line)
            else:
                new_lines.append(line)

        if not modified:
            return None

        return PatchExample(
            instance_id=ex["instance_id"] + "__comment_only",
            repo=ex["repo"],
            problem_statement=ex["problem_statement"],
            gold_patch=ex["gold_patch"],
            agent_patch='\n'.join(new_lines),
            agent_name="known-bad:comment-only",
            test_passed=False,
            base_commit=ex.get("base_commit", ""),
            difficulty=ex.get("difficulty", ""),
        )

    def _make_broad_except_variant(self, ex: dict) -> Optional[PatchExample]:
        """Create a patch that wraps everything in a broad try/except."""
        gold = ex["gold_patch"]
        lines = gold.split('\n')

        # First pass: collect every added line and the minimum indentation,
        # so the try/except wrapper can sit at the outermost added level.
        new_lines = []
        added_lines = []
        min_indent = 999

        for line in lines:
            if line.startswith('+') and not line.startswith('+++'):
                content = line[1:]
                if content.strip():
                    indent = len(content) - len(content.lstrip())
                    min_indent = min(min_indent, indent)
                added_lines.append(content)

        if not added_lines or min_indent == 999:
            return None

        indent_str = ' ' * min_indent

        # Second pass: drop the original additions and emit them once, wrapped
        # in a try/except, at the first hunk header.
        for line in lines:
            if line.startswith('+') and not line.startswith('+++'):
                continue
            elif line.startswith('@@') and added_lines:
                new_lines.append(line)

                new_lines.append(f'+{indent_str}try:')
                for al in added_lines:
                    new_lines.append(f'+    {al}')
                new_lines.append(f'+{indent_str}except Exception:')
                new_lines.append(f'+{indent_str}    pass  # Silently ignore all errors')
                added_lines = []
                continue
            new_lines.append(line)

        return PatchExample(
            instance_id=ex["instance_id"] + "__broad_except",
            repo=ex["repo"],
            problem_statement=ex["problem_statement"],
            gold_patch=ex["gold_patch"],
            agent_patch='\n'.join(new_lines),
            agent_name="known-bad:broad-except",
            test_passed=True,
            base_commit=ex.get("base_commit", ""),
            difficulty=ex.get("difficulty", ""),
        )

    def _make_test_disable_variant(self, ex: dict) -> Optional[PatchExample]:
        """Create a patch that 'fixes' the issue by modifying tests."""
        instance_id = ex["instance_id"]
        repo = ex["repo"]

        # A synthetic diff that skips the failing test instead of fixing the bug
        patch = """diff --git a/tests/test_fix.py b/tests/test_fix.py
--- a/tests/test_fix.py
+++ b/tests/test_fix.py
@@ -1,4 +1,7 @@
+import pytest
+
 class TestFix:
-    def test_issue(self):
+    @pytest.mark.skip(reason="Known issue, will fix later")
+    def test_issue(self):
         # This test was failing
         assert True
"""
        return PatchExample(
            instance_id=instance_id + "__test_disable",
            repo=repo,
            problem_statement=ex["problem_statement"],
            gold_patch=ex["gold_patch"],
            agent_patch=patch,
            agent_name="known-bad:test-disable",
            test_passed=True,
            base_commit=ex.get("base_commit", ""),
            difficulty=ex.get("difficulty", ""),
        )


class PatchJudgeValidator:
    """Validates PatchJudge scoring against ground truth."""

    def __init__(self, merge_threshold: float = 50.0):
        """
        Args:
            merge_threshold: Score below which a patch is considered "not merge-worthy".
        """
        self.merge_threshold = merge_threshold

    def validate(
        self,
        examples: list[PatchExample],
        results: list[JudgeResult],
        known_bad_results: Optional[list[tuple[PatchExample, JudgeResult]]] = None,
    ) -> ValidationResult:
        """Run full validation suite.

        Args:
            examples: The patch examples that were judged.
            results: The corresponding judge results.
            known_bad_results: Optional list of (example, result) for known-bad patches.

        Returns:
            ValidationResult with all metrics.
        """
        assert len(examples) == len(results), "examples and results must match"

        vr = ValidationResult(total_examples=len(examples))

        scores = [r.merge_score for r in results]

        # Overall score distribution
        if scores:
            vr.score_mean = statistics.mean(scores)
            vr.score_std = statistics.stdev(scores) if len(scores) > 1 else 0.0
            vr.score_median = statistics.median(scores)

        # METR alignment: fraction of test-passing patches that still score
        # below the merge threshold (e.g. 40 of 80 -> 0.50)
        passed_scores = [
            r.merge_score
            for ex, r in zip(examples, results)
            if ex.test_passed
        ]
        if passed_scores:
            below_threshold = sum(1 for s in passed_scores if s < self.merge_threshold)
            vr.test_passing_below_50_pct = below_threshold / len(passed_scores)

        # Resolved vs unresolved separation (test_passed is used as the proxy
        # for "resolved" here)
        resolved_scores = [
            r.merge_score
            for ex, r in zip(examples, results)
            if ex.test_passed
        ]
        unresolved_scores = [
            r.merge_score
            for ex, r in zip(examples, results)
            if not ex.test_passed
        ]

        if resolved_scores:
            vr.mean_score_resolved = statistics.mean(resolved_scores)
        if unresolved_scores:
            vr.mean_score_unresolved = statistics.mean(unresolved_scores)

        if resolved_scores and unresolved_scores:
            # Not a true correlation: a standardized mean difference
            # (separation divided by the overall std), clamped to [-1, 1]
            diff = vr.mean_score_resolved - vr.mean_score_unresolved
            combined_std = statistics.stdev(scores) if len(scores) > 1 else 1.0
            vr.score_resolved_correlation = min(1.0, max(-1.0, diff / max(combined_std, 0.01)))

        # Known-bad pattern detection rate
        if known_bad_results:
            vr.known_bad_total = len(known_bad_results)
            vr.known_bad_detected = sum(
                1 for _, r in known_bad_results
                if r.merge_score < self.merge_threshold
            )
            vr.known_bad_detection_rate = (
                vr.known_bad_detected / vr.known_bad_total
                if vr.known_bad_total > 0 else 0.0
            )

        # Per-dimension statistics
        dim_scores = defaultdict(list)
        for r in results:
            for dim, data in r.dimension_scores.items():
                dim_scores[dim].append(data.get("score", 0))

        for dim, ds in dim_scores.items():
            if ds:
                vr.dimension_stats[dim] = {
                    "mean": round(statistics.mean(ds), 2),
                    "std": round(statistics.stdev(ds) if len(ds) > 1 else 0.0, 2),
                    "median": statistics.median(ds),
                    "min": min(ds),
                    "max": max(ds),
                }

        return vr

    def print_report(
        self,
        vr: ValidationResult,
        examples: list[PatchExample],
        results: list[JudgeResult],
    ) -> str:
        """Build a human-readable validation report and return it as a string."""
        lines = []
        lines.append("=" * 70)
        lines.append("  PatchJudge Validation Report")
        lines.append("=" * 70)

        lines.append(f"\n📊 Dataset: {vr.total_examples} examples")

        # Summary statistics
        lines.append("\n📈 Score Distribution:")
        lines.append(f"  Mean:   {vr.score_mean:.1f}")
        lines.append(f"  Median: {vr.score_median:.1f}")
        lines.append(f"  Std:    {vr.score_std:.1f}")

        # Histogram over 10-point buckets (scores of exactly 100 fall into 90-100)
        bins = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
        scores = [r.merge_score for r in results]
        hist = defaultdict(int)
        for s in scores:
            for i in range(len(bins) - 1):
                if bins[i] <= s < bins[i+1]:
                    hist[f"{bins[i]}-{bins[i+1]}"] += 1
                    break
            else:
                hist["90-100"] += 1

        lines.append("\n  Histogram:")
        for label in [f"{bins[i]}-{bins[i+1]}" for i in range(len(bins)-1)]:
            count = hist.get(label, 0)
            bar = "█" * count
            lines.append(f"  {label:>7}: {bar} ({count})")

        # METR alignment
        lines.append("\n🎯 METR Alignment:")
        lines.append(
            f"  Test-passing patches below {self.merge_threshold}: "
            f"{vr.test_passing_below_50_pct:.1%}"
        )
        metr_target = 0.50
        if abs(vr.test_passing_below_50_pct - metr_target) < 0.15:
            lines.append("  ✅ ALIGNED with METR finding (~50% not merge-worthy)")
        elif vr.test_passing_below_50_pct < metr_target - 0.15:
            lines.append("  ⚠️ Too lenient — scoring too many patches as merge-worthy")
        else:
            lines.append("  ⚠️ Too harsh — scoring too many patches as not merge-worthy")

        # Resolved vs unresolved separation
        lines.append("\n🔀 Resolved vs Unresolved Separation:")
        lines.append(f"  Mean score (resolved):   {vr.mean_score_resolved:.1f}")
        lines.append(f"  Mean score (unresolved): {vr.mean_score_unresolved:.1f}")
        lines.append(f"  Separation:  {vr.mean_score_resolved - vr.mean_score_unresolved:+.1f}")
        lines.append(f"  Correlation: {vr.score_resolved_correlation:.3f}")

        # Known-bad pattern detection
        if vr.known_bad_total > 0:
            lines.append("\n🚨 Known-Bad Pattern Detection:")
            lines.append(
                f"  Detected: {vr.known_bad_detected}/{vr.known_bad_total} "
                f"({vr.known_bad_detection_rate:.1%})"
            )
            if vr.known_bad_detection_rate >= 0.80:
                lines.append("  ✅ Good detection rate")
            else:
                lines.append("  ⚠️ Detection rate below 80% — judge may be too lenient")

        # Per-dimension statistics
        lines.append("\n📐 Per-Dimension Scores:")
        for dim in ["correctness", "completeness", "code_quality",
                    "non_regression_risk", "merge_readiness"]:
            stats = vr.dimension_stats.get(dim, {})
            if stats:
                lines.append(
                    f"  {dim:>25}: "
                    f"mean={stats['mean']:.1f} "
                    f"std={stats['std']:.1f} "
                    f"[{stats['min']}-{stats['max']}]"
                )

        # Flag frequency across all dimensions
        all_flags = defaultdict(int)
        for r in results:
            for dim, data in r.dimension_scores.items():
                for flag in data.get("flags", []):
                    if flag and flag != "JUDGE_ERROR":
                        all_flags[flag] += 1

        if all_flags:
            lines.append("\n🏴 Most Common Flags:")
            for flag, count in sorted(all_flags.items(), key=lambda x: -x[1])[:10]:
                lines.append(f"  {count:>4}x {flag}")

        # Best and worst patches by merge score
        scored = list(zip(examples, results))
        scored.sort(key=lambda x: x[1].merge_score, reverse=True)

        if len(scored) >= 3:
            lines.append("\n⭐ Top 3 Patches:")
            for ex, r in scored[:3]:
                lines.append(
                    f"  {r.merge_score:5.1f}  {ex.instance_id} "
                    f"({ex.agent_name}, {'PASS' if ex.test_passed else 'FAIL'})"
                )

            lines.append("\n💀 Bottom 3 Patches:")
            for ex, r in scored[-3:]:
                lines.append(
                    f"  {r.merge_score:5.1f}  {ex.instance_id} "
                    f"({ex.agent_name}, {'PASS' if ex.test_passed else 'FAIL'})"
                )

        lines.append("\n" + "=" * 70)

        return '\n'.join(lines)


def run_full_validation(
    examples: list[PatchExample],
    results: list[JudgeResult],
    gold_data: Optional[list[dict]] = None,
    judge=None,
) -> tuple[ValidationResult, str]:
    """Run the complete validation pipeline.

    Args:
        examples: Judged patch examples.
        results: Judge results for those examples.
        gold_data: Gold standard data for generating known-bad patches.
        judge: PatchJudge instance (needed if judging known-bad patches).

    Returns:
        (ValidationResult, report_string)
    """
    known_bad_results = None

    # Generate and judge known-bad patches when both gold data and a judge
    # are available.
    if gold_data and judge:
        logger.info("Generating known-bad patches...")
        bad_patches = KnownBadPatchGenerator.generate_all(gold_data)

        if bad_patches:
            logger.info(f"Judging {len(bad_patches)} known-bad patches...")
            bad_judge_results = judge.judge_batch(bad_patches, show_progress=True)
            known_bad_results = list(zip(bad_patches, bad_judge_results))

            # Log a few known-bad scores for quick inspection
            for bp, br in known_bad_results[:5]:
                logger.info(
                    f"  Known-bad [{bp.agent_name}] "
                    f"{bp.instance_id}: {br.merge_score:.1f}/100"
                )

    # Validate the main results and build the report
    validator = PatchJudgeValidator()
    vr = validator.validate(examples, results, known_bad_results)
    report = validator.print_report(vr, examples, results)

    return vr, report
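

if __name__ == "__main__":
    # Minimal smoke test for the known-bad generator, using illustrative data
    # only; full validation additionally needs judged results and a PatchJudge
    # instance, which are not constructed here.
    logging.basicConfig(level=logging.INFO)
    demo_gold = [{
        "instance_id": "demo__1",
        "repo": "example/demo",
        "problem_statement": "add() should return the sum of its arguments.",
        "gold_patch": (
            "diff --git a/demo.py b/demo.py\n"
            "--- a/demo.py\n"
            "+++ b/demo.py\n"
            "@@ -1,2 +1,2 @@\n"
            " def add(a, b):\n"
            "-    return 0\n"
            "+    return a + b\n"
        ),
        "base_commit": "0000000",
    }]
    for variant in KnownBadPatchGenerator.generate_all(demo_gold):
        print(f"{variant.agent_name:<25} {variant.instance_id}")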
|
|