| | """ |
| | API OpenAI-compatible wrappant aifreeforever.com |
| | Endpoints : |
| | GET /v1/models |
| | POST /v1/chat/completions (stream=false & stream=true) |
| | """ |
| |
|
| | import asyncio, json, time, uuid |
| | from typing import Optional |
| | from fastapi import FastAPI, HTTPException |
| | from fastapi.responses import StreamingResponse |
| | from pydantic import BaseModel |
| | from playwright.async_api import async_playwright, Page, TimeoutError as PWTimeout |
| |
|
| | |
| | |
| | |
| | app = FastAPI(title="Free Chat API") |
| |
|
| | |
| | SEM = asyncio.Semaphore(1) |
| |
|
| | MODEL_NAME = "aifreeforever" |
| |
|
| | |
| | |
| | |
| | class Message(BaseModel): |
| | role: str |
| | content: str |
| |
|
| | class ChatRequest(BaseModel): |
| | model: Optional[str] = MODEL_NAME |
| | messages: list[Message] |
| | stream: Optional[bool] = False |
| | temperature: Optional[float] = None |
| | max_tokens: Optional[int] = None |
| |
|
| | |
| | |
| | |
| | URL = "https://aifreeforever.com/tools/free-chatgpt-no-login" |
| | HEADLESS = True |
| | MAX_RETRIES = 5 |
| |
|
| | SEL_TEXTAREA = 'textarea[placeholder*="Ask anything"]' |
| | SEL_SEND_BTN = 'button.absolute.right-3.top-4.w-10.h-10' |
| | SEL_BOT_MSG = '.flex.justify-start .rounded-2xl.shadow-sm.bg-white.border.border-gray-100' |
| | SEL_BOT_CONTENT = '.markdown-content' |
| | SEL_ACCEPT_BTN = 'button:has-text("Accept & Continue")' |
| | SEL_AGE_BTN = 'button:has-text("13 and Over")' |
| | SEL_COPY_BTN = ( |
| | 'button.text-xs.text-gray-500.flex.items-center.gap-1' |
| | '.px-2.py-1.rounded-md.transition-colors:has-text("Copy")' |
| | ) |
| | SEL_ERROR_MSG = 'p.text-red-600:has-text("Failed to send message")' |
| |
|
| | COOKIE_SELECTORS = [ |
| | 'button.unic-agree-all-button', |
| | 'button:has-text("Accepter et continuer")', |
| | 'button:has-text("Accepter tout")', |
| | 'button:has-text("Accept all")', |
| | 'button:has-text("Accept All")', |
| | 'button:has-text("I agree")', |
| | ] |
| |
|
| | AD_SELECTORS = [ |
| | 'button:has(path[fill="#1D1D1B"][opacity="0.7"])', |
| | 'button:has(path[style*="stroke: rgb(255, 255, 255)"][style*="stroke-width: 6.353"])', |
| | 'button[aria-label*="close" i]', |
| | 'button[aria-label*="fermer" i]', |
| | 'button[title*="close" i]', |
| | 'button:has-text("Γ")', |
| | 'button:has-text("β")', |
| | 'button:has-text("Close")', |
| | '[class*="close-btn"]', |
| | ] |
| |
|
| | |
| | |
| | |
| |
|
| | async def _try_click(page: Page, selector: str, timeout: int) -> bool: |
| | try: |
| | btn = page.locator(selector).first |
| | await btn.wait_for(state="visible", timeout=timeout) |
| | await btn.click() |
| | return True |
| | except PWTimeout: |
| | return False |
| |
|
| | async def click_first_visible(page: Page, selectors: list[str], timeout: int = 4000): |
| | tasks = [_try_click(page, sel, timeout) for sel in selectors] |
| | results = await asyncio.gather(*tasks, return_exceptions=True) |
| | return any(r is True for r in results) |
| |
|
| | async def click_if_visible(page: Page, selector: str, timeout: int = 8000) -> bool: |
| | return await _try_click(page, selector, timeout) |
| |
|
| | async def send_with_retry(page: Page, prompt: str) -> bool: |
| | for attempt in range(1, MAX_RETRIES + 1): |
| | if attempt > 1: |
| | await page.locator(SEL_TEXTAREA).fill(prompt) |
| | await page.locator(SEL_SEND_BTN).click(timeout=15_000) |
| | await asyncio.sleep(2) |
| | if await page.locator(SEL_ERROR_MSG).count() == 0: |
| | return True |
| | return False |
| |
|
| | async def try_copy_button(page: Page) -> str | None: |
| | try: |
| | msgs = page.locator(SEL_BOT_MSG) |
| | count = await msgs.count() |
| | if count == 0: |
| | return None |
| | last_msg = None |
| | for i in range(count - 1, -1, -1): |
| | msg = msgs.nth(i) |
| | if await msg.locator(SEL_BOT_CONTENT).count() > 0: |
| | last_msg = msg |
| | break |
| | if last_msg is None: |
| | last_msg = msgs.nth(count - 1) |
| | copy_btn = last_msg.locator(SEL_COPY_BTN) |
| | await copy_btn.wait_for(state="visible", timeout=3000) |
| | await copy_btn.click() |
| | await asyncio.sleep(0.8) |
| | text = await page.evaluate("async () => navigator.clipboard.readText()") |
| | if text and len(text.strip()) > 20: |
| | return text.strip() |
| | except Exception: |
| | pass |
| | return None |
| |
|
| | async def scrape_last_bot_message(page: Page) -> str | None: |
| | try: |
| | msgs = page.locator(SEL_BOT_MSG) |
| | count = await msgs.count() |
| | for i in range(count - 1, -1, -1): |
| | msg = msgs.nth(i) |
| | content_el = msg.locator(SEL_BOT_CONTENT) |
| | if await content_el.count() > 0: |
| | text = (await content_el.first.inner_text()).strip() |
| | if len(text) > 20: |
| | return text |
| | else: |
| | text = (await msg.inner_text()).strip() |
| | if len(text) > 80 and "Accept & Continue" not in text: |
| | return text |
| | except Exception: |
| | pass |
| | return None |
| |
|
| | async def get_response(page: Page) -> str: |
| | text = await try_copy_button(page) |
| | if text: |
| | return text |
| | return await scrape_last_bot_message(page) or "" |
| |
|
| | async def wait_for_stream_end(page: Page, timeout_s: int = 120) -> None: |
| | prev_text = "" |
| | stable = 0 |
| | elapsed = 0.0 |
| | interval = 0.6 |
| | while elapsed < timeout_s: |
| | await asyncio.sleep(interval) |
| | elapsed += interval |
| | msgs = page.locator(SEL_BOT_MSG) |
| | count = await msgs.count() |
| | if count == 0: |
| | continue |
| | current = "" |
| | for i in range(count - 1, -1, -1): |
| | content_el = msgs.nth(i).locator(SEL_BOT_CONTENT) |
| | if await content_el.count() > 0: |
| | t = (await content_el.first.inner_text()).strip() |
| | if len(t) > 20: |
| | current = t |
| | break |
| | if not current: |
| | continue |
| | if current != prev_text: |
| | stable = 0 |
| | else: |
| | stable += 1 |
| | if stable >= 3: |
| | try: |
| | await msgs.nth(count - 1).locator(SEL_COPY_BTN).wait_for( |
| | state="visible", timeout=1500 |
| | ) |
| | return |
| | except PWTimeout: |
| | stable = 0 |
| | prev_text = current |
| |
|
| | |
| | |
| | |
| |
|
| | async def chat(prompt: str) -> str: |
| | async with async_playwright() as p: |
| | browser = await p.chromium.launch( |
| | headless=HEADLESS, |
| | args=[ |
| | "--disable-blink-features=AutomationControlled", |
| | "--disable-extensions", "--disable-default-apps", |
| | "--no-first-run", "--no-sandbox", "--disable-gpu", |
| | "--disable-dev-shm-usage", |
| | "--window-size=1280,720", |
| | ], |
| | ) |
| | context = await browser.new_context( |
| | user_agent=( |
| | "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " |
| | "AppleWebKit/537.36 (KHTML, like Gecko) " |
| | "Chrome/124.0.0.0 Safari/537.36" |
| | ), |
| | viewport={"width": 1280, "height": 720}, |
| | permissions=["clipboard-read", "clipboard-write"], |
| | java_script_enabled=True, |
| | ) |
| | await context.route( |
| | "**/*", |
| | lambda route: route.abort() |
| | if route.request.resource_type in ("image", "media", "font") |
| | else route.continue_(), |
| | ) |
| | page = await context.new_page() |
| |
|
| | try: |
| | await page.goto(URL, wait_until="domcontentloaded", timeout=60_000) |
| |
|
| | await asyncio.gather( |
| | click_first_visible(page, AD_SELECTORS, timeout=3000), |
| | click_first_visible(page, COOKIE_SELECTORS, timeout=5000), |
| | ) |
| |
|
| | await page.wait_for_selector(SEL_TEXTAREA, timeout=30_000) |
| | await page.locator(SEL_TEXTAREA).fill(prompt) |
| |
|
| | if not await send_with_retry(page, prompt): |
| | return "" |
| |
|
| | await asyncio.gather( |
| | click_if_visible(page, SEL_AGE_BTN, timeout=8000), |
| | click_if_visible(page, SEL_ACCEPT_BTN, timeout=8000), |
| | ) |
| |
|
| | await wait_for_stream_end(page, timeout_s=120) |
| | response = await get_response(page) |
| | finally: |
| | await browser.close() |
| |
|
| | return response |
| |
|
| | |
| | |
| | |
| |
|
| | def _make_id(): |
| | return f"chatcmpl-{uuid.uuid4().hex[:29]}" |
| |
|
| | def _completion_response(content: str, model: str) -> dict: |
| | return { |
| | "id": _make_id(), |
| | "object": "chat.completion", |
| | "created": int(time.time()), |
| | "model": model, |
| | "choices": [ |
| | { |
| | "index": 0, |
| | "message": {"role": "assistant", "content": content}, |
| | "finish_reason": "stop", |
| | } |
| | ], |
| | "usage": { |
| | "prompt_tokens": 0, |
| | "completion_tokens": 0, |
| | "total_tokens": 0, |
| | }, |
| | } |
| |
|
| | async def _stream_chunks(content: str, model: str): |
| | """Génère des SSE au format OpenAI streaming.""" |
| | cid = _make_id() |
| | created = int(time.time()) |
| |
|
| | |
| | chunk = { |
| | "id": cid, "object": "chat.completion.chunk", |
| | "created": created, "model": model, |
| | "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None}], |
| | } |
| | yield f"data: {json.dumps(chunk)}\n\n" |
| |
|
| | |
| | words = content.split(" ") |
| | for i, word in enumerate(words): |
| | token = word if i == 0 else f" {word}" |
| | chunk = { |
| | "id": cid, "object": "chat.completion.chunk", |
| | "created": created, "model": model, |
| | "choices": [{"index": 0, "delta": {"content": token}, "finish_reason": None}], |
| | } |
| | yield f"data: {json.dumps(chunk)}\n\n" |
| | await asyncio.sleep(0.02) |
| |
|
| | |
| | chunk = { |
| | "id": cid, "object": "chat.completion.chunk", |
| | "created": created, "model": model, |
| | "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], |
| | } |
| | yield f"data: {json.dumps(chunk)}\n\n" |
| | yield "data: [DONE]\n\n" |
| |
|
| | |
| | |
| | |
| |
|
| | @app.get("/") |
| | async def root(): |
| | return {"status": "ok", "message": "OpenAI-compatible API. Use /v1/chat/completions"} |
| |
|
| | @app.get("/v1/models") |
| | async def list_models(): |
| | return { |
| | "object": "list", |
| | "data": [ |
| | { |
| | "id": MODEL_NAME, |
| | "object": "model", |
| | "created": 1700000000, |
| | "owned_by": "aifreeforever", |
| | } |
| | ], |
| | } |
| |
|
| | @app.post("/v1/chat/completions") |
| | async def chat_completions(req: ChatRequest): |
| | |
| | |
| | parts = [] |
| | for m in req.messages: |
| | if m.role == "system": |
| | parts.append(f"[System] {m.content}") |
| | elif m.role == "user": |
| | parts.append(f"{m.content}") |
| | elif m.role == "assistant": |
| | parts.append(f"[Assistant] {m.content}") |
| | prompt = "\n\n".join(parts) |
| |
|
| | if not prompt.strip(): |
| | raise HTTPException(status_code=400, detail="Empty prompt") |
| |
|
| | |
| | async with SEM: |
| | try: |
| | content = await chat(prompt) |
| | except Exception as e: |
| | raise HTTPException(status_code=500, detail=str(e)) |
| |
|
| | if not content: |
| | raise HTTPException(status_code=502, detail="No response from upstream") |
| |
|
| | model = req.model or MODEL_NAME |
| |
|
| | |
| | if req.stream: |
| | return StreamingResponse( |
| | _stream_chunks(content, model), |
| | media_type="text/event-stream", |
| | ) |
| |
|
| | |
| | return _completion_response(content, model) |