import os
import time
import threading
import socket
import subprocess
import asyncio
import json
import uuid

import requests
from fastapi import FastAPI, Request, Header
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse

app = FastAPI()

# ----------------------------
# Config
# ----------------------------
API_KEY = os.getenv("API_KEY", "821274")
MODEL = os.getenv("MODEL", "llama3.2:1b")
OLLAMA_BASE = os.getenv("OLLAMA_BASE", "http://127.0.0.1:11434")

SYSTEM_PROMPT = """
You are a helpful, friendly AI assistant.

Rules:
- If the user asks casual conversation, respond normally in plain text.
- If the user asks for code/scripts/automation or says “just do it” / “write something”, choose a reasonable task and output a complete useful script.
- Don’t refuse normal conversation.
- Only output code when appropriate.
- When you write code: output code first, then a short explanation.

Always be helpful. Never say you cannot fulfill a request unless it is unsafe.
""".strip()

# Free CPU: serialize requests to avoid overload / timeouts
GEN_SEM = asyncio.Semaphore(1)


# ----------------------------
# Ollama helpers
# ----------------------------
def is_port_open(host="127.0.0.1", port=11434) -> bool:
    try:
        with socket.create_connection((host, port), timeout=0.5):
            return True
    except OSError:
        return False


def ollama_healthy() -> bool:
    try:
        r = requests.get(f"{OLLAMA_BASE}/api/tags", timeout=1.5)
        return r.status_code == 200
    except Exception:
        return False


def ensure_ollama_running():
    # Only start if not reachable
    if not is_port_open("127.0.0.1", 11434):
        subprocess.Popen(["ollama", "serve"])


def wait_for_ollama(timeout_s=120) -> bool:
    start = time.time()
    while time.time() - start < timeout_s:
        if ollama_healthy():
            return True
        time.sleep(1)
    return False


def pull_and_warm_model():
    """
    Best-effort: pull the model (may be slow on free CPU) and warm it once.
    Safe to fail (space still boots).
    """
    try:
        ensure_ollama_running()
        if not wait_for_ollama(120):
            print("Ollama not ready yet; skipping model pull.")
            return

        print(f"Pulling model: {MODEL}")
        r = requests.post(f"{OLLAMA_BASE}/api/pull", json={"name": MODEL}, timeout=60 * 30)
        if r.status_code != 200:
            print("Pull failed:", r.text[:2000])
            return

        # Warmup: avoids first real user request being extra flaky/slow
        print("Warming up…")
        requests.post(
            f"{OLLAMA_BASE}/api/generate",
            json={"model": MODEL, "system": SYSTEM_PROMPT, "prompt": "Say: ready.", "stream": False},
            timeout=180,
        )
        print("Warmup done.")
    except Exception as e:
        print("Boot task error (non-fatal):", str(e))


threading.Thread(target=pull_and_warm_model, daemon=True).start()


def generate_with_recovery(prompt: str, attempts: int = 3):
    last_err = None
    for i in range(1, attempts + 1):
        try:
            if not ollama_healthy():
                ensure_ollama_running()
                wait_for_ollama(60)
            r = requests.post(
                f"{OLLAMA_BASE}/api/generate",
                json={
                    "model": MODEL,
                    "system": SYSTEM_PROMPT,
                    "prompt": prompt,
                    "stream": False,
                },
                timeout=600,
            )
            r.raise_for_status()
            data = r.json()
            return data.get("response", ""), None
        except Exception as e:
            last_err = str(e)
            time.sleep(min(2 ** (i - 1), 4))
    return (
        "⚠️ Backend hiccup while generating. Retrying usually works.\n\n"
        "Debug error:\n" + (last_err or "unknown"),
        last_err,
    )


def messages_to_prompt(messages):
    """
    Convert OpenAI-style messages into a single prompt string
    for Ollama /api/generate.
    """
    parts = []
    for m in messages or []:
        role = (m.get("role") or "user").strip().upper()
        content = (m.get("content") or "").strip()
        if content:
            parts.append(f"{role}:\n{content}")
    return "\n\n".join(parts).strip()
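

# Illustrative sketch only (never called by the app): the kind of prompt string
# messages_to_prompt builds from an OpenAI-style message list. The demo roles
# and contents are made up for illustration.
def _example_prompt_conversion() -> str:
    demo = [
        {"role": "system", "content": "Be brief."},
        {"role": "user", "content": "Hi"},
    ]
    # Returns "SYSTEM:\nBe brief.\n\nUSER:\nHi"
    return messages_to_prompt(demo)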
""" parts = [] for m in messages or []: role = (m.get("role") or "user").strip().upper() content = (m.get("content") or "").strip() if content: parts.append(f"{role}:\n{content}") return "\n\n".join(parts).strip() # ---------------------------- # Health # ---------------------------- @app.get("/health") def health(): return {"ok": ollama_healthy(), "model": MODEL} # ---------------------------- # OpenAI compatibility endpoints # ---------------------------- @app.get("/v1/models") def openai_models(authorization: str = Header(default="")): # Optional auth check (recommended) if authorization and authorization != f"Bearer {API_KEY}": return JSONResponse({"error": {"message": "Invalid API key"}}, status_code=401) return { "object": "list", "data": [ { "id": MODEL, "object": "model", "created": int(time.time()), "owned_by": "private-ai" } ] } @app.post("/v1/chat/completions") async def openai_chat_completions(request: Request, authorization: str = Header(default="")): # OpenAI-style auth if authorization != f"Bearer {API_KEY}": return JSONResponse( {"error": {"message": "Invalid API key", "type": "auth_error"}}, status_code=401 ) body = await request.json() model = body.get("model") or MODEL messages = body.get("messages") or [] # Convert into a single prompt for Ollama generate prompt = messages_to_prompt(messages) if not prompt: prompt = "USER:\nHello" async with GEN_SEM: text, err = generate_with_recovery(prompt, attempts=3) # If error, still return valid OpenAI shaped response if err: text += f"\n\n---\nBackend error:\n{err}" return { "id": f"chatcmpl-{uuid.uuid4().hex}", "object": "chat.completion", "created": int(time.time()), "model": model, "choices": [ { "index": 0, "message": {"role": "assistant", "content": text}, "finish_reason": "stop" } ], "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} } # ---------------------------- # UI (REAL HTML, not escaped) # ---------------------------- @app.get("/", response_class=HTMLResponse) def ui(): return f""" Private AI


# ----------------------------
# UI (REAL HTML, not escaped)
# ----------------------------
@app.get("/", response_class=HTMLResponse)
def ui():
    # Minimal page shell for the chat UI: title, heading, and the initial
    # "Connecting…" status text.
    return """<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Private AI</title>
</head>
<body>
    <h1>Private AI</h1>
    <div id="status">Connecting…</div>
</body>
</html>
""" # ---------------------------- # Streaming SSE endpoint # ---------------------------- @app.post("/v1/chat/stream") async def chat_stream(request: Request): auth = request.headers.get("authorization", "") if auth != f"Bearer {API_KEY}": async def deny(): yield "event: error\ndata: " + json.dumps({"error": "403: Invalid API key"}) + "\n\n" yield "event: done\ndata: {}\n\n" return StreamingResponse(deny(), media_type="text/event-stream") body = await request.json() prompt = (body.get("prompt") or "").strip() if not prompt: async def empty(): yield "data: " + json.dumps({"delta": "Send a message and I’ll respond."}) + "\n\n" yield "event: done\ndata: {}\n\n" return StreamingResponse(empty(), media_type="text/event-stream") async def event_gen(): async with GEN_SEM: try: if not ollama_healthy(): ensure_ollama_running() wait_for_ollama(60) r = requests.post( f"{OLLAMA_BASE}/api/generate", json={ "model": MODEL, "system": SYSTEM_PROMPT, "prompt": prompt, "stream": True, }, stream=True, timeout=600, ) if r.status_code != 200: yield "event: error\ndata: " + json.dumps({"error": r.text[:2000]}) + "\n\n" yield "event: done\ndata: {}\n\n" return for line in r.iter_lines(decode_unicode=True): if not line: continue try: obj = json.loads(line) except Exception: continue delta = obj.get("response", "") if delta: yield "data: " + json.dumps({"delta": delta}) + "\n\n" if obj.get("done"): break yield "event: done\ndata: {}\n\n" except Exception as e: yield "event: error\ndata: " + json.dumps({"error": str(e)}) + "\n\n" yield "event: done\ndata: {}\n\n" return StreamingResponse(event_gen(), media_type="text/event-stream") # ---------------------------- # Non-stream fallback # ---------------------------- @app.post("/v1/chat") async def chat_api(request: Request): auth = request.headers.get("authorization", "") if auth != f"Bearer {API_KEY}": return JSONResponse({"response": "", "error": "403: Invalid API key"}, status_code=200) body = await request.json() prompt = (body.get("prompt") or "").strip() if not prompt: return {"response": "Send a message and I’ll respond.", "error": None} async with GEN_SEM: text, err = generate_with_recovery(prompt, attempts=3) return {"response": text, "error": err}