| import binascii |
| from base64 import b64decode |
| from typing import Optional |
|
|
| from fastapi.exceptions import HTTPException |
| from fastapi.openapi.models import HTTPBase as HTTPBaseModel |
| from fastapi.openapi.models import HTTPBearer as HTTPBearerModel |
| from fastapi.security.base import SecurityBase |
| from fastapi.security.utils import get_authorization_scheme_param |
| from pydantic import BaseModel |
| from starlette.requests import Request |
| from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN |
| from typing_extensions import Annotated, Doc |
|
|
|
|
| class HTTPBasicCredentials(BaseModel): |
| """ |
| The HTTP Basic credentials given as the result of using `HTTPBasic` in a |
| dependency. |
| |
| Read more about it in the |
| [FastAPI docs for HTTP Basic Auth](https://fastapi.tiangolo.com/advanced/security/http-basic-auth/). |
| """ |
|
|
| username: Annotated[str, Doc("The HTTP Basic username.")] |
| password: Annotated[str, Doc("The HTTP Basic password.")] |
|
|
|
|
| class HTTPAuthorizationCredentials(BaseModel): |
| """ |
| The HTTP authorization credentials in the result of using `HTTPBearer` or |
| `HTTPDigest` in a dependency. |
| |
| The HTTP authorization header value is split by the first space. |
| |
| The first part is the `scheme`, the second part is the `credentials`. |
| |
| For example, in an HTTP Bearer token scheme, the client will send a header |
| like: |
| |
| ``` |
| Authorization: Bearer deadbeef12346 |
| ``` |
| |
| In this case: |
| |
| * `scheme` will have the value `"Bearer"` |
| * `credentials` will have the value `"deadbeef12346"` |
| """ |
|
|
| scheme: Annotated[ |
| str, |
| Doc( |
| """ |
| The HTTP authorization scheme extracted from the header value. |
| """ |
| ), |
| ] |
| credentials: Annotated[ |
| str, |
| Doc( |
| """ |
| The HTTP authorization credentials extracted from the header value. |
| """ |
| ), |
| ] |
|
|
|
|
| class HTTPBase(SecurityBase): |
| def __init__( |
| self, |
| *, |
| scheme: str, |
| scheme_name: Optional[str] = None, |
| description: Optional[str] = None, |
| auto_error: bool = True, |
| ): |
| self.model = HTTPBaseModel(scheme=scheme, description=description) |
| self.scheme_name = scheme_name or self.__class__.__name__ |
| self.auto_error = auto_error |
|
|
| async def __call__( |
| self, request: Request |
| ) -> Optional[HTTPAuthorizationCredentials]: |
| authorization = request.headers.get("Authorization") |
| scheme, credentials = get_authorization_scheme_param(authorization) |
| if not (authorization and scheme and credentials): |
| if self.auto_error: |
| raise HTTPException( |
| status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" |
| ) |
| else: |
| return None |
| return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials) |
|
|
|
|
| class HTTPBasic(HTTPBase): |
| """ |
| HTTP Basic authentication. |
| |
| ## Usage |
| |
| Create an instance object and use that object as the dependency in `Depends()`. |
| |
| The dependency result will be an `HTTPBasicCredentials` object containing the |
| `username` and the `password`. |
| |
| Read more about it in the |
| [FastAPI docs for HTTP Basic Auth](https://fastapi.tiangolo.com/advanced/security/http-basic-auth/). |
| |
| ## Example |
| |
| ```python |
| from typing import Annotated |
| |
| from fastapi import Depends, FastAPI |
| from fastapi.security import HTTPBasic, HTTPBasicCredentials |
| |
| app = FastAPI() |
| |
| security = HTTPBasic() |
| |
| |
| @app.get("/users/me") |
| def read_current_user(credentials: Annotated[HTTPBasicCredentials, Depends(security)]): |
| return {"username": credentials.username, "password": credentials.password} |
| ``` |
| """ |
|
|
| def __init__( |
| self, |
| *, |
| scheme_name: Annotated[ |
| Optional[str], |
| Doc( |
| """ |
| Security scheme name. |
| |
| It will be included in the generated OpenAPI (e.g. visible at `/docs`). |
| """ |
| ), |
| ] = None, |
| realm: Annotated[ |
| Optional[str], |
| Doc( |
| """ |
| HTTP Basic authentication realm. |
| """ |
| ), |
| ] = None, |
| description: Annotated[ |
| Optional[str], |
| Doc( |
| """ |
| Security scheme description. |
| |
| It will be included in the generated OpenAPI (e.g. visible at `/docs`). |
| """ |
| ), |
| ] = None, |
| auto_error: Annotated[ |
| bool, |
| Doc( |
| """ |
| By default, if the HTTP Basic authentication is not provided (a |
| header), `HTTPBasic` will automatically cancel the request and send the |
| client an error. |
| |
| If `auto_error` is set to `False`, when the HTTP Basic authentication |
| is not available, instead of erroring out, the dependency result will |
| be `None`. |
| |
| This is useful when you want to have optional authentication. |
| |
| It is also useful when you want to have authentication that can be |
| provided in one of multiple optional ways (for example, in HTTP Basic |
| authentication or in an HTTP Bearer token). |
| """ |
| ), |
| ] = True, |
| ): |
| self.model = HTTPBaseModel(scheme="basic", description=description) |
| self.scheme_name = scheme_name or self.__class__.__name__ |
| self.realm = realm |
| self.auto_error = auto_error |
|
|
| async def __call__( |
| self, request: Request |
| ) -> Optional[HTTPBasicCredentials]: |
| authorization = request.headers.get("Authorization") |
| scheme, param = get_authorization_scheme_param(authorization) |
| if self.realm: |
| unauthorized_headers = {"WWW-Authenticate": f'Basic realm="{self.realm}"'} |
| else: |
| unauthorized_headers = {"WWW-Authenticate": "Basic"} |
| if not authorization or scheme.lower() != "basic": |
| if self.auto_error: |
| raise HTTPException( |
| status_code=HTTP_401_UNAUTHORIZED, |
| detail="Not authenticated", |
| headers=unauthorized_headers, |
| ) |
| else: |
| return None |
| invalid_user_credentials_exc = HTTPException( |
| status_code=HTTP_401_UNAUTHORIZED, |
| detail="Invalid authentication credentials", |
| headers=unauthorized_headers, |
| ) |
| try: |
| data = b64decode(param).decode("ascii") |
| except (ValueError, UnicodeDecodeError, binascii.Error): |
| raise invalid_user_credentials_exc |
| username, separator, password = data.partition(":") |
| if not separator: |
| raise invalid_user_credentials_exc |
| return HTTPBasicCredentials(username=username, password=password) |
|
|
|
|
| class HTTPBearer(HTTPBase): |
| """ |
| HTTP Bearer token authentication. |
| |
| ## Usage |
| |
| Create an instance object and use that object as the dependency in `Depends()`. |
| |
| The dependency result will be an `HTTPAuthorizationCredentials` object containing |
| the `scheme` and the `credentials`. |
| |
| ## Example |
| |
| ```python |
| from typing import Annotated |
| |
| from fastapi import Depends, FastAPI |
| from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer |
| |
| app = FastAPI() |
| |
| security = HTTPBearer() |
| |
| |
| @app.get("/users/me") |
| def read_current_user( |
| credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)] |
| ): |
| return {"scheme": credentials.scheme, "credentials": credentials.credentials} |
| ``` |
| """ |
|
|
| def __init__( |
| self, |
| *, |
| bearerFormat: Annotated[Optional[str], Doc("Bearer token format.")] = None, |
| scheme_name: Annotated[ |
| Optional[str], |
| Doc( |
| """ |
| Security scheme name. |
| |
| It will be included in the generated OpenAPI (e.g. visible at `/docs`). |
| """ |
| ), |
| ] = None, |
| description: Annotated[ |
| Optional[str], |
| Doc( |
| """ |
| Security scheme description. |
| |
| It will be included in the generated OpenAPI (e.g. visible at `/docs`). |
| """ |
| ), |
| ] = None, |
| auto_error: Annotated[ |
| bool, |
| Doc( |
| """ |
| By default, if the HTTP Bearer token not provided (in an |
| `Authorization` header), `HTTPBearer` will automatically cancel the |
| request and send the client an error. |
| |
| If `auto_error` is set to `False`, when the HTTP Bearer token |
| is not available, instead of erroring out, the dependency result will |
| be `None`. |
| |
| This is useful when you want to have optional authentication. |
| |
| It is also useful when you want to have authentication that can be |
| provided in one of multiple optional ways (for example, in an HTTP |
| Bearer token or in a cookie). |
| """ |
| ), |
| ] = True, |
| ): |
| self.model = HTTPBearerModel(bearerFormat=bearerFormat, description=description) |
| self.scheme_name = scheme_name or self.__class__.__name__ |
| self.auto_error = auto_error |
|
|
| async def __call__( |
| self, request: Request |
| ) -> Optional[HTTPAuthorizationCredentials]: |
| authorization = request.headers.get("Authorization") |
| scheme, credentials = get_authorization_scheme_param(authorization) |
| if not (authorization and scheme and credentials): |
| if self.auto_error: |
| raise HTTPException( |
| status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" |
| ) |
| else: |
| return None |
| if scheme.lower() != "bearer": |
| if self.auto_error: |
| raise HTTPException( |
| status_code=HTTP_403_FORBIDDEN, |
| detail="Invalid authentication credentials", |
| ) |
| else: |
| return None |
| return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials) |
|
|
|
|
| class HTTPDigest(HTTPBase): |
| """ |
| HTTP Digest authentication. |
| |
| ## Usage |
| |
| Create an instance object and use that object as the dependency in `Depends()`. |
| |
| The dependency result will be an `HTTPAuthorizationCredentials` object containing |
| the `scheme` and the `credentials`. |
| |
| ## Example |
| |
| ```python |
| from typing import Annotated |
| |
| from fastapi import Depends, FastAPI |
| from fastapi.security import HTTPAuthorizationCredentials, HTTPDigest |
| |
| app = FastAPI() |
| |
| security = HTTPDigest() |
| |
| |
| @app.get("/users/me") |
| def read_current_user( |
| credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)] |
| ): |
| return {"scheme": credentials.scheme, "credentials": credentials.credentials} |
| ``` |
| """ |
|
|
| def __init__( |
| self, |
| *, |
| scheme_name: Annotated[ |
| Optional[str], |
| Doc( |
| """ |
| Security scheme name. |
| |
| It will be included in the generated OpenAPI (e.g. visible at `/docs`). |
| """ |
| ), |
| ] = None, |
| description: Annotated[ |
| Optional[str], |
| Doc( |
| """ |
| Security scheme description. |
| |
| It will be included in the generated OpenAPI (e.g. visible at `/docs`). |
| """ |
| ), |
| ] = None, |
| auto_error: Annotated[ |
| bool, |
| Doc( |
| """ |
| By default, if the HTTP Digest not provided, `HTTPDigest` will |
| automatically cancel the request and send the client an error. |
| |
| If `auto_error` is set to `False`, when the HTTP Digest is not |
| available, instead of erroring out, the dependency result will |
| be `None`. |
| |
| This is useful when you want to have optional authentication. |
| |
| It is also useful when you want to have authentication that can be |
| provided in one of multiple optional ways (for example, in HTTP |
| Digest or in a cookie). |
| """ |
| ), |
| ] = True, |
| ): |
| self.model = HTTPBaseModel(scheme="digest", description=description) |
| self.scheme_name = scheme_name or self.__class__.__name__ |
| self.auto_error = auto_error |
|
|
| async def __call__( |
| self, request: Request |
| ) -> Optional[HTTPAuthorizationCredentials]: |
| authorization = request.headers.get("Authorization") |
| scheme, credentials = get_authorization_scheme_param(authorization) |
| if not (authorization and scheme and credentials): |
| if self.auto_error: |
| raise HTTPException( |
| status_code=HTTP_403_FORBIDDEN, detail="Not authenticated" |
| ) |
| else: |
| return None |
| if scheme.lower() != "digest": |
| raise HTTPException( |
| status_code=HTTP_403_FORBIDDEN, |
| detail="Invalid authentication credentials", |
| ) |
| return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials) |
|
|