| """Backfill catalogs for existing users. |
| |
| One-off script. For each user that already has registered DB connections or |
| uploaded tabular files, run the structured pipeline to build their catalog. |
| |
| Run once against the live DB after deploying this branch to populate catalog |
| rows for data registered before the catalog pipeline landed. |
| |
| Note: enrich_all_sources.py is not needed — LLM enrichment was removed in |
| KM-557. The pipeline is now introspect → merge → validate → upsert. |
| |
| Usage: |
| uv run python scripts/build_initial_catalogs.py [--user-id USER_ID] |
| """ |
|
|
| import asyncio |
| import sys |
| import os |
|
|
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
|
| from sqlalchemy import select |
| from src.db.postgres.connection import AsyncSessionLocal |
| from src.db.postgres.models import DatabaseClient, Document |
| from src.pipeline.triggers import on_db_registered, on_tabular_uploaded |
|
|
|
|
def _parse_user_id_filter(argv: "list[str]") -> "str | None":
    """Return the value that follows ``--user-id`` in *argv*, or ``None`` if the flag is absent.

    Raises:
        SystemExit: if ``--user-id`` is given without a (non-empty) value.
    """
    if "--user-id" not in argv:
        return None
    idx = argv.index("--user-id")
    # A trailing flag used to crash with a bare IndexError, and an empty value
    # used to silently disable the filter and backfill *every* user.
    if idx + 1 >= len(argv) or not argv[idx + 1]:
        raise SystemExit(
            "error: --user-id requires a value\n"
            "Usage: uv run python scripts/build_initial_catalogs.py [--user-id USER_ID]"
        )
    return argv[idx + 1]


async def main() -> None:
    """Backfill catalogs for active DB clients and completed tabular uploads.

    Reads an optional ``--user-id USER_ID`` pair from ``sys.argv`` to restrict
    the run to one user. For every ``DatabaseClient`` whose status is
    ``"active"`` it awaits ``on_db_registered``; for every ``Document`` of type
    ``csv``/``xlsx`` whose status is ``"completed"`` it awaits
    ``on_tabular_uploaded``. A failure on one item is printed and the loop
    continues with the next item.

    Raises:
        SystemExit: if ``--user-id`` is passed without a value.
    """
    user_id_filter = _parse_user_id_filter(sys.argv)
    if user_id_filter is not None:
        print(f"Filtering to user_id: {user_id_filter}")

    async with AsyncSessionLocal() as db:
        # --- Registered DB connections ---
        query = select(DatabaseClient).where(DatabaseClient.status == "active")
        if user_id_filter is not None:
            query = query.where(DatabaseClient.user_id == user_id_filter)
        result = await db.execute(query)
        db_clients = result.scalars().all()
        print(f"\nFound {len(db_clients)} active DB client(s)")

        for client in db_clients:
            # Best-effort: report the failure and keep going so one bad
            # source does not abort the whole backfill.
            try:
                await on_db_registered(client.id, client.user_id)
                print(f" ✓ db_client {client.id} ({client.name})")
            except Exception as e:
                print(f" ✗ db_client {client.id} ({client.name}): {e}")

        # --- Uploaded tabular files ---
        query = select(Document).where(
            Document.file_type.in_(["csv", "xlsx"]),
            Document.status == "completed",
        )
        if user_id_filter is not None:
            query = query.where(Document.user_id == user_id_filter)
        result = await db.execute(query)
        docs = result.scalars().all()
        print(f"\nFound {len(docs)} completed tabular file(s)")

        for doc in docs:
            try:
                await on_tabular_uploaded(doc.id, doc.user_id)
                print(f" ✓ {doc.file_type} {doc.id} ({doc.filename})")
            except Exception as e:
                print(f" ✗ {doc.file_type} {doc.id} ({doc.filename}): {e}")

    print("\nDone.")
|
|
|
|
# Script entry point: only start the backfill when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    backfill_coro = main()
    asyncio.run(backfill_coro)
|
|