
Phase 1 → Phase 2 Migration Report

A walkthrough of what changed between the original retrieval-style backend (Phase 1) and the current catalog-driven backend (Phase 2). Intended as a hand-off for the lead.


1. The conceptual change

Phase 1 was a single retrieval-style RAG pipeline. Every question, whether it pointed at a database, a spreadsheet, or a PDF, went through the same primitive: chunk + embed + top-K over PGVector. Schema and tabular columns were embedded as chunks and ranked alongside prose. When the question needed SQL, the LLM wrote the SQL string directly (via query_executor).

Phase 2 splits the system into two paths governed by an LLM router:

| Path | Primitive | Why |
|---|---|---|
| Unstructured (PDF / DOCX / TXT) | Dense similarity over prose chunks (PGVector) | Right primitive for free text |
| Structured (DB / CSV / XLSX / Parquet) | Per-user data catalog → LLM emits a JSON IR of intent → deterministic compiler → executor (SQL or pandas) | A column lookup shouldn't go through a similarity-ranking lottery; the LLM emits intent, never SQL syntax |

There are only three explicit LLM call sites:

  1. Intent router (classifies the user message into chat / unstructured / structured)
  2. Query planner (turns the question + catalog into a Pydantic-validated QueryIR)
  3. Chatbot agent (formats the final answer, streamed over SSE)

Everything else (IR validation, SQL/pandas compilation, execution) is deterministic Python.


2. File-by-file changes

2.1 Deleted (Phase 1 only)

| Phase 1 path | Reason it was removed |
|---|---|
| src/rag/base.py, src/rag/retriever.py, src/rag/router.py | Replaced by src/retrieval/ |
| src/rag/retrievers/baseline.py, schema.py, document.py | Schema retrieval gone (catalog replaces it); document retriever rewritten in src/retrieval/document.py |
| src/tools/search.py (whole tools/ folder) | Only consumer was rag/router.py |
| src/query/base.py | Duplicate of query/executor/base.py |
| src/query/query_executor.py | Replaced by src/query/service.py |
| src/query/executors/db_executor.py | Replaced by src/query/executor/db.py |
| src/query/executors/tabular.py | Replaced by src/query/executor/tabular.py |
| src/agents/chatbot.py (Phase 1 LangChain chatbot) | Phase 2 ChatbotAgent now lives at the same path (see §2.2) |
| src/api/v1/knowledge.py | Fake /knowledge/rebuild endpoint, never wired |
| src/config/agents/system_prompt.md, guardrails_prompt.md | Replaced by src/config/prompts/{chatbot_system,guardrails}.md |
| src/models/structured_output.py (IntentClassification) | Replaced by IntentRouterDecision Pydantic model inside agents/orchestration.py |
| src/models/sql_query.py | LLM no longer emits SQL; IR replaces it |
| src/pipeline/orchestrator.py (empty stub) | Redundant: StructuredPipeline takes the introspector at run() time |

2.2 Renamed / moved (same role, new home)

| Phase 1 location | Phase 2 location | Notes |
|---|---|---|
| src/agents/chatbot.py (Phase 1, deleted), then src/agents/answer_agent.py (AnswerAgent, interim) | src/agents/chatbot.py::ChatbotAgent (renamed from AnswerAgent) | Final answer formation; streams via astream |
| src/knowledge/parquet_service.py | src/storage/parquet.py | Parquet upload/download helper |
| src/pipeline/document_pipeline/document_pipeline.py (folder) | src/pipeline/document_pipeline.py (flat) | Single module |
| src/rag/retrievers/document.py | src/retrieval/document.py | DocumentRetriever migrated; tabular file types filtered out of results |
| src/rag/router.py | src/retrieval/router.py | RetrievalRouter: Redis-cached, unstructured-only; the dead db: AsyncSession and source_hint parameters were removed |
| src/rag/base.py (RetrievalResult, BaseRetriever) | src/retrieval/base.py | Same dataclass + ABC |

Heads-up on the intent router: the Phase 1 file src/agents/orchestration.py and its class OrchestratorAgent were kept in place for Phase 2, but the body was fully rewritten. The class now emits IntentRouterDecision(needs_search, source_hint ∈ {chat, unstructured, structured}, rewritten_query). The prompt file and test file use the intent_router name (config/prompts/intent_router.md, tests/agents/test_intent_router.py), but the source module is still orchestration.py and the class is still OrchestratorAgent. Existing imports continue to work; only the behavior changed.
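
For orientation, the decision shape described above can be sketched as follows. This is a minimal stand-in, not the project's code. It uses a stdlib frozen dataclass in place of the real Pydantic model, and from_json is a hypothetical helper name for parsing the router's JSON output. Only the three field names and the allowed source_hint values come from this report.

```python
import json
from dataclasses import dataclass
from typing import Literal, get_args

# Allowed routes, as listed in this report.
SourceHint = Literal["chat", "unstructured", "structured"]


@dataclass(frozen=True)
class IntentRouterDecision:
    """Stdlib stand-in for the Pydantic model emitted by OrchestratorAgent."""

    needs_search: bool
    source_hint: SourceHint
    rewritten_query: str

    def __post_init__(self) -> None:
        # Pydantic would reject an out-of-range Literal; emulate that check here.
        if self.source_hint not in get_args(SourceHint):
            raise ValueError(f"unknown source_hint: {self.source_hint!r}")

    @classmethod
    def from_json(cls, raw: str) -> "IntentRouterDecision":
        """Hypothetical helper: parse the router LLM's JSON output."""
        data = json.loads(raw)
        return cls(
            needs_search=bool(data["needs_search"]),
            source_hint=data["source_hint"],
            rewritten_query=str(data["rewritten_query"]),
        )


decision = IntentRouterDecision.from_json(
    '{"needs_search": true, "source_hint": "structured",'
    ' "rewritten_query": "total sales by region"}'
)
print(decision.source_hint)  # structured
```

Because the decision is validated at construction time, ChatHandler can branch on source_hint without defensive checks.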

2.3 Added (Phase 2 new)

Catalog subsystem (whole new concept)

| Path | Role |
|---|---|
| src/catalog/models.py | Pydantic: Catalog → Source[] → Table[] → Column[], ForeignKey, ColumnStats.top_values |
| src/catalog/introspect/base.py | BaseIntrospector ABC |
| src/catalog/introspect/database.py | DB introspector; wraps Phase 1 db_pipeline/extractor.py (get_schema, profile_column, get_row_count) |
| src/catalog/introspect/tabular.py | CSV / XLSX / Parquet introspector; one Table per XLSX sheet |
| src/catalog/render.py | Renders a Source for the planner prompt |
| src/catalog/validator.py | Unique-ID + foreign-key-ref invariants |
| src/catalog/store.py | Postgres jsonb upsert keyed by user_id (table data_catalog) |
| src/catalog/reader.py | Loads + filters catalog by source_hint |
| src/catalog/pii_detector.py | Flags PII columns at ingestion and suppresses their sample_values |
| src/security/pii_patterns.py | Name patterns + value regex used by the detector |
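
Here is a minimal sketch of the catalog hierarchy and the two invariants that src/catalog/validator.py enforces: unique IDs and resolvable foreign-key references. It makes several assumptions. It uses stdlib dataclasses instead of Pydantic and globally unique dotted IDs such as db1.orders.id. The source_type values and the function name validate_catalog are hypothetical, and ColumnStats is omitted for brevity.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ForeignKey:
    column_id: str      # referencing column
    ref_column_id: str  # referenced column


@dataclass
class Column:
    id: str
    name: str
    dtype: str
    pii_flag: bool = False
    sample_values: Optional[list] = None


@dataclass
class Table:
    id: str
    name: str
    columns: list[Column] = field(default_factory=list)
    foreign_keys: list[ForeignKey] = field(default_factory=list)


@dataclass
class Source:
    id: str
    source_type: str  # hypothetical values: "schema" (database) or "tabular"
    tables: list[Table] = field(default_factory=list)


@dataclass
class Catalog:
    user_id: str
    sources: list[Source] = field(default_factory=list)


def validate_catalog(catalog: Catalog) -> list[str]:
    """Return invariant violations; an empty list means the catalog is valid."""
    errors: list[str] = []
    seen: set[str] = set()
    column_ids: set[str] = set()

    def check_unique(obj_id: str) -> None:
        if obj_id in seen:
            errors.append(f"duplicate id: {obj_id}")
        seen.add(obj_id)

    # Pass 1: every source, table, and column id must be unique.
    for source in catalog.sources:
        check_unique(source.id)
        for table in source.tables:
            check_unique(table.id)
            for column in table.columns:
                check_unique(column.id)
                column_ids.add(column.id)

    # Pass 2: both ends of every foreign key must be known columns.
    for source in catalog.sources:
        for table in source.tables:
            for fk in table.foreign_keys:
                for ref in (fk.column_id, fk.ref_column_id):
                    if ref not in column_ids:
                        errors.append(f"dangling foreign key reference: {ref}")
    return errors
```

Returning a list instead of raising on the first problem lets the ingestion pipeline report every broken invariant at once.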

JSON IR + query subsystem

| Path | Role |
|---|---|
| src/query/ir/models.py | QueryIR Pydantic schema |
| src/query/ir/operators.py | ALLOWED_FILTER_OPS, ALLOWED_AGG_FNS, LIMIT_HARD_CAP, TYPE_COMPATIBILITY |
| src/query/ir/validator.py | Catalog-aware IR validation (rejects unknown column ids, bad ops, type mismatches, oversize limits) |
| src/query/planner/service.py | QueryPlannerService.plan(question, catalog, previous_error): Azure OpenAI structured output → QueryIR |
| src/query/planner/prompt.py | Builds the planner prompt from catalog text |
| src/query/compiler/base.py | Compiler ABC |
| src/query/compiler/sql.py | SqlCompiler (Postgres): all 12 filter ops, params as a dict |
| src/query/compiler/pandas.py | PandasCompiler: returns CompiledPandas(apply, output_columns) |
| src/query/executor/base.py | BaseExecutor + QueryResult |
| src/query/executor/db.py | DbExecutor: sqlglot SELECT-only guard, read-only transaction, 30 s statement_timeout, 10 k row cap |
| src/query/executor/tabular.py | TabularExecutor: Parquet via blob, asyncio.to_thread, 10 k row cap |
| src/query/executor/dispatcher.py | ExecutorDispatcher.pick(ir): picks by source.source_type |
| src/query/service.py | QueryService.run(user_id, question, catalog): plan → validate → retry (max 3) → dispatch → execute |
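
To make "catalog-aware IR validation" concrete, here is a sketch under stated assumptions. The QueryIR field names, the operator sets, the TYPE_COMPATIBILITY mapping, and a LIMIT_HARD_CAP of 10 000 are all illustrative placeholders; the real definitions live in src/query/ir/. A plain column-id → dtype dict stands in for the catalog.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

# Illustrative placeholders; the real tables live in src/query/ir/operators.py.
ALLOWED_FILTER_OPS = {"eq", "neq", "gt", "gte", "lt", "lte", "in", "like"}
ALLOWED_AGG_FNS = {"count", "sum", "avg", "min", "max"}
LIMIT_HARD_CAP = 10_000
_ORDERED = {"int", "float", "date"}
# Ops missing from this map are treated as valid on any column type.
TYPE_COMPATIBILITY = {"gt": _ORDERED, "gte": _ORDERED, "lt": _ORDERED,
                      "lte": _ORDERED, "like": {"text"}}


@dataclass
class Filter:
    column_id: str
    op: str
    value: Any


@dataclass
class Aggregation:
    fn: str
    column_id: str


@dataclass
class QueryIR:
    source_id: str
    select: list[str] = field(default_factory=list)
    filters: list[Filter] = field(default_factory=list)
    aggregations: list[Aggregation] = field(default_factory=list)
    group_by: list[str] = field(default_factory=list)
    limit: int = 100


def validate_ir(ir: QueryIR, column_types: dict[str, str]) -> list[str]:
    """Return human-readable errors; an empty list means the IR may be compiled."""
    errors: list[str] = []
    referenced = (ir.select + ir.group_by
                  + [f.column_id for f in ir.filters]
                  + [a.column_id for a in ir.aggregations])
    for column_id in dict.fromkeys(referenced):  # de-duplicate, keep order
        if column_id not in column_types:
            errors.append(f"unknown column id: {column_id}")
    for flt in ir.filters:
        if flt.op not in ALLOWED_FILTER_OPS:
            errors.append(f"bad filter op: {flt.op}")
            continue
        dtype = column_types.get(flt.column_id)
        allowed = TYPE_COMPATIBILITY.get(flt.op)
        if dtype is not None and allowed is not None and dtype not in allowed:
            errors.append(f"type mismatch: {flt.op} on {dtype} column {flt.column_id}")
    for agg in ir.aggregations:
        if agg.fn not in ALLOWED_AGG_FNS:
            errors.append(f"bad aggregate: {agg.fn}")
    if not 0 < ir.limit <= LIMIT_HARD_CAP:
        errors.append(f"limit {ir.limit} outside 1..{LIMIT_HARD_CAP}")
    return errors
```

The error strings are what a retry loop would pass back to the planner as previous_error, so they are worded for the LLM as much as for humans.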

Agents

| Path | Role |
|---|---|
| src/agents/orchestration.py | OrchestratorAgent: Phase 1 file/class name preserved, Phase 2 body. Emits IntentRouterDecision |
| src/agents/chatbot.py | ChatbotAgent: formerly AnswerAgent in agents/answer_agent.py; renamed in the Cleanup PR |
| src/agents/chat_handler.py | ChatHandler.handle(...): top-level orchestrator; yields intent / chunk / done / error SSE events |
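
The SSE contract of ChatHandler.handle can be sketched as an async generator: intent first, then chunk events, then done, or error on failure. The classify and stream_answer callables are hypothetical stand-ins for OrchestratorAgent.classify() and ChatbotAgent.astream(). The frame format follows the standard event:/data: SSE syntax, but the real payload fields may differ.

```python
from __future__ import annotations

import asyncio
import json
from typing import AsyncIterator, Awaitable, Callable

# Hypothetical stand-ins for OrchestratorAgent.classify() and ChatbotAgent.astream().
Classify = Callable[[str], Awaitable[str]]
StreamAnswer = Callable[[str, str], AsyncIterator[str]]


def sse(event: str, data: dict) -> str:
    """Format one Server-Sent Events frame (event line, data line, blank line)."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


async def handle(message: str, classify: Classify,
                 stream_answer: StreamAnswer) -> AsyncIterator[str]:
    """Yield intent, then chunk frames, then done; any failure becomes an error frame."""
    try:
        source_hint = await classify(message)
        yield sse("intent", {"source_hint": source_hint})
        async for token in stream_answer(message, source_hint):
            yield sse("chunk", {"text": token})
        yield sse("done", {})
    except Exception as exc:  # keep the stream well-formed for the client
        yield sse("error", {"message": str(exc)})


# Usage with fake dependencies.
async def fake_classify(message: str) -> str:
    return "chat"


async def fake_stream(message: str, source_hint: str) -> AsyncIterator[str]:
    for token in ("Hello", "!"):
        yield token


async def collect() -> list[str]:
    return [frame async for frame in handle("hi", fake_classify, fake_stream)]


if __name__ == "__main__":
    for frame in asyncio.run(collect()):
        print(frame, end="")
```

Catching exceptions inside the generator matters for SSE. Once the response has started streaming, an HTTP error status can no longer be sent, so the failure has to travel as an event.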

Pipelines & API

| Path | Role |
|---|---|
| src/pipeline/structured_pipeline.py | DB / tabular ingestion: introspect → merge → validate → upsert |
| src/pipeline/triggers.py | on_db_registered, on_tabular_uploaded, on_document_uploaded, on_catalog_rebuild_requested |
| src/api/v1/data_catalog.py | GET /api/v1/data-catalog/{user_id} + POST /api/v1/data-catalog/rebuild |
| src/models/api/catalog.py | Catalog request/response models |
| src/config/prompts/intent_router.md, query_planner.md, chatbot_system.md, guardrails.md | New prompts. guardrails.md is appended to chatbot_system.md at load time |
| src/db/postgres/models.py (added Catalog SQLAlchemy class) | Stores the per-user jsonb document in data_catalog |
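
The introspect → merge → validate → upsert sequence of structured_pipeline.py might look roughly like the sketch below. It rests on a few assumptions. The catalog is treated as a plain dict serialized to JSON, and "merge" means "replace the source with the same id, otherwise append". SQLite's INSERT ... ON CONFLICT DO UPDATE stands in for the Postgres jsonb upsert into data_catalog; Postgres uses the same ON CONFLICT form. All function names are hypothetical.

```python
import json
import sqlite3
from typing import Callable, List

SCHEMA = ("CREATE TABLE IF NOT EXISTS data_catalog "
          "(user_id TEXT PRIMARY KEY, catalog TEXT NOT NULL)")
UPSERT = ("INSERT INTO data_catalog (user_id, catalog) VALUES (?, ?) "
          "ON CONFLICT(user_id) DO UPDATE SET catalog = excluded.catalog")


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute(SCHEMA)
    return conn


def merge_source(catalog: dict, new_source: dict) -> dict:
    """Replace the source with the same id (re-ingestion) or append it (first ingestion)."""
    others = [s for s in catalog.get("sources", []) if s["id"] != new_source["id"]]
    return {**catalog, "sources": others + [new_source]}


def run_structured_pipeline(
    conn: sqlite3.Connection,
    user_id: str,
    introspect: Callable[[], dict],
    validate: Callable[[dict], List[str]],
) -> dict:
    row = conn.execute("SELECT catalog FROM data_catalog WHERE user_id = ?",
                       (user_id,)).fetchone()
    current = json.loads(row[0]) if row else {"user_id": user_id, "sources": []}
    merged = merge_source(current, introspect())         # introspect -> merge
    errors = validate(merged)                             # validate
    if errors:
        raise ValueError(f"catalog rejected: {errors}")   # nothing is written
    conn.execute(UPSERT, (user_id, json.dumps(merged)))   # upsert: one row per user
    conn.commit()
    return merged
```

Validating the merged document rather than the new source alone means cross-source invariants are checked before anything is persisted.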

2.4 Rewired API endpoints

| Endpoint | Phase 1 wiring | Phase 2 wiring |
|---|---|---|
| POST /api/v1/chat/stream | Inline in chat.py: OrchestratorAgent → retriever → query_executor → chatbot | Delegates to ChatHandler.handle(). Redis cache, fast intent, history load, and message persistence stay in the endpoint |
| POST /api/v1/database-clients/{id}/ingest | Called db_pipeline_service.run() and dual-wrote vectors | Calls only on_db_registered (catalog build). Failure → HTTP 500 |
| POST /api/v1/document/process | Always pushed to vector store | PDF/DOCX/TXT → knowledge_processor (vectors); CSV/XLSX → on_tabular_uploaded (catalog only, no vector embedding) |
| POST /api/v1/document/upload | Storage + DB row | Same, plus on_document_uploaded trigger |
| POST /api/v1/data-catalog/rebuild | (none) | New: iterates all sources and re-runs the per-source trigger |
| GET /api/v1/data-catalog/{user_id} | (none) | New: returns list[CatalogIndexEntry] |
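
The file-type routing in POST /api/v1/document/process comes down to a suffix check. This sketch assumes routing by file extension; the report does not say whether the MIME type is used instead. It returns handler names as strings purely for illustration.

```python
from pathlib import PurePath

# Assumed extension sets, per the document/process row of the endpoint table.
UNSTRUCTURED_SUFFIXES = {".pdf", ".docx", ".txt"}
TABULAR_SUFFIXES = {".csv", ".xlsx"}


def route_upload(filename: str) -> str:
    """Return the (illustrative) handler name for an uploaded file."""
    suffix = PurePath(filename).suffix.lower()
    if suffix in UNSTRUCTURED_SUFFIXES:
        return "knowledge_processor"  # chunk + embed into PGVector
    if suffix in TABULAR_SUFFIXES:
        return "on_tabular_uploaded"  # catalog entry only, no embeddings
    raise ValueError(f"unsupported file type: {filename!r}")
```

Rejecting unknown types explicitly keeps a new format from silently falling into the vector path, which is exactly the Phase 1 behavior this change removes.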

2.5 Phase 1 files still in production use

These were not rewritten β€” Phase 2 imports them directly:

  • src/database_client/database_client_service.py
  • src/utils/db_credential_encryption.py (decrypt_credentials_dict); src/security/credentials.py is still a stub
  • src/pipeline/db_pipeline/db_pipeline_service.py (engine_scope context manager, used by both the introspector and DbExecutor)
  • src/pipeline/db_pipeline/extractor.py (get_schema, profile_column, get_row_count)
  • src/knowledge/processing_service.py (PDF / DOCX / TXT extraction + embedding)
  • src/db/postgres/{connection,init_db,vector_store}.py, src/storage/az_blob/, src/middlewares/, src/security/auth.py

3. End-to-end flow (current state)

3.1 Ingestion

```text
User action            Pipeline                                   Storage
───────────────────    ──────────────────────────────────────     ───────────────────────────────
upload PDF/DOCX/TXT →  DocumentPipeline                        →  Azure Blob + PGVector
                       (extract → chunk → embed)                  (table: langchain_pg_embedding)
                       + on_document_uploaded                     + retrieval cache invalidate

upload CSV/XLSX     →  TabularIntrospector                     →  Azure Blob (Parquet)
                       (sheets / columns + sample + stats)        + data_catalog jsonb row
                       → CatalogValidator → CatalogStore          (NO vector store; catalog only)
                       via on_tabular_uploaded

register DB         →  DatabaseIntrospector                    →  data_catalog jsonb row
                       (information_schema + sample + FKs)
                       → validate → store
                       via on_db_registered
```
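
The PII rule applied at ingestion can be sketched as follows: flag a column by name pattern or by its sample values, then null out its samples. The regexes, the 50% value-match threshold, and the names ColumnProfile and apply_pii_policy are hypothetical. The real patterns live in src/security/pii_patterns.py.

```python
from __future__ import annotations

import re
from dataclasses import dataclass
from typing import Optional

# Hypothetical patterns; the real ones live in src/security/pii_patterns.py.
NAME_PATTERNS = [
    re.compile(p, re.IGNORECASE)
    for p in (r"e-?mail", r"phone", r"(^|_)ssn($|_)", r"passport", r"(first|last|full)_?name")
]
VALUE_PATTERNS = [re.compile(r"^[\w.+-]+@[\w-]+\.[\w.-]+$")]  # email-shaped values


@dataclass
class ColumnProfile:
    name: str
    sample_values: Optional[list[str]]
    pii_flag: bool = False


def apply_pii_policy(name: str, samples: list[str], threshold: float = 0.5) -> ColumnProfile:
    """Flag a column by name or by sample values; flagged columns keep no samples."""
    name_hit = any(p.search(name) for p in NAME_PATTERNS)
    matches = sum(1 for v in samples if any(p.match(v) for p in VALUE_PATTERNS))
    value_hit = bool(samples) and matches / len(samples) >= threshold
    if name_hit or value_hit:
        # sample_values=None keeps raw PII out of the planner prompt.
        return ColumnProfile(name=name, sample_values=None, pii_flag=True)
    return ColumnProfile(name=name, sample_values=list(samples))
```

Checking values as well as names catches columns with innocuous names, such as "contact", that still hold email addresses.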

3.2 Query (per user message → SSE stream)

```text
POST /api/v1/chat/stream
        │
        ├── Redis cache check (24h TTL): a hit returns the cached stream
        ├── _fast_intent (greetings / goodbyes): bypasses the LLM
        ├── load history from chat_messages
        │
        ├── ChatHandler.handle(message, user_id, history)         [src/agents/chat_handler.py]
        │       │
        │       ├── OrchestratorAgent.classify()                   [agents/orchestration.py]
        │       │     → needs_search, source_hint, rewritten_query
        │       │
        │       ├── source_hint == "chat"
        │       │     → ChatbotAgent.astream() → yield chunk events
        │       │
        │       ├── source_hint == "unstructured"
        │       │     → RetrievalRouter.retrieve()                 [retrieval/router.py, Redis-cached]
        │       │         → DocumentRetriever (PGVector MMR/cosine/etc.)
        │       │     → ChatbotAgent.astream(chunks=...)
        │       │
        │       ├── source_hint == "structured"
        │       │     → CatalogReader.read(user_id, "structured")  [catalog/reader.py]
        │       │     → QueryService.run(user_id, question, catalog)   [query/service.py]
        │       │          │
        │       │          ├─ QueryPlannerService.plan(...)        [query/planner/service.py]
        │       │          │    LLM(catalog, question, prev_error?) → QueryIR
        │       │          │
        │       │          ├─ IRValidator.validate(ir, catalog)    [query/ir/validator.py]
        │       │          │    fail → loop back to planner with error context (max 3)
        │       │          │
        │       │          ├─ ExecutorDispatcher.pick(ir)          [query/executor/dispatcher.py]
        │       │          │    schema source  → DbExecutor
        │       │          │    tabular source → TabularExecutor
        │       │          │
        │       │          ├─ DbExecutor.run(ir):                  [query/executor/db.py]
        │       │          │    SqlCompiler → (sql, params)
        │       │          │      → sqlglot SELECT-only guard
        │       │          │      → engine_scope (Phase 1 utility) in asyncio.to_thread
        │       │          │      → RO txn + statement_timeout=30s + 10k cap
        │       │          │
        │       │          ├─ TabularExecutor.run(ir):             [query/executor/tabular.py]
        │       │          │    resolve Parquet blob path
        │       │          │      → download → PandasCompiler.apply(df)
        │       │          │      → asyncio.to_thread → 10k cap
        │       │          │
        │       │          └─ QueryResult { rows, columns, row_count,
        │       │                           truncated, source_id, error?, elapsed_ms }
        │       │     → ChatbotAgent.astream(query_result=...)
        │       │         → yield chunk events
        │       │
        │       └── final events: done / error
        │
        ├── persist user + assistant messages to chat_messages
        └── populate Redis cache
```
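
The plan → validate → retry loop inside QueryService.run can be sketched with the planner, validator, and executor injected as plain callables. These signatures are hypothetical, and the real service also dispatches through ExecutorDispatcher. The QueryResult fields mirror the flow above. When all three attempts are used up, the sketch returns an error result instead of raising, in keeping with the "errors go in QueryResult.error" convention.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable, Optional

MAX_ATTEMPTS = 3  # "max 3" in the flow above


@dataclass
class QueryResult:
    rows: list = field(default_factory=list)
    columns: list = field(default_factory=list)
    row_count: int = 0
    truncated: bool = False
    source_id: Optional[str] = None
    error: Optional[str] = None
    elapsed_ms: float = 0.0


Planner = Callable[[str, Any, Optional[str]], Any]
Validator = Callable[[Any, Any], list]
Executor = Callable[[Any], QueryResult]


def run_query(question: str, catalog: Any, plan: Planner,
              validate: Validator, execute: Executor) -> QueryResult:
    """Plan, validate, and retry with the previous error as context."""
    previous_error: Optional[str] = None
    for _ in range(MAX_ATTEMPTS):
        ir = plan(question, catalog, previous_error)
        errors = validate(ir, catalog)
        if not errors:
            return execute(ir)  # the real service dispatches by source type here
        previous_error = "; ".join(errors)  # fed back into the next planner call
    return QueryResult(error=f"IR still invalid after {MAX_ATTEMPTS} attempts: {previous_error}")
```

Feeding the validator's message back to the planner turns a rejected plan into a targeted correction request, so the retry is not simply the same prompt run again.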

Safety invariants for the structured path (read-only at every layer):

  1. IR validated against the catalog before reaching the compiler
  2. Identifiers come from the catalog (trusted; inlined as quoted identifiers)
  3. Values from IR.filters are always parameterized
  4. Compiler is deterministic: no LLM in the hot path
  5. sqlglot rejects anything that isn't a pure SELECT
  6. DB connection is read-only with a 30 s statement_timeout
  7. Hard 10 000 row cap on both executors; neither raises, and errors are returned in QueryResult.error instead
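
Invariants 2 and 3 are the heart of the SQL path. Identifiers come from the catalog and are inlined as quoted identifiers, while every filter value travels as a named parameter. The minimal sketch below assumes psycopg2-style %(name)s placeholders and a reduced operator set. The real SqlCompiler supports 12 filter ops, and its placeholder style is not stated here. compile_select and quote_ident are hypothetical names.

```python
from __future__ import annotations

from typing import Any

# Reduced, illustrative op set; the real SqlCompiler supports 12 filter ops.
OP_SQL = {"eq": "=", "neq": "<>", "gt": ">", "gte": ">=", "lt": "<", "lte": "<=", "like": "LIKE"}
LIMIT_HARD_CAP = 10_000


def quote_ident(name: str) -> str:
    """Quote a Postgres identifier; embedded double quotes are doubled."""
    return '"' + name.replace('"', '""') + '"'


def compile_select(
    schema: str,
    table: str,
    columns: list[str],
    filters: list[tuple[str, str, Any]],
    limit: int,
) -> tuple[str, dict[str, Any]]:
    """Identifiers (from the catalog) are quoted inline; values only ever go into params."""
    params: dict[str, Any] = {}
    clauses: list[str] = []
    for i, (column, op, value) in enumerate(filters):
        key = f"p{i}"
        if op == "in":
            clauses.append(f"{quote_ident(column)} = ANY(%({key})s)")  # value is a list
        else:
            clauses.append(f"{quote_ident(column)} {OP_SQL[op]} %({key})s")  # op pre-validated
        params[key] = value
    select_list = ", ".join(quote_ident(c) for c in columns)
    sql = f"SELECT {select_list} FROM {quote_ident(schema)}.{quote_ident(table)}"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    params["limit"] = min(limit, LIMIT_HARD_CAP)
    sql += " LIMIT %(limit)s"
    return sql, params
```

A hostile filter value such as x'; DROP TABLE users;-- never reaches the SQL text. It stays in params, and the driver sends it as data.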

4. Summary table for review

| Concern | Phase 1: where it lived | Phase 2: where it lives | Change type |
|---|---|---|---|
| Intent classification | agents/orchestration.py::OrchestratorAgent (free-text intent) | Same path + same class name; body rewritten to emit IntentRouterDecision | Body rewrite only |
| Top-level chat orchestration | Inline in api/v1/chat.py | agents/chat_handler.py::ChatHandler | Extracted to a reusable module |
| Final answer formation | agents/chatbot.py (Phase 1 LangChain) | agents/chatbot.py::ChatbotAgent (was AnswerAgent in answer_agent.py mid-cycle) | Rewritten + renamed |
| Schema retrieval (DB / tabular) | rag/retrievers/schema.py + PGVector chunks | Removed. Replaced by catalog (catalog/store.py jsonb) loaded verbatim into planner prompt | Whole concept replaced |
| Doc retrieval (PDF / DOCX / TXT) | rag/retrievers/document.py, rag/router.py | retrieval/document.py, retrieval/router.py | Moved; Redis cache restored; tabular files filtered |
| Query writing | query/query_executor.py + models/sql_query.py (LLM writes SQL) | query/planner/service.py (LLM writes IR) + query/compiler/sql.py (deterministic) | LLM emits intent, not SQL |
| DB execution | query/executors/db_executor.py | query/executor/db.py::DbExecutor | Folder renamed (executors → executor); sqlglot guard + RO txn + 30 s timeout kept |
| Tabular execution | query/executors/tabular.py | query/executor/tabular.py::TabularExecutor | Parquet-only; pandas compiler split out |
| Executor selection | Hard-coded in query_executor.py | query/executor/dispatcher.py::ExecutorDispatcher | New; routes by source.source_type |
| Catalog (NEW) | (none) | catalog/ (models, introspect/, validator, store, reader, pii_detector, render) | New subsystem |
| Catalog persistence | (data was embedded in PGVector) | Postgres jsonb table data_catalog, keyed by user_id | New table |
| Ingestion triggers | Inline in API endpoints | pipeline/triggers.py (on_db_registered, on_tabular_uploaded, on_document_uploaded, on_catalog_rebuild_requested) | Centralized event entry points |
| Structured pipeline | pipeline/db_pipeline/db_pipeline_service.py (still present for engine_scope + extractor reuse) | pipeline/structured_pipeline.py (orchestrator); reuses Phase 1 extractor | New orchestrator wraps Phase 1 introspection helpers |
| Document pipeline | pipeline/document_pipeline/document_pipeline.py (folder) | pipeline/document_pipeline.py (file) | Flattened; CSV / XLSX now skip the vector store |
| Parquet helper | knowledge/parquet_service.py | storage/parquet.py | Moved into storage/ |
| Prompts | config/agents/system_prompt.md, guardrails_prompt.md | config/prompts/{intent_router,query_planner,chatbot_system,guardrails}.md | Folder renamed; split into four files; guardrails appended to chatbot_system at load |
| PII detection | (none) | catalog/pii_detector.py + security/pii_patterns.py | New. Columns flagged pii_flag=true get sample_values: null so PII never enters prompts |
| Chat endpoint | api/v1/chat.py (does everything inline) | api/v1/chat.py (cache + history + persistence) → delegates to ChatHandler | Slimmed; SSE event shape is intent / chunk / done / error |
| DB ingest endpoint | api/v1/db_client.py::ingest (Phase 1 db_pipeline_service.run()) | api/v1/db_client.py::ingest (calls on_db_registered only) | Phase 1 dual-write removed |
| Document process endpoint | api/v1/document.py::process (always vectorize) | api/v1/document.py::process (PDF/DOCX/TXT → vectors; CSV/XLSX → catalog via on_tabular_uploaded) | Routing by file type |
| Catalog management API | (none) | api/v1/data_catalog.py (GET index + POST rebuild) | New |

Bottom line. Every Phase 1 file under src/rag/, src/tools/, src/query/executors/, src/query/query_executor.py, src/query/base.py, src/api/v1/knowledge.py, and src/config/agents/ is gone. Phase 1 introspection helpers under src/pipeline/db_pipeline/ and src/database_client/ are still imported by Phase 2; they were wrapped, not rewritten. The LLM is now called at exactly three explicit sites, and none of them writes SQL: the planner emits a Pydantic-validated QueryIR instead.

The one filename gotcha to remember: the intent router still lives at src/agents/orchestration.py as class OrchestratorAgent (Phase 1 name kept for import-site compatibility, Phase 2 body). The matching prompt and tests use the intent_router name, but the source module does not.