# ishaq101's picture
# feat/Catalog Retrieval System (#1)
# 6bff5d9
"""ChatHandler — top-level Phase 2 chat orchestrator.

End-to-end flow per user message:

1. Intent classification (`OrchestratorAgent.classify` by default) →
   `chat` / `unstructured` / `structured`.
2. Route:
   - `chat` → no context. Pass straight to ChatbotAgent.
   - `structured` → CatalogReader → QueryService → QueryResult.
   - `unstructured` → document retriever (`RetrievalRouter` by default; a
     placeholder retriever raises `NotImplementedError` until TAB
     ships) → list[DocumentChunk].
3. `ChatbotAgent.astream` → yield text tokens.
4. Wrap each step into an SSE-style event dict so the API endpoint can
   stream them as Server-Sent Events.

Phase 1's chat endpoint (`src/api/v1/chat.py`) is intentionally NOT touched
in this PR. PR7 cleanup will rewire it to call `ChatHandler.handle(...)`.

All dependencies are injectable for tests. The default collaborators are
built lazily on first use, so no `Settings()` is triggered at import time
(or at all, as long as you inject mocks).
"""
from __future__ import annotations
import json
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING, Any
from langchain_core.messages import BaseMessage
from src.middlewares.logging import get_logger
from src.retrieval.base import RetrievalResult
from .chatbot import ChatbotAgent, DocumentChunk
from .orchestration import OrchestratorAgent
if TYPE_CHECKING:
from ..catalog.reader import CatalogReader
from ..query.service import QueryService
from ..retrieval.router import RetrievalRouter
logger = get_logger("chat_handler")
class ChatHandler:
    """Drive one chat turn and expose it as a stream of SSE-style events.

    `handle` is an async generator. Each item it yields is a dict shaped
    `{"event": <name>, "data": <str>}`, where `<name>` is one of:

    - `intent`  — yielded once classification succeeds; data is the
      decision serialized with `model_dump_json()`.
    - `sources` — JSON array built by `_build_sources` (empty on the
      chat path).
    - `chunk`   — one fragment of the streamed answer.
    - `done`    — last event of a successful turn; data is `""`.
    - `error`   — a step failed; data is a human-readable message and the
      stream ends right after it.

    Every collaborator can be injected through the constructor. Any that
    is left as `None` is created on first use by its `_get_*` method.
    """

    def __init__(
        self,
        intent_router: OrchestratorAgent | None = None,
        answer_agent: ChatbotAgent | None = None,
        catalog_reader: CatalogReader | None = None,
        query_service: QueryService | None = None,
        document_retriever: RetrievalRouter | None = None,
    ) -> None:
        """Remember the collaborators; `None` defers construction."""
        self._intent_router = intent_router
        self._answer_agent = answer_agent
        self._catalog_reader = catalog_reader
        self._query_service = query_service
        self._document_retriever = document_retriever

    # ------------------------------------------------------------------
    # Lazy default-dep builders
    # ------------------------------------------------------------------
    def _get_intent_router(self) -> OrchestratorAgent:
        """Return the intent classifier, building the default if needed."""
        router = self._intent_router
        if router is None:
            router = self._intent_router = OrchestratorAgent()
        return router

    def _get_answer_agent(self) -> ChatbotAgent:
        """Return the answer agent, building the default if needed."""
        agent = self._answer_agent
        if agent is None:
            agent = self._answer_agent = ChatbotAgent()
        return agent

    def _get_catalog_reader(self) -> CatalogReader:
        """Return the catalog reader, building the default if needed."""
        if self._catalog_reader is not None:
            return self._catalog_reader
        # Imported here so the catalog modules load only when a default
        # reader is actually required.
        from ..catalog.reader import CatalogReader
        from ..catalog.store import CatalogStore

        self._catalog_reader = CatalogReader(CatalogStore())
        return self._catalog_reader

    def _get_query_service(self) -> QueryService:
        """Return the query service, building the default if needed."""
        if self._query_service is not None:
            return self._query_service
        from ..query.service import QueryService

        self._query_service = QueryService()
        return self._query_service

    def _get_document_retriever(self) -> RetrievalRouter:
        """Return the document retriever, building the default if needed."""
        if self._document_retriever is not None:
            return self._document_retriever
        from ..retrieval.router import RetrievalRouter

        self._document_retriever = RetrievalRouter()
        return self._document_retriever

    @staticmethod
    def _event(name: str, data: str) -> dict[str, Any]:
        """Build one `{"event": ..., "data": ...}` stream item."""
        return {"event": name, "data": data}

    # ------------------------------------------------------------------
    # Public entry
    # ------------------------------------------------------------------
    async def handle(
        self,
        message: str,
        user_id: str,
        history: list[BaseMessage] | None = None,
    ) -> AsyncIterator[dict[str, Any]]:
        """Process one user message and yield its events in order.

        Args:
            message: The user's text. It is what gets classified and what
                the answer agent receives.
            user_id: Passed to the catalog reader, query service and
                retriever, and attached to log records.
            history: Earlier conversation messages, forwarded to the
                classifier and to the answer agent.

        Yields:
            `intent`, then `sources`, then zero or more `chunk` events and
            a closing `done`. If any step raises, a single `error` event
            is yielded instead and the generator stops.
        """
        emit = self._event

        # ---- 1. Classify intent --------------------------------------
        try:
            router = self._get_intent_router()
            decision = await router.classify(message, history)
        except Exception as exc:
            logger.error("intent classification failed", error=str(exc))
            yield emit("error", f"Could not classify message: {exc}")
            return
        yield emit("intent", decision.model_dump_json())

        # Retrieval uses the classifier's rewritten query when it gave
        # one; the answer agent below still receives the original text.
        search_text = decision.rewritten_query or message
        route = decision.source_hint

        query_result = None
        raw_chunks: Any = None
        chunks: list[DocumentChunk] | None = None

        # ---- 2. Route (the chat path gathers no context) --------------
        if route == "structured":
            try:
                reader = self._get_catalog_reader()
                catalog = await reader.read(user_id, "structured")
                service = self._get_query_service()
                query_result = await service.run(user_id, search_text, catalog)
            except Exception as exc:
                logger.error(
                    "structured route failed",
                    user_id=user_id,
                    error=str(exc),
                )
                yield emit("error", f"Structured query failed: {exc}")
                return
        elif route == "unstructured":
            try:
                retriever = self._get_document_retriever()
                raw_chunks = await retriever.retrieve(search_text, user_id)
                chunks = _normalize_chunks(raw_chunks)
            except NotImplementedError:
                logger.warning("DocumentRetriever placeholder hit", user_id=user_id)
                yield emit(
                    "error",
                    "Document retrieval is not yet available — pending implementation.",
                )
                return
            except Exception as exc:
                logger.error(
                    "unstructured route failed", user_id=user_id, error=str(exc)
                )
                yield emit("error", f"Document retrieval failed: {exc}")
                return

        # ---- 2b. Emit sources ---------------------------------------
        source_refs = _build_sources(route, user_id, query_result, raw_chunks)
        yield emit("sources", json.dumps(source_refs))

        # ---- 3. Stream answer ----------------------------------------
        try:
            answer = self._get_answer_agent().astream(
                message,
                history=history,
                query_result=query_result,
                chunks=chunks,
            )
            async for token in answer:
                yield emit("chunk", token)
        except Exception as exc:
            logger.error("answer streaming failed", user_id=user_id, error=str(exc))
            yield emit("error", f"Answer generation failed: {exc}")
            return
        yield emit("done", "")
def _build_sources(
    source_hint: str,
    user_id: str,
    query_result: Any,
    raw_chunks: Any,
) -> list[dict[str, Any]]:
    """Assemble the payload of the SSE `sources` event.

    - `structured`: a single entry naming the result's `table_name`, or an
      empty list when there is no result, the result carries a truthy
      `error`, or the table name is missing/empty.
    - `unstructured`: one entry per distinct `(document_id, page_label)`
      pair, in first-seen order. Items that are neither `RetrievalResult`
      nor `dict` are skipped, as are items where both values are `None`.
    - anything else (chat, or no chunks): an empty list.

    Every entry has the keys `document_id`, `filename` and `page_label`.
    """
    if source_hint == "structured":
        if query_result is None:
            return []
        if getattr(query_result, "error", None):
            return []
        table = getattr(query_result, "table_name", "")
        if not table:
            return []
        return [
            {
                "document_id": f"{user_id}_{table}",
                "filename": table,
                "page_label": None,
            }
        ]

    if source_hint != "unstructured" or not raw_chunks:
        return []

    # Dicts keep insertion order, so this both dedupes and preserves the
    # order in which each (document_id, page_label) pair first appeared.
    unique: dict[tuple[Any, Any], dict[str, Any]] = {}
    for entry in raw_chunks:
        if isinstance(entry, RetrievalResult):
            fields = entry.metadata.get("data", {})
        elif isinstance(entry, dict):
            fields = entry
        else:
            continue

        ref = (fields.get("document_id"), fields.get("page_label"))
        if ref in unique or ref == (None, None):
            continue
        unique[ref] = {
            "document_id": fields.get("document_id"),
            "filename": fields.get("filename", "Unknown"),
            "page_label": fields.get("page_label", "Unknown"),
        }
    return list(unique.values())
def _normalize_chunks(raw: Any) -> list[DocumentChunk]:
    """Coerce whatever the retriever returned into `list[DocumentChunk]`.

    Recognised item shapes:

    - `DocumentChunk` — kept unchanged.
    - `dict` — its `content`, `filename` and `page_label` keys are read.
    - `RetrievalResult` — `content` comes from the result; `filename` and
      `page_label` come from `metadata["data"]`.
    - `str` — becomes a chunk with only `content` set.

    Items of any other type are dropped.

    Args:
        raw: The retriever's return value. Normally an iterable of the
            shapes above; a single bare `str` or `dict` is also accepted
            and treated as a one-item list.

    Returns:
        The normalized chunks. A falsy `raw` gives `[]`. A list that
        already holds only `DocumentChunk` objects is returned as is (the
        same list object, not a copy).
    """
    if not raw:
        return []

    # A lone str or dict must not reach the loop below: iterating a str
    # yields its characters and iterating a dict yields its keys, which
    # would produce one bogus chunk per character / per key.
    if isinstance(raw, (str, dict)):
        raw = [raw]

    if isinstance(raw, list) and all(isinstance(c, DocumentChunk) for c in raw):
        return raw

    chunks: list[DocumentChunk] = []
    for item in raw:
        if isinstance(item, DocumentChunk):
            chunks.append(item)
        elif isinstance(item, dict):
            # Stringify page_label exactly as the RetrievalResult branch
            # does, so both shapes hand DocumentChunk the same type.
            # NOTE(review): assumes DocumentChunk.page_label is str | None — confirm.
            page = item.get("page_label")
            chunks.append(
                DocumentChunk(
                    content=str(item.get("content", "")),
                    filename=item.get("filename"),
                    page_label=str(page) if page is not None else None,
                )
            )
        elif isinstance(item, RetrievalResult):
            data = item.metadata.get("data", {})
            page = data.get("page_label")
            chunks.append(
                DocumentChunk(
                    content=item.content,
                    filename=data.get("filename"),
                    page_label=str(page) if page is not None else None,
                )
            )
        elif isinstance(item, str):
            chunks.append(DocumentChunk(content=item))
    return chunks