Spaces:
Running
Running
| #!/usr/bin/env python3 | |
"""
╔══════════════════════════════════════════════════════════════╗
║  ChatGPTGratuit API — Hugging Face Spaces Edition            ║
║  Based on Ultimate Edition v5.0                              ║
║  Deployed as a Docker Space on Hugging Face (Free Tier)      ║
╚══════════════════════════════════════════════════════════════╝
"""
| from __future__ import annotations | |
| import re | |
| import os | |
| import sys | |
| import json | |
| import uuid | |
| import time | |
| import random | |
| import string | |
| import logging | |
| import threading | |
| from abc import ABC, abstractmethod | |
| from collections import deque | |
| from contextlib import contextmanager | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timezone | |
| from enum import Enum, auto | |
| from pathlib import Path | |
| from typing import ( | |
| Any, Deque, Dict, Generator, List, Optional, Tuple, Union | |
| ) | |
| from urllib.parse import urlencode | |
| import requests | |
| from bs4 import BeautifulSoup | |
| try: | |
| import yaml | |
| HAS_YAML = True | |
| except ImportError: | |
| HAS_YAML = False | |
# ══════════════════════════════════════════════════════════════
# CONSTANTS & ENUMS
# ══════════════════════════════════════════════════════════════
# Version string reported by the client (see get_status).
VERSION: str = "5.0.0-hf"
# Name of the application logger created by setup_logging().
APP_NAME: str = "ChatGPTGratuit-API"
# Default upstream site; Config.from_env() lets CGPT_BASE_URL override it.
BASE_URL: str = "https://chatgptgratuit.org"
class SessionState(Enum):
    """Lifecycle state of the scraped upstream session."""

    # Values are spelled out; they match what auto() would assign (1..5).
    UNINITIALIZED = 1  # no page load attempted yet
    READY = 2          # nonce and bot id were both extracted
    DEGRADED = 3       # page loaded but nonce or bot id is missing
    EXPIRED = 4
    FAILED = 5         # the page could not be fetched
class ErrorCode(Enum):
    """Machine-readable error codes; each value equals its member name."""

    # Session / handshake
    INIT_FAILED = "INIT_FAILED"
    NONCE_EXPIRED = "NONCE_EXPIRED"
    CACHE_FAILED = "CACHE_FAILED"

    # Streaming
    STREAM_FAILED = "STREAM_FAILED"
    STREAM_EMPTY = "STREAM_EMPTY"

    # Throttling and transport
    RATE_LIMITED = "RATE_LIMITED"
    TIMEOUT = "TIMEOUT"

    # Input and parsing
    PARSE_ERROR = "PARSE_ERROR"
    INVALID_INPUT = "INVALID_INPUT"

    # Resilience
    CIRCUIT_OPEN = "CIRCUIT_OPEN"
    POOL_EXHAUSTED = "POOL_EXHAUSTED"

    # Fallback
    UNKNOWN = "UNKNOWN"
class CircuitState(Enum):
    """States of the circuit breaker guarding upstream calls."""

    CLOSED = "closed"        # requests flow normally
    OPEN = "open"            # requests are rejected
    HALF_OPEN = "half_open"  # trial requests are allowed after the recovery timeout
# Browser User-Agent strings; _rotate_identity() picks one at random.
USER_AGENTS: List[str] = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_5) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 Edg/125.0.0.0",
]

# Accept-Language header values; one is picked at random per identity.
ACCEPT_LANGUAGES: List[str] = [
    "fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7",
    "fr-FR,fr;q=0.9,en;q=0.8",
    "en-US,en;q=0.9,fr;q=0.8",
]

# SSE event types that carry no response text and are skipped by the client.
CONTROL_EVENTS: frozenset = frozenset((
    "done",
    "message_start",
    "openai_response_id",
    "error",
    "ping",
    "heartbeat",
    "keep-alive",
))
# ══════════════════════════════════════════════════════════════
# LOGGING
# ══════════════════════════════════════════════════════════════
class PrettyFormatter(logging.Formatter):
    """Colourised single-line log formatter for terminal output.

    Output shape: ``<color><HH:MM:SS.mmm> <icon> <message>[ [k=v, ...]]<reset>``.
    A traceback is appended on following lines when the record carries
    exception info.
    """

    COLORS = {
        "DEBUG": "\033[36m", "INFO": "\033[32m",
        "WARNING": "\033[33m", "ERROR": "\033[31m",
        "CRITICAL": "\033[35m",
    }
    RESET = "\033[0m"
    # The glyphs were mis-encoded in the source. INFO and WARNING are
    # recoverable from the damaged bytes.
    # NOTE(review): DEBUG, ERROR and CRITICAL are best-guess restorations -- confirm.
    ICONS = {
        "DEBUG": "\U0001F50D", "INFO": "\u2705", "WARNING": "\u26A0\uFE0F ",
        "ERROR": "\u274C", "CRITICAL": "\U0001F480",
    }
    # Optional record attributes (passed via ``extra=``) echoed after the message.
    EXTRA_KEYS = ("action", "duration_ms", "status_code", "cache_key")

    def format(self, record: logging.LogRecord) -> str:
        """Render ``record`` as one coloured line (plus a traceback, if any)."""
        color = self.COLORS.get(record.levelname, "")
        icon = self.ICONS.get(record.levelname, "")
        # Use the time the record was created, not the time it is formatted,
        # so delayed or buffered handlers still show the real event time.
        ts = datetime.fromtimestamp(record.created).strftime("%H:%M:%S.%f")[:-3]
        extra_parts = [
            f"{key}={getattr(record, key)}"
            for key in self.EXTRA_KEYS
            if getattr(record, key, None) is not None
        ]
        extra_str = f" [{', '.join(extra_parts)}]" if extra_parts else ""
        text = f"{ts} {icon} {record.getMessage()}{extra_str}"
        if record.exc_info:
            # Previously dropped: log.exception() lost its traceback.
            text = f"{text}\n{self.formatException(record.exc_info)}"
        return f"{color}{text}{self.RESET}"
def setup_logging(level: str = "INFO") -> logging.Logger:
    """Configure and return the application logger.

    Any handlers already attached to the logger are removed and replaced by a
    single stderr handler using ``PrettyFormatter``. An unrecognised ``level``
    name falls back to ``logging.INFO``.
    """
    numeric_level = getattr(logging, level.upper(), logging.INFO)

    app_logger = logging.getLogger(APP_NAME)
    app_logger.setLevel(numeric_level)
    app_logger.handlers.clear()

    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setFormatter(PrettyFormatter())
    app_logger.addHandler(stderr_handler)
    return app_logger


# Module-wide logger; the level comes from CGPT_LOG_LEVEL (default "INFO").
log = setup_logging(os.environ.get("CGPT_LOG_LEVEL", "INFO"))
# ══════════════════════════════════════════════════════════════
# CONFIGURATION
# ══════════════════════════════════════════════════════════════
@dataclass
class Config:
    """Runtime settings for the client and its HTTP front end.

    Every field has a default; ``from_env`` overlays ``CGPT_*`` environment
    variables and ``validate`` clamps out-of-range values.
    """

    base_url: str = BASE_URL
    # Timeouts, in seconds, for the page load, the cache POST and the SSE stream.
    timeout_init: int = 20
    timeout_cache: int = 30
    timeout_stream: int = 120
    # Retry policy.
    max_retries: int = 3
    retry_backoff_base: float = 1.5
    retry_jitter: float = 0.5
    # Rate limiting: requests per minute and burst size.
    rate_limit_rpm: int = 15
    rate_limit_burst: int = 5
    pool_size: int = 2
    session_ttl: int = 1800
    # Conversation limits.
    max_history_messages: int = 50
    max_message_length: int = 10000
    # Circuit breaker.
    cb_failure_threshold: int = 5
    cb_recovery_timeout: int = 60
    cb_half_open_max: int = 2
    # Server binding.
    host: str = "0.0.0.0"
    port: int = 7860
    # Diagnostics and feature flags.
    debug: bool = False
    log_level: str = "INFO"
    json_logs: bool = False
    log_sse_raw: bool = False
    enable_metrics: bool = True
    enable_openai_compat: bool = True
    enable_cors: bool = True

    @staticmethod
    def _parse_bool(raw: str) -> bool:
        """Interpret ``"1"``, ``"true"`` or ``"yes"`` (any case) as True."""
        return raw.lower() in ("1", "true", "yes")

    @classmethod
    def from_env(cls) -> "Config":
        """Build a config from defaults overlaid with ``CGPT_*`` variables.

        A value that fails conversion is logged and the default is kept.
        This must be a classmethod: callers invoke ``Config.from_env()``
        without an instance.
        """
        cfg = cls()
        env_map = {
            "CGPT_BASE_URL": ("base_url", str),
            "CGPT_TIMEOUT": ("timeout_stream", int),
            "CGPT_MAX_RETRIES": ("max_retries", int),
            "CGPT_RATE_LIMIT": ("rate_limit_rpm", int),
            "CGPT_POOL_SIZE": ("pool_size", int),
            "CGPT_MAX_HISTORY": ("max_history_messages", int),
            "CGPT_HOST": ("host", str),
            "CGPT_PORT": ("port", int),
            "CGPT_DEBUG": ("debug", cls._parse_bool),
            "CGPT_LOG_LEVEL": ("log_level", str),
            "CGPT_JSON_LOGS": ("json_logs", cls._parse_bool),
        }
        for env_key, (attr, converter) in env_map.items():
            val = os.environ.get(env_key)
            if val is None:
                continue
            try:
                setattr(cfg, attr, converter(val))
            except (ValueError, TypeError):
                log.warning(f"Invalid env var {env_key}={val}, using default")
        return cfg

    def validate(self) -> List[str]:
        """Clamp out-of-range settings in place.

        Returns:
            One human-readable message per adjusted setting.
        """
        warnings = []
        if self.pool_size < 1:
            self.pool_size = 1
            warnings.append("pool_size adjusted to minimum 1")
        if self.max_retries < 0:
            self.max_retries = 0
            warnings.append("max_retries adjusted to 0")
        if self.rate_limit_rpm < 1:
            self.rate_limit_rpm = 1
            warnings.append("rate_limit_rpm adjusted to minimum 1")
        if self.timeout_stream < 10:
            self.timeout_stream = 10
            warnings.append("timeout_stream adjusted to minimum 10s")
        return warnings
# ══════════════════════════════════════════════════════════════
# EXCEPTIONS
# ══════════════════════════════════════════════════════════════
class ChatGPTGratuitError(Exception):
    """Base exception carrying an ``ErrorCode`` and optional detail mapping."""

    def __init__(self, message: str, code: ErrorCode = ErrorCode.UNKNOWN,
                 details: Optional[Dict] = None):
        super().__init__(message)
        self.code = code
        # A missing or empty mapping is normalised to a fresh dict.
        self.details = details if details else {}

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly view: message, code value and details."""
        payload: Dict[str, Any] = {}
        payload["error"] = str(self)
        payload["code"] = self.code.value
        payload["details"] = self.details
        return payload
class InitError(ChatGPTGratuitError):
    """Raised when the upstream session could not be initialised."""

    def __init__(self, msg: str = "Session initialization failed", **kw: Any) -> None:
        # Extra keywords (e.g. details=...) go straight to the base class.
        super().__init__(msg, ErrorCode.INIT_FAILED, **kw)
class NonceExpiredError(ChatGPTGratuitError):
    """Raised when the upstream rejects the current nonce."""

    def __init__(self, msg: str = "Nonce expired", **kw: Any) -> None:
        # Extra keywords (e.g. details=...) go straight to the base class.
        super().__init__(msg, ErrorCode.NONCE_EXPIRED, **kw)
class CacheError(ChatGPTGratuitError):
    """Raised when caching the outgoing message upstream fails."""

    def __init__(self, msg: str = "Cache operation failed", **kw: Any) -> None:
        # Extra keywords (e.g. details=...) go straight to the base class.
        super().__init__(msg, ErrorCode.CACHE_FAILED, **kw)
class StreamError(ChatGPTGratuitError):
    """Raised when the SSE stream cannot be opened or reports an error."""

    def __init__(self, msg: str = "Stream error", **kw: Any) -> None:
        # Extra keywords (e.g. details=...) go straight to the base class.
        super().__init__(msg, ErrorCode.STREAM_FAILED, **kw)
class EmptyResponseError(ChatGPTGratuitError):
    """Raised when the stream finishes without any usable text."""

    def __init__(self, msg: str = "Empty response from stream", **kw: Any) -> None:
        # Extra keywords (e.g. details=...) go straight to the base class.
        super().__init__(msg, ErrorCode.STREAM_EMPTY, **kw)
class RateLimitError(ChatGPTGratuitError):
    """Raised when no rate-limit token could be acquired in time."""

    def __init__(self, msg: str = "Rate limit exceeded", **kw: Any) -> None:
        # Extra keywords (e.g. details=...) go straight to the base class.
        super().__init__(msg, ErrorCode.RATE_LIMITED, **kw)
class CircuitOpenError(ChatGPTGratuitError):
    """Raised when the circuit breaker refuses to let a request through."""

    def __init__(self, msg: str = "Circuit breaker is open", **kw: Any) -> None:
        # Extra keywords (e.g. details=...) go straight to the base class.
        super().__init__(msg, ErrorCode.CIRCUIT_OPEN, **kw)
class InputValidationError(ChatGPTGratuitError):
    """Raised when a caller-supplied message fails validation."""

    def __init__(self, msg: str = "Invalid input", **kw: Any) -> None:
        # Extra keywords (e.g. details=...) go straight to the base class.
        super().__init__(msg, ErrorCode.INVALID_INPUT, **kw)
# ══════════════════════════════════════════════════════════════
# DATA MODELS
# ══════════════════════════════════════════════════════════════
@dataclass
class Message:
    """One chat message with a rough token estimate.

    The class relies on ``field(default_factory=...)`` and ``__post_init__``,
    which only take effect under ``@dataclass``. Without the decorator,
    ``Message(role=..., content=...)`` raises ``TypeError``.
    """

    role: str
    content: str
    timestamp: float = field(default_factory=time.time)
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    tokens_estimate: int = 0

    def __post_init__(self) -> None:
        # Heuristic of ~4 characters per token, never below 1. This always
        # overwrites any tokens_estimate passed to the constructor.
        self.tokens_estimate = max(1, len(self.content) // 4)
@dataclass
class Conversation:
    """An ordered list of messages plus bookkeeping metadata."""

    conversation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    messages: List[Message] = field(default_factory=list)
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)
    title: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def add_message(self, role: str, content: str, max_messages: int = 50) -> Message:
        """Append a message and trim the history to ``max_messages``.

        The first user message also sets the title (first 80 characters).
        When trimming, system messages are always kept and the oldest
        non-system messages are dropped first.

        Returns:
            The newly created ``Message``.
        """
        msg = Message(role=role, content=content)
        self.messages.append(msg)
        self.updated_at = time.time()
        if self.title is None and role == "user":
            self.title = content[:80] + ("..." if len(content) > 80 else "")
        if len(self.messages) > max_messages:
            system_msgs = [m for m in self.messages if m.role == "system"]
            other_msgs = [m for m in self.messages if m.role != "system"]
            # Keep at least the newest non-system message. With keep == 0 the
            # old slice ``other_msgs[-0:]`` kept everything, and a negative
            # keep dropped from the wrong end.
            keep = max(1, max_messages - len(system_msgs))
            self.messages = system_msgs + other_msgs[-keep:]
        return msg

    @property
    def total_tokens(self) -> int:
        """Sum of the per-message token estimates."""
        return sum(m.tokens_estimate for m in self.messages)

    def to_dict(self) -> Dict:
        """Return a summary of the conversation (no message bodies)."""
        return {
            "conversation_id": self.conversation_id,
            "title": self.title,
            "message_count": len(self.messages),
            "total_tokens": self.total_tokens,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }
@dataclass
class SessionInfo:
    """Credentials scraped from the upstream page, plus usage counters."""

    nonce: Optional[str] = None
    bot_id: Optional[str] = None
    post_id: Optional[str] = None
    created_at: float = field(default_factory=time.time)
    last_used: float = field(default_factory=time.time)
    request_count: int = 0
    error_count: int = 0

    @property
    def is_valid(self) -> bool:
        """True when both a nonce and a bot id are present.

        Callers read this as an attribute (``if info.is_valid:``). As a plain
        method, that test would always be truthy.
        """
        return bool(self.nonce and self.bot_id)

    @property
    def age_seconds(self) -> float:
        """Seconds elapsed since this session info was created."""
        return time.time() - self.created_at

    def mark_used(self) -> None:
        """Record a successful use: refresh ``last_used`` and bump the counter."""
        self.last_used = time.time()
        self.request_count += 1

    def mark_error(self) -> None:
        """Increment the error counter."""
        self.error_count += 1
# ══════════════════════════════════════════════════════════════
# METRICS
# ══════════════════════════════════════════════════════════════
@dataclass
class Metrics:
    """Thread-safe in-process counters and latency statistics."""

    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    total_retries: int = 0
    nonce_refreshes: int = 0
    total_chars_received: int = 0
    total_response_time_ms: float = 0.0
    active_streams: int = 0
    circuit_breaker_trips: int = 0
    # Rolling window of the most recent 1000 request durations.
    _latencies: Deque[float] = field(default_factory=lambda: deque(maxlen=1000), repr=False)
    started_at: float = field(default_factory=time.time)

    def record_request(self, success: bool, duration_ms: float, chars: int = 0) -> None:
        """Record one finished request; ``chars`` counts only on success."""
        with self._lock:
            self.total_requests += 1
            if success:
                self.successful_requests += 1
                self.total_chars_received += chars
            else:
                self.failed_requests += 1
            self.total_response_time_ms += duration_ms
            self._latencies.append(duration_ms)

    def record_retry(self) -> None:
        """Count one retry attempt."""
        with self._lock:
            self.total_retries += 1

    def record_nonce_refresh(self) -> None:
        """Count one nonce refresh."""
        with self._lock:
            self.nonce_refreshes += 1

    def record_circuit_trip(self) -> None:
        """Count one circuit-breaker trip."""
        with self._lock:
            self.circuit_breaker_trips += 1

    # The *_unlocked helpers assume the caller already holds ``_lock``. They
    # let to_dict() take the lock once instead of re-acquiring it through the
    # properties: threading.Lock is not reentrant, so that would deadlock.

    def _avg_latency_unlocked(self) -> float:
        return sum(self._latencies) / len(self._latencies) if self._latencies else 0.0

    def _p95_latency_unlocked(self) -> float:
        if not self._latencies:
            return 0.0
        ordered = sorted(self._latencies)
        return ordered[min(int(len(ordered) * 0.95), len(ordered) - 1)]

    def _success_rate_unlocked(self) -> float:
        return self.successful_requests / self.total_requests if self.total_requests else 1.0

    @property
    def avg_latency_ms(self) -> float:
        """Mean duration over the rolling window (0.0 when empty)."""
        with self._lock:
            return self._avg_latency_unlocked()

    @property
    def p95_latency_ms(self) -> float:
        """95th-percentile duration over the rolling window (0.0 when empty)."""
        with self._lock:
            return self._p95_latency_unlocked()

    @property
    def success_rate(self) -> float:
        """Fraction of successful requests; 1.0 before any request."""
        with self._lock:
            return self._success_rate_unlocked()

    def to_dict(self) -> Dict[str, Any]:
        """Return a consistent snapshot of all metrics."""
        with self._lock:
            return {
                "total_requests": self.total_requests,
                "successful_requests": self.successful_requests,
                "failed_requests": self.failed_requests,
                "success_rate": round(self._success_rate_unlocked(), 4),
                "total_retries": self.total_retries,
                "nonce_refreshes": self.nonce_refreshes,
                "total_chars_received": self.total_chars_received,
                "avg_latency_ms": round(self._avg_latency_unlocked(), 1),
                "p95_latency_ms": round(self._p95_latency_unlocked(), 1),
                "active_streams": self.active_streams,
                "circuit_breaker_trips": self.circuit_breaker_trips,
                "uptime_seconds": round(time.time() - self.started_at, 1),
            }


# Process-wide metrics singleton shared by the client and the circuit breaker.
metrics = Metrics()
# ══════════════════════════════════════════════════════════════
# RATE LIMITER
# ══════════════════════════════════════════════════════════════
class RateLimiter:
    """Token-bucket rate limiter.

    The bucket holds at most ``burst`` tokens and refills at ``rpm / 60``
    tokens per second. Each successful ``acquire`` consumes one token.
    """

    def __init__(self, rpm: int = 10, burst: int = 3):
        self.rate = rpm / 60.0
        self.max_tokens = float(burst)
        self.tokens = float(burst)
        self.last_refill = time.monotonic()
        self._lock = threading.Lock()

    def _refill(self) -> None:
        """Credit tokens for the time elapsed; the caller must hold the lock."""
        now = time.monotonic()
        self.tokens = min(self.max_tokens, self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now

    def acquire(self, timeout: float = 30.0) -> bool:
        """Take one token, waiting up to ``timeout`` seconds.

        Returns:
            True if a token was consumed, False if the deadline passed first.
        """
        deadline = time.monotonic() + timeout
        poll_interval = min(1.0 / max(self.rate, 0.01), 0.5)
        while True:
            with self._lock:
                self._refill()
                if self.tokens >= 1.0:
                    self.tokens -= 1.0
                    return True
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            # Never sleep past the deadline. The fixed poll interval could
            # previously overshoot ``timeout`` by up to half a second.
            time.sleep(min(poll_interval, remaining))

    def available_tokens(self) -> float:
        """Return the current token count after refilling.

        NOTE(review): other accessors in this file are read as attributes;
        confirm whether callers expect this one to be a property.
        """
        with self._lock:
            self._refill()
            return self.tokens
# ══════════════════════════════════════════════════════════════
# CIRCUIT BREAKER
# ══════════════════════════════════════════════════════════════
class CircuitBreaker:
    """Circuit breaker with CLOSED, OPEN and HALF_OPEN states.

    * CLOSED: every call is allowed. ``failure_threshold`` accumulated
      failures trip the breaker; each success lowers the count by one.
    * OPEN: calls are rejected until ``recovery_timeout`` seconds have passed
      since the last failure.
    * HALF_OPEN: at most ``half_open_max`` probe calls are admitted.
      ``half_open_max`` successes close the breaker; any failure re-opens it.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60,
                 half_open_max: int = 2):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max = half_open_max
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0.0
        self.half_open_attempts = 0
        # Start of the current half-open probe window.
        self._half_open_since = 0.0
        self._lock = threading.Lock()

    def can_execute(self) -> bool:
        """Return True if a call may proceed now.

        In HALF_OPEN each True result consumes one probe slot.
        """
        with self._lock:
            if self.state == CircuitState.CLOSED:
                return True
            now = time.time()
            if self.state == CircuitState.OPEN:
                if now - self.last_failure_time < self.recovery_timeout:
                    return False
                self.state = CircuitState.HALF_OPEN
                self.half_open_attempts = 0
                # Do not carry successes over from an earlier half-open phase.
                self.success_count = 0
                self._half_open_since = now
                log.info("Circuit breaker → HALF_OPEN")
            elif (self.half_open_attempts >= self.half_open_max
                  and now - self._half_open_since >= self.recovery_timeout):
                # Every probe slot was handed out but the breaker never left
                # HALF_OPEN, e.g. a probe ended without reporting an outcome.
                # Open a fresh window so the breaker cannot get stuck.
                self.half_open_attempts = 0
                self._half_open_since = now
            if self.half_open_attempts >= self.half_open_max:
                return False
            # The counter was never incremented before, so the probe limit
            # had no effect.
            self.half_open_attempts += 1
            return True

    def record_success(self) -> None:
        """Report a successful call."""
        with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.half_open_max:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
                    self.success_count = 0
                    log.info("Circuit breaker → CLOSED (recovered)")
            else:
                self.failure_count = max(0, self.failure_count - 1)

    def record_failure(self) -> None:
        """Report a failed call; may trip or re-open the breaker."""
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.OPEN
                self.success_count = 0
                metrics.record_circuit_trip()
                log.warning("Circuit breaker → OPEN (recovery failed)")
            elif (self.state == CircuitState.CLOSED
                  and self.failure_count >= self.failure_threshold):
                # Count a trip only on the CLOSED -> OPEN transition. Late
                # failures while already OPEN used to be counted again.
                self.state = CircuitState.OPEN
                metrics.record_circuit_trip()
                log.warning(f"Circuit breaker → OPEN (failures={self.failure_count})")
# ══════════════════════════════════════════════════════════════
# PLUGIN SYSTEM
# ══════════════════════════════════════════════════════════════
class ResponsePlugin(ABC):
    """Interface for post-processing steps applied to a finished response."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Short identifier used in log messages (read as ``plugin.name``)."""
        ...

    @abstractmethod
    def process(self, response: str, conversation: Conversation) -> str:
        """Return the transformed ``response`` for ``conversation``."""
        ...
class StripWhitespacePlugin(ResponsePlugin):
    """Collapse runs of three or more newlines and trim the response."""

    @property
    def name(self) -> str:
        # A property, because the plugin manager reads ``plugin.name`` as an
        # attribute when logging failures.
        return "strip_whitespace"

    def process(self, response: str, conversation: Conversation) -> str:
        """Return ``response`` with 3+ newlines reduced to 2 and ends stripped."""
        return re.sub(r'\n{3,}', '\n\n', response).strip()
class PluginManager:
    """Ordered registry of response plugins."""

    def __init__(self):
        self._plugins: List[ResponsePlugin] = []

    def register(self, plugin: ResponsePlugin):
        """Add ``plugin`` to the end of the processing chain."""
        self._plugins.append(plugin)

    def apply_all(self, response: str, conversation: Conversation) -> str:
        """Run every plugin in registration order.

        A failing plugin is logged and skipped; the text it received is
        passed on unchanged to the next plugin.
        """
        current = response
        for plugin in self._plugins:
            try:
                current = plugin.process(current, conversation)
            except Exception as e:
                log.warning(f"Plugin {plugin.name} failed: {e}")
        return current
# ══════════════════════════════════════════════════════════════
# WEBSITE PARSER
# ══════════════════════════════════════════════════════════════
class WebsiteParser:
    """Extract chat session parameters (nonce, bot id, post id) from page HTML."""

    _NONCE_PATTERNS = (
        r'"nonce"\s*:\s*"([a-f0-9]{8,})"',
        r"'nonce'\s*:\s*'([a-f0-9]{8,})'",
        r'nonce["\s:=]+["\']([a-f0-9]{8,})',
    )
    _BOT_ID_PATTERNS = (r'"botId"\s*:\s*"?(\d+)', r'"bot_id"\s*:\s*"?(\d+)')

    @staticmethod
    def _text(value: Any) -> str:
        """Stringify a config value; JSON ``null`` becomes ``""``, not ``"None"``."""
        return "" if value is None else str(value)

    @staticmethod
    def parse(html: str) -> SessionInfo:
        """Build a ``SessionInfo`` from the chat page.

        Sources are tried in order, each filling only what is still missing:

        1. the JSON ``data-config`` attribute of the ``aipkit_chat_container``
           element;
        2. the numeric suffix of the chat textarea id (bot id only);
        3. regular-expression searches over the raw HTML.

        ``post_id`` falls back to ``"7"`` when nothing is found.
        """
        info = SessionInfo()
        soup = BeautifulSoup(html, "html.parser")

        container = soup.find(class_="aipkit_chat_container")
        config_str = container.get("data-config", "") if container else ""
        if config_str:
            try:
                config = json.loads(config_str)
            except json.JSONDecodeError:
                config = None
            # Guard against valid JSON that is not an object (list, number, ...).
            if isinstance(config, dict):
                info.nonce = WebsiteParser._text(config.get("nonce"))
                info.bot_id = WebsiteParser._text(config.get("botId"))
                info.post_id = WebsiteParser._text(config.get("postId"))

        # There is deliberately no early return here. Each fallback below is
        # guarded by "still missing", so a complete data-config skips them,
        # while a config without postId still receives the post_id default.
        if not info.bot_id:
            textarea = soup.find("textarea", id=re.compile(r"aipkit_chat_input_field_\d+"))
            if textarea:
                match = re.search(r"_(\d+)$", textarea.get("id", ""))
                if match:
                    info.bot_id = match.group(1)

        if not info.nonce:
            for pattern in WebsiteParser._NONCE_PATTERNS:
                match = re.search(pattern, html)
                if match:
                    info.nonce = match.group(1)
                    break

        if not info.bot_id:
            for pattern in WebsiteParser._BOT_ID_PATTERNS:
                match = re.search(pattern, html)
                if match:
                    info.bot_id = match.group(1)
                    break

        if not info.post_id:
            match = re.search(r'"postId"\s*:\s*"?(\d+)', html)
            info.post_id = match.group(1) if match else "7"
        return info
# ══════════════════════════════════════════════════════════════
# SSE PARSER
# ══════════════════════════════════════════════════════════════
class SSEParser:
    """Helpers for reading a Server-Sent Events response."""

    @staticmethod
    def iter_events(response: requests.Response,
                    log_raw: bool = False) -> Generator[Tuple[str, str], None, None]:
        """Yield ``(event_type, data)`` pairs from an SSE response.

        Consecutive ``data:`` lines of one event are joined with newlines.
        The event type defaults to ``"message"``. ``id:`` fields, comment
        lines and unknown fields are ignored.

        Lines are read as bytes and decoded as UTF-8, the encoding the SSE
        format requires. Relying on the library's charset detection instead
        can mis-decode accented text when the server omits a charset.
        """
        current_event = "message"
        data_buffer: List[str] = []
        for raw_line in response.iter_lines():
            if raw_line is None:
                continue
            if isinstance(raw_line, (bytes, bytearray)):
                line = raw_line.decode("utf-8", errors="replace")
            else:
                line = raw_line
            if log_raw:
                log.debug(f"SSE RAW: {repr(line)}")
            if not line:
                # A blank line terminates the current event.
                if data_buffer:
                    yield current_event, "\n".join(data_buffer)
                    data_buffer.clear()
                # Reset even when nothing was dispatched, so a data-less
                # "event:" line cannot leak its type into the next event.
                current_event = "message"
                continue
            if line.startswith("event:"):
                current_event = line[6:].strip()
            elif line.startswith("data:"):
                data_str = line[5:]
                # Only a single leading space is part of the field syntax.
                if data_str.startswith(" "):
                    data_str = data_str[1:]
                data_buffer.append(data_str)
        # Flush an event that was not terminated by a blank line.
        if data_buffer:
            yield current_event, "\n".join(data_buffer)

    @staticmethod
    def _content_of(part: Any) -> str:
        """Return ``part["content"]`` when it is a string, else ``""``."""
        if isinstance(part, dict):
            content = part.get("content")
            if isinstance(content, str):
                return content
        return ""

    @staticmethod
    def extract_text(data_str: str) -> str:
        """Extract the text fragment carried by one SSE data payload.

        Recognised JSON shapes, in order: ``{"delta": "<text>"}``, an
        OpenAI-style ``choices[0].delta.content`` or
        ``choices[0].message.content``, then the first string found under
        ``content``, ``text``, ``message``, ``data``, ``chunk`` or
        ``response``. A JSON string is returned as is. Payloads that are not
        JSON are returned unchanged. Anything else yields ``""``; a string is
        always returned, never ``None``.
        """
        try:
            data = json.loads(data_str)
        except (json.JSONDecodeError, TypeError):
            return data_str
        if isinstance(data, str):
            return data
        if not isinstance(data, dict):
            return ""
        delta = data.get("delta")
        if isinstance(delta, str):
            return delta
        choices = data.get("choices")
        if isinstance(choices, list) and choices and isinstance(choices[0], dict):
            choice = choices[0]
            if "delta" in choice:
                # "content" may be null or absent, e.g. on role-only chunks.
                return SSEParser._content_of(choice["delta"])
            if "message" in choice:
                return SSEParser._content_of(choice["message"])
        for key in ("content", "text", "message", "data", "chunk", "response"):
            value = data.get(key)
            if isinstance(value, str):
                return value
        return ""
# ══════════════════════════════════════════════════════════════
# CORE CLIENT
# ══════════════════════════════════════════════════════════════
| class ChatGPTGratuitClient: | |
def __init__(self, config: Optional[Config] = None):
    """Create a client; settings come from ``config`` or the environment.

    No network traffic happens here. The upstream session is established
    later by ``init_session``.
    """
    self.config = config or Config.from_env()
    cfg = self.config

    # HTTP and session state.
    self._session = requests.Session()
    self._session_info = SessionInfo()
    self._state = SessionState.UNINITIALIZED
    self._guest_uuid = str(uuid.uuid4())
    self._lock = threading.Lock()

    # Resilience helpers.
    self.rate_limiter = RateLimiter(rpm=cfg.rate_limit_rpm, burst=cfg.rate_limit_burst)
    self.circuit_breaker = CircuitBreaker(
        failure_threshold=cfg.cb_failure_threshold,
        recovery_timeout=cfg.cb_recovery_timeout,
        half_open_max=cfg.cb_half_open_max,
    )

    # Response post-processing.
    self.plugin_manager = PluginManager()
    self.plugin_manager.register(StripWhitespacePlugin())

    # Conversation registry.
    self._conversations: Dict[str, Conversation] = {}
    self._active_conversation_id: Optional[str] = None

    # Pick the initial browser-like headers.
    self._rotate_identity()
def state(self) -> SessionState:
    """Return the current session state (``self._state``).

    NOTE(review): the sibling accessor ``is_ready`` is read as an attribute
    in this file; this accessor may also be meant to be a ``@property``.
    Confirm against callers before changing it.
    """
    return self._state
@property
def is_ready(self) -> bool:
    """True when the session state is READY or DEGRADED.

    This must be a property: callers test ``self.is_ready`` without calling
    it. A bound method is always truthy, so ``not self.is_ready`` would never
    trigger initialisation.
    """
    return self._state in (SessionState.READY, SessionState.DEGRADED)
@property
def active_conversation(self) -> Conversation:
    """Return the active conversation, creating one if none is registered.

    This must be a property: ``send_message`` reads
    ``self.active_conversation`` as an attribute.
    """
    if self._active_conversation_id not in self._conversations:
        conv = Conversation()
        self._conversations[conv.conversation_id] = conv
        self._active_conversation_id = conv.conversation_id
    return self._conversations[self._active_conversation_id]
def session_info(self) -> SessionInfo:
    """Return the current ``SessionInfo`` object (``self._session_info``).

    NOTE(review): the sibling accessor ``is_ready`` is read as an attribute
    in this file; this accessor may also be meant to be a ``@property``.
    Confirm against callers before changing it.
    """
    return self._session_info
def _rotate_identity(self):
    """Pick a random User-Agent / Accept-Language and refresh session headers."""
    ua = random.choice(USER_AGENTS)
    lang = random.choice(ACCEPT_LANGUAGES)
    site = self.config.base_url

    browser_headers = {
        "User-Agent": ua,
        "Accept-Language": lang,
        "Referer": site + "/",
        "Origin": site,
        "DNT": "1",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
    }
    self._session.headers.update(browser_headers)
def init_session(self, force: bool = False) -> bool:
    """Load the upstream page and extract the session credentials.

    Args:
        force: Re-initialise even if a usable session is cached.

    Returns:
        True when a session with both nonce and bot id is available, False
        when the page could not be loaded or the credentials are incomplete.
    """
    with self._lock:
        cached = self._session_info
        # Only a complete, fresh session may short-circuit. A DEGRADED
        # session used to be reported as ready here even though its nonce or
        # bot id is missing, so later requests ran with bad credentials.
        cached_ok = (
            self._state == SessionState.READY
            and bool(cached.nonce and cached.bot_id)
            and time.time() - cached.created_at < self.config.session_ttl
        )
        if cached_ok and not force:
            return True

        log.info("Initializing session...", extra={"action": "init_session"})
        self._rotate_identity()
        try:
            resp = self._session.get(self.config.base_url, timeout=self.config.timeout_init)
            resp.raise_for_status()
        except requests.RequestException as e:
            self._state = SessionState.FAILED
            log.error(f"Failed to load page: {e}")
            return False

        parsed = WebsiteParser.parse(resp.text)
        self._session_info = parsed
        has_nonce = bool(parsed.nonce)
        has_bot_id = bool(parsed.bot_id)
        if has_nonce and has_bot_id:
            self._state = SessionState.READY
            log.info(
                f"Session ready: bot_id={parsed.bot_id}, "
                f"nonce={parsed.nonce[:8]}...",
                extra={"action": "init_session", "bot_id": parsed.bot_id},
            )
            return True

        self._state = SessionState.DEGRADED
        # Both branches of the original message printed the same mis-encoded
        # glyph, so the log could not show which value was missing.
        found, missing = "\u2713", "\u2717"
        nonce_mark = found if has_nonce else missing
        bot_mark = found if has_bot_id else missing
        log.warning(f"Partial init: nonce={nonce_mark}, bot_id={bot_mark}")
        return False
def refresh_nonce(self) -> bool:
    """Force a session re-initialisation and count it as a nonce refresh.

    Returns:
        The result of ``init_session(force=True)``.
    """
    log.info("Refreshing nonce...")
    metrics.record_nonce_refresh()
    refreshed = self.init_session(force=True)
    return refreshed
| def _ensure_session(self): | |
| if not self.is_ready: | |
| if not self.init_session(): | |
| raise InitError() | |
| if self._session_info.age_seconds > self.config.session_ttl: | |
| self.refresh_nonce() | |
def new_conversation(self, system_prompt: Optional[str] = None) -> Conversation:
    """Create a conversation, register it and make it the active one.

    Args:
        system_prompt: Optional text stored as the first ("system") message.
    """
    conv = Conversation()
    if system_prompt:
        conv.add_message("system", system_prompt, self.config.max_history_messages)
    new_id = conv.conversation_id
    self._conversations[new_id] = conv
    self._active_conversation_id = new_id
    return conv
def get_conversation(self, conversation_id: str) -> Optional[Conversation]:
    """Return the conversation registered under ``conversation_id``, or None."""
    try:
        return self._conversations[conversation_id]
    except KeyError:
        return None
def list_conversations(self) -> List[Dict]:
    """Return the ``to_dict()`` summary of every registered conversation."""
    summaries = []
    for conv in self._conversations.values():
        summaries.append(conv.to_dict())
    return summaries
| def _generate_client_msg_id(self) -> str: | |
| ts = int(time.time() * 1000) | |
| rand = ''.join(random.choices(string.ascii_lowercase + string.digits, k=5)) | |
| return f"aipkit-client-msg-{self._session_info.bot_id}-{ts}-{rand}" | |
| def _calculate_backoff(self, attempt: int) -> float: | |
| return self.config.retry_backoff_base ** attempt + random.uniform(0, self.config.retry_jitter) | |
def send_message(
    self, message: str, *, stream: bool = False,
    conversation_id: Optional[str] = None,
    system_prompt: Optional[str] = None,
) -> Union[str, Generator[str, None, None]]:
    """Send ``message`` upstream and return the assistant's reply.

    Args:
        message: User text; stripped, must be non-empty and within
            ``config.max_message_length``.
        stream: When True, return a generator of text chunks instead of the
            complete reply.
        conversation_id: Conversation to continue. Unknown or missing ids
            fall back to the active conversation.
        system_prompt: Added as a system message if the conversation has none.

    Returns:
        The processed reply, or a chunk generator when ``stream`` is True.

    Raises:
        InputValidationError: the message is not a string, empty or too long.
        CircuitOpenError: the circuit breaker rejects the call.
        RateLimitError: no rate-limit token became available within 10 s.
        InitError: no session could be established.
        ChatGPTGratuitError: every attempt failed (the last error is raised).
    """
    # Local import: only the streaming path needs it.
    from itertools import chain

    if not isinstance(message, str):
        raise InputValidationError("Message must be a string")
    message = message.strip()
    if not message:
        raise InputValidationError("Message cannot be empty")
    if len(message) > self.config.max_message_length:
        raise InputValidationError(f"Message too long ({len(message)} chars)")
    if not self.circuit_breaker.can_execute():
        raise CircuitOpenError()
    if not self.rate_limiter.acquire(timeout=10.0):
        raise RateLimitError()
    self._ensure_session()

    history_limit = self.config.max_history_messages
    conv = self._conversations.get(conversation_id) if conversation_id else None
    if conv is None:
        conv = self.active_conversation
    if system_prompt and not any(m.role == "system" for m in conv.messages):
        conv.add_message("system", system_prompt, history_limit)
    conv.add_message("user", message, history_limit)

    last_error: Optional[Exception] = None
    start_time = time.monotonic()
    for attempt in range(self.config.max_retries + 1):
        try:
            if attempt > 0:
                delay = self._calculate_backoff(attempt)
                log.info(f"Retry {attempt}/{self.config.max_retries} after {delay:.1f}s")
                metrics.record_retry()
                time.sleep(delay)
            if stream:
                gen = self._execute_stream(message, conv)
                # Creating a generator runs none of its code, so returning it
                # directly would bypass every handler below. Pull the first
                # chunk here so that the cache call, the stream opening and
                # the first read are covered by the retry and nonce-refresh
                # logic.
                try:
                    first_chunk = next(gen)
                except StopIteration:
                    raise EmptyResponseError("Stream completed but empty") from None
                return self._wrap_stream_generator(
                    chain([first_chunk], gen), conv, start_time
                )
            result = self._execute_blocking(message, conv)
            duration = (time.monotonic() - start_time) * 1000
            result = self.plugin_manager.apply_all(result, conv)
            conv.add_message("assistant", result, history_limit)
            metrics.record_request(True, duration, len(result))
            self.circuit_breaker.record_success()
            self._session_info.mark_used()
            log.info(
                f"Response received ({len(result)} chars)",
                extra={"action": "send_message", "duration_ms": round(duration)},
            )
            return result
        except NonceExpiredError:
            # Not counted as a breaker failure: refresh and retry.
            log.warning("Nonce expired, refreshing...")
            self.refresh_nonce()
            last_error = NonceExpiredError()
        except (StreamError, EmptyResponseError, CacheError) as e:
            last_error = e
            self.circuit_breaker.record_failure()
            self._session_info.mark_error()
            log.warning(f"Attempt {attempt + 1} failed: {e}")
            if attempt < self.config.max_retries:
                self.refresh_nonce()
        except requests.Timeout:
            last_error = ChatGPTGratuitError("Request timeout", ErrorCode.TIMEOUT)
            self.circuit_breaker.record_failure()
        except requests.RequestException as e:
            last_error = ChatGPTGratuitError(str(e), ErrorCode.UNKNOWN)
            self.circuit_breaker.record_failure()

    duration = (time.monotonic() - start_time) * 1000
    metrics.record_request(False, duration)
    if isinstance(last_error, ChatGPTGratuitError):
        raise last_error
    raise ChatGPTGratuitError(f"All attempts failed: {last_error}", ErrorCode.UNKNOWN)
def _wrap_stream_generator(self, gen, conv, start_time):
    """Relay chunks from ``gen`` and do the bookkeeping when it finishes.

    After the last chunk, the joined text is run through the plugins and
    stored as the assistant message, and success is recorded. If iteration
    raises, a failure is recorded and the exception is re-raised.
    """
    pieces = []
    try:
        for chunk in gen:
            pieces.append(chunk)
            yield chunk
        full_text = self.plugin_manager.apply_all("".join(pieces), conv)
        conv.add_message("assistant", full_text, self.config.max_history_messages)
        elapsed_ms = (time.monotonic() - start_time) * 1000
        metrics.record_request(True, elapsed_ms, len(full_text))
        self.circuit_breaker.record_success()
        self._session_info.mark_used()
    except Exception:
        elapsed_ms = (time.monotonic() - start_time) * 1000
        metrics.record_request(False, elapsed_ms)
        self.circuit_breaker.record_failure()
        raise
def _cache_message(self, message: str) -> str:
    """POST the message to the site's AJAX endpoint and return its cache key.

    Raises:
        NonceExpiredError: HTTP 403, HTTP 400 mentioning "nonce", or an
            error payload mentioning "nonce".
        CacheError: the response is not JSON, is not a JSON object, reports
            failure, or does not contain a cache key.
    """
    info = self._session_info
    payload = {
        "action": "aipkit_cache_sse_message",
        "message": message,
        "_ajax_nonce": info.nonce,
        "bot_id": info.bot_id,
        "user_client_message_id": self._generate_client_msg_id(),
    }
    ajax_url = f"{self.config.base_url}/wp-admin/admin-ajax.php"
    resp = self._session.post(
        ajax_url, data=payload,
        headers={"X-Requested-With": "XMLHttpRequest", "Accept": "*/*"},
        timeout=self.config.timeout_cache,
    )
    if resp.status_code == 403 or (resp.status_code == 400 and "nonce" in resp.text.lower()):
        raise NonceExpiredError()
    try:
        data = resp.json()
    except ValueError:
        # ValueError covers json.JSONDecodeError and the decode error raised
        # by the HTTP library, whichever JSON backend it uses.
        raise CacheError(f"Invalid JSON (HTTP {resp.status_code}): {resp.text[:200]}")
    if not isinstance(data, dict):
        # A bare JSON scalar or list would otherwise fail on ``.get`` below.
        raise CacheError(f"Unexpected response (HTTP {resp.status_code}): {resp.text[:200]}")
    if not data.get("success"):
        err_data = data.get("data", {})
        err_msg = err_data.get("message", str(err_data)) if isinstance(err_data, dict) else str(err_data)
        if "nonce" in err_msg.lower():
            raise NonceExpiredError()
        raise CacheError(f"Cache rejected: {err_msg}")
    body = data.get("data")
    cache_key = body.get("cache_key") if isinstance(body, dict) else None
    if not cache_key:
        # Raise the domain error, which the retry logic handles, instead of
        # a raw KeyError/TypeError.
        raise CacheError("Cache response did not include a cache_key")
    return cache_key
def _open_stream(self, cache_key: str, conv: Conversation) -> requests.Response:
    """Open the SSE stream for a previously cached message.

    Returns:
        The streaming response. The caller is responsible for closing it.

    Raises:
        NonceExpiredError: the endpoint answered HTTP 403.
        StreamError: the endpoint answered any other non-200 status.
    """
    info = self._session_info
    ajax_url = f"{self.config.base_url}/wp-admin/admin-ajax.php"
    params = {
        "action": "aipkit_frontend_chat_stream",
        "cache_key": cache_key,
        "bot_id": info.bot_id,
        "session_id": self._guest_uuid,
        "conversation_uuid": conv.conversation_id,
        "post_id": info.post_id,
        "_ts": str(int(time.time() * 1000)),
        "_ajax_nonce": info.nonce,
    }
    resp = self._session.get(
        f"{ajax_url}?{urlencode(params)}",
        headers={"Accept": "text/event-stream", "Cache-Control": "no-cache", "Connection": "keep-alive"},
        timeout=self.config.timeout_stream,
        stream=True,
    )
    status = resp.status_code
    if status != 200:
        # With stream=True the connection stays checked out until the body is
        # consumed or the response is closed. Release it before raising.
        resp.close()
        if status == 403:
            raise NonceExpiredError()
        raise StreamError(f"Stream HTTP {status}")
    return resp
def _execute_blocking(self, message: str, conv: Conversation) -> str:
    """Send *message* and return the complete reply as a single string.

    Caches the message, opens the SSE stream, and concatenates every text
    chunk until ``[DONE]`` or the end of the stream. The HTTP response is
    always closed before returning or raising.

    Raises:
        StreamError: When the stream emits an ``error`` event.
        EmptyResponseError: When no event arrives at all, or the assembled
            text is empty or whitespace only.
    """
    cache_key = self._cache_message(message)
    resp = self._open_stream(cache_key, conv)
    parts = []
    got_data = False
    try:
        for event_type, data_str in SSEParser.iter_events(
            resp, log_raw=self.config.log_sse_raw
        ):
            got_data = True
            if data_str.strip() == "[DONE]":
                break
            if event_type == "error":
                raise StreamError(f"SSE error: {data_str}")
            if event_type in CONTROL_EVENTS:
                continue
            chunk = SSEParser.extract_text(data_str)
            if chunk:
                parts.append(chunk)
    finally:
        # Breaking on [DONE] or raising leaves the body unread; close the
        # response so the connection is released.
        resp.close()

    if not got_data:
        raise EmptyResponseError()
    full_text = "".join(parts)
    if not full_text.strip():
        raise EmptyResponseError("Stream completed but empty")
    return full_text
def _execute_stream(self, message: str, conv: Conversation) -> Generator[str, None, None]:
    """Send *message* and yield reply text chunks as they arrive.

    Caches the message, opens the SSE stream, and yields each non-empty
    text chunk until ``[DONE]`` or the end of the stream. While the
    generator is active it is counted in ``metrics.active_streams``. The
    counter is decremented and the HTTP response closed when the generator
    finishes, fails, or is closed early by the consumer.

    Raises:
        StreamError: When the stream emits an ``error`` event.
    """
    cache_key = self._cache_message(message)
    resp = self._open_stream(cache_key, conv)
    metrics.active_streams += 1
    try:
        for event_type, data_str in SSEParser.iter_events(
            resp, log_raw=self.config.log_sse_raw
        ):
            if data_str.strip() == "[DONE]":
                break
            if event_type == "error":
                raise StreamError(f"SSE error: {data_str}")
            if event_type in CONTROL_EVENTS:
                continue
            chunk = SSEParser.extract_text(data_str)
            if chunk:
                yield chunk
    finally:
        try:
            # Release the connection even when the consumer abandons the
            # generator before the stream is exhausted.
            resp.close()
        finally:
            metrics.active_streams = max(0, metrics.active_streams - 1)
def get_status(self) -> Dict[str, Any]:
    """Return a JSON-serialisable snapshot of this client's state.

    The snapshot holds the app version, the session state name, selected
    session fields, the circuit-breaker state value, the rate limiter's
    available tokens (rounded to two decimals) and the global metrics dict.
    """
    status: Dict[str, Any] = {}
    status["version"] = VERSION
    status["state"] = self._state.name

    session_view: Dict[str, Any] = {}
    session_view["bot_id"] = self._session_info.bot_id
    # Expose only whether a nonce exists, never the nonce itself.
    session_view["has_nonce"] = bool(self._session_info.nonce)
    session_view["post_id"] = self._session_info.post_id
    session_view["age_seconds"] = round(self._session_info.age_seconds, 1)
    session_view["request_count"] = self._session_info.request_count
    session_view["error_count"] = self._session_info.error_count
    status["session"] = session_view

    status["circuit_breaker"] = self.circuit_breaker.state.value
    status["rate_limiter_tokens"] = round(self.rate_limiter.available_tokens, 2)
    status["metrics"] = metrics.to_dict()
    return status
# ══════════════════════════════════════════════════════════════
# SESSION POOL
# ══════════════════════════════════════════════════════════════
class SessionPool:
    """Fixed-size round-robin pool of ``ChatGPTGratuitClient`` instances."""

    def __init__(self, config: Config):
        """Create ``config.pool_size`` clients; none is initialised yet."""
        self.config = config
        self._clients: List[ChatGPTGratuitClient] = [
            ChatGPTGratuitClient(config) for _ in range(config.pool_size)
        ]
        self._index = 0
        self._lock = threading.Lock()

    def initialize_all(self) -> int:
        """Call ``init_session()`` on every client.

        A failure on one client is logged and does not stop the others.

        Returns:
            The number of clients whose ``init_session()`` returned truthy.
        """
        success = 0
        for i, client in enumerate(self._clients):
            try:
                if client.init_session():
                    success += 1
                    log.info(f"Pool client {i+1} initialized")
            except Exception as e:
                log.error(f"Pool client {i+1} init error: {e}")
        return success

    @contextmanager
    def acquire(self) -> Generator[ChatGPTGratuitClient, None, None]:
        """Context manager yielding the next usable client, round-robin.

        Starting at the rotating index, the first client that is ready or
        still uninitialised is chosen. If none qualifies, the first client
        in the pool is used as a fallback. Only the selection happens under
        the pool lock; the lock is released before the client is yielded, so
        a long request does not block other callers from acquiring.

        Raises:
            IndexError: If the pool holds no clients.
        """
        with self._lock:
            total = len(self._clients)
            if total == 0:
                raise IndexError("session pool has no clients")
            chosen = self._clients[0]  # fallback when nothing is usable
            for _ in range(total):
                candidate = self._clients[self._index % total]
                # Keep the cursor bounded instead of growing without limit.
                self._index = (self._index + 1) % total
                if candidate.is_ready or candidate.state == SessionState.UNINITIALIZED:
                    chosen = candidate
                    break
        yield chosen

    def get_status(self) -> Dict:
        """Return the pool size and each client's state name and request count."""
        return {
            "pool_size": len(self._clients),
            "clients": [
                {"index": i, "state": c.state.name, "requests": c.session_info.request_count}
                for i, c in enumerate(self._clients)
            ],
        }
# ══════════════════════════════════════════════════════════════
# FLASK APP FACTORY
# ══════════════════════════════════════════════════════════════
def create_app(config: Optional[Config] = None) -> Tuple:
    """Build the Flask application and its backing session pool.

    Creates a ``SessionPool``, attempts to initialise every client, and
    registers the HTTP endpoints, the CORS hook (when ``config.enable_cors``)
    and the error handlers on the app.

    Args:
        config: Service configuration; ``Config.from_env()`` is used when
            omitted.

    Returns:
        A ``(app, pool)`` tuple.
    """
    from flask import Flask, request as freq, jsonify, Response, stream_with_context

    config = config or Config.from_env()
    app = Flask(APP_NAME)
    pool = SessionPool(config)
    ready_count = pool.initialize_all()
    if ready_count == 0:
        log.warning("No pool clients initialized! Will retry on first request.")

    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}

    def _json_body() -> Dict[str, Any]:
        # Treat a missing, malformed or non-object JSON body as an empty dict.
        body = freq.get_json(force=True, silent=True)
        return body if isinstance(body, dict) else {}

    def _message_of(body: Dict[str, Any]) -> str:
        # A null or non-string "message" is handled like a missing one.
        value = body.get("message", "")
        return value.strip() if isinstance(value, str) else ""

    if config.enable_cors:
        @app.after_request
        def add_cors(response):
            response.headers["Access-Control-Allow-Origin"] = "*"
            response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization"
            response.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"
            return response

    @app.errorhandler(ChatGPTGratuitError)
    def handle_api_error(e: ChatGPTGratuitError):
        status_map = {
            ErrorCode.RATE_LIMITED: 429,
            ErrorCode.INVALID_INPUT: 400,
            ErrorCode.CIRCUIT_OPEN: 503,
        }
        return jsonify({"ok": False, **e.to_dict()}), status_map.get(e.code, 500)

    @app.errorhandler(404)
    def not_found(e):
        return jsonify({
            "ok": False, "error": "Endpoint not found",
            "endpoints": [
                "POST /chat", "POST /chat/stream", "POST /v1/chat/completions",
                "POST /new", "POST /refresh", "GET /health", "GET /metrics",
            ],
        }), 404

    # ── Landing page ──
    @app.route("/", methods=["GET"])
    def index():
        return jsonify({
            "name": APP_NAME,
            "version": VERSION,
            "status": "running",
            "endpoints": {
                "chat": "POST /chat",
                "stream": "POST /chat/stream",
                "openai_compat": "POST /v1/chat/completions",
                "health": "GET /health",
                "metrics": "GET /metrics",
                "new_conversation": "POST /new",
                "refresh_session": "POST /refresh",
                "conversations": "GET /conversations",
            },
            "usage_example": {
                "curl": 'curl -X POST /chat -H "Content-Type: application/json" -d \'{"message": "Hello!"}\'',
            },
        })

    # ── POST /chat ──
    @app.route("/chat", methods=["POST"])
    def chat():
        data = _json_body()
        message = _message_of(data)
        if not message:
            return jsonify({"ok": False, "error": "Field 'message' is required"}), 400
        with pool.acquire() as client:
            if data.get("new_conversation"):
                client.new_conversation(data.get("system_prompt"))
            response_text = client.send_message(message, system_prompt=data.get("system_prompt"))
            return jsonify({
                "ok": True,
                "response": response_text,
                "conversation_id": client.active_conversation.conversation_id,
                "usage": {
                    "message_count": len(client.active_conversation.messages),
                    "estimated_tokens": client.active_conversation.total_tokens,
                },
            })

    # ── POST /chat/stream ──
    @app.route("/chat/stream", methods=["POST"])
    def chat_stream():
        data = _json_body()
        message = _message_of(data)
        if not message:
            return jsonify({"ok": False, "error": "Field 'message' is required"}), 400
        with pool.acquire() as client:
            if data.get("new_conversation"):
                client.new_conversation(data.get("system_prompt"))

            def generate():
                # Errors are reported in-band: the HTTP status is already
                # sent by the time the upstream stream can fail.
                try:
                    for chunk in client.send_message(message, stream=True):
                        yield f"data: {json.dumps({'chunk': chunk}, ensure_ascii=False)}\n\n"
                    yield "data: [DONE]\n\n"
                except ChatGPTGratuitError as e:
                    yield f"data: {json.dumps(e.to_dict())}\n\n"
                except Exception as e:
                    yield f"data: {json.dumps({'error': str(e)})}\n\n"

            return Response(
                stream_with_context(generate()),
                content_type="text/event-stream",
                headers=dict(sse_headers),
            )

    # ── POST /v1/chat/completions (OpenAI-compatible) ──
    if config.enable_openai_compat:
        @app.route("/v1/chat/completions", methods=["POST"])
        def openai_compat():
            data = _json_body()
            messages = data.get("messages", [])
            do_stream = data.get("stream", False)
            if not messages or not isinstance(messages, list):
                return jsonify({"error": {"message": "messages required"}}), 400

            # The last user message and the last system message win.
            user_msg = None
            system_prompt = None
            for msg in messages:
                if not isinstance(msg, dict):
                    continue
                if msg.get("role") == "user":
                    user_msg = msg.get("content", "")
                if msg.get("role") == "system":
                    system_prompt = msg.get("content")
            if not user_msg:
                return jsonify({"error": {"message": "No user message found"}}), 400

            response_id = f"chatcmpl-{uuid.uuid4().hex[:24]}"
            created = int(time.time())

            def chunk_event(delta, finish_reason):
                # One OpenAI-style "chat.completion.chunk" SSE frame.
                payload = {
                    "id": response_id,
                    "object": "chat.completion.chunk",
                    "created": created,
                    "model": "chatgpt-gratuit",
                    "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
                }
                return f"data: {json.dumps(payload)}\n\n"

            with pool.acquire() as client:
                if do_stream:
                    def generate():
                        try:
                            for chunk in client.send_message(
                                user_msg, stream=True, system_prompt=system_prompt
                            ):
                                yield chunk_event({"content": chunk}, None)
                            yield chunk_event({}, "stop")
                            yield "data: [DONE]\n\n"
                        except Exception as e:
                            yield f"data: {json.dumps({'error': {'message': str(e)}})}\n\n"

                    return Response(
                        stream_with_context(generate()),
                        content_type="text/event-stream",
                        headers=dict(sse_headers),
                    )

                response_text = client.send_message(user_msg, system_prompt=system_prompt)
                return jsonify({
                    "id": response_id,
                    "object": "chat.completion",
                    "created": created,
                    "model": "chatgpt-gratuit",
                    "choices": [{
                        "index": 0,
                        "message": {"role": "assistant", "content": response_text},
                        "finish_reason": "stop",
                    }],
                    # Rough estimate: about four characters per token.
                    "usage": {
                        "prompt_tokens": len(user_msg) // 4,
                        "completion_tokens": len(response_text) // 4,
                        "total_tokens": (len(user_msg) + len(response_text)) // 4,
                    },
                })

    # ── POST /new ──
    @app.route("/new", methods=["POST"])
    def new_conv():
        data = _json_body()
        with pool.acquire() as client:
            conv = client.new_conversation(data.get("system_prompt"))
            return jsonify({"ok": True, "conversation_id": conv.conversation_id})

    # ── POST /refresh ──
    @app.route("/refresh", methods=["POST"])
    def refresh():
        with pool.acquire() as client:
            result = client.refresh_nonce()
            nonce = client.session_info.nonce
            return jsonify({
                "ok": result,
                # Only a short prefix of the nonce is ever exposed.
                "nonce_prefix": nonce[:8] + "..." if nonce else None,
            })

    # ── GET /health ──
    @app.route("/health", methods=["GET"])
    def health():
        with pool.acquire() as client:
            status = client.get_status()
            status["pool"] = pool.get_status()
            return jsonify(status), 200 if client.is_ready else 503

    # ── GET /metrics ──
    if config.enable_metrics:
        @app.route("/metrics", methods=["GET"])
        def metrics_endpoint():
            return jsonify(metrics.to_dict())

    # ── GET /conversations ──
    @app.route("/conversations", methods=["GET"])
    def conversations():
        with pool.acquire() as client:
            return jsonify({
                "ok": True,
                "conversations": client.list_conversations(),
                "active": client._active_conversation_id,
            })

    # ── GET /conversation/<id>/messages ──
    @app.route("/conversation/<conv_id>/messages", methods=["GET"])
    def conversation_messages(conv_id: str):
        with pool.acquire() as client:
            conv = client.get_conversation(conv_id)
            if not conv:
                return jsonify({"ok": False, "error": "Conversation not found"}), 404
            return jsonify({
                "ok": True,
                "conversation": conv.to_dict(),
                "messages": [
                    {"role": m.role, "content": m.content, "timestamp": m.timestamp, "message_id": m.message_id}
                    for m in conv.messages
                ],
            })

    return app, pool
# ══════════════════════════════════════════════════════════════
# GUNICORN ENTRY POINT + DIRECT RUN
# ══════════════════════════════════════════════════════════════
# Built at import time so a WSGI server can import this module and serve
# ``application`` directly.
config = Config.from_env()
config.validate()

# Gunicorn looks for 'application'
application, _pool = create_app(config)


def _startup_banner(host: str, port: int) -> str:
    """Return the boxed startup banner printed when run as a script.

    Rows are padded programmatically so the box stays closed whatever the
    length of the version string, host, or port.
    """
    rows = [
        f"  🚀 ChatGPT Gratuit API v{VERSION}",
        f"  Server: http://{host}:{port}",
        "",
        "  POST /chat                 → Complete response",
        "  POST /chat/stream          → Streaming SSE",
        "  POST /v1/chat/completions  → OpenAI-compatible",
        "  GET  /health               → Health check",
        "  GET  /metrics              → Metrics",
    ]
    # Widen the box if any row (for example a long host name) needs it.
    width = max(62, max(len(row) for row in rows) + 2)
    lines = ["╔" + "═" * width + "╗"]
    lines.extend("║" + row.ljust(width) + "║" for row in rows)
    lines.append("╚" + "═" * width + "╝")
    return "\n" + "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(_startup_banner(config.host, config.port))
    application.run(host=config.host, port=config.port, debug=config.debug, threaded=True)