ishaq101's picture
feat/Catalog Retrieval System (#1)
6bff5d9

Progress β€” Phase 2 catalog-driven build

Persistent tracker mirroring the 42-item ownership table in REPO_CONTEXT.md "Team β€” division of work". Update as PRs land. Future Claude Code sessions read this to know what's already done.

Last updated: 2026-05-12 ([NOTICKET] Cleanup PR landed: ChatHandler wired to chat.py, Phase 1 dual-write dropped from /ingest, on_catalog_rebuild_requested implemented, dead modules deleted, answer_agent→chatbot renamed, retrieval cache restored via RetrievalRouter, top_values added to ColumnStats, lifespan migration, knowledge_router removed) Current open PR: pr/1 — active. Cleanup PR committed and pushed.


Legend

  • [x] done and merged
  • [~] in progress (open PR or active branch)
  • [ ] not started
  • DB / TAB / B β€” ownership (from REPO_CONTEXT.md)

PR sequence

PR Status Owner(s) Scope
PR1 [x] merged DB Contract locks + catalog plumbing + DB introspector + IR validator + tests
PR1-tab [x] shipped TAB Tabular introspector + on_tabular_uploaded trigger + 31 unit tests
PR2a [x] merged DB CatalogEnricher + StructuredPipeline + on_db_registered trigger + FK extension on Table (enricher later removed in KM-557)
KM-557 [x] shipped DB Drop CatalogEnricher entirely (cost cut β€” planner uses stats + sample rows directly); rename jsonb table catalogs β†’ data_catalog; add GET /api/v1/data-catalog/{user_id} index endpoint for catalog refresher
PR2b [x] shipped DB-solo (B-review) IntentRouter + planner prompt + planner LLM service
PR3-DB [x] shipped DB SqlCompiler (Postgres) + DbExecutor (sqlglot guard, RO + statement_timeout, asyncio.to_thread) + 36 golden IR→SQL tests
PR3-TAB [x] shipped TAB PandasCompiler + TabularExecutor + 43+12 golden IR→DataFrame tests
PR4 [x] DB-solo (B-review) ExecutorDispatcher + QueryService + ChatHandler module. API rewired in Cleanup PR.
PR5 [x] shipped DB-solo (B-review) Retry/self-correction loop on validation failure (lives in QueryService, max 3 attempts, planner re-prompted with prior error)
PR6 [~] scaffold DB-solo (B-review) Eval harness scaffold + 3 DB-targeting golden cases. Skipped without RUN_PLANNER_EVAL=1 env. TAB extends with tabular cases.
PR7 [x] DB-solo (B-review) ChatbotAgent (renamed from AnswerAgent) + chatbot_system + guardrails prompts. answer_agent.py β†’ chatbot.py, AnswerAgent β†’ ChatbotAgent. API rewired in Cleanup PR.
Cleanup [x] B ChatHandler wired to chat.py; Phase 1 dual-write dropped from /ingest; on_catalog_rebuild_requested + POST /data-catalog/rebuild; dead modules deleted (chatbot Phase 1, orchestrator, query/base, knowledge.py, config/agents/); retrieval cache restored via RetrievalRouter; top_values added to ColumnStats; lifespan migration; knowledge_router removed.

All items

Contracts (B β€” shared)

# Item Status Notes
1 Catalog Pydantic models (catalog/models.py) [x] PR1 added location_ref URI-scheme docstring; PR2a added ForeignKey model + Table.foreign_keys field
2 IR Pydantic models (query/ir/models.py) [x] Pre-existing scaffold
3 IR operator whitelists (query/ir/operators.py) [x] PR1 filled TYPE_COMPATIBILITY matrix
4 PII patterns / regex (security/pii_patterns.py) [x] Pre-existing
β€” data_catalog Postgres jsonb table (db/postgres/models.py) [x] PR1 added Catalog SQLAlchemy class + init_db.py import. KM-557 renamed __tablename__ from catalogs β†’ data_catalog; created fresh (no migration)
β€” QueryResult shape (query/executor/base.py) [x] Pre-existing scaffold; columns: list[str] added (TAB owner, PR1-tab) β€” DbExecutor updated to populate it.
β€” Source.location_ref URI scheme [x] PR1 documented in catalog/models.py docstring

Ingestion β€” introspection

# Item Owner Status Notes
5 DB introspector (catalog/introspect/database.py) DB [x] PR1 β€” reuses Phase 1 database_client_service, db_credential_encryption, db_pipeline_service.engine_scope, extractor.get_schema/profile_column/get_row_count. PR2a wired FK extraction (was discarded before).
6 Tabular introspector (catalog/introspect/tabular.py) TAB [~] PR1-tab β€” downloads original blob (CSV/XLSX/Parquet), one Table per sheet (XLSX) or one Table (CSV/Parquet). source_id = document_id. fetch_doc/fetch_blob injectable for unit tests (no Settings).
7 BaseIntrospector ABC (catalog/introspect/base.py) B [x] Pre-existing; signature locked

Ingestion β€” shared catalog plumbing

# Item Owner Status Notes
8 Catalog enricher + prompt B REMOVED in KM-557 Cost optimization β€” planner reads stats + sample rows + column names directly. catalog/enricher.py + config/prompts/catalog_enricher.md deleted. render_source (the only piece still needed) moved to src/catalog/render.py. Tests moved to tests/catalog/test_render.py.
9 Catalog validator (catalog/validator.py) B [x] PR1 (DB owner picked up) β€” uniqueness invariants
10 Catalog store β€” Postgres jsonb (catalog/store.py) B [x] PR1 (DB owner picked up) β€” INSERT ... ON CONFLICT
11 Catalog reader (catalog/reader.py) B [x] PR1 (DB owner picked up) β€” filters by source_hint, empty on miss
12 PII detector (catalog/pii_detector.py) B [x] PR1 (DB owner picked up) β€” name + value matching, bias toward over-flag

Ingestion β€” pipelines

# Item Owner Status Notes
13 Structured pipeline (pipeline/structured_pipeline.py) B [x] PR2a (DB owner) β€” Source-type-agnostic: caller supplies the introspector. default_structured_pipeline() factory wires production deps lazily so tests can inject mocks without Settings() construction. KM-557: enrich step removed; pipeline is now introspect β†’ merge with existing β†’ validate β†’ upsert. Constructor no longer takes enricher.
14 Triggers (pipeline/triggers.py) B [x] PR2a β€” on_db_registered implemented (DB owner). PR1-tab β€” on_tabular_uploaded implemented (TAB owner). 2026-05-11 β€” on_document_uploaded implemented. 2026-05-12 β€” on_catalog_rebuild_requested implemented: iterates all Sources in current catalog, re-runs on_db_registered (schema) or on_tabular_uploaded (tabular) per source; per-source errors logged but don't abort.
15 Ingestion orchestrator (pipeline/orchestrator.py) B DELETED Redundant stub β€” StructuredPipeline already takes introspector at run() time. Deleted in Cleanup PR.
16 Document pipeline (pipeline/document_pipeline.py) TAB [x] Flattened pipeline/document_pipeline/document_pipeline.py (folder) β†’ pipeline/document_pipeline.py (file). Updated import in api/v1/document.py.

Query β€” shared spine

# Item Owner Status Notes
17 IR validator (query/ir/validator.py) B [x] PR1 (DB owner) β€” full rule set; descriptive errors for planner retry
18 Planner LLM service (query/planner/service.py) B [x] PR2b β€” Azure OpenAI structured output β†’ QueryIR. Injectable chain. Supports retry via previous_error argument.
19 Planner prompt (query/planner/prompt.py, config/prompts/query_planner.md) B [x] PR2b β€” system prompt with hard constraints + few-shot for DB and tabular sources. build_planner_prompt(question, catalog, previous_error) calls catalog.render.render_source (renamed from catalog.enricher.render_source in KM-557).
20 Intent router (agents/orchestration.py β€” class OrchestratorAgent; config/prompts/intent_router.md) B [x] PR2b β€” single LLM call β†’ IntentRouterDecision(needs_search, source_hint, rewritten_query). Supports conversation history. NOTE: source filename + class name were kept from Phase 1 for import-site compatibility; only the body is Phase 2. Prompt file and test file use the intent_router name.
21 Executor base + QueryResult (query/executor/base.py) B [x] Pre-existing scaffold
22 Executor dispatcher (query/executor/dispatcher.py) B [x] PR4 β€” picks DbExecutor / TabularExecutor by source.source_type. Lazy imports of production executors keep import side-effect-free for tests. Caches per source_type.
23 Compiler base ABC (query/compiler/base.py) B [x] Pre-existing scaffold
24 Top-level QueryService (query/service.py) B [x] PR4+5 β€” plan β†’ validate β†’ dispatch β†’ execute β†’ QueryResult. Retry loop on validation failure (max 3, planner re-prompted with prior error). Catches NotImplementedError from TabularExecutor placeholder gracefully. Never raises.

Query β€” DB path

# Item Status Notes
25 SQL compiler (query/compiler/sql.py) [x] PR3-DB β€” Postgres dialect (Supabase reuses); deterministic IR β†’ (sql, named-params dict); double-quoted identifiers from catalog; all whitelisted ops (=, !=, <, <=, >, >=, in, not_in, is_null, is_not_null, like, between); alias-aware order_by; CompiledSql.params: dict[str, Any] (changed from list). MySQL/BigQuery/Snowflake compilers later.
26 DB executor (query/executor/db.py) [x] PR3-DB β€” sync engine via db_pipeline_service.engine_scope inside asyncio.to_thread. sqlglot SELECT-only / no-DML guard. Postgres-only session settings: default_transaction_read_only=on + statement_timeout=30000. asyncio.wait_for backstop. Never raises β€” populates QueryResult.error. 10k row hard cap.
27 Credential encryption (security/credentials.py) [ ] Stub exists; PR1 reused Phase 1 utils/db_credential_encryption.py instead. Move in cleanup PR
28 User-DB connection management [x] PR3-DB reused Phase 1 db_pipeline_service.engine_scope (same as PR1 introspector); no new helper needed

Query β€” Tabular path

# Item Status Notes
29 Pandas compiler (query/compiler/pandas.py) [~] PR3-TAB β€” CompiledPandas dataclass; all 12 filter ops; all 6 aggs; group_by via pd.concat of Series; alias-aware order_by; _like_to_regex (%β†’.*, _β†’.); pure module-level helpers
30 Tabular executor (query/executor/tabular.py) [~] PR3-TAB β€” fetch_blob injectable for tests; blob path: single-table β†’ {uid}/{did}.parquet, multi-table β†’ {uid}/{did}__{table.name}.parquet; asyncio.to_thread; 10k row hard cap; errors β†’ QueryResult.error
31 Parquet upload/download wrapper [x] Moved knowledge/parquet_service.py β†’ storage/parquet.py. Updated 4 import sites: pipeline/document_pipeline.py, knowledge/processing_service.py, query/executor/tabular.py, query/executors/tabular.py.

Agents + chat

# Item Status Notes
32 Chatbot agent + prompt (agents/chatbot.py, config/prompts/chatbot_system.md) [x] PR7-bundle β€” ChatbotAgent (was AnswerAgent) streams tokens, accepts QueryResult or list[DocumentChunk] or neither. Cleanup PR: renamed answer_agent.py β†’ chatbot.py, AnswerAgent β†’ ChatbotAgent; Phase 1 agents/chatbot.py deleted.
33 Guardrails prompt (config/prompts/guardrails.md) [x] PR7-bundle β€” appended to chatbot_system.md so guardrails take precedence in conflict.
β€” Chat handler / orchestrator (agents/chat_handler.py) [x] PR4-bundle β€” top-level Phase 2 orchestrator. Routes by source_hint: chat β†’ AnswerAgent direct; structured β†’ CatalogReader + QueryService; unstructured β†’ DocumentRetriever placeholder + AnswerAgent. Yields intent / chunk / done / error SSE-style events. Phase 1 chat.py NOT touched β€” cleanup PR rewires the API to call this.

API surface

# Item Owner Status Notes
34 DB client endpoints (api/v1/db_client.py) DB [x] Cleanup PR β€” /ingest now calls only on_db_registered. Phase 1 db_pipeline_service.run() + decrypt_credentials_dict removed. Error from catalog build now raises HTTP 500 (was silent log). Response simplified to {"status": "success", "client_id": ...}.
35 Document/tabular upload endpoints (api/v1/document.py) TAB [x] Rewired /document/process β€” after processing CSV/XLSX, calls on_tabular_uploaded(document_id, user_id). Catalog ingestion failure is logged but does not fail the request. 2026-05-11 β€” CSV/XLSX no longer ingested to vector store (knowledge_processor skipped for tabular types in document_pipeline.py); they go to catalog only.
36 Chat stream endpoint (api/v1/chat.py) B [x] Rewired /chat/stream β€” replaced query_executor.execute() (Phase 1) with CatalogReader + QueryService (Phase 2). Cleanup PR: fully rewired to ChatHandler.handle(). Inline intent routing, retrieval, and answer generation removed. Redis cache, fast intent, history loading, and message persistence remain in chat.py. Sources event emits [] (retrieval not yet exposed by ChatHandler).
37 Room / users endpoints (api/v1/room.py, api/v1/users.py) B [ ] No catalog work; only touch if auth flow changes
β€” Data catalog index endpoint (api/v1/data_catalog.py) DB [x] KM-557 β€” GET /api/v1/data-catalog/{user_id} β†’ list[CatalogIndexEntry]. Cleanup PR β€” added POST /api/v1/data-catalog/rebuild?user_id= β†’ calls on_catalog_rebuild_requested; per-source errors logged but don't fail the request.

Tests + eval

# Item Owner Status Notes
38 DB compiler golden tests (tests/query/compiler/test_sql.py) DB [x] PR3-DB β€” 36 tests across all whitelisted ops, identifier quoting, agg / count_distinct / count(*), order_by alias resolution, parameter sequencing, error paths. Pure-Python, no LLM, no DB.
39 Pandas compiler golden tests (tests/unit/query/compiler/test_pandas_compiler.py) TAB [~] PR3-TAB β€” 43 tests: all 12 filter ops, all 6 aggs, group_by, order_by, limit, aliases, empty DataFrame, error paths. test_tabular_executor.py adds 12 more (blob name resolution + happy path + error paths).
40 IR validator tests (tests/query/ir/test_validator.py) B [x] PR1 β€” 19 tests, all rules covered
β€” PII detector tests (tests/catalog/test_pii_detector.py) B [x] PR1 β€” 26 tests (parametrized)
β€” Catalog validator tests (tests/catalog/test_validator.py) B [x] PR1 β€” 5 tests
β€” Catalog render tests (tests/catalog/test_render.py) B [x] KM-557 β€” 5 tests (renamed from test_enricher.py; LLM enrichment tests dropped, render-only tests kept).
β€” Catalog store integration test (tests/catalog/test_store.py) DB [x] PR1 β€” module-level skip without RUN_INTEGRATION_TESTS=1
β€” DB introspector test DB [ ] Deferred to PR2 β€” needs Postgres testcontainer or fixture infra
β€” Tabular introspector test TAB [x] PR1-tab β€” 31 unit tests (CSV/XLSX/Parquet, stats, PII, error paths). No DB/blob I/O β€” mocks injected via constructor.
41 Planner eval (tests/query/planner/) B [x] PR6-scaffold β€” test_golden_questions.py with 3 DB-targeting cases. TAB added test_golden_tabular.py with 4 tabular cases (group_by+sum, top-N+limit, date range filter, XLSX sheet selection). All 4 passed against real Azure OpenAI. Fix shipped alongside: query/planner/service.py replaced ("system", text) tuple with SystemMessage β€” without this, {...} in query_planner.md was parsed as f-string variables and crashed on every real invocation.
42 E2E smoke tests (tests/e2e/) B [ ] Defer until Phase 2 endpoints are wired (cleanup PR). Component-level orchestration is already covered by test_chat_handler.py + test_service.py.
β€” Golden IR fixtures (tests/fixtures/golden_irs.json) B [~] PR1 seeded with 5 DB-targeting examples; TAB extends in PR1-tab
β€” Shared sample_catalog fixture (tests/conftest.py) B [x] PR1 β€” DB-shaped; TAB may add tabular sibling

What just shipped (2026-05-12 β€” Cleanup PR)

Phase 1 removal + Phase 2 API rewiring:

  • src/api/v1/chat.py β€” fully rewired to ChatHandler.handle(). Removed inline IntentRouter, retrieval, and ChatbotAgent calls. Redis cache, fast intent, load_history, save_messages stay in chat.py.
  • src/api/v1/db_client.py β€” /ingest now calls only on_db_registered. Phase 1 db_pipeline_service.run() block removed. Catalog build failure now raises HTTP 500.
  • src/api/v1/data_catalog.py β€” added POST /api/v1/data-catalog/rebuild endpoint.
  • src/pipeline/triggers.py β€” on_catalog_rebuild_requested implemented: iterates catalog sources, re-runs the appropriate trigger per source type, per-source errors logged.

Dead modules deleted:

  • src/agents/chatbot.py (Phase 1 LangChain chatbot)
  • src/pipeline/orchestrator.py (empty stub)
  • src/query/base.py (old duplicate of executor/base.py)
  • src/api/v1/knowledge.py (fake /knowledge/rebuild endpoint)
  • src/config/agents/ (folder β€” prompts only used by deleted Phase 1 chatbot)

Renames:

  • src/agents/answer_agent.py β†’ src/agents/chatbot.py; AnswerAgent β†’ ChatbotAgent; updated all import sites (chat_handler.py, chat.py)

Fixes + improvements:

  • src/agents/chat_handler.py β€” _get_document_retriever() now returns RetrievalRouter (Redis-cached) instead of DocumentRetriever directly; retrieval-level cache restored.
  • src/retrieval/router.py β€” removed dead db: AsyncSession and source_hint parameters + _UNSTRUCTURED_HINTS constant from retrieve(). Cache key simplified.
  • src/knowledge/processing_service.py β€” removed dead _build_csv_documents, _build_excel_documents, _profile_dataframe, _to_sheet_document methods + pandas and upload_parquet imports.
  • src/catalog/models.py β€” added top_values: list[Any] | None to ColumnStats.
  • src/catalog/introspect/tabular.py β€” _to_column now populates top_values for columns with ≀10 distinct values; useful for query planner WHERE clause generation.
  • main.py β€” replaced deprecated @app.on_event("startup") with lifespan context manager; removed knowledge_router.

What just shipped (KM-557 β€” DB owner)

After lead review of the catalog ingestion cost: dropped LLM enrichment, renamed the storage table, and exposed a lightweight index endpoint for the upcoming catalog refresher.

Files deleted:

  • src/catalog/enricher.py β€” entire CatalogEnricher + EnrichmentResponse + apply_descriptions removed
  • src/config/prompts/catalog_enricher.md β€” dead prompt
  • tests/catalog/test_enricher.py β€” replaced by test_render.py

Files added:

  • src/catalog/render.py β€” new home for render_source (the only piece of the old enricher still needed; consumed by query/planner/prompt.py)
  • src/api/v1/data_catalog.py β€” GET /api/v1/data-catalog/{user_id} returns list[CatalogIndexEntry]
  • tests/catalog/test_render.py β€” 5 tests (same coverage as the old render block)

Files modified:

  • src/db/postgres/models.py β€” __tablename__ = "data_catalog" (was "catalogs"). Class name unchanged
  • src/pipeline/structured_pipeline.py β€” StructuredPipeline(validator, store) (was (enricher, validator, store)); pipeline is now introspect β†’ merge β†’ validate β†’ upsert; default_structured_pipeline() no longer constructs an enricher
  • src/pipeline/triggers.py β€” docstrings updated; on_catalog_rebuild_requested docstring rewritten for the refresher use case
  • src/query/planner/prompt.py β€” import now from ...catalog.render import render_source
  • src/catalog/introspect/{base,database,tabular}.py β€” docstring scrubs (no behavior changes)
  • src/models/api/catalog.py β€” added CatalogIndexEntry; simplified CatalogRebuildResponse to sources_rebuilt
  • main.py β€” registered data_catalog_router
  • src/security/README.md β€” one stale wording fix

No migration: the data_catalog table is created from scratch on first init_db(). The old catalogs table was never deployed against production data, so no rename SQL is needed.

Tests: all 4 test_structured_pipeline.py tests reworked to construct StructuredPipeline(validator=, store=) without enricher. 5 test_render.py tests cover render_source standalone.

Lint: ruff check clean on modified Phase 2 paths.

Open follow-ups left for the lead:

  • on_catalog_rebuild_requested body β€” the refresher will iterate the index endpoint and call this trigger per source
  • api/v1/db_client.py /ingest still doesn't call on_db_registered β€” same blocker as before, untouched by KM-557

What just shipped (2026-05-11 β€” retrieval migration + bug fixes)

Files implemented / migrated:

  • src/retrieval/base.py β€” RetrievalResult dataclass + BaseRetriever ABC (was in src/rag/base.py)
  • src/retrieval/document.py β€” full DocumentRetriever migrated from src/rag/retrievers/document.py; all retrieval methods (MMR/cosine/euclidean/inner_product/manhattan). Tabular file types filtered out from results.
  • src/retrieval/router.py β€” RetrievalRouter (Redis-cached, unstructured-only). invalidate_cache(user_id) clears all retrieval:{user_id}:* keys.

Deleted (no longer used):

  • src/rag/ β€” entire folder (base.py, retriever.py, router.py, retrievers/)
  • src/tools/ β€” entire folder (search.py was the only real file; only called by deleted rag/ router)

Bug fixes:

  • src/pipeline/document_pipeline.py β€” retrieval_router.invalidate_cache(user_id) called after process() and delete(). Redis failure is caught and logged (does not fail the document op).
  • src/pipeline/document_pipeline.py β€” CSV/XLSX now skips knowledge_processor (vector store). Tabular files go to catalog only; no duplicate embeddings.
  • src/pipeline/triggers.py β€” on_document_uploaded implemented (was raise NotImplementedError).
  • src/agents/chat_handler.py β€” _normalize_chunks now handles RetrievalResult objects. Previously they were silently dropped, causing empty context for unstructured queries through ChatHandler.

Import updates (all changed from src.rag.* β†’ src.retrieval.*):

  • src/api/v1/chat.py, src/query/base.py, src/query/query_executor.py, src/query/executors/db_executor.py, src/query/executors/tabular.py

What shipped previously (PR2b/4/5/6/7-bundle β€” DB owner solo, teammate reviews)

Files implemented:

  • src/agents/orchestration.py β€” OrchestratorAgent.classify(message, history) β†’ IntentRouterDecision. Pydantic model for structured output. History-aware query rewriting. Phase 1 filename + class name preserved; body fully rewritten for Phase 2.
  • src/agents/answer_agent.py β€” AnswerAgent.astream(...) streams answer tokens; accepts QueryResult and/or list[DocumentChunk]. Renames to chatbot.py in cleanup PR.
  • src/agents/chat_handler.py β€” ChatHandler.handle(message, user_id, history) returns AsyncIterator[dict] of intent / chunk / done / error SSE events. All deps injectable; lazy default builders.
  • src/query/planner/prompt.py β€” render_catalog(catalog) + build_planner_prompt(question, catalog, previous_error). Reuses catalog.enricher.render_source for consistency across LLM call sites.
  • src/query/planner/service.py β€” QueryPlannerService.plan(question, catalog, previous_error) Azure OpenAI structured output β†’ QueryIR.
  • src/query/executor/dispatcher.py β€” ExecutorDispatcher.pick(ir) β†’ BaseExecutor by source.source_type. Lazy executor imports + per-source-type cache.
  • src/query/service.py β€” QueryService.run(user_id, question, catalog) β†’ QueryResult. Planβ†’validateβ†’retry-on-failure (max 3)β†’dispatchβ†’execute. Catches NotImplementedError from TabularExecutor placeholder gracefully.

Prompts written (filled in placeholders):

  • src/config/prompts/intent_router.md
  • src/config/prompts/query_planner.md
  • src/config/prompts/chatbot_system.md
  • src/config/prompts/guardrails.md

Tests added (46 new β€” total now 146 + 2 skipped):

  • tests/agents/test_intent_router.py (4)
  • tests/agents/test_answer_agent.py (12)
  • tests/agents/test_chat_handler.py (6)
  • tests/query/planner/test_prompt.py (7)
  • tests/query/planner/test_service.py (3)
  • tests/query/executor/test_dispatcher.py (5)
  • tests/query/test_service.py (8)
  • tests/query/planner/test_golden_questions.py (3 β€” skipped by default; eval harness scaffold)

Lint: ruff check clean on all Phase 2 paths. Phase 1 files have pre-existing E501/S608 issues β€” out of scope for this PR.

Placeholders / blockers for teammate (status as of DB owner's commit, before merge):

  • src/query/executor/tabular.py (TAB) β€” DB owner's note: "still raises NotImplementedError". Post-merge: TAB shipped this in PR3-TAB; dispatcher now routes to the real TabularExecutor. The NotImplementedError catch in QueryService stays as a safety net.
  • src/retrieval/document.py β€” implemented (2026-05-11). Full DocumentRetriever migrated from src/rag/retrievers/document.py; supports MMR/cosine/euclidean/manhattan/inner_product. _normalize_chunks in chat_handler.py now handles RetrievalResult β†’ DocumentChunk conversion correctly.
  • src/api/v1/chat.py (Phase 1) β€” NOT touched. Cleanup PR rewires the SSE endpoint to call ChatHandler.handle(...).
  • src/api/v1/db_client.py (Phase 1) β€” NOT touched. Cleanup PR rewires /database-clients/{id}/ingest to call pipeline.triggers.on_db_registered.

What shipped previously (PR3-TAB β€” TAB owner)

Files implemented:

  • src/query/compiler/pandas.py β€” PandasCompiler + CompiledPandas(apply, output_columns) dataclass. Pure helper functions (easier to test in isolation): _apply_filters (all 12 ops, _like_to_regex for LIKE), _apply_select (column pick + rename), _apply_agg (scalar + group_by via pd.concat of Series β†’ reset_index), _apply_orderby (alias-aware via _resolve_order_col). Closure captures all IR fields explicitly so apply(df) is self-contained.
  • src/query/executor/tabular.py β€” TabularExecutor with injectable fetch_blob (same testability pattern as TabularIntrospector). Resolves Parquet blob path from az_blob://{uid}/{did} + table: single-table β†’ {uid}/{did}.parquet, multi-table β†’ {uid}/{did}__{table.name}.parquet. Runs compile β†’ download β†’ asyncio.to_thread(_load_and_apply) β†’ 10k hard cap. Never raises; errors populate QueryResult.error. Uses compiled.output_columns for column labels (safe on empty DataFrame).

Tests added (55 new β€” total suite was 86 all passing at PR3-TAB time):

  • tests/unit/query/compiler/test_pandas_compiler.py β€” 43 tests across all 12 filter ops (including is_null, not_in, like, between), all 6 agg fns, group_by, order_by asc/desc, limit-after-order, alias round-trip, empty DataFrame, error paths.
  • tests/unit/query/executor/test_tabular_executor.py β€” 12 tests: _resolve_blob_name (single/multi-table, bad prefix), happy-path QueryResult shape (columns, rows, backend, truncated, source_id), wrong source_type β†’ error, blob fetch failure β†’ error, unknown source β†’ error.

Lint: ruff check clean on both files.


What shipped previously (PR1-tab β€” TAB owner)

Files implemented:

  • src/catalog/introspect/tabular.py β€” TabularIntrospector reads original blob (CSV/XLSX/Parquet), profiles each column (dtype, stats, sample values), runs PIIDetector. For XLSX: one Table per sheet (Table.name = sheet_name); for CSV/Parquet: one Table (Table.name = filename stem). fetch_doc/fetch_blob are constructor-injectable for unit tests β€” no Settings or DB required at import time.
  • src/pipeline/triggers.py β€” on_tabular_uploaded wired (mirrors on_db_registered pattern).

Tests added (31 new):

  • tests/unit/catalog/test_introspect_tabular.py β€” CSV / XLSX / Parquet shapes, per-column stats, nullable detection, PII name + value matching, sample capping, all error paths. Pure Python, no network I/O.

Executor contract note: introspector downloads the original blob for schema reading. The tabular executor (PR3-TAB) downloads Parquet blobs for query execution. For CSV/Parquet sources (single table), the executor must call parquet_blob_name(uid, did, sheet_name=None); for XLSX (multi-table), parquet_blob_name(uid, did, table.name).


What shipped previously (PR3-DB β€” DB owner)

Files implemented:

  • src/query/compiler/sql.py β€” SqlCompiler for Postgres dialect; CompiledSql(sql, params) dataclass with params: dict[str, Any] (changed from list); supports all 12 whitelisted filter ops, all 6 aggs, alias-aware order_by; _qident escapes embedded double-quotes
  • src/query/executor/db.py β€” DbExecutor with sqlglot SELECT-only guard, Postgres session-level read-only + 30s statement_timeout, asyncio.wait_for backstop, 10k row hard cap; rejects non-schema source_type and dbclient:// URI mismatch; never raises (populates QueryResult.error)

Files extended:

  • src/query/compiler/pandas.py β€” fixed pre-existing UP035 (Callable import)
  • pyproject.toml β€” added S608 to tests/** ruff ignore (false positive: tests assert literal SQL strings)

Tests added (36 new, all passing β€” total now 100):

  • tests/query/compiler/test_sql.py β€” every filter op, every agg, count(*), count_distinct, order_by alias vs column, multi-filter AND, identifier quoting escape, error paths

Lint: ruff check clean on Phase 2 paths.

Hand-off note for teammate: CompiledSql.params is now dict[str, Any] not list. The pandas compiler will follow the same convention (or document its own) β€” coordinate when PR3-TAB lands.


What shipped previously (PR2a β€” DB owner)

Files implemented:

  • src/catalog/enricher.py β€” Azure OpenAI GPT-4o + structured output (EnrichmentResponse), render_source (reusable by planner prompt later), apply_descriptions merger, injectable structured_chain for tests
  • src/pipeline/structured_pipeline.py β€” StructuredPipeline orchestrator + default_structured_pipeline() factory with lazy production-dep imports
  • src/pipeline/triggers.py β€” on_db_registered wired; tabular/document/rebuild stubs preserved with implementation notes

Files extended:

  • src/catalog/models.py β€” added ForeignKey model, Table.foreign_keys: list[ForeignKey] = []
  • src/catalog/introspect/database.py β€” _extract_foreign_keys populates Table.foreign_keys from extractor data
  • src/config/prompts/catalog_enricher.md β€” full system prompt with style rules and one few-shot example

Tests added (14 new, all passing β€” total now 64):

  • tests/catalog/test_enricher.py β€” render / apply / end-to-end with fake chain (10 tests)
  • tests/pipeline/test_structured_pipeline.py β€” orchestration with stub deps (4 tests)

Lint: ruff check clean on all Phase 2 paths. Phase 1 files (pipeline/db_pipeline/, pipeline/document_pipeline/) have pre-existing ruff issues β€” out of scope for this PR.


What shipped previously (PR1 β€” DB owner's first chunk)

Files implemented (was NotImplementedError):

  • src/catalog/pii_detector.py, src/catalog/validator.py, src/catalog/store.py, src/catalog/reader.py
  • src/catalog/introspect/database.py (FK extraction added in PR2a)
  • src/query/ir/validator.py

Files extended:

  • src/query/ir/operators.py β€” TYPE_COMPATIBILITY matrix
  • src/catalog/models.py β€” location_ref URI-scheme docstring
  • src/db/postgres/models.py β€” Catalog SQLAlchemy table; init_db.py imports it

Tests: 50 unit tests + 1 integration (gated on RUN_INTEGRATION_TESTS=1).

Reused Phase 1 utilities (cleanup deferred):

  • src/database_client/database_client_service.py:get
  • src/utils/db_credential_encryption.py:decrypt_credentials_dict
  • src/pipeline/db_pipeline/db_pipeline_service.py:engine_scope
  • src/pipeline/db_pipeline/extractor.py:get_schema/profile_column/get_row_count

Open contract items (not yet locked)

  • Joins in IR β€” currently single-table only (ARCHITECTURE.md Β§7); DB owner accepted the constraint for v1, will revisit in PR3 if it's blocking real queries
  • updated_at on Source vs generated_at on Catalog β€” Pydantic models have both; introspector sets per-Source; CatalogStore preserves both
  • Catalog refresh trigger (open question Β§3) β€” default policy is rebuild-on-upload-or-connect; auto-refresh deferred
  • Unstructured catalog entries (open question Β§2) β€” currently empty filter for source_hint="unstructured"; revisit when adding doc descriptions
  • PII handling for sample_values (open question Β§5) β€” currently nulls them out (skip); mask/synthesize deferred
  • Dialect priority for SQL compiler β€” PR3 will land Postgres first, MySQL second; BigQuery/Snowflake/SQL Server later

How to update this file

When a PR lands:

  1. Flip status from [ ] or [~] to [x]
  2. Add a short note (file paths, scope cuts, surprises)
  3. Bump "Last updated" at the top
  4. If a new contract decision lands, move it from "Open contract items" to the relevant inline note

When opening a PR:

  1. Flip status to [~] and add yourself as the active owner in the PR row
  2. Don't promise items in the PR description that aren't in the table