| | """ |
| | Rate Limit Tracking Module |
| | Manages rate limits per provider with in-memory tracking |
| | """ |
| |
|
| | import time |
| | from datetime import datetime, timedelta |
| | from typing import Dict, Optional, Tuple |
| | from threading import Lock |
| | from utils.logger import setup_logger |
| |
|
| | logger = setup_logger("rate_limiter") |
| |
|
| |
|
| | class RateLimiter: |
| | """ |
| | Rate limiter with per-provider tracking |
| | """ |
| |
|
| | def __init__(self): |
| | """Initialize rate limiter""" |
| | self.limits: Dict[str, Dict] = {} |
| | self.lock = Lock() |
| |
|
| | def configure_limit( |
| | self, |
| | provider: str, |
| | limit_type: str, |
| | limit_value: int |
| | ): |
| | """ |
| | Configure rate limit for a provider |
| | |
| | Args: |
| | provider: Provider name |
| | limit_type: Type of limit (per_minute, per_hour, per_day, per_second) |
| | limit_value: Maximum requests allowed |
| | """ |
| | with self.lock: |
| | |
| | now = datetime.now() |
| | if limit_type == "per_second": |
| | reset_time = now + timedelta(seconds=1) |
| | elif limit_type == "per_minute": |
| | reset_time = now + timedelta(minutes=1) |
| | elif limit_type == "per_hour": |
| | reset_time = now + timedelta(hours=1) |
| | elif limit_type == "per_day": |
| | reset_time = now + timedelta(days=1) |
| | else: |
| | logger.warning(f"Unknown limit type {limit_type} for {provider}") |
| | reset_time = now + timedelta(minutes=1) |
| |
|
| | self.limits[provider] = { |
| | "limit_type": limit_type, |
| | "limit_value": limit_value, |
| | "current_usage": 0, |
| | "reset_time": reset_time, |
| | "last_request_time": None |
| | } |
| |
|
| | logger.info(f"Configured rate limit for {provider}: {limit_value} {limit_type}") |
| |
|
| | def can_make_request(self, provider: str) -> Tuple[bool, Optional[str]]: |
| | """ |
| | Check if request can be made without exceeding rate limit |
| | |
| | Args: |
| | provider: Provider name |
| | |
| | Returns: |
| | Tuple of (can_proceed, reason_if_blocked) |
| | """ |
| | with self.lock: |
| | if provider not in self.limits: |
| | |
| | return True, None |
| |
|
| | limit_info = self.limits[provider] |
| | now = datetime.now() |
| |
|
| | |
| | if now >= limit_info["reset_time"]: |
| | self._reset_limit(provider) |
| | limit_info = self.limits[provider] |
| |
|
| | |
| | if limit_info["current_usage"] < limit_info["limit_value"]: |
| | return True, None |
| | else: |
| | seconds_until_reset = (limit_info["reset_time"] - now).total_seconds() |
| | return False, f"Rate limit reached. Reset in {int(seconds_until_reset)}s" |
| |
|
| | def record_request(self, provider: str): |
| | """ |
| | Record a request against the rate limit |
| | |
| | Args: |
| | provider: Provider name |
| | """ |
| | with self.lock: |
| | if provider not in self.limits: |
| | logger.warning(f"Recording request for unconfigured provider: {provider}") |
| | return |
| |
|
| | limit_info = self.limits[provider] |
| | now = datetime.now() |
| |
|
| | |
| | if now >= limit_info["reset_time"]: |
| | self._reset_limit(provider) |
| | limit_info = self.limits[provider] |
| |
|
| | |
| | limit_info["current_usage"] += 1 |
| | limit_info["last_request_time"] = now |
| |
|
| | |
| | percentage = (limit_info["current_usage"] / limit_info["limit_value"]) * 100 |
| | if percentage >= 80: |
| | logger.warning( |
| | f"Rate limit warning for {provider}: {percentage:.1f}% used " |
| | f"({limit_info['current_usage']}/{limit_info['limit_value']})" |
| | ) |
| |
|
| | def _reset_limit(self, provider: str): |
| | """ |
| | Reset rate limit counter |
| | |
| | Args: |
| | provider: Provider name |
| | """ |
| | if provider not in self.limits: |
| | return |
| |
|
| | limit_info = self.limits[provider] |
| | limit_type = limit_info["limit_type"] |
| | now = datetime.now() |
| |
|
| | |
| | if limit_type == "per_second": |
| | reset_time = now + timedelta(seconds=1) |
| | elif limit_type == "per_minute": |
| | reset_time = now + timedelta(minutes=1) |
| | elif limit_type == "per_hour": |
| | reset_time = now + timedelta(hours=1) |
| | elif limit_type == "per_day": |
| | reset_time = now + timedelta(days=1) |
| | else: |
| | reset_time = now + timedelta(minutes=1) |
| |
|
| | limit_info["current_usage"] = 0 |
| | limit_info["reset_time"] = reset_time |
| |
|
| | logger.debug(f"Reset rate limit for {provider}. Next reset: {reset_time}") |
| |
|
| | def get_status(self, provider: str) -> Optional[Dict]: |
| | """ |
| | Get current rate limit status for provider |
| | |
| | Args: |
| | provider: Provider name |
| | |
| | Returns: |
| | Dict with limit info or None if not configured |
| | """ |
| | with self.lock: |
| | if provider not in self.limits: |
| | return None |
| |
|
| | limit_info = self.limits[provider] |
| | now = datetime.now() |
| |
|
| | |
| | if now >= limit_info["reset_time"]: |
| | self._reset_limit(provider) |
| | limit_info = self.limits[provider] |
| |
|
| | percentage = (limit_info["current_usage"] / limit_info["limit_value"]) * 100 if limit_info["limit_value"] > 0 else 0 |
| | seconds_until_reset = max(0, (limit_info["reset_time"] - now).total_seconds()) |
| |
|
| | status = "ok" |
| | if percentage >= 100: |
| | status = "blocked" |
| | elif percentage >= 80: |
| | status = "warning" |
| |
|
| | return { |
| | "provider": provider, |
| | "limit_type": limit_info["limit_type"], |
| | "limit_value": limit_info["limit_value"], |
| | "current_usage": limit_info["current_usage"], |
| | "percentage": round(percentage, 1), |
| | "reset_time": limit_info["reset_time"].isoformat(), |
| | "reset_in_seconds": int(seconds_until_reset), |
| | "status": status, |
| | "last_request_time": limit_info["last_request_time"].isoformat() if limit_info["last_request_time"] else None |
| | } |
| |
|
| | def get_all_statuses(self) -> Dict[str, Dict]: |
| | """ |
| | Get rate limit status for all providers |
| | |
| | Returns: |
| | Dict mapping provider names to their rate limit status |
| | """ |
| | with self.lock: |
| | return { |
| | provider: self.get_status(provider) |
| | for provider in self.limits.keys() |
| | } |
| |
|
| | def remove_limit(self, provider: str): |
| | """ |
| | Remove rate limit configuration for provider |
| | |
| | Args: |
| | provider: Provider name |
| | """ |
| | with self.lock: |
| | if provider in self.limits: |
| | del self.limits[provider] |
| | logger.info(f"Removed rate limit for {provider}") |
| |
|
| |
|
| | |
| | rate_limiter = RateLimiter() |
| |
|