| """ChatHandler — top-level Phase 2 chat orchestrator. |
| |
| End-to-end flow per user message: |
| |
1. The intent router (`OrchestratorAgent.classify`) labels the message as
   `chat` / `unstructured` / `structured`.
2. Route:
   - `chat` → no context. Pass straight to ChatbotAgent.
   - `structured` → CatalogReader → QueryService → QueryResult.
   - `unstructured` → document retriever (`RetrievalRouter`; placeholder,
     raises until TAB ships) → list[DocumentChunk].
3. `ChatbotAgent.astream` → yield text tokens.
4. Wrap each step into an SSE-style event dict so the API endpoint can
   stream them as Server-Sent Events.
| |
| Phase 1's chat endpoint (`src/api/v1/chat.py`) is intentionally NOT touched |
| in this PR. PR7 cleanup will rewire it to call `ChatHandler.handle(...)`. |
| |
| All dependencies are injectable for tests. Default constructors lazy-build |
| production deps (no `Settings()` triggered at import time as long as you |
| inject mocks). |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| from collections.abc import AsyncIterator |
| from typing import TYPE_CHECKING, Any |
|
|
| from langchain_core.messages import BaseMessage |
|
|
| from src.middlewares.logging import get_logger |
| from src.retrieval.base import RetrievalResult |
|
|
| from .chatbot import ChatbotAgent, DocumentChunk |
| from .orchestration import OrchestratorAgent |
|
|
| if TYPE_CHECKING: |
| from ..catalog.reader import CatalogReader |
| from ..query.service import QueryService |
| from ..retrieval.router import RetrievalRouter |
|
|
| logger = get_logger("chat_handler") |
|
|
|
|
class ChatHandler:
    """Top-level chat orchestrator.

    `handle` returns an `AsyncIterator[dict]` of SSE-style events with shape
    `{"event": <name>, "data": <str>}`. Event types:
      - `intent`  — emitted once after classification (JSON-encoded decision)
      - `sources` — JSON array of source refs (the structured table, or one
                    per (document_id, page_label) for unstructured; empty
                    for `chat`)
      - `chunk`   — text fragment of the streaming answer (one per item
                    yielded by the answer agent)
      - `done`    — end of stream (data is empty string)
      - `error`   — failure; data is a user-facing message. The stream ends
                    right after an `error` event; no `done` follows.
    """

    def __init__(
        self,
        intent_router: OrchestratorAgent | None = None,
        answer_agent: ChatbotAgent | None = None,
        catalog_reader: CatalogReader | None = None,
        query_service: QueryService | None = None,
        document_retriever: RetrievalRouter | None = None,
    ) -> None:
        """Store the injected collaborators.

        Every dependency is optional. Anything left as `None` is built
        lazily by the matching `_get_*` method the first time it is needed,
        so constructing a handler does not instantiate production services.
        """
        self._intent_router = intent_router
        self._answer_agent = answer_agent
        self._catalog_reader = catalog_reader
        self._query_service = query_service
        self._document_retriever = document_retriever

    # ------------------------------------------------------------------
    # Lazy dependency accessors
    # ------------------------------------------------------------------

    def _get_intent_router(self) -> OrchestratorAgent:
        """Return the intent router, creating an `OrchestratorAgent` on first use."""
        if self._intent_router is None:
            self._intent_router = OrchestratorAgent()
        return self._intent_router

    def _get_answer_agent(self) -> ChatbotAgent:
        """Return the answer agent, creating a `ChatbotAgent` on first use."""
        if self._answer_agent is None:
            self._answer_agent = ChatbotAgent()
        return self._answer_agent

    def _get_catalog_reader(self) -> CatalogReader:
        """Return the catalog reader, building it over a `CatalogStore` on first use."""
        if self._catalog_reader is None:
            # Imported here so the catalog modules are only loaded when the
            # default reader is actually needed.
            from ..catalog.reader import CatalogReader
            from ..catalog.store import CatalogStore

            self._catalog_reader = CatalogReader(CatalogStore())
        return self._catalog_reader

    def _get_query_service(self) -> QueryService:
        """Return the query service, creating a `QueryService` on first use."""
        if self._query_service is None:
            from ..query.service import QueryService

            self._query_service = QueryService()
        return self._query_service

    def _get_document_retriever(self) -> RetrievalRouter:
        """Return the document retriever, creating a `RetrievalRouter` on first use."""
        if self._document_retriever is None:
            from ..retrieval.router import RetrievalRouter

            self._document_retriever = RetrievalRouter()
        return self._document_retriever

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    async def handle(
        self,
        message: str,
        user_id: str,
        history: list[BaseMessage] | None = None,
    ) -> AsyncIterator[dict[str, Any]]:
        """Process one user message and stream the result as event dicts.

        Steps, in order:
          1. Classify the message with the intent router and emit `intent`.
          2. Depending on `decision.source_hint`, run the structured query
             path or the document retrieval path (nothing for other hints).
          3. Emit `sources` (an empty JSON array when there is no context).
          4. Stream the answer agent's output as `chunk` events, then `done`.

        Args:
            message: The raw user message. This is what the answer agent
                receives, even when the router produced a rewritten query.
            user_id: Identifier passed to the catalog reader, query service
                and retriever, and attached to log records.
            history: Prior conversation messages, forwarded unchanged to the
                router and the answer agent.

        Yields:
            `{"event": ..., "data": ...}` dicts as described on the class.
            Any failure yields a single `error` event and ends the stream.
        """
        # NOTE(review): every `error` event below forwards the raw exception
        # text to the client — confirm that is acceptable for user-facing
        # output.
        try:
            decision = await self._get_intent_router().classify(message, history)
        except Exception as e:
            logger.error(
                "intent classification failed",
                user_id=user_id,
                error=str(e),
            )
            yield {"event": "error", "data": f"Could not classify message: {e}"}
            return

        yield {"event": "intent", "data": decision.model_dump_json()}

        # The rewritten query (when the router supplies one) is used for
        # retrieval only.
        rewritten = decision.rewritten_query or message
        query_result = None
        chunks: list[DocumentChunk] | None = None
        # The un-normalized retriever output is kept because the sources
        # payload is built from it, not from the normalized chunks.
        raw_chunks: Any = None

        if decision.source_hint == "structured":
            try:
                catalog = await self._get_catalog_reader().read(user_id, "structured")
                query_result = await self._get_query_service().run(
                    user_id, rewritten, catalog
                )
            except Exception as e:
                logger.error(
                    "structured route failed",
                    user_id=user_id,
                    error=str(e),
                )
                yield {"event": "error", "data": f"Structured query failed: {e}"}
                return
        elif decision.source_hint == "unstructured":
            try:
                raw_chunks = await self._get_document_retriever().retrieve(
                    rewritten, user_id
                )
                chunks = _normalize_chunks(raw_chunks)
            except NotImplementedError:
                # Handled separately so a not-yet-implemented retriever gets
                # a dedicated message and only a warning-level log.
                logger.warning("DocumentRetriever placeholder hit", user_id=user_id)
                yield {
                    "event": "error",
                    "data": "Document retrieval is not yet available — pending implementation.",
                }
                return
            except Exception as e:
                logger.error(
                    "unstructured route failed", user_id=user_id, error=str(e)
                )
                yield {"event": "error", "data": f"Document retrieval failed: {e}"}
                return

        # Emitted for every route, before the first answer chunk.
        sources = _build_sources(
            decision.source_hint, user_id, query_result, raw_chunks
        )
        yield {"event": "sources", "data": json.dumps(sources)}

        try:
            async for token in self._get_answer_agent().astream(
                message,
                history=history,
                query_result=query_result,
                chunks=chunks,
            ):
                yield {"event": "chunk", "data": token}
        except Exception as e:
            logger.error("answer streaming failed", user_id=user_id, error=str(e))
            yield {"event": "error", "data": f"Answer generation failed: {e}"}
            return

        yield {"event": "done", "data": ""}
|
|
|
|
| def _build_sources( |
| source_hint: str, |
| user_id: str, |
| query_result: Any, |
| raw_chunks: Any, |
| ) -> list[dict[str, Any]]: |
| """Build the sources payload for the SSE `sources` event. |
| |
| - structured: one entry per executed table (table_name only). |
| - unstructured: deduped by (document_id, page_label), Phase 1 shape. |
| - chat or error: empty list. |
| """ |
| if source_hint == "structured": |
| if query_result is None or getattr(query_result, "error", None): |
| return [] |
| table_name = getattr(query_result, "table_name", "") or "" |
| if not table_name: |
| return [] |
| return [{ |
| "document_id": f"{user_id}_{table_name}", |
| "filename": table_name, |
| "page_label": None, |
| }] |
|
|
| if source_hint == "unstructured" and raw_chunks: |
| seen: set[tuple[Any, Any]] = set() |
| sources: list[dict[str, Any]] = [] |
| for item in raw_chunks: |
| if isinstance(item, RetrievalResult): |
| data = item.metadata.get("data", {}) |
| elif isinstance(item, dict): |
| data = item |
| else: |
| continue |
| key = (data.get("document_id"), data.get("page_label")) |
| if key in seen or key == (None, None): |
| continue |
| seen.add(key) |
| sources.append({ |
| "document_id": data.get("document_id"), |
| "filename": data.get("filename", "Unknown"), |
| "page_label": data.get("page_label", "Unknown"), |
| }) |
| return sources |
|
|
| return [] |
|
|
|
|
| def _normalize_chunks(raw: Any) -> list[DocumentChunk]: |
| """Convert whatever the retriever returns into list[DocumentChunk]. |
| |
| The Phase 2 `DocumentRetriever.retrieve` interface is a stub today; |
| when TAB owner ships it, it should return `list[DocumentChunk]` |
| directly so this normalizer becomes a no-op. Until then we coerce |
| common shapes (dict-with-content, plain string) defensively. |
| """ |
| if not raw: |
| return [] |
| if isinstance(raw, list) and all(isinstance(c, DocumentChunk) for c in raw): |
| return raw |
| chunks: list[DocumentChunk] = [] |
| for item in raw: |
| if isinstance(item, DocumentChunk): |
| chunks.append(item) |
| elif isinstance(item, dict): |
| chunks.append( |
| DocumentChunk( |
| content=str(item.get("content", "")), |
| filename=item.get("filename"), |
| page_label=item.get("page_label"), |
| ) |
| ) |
| elif isinstance(item, RetrievalResult): |
| data = item.metadata.get("data", {}) |
| page = data.get("page_label") |
| chunks.append(DocumentChunk( |
| content=item.content, |
| filename=data.get("filename"), |
| page_label=str(page) if page is not None else None, |
| )) |
| elif isinstance(item, str): |
| chunks.append(DocumentChunk(content=item)) |
| return chunks |
|
|