| """Token management module""" |
| import jwt |
| import asyncio |
| import random |
| from datetime import datetime, timedelta |
| from typing import Optional, List, Dict, Any |
| from curl_cffi.requests import AsyncSession |
| from faker import Faker |
| from ..core.database import Database |
| from ..core.models import Token, TokenStats |
| from ..core.config import config |
| from .proxy_manager import ProxyManager |
|
|
| class TokenManager: |
| """Token lifecycle manager""" |
|
|
| def __init__(self, db: Database): |
| self.db = db |
| self._lock = asyncio.Lock() |
| self.proxy_manager = ProxyManager(db) |
| self.fake = Faker() |
| |
| async def decode_jwt(self, token: str) -> dict: |
| """Decode JWT token without verification""" |
| try: |
| decoded = jwt.decode(token, options={"verify_signature": False}) |
| return decoded |
| except Exception as e: |
| raise ValueError(f"Invalid JWT token: {str(e)}") |
|
|
| def _generate_random_username(self) -> str: |
| """Generate a random username using faker |
| |
| Returns: |
| A random username string |
| """ |
| |
| first_name = self.fake.first_name() |
| last_name = self.fake.last_name() |
|
|
| |
| first_name_clean = ''.join(c for c in first_name if c.isalpha()) |
| last_name_clean = ''.join(c for c in last_name if c.isalpha()) |
|
|
| |
| random_digits = str(random.randint(1, 9999)) |
|
|
| |
| format_choice = random.choice([ |
| f"{first_name_clean}{last_name_clean}{random_digits}", |
| f"{first_name_clean}.{last_name_clean}{random_digits}", |
| f"{first_name_clean}{random_digits}", |
| f"{last_name_clean}{random_digits}", |
| f"{first_name_clean[0]}{last_name_clean}{random_digits}", |
| f"{first_name_clean}{last_name_clean[0]}{random_digits}" |
| ]) |
|
|
| |
| return format_choice.lower() |
|
|
| async def get_user_info(self, access_token: str) -> dict: |
| """Get user info from Sora API""" |
| proxy_url = await self.proxy_manager.get_proxy_url() |
|
|
| async with AsyncSession() as session: |
| headers = { |
| "Authorization": f"Bearer {access_token}", |
| "Accept": "application/json", |
| "Origin": "https://sora.chatgpt.com", |
| "Referer": "https://sora.chatgpt.com/" |
| } |
|
|
| kwargs = { |
| "headers": headers, |
| "timeout": 30, |
| "impersonate": "chrome" |
| } |
|
|
| if proxy_url: |
| kwargs["proxy"] = proxy_url |
|
|
| response = await session.get( |
| f"{config.sora_base_url}/me", |
| **kwargs |
| ) |
|
|
| if response.status_code != 200: |
| raise ValueError(f"Failed to get user info: {response.status_code}") |
|
|
| return response.json() |
|
|
| async def get_subscription_info(self, token: str) -> Dict[str, Any]: |
| """Get subscription information from Sora API |
| |
| Returns: |
| { |
| "plan_type": "chatgpt_team", |
| "plan_title": "ChatGPT Business", |
| "subscription_end": "2025-11-13T16:58:21Z" |
| } |
| """ |
| print(f"🔍 开始获取订阅信息...") |
| proxy_url = await self.proxy_manager.get_proxy_url() |
|
|
| headers = { |
| "Authorization": f"Bearer {token}" |
| } |
|
|
| async with AsyncSession() as session: |
| url = "https://sora.chatgpt.com/backend/billing/subscriptions" |
| print(f"📡 请求 URL: {url}") |
| print(f"🔑 使用 Token: {token[:30]}...") |
|
|
| kwargs = { |
| "headers": headers, |
| "timeout": 30, |
| "impersonate": "chrome" |
| } |
|
|
| if proxy_url: |
| kwargs["proxy"] = proxy_url |
| print(f"🌐 使用代理: {proxy_url}") |
|
|
| response = await session.get(url, **kwargs) |
| print(f"📥 响应状态码: {response.status_code}") |
|
|
| if response.status_code == 200: |
| data = response.json() |
| print(f"📦 响应数据: {data}") |
|
|
| |
| if data.get("data") and len(data["data"]) > 0: |
| subscription = data["data"][0] |
| plan = subscription.get("plan", {}) |
|
|
| result = { |
| "plan_type": plan.get("id", ""), |
| "plan_title": plan.get("title", ""), |
| "subscription_end": subscription.get("end_ts", "") |
| } |
| print(f"✅ 订阅信息提取成功: {result}") |
| return result |
|
|
| print(f"⚠️ 响应数据中没有订阅信息") |
| return { |
| "plan_type": "", |
| "plan_title": "", |
| "subscription_end": "" |
| } |
| else: |
| error_msg = f"Failed to get subscription info: {response.status_code}" |
| print(f"❌ {error_msg}") |
| print(f"📄 响应内容: {response.text[:500]}") |
| raise Exception(error_msg) |
|
|
| async def get_sora2_invite_code(self, access_token: str) -> dict: |
| """Get Sora2 invite code""" |
| proxy_url = await self.proxy_manager.get_proxy_url() |
|
|
| print(f"🔍 开始获取Sora2邀请码...") |
|
|
| async with AsyncSession() as session: |
| headers = { |
| "Authorization": f"Bearer {access_token}", |
| "Accept": "application/json" |
| } |
|
|
| kwargs = { |
| "headers": headers, |
| "timeout": 30, |
| "impersonate": "chrome" |
| } |
|
|
| if proxy_url: |
| kwargs["proxy"] = proxy_url |
| print(f"🌐 使用代理: {proxy_url}") |
|
|
| response = await session.get( |
| "https://sora.chatgpt.com/backend/project_y/invite/mine", |
| **kwargs |
| ) |
|
|
| print(f"📥 响应状态码: {response.status_code}") |
|
|
| if response.status_code == 200: |
| data = response.json() |
| print(f"✅ Sora2邀请码获取成功: {data}") |
| return { |
| "supported": True, |
| "invite_code": data.get("invite_code"), |
| "redeemed_count": data.get("redeemed_count", 0), |
| "total_count": data.get("total_count", 0) |
| } |
| else: |
| |
| try: |
| error_data = response.json() |
| if error_data.get("error", {}).get("message", "").startswith("401"): |
| print(f"⚠️ Token不支持Sora2") |
| return { |
| "supported": False, |
| "invite_code": None |
| } |
| except: |
| pass |
|
|
| print(f"❌ 获取Sora2邀请码失败: {response.status_code}") |
| print(f"📄 响应内容: {response.text[:500]}") |
| return { |
| "supported": False, |
| "invite_code": None |
| } |
|
|
| async def get_sora2_remaining_count(self, access_token: str) -> dict: |
| """Get Sora2 remaining video count |
| |
| Returns: |
| { |
| "remaining_count": 27, |
| "rate_limit_reached": false, |
| "access_resets_in_seconds": 46833 |
| } |
| """ |
| proxy_url = await self.proxy_manager.get_proxy_url() |
|
|
| print(f"🔍 开始获取Sora2剩余次数...") |
|
|
| async with AsyncSession() as session: |
| headers = { |
| "Authorization": f"Bearer {access_token}", |
| "Accept": "application/json" |
| } |
|
|
| kwargs = { |
| "headers": headers, |
| "timeout": 30, |
| "impersonate": "chrome" |
| } |
|
|
| if proxy_url: |
| kwargs["proxy"] = proxy_url |
| print(f"🌐 使用代理: {proxy_url}") |
|
|
| response = await session.get( |
| "https://sora.chatgpt.com/backend/nf/check", |
| **kwargs |
| ) |
|
|
| print(f"📥 响应状态码: {response.status_code}") |
|
|
| if response.status_code == 200: |
| data = response.json() |
| print(f"✅ Sora2剩余次数获取成功: {data}") |
|
|
| rate_limit_info = data.get("rate_limit_and_credit_balance", {}) |
| return { |
| "success": True, |
| "remaining_count": rate_limit_info.get("estimated_num_videos_remaining", 0), |
| "rate_limit_reached": rate_limit_info.get("rate_limit_reached", False), |
| "access_resets_in_seconds": rate_limit_info.get("access_resets_in_seconds", 0) |
| } |
| else: |
| print(f"❌ 获取Sora2剩余次数失败: {response.status_code}") |
| print(f"📄 响应内容: {response.text[:500]}") |
| return { |
| "success": False, |
| "remaining_count": 0, |
| "error": f"Failed to get remaining count: {response.status_code}" |
| } |
|
|
| async def check_username_available(self, access_token: str, username: str) -> bool: |
| """Check if username is available |
| |
| Args: |
| access_token: Access token for authentication |
| username: Username to check |
| |
| Returns: |
| True if username is available, False otherwise |
| """ |
| proxy_url = await self.proxy_manager.get_proxy_url() |
|
|
| print(f"🔍 检查用户名是否可用: {username}") |
|
|
| async with AsyncSession() as session: |
| headers = { |
| "Authorization": f"Bearer {access_token}", |
| "Content-Type": "application/json" |
| } |
|
|
| kwargs = { |
| "headers": headers, |
| "json": {"username": username}, |
| "timeout": 30, |
| "impersonate": "chrome" |
| } |
|
|
| if proxy_url: |
| kwargs["proxy"] = proxy_url |
| print(f"🌐 使用代理: {proxy_url}") |
|
|
| response = await session.post( |
| "https://sora.chatgpt.com/backend/project_y/profile/username/check", |
| **kwargs |
| ) |
|
|
| print(f"📥 响应状态码: {response.status_code}") |
|
|
| if response.status_code == 200: |
| data = response.json() |
| available = data.get("available", False) |
| print(f"✅ 用户名检查结果: available={available}") |
| return available |
| else: |
| print(f"❌ 用户名检查失败: {response.status_code}") |
| print(f"📄 响应内容: {response.text[:500]}") |
| return False |
|
|
| async def set_username(self, access_token: str, username: str) -> dict: |
| """Set username for the account |
| |
| Args: |
| access_token: Access token for authentication |
| username: Username to set |
| |
| Returns: |
| User profile information after setting username |
| """ |
| proxy_url = await self.proxy_manager.get_proxy_url() |
|
|
| print(f"🔍 开始设置用户名: {username}") |
|
|
| async with AsyncSession() as session: |
| headers = { |
| "Authorization": f"Bearer {access_token}", |
| "Content-Type": "application/json" |
| } |
|
|
| kwargs = { |
| "headers": headers, |
| "json": {"username": username}, |
| "timeout": 30, |
| "impersonate": "chrome" |
| } |
|
|
| if proxy_url: |
| kwargs["proxy"] = proxy_url |
| print(f"🌐 使用代理: {proxy_url}") |
|
|
| response = await session.post( |
| "https://sora.chatgpt.com/backend/project_y/profile/username/set", |
| **kwargs |
| ) |
|
|
| print(f"📥 响应状态码: {response.status_code}") |
|
|
| if response.status_code == 200: |
| data = response.json() |
| print(f"✅ 用户名设置成功: {data.get('username')}") |
| return data |
| else: |
| print(f"❌ 用户名设置失败: {response.status_code}") |
| print(f"📄 响应内容: {response.text[:500]}") |
| raise Exception(f"Failed to set username: {response.status_code}") |
|
|
| async def activate_sora2_invite(self, access_token: str, invite_code: str) -> dict: |
| """Activate Sora2 with invite code""" |
| import uuid |
| proxy_url = await self.proxy_manager.get_proxy_url() |
|
|
| print(f"🔍 开始激活Sora2邀请码: {invite_code}") |
| print(f"🔑 Access Token 前缀: {access_token[:50]}...") |
|
|
| async with AsyncSession() as session: |
| |
| device_id = str(uuid.uuid4()) |
|
|
| |
| headers = { |
| "authorization": f"Bearer {access_token}", |
| "cookie": f"oai-did={device_id}" |
| } |
|
|
| print(f"🆔 设备ID: {device_id}") |
| print(f"📦 请求体: {{'invite_code': '{invite_code}'}}") |
|
|
| kwargs = { |
| "headers": headers, |
| "json": {"invite_code": invite_code}, |
| "timeout": 30, |
| "impersonate": "chrome120" |
| } |
|
|
| if proxy_url: |
| kwargs["proxy"] = proxy_url |
| print(f"🌐 使用代理: {proxy_url}") |
|
|
| response = await session.post( |
| "https://sora.chatgpt.com/backend/project_y/invite/accept", |
| **kwargs |
| ) |
|
|
| print(f"📥 响应状态码: {response.status_code}") |
|
|
| if response.status_code == 200: |
| data = response.json() |
| print(f"✅ Sora2激活成功: {data}") |
| return { |
| "success": data.get("success", False), |
| "already_accepted": data.get("already_accepted", False) |
| } |
| else: |
| print(f"❌ Sora2激活失败: {response.status_code}") |
| print(f"📄 响应内容: {response.text[:500]}") |
| raise Exception(f"Failed to activate Sora2: {response.status_code}") |
|
|
| async def st_to_at(self, session_token: str) -> dict: |
| """Convert Session Token to Access Token""" |
| proxy_url = await self.proxy_manager.get_proxy_url() |
|
|
| async with AsyncSession() as session: |
| headers = { |
| "Cookie": f"__Secure-next-auth.session-token={session_token}", |
| "Accept": "application/json", |
| "Origin": "https://sora.chatgpt.com", |
| "Referer": "https://sora.chatgpt.com/" |
| } |
|
|
| kwargs = { |
| "headers": headers, |
| "timeout": 30, |
| "impersonate": "chrome" |
| } |
|
|
| if proxy_url: |
| kwargs["proxy"] = proxy_url |
|
|
| response = await session.get( |
| "https://sora.chatgpt.com/api/auth/session", |
| **kwargs |
| ) |
|
|
| if response.status_code != 200: |
| raise ValueError(f"Failed to convert ST to AT: {response.status_code}") |
|
|
| data = response.json() |
| return { |
| "access_token": data.get("accessToken"), |
| "email": data.get("user", {}).get("email"), |
| "expires": data.get("expires") |
| } |
| |
| async def rt_to_at(self, refresh_token: str) -> dict: |
| """Convert Refresh Token to Access Token""" |
| proxy_url = await self.proxy_manager.get_proxy_url() |
|
|
| async with AsyncSession() as session: |
| headers = { |
| "Accept": "application/json", |
| "Content-Type": "application/json" |
| } |
|
|
| kwargs = { |
| "headers": headers, |
| "json": { |
| "client_id": "app_LlGpXReQgckcGGUo2JrYvtJK", |
| "grant_type": "refresh_token", |
| "redirect_uri": "com.openai.chat://auth0.openai.com/ios/com.openai.chat/callback", |
| "refresh_token": refresh_token |
| }, |
| "timeout": 30, |
| "impersonate": "chrome" |
| } |
|
|
| if proxy_url: |
| kwargs["proxy"] = proxy_url |
| |
| response = await session.post( |
| "https://auth.openai.com/oauth/token", |
| **kwargs |
| ) |
|
|
| if response.status_code != 200: |
| raise ValueError(f"Failed to convert RT to AT: {response.status_code} - {response.text}") |
|
|
| data = response.json() |
| return { |
| "access_token": data.get("access_token"), |
| "refresh_token": data.get("refresh_token"), |
| "expires_in": data.get("expires_in") |
| } |
| |
| async def add_token(self, token_value: str, |
| st: Optional[str] = None, |
| rt: Optional[str] = None, |
| remark: Optional[str] = None, |
| update_if_exists: bool = False, |
| image_enabled: bool = True, |
| video_enabled: bool = True) -> Token: |
| """Add a new Access Token to database |
| |
| Args: |
| token_value: Access Token |
| st: Session Token (optional) |
| rt: Refresh Token (optional) |
| remark: Remark (optional) |
| update_if_exists: If True, update existing token instead of raising error |
| image_enabled: Enable image generation (default: True) |
| video_enabled: Enable video generation (default: True) |
| |
| Returns: |
| Token object |
| |
| Raises: |
| ValueError: If token already exists and update_if_exists is False |
| """ |
| |
| existing_token = await self.db.get_token_by_value(token_value) |
| if existing_token: |
| if not update_if_exists: |
| raise ValueError(f"Token 已存在(邮箱: {existing_token.email})。如需更新,请先删除旧 Token 或使用更新功能。") |
| |
| return await self.update_existing_token(existing_token.id, token_value, st, rt, remark) |
|
|
| |
| decoded = await self.decode_jwt(token_value) |
|
|
| |
| expiry_time = datetime.fromtimestamp(decoded.get("exp", 0)) if "exp" in decoded else None |
|
|
| |
| jwt_email = None |
| if "https://api.openai.com/profile" in decoded: |
| jwt_email = decoded["https://api.openai.com/profile"].get("email") |
|
|
| |
| try: |
| user_info = await self.get_user_info(token_value) |
| email = user_info.get("email", jwt_email or "") |
| name = user_info.get("name") or "" |
| except Exception as e: |
| |
| email = jwt_email or "" |
| name = email.split("@")[0] if email else "" |
|
|
| |
| plan_type = None |
| plan_title = None |
| subscription_end = None |
| try: |
| sub_info = await self.get_subscription_info(token_value) |
| plan_type = sub_info.get("plan_type") |
| plan_title = sub_info.get("plan_title") |
| |
| if sub_info.get("subscription_end"): |
| from dateutil import parser |
| subscription_end = parser.parse(sub_info["subscription_end"]) |
| except Exception as e: |
| |
| print(f"Failed to get subscription info: {e}") |
|
|
| |
| sora2_supported = None |
| sora2_invite_code = None |
| sora2_redeemed_count = 0 |
| sora2_total_count = 0 |
| sora2_remaining_count = 0 |
| try: |
| sora2_info = await self.get_sora2_invite_code(token_value) |
| sora2_supported = sora2_info.get("supported", False) |
| sora2_invite_code = sora2_info.get("invite_code") |
| sora2_redeemed_count = sora2_info.get("redeemed_count", 0) |
| sora2_total_count = sora2_info.get("total_count", 0) |
|
|
| |
| if sora2_supported: |
| try: |
| remaining_info = await self.get_sora2_remaining_count(token_value) |
| if remaining_info.get("success"): |
| sora2_remaining_count = remaining_info.get("remaining_count", 0) |
| print(f"✅ Sora2剩余次数: {sora2_remaining_count}") |
| except Exception as e: |
| print(f"Failed to get Sora2 remaining count: {e}") |
| except Exception as e: |
| |
| print(f"Failed to get Sora2 info: {e}") |
|
|
| |
| try: |
| |
| user_info = await self.get_user_info(token_value) |
| username = user_info.get("username") |
|
|
| |
| if username is None: |
| print(f"⚠️ 检测到用户名为null,需要设置用户名") |
|
|
| |
| max_attempts = 5 |
| for attempt in range(max_attempts): |
| generated_username = self._generate_random_username() |
| print(f"🔄 尝试用户名 ({attempt + 1}/{max_attempts}): {generated_username}") |
|
|
| |
| if await self.check_username_available(token_value, generated_username): |
| |
| try: |
| await self.set_username(token_value, generated_username) |
| print(f"✅ 用户名设置成功: {generated_username}") |
| break |
| except Exception as e: |
| print(f"❌ 用户名设置失败: {e}") |
| if attempt == max_attempts - 1: |
| print(f"⚠️ 达到最大尝试次数,跳过用户名设置") |
| else: |
| print(f"⚠️ 用户名 {generated_username} 已被占用,尝试下一个") |
| if attempt == max_attempts - 1: |
| print(f"⚠️ 达到最大尝试次数,跳过用户名设置") |
| else: |
| print(f"✅ 用户名已设置: {username}") |
| except Exception as e: |
| print(f"⚠️ 用户名检查/设置过程中出错: {e}") |
|
|
| |
| token = Token( |
| token=token_value, |
| email=email, |
| name=name, |
| st=st, |
| rt=rt, |
| remark=remark, |
| expiry_time=expiry_time, |
| is_active=True, |
| plan_type=plan_type, |
| plan_title=plan_title, |
| subscription_end=subscription_end, |
| sora2_supported=sora2_supported, |
| sora2_invite_code=sora2_invite_code, |
| sora2_redeemed_count=sora2_redeemed_count, |
| sora2_total_count=sora2_total_count, |
| sora2_remaining_count=sora2_remaining_count, |
| image_enabled=image_enabled, |
| video_enabled=video_enabled |
| ) |
|
|
| |
| token_id = await self.db.add_token(token) |
| token.id = token_id |
|
|
| return token |
|
|
| async def update_existing_token(self, token_id: int, token_value: str, |
| st: Optional[str] = None, |
| rt: Optional[str] = None, |
| remark: Optional[str] = None) -> Token: |
| """Update an existing token with new information""" |
| |
| decoded = await self.decode_jwt(token_value) |
| expiry_time = datetime.fromtimestamp(decoded.get("exp", 0)) if "exp" in decoded else None |
|
|
| |
| jwt_email = None |
| if "https://api.openai.com/profile" in decoded: |
| jwt_email = decoded["https://api.openai.com/profile"].get("email") |
|
|
| try: |
| user_info = await self.get_user_info(token_value) |
| email = user_info.get("email", jwt_email or "") |
| name = user_info.get("name", "") |
| except Exception as e: |
| email = jwt_email or "" |
| name = email.split("@")[0] if email else "" |
|
|
| |
| plan_type = None |
| plan_title = None |
| subscription_end = None |
| try: |
| sub_info = await self.get_subscription_info(token_value) |
| plan_type = sub_info.get("plan_type") |
| plan_title = sub_info.get("plan_title") |
| if sub_info.get("subscription_end"): |
| from dateutil import parser |
| subscription_end = parser.parse(sub_info["subscription_end"]) |
| except Exception as e: |
| print(f"Failed to get subscription info: {e}") |
|
|
| |
| await self.db.update_token( |
| token_id=token_id, |
| token=token_value, |
| st=st, |
| rt=rt, |
| remark=remark, |
| expiry_time=expiry_time, |
| plan_type=plan_type, |
| plan_title=plan_title, |
| subscription_end=subscription_end |
| ) |
|
|
| |
| updated_token = await self.db.get_token(token_id) |
| return updated_token |
|
|
| async def delete_token(self, token_id: int): |
| """Delete a token""" |
| await self.db.delete_token(token_id) |
|
|
| async def update_token(self, token_id: int, |
| token: Optional[str] = None, |
| st: Optional[str] = None, |
| rt: Optional[str] = None, |
| remark: Optional[str] = None, |
| image_enabled: Optional[bool] = None, |
| video_enabled: Optional[bool] = None): |
| """Update token (AT, ST, RT, remark, image_enabled, video_enabled)""" |
| |
| expiry_time = None |
| if token: |
| try: |
| decoded = await self.decode_jwt(token) |
| expiry_time = datetime.fromtimestamp(decoded.get("exp", 0)) if "exp" in decoded else None |
| except Exception: |
| pass |
|
|
| await self.db.update_token(token_id, token=token, st=st, rt=rt, remark=remark, expiry_time=expiry_time, |
| image_enabled=image_enabled, video_enabled=video_enabled) |
|
|
| async def get_active_tokens(self) -> List[Token]: |
| """Get all active tokens (not cooled down)""" |
| return await self.db.get_active_tokens() |
| |
| async def get_all_tokens(self) -> List[Token]: |
| """Get all tokens""" |
| return await self.db.get_all_tokens() |
| |
| async def update_token_status(self, token_id: int, is_active: bool): |
| """Update token active status""" |
| await self.db.update_token_status(token_id, is_active) |
|
|
| async def enable_token(self, token_id: int): |
| """Enable a token and reset error count""" |
| await self.db.update_token_status(token_id, True) |
| |
| await self.db.reset_error_count(token_id) |
|
|
| async def disable_token(self, token_id: int): |
| """Disable a token""" |
| await self.db.update_token_status(token_id, False) |
|
|
| async def test_token(self, token_id: int) -> dict: |
| """Test if a token is valid by calling Sora API and refresh Sora2 info""" |
| |
| token_data = await self.db.get_token(token_id) |
| if not token_data: |
| return {"valid": False, "message": "Token not found"} |
|
|
| try: |
| |
| user_info = await self.get_user_info(token_data.token) |
|
|
| |
| sora2_info = await self.get_sora2_invite_code(token_data.token) |
| sora2_supported = sora2_info.get("supported", False) |
| sora2_invite_code = sora2_info.get("invite_code") |
| sora2_redeemed_count = sora2_info.get("redeemed_count", 0) |
| sora2_total_count = sora2_info.get("total_count", 0) |
| sora2_remaining_count = 0 |
|
|
| |
| if sora2_supported: |
| try: |
| remaining_info = await self.get_sora2_remaining_count(token_data.token) |
| if remaining_info.get("success"): |
| sora2_remaining_count = remaining_info.get("remaining_count", 0) |
| except Exception as e: |
| print(f"Failed to get Sora2 remaining count: {e}") |
|
|
| |
| await self.db.update_token_sora2( |
| token_id, |
| supported=sora2_supported, |
| invite_code=sora2_invite_code, |
| redeemed_count=sora2_redeemed_count, |
| total_count=sora2_total_count, |
| remaining_count=sora2_remaining_count |
| ) |
|
|
| return { |
| "valid": True, |
| "message": "Token is valid", |
| "email": user_info.get("email"), |
| "username": user_info.get("username"), |
| "sora2_supported": sora2_supported, |
| "sora2_invite_code": sora2_invite_code, |
| "sora2_redeemed_count": sora2_redeemed_count, |
| "sora2_total_count": sora2_total_count, |
| "sora2_remaining_count": sora2_remaining_count |
| } |
| except Exception as e: |
| return { |
| "valid": False, |
| "message": f"Token is invalid: {str(e)}" |
| } |
|
|
| async def record_usage(self, token_id: int, is_video: bool = False): |
| """Record token usage""" |
| await self.db.update_token_usage(token_id) |
| |
| if is_video: |
| await self.db.increment_video_count(token_id) |
| else: |
| await self.db.increment_image_count(token_id) |
| |
| async def record_error(self, token_id: int): |
| """Record token error""" |
| await self.db.increment_error_count(token_id) |
| |
| |
| stats = await self.db.get_token_stats(token_id) |
| admin_config = await self.db.get_admin_config() |
| |
| if stats and stats.error_count >= admin_config.error_ban_threshold: |
| await self.db.update_token_status(token_id, False) |
| |
| async def record_success(self, token_id: int, is_video: bool = False): |
| """Record successful request (reset error count)""" |
| await self.db.reset_error_count(token_id) |
|
|
| |
| if is_video: |
| try: |
| token_data = await self.db.get_token(token_id) |
| if token_data and token_data.sora2_supported: |
| remaining_info = await self.get_sora2_remaining_count(token_data.token) |
| if remaining_info.get("success"): |
| remaining_count = remaining_info.get("remaining_count", 0) |
| await self.db.update_token_sora2_remaining(token_id, remaining_count) |
| print(f"✅ 更新Token {token_id} 的Sora2剩余次数: {remaining_count}") |
|
|
| |
| if remaining_count == 0: |
| reset_seconds = remaining_info.get("access_resets_in_seconds", 0) |
| if reset_seconds > 0: |
| cooldown_until = datetime.now() + timedelta(seconds=reset_seconds) |
| await self.db.update_token_sora2_cooldown(token_id, cooldown_until) |
| print(f"⏱️ Token {token_id} 剩余次数为0,设置冷却时间至: {cooldown_until}") |
| except Exception as e: |
| print(f"Failed to update Sora2 remaining count: {e}") |
| |
| async def refresh_sora2_remaining_if_cooldown_expired(self, token_id: int): |
| """Refresh Sora2 remaining count if cooldown has expired""" |
| try: |
| token_data = await self.db.get_token(token_id) |
| if not token_data or not token_data.sora2_supported: |
| return |
|
|
| |
| if token_data.sora2_cooldown_until and token_data.sora2_cooldown_until <= datetime.now(): |
| print(f"🔄 Token {token_id} Sora2冷却已过期,正在刷新剩余次数...") |
|
|
| try: |
| remaining_info = await self.get_sora2_remaining_count(token_data.token) |
| if remaining_info.get("success"): |
| remaining_count = remaining_info.get("remaining_count", 0) |
| await self.db.update_token_sora2_remaining(token_id, remaining_count) |
| |
| await self.db.update_token_sora2_cooldown(token_id, None) |
| print(f"✅ Token {token_id} Sora2剩余次数已刷新: {remaining_count}") |
| except Exception as e: |
| print(f"Failed to refresh Sora2 remaining count: {e}") |
| except Exception as e: |
| print(f"Error in refresh_sora2_remaining_if_cooldown_expired: {e}") |
|
|
| async def auto_refresh_expiring_token(self, token_id: int) -> bool: |
| """ |
| Auto refresh token when expiry time is within 24 hours using ST or RT |
| |
| Returns: |
| True if refresh successful, False otherwise |
| """ |
| try: |
| token_data = await self.db.get_token(token_id) |
| if not token_data: |
| return False |
|
|
| |
| if not token_data.expiry_time: |
| return False |
|
|
| time_until_expiry = token_data.expiry_time - datetime.now() |
| hours_until_expiry = time_until_expiry.total_seconds() / 3600 |
|
|
| |
| if hours_until_expiry > 24: |
| return False |
|
|
| if hours_until_expiry < 0: |
| |
| print(f"🔄 Token {token_id} 已过期,尝试自动刷新...") |
| else: |
| print(f"🔄 Token {token_id} 将在 {hours_until_expiry:.1f} 小时后过期,尝试自动刷新...") |
|
|
| |
| new_at = None |
| new_st = None |
| new_rt = None |
|
|
| if token_data.st: |
| |
| try: |
| print(f"📝 使用 ST 刷新 Token {token_id}...") |
| result = await self.st_to_at(token_data.st) |
| new_at = result.get("access_token") |
| |
| new_st = token_data.st |
| print(f"✅ 使用 ST 刷新成功") |
| except Exception as e: |
| print(f"❌ 使用 ST 刷新失败: {e}") |
| new_at = None |
|
|
| if not new_at and token_data.rt: |
| |
| try: |
| print(f"📝 使用 RT 刷新 Token {token_id}...") |
| result = await self.rt_to_at(token_data.rt) |
| new_at = result.get("access_token") |
| new_rt = result.get("refresh_token", token_data.rt) |
| print(f"✅ 使用 RT 刷新成功") |
| except Exception as e: |
| print(f"❌ 使用 RT 刷新失败: {e}") |
| new_at = None |
|
|
| if new_at: |
| |
| await self.update_token(token_id, token=new_at, st=new_st, rt=new_rt) |
| print(f"✅ Token {token_id} 已自动刷新") |
| return True |
| else: |
| |
| print(f"⚠️ Token {token_id} 无法刷新(无 ST 或 RT),已禁用") |
| await self.disable_token(token_id) |
| return False |
|
|
| except Exception as e: |
| print(f"❌ 自动刷新 Token {token_id} 失败: {e}") |
| return False |
|
|