"""Validation module for PatchJudge. Validates that PatchJudge scores correlate with actual code quality: 1. METR alignment: ~50% of test-passing patches should score below 50 2. Known-bad pattern detection: deliberately bad patches should score low 3. Score distribution analysis 4. Resolved vs unresolved separation """ import json import logging import statistics from collections import defaultdict from typing import Optional from patchjudge.models import ( PatchExample, PatchFeatures, JudgeResult, ValidationResult ) logger = logging.getLogger(__name__) # ============================================================================ # Known-Bad Patch Generator # ============================================================================ class KnownBadPatchGenerator: """Generate deliberately bad patches that pass tests but are low quality.""" @staticmethod def generate_all(gold_examples: list[dict]) -> list[PatchExample]: """Generate known-bad variants for a set of gold examples. Args: gold_examples: List of dicts with keys: instance_id, repo, problem_statement, gold_patch, base_commit Returns: List of PatchExample with known-bad patches. """ bad_patches = [] generator = KnownBadPatchGenerator() for ex in gold_examples[:50]: # Generate from up to 50 examples variants = generator._generate_variants(ex) bad_patches.extend(variants) logger.info(f"Generated {len(bad_patches)} known-bad patch variants") return bad_patches def _generate_variants(self, ex: dict) -> list[PatchExample]: """Generate known-bad variants of a gold patch.""" variants = [] gold = ex.get("gold_patch", "") if not gold: return variants # Variant 1: Hardcoded return values hardcoded = self._make_hardcoded_variant(ex) if hardcoded: variants.append(hardcoded) # Variant 2: Pass statement (minimal no-op) noop = self._make_noop_variant(ex) if noop: variants.append(noop) # Variant 3: Comment-only patch comment_only = self._make_comment_only_variant(ex) if comment_only: variants.append(comment_only) # Variant 4: Overly broad try/except broad_except = self._make_broad_except_variant(ex) if broad_except: variants.append(broad_except) # Variant 5: Test-disabling patch test_disable = self._make_test_disable_variant(ex) if test_disable: variants.append(test_disable) return variants def _make_hardcoded_variant(self, ex: dict) -> Optional[PatchExample]: """Create a patch with hardcoded return values.""" gold = ex["gold_patch"] lines = gold.split('\n') # Find added lines with return statements and hardcode them new_lines = [] modified = False for line in lines: if line.startswith('+') and not line.startswith('+++'): content = line[1:] if 'return' in content and not modified: # Replace with hardcoded value indent = len(content) - len(content.lstrip()) new_lines.append('+' + ' ' * indent + 'return True # HARDCODED') modified = True continue new_lines.append(line) if not modified: return None return PatchExample( instance_id=ex["instance_id"] + "__hardcoded", repo=ex["repo"], problem_statement=ex["problem_statement"], gold_patch=ex["gold_patch"], agent_patch='\n'.join(new_lines), agent_name="known-bad:hardcoded", test_passed=True, # Assume it passes (test oracle weakness) base_commit=ex.get("base_commit", ""), difficulty=ex.get("difficulty", ""), ) def _make_noop_variant(self, ex: dict) -> Optional[PatchExample]: """Create a minimal no-op patch (just adds 'pass').""" gold = ex["gold_patch"] lines = gold.split('\n') # Find the first hunk and replace all added lines with 'pass' new_lines = [] in_hunk = False added_pass = False for line in lines: 
    def _make_noop_variant(self, ex: dict) -> Optional[PatchExample]:
        """Create a minimal no-op patch (just adds 'pass')."""
        gold = ex["gold_patch"]
        lines = gold.split('\n')

        # Find the first hunk and replace all added lines with 'pass'
        new_lines = []
        in_hunk = False
        added_pass = False
        for line in lines:
            if line.startswith('@@'):
                in_hunk = True
                new_lines.append(line)
                continue
            if in_hunk:
                if line.startswith('+') and not line.startswith('+++'):
                    if not added_pass:
                        content = line[1:]
                        indent = len(content) - len(content.lstrip())
                        new_lines.append('+' + ' ' * indent + 'pass  # TODO: implement')
                        added_pass = True
                    # Skip other added lines
                    continue
                elif line.startswith('-') and not line.startswith('---'):
                    new_lines.append(line)
                else:
                    new_lines.append(line)
            else:
                new_lines.append(line)

        if not added_pass:
            return None

        return PatchExample(
            instance_id=ex["instance_id"] + "__noop",
            repo=ex["repo"],
            problem_statement=ex["problem_statement"],
            gold_patch=ex["gold_patch"],
            agent_patch='\n'.join(new_lines),
            agent_name="known-bad:noop",
            test_passed=False,
            base_commit=ex.get("base_commit", ""),
            difficulty=ex.get("difficulty", ""),
        )

    def _make_comment_only_variant(self, ex: dict) -> Optional[PatchExample]:
        """Create a patch that only adds comments, no real code changes."""
        gold = ex["gold_patch"]
        lines = gold.split('\n')

        new_lines = []
        modified = False
        for line in lines:
            if line.startswith('+') and not line.startswith('+++'):
                content = line[1:]
                indent = len(content) - len(content.lstrip())
                # Replace real code with a comment
                new_lines.append('+' + ' ' * indent + '# Fixed: ' + content.strip())
                modified = True
            elif line.startswith('-') and not line.startswith('---'):
                # Keep the removal but don't add real replacement
                new_lines.append(line)
            else:
                new_lines.append(line)

        if not modified:
            return None

        return PatchExample(
            instance_id=ex["instance_id"] + "__comment_only",
            repo=ex["repo"],
            problem_statement=ex["problem_statement"],
            gold_patch=ex["gold_patch"],
            agent_patch='\n'.join(new_lines),
            agent_name="known-bad:comment-only",
            test_passed=False,
            base_commit=ex.get("base_commit", ""),
            difficulty=ex.get("difficulty", ""),
        )

    def _make_broad_except_variant(self, ex: dict) -> Optional[PatchExample]:
        """Create a patch that wraps everything in a broad try/except."""
        gold = ex["gold_patch"]
        lines = gold.split('\n')

        # Find added lines and wrap them in try/except
        new_lines = []
        added_lines = []
        min_indent = 999
        for line in lines:
            if line.startswith('+') and not line.startswith('+++'):
                content = line[1:]
                if content.strip():
                    indent = len(content) - len(content.lstrip())
                    min_indent = min(min_indent, indent)
                    added_lines.append(content)

        if not added_lines or min_indent == 999:
            return None

        indent_str = ' ' * min_indent

        # Reconstruct diff with try/except wrapper
        for line in lines:
            if line.startswith('+') and not line.startswith('+++'):
                continue  # Skip original added lines
            elif line.startswith('@@') and added_lines:
                new_lines.append(line)
                # Add our try/except version
                new_lines.append(f'+{indent_str}try:')
                for al in added_lines:
                    new_lines.append(f'+    {al}')
                new_lines.append(f'+{indent_str}except Exception:')
                new_lines.append(f'+{indent_str}    pass  # Silently ignore all errors')
                added_lines = []  # Only do this once
                continue
            new_lines.append(line)

        return PatchExample(
            instance_id=ex["instance_id"] + "__broad_except",
            repo=ex["repo"],
            problem_statement=ex["problem_statement"],
            gold_patch=ex["gold_patch"],
            agent_patch='\n'.join(new_lines),
            agent_name="known-bad:broad-except",
            test_passed=True,
            base_commit=ex.get("base_commit", ""),
            difficulty=ex.get("difficulty", ""),
        )
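    # Illustrative output shape (hypothetical hunk): the gold patch's added
    # lines, e.g.
    #     +    cache.invalidate(key)
    # are re-emitted under the first hunk header as
    #     +    try:
    #     +        cache.invalidate(key)
    #     +    except Exception:
    #     +        pass  # Silently ignore all errors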
    def _make_test_disable_variant(self, ex: dict) -> Optional[PatchExample]:
        """Create a patch that 'fixes' the issue by modifying tests."""
        instance_id = ex["instance_id"]
        repo = ex["repo"]

        # Create a patch that adds pytest.mark.skip to failing tests
        patch = """diff --git a/tests/test_fix.py b/tests/test_fix.py
--- a/tests/test_fix.py
+++ b/tests/test_fix.py
@@ -1,5 +1,7 @@
+import pytest
+
 class TestFix:
-    def test_issue(self):
+    @pytest.mark.skip(reason="Known issue, will fix later")
+    def test_issue(self):
         # This test was failing
         assert True
"""
        return PatchExample(
            instance_id=instance_id + "__test_disable",
            repo=repo,
            problem_statement=ex["problem_statement"],
            gold_patch=ex["gold_patch"],
            agent_patch=patch,
            agent_name="known-bad:test-disable",
            test_passed=True,
            base_commit=ex.get("base_commit", ""),
            difficulty=ex.get("difficulty", ""),
        )
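
# Illustrative usage (sketch: the file name and JSONL layout are assumptions,
# not part of this module; any iterable of dicts with the keys documented in
# generate_all() will do):
#
#     with open("gold_examples.jsonl") as f:
#         gold = [json.loads(line) for line in f]
#     bad_patches = KnownBadPatchGenerator.generate_all(gold)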
# ============================================================================
# Validator
# ============================================================================

class PatchJudgeValidator:
    """Validates PatchJudge scoring against ground truth."""

    def __init__(self, merge_threshold: float = 50.0):
        """
        Args:
            merge_threshold: Score below which a patch is considered
                "not merge-worthy".
        """
        self.merge_threshold = merge_threshold

    def validate(
        self,
        examples: list[PatchExample],
        results: list[JudgeResult],
        known_bad_results: Optional[list[tuple[PatchExample, JudgeResult]]] = None,
    ) -> ValidationResult:
        """Run full validation suite.

        Args:
            examples: The patch examples that were judged.
            results: The corresponding judge results.
            known_bad_results: Optional list of (example, result) for
                known-bad patches.

        Returns:
            ValidationResult with all metrics.
        """
        assert len(examples) == len(results), "examples and results must match"

        vr = ValidationResult(total_examples=len(examples))
        scores = [r.merge_score for r in results]

        # --- Score distribution ---
        if scores:
            vr.score_mean = statistics.mean(scores)
            vr.score_std = statistics.stdev(scores) if len(scores) > 1 else 0.0
            vr.score_median = statistics.median(scores)

        # --- METR alignment ---
        # Among test-passing patches, what fraction scores below threshold?
        passed_scores = [
            r.merge_score for ex, r in zip(examples, results) if ex.test_passed
        ]
        if passed_scores:
            below_threshold = sum(1 for s in passed_scores if s < self.merge_threshold)
            vr.test_passing_below_50_pct = below_threshold / len(passed_scores)

        # --- Resolved vs Unresolved separation ---
        # "Resolved" here means test-passing, so this is the same population
        # as passed_scores above.
        resolved_scores = passed_scores
        unresolved_scores = [
            r.merge_score for ex, r in zip(examples, results) if not ex.test_passed
        ]
        if resolved_scores:
            vr.mean_score_resolved = statistics.mean(resolved_scores)
        if unresolved_scores:
            vr.mean_score_unresolved = statistics.mean(unresolved_scores)

        # Basic correlation: difference between resolved and unresolved means
        if resolved_scores and unresolved_scores:
            # Point-biserial-ish: just use the difference, normalized
            diff = vr.mean_score_resolved - vr.mean_score_unresolved
            combined_std = statistics.stdev(scores) if len(scores) > 1 else 1.0
            vr.score_resolved_correlation = min(1.0, max(-1.0, diff / max(combined_std, 0.01)))

        # --- Known-bad detection ---
        if known_bad_results:
            vr.known_bad_total = len(known_bad_results)
            vr.known_bad_detected = sum(
                1 for _, r in known_bad_results if r.merge_score < self.merge_threshold
            )
            vr.known_bad_detection_rate = (
                vr.known_bad_detected / vr.known_bad_total
                if vr.known_bad_total > 0 else 0.0
            )

        # --- Per-dimension stats ---
        dim_scores = defaultdict(list)
        for r in results:
            for dim, data in r.dimension_scores.items():
                dim_scores[dim].append(data.get("score", 0))
        for dim, ds in dim_scores.items():
            if ds:
                vr.dimension_stats[dim] = {
                    "mean": round(statistics.mean(ds), 2),
                    "std": round(statistics.stdev(ds) if len(ds) > 1 else 0.0, 2),
                    "median": statistics.median(ds),
                    "min": min(ds),
                    "max": max(ds),
                }

        return vr
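    # Illustrative arithmetic for the separation heuristic above (numbers are
    # hypothetical): with mean_score_resolved = 62.0, mean_score_unresolved
    # = 48.0, and combined_std = 20.0, the reported value is
    # clamp((62.0 - 48.0) / 20.0) = 0.70. This is a crude, point-biserial-like
    # proxy, not a true correlation coefficient.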
    def print_report(
        self,
        vr: ValidationResult,
        examples: list[PatchExample],
        results: list[JudgeResult],
    ) -> str:
        """Generate a human-readable validation report."""
        lines = []
        lines.append("=" * 70)
        lines.append(" PatchJudge Validation Report")
        lines.append("=" * 70)
        lines.append(f"\nšŸ“Š Dataset: {vr.total_examples} examples")

        # Score distribution
        lines.append("\nšŸ“ˆ Score Distribution:")
        lines.append(f"   Mean:   {vr.score_mean:.1f}")
        lines.append(f"   Median: {vr.score_median:.1f}")
        lines.append(f"   Std:    {vr.score_std:.1f}")

        # Score histogram
        bins = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
        scores = [r.merge_score for r in results]
        hist = defaultdict(int)
        for s in scores:
            for i in range(len(bins) - 1):
                if bins[i] <= s < bins[i+1]:
                    hist[f"{bins[i]}-{bins[i+1]}"] += 1
                    break
            else:
                hist["90-100"] += 1

        lines.append("\n   Histogram:")
        for label in [f"{bins[i]}-{bins[i+1]}" for i in range(len(bins)-1)]:
            count = hist.get(label, 0)
            bar = "ā–ˆ" * count
            lines.append(f"   {label:>7}: {bar} ({count})")

        # METR alignment
        lines.append("\nšŸŽÆ METR Alignment:")
        lines.append(
            f"   Test-passing patches below {self.merge_threshold}: "
            f"{vr.test_passing_below_50_pct:.1%}"
        )
        metr_target = 0.50
        if abs(vr.test_passing_below_50_pct - metr_target) < 0.15:
            lines.append("   āœ… ALIGNED with METR finding (~50% not merge-worthy)")
        elif vr.test_passing_below_50_pct < metr_target - 0.15:
            lines.append("   āš ļø  Too lenient: scoring too many patches as merge-worthy")
        else:
            lines.append("   āš ļø  Too harsh: scoring too many patches as not merge-worthy")

        # Resolved vs Unresolved
        lines.append("\nšŸ”€ Resolved vs Unresolved Separation:")
        lines.append(f"   Mean score (resolved):   {vr.mean_score_resolved:.1f}")
        lines.append(f"   Mean score (unresolved): {vr.mean_score_unresolved:.1f}")
        lines.append(f"   Separation: {vr.mean_score_resolved - vr.mean_score_unresolved:+.1f}")
        lines.append(f"   Correlation: {vr.score_resolved_correlation:.3f}")

        # Known-bad detection
        if vr.known_bad_total > 0:
            lines.append("\n🚨 Known-Bad Pattern Detection:")
            lines.append(
                f"   Detected: {vr.known_bad_detected}/{vr.known_bad_total} "
                f"({vr.known_bad_detection_rate:.1%})"
            )
            if vr.known_bad_detection_rate >= 0.80:
                lines.append("   āœ… Good detection rate")
            else:
                lines.append("   āš ļø  Detection rate below 80%: judge may be too lenient")

        # Per-dimension stats
        lines.append("\nšŸ“ Per-Dimension Scores:")
        for dim in ["correctness", "completeness", "code_quality",
                    "non_regression_risk", "merge_readiness"]:
            stats = vr.dimension_stats.get(dim, {})
            if stats:
                lines.append(
                    f"   {dim:>25}: "
                    f"mean={stats['mean']:.1f} "
                    f"std={stats['std']:.1f} "
                    f"[{stats['min']}-{stats['max']}]"
                )

        # Top flags
        all_flags = defaultdict(int)
        for r in results:
            for dim, data in r.dimension_scores.items():
                for flag in data.get("flags", []):
                    if flag and flag != "JUDGE_ERROR":
                        all_flags[flag] += 1
        if all_flags:
            lines.append("\nšŸ“ Most Common Flags:")
            for flag, count in sorted(all_flags.items(), key=lambda x: -x[1])[:10]:
                lines.append(f"   {count:>4}x {flag}")

        # Example best/worst
        scored = list(zip(examples, results))
        scored.sort(key=lambda x: x[1].merge_score, reverse=True)
        if len(scored) >= 3:
            lines.append("\n⭐ Top 3 Patches:")
            for ex, r in scored[:3]:
                lines.append(
                    f"   {r.merge_score:5.1f}  {ex.instance_id} "
                    f"({ex.agent_name}, {'PASS' if ex.test_passed else 'FAIL'})"
                )
            lines.append("\nšŸ’€ Bottom 3 Patches:")
            for ex, r in scored[-3:]:
                lines.append(
                    f"   {r.merge_score:5.1f}  {ex.instance_id} "
                    f"({ex.agent_name}, {'PASS' if ex.test_passed else 'FAIL'})"
                )

        lines.append("\n" + "=" * 70)
        report = '\n'.join(lines)
        return report


def run_full_validation(
    examples: list[PatchExample],
    results: list[JudgeResult],
    gold_data: Optional[list[dict]] = None,
    judge=None,
) -> tuple[ValidationResult, str]:
    """Run the complete validation pipeline.

    Args:
        examples: Judged patch examples.
        results: Judge results for those examples.
        gold_data: Gold standard data for generating known-bad patches.
        judge: PatchJudge instance (needed if judging known-bad patches).

    Returns:
        (ValidationResult, report_string)
    """
    known_bad_results = None

    # Generate and judge known-bad patches if we have gold data and a judge
    if gold_data and judge:
        logger.info("Generating known-bad patches...")
        bad_patches = KnownBadPatchGenerator.generate_all(gold_data)
        if bad_patches:
            logger.info(f"Judging {len(bad_patches)} known-bad patches...")
            bad_judge_results = judge.judge_batch(bad_patches, show_progress=True)
            known_bad_results = list(zip(bad_patches, bad_judge_results))

            # Print known-bad summary
            for bp, br in known_bad_results[:5]:
                logger.info(
                    f"  Known-bad [{bp.agent_name}] "
                    f"{bp.instance_id}: {br.merge_score:.1f}/100"
                )

    # Run validation
    validator = PatchJudgeValidator()
    vr = validator.validate(examples, results, known_bad_results)
    report = validator.print_report(vr, examples, results)

    return vr, report
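
# ============================================================================
# Illustrative entry point (sketch). The judging pipeline that produces real
# PatchExample/JudgeResult objects lives elsewhere; the empty placeholders
# below only show how the pieces plug together.
# ============================================================================

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Placeholders: in practice these come from the judging pipeline
    # (e.g. a PatchJudge judging a SWE-bench-style dataset).
    examples: list[PatchExample] = []
    results: list[JudgeResult] = []

    vr, report = run_full_validation(examples, results)
    print(report)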