| """CatalogReader — loads + filters catalog by source_hint. |
| |
| For typical users (≤50 tables), returns the FULL catalog with no slicing. |
| Catalog-level search is added later if catalog grows past the limit. |
| """ |
|
|
| from datetime import UTC, datetime |
| from typing import Literal |
|
|
| from .models import Catalog |
| from .store import CatalogStore |
|
|
| SourceHint = Literal["chat", "unstructured", "structured"] |
|
|
|
|
| class CatalogReader: |
| """Loads the user's catalog and filters by source_hint. |
| |
| On miss, returns an empty Catalog (never raises) — query path is |
| responsible for handling "no data registered yet" gracefully. |
| Returned Catalog is always a copy; the underlying stored catalog |
| is never mutated. |
| """ |
|
|
| def __init__(self, store: CatalogStore) -> None: |
| self._store = store |
|
|
| async def read(self, user_id: str, source_hint: SourceHint) -> Catalog: |
| catalog = await self._store.get(user_id) |
| if catalog is None: |
| return Catalog(user_id=user_id, generated_at=datetime.now(UTC)) |
|
|
| if source_hint == "chat": |
| filtered: list = [] |
| elif source_hint == "structured": |
| filtered = [s for s in catalog.sources if s.source_type in {"schema", "tabular"}] |
| else: |
| filtered = [s for s in catalog.sources if s.source_type == "unstructured"] |
|
|
| return catalog.model_copy(update={"sources": filtered}) |
|
|