"""API endpoints for the per-user data catalog index.

The index is a lightweight summary of every structured source registered by a
user (DB connections and tabular files). It is intended to be consumed by the
catalog refresher and by frontend listings. Full catalog payloads (tables +
columns + samples + stats) are intentionally not exposed here.
"""

from typing import List

from fastapi import APIRouter, HTTPException, Query, status

from src.catalog.store import CatalogStore
from src.middlewares.logging import get_logger, log_execution
from src.models.api.catalog import CatalogIndexEntry
from src.pipeline.triggers import on_catalog_rebuild_requested

logger = get_logger("data_catalog_api")

router = APIRouter(prefix="/api/v1", tags=["Data Catalog"])


@router.get(
    "/data-catalog/{user_id}",
    response_model=List[CatalogIndexEntry],
    summary="List the user's data catalog index",
    response_description="One entry per registered structured source.",
    responses={
        200: {"description": "Returns an empty list if the user has no registered sources."},
        500: {"description": "Internal server error while reading the catalog."},
    },
)
@log_execution(logger)
async def list_data_catalog_index(user_id: str):
    """
    Return a lightweight index of every structured source registered by the user.

    One entry per source (DB connection or tabular file), including the
    `source_id`, `source_type`, display `name`, `location_ref`, current
    `table_count`, and `updated_at` timestamp. Used by the catalog refresher
    to decide which sources need to be rebuilt.

    Args:
        user_id: Path parameter identifying whose catalog to read.

    Returns:
        A list of `CatalogIndexEntry`, one per source in the stored catalog.
        An empty list if the store has no catalog for this user (`None`).

    Raises:
        HTTPException: 500 if reading the catalog from `CatalogStore` fails.
            The underlying error is logged server-side only.
    """
    try:
        catalog = await CatalogStore().get(user_id)
    except Exception as e:
        # Top-level boundary: log the real error, but do not echo it to the
        # client. Store errors may carry internal details (paths, connection
        # info) that should not leave the server.
        logger.error("Failed to read catalog index", user_id=user_id, error=str(e))
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to read catalog index",
        ) from e

    if catalog is None:
        # The user has never registered a source, so there is no catalog yet.
        return []

    # `table_count` is derived from the stored tables so the full table
    # payload itself is not returned by this endpoint.
    return [
        CatalogIndexEntry(
            source_id=s.source_id,
            source_type=s.source_type,
            name=s.name,
            location_ref=s.location_ref,
            table_count=len(s.tables),
            updated_at=s.updated_at,
        )
        for s in catalog.sources
    ]


@router.post(
    "/data-catalog/rebuild",
    status_code=status.HTTP_200_OK,
    summary="Rebuild the catalog for a user",
    response_description="Confirmation that the rebuild was triggered.",
    responses={
        200: {"description": "Rebuild completed. Per-source errors are logged but do not fail this request."},
        500: {"description": "Unexpected error before the rebuild loop started."},
    },
)
@log_execution(logger)
async def rebuild_data_catalog(
    user_id: str = Query(..., description="ID of the user whose catalog should be rebuilt."),
):
    """
    Re-introspect every source in the user's catalog and upsert the results.

    The work is delegated to `on_catalog_rebuild_requested`, which is awaited
    before this handler responds. Per that trigger's contract, each source
    (DB connection or tabular file) is processed independently. A failure on
    one source is logged but does not abort the remaining sources. If the user
    has no catalog yet, the rebuild is a no-op and the handler still reports
    success.

    Args:
        user_id: Query parameter identifying whose catalog to rebuild.

    Returns:
        `{"status": "success", "user_id": user_id}` once the trigger returns.

    Raises:
        HTTPException: 500 if the trigger raises. The underlying error is
            logged server-side only.
    """
    try:
        await on_catalog_rebuild_requested(user_id)
    except Exception as e:
        # Same policy as the index endpoint: full detail goes to the log and a
        # generic message goes to the client.
        logger.error("catalog rebuild failed", user_id=user_id, error=str(e))
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Catalog rebuild failed",
        ) from e

    return {"status": "success", "user_id": user_id}