"""End-to-end SQL execution: validate → execute under runtime limits → report. Single entry point `execute_validated`. Pipeline nodes call this instead of threading the guard + DB layers themselves. """ from __future__ import annotations from dataclasses import dataclass from sqlalchemy import Engine from sqlalchemy.exc import OperationalError, SQLAlchemyError from nl_sql.db.connection import Dialect, QueryResult, execute_readonly from nl_sql.execution.errors import ExecutionErrorKind from nl_sql.execution.guards import ValidationReport, validate_sql @dataclass(frozen=True, slots=True) class ExecutionOutcome: """Combined result + error taxonomy. Exactly one of `result` or `error_kind` is set. `validation` is always present so callers can render the AST-level diagnostics regardless of whether execution actually ran. """ sql: str validation: ValidationReport result: QueryResult | None = None error_kind: ExecutionErrorKind | None = None error_message: str = "" @property def ok(self) -> bool: return self.result is not None and self.error_kind is None def execute_validated( engine: Engine, sql: str, *, dialect: Dialect = "sqlite", statement_timeout_ms: int = 30_000, row_cap: int = 10_000, ) -> ExecutionOutcome: validation = validate_sql(sql, dialect=dialect) if not validation.ok: return ExecutionOutcome( sql=sql, validation=validation, error_kind=ExecutionErrorKind.INVALID_SQL, error_message="; ".join(v.message for v in validation.violations), ) try: with execute_readonly( engine, sql, statement_timeout_ms=statement_timeout_ms, row_cap=row_cap, ) as result: if result.row_count == 0: return ExecutionOutcome( sql=sql, validation=validation, result=result, error_kind=ExecutionErrorKind.EMPTY_RESULT, error_message="query returned 0 rows", ) return ExecutionOutcome(sql=sql, validation=validation, result=result) except OperationalError as exc: kind = ( ExecutionErrorKind.EXECUTION_TIMEOUT if "timeout" in str(exc).lower() or "interrupted" in str(exc).lower() else ExecutionErrorKind.EXECUTION_FAILED ) return ExecutionOutcome( sql=sql, validation=validation, error_kind=kind, error_message=str(exc), ) except SQLAlchemyError as exc: return ExecutionOutcome( sql=sql, validation=validation, error_kind=ExecutionErrorKind.EXECUTION_FAILED, error_message=str(exc), )