""" infrastructure/database/connection.py ─────────────────────────────────────── SQLAlchemy async engine + session factory — Supabase / PostgreSQL ready. Supports: • PostgreSQL via asyncpg (Supabase direct port 5432) • PostgreSQL via asyncpg (Supabase pooler port 6543 — Transaction mode) • SQLite via aiosqlite (local dev / tests) Supabase-specific tuning: • pool_size / max_overflow read from Settings (default: 5 / 10) • pool_recycle set to 1800 s (30 min) — avoids idle connection drops • connect_args.server_settings identifies the app in pg_stat_activity • Pooler (port 6543) mode disables prepared_statement_cache_size because pgBouncer Transaction mode does not support server-side prepared stmts. """ from __future__ import annotations from collections.abc import AsyncGenerator from sqlalchemy.ext.asyncio import ( AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine, ) from src.shared.config import get_settings from src.shared.logger import get_logger from sqlalchemy.ext.compiler import compiles from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.sql.elements import TextClause logger = get_logger(__name__) # ── SQLite Compatibility Compilers ─────────────────────────────────────────── # Registers global overrides to compile PostgreSQL-specific DDL for SQLite fallback. 
@compiles(JSONB, "sqlite")
def compile_jsonb_sqlite(type_, compiler, **kw):
    """Render PostgreSQL ``JSONB`` as ``JSON`` when compiling for SQLite.

    Args:
        type_: The ``JSONB`` type instance being compiled (unused).
        compiler: The active type compiler (unused).
        **kw: Extra compiler keyword arguments (unused).

    Returns:
        The literal type name ``"JSON"``.
    """
    return "JSON"


@compiles(TextClause, "sqlite")
def compile_text_sqlite(element, compiler, **kw):
    """Translate ``text("gen_random_uuid()")`` into a SQLite expression.

    Only a clause whose text is exactly ``gen_random_uuid()`` is rewritten
    (into an expression that assembles a UUIDv4-shaped string from
    ``randomblob`` / ``random``). Every other ``text()`` construct is handed
    to the stock compiler so that bind parameters and escaping keep working.

    Args:
        element: The ``TextClause`` being compiled.
        compiler: The active SQL compiler for the SQLite dialect.
        **kw: Compiler keyword arguments, forwarded to the default compiler.

    Returns:
        The SQL string to emit for ``element``.
    """
    if element.text == "gen_random_uuid()":
        return (
            "(lower(hex(randomblob(4))) || '-' || "
            "lower(hex(randomblob(2))) || '-4' || "
            "substr(lower(hex(randomblob(2))),2) || '-' || "
            "substr('89ab',abs(random()) % 4 + 1, 1) || "
            "substr(lower(hex(randomblob(2))),2) || '-' || "
            "lower(hex(randomblob(6))))"
        )
    # Fall back to the default TextClause compilation. Returning the raw
    # ``element.text`` here would skip bind-parameter registration
    # (``:name`` placeholders) and ``\:`` un-escaping for every text()
    # statement executed against SQLite.
    return compiler.visit_textclause(element, **kw)


# ── Module-level singletons (created once per process) ───────────────────────

_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None


def get_engine() -> AsyncEngine:
    """Return the process-wide async engine, creating it on first use.

    The engine is configured from ``get_settings()``:

    * SQLite: created with ``check_same_thread=False`` and no pool options.
    * PostgreSQL via the pooler (``settings.uses_pooler``): the SQLAlchemy
      prepared statement cache is disabled and a statement timeout option
      is sent with the connection's server settings.
    * PostgreSQL direct: only ``application_name`` is sent.

    Both PostgreSQL variants enable ``pool_pre_ping`` and take pool sizing
    and recycle values from settings.

    Returns:
        The shared ``AsyncEngine`` instance.
    """
    global _engine

    if _engine is not None:
        return _engine

    settings = get_settings()
    db_url = settings.database_url
    connect_args: dict[str, object] = {}

    if settings.is_sqlite:
        # SQLite does not support connection pools or server_settings.
        connect_args = {"check_same_thread": False}
        _engine = create_async_engine(
            db_url,
            echo=settings.debug,
            connect_args=connect_args,
        )
    else:
        # PostgreSQL / Supabase path ──────────────────────────────────────
        server_settings: dict[str, str] = {
            "application_name": "bp_monitoring_pipeline",
        }

        if settings.uses_pooler:
            # Supabase Transaction Pooler (pgBouncer, port 6543):
            # Prepared statements are not supported in transaction mode.
            # NOTE(review): SQLAlchemy's asyncpg dialect docs also describe
            # ``prepared_statement_name_func`` for PgBouncer setups —
            # confirm whether this deployment needs it.
            server_settings["options"] = "-c statement_timeout=30000"
            connect_args = {
                "server_settings": server_settings,
                "prepared_statement_cache_size": 0,
            }
            logger.info(
                "Supabase Connection Pooler (port 6543) detected — "
                "prepared statement cache disabled."
            )
        else:
            # Supabase Direct Connection (port 5432):
            # Full asyncpg feature set available.
            connect_args = {"server_settings": server_settings}

        _engine = create_async_engine(
            db_url,
            echo=settings.debug,
            pool_pre_ping=True,  # Validate connections before use
            pool_size=settings.db_pool_size,
            max_overflow=settings.db_max_overflow,
            pool_recycle=settings.db_pool_recycle,
            connect_args=connect_args,
        )

    # Log host only (strip credentials): keep what follows the last "@".
    safe_url = db_url.split("@")[-1] if "@" in db_url else db_url
    logger.info("Database engine created → %s", safe_url)

    return _engine


def get_session_factory() -> async_sessionmaker[AsyncSession]:
    """Return the process-wide session factory, creating it on first use.

    The factory is bound to the engine returned by ``get_engine()``.

    Returns:
        The shared ``async_sessionmaker`` producing ``AsyncSession`` objects.
    """
    global _session_factory

    if _session_factory is None:
        _session_factory = async_sessionmaker(
            bind=get_engine(),
            class_=AsyncSession,
            expire_on_commit=False,  # Avoid lazy-load issues after commit
            autoflush=False,
            autocommit=False,
        )
    return _session_factory


async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    """
    FastAPI dependency: yield one AsyncSession per request.

    The session is committed once the consumer finishes without raising;
    on any ``Exception`` it is rolled back and the exception is re-raised.

    Usage::

        @router.post("/...")
        async def endpoint(session: AsyncSession = Depends(get_async_session)):
            ...
    """
    factory = get_session_factory()
    async with factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


async def ping_database() -> bool:
    """
    Send a lightweight ``SELECT 1`` to verify the database connection.

    Returns:
        True  — connection is healthy.
        False — connection failed (logs the error).

    Used by the health-check endpoint and startup validation.
    """
    from sqlalchemy import text

    try:
        async with get_engine().connect() as conn:
            await conn.execute(text("SELECT 1"))
        return True
    except Exception as exc:
        # Deliberate best-effort: any failure is reported as "unhealthy".
        logger.error("Database ping failed: %s", exc)
        return False


async def create_all_tables() -> None:
    """
    Create all ORM tables — **development and test environments only**.

    In production (Supabase), always use Alembic migrations::

        alembic upgrade head

    This function is intentionally guarded by a debug/SQLite check in app.py.
    It additionally refuses to run when Supabase is configured and debug
    mode is off, logging a warning instead.
    """
    settings = get_settings()
    if settings.is_supabase and not settings.debug:
        logger.warning(
            "create_all_tables() skipped — Supabase production detected. "
            "Run 'alembic upgrade head' to apply migrations."
        )
        return

    # Imported here so the model modules are loaded (and their tables are
    # registered on Base.metadata) only when tables are actually created.
    from src.infrastructure.database.models.base import Base  # noqa: F401 – registers models
    import src.infrastructure.database.models.ppg_model  # noqa: F401
    import src.infrastructure.database.models.prediction_model  # noqa: F401

    engine = get_engine()
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    logger.info("All database tables created.")


async def dispose_engine() -> None:
    """Dispose the engine and all pooled connections (call on application shutdown).

    Also clears the cached session factory so that a later
    ``get_session_factory()`` call binds to a freshly created engine rather
    than to the one that was just disposed.
    """
    global _engine, _session_factory

    if _engine is not None:
        await _engine.dispose()
        _engine = None
        # The cached factory is bound to the disposed engine; drop it too.
        _session_factory = None
        logger.info("Database engine disposed.")