Llama-3.2-3B-Instruct-QCS9075-HTP / server_persistent.py
Hariharasubramanian's picture
Upload Llama-3.2-3B-Instruct model for QCS9075 (HTP backend)
e148d8a verified
#!/usr/bin/env python3
"""
Persistent OpenAI-compatible API server for Qualcomm Genie on NPU.
Uses Python ctypes to call libGenie.so C API directly, keeping the model
loaded in memory between requests. This eliminates the ~3s model reload
overhead that occurs when spawning genie-t2t-run per request.
Architecture:
- At startup: GenieDialogConfig_createFromJson + GenieDialog_create (loads model once)
- Per request: GenieDialog_reset + GenieDialog_query (streaming via callback)
- At shutdown: GenieDialog_free + GenieDialogConfig_free
No external dependencies — uses only Python stdlib + ctypes.
"""
import ctypes
import json
import os
import queue
import signal
import sys
import time
import traceback
import uuid
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from threading import Lock, Thread, Event
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    """Thread-per-request variant of ``HTTPServer``.

    Each incoming request is served on its own thread, so a long-running
    request (for example a streaming completion) does not stall other
    requests such as Open WebUI's ``/v1/models`` polling. Genie inference
    itself remains serialized through the dialog manager's
    ``_inference_lock``.
    """

    # Request threads are created as daemon threads.
    daemon_threads = True
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Every setting is read once at import time; the environment variable named
# in the first argument overrides the fallback given in the second.
MODEL_DIR = os.getenv("MODEL_DIR", "/tmp/genie_bundle")
MODEL_NAME = os.getenv("MODEL_NAME", "llama-3.2-3b-instruct-npu")
CONFIG_FILE = os.getenv("GENIE_CONFIG", os.path.join(MODEL_DIR, "genie_config.json"))
SERVER_PORT = int(os.getenv("PORT", "8000"))
LIB_GENIE_PATH = os.getenv("LIB_GENIE_PATH", os.path.join(MODEL_DIR, "libGenie.so"))
# Library search paths (note the separators: ':' for LD_LIBRARY_PATH,
# ';' for ADSP_LIBRARY_PATH).
GENIE_LIB_PATH = os.getenv("LD_LIBRARY_PATH", f"{MODEL_DIR}:/usr/lib")
ADSP_LIB_PATH = os.getenv("ADSP_LIBRARY_PATH", "/usr/lib/dsp/cdsp;/usr/lib/dsp/cdsp1")
# ---------------------------------------------------------------------------
# Genie C API Constants (from GenieCommon.h / GenieDialog.h)
# ---------------------------------------------------------------------------
# Status codes returned by Genie functions. In this file, _check_status()
# treats negative values as errors and positive values as warnings.
# NOTE(review): numeric values are transcribed from the Genie headers —
# confirm against the SDK version actually deployed.
GENIE_STATUS_SUCCESS = 0
GENIE_STATUS_WARNING_ABORTED = 1
GENIE_STATUS_WARNING_CONTEXT_EXCEEDED = 4
# Sentence codes: passed as the sentenceCode argument of GenieDialog_query
# (this server always sends COMPLETE) and received by the query callback
# (the streaming callback checks END, COMPLETE and ABORT).
GENIE_DIALOG_SENTENCE_COMPLETE = 0
GENIE_DIALOG_SENTENCE_BEGIN = 1
GENIE_DIALOG_SENTENCE_CONTINUE = 2
GENIE_DIALOG_SENTENCE_END = 3
GENIE_DIALOG_SENTENCE_ABORT = 4
# Actions: the GenieDialog_Action_t argument of GenieDialog_signal.
GENIE_DIALOG_ACTION_ABORT = 0x01
# ---------------------------------------------------------------------------
# Genie C API Callback Type
# ---------------------------------------------------------------------------
# ctypes prototype for the C callback type
#   typedef void (*GenieDialog_QueryCallback_t)(
#       const char* response,
#       const GenieDialog_SentenceCode_t sentenceCode,
#       const void* userData);
# CFUNCTYPE takes the return type first (None == void), then the argument
# types in declaration order: response, sentenceCode, userData.
QUERY_CALLBACK_TYPE = ctypes.CFUNCTYPE(None, ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p)
# ---------------------------------------------------------------------------
# Genie Library Wrapper
# ---------------------------------------------------------------------------
class GenieLibrary:
    """Thin ctypes wrapper around libGenie.so.

    Loads the shared library, declares argument/return types for the Genie
    C functions this server binds, and prints the API version the library
    reports.

    Attributes:
        lib: The ``ctypes.CDLL`` handle; callers invoke Genie functions
            through it (e.g. ``wrapper.lib.GenieDialog_query(...)``).
    """

    def __init__(self, lib_path):
        """Load libGenie.so and prepare its function prototypes.

        Args:
            lib_path: Filesystem path of libGenie.so.

        Raises:
            OSError: If ``ctypes.CDLL`` cannot load the library.
            AttributeError: If a symbol this server requires is missing.
        """
        # Export the library search paths before loading.
        # NOTE(review): the dynamic loader normally reads LD_LIBRARY_PATH
        # once at process start, so assigning it here may only affect child
        # processes, not the dlopen() below — confirm on the target and, if
        # needed, export the variable in the launcher instead.
        os.environ["LD_LIBRARY_PATH"] = GENIE_LIB_PATH
        os.environ["ADSP_LIBRARY_PATH"] = ADSP_LIB_PATH
        print(f"[GENIE] Loading libGenie.so from {lib_path}")
        self.lib = ctypes.CDLL(lib_path)
        # Prototypes must be declared before any call so ctypes converts
        # arguments and return values correctly.
        self._setup_signatures()
        major = self.lib.Genie_getApiMajorVersion()
        minor = self.lib.Genie_getApiMinorVersion()
        patch = self.lib.Genie_getApiPatchVersion()
        print(f"[GENIE] API version: {major}.{minor}.{patch}")

    def _setup_signatures(self):
        """Declare ``argtypes``/``restype`` for the bound Genie functions.

        Functions the server calls on every start/request are mandatory.
        The remaining bindings (abort signal, save/restore, stop sequences,
        sampler configuration) are declared when the library exports them
        and skipped with a warning otherwise, so a library build lacking
        them no longer prevents the server from starting.

        Raises:
            AttributeError: If a mandatory symbol is not exported.
        """
        handle = ctypes.c_void_p                       # Genie*_Handle_t
        handle_out = ctypes.POINTER(ctypes.c_void_p)   # Genie*_Handle_t*
        c_str = ctypes.c_char_p                        # const char*

        required = {
            # (const char* str, GenieDialogConfig_Handle_t* configHandle)
            "GenieDialogConfig_createFromJson": [c_str, handle_out],
            # (GenieDialogConfig_Handle_t configHandle)
            "GenieDialogConfig_free": [handle],
            # (GenieDialogConfig_Handle_t configHandle, GenieDialog_Handle_t* dialogHandle)
            "GenieDialog_create": [handle, handle_out],
            # (dialogHandle, const char* queryStr, sentenceCode, callback, const void* userData)
            "GenieDialog_query": [handle, c_str, ctypes.c_int, QUERY_CALLBACK_TYPE, ctypes.c_void_p],
            # (GenieDialog_Handle_t dialogHandle)
            "GenieDialog_reset": [handle],
            # (GenieDialog_Handle_t dialogHandle)
            "GenieDialog_free": [handle],
        }
        optional = {
            # (GenieDialog_Handle_t dialogHandle, GenieDialog_Action_t action)
            "GenieDialog_signal": [handle, ctypes.c_int],
            # (GenieDialog_Handle_t dialogHandle, const char* path)
            "GenieDialog_save": [handle, c_str],
            # (GenieDialog_Handle_t dialogHandle, const char* path)
            "GenieDialog_restore": [handle, c_str],
            # (GenieDialog_Handle_t dialogHandle, const char* newStopSequences)
            "GenieDialog_setStopSequence": [handle, c_str],
            # (const char* str, GenieSamplerConfig_Handle_t* configHandle)
            "GenieSamplerConfig_createFromJson": [c_str, handle_out],
            # (GenieSampler_Handle_t samplerHandle, GenieSamplerConfig_Handle_t configHandle)
            "GenieSampler_applyConfig": [handle, handle],
            # (GenieSamplerConfig_Handle_t configHandle)
            "GenieSamplerConfig_free": [handle],
            # (GenieDialog_Handle_t dialogHandle, GenieSampler_Handle_t* samplerHandle)
            "GenieDialog_getSampler": [handle, handle_out],
        }

        for name, argtypes in required.items():
            self._bind(name, argtypes)
        for name, argtypes in optional.items():
            try:
                self._bind(name, argtypes)
            except AttributeError:
                print(
                    f"[GENIE] Optional symbol {name} not exported by this libGenie.so; skipping",
                    file=sys.stderr,
                )

    def _bind(self, name, argtypes):
        """Set the prototype of one library function (all return int32 status)."""
        func = getattr(self.lib, name)  # raises AttributeError if not exported
        func.argtypes = argtypes
        func.restype = ctypes.c_int32
def _check_status(status, func_name):
    """Validate the status code returned by a Genie API call.

    Args:
        status: Integer status returned by the library function.
        func_name: Name of the function, used only in messages.

    Returns:
        ``status`` unchanged when it is zero or positive; positive values
        are additionally reported on stderr as warnings.

    Raises:
        RuntimeError: If ``status`` is negative.
    """
    failed = status < 0
    if not failed:
        if status > 0:
            print(f"[GENIE] {func_name} returned warning status {status}", file=sys.stderr)
        return status
    raise RuntimeError(f"[GENIE] {func_name} failed with status {status}")
# ---------------------------------------------------------------------------
# Persistent Genie Dialog Manager
# ---------------------------------------------------------------------------
class GenieDialogManager:
    """Own one persistent Genie dialog and serialize queries against it.

    The model is loaded once in the constructor. Each query then:
      1. resets the dialog (per the original author's note this clears the
         KV cache while keeping the model loaded), and
      2. runs ``GenieDialog_query`` with a callback that receives the
         generated text in chunks.

    All calls into the dialog are serialized with ``_inference_lock``.
    """

    def __init__(self, lib_path, config_path, model_dir):
        """Load libGenie.so and create the dialog.

        Args:
            lib_path: Path of libGenie.so.
            config_path: Path of the Genie JSON configuration file.
            model_dir: Model directory (stored; not otherwise used here).

        Raises:
            OSError: If the library or the config file cannot be opened.
            RuntimeError: If a Genie call reports a negative status.
        """
        self._lib = GenieLibrary(lib_path)
        self._config_handle = ctypes.c_void_p()
        self._dialog_handle = ctypes.c_void_p()
        self._inference_lock = Lock()
        self._model_loaded = False
        self._model_dir = model_dir
        self._load_model(config_path)

    def _load_model(self, config_path):
        """Create the dialog config and the dialog (one-time operation)."""
        print(f"[GENIE] Loading config from {config_path}")
        t0 = time.time()
        # Read explicitly as UTF-8 so the result does not depend on the
        # process locale.
        with open(config_path, "r", encoding="utf-8") as f:
            config_str = f.read()
        status = self._lib.lib.GenieDialogConfig_createFromJson(
            config_str.encode("utf-8"),
            ctypes.byref(self._config_handle),
        )
        _check_status(status, "GenieDialogConfig_createFromJson")
        print(f"[GENIE] Config created in {time.time() - t0:.2f}s")
        # Creating the dialog is the expensive step (model goes onto the NPU).
        print("[GENIE] Creating dialog (loading model onto NPU)...")
        t1 = time.time()
        status = self._lib.lib.GenieDialog_create(
            self._config_handle,
            ctypes.byref(self._dialog_handle),
        )
        _check_status(status, "GenieDialog_create")
        load_time = time.time() - t1
        print(f"[GENIE] Model loaded onto NPU in {load_time:.2f}s")
        print(f"[GENIE] Total startup time: {time.time() - t0:.2f}s")
        self._model_loaded = True

    def _abort_in_flight_query(self):
        """Ask Genie to abort the query currently running on the dialog.

        Best effort: failures are logged, never raised.
        NOTE(review): assumes GenieDialog_signal may be called from a thread
        other than the one blocked in GenieDialog_query — confirm against
        the Genie SDK documentation.
        """
        try:
            status = self._lib.lib.GenieDialog_signal(
                self._dialog_handle, GENIE_DIALOG_ACTION_ABORT
            )
        except (AttributeError, OSError) as exc:
            # AttributeError: the library does not export GenieDialog_signal.
            print(f"[GENIE] Could not signal abort: {exc}", file=sys.stderr)
            return
        if status < 0:
            print(f"[GENIE] GenieDialog_signal returned error {status}", file=sys.stderr)

    def query_blocking(self, prompt_text, max_tokens=2048):
        """Run one query to completion and return the whole response.

        Args:
            prompt_text: Fully formatted prompt string.
            max_tokens: Accepted for interface compatibility; it is not
                forwarded to Genie by this method.

        Returns:
            ``(response_text, chunk_count)`` where ``chunk_count`` is the
            number of non-empty callback chunks (used by callers as a token
            count estimate).

        Raises:
            RuntimeError: If the model is not loaded or the dialog reset
                reports an error. A negative status from the query itself
                is logged and whatever text was produced is returned.
        """
        if not self._model_loaded:
            raise RuntimeError("Model not loaded")
        raw_chunks = []

        @QUERY_CALLBACK_TYPE
        def callback(response, sentence_code, user_data):
            # Keep the raw bytes: a multi-byte UTF-8 character may be split
            # across two chunks and must only be decoded after joining.
            if response:
                raw_chunks.append(response)

        with self._inference_lock:
            # shutdown() may have started while we waited for the lock.
            if not self._model_loaded:
                raise RuntimeError("Model not loaded")
            status = self._lib.lib.GenieDialog_reset(self._dialog_handle)
            _check_status(status, "GenieDialog_reset")
            prompt_bytes = prompt_text.encode("utf-8")
            status = self._lib.lib.GenieDialog_query(
                self._dialog_handle,
                prompt_bytes,
                GENIE_DIALOG_SENTENCE_COMPLETE,
                callback,
                None,
            )
        if status < 0:
            print(f"[GENIE] GenieDialog_query returned error {status}", file=sys.stderr)
        text = b"".join(raw_chunks).decode("utf-8", errors="replace")
        return text, len(raw_chunks)

    def query_streaming(self, prompt_text, max_tokens=2048):
        """Run one query and yield text chunks as the callback delivers them.

        The query runs on a helper thread; this generator forwards chunks
        from a queue. If the consumer stops early (generator closed, e.g.
        the HTTP client disconnected) or no chunk arrives for 120 seconds,
        the query belonging to this call is aborted via GenieDialog_signal
        so it stops holding the inference lock. A call that is still
        waiting for the lock is cancelled without touching the query of
        another request.

        Args:
            prompt_text: Fully formatted prompt string.
            max_tokens: Accepted for interface compatibility; it is not
                forwarded to Genie by this method.

        Yields:
            ``(text, sentence_code)`` tuples.

        Raises:
            RuntimeError: On first iteration, if the model is not loaded.
        """
        if not self._model_loaded:
            raise RuntimeError("Model not loaded")

        import codecs  # stdlib; only this method needs it

        token_queue = queue.Queue()
        cancelled = Event()   # consumer is gone; do not start / keep running
        in_flight = Event()   # GenieDialog_query for THIS call is running
        state_lock = Lock()   # orders in_flight changes against the abort
        # Incremental decoding buffers a UTF-8 sequence that is split
        # across two callback chunks instead of emitting U+FFFD for it.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

        @QUERY_CALLBACK_TYPE
        def callback(response, sentence_code, user_data):
            if response:
                text = decoder.decode(response)
                if text:
                    token_queue.put((text, sentence_code))

        def run_query():
            try:
                with self._inference_lock:
                    if not self._model_loaded:
                        raise RuntimeError("Model not loaded")
                    if cancelled.is_set():
                        return  # consumer left while we waited for the lock
                    status = self._lib.lib.GenieDialog_reset(self._dialog_handle)
                    _check_status(status, "GenieDialog_reset")
                    prompt_bytes = prompt_text.encode("utf-8")
                    with state_lock:
                        if cancelled.is_set():
                            return
                        in_flight.set()
                    try:
                        # Blocks until generation completes or is aborted.
                        status = self._lib.lib.GenieDialog_query(
                            self._dialog_handle,
                            prompt_bytes,
                            GENIE_DIALOG_SENTENCE_COMPLETE,
                            callback,
                            None,
                        )
                    finally:
                        # Cleared before the inference lock is released, so
                        # an abort can never hit the next request's query.
                        with state_lock:
                            in_flight.clear()
                    if status < 0:
                        print(f"[GENIE] GenieDialog_query error {status}", file=sys.stderr)
                    # Flush bytes of an incomplete trailing sequence.
                    tail = decoder.decode(b"", final=True)
                    if tail:
                        token_queue.put((tail, GENIE_DIALOG_SENTENCE_END))
            except Exception as e:
                print(f"[GENIE] Query thread error: {e}", file=sys.stderr)
            finally:
                token_queue.put((None, GENIE_DIALOG_SENTENCE_END))  # Sentinel

        query_thread = Thread(target=run_query, daemon=True)
        query_thread.start()
        try:
            while True:
                try:
                    text, sentence_code = token_queue.get(timeout=120)
                except queue.Empty:
                    print("[GENIE] Query timed out waiting for tokens", file=sys.stderr)
                    break
                if text is None:
                    break
                yield text, sentence_code
        finally:
            # Reached on normal completion, timeout, and generator close.
            cancelled.set()
            with state_lock:
                if in_flight.is_set():
                    self._abort_in_flight_query()
            query_thread.join(timeout=5)

    def shutdown(self):
        """Free the dialog and config handles.

        New queries are refused first; a query that is still running is
        asked to abort and given up to 10 seconds to release the inference
        lock before the handles are freed.
        """
        if self._model_loaded:
            self._model_loaded = False
            have_lock = self._inference_lock.acquire(blocking=False)
            if not have_lock:
                self._abort_in_flight_query()
                have_lock = self._inference_lock.acquire(timeout=10)
                if not have_lock:
                    print(
                        "[GENIE] Timed out waiting for in-flight query; freeing anyway",
                        file=sys.stderr,
                    )
            try:
                print("[GENIE] Freeing dialog...")
                self._lib.lib.GenieDialog_free(self._dialog_handle)
                print("[GENIE] Freeing config...")
                self._lib.lib.GenieDialogConfig_free(self._config_handle)
            finally:
                if have_lock:
                    self._inference_lock.release()
        print("[GENIE] Shutdown complete")
# ---------------------------------------------------------------------------
# Content extraction (handles OpenAI vision format from Open WebUI)
# ---------------------------------------------------------------------------
def _extract_text_from_content(content):
    """Extract the text of an OpenAI message ``content`` value, dropping images.

    Open WebUI sends images in OpenAI vision format:
    [{"type": "text", "text": "..."}, {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64,..."}}]
    The served model is text-only, so only the text parts are kept and a
    note about the stripped image is added.

    Args:
        content: ``None``, a string, a list of content parts, or any other
            value.

    Returns:
        ``""`` for ``None`` (a JSON ``null`` content must not become the
        literal text "None" in the prompt); the string itself for a
        string; the space-joined text parts (plus an image note when an
        ``image_url`` part was present) for a list; ``str(content)``
        otherwise.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)

    text_parts = []
    has_images = False
    for part in content:
        if not isinstance(part, dict):
            continue
        part_type = part.get("type")
        if part_type == "text":
            text = part.get("text", "")
            # A null or non-string "text" must not break the join below.
            if text is None:
                text = ""
            elif not isinstance(text, str):
                text = str(text)
            text_parts.append(text)
        elif part_type == "image_url":
            has_images = True

    result = " ".join(text_parts).strip()
    if has_images:
        if result:
            result += "\n\n[Note: An image was attached but this model is text-only and cannot process images.]"
        else:
            result = "[An image was sent but this model is text-only and cannot process images. Please send a text message instead.]"
        print(f"[GUARD] Stripped image data from message, text-only content: {result[:100]}...", flush=True)
    return result
# ---------------------------------------------------------------------------
# Llama 3.2 Instruct prompt formatting
# ---------------------------------------------------------------------------
def format_llama32_prompt(messages):
    """Render chat messages into a single Llama 3.2 Instruct prompt string.

    Each message becomes a header block carrying its role (``"user"`` when
    the key is absent) followed by its text content and an ``<|eot_id|>``
    marker. The prompt starts with ``<|begin_of_text|>`` and ends with an
    open assistant header so the model continues as the assistant.

    Args:
        messages: Iterable of message dicts with ``role`` and ``content``.

    Returns:
        The complete prompt string.
    """
    rendered_turns = [
        "<|start_header_id|>{}<|end_header_id|>\n\n{}<|eot_id|>".format(
            turn.get("role", "user"),
            _extract_text_from_content(turn.get("content", "")),
        )
        for turn in messages
    ]
    return (
        "<|begin_of_text|>"
        + "".join(rendered_turns)
        + "<|start_header_id|>assistant<|end_header_id|>\n\n"
    )
# ---------------------------------------------------------------------------
# HTTP Handler
# ---------------------------------------------------------------------------
class PersistentGenieHandler(BaseHTTPRequestHandler):
    """OpenAI-compatible HTTP handler backed by the persistent Genie dialog.

    Endpoints: ``GET /health``, ``GET /``, ``GET /v1/models``,
    ``GET /v1/models/<id>``, ``POST /v1/chat/completions`` (streaming via
    server-sent events, or a single JSON response).
    """

    # Class-level state shared by all handler instances (set in main).
    dialog_manager = None
    _startup_time = None
    _request_count = 0
    # Requests are handled on separate threads; guards _request_count.
    _count_lock = Lock()

    def log_message(self, format, *args):
        """Send the base class's access-log lines to stderr with a prefix."""
        print(f"[HTTP] {self.client_address[0]} - {format % args}", file=sys.stderr)

    def _send_json(self, data, status=200):
        """Serialize ``data`` as JSON and send it as a complete response."""
        body = json.dumps(data).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body)

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self.send_response(200)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization")
        self.send_header("Content-Length", "0")
        self.end_headers()

    def do_GET(self):
        """Serve the health, root and model-listing endpoints."""
        if self.path == "/health":
            uptime = time.time() - (self._startup_time or time.time())
            self._send_json({
                "status": "ok",
                "model_loaded": self.dialog_manager._model_loaded if self.dialog_manager else False,
                "mode": "persistent",
                "uptime_seconds": round(uptime, 1),
                "requests_served": self._request_count,
            })
        elif self.path in ("/", ""):
            self._send_json({
                "message": "Genie LLM Server (Persistent Mode - OpenAI-compatible)",
                "model": MODEL_NAME,
                "mode": "persistent",
                "description": "Model kept loaded on NPU between requests — no reload overhead",
            })
        elif self.path == "/v1/models":
            self._send_json({
                "object": "list",
                "data": [{
                    "id": MODEL_NAME,
                    "object": "model",
                    "created": int(time.time()),
                    "owned_by": "qualcomm-genie",
                    "permission": [],
                    "root": MODEL_NAME,
                    "parent": None,
                }]
            })
        elif self.path.startswith("/v1/models/"):
            self._send_json({
                "id": MODEL_NAME,
                "object": "model",
                "created": int(time.time()),
                "owned_by": "qualcomm-genie",
            })
        else:
            self._send_json({"error": "Not found"}, 404)

    def do_POST(self):
        """Handle ``POST /v1/chat/completions``.

        Malformed requests (bad Content-Length, undecodable or non-object
        JSON, ``messages`` that is not a list of objects) are answered with
        HTTP 400 instead of raising inside the handler thread.
        """
        if self.path != "/v1/chat/completions":
            self._send_json({"error": "Not found"}, 404)
            return

        try:
            content_length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            content_length = -1
        if content_length < 0:
            # A negative length would make rfile.read() wait for EOF.
            self._send_json({"error": "Invalid Content-Length header"}, 400)
            return
        body = self.rfile.read(content_length)

        try:
            request = json.loads(body)
        except ValueError as e:
            # Covers json.JSONDecodeError and UnicodeDecodeError.
            self._send_json({"error": f"Invalid JSON: {e}"}, 400)
            return
        if not isinstance(request, dict):
            self._send_json({"error": "Request body must be a JSON object"}, 400)
            return

        messages = request.get("messages", [])
        if not isinstance(messages, list) or not all(isinstance(m, dict) for m in messages):
            self._send_json({"error": "'messages' must be a list of objects"}, 400)
            return
        stream = request.get("stream", False)
        max_tokens = request.get("max_tokens", 2048)
        prompt = format_llama32_prompt(messages)
        request_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
        created = int(time.time())

        # Take this request's number under the lock so log lines below do
        # not pick up increments made by concurrent requests.
        with PersistentGenieHandler._count_lock:
            PersistentGenieHandler._request_count += 1
            request_number = PersistentGenieHandler._request_count

        t0 = time.time()
        try:
            if stream:
                self._handle_streaming(prompt, request_id, created, max_tokens)
            else:
                self._handle_non_streaming(prompt, request_id, created, request, max_tokens)
        except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError):
            print(f"[HTTP] Client disconnected during request {request_number}", file=sys.stderr)
        except Exception as e:
            print(f"[HTTP] Unexpected error in request {request_number}: {e}", file=sys.stderr)
            traceback.print_exc()
        elapsed = time.time() - t0
        print(f"[PERF] Request {request_number}: {elapsed:.2f}s ({'stream' if stream else 'batch'})", file=sys.stderr)

    def _handle_non_streaming(self, prompt, request_id, created, request, max_tokens):
        """Run the query to completion and send one ``chat.completion`` object.

        Token usage figures are estimates (about 4 characters per token).
        """
        try:
            generated_text, token_count = self.dialog_manager.query_blocking(prompt, max_tokens)
        except Exception as e:
            self._send_json({"error": str(e)}, 500)
            return
        prompt_tokens = len(prompt) // 4
        completion_tokens = max(token_count, len(generated_text) // 4)
        response = {
            "id": request_id,
            "object": "chat.completion",
            "created": created,
            "model": request.get("model", MODEL_NAME),
            "choices": [{
                "index": 0,
                "message": {"role": "assistant", "content": generated_text},
                "finish_reason": "stop",
            }],
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            }
        }
        self._send_json(response)

    def _handle_streaming(self, prompt, request_id, created, max_tokens):
        """Stream the response as server-sent ``chat.completion.chunk`` events."""
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "keep-alive")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("X-Accel-Buffering", "no")
        self.end_headers()

        def send_chunk(data):
            line = f"data: {json.dumps(data)}\n\n"
            self.wfile.write(line.encode("utf-8"))
            self.wfile.flush()

        try:
            # Initial role chunk
            send_chunk({
                "id": request_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": MODEL_NAME,
                "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None}],
            })
            # Stream tokens from callback
            token_stream = self.dialog_manager.query_streaming(prompt, max_tokens)
            try:
                for token_text, _sentence_code in token_stream:
                    send_chunk({
                        "id": request_id,
                        "object": "chat.completion.chunk",
                        "created": created,
                        "model": MODEL_NAME,
                        "choices": [{"index": 0, "delta": {"content": token_text}, "finish_reason": None}],
                    })
            finally:
                # Close the generator deterministically (also when the
                # client disconnected mid-stream) so it can clean up.
                token_stream.close()
            # Final chunk
            send_chunk({
                "id": request_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": MODEL_NAME,
                "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
            })
            self.wfile.write(b"data: [DONE]\n\n")
            self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError) as e:
            # Client disconnected mid-stream (e.g. user opened a new chat in Open WebUI)
            # This is normal and should NOT crash the server
            print(f"[HTTP] Client disconnected during stream (normal): {type(e).__name__}", file=sys.stderr)
        except Exception as e:
            print(f"[HTTP] Streaming error: {e}", file=sys.stderr)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Start the persistent Genie server and run until SIGINT/SIGTERM.

    Prints the effective configuration, checks that libGenie.so and the
    config file exist, changes into the model directory, loads the model
    once, then serves HTTP requests. On a termination signal the HTTP
    server is stopped first and the Genie handles are freed afterwards.

    Exits with status 1 if a prerequisite is missing, the model cannot be
    initialized, or the listening socket cannot be created.
    """
    print("=" * 60)
    print("Genie LLM Server — PERSISTENT MODE")
    print("=" * 60)
    print(f" Model: {MODEL_NAME}")
    print(f" Config: {CONFIG_FILE}")
    print(f" Model dir: {MODEL_DIR}")
    print(f" libGenie.so: {LIB_GENIE_PATH}")
    print(f" Library path: {GENIE_LIB_PATH}")
    print(f" ADSP path: {ADSP_LIB_PATH}")
    print(f" Port: {SERVER_PORT}")
    print(" Mode: PERSISTENT (model loaded once, kept resident)")
    print()

    # Verify FastRPC devices (informational only; startup continues).
    fastrpc_devs = [d for d in os.listdir("/dev") if d.startswith("fastrpc")]
    if fastrpc_devs:
        print(f" FastRPC devices: {', '.join(sorted(fastrpc_devs))}")
    else:
        print(" WARNING: No /dev/fastrpc-* devices found!", file=sys.stderr)
        print(" Running on host or ensure CDI/device-plugin injects devices", file=sys.stderr)

    # Verify libGenie.so exists
    if not os.path.isfile(LIB_GENIE_PATH):
        print(f" ERROR: libGenie.so not found: {LIB_GENIE_PATH}", file=sys.stderr)
        sys.exit(1)
    # Verify config exists
    if not os.path.isfile(CONFIG_FILE):
        print(f" ERROR: Config not found: {CONFIG_FILE}", file=sys.stderr)
        sys.exit(1)
    print()

    # Change to model directory so Genie resolves relative paths
    # (tokenizer.json, .bin files referenced in genie_config.json)
    print(f"[STARTUP] Changing working directory to {MODEL_DIR}")
    os.chdir(MODEL_DIR)
    print("[STARTUP] Loading model onto NPU (one-time cost)...")
    print()

    # Create the persistent dialog manager
    try:
        dialog_manager = GenieDialogManager(LIB_GENIE_PATH, CONFIG_FILE, MODEL_DIR)
    except Exception as e:
        print(f"[FATAL] Failed to initialize Genie: {e}", file=sys.stderr)
        traceback.print_exc()
        sys.exit(1)

    # Set up HTTP server
    PersistentGenieHandler.dialog_manager = dialog_manager
    PersistentGenieHandler._startup_time = time.time()
    ThreadingHTTPServer.allow_reuse_address = True
    try:
        server = ThreadingHTTPServer(("0.0.0.0", SERVER_PORT), PersistentGenieHandler)
    except OSError as e:
        # e.g. port already in use: release the model before exiting.
        print(f"[FATAL] Failed to start HTTP server: {e}", file=sys.stderr)
        dialog_manager.shutdown()
        sys.exit(1)

    def shutdown_handler(sig, frame):
        print("\n[SHUTDOWN] Received signal, shutting down...")
        # server.shutdown() blocks until serve_forever() has returned. The
        # handler runs on the main thread, i.e. inside serve_forever(), so
        # calling it directly would deadlock; request it from a helper
        # thread and let serve_forever() return normally below.
        Thread(target=server.shutdown, daemon=True).start()

    signal.signal(signal.SIGINT, shutdown_handler)
    signal.signal(signal.SIGTERM, shutdown_handler)
    print()
    print(f"[READY] Listening on http://0.0.0.0:{SERVER_PORT}")
    print("[READY] Model loaded and resident on NPU — no per-request reload!")
    print()
    try:
        server.serve_forever()
    finally:
        # Stop accepting connections first, then release the Genie handles.
        server.server_close()
        dialog_manager.shutdown()
# Script entry point: start the server only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()