| """Proxy pool manager - fetch proxy IPs dynamically from a URL""" |
|
|
| import asyncio |
| import aiohttp |
| import time |
| from typing import Optional, List |
| from app.core.logger import logger |
|
|
|
|
class ProxyPool:
    """Proxy pool manager.

    Resolves the proxy to use from one of two sources:

    * a *static* proxy (``proxy_url``), used as-is; or
    * a *pool* API (``proxy_pool_url``) whose response body is a proxy
      address. The fetched address is cached and re-fetched once
      ``proxy_pool_interval`` seconds have elapsed, or on demand via
      ``force_refresh``.

    When the pool is enabled but a fetch fails, the previously fetched proxy
    is kept; if there is none yet, the static proxy (possibly ``None``) is
    used instead.
    """

    # URL scheme prefixes accepted for a proxy address.
    _VALID_SCHEMES = ("http://", "https://", "socks5://", "socks5h://")

    def __init__(self):
        self._pool_url: Optional[str] = None
        self._static_proxy: Optional[str] = None
        self._current_proxy: Optional[str] = None
        # Timestamps come from time.monotonic() so wall-clock jumps cannot
        # stall or force refreshes; -inf means "never".
        self._last_fetch_time: float = float("-inf")  # last successful fetch
        self._last_attempt_time: float = float("-inf")  # last fetch attempt, any outcome
        self._fetch_interval: int = 300
        self._enabled: bool = False
        self._lock = asyncio.Lock()

    def configure(self, proxy_url: str, proxy_pool_url: str = "", proxy_pool_interval: int = 300):
        """Configure (or reconfigure) the proxy pool.

        Any proxy cached from a previous configuration is discarded, so the
        new settings take effect immediately.

        Args:
            proxy_url: Static proxy URL (socks5h://xxx or http://xxx); empty
                or blank means "no static proxy".
            proxy_pool_url: Proxy pool API URL returning a single proxy address.
                A value starting with a sock5/socks5 scheme is treated as a
                misplaced proxy address, not as a pool API.
            proxy_pool_interval: Proxy pool refresh interval (seconds)
        """
        self._static_proxy = self._normalize_proxy(proxy_url or "") or None
        pool_url = (proxy_pool_url or "").strip() or None
        if pool_url and self._looks_like_proxy_url(pool_url):
            # A SOCKS address was put in the pool-URL field by mistake.
            normalized_proxy = self._normalize_proxy(pool_url)
            if not self._static_proxy:
                self._static_proxy = normalized_proxy
                logger.warning("[ProxyPool] proxy_pool_url looks like a proxy address; using as static proxy. Use proxy_url instead.")
            else:
                logger.warning("[ProxyPool] proxy_pool_url looks like a proxy address; ignored (using proxy_url).")
            pool_url = None
        self._pool_url = pool_url
        self._fetch_interval = proxy_pool_interval
        self._enabled = bool(self._pool_url)

        # Drop state left over from a previous configuration: with the pool
        # enabled the next get_proxy() fetches from the new pool; otherwise
        # the current proxy is simply the static one (or None).
        self._current_proxy = None if self._enabled else self._static_proxy
        self._last_fetch_time = float("-inf")
        self._last_attempt_time = float("-inf")

        if self._enabled:
            logger.info(f"[ProxyPool] Proxy pool enabled: {self._pool_url}, refresh interval: {self._fetch_interval}s")
        elif self._static_proxy:
            logger.info(f"[ProxyPool] Using static proxy: {self._static_proxy}")
        else:
            logger.info("[ProxyPool] No proxy configured")

    async def get_proxy(self) -> Optional[str]:
        """Get proxy address, fetching a new one from the pool when due.

        Returns:
            Proxy URL or None
        """
        if not self._enabled:
            return self._static_proxy

        now = time.monotonic()
        if self._needs_refresh(now):
            async with self._lock:
                # Re-check under the lock. Also skip the fetch if another
                # coroutine refreshed -- or tried and failed -- while we were
                # waiting; otherwise every queued caller would repeat a
                # failing fetch (up to the 10 s timeout each) one by one.
                if self._needs_refresh(now) and self._last_attempt_time < now:
                    await self._fetch_proxy()

        return self._current_proxy

    def _needs_refresh(self, now: float) -> bool:
        """Return True if there is no current proxy or the refresh interval has elapsed at *now*."""
        return not self._current_proxy or (now - self._last_fetch_time) >= self._fetch_interval

    async def force_refresh(self) -> Optional[str]:
        """Force refresh proxy (for 403 retry).

        Always performs a fetch when the pool is enabled, regardless of the
        refresh interval.

        Returns:
            New proxy URL or None
        """
        if not self._enabled:
            return self._static_proxy

        async with self._lock:
            await self._fetch_proxy()

        return self._current_proxy

    async def _fetch_proxy(self):
        """Fetch a new proxy from the proxy pool URL and make it current.

        Only the first non-blank line of the response body is used. On any
        failure (non-200 status, invalid address, timeout, other error) the
        previous proxy is kept, falling back to the static proxy if there is
        none. Callers hold ``self._lock``.
        """
        try:
            logger.debug(f"[ProxyPool] Fetching new proxy from pool: {self._pool_url}")

            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(self._pool_url) as response:
                    if response.status != 200:
                        logger.error(f"[ProxyPool] Failed to fetch proxy: HTTP {response.status}")
                        return
                    proxy_text = await response.text()

            proxy = self._normalize_proxy(self._first_line(proxy_text))
            if self._validate_proxy(proxy):
                self._current_proxy = proxy
                self._last_fetch_time = time.monotonic()
                logger.info(f"[ProxyPool] Successfully fetched new proxy: {proxy}")
            else:
                logger.error(f"[ProxyPool] Invalid proxy format: {proxy}")

        except asyncio.TimeoutError:
            logger.error("[ProxyPool] Proxy fetch timed out")

        except Exception as e:
            logger.error(f"[ProxyPool] Proxy fetch error: {e}")

        finally:
            # Runs on every outcome, including the early return above.
            self._last_attempt_time = time.monotonic()
            if not self._current_proxy:
                self._current_proxy = self._static_proxy

    @staticmethod
    def _first_line(text: str) -> str:
        """Return the first non-blank line of *text*, stripped ("" if there is none)."""
        for line in text.splitlines():
            line = line.strip()
            if line:
                return line
        return ""

    def _validate_proxy(self, proxy: str) -> bool:
        """Validate proxy format.

        Args:
            proxy: Proxy URL

        Returns:
            True if the URL uses a supported scheme, contains no whitespace,
            has a host, and any port given is numeric and in range.
        """
        from urllib.parse import urlsplit  # stdlib; imported locally

        if not proxy or not proxy.startswith(self._VALID_SCHEMES):
            return False
        if any(ch.isspace() for ch in proxy):
            return False
        try:
            parts = urlsplit(proxy)
            _ = parts.port  # raises ValueError for a non-numeric or out-of-range port
        except ValueError:
            return False
        return bool(parts.hostname)

    def _normalize_proxy(self, proxy: str) -> str:
        """Normalize proxy URL (sock5/socks5 -> socks5h://).

        Strips surrounding whitespace, fixes the common "sock5" typo and
        upgrades socks5:// to socks5h:// (remote DNS resolution).
        """
        if not proxy:
            return proxy

        proxy = proxy.strip()
        if proxy.startswith("sock5h://"):
            proxy = proxy.replace("sock5h://", "socks5h://", 1)
        if proxy.startswith("sock5://"):
            proxy = proxy.replace("sock5://", "socks5://", 1)
        if proxy.startswith("socks5://"):
            return proxy.replace("socks5://", "socks5h://", 1)
        return proxy

    def _looks_like_proxy_url(self, url: str) -> bool:
        """Check if URL looks like a proxy address (avoid mistaking pool API for proxy)."""
        return url.startswith(("sock5://", "sock5h://", "socks5://", "socks5h://"))

    def get_current_proxy(self) -> Optional[str]:
        """Get current proxy (sync); never triggers a fetch.

        Returns:
            Current proxy URL or None
        """
        return self._current_proxy or self._static_proxy
|
|
|
|
| |
# Shared, module-level proxy pool instance. Until configure() is called it
# reports no proxy.
proxy_pool: ProxyPool = ProxyPool()
|
|