| """Concurrency manager for token-based rate limiting""" |
| import asyncio |
| from typing import Dict, Optional |
| from ..core.logger import debug_logger |
|
|
|
|
class ConcurrencyManager:
    """Manages concurrent request limits for each token.

    Two independent budgets are tracked per token id: one for image
    generation and one for video generation. A token that has no entry in
    a budget is treated as unlimited for that kind of request.

    All state is guarded by a single ``asyncio.Lock``, so every public
    method is a coroutine.
    """

    def __init__(self):
        """Initialize concurrency manager with no limits registered."""
        # Remaining free slots per token id; a missing key means "no limit".
        self._image_concurrency: Dict[int, int] = {}
        self._video_concurrency: Dict[int, int] = {}
        # Configured ceiling per token id. Kept alongside the remaining
        # counters so a release can never push "remaining" above the limit.
        self._image_limit: Dict[int, int] = {}
        self._video_limit: Dict[int, int] = {}
        self._lock = asyncio.Lock()

    # ------------------------------------------------------------------
    # Private helpers shared by the image and video variants
    # ------------------------------------------------------------------

    @staticmethod
    def _apply_limit(remaining_by_token, limit_by_token, token_id, limit):
        """Install or clear the limit of one token in one budget.

        The caller must already hold ``self._lock``.

        Args:
            remaining_by_token: Dict of remaining slots to update
            limit_by_token: Dict of configured limits to update
            token_id: Token ID
            limit: New limit; ``None``, 0 or a negative value removes the
                limit so the token becomes unlimited
        """
        if limit and limit > 0:
            remaining_by_token[token_id] = limit
            limit_by_token[token_id] = limit
        else:
            remaining_by_token.pop(token_id, None)
            limit_by_token.pop(token_id, None)

    async def _can_use(self, remaining_by_token, token_id: int, kind: str) -> bool:
        """Report whether ``token_id`` has a free slot, without taking it."""
        async with self._lock:
            remaining = remaining_by_token.get(token_id)
            if remaining is None:
                # No entry means no limit is configured for this token.
                return True
            if remaining <= 0:
                debug_logger.log_info(f"Token {token_id} {kind} concurrency exhausted (remaining: {remaining})")
                return False
            return True

    async def _acquire(self, remaining_by_token, token_id: int, kind: str) -> bool:
        """Take one slot for ``token_id`` if one is free."""
        async with self._lock:
            remaining = remaining_by_token.get(token_id)
            if remaining is None:
                # Unlimited token: nothing to count.
                return True
            if remaining <= 0:
                return False
            remaining_by_token[token_id] = remaining - 1
            debug_logger.log_info(f"Token {token_id} acquired {kind} slot (remaining: {remaining_by_token[token_id]})")
            return True

    async def _release(self, remaining_by_token, limit_by_token, token_id: int, kind: str):
        """Give one slot back for ``token_id``, never exceeding its limit."""
        async with self._lock:
            remaining = remaining_by_token.get(token_id)
            if remaining is None:
                # Unlimited token: nothing to count.
                return
            limit = limit_by_token.get(token_id)
            if limit is not None and remaining >= limit:
                # Already full. This happens when the slot was taken while
                # the token was unlimited, or when the counters were reset
                # while the request was in flight. Incrementing here would
                # let the token exceed its configured concurrency.
                return
            remaining_by_token[token_id] = remaining + 1
            debug_logger.log_info(f"Token {token_id} released {kind} slot (remaining: {remaining_by_token[token_id]})")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def initialize(self, tokens: list):
        """
        Initialize concurrency counters from token list

        Each listed token gets its counters set to its configured limits.
        A listed token whose limit is ``None``, 0 or negative is made
        unlimited, dropping any limit registered for it earlier. Tokens
        that are not in the list are left untouched.

        Args:
            tokens: List of Token objects with id, image_concurrency and video_concurrency fields
        """
        async with self._lock:
            for token in tokens:
                self._apply_limit(
                    self._image_concurrency, self._image_limit, token.id, token.image_concurrency
                )
                self._apply_limit(
                    self._video_concurrency, self._video_limit, token.id, token.video_concurrency
                )

        debug_logger.log_info(f"Concurrency manager initialized with {len(tokens)} tokens")

    async def can_use_image(self, token_id: int) -> bool:
        """
        Check if token can be used for image generation

        This is a read-only check; it does not reserve a slot.

        Args:
            token_id: Token ID

        Returns:
            True if the token has no image limit or has slots left,
            False if its remaining image concurrency is 0
        """
        return await self._can_use(self._image_concurrency, token_id, "image")

    async def can_use_video(self, token_id: int) -> bool:
        """
        Check if token can be used for video generation

        This is a read-only check; it does not reserve a slot.

        Args:
            token_id: Token ID

        Returns:
            True if the token has no video limit or has slots left,
            False if its remaining video concurrency is 0
        """
        return await self._can_use(self._video_concurrency, token_id, "video")

    async def acquire_image(self, token_id: int) -> bool:
        """
        Acquire image concurrency slot

        Args:
            token_id: Token ID

        Returns:
            True if acquired (or the token has no image limit),
            False if not available
        """
        return await self._acquire(self._image_concurrency, token_id, "image")

    async def acquire_video(self, token_id: int) -> bool:
        """
        Acquire video concurrency slot

        Args:
            token_id: Token ID

        Returns:
            True if acquired (or the token has no video limit),
            False if not available
        """
        return await self._acquire(self._video_concurrency, token_id, "video")

    async def release_image(self, token_id: int):
        """
        Release image concurrency slot

        Does nothing for a token without an image limit, and never raises
        the remaining count above the token's configured limit.

        Args:
            token_id: Token ID
        """
        await self._release(self._image_concurrency, self._image_limit, token_id, "image")

    async def release_video(self, token_id: int):
        """
        Release video concurrency slot

        Does nothing for a token without a video limit, and never raises
        the remaining count above the token's configured limit.

        Args:
            token_id: Token ID
        """
        await self._release(self._video_concurrency, self._video_limit, token_id, "video")

    async def get_image_remaining(self, token_id: int) -> Optional[int]:
        """
        Get remaining image concurrency for token

        Args:
            token_id: Token ID

        Returns:
            Remaining count or None if no limit
        """
        async with self._lock:
            return self._image_concurrency.get(token_id)

    async def get_video_remaining(self, token_id: int) -> Optional[int]:
        """
        Get remaining video concurrency for token

        Args:
            token_id: Token ID

        Returns:
            Remaining count or None if no limit
        """
        async with self._lock:
            return self._video_concurrency.get(token_id)

    async def reset_token(self, token_id: int, image_concurrency: int = -1, video_concurrency: int = -1):
        """
        Reset concurrency counters for a token

        A positive value refills the counter to that limit. Any other
        value (``None``, 0 or negative, including the default -1) removes
        the limit for that kind of request.

        Args:
            token_id: Token ID
            image_concurrency: New image concurrency limit (-1 for no limit)
            video_concurrency: New video concurrency limit (-1 for no limit)
        """
        async with self._lock:
            self._apply_limit(
                self._image_concurrency, self._image_limit, token_id, image_concurrency
            )
            self._apply_limit(
                self._video_concurrency, self._video_limit, token_id, video_concurrency
            )

        debug_logger.log_info(f"Token {token_id} concurrency reset (image: {image_concurrency}, video: {video_concurrency})")
|
|