| # Phase 1 β Phase 2 Migration Report |
|
|
| A walkthrough of what changed between the original retrieval-style backend (Phase 1) and the current catalog-driven backend (Phase 2). Intended as a hand-off for the lead. |
|
|
| --- |
|
|
| ## 1. The conceptual change |
|
|
| **Phase 1** was a single retrieval-style RAG pipeline. Every question β whether it pointed at a database, a spreadsheet, or a PDF β went through the same primitive: **chunk + embed + top-K** over PGVector. Schema and tabular columns were embedded as chunks and ranked alongside prose. When the question needed SQL, the LLM **wrote the SQL string directly** (via `query_executor`). |
|
|
| **Phase 2** splits the system into two paths governed by an LLM router: |
|
|
| | Path | Primitive | Why | |
| |---|---|---| |
| | Unstructured (PDF / DOCX / TXT) | Dense similarity over prose chunks (PGVector) | Right primitive for free text | |
| | Structured (DB / CSV / XLSX / Parquet) | **Per-user data catalog** β LLM emits a **JSON IR** of intent β deterministic **compiler** β **executor** (SQL or pandas) | A column lookup shouldn't go through a similarity ranking lottery; the LLM emits intent, never SQL syntax | |
|
|
| Three explicit LLM call sites only: |
|
|
| 1. **Intent router** (classifies the user message into `chat` / `unstructured` / `structured`) |
| 2. **Query planner** (turns the question + catalog into a Pydantic-validated `QueryIR`) |
| 3. **Chatbot agent** (formats the final answer, streamed over SSE) |
|
|
| Everything else β IR validation, SQL/pandas compilation, execution β is deterministic Python. |
|
|
| --- |
|
|
| ## 2. File-by-file changes |
|
|
| ### 2.1 Deleted (Phase 1 only) |
|
|
| | Phase 1 path | Reason it was removed | |
| |---|---| |
| | `src/rag/base.py`, `src/rag/retriever.py`, `src/rag/router.py` | Replaced by `src/retrieval/` | |
| | `src/rag/retrievers/baseline.py`, `schema.py`, `document.py` | Schema retrieval gone (catalog replaces it); document retriever rewritten in `src/retrieval/document.py` | |
| | `src/tools/search.py` (whole `tools/` folder) | Only consumer was `rag/router.py` | |
| | `src/query/base.py` | Duplicate of `query/executor/base.py` | |
| | `src/query/query_executor.py` | Replaced by `src/query/service.py` | |
| | `src/query/executors/db_executor.py` | Replaced by `src/query/executor/db.py` | |
| | `src/query/executors/tabular.py` | Replaced by `src/query/executor/tabular.py` | |
| | `src/agents/chatbot.py` (Phase 1 LangChain chatbot) | Phase 2 `ChatbotAgent` lives at the same path now β see Β§2.2 | |
| | `src/api/v1/knowledge.py` | Fake `/knowledge/rebuild` endpoint, never wired | |
| | `src/config/agents/system_prompt.md`, `guardrails_prompt.md` | Replaced by `src/config/prompts/{chatbot_system,guardrails}.md` | |
| | `src/models/structured_output.py` (`IntentClassification`) | Replaced by `IntentRouterDecision` Pydantic model inside `agents/orchestration.py` | |
| | `src/models/sql_query.py` | LLM no longer emits SQL; IR replaces it | |
| | `src/pipeline/orchestrator.py` (empty stub) | Redundant β `StructuredPipeline` takes the introspector at `run()` time | |
|
|
| ### 2.2 Renamed / moved (same role, new home) |
|
|
| | Phase 1 location | Phase 2 location | Notes | |
| |---|---|---| |
| | `src/agents/chatbot.py` (Phase 1) β deleted, then `src/agents/answer_agent.py` (`AnswerAgent`) β renamed | `src/agents/chatbot.py::ChatbotAgent` | Final answer formation; streams via `astream` | |
| | `src/knowledge/parquet_service.py` | `src/storage/parquet.py` | Parquet upload/download helper | |
| | `src/pipeline/document_pipeline/document_pipeline.py` (folder) | `src/pipeline/document_pipeline.py` (flat) | Single module | |
| | `src/rag/retrievers/document.py` | `src/retrieval/document.py` | `DocumentRetriever` migrated; tabular file types filtered out of results | |
| | `src/rag/router.py` | `src/retrieval/router.py` | `RetrievalRouter`, Redis-cached, unstructured-only; dead `db: AsyncSession` + `source_hint` params removed | |
| | `src/rag/base.py` (`RetrievalResult`, `BaseRetriever`) | `src/retrieval/base.py` | Same dataclass + ABC | |
|
|
| > **Heads-up on the intent router**: the Phase 1 file `src/agents/orchestration.py` and its class `OrchestratorAgent` were **kept in place** for Phase 2 β but the body was fully rewritten. The class now emits `IntentRouterDecision(needs_search, source_hint β {chat, unstructured, structured}, rewritten_query)`. The prompt file and test file use the `intent_router` name (`config/prompts/intent_router.md`, `tests/agents/test_intent_router.py`), but **the source module is still `orchestration.py` and the class is still `OrchestratorAgent`**. Existing imports continue to work; only the behavior changed. |
| |
| ### 2.3 Added (Phase 2 new) |
| |
| **Catalog subsystem (whole new concept)** |
| |
| | Path | Role | |
| |---|---| |
| | `src/catalog/models.py` | Pydantic: `Catalog β Source[] β Table[] β Column[]`, `ForeignKey`, `ColumnStats.top_values` | |
| | `src/catalog/introspect/base.py` | `BaseIntrospector` ABC | |
| | `src/catalog/introspect/database.py` | DB introspector β wraps Phase 1 `db_pipeline/extractor.py` (`get_schema`, `profile_column`, `get_row_count`) | |
| | `src/catalog/introspect/tabular.py` | CSV / XLSX / Parquet introspector β one `Table` per XLSX sheet | |
| | `src/catalog/render.py` | Renders a `Source` for the planner prompt | |
| | `src/catalog/validator.py` | Unique-ID + foreign-key-ref invariants | |
| | `src/catalog/store.py` | Postgres `jsonb` upsert keyed by `user_id` (table `data_catalog`) | |
| | `src/catalog/reader.py` | Loads + filters catalog by `source_hint` | |
| | `src/catalog/pii_detector.py` | Flags PII columns at ingestion β suppresses `sample_values` | |
| | `src/security/pii_patterns.py` | Name patterns + value regex used by the detector | |
|
|
| **JSON IR + query subsystem** |
|
|
| | Path | Role | |
| |---|---| |
| | `src/query/ir/models.py` | `QueryIR` Pydantic schema | |
| | `src/query/ir/operators.py` | `ALLOWED_FILTER_OPS`, `ALLOWED_AGG_FNS`, `LIMIT_HARD_CAP`, `TYPE_COMPATIBILITY` | |
| | `src/query/ir/validator.py` | Catalog-aware IR validation (rejects unknown column ids, bad ops, type mismatches, oversize limits) | |
| | `src/query/planner/service.py` | `QueryPlannerService.plan(question, catalog, previous_error)` β Azure OpenAI structured output β `QueryIR` | |
| | `src/query/planner/prompt.py` | Builds the planner prompt from catalog text | |
| | `src/query/compiler/base.py` | Compiler ABC | |
| | `src/query/compiler/sql.py` | `SqlCompiler` (Postgres) β all 12 filter ops, params as a dict | |
| | `src/query/compiler/pandas.py` | `PandasCompiler` β returns `CompiledPandas(apply, output_columns)` | |
| | `src/query/executor/base.py` | `BaseExecutor` + `QueryResult` | |
| | `src/query/executor/db.py` | `DbExecutor` β sqlglot SELECT-only guard, RO txn, 30 s `statement_timeout`, 10 k row cap | |
| | `src/query/executor/tabular.py` | `TabularExecutor` β Parquet via blob, `asyncio.to_thread`, 10 k cap | |
| | `src/query/executor/dispatcher.py` | `ExecutorDispatcher.pick(ir)` β picks by `source.source_type` | |
| | `src/query/service.py` | `QueryService.run(user_id, question, catalog)` β plan β validate β retry (max 3) β dispatch β execute | |
|
|
| **Agents** |
|
|
| | Path | Role | |
| |---|---| |
| | `src/agents/orchestration.py` | `OrchestratorAgent` β Phase 1 file/class name preserved; Phase 2 body. Emits `IntentRouterDecision` | |
| | `src/agents/chatbot.py` | `ChatbotAgent` β formerly `AnswerAgent` in `agents/answer_agent.py`; renamed in Cleanup PR | |
| | `src/agents/chat_handler.py` | `ChatHandler.handle(...)` β top-level orchestrator; yields `intent` / `chunk` / `done` / `error` SSE events | |
|
|
| **Pipelines & API** |
|
|
| | Path | Role | |
| |---|---| |
| | `src/pipeline/structured_pipeline.py` | DB / tabular ingestion: introspect β merge β validate β upsert | |
| | `src/pipeline/triggers.py` | `on_db_registered`, `on_tabular_uploaded`, `on_document_uploaded`, `on_catalog_rebuild_requested` | |
| | `src/api/v1/data_catalog.py` | `GET /api/v1/data-catalog/{user_id}` + `POST /api/v1/data-catalog/rebuild` | |
| | `src/models/api/catalog.py` | Catalog request/response models | |
| | `src/config/prompts/intent_router.md`, `query_planner.md`, `chatbot_system.md`, `guardrails.md` | New prompts. `guardrails.md` is appended to `chatbot_system.md` at load time | |
| | `src/db/postgres/models.py` (added `Catalog` SQLAlchemy class) | Stores the per-user jsonb document in `data_catalog` | |
|
|
| ### 2.4 Rewired API endpoints |
|
|
| | Endpoint | Phase 1 wiring | Phase 2 wiring | |
| |---|---|---| |
| | `POST /api/v1/chat/stream` | Inline in `chat.py`: `OrchestratorAgent` β `retriever` β `query_executor` β `chatbot` | Delegates to `ChatHandler.handle()`. Redis cache, fast intent, history load, and message persistence stay in the endpoint | |
| | `POST /api/v1/database-clients/{id}/ingest` | Called `db_pipeline_service.run()` and dual-wrote vectors | Calls **only** `on_db_registered` (catalog build). Failure β HTTP 500 | |
| | `POST /api/v1/document/process` | Always pushed to vector store | PDF/DOCX/TXT β `knowledge_processor` (vectors); CSV/XLSX β `on_tabular_uploaded` (catalog only, **no vector embedding**) | |
| | `POST /api/v1/document/upload` | Storage + DB row | Same, plus `on_document_uploaded` trigger | |
| | `POST /api/v1/data-catalog/rebuild` | β | New: iterates all sources, re-runs per-source trigger | |
| | `GET /api/v1/data-catalog/{user_id}` | β | New: returns `list[CatalogIndexEntry]` | |
|
|
| ### 2.5 Phase 1 files still in production use |
|
|
| These were **not rewritten** β Phase 2 imports them directly: |
|
|
| - `src/database_client/database_client_service.py` |
| - `src/utils/db_credential_encryption.py` (`decrypt_credentials_dict`) β `src/security/credentials.py` is still a stub |
| - `src/pipeline/db_pipeline/db_pipeline_service.py` (`engine_scope` context manager β used by both the introspector and `DbExecutor`) |
| - `src/pipeline/db_pipeline/extractor.py` (`get_schema`, `profile_column`, `get_row_count`) |
| - `src/knowledge/processing_service.py` (PDF / DOCX / TXT extraction + embedding) |
| - `src/db/postgres/{connection,init_db,vector_store}.py`, `src/storage/az_blob/`, `src/middlewares/`, `src/security/auth.py` |
|
|
| --- |
|
|
| ## 3. End-to-end flow (current state) |
|
|
| ### 3.1 Ingestion |
|
|
| ``` |
| User action Pipeline Storage |
| ββββββββββββββ ββββββββββββββββββββββββββββ βββββββββββββββββ |
| upload PDF/DOCX/TXT β DocumentPipeline β Azure Blob + PGVector |
| (extract β chunk β embed) (table: langchain_pg_embedding) |
| + on_document_uploaded + retrieval cache invalidate |
| |
| upload CSV/XLSX β TabularIntrospector β Azure Blob (Parquet) |
| (sheets / columns + sample + stats) + data_catalog jsonb row |
| β CatalogValidator β CatalogStore (NO vector store β catalog only) |
| via on_tabular_uploaded |
| |
| register DB β DatabaseIntrospector β data_catalog jsonb row |
| (information_schema + sample + FKs) |
| β validate β store |
| via on_db_registered |
| ``` |
|
|
| ### 3.2 Query (per user message β SSE stream) |
|
|
| ``` |
| POST /api/v1/chat/stream |
| β |
| βββ Redis cache check (24h TTL) β hit returns cached stream |
| βββ _fast_intent (greetings / goodbyes) β bypass LLM |
| βββ load history from chat_messages |
| β |
| βββ ChatHandler.handle(message, user_id, history) [src/agents/chat_handler.py] |
| β |
| ββ OrchestratorAgent.classify() [agents/orchestration.py] |
| β β needs_search, source_hint, rewritten_query |
| β |
| βββ source_hint == "chat" |
| β β ChatbotAgent.astream() β yield chunk events |
| β |
| βββ source_hint == "unstructured" |
| β β RetrievalRouter.retrieve() [retrieval/router.py, Redis-cached] |
| β β DocumentRetriever (PGVector MMR/cosine/etc.) |
| β β ChatbotAgent.astream(chunks=...) |
| β |
| βββ source_hint == "structured" |
| β CatalogReader.read(user_id, "structured") [catalog/reader.py] |
| β QueryService.run(user_id, question, catalog) [query/service.py] |
| β |
| ββ QueryPlannerService.plan(...) [query/planner/service.py] |
| β LLM(catalog, question, prev_error?) β QueryIR |
| β |
| ββ IRValidator.validate(ir, catalog) [query/ir/validator.py] |
| β fail β loop back to planner with error context (max 3) |
| β |
| ββ ExecutorDispatcher.pick(ir) [query/executor/dispatcher.py] |
| β schema source β DbExecutor |
| β tabular source β TabularExecutor |
| β |
| ββ DbExecutor.run(ir): [query/executor/db.py] |
| β SqlCompiler β (sql, params) |
| β β sqlglot SELECT-only guard |
| β β engine_scope (Phase 1 utility) in asyncio.to_thread |
| β β RO txn + statement_timeout=30s + 10k cap |
| β |
| ββ TabularExecutor.run(ir): [query/executor/tabular.py] |
| β resolve Parquet blob path |
| β β download β PandasCompiler.apply(df) |
| β β asyncio.to_thread β 10k cap |
| β |
| ββ QueryResult { rows, columns, row_count, |
| truncated, source_id, error?, elapsed_ms } |
| β |
| ChatbotAgent.astream(query_result=...) |
| β yield chunk events |
| β |
| βββ final events: done / error |
| β |
| βββ persist user + assistant messages to chat_messages |
| βββ populate Redis cache |
| ``` |
|
|
| **Safety invariants for the structured path** (read-only at every layer): |
|
|
| 1. IR validated against the catalog before reaching the compiler |
| 2. Identifiers come from the catalog (trusted; inlined as quoted identifiers) |
| 3. Values from `IR.filters` are always parameterized |
| 4. Compiler is deterministic β no LLM in the hot path |
| 5. sqlglot rejects anything that isn't a pure SELECT |
| 6. DB connection is read-only with a 30 s `statement_timeout` |
| 7. Hard 10 000 row cap on both executors; neither raises β errors go in `QueryResult.error` |
|
|
| --- |
|
|
| ## 4. Summary table for review |
|
|
| | Concern | Phase 1 β where it lived | Phase 2 β where it lives | Change type | |
| |---|---|---|---| |
| | Intent classification | `agents/orchestration.py::OrchestratorAgent` (free-text intent) | **Same path + same class name** β body rewritten to emit `IntentRouterDecision` | Body rewrite only | |
| | Top-level chat orchestration | Inline in `api/v1/chat.py` | `agents/chat_handler.py::ChatHandler` | Extracted to a reusable module | |
| | Final answer formation | `agents/chatbot.py` (Phase 1 LangChain) | `agents/chatbot.py::ChatbotAgent` (was `AnswerAgent` in `answer_agent.py` mid-cycle) | Rewritten + renamed | |
| | Schema retrieval (DB / tabular) | `rag/retrievers/schema.py` + PGVector chunks | **Removed**. Replaced by catalog (`catalog/store.py` jsonb) loaded verbatim into planner prompt | Whole concept replaced | |
| | Doc retrieval (PDF / DOCX / TXT) | `rag/retrievers/document.py`, `rag/router.py` | `retrieval/document.py`, `retrieval/router.py` | Moved; Redis cache restored; tabular files filtered | |
| | Query writing | `query/query_executor.py` + `models/sql_query.py` (LLM writes SQL) | `query/planner/service.py` (LLM writes IR) + `query/compiler/sql.py` (deterministic) | LLM emits intent, not SQL | |
| | DB execution | `query/executors/db_executor.py` | `query/executor/db.py::DbExecutor` | Folder renamed (`executors` β `executor`); sqlglot guard + RO txn + 30 s timeout kept | |
| | Tabular execution | `query/executors/tabular.py` | `query/executor/tabular.py::TabularExecutor` | Parquet-only; pandas compiler split out | |
| | Executor selection | Hard-coded in `query_executor.py` | `query/executor/dispatcher.py::ExecutorDispatcher` | New; routes by `source.source_type` | |
| | Catalog (NEW) | β | `catalog/` (models, introspect/, validator, store, reader, pii_detector, render) | New subsystem | |
| | Catalog persistence | (data was embedded in PGVector) | Postgres jsonb table `data_catalog`, keyed by `user_id` | New table | |
| | Ingestion triggers | Inline in API endpoints | `pipeline/triggers.py` (`on_db_registered`, `on_tabular_uploaded`, `on_document_uploaded`, `on_catalog_rebuild_requested`) | Centralized event entry points | |
| | Structured pipeline | `pipeline/db_pipeline/db_pipeline_service.py` (still present for `engine_scope` + extractor reuse) | `pipeline/structured_pipeline.py` (orchestrator) β reuses Phase 1 extractor | New orchestrator wraps Phase 1 introspection helpers | |
| | Document pipeline | `pipeline/document_pipeline/document_pipeline.py` (folder) | `pipeline/document_pipeline.py` (file) | Flattened; CSV / XLSX now skip the vector store | |
| | Parquet helper | `knowledge/parquet_service.py` | `storage/parquet.py` | Moved into `storage/` | |
| | Prompts | `config/agents/system_prompt.md`, `guardrails_prompt.md` | `config/prompts/{intent_router,query_planner,chatbot_system,guardrails}.md` | Folder renamed; split into four files; guardrails appended to `chatbot_system` at load | |
| | PII detection | β | `catalog/pii_detector.py` + `security/pii_patterns.py` | New. Columns flagged `pii_flag=true` get `sample_values: null` so PII never enters prompts | |
| | Chat endpoint | `api/v1/chat.py` (does everything inline) | `api/v1/chat.py` (cache + history + persistence) β delegates to `ChatHandler` | Slimmed; SSE event shape is `intent` / `chunk` / `done` / `error` | |
| | DB ingest endpoint | `api/v1/db_client.py::ingest` (Phase 1 `db_pipeline_service.run()`) | `api/v1/db_client.py::ingest` (calls `on_db_registered` only) | Phase 1 dual-write removed | |
| | Document process endpoint | `api/v1/document.py::process` (always vectorize) | `api/v1/document.py::process` (PDF/DOCX/TXT β vectors; CSV/XLSX β catalog via `on_tabular_uploaded`) | Routing by file type | |
| | Catalog management API | β | `api/v1/data_catalog.py` (GET index + POST rebuild) | New | |
|
|
| **Bottom line.** Every Phase 1 file under `src/rag/`, `src/tools/`, `src/query/executors/`, `src/query/query_executor.py`, `src/query/base.py`, `src/api/v1/knowledge.py`, and `src/config/agents/` is gone. Phase 1 introspection helpers under `src/pipeline/db_pipeline/` and `src/database_client/` are still imported by Phase 2 β they were not rewritten, just wrapped. The three LLM call sites are now explicit and the SQL-writing one no longer exists; the planner emits a Pydantic-validated `QueryIR` instead. |
|
|
| The one filename gotcha to remember: the **intent router** still lives at `src/agents/orchestration.py` as class `OrchestratorAgent` (Phase 1 name kept for import-site compatibility, Phase 2 body). The matching prompt and tests use the `intent_router` name, but the source module does not. |
|
|