| | """ |
| | WebSocket API for Monitoring Services |
| | |
| | This module provides WebSocket endpoints for real-time monitoring data |
| | including health checks, pool management, and scheduler status. |
| | """ |
| |
|
| | import asyncio |
| | from datetime import datetime |
| | from typing import Any, Dict |
| | from fastapi import APIRouter, WebSocket, WebSocketDisconnect |
| | import logging |
| |
|
| | from backend.services.ws_service_manager import ws_manager, ServiceType |
| | from monitoring.health_checker import HealthChecker |
| | from monitoring.source_pool_manager import SourcePoolManager |
| | from monitoring.scheduler import TaskScheduler |
| | from config import Config |
| |
|
# Module-scoped logger, named after this module's import path.
logger = logging.getLogger(name=__name__)
| |
|
# Router carrying every monitoring WebSocket route declared in this module.
router: APIRouter = APIRouter()
| |
|
| |
|
| | |
| | |
| | |
| |
|
class MonitoringStreamers:
    """Build snapshot payloads for the monitoring WebSocket streams.

    Each ``stream_*`` coroutine queries one monitoring component and returns a
    dict stamped with a naive UTC ISO-8601 ``timestamp``. It returns ``None``
    when there is nothing to send: the component is unavailable, it reported
    no data, or an error occurred (which is logged with a traceback).
    """

    def __init__(self):
        self.config = Config()
        self.health_checker = HealthChecker()

        # The pool manager and scheduler are optional. If either fails to
        # construct, its streams return None instead of propagating the error.
        # ``except Exception`` (not a bare except) lets SystemExit and
        # KeyboardInterrupt through.
        try:
            self.pool_manager = SourcePoolManager()
        except Exception:
            self.pool_manager = None
            logger.warning("SourcePoolManager not available", exc_info=True)

        try:
            self.scheduler = TaskScheduler()
        except Exception:
            self.scheduler = None
            logger.warning("TaskScheduler not available", exc_info=True)

    @staticmethod
    def _timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        The output format matches ``datetime.utcnow().isoformat()`` (no
        ``+00:00`` suffix) but avoids the deprecated ``utcnow()`` call.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    # ------------------------------------------------------------------
    # Health checker streams
    # ------------------------------------------------------------------

    async def stream_health_status(self):
        """Return an overall health summary for all providers, or None."""
        try:
            health_data = await self.health_checker.check_all_providers()
            if health_data:
                return {
                    "overall_health": health_data.get("overall_health", "unknown"),
                    "healthy_count": health_data.get("healthy_count", 0),
                    "unhealthy_count": health_data.get("unhealthy_count", 0),
                    "total_providers": health_data.get("total_providers", 0),
                    "providers": health_data.get("providers", {}),
                    "timestamp": self._timestamp(),
                }
        except Exception as e:
            logger.exception("Error streaming health status: %s", e)
        return None

    async def stream_provider_health(self):
        """Return the providers whose ``status`` is not "healthy", or None.

        None is returned when the check yields no ``providers`` key or every
        provider is healthy.
        """
        try:
            health_data = await self.health_checker.check_all_providers()
            if health_data and "providers" in health_data:
                issues = {
                    name: status
                    for name, status in health_data["providers"].items()
                    if status.get("status") != "healthy"
                }
                if issues:
                    return {
                        "providers_with_issues": issues,
                        "timestamp": self._timestamp(),
                    }
        except Exception as e:
            logger.exception("Error streaming provider health: %s", e)
        return None

    async def stream_health_alerts(self):
        """Return alerts for providers in "critical" or "unhealthy" state.

        "critical" providers get ``alert_level="critical"`` and "unhealthy"
        ones get ``alert_level="warning"``. None is returned when no provider
        is in either state.
        """
        try:
            health_data = await self.health_checker.check_all_providers()
            if health_data:
                alerts = []
                for name, status in health_data.get("providers", {}).items():
                    state = status.get("status")
                    if state == "critical":
                        level = "critical"
                    elif state == "unhealthy":
                        level = "warning"
                    else:
                        continue
                    alerts.append({
                        "provider": name,
                        "status": status,
                        "alert_level": level,
                    })

                if alerts:
                    return {
                        "alerts": alerts,
                        "total_alerts": len(alerts),
                        "timestamp": self._timestamp(),
                    }
        except Exception as e:
            logger.exception("Error streaming health alerts: %s", e)
        return None

    # ------------------------------------------------------------------
    # Source pool manager streams
    # ------------------------------------------------------------------

    async def stream_pool_status(self):
        """Return the pool manager's status summary, or None."""
        if not self.pool_manager:
            return None

        try:
            pool_data = self.pool_manager.get_status()
            if pool_data:
                return {
                    "pools": pool_data.get("pools", {}),
                    "active_sources": pool_data.get("active_sources", []),
                    "inactive_sources": pool_data.get("inactive_sources", []),
                    "failover_count": pool_data.get("failover_count", 0),
                    "timestamp": self._timestamp(),
                }
        except Exception as e:
            logger.exception("Error streaming pool status: %s", e)
        return None

    async def stream_failover_events(self):
        """Return recent failover events from the pool manager, or None."""
        if not self.pool_manager:
            return None

        try:
            events = self.pool_manager.get_recent_failovers()
            if events:
                return {
                    "failover_events": events,
                    "timestamp": self._timestamp(),
                }
        except Exception as e:
            logger.exception("Error streaming failover events: %s", e)
        return None

    async def stream_source_health(self):
        """Return per-source health reported by the pool manager, or None."""
        if not self.pool_manager:
            return None

        try:
            health_data = self.pool_manager.get_source_health()
            if health_data:
                return {
                    "source_health": health_data,
                    "timestamp": self._timestamp(),
                }
        except Exception as e:
            logger.exception("Error streaming source health: %s", e)
        return None

    # ------------------------------------------------------------------
    # Scheduler streams
    # ------------------------------------------------------------------

    async def stream_scheduler_status(self):
        """Return the scheduler's status summary, or None."""
        if not self.scheduler:
            return None

        try:
            status_data = self.scheduler.get_status()
            if status_data:
                return {
                    "running": status_data.get("running", False),
                    "total_jobs": status_data.get("total_jobs", 0),
                    "active_jobs": status_data.get("active_jobs", 0),
                    "jobs": status_data.get("jobs", []),
                    "timestamp": self._timestamp(),
                }
        except Exception as e:
            logger.exception("Error streaming scheduler status: %s", e)
        return None

    async def stream_job_executions(self):
        """Return recent job executions from the scheduler, or None."""
        if not self.scheduler:
            return None

        try:
            executions = self.scheduler.get_recent_executions()
            if executions:
                return {
                    "executions": executions,
                    "timestamp": self._timestamp(),
                }
        except Exception as e:
            logger.exception("Error streaming job executions: %s", e)
        return None

    async def stream_job_failures(self):
        """Return recent job failures from the scheduler, or None."""
        if not self.scheduler:
            return None

        try:
            failures = self.scheduler.get_recent_failures()
            if failures:
                return {
                    "failures": failures,
                    "timestamp": self._timestamp(),
                }
        except Exception as e:
            logger.exception("Error streaming job failures: %s", e)
        return None
| |
|
| |
|
| | |
# Shared, module-level instance whose bound stream_* coroutines are handed to
# ws_manager by start_monitoring_streams(). Constructed at import time.
monitoring_streamers: MonitoringStreamers = MonitoringStreamers()
| |
|
| |
|
| | |
| | |
| | |
| |
|
async def start_monitoring_streams():
    """Start the periodic monitoring streams and wait for them to finish.

    One ``ws_manager.start_service_stream`` task is created per service, with
    the payload producer and push interval (seconds) listed below. The tasks
    presumably loop until cancelled; confirm in ``ws_service_manager``.

    ``return_exceptions=True`` stops one failing stream from cancelling the
    others. A stream that ends with an exception is therefore logged here
    rather than silently dropped. Cancellation (a ``BaseException``, not an
    ``Exception``) is not reported as an error.
    """
    logger.info("Starting monitoring WebSocket streams")

    # (service, payload producer, interval in seconds)
    stream_specs = [
        (ServiceType.HEALTH_CHECKER, monitoring_streamers.stream_health_status, 30.0),
        (ServiceType.POOL_MANAGER, monitoring_streamers.stream_pool_status, 20.0),
        (ServiceType.SCHEDULER, monitoring_streamers.stream_scheduler_status, 15.0),
    ]

    tasks = [
        asyncio.create_task(
            ws_manager.start_service_stream(service, producer, interval=interval)
        )
        for service, producer, interval in stream_specs
    ]

    results = await asyncio.gather(*tasks, return_exceptions=True)

    for (service, _producer, _interval), result in zip(stream_specs, results):
        if isinstance(result, Exception):
            logger.error(
                "Monitoring stream for %s terminated with an error",
                service,
                exc_info=result,
            )
| |
|
| |
|
| | |
| | |
| | |
| |
|
async def _serve_monitoring_client(websocket: WebSocket, label: str, service=None):
    """Run the receive loop for one monitoring WebSocket client.

    Registers the socket with ``ws_manager`` and, if ``service`` is given,
    subscribes the connection to it. It then forwards every JSON message from
    the client to ``ws_manager.handle_client_message`` until the client
    disconnects or an error occurs. ``ws_manager.disconnect`` always runs on
    exit.

    Args:
        websocket: The incoming FastAPI WebSocket.
        label: Prefix for log messages, e.g. "Health monitoring".
        service: Optional ``ServiceType`` to auto-subscribe the client to.
    """
    connection = await ws_manager.connect(websocket)

    try:
        # Subscribe inside the try so a failing subscription still reaches
        # the finally clause and releases the registered connection.
        if service is not None:
            connection.subscribe(service)

        while True:
            message = await websocket.receive_json()
            await ws_manager.handle_client_message(connection, message)

    except WebSocketDisconnect:
        logger.info("%s client disconnected: %s", label, connection.client_id)
    except Exception as e:
        logger.exception("%s WebSocket error: %s", label, e)
    finally:
        await ws_manager.disconnect(connection.client_id)


@router.websocket("/ws/monitoring")
async def websocket_monitoring_endpoint(websocket: WebSocket):
    """
    Unified WebSocket endpoint for all monitoring services

    Connection URL: ws://host:port/ws/monitoring

    After connecting, send subscription messages:
    {
        "action": "subscribe",
        "service": "health_checker" | "pool_manager" | "scheduler" | "all"
    }

    To unsubscribe:
    {
        "action": "unsubscribe",
        "service": "service_name"
    }
    """
    await _serve_monitoring_client(websocket, "Monitoring")


@router.websocket("/ws/health")
async def websocket_health(websocket: WebSocket):
    """
    Dedicated WebSocket endpoint for health monitoring

    Auto-subscribes to health_checker service
    """
    await _serve_monitoring_client(
        websocket, "Health monitoring", ServiceType.HEALTH_CHECKER
    )


@router.websocket("/ws/pool_status")
async def websocket_pool_status(websocket: WebSocket):
    """
    Dedicated WebSocket endpoint for pool manager status

    Auto-subscribes to pool_manager service
    """
    await _serve_monitoring_client(
        websocket, "Pool status", ServiceType.POOL_MANAGER
    )


@router.websocket("/ws/scheduler_status")
async def websocket_scheduler_status(websocket: WebSocket):
    """
    Dedicated WebSocket endpoint for scheduler status

    Auto-subscribes to scheduler service
    """
    await _serve_monitoring_client(
        websocket, "Scheduler status", ServiceType.SCHEDULER
    )
| |
|