Spaces:
Running
Running
File size: 1,604 Bytes
cef045d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | """API key + JWT verification."""
import secrets
import time
from typing import Optional
import jwt
from fastapi import HTTPException, Request
from ..services.api_key_manager import ApiKeyManager
from ..core import config
_key_manager: Optional[ApiKeyManager] = None
_JWT_SECRET = secrets.token_hex(32)
_JWT_EXPIRE = 86400 * 7 # 7 days
def init_auth(key_manager: ApiKeyManager):
global _key_manager
_key_manager = key_manager
def create_login_token(username: str) -> str:
payload = {"sub": username, "exp": int(time.time()) + _JWT_EXPIRE}
return jwt.encode(payload, _JWT_SECRET, algorithm="HS256")
def verify_login(username: str, password: str) -> bool:
return username == config.ADMIN_USERNAME and password == config.ADMIN_PASSWORD
def _extract_token(request: Request) -> Optional[str]:
auth_header = request.headers.get("authorization", "")
if auth_header.lower().startswith("bearer "):
return auth_header[7:].strip()
return request.headers.get("x-api-key")
async def verify_api_key(request: Request) -> str:
token = _extract_token(request)
if not token:
raise HTTPException(status_code=401, detail="Missing token")
# Try JWT first
try:
payload = jwt.decode(token, _JWT_SECRET, algorithms=["HS256"])
if payload.get("exp", 0) > time.time():
return token
except (jwt.InvalidTokenError, jwt.ExpiredSignatureError):
pass
# Fallback to API key
if _key_manager and _key_manager.validate(token):
return token
raise HTTPException(status_code=401, detail="Invalid token")
|