R-Kentaren's picture
Upload folder using huggingface_hub
ff86d3d verified
raw
history blame contribute delete
19.4 kB
"""FastAPI / Gradio Server routes.
Defines all HTTP and API endpoints:
- GET / → serves the index.html frontend
- GET /api/model-status → model loading status
- GET /images/{f} → serve generated plot images
- GET /download/{f} → serve project ZIP downloads
- GET /uploaded-images/{id} → serve images uploaded for VLM inference
- API web_search → Google search scraping
- API chat → streaming chat with code execution
- API push_hf → push to HuggingFace Hub
- API switch_model → switch between loaded models
- API upload_image → upload image for VLM inference
"""
from __future__ import annotations
import base64
import json
import logging
import os
import tempfile
from pathlib import Path
from typing import Any
from fastapi.responses import HTMLResponse, FileResponse
from gradio import Server
from code.config.constants import (
APP_TITLE,
DEFAULT_MODEL_KEY,
EXAMPLE_PROMPTS,
LANGUAGE_OPTIONS,
MODEL_CONFIGS,
MODEL_URL,
PY_TIMEOUT_S,
)
from code.execution.code_extractor import (
build_iframe,
extract_code,
extract_multi_file,
is_gradio_code,
normalize_language,
strip_thinking_blocks,
)
from code.execution.gradio_runner import run_gradio_app, stop_gradio_app
from code.execution.python_runner import run_python
from code.huggingface.push import create_project_zip, push_to_huggingface
from code.model.loader import (
get_model_status,
is_model_loaded,
get_current_model_key,
get_current_model_type,
switch_model,
)
from code.model.inference import call_model
from code.server.chat_helpers import chat_history_to_messages, targeted_prompt
from code.websearch.google_scraper import web_search_google, format_search_results
# Module-level logger; handlers below use logger.exception() to record failures.
logger = logging.getLogger(__name__)
# ─── Served Files Registry ──────────────────────────────────────────────
# Maps a namespaced key to a file path on disk. Keys written in this module
# are "img:<filename>" (generated plot images) and "dl:<filename>" (project
# downloads); the /images and /download routes look files up by those keys.
# NOTE(review): nothing in this module removes entries — confirm whether
# cleanup happens elsewhere.
_served_files: dict[str, str] = {}
# ─── Uploaded Images Registry ───────────────────────────────────────────
# Maps an upload's generated image ID to the path of the saved file; filled
# by the upload_image API and read by the /uploaded-images route.
_uploaded_images: dict[str, str] = {}
# ─── Server Instance ────────────────────────────────────────────────────
# Single server object that every @app.get / @app.api decorator below
# registers its route on.
app = Server()
# ─── HTTP Routes ────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def homepage():
    """Serve the index.html frontend with runtime config injected.

    Reads ``index.html`` from two directories above this module and replaces
    the ``__RUNTIME_CONFIG__`` placeholder with a JSON document holding the
    app title, model metadata, language options and example prompts.

    Returns:
        The HTML text with the placeholder substituted.

    Raises:
        OSError: If ``index.html`` cannot be opened or read.
    """
    html_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "..", "..", "index.html"
    )
    with open(html_path, "r", encoding="utf-8") as f:
        content = f.read()

    # Only expose the display fields of each model config to the browser.
    model_configs = {
        key: {
            "name": cfg["name"],
            "type": cfg["type"],
            "description": cfg["description"],
        }
        for key, cfg in MODEL_CONFIGS.items()
    }
    examples = [
        {"label": label, "prompt": prompt, "language": lang, "framework": fw}
        for label, prompt, lang, fw in EXAMPLE_PROMPTS
    ]
    config = json.dumps({
        "app_title": APP_TITLE,
        "model_id": MODEL_CONFIGS[DEFAULT_MODEL_KEY]["id"],
        "model_configs": model_configs,
        "model_url": MODEL_URL,
        "languages": LANGUAGE_OPTIONS,
        "examples": examples,
        # Derived from the same constant as "model_id" so the advertised
        # default can never drift from (or be missing in) MODEL_CONFIGS.
        "default_model": DEFAULT_MODEL_KEY,
    })
    return content.replace("__RUNTIME_CONFIG__", config)
@app.get("/api/model-status")
async def model_status_endpoint():
    """Report the model loading status as produced by ``get_model_status``."""
    status = get_model_status()
    return status
@app.get("/images/{filename}")
async def serve_image(filename: str):
    """Return a registered plot image as PNG, or a 404 page.

    The file is looked up in ``_served_files`` under the key
    ``"img:<filename>"``; an unregistered name or a path that no longer
    exists on disk yields a 404 response.
    """
    registered = _served_files.get(f"img:{filename}")
    if not registered or not os.path.exists(registered):
        return HTMLResponse("Not found", status_code=404)
    return FileResponse(registered, media_type="image/png")
@app.get("/download/{filename}")
async def serve_download(filename: str):
    """Return a registered download as an attachment, or a 404 page.

    The file is looked up in ``_served_files`` under the key
    ``"dl:<filename>"`` and sent with the requested name as its download
    filename; an unregistered name or a missing file yields a 404 response.
    """
    registered = _served_files.get(f"dl:{filename}")
    if not registered or not os.path.exists(registered):
        return HTMLResponse("Not found", status_code=404)
    return FileResponse(
        registered,
        filename=filename,
        media_type="application/octet-stream",
    )
@app.get("/uploaded-images/{image_id}")
async def serve_uploaded_image(image_id: str):
    """Return a previously uploaded image by ID, or a 404 page.

    The ID is looked up in ``_uploaded_images``; an unknown ID or a file
    that no longer exists on disk yields a 404 response.
    """
    stored = _uploaded_images.get(image_id)
    if not stored or not os.path.exists(stored):
        return HTMLResponse("Not found", status_code=404)
    return FileResponse(stored, media_type="image/png")
# ─── Gradio API Endpoints ──────────────────────────────────────────────
@app.api(name="switch_model", concurrency_limit=1)
def handle_switch_model(model_key: str) -> str:
    """Switch the active model and yield the outcome as one JSON string.

    The value returned by ``switch_model(model_key)`` is serialized as-is.
    """
    yield json.dumps(switch_model(model_key))
@app.api(name="upload_image", concurrency_limit=4)
def handle_upload_image(image_data: str) -> str:
    """Store a base64-encoded image and yield a JSON description of it.

    ``image_data`` may be raw base64 or a ``data:...;base64,...`` URI. The
    decoded bytes are written to ``<image_id>.png`` inside a fresh temporary
    directory and registered in ``_uploaded_images``.

    Yields a single JSON string: on success it carries ``image_id``,
    ``image_url`` (the serving route) and ``file_url`` (a ``file://`` path);
    on any failure it carries ``success: false`` and a message.
    """
    try:
        if not image_data:
            yield json.dumps({"success": False, "message": "No image data provided"})
            return

        # For a data URI keep only what follows the first comma.
        if image_data.startswith("data:"):
            _header, comma, payload = image_data.partition(",")
            if comma:
                image_data = payload

        raw_bytes = base64.b64decode(image_data)

        # Each upload gets its own temp directory and a pid+random based ID.
        target_dir = tempfile.mkdtemp(prefix="uploaded_img_")
        random_part = int.from_bytes(os.urandom(4), "big")
        image_id = f"img_{os.getpid()}_{random_part}"
        img_path = os.path.join(target_dir, f"{image_id}.png")
        with open(img_path, "wb") as out:
            out.write(raw_bytes)

        # Make the file reachable through the /uploaded-images route.
        _uploaded_images[image_id] = img_path

        response = {
            "success": True,
            "image_id": image_id,
            "image_url": f"/uploaded-images/{image_id}",
            "file_url": f"file://{img_path}",
            "message": "Image uploaded successfully",
        }
        yield json.dumps(response)
    except Exception as exc:
        logger.exception("Image upload failed")
        failure = {
            "success": False,
            "message": f"Upload failed: {str(exc)}",
        }
        yield json.dumps(failure)
@app.api(name="web_search", concurrency_limit=4)
def handle_web_search(query: str) -> str:
    """Run a Google-scrape web search and yield the outcome as one JSON string.

    An empty or whitespace-only query yields a failure payload without
    searching. Otherwise up to 8 results are requested; the payload carries
    the raw ``results`` plus their ``formatted`` rendering. Any exception is
    logged and reported as a failure payload.
    """
    cleaned = (query or "").strip()
    if not cleaned:
        yield json.dumps({"success": False, "results": [], "message": "Empty search query"})
        return

    try:
        hits = web_search_google(cleaned, num_results=8)
        payload = {
            "success": True,
            "results": hits,
            "formatted": format_search_results(hits),
            "message": f"Found {len(hits)} results",
        }
        yield json.dumps(payload)
    except Exception as exc:
        logger.exception("Web search failed")
        failure = {
            "success": False,
            "results": [],
            "message": f"Search failed: {str(exc)}",
        }
        yield json.dumps(failure)
def _chat_event(
    kind: str,
    status_text: str,
    status_state: str,
    history: list,
    execution: dict,
    **extra: Any,
) -> str:
    """Serialize one streamed chat event; ``extra`` keys are appended last."""
    event = {
        "type": kind,
        "status_text": status_text,
        "status_state": status_state,
        "history": history,
        "execution": execution,
    }
    event.update(extra)
    return json.dumps(event)


@app.api(name="chat", concurrency_limit=2)
def handle_chat(
    prompt: str,
    target_language: str,
    target_framework: str,
    history_json: str,
    exec_context_json: str,
    search_enabled: str = "false",
    image_url: str = "",
) -> str:
    """Stream chat responses with code execution. Yields JSON strings.

    Every yielded string is a JSON object with ``type``, ``status_text``,
    ``status_state``, ``history`` and ``execution`` keys. ``type`` is one of
    ``error``, ``status``, ``search_results`` (which also carries a
    ``search_results`` key), ``streaming`` or ``complete``.

    Args:
        prompt: User request; blank input yields a single ``error`` event.
        target_language: Requested language, reconciled with the fence
            language of the extracted code via ``normalize_language``.
        target_framework: Requested framework; ``"Gradio"`` forces Python
            code to be launched as a Gradio app.
        history_json: JSON-encoded list of prior chat messages (may be empty).
        exec_context_json: JSON-encoded execution context of the previous
            turn (may be empty).
        search_enabled: ``"true"`` (case-insensitive) runs a web search first.
        image_url: Optional image URL forwarded to the model.

    Raises:
        json.JSONDecodeError: If ``history_json`` or ``exec_context_json`` is
            non-empty but not valid JSON.
    """
    history = json.loads(history_json) if history_json else []
    execution_context = json.loads(exec_context_json) if exec_context_json else {}
    prompt = (prompt or "").strip()
    if not prompt:
        yield _chat_event(
            "error", "Enter a prompt to get started.", "info", history, execution_context
        )
        return

    # Refuse to chat until the model is ready; "loading" is reported as
    # still-working, every other non-ready status as an error.
    model_status = get_model_status()
    if model_status["status"] != "ready":
        state = "working" if model_status["status"] == "loading" else "error"
        yield _chat_event(
            "error", model_status["message"], state, history, execution_context
        )
        return

    # Add user message and placeholder assistant message
    history = list(history) + [
        {"role": "user", "content": prompt},
        {"role": "assistant", "content": ""},
    ]
    yield _chat_event("status", "Thinking...", "working", history, execution_context)

    # Web search if enabled. str() lets a boolean True from a client work too.
    search_context = ""
    if str(search_enabled).lower() == "true":
        yield _chat_event(
            "status", "Searching the web...", "working", history, execution_context
        )
        try:
            search_results = web_search_google(prompt, num_results=6)
        except Exception:
            # Search is an optional enrichment: log the failure and answer
            # without search context instead of aborting the whole turn.
            logger.exception("Web search failed during chat")
            search_results = []
        if search_results:
            search_context = format_search_results(search_results)
            yield _chat_event(
                "search_results",
                f"Found {len(search_results)} results, generating code...",
                "working",
                history,
                execution_context,
                search_results=search_results,
            )

    # Build messages for model: drop the assistant placeholder and replace the
    # raw user prompt with the targeted prompt.
    model_history = list(history[:-1])
    model_history[-1] = {
        "role": "user",
        "content": targeted_prompt(
            prompt, target_language, target_framework, execution_context, search_context
        ),
    }
    messages = chat_history_to_messages(model_history)

    # Blank or whitespace-only image URLs mean "no image".
    vlm_image_url = (image_url or "").strip() or None

    final_response = ""
    for partial in call_model(messages, image_url=vlm_image_url):
        final_response = partial
        # Strip thinking blocks so chat only shows clean output
        history[-1]["content"] = strip_thinking_blocks(partial)
        yield _chat_event(
            "streaming", "Generating...", "working", history, execution_context
        )

    if not final_response:
        history[-1]["content"] = "The model did not return a response."
        yield _chat_event(
            "error", "No model response.", "error", history, execution_context
        )
        return

    # Extract code from response (use cleaned version)
    clean_response = strip_thinking_blocks(final_response)
    code, fence_lang = extract_code(clean_response)
    target = normalize_language(target_language, fence_lang)
    # Also try multi-file extraction
    multi_files = extract_multi_file(clean_response)

    if not code and not multi_files:
        yield _chat_event(
            "complete", "Answered without running code.", "info", history, execution_context
        )
        return

    yield _chat_event("status", "Running...", "working", history, execution_context)

    # Execute code. Defaults apply to targets that are only previewed.
    stdout, stderr, image_path = "", "", None
    status_text, status_state = "Preview ready", "success"
    is_gradio = False
    gradio_url = None
    if target == "python" and code:
        if is_gradio_code(code) or target_framework == "Gradio":
            is_gradio = True
            gradio_result = run_gradio_app(code)
            if gradio_result["success"]:
                gradio_url = gradio_result["url"]
                status_text = f"Gradio app running at {gradio_url}"
                status_state = "success"
                stderr = f"Gradio app launched successfully at {gradio_url}"
            else:
                status_text = "Gradio launch failed"
                status_state = "error"
                stderr = gradio_result.get(
                    "stderr", gradio_result.get("message", "Launch failed")
                )
        else:
            result = run_python(code)
            # Output is captured the same way for every outcome.
            stdout, stderr, image_path = result.stdout, result.stderr, result.image_path
            if result.timed_out:
                status_text, status_state = f"Timed out after {PY_TIMEOUT_S}s", "error"
            elif result.returncode:
                status_text, status_state = "Finished with errors", "error"
            else:
                status_text, status_state = "Ran successfully", "success"

    # Register image for serving
    image_url_out = None
    if image_path:
        filename = os.path.basename(image_path)
        _served_files[f"img:{filename}"] = image_path
        image_url_out = f"/images/{filename}"

    # Collect project files for download
    project_files = dict(multi_files) if multi_files else {}
    # Rename main.py → app.py for Python/Gradio projects (HF Spaces expects app.py)
    if (
        "main.py" in project_files
        and "app.py" not in project_files
        and (target == "python" or is_gradio)
    ):
        project_files["app.py"] = project_files.pop("main.py")
    # If project_files is empty but we have single code, add it
    if not project_files and code:
        if target == "python":
            fname = "app.py" if (is_gradio or is_gradio_code(code)) else "main.py"
        elif target in {"web", "html", "javascript"}:
            fname = "index.html"
        else:
            fname = f"main.{fence_lang or 'txt'}"
        project_files = {fname: code}

    # Any extracted code has been folded into project_files above, so the ZIP
    # is the only download path needed.
    download_url = None
    if project_files:
        project_name = "generated-project"
        zip_path = create_project_zip(project_files, project_name)
        zip_filename = f"{project_name}.zip"
        _served_files[f"dl:{zip_filename}"] = zip_path
        download_url = f"/download/{zip_filename}"

    # Determine if this is web previewable
    is_web = (
        target in {"web", "javascript", "typescript", "html"}
        or (fence_lang or "") in {"html", "web"}
    )
    web_code = code if is_web else None

    execution_context = {
        "code": code,
        "target": target,
        "fence_lang": fence_lang or target,
        "stdout": stdout,
        "stderr": stderr,
        "image_url": image_url_out,
        "image_path": image_path,
        "status": status_text,
        "language": fence_lang or target,
        "suggested_tab": "preview" if (image_path or is_web or is_gradio) else "console",
        "download_url": download_url,
        "project_files": project_files,
        "is_web": is_web,
        "web_code": web_code,
        "is_gradio": is_gradio,
        "gradio_url": gradio_url,
    }
    yield _chat_event("complete", status_text, status_state, history, execution_context)
@app.api(name="push_hf", concurrency_limit=1)
def handle_push_hf(
    exec_context_json: str,
    repo_name: str,
    hf_token: str,
    space_sdk: str = "auto",
    is_space: str = "true",
) -> str:
    """Push generated project to HuggingFace Hub.

    Yields a single JSON string: the result of ``push_to_huggingface`` on
    success, or ``{"success": false, "message": ..., "url": ""}`` when there
    is nothing to push or any exception occurs.

    Args:
        exec_context_json: JSON-encoded execution context; its
            ``project_files`` are pushed, falling back to a single file built
            from ``code`` and ``language``.
        repo_name: Target repository, optionally ``owner/name``; the part
            after the last ``/`` is used as the project name.
        hf_token: Token forwarded to ``push_to_huggingface``.
        space_sdk: Space SDK; ``"auto"`` is passed on as ``"static"``.
        is_space: ``"true"`` (case-insensitive) to push as a Space.
    """
    try:
        execution_context = json.loads(exec_context_json) if exec_context_json else {}
        project_files = dict(execution_context.get("project_files", {}) or {})
        code = execution_context.get("code", "")

        # If project_files is empty but we have code, build files from code
        if not project_files and code:
            lang = execution_context.get("language", "python")
            if lang in ("javascript", "js", "typescript", "ts"):
                # "JS" output that is really an HTML document stays index.html
                lowered = code.lower()
                is_html_doc = "<!doctype" in lowered or "<html" in lowered
                filename = "index.html" if is_html_doc else "index.js"
            elif lang in ("html", "web"):
                filename = "index.html"
            else:
                # Python and every unrecognized language use the app.py entry file.
                filename = "app.py"
            project_files = {filename: code}
            # Auto-detect SDK for Gradio apps
            if execution_context.get("is_gradio", False) or is_gradio_code(code):
                space_sdk = "gradio"

        if not project_files:
            yield json.dumps({
                "success": False,
                "message": "No code to push. Generate some code first.",
                "url": "",
            })
            return

        # "auto" SDK means let push_to_huggingface decide
        if space_sdk == "auto":
            space_sdk = "static"  # push_to_huggingface will auto-detect from files

        project_name = repo_name.rsplit("/", 1)[-1]
        result = push_to_huggingface(
            files=project_files,
            project_name=project_name,
            repo_name=repo_name,
            hf_token=hf_token,
            space_sdk=space_sdk,
            # str() lets a boolean True from a client work too.
            is_space=str(is_space).lower() == "true",
        )
        yield json.dumps(result)
    except Exception as exc:
        logger.exception("Push to HuggingFace failed")
        yield json.dumps({
            "success": False,
            "message": f"Push failed: {str(exc)}",
            "url": "",
        })
def get_app() -> Server:
    """Expose the module-level ``app`` instance with all routes registered."""
    return app