| import asyncio |
| import contextvars |
| import inspect |
| import warnings |
|
|
| from .case import TestCase |
|
|
|
|
| class IsolatedAsyncioTestCase(TestCase): |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| def __init__(self, methodName='runTest'): |
| super().__init__(methodName) |
| self._asyncioRunner = None |
| self._asyncioTestContext = contextvars.copy_context() |
|
|
| async def asyncSetUp(self): |
| pass |
|
|
| async def asyncTearDown(self): |
| pass |
|
|
| def addAsyncCleanup(self, func, /, *args, **kwargs): |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| self.addCleanup(*(func, *args), **kwargs) |
|
|
| async def enterAsyncContext(self, cm): |
| """Enters the supplied asynchronous context manager. |
| |
| If successful, also adds its __aexit__ method as a cleanup |
| function and returns the result of the __aenter__ method. |
| """ |
| |
| |
| cls = type(cm) |
| try: |
| enter = cls.__aenter__ |
| exit = cls.__aexit__ |
| except AttributeError: |
| raise TypeError(f"'{cls.__module__}.{cls.__qualname__}' object does " |
| f"not support the asynchronous context manager protocol" |
| ) from None |
| result = await enter(cm) |
| self.addAsyncCleanup(exit, cm, None, None, None) |
| return result |
|
|
| def _callSetUp(self): |
| |
| |
| |
| self._asyncioRunner.get_loop() |
| self._asyncioTestContext.run(self.setUp) |
| self._callAsync(self.asyncSetUp) |
|
|
| def _callTestMethod(self, method): |
| if self._callMaybeAsync(method) is not None: |
| warnings.warn(f'It is deprecated to return a value that is not None from a ' |
| f'test case ({method})', DeprecationWarning, stacklevel=4) |
|
|
| def _callTearDown(self): |
| self._callAsync(self.asyncTearDown) |
| self._asyncioTestContext.run(self.tearDown) |
|
|
| def _callCleanup(self, function, *args, **kwargs): |
| self._callMaybeAsync(function, *args, **kwargs) |
|
|
| def _callAsync(self, func, /, *args, **kwargs): |
| assert self._asyncioRunner is not None, 'asyncio runner is not initialized' |
| assert inspect.iscoroutinefunction(func), f'{func!r} is not an async function' |
| return self._asyncioRunner.run( |
| func(*args, **kwargs), |
| context=self._asyncioTestContext |
| ) |
|
|
| def _callMaybeAsync(self, func, /, *args, **kwargs): |
| assert self._asyncioRunner is not None, 'asyncio runner is not initialized' |
| if inspect.iscoroutinefunction(func): |
| return self._asyncioRunner.run( |
| func(*args, **kwargs), |
| context=self._asyncioTestContext, |
| ) |
| else: |
| return self._asyncioTestContext.run(func, *args, **kwargs) |
|
|
| def _setupAsyncioRunner(self): |
| assert self._asyncioRunner is None, 'asyncio runner is already initialized' |
| runner = asyncio.Runner(debug=True) |
| self._asyncioRunner = runner |
|
|
| def _tearDownAsyncioRunner(self): |
| runner = self._asyncioRunner |
| runner.close() |
|
|
| def run(self, result=None): |
| self._setupAsyncioRunner() |
| try: |
| return super().run(result) |
| finally: |
| self._tearDownAsyncioRunner() |
|
|
| def debug(self): |
| self._setupAsyncioRunner() |
| super().debug() |
| self._tearDownAsyncioRunner() |
|
|
| def __del__(self): |
| if self._asyncioRunner is not None: |
| self._tearDownAsyncioRunner() |
|
|