Agentic-Service-Data-Eyond-Catalog / scripts /build_initial_catalogs.py
ishaq101's picture
feat/Catalog Retrieval System (#1)
6bff5d9
"""Backfill catalogs for existing users.
One-off script. For each user that already has registered DB connections or
uploaded tabular files, run the structured pipeline to build their catalog.
Run once against the live DB after deploying this branch to populate catalog
rows for data registered before the catalog pipeline landed.
Note: enrich_all_sources.py is not needed — LLM enrichment was removed in
KM-557. The pipeline is now introspect → merge → validate → upsert.
Usage:
uv run python scripts/build_initial_catalogs.py [--user-id USER_ID]
"""
import asyncio
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sqlalchemy import select
from src.db.postgres.connection import AsyncSessionLocal
from src.db.postgres.models import DatabaseClient, Document
from src.pipeline.triggers import on_db_registered, on_tabular_uploaded
async def main() -> None:
    """Backfill catalogs for already-registered DB clients and tabular files.

    An optional ``--user-id USER_ID`` pair is read from ``sys.argv``. When it
    is present only rows owned by that user are processed; otherwise every
    active ``DatabaseClient`` and every completed csv/xlsx ``Document`` is
    processed.

    Each source is handed to its pipeline trigger (``on_db_registered`` or
    ``on_tabular_uploaded``). A failure on one source is printed and the run
    moves on to the next one, so a single bad source does not abort the
    whole backfill.

    Raises:
        SystemExit: if ``--user-id`` is given without a (non-empty) value.
    """
    user_id_filter = None
    if "--user-id" in sys.argv:
        idx = sys.argv.index("--user-id")
        # A trailing ``--user-id`` would raise IndexError below, and an empty
        # value would silently disable the filter and backfill every user.
        # Refuse both up front instead of running with the wrong scope.
        if idx + 1 >= len(sys.argv) or not sys.argv[idx + 1]:
            sys.exit("error: --user-id requires a non-empty USER_ID value")
        user_id_filter = sys.argv[idx + 1]
        print(f"Filtering to user_id: {user_id_filter}")

    async with AsyncSessionLocal() as db:
        # -- 1. DB clients ---------------------------------------------------
        query = select(DatabaseClient).where(DatabaseClient.status == "active")
        if user_id_filter is not None:
            query = query.where(DatabaseClient.user_id == user_id_filter)
        result = await db.execute(query)
        db_clients = result.scalars().all()
        print(f"\nFound {len(db_clients)} active DB client(s)")

        for client in db_clients:
            # Best-effort: report the failure and keep going with the rest.
            try:
                await on_db_registered(client.id, client.user_id)
                print(f" ✓ db_client {client.id} ({client.name})")
            except Exception as e:
                print(f" ✗ db_client {client.id} ({client.name}): {e}")

        # -- 2. Tabular files ------------------------------------------------
        query = select(Document).where(
            Document.file_type.in_(["csv", "xlsx"]),
            Document.status == "completed",
        )
        if user_id_filter is not None:
            query = query.where(Document.user_id == user_id_filter)
        result = await db.execute(query)
        docs = result.scalars().all()
        print(f"\nFound {len(docs)} completed tabular file(s)")

        for doc in docs:
            # Best-effort, same as the DB-client loop above.
            try:
                await on_tabular_uploaded(doc.id, doc.user_id)
                print(f" ✓ {doc.file_type} {doc.id} ({doc.filename})")
            except Exception as e:
                print(f" ✗ {doc.file_type} {doc.id} ({doc.filename}): {e}")

    print("\nDone.")
if __name__ == "__main__":
    # Script entry point: build the backfill coroutine and drive it to
    # completion with asyncio.run.
    backfill = main()
    asyncio.run(backfill)