| from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import HTMLResponse |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.templating import Jinja2Templates |
| import json |
| import uvicorn |
|
|
| app = FastAPI() |
|
|
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| app.mount("/static", StaticFiles(directory="static"), name="static") |
|
|
| |
| templates = Jinja2Templates(directory="templates") |
|
|
| with open("data.json", "r", encoding="utf8") as file: |
| maps_data = json.load(file) |
|
|
|
|
| @app.get("/", response_class=HTMLResponse) |
| async def read_index(request: Request): |
| return templates.TemplateResponse("index.html", {"request": request, "maps": maps_data}) |
|
|
|
|
| @app.get("/map/{normalized_name}", response_class=HTMLResponse) |
| async def get_map_by_normalized_name(request: Request, normalized_name: str): |
| map_entry = maps_data.get(normalized_name) |
| if map_entry: |
| filtered_maps = [map_entry] |
| else: |
| filtered_maps = [] |
| return templates.TemplateResponse("map.html", {"request": request, "maps": filtered_maps}) |
|
|
|
|
| class ConnectionManager: |
| def __init__(self): |
| self.groups: dict[str, set[WebSocket]] = {} |
| self.connections: dict[WebSocket, str] = {} |
|
|
| async def connect(self, websocket: WebSocket): |
| await websocket.accept() |
|
|
| def disconnect(self, websocket: WebSocket): |
| if websocket in self.connections: |
| group_name = self.connections[websocket] |
| self.remove_from_group(websocket, group_name) |
|
|
| async def join_group(self, websocket: WebSocket, group_name: str): |
| |
| if websocket in self.connections: |
| old_group = self.connections[websocket] |
| self.remove_from_group(websocket, old_group) |
| |
| |
| if group_name not in self.groups: |
| self.groups[group_name] = set() |
| self.groups[group_name].add(websocket) |
| self.connections[websocket] = group_name |
| print(f"Client joined group '{group_name}'") |
|
|
| def remove_from_group(self, websocket: WebSocket, group_name: str): |
| if group_name in self.groups and websocket in self.groups[group_name]: |
| self.groups[group_name].remove(websocket) |
| if len(self.groups[group_name]) == 0: |
| del self.groups[group_name] |
| print(f"Group '{group_name}' deleted (no members)") |
| if websocket in self.connections: |
| del self.connections[websocket] |
| print(f"Client removed from group '{group_name}'") |
|
|
| async def broadcast_to_group(self, message: str, group_name: str): |
| if group_name in self.groups: |
| for connection in self.groups[group_name]: |
| await connection.send_text(message) |
|
|
|
|
| manager = ConnectionManager() |
|
|
|
|
| @app.websocket("/ws") |
| async def websocket_endpoint(websocket: WebSocket): |
| await manager.connect(websocket) |
| try: |
| while True: |
| data = await websocket.receive_text() |
| try: |
| message = json.loads(data) |
| |
| if message.get('type') == 'join': |
| group_name = message.get('group') |
| if group_name: |
| await manager.join_group(websocket, group_name) |
| else: |
| error = json.dumps({ |
| "type": "error", |
| "message": "Missing group name in join request" |
| }) |
| await websocket.send_text(error) |
| |
| |
| else: |
| if websocket in manager.connections: |
| group_name = manager.connections[websocket] |
| await manager.broadcast_to_group(data, group_name) |
| else: |
| error = json.dumps({ |
| "type": "error", |
| "message": "Join a group before sending messages" |
| }) |
| await websocket.send_text(error) |
| |
| except json.JSONDecodeError: |
| |
| if websocket in manager.connections: |
| group_name = manager.connections[websocket] |
| await manager.broadcast_to_group(data, group_name) |
| |
| except WebSocketDisconnect: |
| manager.disconnect(websocket) |
|
|
|
|
| if __name__ == "__main__": |
| uvicorn.run(app, host="0.0.0.0", port=8000) |