File size: 5,837 Bytes
e391a84 6ace378 e391a84 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 | """
interface/api/dependencies.py
βββββββββββββββββββββββββββββββ
FastAPI dependency injection wiring (Composition Root for the API).
This is the ONLY place in the codebase that selects concrete implementations
and wires them to use cases. Change infra providers here β nothing else changes.
DI hierarchy:
AsyncSession ββ PPGRepository, PredictionRepository
Settings ββ MessageBroker (singleton), ModelService (singleton)
Repositories + Broker ββ Use Cases
"""
from __future__ import annotations
from functools import lru_cache
from typing import Annotated
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from src.application.use_cases.get_prediction_history import GetPredictionHistoryUseCase
from src.application.use_cases.ingest_ppg import IngestPPGUseCase
from src.application.use_cases.process_and_predict import ProcessAndPredictUseCase
from src.domain.interfaces.repositories.ppg_repository import PPGRepository
from src.domain.interfaces.repositories.prediction_repository import PredictionRepository
from src.domain.interfaces.services.message_broker import MessageBroker
from src.domain.interfaces.services.model_service import ModelService
from src.domain.interfaces.services.signal_processor import SignalProcessor
from src.infrastructure.database.connection import get_async_session
from src.infrastructure.database.repositories.ppg_repository import SQLAlchemyPPGRepository
from src.infrastructure.database.repositories.prediction_repository import (
SQLAlchemyPredictionRepository,
)
from src.infrastructure.messaging.rabbitmq_broker import RabbitMQBroker
from src.infrastructure.model.mock_model_service import MockModelService
from src.infrastructure.processing.scipy_signal_processor import ScipySignalProcessor
from src.infrastructure.processing.scipy_cardiogan_preprocessor import SciPyCardioGANPreprocessor
from src.shared.config import get_settings
from src.shared.logger import get_logger
logger = get_logger(__name__)
# ββ Singleton Dependencies (created once per process) βββββββββββββββββββββββββ
@lru_cache(maxsize=1)
def get_broker() -> MessageBroker:
"""
Return the singleton MessageBroker instance.
Uses RabbitMQBroker in all cases β swapping to Kafka/Redis means
replacing this one line. The broker is connected lazily on first use.
"""
settings = get_settings()
logger.info("Creating RabbitMQBroker singleton")
return RabbitMQBroker(url=settings.rabbitmq_url)
@lru_cache(maxsize=1)
def get_signal_processor() -> SignalProcessor:
"""Return the singleton SignalProcessor instance (ScipySignalProcessor)."""
return ScipySignalProcessor()
@lru_cache(maxsize=1)
def get_model_service() -> ModelService:
"""
Return the singleton ModelService instance.
Selects MockModelService or GANVGTLNetService based on USE_MOCK_MODEL setting.
"""
settings = get_settings()
if settings.use_mock_model:
logger.info("ModelService: using MockModelService (USE_MOCK_MODEL=true)")
return MockModelService()
else:
logger.info("ModelService: using GANVGTLNetService (USE_MOCK_MODEL=false)")
from src.infrastructure.model.gan_vgtlnet_service import GANVGTLNetService
from src.infrastructure.processing.numba_vgtlnet_preprocessor import NumbaVGTLNetPreprocessor
gan_prep = SciPyCardioGANPreprocessor()
vgtl_prep = NumbaVGTLNetPreprocessor()
return GANVGTLNetService(gan_preprocessor=gan_prep, vgtlnet_preprocessor=vgtl_prep)
# ββ Per-Request Dependencies ββββββββββββββββββββββββββββββββββββββββββββββββββ
async def get_ppg_repository(
session: Annotated[AsyncSession, Depends(get_async_session)],
) -> PPGRepository:
"""Yield a PPGRepository scoped to the current request's DB session."""
return SQLAlchemyPPGRepository(session)
async def get_prediction_repository(
session: Annotated[AsyncSession, Depends(get_async_session)],
) -> PredictionRepository:
"""Yield a PredictionRepository scoped to the current request's DB session."""
return SQLAlchemyPredictionRepository(session)
# ββ Use Case Dependencies βββββββββββββββββββββββββββββββββββββββββββββββββββββ
async def get_ingest_ppg_use_case(
ppg_repo: Annotated[PPGRepository, Depends(get_ppg_repository)],
broker: Annotated[MessageBroker, Depends(get_broker)],
) -> IngestPPGUseCase:
"""Return an IngestPPGUseCase with request-scoped repository and singleton broker."""
return IngestPPGUseCase(ppg_repo=ppg_repo, broker=broker)
async def get_process_and_predict_use_case(
ppg_repo: Annotated[PPGRepository, Depends(get_ppg_repository)],
prediction_repo: Annotated[PredictionRepository, Depends(get_prediction_repository)],
signal_processor: Annotated[SignalProcessor, Depends(get_signal_processor)],
model_service: Annotated[ModelService, Depends(get_model_service)],
) -> ProcessAndPredictUseCase:
"""Return a ProcessAndPredictUseCase with all required dependencies."""
return ProcessAndPredictUseCase(
ppg_repo=ppg_repo,
prediction_repo=prediction_repo,
signal_processor=signal_processor,
model_service=model_service,
)
async def get_prediction_history_use_case(
prediction_repo: Annotated[PredictionRepository, Depends(get_prediction_repository)],
) -> GetPredictionHistoryUseCase:
"""Return a GetPredictionHistoryUseCase."""
return GetPredictionHistoryUseCase(prediction_repo=prediction_repo)
|