"""Backfill catalogs for existing users. One-off script. For each user that already has registered DB connections or uploaded tabular files, run the structured pipeline to build their catalog. Run once against the live DB after deploying this branch to populate catalog rows for data registered before the catalog pipeline landed. Note: enrich_all_sources.py is not needed — LLM enrichment was removed in KM-557. The pipeline is now introspect → merge → validate → upsert. Usage: uv run python scripts/build_initial_catalogs.py [--user-id USER_ID] """ import asyncio import sys import os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from sqlalchemy import select from src.db.postgres.connection import AsyncSessionLocal from src.db.postgres.models import DatabaseClient, Document from src.pipeline.triggers import on_db_registered, on_tabular_uploaded async def main() -> None: user_id_filter = None if "--user-id" in sys.argv: idx = sys.argv.index("--user-id") user_id_filter = sys.argv[idx + 1] print(f"Filtering to user_id: {user_id_filter}") async with AsyncSessionLocal() as db: # ── 1. DB clients ────────────────────────────────────────────── query = select(DatabaseClient).where(DatabaseClient.status == "active") if user_id_filter: query = query.where(DatabaseClient.user_id == user_id_filter) result = await db.execute(query) db_clients = result.scalars().all() print(f"\nFound {len(db_clients)} active DB client(s)") for client in db_clients: try: await on_db_registered(client.id, client.user_id) print(f" ✓ db_client {client.id} ({client.name})") except Exception as e: print(f" ✗ db_client {client.id} ({client.name}): {e}") # ── 2. Tabular files ─────────────────────────────────────────── query = select(Document).where( Document.file_type.in_(["csv", "xlsx"]), Document.status == "completed", ) if user_id_filter: query = query.where(Document.user_id == user_id_filter) result = await db.execute(query) docs = result.scalars().all() print(f"\nFound {len(docs)} completed tabular file(s)") for doc in docs: try: await on_tabular_uploaded(doc.id, doc.user_id) print(f" ✓ {doc.file_type} {doc.id} ({doc.filename})") except Exception as e: print(f" ✗ {doc.file_type} {doc.id} ({doc.filename}): {e}") print("\nDone.") if __name__ == "__main__": asyncio.run(main())