| | """ |
| | HTTP API Client with Retry Logic and Timeout Handling |
| | Provides robust HTTP client for API requests |
| | """ |
| |
|
| | import aiohttp |
| | import asyncio |
| | from typing import Dict, Optional, Tuple, Any |
| | from datetime import datetime |
| | import time |
| | from utils.logger import setup_logger |
| |
|
# Module-level logger, obtained from the project's setup_logger helper
# under the name "api_client".
logger = setup_logger("api_client")
| |
|
| |
|
class APIClientError(Exception):
    """Common base class for every exception type defined by this module."""
| |
|
| |
|
class TimeoutError(APIClientError):
    """API-client timeout error.

    NOTE(review): inside this module the name shadows the builtin
    ``TimeoutError``; code here that needs the builtin/asyncio variant must
    reference it explicitly (e.g. ``asyncio.TimeoutError``).
    """
| |
|
| |
|
class RateLimitError(APIClientError):
    """API-client rate-limit error that can carry a retry hint.

    Args:
        message: Human-readable description, forwarded to the base exception.
        retry_after: Optional hint stored unchanged on ``self.retry_after``
            (``None`` when the caller does not supply one).
    """

    def __init__(self, message: str, retry_after: Optional[int] = None):
        # Record the hint first; the base initializer only handles the message.
        self.retry_after = retry_after
        super().__init__(message)
| |
|
| |
|
class AuthenticationError(APIClientError):
    """API-client authentication error."""
| |
|
| |
|
class ServerError(APIClientError):
    """API-client server-side error."""
| |
|
| |
|
class APIClient:
    """
    Async HTTP client with retry logic, timeout handling, and connection pooling.

    A single ``aiohttp.TCPConnector`` is created lazily and shared by all
    requests; every request opens a short-lived ``aiohttp.ClientSession`` on
    top of it. Failures are reported through the returned tuple / dict rather
    than raised to the caller.
    """

    def __init__(
        self,
        default_timeout: int = 10,
        max_connections: int = 100,
        retry_attempts: int = 3,
        retry_delay: float = 1.0
    ):
        """
        Initialize API client

        Args:
            default_timeout: Default total request timeout in seconds
            max_connections: Maximum concurrent connections in the pool
            retry_attempts: Maximum number of attempts made by ``request``
                when retrying is enabled
            retry_delay: Base delay in seconds. Used as-is between timeout
                retries; for 5xx retries it is multiplied by 60 and doubled
                per attempt (capped at 240 seconds)
        """
        self.default_timeout = default_timeout
        self.max_connections = max_connections
        self.retry_attempts = retry_attempts
        self.retry_delay = retry_delay

        # Created on first access of ``connector`` so that constructing the
        # client does not allocate any network resources.
        self._connector = None

        # Sent with every request; per-call headers override these.
        self.default_headers = {
            "User-Agent": "CryptoAPIMonitor/1.0",
            "Accept": "application/json"
        }

    @property
    def connector(self):
        """Return the shared TCP connector, creating it on first access."""
        if self._connector is None:
            self._connector = aiohttp.TCPConnector(
                limit=self.max_connections,
                limit_per_host=10,
                ttl_dns_cache=300,
                enable_cleanup_closed=True
            )
        return self._connector

    @staticmethod
    def _build_result(
        success: bool,
        status_code: int,
        data: Any,
        response_time_ms: float,
        error_type: Optional[str],
        error_message: Optional[str],
        retry_count: int,
        **extra: Any
    ) -> Dict[str, Any]:
        """Assemble the result dict returned by ``request`` (extra keys appended last)."""
        result = {
            "success": success,
            "status_code": status_code,
            "data": data,
            "response_time_ms": response_time_ms,
            "error_type": error_type,
            "error_message": error_message,
            "retry_count": retry_count
        }
        result.update(extra)
        return result

    async def _make_request(
        self,
        method: str,
        url: str,
        headers: Optional[Dict] = None,
        params: Optional[Dict] = None,
        timeout: Optional[int] = None,
        **kwargs
    ) -> Tuple[int, Any, float, Optional[str]]:
        """
        Make a single HTTP request with error handling (no retries)

        Args:
            method: HTTP method (GET, POST, etc.)
            url: Request URL
            headers: Optional headers, merged over the client defaults
            params: Optional query parameters
            timeout: Total timeout in seconds; falls back to the client
                default when omitted (or falsy)
            **kwargs: Forwarded unchanged to ``session.request``

        Returns:
            Tuple of (status_code, response_data, response_time_ms, error_message).
            On timeout, client error or any other exception the tuple is
            ``(0, None, elapsed_ms, message)``; nothing is raised.
        """
        merged_headers = {**self.default_headers}
        if headers:
            merged_headers.update(headers)

        timeout_seconds = timeout or self.default_timeout
        timeout_config = aiohttp.ClientTimeout(total=timeout_seconds)

        # Monotonic clock: elapsed time must not jump if the wall clock is adjusted.
        start_time = time.monotonic()

        def elapsed_ms() -> float:
            return (time.monotonic() - start_time) * 1000

        try:
            # connector_owner=False keeps the pooled connector open when this
            # per-request session closes; otherwise the first request would
            # close the shared connector and break every later request.
            async with aiohttp.ClientSession(
                connector=self.connector,
                connector_owner=False,
                timeout=timeout_config
            ) as session:
                async with session.request(
                    method,
                    url,
                    headers=merged_headers,
                    params=params,
                    ssl=True,
                    **kwargs
                ) as response:
                    # Measured once the response head is available, before the body is read.
                    response_time_ms = elapsed_ms()
                    status_code = response.status

                    try:
                        data = await response.json()
                    except (aiohttp.ContentTypeError, ValueError):
                        # Not JSON (wrong content type or undecodable body):
                        # fall back to the raw text. Timeouts, cancellation and
                        # transport errors deliberately propagate to the outer
                        # handlers instead of being swallowed here.
                        data = await response.text()

                    return status_code, data, response_time_ms, None

        except asyncio.TimeoutError:
            return 0, None, elapsed_ms(), f"Request timeout after {timeout_seconds}s"

        except aiohttp.ClientError as e:
            return 0, None, elapsed_ms(), f"Client error: {str(e)}"

        except Exception as e:
            # Best-effort boundary: callers receive an error tuple, never an exception.
            return 0, None, elapsed_ms(), f"Unexpected error: {str(e)}"

    async def request(
        self,
        method: str,
        url: str,
        headers: Optional[Dict] = None,
        params: Optional[Dict] = None,
        timeout: Optional[int] = None,
        retry: bool = True,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Make HTTP request with retry logic

        Retry policy (only when ``retry`` is true and attempts remain):
            - 429: sleep a fixed 70 seconds (60 s ``retry_after`` + 10 s) and
              retry. The response's Retry-After header is not consulted,
              because ``_make_request`` does not surface response headers.
            - 5xx: sleep ``retry_delay * 60 * 2 ** (attempt - 1)`` seconds,
              capped at 240, and retry.
            - timeout: grow the timeout by 1.5x (truncated to int), sleep
              ``retry_delay`` seconds and retry.
            - 401/403, other HTTP statuses and non-timeout network errors are
              returned immediately.

        Args:
            method: HTTP method (GET, POST, etc.)
            url: Request URL
            headers: Optional headers
            params: Optional query parameters
            timeout: Optional timeout override
            retry: Enable retry logic
            **kwargs: Forwarded unchanged to ``_make_request``

        Returns:
            Dict with keys: success, status_code, data, response_time_ms,
            error_type, error_message, retry_count (plus retry_after for a
            final rate-limit failure). Any 2xx status counts as success.
        """
        max_attempts = self.retry_attempts if retry else 1
        attempt = 0
        last_error = None
        current_timeout = timeout or self.default_timeout

        while attempt < max_attempts:
            attempt += 1
            retries_used = attempt - 1
            out_of_attempts = attempt >= max_attempts

            status_code, data, response_time_ms, error_message = await self._make_request(
                method, url, headers, params, current_timeout, **kwargs
            )

            # Success: any 2xx status (201 Created, 204 No Content, ...), not only 200.
            if 200 <= status_code < 300:
                return self._build_result(
                    True, status_code, data, response_time_ms, None, None, retries_used
                )

            # Rate limited
            if status_code == 429:
                last_error = "rate_limit"
                retry_after = 60  # fixed fallback; see docstring

                if out_of_attempts:
                    return self._build_result(
                        False, status_code, None, response_time_ms,
                        "rate_limit",
                        f"Rate limit exceeded. Retry after {retry_after}s",
                        retries_used,
                        retry_after=retry_after
                    )

                await asyncio.sleep(retry_after + 10)
                continue

            # Authentication / authorization failures are never retried.
            if status_code in (401, 403):
                return self._build_result(
                    False, status_code, None, response_time_ms,
                    "authentication",
                    f"Authentication failed: HTTP {status_code}",
                    retries_used
                )

            # Server errors: exponential backoff
            if status_code >= 500:
                last_error = "server_error"

                if out_of_attempts:
                    return self._build_result(
                        False, status_code, None, response_time_ms,
                        "server_error",
                        f"Server error: HTTP {status_code}",
                        retries_used
                    )

                delay = self.retry_delay * 60 * (2 ** (attempt - 1))
                await asyncio.sleep(min(delay, 240))
                continue

            # Timeouts are recognised by the message text produced in _make_request.
            if error_message and "timeout" in error_message.lower():
                last_error = "timeout"

                if out_of_attempts:
                    return self._build_result(
                        False, 0, None, response_time_ms,
                        "timeout", error_message, retries_used
                    )

                current_timeout = int(current_timeout * 1.5)
                await asyncio.sleep(self.retry_delay)
                continue

            # Everything else (other HTTP statuses, non-timeout network errors).
            return self._build_result(
                False, status_code or 0, data, response_time_ms,
                "network_error" if status_code == 0 else "http_error",
                error_message or f"HTTP {status_code}",
                retries_used
            )

        # Reached only when the loop body never ran (retry enabled with
        # retry_attempts <= 0); every executed iteration returns or retries.
        return self._build_result(
            False, 0, None, 0,
            last_error or "unknown",
            "All retry attempts exhausted",
            self.retry_attempts
        )

    async def get(self, url: str, **kwargs) -> Dict[str, Any]:
        """GET request; keyword arguments are forwarded to ``request``."""
        return await self.request("GET", url, **kwargs)

    async def post(self, url: str, **kwargs) -> Dict[str, Any]:
        """POST request; keyword arguments are forwarded to ``request``."""
        return await self.request("POST", url, **kwargs)

    async def close(self):
        """Close the shared connector if one was ever created.

        Reads ``_connector`` directly so that closing an unused client does
        not lazily create a connector just to close it, and clears the cached
        reference so a later request builds a fresh connector.
        """
        connector, self._connector = self._connector, None
        if connector is not None:
            await connector.close()
| |
|
| |
|
| | |
# Cached APIClient instance handed out by get_client(); remains None until
# get_client() is called for the first time.
_client: Optional[APIClient] = None
| |
|
| |
|
def get_client() -> APIClient:
    """Return the module-wide APIClient, constructing it on the first call.

    Later calls return the same instance that the first call created.
    """
    global _client
    if _client is not None:
        return _client
    _client = APIClient()
    return _client
| |
|