"""Proxy pool manager - fetch proxy IPs dynamically from a URL"""

import asyncio
import aiohttp
import time
from typing import Optional, List

from app.core.logger import logger


class ProxyPool:
    """Proxy pool manager.

    Serves either a static proxy (``proxy_url``) or, when a pool API URL is
    configured, a proxy fetched from that API and refreshed every
    ``_fetch_interval`` seconds. When a pool fetch fails and no proxy is
    currently set, the static proxy is used as a fallback.
    """

    def __init__(self):
        self._pool_url: Optional[str] = None
        self._static_proxy: Optional[str] = None
        self._current_proxy: Optional[str] = None
        self._last_fetch_time: float = 0
        self._fetch_interval: int = 300  # Refresh every 5 minutes
        self._enabled: bool = False
        # Serializes pool fetches so concurrent callers don't all hit the API.
        self._lock = asyncio.Lock()

    def configure(self, proxy_url: str, proxy_pool_url: str = "", proxy_pool_interval: int = 300):
        """Configure proxy pool.

        Args:
            proxy_url: Static proxy URL (socks5h://xxx or http://xxx).
            proxy_pool_url: Proxy pool API URL returning a single proxy address.
                If it looks like a proxy address itself (socks5 scheme), it is
                used as the static proxy (when none is set) or ignored.
            proxy_pool_interval: Proxy pool refresh interval (seconds).
        """
        # Blank / whitespace-only values mean "not configured" -> None.
        self._static_proxy = self._normalize_proxy(proxy_url or "") or None
        pool_url = (proxy_pool_url or "").strip() or None

        if pool_url and self._looks_like_proxy_url(pool_url):
            normalized_proxy = self._normalize_proxy(pool_url)
            if not self._static_proxy:
                self._static_proxy = normalized_proxy
                logger.warning("[ProxyPool] proxy_pool_url looks like a proxy address; using as static proxy. Use proxy_url instead.")
            else:
                logger.warning("[ProxyPool] proxy_pool_url looks like a proxy address; ignored (using proxy_url).")
            pool_url = None

        previous_pool_url = self._pool_url
        self._pool_url = pool_url
        self._fetch_interval = proxy_pool_interval
        self._enabled = bool(self._pool_url)

        if self._enabled:
            # A proxy obtained from a different pool must not keep being served
            # under the new configuration: force a fetch on the next get_proxy().
            if self._pool_url != previous_pool_url:
                self._current_proxy = None
                self._last_fetch_time = 0
            logger.info(f"[ProxyPool] Proxy pool enabled: {self._pool_url}, refresh interval: {self._fetch_interval}s")
        else:
            # Without a pool the current proxy is exactly the static one (or None),
            # never a leftover from a previous configuration.
            self._current_proxy = self._static_proxy
            if self._static_proxy:
                logger.info(f"[ProxyPool] Using static proxy: {self._static_proxy}")
            else:
                logger.info("[ProxyPool] No proxy configured")

    def _needs_refresh(self, now: float) -> bool:
        """Return True if no proxy is cached or the refresh interval has elapsed."""
        return not self._current_proxy or (now - self._last_fetch_time) >= self._fetch_interval

    async def get_proxy(self) -> Optional[str]:
        """Get proxy address.

        Returns:
            The static proxy when the pool is disabled; otherwise the current
            pool proxy, fetched first if none is cached or the refresh interval
            has elapsed. May be None.
        """
        # If proxy pool is disabled, return static proxy
        if not self._enabled:
            return self._static_proxy

        if self._needs_refresh(time.time()):
            async with self._lock:
                # Re-check under the lock: another coroutine may have refreshed
                # the proxy while this one was waiting.
                if self._needs_refresh(time.time()):
                    await self._fetch_proxy()
        return self._current_proxy

    async def force_refresh(self) -> Optional[str]:
        """Force refresh proxy (for 403 retry).

        Returns:
            New proxy URL, the previous proxy if the fetch failed, the static
            proxy when the pool is disabled, or None.
        """
        if not self._enabled:
            return self._static_proxy

        async with self._lock:
            await self._fetch_proxy()
        return self._current_proxy

    def _fallback_to_static(self):
        """Use the static proxy if no proxy is currently set."""
        if not self._current_proxy:
            self._current_proxy = self._static_proxy

    @staticmethod
    def _first_line(text: str) -> str:
        """Return the first non-empty line of *text*, stripped ("" if none)."""
        return next((line.strip() for line in text.splitlines() if line.strip()), "")

    async def _fetch_proxy(self):
        """Fetch a new proxy from the proxy pool URL.

        On success, sets ``_current_proxy`` and ``_last_fetch_time``. On any
        failure (non-200 status, invalid body, timeout, other error) it logs an
        error, keeps the previous proxy and falls back to the static proxy if
        there is none. Failures do not advance ``_last_fetch_time``, so the
        next ``get_proxy()`` call will retry the fetch.
        """
        try:
            logger.debug(f"[ProxyPool] Fetching new proxy from pool: {self._pool_url}")
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(self._pool_url) as response:
                    if response.status != 200:
                        logger.error(f"[ProxyPool] Failed to fetch proxy: HTTP {response.status}")
                        self._fallback_to_static()
                        return
                    proxy_text = await response.text()
        except asyncio.TimeoutError:
            logger.error("[ProxyPool] Proxy fetch timed out")
            self._fallback_to_static()
            return
        except Exception as e:
            logger.error(f"[ProxyPool] Proxy fetch error: {e}")
            self._fallback_to_static()
            return

        # The API is expected to return a single address; if it returns several
        # lines, use the first one rather than a string with embedded newlines.
        proxy = self._normalize_proxy(self._first_line(proxy_text))

        # Validate proxy format
        if self._validate_proxy(proxy):
            self._current_proxy = proxy
            self._last_fetch_time = time.time()
            logger.info(f"[ProxyPool] Successfully fetched new proxy: {proxy}")
        else:
            logger.error(f"[ProxyPool] Invalid proxy format: {proxy}")
            self._fallback_to_static()

    def _validate_proxy(self, proxy: str) -> bool:
        """Validate proxy format.

        Args:
            proxy: Proxy URL.

        Returns:
            True if the proxy is non-empty and uses a supported scheme
            (http, https, socks5, socks5h).
        """
        if not proxy:
            return False
        return proxy.startswith(("http://", "https://", "socks5://", "socks5h://"))

    def _normalize_proxy(self, proxy: str) -> str:
        """Normalize proxy URL.

        Strips whitespace, fixes the common ``sock5``/``sock5h`` typo, and
        rewrites ``socks5://`` to ``socks5h://``. Empty input is returned as-is.
        """
        if not proxy:
            return proxy
        proxy = proxy.strip()
        if proxy.startswith("sock5h://"):
            proxy = proxy.replace("sock5h://", "socks5h://", 1)
        if proxy.startswith("sock5://"):
            proxy = proxy.replace("sock5://", "socks5://", 1)
        if proxy.startswith("socks5://"):
            return proxy.replace("socks5://", "socks5h://", 1)
        return proxy

    def _looks_like_proxy_url(self, url: str) -> bool:
        """Check if URL looks like a proxy address (avoid mistaking pool API for proxy)."""
        return url.startswith(("sock5://", "sock5h://", "socks5://", "socks5h://"))

    def get_current_proxy(self) -> Optional[str]:
        """Get current proxy (sync).

        Returns:
            The current proxy, else the static proxy, else None.
        """
        return self._current_proxy or self._static_proxy


# Global proxy pool instance
proxy_pool = ProxyPool()