"""ChatHandler — top-level Phase 2 chat orchestrator. End-to-end flow per user message: 1. `IntentRouter.classify` → `chat` / `unstructured` / `structured`. 2. Route: - `chat` → no context. Pass straight to ChatbotAgent. - `structured` → CatalogReader → QueryService → QueryResult. - `unstructured` → DocumentRetriever (placeholder, raises until TAB ships) → list[DocumentChunk]. 3. `ChatbotAgent.astream` → yield text tokens. 4. Wrap each step into an SSE-style event dict so the API endpoint can stream them as Server-Sent Events. Phase 1's chat endpoint (`src/api/v1/chat.py`) is intentionally NOT touched in this PR. PR7 cleanup will rewire it to call `ChatHandler.handle(...)`. All dependencies are injectable for tests. Default constructors lazy-build production deps (no `Settings()` triggered at import time as long as you inject mocks). """ from __future__ import annotations import json from collections.abc import AsyncIterator from typing import TYPE_CHECKING, Any from langchain_core.messages import BaseMessage from src.middlewares.logging import get_logger from src.retrieval.base import RetrievalResult from .chatbot import ChatbotAgent, DocumentChunk from .orchestration import OrchestratorAgent if TYPE_CHECKING: from ..catalog.reader import CatalogReader from ..query.service import QueryService from ..retrieval.router import RetrievalRouter logger = get_logger("chat_handler") class ChatHandler: """Top-level chat orchestrator. Returns an `AsyncIterator[dict]` of SSE-style events with shape `{"event": , "data": }`. Event types: - `intent` — emitted once after classification (JSON-encoded decision) - `sources` — JSON array of source refs (one per structured table, or per (document_id, page_label) for unstructured) - `chunk` — text fragment of the streaming answer (one per token) - `done` — end of stream (data is empty string) - `error` — failure; data is a user-facing message """ def __init__( self, intent_router: OrchestratorAgent | None = None, answer_agent: ChatbotAgent | None = None, catalog_reader: CatalogReader | None = None, query_service: QueryService | None = None, document_retriever: RetrievalRouter | None = None, ) -> None: self._intent_router = intent_router self._answer_agent = answer_agent self._catalog_reader = catalog_reader self._query_service = query_service self._document_retriever = document_retriever # ------------------------------------------------------------------ # Lazy default-dep builders # ------------------------------------------------------------------ def _get_intent_router(self) -> OrchestratorAgent: if self._intent_router is None: self._intent_router = OrchestratorAgent() return self._intent_router def _get_answer_agent(self) -> ChatbotAgent: if self._answer_agent is None: self._answer_agent = ChatbotAgent() return self._answer_agent def _get_catalog_reader(self) -> CatalogReader: if self._catalog_reader is None: from ..catalog.reader import CatalogReader from ..catalog.store import CatalogStore self._catalog_reader = CatalogReader(CatalogStore()) return self._catalog_reader def _get_query_service(self) -> QueryService: if self._query_service is None: from ..query.service import QueryService self._query_service = QueryService() return self._query_service def _get_document_retriever(self) -> RetrievalRouter: if self._document_retriever is None: from ..retrieval.router import RetrievalRouter self._document_retriever = RetrievalRouter() return self._document_retriever # ------------------------------------------------------------------ # Public entry # ------------------------------------------------------------------ async def handle( self, message: str, user_id: str, history: list[BaseMessage] | None = None, ) -> AsyncIterator[dict[str, Any]]: # ---- 1. Classify intent -------------------------------------- try: decision = await self._get_intent_router().classify(message, history) except Exception as e: logger.error("intent classification failed", error=str(e)) yield {"event": "error", "data": f"Could not classify message: {e}"} return yield {"event": "intent", "data": decision.model_dump_json()} rewritten = decision.rewritten_query or message query_result = None chunks: list[DocumentChunk] | None = None raw_chunks: Any = None # ---- 2. Route ------------------------------------------------ if decision.source_hint == "structured": try: catalog = await self._get_catalog_reader().read(user_id, "structured") query_result = await self._get_query_service().run( user_id, rewritten, catalog ) except Exception as e: logger.error( "structured route failed", user_id=user_id, error=str(e), ) yield {"event": "error", "data": f"Structured query failed: {e}"} return elif decision.source_hint == "unstructured": try: raw_chunks = await self._get_document_retriever().retrieve( rewritten, user_id ) chunks = _normalize_chunks(raw_chunks) except NotImplementedError: logger.warning("DocumentRetriever placeholder hit", user_id=user_id) yield { "event": "error", "data": "Document retrieval is not yet available — pending implementation.", } return except Exception as e: logger.error( "unstructured route failed", user_id=user_id, error=str(e) ) yield {"event": "error", "data": f"Document retrieval failed: {e}"} return # else: chat path — no context # ---- 2b. Emit sources --------------------------------------- sources = _build_sources( decision.source_hint, user_id, query_result, raw_chunks ) yield {"event": "sources", "data": json.dumps(sources)} # ---- 3. Stream answer ---------------------------------------- try: async for token in self._get_answer_agent().astream( message, history=history, query_result=query_result, chunks=chunks, ): yield {"event": "chunk", "data": token} except Exception as e: logger.error("answer streaming failed", user_id=user_id, error=str(e)) yield {"event": "error", "data": f"Answer generation failed: {e}"} return yield {"event": "done", "data": ""} def _build_sources( source_hint: str, user_id: str, query_result: Any, raw_chunks: Any, ) -> list[dict[str, Any]]: """Build the sources payload for the SSE `sources` event. - structured: one entry per executed table (table_name only). - unstructured: deduped by (document_id, page_label), Phase 1 shape. - chat or error: empty list. """ if source_hint == "structured": if query_result is None or getattr(query_result, "error", None): return [] table_name = getattr(query_result, "table_name", "") or "" if not table_name: return [] return [{ "document_id": f"{user_id}_{table_name}", "filename": table_name, "page_label": None, }] if source_hint == "unstructured" and raw_chunks: seen: set[tuple[Any, Any]] = set() sources: list[dict[str, Any]] = [] for item in raw_chunks: if isinstance(item, RetrievalResult): data = item.metadata.get("data", {}) elif isinstance(item, dict): data = item else: continue key = (data.get("document_id"), data.get("page_label")) if key in seen or key == (None, None): continue seen.add(key) sources.append({ "document_id": data.get("document_id"), "filename": data.get("filename", "Unknown"), "page_label": data.get("page_label", "Unknown"), }) return sources return [] def _normalize_chunks(raw: Any) -> list[DocumentChunk]: """Convert whatever the retriever returns into list[DocumentChunk]. The Phase 2 `DocumentRetriever.retrieve` interface is a stub today; when TAB owner ships it, it should return `list[DocumentChunk]` directly so this normalizer becomes a no-op. Until then we coerce common shapes (dict-with-content, plain string) defensively. """ if not raw: return [] if isinstance(raw, list) and all(isinstance(c, DocumentChunk) for c in raw): return raw chunks: list[DocumentChunk] = [] for item in raw: if isinstance(item, DocumentChunk): chunks.append(item) elif isinstance(item, dict): chunks.append( DocumentChunk( content=str(item.get("content", "")), filename=item.get("filename"), page_label=item.get("page_label"), ) ) elif isinstance(item, RetrievalResult): data = item.metadata.get("data", {}) page = data.get("page_label") chunks.append(DocumentChunk( content=item.content, filename=data.get("filename"), page_label=str(page) if page is not None else None, )) elif isinstance(item, str): chunks.append(DocumentChunk(content=item)) return chunks