LIBRE / src /interface /api /dependencies.py
RyZ
fix: lazy import GANVGTLNetService and NumbaVGTLNetPreprocessor to avoid module-level torch import
6ace378
Raw
History Blame Contribute Delete
5.84 kB
"""
interface/api/dependencies.py
───────────────────────────────
FastAPI dependency injection wiring (Composition Root for the API).
This is the ONLY place in the codebase that selects concrete implementations
and wires them to use cases. Change infra providers here β€” nothing else changes.
DI hierarchy:
AsyncSession ─→ PPGRepository, PredictionRepository
Settings ─→ MessageBroker (singleton), ModelService (singleton)
Repositories + Broker ─→ Use Cases
"""
from __future__ import annotations
from functools import lru_cache
from typing import Annotated
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from src.application.use_cases.get_prediction_history import GetPredictionHistoryUseCase
from src.application.use_cases.ingest_ppg import IngestPPGUseCase
from src.application.use_cases.process_and_predict import ProcessAndPredictUseCase
from src.domain.interfaces.repositories.ppg_repository import PPGRepository
from src.domain.interfaces.repositories.prediction_repository import PredictionRepository
from src.domain.interfaces.services.message_broker import MessageBroker
from src.domain.interfaces.services.model_service import ModelService
from src.domain.interfaces.services.signal_processor import SignalProcessor
from src.infrastructure.database.connection import get_async_session
from src.infrastructure.database.repositories.ppg_repository import SQLAlchemyPPGRepository
from src.infrastructure.database.repositories.prediction_repository import (
SQLAlchemyPredictionRepository,
)
from src.infrastructure.messaging.rabbitmq_broker import RabbitMQBroker
from src.infrastructure.model.mock_model_service import MockModelService
from src.infrastructure.processing.scipy_signal_processor import ScipySignalProcessor
from src.infrastructure.processing.scipy_cardiogan_preprocessor import SciPyCardioGANPreprocessor
from src.shared.config import get_settings
from src.shared.logger import get_logger
logger = get_logger(__name__)
# ── Singleton Dependencies (created once per process) ─────────────────────────
@lru_cache(maxsize=1)
def get_broker() -> MessageBroker:
"""
Return the singleton MessageBroker instance.
Uses RabbitMQBroker in all cases β€” swapping to Kafka/Redis means
replacing this one line. The broker is connected lazily on first use.
"""
settings = get_settings()
logger.info("Creating RabbitMQBroker singleton")
return RabbitMQBroker(url=settings.rabbitmq_url)
@lru_cache(maxsize=1)
def get_signal_processor() -> SignalProcessor:
"""Return the singleton SignalProcessor instance (ScipySignalProcessor)."""
return ScipySignalProcessor()
@lru_cache(maxsize=1)
def get_model_service() -> ModelService:
"""
Return the singleton ModelService instance.
Selects MockModelService or GANVGTLNetService based on USE_MOCK_MODEL setting.
"""
settings = get_settings()
if settings.use_mock_model:
logger.info("ModelService: using MockModelService (USE_MOCK_MODEL=true)")
return MockModelService()
else:
logger.info("ModelService: using GANVGTLNetService (USE_MOCK_MODEL=false)")
from src.infrastructure.model.gan_vgtlnet_service import GANVGTLNetService
from src.infrastructure.processing.numba_vgtlnet_preprocessor import NumbaVGTLNetPreprocessor
gan_prep = SciPyCardioGANPreprocessor()
vgtl_prep = NumbaVGTLNetPreprocessor()
return GANVGTLNetService(gan_preprocessor=gan_prep, vgtlnet_preprocessor=vgtl_prep)
# ── Per-Request Dependencies ──────────────────────────────────────────────────
async def get_ppg_repository(
session: Annotated[AsyncSession, Depends(get_async_session)],
) -> PPGRepository:
"""Yield a PPGRepository scoped to the current request's DB session."""
return SQLAlchemyPPGRepository(session)
async def get_prediction_repository(
session: Annotated[AsyncSession, Depends(get_async_session)],
) -> PredictionRepository:
"""Yield a PredictionRepository scoped to the current request's DB session."""
return SQLAlchemyPredictionRepository(session)
# ── Use Case Dependencies ─────────────────────────────────────────────────────
async def get_ingest_ppg_use_case(
ppg_repo: Annotated[PPGRepository, Depends(get_ppg_repository)],
broker: Annotated[MessageBroker, Depends(get_broker)],
) -> IngestPPGUseCase:
"""Return an IngestPPGUseCase with request-scoped repository and singleton broker."""
return IngestPPGUseCase(ppg_repo=ppg_repo, broker=broker)
async def get_process_and_predict_use_case(
ppg_repo: Annotated[PPGRepository, Depends(get_ppg_repository)],
prediction_repo: Annotated[PredictionRepository, Depends(get_prediction_repository)],
signal_processor: Annotated[SignalProcessor, Depends(get_signal_processor)],
model_service: Annotated[ModelService, Depends(get_model_service)],
) -> ProcessAndPredictUseCase:
"""Return a ProcessAndPredictUseCase with all required dependencies."""
return ProcessAndPredictUseCase(
ppg_repo=ppg_repo,
prediction_repo=prediction_repo,
signal_processor=signal_processor,
model_service=model_service,
)
async def get_prediction_history_use_case(
prediction_repo: Annotated[PredictionRepository, Depends(get_prediction_repository)],
) -> GetPredictionHistoryUseCase:
"""Return a GetPredictionHistoryUseCase."""
return GetPredictionHistoryUseCase(prediction_repo=prediction_repo)