| | """ |
| | WebSocket API for Integration Services |
| | |
| | This module provides WebSocket endpoints for integration services |
| | including HuggingFace AI models and persistence operations. |
| | """ |
| |
|
| | import asyncio |
| | from datetime import datetime |
| | from typing import Any, Dict |
| | from fastapi import APIRouter, WebSocket, WebSocketDisconnect |
| | import logging |
| |
|
| | from backend.services.ws_service_manager import ws_manager, ServiceType |
| | from backend.services.hf_registry import HFRegistry |
| | from backend.services.hf_client import HFClient |
| | from backend.services.persistence_service import PersistenceService |
| | from config import Config |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | router = APIRouter() |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class IntegrationStreamers: |
| | """Handles data streaming for integration services""" |
| |
|
| | def __init__(self): |
| | self.config = Config() |
| | try: |
| | self.hf_registry = HFRegistry() |
| | except: |
| | self.hf_registry = None |
| | logger.warning("HFRegistry not available") |
| |
|
| | try: |
| | self.hf_client = HFClient() |
| | except: |
| | self.hf_client = None |
| | logger.warning("HFClient not available") |
| |
|
| | try: |
| | self.persistence_service = PersistenceService() |
| | except: |
| | self.persistence_service = None |
| | logger.warning("PersistenceService not available") |
| |
|
| | |
| | |
| | |
| |
|
| | async def stream_hf_registry_status(self): |
| | """Stream HuggingFace registry status""" |
| | if not self.hf_registry: |
| | return None |
| |
|
| | try: |
| | status = self.hf_registry.get_status() |
| | if status: |
| | return { |
| | "total_models": status.get("total_models", 0), |
| | "total_datasets": status.get("total_datasets", 0), |
| | "available_models": status.get("available_models", []), |
| | "available_datasets": status.get("available_datasets", []), |
| | "last_refresh": status.get("last_refresh"), |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming HF registry status: {e}") |
| | return None |
| |
|
| | async def stream_hf_model_usage(self): |
| | """Stream HuggingFace model usage statistics""" |
| | if not self.hf_client: |
| | return None |
| |
|
| | try: |
| | usage = self.hf_client.get_usage_stats() |
| | if usage: |
| | return { |
| | "total_requests": usage.get("total_requests", 0), |
| | "successful_requests": usage.get("successful_requests", 0), |
| | "failed_requests": usage.get("failed_requests", 0), |
| | "average_latency": usage.get("average_latency"), |
| | "model_usage": usage.get("model_usage", {}), |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming HF model usage: {e}") |
| | return None |
| |
|
| | async def stream_sentiment_results(self): |
| | """Stream real-time sentiment analysis results""" |
| | if not self.hf_client: |
| | return None |
| |
|
| | try: |
| | |
| | results = self.hf_client.get_recent_results() |
| | if results: |
| | return { |
| | "sentiment_results": results, |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming sentiment results: {e}") |
| | return None |
| |
|
| | async def stream_model_events(self): |
| | """Stream model loading and unloading events""" |
| | if not self.hf_registry: |
| | return None |
| |
|
| | try: |
| | events = self.hf_registry.get_recent_events() |
| | if events: |
| | return { |
| | "model_events": events, |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming model events: {e}") |
| | return None |
| |
|
| | |
| | |
| | |
| |
|
| | async def stream_persistence_status(self): |
| | """Stream persistence service status""" |
| | if not self.persistence_service: |
| | return None |
| |
|
| | try: |
| | status = self.persistence_service.get_status() |
| | if status: |
| | return { |
| | "storage_location": status.get("storage_location"), |
| | "total_records": status.get("total_records", 0), |
| | "storage_size": status.get("storage_size"), |
| | "last_save": status.get("last_save"), |
| | "active_writers": status.get("active_writers", 0), |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming persistence status: {e}") |
| | return None |
| |
|
| | async def stream_save_events(self): |
| | """Stream data save events""" |
| | if not self.persistence_service: |
| | return None |
| |
|
| | try: |
| | events = self.persistence_service.get_recent_saves() |
| | if events: |
| | return { |
| | "save_events": events, |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming save events: {e}") |
| | return None |
| |
|
| | async def stream_export_progress(self): |
| | """Stream export operation progress""" |
| | if not self.persistence_service: |
| | return None |
| |
|
| | try: |
| | progress = self.persistence_service.get_export_progress() |
| | if progress: |
| | return { |
| | "export_operations": progress, |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming export progress: {e}") |
| | return None |
| |
|
| | async def stream_backup_events(self): |
| | """Stream backup creation events""" |
| | if not self.persistence_service: |
| | return None |
| |
|
| | try: |
| | backups = self.persistence_service.get_recent_backups() |
| | if backups: |
| | return { |
| | "backup_events": backups, |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming backup events: {e}") |
| | return None |
| |
|
| |
|
| | |
| | integration_streamers = IntegrationStreamers() |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | async def start_integration_streams(): |
| | """Start all integration stream tasks""" |
| | logger.info("Starting integration WebSocket streams") |
| |
|
| | tasks = [ |
| | |
| | asyncio.create_task(ws_manager.start_service_stream( |
| | ServiceType.HUGGINGFACE, |
| | integration_streamers.stream_hf_registry_status, |
| | interval=60.0 |
| | )), |
| |
|
| | |
| | asyncio.create_task(ws_manager.start_service_stream( |
| | ServiceType.PERSISTENCE, |
| | integration_streamers.stream_persistence_status, |
| | interval=30.0 |
| | )), |
| | ] |
| |
|
| | await asyncio.gather(*tasks, return_exceptions=True) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | @router.websocket("/ws/integration") |
| | async def websocket_integration_endpoint(websocket: WebSocket): |
| | """ |
| | Unified WebSocket endpoint for all integration services |
| | |
| | Connection URL: ws://host:port/ws/integration |
| | |
| | After connecting, send subscription messages: |
| | { |
| | "action": "subscribe", |
| | "service": "huggingface" | "persistence" | "all" |
| | } |
| | |
| | To unsubscribe: |
| | { |
| | "action": "unsubscribe", |
| | "service": "service_name" |
| | } |
| | """ |
| | connection = await ws_manager.connect(websocket) |
| |
|
| | try: |
| | while True: |
| | data = await websocket.receive_json() |
| | await ws_manager.handle_client_message(connection, data) |
| |
|
| | except WebSocketDisconnect: |
| | logger.info(f"Integration client disconnected: {connection.client_id}") |
| | except Exception as e: |
| | logger.error(f"Integration WebSocket error: {e}") |
| | finally: |
| | await ws_manager.disconnect(connection.client_id) |
| |
|
| |
|
| | @router.websocket("/ws/huggingface") |
| | async def websocket_huggingface(websocket: WebSocket): |
| | """ |
| | Dedicated WebSocket endpoint for HuggingFace services |
| | |
| | Auto-subscribes to huggingface service |
| | """ |
| | connection = await ws_manager.connect(websocket) |
| | connection.subscribe(ServiceType.HUGGINGFACE) |
| |
|
| | try: |
| | while True: |
| | data = await websocket.receive_json() |
| | await ws_manager.handle_client_message(connection, data) |
| | except WebSocketDisconnect: |
| | logger.info(f"HuggingFace client disconnected: {connection.client_id}") |
| | except Exception as e: |
| | logger.error(f"HuggingFace WebSocket error: {e}") |
| | finally: |
| | await ws_manager.disconnect(connection.client_id) |
| |
|
| |
|
| | @router.websocket("/ws/persistence") |
| | async def websocket_persistence(websocket: WebSocket): |
| | """ |
| | Dedicated WebSocket endpoint for persistence service |
| | |
| | Auto-subscribes to persistence service |
| | """ |
| | connection = await ws_manager.connect(websocket) |
| | connection.subscribe(ServiceType.PERSISTENCE) |
| |
|
| | try: |
| | while True: |
| | data = await websocket.receive_json() |
| | await ws_manager.handle_client_message(connection, data) |
| | except WebSocketDisconnect: |
| | logger.info(f"Persistence client disconnected: {connection.client_id}") |
| | except Exception as e: |
| | logger.error(f"Persistence WebSocket error: {e}") |
| | finally: |
| | await ws_manager.disconnect(connection.client_id) |
| |
|
| |
|
| | @router.websocket("/ws/ai") |
| | async def websocket_ai(websocket: WebSocket): |
| | """ |
| | Dedicated WebSocket endpoint for AI/ML operations (alias for HuggingFace) |
| | |
| | Auto-subscribes to huggingface service |
| | """ |
| | connection = await ws_manager.connect(websocket) |
| | connection.subscribe(ServiceType.HUGGINGFACE) |
| |
|
| | try: |
| | while True: |
| | data = await websocket.receive_json() |
| | await ws_manager.handle_client_message(connection, data) |
| | except WebSocketDisconnect: |
| | logger.info(f"AI client disconnected: {connection.client_id}") |
| | except Exception as e: |
| | logger.error(f"AI WebSocket error: {e}") |
| | finally: |
| | await ws_manager.disconnect(connection.client_id) |
| |
|