ishaq101's picture
feat/Catalog Retrieval System (#1)
6bff5d9
"""API endpoints for the per-user data catalog index.
The index is a lightweight summary of every structured source registered
by a user (DB connections and tabular files). It is intended to be
consumed by the catalog refresher and by frontend listings — full
catalog payloads (tables + columns + samples + stats) are not exposed
here on purpose.
"""
from typing import List
from fastapi import APIRouter, HTTPException, Query, status
from src.catalog.store import CatalogStore
from src.middlewares.logging import get_logger, log_execution
from src.models.api.catalog import CatalogIndexEntry
from src.pipeline.triggers import on_catalog_rebuild_requested
# Module logger; also passed to the `log_execution` decorators and used for
# the error logs in the endpoint handlers below.
logger = get_logger("data_catalog_api")
# All routes in this module are served under /api/v1 and grouped under the
# "Data Catalog" tag in the OpenAPI docs. Presumably mounted onto the app
# elsewhere via `include_router` — confirm against the app factory.
router = APIRouter(prefix="/api/v1", tags=["Data Catalog"])
@router.get(
    "/data-catalog/{user_id}",
    response_model=List[CatalogIndexEntry],
    summary="List the user's data catalog index",
    response_description="One entry per registered structured source.",
    responses={
        200: {"description": "Returns an empty list if the user has no registered sources."},
        500: {"description": "Internal server error while reading the catalog."},
    },
)
@log_execution(logger)
async def list_data_catalog_index(user_id: str):
    """
    Return a lightweight index of every structured source registered by the user.

    One entry per source (DB connection or tabular file), including the
    `source_id`, `source_type`, display `name`, `location_ref`, current
    `table_count`, and `updated_at` timestamp.

    Used by the catalog refresher to decide which sources need to be
    rebuilt. Returns an empty list if the user has no catalog yet.

    Raises:
        HTTPException: 500 if the catalog store cannot be read. The
            underlying error is logged server-side but deliberately not
            echoed to the client, since store errors may carry internal
            details (hosts, paths, credentials).
    """
    try:
        catalog = await CatalogStore().get(user_id)
    except Exception as e:
        # Request boundary: record the real cause in the logs, return a
        # generic 500 to the caller, and keep the original as __cause__.
        logger.error("Failed to read catalog index", user_id=user_id, error=str(e))
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to read catalog index",
        ) from e

    # No catalog stored for this user yet -> empty index, not an error.
    if catalog is None:
        return []

    return [
        CatalogIndexEntry(
            source_id=s.source_id,
            source_type=s.source_type,
            name=s.name,
            location_ref=s.location_ref,
            table_count=len(s.tables),
            updated_at=s.updated_at,
        )
        for s in catalog.sources
    ]
@router.post(
    "/data-catalog/rebuild",
    status_code=status.HTTP_200_OK,
    summary="Rebuild the catalog for a user",
    response_description="Confirmation that the rebuild was triggered.",
    responses={
        200: {"description": "Rebuild completed. Per-source errors are logged but do not fail this request."},
        500: {"description": "Unexpected error before the rebuild loop started."},
    },
)
@log_execution(logger)
async def rebuild_data_catalog(
    user_id: str = Query(..., description="ID of the user whose catalog should be rebuilt."),
):
    """
    Re-introspect every source in the user's catalog and upsert the results.

    Each source (DB connection or tabular file) is processed independently.
    A failure on one source is logged but does not abort the remaining sources.
    If the user has no catalog yet, returns success with no-op.
    (Per-source isolation and the no-op case are handled inside
    `on_catalog_rebuild_requested`; this handler only awaits it.)

    Returns:
        ``{"status": "success", "user_id": user_id}`` once the trigger returns.

    Raises:
        HTTPException: 500 if `on_catalog_rebuild_requested` raises. The
            underlying error is logged server-side but deliberately not
            echoed to the client, since it may carry internal details.
    """
    try:
        await on_catalog_rebuild_requested(user_id)
    except Exception as e:
        # Request boundary: log the real cause, return a generic 500, and
        # keep the original exception as __cause__ for tracebacks.
        logger.error("catalog rebuild failed", user_id=user_id, error=str(e))
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Catalog rebuild failed",
        ) from e

    return {"status": "success", "user_id": user_id}