Phase 1 → Phase 2 Migration Report
A walkthrough of what changed between the original retrieval-style backend (Phase 1) and the current catalog-driven backend (Phase 2). Intended as a hand-off for the lead.
1. The conceptual change
Phase 1 was a single retrieval-style RAG pipeline. Every question, whether it pointed at a database, a spreadsheet, or a PDF, went through the same primitive: chunk + embed + top-K over PGVector. Schema and tabular columns were embedded as chunks and ranked alongside prose. When the question needed SQL, the LLM wrote the SQL string directly (via query_executor).
Phase 2 splits the system into two paths governed by an LLM router:
| Path | Primitive | Why |
|---|---|---|
| Unstructured (PDF / DOCX / TXT) | Dense similarity over prose chunks (PGVector) | Right primitive for free text |
| Structured (DB / CSV / XLSX / Parquet) | Per-user data catalog → LLM emits a JSON IR of intent → deterministic compiler → executor (SQL or pandas) | A column lookup shouldn't go through a similarity ranking lottery; the LLM emits intent, never SQL syntax |
Three explicit LLM call sites only:
- Intent router (classifies the user message into chat/unstructured/structured)
- Query planner (turns the question + catalog into a Pydantic-validated QueryIR)
- Chatbot agent (formats the final answer, streamed over SSE)
Everything else (IR validation, SQL/pandas compilation, execution) is deterministic Python.
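To make the structured path concrete, here is a minimal sketch of the intent-to-SQL idea: the planner's only output is a small IR object, and plain Python turns it into parameterized SQL. The MiniIR and Filter shapes, the three-operator table, and the compile_sql helper are hypothetical stand-ins for illustration, not the actual QueryIR schema or SqlCompiler.

```python
from dataclasses import dataclass, field

# Hypothetical, heavily simplified operator table (the report mentions 12 ops).
ALLOWED_OPS = {"eq": "=", "gt": ">", "lt": "<"}


@dataclass
class Filter:
    column: str
    op: str
    value: object


@dataclass
class MiniIR:
    """Illustrative stand-in for QueryIR: intent only, no SQL syntax."""
    table: str
    select: list[str]
    filters: list[Filter] = field(default_factory=list)
    limit: int = 100


def quote_ident(name: str) -> str:
    """Quote a catalog-supplied identifier for Postgres."""
    return '"' + name.replace('"', '""') + '"'


def compile_sql(ir: MiniIR, known_columns: set[str]) -> tuple[str, dict]:
    """Deterministically compile the IR into (sql, params); no LLM involved."""
    # A fuller version would also check ir.table against the catalog.
    for col in ir.select + [f.column for f in ir.filters]:
        if col not in known_columns:
            raise ValueError(f"unknown column: {col}")
    params: dict = {}
    clauses = []
    for i, f in enumerate(ir.filters):
        if f.op not in ALLOWED_OPS:
            raise ValueError(f"unsupported op: {f.op}")
        key = f"p{i}"
        params[key] = f.value  # values are bound as parameters, never inlined
        clauses.append(f"{quote_ident(f.column)} {ALLOWED_OPS[f.op]} :{key}")
    columns = ", ".join(quote_ident(c) for c in ir.select)
    sql = f"SELECT {columns} FROM {quote_ident(ir.table)}"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    sql += f" LIMIT {int(ir.limit)}"
    return sql, params
```

Because the compiler is a pure function of the IR and the catalog, the same IR always yields the same SQL, which keeps the query layer testable without calling an LLM.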
2. File-by-file changes
2.1 Deleted (Phase 1 only)
| Phase 1 path | Reason it was removed |
|---|---|
| src/rag/base.py, src/rag/retriever.py, src/rag/router.py | Replaced by src/retrieval/ |
| src/rag/retrievers/baseline.py, schema.py, document.py | Schema retrieval gone (catalog replaces it); document retriever rewritten in src/retrieval/document.py |
| src/tools/search.py (whole tools/ folder) | Only consumer was rag/router.py |
| src/query/base.py | Duplicate of query/executor/base.py |
| src/query/query_executor.py | Replaced by src/query/service.py |
| src/query/executors/db_executor.py | Replaced by src/query/executor/db.py |
| src/query/executors/tabular.py | Replaced by src/query/executor/tabular.py |
| src/agents/chatbot.py (Phase 1 LangChain chatbot) | Phase 2 ChatbotAgent lives at the same path now; see §2.2 |
| src/api/v1/knowledge.py | Fake /knowledge/rebuild endpoint, never wired |
| src/config/agents/system_prompt.md, guardrails_prompt.md | Replaced by src/config/prompts/{chatbot_system,guardrails}.md |
| src/models/structured_output.py (IntentClassification) | Replaced by IntentRouterDecision Pydantic model inside agents/orchestration.py |
| src/models/sql_query.py | LLM no longer emits SQL; IR replaces it |
| src/pipeline/orchestrator.py (empty stub) | Redundant: StructuredPipeline takes the introspector at run() time |
2.2 Renamed / moved (same role, new home)
| Phase 1 location | Phase 2 location | Notes |
|---|---|---|
| src/agents/chatbot.py (Phase 1) → deleted, then src/agents/answer_agent.py (AnswerAgent) → renamed | src/agents/chatbot.py::ChatbotAgent | Final answer formation; streams via astream |
| src/knowledge/parquet_service.py | src/storage/parquet.py | Parquet upload/download helper |
| src/pipeline/document_pipeline/document_pipeline.py (folder) | src/pipeline/document_pipeline.py (flat) | Single module |
| src/rag/retrievers/document.py | src/retrieval/document.py | DocumentRetriever migrated; tabular file types filtered out of results |
| src/rag/router.py | src/retrieval/router.py | RetrievalRouter, Redis-cached, unstructured-only; dead db: AsyncSession + source_hint params removed |
| src/rag/base.py (RetrievalResult, BaseRetriever) | src/retrieval/base.py | Same dataclass + ABC |
Heads-up on the intent router: the Phase 1 file src/agents/orchestration.py and its class OrchestratorAgent were kept in place for Phase 2, but the body was fully rewritten. The class now emits IntentRouterDecision(needs_search, source_hint ∈ {chat, unstructured, structured}, rewritten_query). The prompt file and test file use the intent_router name (config/prompts/intent_router.md, tests/agents/test_intent_router.py), but the source module is still orchestration.py and the class is still OrchestratorAgent. Existing imports continue to work; only the behavior changed.
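As an illustration of the decision shape described above, the sketch below models the three fields and rejects any source_hint outside the allowed set. It uses a stdlib dataclass as a stand-in (the report says the real model is Pydantic), and the pick_path helper, its return labels, and the rule that needs_search=False falls through to plain chat are assumptions made for the example.

```python
from dataclasses import dataclass

SOURCE_HINTS = ("chat", "unstructured", "structured")


@dataclass(frozen=True)
class IntentRouterDecision:
    """Stdlib stand-in for the Pydantic model of the same name."""
    needs_search: bool
    source_hint: str
    rewritten_query: str

    def __post_init__(self) -> None:
        # Reject anything the router is not allowed to emit.
        if self.source_hint not in SOURCE_HINTS:
            raise ValueError(f"invalid source_hint: {self.source_hint!r}")


def pick_path(decision: IntentRouterDecision) -> str:
    """Map a decision to the component that handles it (labels are illustrative)."""
    # Assumption: a message that needs no search goes straight to the chatbot.
    if not decision.needs_search or decision.source_hint == "chat":
        return "chatbot"
    if decision.source_hint == "unstructured":
        return "retrieval"
    return "query_service"
```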
2.3 Added (Phase 2 new)
Catalog subsystem (whole new concept)
| Path | Role |
|---|---|
| src/catalog/models.py | Pydantic: Catalog → Source[] → Table[] → Column[], ForeignKey, ColumnStats.top_values |
| src/catalog/introspect/base.py | BaseIntrospector ABC |
| src/catalog/introspect/database.py | DB introspector; wraps Phase 1 db_pipeline/extractor.py (get_schema, profile_column, get_row_count) |
| src/catalog/introspect/tabular.py | CSV / XLSX / Parquet introspector; one Table per XLSX sheet |
| src/catalog/render.py | Renders a Source for the planner prompt |
| src/catalog/validator.py | Unique-ID + foreign-key-ref invariants |
| src/catalog/store.py | Postgres jsonb upsert keyed by user_id (table data_catalog) |
| src/catalog/reader.py | Loads + filters catalog by source_hint |
| src/catalog/pii_detector.py | Flags PII columns at ingestion; suppresses sample_values |
| src/security/pii_patterns.py | Name patterns + value regex used by the detector |
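The PII rows above combine two signals: the column name and the sampled values. A minimal sketch of that idea follows, assuming a made-up name pattern and a single email-like value regex; the real patterns live in src/security/pii_patterns.py and are not reproduced here, and flag_column is a hypothetical helper.

```python
import re

# Hypothetical patterns for illustration only.
PII_NAME_PATTERN = re.compile(r"(email|phone|ssn|passport)", re.IGNORECASE)
PII_VALUE_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")  # email-like values


def flag_column(name: str, sample_values: list[str]) -> dict:
    """Flag a column as PII by name or by value, and drop its samples if flagged."""
    by_name = bool(PII_NAME_PATTERN.search(name))
    by_value = any(PII_VALUE_PATTERN.match(v) for v in sample_values)
    pii = by_name or by_value
    return {
        "name": name,
        "pii_flag": pii,
        # Flagged columns keep no samples, so they cannot leak into a prompt.
        "sample_values": None if pii else sample_values,
    }
```

Checking values as well as names catches columns with uninformative names (for example, a contact column that happens to hold email addresses).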
JSON IR + query subsystem
| Path | Role |
|---|---|
| src/query/ir/models.py | QueryIR Pydantic schema |
| src/query/ir/operators.py | ALLOWED_FILTER_OPS, ALLOWED_AGG_FNS, LIMIT_HARD_CAP, TYPE_COMPATIBILITY |
| src/query/ir/validator.py | Catalog-aware IR validation (rejects unknown column ids, bad ops, type mismatches, oversize limits) |
| src/query/planner/service.py | QueryPlannerService.plan(question, catalog, previous_error) → Azure OpenAI structured output → QueryIR |
| src/query/planner/prompt.py | Builds the planner prompt from catalog text |
| src/query/compiler/base.py | Compiler ABC |
| src/query/compiler/sql.py | SqlCompiler (Postgres): all 12 filter ops, params as a dict |
| src/query/compiler/pandas.py | PandasCompiler: returns CompiledPandas(apply, output_columns) |
| src/query/executor/base.py | BaseExecutor + QueryResult |
| src/query/executor/db.py | DbExecutor: sqlglot SELECT-only guard, RO txn, 30 s statement_timeout, 10 k row cap |
| src/query/executor/tabular.py | TabularExecutor: Parquet via blob, asyncio.to_thread, 10 k cap |
| src/query/executor/dispatcher.py | ExecutorDispatcher.pick(ir): picks by source.source_type |
| src/query/service.py | QueryService.run(user_id, question, catalog): plan → validate → retry (max 3) → dispatch → execute |
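The plan → validate → retry step in QueryService.run can be sketched as a small loop that feeds the validator's error back into the next planner call. This is an illustration under stated assumptions: plan_with_retry is a hypothetical helper, the planner and validator are passed in as plain callables, "max 3" is read as three attempts in total, and raising on exhaustion is a choice made for the sketch rather than the service's documented behavior.

```python
from typing import Callable, Optional

MAX_ATTEMPTS = 3  # assumption: "retry (max 3)" means three attempts in total


def plan_with_retry(
    plan: Callable[[str, Optional[str]], dict],
    validate: Callable[[dict], Optional[str]],
    question: str,
) -> dict:
    """Call the planner until its IR validates, passing back the last error."""
    previous_error: Optional[str] = None
    for _ in range(MAX_ATTEMPTS):
        ir = plan(question, previous_error)
        previous_error = validate(ir)  # None means the IR is valid
        if previous_error is None:
            return ir
    raise ValueError(f"planner failed after {MAX_ATTEMPTS} attempts: {previous_error}")
```

Passing the previous error into the planner gives the LLM a concrete reason for the rejection (for example, an unknown column id), which is usually more effective than a blind retry.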
Agents
| Path | Role |
|---|---|
| src/agents/orchestration.py | OrchestratorAgent: Phase 1 file/class name preserved; Phase 2 body. Emits IntentRouterDecision |
| src/agents/chatbot.py | ChatbotAgent: formerly AnswerAgent in agents/answer_agent.py; renamed in Cleanup PR |
| src/agents/chat_handler.py | ChatHandler.handle(...): top-level orchestrator; yields intent / chunk / done / error SSE events |
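The four SSE event types yielded by ChatHandler.handle can be illustrated with an async generator. The sketch below is not the real handler: the payload field names, the hard-coded intent, and the fake_answer_stream stand-in for ChatbotAgent.astream are all assumptions; only the event names (intent, chunk, done, error) come from the table above.

```python
import asyncio
import json
from typing import AsyncIterator


def sse(event: str, data: dict) -> str:
    """Format one Server-Sent Events frame."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


async def fake_answer_stream(text: str) -> AsyncIterator[str]:
    """Stand-in for ChatbotAgent.astream; yields the answer word by word."""
    for word in text.split():
        yield word


async def handle(message: str) -> AsyncIterator[str]:
    """Yield an intent event, then chunk events, then done; error on failure."""
    try:
        yield sse("intent", {"source_hint": "chat"})  # hard-coded for the sketch
        async for piece in fake_answer_stream(f"echo {message}"):
            yield sse("chunk", {"text": piece})
        yield sse("done", {})
    except Exception as exc:  # surface failures as an event instead of raising
        yield sse("error", {"message": str(exc)})


async def collect(message: str) -> list[str]:
    """Drain the stream into a list, as a test client might."""
    return [frame async for frame in handle(message)]
```

Running asyncio.run(collect("hi")) returns the frames in emission order, which is a convenient way to test event sequencing without an HTTP client.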
Pipelines & API
| Path | Role |
|---|---|
| src/pipeline/structured_pipeline.py | DB / tabular ingestion: introspect → merge → validate → upsert |
| src/pipeline/triggers.py | on_db_registered, on_tabular_uploaded, on_document_uploaded, on_catalog_rebuild_requested |
| src/api/v1/data_catalog.py | GET /api/v1/data-catalog/{user_id} + POST /api/v1/data-catalog/rebuild |
| src/models/api/catalog.py | Catalog request/response models |
| src/config/prompts/intent_router.md, query_planner.md, chatbot_system.md, guardrails.md | New prompts. guardrails.md is appended to chatbot_system.md at load time |
| src/db/postgres/models.py (added Catalog SQLAlchemy class) | Stores the per-user jsonb document in data_catalog |
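The prompt row above says guardrails.md is appended to chatbot_system.md at load time. A minimal sketch of such a loader follows; load_chatbot_prompt is a hypothetical helper, and the blank-line separator between the two files is an assumption.

```python
from pathlib import Path


def load_chatbot_prompt(prompt_dir: Path) -> str:
    """Return chatbot_system.md with guardrails.md appended after a blank line."""
    system = (prompt_dir / "chatbot_system.md").read_text(encoding="utf-8").rstrip()
    guardrails = (prompt_dir / "guardrails.md").read_text(encoding="utf-8").strip()
    return f"{system}\n\n{guardrails}\n"
```

Keeping the guardrails in their own file lets them be reviewed and versioned separately while the chatbot still receives a single system prompt.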
2.4 Rewired API endpoints
| Endpoint | Phase 1 wiring | Phase 2 wiring |
|---|---|---|
| POST /api/v1/chat/stream | Inline in chat.py: OrchestratorAgent → retriever → query_executor → chatbot | Delegates to ChatHandler.handle(). Redis cache, fast intent, history load, and message persistence stay in the endpoint |
| POST /api/v1/database-clients/{id}/ingest | Called db_pipeline_service.run() and dual-wrote vectors | Calls only on_db_registered (catalog build). Failure → HTTP 500 |
| POST /api/v1/document/process | Always pushed to vector store | PDF/DOCX/TXT → knowledge_processor (vectors); CSV/XLSX → on_tabular_uploaded (catalog only, no vector embedding) |
| POST /api/v1/document/upload | Storage + DB row | Same, plus on_document_uploaded trigger |
| POST /api/v1/data-catalog/rebuild | n/a | New: iterates all sources, re-runs per-source trigger |
| GET /api/v1/data-catalog/{user_id} | n/a | New: returns list[CatalogIndexEntry] |
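The file-type split in POST /api/v1/document/process amounts to a lookup on the file extension. The sketch below shows that routing in isolation; route_upload and its "vectors" / "catalog" return labels are hypothetical, and rejecting unknown extensions with ValueError is an assumption about behavior the report does not describe.

```python
from pathlib import PurePosixPath

UNSTRUCTURED_EXTS = {".pdf", ".docx", ".txt"}  # go to the vector store
TABULAR_EXTS = {".csv", ".xlsx"}               # go to the catalog only


def route_upload(filename: str) -> str:
    """Return which ingestion path a file takes, based on its extension."""
    ext = PurePosixPath(filename).suffix.lower()
    if ext in UNSTRUCTURED_EXTS:
        return "vectors"
    if ext in TABULAR_EXTS:
        return "catalog"
    raise ValueError(f"unsupported file type: {ext or filename!r}")
```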
2.5 Phase 1 files still in production use
These were not rewritten; Phase 2 imports them directly:
- src/database_client/database_client_service.py
- src/utils/db_credential_encryption.py (decrypt_credentials_dict); src/security/credentials.py is still a stub
- src/pipeline/db_pipeline/db_pipeline_service.py (engine_scope context manager, used by both the introspector and DbExecutor)
- src/pipeline/db_pipeline/extractor.py (get_schema, profile_column, get_row_count)
- src/knowledge/processing_service.py (PDF / DOCX / TXT extraction + embedding)
- src/db/postgres/{connection,init_db,vector_store}.py, src/storage/az_blob/, src/middlewares/, src/security/auth.py
3. End-to-end flow (current state)
3.1 Ingestion
User action              Pipeline                               Storage
───────────              ────────                               ───────
upload PDF/DOCX/TXT   →  DocumentPipeline                    →  Azure Blob + PGVector
                         (extract → chunk → embed)              (table: langchain_pg_embedding)
                         + on_document_uploaded                 + retrieval cache invalidate
upload CSV/XLSX       →  TabularIntrospector                 →  Azure Blob (Parquet)
                         (sheets / columns + sample + stats)    + data_catalog jsonb row
                         → CatalogValidator → CatalogStore      (NO vector store; catalog only)
                         via on_tabular_uploaded
register DB           →  DatabaseIntrospector                →  data_catalog jsonb row
                         (information_schema + sample + FKs)
                         → validate → store
                         via on_db_registered
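Both structured ingestion paths pass through a validation step before the catalog is stored. The two invariants named for the validator (unique IDs and resolvable foreign-key references) can be sketched as follows. The dictionary layout, the "from" / "to" key names, and the validate_catalog helper are assumptions for illustration; the real catalog uses Pydantic models.

```python
def validate_catalog(tables: list[dict]) -> list[str]:
    """Return a list of invariant violations; an empty list means valid."""
    errors: list[str] = []
    seen: set[str] = set()
    # Invariant 1: every column id is unique across the catalog.
    for table in tables:
        for col in table["columns"]:
            if col["id"] in seen:
                errors.append(f"duplicate column id: {col['id']}")
            seen.add(col["id"])
    # Invariant 2: both ends of every foreign key point at a known column.
    for table in tables:
        for fk in table.get("foreign_keys", []):
            for ref in (fk["from"], fk["to"]):
                if ref not in seen:
                    errors.append(f"foreign key references unknown column: {ref}")
    return errors
```

Collecting all violations instead of stopping at the first one makes a rejected ingestion easier to diagnose in a single pass.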
3.2 Query (per user message → SSE stream)
POST /api/v1/chat/stream
│
├── Redis cache check (24h TTL) → hit returns cached stream
├── _fast_intent (greetings / goodbyes) → bypass LLM
├── load history from chat_messages
│
├── ChatHandler.handle(message, user_id, history)   [src/agents/chat_handler.py]
│   │
│   ├─ OrchestratorAgent.classify()   [agents/orchestration.py]
│   │     → needs_search, source_hint, rewritten_query
│   │
│   ├── source_hint == "chat"
│   │     → ChatbotAgent.astream() → yield chunk events
│   │
│   ├── source_hint == "unstructured"
│   │     → RetrievalRouter.retrieve()   [retrieval/router.py, Redis-cached]
│   │     → DocumentRetriever (PGVector MMR/cosine/etc.)
│   │     → ChatbotAgent.astream(chunks=...)
│   │
│   ├── source_hint == "structured"
│   │     → CatalogReader.read(user_id, "structured")   [catalog/reader.py]
│   │     → QueryService.run(user_id, question, catalog)   [query/service.py]
│   │         │
│   │         ├─ QueryPlannerService.plan(...)   [query/planner/service.py]
│   │         │     LLM(catalog, question, prev_error?) → QueryIR
│   │         │
│   │         ├─ IRValidator.validate(ir, catalog)   [query/ir/validator.py]
│   │         │     fail → loop back to planner with error context (max 3)
│   │         │
│   │         ├─ ExecutorDispatcher.pick(ir)   [query/executor/dispatcher.py]
│   │         │     schema source → DbExecutor
│   │         │     tabular source → TabularExecutor
│   │         │
│   │         ├─ DbExecutor.run(ir):   [query/executor/db.py]
│   │         │     SqlCompiler → (sql, params)
│   │         │     → sqlglot SELECT-only guard
│   │         │     → engine_scope (Phase 1 utility) in asyncio.to_thread
│   │         │     → RO txn + statement_timeout=30s + 10k cap
│   │         │
│   │         ├─ TabularExecutor.run(ir):   [query/executor/tabular.py]
│   │         │     resolve Parquet blob path
│   │         │     → download → PandasCompiler.apply(df)
│   │         │     → asyncio.to_thread → 10k cap
│   │         │
│   │         └─ QueryResult { rows, columns, row_count,
│   │                          truncated, source_id, error?, elapsed_ms }
│   │               │
│   │            ChatbotAgent.astream(query_result=...)
│   │               → yield chunk events
│   │
│   └── final events: done / error
│
├── persist user + assistant messages to chat_messages
└── populate Redis cache
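The dispatch step in the flow above is a lookup from source type to executor. A minimal sketch follows, with several assumptions: the executor classes are empty placeholders, the Source dataclass is a stand-in for the catalog model, and pick takes the source directly, whereas the report's ExecutorDispatcher.pick takes the IR and resolves the source from it.

```python
from dataclasses import dataclass


@dataclass
class Source:
    """Stand-in for the catalog's Source model."""
    source_id: str
    source_type: str  # "schema" or "tabular", per the flow above


class DbExecutor:
    """Placeholder for query/executor/db.py::DbExecutor."""
    name = "db"


class TabularExecutor:
    """Placeholder for query/executor/tabular.py::TabularExecutor."""
    name = "tabular"


class ExecutorDispatcher:
    def __init__(self) -> None:
        # One executor instance per supported source type.
        self._by_type = {"schema": DbExecutor(), "tabular": TabularExecutor()}

    def pick(self, source: Source):
        """Return the executor registered for this source's type."""
        try:
            return self._by_type[source.source_type]
        except KeyError:
            raise ValueError(f"no executor for source_type {source.source_type!r}") from None
```

A registry keyed by source type means a new backend can be added by registering one more executor, without touching the query service.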
Safety invariants for the structured path (read-only at every layer):
- IR validated against the catalog before reaching the compiler
- Identifiers come from the catalog (trusted; inlined as quoted identifiers)
- Values from IR.filters are always parameterized
- Compiler is deterministic; no LLM in the hot path
- sqlglot rejects anything that isn't a pure SELECT
- DB connection is read-only with a 30 s statement_timeout
- Hard 10 000 row cap on both executors; neither raises, and errors go in QueryResult.error
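The last invariant (a hard row cap, with failures reported in the result instead of raised) can be sketched as a small wrapper. This is an illustration only: run_capped is a hypothetical helper, the QueryResult here carries a subset of the fields listed in the flow, and a real executor would more likely apply the cap while fetching than after loading every row.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional

ROW_CAP = 10_000  # the hard cap named in the invariants above


@dataclass
class QueryResult:
    """Reduced stand-in for the QueryResult in query/executor/base.py."""
    rows: list = field(default_factory=list)
    row_count: int = 0
    truncated: bool = False
    error: Optional[str] = None


def run_capped(fetch: Callable[[], list]) -> QueryResult:
    """Run fetch(), cap the rows, and report failures via QueryResult.error."""
    try:
        rows = fetch()
    except Exception as exc:  # never propagate; the caller inspects .error
        return QueryResult(error=f"{type(exc).__name__}: {exc}")
    capped = rows[:ROW_CAP]
    return QueryResult(rows=capped, row_count=len(capped), truncated=len(rows) > ROW_CAP)
```

Returning errors as data keeps the chat stream alive: the chatbot can explain the failure to the user instead of the request ending in an unhandled exception.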
4. Summary table for review
| Concern | Phase 1: where it lived | Phase 2: where it lives | Change type |
|---|---|---|---|
| Intent classification | agents/orchestration.py::OrchestratorAgent (free-text intent) | Same path + same class name; body rewritten to emit IntentRouterDecision | Body rewrite only |
| Top-level chat orchestration | Inline in api/v1/chat.py | agents/chat_handler.py::ChatHandler | Extracted to a reusable module |
| Final answer formation | agents/chatbot.py (Phase 1 LangChain) | agents/chatbot.py::ChatbotAgent (was AnswerAgent in answer_agent.py mid-cycle) | Rewritten + renamed |
| Schema retrieval (DB / tabular) | rag/retrievers/schema.py + PGVector chunks | Removed. Replaced by catalog (catalog/store.py jsonb) loaded verbatim into planner prompt | Whole concept replaced |
| Doc retrieval (PDF / DOCX / TXT) | rag/retrievers/document.py, rag/router.py | retrieval/document.py, retrieval/router.py | Moved; Redis cache restored; tabular files filtered |
| Query writing | query/query_executor.py + models/sql_query.py (LLM writes SQL) | query/planner/service.py (LLM writes IR) + query/compiler/sql.py (deterministic) | LLM emits intent, not SQL |
| DB execution | query/executors/db_executor.py | query/executor/db.py::DbExecutor | Folder renamed (executors → executor); sqlglot guard + RO txn + 30 s timeout kept |
| Tabular execution | query/executors/tabular.py | query/executor/tabular.py::TabularExecutor | Parquet-only; pandas compiler split out |
| Executor selection | Hard-coded in query_executor.py | query/executor/dispatcher.py::ExecutorDispatcher | New; routes by source.source_type |
| Catalog (NEW) | n/a | catalog/ (models, introspect/, validator, store, reader, pii_detector, render) | New subsystem |
| Catalog persistence | (data was embedded in PGVector) | Postgres jsonb table data_catalog, keyed by user_id | New table |
| Ingestion triggers | Inline in API endpoints | pipeline/triggers.py (on_db_registered, on_tabular_uploaded, on_document_uploaded, on_catalog_rebuild_requested) | Centralized event entry points |
| Structured pipeline | pipeline/db_pipeline/db_pipeline_service.py (still present for engine_scope + extractor reuse) | pipeline/structured_pipeline.py (orchestrator); reuses Phase 1 extractor | New orchestrator wraps Phase 1 introspection helpers |
| Document pipeline | pipeline/document_pipeline/document_pipeline.py (folder) | pipeline/document_pipeline.py (file) | Flattened; CSV / XLSX now skip the vector store |
| Parquet helper | knowledge/parquet_service.py | storage/parquet.py | Moved into storage/ |
| Prompts | config/agents/system_prompt.md, guardrails_prompt.md | config/prompts/{intent_router,query_planner,chatbot_system,guardrails}.md | Folder renamed; split into four files; guardrails appended to chatbot_system at load |
| PII detection | n/a | catalog/pii_detector.py + security/pii_patterns.py | New. Columns flagged pii_flag=true get sample_values: null so PII never enters prompts |
| Chat endpoint | api/v1/chat.py (does everything inline) | api/v1/chat.py (cache + history + persistence); delegates to ChatHandler | Slimmed; SSE event shape is intent / chunk / done / error |
| DB ingest endpoint | api/v1/db_client.py::ingest (Phase 1 db_pipeline_service.run()) | api/v1/db_client.py::ingest (calls on_db_registered only) | Phase 1 dual-write removed |
| Document process endpoint | api/v1/document.py::process (always vectorize) | api/v1/document.py::process (PDF/DOCX/TXT → vectors; CSV/XLSX → catalog via on_tabular_uploaded) | Routing by file type |
| Catalog management API | n/a | api/v1/data_catalog.py (GET index + POST rebuild) | New |
Bottom line. Every Phase 1 file under src/rag/, src/tools/, src/query/executors/, src/query/query_executor.py, src/query/base.py, src/api/v1/knowledge.py, and src/config/agents/ is gone. Phase 1 introspection helpers under src/pipeline/db_pipeline/ and src/database_client/ are still imported by Phase 2; they were not rewritten, just wrapped. The three LLM call sites are now explicit and the SQL-writing one no longer exists; the planner emits a Pydantic-validated QueryIR instead.
The one filename gotcha to remember: the intent router still lives at src/agents/orchestration.py as class OrchestratorAgent (Phase 1 name kept for import-site compatibility, Phase 2 body). The matching prompt and tests use the intent_router name, but the source module does not.