#!/usr/bin/env python3
"""
Persistent OpenAI-compatible API server for Qualcomm Genie on NPU.

Uses Python ctypes to call libGenie.so C API directly, keeping the model
loaded in memory between requests. This eliminates the ~3s model reload
overhead that occurs when spawning genie-t2t-run per request.

Architecture:
  - At startup: GenieDialogConfig_createFromJson + GenieDialog_create (loads model once)
  - Per request: GenieDialog_reset + GenieDialog_query (streaming via callback)
  - At shutdown: GenieDialog_free + GenieDialogConfig_free

No external dependencies — uses only Python stdlib + ctypes.
"""

import ctypes
import json
import os
import queue
import signal
import sys
import time
import traceback
import uuid
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from threading import Lock, Thread, Event


class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTPServer that handles each request in a new thread.

    This prevents Open WebUI's concurrent requests (e.g. /v1/models polling
    while a streaming response is in progress) from blocking the server.
    The Genie inference itself is still serialized via _inference_lock.
    """

    # Handler threads must not keep the process alive at shutdown.
    daemon_threads = True
    # Allow quick restarts on the same port.
    allow_reuse_address = True


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
MODEL_DIR = os.environ.get("MODEL_DIR", "/tmp/genie_bundle")
MODEL_NAME = os.environ.get("MODEL_NAME", "llama-3.2-3b-instruct-npu")
CONFIG_FILE = os.environ.get("GENIE_CONFIG", os.path.join(MODEL_DIR, "genie_config.json"))
SERVER_PORT = int(os.environ.get("PORT", "8000"))
LIB_GENIE_PATH = os.environ.get("LIB_GENIE_PATH", os.path.join(MODEL_DIR, "libGenie.so"))

# Library search paths
GENIE_LIB_PATH = os.environ.get("LD_LIBRARY_PATH", f"{MODEL_DIR}:/usr/lib")
ADSP_LIB_PATH = os.environ.get("ADSP_LIBRARY_PATH", "/usr/lib/dsp/cdsp;/usr/lib/dsp/cdsp1")

# ---------------------------------------------------------------------------
# Genie C API Constants (from GenieCommon.h / GenieDialog.h)
# ---------------------------------------------------------------------------
GENIE_STATUS_SUCCESS = 0
GENIE_STATUS_WARNING_ABORTED = 1
GENIE_STATUS_WARNING_CONTEXT_EXCEEDED = 4

# Sentence codes
GENIE_DIALOG_SENTENCE_COMPLETE = 0
GENIE_DIALOG_SENTENCE_BEGIN = 1
GENIE_DIALOG_SENTENCE_CONTINUE = 2
GENIE_DIALOG_SENTENCE_END = 3
GENIE_DIALOG_SENTENCE_ABORT = 4

# Actions
GENIE_DIALOG_ACTION_ABORT = 0x01

# ---------------------------------------------------------------------------
# Genie C API Callback Type
# ---------------------------------------------------------------------------
# typedef void (*GenieDialog_QueryCallback_t)(
#     const char* response,
#     const GenieDialog_SentenceCode_t sentenceCode,
#     const void* userData);
QUERY_CALLBACK_TYPE = ctypes.CFUNCTYPE(
    None,             # return void
    ctypes.c_char_p,  # const char* response
    ctypes.c_int,     # GenieDialog_SentenceCode_t sentenceCode
    ctypes.c_void_p,  # const void* userData
)


# ---------------------------------------------------------------------------
# Genie Library Wrapper
# ---------------------------------------------------------------------------
class GenieLibrary:
    """Wrapper around libGenie.so with ctypes bindings."""

    def __init__(self, lib_path):
        """Load the shared library at *lib_path* and declare its signatures."""
        # Set environment before loading.
        # NOTE(review): the dynamic loader typically reads LD_LIBRARY_PATH once
        # at process start, so this assignment may not influence how
        # libGenie.so's own dependencies are resolved in this process — confirm
        # on target, or export the variable before launching Python.
        os.environ["LD_LIBRARY_PATH"] = GENIE_LIB_PATH
        os.environ["ADSP_LIBRARY_PATH"] = ADSP_LIB_PATH

        print(f"[GENIE] Loading libGenie.so from {lib_path}")
        self.lib = ctypes.CDLL(lib_path)

        # Set up function signatures
        self._setup_signatures()

        # Get API version
        major = self.lib.Genie_getApiMajorVersion()
        minor = self.lib.Genie_getApiMinorVersion()
        patch = self.lib.Genie_getApiPatchVersion()
        print(f"[GENIE] API version: {major}.{minor}.{patch}")

    def _setup_signatures(self):
        """Define argument and return types for all functions."""
        lib = self.lib
        handle = ctypes.c_void_p
        handle_out = ctypes.POINTER(ctypes.c_void_p)
        status = ctypes.c_int32

        # GenieDialogConfig_createFromJson(const char* str, GenieDialogConfig_Handle_t* configHandle)
        lib.GenieDialogConfig_createFromJson.argtypes = [ctypes.c_char_p, handle_out]
        lib.GenieDialogConfig_createFromJson.restype = status

        # GenieDialogConfig_free(GenieDialogConfig_Handle_t configHandle)
        lib.GenieDialogConfig_free.argtypes = [handle]
        lib.GenieDialogConfig_free.restype = status

        # GenieDialog_create(GenieDialogConfig_Handle_t configHandle, GenieDialog_Handle_t* dialogHandle)
        lib.GenieDialog_create.argtypes = [handle, handle_out]
        lib.GenieDialog_create.restype = status

        # GenieDialog_query(GenieDialog_Handle_t dialogHandle, const char* queryStr,
        #                   GenieDialog_SentenceCode_t sentenceCode,
        #                   GenieDialog_QueryCallback_t callback, const void* userData)
        lib.GenieDialog_query.argtypes = [
            handle,               # dialogHandle
            ctypes.c_char_p,      # queryStr
            ctypes.c_int,         # sentenceCode
            QUERY_CALLBACK_TYPE,  # callback
            ctypes.c_void_p,      # userData
        ]
        lib.GenieDialog_query.restype = status

        # GenieDialog_reset(GenieDialog_Handle_t dialogHandle)
        lib.GenieDialog_reset.argtypes = [handle]
        lib.GenieDialog_reset.restype = status

        # GenieDialog_free(GenieDialog_Handle_t dialogHandle)
        lib.GenieDialog_free.argtypes = [handle]
        lib.GenieDialog_free.restype = status

        # GenieDialog_signal(GenieDialog_Handle_t dialogHandle, GenieDialog_Action_t action)
        lib.GenieDialog_signal.argtypes = [handle, ctypes.c_int]
        lib.GenieDialog_signal.restype = status

        # GenieDialog_save(GenieDialog_Handle_t dialogHandle, const char* path)
        lib.GenieDialog_save.argtypes = [handle, ctypes.c_char_p]
        lib.GenieDialog_save.restype = status

        # GenieDialog_restore(GenieDialog_Handle_t dialogHandle, const char* path)
        lib.GenieDialog_restore.argtypes = [handle, ctypes.c_char_p]
        lib.GenieDialog_restore.restype = status

        # GenieDialog_setStopSequence(GenieDialog_Handle_t dialogHandle, const char* newStopSequences)
        lib.GenieDialog_setStopSequence.argtypes = [handle, ctypes.c_char_p]
        lib.GenieDialog_setStopSequence.restype = status

        # GenieSamplerConfig_createFromJson(const char* str, GenieSamplerConfig_Handle_t* configHandle)
        lib.GenieSamplerConfig_createFromJson.argtypes = [ctypes.c_char_p, handle_out]
        lib.GenieSamplerConfig_createFromJson.restype = status

        # GenieSampler_applyConfig(GenieSampler_Handle_t samplerHandle, GenieSamplerConfig_Handle_t configHandle)
        lib.GenieSampler_applyConfig.argtypes = [handle, handle]
        lib.GenieSampler_applyConfig.restype = status

        # GenieSamplerConfig_free(GenieSamplerConfig_Handle_t configHandle)
        lib.GenieSamplerConfig_free.argtypes = [handle]
        lib.GenieSamplerConfig_free.restype = status

        # GenieDialog_getSampler(GenieDialog_Handle_t dialogHandle, GenieSampler_Handle_t* samplerHandle)
        lib.GenieDialog_getSampler.argtypes = [handle, handle_out]
        lib.GenieDialog_getSampler.restype = status


def _check_status(status, func_name):
    """Check Genie API return status and raise on error.

    Negative statuses raise RuntimeError; positive statuses are treated as
    warnings and logged to stderr. The status is returned unchanged.
    """
    if status < 0:
        raise RuntimeError(f"[GENIE] {func_name} failed with status {status}")
    if status > 0:
        print(f"[GENIE] {func_name} returned warning status {status}", file=sys.stderr)
    return status


# ---------------------------------------------------------------------------
# Persistent Genie Dialog Manager
# ---------------------------------------------------------------------------
class GenieDialogManager:
    """Manages a persistent Genie dialog instance.

    Loads the model once at startup and handles queries by:
    1. Resetting the dialog (clears KV cache, keeps model loaded)
    2. Executing query with streaming callback
    """

    # Seconds the streaming consumer waits for the next token before giving up.
    _TOKEN_TIMEOUT_S = 120
    # Seconds shutdown() waits for an in-flight query to release the dialog.
    _SHUTDOWN_WAIT_S = 10

    def __init__(self, lib_path, config_path, model_dir):
        """Load libGenie.so from *lib_path* and create the dialog from *config_path*."""
        self._lib = GenieLibrary(lib_path)
        self._config_handle = ctypes.c_void_p()
        self._dialog_handle = ctypes.c_void_p()
        self._inference_lock = Lock()
        self._model_loaded = False
        self._model_dir = model_dir

        # Load model
        self._load_model(config_path)

    def _load_model(self, config_path):
        """Load model onto NPU (one-time operation)."""
        print(f"[GENIE] Loading config from {config_path}")
        t0 = time.time()

        # Read config JSON (it is handed to the C API as UTF-8 bytes below).
        with open(config_path, "r", encoding="utf-8") as f:
            config_str = f.read()

        # Create dialog config from JSON
        status = self._lib.lib.GenieDialogConfig_createFromJson(
            config_str.encode("utf-8"),
            ctypes.byref(self._config_handle),
        )
        _check_status(status, "GenieDialogConfig_createFromJson")
        print(f"[GENIE] Config created in {time.time() - t0:.2f}s")

        # Create dialog (this loads model weights onto NPU — the expensive step)
        print("[GENIE] Creating dialog (loading model onto NPU)...")
        t1 = time.time()
        status = self._lib.lib.GenieDialog_create(
            self._config_handle,
            ctypes.byref(self._dialog_handle),
        )
        _check_status(status, "GenieDialog_create")

        load_time = time.time() - t1
        print(f"[GENIE] Model loaded onto NPU in {load_time:.2f}s")
        print(f"[GENIE] Total startup time: {time.time() - t0:.2f}s")
        self._model_loaded = True

    def _reset_and_query(self, prompt_text, callback):
        """Reset the dialog and run one query. Caller must hold _inference_lock.

        Raises RuntimeError if the model has been shut down or if either
        Genie call reports an error (negative) status.
        """
        # Re-check under the lock: shutdown() may have run while we waited.
        if not self._model_loaded:
            raise RuntimeError("Model not loaded")

        lib = self._lib.lib
        # Reset dialog KV cache for a fresh independent query
        _check_status(lib.GenieDialog_reset(self._dialog_handle), "GenieDialog_reset")

        # Execute query — this blocks until generation completes
        status = lib.GenieDialog_query(
            self._dialog_handle,
            prompt_text.encode("utf-8"),
            GENIE_DIALOG_SENTENCE_COMPLETE,
            callback,
            None,
        )
        return _check_status(status, "GenieDialog_query")

    def query_blocking(self, prompt_text, max_tokens=2048):
        """Execute a blocking query and return the complete response text.

        Returns (response_text, token_count_estimate), where the estimate is
        the number of callback invocations that carried text.

        *max_tokens* is accepted for interface compatibility; it is not
        forwarded to Genie by this method.

        Raises RuntimeError if the model is not loaded or Genie reports an
        error status.
        """
        if not self._model_loaded:
            raise RuntimeError("Model not loaded")

        collected_tokens = []

        # Callback that collects tokens as Genie emits them
        @QUERY_CALLBACK_TYPE
        def callback(response, sentence_code, user_data):
            if response:
                collected_tokens.append(response.decode("utf-8", errors="replace"))

        with self._inference_lock:
            self._reset_and_query(prompt_text, callback)

        return "".join(collected_tokens), len(collected_tokens)

    def query_streaming(self, prompt_text, max_tokens=2048):
        """Execute a query and yield text chunks as they arrive via callback.

        Returns a generator that yields (text, sentence_code) tuples.

        If the consumer stops early (generator closed, or no token arrives
        within the timeout) the in-flight query is asked to abort so that it
        does not keep holding the inference lock; a query that has not started
        yet is cancelled.

        *max_tokens* is accepted for interface compatibility; it is not
        forwarded to Genie by this method.
        """
        if not self._model_loaded:
            raise RuntimeError("Model not loaded")

        token_queue = queue.Queue()
        # state_lock guards the two flags so an abort signal is only ever sent
        # while *this* request's query owns the dialog.
        state_lock = Lock()
        cancelled = Event()
        running = Event()

        # Callback that pushes tokens to the queue
        @QUERY_CALLBACK_TYPE
        def callback(response, sentence_code, user_data):
            if response:
                token_queue.put((response.decode("utf-8", errors="replace"), sentence_code))

        def run_query():
            try:
                with self._inference_lock:
                    with state_lock:
                        if cancelled.is_set():
                            return  # consumer gave up while we were queued
                        running.set()
                    try:
                        self._reset_and_query(prompt_text, callback)
                    finally:
                        # Cleared before the inference lock is released, so the
                        # consumer can never signal another request's query.
                        with state_lock:
                            running.clear()
            except Exception as e:
                print(f"[GENIE] Query thread error: {e}", file=sys.stderr)
            finally:
                token_queue.put((None, GENIE_DIALOG_SENTENCE_END))  # Sentinel

        # Run query in a separate thread so we can yield tokens as they arrive
        query_thread = Thread(target=run_query, daemon=True)
        query_thread.start()

        try:
            while True:
                try:
                    text, sentence_code = token_queue.get(timeout=self._TOKEN_TIMEOUT_S)
                except queue.Empty:
                    print("[GENIE] Query timed out waiting for tokens", file=sys.stderr)
                    break
                if text is None:
                    break
                yield text, sentence_code
        finally:
            with state_lock:
                cancelled.set()
                if running.is_set():
                    # NOTE(review): assumes GenieDialog_signal(ABORT) is safe to
                    # call from another thread during an active query and is a
                    # no-op if the query has just returned — confirm against
                    # GenieDialog.h.
                    self._lib.lib.GenieDialog_signal(
                        self._dialog_handle, GENIE_DIALOG_ACTION_ABORT
                    )
            query_thread.join(timeout=5)

    def shutdown(self):
        """Free dialog and config handles.

        New queries are refused first. If a query is still running it is asked
        to abort and given a bounded time to finish; if it does not, the
        handles are left alone rather than freed while in use.
        """
        if not self._model_loaded:
            return
        self._model_loaded = False

        acquired = self._inference_lock.acquire(blocking=False)
        if not acquired:
            print("[GENIE] Aborting in-flight query before shutdown...")
            self._lib.lib.GenieDialog_signal(self._dialog_handle, GENIE_DIALOG_ACTION_ABORT)
            acquired = self._inference_lock.acquire(timeout=self._SHUTDOWN_WAIT_S)
        if not acquired:
            print("[GENIE] In-flight query did not stop; skipping handle release",
                  file=sys.stderr)
            return

        try:
            print("[GENIE] Freeing dialog...")
            self._lib.lib.GenieDialog_free(self._dialog_handle)
            print("[GENIE] Freeing config...")
            self._lib.lib.GenieDialogConfig_free(self._config_handle)
        finally:
            self._inference_lock.release()
        print("[GENIE] Shutdown complete")


# ---------------------------------------------------------------------------
# Content extraction (handles OpenAI vision format from Open WebUI)
# ---------------------------------------------------------------------------
def _extract_text_from_content(content):
    """Extract text from OpenAI message content, stripping image data.

    Open WebUI sends images in OpenAI vision format:
      [{"type": "text", "text": "..."},
       {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64,..."}}]

    The served model is treated as text-only. Only the text parts are kept and
    a note about stripped images is appended.

    A ``None`` content yields an empty string; any other non-str, non-list
    value is converted with ``str()``.
    """
    if content is None:
        # Avoid injecting the literal "None" into the prompt.
        return ""
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)

    text_parts = []
    has_images = False
    for part in content:
        if not isinstance(part, dict):
            continue
        part_type = part.get("type")
        if part_type == "text":
            text = part.get("text", "")
            text_parts.append("" if text is None else str(text))
        elif part_type == "image_url":
            has_images = True

    result = " ".join(text_parts).strip()
    if has_images:
        if result:
            result += "\n\n[Note: An image was attached but this model is text-only and cannot process images.]"
        else:
            result = "[An image was sent but this model is text-only and cannot process images. Please send a text message instead.]"
        print(f"[GUARD] Stripped image data from message, text-only content: {result[:100]}...", flush=True)
    return result


# ---------------------------------------------------------------------------
# Llama 3.2 Instruct prompt formatting
# ---------------------------------------------------------------------------
def format_llama32_prompt(messages):
    """Render OpenAI-style *messages* as a Llama 3.2 Instruct prompt string.

    Each message becomes a header/content/<|eot_id|> segment; the prompt ends
    with an open assistant header so the model continues from there.
    """
    parts = ["<|begin_of_text|>"]
    for msg in messages:
        role = msg.get("role", "user")
        content = _extract_text_from_content(msg.get("content", ""))
        parts.append(f"<|start_header_id|>{role}<|end_header_id|>\n\n{content}<|eot_id|>")
    parts.append("<|start_header_id|>assistant<|end_header_id|>\n\n")
    return "".join(parts)


# ---------------------------------------------------------------------------
# HTTP Handler
# ---------------------------------------------------------------------------
class PersistentGenieHandler(BaseHTTPRequestHandler):
    """OpenAI-compatible HTTP handler using persistent Genie dialog."""

    # Class-level reference to the dialog manager (set in main)
    dialog_manager = None
    _startup_time = None
    _request_count = 0
    # Handlers run on separate threads; guard the shared counter.
    _count_lock = Lock()

    def log_message(self, format, *args):
        """Route the base class's access log to stderr with an [HTTP] prefix."""
        print(f"[HTTP] {self.client_address[0]} - {format % args}", file=sys.stderr)

    def _send_json(self, data, status=200):
        """Send *data* as a JSON response with the given HTTP *status*."""
        body = json.dumps(data).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body)

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self.send_response(200)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization")
        self.send_header("Content-Length", "0")
        self.end_headers()

    def do_GET(self):
        """Serve /health, /, /v1/models and /v1/models/<id>."""
        if self.path == "/health":
            uptime = time.time() - (self._startup_time or time.time())
            self._send_json({
                "status": "ok",
                "model_loaded": self.dialog_manager._model_loaded if self.dialog_manager else False,
                "mode": "persistent",
                "uptime_seconds": round(uptime, 1),
                "requests_served": self._request_count,
            })
        elif self.path in ("/", ""):
            self._send_json({
                "message": "Genie LLM Server (Persistent Mode - OpenAI-compatible)",
                "model": MODEL_NAME,
                "mode": "persistent",
                "description": "Model kept loaded on NPU between requests — no reload overhead",
            })
        elif self.path == "/v1/models":
            self._send_json({
                "object": "list",
                "data": [{
                    "id": MODEL_NAME,
                    "object": "model",
                    "created": int(time.time()),
                    "owned_by": "qualcomm-genie",
                    "permission": [],
                    "root": MODEL_NAME,
                    "parent": None,
                }]
            })
        elif self.path.startswith("/v1/models/"):
            self._send_json({
                "id": MODEL_NAME,
                "object": "model",
                "created": int(time.time()),
                "owned_by": "qualcomm-genie",
            })
        else:
            self._send_json({"error": "Not found"}, 404)

    def do_POST(self):
        """Handle POST /v1/chat/completions (streaming or non-streaming)."""
        if self.path != "/v1/chat/completions":
            self._send_json({"error": "Not found"}, 404)
            return

        try:
            content_length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            self._send_json({"error": "Invalid Content-Length header"}, 400)
            return
        if content_length < 0:
            # A negative length would make rfile.read() block until EOF.
            self._send_json({"error": "Invalid Content-Length header"}, 400)
            return

        body = self.rfile.read(content_length)
        try:
            request = json.loads(body)
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            self._send_json({"error": f"Invalid JSON: {e}"}, 400)
            return

        if not isinstance(request, dict):
            self._send_json({"error": "Request body must be a JSON object"}, 400)
            return

        messages = request.get("messages", [])
        if not isinstance(messages, list) or not all(isinstance(m, dict) for m in messages):
            self._send_json({"error": "'messages' must be a list of objects"}, 400)
            return

        stream = request.get("stream", False)
        max_tokens = request.get("max_tokens", 2048)
        prompt = format_llama32_prompt(messages)
        request_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
        created = int(time.time())

        # Take a private copy of the counter so log lines for this request are
        # not affected by concurrent requests.
        with PersistentGenieHandler._count_lock:
            PersistentGenieHandler._request_count += 1
            request_no = PersistentGenieHandler._request_count

        t0 = time.time()
        try:
            if stream:
                self._handle_streaming(prompt, request_id, created, max_tokens)
            else:
                self._handle_non_streaming(prompt, request_id, created, request, max_tokens)
        except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError):
            print(f"[HTTP] Client disconnected during request {request_no}", file=sys.stderr)
        except Exception as e:
            print(f"[HTTP] Unexpected error in request {request_no}: {e}", file=sys.stderr)
            traceback.print_exc()

        elapsed = time.time() - t0
        print(f"[PERF] Request {request_no}: {elapsed:.2f}s ({'stream' if stream else 'batch'})",
              file=sys.stderr)

    def _handle_non_streaming(self, prompt, request_id, created, request, max_tokens):
        """Run the query to completion and send one chat.completion object."""
        try:
            generated_text, token_count = self.dialog_manager.query_blocking(prompt, max_tokens)
        except Exception as e:
            self._send_json({"error": str(e)}, 500)
            return

        # Rough usage figures: ~4 characters per token.
        prompt_tokens = len(prompt) // 4
        completion_tokens = max(token_count, len(generated_text) // 4)

        response = {
            "id": request_id,
            "object": "chat.completion",
            "created": created,
            "model": request.get("model", MODEL_NAME),
            "choices": [{
                "index": 0,
                "message": {"role": "assistant", "content": generated_text},
                "finish_reason": "stop",
            }],
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            }
        }
        self._send_json(response)

    def _handle_streaming(self, prompt, request_id, created, max_tokens):
        """Stream the completion as server-sent chat.completion.chunk events."""
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "keep-alive")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("X-Accel-Buffering", "no")
        self.end_headers()

        def send_chunk(delta, finish_reason=None):
            payload = {
                "id": request_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": MODEL_NAME,
                "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
            }
            self.wfile.write(f"data: {json.dumps(payload)}\n\n".encode("utf-8"))
            self.wfile.flush()

        try:
            # Initial role chunk
            send_chunk({"role": "assistant", "content": ""})

            # Stream tokens from callback. Close the generator explicitly so an
            # aborted stream releases the NPU promptly instead of waiting for
            # garbage collection.
            token_stream = self.dialog_manager.query_streaming(prompt, max_tokens)
            try:
                for token_text, _sentence_code in token_stream:
                    send_chunk({"content": token_text})
            finally:
                token_stream.close()

            # Final chunk
            send_chunk({}, "stop")
            self.wfile.write(b"data: [DONE]\n\n")
            self.wfile.flush()

        except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError) as e:
            # Client disconnected mid-stream (e.g. user opened a new chat in Open WebUI)
            # This is normal and should NOT crash the server
            print(f"[HTTP] Client disconnected during stream (normal): {type(e).__name__}",
                  file=sys.stderr)
        except Exception as e:
            print(f"[HTTP] Streaming error: {e}", file=sys.stderr)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Validate the environment, load the model and serve until signalled."""
    print("=" * 60)
    print("Genie LLM Server — PERSISTENT MODE")
    print("=" * 60)
    print(f" Model: {MODEL_NAME}")
    print(f" Config: {CONFIG_FILE}")
    print(f" Model dir: {MODEL_DIR}")
    print(f" libGenie.so: {LIB_GENIE_PATH}")
    print(f" Library path: {GENIE_LIB_PATH}")
    print(f" ADSP path: {ADSP_LIB_PATH}")
    print(f" Port: {SERVER_PORT}")
    print(" Mode: PERSISTENT (model loaded once, kept resident)")
    print()

    # Verify FastRPC devices
    fastrpc_devs = [d for d in os.listdir("/dev") if d.startswith("fastrpc")]
    if fastrpc_devs:
        print(f" FastRPC devices: {', '.join(sorted(fastrpc_devs))}")
    else:
        print(" WARNING: No /dev/fastrpc-* devices found!", file=sys.stderr)
        print(" Running on host or ensure CDI/device-plugin injects devices", file=sys.stderr)

    # Verify libGenie.so exists
    if not os.path.isfile(LIB_GENIE_PATH):
        print(f" ERROR: libGenie.so not found: {LIB_GENIE_PATH}", file=sys.stderr)
        sys.exit(1)

    # Verify config exists
    if not os.path.isfile(CONFIG_FILE):
        print(f" ERROR: Config not found: {CONFIG_FILE}", file=sys.stderr)
        sys.exit(1)

    print()

    # Change to model directory so Genie resolves relative paths
    # (tokenizer.json, .bin files referenced in genie_config.json)
    print(f"[STARTUP] Changing working directory to {MODEL_DIR}")
    os.chdir(MODEL_DIR)

    print("[STARTUP] Loading model onto NPU (one-time cost)...")
    print()

    # Create the persistent dialog manager
    try:
        dialog_manager = GenieDialogManager(LIB_GENIE_PATH, CONFIG_FILE, MODEL_DIR)
    except Exception as e:
        print(f"[FATAL] Failed to initialize Genie: {e}", file=sys.stderr)
        traceback.print_exc()
        sys.exit(1)

    # Set up HTTP server
    PersistentGenieHandler.dialog_manager = dialog_manager
    PersistentGenieHandler._startup_time = time.time()

    server = ThreadingHTTPServer(("0.0.0.0", SERVER_PORT), PersistentGenieHandler)

    def shutdown_handler(sig, frame):
        print("\n[SHUTDOWN] Received signal, shutting down...")
        # server.shutdown() blocks until serve_forever() returns, and this
        # handler runs on the thread that is inside serve_forever(); calling
        # it directly would deadlock, so request the stop from a helper thread.
        Thread(target=server.shutdown, daemon=True).start()

    signal.signal(signal.SIGINT, shutdown_handler)
    signal.signal(signal.SIGTERM, shutdown_handler)

    print()
    print(f"[READY] Listening on http://0.0.0.0:{SERVER_PORT}")
    print("[READY] Model loaded and resident on NPU — no per-request reload!")
    print()

    try:
        server.serve_forever()
    finally:
        # Stop accepting connections, then release the NPU resources.
        server.server_close()
        dialog_manager.shutdown()
    sys.exit(0)


if __name__ == "__main__":
    main()