| | """ |
| | Unified Async API Client - Replace mixed sync/async HTTP calls |
| | Implements retry logic, error handling, and logging consistently |
| | """ |
| |
|
| | import aiohttp |
| | import asyncio |
| | import logging |
| | from typing import Optional, Dict, Any, List |
| | from datetime import datetime, timedelta |
| | import traceback |
| |
|
| | import config |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
class AsyncAPIClient:
    """
    Unified async HTTP client with retry logic and error handling.

    Replaces mixed requests/aiohttp calls throughout the codebase. Must be
    used as an async context manager so the underlying aiohttp session is
    opened and closed deterministically:

        async with AsyncAPIClient() as client:
            data = await client.get(url)
    """

    # Client-side HTTP errors that will never succeed on retry — fail fast.
    _NON_RETRYABLE_STATUSES = frozenset({400, 401, 403, 404})

    def __init__(
        self,
        timeout: int = config.REQUEST_TIMEOUT,
        max_retries: int = config.MAX_RETRIES,
        retry_delay: float = 2.0
    ):
        """
        Initialize async API client.

        Args:
            timeout: Total per-request timeout in seconds.
            max_retries: Maximum number of attempts per request.
            retry_delay: Base delay between retries; the actual delay grows
                exponentially as ``retry_delay * 2 ** attempt``.
        """
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        """Async context manager entry: open the aiohttp session."""
        self._session = aiohttp.ClientSession(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close and drop the session."""
        if self._session:
            await self._session.close()
            # Reset so a request made after exit raises RuntimeError instead
            # of silently using a closed session.
            self._session = None

    async def _request(
        self,
        method: str,
        url: str,
        **kwargs: Any
    ) -> Optional[Dict[str, Any]]:
        """
        Issue an HTTP request with retry and exponential backoff.

        Shared core for get()/post(): retries transient failures (retryable
        HTTP errors, connection errors, timeouts), fails fast on
        non-retryable client errors and on unexpected exceptions, which are
        treated as bugs rather than transient conditions.

        Args:
            method: HTTP method name, e.g. "GET" or "POST".
            url: Request URL.
            **kwargs: Passed through to aiohttp (params, data, json, headers).

        Returns:
            Decoded JSON response as a dictionary, or None on failure.

        Raises:
            RuntimeError: If called outside the async context manager.
        """
        if not self._session:
            raise RuntimeError("Client must be used as async context manager")

        for attempt in range(self.max_retries):
            try:
                logger.debug(f"{method} {url} (attempt {attempt + 1}/{self.max_retries})")

                async with self._session.request(method, url, **kwargs) as response:
                    response.raise_for_status()
                    payload = await response.json()
                    logger.debug(f"{method} {url} successful")
                    return payload

            except aiohttp.ClientResponseError as e:
                logger.warning(f"HTTP {e.status} error on {url}: {e.message}")
                if e.status in self._NON_RETRYABLE_STATUSES:
                    # Bad request / auth failure / not found: retrying is futile.
                    return None

            except aiohttp.ClientConnectionError as e:
                logger.warning(f"Connection error on {url}: {e}")

            except asyncio.TimeoutError:
                logger.warning(f"Timeout on {url} (attempt {attempt + 1})")

            except Exception as e:
                # Anything else is a programming error, not a transient
                # failure: log with traceback and do not retry.
                logger.error(f"Unexpected error on {url}: {e}\n{traceback.format_exc()}")
                return None

            # Transient failure: back off exponentially before the next try.
            if attempt < self.max_retries - 1:
                await asyncio.sleep(self.retry_delay * (2 ** attempt))

        return None

    async def get(
        self,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Make async GET request with retry logic.

        Args:
            url: Request URL.
            params: Query parameters.
            headers: HTTP headers.

        Returns:
            JSON response as dictionary or None on failure.
        """
        return await self._request("GET", url, params=params, headers=headers)

    async def post(
        self,
        url: str,
        data: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Make async POST request with retry logic.

        Args:
            url: Request URL.
            data: Form data.
            json: JSON payload.
            headers: HTTP headers.

        Returns:
            JSON response as dictionary or None on failure.
        """
        return await self._request("POST", url, data=data, json=json, headers=headers)

    async def gather_requests(
        self,
        urls: List[str],
        params_list: Optional[List[Optional[Dict[str, Any]]]] = None
    ) -> List[Optional[Dict[str, Any]]]:
        """
        Make multiple async GET requests in parallel.

        Args:
            urls: List of URLs to fetch.
            params_list: Optional list of params, one entry per URL.

        Returns:
            List of responses in the same order as `urls` (None for failures).

        Raises:
            ValueError: If `params_list` is given but its length does not
                match `urls` (previously this was silently truncated by zip).
        """
        if params_list is None:
            params_list = [None] * len(urls)
        elif len(params_list) != len(urls):
            raise ValueError(
                f"params_list length ({len(params_list)}) does not match "
                f"urls length ({len(urls)})"
            )

        tasks = [
            self.get(url, params=params)
            for url, params in zip(urls, params_list)
        ]

        # get() already converts failures to None, but return_exceptions=True
        # guards against anything unexpected escaping a task.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        return [
            None if isinstance(result, Exception) else result
            for result in results
        ]
| |
|
| |
|
| | |
| |
|
| |
|
async def safe_api_call(
    url: str,
    params: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None,
    timeout: int = config.REQUEST_TIMEOUT
) -> Optional[Dict[str, Any]]:
    """
    Fetch a single URL through a short-lived AsyncAPIClient.

    Convenience wrapper: builds a client, performs one GET inside its
    context, and tears the session down again.

    Args:
        url: Request URL.
        params: Query parameters.
        headers: HTTP headers.
        timeout: Request timeout.

    Returns:
        JSON response or None on failure.
    """
    client = AsyncAPIClient(timeout=timeout)
    async with client:
        result = await client.get(url, params=params, headers=headers)
    return result
| |
|
| |
|
async def parallel_api_calls(
    urls: List[str],
    params_list: Optional[List[Optional[Dict[str, Any]]]] = None,
    timeout: int = config.REQUEST_TIMEOUT
) -> List[Optional[Dict[str, Any]]]:
    """
    Fetch several URLs concurrently through a short-lived AsyncAPIClient.

    Convenience wrapper around AsyncAPIClient.gather_requests: opens a
    client, fans out all GETs in parallel, and closes the session.

    Args:
        urls: List of URLs.
        params_list: Optional params for each URL.
        timeout: Request timeout.

    Returns:
        List of responses (None for failures).
    """
    client = AsyncAPIClient(timeout=timeout)
    async with client:
        responses = await client.gather_requests(urls, params_list)
    return responses
| |
|