LIBRE / src /infrastructure /database /connection.py
RyZ
deploy: fix hf deploy
d852d80
Raw
History Blame Contribute Delete
7.6 kB
"""
infrastructure/database/connection.py
───────────────────────────────────────
SQLAlchemy async engine + session factory — Supabase / PostgreSQL ready.
Supports:
• PostgreSQL via asyncpg (Supabase direct port 5432)
• PostgreSQL via asyncpg (Supabase pooler port 6543 — Transaction mode)
• SQLite via aiosqlite (local dev / tests)
Supabase-specific tuning:
• pool_size / max_overflow read from Settings (default: 5 / 10)
• pool_recycle set to 1800 s (30 min) — avoids idle connection drops
• connect_args.server_settings identifies the app in pg_stat_activity
• Pooler (port 6543) mode sets prepared_statement_cache_size to 0 because
  pgBouncer Transaction mode does not support server-side prepared stmts.
"""
from __future__ import annotations
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from src.shared.config import get_settings
from src.shared.logger import get_logger
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.sql.elements import TextClause
# Logger for this module, obtained from the project's get_logger helper and
# keyed by this module's __name__.
logger = get_logger(__name__)
# ── SQLite Compatibility Compilers ───────────────────────────────────────────
# Registers global overrides to compile PostgreSQL-specific DDL for SQLite fallback.
@compiles(JSONB, "sqlite")
def compile_jsonb_sqlite(type_, compiler, **kw):
    """Render the PostgreSQL ``JSONB`` type as ``JSON`` when compiling for SQLite.

    Registered through ``sqlalchemy.ext.compiler.compiles`` for the
    ``"sqlite"`` dialect only.

    Args:
        type_: The ``JSONB`` type instance being compiled (unused).
        compiler: The active type compiler (unused).
        **kw: Extra compilation keyword arguments (ignored).

    Returns:
        The literal type name ``"JSON"``.
    """
    sqlite_type_name = "JSON"
    return sqlite_type_name
@compiles(TextClause, "sqlite")
def compile_text_sqlite(element, compiler, **kw):
    """Compile ``text()`` constructs for SQLite, emulating ``gen_random_uuid()``.

    A text clause whose SQL is exactly ``gen_random_uuid()`` is replaced by a
    parenthesised SQLite expression that assembles a UUID-shaped string
    (8-4-4-4-12 hex digits, version nibble ``4``, variant nibble drawn from
    ``89ab``) out of ``randomblob()`` / ``random()`` calls.

    Every other text clause is handed back to the compiler's stock
    ``visit_textclause`` so that bind parameters (``:name``) and escaped
    colons are processed exactly as they would be without this override.
    Returning the raw ``element.text`` would skip that step and leave
    parameterised ``text()`` statements without their bound values on SQLite.

    Args:
        element: The ``TextClause`` being compiled.
        compiler: The active SQL compiler for the SQLite dialect.
        **kw: Compilation keyword arguments, forwarded to the default
            implementation.

    Returns:
        The SQL string to emit for this clause.
    """
    if element.text == "gen_random_uuid()":
        return (
            "(lower(hex(randomblob(4))) || '-' || "
            "lower(hex(randomblob(2))) || '-4' || "
            "substr(lower(hex(randomblob(2))),2) || '-' || "
            "substr('89ab',abs(random()) % 4 + 1, 1) || "
            "substr(lower(hex(randomblob(2))),2) || '-' || "
            "lower(hex(randomblob(6))))"
        )
    # Fall back to the default TextClause compilation instead of the raw
    # string, so bind parameters are registered with the compiler.
    return compiler.visit_textclause(element, **kw)
# ── Module-level singletons (created once per process) ───────────────────────
# Both start as None and are filled in lazily: _engine by get_engine() and
# _session_factory by get_session_factory(). dispose_engine() sets _engine
# back to None after disposing it.
_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None
def get_engine() -> AsyncEngine:
    """Return the module-wide async SQLAlchemy engine, creating it on first use.

    The engine is built from ``get_settings()`` and cached in the module
    global ``_engine``; later calls return the cached instance.

    Configuration by backend:

    * SQLite (``settings.is_sqlite``): only ``check_same_thread=False`` is
      passed through ``connect_args``; no pool options are supplied.
    * PostgreSQL: ``application_name`` is sent via ``server_settings`` and the
      pool is configured from ``db_pool_size``, ``db_max_overflow`` and
      ``db_pool_recycle`` with ``pool_pre_ping`` enabled. When
      ``settings.uses_pooler`` is true, a 30 s ``statement_timeout`` option is
      added and ``prepared_statement_cache_size`` is set to 0.

    Returns:
        The shared ``AsyncEngine`` instance.
    """
    global _engine
    if _engine is not None:
        return _engine

    settings = get_settings()
    db_url = settings.database_url

    if settings.is_sqlite:
        # The SQLite path takes no pool sizing or server_settings.
        _engine = create_async_engine(
            db_url,
            echo=settings.debug,
            connect_args={"check_same_thread": False},
        )
    else:
        # PostgreSQL / Supabase path.
        server_settings: dict[str, str] = {
            "application_name": "bp_monitoring_pipeline",
        }
        # Direct connection (port 5432) uses only server_settings.
        connect_args: dict = {"server_settings": server_settings}

        if settings.uses_pooler:
            # Supabase Transaction Pooler (pgBouncer, port 6543):
            # prepared statements are not supported in transaction mode.
            # NOTE(review): asyncpg's own ``statement_cache_size`` is left at
            # its default here; confirm whether it must also be 0 behind the
            # pooler.
            server_settings["options"] = "-c statement_timeout=30000"
            connect_args["prepared_statement_cache_size"] = 0
            logger.info(
                "Supabase Connection Pooler (port 6543) detected — "
                "prepared statement cache disabled."
            )

        _engine = create_async_engine(
            db_url,
            echo=settings.debug,
            pool_pre_ping=True,  # Validate connections before use
            pool_size=settings.db_pool_size,
            max_overflow=settings.db_max_overflow,
            pool_recycle=settings.db_pool_recycle,
            connect_args=connect_args,
        )

    # Log only what follows the last "@" so credentials stay out of the logs;
    # URLs without "@" (e.g. SQLite) are logged unchanged.
    safe_url = db_url.rsplit("@", 1)[-1]
    logger.info("Database engine created → %s", safe_url)
    return _engine
def get_session_factory() -> async_sessionmaker[AsyncSession]:
    """Return the shared ``async_sessionmaker``, building it on the first call.

    The factory is bound to the engine from ``get_engine()`` and cached in the
    module global ``_session_factory``.

    Returns:
        The cached ``async_sessionmaker`` producing ``AsyncSession`` objects.
    """
    global _session_factory
    if _session_factory is not None:
        return _session_factory

    session_options = {
        "bind": get_engine(),
        "class_": AsyncSession,
        # Keep loaded attributes usable after commit (avoids lazy-load issues).
        "expire_on_commit": False,
        "autoflush": False,
        "autocommit": False,
    }
    _session_factory = async_sessionmaker(**session_options)
    return _session_factory
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    """
    FastAPI dependency that yields one ``AsyncSession`` per request.

    The session is committed once the consumer finishes without error. If an
    ``Exception`` is raised (by the consumer or by the commit itself), the
    session is rolled back and the exception is re-raised.

    Usage::

        @router.post("/...")
        async def endpoint(session: AsyncSession = Depends(get_async_session)):
            ...
    """
    make_session = get_session_factory()
    async with make_session() as db_session:
        try:
            yield db_session
            # Commit stays inside the try so a failed commit is rolled back.
            await db_session.commit()
        except Exception:
            await db_session.rollback()
            raise
async def ping_database() -> bool:
    """
    Run a lightweight ``SELECT 1`` to check that the database is reachable.

    Returns:
        True — the statement executed on a connection from ``get_engine()``.
        False — any exception was raised; it is logged at error level.

    Used by the health-check endpoint and startup validation.
    """
    from sqlalchemy import text

    try:
        async with get_engine().connect() as connection:
            await connection.execute(text("SELECT 1"))
    except Exception as exc:
        logger.error("Database ping failed: %s", exc)
        return False
    else:
        return True
async def create_all_tables() -> None:
    """
    Create all ORM tables — **development and test environments only**.

    When ``settings.is_supabase`` is true and ``settings.debug`` is false the
    function logs a warning and returns without touching the database. In
    production (Supabase), always use Alembic migrations::

        alembic upgrade head

    Otherwise the model modules are imported (so their tables are registered
    on ``Base.metadata``) and ``Base.metadata.create_all`` is run inside a
    single ``engine.begin()`` transaction.

    Per the original author's note, app.py additionally guards this call with
    a debug/SQLite check (not visible from this module).
    """
    settings = get_settings()
    if settings.is_supabase and not settings.debug:
        logger.warning(
            "create_all_tables() skipped — Supabase production detected. "
            "Run 'alembic upgrade head' to apply migrations."
        )
        return

    # Imported here for their side effect: registering models on Base.metadata.
    from src.infrastructure.database.models.base import Base  # noqa: F401 – registers models
    import src.infrastructure.database.models.ppg_model  # noqa: F401
    import src.infrastructure.database.models.prediction_model  # noqa: F401

    engine = get_engine()
    async with engine.begin() as conn:
        # create_all is synchronous, so it is bridged through run_sync.
        await conn.run_sync(Base.metadata.create_all)
    logger.info("All database tables created.")
async def dispose_engine() -> None:
    """Dispose the engine and all pooled connections (call on application shutdown).

    Does nothing when no engine has been created. Otherwise the engine is
    disposed and both module singletons are reset: ``_engine`` and
    ``_session_factory``. The session factory is cleared as well because it is
    bound to the engine being disposed; leaving it cached would make later
    sessions keep using that old engine instead of the fresh one a subsequent
    ``get_engine()`` call creates.
    """
    global _engine, _session_factory
    if _engine is not None:
        await _engine.dispose()
        _engine = None
        # Drop the factory bound to the disposed engine so it is rebuilt
        # against the next engine on demand.
        _session_factory = None
        logger.info("Database engine disposed.")