| import flask_restful |
| from flask_login import current_user |
| from flask_restful import Resource, fields, marshal_with |
| from werkzeug.exceptions import Forbidden |
|
|
| from extensions.ext_database import db |
| from libs.helper import TimestampField |
| from libs.login import login_required |
| from models.dataset import Dataset |
| from models.model import ApiToken, App |
|
|
| from . import api |
| from .wraps import account_initialization_required, setup_required |
|
|
| api_key_fields = { |
| "id": fields.String, |
| "type": fields.String, |
| "token": fields.String, |
| "last_used_at": TimestampField, |
| "created_at": TimestampField, |
| } |
|
|
| api_key_list = {"data": fields.List(fields.Nested(api_key_fields), attribute="items")} |
|
|
|
|
| def _get_resource(resource_id, tenant_id, resource_model): |
| resource = resource_model.query.filter_by(id=resource_id, tenant_id=tenant_id).first() |
|
|
| if resource is None: |
| flask_restful.abort(404, message=f"{resource_model.__name__} not found.") |
|
|
| return resource |
|
|
|
|
| class BaseApiKeyListResource(Resource): |
| method_decorators = [account_initialization_required, login_required, setup_required] |
|
|
| resource_type = None |
| resource_model = None |
| resource_id_field = None |
| token_prefix = None |
| max_keys = 10 |
|
|
| @marshal_with(api_key_list) |
| def get(self, resource_id): |
| resource_id = str(resource_id) |
| _get_resource(resource_id, current_user.current_tenant_id, self.resource_model) |
| keys = ( |
| db.session.query(ApiToken) |
| .filter(ApiToken.type == self.resource_type, getattr(ApiToken, self.resource_id_field) == resource_id) |
| .all() |
| ) |
| return {"items": keys} |
|
|
| @marshal_with(api_key_fields) |
| def post(self, resource_id): |
| resource_id = str(resource_id) |
| _get_resource(resource_id, current_user.current_tenant_id, self.resource_model) |
| if not current_user.is_editor: |
| raise Forbidden() |
|
|
| current_key_count = ( |
| db.session.query(ApiToken) |
| .filter(ApiToken.type == self.resource_type, getattr(ApiToken, self.resource_id_field) == resource_id) |
| .count() |
| ) |
|
|
| if current_key_count >= self.max_keys: |
| flask_restful.abort( |
| 400, |
| message=f"Cannot create more than {self.max_keys} API keys for this resource type.", |
| code="max_keys_exceeded", |
| ) |
|
|
| key = ApiToken.generate_api_key(self.token_prefix, 24) |
| api_token = ApiToken() |
| setattr(api_token, self.resource_id_field, resource_id) |
| api_token.tenant_id = current_user.current_tenant_id |
| api_token.token = key |
| api_token.type = self.resource_type |
| db.session.add(api_token) |
| db.session.commit() |
| return api_token, 201 |
|
|
|
|
| class BaseApiKeyResource(Resource): |
| method_decorators = [account_initialization_required, login_required, setup_required] |
|
|
| resource_type = None |
| resource_model = None |
| resource_id_field = None |
|
|
| def delete(self, resource_id, api_key_id): |
| resource_id = str(resource_id) |
| api_key_id = str(api_key_id) |
| _get_resource(resource_id, current_user.current_tenant_id, self.resource_model) |
|
|
| |
| if not current_user.is_admin_or_owner: |
| raise Forbidden() |
|
|
| key = ( |
| db.session.query(ApiToken) |
| .filter( |
| getattr(ApiToken, self.resource_id_field) == resource_id, |
| ApiToken.type == self.resource_type, |
| ApiToken.id == api_key_id, |
| ) |
| .first() |
| ) |
|
|
| if key is None: |
| flask_restful.abort(404, message="API key not found") |
|
|
| db.session.query(ApiToken).filter(ApiToken.id == api_key_id).delete() |
| db.session.commit() |
|
|
| return {"result": "success"}, 204 |
|
|
|
|
| class AppApiKeyListResource(BaseApiKeyListResource): |
| def after_request(self, resp): |
| resp.headers["Access-Control-Allow-Origin"] = "*" |
| resp.headers["Access-Control-Allow-Credentials"] = "true" |
| return resp |
|
|
| resource_type = "app" |
| resource_model = App |
| resource_id_field = "app_id" |
| token_prefix = "app-" |
|
|
|
|
| class AppApiKeyResource(BaseApiKeyResource): |
| def after_request(self, resp): |
| resp.headers["Access-Control-Allow-Origin"] = "*" |
| resp.headers["Access-Control-Allow-Credentials"] = "true" |
| return resp |
|
|
| resource_type = "app" |
| resource_model = App |
| resource_id_field = "app_id" |
|
|
|
|
| class DatasetApiKeyListResource(BaseApiKeyListResource): |
| def after_request(self, resp): |
| resp.headers["Access-Control-Allow-Origin"] = "*" |
| resp.headers["Access-Control-Allow-Credentials"] = "true" |
| return resp |
|
|
| resource_type = "dataset" |
| resource_model = Dataset |
| resource_id_field = "dataset_id" |
| token_prefix = "ds-" |
|
|
|
|
| class DatasetApiKeyResource(BaseApiKeyResource): |
| def after_request(self, resp): |
| resp.headers["Access-Control-Allow-Origin"] = "*" |
| resp.headers["Access-Control-Allow-Credentials"] = "true" |
| return resp |
|
|
| resource_type = "dataset" |
| resource_model = Dataset |
| resource_id_field = "dataset_id" |
|
|
|
|
| api.add_resource(AppApiKeyListResource, "/apps/<uuid:resource_id>/api-keys") |
| api.add_resource(AppApiKeyResource, "/apps/<uuid:resource_id>/api-keys/<uuid:api_key_id>") |
| api.add_resource(DatasetApiKeyListResource, "/datasets/<uuid:resource_id>/api-keys") |
| api.add_resource(DatasetApiKeyResource, "/datasets/<uuid:resource_id>/api-keys/<uuid:api_key_id>") |
|
|