| """Data loading pipeline for PatchJudge. |
| |
| Loads SWE-bench Verified gold patches and agent-generated patches from: |
| 1. HuggingFace datasets (AlexCuadron O1, CoderForge) |
| 2. SWE-bench S3 bucket (139 verified agent submissions) |
| """ |
|
|
| import json |
| import os |
| import re |
| import logging |
| from pathlib import Path |
| from typing import Optional |
| from collections import defaultdict |
|
|
| from datasets import load_dataset |
| from patchjudge.models import PatchExample |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class SWEBenchLoader: |
| """Loads SWE-bench Verified data and agent patches.""" |
| |
| def __init__(self, cache_dir: str = "data"): |
| self.cache_dir = Path(cache_dir) |
| self.cache_dir.mkdir(parents=True, exist_ok=True) |
| self._gold_data = None |
| |
| def load_gold_data(self) -> dict: |
| """Load SWE-bench Verified dataset. Returns {instance_id: row_dict}.""" |
| if self._gold_data is not None: |
| return self._gold_data |
| |
| logger.info("Loading SWE-bench Verified dataset...") |
| ds = load_dataset("princeton-nlp/SWE-bench_Verified", split="test") |
| self._gold_data = {} |
| for row in ds: |
| self._gold_data[row["instance_id"]] = { |
| "instance_id": row["instance_id"], |
| "repo": row["repo"], |
| "problem_statement": row["problem_statement"], |
| "gold_patch": row["patch"], |
| "base_commit": row["base_commit"], |
| "test_patch": row["test_patch"], |
| "difficulty": row.get("difficulty", ""), |
| "hints_text": row.get("hints_text", ""), |
| } |
| logger.info(f"Loaded {len(self._gold_data)} SWE-bench Verified instances") |
| return self._gold_data |
| |
| def load_coderforge_patches(self) -> list[PatchExample]: |
| """Load agent patches from CoderForge (Qwen3-Coder-32B, 500 instances).""" |
| logger.info("Loading CoderForge agent patches...") |
| ds = load_dataset( |
| "togethercomputer/CoderForge-Preview-32B-SWE-Bench-Verified-Evaluation-trajectories", |
| "trajectory", split="train" |
| ) |
| gold = self.load_gold_data() |
| examples = [] |
| |
| for row in ds: |
| |
| try: |
| ds_info = json.loads(row["ds"]) |
| instance_id = ds_info["instance_id"] |
| except (json.JSONDecodeError, KeyError): |
| |
| tid = row.get("trajectory_id", "") |
| instance_id = tid.rsplit("_run", 1)[0] if "_run" in tid else tid |
| |
| if instance_id not in gold: |
| continue |
| |
| agent_patch = row.get("output_patch", "") |
| if not agent_patch or agent_patch.strip() == "": |
| continue |
| |
| g = gold[instance_id] |
| ex = PatchExample( |
| instance_id=instance_id, |
| repo=g["repo"], |
| problem_statement=g["problem_statement"], |
| gold_patch=g["gold_patch"], |
| agent_patch=agent_patch, |
| agent_name="CoderForge-Qwen3-32B", |
| test_passed=row.get("reward", 0.0) == 1.0, |
| base_commit=g["base_commit"], |
| difficulty=g["difficulty"], |
| ) |
| examples.append(ex) |
| |
| logger.info(f"Loaded {len(examples)} CoderForge patches " |
| f"({sum(1 for e in examples if e.test_passed)} passed)") |
| return examples |
| |
| def load_o1_patches(self) -> list[PatchExample]: |
| """Load agent patches from OpenHands+O1 (500 instances).""" |
| logger.info("Loading OpenHands+O1 agent patches...") |
| ds = load_dataset( |
| "AlexCuadron/SWE-Bench-Verified-O1-native-tool-calling-reasoning-high-results", |
| split="test" |
| ) |
| gold = self.load_gold_data() |
| examples = [] |
| |
| for row in ds: |
| issue_name = row.get("issue_name", "") |
| |
| instance_id = issue_name |
| |
| if instance_id not in gold: |
| continue |
| |
| agent_patch = row.get("patch", "") |
| if not agent_patch or agent_patch.strip() == "": |
| continue |
| |
| g = gold[instance_id] |
| ex = PatchExample( |
| instance_id=instance_id, |
| repo=g["repo"], |
| problem_statement=g["problem_statement"], |
| gold_patch=g["gold_patch"], |
| agent_patch=agent_patch, |
| agent_name="OpenHands-O1-reasoning-high", |
| test_passed=row.get("resolved", False), |
| base_commit=g["base_commit"], |
| difficulty=g["difficulty"], |
| ) |
| examples.append(ex) |
| |
| logger.info(f"Loaded {len(examples)} O1 patches " |
| f"({sum(1 for e in examples if e.test_passed)} passed)") |
| return examples |
| |
| def load_s3_agent_patches( |
| self, |
| agents: list[str] = None, |
| max_per_agent: int = 500, |
| ) -> list[PatchExample]: |
| """Load agent patches from SWE-bench S3 bucket. |
| |
| Args: |
| agents: List of agent directory names in S3. Defaults to a curated set. |
| max_per_agent: Max patches per agent. |
| """ |
| try: |
| import boto3 |
| from botocore import UNSIGNED |
| from botocore.config import Config |
| import requests |
| except ImportError: |
| logger.warning("boto3 not available, skipping S3 patches") |
| return [] |
| |
| if agents is None: |
| agents = [ |
| "20250225_sweagent_claude-3-7-sonnet", |
| "20241029_OpenHands-CodeAct-2.1-sonnet-20241022", |
| "20241028_agentless-1.5_gpt4o", |
| "20241108_autocoderover-v2.0-claude-3-5-sonnet-20241022", |
| "20240620_sweagent_claude3.5sonnet", |
| ] |
| |
| gold = self.load_gold_data() |
| s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED)) |
| BUCKET = 'swe-bench-submissions' |
| examples = [] |
| |
| for agent_dir in agents: |
| logger.info(f"Loading patches from S3 agent: {agent_dir}") |
| |
| |
| resolved_ids = set() |
| try: |
| url = ( |
| f"https://raw.githubusercontent.com/SWE-bench/experiments/" |
| f"main/evaluation/verified/{agent_dir}/results/results.json" |
| ) |
| import requests |
| r = requests.get(url, timeout=10) |
| if r.status_code == 200: |
| resolved_ids = set(r.json().get("resolved", [])) |
| logger.info(f" {agent_dir}: {len(resolved_ids)} resolved") |
| except Exception as e: |
| logger.warning(f" Could not load resolve labels for {agent_dir}: {e}") |
| |
| |
| paginator = s3.get_paginator('list_objects_v2') |
| count = 0 |
| try: |
| for page in paginator.paginate( |
| Bucket=BUCKET, |
| Prefix=f'verified/{agent_dir}/logs/', |
| Delimiter='/' |
| ): |
| for prefix_info in page.get('CommonPrefixes', []): |
| if count >= max_per_agent: |
| break |
| |
| prefix = prefix_info['Prefix'] |
| instance_id = prefix.rstrip('/').split('/')[-1] |
| |
| if instance_id not in gold: |
| continue |
| |
| |
| try: |
| obj = s3.get_object( |
| Bucket=BUCKET, |
| Key=f'verified/{agent_dir}/logs/{instance_id}/patch.diff' |
| ) |
| agent_patch = obj['Body'].read().decode('utf-8') |
| except Exception: |
| continue |
| |
| if not agent_patch.strip(): |
| continue |
| |
| g = gold[instance_id] |
| ex = PatchExample( |
| instance_id=instance_id, |
| repo=g["repo"], |
| problem_statement=g["problem_statement"], |
| gold_patch=g["gold_patch"], |
| agent_patch=agent_patch, |
| agent_name=agent_dir, |
| test_passed=instance_id in resolved_ids, |
| base_commit=g["base_commit"], |
| difficulty=g["difficulty"], |
| ) |
| examples.append(ex) |
| count += 1 |
| except Exception as e: |
| logger.warning(f" Error loading from S3 for {agent_dir}: {e}") |
| |
| logger.info(f" Loaded {count} patches from {agent_dir}") |
| |
| logger.info(f"Total S3 patches: {len(examples)} " |
| f"({sum(1 for e in examples if e.test_passed)} passed)") |
| return examples |
| |
| def build_dataset( |
| self, |
| sources: list[str] = None, |
| min_examples: int = 100, |
| include_repo_context: bool = False, |
| s3_agents: list[str] = None, |
| ) -> list[PatchExample]: |
| """Build the unified PatchExample dataset from multiple sources. |
| |
| Args: |
| sources: List of sources to use. Options: 'coderforge', 'o1', 's3'. |
| Defaults to ['coderforge', 'o1']. |
| min_examples: Minimum examples to collect. |
| include_repo_context: If True, attempt to clone repos and gather context. |
| s3_agents: Agent list for S3 source. |
| """ |
| if sources is None: |
| sources = ["coderforge", "o1"] |
| |
| all_examples = [] |
| |
| if "coderforge" in sources: |
| all_examples.extend(self.load_coderforge_patches()) |
| |
| if "o1" in sources: |
| all_examples.extend(self.load_o1_patches()) |
| |
| if "s3" in sources: |
| all_examples.extend(self.load_s3_agent_patches(agents=s3_agents)) |
| |
| |
| seen = set() |
| unique = [] |
| for ex in all_examples: |
| key = (ex.instance_id, ex.agent_name) |
| if key not in seen: |
| seen.add(key) |
| unique.append(ex) |
| |
| logger.info(f"Total unique examples: {len(unique)} " |
| f"(passed: {sum(1 for e in unique if e.test_passed)}, " |
| f"failed: {sum(1 for e in unique if not e.test_passed)})") |
| |
| if len(unique) < min_examples: |
| logger.warning( |
| f"Only {len(unique)} examples collected, " |
| f"below minimum of {min_examples}. " |
| f"Consider adding more sources." |
| ) |
| |
| return unique |
| |
| def save_dataset(self, examples: list[PatchExample], filename: str = "patch_examples.jsonl"): |
| """Save examples to JSONL.""" |
| path = self.cache_dir / filename |
| with open(path, 'w') as f: |
| for ex in examples: |
| f.write(json.dumps(ex.to_dict()) + "\n") |
| logger.info(f"Saved {len(examples)} examples to {path}") |
| return path |
| |
| def load_saved_dataset(self, filename: str = "patch_examples.jsonl") -> list[PatchExample]: |
| """Load previously saved examples.""" |
| path = self.cache_dir / filename |
| examples = [] |
| with open(path) as f: |
| for line in f: |
| if line.strip(): |
| examples.append(PatchExample.from_dict(json.loads(line))) |
| logger.info(f"Loaded {len(examples)} examples from {path}") |
| return examples |
|
|
|
|
| def extract_repo_context_from_diff(diff: str) -> list[str]: |
| """Extract filenames mentioned in a diff.""" |
| files = [] |
| for line in diff.split('\n'): |
| if line.startswith('diff --git'): |
| |
| match = re.search(r'b/(.+)$', line) |
| if match: |
| files.append(match.group(1)) |
| elif line.startswith('---') and not line.startswith('--- /dev/null'): |
| match = re.search(r'a/(.+)$', line) |
| if match: |
| files.append(match.group(1)) |
| return list(set(files)) |
|
|
|
|
| def get_diff_stats(diff: str) -> dict: |
| """Get basic stats from a unified diff.""" |
| lines = diff.split('\n') |
| added = sum(1 for l in lines if l.startswith('+') and not l.startswith('+++')) |
| removed = sum(1 for l in lines if l.startswith('-') and not l.startswith('---')) |
| files = len(extract_repo_context_from_diff(diff)) |
| hunks = sum(1 for l in lines if l.startswith('@@')) |
| return { |
| "lines_added": added, |
| "lines_removed": removed, |
| "files_changed": files, |
| "hunks": hunks, |
| } |
|
|
|
|
| if __name__ == "__main__": |
| logging.basicConfig(level=logging.INFO) |
| loader = SWEBenchLoader() |
| |
| |
| examples = loader.build_dataset(sources=["coderforge", "o1"]) |
| |
| |
| passed = sum(1 for e in examples if e.test_passed) |
| failed = len(examples) - passed |
| repos = set(e.repo for e in examples) |
| agents = set(e.agent_name for e in examples) |
| |
| print(f"\n{'='*60}") |
| print(f"PatchJudge Dataset Summary") |
| print(f"{'='*60}") |
| print(f"Total examples: {len(examples)}") |
| print(f" Test passed: {passed}") |
| print(f" Test failed: {failed}") |
| print(f"Unique instances: {len(set(e.instance_id for e in examples))}") |
| print(f"Unique repos: {len(repos)}") |
| print(f"Agent sources: {agents}") |
| print(f"\nDifficulty distribution:") |
| |
| diff_counts = defaultdict(int) |
| for e in examples: |
| diff_counts[e.difficulty] += 1 |
| for d, c in sorted(diff_counts.items()): |
| print(f" {d}: {c}") |
| |
| |
| path = loader.save_dataset(examples) |
| print(f"\nSaved to: {path}") |
|
|