| """API endpoints for the per-user data catalog index. |
| |
| The index is a lightweight summary of every structured source registered |
| by a user (DB connections and tabular files). It is intended to be |
| consumed by the catalog refresher and by frontend listings — full |
| catalog payloads (tables + columns + samples + stats) are not exposed |
| here on purpose. |
| """ |
|
|
| from typing import List |
|
|
| from fastapi import APIRouter, HTTPException, Query, status |
|
|
| from src.catalog.store import CatalogStore |
| from src.middlewares.logging import get_logger, log_execution |
| from src.models.api.catalog import CatalogIndexEntry |
| from src.pipeline.triggers import on_catalog_rebuild_requested |
|
|
# Module-level logger shared by every endpoint in this file.
logger = get_logger("data_catalog_api")

# Every route below is mounted under /api/v1 and grouped under the
# "Data Catalog" tag in the generated OpenAPI docs.
router = APIRouter(tags=["Data Catalog"], prefix="/api/v1")
|
|
|
|
@router.get(
    "/data-catalog/{user_id}",
    response_model=List[CatalogIndexEntry],
    summary="List the user's data catalog index",
    response_description="One entry per registered structured source.",
    responses={
        200: {"description": "Returns an empty list if the user has no registered sources."},
        500: {"description": "Internal server error while reading the catalog."},
    },
)
@log_execution(logger)
async def list_data_catalog_index(user_id: str) -> List[CatalogIndexEntry]:
    """
    Return a lightweight index of every structured source registered by the user.

    One entry per source (DB connection or tabular file), including the
    `source_id`, `source_type`, display `name`, `location_ref`, current
    `table_count`, and `updated_at` timestamp.

    Used by the catalog refresher to decide which sources need to be
    rebuilt. Returns an empty list if the user has no catalog yet.

    Args:
        user_id: ID of the user whose catalog index is requested (path parameter).

    Returns:
        A list of `CatalogIndexEntry`, one per source in the stored catalog;
        `[]` when `CatalogStore().get` returns None.

    Raises:
        HTTPException: 500 if reading the catalog from the store fails. The
            underlying error is logged server-side only; the response carries
            a generic message because exception text from the store may
            contain internal details that should not reach the client.
    """
    try:
        catalog = await CatalogStore().get(user_id)
    except Exception as e:
        logger.error("Failed to read catalog index", user_id=user_id, error=str(e))
        # Do not echo `e` into the response body; keep the cause chained for
        # server-side tracebacks instead.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to read catalog index",
        ) from e

    # No catalog stored for this user yet: report an empty index, not an error.
    if catalog is None:
        return []

    return [
        CatalogIndexEntry(
            source_id=s.source_id,
            source_type=s.source_type,
            name=s.name,
            location_ref=s.location_ref,
            # Only the table count is exposed; full table payloads stay out
            # of this lightweight index.
            table_count=len(s.tables),
            updated_at=s.updated_at,
        )
        for s in catalog.sources
    ]
|
|
|
|
@router.post(
    "/data-catalog/rebuild",
    status_code=status.HTTP_200_OK,
    summary="Rebuild the catalog for a user",
    response_description="Confirmation that the rebuild was triggered.",
    responses={
        200: {"description": "Rebuild completed. Per-source errors are logged but do not fail this request."},
        500: {"description": "Unexpected error before the rebuild loop started."},
    },
)
@log_execution(logger)
async def rebuild_data_catalog(
    user_id: str = Query(..., description="ID of the user whose catalog should be rebuilt."),
):
    """
    Rebuild the user's catalog by awaiting `on_catalog_rebuild_requested`.

    Per this endpoint's contract, the handler re-introspects every source in
    the user's catalog (DB connection or tabular file) and upserts the
    results. Each source is processed independently, so a failure on one
    source is logged but does not abort the rest. If the user has no catalog
    yet, the rebuild is a no-op and this endpoint still reports success.
    NOTE(review): that per-source isolation and no-op behaviour live inside
    `on_catalog_rebuild_requested`, not here; confirm against
    `src.pipeline.triggers`.

    Args:
        user_id: ID of the user whose catalog should be rebuilt (query parameter).

    Returns:
        ``{"status": "success", "user_id": user_id}`` once the handler returns.

    Raises:
        HTTPException: 500 if `on_catalog_rebuild_requested` raises. The error
            is logged server-side; the response carries a generic message so
            exception text (which may describe connections or files) is not
            exposed to the client.
    """
    try:
        await on_catalog_rebuild_requested(user_id)
    except Exception as e:
        logger.error("catalog rebuild failed", user_id=user_id, error=str(e))
        # Generic detail for the client; the original cause stays chained.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Catalog rebuild failed",
        ) from e
    return {"status": "success", "user_id": user_id}
|
|