#!/usr/bin/env python3 """ ╔══════════════════════════════════════════════════════════════════════════════╗ ║ ChatGPTGratuit API — Hugging Face Spaces Edition ║ ║ Based on Ultimate Edition v5.0 ║ ║ Deployed as a Docker Space on Hugging Face (Free Tier) ║ ╚══════════════════════════════════════════════════════════════════════════════╝ """ from __future__ import annotations import re import os import sys import json import uuid import time import random import string import logging import threading from abc import ABC, abstractmethod from collections import deque from contextlib import contextmanager from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum, auto from pathlib import Path from typing import ( Any, Deque, Dict, Generator, List, Optional, Tuple, Union ) from urllib.parse import urlencode import requests from bs4 import BeautifulSoup try: import yaml HAS_YAML = True except ImportError: HAS_YAML = False # ══════════════════════════════════════════════════════════════ # CONSTANTS & ENUMS # ══════════════════════════════════════════════════════════════ VERSION = "5.0.0-hf" APP_NAME = "ChatGPTGratuit-API" BASE_URL = "https://chatgptgratuit.org" class SessionState(Enum): UNINITIALIZED = auto() READY = auto() DEGRADED = auto() EXPIRED = auto() FAILED = auto() class ErrorCode(Enum): INIT_FAILED = "INIT_FAILED" NONCE_EXPIRED = "NONCE_EXPIRED" CACHE_FAILED = "CACHE_FAILED" STREAM_FAILED = "STREAM_FAILED" STREAM_EMPTY = "STREAM_EMPTY" RATE_LIMITED = "RATE_LIMITED" TIMEOUT = "TIMEOUT" PARSE_ERROR = "PARSE_ERROR" INVALID_INPUT = "INVALID_INPUT" CIRCUIT_OPEN = "CIRCUIT_OPEN" POOL_EXHAUSTED = "POOL_EXHAUSTED" UNKNOWN = "UNKNOWN" class CircuitState(Enum): CLOSED = "closed" OPEN = "open" HALF_OPEN = "half_open" USER_AGENTS: List[str] = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_5) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 Edg/125.0.0.0", ] ACCEPT_LANGUAGES: List[str] = [ "fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7", "fr-FR,fr;q=0.9,en;q=0.8", "en-US,en;q=0.9,fr;q=0.8", ] CONTROL_EVENTS: frozenset = frozenset({ "done", "message_start", "openai_response_id", "error", "ping", "heartbeat", "keep-alive", }) # ══════════════════════════════════════════════════════════════ # LOGGING # ══════════════════════════════════════════════════════════════ class PrettyFormatter(logging.Formatter): COLORS = { "DEBUG": "\033[36m", "INFO": "\033[32m", "WARNING": "\033[33m", "ERROR": "\033[31m", "CRITICAL": "\033[35m", } RESET = "\033[0m" ICONS = { "DEBUG": "🔍", "INFO": "✅", "WARNING": "⚠️ ", "ERROR": "❌", "CRITICAL": "💀", } def format(self, record: logging.LogRecord) -> str: color = self.COLORS.get(record.levelname, "") icon = self.ICONS.get(record.levelname, "") ts = datetime.now().strftime("%H:%M:%S.%f")[:-3] msg = record.getMessage() extra_parts = [] for key in ("action", "duration_ms", "status_code", "cache_key"): val = getattr(record, key, None) if val is not None: extra_parts.append(f"{key}={val}") extra_str = f" [{', '.join(extra_parts)}]" if extra_parts else "" return f"{color}{ts} {icon} {msg}{extra_str}{self.RESET}" def setup_logging(level: str = "INFO") -> logging.Logger: logger = logging.getLogger(APP_NAME) logger.setLevel(getattr(logging, level.upper(), logging.INFO)) logger.handlers.clear() handler = logging.StreamHandler(sys.stderr) handler.setFormatter(PrettyFormatter()) logger.addHandler(handler) return logger log = setup_logging(os.environ.get("CGPT_LOG_LEVEL", "INFO")) # ══════════════════════════════════════════════════════════════ # CONFIGURATION # ══════════════════════════════════════════════════════════════ @dataclass class Config: base_url: str = BASE_URL timeout_init: int = 20 timeout_cache: int = 30 timeout_stream: int = 120 max_retries: int = 3 retry_backoff_base: float = 1.5 retry_jitter: float = 0.5 rate_limit_rpm: int = 15 rate_limit_burst: int = 5 pool_size: int = 2 session_ttl: int = 1800 max_history_messages: int = 50 max_message_length: int = 10000 cb_failure_threshold: int = 5 cb_recovery_timeout: int = 60 cb_half_open_max: int = 2 host: str = "0.0.0.0" port: int = 7860 debug: bool = False log_level: str = "INFO" json_logs: bool = False log_sse_raw: bool = False enable_metrics: bool = True enable_openai_compat: bool = True enable_cors: bool = True @classmethod def from_env(cls) -> "Config": cfg = cls() env_map = { "CGPT_BASE_URL": ("base_url", str), "CGPT_TIMEOUT": ("timeout_stream", int), "CGPT_MAX_RETRIES": ("max_retries", int), "CGPT_RATE_LIMIT": ("rate_limit_rpm", int), "CGPT_POOL_SIZE": ("pool_size", int), "CGPT_MAX_HISTORY": ("max_history_messages", int), "CGPT_HOST": ("host", str), "CGPT_PORT": ("port", int), "CGPT_DEBUG": ("debug", lambda x: x.lower() in ("1", "true", "yes")), "CGPT_LOG_LEVEL": ("log_level", str), "CGPT_JSON_LOGS": ("json_logs", lambda x: x.lower() in ("1", "true", "yes")), } for env_key, (attr, converter) in env_map.items(): val = os.environ.get(env_key) if val is not None: try: setattr(cfg, attr, converter(val)) except (ValueError, TypeError): log.warning(f"Invalid env var {env_key}={val}, using default") return cfg def validate(self) -> List[str]: warnings = [] if self.pool_size < 1: self.pool_size = 1 warnings.append("pool_size adjusted to minimum 1") if self.max_retries < 0: self.max_retries = 0 warnings.append("max_retries adjusted to 0") if self.rate_limit_rpm < 1: self.rate_limit_rpm = 1 warnings.append("rate_limit_rpm adjusted to minimum 1") if self.timeout_stream < 10: self.timeout_stream = 10 warnings.append("timeout_stream adjusted to minimum 10s") return warnings # ══════════════════════════════════════════════════════════════ # EXCEPTIONS # ══════════════════════════════════════════════════════════════ class ChatGPTGratuitError(Exception): def __init__(self, message: str, code: ErrorCode = ErrorCode.UNKNOWN, details: Optional[Dict] = None): super().__init__(message) self.code = code self.details = details or {} def to_dict(self) -> Dict[str, Any]: return {"error": str(self), "code": self.code.value, "details": self.details} class InitError(ChatGPTGratuitError): def __init__(self, msg="Session initialization failed", **kw): super().__init__(msg, ErrorCode.INIT_FAILED, **kw) class NonceExpiredError(ChatGPTGratuitError): def __init__(self, msg="Nonce expired", **kw): super().__init__(msg, ErrorCode.NONCE_EXPIRED, **kw) class CacheError(ChatGPTGratuitError): def __init__(self, msg="Cache operation failed", **kw): super().__init__(msg, ErrorCode.CACHE_FAILED, **kw) class StreamError(ChatGPTGratuitError): def __init__(self, msg="Stream error", **kw): super().__init__(msg, ErrorCode.STREAM_FAILED, **kw) class EmptyResponseError(ChatGPTGratuitError): def __init__(self, msg="Empty response from stream", **kw): super().__init__(msg, ErrorCode.STREAM_EMPTY, **kw) class RateLimitError(ChatGPTGratuitError): def __init__(self, msg="Rate limit exceeded", **kw): super().__init__(msg, ErrorCode.RATE_LIMITED, **kw) class CircuitOpenError(ChatGPTGratuitError): def __init__(self, msg="Circuit breaker is open", **kw): super().__init__(msg, ErrorCode.CIRCUIT_OPEN, **kw) class InputValidationError(ChatGPTGratuitError): def __init__(self, msg="Invalid input", **kw): super().__init__(msg, ErrorCode.INVALID_INPUT, **kw) # ══════════════════════════════════════════════════════════════ # DATA MODELS # ══════════════════════════════════════════════════════════════ @dataclass class Message: role: str content: str timestamp: float = field(default_factory=time.time) message_id: str = field(default_factory=lambda: str(uuid.uuid4())) tokens_estimate: int = 0 def __post_init__(self): self.tokens_estimate = max(1, len(self.content) // 4) @dataclass class Conversation: conversation_id: str = field(default_factory=lambda: str(uuid.uuid4())) messages: List[Message] = field(default_factory=list) created_at: float = field(default_factory=time.time) updated_at: float = field(default_factory=time.time) title: Optional[str] = None metadata: Dict[str, Any] = field(default_factory=dict) def add_message(self, role: str, content: str, max_messages: int = 50) -> Message: msg = Message(role=role, content=content) self.messages.append(msg) self.updated_at = time.time() if self.title is None and role == "user": self.title = content[:80] + ("..." if len(content) > 80 else "") if len(self.messages) > max_messages: system_msgs = [m for m in self.messages if m.role == "system"] other_msgs = [m for m in self.messages if m.role != "system"] keep = max_messages - len(system_msgs) self.messages = system_msgs + other_msgs[-keep:] return msg @property def total_tokens(self) -> int: return sum(m.tokens_estimate for m in self.messages) def to_dict(self) -> Dict: return { "conversation_id": self.conversation_id, "title": self.title, "message_count": len(self.messages), "total_tokens": self.total_tokens, "created_at": self.created_at, "updated_at": self.updated_at, } @dataclass class SessionInfo: nonce: Optional[str] = None bot_id: Optional[str] = None post_id: Optional[str] = None created_at: float = field(default_factory=time.time) last_used: float = field(default_factory=time.time) request_count: int = 0 error_count: int = 0 @property def is_valid(self) -> bool: return bool(self.nonce and self.bot_id) @property def age_seconds(self) -> float: return time.time() - self.created_at def mark_used(self): self.last_used = time.time() self.request_count += 1 def mark_error(self): self.error_count += 1 # ══════════════════════════════════════════════════════════════ # METRICS # ══════════════════════════════════════════════════════════════ @dataclass class Metrics: _lock: threading.Lock = field(default_factory=threading.Lock, repr=False) total_requests: int = 0 successful_requests: int = 0 failed_requests: int = 0 total_retries: int = 0 nonce_refreshes: int = 0 total_chars_received: int = 0 total_response_time_ms: float = 0.0 active_streams: int = 0 circuit_breaker_trips: int = 0 _latencies: Deque[float] = field(default_factory=lambda: deque(maxlen=1000), repr=False) started_at: float = field(default_factory=time.time) def record_request(self, success: bool, duration_ms: float, chars: int = 0): with self._lock: self.total_requests += 1 if success: self.successful_requests += 1 self.total_chars_received += chars else: self.failed_requests += 1 self.total_response_time_ms += duration_ms self._latencies.append(duration_ms) def record_retry(self): with self._lock: self.total_retries += 1 def record_nonce_refresh(self): with self._lock: self.nonce_refreshes += 1 def record_circuit_trip(self): with self._lock: self.circuit_breaker_trips += 1 @property def avg_latency_ms(self) -> float: with self._lock: return sum(self._latencies) / len(self._latencies) if self._latencies else 0.0 @property def p95_latency_ms(self) -> float: with self._lock: if not self._latencies: return 0.0 s = sorted(self._latencies) return s[min(int(len(s) * 0.95), len(s) - 1)] @property def success_rate(self) -> float: with self._lock: return self.successful_requests / self.total_requests if self.total_requests else 1.0 def to_dict(self) -> Dict[str, Any]: with self._lock: return { "total_requests": self.total_requests, "successful_requests": self.successful_requests, "failed_requests": self.failed_requests, "success_rate": round(self.success_rate, 4), "total_retries": self.total_retries, "nonce_refreshes": self.nonce_refreshes, "total_chars_received": self.total_chars_received, "avg_latency_ms": round(self.avg_latency_ms, 1), "p95_latency_ms": round(self.p95_latency_ms, 1), "active_streams": self.active_streams, "circuit_breaker_trips": self.circuit_breaker_trips, "uptime_seconds": round(time.time() - self.started_at, 1), } metrics = Metrics() # ══════════════════════════════════════════════════════════════ # RATE LIMITER # ══════════════════════════════════════════════════════════════ class RateLimiter: def __init__(self, rpm: int = 10, burst: int = 3): self.rate = rpm / 60.0 self.max_tokens = float(burst) self.tokens = float(burst) self.last_refill = time.monotonic() self._lock = threading.Lock() def _refill(self): now = time.monotonic() self.tokens = min(self.max_tokens, self.tokens + (now - self.last_refill) * self.rate) self.last_refill = now def acquire(self, timeout: float = 30.0) -> bool: deadline = time.monotonic() + timeout while True: with self._lock: self._refill() if self.tokens >= 1.0: self.tokens -= 1.0 return True if time.monotonic() >= deadline: return False time.sleep(min(1.0 / max(self.rate, 0.01), 0.5)) @property def available_tokens(self) -> float: with self._lock: self._refill() return self.tokens # ══════════════════════════════════════════════════════════════ # CIRCUIT BREAKER # ══════════════════════════════════════════════════════════════ class CircuitBreaker: def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60, half_open_max: int = 2): self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.half_open_max = half_open_max self.state = CircuitState.CLOSED self.failure_count = 0 self.success_count = 0 self.last_failure_time = 0.0 self.half_open_attempts = 0 self._lock = threading.Lock() def can_execute(self) -> bool: with self._lock: if self.state == CircuitState.CLOSED: return True if self.state == CircuitState.OPEN: if time.time() - self.last_failure_time >= self.recovery_timeout: self.state = CircuitState.HALF_OPEN self.half_open_attempts = 0 log.info("Circuit breaker → HALF_OPEN") return True return False return self.half_open_attempts < self.half_open_max def record_success(self): with self._lock: if self.state == CircuitState.HALF_OPEN: self.success_count += 1 if self.success_count >= self.half_open_max: self.state = CircuitState.CLOSED self.failure_count = 0 self.success_count = 0 log.info("Circuit breaker → CLOSED (recovered)") else: self.failure_count = max(0, self.failure_count - 1) def record_failure(self): with self._lock: self.failure_count += 1 self.last_failure_time = time.time() if self.state == CircuitState.HALF_OPEN: self.state = CircuitState.OPEN metrics.record_circuit_trip() log.warning("Circuit breaker → OPEN (recovery failed)") elif self.failure_count >= self.failure_threshold: self.state = CircuitState.OPEN metrics.record_circuit_trip() log.warning(f"Circuit breaker → OPEN (failures={self.failure_count})") # ══════════════════════════════════════════════════════════════ # PLUGIN SYSTEM # ══════════════════════════════════════════════════════════════ class ResponsePlugin(ABC): @property @abstractmethod def name(self) -> str: ... @abstractmethod def process(self, response: str, conversation: Conversation) -> str: ... class StripWhitespacePlugin(ResponsePlugin): @property def name(self) -> str: return "strip_whitespace" def process(self, response: str, conversation: Conversation) -> str: return re.sub(r'\n{3,}', '\n\n', response).strip() class PluginManager: def __init__(self): self._plugins: List[ResponsePlugin] = [] def register(self, plugin: ResponsePlugin): self._plugins.append(plugin) def apply_all(self, response: str, conversation: Conversation) -> str: for plugin in self._plugins: try: response = plugin.process(response, conversation) except Exception as e: log.warning(f"Plugin {plugin.name} failed: {e}") return response # ══════════════════════════════════════════════════════════════ # WEBSITE PARSER # ══════════════════════════════════════════════════════════════ class WebsiteParser: @staticmethod def parse(html: str) -> SessionInfo: info = SessionInfo() soup = BeautifulSoup(html, "html.parser") container = soup.find(class_="aipkit_chat_container") if container: config_str = container.get("data-config", "") if config_str: try: config = json.loads(config_str) info.nonce = config.get("nonce", "") info.bot_id = str(config.get("botId", "")) info.post_id = str(config.get("postId", "")) if info.is_valid: return info except json.JSONDecodeError: pass if not info.bot_id: textarea = soup.find("textarea", id=re.compile(r"aipkit_chat_input_field_\d+")) if textarea: match = re.search(r"_(\d+)$", textarea.get("id", "")) if match: info.bot_id = match.group(1) if not info.nonce: for pattern in [ r'"nonce"\s*:\s*"([a-f0-9]{8,})"', r"'nonce'\s*:\s*'([a-f0-9]{8,})'", r'nonce["\s:=]+["\']([a-f0-9]{8,})', ]: match = re.search(pattern, html) if match: info.nonce = match.group(1) break if not info.bot_id: for pattern in [r'"botId"\s*:\s*"?(\d+)', r'"bot_id"\s*:\s*"?(\d+)']: match = re.search(pattern, html) if match: info.bot_id = match.group(1) break if not info.post_id: match = re.search(r'"postId"\s*:\s*"?(\d+)', html) info.post_id = match.group(1) if match else "7" return info # ══════════════════════════════════════════════════════════════ # SSE PARSER # ══════════════════════════════════════════════════════════════ class SSEParser: @staticmethod def iter_events(response: requests.Response, log_raw: bool = False) -> Generator[Tuple[str, str], None, None]: current_event = "message" data_buffer: List[str] = [] for line in response.iter_lines(decode_unicode=True): if log_raw: log.debug(f"SSE RAW: {repr(line)}") if line is None: continue if not line: if data_buffer: yield current_event, "\n".join(data_buffer) data_buffer.clear() current_event = "message" continue if line.startswith("event:"): current_event = line[6:].strip() elif line.startswith("data:"): data_str = line[5:] if data_str.startswith(" "): data_str = data_str[1:] data_buffer.append(data_str) elif line.startswith("id:") or line.startswith(":"): continue if data_buffer: yield current_event, "\n".join(data_buffer) @staticmethod def extract_text(data_str: str) -> str: try: data = json.loads(data_str) if isinstance(data, dict): if "delta" in data and isinstance(data["delta"], str): return data["delta"] if "choices" in data and data["choices"]: choice = data["choices"][0] if "delta" in choice: return choice["delta"].get("content", "") if "message" in choice: return choice["message"].get("content", "") for key in ("content", "text", "message", "data", "chunk", "response"): if key in data and isinstance(data[key], str): return data[key] if isinstance(data, str): return data return "" except (json.JSONDecodeError, TypeError): return data_str # ══════════════════════════════════════════════════════════════ # CORE CLIENT # ══════════════════════════════════════════════════════════════ class ChatGPTGratuitClient: def __init__(self, config: Optional[Config] = None): self.config = config or Config.from_env() self._session = requests.Session() self._session_info = SessionInfo() self._state = SessionState.UNINITIALIZED self._guest_uuid = str(uuid.uuid4()) self._lock = threading.Lock() self.rate_limiter = RateLimiter( rpm=self.config.rate_limit_rpm, burst=self.config.rate_limit_burst, ) self.circuit_breaker = CircuitBreaker( failure_threshold=self.config.cb_failure_threshold, recovery_timeout=self.config.cb_recovery_timeout, half_open_max=self.config.cb_half_open_max, ) self.plugin_manager = PluginManager() self.plugin_manager.register(StripWhitespacePlugin()) self._conversations: Dict[str, Conversation] = {} self._active_conversation_id: Optional[str] = None self._rotate_identity() @property def state(self) -> SessionState: return self._state @property def is_ready(self) -> bool: return self._state in (SessionState.READY, SessionState.DEGRADED) @property def active_conversation(self) -> Conversation: if self._active_conversation_id not in self._conversations: conv = Conversation() self._conversations[conv.conversation_id] = conv self._active_conversation_id = conv.conversation_id return self._conversations[self._active_conversation_id] @property def session_info(self) -> SessionInfo: return self._session_info def _rotate_identity(self): ua = random.choice(USER_AGENTS) lang = random.choice(ACCEPT_LANGUAGES) self._session.headers.update({ "User-Agent": ua, "Accept-Language": lang, "Referer": self.config.base_url + "/", "Origin": self.config.base_url, "DNT": "1", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", }) def init_session(self, force: bool = False) -> bool: with self._lock: if self.is_ready and not force: if self._session_info.age_seconds < self.config.session_ttl: return True log.info("Initializing session...", extra={"action": "init_session"}) self._rotate_identity() try: resp = self._session.get(self.config.base_url, timeout=self.config.timeout_init) resp.raise_for_status() except requests.RequestException as e: self._state = SessionState.FAILED log.error(f"Failed to load page: {e}") return False self._session_info = WebsiteParser.parse(resp.text) if self._session_info.is_valid: self._state = SessionState.READY log.info( f"Session ready: bot_id={self._session_info.bot_id}, " f"nonce={self._session_info.nonce[:8]}...", extra={"action": "init_session", "bot_id": self._session_info.bot_id}, ) else: self._state = SessionState.DEGRADED log.warning( f"Partial init: nonce={'✓' if self._session_info.nonce else '✗'}, " f"bot_id={'✓' if self._session_info.bot_id else '✗'}" ) return self._session_info.is_valid def refresh_nonce(self) -> bool: log.info("Refreshing nonce...") metrics.record_nonce_refresh() return self.init_session(force=True) def _ensure_session(self): if not self.is_ready: if not self.init_session(): raise InitError() if self._session_info.age_seconds > self.config.session_ttl: self.refresh_nonce() def new_conversation(self, system_prompt: Optional[str] = None) -> Conversation: conv = Conversation() if system_prompt: conv.add_message("system", system_prompt, self.config.max_history_messages) self._conversations[conv.conversation_id] = conv self._active_conversation_id = conv.conversation_id return conv def get_conversation(self, conversation_id: str) -> Optional[Conversation]: return self._conversations.get(conversation_id) def list_conversations(self) -> List[Dict]: return [c.to_dict() for c in self._conversations.values()] def _generate_client_msg_id(self) -> str: ts = int(time.time() * 1000) rand = ''.join(random.choices(string.ascii_lowercase + string.digits, k=5)) return f"aipkit-client-msg-{self._session_info.bot_id}-{ts}-{rand}" def _calculate_backoff(self, attempt: int) -> float: return self.config.retry_backoff_base ** attempt + random.uniform(0, self.config.retry_jitter) def send_message( self, message: str, *, stream: bool = False, conversation_id: Optional[str] = None, system_prompt: Optional[str] = None, ) -> Union[str, Generator[str, None, None]]: message = message.strip() if not message: raise InputValidationError("Message cannot be empty") if len(message) > self.config.max_message_length: raise InputValidationError(f"Message too long ({len(message)} chars)") if not self.circuit_breaker.can_execute(): raise CircuitOpenError() if not self.rate_limiter.acquire(timeout=10.0): raise RateLimitError() self._ensure_session() if conversation_id and conversation_id in self._conversations: conv = self._conversations[conversation_id] else: conv = self.active_conversation if system_prompt and not any(m.role == "system" for m in conv.messages): conv.add_message("system", system_prompt, self.config.max_history_messages) conv.add_message("user", message, self.config.max_history_messages) last_error: Optional[Exception] = None start_time = time.monotonic() for attempt in range(self.config.max_retries + 1): try: if attempt > 0: delay = self._calculate_backoff(attempt) log.info(f"Retry {attempt}/{self.config.max_retries} after {delay:.1f}s") metrics.record_retry() time.sleep(delay) if stream: gen = self._execute_stream(message, conv) return self._wrap_stream_generator(gen, conv, start_time) else: result = self._execute_blocking(message, conv) duration = (time.monotonic() - start_time) * 1000 result = self.plugin_manager.apply_all(result, conv) conv.add_message("assistant", result, self.config.max_history_messages) metrics.record_request(True, duration, len(result)) self.circuit_breaker.record_success() self._session_info.mark_used() log.info( f"Response received ({len(result)} chars)", extra={"action": "send_message", "duration_ms": round(duration)}, ) return result except NonceExpiredError: log.warning("Nonce expired, refreshing...") self.refresh_nonce() last_error = NonceExpiredError() except (StreamError, EmptyResponseError, CacheError) as e: last_error = e self.circuit_breaker.record_failure() self._session_info.mark_error() log.warning(f"Attempt {attempt + 1} failed: {e}") if attempt < self.config.max_retries: self.refresh_nonce() except requests.Timeout: last_error = ChatGPTGratuitError("Request timeout", ErrorCode.TIMEOUT) self.circuit_breaker.record_failure() except requests.RequestException as e: last_error = ChatGPTGratuitError(str(e), ErrorCode.UNKNOWN) self.circuit_breaker.record_failure() duration = (time.monotonic() - start_time) * 1000 metrics.record_request(False, duration) if isinstance(last_error, ChatGPTGratuitError): raise last_error raise ChatGPTGratuitError(f"All attempts failed: {last_error}", ErrorCode.UNKNOWN) def _wrap_stream_generator(self, gen, conv, start_time): full_text = "" try: for chunk in gen: full_text += chunk yield chunk full_text = self.plugin_manager.apply_all(full_text, conv) conv.add_message("assistant", full_text, self.config.max_history_messages) duration = (time.monotonic() - start_time) * 1000 metrics.record_request(True, duration, len(full_text)) self.circuit_breaker.record_success() self._session_info.mark_used() except Exception: duration = (time.monotonic() - start_time) * 1000 metrics.record_request(False, duration) self.circuit_breaker.record_failure() raise def _cache_message(self, message: str) -> str: info = self._session_info payload = { "action": "aipkit_cache_sse_message", "message": message, "_ajax_nonce": info.nonce, "bot_id": info.bot_id, "user_client_message_id": self._generate_client_msg_id(), } ajax_url = f"{self.config.base_url}/wp-admin/admin-ajax.php" resp = self._session.post( ajax_url, data=payload, headers={"X-Requested-With": "XMLHttpRequest", "Accept": "*/*"}, timeout=self.config.timeout_cache, ) if resp.status_code == 403 or (resp.status_code == 400 and "nonce" in resp.text.lower()): raise NonceExpiredError() try: data = resp.json() except json.JSONDecodeError: raise CacheError(f"Invalid JSON (HTTP {resp.status_code}): {resp.text[:200]}") if not data.get("success"): err_data = data.get("data", {}) err_msg = err_data.get("message", str(err_data)) if isinstance(err_data, dict) else str(err_data) if "nonce" in err_msg.lower(): raise NonceExpiredError() raise CacheError(f"Cache rejected: {err_msg}") return data["data"]["cache_key"] def _open_stream(self, cache_key: str, conv: Conversation) -> requests.Response: info = self._session_info ajax_url = f"{self.config.base_url}/wp-admin/admin-ajax.php" params = { "action": "aipkit_frontend_chat_stream", "cache_key": cache_key, "bot_id": info.bot_id, "session_id": self._guest_uuid, "conversation_uuid": conv.conversation_id, "post_id": info.post_id, "_ts": str(int(time.time() * 1000)), "_ajax_nonce": info.nonce, } resp = self._session.get( f"{ajax_url}?{urlencode(params)}", headers={"Accept": "text/event-stream", "Cache-Control": "no-cache", "Connection": "keep-alive"}, timeout=self.config.timeout_stream, stream=True, ) if resp.status_code == 403: raise NonceExpiredError() if resp.status_code != 200: raise StreamError(f"Stream HTTP {resp.status_code}") return resp def _execute_blocking(self, message: str, conv: Conversation) -> str: cache_key = self._cache_message(message) resp = self._open_stream(cache_key, conv) full_text = "" got_data = False for event_type, data_str in SSEParser.iter_events(resp, log_raw=self.config.log_sse_raw): got_data = True if data_str.strip() == "[DONE]": break if event_type == "error": raise StreamError(f"SSE error: {data_str}") if event_type in CONTROL_EVENTS: continue chunk = SSEParser.extract_text(data_str) if chunk: full_text += chunk if not got_data: raise EmptyResponseError() if not full_text.strip(): raise EmptyResponseError("Stream completed but empty") return full_text def _execute_stream(self, message: str, conv: Conversation) -> Generator[str, None, None]: cache_key = self._cache_message(message) resp = self._open_stream(cache_key, conv) metrics.active_streams += 1 try: for event_type, data_str in SSEParser.iter_events(resp, log_raw=self.config.log_sse_raw): if data_str.strip() == "[DONE]": break if event_type == "error": raise StreamError(f"SSE error: {data_str}") if event_type in CONTROL_EVENTS: continue chunk = SSEParser.extract_text(data_str) if chunk: yield chunk finally: metrics.active_streams = max(0, metrics.active_streams - 1) def get_status(self) -> Dict[str, Any]: return { "version": VERSION, "state": self._state.name, "session": { "bot_id": self._session_info.bot_id, "has_nonce": bool(self._session_info.nonce), "post_id": self._session_info.post_id, "age_seconds": round(self._session_info.age_seconds, 1), "request_count": self._session_info.request_count, "error_count": self._session_info.error_count, }, "circuit_breaker": self.circuit_breaker.state.value, "rate_limiter_tokens": round(self.rate_limiter.available_tokens, 2), "metrics": metrics.to_dict(), } # ══════════════════════════════════════════════════════════════ # SESSION POOL # ══════════════════════════════════════════════════════════════ class SessionPool: def __init__(self, config: Config): self.config = config self._clients: List[ChatGPTGratuitClient] = [] self._index = 0 self._lock = threading.Lock() for i in range(config.pool_size): self._clients.append(ChatGPTGratuitClient(config)) def initialize_all(self) -> int: success = 0 for i, client in enumerate(self._clients): try: if client.init_session(): success += 1 log.info(f"Pool client {i+1} initialized") except Exception as e: log.error(f"Pool client {i+1} init error: {e}") return success @contextmanager def acquire(self) -> Generator[ChatGPTGratuitClient, None, None]: with self._lock: attempts = len(self._clients) for _ in range(attempts): client = self._clients[self._index % len(self._clients)] self._index += 1 if client.is_ready or client.state == SessionState.UNINITIALIZED: break else: client = self._clients[0] yield client def get_status(self) -> Dict: return { "pool_size": len(self._clients), "clients": [ {"index": i, "state": c.state.name, "requests": c.session_info.request_count} for i, c in enumerate(self._clients) ], } # ══════════════════════════════════════════════════════════════ # FLASK APP FACTORY # ══════════════════════════════════════════════════════════════ def create_app(config: Optional[Config] = None) -> Tuple: from flask import Flask, request as freq, jsonify, Response, stream_with_context config = config or Config.from_env() app = Flask(APP_NAME) pool = SessionPool(config) ready_count = pool.initialize_all() if ready_count == 0: log.warning("No pool clients initialized! Will retry on first request.") if config.enable_cors: @app.after_request def add_cors(response): response.headers["Access-Control-Allow-Origin"] = "*" response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization" response.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS" return response @app.errorhandler(ChatGPTGratuitError) def handle_api_error(e: ChatGPTGratuitError): status_map = { ErrorCode.RATE_LIMITED: 429, ErrorCode.INVALID_INPUT: 400, ErrorCode.CIRCUIT_OPEN: 503, } return jsonify({"ok": False, **e.to_dict()}), status_map.get(e.code, 500) @app.errorhandler(404) def not_found(e): return jsonify({ "ok": False, "error": "Endpoint not found", "endpoints": [ "POST /chat", "POST /chat/stream", "POST /v1/chat/completions", "POST /new", "POST /refresh", "GET /health", "GET /metrics", ], }), 404 # ── Landing page ── @app.route("/", methods=["GET"]) def index(): return jsonify({ "name": APP_NAME, "version": VERSION, "status": "running", "endpoints": { "chat": "POST /chat", "stream": "POST /chat/stream", "openai_compat": "POST /v1/chat/completions", "health": "GET /health", "metrics": "GET /metrics", "new_conversation": "POST /new", "refresh_session": "POST /refresh", "conversations": "GET /conversations", }, "usage_example": { "curl": 'curl -X POST /chat -H "Content-Type: application/json" -d \'{"message": "Hello!"}\'', }, }) # ── POST /chat ── @app.route("/chat", methods=["POST"]) def chat(): data = freq.get_json(force=True, silent=True) or {} message = data.get("message", "").strip() if not message: return jsonify({"ok": False, "error": "Field 'message' is required"}), 400 with pool.acquire() as client: if data.get("new_conversation"): client.new_conversation(data.get("system_prompt")) response_text = client.send_message(message, system_prompt=data.get("system_prompt")) return jsonify({ "ok": True, "response": response_text, "conversation_id": client.active_conversation.conversation_id, "usage": { "message_count": len(client.active_conversation.messages), "estimated_tokens": client.active_conversation.total_tokens, }, }) # ── POST /chat/stream ── @app.route("/chat/stream", methods=["POST"]) def chat_stream(): data = freq.get_json(force=True, silent=True) or {} message = data.get("message", "").strip() if not message: return jsonify({"ok": False, "error": "Field 'message' is required"}), 400 with pool.acquire() as client: if data.get("new_conversation"): client.new_conversation(data.get("system_prompt")) def generate(): try: for chunk in client.send_message(message, stream=True): yield f"data: {json.dumps({'chunk': chunk}, ensure_ascii=False)}\n\n" yield "data: [DONE]\n\n" except ChatGPTGratuitError as e: yield f"data: {json.dumps(e.to_dict())}\n\n" except Exception as e: yield f"data: {json.dumps({'error': str(e)})}\n\n" return Response( stream_with_context(generate()), content_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, ) # ── POST /v1/chat/completions (OpenAI-compatible) ── if config.enable_openai_compat: @app.route("/v1/chat/completions", methods=["POST"]) def openai_compat(): data = freq.get_json(force=True, silent=True) or {} messages = data.get("messages", []) do_stream = data.get("stream", False) if not messages: return jsonify({"error": {"message": "messages required"}}), 400 user_msg = None system_prompt = None for msg in messages: if msg.get("role") == "user": user_msg = msg.get("content", "") if msg.get("role") == "system": system_prompt = msg.get("content") if not user_msg: return jsonify({"error": {"message": "No user message found"}}), 400 response_id = f"chatcmpl-{uuid.uuid4().hex[:24]}" created = int(time.time()) with pool.acquire() as client: if do_stream: def generate(): try: for chunk in client.send_message( user_msg, stream=True, system_prompt=system_prompt ): yield f"data: {json.dumps({'id': response_id, 'object': 'chat.completion.chunk', 'created': created, 'model': 'chatgpt-gratuit', 'choices': [{'index': 0, 'delta': {'content': chunk}, 'finish_reason': None}]})}\n\n" yield f"data: {json.dumps({'id': response_id, 'object': 'chat.completion.chunk', 'created': created, 'model': 'chatgpt-gratuit', 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n" yield "data: [DONE]\n\n" except Exception as e: yield f"data: {json.dumps({'error': {'message': str(e)}})}\n\n" return Response( stream_with_context(generate()), content_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, ) response_text = client.send_message(user_msg, system_prompt=system_prompt) return jsonify({ "id": response_id, "object": "chat.completion", "created": created, "model": "chatgpt-gratuit", "choices": [{ "index": 0, "message": {"role": "assistant", "content": response_text}, "finish_reason": "stop", }], "usage": { "prompt_tokens": len(user_msg) // 4, "completion_tokens": len(response_text) // 4, "total_tokens": (len(user_msg) + len(response_text)) // 4, }, }) # ── POST /new ── @app.route("/new", methods=["POST"]) def new_conv(): data = freq.get_json(force=True, silent=True) or {} with pool.acquire() as client: conv = client.new_conversation(data.get("system_prompt")) return jsonify({"ok": True, "conversation_id": conv.conversation_id}) # ── POST /refresh ── @app.route("/refresh", methods=["POST"]) def refresh(): with pool.acquire() as client: result = client.refresh_nonce() return jsonify({ "ok": result, "nonce_prefix": client.session_info.nonce[:8] + "..." if client.session_info.nonce else None, }) # ── GET /health ── @app.route("/health", methods=["GET"]) def health(): with pool.acquire() as client: status = client.get_status() status["pool"] = pool.get_status() return jsonify(status), 200 if client.is_ready else 503 # ── GET /metrics ── if config.enable_metrics: @app.route("/metrics", methods=["GET"]) def metrics_endpoint(): return jsonify(metrics.to_dict()) # ── GET /conversations ── @app.route("/conversations", methods=["GET"]) def conversations(): with pool.acquire() as client: return jsonify({ "ok": True, "conversations": client.list_conversations(), "active": client._active_conversation_id, }) # ── GET /conversation//messages ── @app.route("/conversation//messages", methods=["GET"]) def conversation_messages(conv_id: str): with pool.acquire() as client: conv = client.get_conversation(conv_id) if not conv: return jsonify({"ok": False, "error": "Conversation not found"}), 404 return jsonify({ "ok": True, "conversation": conv.to_dict(), "messages": [ {"role": m.role, "content": m.content, "timestamp": m.timestamp, "message_id": m.message_id} for m in conv.messages ], }) return app, pool # ══════════════════════════════════════════════════════════════ # GUNICORN ENTRY POINT + DIRECT RUN # ══════════════════════════════════════════════════════════════ config = Config.from_env() config.validate() # Gunicorn looks for 'application' application, _pool = create_app(config) if __name__ == "__main__": print(f""" ╔══════════════════════════════════════════════════════════════╗ ║ 🚀 ChatGPT Gratuit API v{VERSION:<38s}║ ║ Server: http://{config.host}:{config.port:<43d}║ ║ ║ ║ POST /chat → Complete response ║ ║ POST /chat/stream → Streaming SSE ║ ║ POST /v1/chat/completions → OpenAI-compatible ║ ║ GET /health → Health check ║ ║ GET /metrics → Metrics ║ ╚══════════════════════════════════════════════════════════════╝ """) application.run(host=config.host, port=config.port, debug=config.debug, threaded=True)