File size: 7,605 Bytes
942050b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""Drop-in eval driver for plan-then-SQL ablation.

Why a dedicated script instead of `scripts/eval_baseline.py --config G`:
we need the `enable_planner=True` knob on PipelineConfig which the
existing driver doesn't surface yet, and we want robust progress logging
+ resumable JSON output without the background-shell-pipe issues we hit
when running long evals via the harness.

Usage:
    uv run python scripts/run_planner_eval.py \\
        --difficulty moderate --n 200 --seed 0 \\
        --out eval/reports/2026-05-11/G_planner-moderate-n99.json
"""

from __future__ import annotations

import argparse
import json
import sys
import time
import traceback
from pathlib import Path

from nl_sql.agent.graph import PipelineConfig, build_pipeline, run_pipeline
from nl_sql.config import get_settings
from nl_sql.db.registry import get_default_registry
from nl_sql.eval.dataset import dev_split, load_bird_mini_dev
from nl_sql.eval.metrics.execution_accuracy import compare_results
from nl_sql.eval.runner import _compose_question, _execute_gold
from nl_sql.llm.cache import CachingEmbeddingProvider, CachingLLMProvider
from nl_sql.llm.providers import build_provider
from nl_sql.llm.providers.mistral import MistralProvider
from nl_sql.schema_index.indexer import SchemaIndex


def main() -> int:
    p = argparse.ArgumentParser(description=__doc__)
    p.add_argument("--difficulty", choices=["simple", "moderate", "challenging"], default=None)
    p.add_argument("--n", type=int, default=200, help="prefix size BEFORE difficulty filter")
    p.add_argument("--seed", type=int, default=0)
    p.add_argument("--out", type=Path, required=True)
    p.add_argument(
        "--log",
        type=Path,
        default=None,
        help="per-example progress log; default <out>.progress.log",
    )
    p.add_argument("--enable-planner", action="store_true", default=False)
    p.add_argument("--no-planner", dest="enable_planner", action="store_false")
    p.add_argument("--enable-grounded-critique", action="store_true", default=False)
    p.add_argument("--bird-root", default="data/bird_mini_dev/MINIDEV")
    p.add_argument("--provider", default="mistral")
    p.add_argument("--limit", type=int, default=0, help="cap examples after filtering (0=all)")
    args = p.parse_args()

    log_path = args.log or args.out.with_suffix(".progress.log")
    log_path.parent.mkdir(parents=True, exist_ok=True)
    args.out.parent.mkdir(parents=True, exist_ok=True)

    s = get_settings()
    sql_prov = CachingLLMProvider(
        build_provider(args.provider, settings=s), cache_dir=s.llm_cache_dir
    )
    emb = CachingEmbeddingProvider(
        MistralProvider(api_key=s.mistral_api_key), cache_dir=s.llm_cache_dir
    )
    idx = SchemaIndex(persist_dir="chroma_data", embedder=emb)
    registry = get_default_registry()

    examples = load_bird_mini_dev(Path(args.bird_root))
    sample = dev_split(examples, n=args.n, seed=args.seed)
    if args.difficulty:
        sample = [e for e in sample if e.difficulty == args.difficulty]
    if args.limit:
        sample = sample[: args.limit]

    cfg = PipelineConfig(
        sql_provider=sql_prov,
        explain_provider=sql_prov,
        schema_index=idx,
        registry=registry,
        fewshot_top_k=3,
        sort_schema_block=True,
        cross_db_fewshot=True,
        verify_retry_on_empty=True,
        enable_planner=args.enable_planner,
        enable_grounded_critique=args.enable_grounded_critique,
        statement_timeout_ms=30_000,
        row_cap=10_000,
    )
    pipe = build_pipeline(cfg)

    def log(msg: str) -> None:
        ts = time.strftime("%H:%M:%S")
        line = f"[{ts}] {msg}\n"
        log_path.open("a", encoding="utf-8").write(line)
        sys.stderr.write(line)
        sys.stderr.flush()

    log(
        f"start: n={len(sample)} difficulty={args.difficulty} enable_planner={args.enable_planner} out={args.out}"
    )

    records: list[dict] = []
    matched = 0
    for i, ex in enumerate(sample, 1):
        started = time.perf_counter()
        spec = registry.get(ex.registry_db_id)
        gold_engine = spec.make_engine()
        try:
            try:
                res = run_pipeline(
                    pipe,
                    question=_compose_question(ex),
                    db_id=ex.registry_db_id,
                    dialect="sqlite",
                    verify_retry_on_empty=True,
                )
            except Exception as exc:
                log(f"[{i:3d}/{len(sample)}] EXC qid={ex.question_id}: {type(exc).__name__}: {exc}")
                traceback.print_exc(file=sys.stderr)
                continue
            try:
                gold_rows, _ = _execute_gold(
                    gold_engine, ex.sql, statement_timeout_ms=30_000, row_cap=10_000
                )
            except Exception:
                gold_rows = []
            if res.outcome is not None and res.outcome.result is not None:
                cmp = compare_results(gold_rows, res.outcome.result.rows, gold_sql=ex.sql)
                ok = cmp.match
                reason = cmp.reason
                gc, pc = cmp.gold_rows, cmp.pred_rows
            else:
                ok = False
                reason = res.error_kind.value if res.error_kind else "no result"
                gc, pc = len(gold_rows), 0
            if ok:
                matched += 1
            records.append(
                {
                    "question_id": ex.question_id,
                    "db_id": ex.db_id,
                    "difficulty": ex.difficulty,
                    "dialect": ex.dialect,
                    "question": ex.question,
                    "gold_sql": ex.sql,
                    "pred_sql": res.sql,
                    "match": bool(ok),
                    "comparison_reason": reason,
                    "gold_row_count": gc,
                    "pred_row_count": pc,
                    "error_kind": res.error_kind.value if res.error_kind else None,
                    "confidence": res.confidence,
                    "repair_attempted": res.repair_attempted,
                }
            )
            elapsed = (time.perf_counter() - started) * 1000.0
            log(
                f"[{i:3d}/{len(sample)}] {'OK ' if ok else '   '} ({elapsed:6.0f}ms) "
                f"qid={ex.question_id} {ex.registry_db_id}/{ex.difficulty} — "
                f"{ex.question[:60]}"
            )

            # incremental dump every 10 to survive crashes
            if i % 10 == 0:
                args.out.write_text(
                    json.dumps(
                        {
                            "configuration": "G_planner",
                            "sql_model": "codestral-latest",
                            "overall": {"ea": matched / len(records), "n": len(records)},
                            "records": records,
                        },
                        indent=2,
                    ),
                    encoding="utf-8",
                )
        finally:
            gold_engine.dispose()

    ea = matched / len(records) if records else 0.0
    args.out.write_text(
        json.dumps(
            {
                "configuration": "G_planner",
                "sql_model": "codestral-latest",
                "overall": {"ea": ea, "n": len(records), "matched": matched},
                "records": records,
            },
            indent=2,
        ),
        encoding="utf-8",
    )
    log(f"done: EA={matched}/{len(records)} = {100 * ea:.1f}% → {args.out}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())