""" API OpenAI-compatible wrappant aifreeforever.com Endpoints : GET /v1/models POST /v1/chat/completions (stream=false & stream=true) """ import asyncio, json, time, uuid from typing import Optional from fastapi import FastAPI, HTTPException from fastapi.responses import StreamingResponse from pydantic import BaseModel from playwright.async_api import async_playwright, Page, TimeoutError as PWTimeout # ════════════════════════════════════════ # FastAPI # ════════════════════════════════════════ app = FastAPI(title="Free Chat API") # Limite 1 requête Playwright à la fois (free tier = peu de RAM) SEM = asyncio.Semaphore(1) MODEL_NAME = "aifreeforever" # ════════════════════════════════════════ # Modèles Pydantic (format OpenAI) # ════════════════════════════════════════ class Message(BaseModel): role: str content: str class ChatRequest(BaseModel): model: Optional[str] = MODEL_NAME messages: list[Message] stream: Optional[bool] = False temperature: Optional[float] = None max_tokens: Optional[int] = None # ════════════════════════════════════════ # CONFIG PLAYWRIGHT # ════════════════════════════════════════ URL = "https://aifreeforever.com/tools/free-chatgpt-no-login" HEADLESS = True MAX_RETRIES = 5 SEL_TEXTAREA = 'textarea[placeholder*="Ask anything"]' SEL_SEND_BTN = 'button.absolute.right-3.top-4.w-10.h-10' SEL_BOT_MSG = '.flex.justify-start .rounded-2xl.shadow-sm.bg-white.border.border-gray-100' SEL_BOT_CONTENT = '.markdown-content' SEL_ACCEPT_BTN = 'button:has-text("Accept & Continue")' SEL_AGE_BTN = 'button:has-text("13 and Over")' SEL_COPY_BTN = ( 'button.text-xs.text-gray-500.flex.items-center.gap-1' '.px-2.py-1.rounded-md.transition-colors:has-text("Copy")' ) SEL_ERROR_MSG = 'p.text-red-600:has-text("Failed to send message")' COOKIE_SELECTORS = [ 'button.unic-agree-all-button', 'button:has-text("Accepter et continuer")', 'button:has-text("Accepter tout")', 'button:has-text("Accept all")', 'button:has-text("Accept All")', 'button:has-text("I agree")', ] AD_SELECTORS = [ 'button:has(path[fill="#1D1D1B"][opacity="0.7"])', 'button:has(path[style*="stroke: rgb(255, 255, 255)"][style*="stroke-width: 6.353"])', 'button[aria-label*="close" i]', 'button[aria-label*="fermer" i]', 'button[title*="close" i]', 'button:has-text("×")', 'button:has-text("✕")', 'button:has-text("Close")', '[class*="close-btn"]', ] # ════════════════════════════════════════ # HELPERS PLAYWRIGHT (identiques au script) # ════════════════════════════════════════ async def _try_click(page: Page, selector: str, timeout: int) -> bool: try: btn = page.locator(selector).first await btn.wait_for(state="visible", timeout=timeout) await btn.click() return True except PWTimeout: return False async def click_first_visible(page: Page, selectors: list[str], timeout: int = 4000): tasks = [_try_click(page, sel, timeout) for sel in selectors] results = await asyncio.gather(*tasks, return_exceptions=True) return any(r is True for r in results) async def click_if_visible(page: Page, selector: str, timeout: int = 8000) -> bool: return await _try_click(page, selector, timeout) async def send_with_retry(page: Page, prompt: str) -> bool: for attempt in range(1, MAX_RETRIES + 1): if attempt > 1: await page.locator(SEL_TEXTAREA).fill(prompt) await page.locator(SEL_SEND_BTN).click(timeout=15_000) await asyncio.sleep(2) if await page.locator(SEL_ERROR_MSG).count() == 0: return True return False async def try_copy_button(page: Page) -> str | None: try: msgs = page.locator(SEL_BOT_MSG) count = await msgs.count() if count == 0: return None last_msg = None for i in range(count - 1, -1, -1): msg = msgs.nth(i) if await msg.locator(SEL_BOT_CONTENT).count() > 0: last_msg = msg break if last_msg is None: last_msg = msgs.nth(count - 1) copy_btn = last_msg.locator(SEL_COPY_BTN) await copy_btn.wait_for(state="visible", timeout=3000) await copy_btn.click() await asyncio.sleep(0.8) text = await page.evaluate("async () => navigator.clipboard.readText()") if text and len(text.strip()) > 20: return text.strip() except Exception: pass return None async def scrape_last_bot_message(page: Page) -> str | None: try: msgs = page.locator(SEL_BOT_MSG) count = await msgs.count() for i in range(count - 1, -1, -1): msg = msgs.nth(i) content_el = msg.locator(SEL_BOT_CONTENT) if await content_el.count() > 0: text = (await content_el.first.inner_text()).strip() if len(text) > 20: return text else: text = (await msg.inner_text()).strip() if len(text) > 80 and "Accept & Continue" not in text: return text except Exception: pass return None async def get_response(page: Page) -> str: text = await try_copy_button(page) if text: return text return await scrape_last_bot_message(page) or "" async def wait_for_stream_end(page: Page, timeout_s: int = 120) -> None: prev_text = "" stable = 0 elapsed = 0.0 interval = 0.6 while elapsed < timeout_s: await asyncio.sleep(interval) elapsed += interval msgs = page.locator(SEL_BOT_MSG) count = await msgs.count() if count == 0: continue current = "" for i in range(count - 1, -1, -1): content_el = msgs.nth(i).locator(SEL_BOT_CONTENT) if await content_el.count() > 0: t = (await content_el.first.inner_text()).strip() if len(t) > 20: current = t break if not current: continue if current != prev_text: stable = 0 else: stable += 1 if stable >= 3: try: await msgs.nth(count - 1).locator(SEL_COPY_BTN).wait_for( state="visible", timeout=1500 ) return except PWTimeout: stable = 0 prev_text = current # ════════════════════════════════════════ # CŒUR : envoie un prompt, récupère la réponse # ════════════════════════════════════════ async def chat(prompt: str) -> str: async with async_playwright() as p: browser = await p.chromium.launch( headless=HEADLESS, args=[ "--disable-blink-features=AutomationControlled", "--disable-extensions", "--disable-default-apps", "--no-first-run", "--no-sandbox", "--disable-gpu", "--disable-dev-shm-usage", "--window-size=1280,720", ], ) context = await browser.new_context( user_agent=( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/124.0.0.0 Safari/537.36" ), viewport={"width": 1280, "height": 720}, permissions=["clipboard-read", "clipboard-write"], java_script_enabled=True, ) await context.route( "**/*", lambda route: route.abort() if route.request.resource_type in ("image", "media", "font") else route.continue_(), ) page = await context.new_page() try: await page.goto(URL, wait_until="domcontentloaded", timeout=60_000) await asyncio.gather( click_first_visible(page, AD_SELECTORS, timeout=3000), click_first_visible(page, COOKIE_SELECTORS, timeout=5000), ) await page.wait_for_selector(SEL_TEXTAREA, timeout=30_000) await page.locator(SEL_TEXTAREA).fill(prompt) if not await send_with_retry(page, prompt): return "" await asyncio.gather( click_if_visible(page, SEL_AGE_BTN, timeout=8000), click_if_visible(page, SEL_ACCEPT_BTN, timeout=8000), ) await wait_for_stream_end(page, timeout_s=120) response = await get_response(page) finally: await browser.close() return response # ════════════════════════════════════════ # HELPERS FORMAT OPENAI # ════════════════════════════════════════ def _make_id(): return f"chatcmpl-{uuid.uuid4().hex[:29]}" def _completion_response(content: str, model: str) -> dict: return { "id": _make_id(), "object": "chat.completion", "created": int(time.time()), "model": model, "choices": [ { "index": 0, "message": {"role": "assistant", "content": content}, "finish_reason": "stop", } ], "usage": { "prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0, }, } async def _stream_chunks(content: str, model: str): """Génère des SSE au format OpenAI streaming.""" cid = _make_id() created = int(time.time()) # Premier chunk (role) chunk = { "id": cid, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None}], } yield f"data: {json.dumps(chunk)}\n\n" # Contenu découpé mot par mot words = content.split(" ") for i, word in enumerate(words): token = word if i == 0 else f" {word}" chunk = { "id": cid, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "delta": {"content": token}, "finish_reason": None}], } yield f"data: {json.dumps(chunk)}\n\n" await asyncio.sleep(0.02) # Dernier chunk chunk = { "id": cid, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], } yield f"data: {json.dumps(chunk)}\n\n" yield "data: [DONE]\n\n" # ════════════════════════════════════════ # ENDPOINTS # ════════════════════════════════════════ @app.get("/") async def root(): return {"status": "ok", "message": "OpenAI-compatible API. Use /v1/chat/completions"} @app.get("/v1/models") async def list_models(): return { "object": "list", "data": [ { "id": MODEL_NAME, "object": "model", "created": 1700000000, "owned_by": "aifreeforever", } ], } @app.post("/v1/chat/completions") async def chat_completions(req: ChatRequest): # Construire le prompt à partir des messages # On prend le dernier message user, ou on concatène tout parts = [] for m in req.messages: if m.role == "system": parts.append(f"[System] {m.content}") elif m.role == "user": parts.append(f"{m.content}") elif m.role == "assistant": parts.append(f"[Assistant] {m.content}") prompt = "\n\n".join(parts) if not prompt.strip(): raise HTTPException(status_code=400, detail="Empty prompt") # Sémaphore : 1 seule requête Playwright à la fois async with SEM: try: content = await chat(prompt) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if not content: raise HTTPException(status_code=502, detail="No response from upstream") model = req.model or MODEL_NAME # Streaming if req.stream: return StreamingResponse( _stream_chunks(content, model), media_type="text/event-stream", ) # Non-streaming return _completion_response(content, model)