| from datetime import datetime, timezone |
| from typing import List, Optional |
|
|
| from config import get_api_password, get_panel_password |
| from fastapi import Depends, HTTPException, Header, Query, Request, status |
| from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer |
| from log import log |
|
|
| |
# HTTP Bearer security scheme instance; verify_panel_token receives its
# parsed credentials through Depends(security).
security = HTTPBearer()
|
|
| |
|
|
# User-Agent string in the Gemini CLI format.
# NOTE(review): presumably sent on upstream requests - usage is outside this
# section, confirm against the callers.
GEMINICLI_USER_AGENT = "GeminiCLI/0.1.5 (Windows; AMD64)"


# User-Agent string in the Antigravity client format (same caveat as above).
ANTIGRAVITY_USER_AGENT = "antigravity/1.18.3 windows/amd64"
|
|
| |
# Google OAuth 2.0 client credentials and the scopes requested with them.
# NOTE(review): the client secret is hard-coded in source. Presumably these
# are the public installed-application credentials - confirm, and consider
# loading them from configuration instead.
CLIENT_ID = "681255809395-oo8ft2oprdrnp9e3aqf6av3hmdib135j.apps.googleusercontent.com"
CLIENT_SECRET = "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"
SCOPES = [
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/userinfo.email",
    "https://www.googleapis.com/auth/userinfo.profile",
]
|
|
| |
# Google OAuth 2.0 client credentials and scopes for the Antigravity client.
# The scope list is the three SCOPES entries plus `cclog` and
# `experimentsandconfigs`.
# NOTE(review): the client secret is hard-coded in source. Presumably these
# are the public installed-application credentials - confirm, and consider
# loading them from configuration instead.
ANTIGRAVITY_CLIENT_ID = "1071006060591-tmhssin2h21lcre235vtolojh4g403ep.apps.googleusercontent.com"
ANTIGRAVITY_CLIENT_SECRET = "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"
ANTIGRAVITY_SCOPES = [
    'https://www.googleapis.com/auth/cloud-platform',
    'https://www.googleapis.com/auth/userinfo.email',
    'https://www.googleapis.com/auth/userinfo.profile',
    'https://www.googleapis.com/auth/cclog',
    'https://www.googleapis.com/auth/experimentsandconfigs'
]
|
|
| |
# Google OAuth 2.0 token endpoint.
TOKEN_URL = "https://oauth2.googleapis.com/token"


# Host name used for the OAuth callback.
# NOTE(review): presumably the host part of the redirect URI - verify against
# the code that builds it.
CALLBACK_HOST = "localhost"
|
|
| |
|
|
| |
# Default safety settings: one entry per harm category, each with its
# threshold set to BLOCK_NONE. The order of the categories is preserved in
# the resulting list.
DEFAULT_SAFETY_SETTINGS = [
    {"category": harm_category, "threshold": "BLOCK_NONE"}
    for harm_category in (
        "HARM_CATEGORY_HARASSMENT",
        "HARM_CATEGORY_HATE_SPEECH",
        "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_DANGEROUS_CONTENT",
        "HARM_CATEGORY_CIVIC_INTEGRITY",
        "HARM_CATEGORY_IMAGE_HATE",
        "HARM_CATEGORY_IMAGE_DANGEROUS_CONTENT",
        "HARM_CATEGORY_IMAGE_HARASSMENT",
        "HARM_CATEGORY_IMAGE_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_JAILBREAK",
    )
]
|
|
| |
# Base model names. get_available_models() expands each entry with the
# feature prefixes and the thinking/search suffixes.
BASE_MODELS = [
    "gemini-2.5-pro",
    "gemini-2.5-flash",
    "gemini-3-pro-preview",
    "gemini-3-flash-preview"
]
|
|
|
|
| |
|
|
def is_fake_streaming_model(model_name: str) -> bool:
    """Return True when the model name carries the fake-streaming prefix.

    Args:
        model_name: Model name as requested by the client.

    Returns:
        True if ``model_name`` begins with "假流式/", otherwise False.
    """
    fake_streaming_prefix = "假流式/"
    return model_name.startswith(fake_streaming_prefix)
|
|
|
|
def is_anti_truncation_model(model_name: str) -> bool:
    """Return True when the model name carries the anti-truncation prefix.

    Args:
        model_name: Model name as requested by the client.

    Returns:
        True if ``model_name`` begins with "流式抗截断/", otherwise False.
    """
    anti_truncation_prefix = "流式抗截断/"
    return model_name.startswith(anti_truncation_prefix)
|
|
|
|
def get_base_model_from_feature_model(model_name: str) -> str:
    """Strip a leading feature prefix from a model name.

    Only the first matching prefix ("假流式/" or "流式抗截断/") is removed,
    and only once; anything after it is returned untouched.

    Args:
        model_name: Model name, with or without a feature prefix.

    Returns:
        The name without its feature prefix, or ``model_name`` unchanged
        when it has none.
    """
    feature_prefixes = ("假流式/", "流式抗截断/")
    # An empty match means "no prefix", which makes the slice a no-op.
    matched_prefix = next(
        (candidate for candidate in feature_prefixes if model_name.startswith(candidate)),
        "",
    )
    return model_name[len(matched_prefix):]
|
|
|
|
def get_available_models(router_type: str = "openai") -> List[str]:
    """
    Build the full list of selectable model names.

    Every entry of BASE_MODELS is combined with each supported suffix
    (none, a thinking mode, search, a thinking mode plus search). For each
    suffix the name is emitted plain, then with the fake-streaming prefix,
    then with the anti-truncation prefix.

    Args:
        router_type: "openai" or "gemini". Accepted for interface
            compatibility; the value is not consulted in this function.

    Returns:
        List of model names, grouped by base model.
    """
    feature_prefixes = ("", "假流式/", "流式抗截断/")
    thinking_variants = ("-maxthinking", "-nothinking")
    search_variant = "-search"

    # The suffix order fixes the order of the returned list:
    # plain, thinking modes, search, then thinking modes combined with search.
    suffix_order = (
        ("",)
        + thinking_variants
        + (search_variant,)
        + tuple(f"{variant}{search_variant}" for variant in thinking_variants)
    )

    return [
        f"{prefix}{base_model}{suffix}"
        for base_model in BASE_MODELS
        for suffix in suffix_order
        for prefix in feature_prefixes
    ]
|
|
|
|
| |
|
|
async def authenticate_flexible(
    request: Request,
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
    access_token: Optional[str] = Header(None, alias="access_token"),
    x_goog_api_key: Optional[str] = Header(None, alias="x-goog-api-key"),
    key: Optional[str] = Query(None),
) -> str:
    """
    Unified authentication dependency accepting several credential locations.

    Can be used directly as a FastAPI ``Depends`` dependency. The first
    non-empty credential wins, checked in this order:

    - URL parameter: ``key``
    - HTTP header: ``x-goog-api-key``
    - HTTP header: ``x-api-key``
    - HTTP header: ``access_token``
    - HTTP header: ``Authorization`` (``Bearer <token>``; the scheme name is
      matched case-insensitively)

    The ``Authorization`` header is only inspected when none of the other
    credentials is present.

    Args:
        request: FastAPI Request object (injected; not read here).
        authorization: Value of the Authorization header (injected).
        x_api_key: Value of the x-api-key header (injected).
        access_token: Value of the access_token header (injected).
        x_goog_api_key: Value of the x-goog-api-key header (injected).
        key: Value of the ``key`` URL parameter (injected).

    Returns:
        The token that passed verification.

    Raises:
        HTTPException: 401 when no credential is supplied or the
            Authorization header does not use the Bearer scheme; 403 when
            the supplied token does not match the API password.

    Example:
        @router.post("/endpoint")
        async def endpoint(token: str = Depends(authenticate_flexible)):
            # token has already been verified
            pass
    """
    # Imported locally so this function does not depend on a file-level import.
    import hmac

    token = None
    auth_method = None

    if key:
        token = key
        auth_method = "URL parameter 'key'"
    elif x_goog_api_key:
        token = x_goog_api_key
        auth_method = "x-goog-api-key header"
    elif x_api_key:
        token = x_api_key
        auth_method = "x-api-key header"
    elif access_token:
        token = access_token
        auth_method = "access_token header"
    elif authorization:
        # Authentication scheme names are case-insensitive, so accept
        # "bearer"/"BEARER" as well as "Bearer".
        scheme, separator, bearer_value = authorization.partition(" ")
        if not separator or scheme.lower() != "bearer":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid authentication scheme. Use 'Bearer <token>'",
                headers={"WWW-Authenticate": "Bearer"},
            )
        token = bearer_value
        auth_method = "Authorization Bearer header"

    if not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing authentication credentials. Use 'key' URL parameter, 'x-goog-api-key', 'x-api-key', 'access_token' header, or 'Authorization: Bearer <token>'",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Only read the configured password once there is something to compare.
    password = await get_api_password()

    # Constant-time comparison so response timing does not leak how much of
    # the password a guess got right. Comparing encoded bytes keeps
    # compare_digest usable with non-ASCII passwords; a non-string password
    # can never match, as with the previous `!=` check.
    token_matches = isinstance(password, str) and hmac.compare_digest(
        token.encode("utf-8"), password.encode("utf-8")
    )
    if not token_matches:
        log.error(f"Authentication failed using {auth_method}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="密码错误"
        )

    log.debug(f"Authentication successful using {auth_method}")
    return token
|
|
|
|
| |
# Additional names bound to the same authenticate_flexible dependency.
# NOTE(review): presumably kept so older call sites keep working - confirm
# before removing either alias.
authenticate_bearer = authenticate_flexible
authenticate_gemini_flexible = authenticate_flexible
|
|
|
|
| |
|
|
async def verify_panel_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """
    Verify the control-panel password supplied as a Bearer token.

    The Bearer token is compared directly against the configured panel
    password.

    Args:
        credentials: HTTPAuthorizationCredentials injected via ``security``.

    Returns:
        The token that passed verification.

    Raises:
        HTTPException: 401 when the token does not match the panel password.
    """
    # Imported locally so this function does not depend on a file-level import.
    import hmac

    password = await get_panel_password()
    supplied = credentials.credentials

    # Constant-time comparison so response timing does not leak how much of
    # the password a guess got right. Comparing encoded bytes keeps
    # compare_digest usable with non-ASCII passwords; non-string values can
    # never match.
    token_matches = (
        isinstance(supplied, str)
        and isinstance(password, str)
        and hmac.compare_digest(supplied.encode("utf-8"), password.encode("utf-8"))
    )
    if not token_matches:
        # A 401 response should carry a WWW-Authenticate challenge, as the
        # 401 responses of authenticate_flexible already do.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return supplied
|
|