""" interface/consumer/run_consumer.py ──────────────────────────────────── Standalone async consumer — Entry Point 2 (Google Colab / local). This script: 1. Wires all dependencies (same repos, broker, processor, model as the API). 2. Connects to RabbitMQ. 3. Runs ProcessAndPredictUseCase for every message consumed. Usage: python -m src.interface.consumer.run_consumer Colab usage: !python -m src.interface.consumer.run_consumer The consumer runs indefinitely until interrupted (Ctrl+C or Colab cell stop). """ from __future__ import annotations import asyncio import signal import sys from typing import Any from src.application.use_cases.process_and_predict import ProcessAndPredictUseCase from src.infrastructure.database.connection import create_all_tables, dispose_engine, get_session_factory from src.infrastructure.database.repositories.ppg_repository import SQLAlchemyPPGRepository from src.infrastructure.database.repositories.prediction_repository import SQLAlchemyPredictionRepository from src.infrastructure.messaging.rabbitmq_broker import RabbitMQBroker from src.infrastructure.model.gan_vgtlnet_service import GANVGTLNetService from src.infrastructure.model.mock_model_service import MockModelService from src.infrastructure.processing.scipy_signal_processor import ScipySignalProcessor from src.shared.config import get_settings from src.shared.constants import PPG_QUEUE_NAME from src.shared.logger import get_logger logger = get_logger(__name__) async def run() -> None: """ Bootstrap and run the message queue consumer. Dependency wiring mirrors the API's dependencies.py — the use case is identical, only the bootstrapping mechanism differs. """ settings = get_settings() logger.info("=" * 60) logger.info("BP Monitoring Pipeline — Consumer starting up") logger.info("Database : %s", settings.database_url.split("@")[-1]) logger.info("Broker : %s", settings.rabbitmq_url.split("@")[-1]) logger.info("Mock Model: %s", settings.use_mock_model) logger.info("=" * 60) # ── Create DB tables (dev/SQLite) ───────────────────────────────────────── if settings.debug or "sqlite" in settings.database_url: await create_all_tables() # ── Wire dependencies ───────────────────────────────────────────────────── broker = RabbitMQBroker(url=settings.rabbitmq_url) signal_processor = ScipySignalProcessor() model_service = MockModelService() if settings.use_mock_model else GANVGTLNetService() # Load model logger.info("Loading model (%s)…", model_service.model_version) await model_service.load_model() logger.info("Model ready.") # Connect broker await broker.connect() logger.info("Connected to RabbitMQ.") # ── Message handler ─────────────────────────────────────────────────────── session_factory = get_session_factory() async def handle_message(payload: dict[str, Any]) -> None: """ Invoked for each message dequeued from PPG_QUEUE_NAME. Creates a new DB session per message (request-scoped isolation). """ signal_id = payload.get("id", "") logger.info("Received message for signal_id=%s", signal_id) async with session_factory() as session: ppg_repo = SQLAlchemyPPGRepository(session) prediction_repo = SQLAlchemyPredictionRepository(session) use_case = ProcessAndPredictUseCase( ppg_repo=ppg_repo, prediction_repo=prediction_repo, signal_processor=signal_processor, model_service=model_service, ) try: prediction = await use_case.execute(payload) await session.commit() logger.info( "Prediction stored: id=%s SBP=%.1f DBP=%.1f", prediction.id, prediction.predicted_sbp, prediction.predicted_dbp, ) except Exception as exc: await session.rollback() logger.error( "Failed to process signal_id=%s: %s", signal_id, exc, exc_info=True ) raise # re-raise so the broker nacks the message # ── Start consuming ─────────────────────────────────────────────────────── logger.info("Listening on queue '%s'… (Ctrl+C to stop)", PPG_QUEUE_NAME) try: await broker.consume(queue_name=PPG_QUEUE_NAME, handler=handle_message) finally: await broker.disconnect() await dispose_engine() logger.info("Consumer shut down cleanly.") def main() -> None: """CLI entry point.""" loop = asyncio.get_event_loop() # Handle Ctrl+C gracefully def _handle_sigint(*_: Any) -> None: logger.info("Interrupt received — shutting down consumer…") for task in asyncio.all_tasks(loop): task.cancel() signal.signal(signal.SIGINT, _handle_sigint) try: loop.run_until_complete(run()) except asyncio.CancelledError: pass finally: loop.close() if __name__ == "__main__": main()