IP1418HR's picture
Upload main.py
8f86b84 verified
"""
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
SUPREMEAI — API REST FastAPI complète
Endpoints: génération, statut, historique, fine-tuning, streaming
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
from fastapi import FastAPI, BackgroundTasks, HTTPException, UploadFile, File, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from typing import Optional, List
import asyncio, uuid, os, time, json, logging
from pathlib import Path
from datetime import datetime
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Directory the generated videos are served from. It can be overridden with the
# SUPREMEAI_OUTPUT environment variable and is created at import time if missing.
OUTPUT_DIR = Path(os.getenv("SUPREMEAI_OUTPUT", "/tmp/supremeai_output"))
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
# ── Global job state ─────────────────────────────────────────────────────────
# Registry held in process memory: job_id → {status, progress, message,
# created_at} plus, once a job finishes, video_url / model / duration / error.
JOBS: dict = {}
# FastAPI application object; interactive docs are exposed on /docs and /redoc.
app = FastAPI(
    title="SupremeAI Video Engine API",
    description="API REST pour la génération vidéo IA la plus avancée",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)

# Origins allowed by CORS: a comma-separated list read from the
# SUPREMEAI_CORS_ORIGINS environment variable. When the variable is unset or
# empty, every origin is allowed ("*"), as before.
_CORS_ORIGINS = [
    origin.strip()
    for origin in os.getenv("SUPREMEAI_CORS_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_CORS_ORIGINS,
    # A wildcard origin must not be combined with credentials: it would let any
    # website issue credentialed cross-origin requests. Credentials are only
    # enabled when the allowed origins are listed explicitly.
    allow_credentials="*" not in _CORS_ORIGINS,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ── Schémas Pydantic ─────────────────────────────────────────────────────────
class GenerateRequest(BaseModel):
    # Request body accepted by the generation endpoints. Numeric bounds are
    # enforced by the Field constraints below; `style` and `mode` are free-form
    # strings at this level.

    # -- Prompting ------------------------------------------------------------
    prompt: str = Field(
        ...,
        min_length=3,
        max_length=1000,
        description="Description de la vidéo",
    )
    negative_prompt: str = Field(
        default="",
        description="Ce à éviter dans la vidéo",
    )
    style: str = Field(default="cinematic", description="Style vidéo")
    mode: str = Field(
        default="balanced",
        description="Mode: quality | balanced | speed",
    )

    # -- Output geometry and timing -------------------------------------------
    width: int = Field(default=1280, ge=256, le=3840)
    height: int = Field(default=720, ge=144, le=2160)
    fps: int = Field(default=24, ge=12, le=120)
    duration: float = Field(default=5.0, ge=1.0, le=60.0)

    # -- Sampling parameters --------------------------------------------------
    num_steps: int = Field(default=20, ge=1, le=100)
    guidance_scale: float = Field(default=7.5, ge=1.0, le=20.0)
    # NOTE(review): -1 presumably means "pick a random seed" — confirm in the pipeline.
    seed: int = Field(default=-1)

    # -- Post-processing ------------------------------------------------------
    upscale_4k: bool = Field(default=False)
    # NOTE(review): 0 presumably disables frame interpolation — confirm in the pipeline.
    interpolate_fps: int = Field(default=0)
    color_grading: str = Field(default="none")

    # -- Voice-over -----------------------------------------------------------
    add_voiceover: bool = Field(default=False)
    voiceover_text: str = Field(default="")
    voice_lang: str = Field(default="fr")

    # -- Director mode --------------------------------------------------------
    storyboard: List[str] = Field(
        default=[],
        description="Scènes pour le mode Director",
    )
class GenerateResponse(BaseModel):
    # Response returned by POST /generate once a job has been queued.
    job_id: str  # identifier to use with GET /status/{job_id}
    status: str
    message: str
    eta_seconds: Optional[float] = None  # rough estimate chosen from the requested mode
class JobStatus(BaseModel):
    # Snapshot of one entry of the JOBS registry, as returned by GET /status/{job_id}.
    job_id: str
    status: str  # pending | processing | done | error
    progress: int  # 0-100
    message: str
    video_url: Optional[str] = None  # download path, set once the job is done
    error: Optional[str] = None
    model: Optional[str] = None  # model reported by the pipeline, or "cache" on a cache hit
    duration: Optional[float] = None  # generation time reported by the pipeline, not the video length
    created_at: str  # ISO-8601 timestamp taken when the job was registered
class GPUInfoResponse(BaseModel):
    # Response of GET /gpu; populated from the dict returned by GPUProfiler.detect().
    has_cuda: bool
    gpu_name: str
    vram_gb: float
    recommended_model: str
    recommended_mode: str
class CacheStatsResponse(BaseModel):
    # Response of GET /cache/stats; populated from the dict returned by the cache's stats().
    entries: int
    size_mb: float
    total_hits: int
# ── Initialisation pipeline ──────────────────────────────────────────────────
# Lazily created singletons: each stays None until the matching accessor
# (get_pipeline / get_cache) is called for the first time.
_pipeline = None
_cache = None
def get_pipeline():
    """Return the shared VideoGenerationPipeline, building it on first use.

    The pipeline module is imported inside the function, so it is only loaded
    the first time a pipeline is actually needed.
    """
    global _pipeline
    if _pipeline is not None:
        return _pipeline

    from pipeline.generator import VideoGenerationPipeline

    _pipeline = VideoGenerationPipeline()
    return _pipeline
def get_cache():
    """Return the shared GenerationCache, building it on first use.

    The cache module is imported inside the function, so it is only loaded the
    first time the cache is actually needed.
    """
    global _cache
    if _cache is not None:
        return _cache

    from optimizers.speed import GenerationCache

    _cache = GenerationCache()
    return _cache
# ── Background task de génération ────────────────────────────────────────────
async def run_generation(job_id: str, req: GenerateRequest):
    """Run one generation job and record its progress and outcome on its job record.

    A VideoGenerationConfig is built from the request; a style or mode value
    that matches no enum member falls back to CINEMATIC / BALANCED. On a cache
    hit the cached video is reported and the pipeline is not run. Otherwise the
    pipeline runs in the default executor, using director mode when the
    storyboard holds more than one scene.

    Args:
        job_id: Key of an entry already registered in ``JOBS``.
        req: Validated generation request.

    Returns:
        None. The outcome is written to the job record: ``status`` becomes
        ``"done"`` (with ``video_url``/``model``) or ``"error"`` (with
        ``error``/``message``). Exceptions raised while generating are logged
        and stored on the record instead of being propagated.
    """
    # Keep a direct reference to the job record: if the registry is cleared
    # while the job runs, updates still land on this dict instead of raising
    # KeyError (including from inside the error handler below).
    job = JOBS.get(job_id)
    if job is None:
        logger.warning("Job %s introuvable, génération ignorée", job_id)
        return

    job["status"] = "processing"
    job["progress"] = 5
    job["message"] = "Initialisation..."

    def progress_cb(pct: int, msg: str):
        # Handed to the pipeline, which runs in an executor thread.
        job["progress"] = pct
        job["message"] = msg

    try:
        from core.architecture import VideoGenerationConfig, VideoStyle, GenerationMode

        style_map = {s.value: s for s in VideoStyle}
        mode_map = {m.value: m for m in GenerationMode}
        config = VideoGenerationConfig(
            prompt=req.prompt,
            negative_prompt=req.negative_prompt,
            style=style_map.get(req.style, VideoStyle.CINEMATIC),
            mode=mode_map.get(req.mode, GenerationMode.BALANCED),
            width=req.width, height=req.height,
            fps=req.fps, duration=req.duration,
            num_inference_steps=req.num_steps,
            guidance_scale=req.guidance_scale,
            seed=req.seed,
            upscale_to_4k=req.upscale_4k,
            interpolate_fps=req.interpolate_fps,
            color_grading=req.color_grading,
            add_voiceover=req.add_voiceover,
            voiceover_text=req.voiceover_text,
            voice_language=req.voice_lang,
            storyboard=req.storyboard,
        )

        # Cache lookup: an identical configuration is served without regenerating.
        # NOTE(review): the cache lookup and the pipeline construction below run
        # on the event-loop thread — confirm they are cheap enough.
        cache = get_cache()
        cached = cache.get(config)
        if cached:
            job.update({
                "status": "done",
                "progress": 100,
                "message": "✅ Vidéo servie depuis le cache",
                "video_url": f"/video/{os.path.basename(cached)}",
                "model": "cache",
            })
            return

        pipe = get_pipeline()
        # We are inside a coroutine, so the running loop is the one to use.
        loop = asyncio.get_running_loop()

        # Director mode when a storyboard with several scenes is provided.
        if req.storyboard and len(req.storyboard) > 1:
            result = await loop.run_in_executor(
                None, lambda: pipe.generate_director_mode(
                    topic=req.prompt, n_scenes=len(req.storyboard),
                    config=config, progress_cb=progress_cb
                )
            )
        else:
            result = await loop.run_in_executor(
                None, lambda: pipe.generate(config, progress_cb=progress_cb)
            )

        if result.success:
            cache.put(config, result.video_path)
            job.update({
                "status": "done",
                "progress": 100,
                "message": "✅ Vidéo générée avec succès",
                "video_url": f"/video/{os.path.basename(result.video_path)}",
                "model": result.model_used,
                "duration": result.generation_time,
            })
        else:
            job.update({
                "status": "error",
                "message": result.error or "Erreur inconnue",
                "error": result.error,
            })
    except Exception as e:
        # Top-level boundary of the job: log with traceback, then surface the
        # failure through the job record rather than crashing the task.
        logger.exception("Erreur job %s", job_id)
        job.update({"status": "error", "error": str(e), "message": str(e)})
# ── Endpoints ────────────────────────────────────────────────────────────────
@app.get("/")
async def root():
    """Return the service name, version, status and the path of the docs."""
    return dict(
        name="SupremeAI Video Engine",
        version="1.0.0",
        status="online",
        docs="/docs",
    )
@app.get("/gpu", response_model=GPUInfoResponse)
async def gpu_info():
    """Return the detected GPU information and the recommended model."""
    from pipeline.generator import GPUProfiler

    # The detection result is a dict whose keys map onto the response fields.
    return GPUInfoResponse(**GPUProfiler.detect())
@app.post("/generate", response_model=GenerateResponse)
async def generate_video(req: GenerateRequest, bg: BackgroundTasks):
    """Start an asynchronous video generation and return its job id.

    The job is registered as pending and the generation is scheduled as a
    background task; poll the status endpoint with the returned id.
    """
    # Short identifier: the first 8 hex characters of a random UUID.
    job_id = uuid.uuid4().hex[:8]
    JOBS[job_id] = dict(
        status="pending",
        progress=0,
        message="En attente...",
        created_at=datetime.now().isoformat(),
    )
    bg.add_task(run_generation, job_id, req)

    # Rough ETA in seconds per mode; unknown modes use the "balanced" figure.
    eta_by_mode = {"quality": 60, "balanced": 25, "speed": 8}
    return GenerateResponse(
        job_id=job_id,
        status="pending",
        message="Génération lancée",
        eta_seconds=eta_by_mode.get(req.mode, 25),
    )
@app.get("/status/{job_id}", response_model=JobStatus)
async def job_status(job_id: str):
    """Return the status of a generation job, or 404 if the id is unknown."""
    record = JOBS.get(job_id)
    if record is None:
        raise HTTPException(404, detail="Job non trouvé")

    # These keys only exist once the job has finished; missing ones become None.
    outcome = {
        key: record.get(key)
        for key in ("video_url", "error", "model", "duration")
    }
    return JobStatus(
        job_id=job_id,
        status=record["status"],
        progress=record.get("progress", 0),
        message=record.get("message", ""),
        created_at=record["created_at"],
        **outcome,
    )
@app.get("/video/{filename}")
async def get_video(filename: str):
    """Download a generated video from the output directory.

    Args:
        filename: Name of a file located directly or indirectly under
            ``OUTPUT_DIR``.

    Raises:
        HTTPException: 404 when the name does not designate a regular file
            inside ``OUTPUT_DIR``.
    """
    # `filename` comes from the client. Resolve it and make sure the result is
    # still inside the output directory, so names such as ".." (or backslash
    # paths on Windows) cannot reach other files. Symlinks are resolved too, so
    # a link pointing outside the directory is refused.
    base_dir = OUTPUT_DIR.resolve()
    path = (base_dir / filename).resolve()
    # is_file() also rejects directories, which exists() would have accepted.
    if base_dir not in path.parents or not path.is_file():
        raise HTTPException(404, detail="Vidéo non trouvée")
    # Passing `filename` lets FileResponse build the Content-Disposition header
    # itself, with quoting/encoding, instead of interpolating raw client input.
    return FileResponse(str(path), media_type="video/mp4", filename=filename)
@app.get("/history")
async def get_history(limit: int = 20):
    """Return the most recent jobs, newest first.

    Args:
        limit: Maximum number of jobs to return. Values below zero are treated
            as zero.

    Returns:
        A list of job records, each extended with its ``job_id``.
    """
    # A negative limit would slice from the end and return all but the last
    # N jobs; clamp it so it simply yields an empty list.
    limit = max(limit, 0)
    recent = sorted(
        JOBS.items(), key=lambda item: item[1]["created_at"], reverse=True
    )
    return [{"job_id": job_id, **record} for job_id, record in recent[:limit]]
@app.delete("/history")
async def clear_history():
    """Remove every entry from the job registry."""
    JOBS.clear()
    confirmation = dict(message="Historique effacé")
    return confirmation
@app.get("/cache/stats", response_model=CacheStatsResponse)
async def cache_stats():
    """Return the statistics reported by the generation cache."""
    # stats() yields a dict whose keys map onto the response fields.
    return CacheStatsResponse(**get_cache().stats())
@app.delete("/cache")
async def clear_cache():
    """Empty the generation cache index.

    The cache's ``index`` is replaced with an empty dict and its
    ``_save_index()`` is called.
    """
    # NOTE(review): only the index is reset here; whether cached video files
    # are removed from disk depends on GenerationCache — confirm.
    store = get_cache()
    store.index = dict()
    store._save_index()
    return dict(message="Cache vidé")
@app.post("/generate/sync")
async def generate_sync(req: GenerateRequest):
    """Generate a video and wait for the result before responding.

    Only for short videos: requests longer than 10 seconds are refused.

    Raises:
        HTTPException: 400 when ``req.duration`` exceeds 10 seconds; 500 when
            the generation does not end in the ``done`` state.
    """
    if req.duration > 10:
        raise HTTPException(400, detail="Mode synchrone limité à 10 secondes. Utilisez /generate pour les vidéos plus longues.")

    job_id = str(uuid.uuid4())[:8]
    # Keep our own reference to the record so the outcome can still be read if
    # the registry is cleared while the generation is running.
    job = {"status": "processing", "progress": 0, "message": "", "created_at": datetime.now().isoformat()}
    JOBS[job_id] = job
    await run_generation(job_id, req)

    if job["status"] == "done":
        return {"success": True, "video_url": job["video_url"], "model": job.get("model"), "duration": job.get("duration")}
    # The "error" key may be present with a None value, in which case a plain
    # .get() default would not apply; fall back explicitly.
    raise HTTPException(500, detail=job.get("error") or "Erreur génération")
@app.get("/styles")
async def list_styles():
    """Return every available video style.

    Each entry carries the style id, a display name derived from the id, and
    the first 80 characters of the style's prompt enhancer (empty if none).
    """
    from core.architecture import VideoStyle, STYLE_ENHANCERS

    styles = []
    for style in VideoStyle:
        display_name = style.value.replace("_", " ").title()
        enhancer_preview = STYLE_ENHANCERS.get(style, "")[:80]
        styles.append(
            {"id": style.value, "name": display_name, "enhancer": enhancer_preview}
        )
    return styles
@app.get("/models")
async def list_models():
    """Return the known models and whether each is usable on this machine.

    A GPU model is flagged available when the detected VRAM (in GB) reaches its
    minimum; the CPU fallback is always available.
    """
    from pipeline.generator import GPUProfiler

    vram = GPUProfiler.detect().get("vram_gb", 0)

    # (id, name, minimum VRAM in GB, quality rating, speed label)
    gpu_models = (
        ("wan2.1", "Wan 2.1 (14B)", 24, "⭐⭐⭐⭐⭐", "Lente"),
        ("cogvideox-5b", "CogVideoX-5B", 16, "⭐⭐⭐⭐", "Moyenne"),
        ("cogvideox-2b", "CogVideoX-2B", 8, "⭐⭐⭐", "Rapide"),
        ("animatediff", "AnimateDiff-Lightning", 4, "⭐⭐⭐", "Ultra rapide (4 steps)"),
    )
    catalog = [
        {
            "id": model_id,
            "name": label,
            "min_vram": min_vram,
            "available": vram >= min_vram,
            "quality": quality,
            "speed": speed,
        }
        for model_id, label, min_vram, quality, speed in gpu_models
    ]
    # CPU-only fallback: listed last and available regardless of the GPU.
    catalog.append(
        {
            "id": "enhanced_moviepy",
            "name": "Enhanced MoviePy CPU",
            "min_vram": 0,
            "available": True,
            "quality": "⭐⭐",
            "speed": "CPU uniquement",
        }
    )
    return catalog
# ── Lancement ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # uvicorn is only imported when this module is executed as a script.
    import uvicorn
    # Listen on all interfaces, port 8000, without auto-reload and with a single
    # worker. NOTE(review): JOBS is kept in process memory, so several workers
    # would presumably not share job state — confirm before raising `workers`.
    uvicorn.run(app, host="0.0.0.0", port=8000, reload=False, workers=1)