| import asyncio |
| from utils.logger import Logger |
|
|
| logger = Logger.get_logger(__name__) |
|
|
|
|
class RequestCancellation:
    """ASGI middleware that cancels request handling on client disconnect.

    Useful for API routes where very frequent/expensive requests are made:
    once the client goes away, the wrapped application's task is cancelled
    instead of being left to run to completion.

    Only scopes of type ``"http"`` are intercepted; every other scope type is
    forwarded to the wrapped application untouched.
    """

    def __init__(self, app):
        """Store the wrapped ASGI application.

        Args:
            app: The downstream ASGI callable ``app(scope, receive, send)``.
        """
        self.app = app

    async def __call__(self, scope, receive, send):
        """Run the wrapped app, cancelling it if ``http.disconnect`` arrives.

        Args:
            scope: ASGI connection scope; must contain a ``"type"`` key.
            receive: ASGI receive callable supplied by the server.
            send: ASGI send callable supplied by the server.

        Returns:
            Whatever the wrapped application returns for HTTP scopes, or
            ``None`` for non-HTTP scopes and when the request was cancelled
            because the client disconnected.

        Raises:
            asyncio.CancelledError: If the cancellation was not caused by a
                client disconnect (e.g. the surrounding task was cancelled),
                so that outer cancellation is never silently swallowed.
        """
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # The wrapped app reads from this queue instead of the real
        # ``receive`` so the poller can inspect every message first.
        queue = asyncio.Queue()
        client_disconnected = False

        async def message_poller(handler_task):
            """Forward incoming messages to the queue; cancel on disconnect."""
            nonlocal client_disconnected
            while True:
                message = await receive()
                if message["type"] == "http.disconnect":
                    # Record the reason before cancelling so the caller can
                    # tell a disconnect apart from any other cancellation.
                    client_disconnected = True
                    handler_task.cancel()
                    return
                await queue.put(message)

        handler_task = asyncio.create_task(self.app(scope, queue.get, send))
        # Keep a reference: the event loop only holds weak references to
        # tasks, and we need the handle to stop the poller afterwards.
        poller_task = asyncio.create_task(message_poller(handler_task))

        try:
            return await handler_task
        except asyncio.CancelledError:
            if not client_disconnected:
                # Cancellation came from elsewhere; propagate it.
                raise
            logger.info('Task Cancellatation Requested.')
        finally:
            # Do not leave the poller parked on ``receive()`` once the
            # handler is finished; a no-op if the poller already returned.
            poller_task.cancel()
|
|