| """ |
| interface/consumer/run_consumer.py |
| ────────────────────────────────── |
| Standalone async consumer — Entry Point 2 (Google Colab / local). |
| |
| This script: |
| 1. Wires all dependencies (same repos, broker, processor, model as the API). |
| 2. Connects to RabbitMQ. |
| 3. Runs ProcessAndPredictUseCase for every message consumed. |
| |
| Usage: |
| python -m src.interface.consumer.run_consumer |
| |
| Colab usage: |
| !python -m src.interface.consumer.run_consumer |
| |
| The consumer runs indefinitely until interrupted (Ctrl+C or Colab cell stop). |
| """ |
| from __future__ import annotations |
|
|
| import asyncio |
| import signal |
| import sys |
| from typing import Any |
|
|
| from src.application.use_cases.process_and_predict import ProcessAndPredictUseCase |
| from src.infrastructure.database.connection import create_all_tables, dispose_engine, get_session_factory |
| from src.infrastructure.database.repositories.ppg_repository import SQLAlchemyPPGRepository |
| from src.infrastructure.database.repositories.prediction_repository import SQLAlchemyPredictionRepository |
| from src.infrastructure.messaging.rabbitmq_broker import RabbitMQBroker |
| from src.infrastructure.model.gan_vgtlnet_service import GANVGTLNetService |
| from src.infrastructure.model.mock_model_service import MockModelService |
| from src.infrastructure.processing.scipy_signal_processor import ScipySignalProcessor |
| from src.shared.config import get_settings |
| from src.shared.constants import PPG_QUEUE_NAME |
| from src.shared.logger import get_logger |
|
|
| logger = get_logger(__name__) |
|
|
|
|
async def run() -> None:
    """
    Bootstrap and run the message queue consumer.

    Steps, in order:
      1. Read settings and log a start-up banner.
      2. Create DB tables when ``settings.debug`` is set or the database URL
         contains ``"sqlite"``.
      3. Build the broker, signal processor and model service, then load the
         model.
      4. Connect to RabbitMQ and consume ``PPG_QUEUE_NAME``, running
         ``ProcessAndPredictUseCase`` once per message.
      5. On exit (normal return, error or cancellation) disconnect the broker
         and dispose the DB engine.

    Dependency wiring mirrors the API's dependencies.py — the use case is
    identical, only the bootstrapping mechanism differs.

    Raises:
        Exception: anything raised by start-up, message consumption or
            teardown is propagated to the caller.
    """
    settings = get_settings()

    banner = "=" * 60
    logger.info(banner)
    logger.info("BP Monitoring Pipeline — Consumer starting up")
    # Log only the text after the last "@" so that a ``user:password@`` prefix
    # in the URL (if present) does not end up in the logs.
    logger.info("Database : %s", settings.database_url.split("@")[-1])
    logger.info("Broker : %s", settings.rabbitmq_url.split("@")[-1])
    logger.info("Mock Model: %s", settings.use_mock_model)
    logger.info(banner)

    # Tables are auto-created only for debug runs or sqlite URLs.
    if settings.debug or "sqlite" in settings.database_url:
        await create_all_tables()

    # Long-lived collaborators shared by every message.
    broker = RabbitMQBroker(url=settings.rabbitmq_url)
    signal_processor = ScipySignalProcessor()
    model_service = MockModelService() if settings.use_mock_model else GANVGTLNetService()

    logger.info("Loading model (%s)…", model_service.model_version)
    await model_service.load_model()
    logger.info("Model ready.")

    await broker.connect()
    logger.info("Connected to RabbitMQ.")

    session_factory = get_session_factory()

    async def handle_message(payload: dict[str, Any]) -> None:
        """
        Process one message dequeued from PPG_QUEUE_NAME.

        Opens a fresh DB session for the message, runs the use case, commits
        on success, and rolls back then re-raises on failure.
        """
        signal_id = payload.get("id", "<unknown>")
        logger.info("Received message for signal_id=%s", signal_id)

        async with session_factory() as session:
            use_case = ProcessAndPredictUseCase(
                ppg_repo=SQLAlchemyPPGRepository(session),
                prediction_repo=SQLAlchemyPredictionRepository(session),
                signal_processor=signal_processor,
                model_service=model_service,
            )

            try:
                prediction = await use_case.execute(payload)
                await session.commit()
                logger.info(
                    "Prediction stored: id=%s SBP=%.1f DBP=%.1f",
                    prediction.id,
                    prediction.predicted_sbp,
                    prediction.predicted_dbp,
                )
            except Exception as exc:
                await session.rollback()
                # logger.exception attaches the traceback (same as exc_info=True).
                logger.exception("Failed to process signal_id=%s: %s", signal_id, exc)
                # Re-raise so the broker's consume loop sees the failure.
                raise

    logger.info("Listening on queue '%s'… (Ctrl+C to stop)", PPG_QUEUE_NAME)
    try:
        await broker.consume(queue_name=PPG_QUEUE_NAME, handler=handle_message)
    finally:
        # Nested so the engine is still disposed when the broker disconnect
        # itself fails; the success message is only reached if both succeed.
        try:
            await broker.disconnect()
        finally:
            await dispose_engine()
        logger.info("Consumer shut down cleanly.")
|
|
|
|
def main() -> None:
    """
    CLI entry point.

    Creates an event loop, installs a SIGINT handler that cancels every task
    on that loop, and runs ``run()`` until it completes or is cancelled. The
    loop is always closed before returning.
    """
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when
    # no loop is running.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    def _cancel_all_tasks() -> None:
        """Cancel every task on the loop; must be called from the loop itself."""
        logger.info("Interrupt received — shutting down consumer…")
        for task in asyncio.all_tasks(loop):
            task.cancel()

    def _handle_sigint(*_: Any) -> None:
        """Raw signal handler: hand the cancellation over to the loop safely."""
        # A plain signal handler must not touch the loop directly;
        # call_soon_threadsafe also wakes the loop if it is waiting on I/O.
        loop.call_soon_threadsafe(_cancel_all_tasks)

    try:
        # Preferred path: the callback runs inside the loop and may interact
        # with it freely.
        loop.add_signal_handler(signal.SIGINT, _cancel_all_tasks)
    except NotImplementedError:
        # Event loops without add_signal_handler support (e.g. on Windows).
        signal.signal(signal.SIGINT, _handle_sigint)

    try:
        loop.run_until_complete(run())
    except asyncio.CancelledError:
        # Expected outcome of a SIGINT-triggered shutdown.
        pass
    finally:
        loop.close()
|
|
|
|
# Start the consumer only when this module is executed as a script
# (e.g. ``python -m src.interface.consumer.run_consumer``), not on import.
if __name__ == "__main__":
    main()
|
|