import json, os, re, uuid, subprocess, sys, time, traceback, threading, base64
from io import BytesIO
from collections import deque
from pathlib import Path
from typing import Optional, Tuple, List, Dict, Any
from dataclasses import dataclass, field
from contextlib import contextmanager

from fastapi import FastAPI, HTTPException, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, validator
from huggingface_hub import HfApi, create_repo, CommitOperationAdd

# Optional .env for local testing
from dotenv import load_dotenv

load_dotenv()
# -------- Gemini + GPT client setup --------
from google import genai
from google.genai import types

try:
    from openai import OpenAI
except ImportError:
    OpenAI = None
# We keep the GEMINI_* env vars for compatibility.
API_KEY = os.getenv("GEMINI_API_KEY", "")
MODEL = os.getenv("GEMINI_MODEL", "gemini-2.5-pro")
GEMINI_SMALL_MODEL = os.getenv("GEMINI_SMALL_MODEL")
DEFAULT_OPENAI_SMALL_MODEL = "gpt-4o-mini"
OPENAI_SMALL_MODEL = os.getenv("OPENAI_SMALL_MODEL") or DEFAULT_OPENAI_SMALL_MODEL
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
_OPENAI_ENV = os.getenv("USE_OPENAI")
if _OPENAI_ENV is None:
    USE_OPENAI = bool(OPENAI_API_KEY)
else:
    USE_OPENAI = _OPENAI_ENV.lower() == "true"
PORT = int(os.getenv("PORT", "7860"))

_OPENAI_RESPONSES_MODELS_ENV = os.getenv("OPENAI_RESPONSES_MODELS", "")
RESPONSES_API_MODEL_NAMES = {"gpt-5-mini"}
if _OPENAI_RESPONSES_MODELS_ENV:
    RESPONSES_API_MODEL_NAMES.update(
        model.strip().lower()
        for model in _OPENAI_RESPONSES_MODELS_ENV.split(",")
        if model.strip()
    )
_OPENAI_RESPONSES_PREFIXES_ENV = os.getenv("OPENAI_RESPONSES_PREFIXES", "")
_RESPONSES_API_MODEL_PREFIXES = ["gpt-5"]
if _OPENAI_RESPONSES_PREFIXES_ENV:
    _RESPONSES_API_MODEL_PREFIXES.extend(
        prefix.strip().lower()
        for prefix in _OPENAI_RESPONSES_PREFIXES_ENV.split(",")
        if prefix.strip()
    )
RESPONSES_API_MODEL_PREFIXES = tuple(_RESPONSES_API_MODEL_PREFIXES)
RESPONSES_API_ERROR_HINTS = (
    "only supported in v1/responses",
    "use the responses api",
    "use the responses endpoint",
    "please call the responses api",
    "please use the responses endpoint",
)
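# Example (sketch, illustrative values): with OPENAI_RESPONSES_MODELS="o4-mini"
# and OPENAI_RESPONSES_PREFIXES="o3", both the exact name "o4-mini" and any
# model name starting with "o3" would be routed to the Responses API, in
# addition to the built-in "gpt-5-mini" name and "gpt-5" prefix.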
gemini_client = genai.Client(api_key=API_KEY) if API_KEY else None
gpt_client = OpenAI(api_key=OPENAI_API_KEY) if (OPENAI_API_KEY and OpenAI and USE_OPENAI) else None

# -------- FastAPI app --------
app = FastAPI(title="Manim Render API (error + visual refine)")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # tighten in prod
    allow_methods=["*"],
    allow_headers=["*"],
)
RUNS = Path("runs"); RUNS.mkdir(parents=True, exist_ok=True)

HF_DATASET_ID = os.getenv("HF_DATASET_ID", "MathFrames/email-log")
HF_TOKEN = os.getenv("HF_TOKEN", "")
hf_api = HfApi(token=HF_TOKEN) if HF_TOKEN else None
if hf_api:
    try:
        create_repo(
            HF_DATASET_ID,
            repo_type="dataset",
            private=True,
            exist_ok=True,
            token=HF_TOKEN,
        )
    except Exception:
        # Ignore startup race/permission errors; individual writes will surface issues.
        pass
# ---------------- simple 10 RPM rate limiter ----------------
class RateLimiter:
    def __init__(self, max_per_minute: int):
        self.max = max_per_minute
        self.lock = threading.Lock()
        self.events = deque()  # timestamps (time.time())

    def acquire(self):
        with self.lock:
            now = time.time()
            # drop events older than 60s
            while self.events and now - self.events[0] >= 60:
                self.events.popleft()
            if len(self.events) < self.max:
                self.events.append(now)
                return
            # need to wait until the oldest is 60s old
            wait_for = 60 - (now - self.events[0])
        # sleep outside the lock (it is not reentrant, and other threads must drain)
        if wait_for > 0:
            time.sleep(wait_for + 0.01)
        # recurse once to record post-sleep
        self.acquire()
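# Example (sketch): the eleventh call inside one minute blocks until the
# oldest recorded timestamp ages past 60 s, then records itself and returns.
#   rl = RateLimiter(10)
#   for _ in range(11):
#       rl.acquire()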
limiter = RateLimiter(10)
storyboard_limiter = RateLimiter(30)

RENDER_LOCK = threading.Lock()
@contextmanager
def acquire_render_slot(timeout: Optional[float] = None):
    """
    Global render queue: only one Manim render runs at a time.
    Blocks until the lock is available (optional timeout).
    """
    if timeout is None:
        acquired = RENDER_LOCK.acquire()
    else:
        acquired = RENDER_LOCK.acquire(timeout=timeout)
    if not acquired:
        raise RuntimeError("Render queue is busy; try again shortly.")
    try:
        yield
    finally:
        RENDER_LOCK.release()
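# Example (sketch): endpoint handlers serialize renders through this guard.
#   with acquire_render_slot():
#       mp4_bytes, png = _run_manim(code)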
def _to_chat_content_item(item: Any) -> Any:
    if isinstance(item, str):
        return {"type": "text", "text": item}
    if isinstance(item, dict):
        return item
    return {"type": "text", "text": str(item)}


def _to_response_content_item(item: Any) -> Dict[str, Any]:
    if isinstance(item, str):
        return {"type": "input_text", "text": item}
    if isinstance(item, dict):
        itype = item.get("type")
        if itype == "text":
            return {"type": "input_text", "text": item.get("text", "")}
        if itype == "image_url":
            image_url = item.get("image_url", {})
            if isinstance(image_url, dict):
                return {"type": "input_image", "image_url": image_url}
            return {"type": "input_image", "image_url": {"url": str(image_url)}}
        if itype in {"input_text", "input_image", "input_file"}:
            return item
    return {"type": "input_text", "text": str(item)}
def _build_openai_content(contents: Any, *, for_chat: bool) -> Any:
    """
    Normalize content payloads for chat (strings or multimodal lists) and responses API (typed blocks).
    """
    if isinstance(contents, str):
        return contents if for_chat else [_to_response_content_item(contents)]
    if isinstance(contents, (list, tuple)):
        if for_chat:
            return [_to_chat_content_item(item) for item in contents]
        return [_to_response_content_item(item) for item in contents]
    return contents if for_chat else [_to_response_content_item(contents)]


def _build_chat_messages(system: str, contents: Any) -> List[Dict[str, Any]]:
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": _build_openai_content(contents, for_chat=True)},
    ]


def _build_responses_input(system: str, contents: Any) -> List[Dict[str, Any]]:
    return [
        {"role": "system", "content": _build_openai_content(system, for_chat=False)},
        {"role": "user", "content": _build_openai_content(contents, for_chat=False)},
    ]
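# Example (sketch): the same payload normalizes differently per target API.
#   _build_openai_content(["hi"], for_chat=True)  -> [{"type": "text", "text": "hi"}]
#   _build_openai_content(["hi"], for_chat=False) -> [{"type": "input_text", "text": "hi"}]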
def _requires_responses_api(model: str) -> bool:
    lowered = (model or "").lower()
    if not lowered:
        return False
    if lowered in RESPONSES_API_MODEL_NAMES:
        return True
    return any(
        prefix and lowered.startswith(prefix)
        for prefix in RESPONSES_API_MODEL_PREFIXES
    )


def _should_use_responses_fallback(err: Exception) -> bool:
    message = str(err).lower()
    return any(hint in message for hint in RESPONSES_API_ERROR_HINTS)


def _extract_chat_content(resp: Any) -> str:
    content = resp.choices[0].message.content
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        text_parts = []
        for chunk in content:
            if isinstance(chunk, dict) and chunk.get("type") == "text":
                text_parts.append(chunk.get("text", ""))
            else:
                text_parts.append(str(chunk))
        return "\n".join(filter(None, text_parts))
    return str(content)


def _extract_responses_content(resp: Any) -> str:
    text = getattr(resp, "output_text", None)
    if text:
        return text
    output = getattr(resp, "output", None)
    if output:
        chunks = []
        for item in output:
            for elem in getattr(item, "content", []) or []:
                chunk_text = getattr(elem, "text", None) or getattr(elem, "content", None)
                if chunk_text:
                    chunks.append(chunk_text)
        if chunks:
            return "\n".join(map(str, chunks))
    return str(resp)
def _invoke_gpt_model(model: str, system: str, contents: Any) -> str:
    if not gpt_client:
        raise RuntimeError("GPT client is not configured")
    messages = _build_chat_messages(system, contents)
    responses_input: Optional[List[Dict[str, Any]]] = None
    if _requires_responses_api(model):
        responses_input = _build_responses_input(system, contents)
        resp = gpt_client.responses.create(model=model, input=responses_input)
        return _extract_responses_content(resp)
    try:
        resp = gpt_client.chat.completions.create(model=model, messages=messages)
        return _extract_chat_content(resp)
    except Exception as err:
        if not _should_use_responses_fallback(err):
            raise
        if responses_input is None:
            responses_input = _build_responses_input(system, contents)
        resp = gpt_client.responses.create(model=model, input=responses_input)
        return _extract_responses_content(resp)
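# Routing sketch: "gpt-5-mini" (and any "gpt-5*" name) goes straight to the
# Responses API; other models try Chat Completions first and fall back only
# when the error message matches one of RESPONSES_API_ERROR_HINTS.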
def gemini_call(*, system: str, contents):
    """Wrapper that enforces the RPM limit and standardizes text extraction."""
    if not gemini_client:
        raise RuntimeError("Gemini client is not configured")
    limiter.acquire()
    resp = gemini_client.models.generate_content(
        model=MODEL,
        config=types.GenerateContentConfig(system_instruction=system),
        contents=contents,
    )
    return getattr(resp, "text", str(resp))
def gemini_small_call(*, system: str, contents: str) -> str:
    """Lightweight wrapper for the storyboard assistant: prefer the small GPT model, fall back to Gemini."""
    storyboard_limiter.acquire()
    if gpt_client:
        return _invoke_gpt_model(OPENAI_SMALL_MODEL, system, contents)
    if not gemini_client:
        raise RuntimeError("Gemini client is not configured")
    fallback_model = GEMINI_SMALL_MODEL or MODEL
    if (
        not fallback_model
        or _requires_responses_api(fallback_model)
        or str(fallback_model).lower().startswith("gpt-")
    ):
        fallback_model = MODEL
    resp = gemini_client.models.generate_content(
        model=fallback_model,
        config=types.GenerateContentConfig(system_instruction=system),
        contents=contents,
    )
    return getattr(resp, "text", str(resp))
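# Model selection order (sketch): a configured GPT client wins, then
# GEMINI_SMALL_MODEL, then MODEL; GPT-style names are never sent to Gemini.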
# ---------------- prompts ----------------
SYSTEM_PROMPT = """You are a Manim CE (0.19.x) code generator/refiner.
Return ONLY valid Python code (no backticks, no prose).
Define exactly one class: AutoScene(Scene).
Keep it short (preferably ≤ ~60 s) and quickly renderable.
Use: from manim import *
Allowed imports: manim, math, numpy.
Forbidden: os, subprocess, sys, requests, pathlib, socket, shutil, psutil, any file/network/OS access.
# CAPTURE POLICY (must follow exactly)
- Insert a comment line `# CAPTURE_POINT` at the final, steady layout of the scene.
- Right after `# CAPTURE_POINT`, call self.wait(0.75) and then END THE SCENE.
- DO NOT add any outro animations, fades, or camera moves after `# CAPTURE_POINT`.
- Ensure all intended elements are visible and legible at `# CAPTURE_POINT` (adequate margins, no overlaps, font ≥ 32 px at 854x480).
# Common Manim CE 0.19 API constraints (must follow)
- Do NOT use `vertex=` with RightAngle(...). Choose the corner by line ordering or set quadrant=(±1, ±1).
- Do NOT call `.to_center()` (not a valid method). Use `.center()` or `.move_to(ORIGIN)`.
- Prefer `.move_to()`, `.align_to()`, `.to_edge()`, `.scale()`, `.next_to()` for layout/placement, keeping generous spacing (buff ≥ 0.6) so nothing overlaps.
- Only introduce objects that directly support the user's request. Avoid decorative or redundant elements that clutter the scene.
"""

DEFAULT_SCENE = """from manim import *
class AutoScene(Scene):
    def construct(self):
        t = Text("Hello from Manim").scale(1)
        self.play(Write(t))
        # CAPTURE_POINT
        self.wait(0.75)
"""
STORYBOARD_SYSTEM_PROMPT = """You are MathFrames' storyboard director.
You interview educators, refine their ideas, and maintain a structured shot list for a short Manim video.
Always respond with a single JSON object matching this schema exactly:
{
  "reply": "<short conversational answer for the user>",
  "plan": {
    "concept": "<core idea you are visualizing>",
    "notes": "<optional reminders or staging notes>",
    "scenes": [
      {
        "title": "Scene 1: Setup",
        "objective": "<what this scene accomplishes>",
        "steps": ["<bullet-level action>", "..."]
      }
    ]
  },
  "questions": ["<optional clarification question>", "..."]
}
Rules:
- Keep scene titles in the format: "Scene N: Subtitle".
- Each scene must list 1-5 clear, imperative steps or beats (use educational language, no code).
- Reflect any user-provided edits exactly.
- If the user supplies a plan JSON, treat it as the source of truth and improve it gently.
- Ask for clarification only when needed; otherwise leave the questions array empty.
- Never include Markdown fences, prose outside JSON, or code snippets.
# Professional editor guidance (use to drive the conversation naturally):
- Confirm the concept/topic and any subtopics that should appear.
- Capture the learning goal: what must the viewer understand by the end?
- Clarify how deep the explanation should go (introductory vs. detailed walk-through).
- Ask about any specific visuals, references, or prior scenes the user wants included.
- Check whether there's an existing script or outline to honor.
- Note any stylistic tone or audience expectations (e.g., middle school vs. college).
"""
STORYBOARD_CONFIRM_SYSTEM_PROMPT = """You are MathFrames' storyboard director.
The user has finalized their plan. Craft the final handoff for the rendering model.
Return a JSON object:
{
  "reply": "<brief confirmation for the user>",
  "render_prompt": "<single paragraph prompt for the Manim code generator>",
  "plan": { ... same structure as provided ... }
}
Guidelines:
- Keep render_prompt concise but fully descriptive. Mention each scene's purpose and key visuals.
- Respect the provided storyboard plan exactly—do not invent new scenes or steps.
- Include relevant settings (style, length, audience, resolution) when supplied.
- Do not add Markdown or code; respond with JSON only.
"""

MAX_STORYBOARD_SCENES = 6
class ScenePayload(BaseModel):
    id: Optional[str] = None
    title: str
    objective: Optional[str] = ""
    steps: List[str]

    # NOTE: validator decorators restored in pydantic v1 style; the pre/always
    # flags are assumptions (the bodies clearly handle raw, uncoerced input).
    @validator("title", pre=True, always=True)
    def _clean_title(cls, value: Any) -> str:
        if isinstance(value, str):
            value = value.strip()
        if not value:
            return "Scene"
        return value

    @validator("steps", pre=True, always=True)
    def _coerce_steps(cls, value: Any) -> List[str]:
        collected: List[str] = []
        if isinstance(value, str):
            candidates = value.replace("\r", "").split("\n")
            collected.extend(candidates)
        elif isinstance(value, (list, tuple)):
            for item in value:
                if isinstance(item, str):
                    collected.extend(item.replace("\r", "").split("\n"))
                elif isinstance(item, (list, tuple)):
                    for sub in item:
                        if isinstance(sub, str):
                            collected.append(sub)
        cleaned = []
        for step in collected:
            step = str(step).strip(" •\t-")
            if step:
                cleaned.append(step)
        return cleaned or ["Outline the key idea for this scene."]
class PlanPayload(BaseModel):
    concept: str
    scenes: List[ScenePayload]
    notes: Optional[str] = ""

    @validator("concept", pre=True, always=True)
    def _clean_concept(cls, value: Any) -> str:
        if isinstance(value, str):
            value = value.strip()
        return value or "Untitled Concept"

    @validator("scenes", pre=True, always=True)
    def _ensure_scenes(cls, value: Any) -> List[Any]:
        if isinstance(value, (list, tuple)):
            return list(value)
        return []


class StoryboardChatIn(BaseModel):
    session_id: Optional[str] = None
    message: Optional[str] = ""
    plan: Optional[PlanPayload] = None
    settings: Optional[Dict[str, Any]] = None

    @validator("message", pre=True, always=True)
    def _default_message(cls, value: Any) -> str:
        if value is None:
            return ""
        return str(value)

    @validator("settings", pre=True, always=True)
    def _sanitize_settings(cls, value: Any) -> Dict[str, Any]:
        if isinstance(value, dict):
            return value
        return {}


class StoryboardConfirmIn(BaseModel):
    session_id: Optional[str] = None
    plan: PlanPayload
    settings: Optional[Dict[str, Any]] = None

    @validator("settings", pre=True, always=True)
    def _sanitize_settings(cls, value: Any) -> Dict[str, Any]:
        if isinstance(value, dict):
            return value
        return {}
@dataclass
class PlanSession:
    session_id: str
    messages: List[Dict[str, Any]] = field(default_factory=list)
    plan: Optional[PlanPayload] = None
    settings: Dict[str, Any] = field(default_factory=dict)
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)


PLAN_SESSIONS: Dict[str, PlanSession] = {}
PLAN_LOCK = threading.Lock()
# ---------- NEW: carry full CLI error back to the refiner ----------
class RenderError(Exception):
    def __init__(self, log: str):
        super().__init__("Manim render failed")
        self.log = log or ""
# ---------------- helpers ----------------
def _clean_code(text: str) -> str:
    """Strip common Markdown fences like ```python ... ``` or ``` ..."""
    if not text:
        return ""
    text = re.sub(r"^```(?:\s*python)?\s*", "", text.strip(), flags=re.IGNORECASE)
    text = re.sub(r"\s*```$", "", text)
    return text.strip()
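# Example (sketch): _clean_code("```python\nprint(1)\n```") -> "print(1)"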
def _preflight_sanitize(code: str) -> str:
    """
    Auto-correct a few frequent Manim CE 0.19 mistakes to reduce trivial crashes.
    - .to_center() -> .center()
    - Remove vertex=... from RightAngle(...), then normalize commas.
    """
    c = code
    # 1) replace invalid method
    c = re.sub(r"\.to_center\(\)", ".center()", c)
    # 2) remove vertex=... kwarg inside RightAngle(...)
    # Case A: middle of arg list with trailing comma
    c = re.sub(
        r"(RightAngle\s*\([^)]*?),\s*vertex\s*=\s*[^,)\s]+(\s*,)",
        r"\1\2",
        c,
        flags=re.DOTALL,
    )
    # Case B: last kwarg before ')'
    c = re.sub(
        r"(RightAngle\s*\([^)]*?),\s*vertex\s*=\s*[^,)\s]+(\s*\))",
        r"\1\2",
        c,
        flags=re.DOTALL,
    )
    # Normalize doubled commas or commas before ')'
    c = re.sub(r",\s*,", ", ", c)
    c = re.sub(r",\s*\)", ")", c)
    return c
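# Example (sketch): the sanitizer rewrites
#   RightAngle(l1, l2, vertex=B, length=0.3).to_center()
# into
#   RightAngle(l1, l2, length=0.3).center()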
def _extract_json_dict(raw: str) -> Dict[str, Any]:
    """Best-effort JSON extraction from the LLM response."""
    if not raw:
        raise ValueError("Empty response from model")
    stripped = raw.strip()
    if stripped.startswith("```"):
        stripped = re.sub(r"^```(?:json)?", "", stripped, flags=re.IGNORECASE).strip()
        stripped = re.sub(r"```$", "", stripped).strip()
    try:
        return json.loads(stripped)
    except json.JSONDecodeError:
        match = re.search(r"\{.*\}", stripped, flags=re.DOTALL)
        if match:
            candidate = match.group(0)
            try:
                return json.loads(candidate)
            except json.JSONDecodeError:
                pass
    raise ValueError("Model did not return valid JSON")
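# Examples (sketch): each of these parses to {"reply": "ok"}:
#   '{"reply": "ok"}'
#   '```json\n{"reply": "ok"}\n```'
#   'Sure! {"reply": "ok"}'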
def _generate_scene_id(index: int) -> str:
    return f"scene-{index}-{uuid.uuid4().hex[:6]}"


def _normalize_scene_title(index: int, title: str) -> str:
    title = title.strip()
    if not title:
        return f"Scene {index}: Beat"
    prefix = f"Scene {index}"
    if not title.lower().startswith("scene"):
        return f"{prefix}: {title}"
    parts = title.split(":", 1)
    if len(parts) == 2:
        return f"{prefix}: {parts[1].strip()}"
    return f"{prefix}: {title.split(maxsplit=1)[-1]}"
def _sanitize_plan(plan: Optional[PlanPayload], *, concept_hint: str = "Untitled Concept") -> PlanPayload:
    if not plan:
        default_scene = ScenePayload(
            id=_generate_scene_id(1),
            title="Scene 1: Setup",
            objective=f"Introduce {concept_hint}",
            steps=[
                f"Display the title \"{concept_hint}\"",
                "Provide quick context for the viewer",
                "Highlight the main question to explore",
            ],
        )
        return PlanPayload(concept=concept_hint, notes="", scenes=[default_scene])
    concept = plan.concept.strip() or concept_hint or "Untitled Concept"
    sanitized_scenes: List[ScenePayload] = []
    for idx, scene in enumerate(plan.scenes[:MAX_STORYBOARD_SCENES], start=1):
        steps = [str(step).strip() for step in scene.steps if step and str(step).strip()]
        if not steps:
            steps = [f"Explain the next idea for {concept}."]
        title = _normalize_scene_title(idx, scene.title or f"Scene {idx}")
        objective = (scene.objective or "").strip()
        sanitized_scenes.append(
            ScenePayload(
                id=scene.id or _generate_scene_id(idx),
                title=title,
                objective=objective or f"Advance the story about {concept}.",
                steps=steps,
            )
        )
    if not sanitized_scenes:
        sanitized_scenes.append(
            ScenePayload(
                id=_generate_scene_id(1),
                title="Scene 1: Setup",
                objective=f"Introduce {concept}",
                steps=[
                    f"Present the main idea \"{concept}\"",
                    "Explain why it matters to the viewer",
                ],
            )
        )
    notes = (plan.notes or "").strip()
    return PlanPayload(concept=concept, notes=notes, scenes=sanitized_scenes)
def _plan_to_public_dict(plan: PlanPayload) -> Dict[str, Any]:
    return plan.dict()


def _format_conversation(messages: List[Dict[str, Any]], limit: int = 8) -> str:
    if not messages:
        return "None yet."
    recent = messages[-limit:]
    lines = []
    for msg in recent:
        role = msg.get("role", "assistant").title()
        content = str(msg.get("content", "")).strip()
        lines.append(f"{role}: {content}")
    return "\n".join(lines)
def _audience_label(value: Optional[str]) -> Optional[str]:
    mapping = {
        "ms": "middle school students",
        "hs": "high school students",
        "ug": "undergraduate students",
    }
    return mapping.get(str(value).lower()) if value else None


def _style_label(value: Optional[str]) -> Optional[str]:
    mapping = {
        "minimal": "minimal visuals (focus on narration and a few key elements)",
        "steps": "step-by-step exposition with clear transitions",
        "geometry": "geometry-focused visuals that highlight shapes and spatial relationships",
    }
    return mapping.get(str(value).lower()) if value else None


def _length_label(value: Optional[str]) -> Optional[str]:
    mapping = {
        "short": "short (~30–45s)",
        "medium": "medium (~60–90s)",
    }
    return mapping.get(str(value).lower()) if value else None
def _quality_from_settings(settings: Optional[Dict[str, Any]]) -> str:
    if not settings:
        return "medium"
    resolution = str(settings.get("resolution", "")).lower()
    if resolution == "480p":
        return "low"
    if resolution == "1080p":
        return "high"
    return "medium"


def _quality_flag(quality: str) -> str:
    return {
        "low": "-ql",
        "medium": "-qm",
        "high": "-qh",
    }.get(quality, "-qm")
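# Example (sketch): {"resolution": "1080p"} maps to "high", which becomes the
# manim CLI flag "-qh"; unknown values fall back to "medium" / "-qm".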
def _compose_default_render_prompt(plan: PlanPayload, settings: Dict[str, Any], conversation: List[Dict[str, Any]]) -> str:
    lines = [
        f"Create a concise Manim CE 0.19 scene illustrating the concept \"{plan.concept}\".",
        "Structure the animation around these storyboard scenes:",
    ]
    for scene in plan.scenes:
        lines.append(f"- {scene.title} ({scene.objective})")
        for step in scene.steps:
            lines.append(f"  • {step}")
    if plan.notes:
        lines.append(f"Production notes: {plan.notes}")
    if settings:
        audience_text = _audience_label(settings.get("audience"))
        style_text = _style_label(settings.get("style"))
        length_text = _length_label(settings.get("length"))
        lines.append("Production settings to honor:")
        if audience_text:
            lines.append(f"- Tailor explanations for {audience_text} (language, pacing, assumptions).")
        if style_text:
            lines.append(f"- Presentation style: {style_text}.")
        if length_text:
            lines.append(f"- Keep total runtime {length_text}.")
        resolution = settings.get("resolution")
        if resolution:
            lines.append(f"- Render for {resolution} output (frame layout should read well at that resolution).")
    if conversation:
        lines.append("Incorporate the important constraints already discussed with the user.")
    lines.append("Follow the CAPTURE policy: include # CAPTURE_POINT just before the final self.wait(0.75).")
    return "\n".join(lines)
def _prune_plan_sessions(max_sessions: int = 200, max_age_seconds: int = 3600) -> None:
    now = time.time()
    with PLAN_LOCK:
        if len(PLAN_SESSIONS) > max_sessions:
            sorted_items = sorted(PLAN_SESSIONS.items(), key=lambda item: item[1].updated_at)
            for session_id, _ in sorted_items[: len(PLAN_SESSIONS) - max_sessions]:
                PLAN_SESSIONS.pop(session_id, None)
        for session_id, session in list(PLAN_SESSIONS.items()):
            if now - session.updated_at > max_age_seconds:
                PLAN_SESSIONS.pop(session_id, None)


def _get_or_create_session(session_id: Optional[str], settings: Optional[Dict[str, Any]] = None) -> PlanSession:
    with PLAN_LOCK:
        if session_id and session_id in PLAN_SESSIONS:
            session = PLAN_SESSIONS[session_id]
            if settings:
                session.settings.update(settings)
            return session
        new_id = session_id or uuid.uuid4().hex
        session = PlanSession(session_id=new_id)
        if settings:
            session.settings.update(settings)
        PLAN_SESSIONS[new_id] = session
    # prune after releasing PLAN_LOCK (it is not reentrant, and pruning re-acquires it)
    _prune_plan_sessions()
    return session
def _storyboard_model_reply(session: PlanSession, user_message: str) -> Tuple[str, PlanPayload, List[str]]:
    concept_hint = session.plan.concept if session.plan else (user_message.strip() or "Untitled Concept")
    session.plan = _sanitize_plan(session.plan, concept_hint=concept_hint)
    session.updated_at = time.time()
    plan_json = json.dumps(_plan_to_public_dict(session.plan), indent=2)
    settings_json = json.dumps(session.settings or {}, indent=2)
    history_text = _format_conversation(session.messages)
    latest_message = user_message.strip() or "User adjusted the storyboard without additional text."
    contents = f"""You are refining a math animation storyboard with the user.
Current storyboard plan JSON:
{plan_json}
Session settings:
{settings_json}
Conversation so far:
{history_text}
Update the plan if needed and craft your reply (JSON only). Latest user message:
{latest_message}
"""
    raw_response = gemini_small_call(system=STORYBOARD_SYSTEM_PROMPT, contents=contents)
    try:
        parsed = _extract_json_dict(raw_response)
    except Exception as exc:
        print("Storyboard model JSON parse failed:", exc, file=sys.stderr)
        parsed = {}
    reply_text = str(parsed.get("reply") or "").strip() or "Understood—updating the storyboard."
    plan_data = parsed.get("plan")
    new_plan = session.plan
    if isinstance(plan_data, dict):
        try:
            new_plan = PlanPayload(**plan_data)
        except Exception as exc:
            print("Unable to parse plan from storyboard model:", exc, file=sys.stderr)
    session.plan = _sanitize_plan(new_plan, concept_hint=session.plan.concept if session.plan else concept_hint)
    questions_field = parsed.get("questions") or []
    questions = [str(q).strip() for q in questions_field if isinstance(q, (str, int)) and str(q).strip()]
    session.updated_at = time.time()
    return reply_text, session.plan, questions
def _storyboard_model_confirm(session: PlanSession) -> Tuple[str, PlanPayload, str]:
    session.plan = _sanitize_plan(session.plan, concept_hint=session.plan.concept if session.plan else "Untitled Concept")
    plan_json = json.dumps(_plan_to_public_dict(session.plan), indent=2)
    settings_json = json.dumps(session.settings or {}, indent=2)
    history_text = _format_conversation(session.messages)
    contents = f"""The user has approved this storyboard plan:
{plan_json}
Session settings:
{settings_json}
Conversation summary:
{history_text}
Produce the confirmation JSON only (no Markdown)."""
    raw_response = gemini_small_call(system=STORYBOARD_CONFIRM_SYSTEM_PROMPT, contents=contents)
    try:
        parsed = _extract_json_dict(raw_response)
    except Exception as exc:
        print("Storyboard confirm JSON parse failed:", exc, file=sys.stderr)
        parsed = {}
    reply_text = str(parsed.get("reply") or "").strip() or "Great! Locking the storyboard and preparing the renderer."
    plan_data = parsed.get("plan")
    final_plan = session.plan
    if isinstance(plan_data, dict):
        try:
            final_plan = PlanPayload(**plan_data)
        except Exception as exc:
            print("Unable to parse confirmed plan:", exc, file=sys.stderr)
    final_plan = _sanitize_plan(final_plan, concept_hint=final_plan.concept if final_plan else session.plan.concept)
    render_prompt = str(parsed.get("render_prompt") or "").strip()
    if not render_prompt:
        render_prompt = _compose_default_render_prompt(final_plan, session.settings, session.messages)
    session.plan = final_plan
    session.updated_at = time.time()
    return reply_text, final_plan, render_prompt
def _run_manim(scene_code: str, run_id: Optional[str] = None, quality: str = "medium") -> Tuple[bytes, Optional[Path]]:
    """Render MP4 (fast) and also save a steady-state PNG (last frame)."""
    run_id = run_id or str(uuid.uuid4())[:8]
    work = RUNS / run_id; work.mkdir(parents=True, exist_ok=True)
    media = work / "media"; media.mkdir(parents=True, exist_ok=True)
    scene_path = work / "scene.py"
    # Write scene code (after sanitizer)
    safe_code = _preflight_sanitize(scene_code)
    scene_path.write_text(safe_code, encoding="utf-8")
    env = os.environ.copy()
    env["PYTHONPATH"] = str(work)
    quality_flag = _quality_flag(quality)
    # 1) Render video
    cmd_video = [
        "manim", quality_flag, "--disable_caching",
        "--media_dir", str(media),
        "-o", f"{run_id}.mp4",
        str(scene_path), "AutoScene",
    ]
    proc_v = subprocess.run(
        cmd_video,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        env=env,
    )
    if proc_v.returncode != 0:
        log = proc_v.stdout or ""
        print("Manim stdout/stderr:\n", log, file=sys.stderr)
        raise RenderError(log)
    # Locate output mp4
    mp4 = None
    for p in media.rglob(f"{run_id}.mp4"):
        mp4 = p; break
    if not mp4:
        for p in media.rglob("*.mp4"):
            mp4 = p; break
    if not mp4:
        raise RenderError("Rendered video not found")
    # 2) Save last frame PNG (leverages our CAPTURE_POINT rule)
    png_path = None
    cmd_png = [
        "manim", quality_flag, "--disable_caching", "-s",  # -s saves the last frame as an image
        "--media_dir", str(media),
        str(scene_path), "AutoScene",
    ]
    proc_p = subprocess.run(
        cmd_png,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        env=env,
    )
    if proc_p.returncode == 0:
        cand = None
        for p in media.rglob("*.png"):
            cand = p
        png_path = cand
    return mp4.read_bytes(), png_path
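# Example (sketch): for run_id "abc123" at medium quality the video command is
#   manim -qm --disable_caching --media_dir runs/abc123/media -o abc123.mp4 runs/abc123/scene.py AutoScene
# followed by the same command with -s (last frame only, no -o) for the PNG.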
def _upload_image_to_gemini(png_path: Path):
    """Upload the steady-state PNG to the Gemini Files API and return the file reference."""
    if not gemini_client or not png_path or not png_path.exists():
        return None
    limiter.acquire()
    with open(png_path, "rb") as f:
        file_ref = gemini_client.files.upload(
            file=f,
            config={"mime_type": "image/png"},
        )
    return file_ref
def llm_generate_manim_code(
    prompt: str,
    settings: Optional[Dict[str, Any]] = None,
    previous_code: Optional[str] = None,
) -> str:
    """First-pass generation (capture-aware)."""
    if not gemini_client:
        return DEFAULT_SCENE
    try:
        contents = f"Create AutoScene for: {prompt}\nRemember the CAPTURE POLICY and Common API constraints."
        if settings:
            audience_text = _audience_label(settings.get("audience"))
            style_text = _style_label(settings.get("style"))
            length_text = _length_label(settings.get("length"))
            contents += "\nProduction settings to respect:"
            if audience_text:
                contents += f"\n- Tailor explanations for {audience_text}."
            if style_text:
                contents += f"\n- Style: {style_text}."
            if length_text:
                contents += f"\n- Target runtime: {length_text}."
            resolution = settings.get("resolution")
            if resolution:
                contents += f"\n- Design visuals that read clearly at {resolution}."
        contents += "\nLayout requirement: ensure every element has clear separation—absolutely no overlaps at the capture point."
        contents += "\nKeep the composition minimal: only include elements explicitly needed for the prompt."
        response_text = gemini_call(system=SYSTEM_PROMPT, contents=contents)
        code = _clean_code(response_text)
        if "class AutoScene" not in code:
            code = previous_code or DEFAULT_SCENE
        return code
    except Exception:
        print("LLM generate error:", file=sys.stderr)
        traceback.print_exc()
        return previous_code or DEFAULT_SCENE
def llm_refine_from_error(
    previous_code: str,
    error_message: str,
    original_user_prompt: str,
    settings: Optional[Dict[str, Any]] = None,
) -> str:
    """When Manim fails, send the *real* CLI log/trace to the LLM."""
    if not gemini_client:
        return previous_code or DEFAULT_SCENE
    try:
        trimmed = error_message[-4000:] if error_message else ""
        user_prompt = f"""Original user prompt:
{original_user_prompt}
The following Manim CE (0.19.x) code failed to render. Fix it.
Current code:
{previous_code}
Error / stack trace (tail):
{trimmed}
Requirements:
- Fix the bug while preserving the math logic and planned animations.
- Keep exactly one class AutoScene(Scene).
- Keep the CAPTURE POLICY and ensure # CAPTURE_POINT is at the final steady layout.
- Eliminate any overlapping elements; maintain clear spacing at the capture point.
- Remove any objects that are not necessary for the prompt or storyboard; keep the scene concise.
- Scan for nonexistent methods (e.g., `.to_center`) or invalid kwargs (e.g., `vertex=` on RightAngle) and replace with valid Manim CE 0.19 API.
- Prefer `.center()`/`.move_to(ORIGIN)`, and `.move_to()`, `.align_to()`, `.to_edge()`, `.next_to()` for layout.
- Apply the smallest change necessary to resolve the failure; do not overhaul structure, pacing, or stylistic choices the user made.
- Preserve all existing text content (titles, labels, strings) unless it directly causes the error.
- Do not alter functional math/logic that already works; only touch the problematic lines needed for a successful render.
- Return ONLY the corrected Python code (no backticks).
"""
        if settings:
            audience_text = _audience_label(settings.get("audience"))
            style_text = _style_label(settings.get("style"))
            length_text = _length_label(settings.get("length"))
            extra = "\nProduction targets to preserve:"
            if audience_text:
                extra += f"\n- Audience: {audience_text}."
            if style_text:
                extra += f"\n- Style: {style_text}."
            if length_text:
                extra += f"\n- Runtime goal: {length_text}."
            resolution = settings.get("resolution")
            if resolution:
                extra += f"\n- Ensure layout reads clearly at {resolution}."
            user_prompt += extra
        response_text = gemini_call(system=SYSTEM_PROMPT, contents=user_prompt)
        code = _clean_code(response_text)
        if "class AutoScene" not in code:
            return previous_code or DEFAULT_SCENE
        return code
    except Exception:
        print("LLM refine error:", file=sys.stderr)
        traceback.print_exc()
        return previous_code or DEFAULT_SCENE
def llm_visual_refine_from_image(
    original_user_prompt: str,
    previous_code: str,
    png_path: Optional[Path],
    settings: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Use the screenshot to request layout/legibility/placement fixes.
    Includes the original prompt and current code, and asks for minimal edits.
    """
    if not gemini_client or not png_path or not png_path.exists():
        return previous_code
    try:
        file_ref = _upload_image_to_gemini(png_path)
        if not file_ref:
            return previous_code
        visual_prompt = f"""You are refining a Manim CE (0.19.x) scene based on its steady-state screenshot.
Original user prompt:
{original_user_prompt}
Current Manim code:
{previous_code}
Tasks (optimize for readability and visual quality without changing the math meaning):
- Fix layout issues (overlaps, cramped margins, alignment, consistent scaling).
- Improve text legibility (minimum size ~32 px at 854x480, adequate contrast).
- Ensure all intended elements are visible at the capture point.
- Remove any overlapping elements; keep generous spacing between visuals.
- Remove decorative or redundant elements that are not required by the user's prompt or storyboard.
- Keep animation semantics as-is unless they're obviously broken.
- Keep exactly one class AutoScene(Scene).
- Preserve the CAPTURE POLICY and place `# CAPTURE_POINT` at the final steady layout with self.wait(0.75) and NO outro after that.
- Make the minimal adjustments needed to fix readability; do not rework the overall composition or pacing beyond what the user already authored.
- Preserve all text labels, titles, and strings as written unless they directly cause overlap/legibility issues.
- Avoid rewriting functioning math/logic—only adjust positioning, styling, or other elements required to fix the visual defect.
Return ONLY the revised Python code (no backticks).
"""
        if settings:
            audience_text = _audience_label(settings.get("audience"))
            style_text = _style_label(settings.get("style"))
            length_text = _length_label(settings.get("length"))
            visual_prompt += "\nKeep these production settings in mind:"
            if audience_text:
                visual_prompt += f"\n- Audience: {audience_text}."
            if style_text:
                visual_prompt += f"\n- Style: {style_text}."
            if length_text:
                visual_prompt += f"\n- Runtime target: {length_text}."
            resolution = settings.get("resolution")
            if resolution:
                visual_prompt += f"\n- Layout should stay readable at {resolution}."
        response_text = gemini_call(system=SYSTEM_PROMPT, contents=[file_ref, visual_prompt])
        code = _clean_code(response_text)
        if "class AutoScene" not in code:
            return previous_code
        return code
    except Exception:
        print("LLM visual refine error:", file=sys.stderr)
        traceback.print_exc()
        return previous_code
def _attempt_render_with_refine(
    base_code: str,
    *,
    user_prompt: str,
    settings: Optional[Dict[str, Any]],
    quality: str,
    run_prefix: str,
    max_refines: int,
) -> Tuple[Optional[str], Optional[bytes], Optional[Path], str]:
    """
    Try to render `base_code`, refining up to `max_refines` times using Gemini on failure.
    Returns a tuple: (final_code, video_bytes, png_path, last_error_log).
    If rendering still fails, code/video/png are None and last_error_log carries the last trace.
    """
    attempts = 0
    current_code = base_code
    last_log = ""
    while True:
        try:
            mp4_bytes, png_path = _run_manim(
                current_code,
                run_id=f"{run_prefix}_try{attempts}",
                quality=quality,
            )
            return current_code, mp4_bytes, png_path, ""
        except RenderError as err:
            last_log = err.log or last_log
        except Exception:
            last_log = traceback.format_exc()
        if attempts >= max_refines:
            return None, None, None, last_log
        attempts += 1
        current_code = llm_refine_from_error(
            previous_code=current_code,
            error_message=last_log,
            original_user_prompt=user_prompt,
            settings=settings,
        )
def refine_loop(
    user_prompt: str,
    settings: Optional[Dict[str, Any]] = None,
    max_error_refines: int = 3,
    do_visual_refine: bool = False,
) -> bytes:
    """
    Generate → render; on error, refine up to N times from the Manim traceback → re-render.
    If the first render succeeds and do_visual_refine is True, run an image-based refinement
    using the saved steady-state PNG, then re-render. Fall back to the best successful MP4.
    """
    # 1) initial generation (capture-aware)
    initial_code = llm_generate_manim_code(user_prompt, settings=settings)
    quality = _quality_from_settings(settings)
    # 2) render with error-driven refinement (plus one fresh fallback generation)
    code, mp4_bytes, png_path, last_log = _attempt_render_with_refine(
        initial_code,
        user_prompt=user_prompt,
        settings=settings,
        quality=quality,
        run_prefix="primary",
        max_refines=max_error_refines,
    )
    if code is None:
        print("Primary render failed after refinements; generating fallback code...", file=sys.stderr)
        fallback_code = llm_generate_manim_code(user_prompt, settings=settings)
        code, mp4_bytes, png_path, last_log = _attempt_render_with_refine(
            fallback_code,
            user_prompt=user_prompt,
            settings=settings,
            quality=quality,
            run_prefix="fallback",
            max_refines=2,
        )
    if code is None:
        error_message = last_log or "Render failed after fallback attempts."
        raise RenderError(error_message)
    # 3) optional visual refinement pass
    if do_visual_refine and png_path and png_path.exists():
        refined2 = llm_visual_refine_from_image(
            original_user_prompt=user_prompt,
            previous_code=code,
            png_path=png_path,
            settings=settings,
        )
        if refined2.strip() != code.strip():
            try:
                mp4_bytes2, _ = _run_manim(refined2, run_id="iter2", quality=quality)
                return mp4_bytes2
            except Exception:
                print("Visual refine render failed; returning best known render.", file=sys.stderr)
                return mp4_bytes
    return mp4_bytes
def _auto_fix_render(
    user_prompt: str,
    code: str,
    settings: Optional[Dict[str, Any]],
    initial_log: str,
    max_attempts: int = 3,
) -> Tuple[Optional[str], Optional[bytes], str]:
    """Attempt to auto-fix user code via LLM refinement if available."""
    if not gemini_client:
        return None, None, initial_log
    quality = _quality_from_settings(settings)
    attempt_code = code
    last_log = initial_log
    for attempt in range(max_attempts):
        refined = llm_refine_from_error(
            previous_code=attempt_code,
            error_message=last_log,
            original_user_prompt=user_prompt,
            settings=settings,
        )
        if refined.strip() == attempt_code.strip():
            break
        attempt_code = refined
        try:
            mp4_bytes, _ = _run_manim(
                attempt_code,
                run_id=f"manual_fix_{attempt}",
                quality=quality,
            )
            return attempt_code, mp4_bytes, ""
        except RenderError as err:
            last_log = err.log or last_log
    return None, None, last_log
# ---------------- API ----------------
@app.post("/storyboard/chat")  # route path assumed
def storyboard_chat(inp: StoryboardChatIn):
    if not (gpt_client or gemini_client):
        raise HTTPException(500, "Storyboard model is not configured")
    if not inp.message.strip() and not inp.plan:
        raise HTTPException(400, "Message or plan updates are required.")
    session = _get_or_create_session(inp.session_id, inp.settings or {})
    if inp.settings:
        session.settings.update(inp.settings)
    if inp.plan:
        try:
            session.plan = _sanitize_plan(inp.plan, concept_hint=inp.plan.concept)
        except Exception as exc:
            print("Failed to apply user-supplied plan:", exc, file=sys.stderr)
    user_message = inp.message.strip()
    if user_message:
        session.messages.append({"role": "user", "content": user_message})
    else:
        session.messages.append({"role": "user", "content": "[Plan updated without additional message]"})
    try:
        reply_text, plan_model, questions = _storyboard_model_reply(session, user_message)
    except Exception as exc:
        print("Storyboard chat error:", exc, file=sys.stderr)
        raise HTTPException(500, "Storyboard assistant failed to respond")
    session.messages.append({"role": "assistant", "content": reply_text})
    return {
        "session_id": session.session_id,
        "reply": reply_text,
        "plan": plan_model.dict(),
        "questions": questions,
        "settings": session.settings,
    }
@app.post("/storyboard/confirm")  # route path assumed
def storyboard_confirm(inp: StoryboardConfirmIn):
    if not (gpt_client or gemini_client):
        raise HTTPException(500, "Storyboard model is not configured")
    session = _get_or_create_session(inp.session_id, inp.settings or {})
    if inp.settings:
        session.settings.update(inp.settings)
    session.plan = _sanitize_plan(inp.plan, concept_hint=inp.plan.concept)
    session.messages.append({"role": "user", "content": "[User confirmed the storyboard plan]"})
    try:
        reply_text, final_plan, render_prompt = _storyboard_model_confirm(session)
    except Exception as exc:
        print("Storyboard confirm error:", exc, file=sys.stderr)
        final_plan = session.plan
        render_prompt = _compose_default_render_prompt(final_plan, session.settings, session.messages)
        reply_text = "Plan confirmed. Falling back to a templated prompt."
    session.messages.append({"role": "assistant", "content": reply_text})
    return {
        "session_id": session.session_id,
        "reply": reply_text,
        "render_prompt": render_prompt,
        "plan": final_plan.dict(),
        "settings": session.settings,
    }
class PromptIn(BaseModel):
    prompt: str
    settings: Optional[Dict[str, Any]] = None

    # Validator decorators restored in pydantic v1 style (flags assumed).
    @validator("prompt")
    def _validate_prompt(cls, value: str) -> str:
        if not value or not value.strip():
            raise ValueError("Prompt cannot be empty")
        return value.strip()

    @validator("settings", pre=True, always=True)
    def _sanitize_settings(cls, value: Any) -> Optional[Dict[str, Any]]:
        if isinstance(value, dict):
            return value
        return None


class GenerateCodeIn(PromptIn):
    pass


class RenderCodeIn(BaseModel):
    code: str
    prompt: Optional[str] = ""
    settings: Optional[Dict[str, Any]] = None
    auto_fix: bool = False

    @validator("code")
    def _validate_code(cls, value: str) -> str:
        if not value or not value.strip():
            raise ValueError("Code cannot be empty")
        return value

    @validator("prompt", pre=True, always=True)
    def _sanitize_prompt(cls, value: Any) -> str:
        return str(value or "").strip()

    @validator("settings", pre=True, always=True)
    def _sanitize_settings(cls, value: Any) -> Optional[Dict[str, Any]]:
        if isinstance(value, dict):
            return value
        return None
class EmailIn(BaseModel):
    email: str

    # Accessed as an attribute in store_email, so this must be a property.
    @property
    def sanitized(self) -> str:
        return self.email

    @validator("email")
    def validate_email(cls, value: str) -> str:
        cleaned = value.strip().lower()
        if not cleaned:
            raise ValueError("Email cannot be empty")
        if not re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", cleaned):
            raise ValueError("Email is not valid")
        return cleaned
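# Examples (sketch): "  User@Example.com " validates to "user@example.com";
# "not-an-email" fails the regex and raises a validation error.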
@app.get("/health")  # route path assumed
def health():
    return {
        "ok": True,
        "model": MODEL,
        "has_gemini": bool(gemini_client),
        "has_gpt": bool(gpt_client),
    }
@app.post("/generate-code")  # route path assumed
def generate_code(inp: GenerateCodeIn):
    """Return ONLY the generated Manim Python code (no rendering)."""
    code = llm_generate_manim_code(inp.prompt, settings=inp.settings)
    return {"code": code}
@app.post("/generate-and-render")  # route path assumed
def generate_and_render(inp: PromptIn):
    try:
        with acquire_render_slot():
            mp4 = refine_loop(inp.prompt, settings=inp.settings, max_error_refines=3, do_visual_refine=False)
    except RuntimeError:
        raise HTTPException(
            status_code=503,
            detail={
                "error": "queue_busy",
                "message": "Another render is already running. Please wait a moment and try again.",
            },
        )
    except Exception:
        raise HTTPException(500, "Failed to produce video after refinement")
    return Response(
        content=mp4,
        media_type="video/mp4",
        headers={"Content-Disposition": 'inline; filename="result.mp4"'},
    )
@app.post("/render-code")  # route path assumed
def render_code(inp: RenderCodeIn):
    quality = _quality_from_settings(inp.settings)
    try:
        with acquire_render_slot():
            try:
                mp4_bytes, _ = _run_manim(inp.code, run_id="manual", quality=quality)
                return Response(
                    content=mp4_bytes,
                    media_type="video/mp4",
                    headers={"Content-Disposition": 'inline; filename="result.mp4"'},
                )
            except RenderError as exc:
                log = exc.log or ""
                # The auto_fix gate is currently disabled: an automatic fix is
                # always attempted regardless of inp.auto_fix.
                fixed_code, fixed_video, final_log = _auto_fix_render(
                    user_prompt=inp.prompt or "User-edited Manim code",
                    code=inp.code,
                    settings=inp.settings,
                    initial_log=log,
                )
                if fixed_code and fixed_video:
                    payload = {
                        "auto_fixed": True,
                        "message": "Your code triggered a Manim error, so I applied the smallest possible fix (keeping your edits) and reran the render.",
                        "code": fixed_code,
                        "video_base64": base64.b64encode(fixed_video).decode("utf-8"),
                        "video_mime_type": "video/mp4",
                        "files": [
                            {"filename": "scene.py", "contents": fixed_code}
                        ],
                        "meta": {"resolution": inp.settings.get("resolution") if inp.settings else None},
                        "log_tail": (log or "")[-600:],
                    }
                    return Response(
                        content=json.dumps(payload),
                        media_type="application/json",
                    )
                detail_log = (final_log or log)[-6000:]
                raise HTTPException(
                    status_code=400,
                    detail={"error": "Render failed", "log": detail_log, "code": inp.code},
                )
    except RuntimeError:
        raise HTTPException(
            status_code=503,
            detail={
                "error": "queue_busy",
                "message": "Another render is already running. Please wait a moment and try again.",
            },
        )
    except HTTPException:
        # Let the deliberate 400 above pass through instead of becoming a 500.
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail={"error": "Unexpected render failure", "log": str(exc)})
@app.post("/store-email")  # route path assumed
def store_email(email: EmailIn):
    """Store the provided email address in the configured Hugging Face dataset."""
    if not hf_api or not HF_TOKEN:
        raise HTTPException(500, "Email logging is not configured")
    sanitized_email = email.sanitized
    timestamp = int(time.time())
    key = f"emails/{int(time.time() * 1000)}-{uuid.uuid4().hex}.json"
    payload = {"email": sanitized_email, "ts": timestamp}
    try:
        hf_api.create_commit(
            repo_id=HF_DATASET_ID,
            repo_type="dataset",
            operations=[
                CommitOperationAdd(
                    path_in_repo=key,
                    path_or_fileobj=BytesIO(json.dumps(payload).encode("utf-8")),
                )
            ],
            commit_message=f"Log email: {sanitized_email}",
            token=HF_TOKEN,
        )
    except Exception as exc:
        print("Failed to log email to Hugging Face:", exc, file=sys.stderr)
        raise HTTPException(500, "Failed to save email address")
    return {"stored": True, "path": key}