""" SPARKNET Authentication Module JWT-based authentication with OAuth2 support. """ from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import JWTError, jwt from passlib.context import CryptContext from pydantic import BaseModel from datetime import datetime, timedelta from typing import Optional, List from pathlib import Path import os import json import uuid # Configuration (use environment variables in production) SECRET_KEY = os.getenv("SPARKNET_SECRET_KEY", "sparknet-super-secret-key-change-in-production") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Password hashing pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # OAuth2 scheme oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/token", auto_error=False) # Simple file-based user store (replace with database in production) USERS_FILE = Path(__file__).parent.parent / "data" / "users.json" USERS_FILE.parent.mkdir(parents=True, exist_ok=True) class User(BaseModel): """User model.""" user_id: str username: str email: str hashed_password: str is_active: bool = True is_admin: bool = False scopes: List[str] = [] created_at: datetime = None class Config: json_encoders = { datetime: lambda v: v.isoformat() if v else None } class UserInDB(User): """User model with password hash.""" pass class TokenData(BaseModel): """JWT token payload.""" username: Optional[str] = None user_id: Optional[str] = None scopes: List[str] = [] def _load_users() -> dict: """Load users from file.""" if USERS_FILE.exists(): try: with open(USERS_FILE) as f: data = json.load(f) return {u["username"]: User(**u) for u in data} except Exception: pass return {} def _save_users(users: dict): """Save users to file.""" with open(USERS_FILE, "w") as f: json.dump([u.dict() for u in users.values()], f, default=str, indent=2) def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash.""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """Hash a password.""" return pwd_context.hash(password) def get_user(username: str) -> Optional[UserInDB]: """Get a user by username.""" users = _load_users() if username in users: return UserInDB(**users[username].dict()) return None def authenticate_user(username: str, password: str) -> Optional[UserInDB]: """Authenticate a user.""" user = get_user(username) if not user: return None if not verify_password(password, user.hashed_password): return None return user def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """Create a JWT access token.""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt async def get_current_user(token: str = Depends(oauth2_scheme)) -> Optional[UserInDB]: """Get the current user from JWT token.""" if not token: return None try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: return None token_data = TokenData( username=username, user_id=payload.get("user_id"), scopes=payload.get("scopes", []) ) except JWTError: return None user = get_user(token_data.username) return user async def get_current_active_user( current_user: Optional[UserInDB] = Depends(get_current_user) ) -> Optional[UserInDB]: """Get current active user (authentication optional).""" if current_user and not current_user.is_active: return None return current_user async def require_auth( current_user: Optional[UserInDB] = Depends(get_current_user) ) -> UserInDB: """Require authentication (raises exception if not authenticated).""" credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) if not current_user: raise credentials_exception if not current_user.is_active: raise HTTPException(status_code=400, detail="Inactive user") return current_user async def require_admin( current_user: UserInDB = Depends(require_auth) ) -> UserInDB: """Require admin privileges.""" if not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required" ) return current_user def create_user(username: str, email: str, password: str, is_admin: bool = False) -> User: """Create a new user.""" users = _load_users() if username in users: raise ValueError(f"User {username} already exists") user = User( user_id=str(uuid.uuid4()), username=username, email=email, hashed_password=get_password_hash(password), is_active=True, is_admin=is_admin, scopes=["read", "write"] if not is_admin else ["read", "write", "admin"], created_at=datetime.now() ) users[username] = user _save_users(users) return user def delete_user(username: str) -> bool: """Delete a user.""" users = _load_users() if username in users: del users[username] _save_users(users) return True return False # Initialize default admin user if none exists def init_default_admin(): """Create default admin user if no users exist.""" users = _load_users() if not users: try: create_user( username="admin", email="admin@sparknet.local", password="admin123", # Change in production! is_admin=True ) print("Default admin user created: admin / admin123") except Exception as e: print(f"Could not create default admin: {e}") # Auth routes from fastapi import APIRouter auth_router = APIRouter() @auth_router.post("/token") async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): """OAuth2 compatible token login.""" user = authenticate_user(form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={ "sub": user.username, "user_id": user.user_id, "scopes": user.scopes }, expires_delta=access_token_expires ) return { "access_token": access_token, "token_type": "bearer", "expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60 } @auth_router.post("/register") async def register_user( username: str, email: str, password: str, ): """Register a new user.""" try: user = create_user(username, email, password) return { "user_id": user.user_id, "username": user.username, "email": user.email, "message": "User created successfully" } except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @auth_router.get("/me") async def read_users_me(current_user: UserInDB = Depends(require_auth)): """Get current user information.""" return { "user_id": current_user.user_id, "username": current_user.username, "email": current_user.email, "is_active": current_user.is_active, "is_admin": current_user.is_admin, "scopes": current_user.scopes } @auth_router.get("/users") async def list_users(current_user: UserInDB = Depends(require_admin)): """List all users (admin only).""" users = _load_users() return [ { "user_id": u.user_id, "username": u.username, "email": u.email, "is_active": u.is_active, "is_admin": u.is_admin } for u in users.values() ] @auth_router.delete("/users/{username}") async def delete_user_endpoint( username: str, current_user: UserInDB = Depends(require_admin) ): """Delete a user (admin only).""" if username == current_user.username: raise HTTPException(status_code=400, detail="Cannot delete yourself") if delete_user(username): return {"status": "deleted", "username": username} raise HTTPException(status_code=404, detail=f"User not found: {username}")