ishaq101's picture
feat/Catalog Retrieval System (#1)
6bff5d9
# Progress β€” Phase 2 catalog-driven build
Persistent tracker mirroring the 42-item ownership table in `REPO_CONTEXT.md` "Team β€” division of work". Update as PRs land. Future Claude Code sessions read this to know what's already done.
**Last updated**: 2026-05-12 ([NOTICKET] Cleanup PR landed: ChatHandler wired to chat.py, Phase 1 dual-write dropped from /ingest, on_catalog_rebuild_requested implemented, dead modules deleted, answer_agent→chatbot renamed, retrieval cache restored via RetrievalRouter, top_values added to ColumnStats, lifespan migration, knowledge_router removed)
**Current open PR**: `pr/1` β€” active. Cleanup PR committed and pushed.
---
## Legend
- `[x]` done and merged
- `[~]` in progress (open PR or active branch)
- `[ ]` not started
- **DB** / **TAB** / **B** β€” ownership (from REPO_CONTEXT.md)
---
## PR sequence
| PR | Status | Owner(s) | Scope |
|---|---|---|---|
| PR1 | `[x]` merged | DB | Contract locks + catalog plumbing + DB introspector + IR validator + tests |
| PR1-tab | `[x]` shipped | TAB | Tabular introspector + on_tabular_uploaded trigger + 31 unit tests |
| PR2a | `[x]` merged | DB | CatalogEnricher + StructuredPipeline + on_db_registered trigger + FK extension on Table (enricher later removed in KM-557) |
| KM-557 | `[x]` shipped | DB | Drop CatalogEnricher entirely (cost cut β€” planner uses stats + sample rows directly); rename jsonb table `catalogs` β†’ `data_catalog`; add `GET /api/v1/data-catalog/{user_id}` index endpoint for catalog refresher |
| PR2b | `[x]` shipped | DB-solo (B-review) | IntentRouter + planner prompt + planner LLM service |
| PR3-DB | `[x]` shipped | DB | SqlCompiler (Postgres) + DbExecutor (sqlglot guard, RO + statement_timeout, asyncio.to_thread) + 36 golden IR→SQL tests |
| PR3-TAB | `[x]` shipped | TAB | PandasCompiler + TabularExecutor + 43+12 golden IR→DataFrame tests |
| PR4 | `[x]` | DB-solo (B-review) | ExecutorDispatcher + QueryService + ChatHandler module. **API rewired in Cleanup PR.** |
| PR5 | `[x]` shipped | DB-solo (B-review) | Retry/self-correction loop on validation failure (lives in QueryService, max 3 attempts, planner re-prompted with prior error) |
| PR6 | `[~]` scaffold | DB-solo (B-review) | Eval harness scaffold + 3 DB-targeting golden cases. Skipped without `RUN_PLANNER_EVAL=1` env. TAB extends with tabular cases. |
| PR7 | `[x]` | DB-solo (B-review) | `ChatbotAgent` (renamed from `AnswerAgent`) + chatbot_system + guardrails prompts. `answer_agent.py` β†’ `chatbot.py`, `AnswerAgent` β†’ `ChatbotAgent`. API rewired in Cleanup PR. |
| Cleanup | `[x]` | B | ChatHandler wired to chat.py; Phase 1 dual-write dropped from /ingest; on_catalog_rebuild_requested + POST /data-catalog/rebuild; dead modules deleted (chatbot Phase 1, orchestrator, query/base, knowledge.py, config/agents/); retrieval cache restored via RetrievalRouter; top_values added to ColumnStats; lifespan migration; knowledge_router removed. |
---
## All items
### Contracts (B β€” shared)
| # | Item | Status | Notes |
|---|---|---|---|
| 1 | Catalog Pydantic models (`catalog/models.py`) | `[x]` | PR1 added `location_ref` URI-scheme docstring; PR2a added `ForeignKey` model + `Table.foreign_keys` field |
| 2 | IR Pydantic models (`query/ir/models.py`) | `[x]` | Pre-existing scaffold |
| 3 | IR operator whitelists (`query/ir/operators.py`) | `[x]` | PR1 filled `TYPE_COMPATIBILITY` matrix |
| 4 | PII patterns / regex (`security/pii_patterns.py`) | `[x]` | Pre-existing |
| β€” | `data_catalog` Postgres jsonb table (`db/postgres/models.py`) | `[x]` | PR1 added `Catalog` SQLAlchemy class + `init_db.py` import. KM-557 renamed `__tablename__` from `catalogs` β†’ `data_catalog`; created fresh (no migration) |
| β€” | `QueryResult` shape (`query/executor/base.py`) | `[x]` | Pre-existing scaffold; `columns: list[str]` added (TAB owner, PR1-tab) β€” DbExecutor updated to populate it. |
| β€” | `Source.location_ref` URI scheme | `[x]` | PR1 documented in `catalog/models.py` docstring |
### Ingestion β€” introspection
| # | Item | Owner | Status | Notes |
|---|---|---|---|---|
| 5 | DB introspector (`catalog/introspect/database.py`) | DB | `[x]` | PR1 β€” reuses Phase 1 `database_client_service`, `db_credential_encryption`, `db_pipeline_service.engine_scope`, `extractor.get_schema/profile_column/get_row_count`. PR2a wired FK extraction (was discarded before). |
| 6 | Tabular introspector (`catalog/introspect/tabular.py`) | TAB | `[~]` | PR1-tab β€” downloads original blob (CSV/XLSX/Parquet), one Table per sheet (XLSX) or one Table (CSV/Parquet). `source_id = document_id`. `fetch_doc`/`fetch_blob` injectable for unit tests (no Settings). |
| 7 | `BaseIntrospector` ABC (`catalog/introspect/base.py`) | B | `[x]` | Pre-existing; signature locked |
### Ingestion β€” shared catalog plumbing
| # | Item | Owner | Status | Notes |
|---|---|---|---|---|
| 8 | ~~Catalog enricher + prompt~~ | B | **REMOVED in KM-557** | Cost optimization β€” planner reads stats + sample rows + column names directly. `catalog/enricher.py` + `config/prompts/catalog_enricher.md` deleted. `render_source` (the only piece still needed) moved to `src/catalog/render.py`. Tests moved to `tests/catalog/test_render.py`. |
| 9 | Catalog validator (`catalog/validator.py`) | B | `[x]` | PR1 (DB owner picked up) β€” uniqueness invariants |
| 10 | Catalog store β€” Postgres jsonb (`catalog/store.py`) | B | `[x]` | PR1 (DB owner picked up) β€” `INSERT ... ON CONFLICT` |
| 11 | Catalog reader (`catalog/reader.py`) | B | `[x]` | PR1 (DB owner picked up) β€” filters by source_hint, empty on miss |
| 12 | PII detector (`catalog/pii_detector.py`) | B | `[x]` | PR1 (DB owner picked up) β€” name + value matching, bias toward over-flag |
### Ingestion β€” pipelines
| # | Item | Owner | Status | Notes |
|---|---|---|---|---|
| 13 | Structured pipeline (`pipeline/structured_pipeline.py`) | B | `[x]` | PR2a (DB owner) β€” Source-type-agnostic: caller supplies the introspector. `default_structured_pipeline()` factory wires production deps lazily so tests can inject mocks without `Settings()` construction. **KM-557**: enrich step removed; pipeline is now `introspect β†’ merge with existing β†’ validate β†’ upsert`. Constructor no longer takes `enricher`. |
| 14 | Triggers (`pipeline/triggers.py`) | B | `[x]` | PR2a β€” `on_db_registered` implemented (DB owner). PR1-tab β€” `on_tabular_uploaded` implemented (TAB owner). **2026-05-11** β€” `on_document_uploaded` implemented. **2026-05-12** β€” `on_catalog_rebuild_requested` implemented: iterates all Sources in current catalog, re-runs `on_db_registered` (schema) or `on_tabular_uploaded` (tabular) per source; per-source errors logged but don't abort. |
| 15 | Ingestion orchestrator (`pipeline/orchestrator.py`) | B | **DELETED** | Redundant stub β€” `StructuredPipeline` already takes introspector at run() time. Deleted in Cleanup PR. |
| 16 | Document pipeline (`pipeline/document_pipeline.py`) | TAB | `[x]` | Flattened `pipeline/document_pipeline/document_pipeline.py` (folder) β†’ `pipeline/document_pipeline.py` (file). Updated import in `api/v1/document.py`. |
### Query β€” shared spine
| # | Item | Owner | Status | Notes |
|---|---|---|---|---|
| 17 | IR validator (`query/ir/validator.py`) | B | `[x]` | PR1 (DB owner) β€” full rule set; descriptive errors for planner retry |
| 18 | Planner LLM service (`query/planner/service.py`) | B | `[x]` | PR2b β€” Azure OpenAI structured output β†’ `QueryIR`. Injectable chain. Supports retry via `previous_error` argument. |
| 19 | Planner prompt (`query/planner/prompt.py`, `config/prompts/query_planner.md`) | B | `[x]` | PR2b β€” system prompt with hard constraints + few-shot for DB and tabular sources. `build_planner_prompt(question, catalog, previous_error)` calls `catalog.render.render_source` (renamed from `catalog.enricher.render_source` in KM-557). |
| 20 | Intent router (`agents/orchestration.py` β€” class `OrchestratorAgent`; `config/prompts/intent_router.md`) | B | `[x]` | PR2b β€” single LLM call β†’ `IntentRouterDecision(needs_search, source_hint, rewritten_query)`. Supports conversation history. **NOTE**: source filename + class name were kept from Phase 1 for import-site compatibility; only the body is Phase 2. Prompt file and test file use the `intent_router` name. |
| 21 | Executor base + `QueryResult` (`query/executor/base.py`) | B | `[x]` | Pre-existing scaffold |
| 22 | Executor dispatcher (`query/executor/dispatcher.py`) | B | `[x]` | PR4 β€” picks DbExecutor / TabularExecutor by `source.source_type`. Lazy imports of production executors keep import side-effect-free for tests. Caches per source_type. |
| 23 | Compiler base ABC (`query/compiler/base.py`) | B | `[x]` | Pre-existing scaffold |
| 24 | Top-level QueryService (`query/service.py`) | B | `[x]` | PR4+5 β€” `plan β†’ validate β†’ dispatch β†’ execute β†’ QueryResult`. Retry loop on validation failure (max 3, planner re-prompted with prior error). Catches NotImplementedError from TabularExecutor placeholder gracefully. Never raises. |
### Query β€” DB path
| # | Item | Status | Notes |
|---|---|---|---|
| 25 | SQL compiler (`query/compiler/sql.py`) | `[x]` | PR3-DB β€” Postgres dialect (Supabase reuses); deterministic IR β†’ (sql, named-params dict); double-quoted identifiers from catalog; all whitelisted ops (=, !=, <, <=, >, >=, in, not_in, is_null, is_not_null, like, between); alias-aware order_by; `CompiledSql.params: dict[str, Any]` (changed from `list`). MySQL/BigQuery/Snowflake compilers later. |
| 26 | DB executor (`query/executor/db.py`) | `[x]` | PR3-DB β€” sync engine via `db_pipeline_service.engine_scope` inside `asyncio.to_thread`. sqlglot SELECT-only / no-DML guard. Postgres-only session settings: `default_transaction_read_only=on` + `statement_timeout=30000`. asyncio.wait_for backstop. Never raises β€” populates `QueryResult.error`. 10k row hard cap. |
| 27 | Credential encryption (`security/credentials.py`) | `[ ]` | Stub exists; PR1 reused Phase 1 `utils/db_credential_encryption.py` instead. Move in cleanup PR |
| 28 | User-DB connection management | `[x]` | PR3-DB reused Phase 1 `db_pipeline_service.engine_scope` (same as PR1 introspector); no new helper needed |
### Query β€” Tabular path
| # | Item | Status | Notes |
|---|---|---|---|
| 29 | Pandas compiler (`query/compiler/pandas.py`) | `[~]` | PR3-TAB β€” `CompiledPandas` dataclass; all 12 filter ops; all 6 aggs; group_by via `pd.concat` of Series; alias-aware order_by; `_like_to_regex` (`%`β†’`.*`, `_`β†’`.`); pure module-level helpers |
| 30 | Tabular executor (`query/executor/tabular.py`) | `[~]` | PR3-TAB β€” `fetch_blob` injectable for tests; blob path: single-table β†’ `{uid}/{did}.parquet`, multi-table β†’ `{uid}/{did}__{table.name}.parquet`; `asyncio.to_thread`; 10k row hard cap; errors β†’ `QueryResult.error` |
| 31 | Parquet upload/download wrapper | `[x]` | Moved `knowledge/parquet_service.py` β†’ `storage/parquet.py`. Updated 4 import sites: `pipeline/document_pipeline.py`, `knowledge/processing_service.py`, `query/executor/tabular.py`, `query/executors/tabular.py`. |
### Agents + chat
| # | Item | Status | Notes |
|---|---|---|---|
| 32 | Chatbot agent + prompt (`agents/chatbot.py`, `config/prompts/chatbot_system.md`) | `[x]` | PR7-bundle β€” `ChatbotAgent` (was `AnswerAgent`) streams tokens, accepts `QueryResult` or list[`DocumentChunk`] or neither. **Cleanup PR**: renamed `answer_agent.py` β†’ `chatbot.py`, `AnswerAgent` β†’ `ChatbotAgent`; Phase 1 `agents/chatbot.py` deleted. |
| 33 | Guardrails prompt (`config/prompts/guardrails.md`) | `[x]` | PR7-bundle β€” appended to `chatbot_system.md` so guardrails take precedence in conflict. |
| β€” | Chat handler / orchestrator (`agents/chat_handler.py`) | `[x]` | PR4-bundle β€” top-level Phase 2 orchestrator. Routes by `source_hint`: chat β†’ AnswerAgent direct; structured β†’ CatalogReader + QueryService; unstructured β†’ DocumentRetriever placeholder + AnswerAgent. Yields `intent` / `chunk` / `done` / `error` SSE-style events. Phase 1 chat.py NOT touched β€” cleanup PR rewires the API to call this. |
### API surface
| # | Item | Owner | Status | Notes |
|---|---|---|---|---|
| 34 | DB client endpoints (`api/v1/db_client.py`) | DB | `[x]` | **Cleanup PR** β€” `/ingest` now calls only `on_db_registered`. Phase 1 `db_pipeline_service.run()` + `decrypt_credentials_dict` removed. Error from catalog build now raises HTTP 500 (was silent log). Response simplified to `{"status": "success", "client_id": ...}`. |
| 35 | Document/tabular upload endpoints (`api/v1/document.py`) | TAB | `[x]` | Rewired `/document/process` β€” after processing CSV/XLSX, calls `on_tabular_uploaded(document_id, user_id)`. Catalog ingestion failure is logged but does not fail the request. **2026-05-11** β€” CSV/XLSX no longer ingested to vector store (`knowledge_processor` skipped for tabular types in `document_pipeline.py`); they go to catalog only. |
| 36 | Chat stream endpoint (`api/v1/chat.py`) | B | `[x]` | Rewired `/chat/stream` β€” replaced `query_executor.execute()` (Phase 1) with `CatalogReader + QueryService` (Phase 2). **Cleanup PR**: fully rewired to `ChatHandler.handle()`. Inline intent routing, retrieval, and answer generation removed. Redis cache, fast intent, history loading, and message persistence remain in chat.py. Sources event emits `[]` (retrieval not yet exposed by ChatHandler). |
| 37 | Room / users endpoints (`api/v1/room.py`, `api/v1/users.py`) | B | `[ ]` | No catalog work; only touch if auth flow changes |
| β€” | Data catalog index endpoint (`api/v1/data_catalog.py`) | DB | `[x]` | **KM-557** β€” `GET /api/v1/data-catalog/{user_id}` β†’ `list[CatalogIndexEntry]`. **Cleanup PR** β€” added `POST /api/v1/data-catalog/rebuild?user_id=` β†’ calls `on_catalog_rebuild_requested`; per-source errors logged but don't fail the request. |
### Tests + eval
| # | Item | Owner | Status | Notes |
|---|---|---|---|---|
| 38 | DB compiler golden tests (`tests/query/compiler/test_sql.py`) | DB | `[x]` | PR3-DB β€” 36 tests across all whitelisted ops, identifier quoting, agg / count_distinct / count(*), order_by alias resolution, parameter sequencing, error paths. Pure-Python, no LLM, no DB. |
| 39 | Pandas compiler golden tests (`tests/unit/query/compiler/test_pandas_compiler.py`) | TAB | `[~]` | PR3-TAB β€” 43 tests: all 12 filter ops, all 6 aggs, group_by, order_by, limit, aliases, empty DataFrame, error paths. `test_tabular_executor.py` adds 12 more (blob name resolution + happy path + error paths). |
| 40 | IR validator tests (`tests/query/ir/test_validator.py`) | B | `[x]` | PR1 β€” 19 tests, all rules covered |
| β€” | PII detector tests (`tests/catalog/test_pii_detector.py`) | B | `[x]` | PR1 β€” 26 tests (parametrized) |
| β€” | Catalog validator tests (`tests/catalog/test_validator.py`) | B | `[x]` | PR1 β€” 5 tests |
| β€” | Catalog render tests (`tests/catalog/test_render.py`) | B | `[x]` | **KM-557** β€” 5 tests (renamed from `test_enricher.py`; LLM enrichment tests dropped, render-only tests kept). |
| β€” | Catalog store integration test (`tests/catalog/test_store.py`) | DB | `[x]` | PR1 β€” module-level skip without `RUN_INTEGRATION_TESTS=1` |
| β€” | DB introspector test | DB | `[ ]` | Deferred to PR2 β€” needs Postgres testcontainer or fixture infra |
| β€” | Tabular introspector test | TAB | `[x]` | PR1-tab β€” 31 unit tests (CSV/XLSX/Parquet, stats, PII, error paths). No DB/blob I/O β€” mocks injected via constructor. |
| 41 | Planner eval (`tests/query/planner/`) | B | `[x]` | PR6-scaffold β€” `test_golden_questions.py` with 3 DB-targeting cases. TAB added `test_golden_tabular.py` with 4 tabular cases (group_by+sum, top-N+limit, date range filter, XLSX sheet selection). All 4 passed against real Azure OpenAI. Fix shipped alongside: `query/planner/service.py` replaced `("system", text)` tuple with `SystemMessage` β€” without this, `{...}` in `query_planner.md` was parsed as f-string variables and crashed on every real invocation. |
| 42 | E2E smoke tests (`tests/e2e/`) | B | `[ ]` | Defer until Phase 2 endpoints are wired (cleanup PR). Component-level orchestration is already covered by `test_chat_handler.py` + `test_service.py`. |
| β€” | Golden IR fixtures (`tests/fixtures/golden_irs.json`) | B | `[~]` | PR1 seeded with 5 DB-targeting examples; TAB extends in PR1-tab |
| β€” | Shared `sample_catalog` fixture (`tests/conftest.py`) | B | `[x]` | PR1 β€” DB-shaped; TAB may add tabular sibling |
---
## What just shipped (2026-05-12 β€” Cleanup PR)
**Phase 1 removal + Phase 2 API rewiring:**
- `src/api/v1/chat.py` β€” fully rewired to `ChatHandler.handle()`. Removed inline IntentRouter, retrieval, and ChatbotAgent calls. Redis cache, fast intent, load_history, save_messages stay in chat.py.
- `src/api/v1/db_client.py` β€” `/ingest` now calls only `on_db_registered`. Phase 1 `db_pipeline_service.run()` block removed. Catalog build failure now raises HTTP 500.
- `src/api/v1/data_catalog.py` β€” added `POST /api/v1/data-catalog/rebuild` endpoint.
- `src/pipeline/triggers.py` β€” `on_catalog_rebuild_requested` implemented: iterates catalog sources, re-runs the appropriate trigger per source type, per-source errors logged.
**Dead modules deleted:**
- `src/agents/chatbot.py` (Phase 1 LangChain chatbot)
- `src/pipeline/orchestrator.py` (empty stub)
- `src/query/base.py` (old duplicate of `executor/base.py`)
- `src/api/v1/knowledge.py` (fake `/knowledge/rebuild` endpoint)
- `src/config/agents/` (folder β€” prompts only used by deleted Phase 1 chatbot)
**Renames:**
- `src/agents/answer_agent.py` β†’ `src/agents/chatbot.py`; `AnswerAgent` β†’ `ChatbotAgent`; updated all import sites (`chat_handler.py`, `chat.py`)
**Fixes + improvements:**
- `src/agents/chat_handler.py` β€” `_get_document_retriever()` now returns `RetrievalRouter` (Redis-cached) instead of `DocumentRetriever` directly; retrieval-level cache restored.
- `src/retrieval/router.py` β€” removed dead `db: AsyncSession` and `source_hint` parameters + `_UNSTRUCTURED_HINTS` constant from `retrieve()`. Cache key simplified.
- `src/knowledge/processing_service.py` β€” removed dead `_build_csv_documents`, `_build_excel_documents`, `_profile_dataframe`, `_to_sheet_document` methods + `pandas` and `upload_parquet` imports.
- `src/catalog/models.py` β€” added `top_values: list[Any] | None` to `ColumnStats`.
- `src/catalog/introspect/tabular.py` β€” `_to_column` now populates `top_values` for columns with ≀10 distinct values; useful for query planner WHERE clause generation.
- `main.py` β€” replaced deprecated `@app.on_event("startup")` with `lifespan` context manager; removed `knowledge_router`.
---
## What just shipped (KM-557 β€” DB owner)
After lead review of the catalog ingestion cost: dropped LLM enrichment,
renamed the storage table, and exposed a lightweight index endpoint for
the upcoming catalog refresher.
**Files deleted**:
- `src/catalog/enricher.py` β€” entire CatalogEnricher + EnrichmentResponse + apply_descriptions removed
- `src/config/prompts/catalog_enricher.md` β€” dead prompt
- `tests/catalog/test_enricher.py` β€” replaced by `test_render.py`
**Files added**:
- `src/catalog/render.py` β€” new home for `render_source` (the only piece of the old enricher still needed; consumed by `query/planner/prompt.py`)
- `src/api/v1/data_catalog.py` β€” `GET /api/v1/data-catalog/{user_id}` returns `list[CatalogIndexEntry]`
- `tests/catalog/test_render.py` β€” 5 tests (same coverage as the old render block)
**Files modified**:
- `src/db/postgres/models.py` β€” `__tablename__ = "data_catalog"` (was `"catalogs"`). Class name unchanged
- `src/pipeline/structured_pipeline.py` β€” `StructuredPipeline(validator, store)` (was `(enricher, validator, store)`); pipeline is now `introspect β†’ merge β†’ validate β†’ upsert`; `default_structured_pipeline()` no longer constructs an enricher
- `src/pipeline/triggers.py` β€” docstrings updated; `on_catalog_rebuild_requested` docstring rewritten for the refresher use case
- `src/query/planner/prompt.py` β€” import now `from ...catalog.render import render_source`
- `src/catalog/introspect/{base,database,tabular}.py` β€” docstring scrubs (no behavior changes)
- `src/models/api/catalog.py` β€” added `CatalogIndexEntry`; simplified `CatalogRebuildResponse` to `sources_rebuilt`
- `main.py` β€” registered `data_catalog_router`
- `src/security/README.md` β€” one stale wording fix
**No migration**: the `data_catalog` table is created from scratch on first `init_db()`. The old `catalogs` table was never deployed against production data, so no rename SQL is needed.
**Tests**: all 4 `test_structured_pipeline.py` tests reworked to construct `StructuredPipeline(validator=, store=)` without `enricher`. 5 `test_render.py` tests cover render_source standalone.
**Lint**: `ruff check` clean on modified Phase 2 paths.
**Open follow-ups left for the lead**:
- `on_catalog_rebuild_requested` body β€” the refresher will iterate the index endpoint and call this trigger per source
- `api/v1/db_client.py` `/ingest` still doesn't call `on_db_registered` β€” same blocker as before, untouched by KM-557
---
## What just shipped (2026-05-11 β€” retrieval migration + bug fixes)
**Files implemented / migrated**:
- `src/retrieval/base.py` β€” `RetrievalResult` dataclass + `BaseRetriever` ABC (was in `src/rag/base.py`)
- `src/retrieval/document.py` β€” full `DocumentRetriever` migrated from `src/rag/retrievers/document.py`; all retrieval methods (MMR/cosine/euclidean/inner_product/manhattan). Tabular file types filtered out from results.
- `src/retrieval/router.py` β€” `RetrievalRouter` (Redis-cached, unstructured-only). `invalidate_cache(user_id)` clears all `retrieval:{user_id}:*` keys.
**Deleted** (no longer used):
- `src/rag/` β€” entire folder (base.py, retriever.py, router.py, retrievers/)
- `src/tools/` β€” entire folder (search.py was the only real file; only called by deleted rag/ router)
**Bug fixes**:
- `src/pipeline/document_pipeline.py` β€” `retrieval_router.invalidate_cache(user_id)` called after `process()` and `delete()`. Redis failure is caught and logged (does not fail the document op).
- `src/pipeline/document_pipeline.py` β€” CSV/XLSX now skips `knowledge_processor` (vector store). Tabular files go to catalog only; no duplicate embeddings.
- `src/pipeline/triggers.py` β€” `on_document_uploaded` implemented (was `raise NotImplementedError`).
- `src/agents/chat_handler.py` β€” `_normalize_chunks` now handles `RetrievalResult` objects. Previously they were silently dropped, causing empty context for unstructured queries through ChatHandler.
**Import updates** (all changed from `src.rag.*` β†’ `src.retrieval.*`):
- `src/api/v1/chat.py`, `src/query/base.py`, `src/query/query_executor.py`, `src/query/executors/db_executor.py`, `src/query/executors/tabular.py`
---
## What shipped previously (PR2b/4/5/6/7-bundle β€” DB owner solo, teammate reviews)
**Files implemented**:
- `src/agents/orchestration.py` β€” `OrchestratorAgent.classify(message, history) β†’ IntentRouterDecision`. Pydantic model for structured output. History-aware query rewriting. Phase 1 filename + class name preserved; body fully rewritten for Phase 2.
- `src/agents/answer_agent.py` β€” `AnswerAgent.astream(...)` streams answer tokens; accepts `QueryResult` and/or `list[DocumentChunk]`. Renames to `chatbot.py` in cleanup PR.
- `src/agents/chat_handler.py` β€” `ChatHandler.handle(message, user_id, history)` returns `AsyncIterator[dict]` of `intent` / `chunk` / `done` / `error` SSE events. All deps injectable; lazy default builders.
- `src/query/planner/prompt.py` β€” `render_catalog(catalog)` + `build_planner_prompt(question, catalog, previous_error)`. Reuses `catalog.enricher.render_source` for consistency across LLM call sites.
- `src/query/planner/service.py` β€” `QueryPlannerService.plan(question, catalog, previous_error)` Azure OpenAI structured output β†’ `QueryIR`.
- `src/query/executor/dispatcher.py` β€” `ExecutorDispatcher.pick(ir) β†’ BaseExecutor` by `source.source_type`. Lazy executor imports + per-source-type cache.
- `src/query/service.py` — `QueryService.run(user_id, question, catalog) → QueryResult`. Plan→validate→retry-on-failure (max 3)→dispatch→execute. Catches NotImplementedError from TabularExecutor placeholder gracefully.
**Prompts written** (filled in placeholders):
- `src/config/prompts/intent_router.md`
- `src/config/prompts/query_planner.md`
- `src/config/prompts/chatbot_system.md`
- `src/config/prompts/guardrails.md`
**Tests added** (46 new β€” total now 146 + 2 skipped):
- `tests/agents/test_intent_router.py` (4)
- `tests/agents/test_answer_agent.py` (12)
- `tests/agents/test_chat_handler.py` (6)
- `tests/query/planner/test_prompt.py` (7)
- `tests/query/planner/test_service.py` (3)
- `tests/query/executor/test_dispatcher.py` (5)
- `tests/query/test_service.py` (8)
- `tests/query/planner/test_golden_questions.py` (3 β€” skipped by default; eval harness scaffold)
**Lint**: `ruff check` clean on all Phase 2 paths. Phase 1 files have pre-existing E501/S608 issues β€” out of scope for this PR.
**Placeholders / blockers for teammate** (status as of DB owner's commit, before merge):
- `src/query/executor/tabular.py` (TAB) β€” DB owner's note: "still raises NotImplementedError". **Post-merge**: TAB shipped this in PR3-TAB; dispatcher now routes to the real `TabularExecutor`. The `NotImplementedError` catch in `QueryService` stays as a safety net.
- `src/retrieval/document.py` β€” **implemented** (2026-05-11). Full `DocumentRetriever` migrated from `src/rag/retrievers/document.py`; supports MMR/cosine/euclidean/manhattan/inner_product. `_normalize_chunks` in `chat_handler.py` now handles `RetrievalResult` β†’ `DocumentChunk` conversion correctly.
- `src/api/v1/chat.py` (Phase 1) β€” NOT touched. Cleanup PR rewires the SSE endpoint to call `ChatHandler.handle(...)`.
- `src/api/v1/db_client.py` (Phase 1) β€” NOT touched. Cleanup PR rewires `/database-clients/{id}/ingest` to call `pipeline.triggers.on_db_registered`.
---
## What shipped previously (PR3-TAB β€” TAB owner)
**Files implemented**:
- `src/query/compiler/pandas.py` β€” `PandasCompiler` + `CompiledPandas(apply, output_columns)` dataclass. Pure helper functions (easier to test in isolation): `_apply_filters` (all 12 ops, `_like_to_regex` for LIKE), `_apply_select` (column pick + rename), `_apply_agg` (scalar + group_by via `pd.concat` of Series β†’ `reset_index`), `_apply_orderby` (alias-aware via `_resolve_order_col`). Closure captures all IR fields explicitly so `apply(df)` is self-contained.
- `src/query/executor/tabular.py` β€” `TabularExecutor` with injectable `fetch_blob` (same testability pattern as `TabularIntrospector`). Resolves Parquet blob path from `az_blob://{uid}/{did}` + table: single-table β†’ `{uid}/{did}.parquet`, multi-table β†’ `{uid}/{did}__{table.name}.parquet`. Runs compile β†’ download β†’ `asyncio.to_thread(_load_and_apply)` β†’ 10k hard cap. Never raises; errors populate `QueryResult.error`. Uses `compiled.output_columns` for column labels (safe on empty DataFrame).
**Tests added** (55 new β€” total suite was 86 all passing at PR3-TAB time):
- `tests/unit/query/compiler/test_pandas_compiler.py` β€” 43 tests across all 12 filter ops (including `is_null`, `not_in`, `like`, `between`), all 6 agg fns, group_by, order_by asc/desc, limit-after-order, alias round-trip, empty DataFrame, error paths.
- `tests/unit/query/executor/test_tabular_executor.py` β€” 12 tests: `_resolve_blob_name` (single/multi-table, bad prefix), happy-path `QueryResult` shape (columns, rows, backend, truncated, source_id), wrong source_type β†’ error, blob fetch failure β†’ error, unknown source β†’ error.
**Lint**: `ruff check` clean on both files.
---
## What shipped previously (PR1-tab β€” TAB owner)
**Files implemented**:
- `src/catalog/introspect/tabular.py` β€” `TabularIntrospector` reads original blob (CSV/XLSX/Parquet), profiles each column (dtype, stats, sample values), runs PIIDetector. For XLSX: one `Table` per sheet (`Table.name = sheet_name`); for CSV/Parquet: one `Table` (`Table.name = filename stem`). `fetch_doc`/`fetch_blob` are constructor-injectable for unit tests β€” no `Settings` or DB required at import time.
- `src/pipeline/triggers.py` β€” `on_tabular_uploaded` wired (mirrors `on_db_registered` pattern).
**Tests added** (31 new):
- `tests/unit/catalog/test_introspect_tabular.py` β€” CSV / XLSX / Parquet shapes, per-column stats, nullable detection, PII name + value matching, sample capping, all error paths. Pure Python, no network I/O.
**Executor contract note**: introspector downloads the *original* blob for schema reading. The tabular executor (PR3-TAB) downloads *Parquet* blobs for query execution. For CSV/Parquet sources (single table), the executor must call `parquet_blob_name(uid, did, sheet_name=None)`; for XLSX (multi-table), `parquet_blob_name(uid, did, table.name)`.
---
## What shipped previously (PR3-DB β€” DB owner)
**Files implemented**:
- `src/query/compiler/sql.py` β€” `SqlCompiler` for Postgres dialect; `CompiledSql(sql, params)` dataclass with `params: dict[str, Any]` (changed from `list`); supports all 12 whitelisted filter ops, all 6 aggs, alias-aware order_by; `_qident` escapes embedded double-quotes
- `src/query/executor/db.py` β€” `DbExecutor` with sqlglot SELECT-only guard, Postgres session-level read-only + 30s `statement_timeout`, `asyncio.wait_for` backstop, 10k row hard cap; rejects non-`schema` source_type and `dbclient://` URI mismatch; never raises (populates `QueryResult.error`)
**Files extended**:
- `src/query/compiler/pandas.py` β€” fixed pre-existing UP035 (Callable import)
- `pyproject.toml` β€” added `S608` to `tests/**` ruff ignore (false positive: tests assert literal SQL strings)
**Tests added** (36 new, all passing β€” total now 100):
- `tests/query/compiler/test_sql.py` β€” every filter op, every agg, count(*), count_distinct, order_by alias vs column, multi-filter AND, identifier quoting escape, error paths
**Lint**: `ruff check` clean on Phase 2 paths.
**Hand-off note for teammate**: `CompiledSql.params` is now `dict[str, Any]` not `list`. The pandas compiler will follow the same convention (or document its own) β€” coordinate when PR3-TAB lands.
---
## What shipped previously (PR2a β€” DB owner)
**Files implemented**:
- `src/catalog/enricher.py` β€” Azure OpenAI GPT-4o + structured output (`EnrichmentResponse`), `render_source` (reusable by planner prompt later), `apply_descriptions` merger, injectable `structured_chain` for tests
- `src/pipeline/structured_pipeline.py` β€” `StructuredPipeline` orchestrator + `default_structured_pipeline()` factory with lazy production-dep imports
- `src/pipeline/triggers.py` β€” `on_db_registered` wired; tabular/document/rebuild stubs preserved with implementation notes
**Files extended**:
- `src/catalog/models.py` β€” added `ForeignKey` model, `Table.foreign_keys: list[ForeignKey] = []`
- `src/catalog/introspect/database.py` β€” `_extract_foreign_keys` populates `Table.foreign_keys` from extractor data
- `src/config/prompts/catalog_enricher.md` β€” full system prompt with style rules and one few-shot example
**Tests added** (14 new, all passing β€” total now 64):
- `tests/catalog/test_enricher.py` β€” render / apply / end-to-end with fake chain (10 tests)
- `tests/pipeline/test_structured_pipeline.py` β€” orchestration with stub deps (4 tests)
**Lint**: `ruff check` clean on all Phase 2 paths. Phase 1 files (`pipeline/db_pipeline/`, `pipeline/document_pipeline/`) have pre-existing ruff issues β€” out of scope for this PR.
---
## What shipped previously (PR1 β€” DB owner's first chunk)
**Files implemented** (was `NotImplementedError`):
- `src/catalog/pii_detector.py`, `src/catalog/validator.py`, `src/catalog/store.py`, `src/catalog/reader.py`
- `src/catalog/introspect/database.py` (FK extraction added in PR2a)
- `src/query/ir/validator.py`
**Files extended**:
- `src/query/ir/operators.py` β€” `TYPE_COMPATIBILITY` matrix
- `src/catalog/models.py` β€” `location_ref` URI-scheme docstring
- `src/db/postgres/models.py` β€” `Catalog` SQLAlchemy table; `init_db.py` imports it
**Tests**: 50 unit tests + 1 integration (gated on `RUN_INTEGRATION_TESTS=1`).
**Reused Phase 1 utilities** (cleanup deferred):
- `src/database_client/database_client_service.py:get`
- `src/utils/db_credential_encryption.py:decrypt_credentials_dict`
- `src/pipeline/db_pipeline/db_pipeline_service.py:engine_scope`
- `src/pipeline/db_pipeline/extractor.py:get_schema/profile_column/get_row_count`
---
## Open contract items (not yet locked)
- **Joins in IR** β€” currently single-table only (ARCHITECTURE.md Β§7); DB owner accepted the constraint for v1, will revisit in PR3 if it's blocking real queries
- **`updated_at` on Source vs `generated_at` on Catalog** β€” Pydantic models have both; introspector sets per-Source; CatalogStore preserves both
- **Catalog refresh trigger** (open question Β§3) β€” default policy is rebuild-on-upload-or-connect; auto-refresh deferred
- **Unstructured catalog entries** (open question Β§2) β€” currently empty filter for `source_hint="unstructured"`; revisit when adding doc descriptions
- **PII handling for `sample_values`** (open question Β§5) β€” currently nulls them out (skip); mask/synthesize deferred
- **Dialect priority for SQL compiler** β€” PR3 will land Postgres first, MySQL second; BigQuery/Snowflake/SQL Server later
---
## How to update this file
When a PR lands:
1. Flip status from `[ ]` or `[~]` to `[x]`
2. Add a short note (file paths, scope cuts, surprises)
3. Bump "Last updated" at the top
4. If a new contract decision lands, move it from "Open contract items" to the relevant inline note
When opening a PR:
1. Flip status to `[~]` and add yourself as the active owner in the PR row
2. Don't promise items in the PR description that aren't in the table