""" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ SUPREMEAI — API REST FastAPI complète Endpoints: génération, statut, historique, fine-tuning, streaming ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ from fastapi import FastAPI, BackgroundTasks, HTTPException, UploadFile, File, Form from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse, FileResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field from typing import Optional, List import asyncio, uuid, os, time, json, logging from pathlib import Path from datetime import datetime logger = logging.getLogger(__name__) OUTPUT_DIR = Path(os.getenv("SUPREMEAI_OUTPUT", "/tmp/supremeai_output")) OUTPUT_DIR.mkdir(parents=True, exist_ok=True) # ── État global des jobs ───────────────────────────────────────────────────── JOBS: dict = {} # job_id → {status, progress, result, created_at} app = FastAPI( title="SupremeAI Video Engine API", description="API REST pour la génération vidéo IA la plus avancée", version="1.0.0", docs_url="/docs", redoc_url="/redoc", ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ── Schémas Pydantic ───────────────────────────────────────────────────────── class GenerateRequest(BaseModel): prompt: str = Field(..., min_length=3, max_length=1000, description="Description de la vidéo") negative_prompt: str = Field("", description="Ce à éviter dans la vidéo") style: str = Field("cinematic", description="Style vidéo") mode: str = Field("balanced", description="Mode: quality | balanced | speed") width: int = Field(1280, ge=256, le=3840) height: int = Field(720, ge=144, le=2160) fps: int = Field(24, ge=12, le=120) duration: float = Field(5.0, ge=1.0, le=60.0) num_steps: int = Field(20, ge=1, le=100) guidance_scale: float = Field(7.5, ge=1.0, le=20.0) seed: int = Field(-1) upscale_4k: bool = Field(False) interpolate_fps: int = Field(0) color_grading: str = Field("none") add_voiceover: bool = Field(False) voiceover_text: str = Field("") voice_lang: str = Field("fr") storyboard: List[str] = Field([], description="Scènes pour le mode Director") class GenerateResponse(BaseModel): job_id: str status: str message: str eta_seconds: Optional[float] = None class JobStatus(BaseModel): job_id: str status: str # pending | processing | done | error progress: int # 0-100 message: str video_url: Optional[str] = None error: Optional[str] = None model: Optional[str] = None duration: Optional[float] = None created_at: str class GPUInfoResponse(BaseModel): has_cuda: bool gpu_name: str vram_gb: float recommended_model: str recommended_mode: str class CacheStatsResponse(BaseModel): entries: int size_mb: float total_hits: int # ── Initialisation pipeline ────────────────────────────────────────────────── _pipeline = None _cache = None def get_pipeline(): global _pipeline if _pipeline is None: from pipeline.generator import VideoGenerationPipeline _pipeline = VideoGenerationPipeline() return _pipeline def get_cache(): global _cache if _cache is None: from optimizers.speed import GenerationCache _cache = GenerationCache() return _cache # ── Background task de génération ──────────────────────────────────────────── async def run_generation(job_id: str, req: GenerateRequest): """Exécute la génération en arrière-plan.""" import asyncio JOBS[job_id]["status"] = "processing" JOBS[job_id]["progress"] = 5 JOBS[job_id]["message"] = "Initialisation..." def progress_cb(pct: int, msg: str): JOBS[job_id]["progress"] = pct JOBS[job_id]["message"] = msg try: from core.architecture import VideoGenerationConfig, VideoStyle, GenerationMode style_map = {s.value: s for s in VideoStyle} mode_map = {m.value: m for m in GenerationMode} config = VideoGenerationConfig( prompt=req.prompt, negative_prompt=req.negative_prompt, style=style_map.get(req.style, VideoStyle.CINEMATIC), mode=mode_map.get(req.mode, GenerationMode.BALANCED), width=req.width, height=req.height, fps=req.fps, duration=req.duration, num_inference_steps=req.num_steps, guidance_scale=req.guidance_scale, seed=req.seed, upscale_to_4k=req.upscale_4k, interpolate_fps=req.interpolate_fps, color_grading=req.color_grading, add_voiceover=req.add_voiceover, voiceover_text=req.voiceover_text, voice_language=req.voice_lang, storyboard=req.storyboard, ) # Vérif cache cache = get_cache() cached = cache.get(config) if cached: JOBS[job_id].update({ "status": "done", "progress": 100, "message": "✅ Vidéo servie depuis le cache", "video_url": f"/video/{os.path.basename(cached)}", "model": "cache", }) return pipe = get_pipeline() # Mode Director si storyboard fourni if req.storyboard and len(req.storyboard) > 1: result = await asyncio.get_event_loop().run_in_executor( None, lambda: pipe.generate_director_mode( topic=req.prompt, n_scenes=len(req.storyboard), config=config, progress_cb=progress_cb ) ) else: result = await asyncio.get_event_loop().run_in_executor( None, lambda: pipe.generate(config, progress_cb=progress_cb) ) if result.success: cache.put(config, result.video_path) JOBS[job_id].update({ "status": "done", "progress": 100, "message": "✅ Vidéo générée avec succès", "video_url": f"/video/{os.path.basename(result.video_path)}", "model": result.model_used, "duration": result.generation_time, }) else: JOBS[job_id].update({ "status": "error", "message": result.error or "Erreur inconnue", "error": result.error, }) except Exception as e: logger.exception(f"Erreur job {job_id}") JOBS[job_id].update({"status": "error", "error": str(e), "message": str(e)}) # ── Endpoints ──────────────────────────────────────────────────────────────── @app.get("/") async def root(): return { "name": "SupremeAI Video Engine", "version": "1.0.0", "status": "online", "docs": "/docs", } @app.get("/gpu", response_model=GPUInfoResponse) async def gpu_info(): """Retourne les informations GPU et le modèle recommandé.""" from pipeline.generator import GPUProfiler info = GPUProfiler.detect() return GPUInfoResponse(**info) @app.post("/generate", response_model=GenerateResponse) async def generate_video(req: GenerateRequest, bg: BackgroundTasks): """Lance une génération vidéo asynchrone. Retourne un job_id.""" job_id = str(uuid.uuid4())[:8] JOBS[job_id] = { "status": "pending", "progress": 0, "message": "En attente...", "created_at": datetime.now().isoformat(), } bg.add_task(run_generation, job_id, req) # Estimation ETA eta = {"quality": 60, "balanced": 25, "speed": 8}.get(req.mode, 25) return GenerateResponse( job_id=job_id, status="pending", message="Génération lancée", eta_seconds=eta ) @app.get("/status/{job_id}", response_model=JobStatus) async def job_status(job_id: str): """Retourne le statut d'un job de génération.""" if job_id not in JOBS: raise HTTPException(404, detail="Job non trouvé") j = JOBS[job_id] return JobStatus( job_id=job_id, status=j["status"], progress=j.get("progress", 0), message=j.get("message", ""), video_url=j.get("video_url"), error=j.get("error"), model=j.get("model"), duration=j.get("duration"), created_at=j["created_at"], ) @app.get("/video/{filename}") async def get_video(filename: str): """Télécharge une vidéo générée.""" path = OUTPUT_DIR / filename if not path.exists(): raise HTTPException(404, detail="Vidéo non trouvée") return FileResponse(str(path), media_type="video/mp4", headers={"Content-Disposition": f"attachment; filename={filename}"}) @app.get("/history") async def get_history(limit: int = 20): """Retourne l'historique des jobs récents.""" jobs = sorted(JOBS.items(), key=lambda x: x[1]["created_at"], reverse=True) return [{"job_id": k, **v} for k, v in jobs[:limit]] @app.delete("/history") async def clear_history(): """Efface l'historique des jobs.""" JOBS.clear() return {"message": "Historique effacé"} @app.get("/cache/stats", response_model=CacheStatsResponse) async def cache_stats(): """Statistiques du cache de génération.""" cache = get_cache() stats = cache.stats() return CacheStatsResponse(**stats) @app.delete("/cache") async def clear_cache(): """Vide le cache de génération.""" cache = get_cache() cache.index = {} cache._save_index() return {"message": "Cache vidé"} @app.post("/generate/sync") async def generate_sync(req: GenerateRequest): """ Génération synchrone (attend le résultat). Pour les vidéos courtes (< 10s) uniquement. """ if req.duration > 10: raise HTTPException(400, detail="Mode synchrone limité à 10 secondes. Utilisez /generate pour les vidéos plus longues.") job_id = str(uuid.uuid4())[:8] JOBS[job_id] = {"status": "processing", "progress": 0, "message": "", "created_at": datetime.now().isoformat()} await run_generation(job_id, req) j = JOBS[job_id] if j["status"] == "done": return {"success": True, "video_url": j["video_url"], "model": j.get("model"), "duration": j.get("duration")} else: raise HTTPException(500, detail=j.get("error", "Erreur génération")) @app.get("/styles") async def list_styles(): """Retourne tous les styles vidéo disponibles.""" from core.architecture import VideoStyle, STYLE_ENHANCERS return [ {"id": s.value, "name": s.value.replace("_", " ").title(), "enhancer": STYLE_ENHANCERS.get(s, "")[:80]} for s in VideoStyle ] @app.get("/models") async def list_models(): """Retourne les modèles disponibles et leur statut.""" from pipeline.generator import GPUProfiler gpu = GPUProfiler.detect() vram = gpu.get("vram_gb", 0) return [ {"id": "wan2.1", "name": "Wan 2.1 (14B)", "min_vram": 24, "available": vram >= 24, "quality": "⭐⭐⭐⭐⭐", "speed": "Lente"}, {"id": "cogvideox-5b", "name": "CogVideoX-5B", "min_vram": 16, "available": vram >= 16, "quality": "⭐⭐⭐⭐", "speed": "Moyenne"}, {"id": "cogvideox-2b", "name": "CogVideoX-2B", "min_vram": 8, "available": vram >= 8, "quality": "⭐⭐⭐", "speed": "Rapide"}, {"id": "animatediff", "name": "AnimateDiff-Lightning", "min_vram": 4, "available": vram >= 4, "quality": "⭐⭐⭐", "speed": "Ultra rapide (4 steps)"}, {"id": "enhanced_moviepy", "name": "Enhanced MoviePy CPU","min_vram": 0, "available": True, "quality": "⭐⭐", "speed": "CPU uniquement"}, ] # ── Lancement ──────────────────────────────────────────────────────────────── if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000, reload=False, workers=1)