| |
"""
Persistent OpenAI-compatible API server for Qualcomm Genie on NPU.

Uses Python ctypes to call the libGenie.so C API directly, keeping the model
loaded in memory between requests. This eliminates the ~3s model reload
overhead that occurs when spawning genie-t2t-run per request.

Architecture:
- At startup: GenieDialogConfig_createFromJson + GenieDialog_create (loads model once)
- Per request: GenieDialog_reset + GenieDialog_query (streaming via callback)
- At shutdown: GenieDialog_free + GenieDialogConfig_free

No external dependencies — uses only Python stdlib + ctypes.
"""
|
|
| import ctypes |
| import json |
| import os |
| import queue |
| import signal |
| import sys |
| import time |
| import traceback |
| import uuid |
| from http.server import HTTPServer, BaseHTTPRequestHandler |
| from socketserver import ThreadingMixIn |
| from threading import Lock, Thread, Event |
|
|
|
|
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTP server that services every request on its own thread.

    Lightweight requests (for example Open WebUI polling /v1/models) can be
    answered while a streaming completion is still in flight. Genie
    inference itself remains serialized through _inference_lock.
    """

    # Handler threads are daemonic, so they never keep the process alive.
    daemon_threads = True
|
|
| |
| |
| |
# Runtime configuration. Every value can be overridden through the
# environment variable named in the corresponding os.getenv() call.
MODEL_DIR = os.getenv("MODEL_DIR", "/tmp/genie_bundle")
MODEL_NAME = os.getenv("MODEL_NAME", "llama-3.2-3b-instruct-npu")
CONFIG_FILE = os.getenv("GENIE_CONFIG", os.path.join(MODEL_DIR, "genie_config.json"))
SERVER_PORT = int(os.getenv("PORT", "8000"))
LIB_GENIE_PATH = os.getenv("LIB_GENIE_PATH", os.path.join(MODEL_DIR, "libGenie.so"))

# Library search paths that GenieLibrary exports to the process environment
# before it loads libGenie.so.
GENIE_LIB_PATH = os.getenv("LD_LIBRARY_PATH", f"{MODEL_DIR}:/usr/lib")
ADSP_LIB_PATH = os.getenv("ADSP_LIBRARY_PATH", "/usr/lib/dsp/cdsp;/usr/lib/dsp/cdsp1")
|
|
| |
| |
| |
# Status codes returned by libGenie calls. Zero is success and the positive
# values below are warnings; _check_status() treats negative values as errors.
GENIE_STATUS_SUCCESS = 0
GENIE_STATUS_WARNING_ABORTED = 1
GENIE_STATUS_WARNING_CONTEXT_EXCEEDED = 4

# Sentence codes: one is passed to GenieDialog_query with the prompt, and one
# is delivered to the query callback alongside each chunk of output.
GENIE_DIALOG_SENTENCE_COMPLETE = 0
GENIE_DIALOG_SENTENCE_BEGIN = 1
GENIE_DIALOG_SENTENCE_CONTINUE = 2
GENIE_DIALOG_SENTENCE_END = 3
GENIE_DIALOG_SENTENCE_ABORT = 4

# Action code; presumably the value GenieDialog_signal takes to abort an
# in-flight query -- TODO confirm against the Genie headers.
GENIE_DIALOG_ACTION_ABORT = 0x01
|
|
| |
| |
| |
| |
| |
| |
| |
# ctypes prototype of the streaming callback handed to GenieDialog_query:
#     void callback(char *response, int sentence_code, void *user_data)
QUERY_CALLBACK_TYPE = ctypes.CFUNCTYPE(None, ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p)
|
|
| |
| |
| |
class GenieLibrary:
    """Thin ctypes wrapper around libGenie.so.

    Construction exports the library search paths to the process environment,
    loads the shared library, declares the argument and return types of every
    Genie entry point this server binds, and logs the library's API version.

    Attributes:
        lib: The loaded ``ctypes.CDLL`` object. Callers invoke Genie functions
            through it, e.g. ``genie.lib.GenieDialog_query(...)``.
    """

    def __init__(self, lib_path):
        """Load libGenie.so from ``lib_path`` and prepare its bindings.

        Args:
            lib_path: Filesystem path of libGenie.so.

        Raises:
            OSError: If ctypes cannot load the shared library.
            AttributeError: If the library lacks one of the bound symbols.
        """
        # NOTE(review): the dynamic loader normally reads LD_LIBRARY_PATH at
        # process start, so this assignment likely does not change how THIS
        # process resolves libGenie.so's dependencies -- confirm the launcher
        # exports it as well.
        os.environ["LD_LIBRARY_PATH"] = GENIE_LIB_PATH
        os.environ["ADSP_LIBRARY_PATH"] = ADSP_LIB_PATH

        print(f"[GENIE] Loading libGenie.so from {lib_path}")
        self.lib = ctypes.CDLL(lib_path)

        # Declare signatures before the first call so ctypes converts
        # arguments and return values correctly.
        self._setup_signatures()

        major = self.lib.Genie_getApiMajorVersion()
        minor = self.lib.Genie_getApiMinorVersion()
        patch = self.lib.Genie_getApiPatchVersion()
        print(f"[GENIE] API version: {major}.{minor}.{patch}")

    def _setup_signatures(self):
        """Declare argument and return types for every bound Genie function."""
        lib = self.lib

        # Handles are opaque void pointers. Functions that create or look up
        # a handle write it through a pointer-to-void-pointer out-parameter.
        handle = ctypes.c_void_p
        handle_out = ctypes.POINTER(ctypes.c_void_p)
        text = ctypes.c_char_p

        # (symbol name, argument types) for functions returning a status code.
        status_functions = (
            ("GenieDialogConfig_createFromJson", [text, handle_out]),
            ("GenieDialogConfig_free", [handle]),
            ("GenieDialog_create", [handle, handle_out]),
            (
                "GenieDialog_query",
                [handle, text, ctypes.c_int, QUERY_CALLBACK_TYPE, ctypes.c_void_p],
            ),
            ("GenieDialog_reset", [handle]),
            ("GenieDialog_free", [handle]),
            ("GenieDialog_signal", [handle, ctypes.c_int]),
            ("GenieDialog_save", [handle, text]),
            ("GenieDialog_restore", [handle, text]),
            ("GenieDialog_setStopSequence", [handle, text]),
            ("GenieSamplerConfig_createFromJson", [text, handle_out]),
            ("GenieSampler_applyConfig", [handle, handle]),
            ("GenieSamplerConfig_free", [handle]),
            ("GenieDialog_getSampler", [handle, handle_out]),
        )
        for name, argtypes in status_functions:
            func = getattr(lib, name)
            func.argtypes = argtypes
            func.restype = ctypes.c_int32

        # The version getters take no arguments. Declaring them explicitly
        # avoids relying on ctypes' implicit "int" default return type.
        for name in (
            "Genie_getApiMajorVersion",
            "Genie_getApiMinorVersion",
            "Genie_getApiPatchVersion",
        ):
            func = getattr(lib, name)
            func.argtypes = []
            func.restype = ctypes.c_uint32
|
|
|
|
| def _check_status(status, func_name): |
| """Check Genie API return status and raise on error.""" |
| if status < 0: |
| raise RuntimeError(f"[GENIE] {func_name} failed with status {status}") |
| if status > 0: |
| print(f"[GENIE] {func_name} returned warning status {status}", file=sys.stderr) |
| return status |
|
|
|
|
| |
| |
| |
class GenieDialogManager:
    """Owns one persistent Genie dialog and serializes queries against it.

    The model is loaded once, in the constructor. Each query then resets the
    dialog (GenieDialog_reset) and runs GenieDialog_query, receiving output
    through a ctypes callback. Only one query runs at a time; concurrent
    callers wait on ``_inference_lock``.

    A streaming query whose consumer goes away (client disconnect, timeout)
    is aborted with GenieDialog_signal so it stops holding the lock, and a
    query that was still queued when its consumer left is skipped entirely.
    """

    # Seconds query_streaming() waits for the next chunk before giving up.
    # This includes time spent queued behind another request.
    _STREAM_CHUNK_TIMEOUT = 120
    # Seconds shutdown() waits for an in-flight query to stop.
    _SHUTDOWN_TIMEOUT = 10
    # Abort attempts made (one second apart) when a stream is abandoned.
    _ABORT_ATTEMPTS = 5

    def __init__(self, lib_path, config_path, model_dir):
        """Load libGenie.so and the model described by ``config_path``.

        Args:
            lib_path: Path of libGenie.so.
            config_path: Path of the Genie dialog JSON configuration.
            model_dir: Model bundle directory (stored, not otherwise used here).

        Raises:
            RuntimeError: If a Genie call made while loading reports an error.
        """
        self._lib = GenieLibrary(lib_path)
        self._config_handle = ctypes.c_void_p()
        self._dialog_handle = ctypes.c_void_p()
        self._inference_lock = Lock()
        # _state_lock guards _active_cancel, the cancel event of the query
        # currently inside GenieDialog_query (None when idle). It ensures an
        # abort only ever targets the query it was meant for.
        self._state_lock = Lock()
        self._active_cancel = None
        self._model_loaded = False
        self._model_dir = model_dir

        self._load_model(config_path)

    def _load_model(self, config_path):
        """Create the dialog config and dialog (one-time model load)."""
        print(f"[GENIE] Loading config from {config_path}")
        t0 = time.time()

        with open(config_path, "r", encoding="utf-8") as f:
            config_str = f.read()

        status = self._lib.lib.GenieDialogConfig_createFromJson(
            config_str.encode("utf-8"),
            ctypes.byref(self._config_handle),
        )
        _check_status(status, "GenieDialogConfig_createFromJson")
        print(f"[GENIE] Config created in {time.time() - t0:.2f}s")

        print("[GENIE] Creating dialog (loading model onto NPU)...")
        t1 = time.time()
        status = self._lib.lib.GenieDialog_create(
            self._config_handle,
            ctypes.byref(self._dialog_handle),
        )
        try:
            _check_status(status, "GenieDialog_create")
        except RuntimeError:
            # The config handle was created successfully; release it rather
            # than leaking it when dialog creation fails.
            self._lib.lib.GenieDialogConfig_free(self._config_handle)
            self._config_handle = ctypes.c_void_p()
            raise
        load_time = time.time() - t1
        print(f"[GENIE] Model loaded onto NPU in {load_time:.2f}s")
        print(f"[GENIE] Total startup time: {time.time() - t0:.2f}s")

        self._model_loaded = True

    def _run_query(self, prompt_text, callback, cancel_event):
        """Reset the dialog and run one query under the inference lock.

        Args:
            prompt_text: Fully formatted prompt.
            callback: QUERY_CALLBACK_TYPE instance receiving output chunks.
            cancel_event: Event identifying this query; once set, the query
                is skipped if it has not started yet.

        Returns:
            The GenieDialog_query status, or GENIE_STATUS_WARNING_ABORTED
            when the query was cancelled before it started.

        Raises:
            RuntimeError: If the model is not loaded or the reset fails.
        """
        prompt_bytes = prompt_text.encode("utf-8")
        with self._inference_lock:
            # Re-check under the lock: shutdown() may have run while waiting.
            if not self._model_loaded:
                raise RuntimeError("Model not loaded")
            if cancel_event.is_set():
                return GENIE_STATUS_WARNING_ABORTED

            status = self._lib.lib.GenieDialog_reset(self._dialog_handle)
            _check_status(status, "GenieDialog_reset")

            with self._state_lock:
                if cancel_event.is_set():
                    return GENIE_STATUS_WARNING_ABORTED
                self._active_cancel = cancel_event
            try:
                return self._lib.lib.GenieDialog_query(
                    self._dialog_handle,
                    prompt_bytes,
                    GENIE_DIALOG_SENTENCE_COMPLETE,
                    callback,
                    None,
                )
            finally:
                # Cleared before the inference lock is released, so the next
                # query can never be mistaken for this one.
                with self._state_lock:
                    self._active_cancel = None

    def _abort(self, cancel_event=None):
        """Cancel a query and signal an abort if it is currently running.

        Args:
            cancel_event: Event of the query to cancel, or None to abort
                whichever query is running.

        Returns:
            True if an abort signal was sent to libGenie, else False.
        """
        with self._state_lock:
            if cancel_event is not None:
                cancel_event.set()
            active = self._active_cancel
            if active is None:
                return False
            if cancel_event is not None and active is not cancel_event:
                # Another request's query is running; never abort that one.
                return False
            active.set()
            status = self._lib.lib.GenieDialog_signal(
                self._dialog_handle,
                GENIE_DIALOG_ACTION_ABORT,
            )
        if status < 0:
            print(f"[GENIE] GenieDialog_signal returned error {status}", file=sys.stderr)
        return True

    def query_blocking(self, prompt_text, max_tokens=2048):
        """Run a query to completion and return the whole response.

        Args:
            prompt_text: Fully formatted prompt.
            max_tokens: Accepted for API compatibility; not enforced here.

        Returns:
            ``(response_text, chunk_count)`` where ``chunk_count`` is the
            number of callback chunks received (a token-count estimate).

        Raises:
            RuntimeError: If the model is not loaded, the dialog reset fails,
                or the query fails without producing any output.
        """
        if not self._model_loaded:
            raise RuntimeError("Model not loaded")

        collected_tokens = []

        @QUERY_CALLBACK_TYPE
        def callback(response, sentence_code, user_data):
            if response:
                collected_tokens.append(response.decode("utf-8", errors="replace"))

        status = self._run_query(prompt_text, callback, Event())
        if status < 0:
            print(f"[GENIE] GenieDialog_query returned error {status}", file=sys.stderr)
            if not collected_tokens:
                # Do not present a failed query as an empty, successful reply.
                raise RuntimeError(f"[GENIE] GenieDialog_query failed with status {status}")

        return "".join(collected_tokens), len(collected_tokens)

    def query_streaming(self, prompt_text, max_tokens=2048):
        """Run a query and yield output chunks as the callback delivers them.

        The query runs on a helper thread. If the consumer stops iterating
        early (generator closed, exception, or chunk timeout), the query is
        cancelled so it does not keep the inference lock busy.

        Args:
            prompt_text: Fully formatted prompt.
            max_tokens: Accepted for API compatibility; not enforced here.

        Yields:
            ``(text, sentence_code)`` tuples.

        Raises:
            RuntimeError: On first iteration, if the model is not loaded.
        """
        if not self._model_loaded:
            raise RuntimeError("Model not loaded")

        token_queue = queue.Queue()
        cancel_event = Event()

        @QUERY_CALLBACK_TYPE
        def callback(response, sentence_code, user_data):
            # Stop queueing once the consumer is gone.
            if response and not cancel_event.is_set():
                text = response.decode("utf-8", errors="replace")
                token_queue.put((text, sentence_code))

        def run_query():
            try:
                status = self._run_query(prompt_text, callback, cancel_event)
                if status < 0:
                    print(f"[GENIE] GenieDialog_query error {status}", file=sys.stderr)
            except Exception as e:
                print(f"[GENIE] Query thread error: {e}", file=sys.stderr)
            finally:
                # A None text is the end-of-stream marker for the consumer.
                token_queue.put((None, GENIE_DIALOG_SENTENCE_END))

        query_thread = Thread(target=run_query, daemon=True)
        query_thread.start()

        finished = False
        try:
            while True:
                try:
                    text, sentence_code = token_queue.get(timeout=self._STREAM_CHUNK_TIMEOUT)
                except queue.Empty:
                    print("[GENIE] Query timed out waiting for tokens", file=sys.stderr)
                    break
                if text is None:
                    finished = True
                    break
                yield text, sentence_code
        finally:
            if finished:
                query_thread.join(timeout=5)
            else:
                # Retry a few times: a signal sent just before the query
                # becomes active inside libGenie may be ignored. _abort()
                # returns False once the query is no longer running.
                for _ in range(self._ABORT_ATTEMPTS):
                    if not self._abort(cancel_event):
                        break
                    query_thread.join(timeout=1)
                    if not query_thread.is_alive():
                        break

    def shutdown(self):
        """Stop any running query and free the dialog and config handles.

        New queries are refused first. If a running query does not stop
        within ``_SHUTDOWN_TIMEOUT`` seconds the handles are left allocated,
        since freeing a dialog that is still in use would be unsafe.
        """
        if not self._model_loaded:
            return

        self._model_loaded = False
        self._abort()

        if not self._inference_lock.acquire(timeout=self._SHUTDOWN_TIMEOUT):
            print("[GENIE] A query is still running; handles not freed", file=sys.stderr)
            return
        try:
            print("[GENIE] Freeing dialog...")
            self._lib.lib.GenieDialog_free(self._dialog_handle)
            print("[GENIE] Freeing config...")
            self._lib.lib.GenieDialogConfig_free(self._config_handle)
        finally:
            self._inference_lock.release()
        print("[GENIE] Shutdown complete")
|
|
|
|
| |
| |
| |
| def _extract_text_from_content(content): |
| """Extract text from OpenAI message content, stripping image data. |
| |
| Open WebUI sends images in OpenAI vision format: |
| [{"type": "text", "text": "..."}, {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64,..."}}] |
| |
| This model is text-only (Llama 3.1 8B Instruct) and cannot process images. |
| We extract only the text parts and add a note about stripped images. |
| """ |
| if isinstance(content, str): |
| return content |
| if isinstance(content, list): |
| text_parts = [] |
| has_images = False |
| for part in content: |
| if isinstance(part, dict): |
| if part.get("type") == "text": |
| text_parts.append(part.get("text", "")) |
| elif part.get("type") == "image_url": |
| has_images = True |
| result = " ".join(text_parts).strip() |
| if has_images: |
| if result: |
| result += "\n\n[Note: An image was attached but this model is text-only and cannot process images.]" |
| else: |
| result = "[An image was sent but this model is text-only and cannot process images. Please send a text message instead.]" |
| print(f"[GUARD] Stripped image data from message, text-only content: {result[:100]}...", flush=True) |
| return result |
| return str(content) |
|
|
|
|
| |
| |
| |
def format_llama32_prompt(messages):
    """Render chat messages as a single Llama 3 style prompt string.

    Each message becomes a ``<|start_header_id|>role<|end_header_id|>`` header
    followed by its text content and ``<|eot_id|>``. A missing role defaults
    to "user". The prompt ends with an open assistant header so the model
    generates the assistant turn.
    """
    turns = "".join(
        f"<|start_header_id|>{msg.get('role', 'user')}<|end_header_id|>\n\n"
        f"{_extract_text_from_content(msg.get('content', ''))}<|eot_id|>"
        for msg in messages
    )
    return f"<|begin_of_text|>{turns}<|start_header_id|>assistant<|end_header_id|>\n\n"
|
|
|
|
| |
| |
| |
class PersistentGenieHandler(BaseHTTPRequestHandler):
    """OpenAI-compatible HTTP handler backed by a persistent Genie dialog.

    Routes:
        GET  /, /health, /v1/models, /v1/models/<id>
        POST /v1/chat/completions   (JSON or server-sent-event streaming)
        OPTIONS (CORS preflight)
    """

    # Shared by all handler instances; assigned by main() before serving.
    dialog_manager = None
    _startup_time = None
    _request_count = 0
    # Requests run on separate threads, so the counter is updated under a lock.
    _count_lock = Lock()

    def log_message(self, format, *args):
        """Send http.server's access log lines to stderr with an [HTTP] tag."""
        print(f"[HTTP] {self.client_address[0]} - {format % args}", file=sys.stderr)

    def _send_json(self, data, status=200):
        """Serialize ``data`` as JSON and send it as a complete response."""
        body = json.dumps(data).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body)

    def _route(self):
        """Return the request path without any query string."""
        return self.path.split("?", 1)[0]

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self.send_response(200)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization")
        self.send_header("Content-Length", "0")
        self.end_headers()

    def do_GET(self):
        """Serve the health, banner and model-listing endpoints."""
        route = self._route()
        if route == "/health":
            uptime = time.time() - (self._startup_time or time.time())
            self._send_json({
                "status": "ok",
                "model_loaded": self.dialog_manager._model_loaded if self.dialog_manager else False,
                "mode": "persistent",
                "uptime_seconds": round(uptime, 1),
                "requests_served": self._request_count,
            })
        elif route in ("/", ""):
            self._send_json({
                "message": "Genie LLM Server (Persistent Mode - OpenAI-compatible)",
                "model": MODEL_NAME,
                "mode": "persistent",
                "description": "Model kept loaded on NPU between requests — no reload overhead",
            })
        elif route == "/v1/models":
            self._send_json({
                "object": "list",
                "data": [{
                    "id": MODEL_NAME,
                    "object": "model",
                    "created": int(time.time()),
                    "owned_by": "qualcomm-genie",
                    "permission": [],
                    "root": MODEL_NAME,
                    "parent": None,
                }]
            })
        elif route.startswith("/v1/models/"):
            self._send_json({
                "id": MODEL_NAME,
                "object": "model",
                "created": int(time.time()),
                "owned_by": "qualcomm-genie",
            })
        else:
            self._send_json({"error": "Not found"}, 404)

    def do_POST(self):
        """Handle POST /v1/chat/completions.

        Malformed requests (bad Content-Length, invalid JSON, a body that is
        not a JSON object, or ``messages`` that is not a list of objects) are
        rejected with HTTP 400 before any inference is attempted.
        """
        if self._route() != "/v1/chat/completions":
            self._send_json({"error": "Not found"}, 404)
            return

        try:
            content_length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            content_length = -1
        if content_length < 0:
            # A negative length would make rfile.read() block until EOF.
            self._send_json({"error": "Invalid Content-Length header"}, 400)
            return
        body = self.rfile.read(content_length)

        try:
            request = json.loads(body)
        except ValueError as e:
            # ValueError covers JSONDecodeError and undecodable bytes.
            self._send_json({"error": f"Invalid JSON: {e}"}, 400)
            return
        if not isinstance(request, dict):
            self._send_json({"error": "Request body must be a JSON object"}, 400)
            return

        messages = request.get("messages", [])
        if not isinstance(messages, list) or not all(isinstance(m, dict) for m in messages):
            self._send_json({"error": "'messages' must be a list of objects"}, 400)
            return
        stream = request.get("stream", False)
        max_tokens = request.get("max_tokens", 2048)

        prompt = format_llama32_prompt(messages)
        request_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
        created = int(time.time())

        # Capture this request's number under the lock so the log lines below
        # do not pick up increments made by concurrent requests.
        with PersistentGenieHandler._count_lock:
            PersistentGenieHandler._request_count += 1
            request_number = PersistentGenieHandler._request_count
        t0 = time.time()

        try:
            if stream:
                self._handle_streaming(
                    prompt, request_id, created, max_tokens,
                    model=request.get("model", MODEL_NAME),
                )
            else:
                self._handle_non_streaming(prompt, request_id, created, request, max_tokens)
        except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError):
            print(f"[HTTP] Client disconnected during request {request_number}", file=sys.stderr)
        except Exception as e:
            print(f"[HTTP] Unexpected error in request {request_number}: {e}", file=sys.stderr)
            traceback.print_exc()

        elapsed = time.time() - t0
        print(f"[PERF] Request {request_number}: {elapsed:.2f}s ({'stream' if stream else 'batch'})", file=sys.stderr)

    def _handle_non_streaming(self, prompt, request_id, created, request, max_tokens):
        """Run the query to completion and reply with one chat.completion."""
        try:
            generated_text, token_count = self.dialog_manager.query_blocking(prompt, max_tokens)
        except Exception as e:
            self._send_json({"error": str(e)}, 500)
            return

        # Rough usage estimates: about four characters per token.
        prompt_tokens = len(prompt) // 4
        completion_tokens = max(token_count, len(generated_text) // 4)

        response = {
            "id": request_id,
            "object": "chat.completion",
            "created": created,
            "model": request.get("model", MODEL_NAME),
            "choices": [{
                "index": 0,
                "message": {"role": "assistant", "content": generated_text},
                "finish_reason": "stop",
            }],
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            }
        }
        self._send_json(response)

    def _handle_streaming(self, prompt, request_id, created, max_tokens, model=None):
        """Stream the completion as server-sent chat.completion.chunk events.

        Args:
            model: Model name echoed in each chunk; defaults to MODEL_NAME.
        """
        model = model or MODEL_NAME

        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        # The body has no Content-Length and is not chunked, so the end of
        # the stream is marked by closing the connection. Advertising
        # keep-alive here would make http.server keep the socket open.
        self.send_header("Connection", "close")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("X-Accel-Buffering", "no")
        self.end_headers()

        def send_chunk(delta, finish_reason=None):
            payload = {
                "id": request_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": model,
                "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
            }
            self.wfile.write(f"data: {json.dumps(payload)}\n\n".encode("utf-8"))
            self.wfile.flush()

        token_stream = None
        try:
            # Opening chunk announces the assistant role.
            send_chunk({"role": "assistant", "content": ""})

            token_stream = self.dialog_manager.query_streaming(prompt, max_tokens)
            for token_text, _sentence_code in token_stream:
                send_chunk({"content": token_text})

            # Closing chunk carries the finish reason, then the SSE terminator.
            send_chunk({}, "stop")
            self.wfile.write(b"data: [DONE]\n\n")
            self.wfile.flush()

        except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError) as e:
            print(f"[HTTP] Client disconnected during stream (normal): {type(e).__name__}", file=sys.stderr)
        except Exception as e:
            print(f"[HTTP] Streaming error: {e}", file=sys.stderr)
        finally:
            # Close the generator promptly so its cleanup runs now rather
            # than whenever it happens to be garbage-collected.
            close = getattr(token_stream, "close", None)
            if close is not None:
                close()
|
|
|
|
| |
| |
| |
def main():
    """Load the model, start the HTTP server and serve until signalled.

    Exits with status 1 if libGenie.so or the config file is missing, if
    Genie initialization fails, or if the listening socket cannot be opened.
    On SIGINT/SIGTERM the server loop is stopped, the socket is closed and
    the Genie handles are released.
    """
    print("=" * 60)
    print("Genie LLM Server — PERSISTENT MODE")
    print("=" * 60)
    print(f" Model: {MODEL_NAME}")
    print(f" Config: {CONFIG_FILE}")
    print(f" Model dir: {MODEL_DIR}")
    print(f" libGenie.so: {LIB_GENIE_PATH}")
    print(f" Library path: {GENIE_LIB_PATH}")
    print(f" ADSP path: {ADSP_LIB_PATH}")
    print(f" Port: {SERVER_PORT}")
    print(" Mode: PERSISTENT (model loaded once, kept resident)")
    print()

    # Report which FastRPC device nodes are visible to this process.
    fastrpc_devs = [d for d in os.listdir("/dev") if d.startswith("fastrpc")]
    if fastrpc_devs:
        print(f" FastRPC devices: {', '.join(sorted(fastrpc_devs))}")
    else:
        print(" WARNING: No /dev/fastrpc-* devices found!", file=sys.stderr)
        print(" Running on host or ensure CDI/device-plugin injects devices", file=sys.stderr)

    if not os.path.isfile(LIB_GENIE_PATH):
        print(f" ERROR: libGenie.so not found: {LIB_GENIE_PATH}", file=sys.stderr)
        sys.exit(1)

    if not os.path.isfile(CONFIG_FILE):
        print(f" ERROR: Config not found: {CONFIG_FILE}", file=sys.stderr)
        sys.exit(1)

    print()

    print(f"[STARTUP] Changing working directory to {MODEL_DIR}")
    os.chdir(MODEL_DIR)

    print("[STARTUP] Loading model onto NPU (one-time cost)...")
    print()

    try:
        dialog_manager = GenieDialogManager(LIB_GENIE_PATH, CONFIG_FILE, MODEL_DIR)
    except Exception as e:
        print(f"[FATAL] Failed to initialize Genie: {e}", file=sys.stderr)
        traceback.print_exc()
        sys.exit(1)

    # Handler instances are created per request, so shared state is attached
    # to the handler class.
    PersistentGenieHandler.dialog_manager = dialog_manager
    PersistentGenieHandler._startup_time = time.time()

    ThreadingHTTPServer.allow_reuse_address = True
    try:
        server = ThreadingHTTPServer(("0.0.0.0", SERVER_PORT), PersistentGenieHandler)
    except OSError as e:
        # Typically "address already in use"; release the loaded model first.
        print(f"[FATAL] Cannot listen on port {SERVER_PORT}: {e}", file=sys.stderr)
        dialog_manager.shutdown()
        sys.exit(1)

    def shutdown_handler(sig, frame):
        print("\n[SHUTDOWN] Received signal, shutting down...")
        # server.shutdown() blocks until serve_forever() has returned. This
        # handler runs on the very thread that is inside serve_forever(), so
        # calling it directly would deadlock; request it from a helper thread.
        Thread(target=server.shutdown, daemon=True).start()

    signal.signal(signal.SIGINT, shutdown_handler)
    signal.signal(signal.SIGTERM, shutdown_handler)

    print()
    print(f"[READY] Listening on http://0.0.0.0:{SERVER_PORT}")
    print("[READY] Model loaded and resident on NPU — no per-request reload!")
    print()

    try:
        server.serve_forever()
    finally:
        # Stop accepting connections before releasing the Genie handles.
        server.server_close()
        dialog_manager.shutdown()
|
|
|
|
# Run the server only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|