LIBRE / src /interface /consumer /run_consumer.py
RyZ
feat: adding full working local ETL Pipeline
e391a84
Raw
History Blame Contribute Delete
5.7 kB
"""
interface/consumer/run_consumer.py
────────────────────────────────────
Standalone async consumer — Entry Point 2 (Google Colab / local).
This script:
1. Wires all dependencies (same repos, broker, processor, model as the API).
2. Connects to RabbitMQ.
3. Runs ProcessAndPredictUseCase for every message consumed.
Usage:
python -m src.interface.consumer.run_consumer
Colab usage:
!python -m src.interface.consumer.run_consumer
The consumer runs indefinitely until interrupted (Ctrl+C or Colab cell stop).
"""
from __future__ import annotations
import asyncio
import signal
import sys
from typing import Any
from src.application.use_cases.process_and_predict import ProcessAndPredictUseCase
from src.infrastructure.database.connection import create_all_tables, dispose_engine, get_session_factory
from src.infrastructure.database.repositories.ppg_repository import SQLAlchemyPPGRepository
from src.infrastructure.database.repositories.prediction_repository import SQLAlchemyPredictionRepository
from src.infrastructure.messaging.rabbitmq_broker import RabbitMQBroker
from src.infrastructure.model.gan_vgtlnet_service import GANVGTLNetService
from src.infrastructure.model.mock_model_service import MockModelService
from src.infrastructure.processing.scipy_signal_processor import ScipySignalProcessor
from src.shared.config import get_settings
from src.shared.constants import PPG_QUEUE_NAME
from src.shared.logger import get_logger
# Module-level logger for this entry point, obtained from the project's
# get_logger helper and named after this module.
logger = get_logger(__name__)
async def run() -> None:
    """
    Bootstrap and run the message queue consumer.

    Dependency wiring mirrors the API's dependencies.py — the use case is
    identical, only the bootstrapping mechanism differs.

    Sequence: read settings and log a start-up banner, optionally create the
    DB tables, build the broker / signal processor / model service, load the
    model, connect to the broker, then consume ``PPG_QUEUE_NAME`` until
    ``broker.consume`` returns or raises (task cancellation included).

    Cleanup also runs when start-up fails part-way: the broker is
    disconnected only if ``connect()`` succeeded, and the DB engine is
    disposed even when the disconnect itself raises.

    Raises:
        Exception: anything raised while loading the model, connecting or
            consuming is propagated to the caller after cleanup has run.
    """
    settings = get_settings()

    banner = "=" * 60
    logger.info(banner)
    logger.info("BP Monitoring Pipeline — Consumer starting up")
    # Keep only the part after the last "@" so credentials embedded in the
    # URLs never reach the log output.
    logger.info("Database : %s", settings.database_url.split("@")[-1])
    logger.info("Broker : %s", settings.rabbitmq_url.split("@")[-1])
    logger.info("Mock Model: %s", settings.use_mock_model)
    logger.info(banner)

    # ── Create DB tables (dev/SQLite) ─────────────────────────────────────────
    if settings.debug or "sqlite" in settings.database_url:
        await create_all_tables()

    # ── Wire dependencies ─────────────────────────────────────────────────────
    broker = RabbitMQBroker(url=settings.rabbitmq_url)
    signal_processor = ScipySignalProcessor()
    model_service = MockModelService() if settings.use_mock_model else GANVGTLNetService()
    # Obtained before the guarded section below so that dispose_engine() in the
    # cleanup path is only ever called after the session factory was requested,
    # exactly as in the unguarded start-up sequence.
    session_factory = get_session_factory()

    # ── Message handler ───────────────────────────────────────────────────────
    async def handle_message(payload: dict[str, Any]) -> None:
        """
        Invoked for each message dequeued from PPG_QUEUE_NAME.

        Creates a new DB session per message (request-scoped isolation),
        commits on success, and rolls back and re-raises on failure.
        """
        signal_id = payload.get("id", "<unknown>")
        logger.info("Received message for signal_id=%s", signal_id)
        async with session_factory() as session:
            ppg_repo = SQLAlchemyPPGRepository(session)
            prediction_repo = SQLAlchemyPredictionRepository(session)
            use_case = ProcessAndPredictUseCase(
                ppg_repo=ppg_repo,
                prediction_repo=prediction_repo,
                signal_processor=signal_processor,
                model_service=model_service,
            )
            try:
                prediction = await use_case.execute(payload)
                await session.commit()
                logger.info(
                    "Prediction stored: id=%s SBP=%.1f DBP=%.1f",
                    prediction.id,
                    prediction.predicted_sbp,
                    prediction.predicted_dbp,
                )
            except Exception as exc:
                await session.rollback()
                logger.error(
                    "Failed to process signal_id=%s: %s", signal_id, exc, exc_info=True
                )
                raise  # re-raise so the broker nacks the message

    # ── Load model, connect, consume ──────────────────────────────────────────
    broker_connected = False
    try:
        logger.info("Loading model (%s)…", model_service.model_version)
        await model_service.load_model()
        logger.info("Model ready.")

        await broker.connect()
        broker_connected = True
        logger.info("Connected to RabbitMQ.")

        logger.info("Listening on queue '%s'… (Ctrl+C to stop)", PPG_QUEUE_NAME)
        await broker.consume(queue_name=PPG_QUEUE_NAME, handler=handle_message)
    finally:
        try:
            # Only tear down a connection that was actually established.
            if broker_connected:
                await broker.disconnect()
        finally:
            # Must run even if the disconnect above raised.
            await dispose_engine()
        logger.info("Consumer shut down cleanly.")
def main() -> None:
    """
    CLI entry point.

    Creates a dedicated event loop, installs a SIGINT handler that cancels
    every task on that loop, and drives ``run()`` to completion. The
    cancellation raised by an interrupt is swallowed so Ctrl+C exits quietly
    after ``run()`` has executed its own cleanup.
    """
    # asyncio.get_event_loop() without a running loop is deprecated, so the
    # loop is created and registered explicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    def _cancel_all_tasks() -> None:
        """Cancel every task on the loop; executed inside the loop itself."""
        logger.info("Interrupt received — shutting down consumer…")
        for task in asyncio.all_tasks(loop):
            task.cancel()

    # Handle Ctrl+C gracefully
    def _handle_sigint(*_: Any) -> None:
        # Handlers installed with signal.signal() must not touch the loop
        # directly; hand the work over via the thread-safe scheduling call,
        # which also wakes the loop if it is blocked waiting for I/O.
        loop.call_soon_threadsafe(_cancel_all_tasks)

    previous_handler = signal.signal(signal.SIGINT, _handle_sigint)
    try:
        loop.run_until_complete(run())
    except asyncio.CancelledError:
        pass
    finally:
        # Restore the earlier handler before closing so a late Ctrl+C can no
        # longer schedule work on a closed loop. signal.signal() returns None
        # when the previous handler was not installed from Python, and None
        # cannot be passed back in.
        if previous_handler is not None:
            signal.signal(signal.SIGINT, previous_handler)
        loop.close()
# Start the consumer only when this module is executed as a script
# (e.g. ``python -m src.interface.consumer.run_consumer``), not on import.
if __name__ == "__main__":
    main()