File size: 4,961 Bytes
942050b 0ec3207 942050b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 | """Read-only connection helpers for target databases.
The NL→SQL pipeline never owns write privileges on a target DB. Defences:
- Postgres: dedicated role with `default_transaction_read_only=on`, see
`scripts/sql/postgres_init.sql`.
- SQLite: `mode=ro` URI passed via a SQLAlchemy creator (URL form does not
carry through cross-platform; creator gives us full control over path
encoding) plus `PRAGMA query_only=ON` as a belt-and-braces guard.
"""
from __future__ import annotations
import sqlite3
import time
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal
from sqlalchemy import Connection, Engine, create_engine, text
# Closed set of database dialects this module can build an engine for;
# `_build_engine` dispatches on this value.
Dialect = Literal["sqlite", "postgresql"]
@dataclass(frozen=True, slots=True)
class DatabaseSpec:
"""Connection target.
For SQLite, `url` is the absolute filesystem path to the .sqlite file.
For Postgres, `url` is a standard libpq DSN (`postgresql://...`).
"""
id: str
dialect: Dialect
url: str
description: str = ""
def make_engine(self) -> Engine:
return _build_engine(self)
@dataclass(frozen=True, slots=True)
class QueryResult:
    """Materialised outcome of one `execute_readonly` call."""

    # Fetched rows converted to plain tuples; never more than the row cap.
    rows: list[tuple[Any, ...]]
    # Column names as reported by the result cursor's `keys()`.
    columns: list[str]
    # Number of entries in `rows`, i.e. counted after any truncation.
    row_count: int
    # True when the query produced more rows than the row cap allowed.
    truncated: bool
    # Milliseconds (time.perf_counter based) from just before the connection
    # is opened until the rows have been fetched.
    elapsed_ms: float
def _build_engine(spec: DatabaseSpec) -> Engine:
if spec.dialect == "sqlite":
return _build_sqlite_readonly_engine(Path(spec.url))
if spec.dialect == "postgresql":
return create_engine(spec.url, future=True, pool_pre_ping=True)
raise ValueError(f"unsupported dialect: {spec.dialect}")
def _build_sqlite_readonly_engine(path: Path) -> Engine:
    """Build an engine whose SQLite connections are opened read-only.

    The database file is addressed through a `file:` URI carrying
    `mode=ro`, and every new DBAPI connection additionally gets
    `PRAGMA query_only = ON`. Connections are produced by a creator
    callable handed to SQLAlchemy instead of being encoded in the engine
    URL. Relative paths are resolved first, since `Path.as_uri()` only
    accepts absolute paths.
    """
    absolute = path if path.is_absolute() else path.resolve()
    readonly_uri = f"{absolute.as_uri()}?mode=ro"

    def _open_readonly() -> sqlite3.Connection:
        # check_same_thread=False lifts sqlite3's own same-thread restriction
        # on the connection object.
        raw = sqlite3.connect(readonly_uri, uri=True, check_same_thread=False)
        raw.execute("PRAGMA query_only = ON")
        return raw

    return create_engine("sqlite://", creator=_open_readonly, future=True)
def connect(spec: DatabaseSpec) -> Engine:
    """Return a SQLAlchemy engine for `spec`.

    This simply delegates to `spec.make_engine()`. No engine cache is
    visible in this module, so each call constructs a new engine (and with
    it a new connection pool); callers that want pooled connections reused
    should hold on to the returned engine rather than calling this per query.
    """
    engine = spec.make_engine()
    return engine
@contextmanager
def execute_readonly(
    engine: Engine,
    sql: str,
    *,
    statement_timeout_ms: int = 30_000,
    row_cap: int = 10_000,
) -> Iterator[QueryResult]:
    """Run a SELECT-only query with hard timeout and row cap.

    Caller must have already validated `sql` through the AST guard. This
    function enforces operational limits, not correctness or safety.

    The query runs and its rows are fully materialised before the context
    body executes; the connection has already been released by the time the
    `QueryResult` is yielded, so the caller never holds a pooled connection
    (or an open transaction) while processing the result.

    Args:
        engine: Engine to take the connection from.
        sql: Statement text, sent to the driver verbatim.
        statement_timeout_ms: Per-statement timeout passed to
            `_apply_runtime_limits`.
        row_cap: Maximum number of rows returned; must be >= 0.

    Yields:
        A `QueryResult`; `truncated` is True when more than `row_cap` rows
        were available.

    Raises:
        ValueError: if `row_cap` is negative.
    """
    if row_cap < 0:
        # A negative cap would otherwise yield an empty result that claims to
        # be truncated; fail loudly instead.
        raise ValueError(f"row_cap must be non-negative, got {row_cap}")

    started = time.perf_counter()
    with engine.connect() as conn:
        _apply_runtime_limits(conn, statement_timeout_ms)
        # exec_driver_sql bypasses SQLAlchemy's :name bind-parameter parsing,
        # which would otherwise misinterpret colons inside string literals such
        # as `LIKE '_:%:__.___'` (BIRD qids 959 / 989 / 990 — formula_1 time patterns).
        cursor = conn.exec_driver_sql(sql)
        try:
            columns = list(cursor.keys())
            # Ask for one row beyond the cap so truncation can be detected
            # without fetching the full result set.
            rows = cursor.fetchmany(row_cap + 1)
        finally:
            # Closing releases the DBAPI cursor; rows past the cap are never
            # fetched. Done in `finally` so a failing fetch cannot leak it.
            cursor.close()
        elapsed_ms = (time.perf_counter() - started) * 1000.0

    truncated = len(rows) > row_cap
    if truncated:
        rows = rows[:row_cap]
    yield QueryResult(
        rows=[tuple(r) for r in rows],
        columns=columns,
        row_count=len(rows),
        truncated=truncated,
        elapsed_ms=elapsed_ms,
    )
def _apply_runtime_limits(conn: Connection, statement_timeout_ms: int) -> None:
    """Apply the per-query timeout and read-only guards to `conn`.

    Postgres: sets `statement_timeout` and marks the session default and the
    current transaction read-only. SQLite: installs a progress-handler based
    deadline on the underlying `sqlite3` connection. Any other dialect is
    left untouched.

    Args:
        conn: Open SQLAlchemy connection the query is about to run on.
        statement_timeout_ms: Timeout in milliseconds.
    """
    dialect = conn.engine.dialect.name
    if dialect == "postgresql":
        # int() guarantees only a plain integer is interpolated into the SQL.
        conn.execute(text(f"SET statement_timeout = {int(statement_timeout_ms)}"))
        # Only affects transactions that start *after* this statement. Kept
        # for engines running in autocommit, where each statement is its own
        # transaction.
        conn.execute(text("SET default_transaction_read_only = on"))
        # The first execute() above has already opened the transaction the
        # caller's query will run in, so the default alone never covers it;
        # switch the current transaction to read-only explicitly. Outside a
        # transaction block Postgres only emits a warning for this statement.
        conn.execute(text("SET TRANSACTION READ ONLY"))
    elif dialect == "sqlite":
        # SQLite has no per-query timeout knob in the URL API; use connection
        # progress handler at the dbapi level.
        seconds = statement_timeout_ms / 1000.0
        raw = conn.connection.driver_connection
        # Non-stdlib SQLite drivers are skipped: the handler API used below is
        # specific to `sqlite3.Connection`.
        if isinstance(raw, sqlite3.Connection):
            _install_sqlite_timeout(raw, seconds)
def _install_sqlite_timeout(conn: sqlite3.Connection, seconds: float) -> None:
    """Make `conn` abort statements once `seconds` have elapsed from now.

    A progress handler is registered that compares the monotonic clock with
    a deadline fixed at installation time; a non-zero return value makes
    SQLite interrupt the running statement. Installing again replaces the
    previous handler and therefore restarts the deadline.
    """
    expires_at = time.monotonic() + seconds

    def _deadline_passed() -> int:
        if time.monotonic() > expires_at:
            return 1
        return 0

    # progress handler is invoked every N VM ops; 1000 keeps overhead trivial.
    conn.set_progress_handler(_deadline_passed, 1000)
def sqlite_url_readonly(path: Path) -> str:
    """Return the resolved absolute path to store as a SQLite spec's `url`.

    Despite the name, the result is a bare filesystem path rather than a
    SQLAlchemy URL: read-only mode is applied later by the creator function
    in `_build_sqlite_readonly_engine`, because SQLAlchemy's URL builder does
    not carry the SQLite `mode=ro` URI cleanly across Windows and POSIX path
    encodings.
    """
    resolved = path.resolve()
    return str(resolved)
|