KarlQuant commited on
Commit
a3eaedf
Β·
verified Β·
1 Parent(s): 0ec6d5e

Update websocket_hub.py

Browse files
Files changed (1) hide show
  1. websocket_hub.py +82 -1
websocket_hub.py CHANGED
@@ -462,6 +462,41 @@ _axrvi_rankings: List[dict] = []
462
  _axrvi_rankings_ts: float = 0.0
463
  _AXRVI_RANKINGS_TTL: float = 30.0 # seconds before falling back to snapshot scoring
464
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
465
 
466
  # ══════════════════════════════════════════════════════════════════════════════════════
467
  # SECTION 4 β€” FASTAPI APPLICATION
@@ -605,6 +640,49 @@ async def ws_subscriber_endpoint(websocket: WebSocket):
605
  await manager.unregister_subscriber(websocket)
606
 
607
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
608
  # ══════════════════════════════════════════════════════════════════════════════════════
609
  # SECTION 6 β€” REST API (READ-ONLY)
610
  # ══════════════════════════════════════════════════════════════════════════════════════
@@ -704,7 +782,7 @@ async def api_trades_closed(limit: int = 50):
704
  @app.get("/api/health")
705
  async def api_health():
706
  """Service health β€” includes live trade counts and log-file inventory."""
707
- state = _trade_parser.get_state()
708
  return JSONResponse({
709
  "service": "websocket_hub",
710
  "version": "v2.2-ranker-logs",
@@ -1128,6 +1206,9 @@ async def receive_axrvi_rankings(request: Request):
1128
  f"top={rankings[0].get('space_name','?')} score={rankings[0].get('score',0):.4f}"
1129
  if rankings else "[AXRVI Rankings] Received empty list"
1130
  )
 
 
 
1131
  return JSONResponse({"ok": True, "count": len(rankings), "ts": _axrvi_rankings_ts})
1132
 
1133
 
 
462
  _axrvi_rankings_ts: float = 0.0
463
  _AXRVI_RANKINGS_TTL: float = 30.0 # seconds before falling back to snapshot scoring
464
 
465
+ # ── Top-3 WebSocket client registry ───────────────────────────────────────────────────
466
+ # top3_client.py connects here and receives top3_rankings broadcasts whenever the
467
+ # Executo ranker POSTs new rankings via POST /api/axrvi/rankings.
468
+ _top3_clients: Set[WebSocket] = set()
469
+ _top3_clients_lock = asyncio.Lock()
470
+
471
+
472
+ async def _broadcast_top3_rankings(rankings: List[dict]) -> None:
473
+ """
474
+ Broadcast a top3_rankings message to all /ws/top3 subscribers.
475
+ Called immediately after /api/axrvi/rankings receives a fresh ranking list.
476
+ Dead connections are pruned automatically.
477
+ """
478
+ if not _top3_clients:
479
+ return
480
+ msg = json.dumps({
481
+ "type": "top3_rankings",
482
+ "rankings": rankings,
483
+ "total_assets": len(rankings),
484
+ "hub_timestamp": time.time(),
485
+ })
486
+ dead: list = []
487
+ async with _top3_clients_lock:
488
+ clients = list(_top3_clients)
489
+ for ws in clients:
490
+ try:
491
+ await ws.send_text(msg)
492
+ except Exception:
493
+ dead.append(ws)
494
+ if dead:
495
+ async with _top3_clients_lock:
496
+ for ws in dead:
497
+ _top3_clients.discard(ws)
498
+ logger.debug(f"[top3] Pruned {len(dead)} dead client(s)")
499
+
500
 
501
  # ══════════════════════════════════════════════════════════════════════════════════════
502
  # SECTION 4 β€” FASTAPI APPLICATION
 
640
  await manager.unregister_subscriber(websocket)
641
 
642
 
643
+ @app.websocket("/ws/top3")
644
+ async def ws_top3_endpoint(websocket: WebSocket):
645
+ """
646
+ /ws/top3 β€” consumed by top3_client.py (MT5 bridge).
647
+
648
+ Sends a top3_rankings message immediately on connect (replay of the latest
649
+ known ranking so the client does not have to wait for the next ranker cycle),
650
+ then keeps the socket open to receive subsequent broadcasts triggered by
651
+ POST /api/axrvi/rankings.
652
+
653
+ Message format:
654
+ {"type": "top3_rankings", "rankings": [...], "total_assets": N, "hub_timestamp": T}
655
+ """
656
+ await websocket.accept()
657
+ async with _top3_clients_lock:
658
+ _top3_clients.add(websocket)
659
+ logger.info(f"πŸ“ˆ top3 client connected (total={len(_top3_clients)})")
660
+
661
+ # ── Replay latest rankings immediately so client doesn't wait up to 5 s ───
662
+ if _axrvi_rankings:
663
+ try:
664
+ await websocket.send_text(json.dumps({
665
+ "type": "top3_rankings",
666
+ "rankings": _axrvi_rankings,
667
+ "total_assets": len(_axrvi_rankings),
668
+ "hub_timestamp": _axrvi_rankings_ts,
669
+ }))
670
+ except Exception:
671
+ pass
672
+
673
+ try:
674
+ while True:
675
+ await websocket.receive_text() # keep-alive β€” client sends nothing
676
+ except WebSocketDisconnect:
677
+ pass
678
+ except Exception as e:
679
+ logger.error(f"[top3] Client error: {e}")
680
+ finally:
681
+ async with _top3_clients_lock:
682
+ _top3_clients.discard(websocket)
683
+ logger.info(f"πŸ“‰ top3 client disconnected (remaining={len(_top3_clients)})")
684
+
685
+
686
  # ══════════════════════════════════════════════════════════════════════════════════════
687
  # SECTION 6 β€” REST API (READ-ONLY)
688
  # ══════════════════════════════════════════════════════════════════════════════════════
 
782
  @app.get("/api/health")
783
  async def api_health():
784
  """Service health β€” includes live trade counts and log-file inventory."""
785
+ state = _hub_trades.get_state()
786
  return JSONResponse({
787
  "service": "websocket_hub",
788
  "version": "v2.2-ranker-logs",
 
1206
  f"top={rankings[0].get('space_name','?')} score={rankings[0].get('score',0):.4f}"
1207
  if rankings else "[AXRVI Rankings] Received empty list"
1208
  )
1209
+ # Broadcast to all connected top3_client.py instances immediately
1210
+ if rankings:
1211
+ asyncio.create_task(_broadcast_top3_rankings(rankings))
1212
  return JSONResponse({"ok": True, "count": len(rankings), "ts": _axrvi_rankings_ts})
1213
 
1214