# repomind-api / scripts / run_baseline.py
# Initial commit dc71cad (SouravNath)
"""
scripts/run_baseline.py
───────────────────────
Phase 1 evaluation script: run the naive GPT-4o baseline on SWE-bench Lite.

Usage:
    python scripts/run_baseline.py --max-instances 10 --output-dir results/baseline

This script:
  1. Loads SWE-bench Lite instances
  2. Clones each repo at base_commit
  3. Generates a patch with the naive GPT-4o agent
  4. Applies the patch and runs tests in the sandbox
  5. Aggregates and logs results to MLflow
  6. Prints a rich summary table

Expected output (baseline): ~10–18% resolved on SWE-bench Lite
"""
from __future__ import annotations
import argparse
import logging
import sys
import tempfile
import time
from pathlib import Path
# Make sure project root is on the path
sys.path.insert(0, str(Path(__file__).parent.parent))
import mlflow
import structlog
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from configs.settings import settings
from swe_bench.loader import load_swebench_lite, SWEInstance
from swe_bench.evaluator import (
aggregate_results,
save_results,
InstanceResult,
AttemptResult,
)
from sandbox.executor import SandboxExecutor
from agent.naive_baseline import NaiveBaselineAgent, log_baseline_attempt
# Shared Rich console used for this script's terminal output.
console = Console()

# ── Structured logging setup ──────────────────────────────────────────────────
# Processor chain: stamp each event with an HH:MM:SS time, then hand it to
# structlog's development console renderer.
_LOG_PROCESSORS = [
    structlog.processors.TimeStamper(fmt="%H:%M:%S"),
    structlog.dev.ConsoleRenderer(),
]

structlog.configure(
    processors=_LOG_PROCESSORS,
    wrapper_class=structlog.BoundLogger,
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)

# Module-wide logger; call sites attach context as keyword arguments.
logger = structlog.get_logger()
def _failed_instance_result(
    instance: SWEInstance, error: str, start: float
) -> InstanceResult:
    """Build the unresolved result for an instance that failed before testing."""
    return InstanceResult(
        instance_id=instance.instance_id,
        repo=instance.repo,
        resolved=False,
        attempts=[],
        total_attempts=1,
        error=error,
        total_elapsed=time.monotonic() - start,
    )


def _classify_failure(resolved: bool, patch_applied: bool, ftp_results: dict) -> str:
    """Map one attempt's outcome to the failure-category label that gets logged."""
    if resolved:
        return "success"
    if not patch_applied:
        return "syntax_error"
    # At least one FAIL_TO_PASS test still has a falsy (not passing) result.
    if any(not passed for passed in ftp_results.values()):
        return "wrong_file_edit"
    return "unknown"


def run_instance(
    instance: SWEInstance,
    agent: NaiveBaselineAgent,
    sandbox: SandboxExecutor,
    workspace_root: Path,
) -> InstanceResult:
    """
    Run the baseline agent on a single SWE-bench instance.

    Steps:
      1. Clone repo at base_commit
      2. Generate a patch with the agent
      3. Apply the patch
      4. Run the fail_to_pass + pass_to_pass tests
      5. Record the attempt and return an InstanceResult

    Args:
        instance: The SWE-bench instance to evaluate.
        agent: Agent whose ``generate_patch`` returns ``(patch_text, usage)``.
        sandbox: Executor used to clone the repo, apply the patch and run tests.
        workspace_root: Directory under which the per-instance checkout is created.

    Returns:
        An ``InstanceResult``. When cloning or patch generation fails, the
        result is unresolved, has no attempts, and carries a truncated error
        message; otherwise it holds exactly one ``AttemptResult``.
    """
    # NOTE(review): instances sharing repo_name and the 8-char commit prefix map
    # to the same directory — confirm clone_repo tolerates a reused workspace.
    workspace_dir = workspace_root / instance.repo_name / instance.base_commit[:8]
    workspace_dir.mkdir(parents=True, exist_ok=True)

    start = time.monotonic()
    logger.info("Processing instance", instance_id=instance.instance_id, repo=instance.repo)

    # ── Step 1: Clone repo ────────────────────────────────────────────────
    clone_result = sandbox.clone_repo(instance.repo, instance.base_commit, workspace_dir)
    if not clone_result.success:
        logger.error("Clone failed", instance_id=instance.instance_id)
        return _failed_instance_result(
            instance, f"Clone failed: {clone_result.stderr[:200]}", start
        )

    # ── Step 2: Generate patch ────────────────────────────────────────────
    try:
        patch_text, usage = agent.generate_patch(
            problem_statement=instance.problem_statement,
            repo=instance.repo,
            base_commit=instance.base_commit,
            workspace_dir=workspace_dir,
        )
    except Exception as e:
        # Any agent failure is recorded as a failed instance, not propagated.
        logger.error("Patch generation failed", instance_id=instance.instance_id, error=str(e))
        return _failed_instance_result(instance, f"LLM error: {str(e)[:200]}", start)

    total_tokens = usage.get("total_tokens", 0)

    # ── Step 3: Apply patch ───────────────────────────────────────────────
    apply_result = sandbox.apply_patch(patch_text, workspace_dir)
    if not apply_result.success:
        # Still run tests to measure — the patch may have partially applied.
        logger.warning(
            "Patch apply failed",
            instance_id=instance.instance_id,
            stderr=apply_result.stderr[:200],
        )

    # ── Step 4: Run tests ─────────────────────────────────────────────────
    all_test_ids = instance.fail_to_pass + instance.pass_to_pass
    test_result = sandbox.run_tests(workspace_dir, all_test_ids)
    resolved, ftp_results, ptp_results = test_result.check_tests(
        instance.fail_to_pass, instance.pass_to_pass
    )
    failure_category = _classify_failure(resolved, apply_result.success, ftp_results)

    elapsed = time.monotonic() - start
    attempt = AttemptResult(
        attempt_num=1,
        patch=patch_text,
        test_stdout=test_result.raw_output,
        fail_to_pass_results=ftp_results,
        pass_to_pass_results=ptp_results,
        resolved=resolved,
        failure_category=failure_category,
        elapsed_seconds=elapsed,
        token_cost=usage,
    )

    # ── Log to MLflow ─────────────────────────────────────────────────────
    log_baseline_attempt(
        instance_id=instance.instance_id,
        resolved=resolved,
        usage=usage,
        elapsed=elapsed,
        failure_category=failure_category,
        attempt=1,
    )

    logger.info(
        "Instance done",
        instance_id=instance.instance_id,
        resolved=resolved,
        tokens=total_tokens,
        elapsed=round(elapsed, 1),
    )
    return InstanceResult(
        instance_id=instance.instance_id,
        repo=instance.repo,
        resolved=resolved,
        attempts=[attempt],
        total_attempts=1,
        total_tokens=total_tokens,
        total_elapsed=elapsed,
    )
def main() -> None:
    """
    Command-line entry point: evaluate the naive baseline on SWE-bench Lite.

    Parses the CLI flags, loads the selected instances, runs ``run_instance``
    on each one inside a single MLflow run, then aggregates, saves and prints
    the report.

    An exception escaping ``run_instance`` is logged and recorded as an
    unresolved ``InstanceResult`` so that one crashing instance does not abort
    the batch and discard the results collected so far.
    """
    # Local import: only the progress label below needs it.
    from rich.markup import escape

    parser = argparse.ArgumentParser(
        description="Run naive GPT-4o baseline on SWE-bench Lite"
    )
    parser.add_argument(
        "--max-instances", type=int, default=None,
        help="Limit number of instances (default: all 300)"
    )
    parser.add_argument(
        "--instance-ids", nargs="+", default=None,
        help="Run specific instance IDs only"
    )
    parser.add_argument(
        "--output-dir", type=Path, default=Path("results/baseline"),
        help="Directory for evaluation output"
    )
    parser.add_argument(
        "--model", default="gpt-4o",
        help="OpenAI model to use (default: gpt-4o)"
    )
    parser.add_argument(
        "--cache-dir", type=Path, default=Path(".cache/swebench"),
        help="Local cache for SWE-bench dataset"
    )
    parser.add_argument(
        "--no-docker", action="store_true",
        help="Disable Docker, use local subprocess (for quick testing)"
    )
    args = parser.parse_args()

    settings.ensure_dirs()
    args.output_dir.mkdir(parents=True, exist_ok=True)

    # ── Load dataset ──────────────────────────────────────────────────────
    console.print("[bold cyan]Loading SWE-bench Lite...[/bold cyan]")
    instances = load_swebench_lite(
        max_instances=args.max_instances,
        instance_ids=args.instance_ids,
        cache_dir=args.cache_dir,
    )
    console.print(f"[green]Loaded {len(instances)} instances[/green]")

    # ── Init components ───────────────────────────────────────────────────
    agent = NaiveBaselineAgent(model=args.model)
    sandbox = SandboxExecutor(use_docker=not args.no_docker)

    # ── MLflow experiment ─────────────────────────────────────────────────
    mlflow.set_tracking_uri(settings.mlflow_tracking_uri)
    mlflow.set_experiment(settings.mlflow_experiment_name)

    results: list[InstanceResult] = []

    # All checkouts live in one temporary directory removed when the block exits.
    with tempfile.TemporaryDirectory(prefix="code-agent-workspaces-") as tmpdir:
        workspace_root = Path(tmpdir)

        with mlflow.start_run(run_name="naive_baseline"):
            mlflow.log_params({
                "model": args.model,
                "max_instances": len(instances),
                "agent_type": "naive_baseline",
            })

            with Progress(
                SpinnerColumn(),
                TextColumn("[bold blue]{task.description}"),
                TimeElapsedColumn(),
                console=console,
            ) as progress:
                task = progress.add_task(
                    "Running baseline...", total=len(instances)
                )
                for instance in instances:
                    # Escape the brackets: Rich would otherwise parse
                    # "[<instance_id>]" as a markup tag and render nothing.
                    progress.update(
                        task, description=escape(f"[{instance.instance_id}]")
                    )
                    instance_start = time.monotonic()
                    try:
                        result = run_instance(instance, agent, sandbox, workspace_root)
                    except Exception as exc:
                        # Top-level boundary: keep the batch alive and record
                        # the crash as a failed instance.
                        logger.error(
                            "Instance crashed",
                            instance_id=instance.instance_id,
                            error=str(exc),
                        )
                        result = InstanceResult(
                            instance_id=instance.instance_id,
                            repo=instance.repo,
                            resolved=False,
                            attempts=[],
                            total_attempts=1,
                            error=f"Unhandled error: {str(exc)[:200]}",
                            total_elapsed=time.monotonic() - instance_start,
                        )
                    results.append(result)
                    progress.advance(task)

            # ── Aggregate ─────────────────────────────────────────────────
            report = aggregate_results(results)
            save_results(report, args.output_dir)

            # Log aggregate metrics to MLflow
            mlflow.log_metrics({
                "resolved_rate": report.resolved_rate,
                "resolved_count": report.resolved_count,
                "avg_attempts": report.avg_attempts,
                "total_tokens": report.total_tokens,
                "avg_tokens_per_instance": report.avg_tokens_per_instance,
            })

    report.print_summary()
    console.print(f"\n[bold green]Results saved to:[/bold green] {args.output_dir}")


if __name__ == "__main__":
    main()