# GPT4FREEAPI / app.py
# MB-IDK's picture
# Create app.py
# ccf84ba verified
#!/usr/bin/env python3
"""
╔══════════════════════════════════════════════════════════════════════════════╗
║  ChatGPTGratuit API — Hugging Face Spaces Edition                            ║
║  Based on Ultimate Edition v5.0                                              ║
║  Deployed as a Docker Space on Hugging Face (Free Tier)                      ║
╚══════════════════════════════════════════════════════════════════════════════╝
"""
from __future__ import annotations
import re
import os
import sys
import json
import uuid
import time
import random
import string
import logging
import threading
from abc import ABC, abstractmethod
from collections import deque
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum, auto
from pathlib import Path
from typing import (
Any, Deque, Dict, Generator, List, Optional, Tuple, Union
)
from urllib.parse import urlencode
import requests
from bs4 import BeautifulSoup
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
# ══════════════════════════════════════════════════════════════
# CONSTANTS & ENUMS
# ══════════════════════════════════════════════════════════════
# Release identifier; reported by get_status() and by the landing page.
VERSION = "5.0.0-hf"
# Used as both the logger name and the Flask application name.
APP_NAME = "ChatGPTGratuit-API"
# Default upstream site; Config.base_url falls back to this value.
BASE_URL = "https://chatgptgratuit.org"
class SessionState(Enum):
    """Lifecycle state of a client's upstream session."""

    # Initial state of a freshly constructed client.
    UNINITIALIZED = auto()
    # Set once the page was fetched and both nonce and bot id were parsed.
    READY = auto()
    # Set when the page was fetched but nonce and/or bot id is missing.
    DEGRADED = auto()
    # NOTE(review): not assigned anywhere in the visible code.
    EXPIRED = auto()
    # Set when fetching the upstream page failed.
    FAILED = auto()
class ErrorCode(Enum):
    """Machine-readable error codes attached to ChatGPTGratuitError.

    The string value is what ``ChatGPTGratuitError.to_dict()`` emits under
    the ``"code"`` key.
    """

    INIT_FAILED = "INIT_FAILED"
    NONCE_EXPIRED = "NONCE_EXPIRED"
    CACHE_FAILED = "CACHE_FAILED"
    STREAM_FAILED = "STREAM_FAILED"
    STREAM_EMPTY = "STREAM_EMPTY"
    RATE_LIMITED = "RATE_LIMITED"
    TIMEOUT = "TIMEOUT"
    PARSE_ERROR = "PARSE_ERROR"
    INVALID_INPUT = "INVALID_INPUT"
    CIRCUIT_OPEN = "CIRCUIT_OPEN"
    POOL_EXHAUSTED = "POOL_EXHAUSTED"
    # Default code used when no more specific code is supplied.
    UNKNOWN = "UNKNOWN"
class CircuitState(Enum):
    """States of the CircuitBreaker; the value is reported by get_status()."""

    # Requests are allowed.
    CLOSED = "closed"
    # Requests are rejected until the recovery timeout has elapsed.
    OPEN = "open"
    # Trial state entered from OPEN once the recovery timeout has elapsed.
    HALF_OPEN = "half_open"
# Browser User-Agent strings; one is picked at random whenever a client
# rotates its identity.
USER_AGENTS: List[str] = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_5) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 Edg/125.0.0.0",
]
# Accept-Language header values, chosen at random alongside the User-Agent.
ACCEPT_LANGUAGES: List[str] = [
    "fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7",
    "fr-FR,fr;q=0.9,en;q=0.8",
    "en-US,en;q=0.9,fr;q=0.8",
]
# SSE event names that carry no response text; the stream readers skip them.
CONTROL_EVENTS: frozenset = frozenset({
    "done", "message_start", "openai_response_id",
    "error", "ping", "heartbeat", "keep-alive",
})
# ══════════════════════════════════════════════════════════════
# LOGGING
# ══════════════════════════════════════════════════════════════
class PrettyFormatter(logging.Formatter):
    """Colorized, single-line console formatter.

    Output shape: ``<color><HH:MM:SS.mmm> <icon> <message> [k=v, ...]<reset>``.
    The bracketed suffix lists whichever of the known ``extra`` keys are
    present on the record.
    """

    # ANSI color escape per level name.
    COLORS = {
        "DEBUG": "\033[36m", "INFO": "\033[32m",
        "WARNING": "\033[33m", "ERROR": "\033[31m",
        "CRITICAL": "\033[35m",
    }
    RESET = "\033[0m"
    # Icons restored from mis-decoded UTF-8 (they were mojibake in this file).
    ICONS = {
        "DEBUG": "🔍", "INFO": "✅", "WARNING": "⚠️ ",
        "ERROR": "❌", "CRITICAL": "💀",
    }
    # Record attributes (passed through ``extra=``) echoed after the message.
    _EXTRA_KEYS = ("action", "duration_ms", "status_code", "cache_key")

    def format(self, record: logging.LogRecord) -> str:
        """Render *record* as one colorized line.

        The timestamp comes from ``record.created`` (when the event was
        logged), not from the moment of formatting, so delayed or buffered
        handlers still show the correct time. If the record carries
        exception info, the formatted traceback is appended on new lines.
        """
        color = self.COLORS.get(record.levelname, "")
        icon = self.ICONS.get(record.levelname, "")
        ts = datetime.fromtimestamp(record.created).strftime("%H:%M:%S.%f")[:-3]
        msg = record.getMessage()

        extra_parts = []
        for key in self._EXTRA_KEYS:
            val = getattr(record, key, None)
            if val is not None:
                extra_parts.append(f"{key}={val}")
        extra_str = f" [{', '.join(extra_parts)}]" if extra_parts else ""

        line = f"{color}{ts} {icon} {msg}{extra_str}"
        if record.exc_info:
            # Keep tracebacks from logger.exception()/exc_info=True visible.
            line = f"{line}\n{self.formatException(record.exc_info)}"
        return f"{line}{self.RESET}"
def setup_logging(level: str = "INFO") -> logging.Logger:
    """Configure and return the application logger.

    The logger named ``APP_NAME`` gets exactly one stderr handler using
    ``PrettyFormatter``; handlers attached earlier are removed. An
    unrecognised level name falls back to ``logging.INFO``.
    """
    numeric_level = getattr(logging, level.upper(), logging.INFO)

    stream_handler = logging.StreamHandler(sys.stderr)
    stream_handler.setFormatter(PrettyFormatter())

    app_logger = logging.getLogger(APP_NAME)
    app_logger.setLevel(numeric_level)
    app_logger.handlers.clear()
    app_logger.addHandler(stream_handler)
    return app_logger


# Module-wide logger; the level is read from CGPT_LOG_LEVEL at import time.
log = setup_logging(os.environ.get("CGPT_LOG_LEVEL", "INFO"))
# ══════════════════════════════════════════════════════════════
# CONFIGURATION
# ══════════════════════════════════════════════════════════════
@dataclass
class Config:
    """Runtime settings for the client, the session pool and the HTTP server.

    Defaults are defined here; ``from_env`` overlays selected ``CGPT_*``
    environment variables and ``validate`` clamps a few values to their
    minimum.
    """

    # Upstream site and per-phase HTTP timeouts (seconds).
    base_url: str = BASE_URL
    timeout_init: int = 20
    timeout_cache: int = 30
    timeout_stream: int = 120
    # Retry policy.
    max_retries: int = 3
    retry_backoff_base: float = 1.5
    retry_jitter: float = 0.5
    # Rate limiting.
    rate_limit_rpm: int = 15
    rate_limit_burst: int = 5
    # Session pool and conversation limits.
    pool_size: int = 2
    session_ttl: int = 1800
    max_history_messages: int = 50
    max_message_length: int = 10000
    # Circuit breaker.
    cb_failure_threshold: int = 5
    cb_recovery_timeout: int = 60
    cb_half_open_max: int = 2
    # HTTP server.
    host: str = "0.0.0.0"
    port: int = 7860
    debug: bool = False
    # Logging.
    log_level: str = "INFO"
    json_logs: bool = False
    log_sse_raw: bool = False
    # Feature switches.
    enable_metrics: bool = True
    enable_openai_compat: bool = True
    enable_cors: bool = True

    @classmethod
    def from_env(cls) -> "Config":
        """Build a Config from defaults overridden by environment variables.

        A variable whose value cannot be converted is skipped with a
        warning and the default is kept.
        """

        def _truthy(raw: str) -> bool:
            return raw.lower() in ("1", "true", "yes")

        # (environment variable, attribute name, converter)
        bindings = (
            ("CGPT_BASE_URL", "base_url", str),
            ("CGPT_TIMEOUT", "timeout_stream", int),
            ("CGPT_MAX_RETRIES", "max_retries", int),
            ("CGPT_RATE_LIMIT", "rate_limit_rpm", int),
            ("CGPT_POOL_SIZE", "pool_size", int),
            ("CGPT_MAX_HISTORY", "max_history_messages", int),
            ("CGPT_HOST", "host", str),
            ("CGPT_PORT", "port", int),
            ("CGPT_DEBUG", "debug", _truthy),
            ("CGPT_LOG_LEVEL", "log_level", str),
            ("CGPT_JSON_LOGS", "json_logs", _truthy),
        )

        cfg = cls()
        for env_key, attr, convert in bindings:
            val = os.environ.get(env_key)
            if val is None:
                continue
            try:
                setattr(cfg, attr, convert(val))
            except (ValueError, TypeError):
                log.warning(f"Invalid env var {env_key}={val}, using default")
        return cfg

    def validate(self) -> List[str]:
        """Clamp out-of-range settings in place and return the adjustments.

        Returns:
            One human-readable note per setting that was changed.
        """
        # (attribute, minimum allowed value, note emitted when clamped)
        floors = (
            ("pool_size", 1, "pool_size adjusted to minimum 1"),
            ("max_retries", 0, "max_retries adjusted to 0"),
            ("rate_limit_rpm", 1, "rate_limit_rpm adjusted to minimum 1"),
            ("timeout_stream", 10, "timeout_stream adjusted to minimum 10s"),
        )
        notes: List[str] = []
        for attr, floor, note in floors:
            if getattr(self, attr) < floor:
                setattr(self, attr, floor)
                notes.append(note)
        return notes
# ══════════════════════════════════════════════════════════════
# EXCEPTIONS
# ══════════════════════════════════════════════════════════════
class ChatGPTGratuitError(Exception):
    """Base class for every error raised by this module.

    Attributes:
        code: ``ErrorCode`` classifying the failure.
        details: Free-form context dictionary (empty when none was given).
    """

    def __init__(self, message: str, code: ErrorCode = ErrorCode.UNKNOWN,
                 details: Optional[Dict] = None):
        super().__init__(message)
        self.code = code
        # A falsy ``details`` (None or empty) is normalised to a fresh dict.
        self.details = details if details else {}

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serialisable description of the error."""
        payload: Dict[str, Any] = {}
        payload["error"] = str(self)
        payload["code"] = self.code.value
        payload["details"] = self.details
        return payload
class InitError(ChatGPTGratuitError):
    """Session could not be initialised (code ``INIT_FAILED``)."""

    def __init__(self, msg: str = "Session initialization failed", **kwargs: Any) -> None:
        super().__init__(msg, ErrorCode.INIT_FAILED, **kwargs)
class NonceExpiredError(ChatGPTGratuitError):
    """Upstream rejected the session nonce (code ``NONCE_EXPIRED``)."""

    def __init__(self, msg: str = "Nonce expired", **kwargs: Any) -> None:
        super().__init__(msg, ErrorCode.NONCE_EXPIRED, **kwargs)
class CacheError(ChatGPTGratuitError):
    """The message-cache request failed (code ``CACHE_FAILED``)."""

    def __init__(self, msg: str = "Cache operation failed", **kwargs: Any) -> None:
        super().__init__(msg, ErrorCode.CACHE_FAILED, **kwargs)
class StreamError(ChatGPTGratuitError):
    """The response stream failed (code ``STREAM_FAILED``)."""

    def __init__(self, msg: str = "Stream error", **kwargs: Any) -> None:
        super().__init__(msg, ErrorCode.STREAM_FAILED, **kwargs)
class EmptyResponseError(ChatGPTGratuitError):
    """The stream produced no usable text (code ``STREAM_EMPTY``)."""

    def __init__(self, msg: str = "Empty response from stream", **kwargs: Any) -> None:
        super().__init__(msg, ErrorCode.STREAM_EMPTY, **kwargs)
class RateLimitError(ChatGPTGratuitError):
    """No rate-limit token became available in time (code ``RATE_LIMITED``)."""

    def __init__(self, msg: str = "Rate limit exceeded", **kwargs: Any) -> None:
        super().__init__(msg, ErrorCode.RATE_LIMITED, **kwargs)
class CircuitOpenError(ChatGPTGratuitError):
    """The circuit breaker refused the request (code ``CIRCUIT_OPEN``)."""

    def __init__(self, msg: str = "Circuit breaker is open", **kwargs: Any) -> None:
        super().__init__(msg, ErrorCode.CIRCUIT_OPEN, **kwargs)
class InputValidationError(ChatGPTGratuitError):
    """Caller-supplied input was rejected (code ``INVALID_INPUT``)."""

    def __init__(self, msg: str = "Invalid input", **kwargs: Any) -> None:
        super().__init__(msg, ErrorCode.INVALID_INPUT, **kwargs)
# ══════════════════════════════════════════════════════════════
# DATA MODELS
# ══════════════════════════════════════════════════════════════
@dataclass
class Message:
    """One chat message together with bookkeeping metadata.

    ``tokens_estimate`` is always recomputed after construction as roughly
    one token per four characters of ``content``, with a floor of 1; any
    value passed in by the caller is overwritten.
    """

    role: str
    content: str
    timestamp: float = field(default_factory=time.time)
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    tokens_estimate: int = 0

    def __post_init__(self) -> None:
        approx_tokens = len(self.content) // 4
        self.tokens_estimate = approx_tokens if approx_tokens > 1 else 1
@dataclass
class Conversation:
    """An ordered message history with a derived title and summary stats."""

    conversation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    messages: List[Message] = field(default_factory=list)
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)
    title: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def add_message(self, role: str, content: str, max_messages: int = 50) -> Message:
        """Append a message and trim the history to about ``max_messages``.

        The first ``"user"`` message also sets the title (first 80
        characters, with ``...`` appended when truncated).

        When the history exceeds ``max_messages``, all ``"system"`` messages
        are kept (moved to the front) and only the most recent non-system
        messages fill the remaining slots. At least one non-system slot is
        always kept so the newest message survives even when
        ``max_messages`` is zero, negative, or fully used by system
        messages.

        Returns:
            The newly created ``Message``.
        """
        msg = Message(role=role, content=content)
        self.messages.append(msg)
        self.updated_at = time.time()

        if self.title is None and role == "user":
            self.title = content[:80] + ("..." if len(content) > 80 else "")

        if len(self.messages) > max_messages:
            system_msgs = [m for m in self.messages if m.role == "system"]
            other_msgs = [m for m in self.messages if m.role != "system"]
            # A slice of ``[-0:]`` is the whole list and a negative ``keep``
            # slices from the wrong end, so the count must stay >= 1.
            keep = max(1, max_messages - len(system_msgs))
            self.messages = system_msgs + other_msgs[-keep:]
        return msg

    @property
    def total_tokens(self) -> int:
        """Sum of the per-message token estimates."""
        return sum(m.tokens_estimate for m in self.messages)

    def to_dict(self) -> Dict:
        """Return a summary of the conversation (message bodies excluded)."""
        return {
            "conversation_id": self.conversation_id,
            "title": self.title,
            "message_count": len(self.messages),
            "total_tokens": self.total_tokens,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }
@dataclass
class SessionInfo:
    """Credentials scraped from the upstream page, plus usage counters."""

    nonce: Optional[str] = None
    bot_id: Optional[str] = None
    post_id: Optional[str] = None
    created_at: float = field(default_factory=time.time)
    last_used: float = field(default_factory=time.time)
    request_count: int = 0
    error_count: int = 0

    @property
    def is_valid(self) -> bool:
        """True when both a nonce and a bot id are present (non-empty)."""
        if not self.nonce:
            return False
        return bool(self.bot_id)

    @property
    def age_seconds(self) -> float:
        """Seconds elapsed since this record was created."""
        now = time.time()
        return now - self.created_at

    def mark_used(self):
        """Count one request and stamp the time of use."""
        self.request_count = self.request_count + 1
        self.last_used = time.time()

    def mark_error(self):
        """Count one failed request."""
        self.error_count = self.error_count + 1
# ══════════════════════════════════════════════════════════════
# METRICS
# ══════════════════════════════════════════════════════════════
@dataclass
class Metrics:
    """Thread-safe request counters and latency statistics.

    Counters are updated under ``_lock``. The lock is a plain
    ``threading.Lock`` (not reentrant), so code that already holds it must
    use the ``_*_unlocked`` helpers instead of the public properties.
    """

    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    total_retries: int = 0
    nonce_refreshes: int = 0
    total_chars_received: int = 0
    total_response_time_ms: float = 0.0
    active_streams: int = 0
    circuit_breaker_trips: int = 0
    # Sliding window holding the most recent 1000 request durations (ms).
    _latencies: Deque[float] = field(default_factory=lambda: deque(maxlen=1000), repr=False)
    started_at: float = field(default_factory=time.time)

    def record_request(self, success: bool, duration_ms: float, chars: int = 0):
        """Record one finished request and its duration.

        ``chars`` is only added to the running total for successes.
        """
        with self._lock:
            self.total_requests += 1
            if success:
                self.successful_requests += 1
                self.total_chars_received += chars
            else:
                self.failed_requests += 1
            self.total_response_time_ms += duration_ms
            self._latencies.append(duration_ms)

    def record_retry(self):
        """Count one retry attempt."""
        with self._lock:
            self.total_retries += 1

    def record_nonce_refresh(self):
        """Count one nonce refresh."""
        with self._lock:
            self.nonce_refreshes += 1

    def record_circuit_trip(self):
        """Count one transition of the circuit breaker to OPEN."""
        with self._lock:
            self.circuit_breaker_trips += 1

    # -- helpers that assume the caller already holds ``_lock`` --

    def _avg_latency_unlocked(self) -> float:
        return sum(self._latencies) / len(self._latencies) if self._latencies else 0.0

    def _p95_latency_unlocked(self) -> float:
        if not self._latencies:
            return 0.0
        ordered = sorted(self._latencies)
        return ordered[min(int(len(ordered) * 0.95), len(ordered) - 1)]

    def _success_rate_unlocked(self) -> float:
        return self.successful_requests / self.total_requests if self.total_requests else 1.0

    @property
    def avg_latency_ms(self) -> float:
        """Mean duration over the latency window (0.0 when empty)."""
        with self._lock:
            return self._avg_latency_unlocked()

    @property
    def p95_latency_ms(self) -> float:
        """95th-percentile duration over the latency window (0.0 when empty)."""
        with self._lock:
            return self._p95_latency_unlocked()

    @property
    def success_rate(self) -> float:
        """Fraction of successful requests (1.0 before any request)."""
        with self._lock:
            return self._success_rate_unlocked()

    def to_dict(self) -> Dict[str, Any]:
        """Return a consistent snapshot of all metrics.

        The derived values come from the unlocked helpers: reading the
        public properties here would try to re-acquire the non-reentrant
        lock this method already holds and block forever.
        """
        with self._lock:
            return {
                "total_requests": self.total_requests,
                "successful_requests": self.successful_requests,
                "failed_requests": self.failed_requests,
                "success_rate": round(self._success_rate_unlocked(), 4),
                "total_retries": self.total_retries,
                "nonce_refreshes": self.nonce_refreshes,
                "total_chars_received": self.total_chars_received,
                "avg_latency_ms": round(self._avg_latency_unlocked(), 1),
                "p95_latency_ms": round(self._p95_latency_unlocked(), 1),
                "active_streams": self.active_streams,
                "circuit_breaker_trips": self.circuit_breaker_trips,
                "uptime_seconds": round(time.time() - self.started_at, 1),
            }


# Process-wide metrics instance shared by every client.
metrics = Metrics()
# ══════════════════════════════════════════════════════════════
# RATE LIMITER
# ══════════════════════════════════════════════════════════════
class RateLimiter:
    """Token-bucket rate limiter.

    The bucket holds at most ``burst`` tokens and refills continuously at
    ``rpm / 60`` tokens per second. Each ``acquire`` consumes one token.
    """

    def __init__(self, rpm: int = 10, burst: int = 3):
        self.rate = rpm / 60.0          # tokens per second
        self.max_tokens = float(burst)
        self.tokens = float(burst)
        self.last_refill = time.monotonic()
        self._lock = threading.Lock()

    def _refill(self):
        """Add the tokens accrued since the last refill (caller holds the lock)."""
        now = time.monotonic()
        self.tokens = min(self.max_tokens, self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now

    def acquire(self, timeout: float = 30.0) -> bool:
        """Take one token, waiting up to ``timeout`` seconds for it.

        The wait is sized to the time the next token is due and never
        extends past the deadline, so a short timeout is honoured instead
        of being rounded up to a fixed polling interval.

        Returns:
            True if a token was consumed, False if the deadline passed.
        """
        deadline = time.monotonic() + timeout
        while True:
            with self._lock:
                self._refill()
                if self.tokens >= 1.0:
                    self.tokens -= 1.0
                    return True
                deficit = 1.0 - self.tokens
                rate = self.rate
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            # With a non-positive rate no token will ever arrive; just wait
            # out the deadline. The 0.5 s cap keeps the loop responsive.
            until_token = deficit / rate if rate > 0 else remaining
            time.sleep(max(0.001, min(until_token, remaining, 0.5)))

    @property
    def available_tokens(self) -> float:
        """Tokens currently in the bucket, after refilling."""
        with self._lock:
            self._refill()
            return self.tokens
# ══════════════════════════════════════════════════════════════
# CIRCUIT BREAKER
# ══════════════════════════════════════════════════════════════
class CircuitBreaker:
    """Three-state circuit breaker (CLOSED → OPEN → HALF_OPEN → CLOSED).

    * CLOSED: every request is allowed; ``failure_threshold`` accumulated
      failures open the circuit (each success decrements the count).
    * OPEN: requests are refused until ``recovery_timeout`` seconds have
      passed since the last failure.
    * HALF_OPEN: at most ``half_open_max`` probe requests are admitted.
      ``half_open_max`` successes close the circuit; any failure reopens it.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60,
                 half_open_max: int = 2):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max = half_open_max
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0.0
        self.half_open_attempts = 0
        # When the current half-open probe window started.
        self._half_open_since = 0.0
        self._lock = threading.Lock()

    def can_execute(self) -> bool:
        """Return True if a request may proceed now.

        In HALF_OPEN each admitted request consumes one probe slot. If all
        slots were handed out but the probes never reported an outcome
        (for example the request was rejected before reaching upstream), a
        new probe window opens after another ``recovery_timeout`` so the
        breaker cannot stay half-open forever.
        """
        with self._lock:
            if self.state == CircuitState.CLOSED:
                return True

            now = time.time()
            if self.state == CircuitState.OPEN:
                if now - self.last_failure_time < self.recovery_timeout:
                    return False
                self.state = CircuitState.HALF_OPEN
                self.half_open_attempts = 0
                # Successes from an earlier half-open phase must not count.
                self.success_count = 0
                self._half_open_since = now
                log.info("Circuit breaker → HALF_OPEN")

            # Always allow at least one probe, even if half_open_max <= 0.
            probe_limit = max(1, self.half_open_max)
            if (self.half_open_attempts >= probe_limit
                    and now - self._half_open_since >= self.recovery_timeout):
                self.half_open_attempts = 0
                self._half_open_since = now
            if self.half_open_attempts >= probe_limit:
                return False
            self.half_open_attempts += 1
            return True

    def record_success(self):
        """Report a successful request."""
        with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.half_open_max:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
                    self.success_count = 0
                    log.info("Circuit breaker → CLOSED (recovered)")
            else:
                self.failure_count = max(0, self.failure_count - 1)

    def record_failure(self):
        """Report a failed request; may trip the breaker to OPEN."""
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.OPEN
                metrics.record_circuit_trip()
                log.warning("Circuit breaker → OPEN (recovery failed)")
            elif self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                metrics.record_circuit_trip()
                log.warning(f"Circuit breaker → OPEN (failures={self.failure_count})")
# ══════════════════════════════════════════════════════════════
# PLUGIN SYSTEM
# ══════════════════════════════════════════════════════════════
class ResponsePlugin(ABC):
    """Interface for post-processors applied to a completed response."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Identifier of the plugin, used in log messages."""

    @abstractmethod
    def process(self, response: str, conversation: Conversation) -> str:
        """Return the transformed ``response`` for ``conversation``."""
class StripWhitespacePlugin(ResponsePlugin):
    """Collapses runs of three or more newlines and trims the ends."""

    @property
    def name(self) -> str:
        return "strip_whitespace"

    def process(self, response: str, conversation: Conversation) -> str:
        # Three or more consecutive newlines become exactly one blank line.
        collapsed = re.sub(r'\n{3,}', '\n\n', response)
        return collapsed.strip()
class PluginManager:
    """Holds response plugins and runs them in registration order."""

    def __init__(self):
        self._plugins: List[ResponsePlugin] = []

    def register(self, plugin: ResponsePlugin):
        """Append ``plugin`` to the end of the processing chain."""
        self._plugins.append(plugin)

    def apply_all(self, response: str, conversation: Conversation) -> str:
        """Pipe ``response`` through every registered plugin.

        A plugin that raises is logged and skipped; the text it was given
        is passed unchanged to the next plugin.
        """
        current = response
        for plugin in self._plugins:
            try:
                processed = plugin.process(current, conversation)
            except Exception as exc:
                log.warning(f"Plugin {plugin.name} failed: {exc}")
            else:
                current = processed
        return current
# ══════════════════════════════════════════════════════════════
# WEBSITE PARSER
# ══════════════════════════════════════════════════════════════
class WebsiteParser:
    """Extracts session credentials (nonce, bot id, post id) from page HTML."""

    @staticmethod
    def _as_text(value: Any) -> Optional[str]:
        """Normalise a JSON value to a non-empty string, or None.

        ``None`` and booleans are treated as "missing" so that a JSON
        ``null`` never turns into the truthy string ``"None"``.
        """
        if value is None or isinstance(value, bool):
            return None
        text = str(value).strip()
        return text or None

    @staticmethod
    def parse(html: str) -> SessionInfo:
        """Build a ``SessionInfo`` from the chat page's HTML.

        Sources are tried in this order, each only filling fields that are
        still missing:

        1. the JSON in the ``data-config`` attribute of the element with
           class ``aipkit_chat_container``;
        2. the numeric suffix of the chat textarea's id (bot id);
        3. regular-expression scans of the raw HTML.

        ``post_id`` falls back to ``"7"`` when nothing is found. The result
        may be incomplete; callers check ``SessionInfo.is_valid``.
        """
        info = SessionInfo()
        soup = BeautifulSoup(html, "html.parser")

        container = soup.find(class_="aipkit_chat_container")
        config_str = container.get("data-config", "") if container else ""
        if config_str:
            try:
                config = json.loads(config_str)
            except (json.JSONDecodeError, TypeError):
                # Best effort: fall through to the HTML-based fallbacks.
                config = None
            # Valid JSON that is not an object carries no usable fields.
            if isinstance(config, dict):
                info.nonce = WebsiteParser._as_text(config.get("nonce"))
                info.bot_id = WebsiteParser._as_text(config.get("botId"))
                info.post_id = WebsiteParser._as_text(config.get("postId"))

        if not info.bot_id:
            textarea = soup.find("textarea", id=re.compile(r"aipkit_chat_input_field_\d+"))
            if textarea:
                match = re.search(r"_(\d+)$", textarea.get("id", ""))
                if match:
                    info.bot_id = match.group(1)

        if not info.nonce:
            for pattern in (
                r'"nonce"\s*:\s*"([a-f0-9]{8,})"',
                r"'nonce'\s*:\s*'([a-f0-9]{8,})'",
                r'nonce["\s:=]+["\']([a-f0-9]{8,})',
            ):
                match = re.search(pattern, html)
                if match:
                    info.nonce = match.group(1)
                    break

        if not info.bot_id:
            for pattern in (r'"botId"\s*:\s*"?(\d+)', r'"bot_id"\s*:\s*"?(\d+)'):
                match = re.search(pattern, html)
                if match:
                    info.bot_id = match.group(1)
                    break

        # Runs even when data-config supplied nonce and bot id, so post_id
        # is never left empty.
        if not info.post_id:
            match = re.search(r'"postId"\s*:\s*"?(\d+)', html)
            info.post_id = match.group(1) if match else "7"
        return info
# ══════════════════════════════════════════════════════════════
# SSE PARSER
# ══════════════════════════════════════════════════════════════
class SSEParser:
    """Minimal Server-Sent Events reader and payload text extractor."""

    @staticmethod
    def iter_events(response: requests.Response,
                    log_raw: bool = False) -> Generator[Tuple[str, str], None, None]:
        """Yield ``(event_name, data)`` pairs from a streaming response.

        Lines are read as raw bytes and decoded as UTF-8 here rather than
        relying on the encoding the HTTP library guesses from the headers,
        which may not be UTF-8 when no charset is declared. Undecodable
        bytes are replaced, not raised.

        A blank line dispatches the buffered ``data:`` lines (joined with
        newlines) and resets the event name to ``"message"``. ``id:`` and
        comment (``:``) lines are ignored. Data still buffered when the
        stream ends is yielded as a final event.

        Args:
            response: Object exposing ``iter_lines()``.
            log_raw: When True, log every decoded line at DEBUG level.
        """
        current_event = "message"
        data_buffer: List[str] = []
        for raw_line in response.iter_lines():
            if raw_line is None:
                continue
            if isinstance(raw_line, (bytes, bytearray)):
                line = raw_line.decode("utf-8", errors="replace")
            else:
                line = raw_line
            if log_raw:
                log.debug(f"SSE RAW: {repr(line)}")
            if not line:
                if data_buffer:
                    yield current_event, "\n".join(data_buffer)
                    data_buffer.clear()
                current_event = "message"
                continue
            if line.startswith("event:"):
                current_event = line[6:].strip()
            elif line.startswith("data:"):
                data_str = line[5:]
                # Only the single optional space after the colon is removed.
                if data_str.startswith(" "):
                    data_str = data_str[1:]
                data_buffer.append(data_str)
            elif line.startswith("id:") or line.startswith(":"):
                continue
        if data_buffer:
            yield current_event, "\n".join(data_buffer)

    @staticmethod
    def extract_text(data_str: str) -> str:
        """Extract the text fragment carried by one SSE ``data`` payload.

        Recognised JSON shapes, in order: ``{"delta": "<text>"}``;
        ``{"choices": [{"delta": {"content": ...}}]}`` or the same with
        ``"message"``; a string under ``content``, ``text``, ``message``,
        ``data``, ``chunk`` or ``response``; or a bare JSON string.

        Payloads that are not JSON are returned unchanged, as is a bare
        JSON number (plain text such as ``"42"`` would otherwise be lost).
        Anything else, including null content, yields ``""``.
        """
        try:
            data = json.loads(data_str)
        except (json.JSONDecodeError, TypeError):
            return data_str

        if isinstance(data, str):
            return data
        if isinstance(data, (int, float)) and not isinstance(data, bool):
            return data_str
        if not isinstance(data, dict):
            return ""

        delta = data.get("delta")
        if isinstance(delta, str):
            return delta

        choices = data.get("choices")
        if isinstance(choices, list) and choices and isinstance(choices[0], dict):
            first = choices[0]
            for part_key in ("delta", "message"):
                part = first.get(part_key)
                if isinstance(part, dict):
                    content = part.get("content")
                    return content if isinstance(content, str) else ""

        for key in ("content", "text", "message", "data", "chunk", "response"):
            value = data.get(key)
            if isinstance(value, str):
                return value
        return ""
# ══════════════════════════════════════════════════════════════
# CORE CLIENT
# ══════════════════════════════════════════════════════════════
class ChatGPTGratuitClient:
    """Client for one guest session on the upstream chat site.

    A message is sent in two steps: it is first posted to the site's AJAX
    endpoint, which returns a cache key, and the reply is then read from an
    SSE stream opened with that key. The client owns its own HTTP session,
    rate limiter, circuit breaker, response plugins and conversations.
    """

    def __init__(self, config: Optional[Config] = None):
        """Create a client; no network traffic happens until first use."""
        self.config = config or Config.from_env()
        self._session = requests.Session()
        self._session_info = SessionInfo()
        self._state = SessionState.UNINITIALIZED
        self._guest_uuid = str(uuid.uuid4())
        self._lock = threading.Lock()
        self.rate_limiter = RateLimiter(
            rpm=self.config.rate_limit_rpm, burst=self.config.rate_limit_burst,
        )
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=self.config.cb_failure_threshold,
            recovery_timeout=self.config.cb_recovery_timeout,
            half_open_max=self.config.cb_half_open_max,
        )
        self.plugin_manager = PluginManager()
        self.plugin_manager.register(StripWhitespacePlugin())
        self._conversations: Dict[str, Conversation] = {}
        self._active_conversation_id: Optional[str] = None
        self._rotate_identity()

    # ── State accessors ──

    @property
    def state(self) -> SessionState:
        """Current session lifecycle state."""
        return self._state

    @property
    def is_ready(self) -> bool:
        """True when the state is READY or DEGRADED.

        DEGRADED means the page loaded but credentials are incomplete, so
        callers that need usable credentials must also check
        ``session_info.is_valid``.
        """
        return self._state in (SessionState.READY, SessionState.DEGRADED)

    @property
    def active_conversation(self) -> Conversation:
        """The active conversation, created on first access if missing."""
        if self._active_conversation_id not in self._conversations:
            conv = Conversation()
            self._conversations[conv.conversation_id] = conv
            self._active_conversation_id = conv.conversation_id
        return self._conversations[self._active_conversation_id]

    @property
    def session_info(self) -> SessionInfo:
        """Credentials and counters of the current upstream session."""
        return self._session_info

    # ── Session management ──

    def _rotate_identity(self):
        """Pick a random User-Agent / Accept-Language and set browser-like headers."""
        ua = random.choice(USER_AGENTS)
        lang = random.choice(ACCEPT_LANGUAGES)
        self._session.headers.update({
            "User-Agent": ua,
            "Accept-Language": lang,
            "Referer": self.config.base_url + "/",
            "Origin": self.config.base_url,
            "DNT": "1",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
        })

    def init_session(self, force: bool = False) -> bool:
        """Fetch the upstream page and parse session credentials from it.

        The fetch is skipped only when ``force`` is False and the current
        session is ready, *valid* and younger than ``session_ttl``. A
        DEGRADED session is therefore re-fetched rather than reported as
        initialised.

        Returns:
            True when the session holds both a nonce and a bot id.
        """
        with self._lock:
            if (not force and self.is_ready and self._session_info.is_valid
                    and self._session_info.age_seconds < self.config.session_ttl):
                return True
            log.info("Initializing session...", extra={"action": "init_session"})
            self._rotate_identity()
            try:
                resp = self._session.get(self.config.base_url, timeout=self.config.timeout_init)
                resp.raise_for_status()
            except requests.RequestException as e:
                self._state = SessionState.FAILED
                log.error(f"Failed to load page: {e}")
                return False
            self._session_info = WebsiteParser.parse(resp.text)
            if self._session_info.is_valid:
                self._state = SessionState.READY
                log.info(
                    f"Session ready: bot_id={self._session_info.bot_id}, "
                    f"nonce={self._session_info.nonce[:8]}...",
                    extra={"action": "init_session", "bot_id": self._session_info.bot_id},
                )
            else:
                self._state = SessionState.DEGRADED
                log.warning(
                    f"Partial init: nonce={'✓' if self._session_info.nonce else '✗'}, "
                    f"bot_id={'✓' if self._session_info.bot_id else '✗'}"
                )
            return self._session_info.is_valid

    def refresh_nonce(self) -> bool:
        """Force a new session fetch and count it in the metrics."""
        log.info("Refreshing nonce...")
        metrics.record_nonce_refresh()
        return self.init_session(force=True)

    def _ensure_session(self):
        """Make sure usable credentials exist before a request.

        Raises:
            InitError: If no valid session could be established.
        """
        if not self.is_ready or not self._session_info.is_valid:
            if not self.init_session():
                raise InitError()
        if self._session_info.age_seconds > self.config.session_ttl:
            # Best effort: if the refresh fails the request still proceeds
            # and the retry loop handles any resulting error.
            self.refresh_nonce()

    # ── Conversations ──

    def new_conversation(self, system_prompt: Optional[str] = None) -> Conversation:
        """Create a conversation, make it active and return it."""
        conv = Conversation()
        if system_prompt:
            conv.add_message("system", system_prompt, self.config.max_history_messages)
        self._conversations[conv.conversation_id] = conv
        self._active_conversation_id = conv.conversation_id
        return conv

    def get_conversation(self, conversation_id: str) -> Optional[Conversation]:
        """Return the conversation with this id, or None."""
        return self._conversations.get(conversation_id)

    def list_conversations(self) -> List[Dict]:
        """Return summary dicts for all conversations of this client."""
        return [c.to_dict() for c in self._conversations.values()]

    # ── Helpers ──

    def _generate_client_msg_id(self) -> str:
        """Build a client message id from bot id, millisecond time and a random tag."""
        ts = int(time.time() * 1000)
        rand = ''.join(random.choices(string.ascii_lowercase + string.digits, k=5))
        return f"aipkit-client-msg-{self._session_info.bot_id}-{ts}-{rand}"

    def _calculate_backoff(self, attempt: int) -> float:
        """Exponential backoff (base ** attempt) plus uniform jitter, in seconds."""
        return self.config.retry_backoff_base ** attempt + random.uniform(0, self.config.retry_jitter)

    # ── Sending ──

    def send_message(
        self, message: str, *, stream: bool = False,
        conversation_id: Optional[str] = None,
        system_prompt: Optional[str] = None,
    ) -> Union[str, Generator[str, None, None]]:
        """Send ``message`` upstream and return the reply.

        Args:
            message: User text; surrounding whitespace is stripped.
            stream: When True, return a generator of text chunks instead of
                the full text. The generator is lazy, so upstream errors
                surface while iterating and are not retried.
            conversation_id: Conversation to append to; an unknown or
                missing id selects the active conversation.
            system_prompt: Added as a system message only if the
                conversation has none yet.

        Returns:
            The post-processed reply, or a chunk generator when streaming.

        Raises:
            InputValidationError: Empty or over-long message.
            CircuitOpenError: The circuit breaker refused the request.
            RateLimitError: No rate-limit token within 10 seconds.
            InitError: No valid session could be established.
            ChatGPTGratuitError: All attempts failed (the last error).
        """
        message = message.strip()
        if not message:
            raise InputValidationError("Message cannot be empty")
        if len(message) > self.config.max_message_length:
            raise InputValidationError(f"Message too long ({len(message)} chars)")
        if not self.circuit_breaker.can_execute():
            raise CircuitOpenError()
        if not self.rate_limiter.acquire(timeout=10.0):
            raise RateLimitError()
        self._ensure_session()

        if conversation_id and conversation_id in self._conversations:
            conv = self._conversations[conversation_id]
        else:
            conv = self.active_conversation
        if system_prompt and not any(m.role == "system" for m in conv.messages):
            conv.add_message("system", system_prompt, self.config.max_history_messages)
        conv.add_message("user", message, self.config.max_history_messages)

        last_error: Optional[Exception] = None
        start_time = time.monotonic()
        for attempt in range(self.config.max_retries + 1):
            try:
                if attempt > 0:
                    delay = self._calculate_backoff(attempt)
                    log.info(f"Retry {attempt}/{self.config.max_retries} after {delay:.1f}s")
                    metrics.record_retry()
                    time.sleep(delay)
                if stream:
                    gen = self._execute_stream(message, conv)
                    return self._wrap_stream_generator(gen, conv, start_time)
                else:
                    result = self._execute_blocking(message, conv)
                    duration = (time.monotonic() - start_time) * 1000
                    result = self.plugin_manager.apply_all(result, conv)
                    conv.add_message("assistant", result, self.config.max_history_messages)
                    metrics.record_request(True, duration, len(result))
                    self.circuit_breaker.record_success()
                    self._session_info.mark_used()
                    log.info(
                        f"Response received ({len(result)} chars)",
                        extra={"action": "send_message", "duration_ms": round(duration)},
                    )
                    return result
            except NonceExpiredError:
                # Not counted as a breaker failure: a fresh nonce usually fixes it.
                log.warning("Nonce expired, refreshing...")
                self.refresh_nonce()
                last_error = NonceExpiredError()
            except (StreamError, EmptyResponseError, CacheError) as e:
                last_error = e
                self.circuit_breaker.record_failure()
                self._session_info.mark_error()
                log.warning(f"Attempt {attempt + 1} failed: {e}")
                if attempt < self.config.max_retries:
                    self.refresh_nonce()
            except requests.Timeout:
                last_error = ChatGPTGratuitError("Request timeout", ErrorCode.TIMEOUT)
                self.circuit_breaker.record_failure()
            except requests.RequestException as e:
                last_error = ChatGPTGratuitError(str(e), ErrorCode.UNKNOWN)
                self.circuit_breaker.record_failure()

        duration = (time.monotonic() - start_time) * 1000
        metrics.record_request(False, duration)
        if isinstance(last_error, ChatGPTGratuitError):
            raise last_error
        raise ChatGPTGratuitError(f"All attempts failed: {last_error}", ErrorCode.UNKNOWN)

    def _wrap_stream_generator(self, gen, conv, start_time):
        """Relay chunks from ``gen`` and do the bookkeeping when it ends.

        On normal completion the full text is post-processed, stored as the
        assistant message and recorded as a success. Any exception is
        recorded as a failure and re-raised.
        """
        parts: List[str] = []
        try:
            for chunk in gen:
                parts.append(chunk)
                yield chunk
            full_text = self.plugin_manager.apply_all("".join(parts), conv)
            conv.add_message("assistant", full_text, self.config.max_history_messages)
            duration = (time.monotonic() - start_time) * 1000
            metrics.record_request(True, duration, len(full_text))
            self.circuit_breaker.record_success()
            self._session_info.mark_used()
        except Exception:
            duration = (time.monotonic() - start_time) * 1000
            metrics.record_request(False, duration)
            self.circuit_breaker.record_failure()
            raise

    def _cache_message(self, message: str) -> str:
        """Post the message to the AJAX endpoint and return its cache key.

        Raises:
            NonceExpiredError: HTTP 403, HTTP 400 mentioning the nonce, or
                a rejection whose message mentions the nonce.
            CacheError: The body is not JSON, is not a JSON object, reports
                failure, or lacks a cache key.
        """
        info = self._session_info
        payload = {
            "action": "aipkit_cache_sse_message",
            "message": message,
            "_ajax_nonce": info.nonce,
            "bot_id": info.bot_id,
            "user_client_message_id": self._generate_client_msg_id(),
        }
        ajax_url = f"{self.config.base_url}/wp-admin/admin-ajax.php"
        resp = self._session.post(
            ajax_url, data=payload,
            headers={"X-Requested-With": "XMLHttpRequest", "Accept": "*/*"},
            timeout=self.config.timeout_cache,
        )
        if resp.status_code == 403 or (resp.status_code == 400 and "nonce" in resp.text.lower()):
            raise NonceExpiredError()
        try:
            data = resp.json()
        except ValueError as err:
            # ValueError covers every JSON decode error the HTTP library may
            # raise, regardless of which JSON backend it uses.
            raise CacheError(f"Invalid JSON (HTTP {resp.status_code}): {resp.text[:200]}") from err
        if not isinstance(data, dict):
            # The body parsed as JSON but is not an object (e.g. a bare number).
            raise CacheError(
                f"Unexpected cache response (HTTP {resp.status_code}): {str(data)[:200]}"
            )
        if not data.get("success"):
            err_data = data.get("data", {})
            err_msg = err_data.get("message", str(err_data)) if isinstance(err_data, dict) else str(err_data)
            if "nonce" in err_msg.lower():
                raise NonceExpiredError()
            raise CacheError(f"Cache rejected: {err_msg}")
        result = data.get("data")
        cache_key = result.get("cache_key") if isinstance(result, dict) else None
        if not cache_key:
            raise CacheError("Cache response missing cache_key")
        return cache_key

    def _open_stream(self, cache_key: str, conv: Conversation) -> requests.Response:
        """Open the SSE stream for ``cache_key``; the caller must close it.

        Raises:
            NonceExpiredError: HTTP 403.
            StreamError: Any other non-200 status.
        """
        info = self._session_info
        ajax_url = f"{self.config.base_url}/wp-admin/admin-ajax.php"
        params = {
            "action": "aipkit_frontend_chat_stream",
            "cache_key": cache_key,
            "bot_id": info.bot_id,
            "session_id": self._guest_uuid,
            "conversation_uuid": conv.conversation_id,
            "post_id": info.post_id,
            "_ts": str(int(time.time() * 1000)),
            "_ajax_nonce": info.nonce,
        }
        resp = self._session.get(
            f"{ajax_url}?{urlencode(params)}",
            headers={"Accept": "text/event-stream", "Cache-Control": "no-cache", "Connection": "keep-alive"},
            timeout=self.config.timeout_stream,
            stream=True,
        )
        if resp.status_code != 200:
            # The body will not be read, so release the connection now.
            status = resp.status_code
            resp.close()
            if status == 403:
                raise NonceExpiredError()
            raise StreamError(f"Stream HTTP {status}")
        return resp

    def _execute_blocking(self, message: str, conv: Conversation) -> str:
        """Send the message and return the whole reply as one string.

        Raises:
            StreamError: The stream delivered an ``error`` event.
            EmptyResponseError: No events arrived, or no text was produced.
        """
        cache_key = self._cache_message(message)
        resp = self._open_stream(cache_key, conv)
        parts: List[str] = []
        got_data = False
        try:
            for event_type, data_str in SSEParser.iter_events(resp, log_raw=self.config.log_sse_raw):
                got_data = True
                if data_str.strip() == "[DONE]":
                    break
                if event_type == "error":
                    raise StreamError(f"SSE error: {data_str}")
                if event_type in CONTROL_EVENTS:
                    continue
                chunk = SSEParser.extract_text(data_str)
                if chunk:
                    parts.append(chunk)
        finally:
            # The response is streamed, so it must be closed explicitly,
            # including on early exit at [DONE] or on error.
            resp.close()
        full_text = "".join(parts)
        if not got_data:
            raise EmptyResponseError()
        if not full_text.strip():
            raise EmptyResponseError("Stream completed but empty")
        return full_text

    def _execute_stream(self, message: str, conv: Conversation) -> Generator[str, None, None]:
        """Send the message and yield reply chunks as they arrive.

        Raises:
            StreamError: The stream delivered an ``error`` event.
        """
        cache_key = self._cache_message(message)
        resp = self._open_stream(cache_key, conv)
        metrics.active_streams += 1
        try:
            for event_type, data_str in SSEParser.iter_events(resp, log_raw=self.config.log_sse_raw):
                if data_str.strip() == "[DONE]":
                    break
                if event_type == "error":
                    raise StreamError(f"SSE error: {data_str}")
                if event_type in CONTROL_EVENTS:
                    continue
                chunk = SSEParser.extract_text(data_str)
                if chunk:
                    yield chunk
        finally:
            # Also runs when the consumer abandons the generator early.
            resp.close()
            metrics.active_streams = max(0, metrics.active_streams - 1)

    # ── Introspection ──

    def get_status(self) -> Dict[str, Any]:
        """Return version, session, breaker, rate-limiter and metrics info."""
        return {
            "version": VERSION,
            "state": self._state.name,
            "session": {
                "bot_id": self._session_info.bot_id,
                "has_nonce": bool(self._session_info.nonce),
                "post_id": self._session_info.post_id,
                "age_seconds": round(self._session_info.age_seconds, 1),
                "request_count": self._session_info.request_count,
                "error_count": self._session_info.error_count,
            },
            "circuit_breaker": self.circuit_breaker.state.value,
            "rate_limiter_tokens": round(self.rate_limiter.available_tokens, 2),
            "metrics": metrics.to_dict(),
        }
# ══════════════════════════════════════════════════════════════
# SESSION POOL
# ══════════════════════════════════════════════════════════════
class SessionPool:
    """Fixed-size pool of clients handed out in round-robin order."""

    def __init__(self, config: Config):
        self.config = config
        self._index = 0
        self._lock = threading.Lock()
        self._clients: List[ChatGPTGratuitClient] = [
            ChatGPTGratuitClient(config) for _ in range(config.pool_size)
        ]

    def initialize_all(self) -> int:
        """Initialise every client's session.

        Errors are logged and do not stop the loop.

        Returns:
            Number of clients whose ``init_session()`` returned True.
        """
        ready = 0
        for position, client in enumerate(self._clients, start=1):
            try:
                if client.init_session():
                    ready += 1
                    log.info(f"Pool client {position} initialized")
            except Exception as e:
                log.error(f"Pool client {position} init error: {e}")
        return ready

    @contextmanager
    def acquire(self) -> Generator[ChatGPTGratuitClient, None, None]:
        """Yield the next usable client.

        Starting at the round-robin cursor, the first client that is ready
        or still uninitialised is chosen; if a full pass finds none, the
        first client of the pool is used. The pool lock is held only while
        selecting, not while the caller uses the client.
        """
        with self._lock:
            pool_len = len(self._clients)
            chosen = None
            for _ in range(pool_len):
                candidate = self._clients[self._index % pool_len]
                self._index += 1
                if candidate.is_ready or candidate.state == SessionState.UNINITIALIZED:
                    chosen = candidate
                    break
            if chosen is None:
                chosen = self._clients[0]
        yield chosen

    def get_status(self) -> Dict:
        """Return the pool size and per-client state and request count."""
        clients = []
        for i, c in enumerate(self._clients):
            clients.append(
                {"index": i, "state": c.state.name, "requests": c.session_info.request_count}
            )
        return {"pool_size": len(self._clients), "clients": clients}
# ══════════════════════════════════════════════════════════════
# FLASK APP FACTORY
# ══════════════════════════════════════════════════════════════
def create_app(config: Optional[Config] = None) -> Tuple:
from flask import Flask, request as freq, jsonify, Response, stream_with_context
config = config or Config.from_env()
app = Flask(APP_NAME)
pool = SessionPool(config)
ready_count = pool.initialize_all()
if ready_count == 0:
log.warning("No pool clients initialized! Will retry on first request.")
if config.enable_cors:
@app.after_request
def add_cors(response):
    """Attach permissive CORS headers to every outgoing response."""
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Headers", "Content-Type, Authorization"),
        ("Access-Control-Allow-Methods", "GET, POST, OPTIONS"),
    )
    for header_name, header_value in cors_headers:
        response.headers[header_name] = header_value
    return response
@app.errorhandler(ChatGPTGratuitError)
def handle_api_error(e: ChatGPTGratuitError):
    """Convert a ChatGPTGratuitError into a JSON error response.

    Rate limiting maps to 429, invalid input to 400, an open circuit to
    503, and every other code to 500.
    """
    if e.code == ErrorCode.RATE_LIMITED:
        http_status = 429
    elif e.code == ErrorCode.INVALID_INPUT:
        http_status = 400
    elif e.code == ErrorCode.CIRCUIT_OPEN:
        http_status = 503
    else:
        http_status = 500
    body = {"ok": False}
    body.update(e.to_dict())
    return jsonify(body), http_status
@app.errorhandler(404)
def not_found(e):
return jsonify({
"ok": False, "error": "Endpoint not found",
"endpoints": [
"POST /chat", "POST /chat/stream", "POST /v1/chat/completions",
"POST /new", "POST /refresh", "GET /health", "GET /metrics",
],
}), 404
# ── Landing page ──
@app.route("/", methods=["GET"])
def index():
return jsonify({
"name": APP_NAME,
"version": VERSION,
"status": "running",
"endpoints": {
"chat": "POST /chat",
"stream": "POST /chat/stream",
"openai_compat": "POST /v1/chat/completions",
"health": "GET /health",
"metrics": "GET /metrics",
"new_conversation": "POST /new",
"refresh_session": "POST /refresh",
"conversations": "GET /conversations",
},
"usage_example": {
"curl": 'curl -X POST /chat -H "Content-Type: application/json" -d \'{"message": "Hello!"}\'',
},
})
# ── POST /chat ──
@app.route("/chat", methods=["POST"])
def chat():
data = freq.get_json(force=True, silent=True) or {}
message = data.get("message", "").strip()
if not message:
return jsonify({"ok": False, "error": "Field 'message' is required"}), 400
with pool.acquire() as client:
if data.get("new_conversation"):
client.new_conversation(data.get("system_prompt"))
response_text = client.send_message(message, system_prompt=data.get("system_prompt"))
return jsonify({
"ok": True,
"response": response_text,
"conversation_id": client.active_conversation.conversation_id,
"usage": {
"message_count": len(client.active_conversation.messages),
"estimated_tokens": client.active_conversation.total_tokens,
},
})
# ── POST /chat/stream ──
@app.route("/chat/stream", methods=["POST"])
def chat_stream():
data = freq.get_json(force=True, silent=True) or {}
message = data.get("message", "").strip()
if not message:
return jsonify({"ok": False, "error": "Field 'message' is required"}), 400
with pool.acquire() as client:
if data.get("new_conversation"):
client.new_conversation(data.get("system_prompt"))
def generate():
try:
for chunk in client.send_message(message, stream=True):
yield f"data: {json.dumps({'chunk': chunk}, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
except ChatGPTGratuitError as e:
yield f"data: {json.dumps(e.to_dict())}\n\n"
except Exception as e:
yield f"data: {json.dumps({'error': str(e)})}\n\n"
return Response(
stream_with_context(generate()),
content_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
# ── POST /v1/chat/completions (OpenAI-compatible) ──
if config.enable_openai_compat:
@app.route("/v1/chat/completions", methods=["POST"])
def openai_compat():
data = freq.get_json(force=True, silent=True) or {}
messages = data.get("messages", [])
do_stream = data.get("stream", False)
if not messages:
return jsonify({"error": {"message": "messages required"}}), 400
user_msg = None
system_prompt = None
for msg in messages:
if msg.get("role") == "user":
user_msg = msg.get("content", "")
if msg.get("role") == "system":
system_prompt = msg.get("content")
if not user_msg:
return jsonify({"error": {"message": "No user message found"}}), 400
response_id = f"chatcmpl-{uuid.uuid4().hex[:24]}"
created = int(time.time())
with pool.acquire() as client:
if do_stream:
def generate():
try:
for chunk in client.send_message(
user_msg, stream=True, system_prompt=system_prompt
):
yield f"data: {json.dumps({'id': response_id, 'object': 'chat.completion.chunk', 'created': created, 'model': 'chatgpt-gratuit', 'choices': [{'index': 0, 'delta': {'content': chunk}, 'finish_reason': None}]})}\n\n"
yield f"data: {json.dumps({'id': response_id, 'object': 'chat.completion.chunk', 'created': created, 'model': 'chatgpt-gratuit', 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
yield f"data: {json.dumps({'error': {'message': str(e)}})}\n\n"
return Response(
stream_with_context(generate()),
content_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
response_text = client.send_message(user_msg, system_prompt=system_prompt)
return jsonify({
"id": response_id,
"object": "chat.completion",
"created": created,
"model": "chatgpt-gratuit",
"choices": [{
"index": 0,
"message": {"role": "assistant", "content": response_text},
"finish_reason": "stop",
}],
"usage": {
"prompt_tokens": len(user_msg) // 4,
"completion_tokens": len(response_text) // 4,
"total_tokens": (len(user_msg) + len(response_text)) // 4,
},
})
# ── POST /new ──
@app.route("/new", methods=["POST"])
def new_conv():
data = freq.get_json(force=True, silent=True) or {}
with pool.acquire() as client:
conv = client.new_conversation(data.get("system_prompt"))
return jsonify({"ok": True, "conversation_id": conv.conversation_id})
# ── POST /refresh ──
@app.route("/refresh", methods=["POST"])
def refresh():
with pool.acquire() as client:
result = client.refresh_nonce()
return jsonify({
"ok": result,
"nonce_prefix": client.session_info.nonce[:8] + "..." if client.session_info.nonce else None,
})
# ── GET /health ──
@app.route("/health", methods=["GET"])
def health():
with pool.acquire() as client:
status = client.get_status()
status["pool"] = pool.get_status()
return jsonify(status), 200 if client.is_ready else 503
# ── GET /metrics ──
if config.enable_metrics:
@app.route("/metrics", methods=["GET"])
def metrics_endpoint():
return jsonify(metrics.to_dict())
# ── GET /conversations ──
@app.route("/conversations", methods=["GET"])
def conversations():
with pool.acquire() as client:
return jsonify({
"ok": True,
"conversations": client.list_conversations(),
"active": client._active_conversation_id,
})
# ── GET /conversation/<id>/messages ──
@app.route("/conversation/<conv_id>/messages", methods=["GET"])
def conversation_messages(conv_id: str):
with pool.acquire() as client:
conv = client.get_conversation(conv_id)
if not conv:
return jsonify({"ok": False, "error": "Conversation not found"}), 404
return jsonify({
"ok": True,
"conversation": conv.to_dict(),
"messages": [
{"role": m.role, "content": m.content, "timestamp": m.timestamp, "message_id": m.message_id}
for m in conv.messages
],
})
return app, pool
# ══════════════════════════════════════════════════════════════
# GUNICORN ENTRY POINT + DIRECT RUN
# ══════════════════════════════════════════════════════════════
# Module-level bootstrap: runs on import so a WSGI server can pick up
# ``application`` directly. The configuration is read from the environment
# and validated before the app and its session pool are created.
config = Config.from_env()
config.validate()
# Gunicorn looks for 'application'
application, _pool = create_app(config)

if __name__ == "__main__":
    # Direct execution: print a startup banner, then serve with Flask's
    # built-in threaded server.
    print(f"""
╔══════════════════════════════════════════════════════════════╗
║  🚀 ChatGPT Gratuit API v{VERSION:<38s}║
║  Server: http://{config.host}:{config.port:<43d}║
║                                                              ║
║  POST /chat                  → Complete response             ║
║  POST /chat/stream           → Streaming SSE                 ║
║  POST /v1/chat/completions   → OpenAI-compatible             ║
║  GET /health                 → Health check                  ║
║  GET /metrics                → Metrics                       ║
╚══════════════════════════════════════════════════════════════╝
""")
    application.run(host=config.host, port=config.port, debug=config.debug, threaded=True)