File size: 2,871 Bytes
"""Picks DB vs Tabular executor based on the source_type of the IR's source.
This is the only place in the structured query path where the schema/tabular
distinction matters. Every step before this is source-type-agnostic.
Production executors are imported lazily so the module is import-safe for
tests (DbExecutor transitively imports `Settings` which fails without `.env`).
Tests can inject their own `executor_factories` to bypass production deps
entirely.
Until TAB owner ships the real `TabularExecutor` body, dispatching to a
tabular source returns the existing stub which raises `NotImplementedError`
on `.run()`. `QueryService` catches this and surfaces a graceful error in
`QueryResult.error`.
"""
from __future__ import annotations
from collections.abc import Callable
from ...catalog.models import Catalog, Source
from ..ir.models import QueryIR
from .base import BaseExecutor
# A factory is any callable that takes the catalog and returns an executor.
ExecutorFactory = Callable[[Catalog], BaseExecutor]
class ExecutorDispatcher:
    """Select the `BaseExecutor` that should run a given IR.

    The IR's `source_id` is resolved against the catalog, and the resulting
    `source_type` decides which executor factory is used. An executor is
    built at most once per source_type and then reused from an internal
    cache for every later IR of that type.
    """

    def __init__(
        self,
        catalog: Catalog,
        executor_factories: dict[str, ExecutorFactory] | None = None,
    ) -> None:
        """Store the catalog and the optional factory overrides.

        Args:
            catalog: Catalog whose `sources` are searched by `source_id`;
                it is also the single argument passed to each factory.
            executor_factories: Optional mapping of source_type to factory.
                When given (even if empty) it is the only place factories
                are looked up; the built-in defaults are not consulted.
        """
        self._catalog = catalog
        self._factories = executor_factories
        self._cache: dict[str, BaseExecutor] = {}

    def pick(self, ir: QueryIR) -> BaseExecutor:
        """Return the executor for the source that `ir` points at.

        Raises:
            ValueError: If the source is not in the catalog or no factory
                exists for its source_type.
        """
        kind = self._find_source(ir.source_id).source_type
        if kind not in self._cache:
            # First IR of this source_type: build the executor and keep it.
            build = self._get_factory(kind)
            self._cache[kind] = build(self._catalog)
        return self._cache[kind]

    def _get_factory(self, source_type: str) -> ExecutorFactory:
        """Resolve the factory for `source_type`.

        Raises:
            ValueError: If nothing is registered for `source_type`.
        """
        if self._factories is None:
            # Default factories are imported here, on demand, rather than
            # at module import time.
            if source_type == "schema":
                from .db import DbExecutor

                return DbExecutor  # type: ignore[return-value]
            if source_type == "tabular":
                from .tabular import TabularExecutor

                return TabularExecutor  # type: ignore[return-value]
            raise ValueError(f"unsupported source_type={source_type!r}")

        # Injected factories take over completely; no fallback to defaults.
        injected = self._factories.get(source_type)
        if injected is None:
            raise ValueError(
                f"no executor factory injected for source_type={source_type!r}"
            )
        return injected

    def _find_source(self, source_id: str) -> Source:
        """Return the first catalog source whose `source_id` matches.

        Raises:
            ValueError: If no source in the catalog has that id.
        """
        found = next(
            (
                candidate
                for candidate in self._catalog.sources
                if candidate.source_id == source_id
            ),
            None,
        )
        if found is None:
            raise ValueError(f"source_id {source_id!r} not in catalog")
        return found
|