# Source: grok2api / app/core/proxy_pool.py
# Commit 2c97e18 ("Initial commit") by tejmar
"""Proxy pool manager - fetch proxy IPs dynamically from a URL"""
import asyncio
import aiohttp
import time
from typing import Optional, List
from app.core.logger import logger
class ProxyPool:
    """Proxy pool manager.

    Resolves the outbound proxy: either a fixed (static) proxy, or one fetched
    periodically from a proxy-pool HTTP API. When the pool is enabled and a
    fetch fails, the last good pool proxy is kept, otherwise the static proxy
    (if any) is used.
    """

    # Scheme prefixes accepted by _validate_proxy.
    _VALID_PROTOCOLS = ("http://", "https://", "socks5://", "socks5h://")

    def __init__(self):
        self._pool_url: Optional[str] = None
        self._static_proxy: Optional[str] = None
        self._current_proxy: Optional[str] = None
        self._last_fetch_time: float = 0
        self._fetch_interval: int = 300  # Refresh every 5 minutes
        self._enabled: bool = False
        # Serializes pool fetches so concurrent callers don't all hit the pool API.
        self._lock = asyncio.Lock()

    def configure(self, proxy_url: str, proxy_pool_url: str = "", proxy_pool_interval: int = 300):
        """Configure (or reconfigure) the proxy pool.

        Any proxy cached from a previous configuration is discarded, so a
        reconfigure takes effect on the next lookup.

        Args:
            proxy_url: Static proxy URL (socks5h://xxx or http://xxx); may be empty.
            proxy_pool_url: Proxy pool API URL whose response body is a proxy
                address. If it looks like a socks proxy address itself, it is
                used as the static proxy (when proxy_url is empty) or ignored.
            proxy_pool_interval: Proxy pool refresh interval (seconds).
        """
        self._static_proxy = self._normalize_proxy(proxy_url) if proxy_url else None
        pool_url = proxy_pool_url.strip() if proxy_pool_url else None
        if pool_url and self._looks_like_proxy_url(pool_url):
            normalized_proxy = self._normalize_proxy(pool_url)
            if not self._static_proxy:
                self._static_proxy = normalized_proxy
                logger.warning("[ProxyPool] proxy_pool_url looks like a proxy address; using as static proxy. Use proxy_url instead.")
            else:
                logger.warning("[ProxyPool] proxy_pool_url looks like a proxy address; ignored (using proxy_url).")
            pool_url = None
        self._pool_url = pool_url
        self._fetch_interval = proxy_pool_interval
        self._enabled = bool(self._pool_url)
        # Drop state from any previous configuration so stale proxies are not reused.
        self._current_proxy = None
        self._last_fetch_time = 0
        if self._enabled:
            logger.info(f"[ProxyPool] Proxy pool enabled: {self._mask_credentials(self._pool_url)}, refresh interval: {self._fetch_interval}s")
        elif self._static_proxy:
            logger.info(f"[ProxyPool] Using static proxy: {self._mask_credentials(self._static_proxy)}")
            self._current_proxy = self._static_proxy
        else:
            logger.info("[ProxyPool] No proxy configured")

    async def get_proxy(self) -> Optional[str]:
        """Get the proxy address to use for an outbound request.

        With the pool disabled, returns the static proxy. Otherwise fetches a
        new proxy when none is cached or the refresh interval has elapsed.

        Returns:
            Proxy URL or None
        """
        if not self._enabled:
            return self._static_proxy
        if self._needs_refresh():
            async with self._lock:
                # Re-check with a fresh clock: another coroutine may have
                # refreshed while this one was waiting for the lock.
                if self._needs_refresh():
                    await self._fetch_proxy()
        return self._current_proxy

    async def force_refresh(self) -> Optional[str]:
        """Force refresh proxy (for 403 retry)

        Returns:
            New proxy URL or None
        """
        if not self._enabled:
            return self._static_proxy
        async with self._lock:
            await self._fetch_proxy()
        return self._current_proxy

    def _needs_refresh(self) -> bool:
        """Return True when no proxy is cached or the refresh interval has elapsed."""
        if not self._current_proxy:
            return True
        return (time.time() - self._last_fetch_time) >= self._fetch_interval

    async def _fetch_proxy(self):
        """Fetch a new proxy from the proxy pool URL.

        On success, stores the proxy and the fetch time. On any failure (non-200
        status, timeout, other exception, malformed body), logs it and calls
        _use_fallback().
        """
        try:
            logger.debug(f"[ProxyPool] Fetching new proxy from pool: {self._mask_credentials(self._pool_url)}")
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(self._pool_url) as response:
                    if response.status != 200:
                        logger.error(f"[ProxyPool] Failed to fetch proxy: HTTP {response.status}")
                        self._use_fallback()
                        return
                    proxy_text = await response.text()
        except asyncio.TimeoutError:
            logger.error("[ProxyPool] Proxy fetch timed out")
            self._use_fallback()
            return
        except Exception as e:
            logger.error(f"[ProxyPool] Proxy fetch error: {e}")
            self._use_fallback()
            return

        # Pools may return several proxies, one per line; take the first one.
        proxy = self._normalize_proxy(self._extract_proxy_line(proxy_text))
        if not self._validate_proxy(proxy):
            logger.error(f"[ProxyPool] Invalid proxy format: {self._mask_credentials(proxy)}")
            self._use_fallback()
            return
        self._current_proxy = proxy
        self._last_fetch_time = time.time()
        logger.info(f"[ProxyPool] Successfully fetched new proxy: {self._mask_credentials(proxy)}")

    def _use_fallback(self):
        """Keep a usable proxy after a failed fetch and defer the next attempt.

        Keeps the last good pool proxy if there is one, otherwise switches to
        the static proxy. When that leaves a usable proxy, the fetch timestamp
        is advanced so the pool is retried at the next refresh interval (or via
        force_refresh) instead of on every get_proxy call, each of which would
        otherwise wait on the lock for a fetch. With no usable proxy at all,
        the next get_proxy call retries immediately.
        """
        if not self._current_proxy:
            self._current_proxy = self._static_proxy
        if self._current_proxy:
            self._last_fetch_time = time.time()

    @staticmethod
    def _extract_proxy_line(text: str) -> str:
        """Return the first non-blank line of a pool response, stripped ("" if none)."""
        return next((line.strip() for line in text.splitlines() if line.strip()), "")

    def _validate_proxy(self, proxy: str) -> bool:
        """Validate proxy format.

        Args:
            proxy: Proxy URL

        Returns:
            True if the URL has a supported scheme, a non-empty host, and no
            whitespace.
        """
        if not proxy or any(ch.isspace() for ch in proxy):
            return False
        if not proxy.startswith(self._VALID_PROTOCOLS):
            return False
        # Authority = text after "://" up to the first "/", minus any "user:pass@".
        authority = proxy.split("://", 1)[1].split("/", 1)[0]
        host = authority.rsplit("@", 1)[-1]
        return bool(host) and not host.startswith(":")

    def _normalize_proxy(self, proxy: str) -> str:
        """Normalize a proxy URL.

        Strips whitespace, fixes the "sock5"/"sock5h" typo, and upgrades
        socks5:// to socks5h:// (empty/None input is returned unchanged).
        """
        if not proxy:
            return proxy
        proxy = proxy.strip()
        if proxy.startswith("sock5h://"):
            proxy = proxy.replace("sock5h://", "socks5h://", 1)
        if proxy.startswith("sock5://"):
            proxy = proxy.replace("sock5://", "socks5://", 1)
        if proxy.startswith("socks5://"):
            return proxy.replace("socks5://", "socks5h://", 1)
        return proxy

    def _looks_like_proxy_url(self, url: str) -> bool:
        """Check if URL looks like a proxy address (avoid mistaking pool API for proxy)"""
        return url.startswith(("sock5://", "sock5h://", "socks5://", "socks5h://"))

    @staticmethod
    def _mask_credentials(url: Optional[str]) -> Optional[str]:
        """Return url with any "user:pass@" userinfo replaced by "***@", for logging."""
        if not url:
            return url
        scheme, sep, rest = url.partition("://")
        if not sep:
            return url
        netloc, slash, path = rest.partition("/")
        if "@" not in netloc:
            return url
        host = netloc.rsplit("@", 1)[1]
        return f"{scheme}://***@{host}{slash}{path}"

    def get_current_proxy(self) -> Optional[str]:
        """Get current proxy (sync, no refresh)

        Returns:
            Current proxy URL, else the static proxy, else None
        """
        return self._current_proxy or self._static_proxy
# Shared, process-wide proxy pool. Until configure() is called it has no pool
# URL and no static proxy, so lookups resolve to None.
proxy_pool: ProxyPool = ProxyPool()