Agentic-Service-Data-Eyond-Catalog / PHASE1_TO_PHASE2_REPORT.md
ishaq101's picture
feat/Catalog Retrieval System (#1)
6bff5d9
# Phase 1 β†’ Phase 2 Migration Report
A walkthrough of what changed between the original retrieval-style backend (Phase 1) and the current catalog-driven backend (Phase 2). Intended as a hand-off for the lead.
---
## 1. The conceptual change
**Phase 1** was a single retrieval-style RAG pipeline. Every question β€” whether it pointed at a database, a spreadsheet, or a PDF β€” went through the same primitive: **chunk + embed + top-K** over PGVector. Schema and tabular columns were embedded as chunks and ranked alongside prose. When the question needed SQL, the LLM **wrote the SQL string directly** (via `query_executor`).
**Phase 2** splits the system into two paths governed by an LLM router:
| Path | Primitive | Why |
|---|---|---|
| Unstructured (PDF / DOCX / TXT) | Dense similarity over prose chunks (PGVector) | Right primitive for free text |
| Structured (DB / CSV / XLSX / Parquet) | **Per-user data catalog** β†’ LLM emits a **JSON IR** of intent β†’ deterministic **compiler** β†’ **executor** (SQL or pandas) | A column lookup shouldn't go through a similarity ranking lottery; the LLM emits intent, never SQL syntax |
Three explicit LLM call sites only:
1. **Intent router** (classifies the user message into `chat` / `unstructured` / `structured`)
2. **Query planner** (turns the question + catalog into a Pydantic-validated `QueryIR`)
3. **Chatbot agent** (formats the final answer, streamed over SSE)
Everything else β€” IR validation, SQL/pandas compilation, execution β€” is deterministic Python.
---
## 2. File-by-file changes
### 2.1 Deleted (Phase 1 only)
| Phase 1 path | Reason it was removed |
|---|---|
| `src/rag/base.py`, `src/rag/retriever.py`, `src/rag/router.py` | Replaced by `src/retrieval/` |
| `src/rag/retrievers/baseline.py`, `schema.py`, `document.py` | Schema retrieval gone (catalog replaces it); document retriever rewritten in `src/retrieval/document.py` |
| `src/tools/search.py` (whole `tools/` folder) | Only consumer was `rag/router.py` |
| `src/query/base.py` | Duplicate of `query/executor/base.py` |
| `src/query/query_executor.py` | Replaced by `src/query/service.py` |
| `src/query/executors/db_executor.py` | Replaced by `src/query/executor/db.py` |
| `src/query/executors/tabular.py` | Replaced by `src/query/executor/tabular.py` |
| `src/agents/chatbot.py` (Phase 1 LangChain chatbot) | Phase 2 `ChatbotAgent` lives at the same path now β€” see Β§2.2 |
| `src/api/v1/knowledge.py` | Fake `/knowledge/rebuild` endpoint, never wired |
| `src/config/agents/system_prompt.md`, `guardrails_prompt.md` | Replaced by `src/config/prompts/{chatbot_system,guardrails}.md` |
| `src/models/structured_output.py` (`IntentClassification`) | Replaced by `IntentRouterDecision` Pydantic model inside `agents/orchestration.py` |
| `src/models/sql_query.py` | LLM no longer emits SQL; IR replaces it |
| `src/pipeline/orchestrator.py` (empty stub) | Redundant β€” `StructuredPipeline` takes the introspector at `run()` time |
### 2.2 Renamed / moved (same role, new home)
| Phase 1 location | Phase 2 location | Notes |
|---|---|---|
| `src/agents/chatbot.py` (Phase 1) β†’ deleted, then `src/agents/answer_agent.py` (`AnswerAgent`) β†’ renamed | `src/agents/chatbot.py::ChatbotAgent` | Final answer formation; streams via `astream` |
| `src/knowledge/parquet_service.py` | `src/storage/parquet.py` | Parquet upload/download helper |
| `src/pipeline/document_pipeline/document_pipeline.py` (folder) | `src/pipeline/document_pipeline.py` (flat) | Single module |
| `src/rag/retrievers/document.py` | `src/retrieval/document.py` | `DocumentRetriever` migrated; tabular file types filtered out of results |
| `src/rag/router.py` | `src/retrieval/router.py` | `RetrievalRouter`, Redis-cached, unstructured-only; dead `db: AsyncSession` + `source_hint` params removed |
| `src/rag/base.py` (`RetrievalResult`, `BaseRetriever`) | `src/retrieval/base.py` | Same dataclass + ABC |
> **Heads-up on the intent router**: the Phase 1 file `src/agents/orchestration.py` and its class `OrchestratorAgent` were **kept in place** for Phase 2 β€” but the body was fully rewritten. The class now emits `IntentRouterDecision(needs_search, source_hint ∈ {chat, unstructured, structured}, rewritten_query)`. The prompt file and test file use the `intent_router` name (`config/prompts/intent_router.md`, `tests/agents/test_intent_router.py`), but **the source module is still `orchestration.py` and the class is still `OrchestratorAgent`**. Existing imports continue to work; only the behavior changed.
### 2.3 Added (Phase 2 new)
**Catalog subsystem (whole new concept)**
| Path | Role |
|---|---|
| `src/catalog/models.py` | Pydantic: `Catalog β†’ Source[] β†’ Table[] β†’ Column[]`, `ForeignKey`, `ColumnStats.top_values` |
| `src/catalog/introspect/base.py` | `BaseIntrospector` ABC |
| `src/catalog/introspect/database.py` | DB introspector β€” wraps Phase 1 `db_pipeline/extractor.py` (`get_schema`, `profile_column`, `get_row_count`) |
| `src/catalog/introspect/tabular.py` | CSV / XLSX / Parquet introspector β€” one `Table` per XLSX sheet |
| `src/catalog/render.py` | Renders a `Source` for the planner prompt |
| `src/catalog/validator.py` | Unique-ID + foreign-key-ref invariants |
| `src/catalog/store.py` | Postgres `jsonb` upsert keyed by `user_id` (table `data_catalog`) |
| `src/catalog/reader.py` | Loads + filters catalog by `source_hint` |
| `src/catalog/pii_detector.py` | Flags PII columns at ingestion β†’ suppresses `sample_values` |
| `src/security/pii_patterns.py` | Name patterns + value regex used by the detector |
**JSON IR + query subsystem**
| Path | Role |
|---|---|
| `src/query/ir/models.py` | `QueryIR` Pydantic schema |
| `src/query/ir/operators.py` | `ALLOWED_FILTER_OPS`, `ALLOWED_AGG_FNS`, `LIMIT_HARD_CAP`, `TYPE_COMPATIBILITY` |
| `src/query/ir/validator.py` | Catalog-aware IR validation (rejects unknown column ids, bad ops, type mismatches, oversize limits) |
| `src/query/planner/service.py` | `QueryPlannerService.plan(question, catalog, previous_error)` β€” Azure OpenAI structured output β†’ `QueryIR` |
| `src/query/planner/prompt.py` | Builds the planner prompt from catalog text |
| `src/query/compiler/base.py` | Compiler ABC |
| `src/query/compiler/sql.py` | `SqlCompiler` (Postgres) β€” all 12 filter ops, params as a dict |
| `src/query/compiler/pandas.py` | `PandasCompiler` β€” returns `CompiledPandas(apply, output_columns)` |
| `src/query/executor/base.py` | `BaseExecutor` + `QueryResult` |
| `src/query/executor/db.py` | `DbExecutor` β€” sqlglot SELECT-only guard, RO txn, 30 s `statement_timeout`, 10 k row cap |
| `src/query/executor/tabular.py` | `TabularExecutor` β€” Parquet via blob, `asyncio.to_thread`, 10 k cap |
| `src/query/executor/dispatcher.py` | `ExecutorDispatcher.pick(ir)` β€” picks by `source.source_type` |
| `src/query/service.py` | `QueryService.run(user_id, question, catalog)` β€” plan β†’ validate β†’ retry (max 3) β†’ dispatch β†’ execute |
**Agents**
| Path | Role |
|---|---|
| `src/agents/orchestration.py` | `OrchestratorAgent` β€” Phase 1 file/class name preserved; Phase 2 body. Emits `IntentRouterDecision` |
| `src/agents/chatbot.py` | `ChatbotAgent` β€” formerly `AnswerAgent` in `agents/answer_agent.py`; renamed in Cleanup PR |
| `src/agents/chat_handler.py` | `ChatHandler.handle(...)` β€” top-level orchestrator; yields `intent` / `chunk` / `done` / `error` SSE events |
**Pipelines & API**
| Path | Role |
|---|---|
| `src/pipeline/structured_pipeline.py` | DB / tabular ingestion: introspect β†’ merge β†’ validate β†’ upsert |
| `src/pipeline/triggers.py` | `on_db_registered`, `on_tabular_uploaded`, `on_document_uploaded`, `on_catalog_rebuild_requested` |
| `src/api/v1/data_catalog.py` | `GET /api/v1/data-catalog/{user_id}` + `POST /api/v1/data-catalog/rebuild` |
| `src/models/api/catalog.py` | Catalog request/response models |
| `src/config/prompts/intent_router.md`, `query_planner.md`, `chatbot_system.md`, `guardrails.md` | New prompts. `guardrails.md` is appended to `chatbot_system.md` at load time |
| `src/db/postgres/models.py` (added `Catalog` SQLAlchemy class) | Stores the per-user jsonb document in `data_catalog` |
### 2.4 Rewired API endpoints
| Endpoint | Phase 1 wiring | Phase 2 wiring |
|---|---|---|
| `POST /api/v1/chat/stream` | Inline in `chat.py`: `OrchestratorAgent` β†’ `retriever` β†’ `query_executor` β†’ `chatbot` | Delegates to `ChatHandler.handle()`. Redis cache, fast intent, history load, and message persistence stay in the endpoint |
| `POST /api/v1/database-clients/{id}/ingest` | Called `db_pipeline_service.run()` and dual-wrote vectors | Calls **only** `on_db_registered` (catalog build). Failure β†’ HTTP 500 |
| `POST /api/v1/document/process` | Always pushed to vector store | PDF/DOCX/TXT β†’ `knowledge_processor` (vectors); CSV/XLSX β†’ `on_tabular_uploaded` (catalog only, **no vector embedding**) |
| `POST /api/v1/document/upload` | Storage + DB row | Same, plus `on_document_uploaded` trigger |
| `POST /api/v1/data-catalog/rebuild` | β€” | New: iterates all sources, re-runs per-source trigger |
| `GET /api/v1/data-catalog/{user_id}` | β€” | New: returns `list[CatalogIndexEntry]` |
### 2.5 Phase 1 files still in production use
These were **not rewritten** β€” Phase 2 imports them directly:
- `src/database_client/database_client_service.py`
- `src/utils/db_credential_encryption.py` (`decrypt_credentials_dict`) β€” `src/security/credentials.py` is still a stub
- `src/pipeline/db_pipeline/db_pipeline_service.py` (`engine_scope` context manager β€” used by both the introspector and `DbExecutor`)
- `src/pipeline/db_pipeline/extractor.py` (`get_schema`, `profile_column`, `get_row_count`)
- `src/knowledge/processing_service.py` (PDF / DOCX / TXT extraction + embedding)
- `src/db/postgres/{connection,init_db,vector_store}.py`, `src/storage/az_blob/`, `src/middlewares/`, `src/security/auth.py`
---
## 3. End-to-end flow (current state)
### 3.1 Ingestion
```
User action Pipeline Storage
────────────── ──────────────────────────── ─────────────────
upload PDF/DOCX/TXT β†’ DocumentPipeline β†’ Azure Blob + PGVector
(extract β†’ chunk β†’ embed) (table: langchain_pg_embedding)
+ on_document_uploaded + retrieval cache invalidate
upload CSV/XLSX β†’ TabularIntrospector β†’ Azure Blob (Parquet)
(sheets / columns + sample + stats) + data_catalog jsonb row
β†’ CatalogValidator β†’ CatalogStore (NO vector store β€” catalog only)
via on_tabular_uploaded
register DB β†’ DatabaseIntrospector β†’ data_catalog jsonb row
(information_schema + sample + FKs)
β†’ validate β†’ store
via on_db_registered
```
### 3.2 Query (per user message β†’ SSE stream)
```
POST /api/v1/chat/stream
β”‚
β”œβ”€β”€ Redis cache check (24h TTL) β€” hit returns cached stream
β”œβ”€β”€ _fast_intent (greetings / goodbyes) β€” bypass LLM
β”œβ”€β”€ load history from chat_messages
β”‚
└── ChatHandler.handle(message, user_id, history) [src/agents/chat_handler.py]
β”‚
β”œβ”€ OrchestratorAgent.classify() [agents/orchestration.py]
β”‚ β†’ needs_search, source_hint, rewritten_query
β”‚
β”œβ”€β”€ source_hint == "chat"
β”‚ β†’ ChatbotAgent.astream() β†’ yield chunk events
β”‚
β”œβ”€β”€ source_hint == "unstructured"
β”‚ β†’ RetrievalRouter.retrieve() [retrieval/router.py, Redis-cached]
β”‚ β†’ DocumentRetriever (PGVector MMR/cosine/etc.)
β”‚ β†’ ChatbotAgent.astream(chunks=...)
β”‚
└── source_hint == "structured"
β†’ CatalogReader.read(user_id, "structured") [catalog/reader.py]
β†’ QueryService.run(user_id, question, catalog) [query/service.py]
β”‚
β”œβ”€ QueryPlannerService.plan(...) [query/planner/service.py]
β”‚ LLM(catalog, question, prev_error?) β†’ QueryIR
β”‚
β”œβ”€ IRValidator.validate(ir, catalog) [query/ir/validator.py]
β”‚ fail β†’ loop back to planner with error context (max 3)
β”‚
β”œβ”€ ExecutorDispatcher.pick(ir) [query/executor/dispatcher.py]
β”‚ schema source β†’ DbExecutor
β”‚ tabular source β†’ TabularExecutor
β”‚
β”œβ”€ DbExecutor.run(ir): [query/executor/db.py]
β”‚ SqlCompiler β†’ (sql, params)
β”‚ β†’ sqlglot SELECT-only guard
β”‚ β†’ engine_scope (Phase 1 utility) in asyncio.to_thread
β”‚ β†’ RO txn + statement_timeout=30s + 10k cap
β”‚
β”œβ”€ TabularExecutor.run(ir): [query/executor/tabular.py]
β”‚ resolve Parquet blob path
β”‚ β†’ download β†’ PandasCompiler.apply(df)
β”‚ β†’ asyncio.to_thread β†’ 10k cap
β”‚
└─ QueryResult { rows, columns, row_count,
truncated, source_id, error?, elapsed_ms }
β†’
ChatbotAgent.astream(query_result=...)
β†’ yield chunk events
β”‚
└── final events: done / error
β”‚
└── persist user + assistant messages to chat_messages
└── populate Redis cache
```
**Safety invariants for the structured path** (read-only at every layer):
1. IR validated against the catalog before reaching the compiler
2. Identifiers come from the catalog (trusted; inlined as quoted identifiers)
3. Values from `IR.filters` are always parameterized
4. Compiler is deterministic β€” no LLM in the hot path
5. sqlglot rejects anything that isn't a pure SELECT
6. DB connection is read-only with a 30 s `statement_timeout`
7. Hard 10 000 row cap on both executors; neither raises β€” errors go in `QueryResult.error`
---
## 4. Summary table for review
| Concern | Phase 1 β€” where it lived | Phase 2 β€” where it lives | Change type |
|---|---|---|---|
| Intent classification | `agents/orchestration.py::OrchestratorAgent` (free-text intent) | **Same path + same class name** β€” body rewritten to emit `IntentRouterDecision` | Body rewrite only |
| Top-level chat orchestration | Inline in `api/v1/chat.py` | `agents/chat_handler.py::ChatHandler` | Extracted to a reusable module |
| Final answer formation | `agents/chatbot.py` (Phase 1 LangChain) | `agents/chatbot.py::ChatbotAgent` (was `AnswerAgent` in `answer_agent.py` mid-cycle) | Rewritten + renamed |
| Schema retrieval (DB / tabular) | `rag/retrievers/schema.py` + PGVector chunks | **Removed**. Replaced by catalog (`catalog/store.py` jsonb) loaded verbatim into planner prompt | Whole concept replaced |
| Doc retrieval (PDF / DOCX / TXT) | `rag/retrievers/document.py`, `rag/router.py` | `retrieval/document.py`, `retrieval/router.py` | Moved; Redis cache restored; tabular files filtered |
| Query writing | `query/query_executor.py` + `models/sql_query.py` (LLM writes SQL) | `query/planner/service.py` (LLM writes IR) + `query/compiler/sql.py` (deterministic) | LLM emits intent, not SQL |
| DB execution | `query/executors/db_executor.py` | `query/executor/db.py::DbExecutor` | Folder renamed (`executors` β†’ `executor`); sqlglot guard + RO txn + 30 s timeout kept |
| Tabular execution | `query/executors/tabular.py` | `query/executor/tabular.py::TabularExecutor` | Parquet-only; pandas compiler split out |
| Executor selection | Hard-coded in `query_executor.py` | `query/executor/dispatcher.py::ExecutorDispatcher` | New; routes by `source.source_type` |
| Catalog (NEW) | β€” | `catalog/` (models, introspect/, validator, store, reader, pii_detector, render) | New subsystem |
| Catalog persistence | (data was embedded in PGVector) | Postgres jsonb table `data_catalog`, keyed by `user_id` | New table |
| Ingestion triggers | Inline in API endpoints | `pipeline/triggers.py` (`on_db_registered`, `on_tabular_uploaded`, `on_document_uploaded`, `on_catalog_rebuild_requested`) | Centralized event entry points |
| Structured pipeline | `pipeline/db_pipeline/db_pipeline_service.py` (still present for `engine_scope` + extractor reuse) | `pipeline/structured_pipeline.py` (orchestrator) β€” reuses Phase 1 extractor | New orchestrator wraps Phase 1 introspection helpers |
| Document pipeline | `pipeline/document_pipeline/document_pipeline.py` (folder) | `pipeline/document_pipeline.py` (file) | Flattened; CSV / XLSX now skip the vector store |
| Parquet helper | `knowledge/parquet_service.py` | `storage/parquet.py` | Moved into `storage/` |
| Prompts | `config/agents/system_prompt.md`, `guardrails_prompt.md` | `config/prompts/{intent_router,query_planner,chatbot_system,guardrails}.md` | Folder renamed; split into four files; guardrails appended to `chatbot_system` at load |
| PII detection | β€” | `catalog/pii_detector.py` + `security/pii_patterns.py` | New. Columns flagged `pii_flag=true` get `sample_values: null` so PII never enters prompts |
| Chat endpoint | `api/v1/chat.py` (does everything inline) | `api/v1/chat.py` (cache + history + persistence) β†’ delegates to `ChatHandler` | Slimmed; SSE event shape is `intent` / `chunk` / `done` / `error` |
| DB ingest endpoint | `api/v1/db_client.py::ingest` (Phase 1 `db_pipeline_service.run()`) | `api/v1/db_client.py::ingest` (calls `on_db_registered` only) | Phase 1 dual-write removed |
| Document process endpoint | `api/v1/document.py::process` (always vectorize) | `api/v1/document.py::process` (PDF/DOCX/TXT β†’ vectors; CSV/XLSX β†’ catalog via `on_tabular_uploaded`) | Routing by file type |
| Catalog management API | β€” | `api/v1/data_catalog.py` (GET index + POST rebuild) | New |
**Bottom line.** Every Phase 1 file under `src/rag/`, `src/tools/`, `src/query/executors/`, `src/query/query_executor.py`, `src/query/base.py`, `src/api/v1/knowledge.py`, and `src/config/agents/` is gone. Phase 1 introspection helpers under `src/pipeline/db_pipeline/` and `src/database_client/` are still imported by Phase 2 β€” they were not rewritten, just wrapped. The three LLM call sites are now explicit and the SQL-writing one no longer exists; the planner emits a Pydantic-validated `QueryIR` instead.
The one filename gotcha to remember: the **intent router** still lives at `src/agents/orchestration.py` as class `OrchestratorAgent` (Phase 1 name kept for import-site compatibility, Phase 2 body). The matching prompt and tests use the `intent_router` name, but the source module does not.