| """Node: execute the validated SQL via the read-only runner. |
| |
| The validate node has already produced a `ValidationReport`; we re-validate |
| inside `execute_validated` (cheap) so this node still works if it's ever |
| called directly without the validate predecessor (e.g. in unit tests). |
| """ |
|
|
| from __future__ import annotations |
|
|
| from collections.abc import Callable |
|
|
| from nl_sql.agent.state import PipelineState |
| from nl_sql.db.connection import Dialect |
| from nl_sql.db.registry import DatabaseRegistry, get_default_registry |
| from nl_sql.execution.runner import execute_validated |
|
|
|
|
| def make_execute_node( |
| *, |
| registry: DatabaseRegistry | None = None, |
| statement_timeout_ms: int = 30_000, |
| row_cap: int = 10_000, |
| ) -> Callable[[PipelineState], PipelineState]: |
| """`registry` is injected for tests; production code uses the default scan. |
| |
| Engine is created+disposed per call. SQLite engine setup is essentially |
| free; pooling is unnecessary for one query at a time and risks leaking |
| SQLite connections under pytest's strict `ResourceWarning` regime on |
| Windows. |
| """ |
| reg = registry or get_default_registry() |
|
|
| def node(state: PipelineState) -> PipelineState: |
| generated = state.get("generated") |
| db_id = state.get("db_id", "") |
| dialect: Dialect = state.get("dialect", "sqlite") |
| trace = list(state.get("trace") or []) |
|
|
| if generated is None or not generated.sql: |
| trace.append({"node": "execute", "ok": False, "reason": "no_sql"}) |
| |
| return {"trace": trace} |
|
|
| engine = reg.get(db_id).make_engine() |
| try: |
| outcome = execute_validated( |
| engine, |
| generated.sql, |
| dialect=dialect, |
| statement_timeout_ms=statement_timeout_ms, |
| row_cap=row_cap, |
| ) |
| finally: |
| engine.dispose() |
| if outcome.ok: |
| trace.append( |
| { |
| "node": "execute", |
| "ok": True, |
| "row_count": outcome.result.row_count if outcome.result else 0, |
| "elapsed_ms": outcome.result.elapsed_ms if outcome.result else 0.0, |
| } |
| ) |
| return { |
| "outcome": outcome, |
| "error_kind": None, |
| "error_message": "", |
| "trace": trace, |
| } |
|
|
| trace.append( |
| { |
| "node": "execute", |
| "ok": False, |
| "kind": outcome.error_kind.value if outcome.error_kind else None, |
| "message": outcome.error_message, |
| } |
| ) |
| last_error = outcome.error_message |
| if ( |
| outcome.error_kind is not None |
| and outcome.error_kind.value == "empty_result" |
| and state.get("verify_retry_on_empty") |
| ): |
| |
| |
| last_error = ( |
| "Your SQL ran successfully but returned 0 rows. The schema " |
| "and joins look fine; the most likely cause is a wrong " |
| "filter value. Re-examine the WHERE clause: check exact " |
| "case and spelling of string literals (compare against the " |
| "sample values in the schema cards), consider whether the " |
| "match needs LIKE '%value%' instead of `= 'value'`, and " |
| "verify NULL handling (`IS NULL` vs `= NULL`). If the " |
| "question is open-ended, try widening the filter; do not " |
| "narrow it further." |
| ) |
| return { |
| "outcome": outcome, |
| "last_error": last_error, |
| "error_kind": outcome.error_kind, |
| "error_message": outcome.error_message, |
| "trace": trace, |
| } |
|
|
| return node |
|
|