Spaces:
Running
Running
| """FastAPI / Gradio Server routes. | |
| Defines all HTTP and API endpoints: | |
| - GET / → serves the index.html frontend | |
| - GET /api/model-status → model loading status | |
| - GET /images/{f} → serve generated plot images | |
| - GET /download/{f} → serve project ZIP downloads | |
| - API web_search → Google search scraping | |
| - API chat → streaming chat with code execution | |
| - API push_hf → push to HuggingFace Hub | |
| - API switch_model → switch between loaded models | |
| - API upload_image → upload image for VLM inference | |
| - API hf_auth → get HF OAuth profile & organizations | |
| """ | |
| from __future__ import annotations | |
| import base64 | |
| import json | |
| import logging | |
| import os | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Any | |
| from fastapi.responses import HTMLResponse, FileResponse | |
| from gradio import Server | |
| from code.config.constants import ( | |
| APP_TITLE, | |
| DEFAULT_MODEL_KEY, | |
| EXAMPLE_PROMPTS, | |
| LANGUAGE_OPTIONS, | |
| MODEL_CONFIGS, | |
| MODEL_URL, | |
| PY_TIMEOUT_S, | |
| ) | |
| from code.execution.code_extractor import ( | |
| build_iframe, | |
| extract_code, | |
| extract_multi_file, | |
| is_gradio_code, | |
| normalize_language, | |
| strip_thinking_blocks, | |
| ) | |
| from code.execution.gradio_runner import run_gradio_app, stop_gradio_app | |
| from code.execution.python_runner import run_python | |
| from code.huggingface.push import create_project_zip, push_to_huggingface | |
| from code.model.loader import ( | |
| get_model_status, | |
| is_model_loaded, | |
| get_current_model_key, | |
| get_current_model_type, | |
| switch_model, | |
| ) | |
| from code.model.inference import call_model | |
| from code.server.chat_helpers import chat_history_to_messages, targeted_prompt | |
| from code.websearch.google_scraper import web_search_google, format_search_results | |
| logger = logging.getLogger(__name__) | |
| # ─── Served Files Registry ────────────────────────────────────────────── | |
| _served_files: dict[str, str] = {} | |
| # ─── Uploaded Images Registry ─────────────────────────────────────────── | |
| _uploaded_images: dict[str, str] = {} | |
| # ─── Server Instance ──────────────────────────────────────────────────── | |
| app = Server() | |
| # ─── HTTP Routes ──────────────────────────────────────────────────────── | |
| async def homepage(): | |
| """Serve the index.html frontend with runtime config injected.""" | |
| html_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "index.html") | |
| with open(html_path, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| config = json.dumps({ | |
| "app_title": APP_TITLE, | |
| "model_id": MODEL_CONFIGS[DEFAULT_MODEL_KEY]["id"], | |
| "model_configs": {k: {"name": v["name"], "type": v["type"], "description": v["description"]} for k, v in MODEL_CONFIGS.items()}, | |
| "model_url": MODEL_URL, | |
| "languages": LANGUAGE_OPTIONS, | |
| "examples": [ | |
| {"label": label, "prompt": prompt, "language": lang, "framework": fw} | |
| for label, prompt, lang, fw in EXAMPLE_PROMPTS | |
| ], | |
| "default_model": "minicpm5-1b", | |
| }) | |
| content = content.replace("__RUNTIME_CONFIG__", config) | |
| return content | |
| async def model_status_endpoint(): | |
| """Return the current model loading status.""" | |
| return get_model_status() | |
| async def serve_image(filename: str): | |
| """Serve a generated plot image by filename.""" | |
| path = _served_files.get(f"img:{filename}") | |
| if path and os.path.exists(path): | |
| return FileResponse(path, media_type="image/png") | |
| return HTMLResponse("Not found", status_code=404) | |
| async def serve_download(filename: str): | |
| """Serve a project ZIP download by filename.""" | |
| path = _served_files.get(f"dl:{filename}") | |
| if path and os.path.exists(path): | |
| return FileResponse(path, filename=filename, media_type="application/octet-stream") | |
| return HTMLResponse("Not found", status_code=404) | |
| async def serve_uploaded_image(image_id: str): | |
| """Serve an uploaded image by its ID.""" | |
| path = _uploaded_images.get(image_id) | |
| if path and os.path.exists(path): | |
| return FileResponse(path, media_type="image/png") | |
| return HTMLResponse("Not found", status_code=404) | |
| # ─── Gradio API Endpoints ────────────────────────────────────────────── | |
| def handle_switch_model(model_key: str) -> str: | |
| """Switch to a different model.""" | |
| result = switch_model(model_key) | |
| yield json.dumps(result) | |
| def handle_upload_image(image_data: str) -> str: | |
| """Upload a base64-encoded image for VLM inference. | |
| Returns an image ID that can be referenced in chat. | |
| """ | |
| try: | |
| if not image_data: | |
| yield json.dumps({"success": False, "message": "No image data provided"}) | |
| return | |
| # Handle data URI format: data:image/png;base64,... | |
| if image_data.startswith("data:"): | |
| # Extract the base64 part | |
| parts = image_data.split(",", 1) | |
| if len(parts) == 2: | |
| image_data = parts[1] | |
| # Decode base64 | |
| image_bytes = base64.b64decode(image_data) | |
| # Save to temp file | |
| img_dir = tempfile.mkdtemp(prefix="uploaded_img_") | |
| image_id = f"img_{os.getpid()}_{int(os.urandom(4).hex(), 16)}" | |
| img_path = os.path.join(img_dir, f"{image_id}.png") | |
| Path(img_path).write_bytes(image_bytes) | |
| # Register for serving | |
| _uploaded_images[image_id] = img_path | |
| # Create a URL for the image that the VLM can access | |
| image_url = f"/uploaded-images/{image_id}" | |
| # Also save as a file:// URL for local VLM access | |
| file_url = f"file://{img_path}" | |
| yield json.dumps({ | |
| "success": True, | |
| "image_id": image_id, | |
| "image_url": image_url, | |
| "file_url": file_url, | |
| "message": "Image uploaded successfully", | |
| }) | |
| except Exception as exc: | |
| logger.exception("Image upload failed") | |
| yield json.dumps({ | |
| "success": False, | |
| "message": f"Upload failed: {str(exc)}", | |
| }) | |
| def handle_web_search(query: str) -> str: | |
| """Search the web using Google scraping. No API key needed.""" | |
| query = (query or "").strip() | |
| if not query: | |
| yield json.dumps({"success": False, "results": [], "message": "Empty search query"}) | |
| return | |
| try: | |
| results = web_search_google(query, num_results=8) | |
| formatted = format_search_results(results) | |
| yield json.dumps({ | |
| "success": True, | |
| "results": results, | |
| "formatted": formatted, | |
| "message": f"Found {len(results)} results", | |
| }) | |
| except Exception as exc: | |
| logger.exception("Web search failed") | |
| yield json.dumps({ | |
| "success": False, | |
| "results": [], | |
| "message": f"Search failed: {str(exc)}", | |
| }) | |
| def handle_chat( | |
| prompt: str, | |
| target_language: str, | |
| target_framework: str, | |
| history_json: str, | |
| exec_context_json: str, | |
| search_enabled: str = "false", | |
| image_url: str = "", | |
| ) -> str: | |
| """Stream chat responses with code execution. Yields JSON strings.""" | |
| history = json.loads(history_json) if history_json else [] | |
| execution_context = json.loads(exec_context_json) if exec_context_json else {} | |
| prompt = (prompt or "").strip() | |
| if not prompt: | |
| yield json.dumps({ | |
| "type": "error", | |
| "status_text": "Enter a prompt to get started.", | |
| "status_state": "info", | |
| "history": history, | |
| "execution": execution_context, | |
| }) | |
| return | |
| # Check model status | |
| model_status = get_model_status() | |
| if model_status["status"] == "loading": | |
| yield json.dumps({ | |
| "type": "error", | |
| "status_text": model_status["message"], | |
| "status_state": "working", | |
| "history": history, | |
| "execution": execution_context, | |
| }) | |
| return | |
| if model_status["status"] != "ready": | |
| yield json.dumps({ | |
| "type": "error", | |
| "status_text": model_status["message"], | |
| "status_state": "error", | |
| "history": history, | |
| "execution": execution_context, | |
| }) | |
| return | |
| # Add user message and placeholder assistant message | |
| history = list(history) + [ | |
| {"role": "user", "content": prompt}, | |
| {"role": "assistant", "content": ""}, | |
| ] | |
| yield json.dumps({ | |
| "type": "status", | |
| "status_text": "Thinking...", | |
| "status_state": "working", | |
| "history": history, | |
| "execution": execution_context, | |
| }) | |
| # Web search if enabled | |
| search_context = "" | |
| if search_enabled.lower() == "true": | |
| yield json.dumps({ | |
| "type": "status", | |
| "status_text": "Searching the web...", | |
| "status_state": "working", | |
| "history": history, | |
| "execution": execution_context, | |
| }) | |
| search_results = web_search_google(prompt, num_results=6) | |
| if search_results: | |
| search_context = format_search_results(search_results) | |
| yield json.dumps({ | |
| "type": "search_results", | |
| "status_text": f"Found {len(search_results)} results, generating code...", | |
| "status_state": "working", | |
| "history": history, | |
| "execution": execution_context, | |
| "search_results": search_results, | |
| }) | |
| # Build messages for model | |
| model_history = list(history[:-1]) | |
| model_history[-1] = { | |
| "role": "user", | |
| "content": targeted_prompt( | |
| prompt, target_language, target_framework, execution_context, search_context | |
| ), | |
| } | |
| messages = chat_history_to_messages(model_history) | |
| # Determine image URL for VLM | |
| vlm_image_url = image_url.strip() if image_url else None | |
| final_response = "" | |
| for partial in call_model(messages, image_url=vlm_image_url): | |
| final_response = partial | |
| # Strip thinking blocks so chat only shows clean output | |
| clean_partial = strip_thinking_blocks(partial) | |
| history[-1]["content"] = clean_partial | |
| yield json.dumps({ | |
| "type": "streaming", | |
| "status_text": "Generating...", | |
| "status_state": "working", | |
| "history": history, | |
| "execution": execution_context, | |
| }) | |
| if not final_response: | |
| history[-1]["content"] = "The model did not return a response." | |
| yield json.dumps({ | |
| "type": "error", | |
| "status_text": "No model response.", | |
| "status_state": "error", | |
| "history": history, | |
| "execution": execution_context, | |
| }) | |
| return | |
| # Extract code from response (use cleaned version) | |
| clean_response = strip_thinking_blocks(final_response) | |
| code, fence_lang = extract_code(clean_response) | |
| target = normalize_language(target_language, fence_lang) | |
| # Also try multi-file extraction | |
| multi_files = extract_multi_file(clean_response) | |
| if not code and not multi_files: | |
| yield json.dumps({ | |
| "type": "complete", | |
| "status_text": "Answered without running code.", | |
| "status_state": "info", | |
| "history": history, | |
| "execution": execution_context, | |
| }) | |
| return | |
| yield json.dumps({ | |
| "type": "status", | |
| "status_text": "Running...", | |
| "status_state": "working", | |
| "history": history, | |
| "execution": execution_context, | |
| }) | |
| # Execute code | |
| stdout, stderr, image_path, status_text, status_state = "", "", None, "Preview ready", "success" | |
| is_gradio = False | |
| gradio_url = None | |
| if target == "python" and code: | |
| if is_gradio_code(code) or target_framework == "Gradio": | |
| is_gradio = True | |
| gradio_result = run_gradio_app(code) | |
| if gradio_result["success"]: | |
| gradio_url = gradio_result["url"] | |
| status_text = f"Gradio app running at {gradio_url}" | |
| status_state = "success" | |
| stderr = f"Gradio app launched successfully at {gradio_url}" | |
| else: | |
| status_text = "Gradio launch failed" | |
| status_state = "error" | |
| stderr = gradio_result.get("stderr", gradio_result.get("message", "Launch failed")) | |
| else: | |
| result = run_python(code) | |
| if result.timed_out: | |
| stdout, stderr, image_path = result.stdout, result.stderr, result.image_path | |
| status_text = f"Timed out after {PY_TIMEOUT_S}s" | |
| status_state = "error" | |
| elif result.returncode: | |
| stdout, stderr, image_path = result.stdout, result.stderr, result.image_path | |
| status_text = "Finished with errors" | |
| status_state = "error" | |
| else: | |
| stdout, stderr, image_path = result.stdout, result.stderr, result.image_path | |
| status_text = "Ran successfully" | |
| status_state = "success" | |
| # Register image for serving | |
| image_url_out = None | |
| if image_path: | |
| filename = os.path.basename(image_path) | |
| _served_files[f"img:{filename}"] = image_path | |
| image_url_out = f"/images/{filename}" | |
| # Register code for download | |
| download_url = None | |
| project_files = dict(multi_files) if multi_files else {} | |
| # Rename main.py → app.py for Python/Gradio projects (HF Spaces expects app.py) | |
| if project_files and "main.py" in project_files and "app.py" not in project_files: | |
| if target == "python" or is_gradio: | |
| project_files["app.py"] = project_files.pop("main.py") | |
| # If project_files is empty but we have single code, add it | |
| if not project_files and code: | |
| if target == "python": | |
| fname = "app.py" if (is_gradio or is_gradio_code(code)) else "main.py" | |
| elif target in {"web", "html", "javascript"}: | |
| fname = "index.html" | |
| else: | |
| fname = f"main.{fence_lang or 'txt'}" | |
| project_files = {fname: code} | |
| if project_files: | |
| project_name = "generated-project" | |
| zip_path = create_project_zip(project_files, project_name) | |
| zip_filename = f"{project_name}.zip" | |
| _served_files[f"dl:{zip_filename}"] = zip_path | |
| download_url = f"/download/{zip_filename}" | |
| elif code: | |
| ext = "py" if target == "python" else "html" | |
| dl_filename = f"generated.{ext}" | |
| dl_dir = tempfile.mkdtemp(prefix="fullstack_dl_") | |
| dl_path = os.path.join(dl_dir, dl_filename) | |
| Path(dl_path).write_text(code, encoding="utf-8") | |
| _served_files[f"dl:{dl_filename}"] = dl_path | |
| download_url = f"/download/{dl_filename}" | |
| # Determine if this is web previewable | |
| is_web = target in {"web", "javascript", "typescript", "html"} or (fence_lang or "") in {"html", "web"} | |
| web_code = code if is_web else None | |
| execution_context = { | |
| "code": code, | |
| "target": target, | |
| "fence_lang": fence_lang or target, | |
| "stdout": stdout, | |
| "stderr": stderr, | |
| "image_url": image_url_out, | |
| "image_path": image_path, | |
| "status": status_text, | |
| "language": fence_lang or target, | |
| "suggested_tab": "preview" if (image_path or is_web or is_gradio) else "console", | |
| "download_url": download_url, | |
| "project_files": project_files, | |
| "is_web": is_web, | |
| "web_code": web_code, | |
| "is_gradio": is_gradio, | |
| "gradio_url": gradio_url, | |
| } | |
| yield json.dumps({ | |
| "type": "complete", | |
| "status_text": status_text, | |
| "status_state": status_state, | |
| "history": history, | |
| "execution": execution_context, | |
| }) | |
| def handle_hf_auth( | |
| oauth_token: str = "", | |
| ) -> str: | |
| """Get HuggingFace OAuth profile and list of organizations. | |
| If oauth_token is provided (from Gradio OAuth), uses it to fetch user info. | |
| Otherwise, returns empty auth info. | |
| """ | |
| try: | |
| import gradio as gr | |
| from huggingface_hub import whoami | |
| token = oauth_token.strip() if oauth_token else "" | |
| if not token: | |
| yield json.dumps({ | |
| "authenticated": False, | |
| "username": "", | |
| "name": "", | |
| "picture": "", | |
| "organizations": [], | |
| "message": "Not signed in. Click Sign In to authenticate with HuggingFace.", | |
| }) | |
| return | |
| # Get user info using the OAuth token | |
| user_info = whoami(token=token) | |
| username = user_info.get("name", "") | |
| fullname = user_info.get("fullname", username) | |
| # Get avatar | |
| avatar_url = "" | |
| avatar_info = user_info.get("avatarUrl", "") | |
| if avatar_info: | |
| avatar_url = avatar_info | |
| # Get organizations | |
| orgs = [] | |
| for org in user_info.get("orgs", []): | |
| orgs.append({ | |
| "name": org.get("name", ""), | |
| "avatar": org.get("avatarUrl", ""), | |
| }) | |
| # Also check orgRoles for role info | |
| org_roles = user_info.get("orgRoles", []) | |
| for role_info in org_roles: | |
| org_name = role_info.get("org", "") | |
| role = role_info.get("role", "member") | |
| # Add role info to existing org if found | |
| for org in orgs: | |
| if org["name"] == org_name: | |
| org["role"] = role | |
| break | |
| yield json.dumps({ | |
| "authenticated": True, | |
| "username": username, | |
| "name": fullname, | |
| "picture": avatar_url, | |
| "organizations": orgs, | |
| "token": token, | |
| "message": f"Signed in as {username}", | |
| }) | |
| except Exception as exc: | |
| logger.exception("HF auth check failed") | |
| yield json.dumps({ | |
| "authenticated": False, | |
| "username": "", | |
| "name": "", | |
| "picture": "", | |
| "organizations": [], | |
| "message": f"Auth check failed: {str(exc)}", | |
| }) | |
| def handle_push_hf( | |
| exec_context_json: str, | |
| repo_name: str, | |
| hf_token: str, | |
| space_sdk: str = "auto", | |
| is_space: str = "true", | |
| ) -> str: | |
| """Push generated project to HuggingFace Hub.""" | |
| try: | |
| execution_context = json.loads(exec_context_json) if exec_context_json else {} | |
| project_files = dict(execution_context.get("project_files", {}) or {}) | |
| code = execution_context.get("code", "") | |
| # If project_files is empty but we have code, build files from code | |
| if not project_files and code: | |
| lang = execution_context.get("language", "python") | |
| is_gradio = execution_context.get("is_gradio", False) | |
| # Map language to entry file — JS/TS single-files get wrapped for Docker | |
| if lang in ("javascript", "js", "typescript", "ts"): | |
| # For single-file JS/TS code that is HTML (vanilla), keep as index.html | |
| if "<!doctype" in code.lower() or "<html" in code.lower(): | |
| filename = "index.html" | |
| else: | |
| filename = "index.js" | |
| elif lang in ("html", "web"): | |
| filename = "index.html" | |
| else: | |
| ext_map = { | |
| "python": "app.py", "py": "app.py", | |
| } | |
| filename = ext_map.get(lang, "app.py") | |
| project_files = {filename: code} | |
| # Auto-detect SDK for Gradio apps | |
| if is_gradio or is_gradio_code(code): | |
| space_sdk = "gradio" | |
| # If still no files, try extracting from the raw response | |
| if not project_files and code: | |
| project_files = extract_multi_file(code) | |
| if not project_files: | |
| yield json.dumps({ | |
| "success": False, | |
| "message": "No code to push. Generate some code first.", | |
| "url": "", | |
| }) | |
| return | |
| # "auto" SDK means let push_to_huggingface decide | |
| if space_sdk == "auto": | |
| space_sdk = "static" # push_to_huggingface will auto-detect from files | |
| project_name = repo_name.split("/")[-1] if "/" in repo_name else repo_name | |
| result = push_to_huggingface( | |
| files=project_files, | |
| project_name=project_name, | |
| repo_name=repo_name, | |
| hf_token=hf_token, | |
| space_sdk=space_sdk, | |
| is_space=is_space.lower() == "true", | |
| ) | |
| yield json.dumps(result) | |
| except Exception as exc: | |
| logger.exception("Push to HuggingFace failed") | |
| yield json.dumps({ | |
| "success": False, | |
| "message": f"Push failed: {str(exc)}", | |
| "url": "", | |
| }) | |
| def get_app() -> Server: | |
| """Return the configured Gradio Server app instance.""" | |
| return app | |