"""Picks DB vs Tabular executor based on the source_type of the IR's source.

This is the only place in the structured query path where the schema/tabular
distinction matters. Every step before this is source-type-agnostic.

Production executors are imported lazily so the module is import-safe for
tests (DbExecutor transitively imports `Settings` which fails without `.env`).
Tests can inject their own `executor_factories` to bypass production deps
entirely.

Until TAB owner ships the real `TabularExecutor` body, dispatching to a
tabular source returns the existing stub which raises `NotImplementedError`
on `.run()`. `QueryService` catches this and surfaces a graceful error in
`QueryResult.error`.
"""

from __future__ import annotations

from collections.abc import Callable

from ...catalog.models import Catalog, Source
from ..ir.models import QueryIR
from .base import BaseExecutor

# A factory receives the catalog and returns a ready-to-use executor.
ExecutorFactory = Callable[[Catalog], BaseExecutor]


class ExecutorDispatcher:
    """Select the `BaseExecutor` that matches an IR's source.

    Executors are memoised per source_type for the lifetime of the
    dispatcher: the first `pick` for a given source_type builds the executor
    from its factory, and later picks return that same instance. Per the
    module's design notes, this is sound because `DbExecutor` and
    `TabularExecutor` hold no state beyond the catalog.
    """

    def __init__(
        self,
        catalog: Catalog,
        executor_factories: dict[str, ExecutorFactory] | None = None,
    ) -> None:
        """Store the catalog and the optional factory overrides.

        Args:
            catalog: Catalog used to resolve source ids and handed to every
                executor factory.
            executor_factories: Optional mapping of source_type to factory.
                When given (even if empty), it fully replaces the built-in
                defaults; when ``None``, the built-in defaults are used.
        """
        self._cache: dict[str, BaseExecutor] = {}
        self._factories = executor_factories
        self._catalog = catalog

    def pick(self, ir: QueryIR) -> BaseExecutor:
        """Return the executor for the source referenced by ``ir``.

        Raises:
            ValueError: If ``ir.source_id`` is not in the catalog, or no
                factory is available for the source's source_type.
        """
        kind = self._find_source(ir.source_id).source_type
        if kind not in self._cache:
            # First request for this source_type: build once, then reuse.
            build = self._get_factory(kind)
            self._cache[kind] = build(self._catalog)
        return self._cache[kind]

    def _get_factory(self, source_type: str) -> ExecutorFactory:
        """Resolve the factory for ``source_type``.

        Injected factories, when present, are the only ones consulted.
        Otherwise the built-in "schema" and "tabular" executors are used.

        Raises:
            ValueError: If the source_type has no injected factory (when
                factories were injected) or is not a supported built-in.
        """
        injected = self._factories
        if injected is None:
            # Default factories — lazy-imported so importing this module is cheap
            if source_type == "schema":
                from .db import DbExecutor

                return DbExecutor  # type: ignore[return-value]
            if source_type == "tabular":
                from .tabular import TabularExecutor

                return TabularExecutor  # type: ignore[return-value]
            raise ValueError(f"unsupported source_type={source_type!r}")

        chosen = injected.get(source_type)
        if chosen is None:
            raise ValueError(
                f"no executor factory injected for source_type={source_type!r}"
            )
        return chosen

    def _find_source(self, source_id: str) -> Source:
        """Return the first catalog source whose ``source_id`` matches.

        Raises:
            ValueError: If no source in the catalog has that id.
        """
        candidates = (
            entry for entry in self._catalog.sources if entry.source_id == source_id
        )
        found = next(candidates, None)
        if found is None:
            raise ValueError(f"source_id {source_id!r} not in catalog")
        return found