| """Picks DB vs Tabular executor based on the source_type of the IR's source. |
| |
| This is the only place in the structured query path where the schema/tabular |
| distinction matters. Every step before this is source-type-agnostic. |
| |
| Production executors are imported lazily so the module is import-safe for |
| tests (DbExecutor transitively imports `Settings` which fails without `.env`). |
| Tests can inject their own `executor_factories` to bypass production deps |
| entirely. |
| |
| Until TAB owner ships the real `TabularExecutor` body, dispatching to a |
| tabular source returns the existing stub which raises `NotImplementedError` |
| on `.run()`. `QueryService` catches this and surfaces a graceful error in |
| `QueryResult.error`. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from collections.abc import Callable |
|
|
| from ...catalog.models import Catalog, Source |
| from ..ir.models import QueryIR |
| from .base import BaseExecutor |
|
|
# Signature of an executor factory: called with the catalog, returns the executor to use.
| ExecutorFactory = Callable[[Catalog], BaseExecutor] |
|
|
|
|
class ExecutorDispatcher:
    """Resolve which `BaseExecutor` should run a given IR.

    The IR's `source_id` is looked up in the catalog and the matching
    source's `source_type` selects the executor. Each executor is built at
    most once per source_type and then reused for every later `pick()` on
    this dispatcher.
    """

    def __init__(
        self,
        catalog: Catalog,
        executor_factories: dict[str, ExecutorFactory] | None = None,
    ) -> None:
        """Store the catalog and the optional factory overrides.

        Args:
            catalog: Catalog whose `sources` are searched by `source_id`; it
                is also the argument handed to every factory.
            executor_factories: Optional mapping of source_type to factory.
                When given (even if empty) it fully replaces the built-in
                lookup; when `None`, the production executors are used.
        """
        self._cache: dict[str, BaseExecutor] = {}
        self._factories = executor_factories
        self._catalog = catalog

    def pick(self, ir: QueryIR) -> BaseExecutor:
        """Return the executor for the source that `ir` targets.

        Raises:
            ValueError: If `ir.source_id` is not in the catalog, or no
                factory is available for the source's source_type.
        """
        kind = self._find_source(ir.source_id).source_type
        # Build lazily on first use; a failing factory leaves the cache untouched.
        if kind not in self._cache:
            build = self._get_factory(kind)
            self._cache[kind] = build(self._catalog)
        return self._cache[kind]

    def _get_factory(self, source_type: str) -> ExecutorFactory:
        """Return the factory for `source_type`.

        Injected factories, when present, are the only ones consulted.
        Otherwise the production executor classes are imported on demand, so
        importing this module does not pull in their dependencies.

        Raises:
            ValueError: If no factory exists for `source_type`.
        """
        injected = self._factories
        if injected is None:
            # Production path: import inside the branch to keep it lazy.
            if source_type == "schema":
                from .db import DbExecutor

                return DbExecutor
            if source_type == "tabular":
                from .tabular import TabularExecutor

                return TabularExecutor
            raise ValueError(f"unsupported source_type={source_type!r}")

        chosen = injected.get(source_type)
        if chosen is None:
            raise ValueError(
                f"no executor factory injected for source_type={source_type!r}"
            )
        return chosen

    def _find_source(self, source_id: str) -> Source:
        """Return the first catalog source whose `source_id` matches.

        Raises:
            ValueError: If no source in the catalog has that id.
        """
        found = next(
            (
                candidate
                for candidate in self._catalog.sources
                if candidate.source_id == source_id
            ),
            None,
        )
        if found is None:
            raise ValueError(f"source_id {source_id!r} not in catalog")
        return found
|
|