#!/usr/bin/env python3 """ WebSocket Test Client — Monitor what the hub is broadcasting No external dependencies beyond websockets (already installed) Usage: python3 test_websocket.py [--subscribe|--publish] [--space ASSET_NAME] """ import asyncio import json import sys import websockets from datetime import datetime async def test_subscribe(hub_url: str): """Listen to what the hub is broadcasting.""" print(f"[*] Connecting to hub subscriber at {hub_url}/ws/subscribe") try: async with websockets.connect(f"{hub_url}/ws/subscribe") as ws: print(f"[✓] Connected! Listening for metrics updates...\n") count = 0 while True: try: msg = await asyncio.wait_for(ws.recv(), timeout=10.0) count += 1 data = json.loads(msg) ts = datetime.now().strftime("%H:%M:%S") print(f"[{ts}] Message #{count}:") print(f" {json.dumps(data, indent=2)}\n") except asyncio.TimeoutError: print("[!] No messages received for 10 seconds...") print(" → Asset spaces may not be connected yet") sys.exit(1) except Exception as e: print(f"[✗] Connection failed: {e}") print(f" Make sure hub is running and accessible at {hub_url}") sys.exit(1) async def test_publish(hub_url: str, space_name: str): """Send a test metric to the hub.""" print(f"[*] Connecting to hub publisher for space: {space_name}") test_message = { "training": { "training_steps": 9999, "actor_loss": 0.123, "critic_loss": 0.456, "avn_loss": 0.789, "avn_accuracy": 0.95, }, "voting": { "dominant_signal": "BUY", "buy_count": 42, "sell_count": 18, } } try: uri = f"{hub_url}/ws/publish/{space_name}" print(f"[*] Connecting to {uri}") async with websockets.connect(uri) as ws: print(f"[✓] Connected! Sending test message...") await ws.send(json.dumps(test_message)) print(f"[✓] Sent:\n{json.dumps(test_message, indent=2)}") # Keep connection open for 5 seconds print(f"[*] Keeping connection open for 5 seconds...") await asyncio.sleep(5) print(f"[✓] Done!") except Exception as e: print(f"[✗] Error: {e}") sys.exit(1) async def main(): # Default hub URL (adjust if needed) hub_url = "ws://127.0.0.1:7860" if len(sys.argv) > 1: if "--subscribe" in sys.argv: print("=" * 60) print("QUASAR Hub WebSocket Monitor (Subscribe Mode)") print("=" * 60) await test_subscribe(hub_url) elif "--publish" in sys.argv: space_name = "TEST_ASSET" if "--space" in sys.argv: idx = sys.argv.index("--space") if idx + 1 < len(sys.argv): space_name = sys.argv[idx + 1] print("=" * 60) print(f"QUASAR Hub WebSocket Test (Publish Mode)") print("=" * 60) await test_publish(hub_url, space_name) else: print_usage() else: print_usage() def print_usage(): print(""" ╔════════════════════════════════════════════════════════════════╗ ║ QUASAR WebSocket Test Tool v1.0 ║ ╚════════════════════════════════════════════════════════════════╝ USAGE: # Monitor what hub is broadcasting (metrics from all spaces) python3 test_websocket.py --subscribe # Send a test metric to hub (publish as a space) python3 test_websocket.py --publish --space TEST_ASSET # Send test metrics with a different space name python3 test_websocket.py --publish --space V100_1h EXAMPLE WORKFLOW: Terminal 1 (Monitor hub): $ python3 test_websocket.py --subscribe [✓] Connected! Listening for metrics updates... Terminal 2 (Send test data): $ python3 test_websocket.py --publish --space V100_1h [✓] Connected! Sending test message... [✓] Sent: { "training": {...}, "voting": {...} } Terminal 1 (should see the message): [12:34:56] Message #1: { "space_name": "V100_1h", "training": {...}, "voting": {...} } TROUBLESHOOTING: "Connection refused" → Hub not running on port 7860 $ curl http://127.0.0.1:7860/api/health No messages on subscribe → Asset spaces not connected Check if asset spaces are running and sending data "Module not found: websockets" → Install it $ pip install websockets """) if __name__ == "__main__": asyncio.run(main())