"""TabularExecutor — runs a compiled pandas chain on a Parquet file.

Intended engine selection by file size:
    ≤ 100 MB      → eager pandas
    100 MB–1 GB   → pyarrow with predicate pushdown
    > 1 GB        → polars lazy scan

Only eager pandas is implemented today; the other engines will be added once
a real file proves too big for it.
"""
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import io |
| import time |
| from collections.abc import Callable, Coroutine |
| from typing import Any |
|
|
| import pandas as pd |
|
|
| from ...catalog.models import Catalog, Source, Table |
| from ...storage.parquet import parquet_blob_name |
| from ...middlewares.logging import get_logger |
| from ..compiler.pandas import CompiledPandas, PandasCompiler |
| from ..ir.models import QueryIR |
| from .base import BaseExecutor, QueryResult |
|
|
# Scheme prefix every tabular ``Source.location_ref`` must carry; the rest of
# the reference is ``<user_id>/<document_id>``.
_AZ_BLOB_PREFIX = "az_blob://"

# Upper bound on rows returned in a single QueryResult. Rows beyond this are
# dropped and the result is flagged ``truncated``.
_ROW_HARD_CAP = 10000

logger = get_logger("tabular_executor")
|
|
|
|
class TabularExecutor(BaseExecutor):
    """Execute a compiled pandas op chain against a table stored as a Parquet blob.

    Only the eager-pandas path is implemented. The whole blob is downloaded,
    loaded into a DataFrame in a worker thread, and the compiled chain is
    applied to it.

    ``fetch_blob`` is injectable for tests. When omitted, blobs are downloaded
    through ``storage.az_blob.az_blob.blob_storage``.
    """

    def __init__(
        self,
        catalog: Catalog,
        fetch_blob: Callable[[str], Coroutine[Any, Any, bytes]] | None = None,
    ) -> None:
        """Bind the executor to a catalog.

        Args:
            catalog: Catalog used to resolve sources/tables and to build the
                ``PandasCompiler``.
            fetch_blob: Async callable mapping a blob name to its raw bytes.
                Defaults to ``_default_fetch_blob``.
        """
        self._catalog = catalog
        self._compiler = PandasCompiler(catalog)
        self._fetch_blob = fetch_blob or self._default_fetch_blob

    @staticmethod
    async def _default_fetch_blob(blob_name: str) -> bytes:
        """Download *blob_name* through the shared Azure blob storage client."""
        # Imported lazily, presumably so that building an executor with an
        # injected fetch_blob (e.g. in tests) does not load the storage client.
        from ...storage.az_blob.az_blob import blob_storage

        return await blob_storage.download_file(blob_name)

    async def run(self, ir: QueryIR) -> QueryResult:
        """Execute *ir* and return at most ``_ROW_HARD_CAP`` rows.

        Any ``Exception`` raised is caught and logged. This covers catalog
        lookup, compilation, blob fetch and the pandas chain. The failure is
        reported through ``QueryResult.error`` rather than propagated.
        ``table_name`` / ``source_name`` are populated on the error result
        whenever the catalog lookup itself succeeded.
        """
        started = time.perf_counter()
        table_name = ""
        source_name = ""
        try:
            source, table = self._lookup(ir)
            table_name = table.name
            source_name = source.name
            if source.source_type != "tabular":
                raise ValueError(
                    f"TabularExecutor cannot run on source_type={source.source_type!r}; "
                    "expected 'tabular'"
                )

            compiled = self._compiler.compile(ir)
            # Diagnostic only; must never turn a valid query into an error.
            self._log_rendered_query(ir, table)
            blob_name = _resolve_blob_name(source, table)
            blob_bytes = await self._fetch_blob(blob_name)

            # Parquet decoding and the pandas chain are blocking; keep them
            # off the event loop.
            result_df = await asyncio.to_thread(_load_and_apply, blob_bytes, compiled)

            truncated = len(result_df) > _ROW_HARD_CAP
            capped = result_df.head(_ROW_HARD_CAP)

            columns = compiled.output_columns
            rows = capped.to_dict(orient="records")
            elapsed_ms = int((time.perf_counter() - started) * 1000)
            logger.info(
                "tabular query complete",
                source_id=ir.source_id,
                rows=len(rows),
                truncated=truncated,
                elapsed_ms=elapsed_ms,
            )
            return QueryResult(
                source_id=ir.source_id,
                backend="tabular",
                columns=columns,
                rows=rows,
                row_count=len(rows),
                truncated=truncated,
                elapsed_ms=elapsed_ms,
                table_id=ir.table_id,
                table_name=table_name,
                source_name=source_name,
            )

        except Exception as e:
            elapsed_ms = int((time.perf_counter() - started) * 1000)
            logger.error(
                "tabular executor failed",
                source_id=ir.source_id,
                error=str(e),
                # str() of e.g. KeyError is just the key; the type makes the
                # log line actionable.
                error_type=type(e).__name__,
                elapsed_ms=elapsed_ms,
            )
            return QueryResult(
                source_id=ir.source_id,
                backend="tabular",
                elapsed_ms=elapsed_ms,
                error=str(e),
                table_id=ir.table_id,
                table_name=table_name,
                source_name=source_name,
            )

    @staticmethod
    def _log_rendered_query(ir: QueryIR, table: Table) -> None:
        """Log a pandas-style rendering of *ir* for debugging.

        Rendering is purely diagnostic. Any failure while building the string
        is logged as a warning and swallowed, so it cannot fail the query.
        """
        try:
            rendered = _render_query(ir, {c.column_id: c for c in table.columns})
        except Exception as exc:  # best-effort: logging must not break execution
            logger.warning(
                "pandas query render failed",
                source_id=ir.source_id,
                error=str(exc),
            )
            return
        logger.info("pandas query", query=rendered)

    def _lookup(self, ir: QueryIR) -> tuple[Source, Table]:
        """Resolve ``ir.source_id`` / ``ir.table_id`` against the catalog.

        Raises:
            ValueError: if the source is not in the catalog, or the table is
                not in that source.
        """
        source = next(
            (s for s in self._catalog.sources if s.source_id == ir.source_id), None
        )
        if source is None:
            raise ValueError(f"source_id {ir.source_id!r} not in catalog")
        table = next(
            (t for t in source.tables if t.table_id == ir.table_id), None
        )
        if table is None:
            raise ValueError(f"table_id {ir.table_id!r} not in source {ir.source_id!r}")
        return source, table
|
|
|
|
| |
| |
| |
|
|
def _resolve_blob_name(source: Source, table: Table) -> str:
    """Return the Parquet blob name backing *table* of *source*.

    ``source.location_ref`` must look like ``az_blob://<user_id>/<document_id>``.
    The final name comes from ``storage.parquet.parquet_blob_name``, so that
    the read side uses the same naming (and sheet-name sanitization) as the
    ingestion write side:

        CSV / Parquet → ``{user_id}/{document_id}.parquet``
        XLSX          → ``{user_id}/{document_id}__{safe_sheet}.parquet``
                        (per the ingestion code, the writer always adds the
                        sheet suffix for XLSX regardless of sheet count — see
                        processing_service ``_build_excel_documents``)

    XLSX is recognised from ``Source.name`` (the original filename) ending in
    ``.xlsx``. For XLSX the table name is passed as the sheet name. This relies
    on the upload pipeline keeping the file extension in the name.

    Raises:
        ValueError: if the location_ref lacks the ``az_blob://`` prefix, or
            either the user_id or document_id segment is missing.
    """
    location = source.location_ref
    if not location.startswith(_AZ_BLOB_PREFIX):
        raise ValueError(
            f"TabularExecutor expects 'az_blob://...' location_ref, "
            f"got {location!r}"
        )

    # Split on the first "/" only; anything after it belongs to document_id.
    user_id, separator, document_id = location[len(_AZ_BLOB_PREFIX):].partition("/")
    if not (separator and user_id and document_id):
        raise ValueError(f"Malformed az_blob location_ref: {location!r}")

    if source.name.lower().endswith(".xlsx"):
        sheet_name = table.name
    else:
        sheet_name = None
    return parquet_blob_name(user_id, document_id, sheet_name)
|
|
|
|
def _render_query(ir: QueryIR, cols_by_id: dict) -> str:
    """Render *ir* as an approximate pandas expression, for logging only.

    The string is never executed. It gives a human-readable view of the IR's
    filters, selects, group-by and limit as one ``df...`` chain. For example:
    ``df[(df["a"] > 1)].groupby(['g'])["v"].sum().head(10)``.

    Args:
        ir: Query to render.
        cols_by_id: Mapping of column_id → column object exposing ``.name``.
            The executor builds it from ``table.columns``.

    Returns:
        The rendered expression. Column ids absent from *cols_by_id* are shown
        as the raw id instead of raising, so a logging call cannot fail the
        query that triggered it.
    """
    from ..ir.models import AggSelect, ColumnSelect

    # Aggregation fn → pandas method call used in the rendering; built once
    # instead of on every loop iteration.
    fn_calls = {
        "count": "count()",
        "count_distinct": "nunique()",
        "sum": "sum()",
        "avg": "mean()",
        "min": "min()",
        "max": "max()",
    }

    def name_of(column_id: Any) -> Any:
        # Tolerate unknown ids: this output is diagnostic, not authoritative.
        column = cols_by_id.get(column_id)
        return column.name if column is not None else str(column_id)

    parts = ["df"]
    if ir.filters:
        conds = " & ".join(
            f'(df["{name_of(f.column_id)}"] {f.op} {f.value!r})'
            for f in ir.filters
        )
        parts.append(f"[{conds}]")

    aggs = [s for s in ir.select if isinstance(s, AggSelect)]
    cols = [s for s in ir.select if isinstance(s, ColumnSelect)]
    if aggs:
        if ir.group_by:
            group_names = [name_of(g) for g in ir.group_by]
            parts.append(f".groupby({group_names})")
        for agg in aggs:
            col = f'["{name_of(agg.column_id)}"]' if agg.column_id else ""
            parts.append(f'{col}.{fn_calls.get(agg.fn, agg.fn + "()")}')
    elif cols:
        col_names = [name_of(s.column_id) for s in cols]
        parts.append(f"[{col_names}]")

    if ir.limit:
        parts.append(f".head({ir.limit})")
    return "".join(parts)
|
|
|
|
def _load_and_apply(blob_bytes: bytes, compiled: CompiledPandas) -> pd.DataFrame:
    """Decode a Parquet payload and run the compiled pandas operations on it.

    Both steps are blocking. The executor calls this via ``asyncio.to_thread``
    so the event loop is not stalled.
    """
    buffer = io.BytesIO(blob_bytes)
    frame = pd.read_parquet(buffer)
    return compiled.apply(frame)
|
|