| | """Server that will listen for GET and POST requests from the client.""" |
| |
|
| | import time |
| | from typing import List |
| | from fastapi import FastAPI, File, Form, UploadFile, HTTPException |
| | from fastapi.responses import JSONResponse, Response |
| | import asyncio |
| | from common import SERVER_TMP_PATH, SEIZURE_DETECTION_MODEL_PATH |
| | from client_server_interface import FHEServer |
| | import logging |
| |
|
| | |
# Configure root logging at INFO level (basicConfig does nothing if the root
# logger already has handlers installed).
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by the request handlers.
logger = logging.getLogger(__name__)
| |
|
| | |
# Single FHEServer instance, built once at import time from
# SEIZURE_DETECTION_MODEL_PATH; the /run_fhe handler calls its `run` method.
FHE_SERVER = FHEServer(model_path=SEIZURE_DETECTION_MODEL_PATH)
| |
|
def get_server_file_path(name, user_id):
    """Get the correct temporary file path for the server.

    Args:
        name: Prefix identifying the kind of file (for example
            "encrypted_image", "evaluation_key" or "encrypted_output").
        user_id: Client identifier appended to the file name.

    Returns:
        ``SERVER_TMP_PATH / f"{name}_seizure_detection_{user_id}"``.

    Raises:
        ValueError: If the resulting file name contains a path separator or
            a NUL byte.
    """
    filename = f"{name}_seizure_detection_{user_id}"

    # user_id is taken directly from request form data by the handlers in
    # this file, so it must not be able to introduce extra path components
    # (e.g. "x/../../target") and escape SERVER_TMP_PATH.
    if any(forbidden in filename for forbidden in ("/", "\\", "\0")):
        raise ValueError("name and user_id must not contain path separators")

    return SERVER_TMP_PATH / filename
| |
|
| | |
# FastAPI application object; the route decorators below register their
# handlers on it.
app = FastAPI()
| |
|
@app.get("/")
def root():
    """Return the welcome payload served at the API root."""
    welcome = {"message": "Welcome to Your Seizure Detection FHE Server!"}
    return welcome
| |
|
@app.post("/send_input")
async def send_input(
    user_id: str = Form(),
    files: List[UploadFile] = File(),
):
    """Send the inputs to the server.

    Stores the two uploaded files under the server's temporary paths for
    ``user_id``: ``files[0]`` as the encrypted image and ``files[1]`` as the
    evaluation key. Any additional uploads are ignored.

    Args:
        user_id: Client identifier used to derive the storage paths.
        files: Uploaded files, encrypted image first, evaluation key second.

    Returns:
        A JSON response with a confirmation message and status 200.

    Raises:
        HTTPException: 400 if fewer than two files were uploaded, 500 on any
            other failure.
    """
    try:
        if len(files) < 2:
            raise HTTPException(
                status_code=400,
                detail="Expected two files: the encrypted image and the evaluation key",
            )

        encrypted_image_path = get_server_file_path("encrypted_image", user_id)
        evaluation_key_path = get_server_file_path("evaluation_key", user_id)

        encrypted_image_bytes = await files[0].read()
        evaluation_key_bytes = await files[1].read()

        def _store_inputs():
            # The builtin open() accepts any path-like object and always
            # returns a synchronous file, which is what a plain `with` and a
            # non-awaited write() require. (`async with path.open(...)` fails
            # on a pathlib.Path, whose file objects have no __aenter__.)
            with open(encrypted_image_path, "wb") as encrypted_image, open(
                evaluation_key_path, "wb"
            ) as evaluation_key:
                encrypted_image.write(encrypted_image_bytes)
                evaluation_key.write(evaluation_key_bytes)

        # Disk writes are blocking; run them in the default thread pool so the
        # event loop keeps serving other requests.
        await asyncio.get_running_loop().run_in_executor(None, _store_inputs)

        return JSONResponse(content={"message": "Input received successfully"}, status_code=200)
    except HTTPException:
        # Keep deliberate HTTP errors (such as the 400 above) from being
        # rewritten as 500 by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"Error in send_input: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
| |
|
# Background FHE executions, keyed by the task id returned to the client.
# The server has to hold these itself: asyncio keeps only weak references to
# tasks and asyncio.all_tasks() omits finished ones, so a completed run could
# otherwise never be reported. Entries are kept after completion so repeated
# status polls give the same answer.
# NOTE(review): the registry is never pruned; add eviction if the server is
# expected to run for a long time.
_FHE_TASKS = {}


@app.post("/run_fhe")
async def run_fhe(
    user_id: str = Form(),
):
    """Execute seizure detection on the encrypted input image using FHE.

    Reads the encrypted image and evaluation key previously stored for
    ``user_id``, then starts the FHE execution in the background and returns
    immediately. The background job writes the encrypted output to the
    server's temporary path for ``user_id``.

    Args:
        user_id: Client identifier used to derive the input and output paths.

    Returns:
        A JSON response containing a message and the ``task_id`` to pass to
        ``/fhe_status/{task_id}``.

    Raises:
        HTTPException: 500 if the inputs cannot be read or the job cannot be
            started.
    """
    try:
        encrypted_image_path = get_server_file_path("encrypted_image", user_id)
        evaluation_key_path = get_server_file_path("evaluation_key", user_id)
        encrypted_output_path = get_server_file_path("encrypted_output", user_id)

        def _read_inputs():
            # Builtin open() works with any path-like object and returns a
            # synchronous file, so no async context manager is required.
            with open(encrypted_image_path, "rb") as encrypted_image_file, open(
                evaluation_key_path, "rb"
            ) as evaluation_key_file:
                return encrypted_image_file.read(), evaluation_key_file.read()

        loop = asyncio.get_running_loop()
        encrypted_image, evaluation_key = await loop.run_in_executor(None, _read_inputs)

        def _execute_fhe():
            # Runs in a worker thread: FHE_SERVER.run is a blocking call and
            # would otherwise stall the event loop (and every status poll)
            # until it finished.
            # NOTE(review): assumes FHE_SERVER.run may be called from a
            # worker thread - confirm with the FHEServer implementation.
            start = time.perf_counter()
            encrypted_output = FHE_SERVER.run(encrypted_image, evaluation_key)
            fhe_execution_time = round(time.perf_counter() - start, 2)

            with open(encrypted_output_path, "wb") as encrypted_output_file:
                encrypted_output_file.write(encrypted_output)

            return fhe_execution_time

        def _log_failure(finished):
            # Log failures once, even if the client never polls the status.
            if not finished.cancelled() and finished.exception() is not None:
                logger.error(f"Error in task execution: {str(finished.exception())}")

        task = loop.run_in_executor(None, _execute_fhe)
        task.add_done_callback(_log_failure)

        # The registry keeps the future alive, so its id() stays unique.
        task_id = str(id(task))
        _FHE_TASKS[task_id] = task

        return JSONResponse(content={"message": "FHE execution started", "task_id": task_id})
    except Exception as e:
        logger.error(f"Error in run_fhe: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@app.get("/fhe_status/{task_id}")
async def fhe_status(task_id: str):
    """Check the status of an FHE execution task.

    Args:
        task_id: Identifier returned by ``/run_fhe``.

    Returns:
        A JSON response whose ``status`` is one of:
        ``"not_found"`` (unknown task id), ``"running"``, ``"completed"``
        (with ``execution_time`` in seconds) or ``"error"`` (with
        ``message``).
    """
    task = _FHE_TASKS.get(task_id)
    if task is None:
        return JSONResponse(content={"status": "not_found"})

    if not task.done():
        return JSONResponse(content={"status": "running"})

    # exception()/result() raise CancelledError on a cancelled future, so
    # that case is reported explicitly first.
    if task.cancelled():
        return JSONResponse(content={"status": "error", "message": "FHE execution was cancelled"})

    failure = task.exception()
    if failure is not None:
        return JSONResponse(content={"status": "error", "message": str(failure)})

    return JSONResponse(content={"status": "completed", "execution_time": task.result()})
| |
|
@app.post("/get_output")
async def get_output(
    user_id: str = Form(),
):
    """Retrieve the encrypted output.

    Args:
        user_id: Client identifier used to derive the output path.

    Returns:
        The stored encrypted output bytes as an
        ``application/octet-stream`` response.

    Raises:
        HTTPException: 500 if the output file cannot be read (for example
            because it does not exist).
    """
    try:
        encrypted_output_path = get_server_file_path("encrypted_output", user_id)

        def _read_output():
            # Builtin open() accepts any path-like object and returns a
            # synchronous file; `async with path.open(...)` fails on a
            # pathlib.Path because its file objects are not async context
            # managers.
            with open(encrypted_output_path, "rb") as encrypted_output_file:
                return encrypted_output_file.read()

        # Keep the blocking disk read off the event loop.
        encrypted_output = await asyncio.get_running_loop().run_in_executor(None, _read_output)

        return Response(content=encrypted_output, media_type="application/octet-stream")
    except Exception as e:
        logger.error(f"Error in get_output: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
| |
|