GPT5API / app.py
MB-IDK's picture
Create app.py
0dd82d9 verified
"""
API OpenAI-compatible wrappant aifreeforever.com
Endpoints :
GET /v1/models
POST /v1/chat/completions (stream=false & stream=true)
"""
import asyncio, json, time, uuid
from typing import Optional
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from playwright.async_api import async_playwright, Page, TimeoutError as PWTimeout
# ════════════════════════════════════════
# FastAPI
# ════════════════════════════════════════
app = FastAPI(title="Free Chat API")
# Limite 1 requΓͺte Playwright Γ  la fois (free tier = peu de RAM)
SEM = asyncio.Semaphore(1)
MODEL_NAME = "aifreeforever"
# ════════════════════════════════════════
# Modèles Pydantic (format OpenAI)
# ════════════════════════════════════════
class Message(BaseModel):
role: str
content: str
class ChatRequest(BaseModel):
model: Optional[str] = MODEL_NAME
messages: list[Message]
stream: Optional[bool] = False
temperature: Optional[float] = None
max_tokens: Optional[int] = None
# ════════════════════════════════════════
# CONFIG PLAYWRIGHT
# ════════════════════════════════════════
URL = "https://aifreeforever.com/tools/free-chatgpt-no-login"
HEADLESS = True
MAX_RETRIES = 5
SEL_TEXTAREA = 'textarea[placeholder*="Ask anything"]'
SEL_SEND_BTN = 'button.absolute.right-3.top-4.w-10.h-10'
SEL_BOT_MSG = '.flex.justify-start .rounded-2xl.shadow-sm.bg-white.border.border-gray-100'
SEL_BOT_CONTENT = '.markdown-content'
SEL_ACCEPT_BTN = 'button:has-text("Accept & Continue")'
SEL_AGE_BTN = 'button:has-text("13 and Over")'
SEL_COPY_BTN = (
'button.text-xs.text-gray-500.flex.items-center.gap-1'
'.px-2.py-1.rounded-md.transition-colors:has-text("Copy")'
)
SEL_ERROR_MSG = 'p.text-red-600:has-text("Failed to send message")'
COOKIE_SELECTORS = [
'button.unic-agree-all-button',
'button:has-text("Accepter et continuer")',
'button:has-text("Accepter tout")',
'button:has-text("Accept all")',
'button:has-text("Accept All")',
'button:has-text("I agree")',
]
AD_SELECTORS = [
'button:has(path[fill="#1D1D1B"][opacity="0.7"])',
'button:has(path[style*="stroke: rgb(255, 255, 255)"][style*="stroke-width: 6.353"])',
'button[aria-label*="close" i]',
'button[aria-label*="fermer" i]',
'button[title*="close" i]',
'button:has-text("Γ—")',
'button:has-text("βœ•")',
'button:has-text("Close")',
'[class*="close-btn"]',
]
# ════════════════════════════════════════
# HELPERS PLAYWRIGHT (identiques au script)
# ════════════════════════════════════════
async def _try_click(page: Page, selector: str, timeout: int) -> bool:
try:
btn = page.locator(selector).first
await btn.wait_for(state="visible", timeout=timeout)
await btn.click()
return True
except PWTimeout:
return False
async def click_first_visible(page: Page, selectors: list[str], timeout: int = 4000):
tasks = [_try_click(page, sel, timeout) for sel in selectors]
results = await asyncio.gather(*tasks, return_exceptions=True)
return any(r is True for r in results)
async def click_if_visible(page: Page, selector: str, timeout: int = 8000) -> bool:
return await _try_click(page, selector, timeout)
async def send_with_retry(page: Page, prompt: str) -> bool:
for attempt in range(1, MAX_RETRIES + 1):
if attempt > 1:
await page.locator(SEL_TEXTAREA).fill(prompt)
await page.locator(SEL_SEND_BTN).click(timeout=15_000)
await asyncio.sleep(2)
if await page.locator(SEL_ERROR_MSG).count() == 0:
return True
return False
async def try_copy_button(page: Page) -> str | None:
try:
msgs = page.locator(SEL_BOT_MSG)
count = await msgs.count()
if count == 0:
return None
last_msg = None
for i in range(count - 1, -1, -1):
msg = msgs.nth(i)
if await msg.locator(SEL_BOT_CONTENT).count() > 0:
last_msg = msg
break
if last_msg is None:
last_msg = msgs.nth(count - 1)
copy_btn = last_msg.locator(SEL_COPY_BTN)
await copy_btn.wait_for(state="visible", timeout=3000)
await copy_btn.click()
await asyncio.sleep(0.8)
text = await page.evaluate("async () => navigator.clipboard.readText()")
if text and len(text.strip()) > 20:
return text.strip()
except Exception:
pass
return None
async def scrape_last_bot_message(page: Page) -> str | None:
try:
msgs = page.locator(SEL_BOT_MSG)
count = await msgs.count()
for i in range(count - 1, -1, -1):
msg = msgs.nth(i)
content_el = msg.locator(SEL_BOT_CONTENT)
if await content_el.count() > 0:
text = (await content_el.first.inner_text()).strip()
if len(text) > 20:
return text
else:
text = (await msg.inner_text()).strip()
if len(text) > 80 and "Accept & Continue" not in text:
return text
except Exception:
pass
return None
async def get_response(page: Page) -> str:
text = await try_copy_button(page)
if text:
return text
return await scrape_last_bot_message(page) or ""
async def wait_for_stream_end(page: Page, timeout_s: int = 120) -> None:
prev_text = ""
stable = 0
elapsed = 0.0
interval = 0.6
while elapsed < timeout_s:
await asyncio.sleep(interval)
elapsed += interval
msgs = page.locator(SEL_BOT_MSG)
count = await msgs.count()
if count == 0:
continue
current = ""
for i in range(count - 1, -1, -1):
content_el = msgs.nth(i).locator(SEL_BOT_CONTENT)
if await content_el.count() > 0:
t = (await content_el.first.inner_text()).strip()
if len(t) > 20:
current = t
break
if not current:
continue
if current != prev_text:
stable = 0
else:
stable += 1
if stable >= 3:
try:
await msgs.nth(count - 1).locator(SEL_COPY_BTN).wait_for(
state="visible", timeout=1500
)
return
except PWTimeout:
stable = 0
prev_text = current
# ════════════════════════════════════════
# CŒUR : envoie un prompt, récupère la réponse
# ════════════════════════════════════════
async def chat(prompt: str) -> str:
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=HEADLESS,
args=[
"--disable-blink-features=AutomationControlled",
"--disable-extensions", "--disable-default-apps",
"--no-first-run", "--no-sandbox", "--disable-gpu",
"--disable-dev-shm-usage",
"--window-size=1280,720",
],
)
context = await browser.new_context(
user_agent=(
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
),
viewport={"width": 1280, "height": 720},
permissions=["clipboard-read", "clipboard-write"],
java_script_enabled=True,
)
await context.route(
"**/*",
lambda route: route.abort()
if route.request.resource_type in ("image", "media", "font")
else route.continue_(),
)
page = await context.new_page()
try:
await page.goto(URL, wait_until="domcontentloaded", timeout=60_000)
await asyncio.gather(
click_first_visible(page, AD_SELECTORS, timeout=3000),
click_first_visible(page, COOKIE_SELECTORS, timeout=5000),
)
await page.wait_for_selector(SEL_TEXTAREA, timeout=30_000)
await page.locator(SEL_TEXTAREA).fill(prompt)
if not await send_with_retry(page, prompt):
return ""
await asyncio.gather(
click_if_visible(page, SEL_AGE_BTN, timeout=8000),
click_if_visible(page, SEL_ACCEPT_BTN, timeout=8000),
)
await wait_for_stream_end(page, timeout_s=120)
response = await get_response(page)
finally:
await browser.close()
return response
# ════════════════════════════════════════
# HELPERS FORMAT OPENAI
# ════════════════════════════════════════
def _make_id():
return f"chatcmpl-{uuid.uuid4().hex[:29]}"
def _completion_response(content: str, model: str) -> dict:
return {
"id": _make_id(),
"object": "chat.completion",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"message": {"role": "assistant", "content": content},
"finish_reason": "stop",
}
],
"usage": {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0,
},
}
async def _stream_chunks(content: str, model: str):
"""Génère des SSE au format OpenAI streaming."""
cid = _make_id()
created = int(time.time())
# Premier chunk (role)
chunk = {
"id": cid, "object": "chat.completion.chunk",
"created": created, "model": model,
"choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None}],
}
yield f"data: {json.dumps(chunk)}\n\n"
# Contenu dΓ©coupΓ© mot par mot
words = content.split(" ")
for i, word in enumerate(words):
token = word if i == 0 else f" {word}"
chunk = {
"id": cid, "object": "chat.completion.chunk",
"created": created, "model": model,
"choices": [{"index": 0, "delta": {"content": token}, "finish_reason": None}],
}
yield f"data: {json.dumps(chunk)}\n\n"
await asyncio.sleep(0.02)
# Dernier chunk
chunk = {
"id": cid, "object": "chat.completion.chunk",
"created": created, "model": model,
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
}
yield f"data: {json.dumps(chunk)}\n\n"
yield "data: [DONE]\n\n"
# ════════════════════════════════════════
# ENDPOINTS
# ════════════════════════════════════════
@app.get("/")
async def root():
return {"status": "ok", "message": "OpenAI-compatible API. Use /v1/chat/completions"}
@app.get("/v1/models")
async def list_models():
return {
"object": "list",
"data": [
{
"id": MODEL_NAME,
"object": "model",
"created": 1700000000,
"owned_by": "aifreeforever",
}
],
}
@app.post("/v1/chat/completions")
async def chat_completions(req: ChatRequest):
# Construire le prompt Γ  partir des messages
# On prend le dernier message user, ou on concatène tout
parts = []
for m in req.messages:
if m.role == "system":
parts.append(f"[System] {m.content}")
elif m.role == "user":
parts.append(f"{m.content}")
elif m.role == "assistant":
parts.append(f"[Assistant] {m.content}")
prompt = "\n\n".join(parts)
if not prompt.strip():
raise HTTPException(status_code=400, detail="Empty prompt")
# SΓ©maphore : 1 seule requΓͺte Playwright Γ  la fois
async with SEM:
try:
content = await chat(prompt)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if not content:
raise HTTPException(status_code=502, detail="No response from upstream")
model = req.model or MODEL_NAME
# Streaming
if req.stream:
return StreamingResponse(
_stream_chunks(content, model),
media_type="text/event-stream",
)
# Non-streaming
return _completion_response(content, model)