| """ |
| Global task lifecycle management module |
| 管理应用程序中所有异步任务的生命周期,确保正确清理 |
| """ |
|
|
| import asyncio |
| import weakref |
| from typing import Any, Dict, Set |
|
|
| from log import log |
|
|
|
|
| class TaskManager: |
| """全局异步任务管理器 - 单例模式""" |
|
|
| _instance = None |
| _lock = asyncio.Lock() |
|
|
| def __new__(cls): |
| if cls._instance is None: |
| cls._instance = super().__new__(cls) |
| cls._instance._initialized = False |
| return cls._instance |
|
|
| def __init__(self): |
| if self._initialized: |
| return |
|
|
| self._tasks: Set[asyncio.Task] = set() |
| self._resources: Set[Any] = set() |
| self._shutdown_event = asyncio.Event() |
| self._initialized = True |
| log.debug("TaskManager initialized") |
|
|
| def register_task(self, task: asyncio.Task, description: str = None) -> asyncio.Task: |
| """注册一个任务供生命周期管理""" |
| self._tasks.add(task) |
| task.add_done_callback(lambda t: self._tasks.discard(t)) |
|
|
| if description: |
| task.set_name(description) |
|
|
| log.debug(f"Registered task: {task.get_name() or 'unnamed'}") |
| return task |
|
|
| def create_task(self, coro, *, name: str = None) -> asyncio.Task: |
| """创建并注册一个任务""" |
| task = asyncio.create_task(coro, name=name) |
| return self.register_task(task, name) |
|
|
| def register_resource(self, resource: Any) -> Any: |
| """注册一个需要清理的资源(如HTTP客户端、文件句柄等)""" |
| |
| self._resources.add(weakref.ref(resource)) |
| log.debug(f"Registered resource: {type(resource).__name__}") |
| return resource |
|
|
| async def shutdown(self, timeout: float = 30.0): |
| """关闭所有任务和资源""" |
| log.info("TaskManager shutdown initiated") |
|
|
| |
| self._shutdown_event.set() |
|
|
| |
| cancelled_count = 0 |
| for task in list(self._tasks): |
| if not task.done(): |
| task.cancel() |
| cancelled_count += 1 |
|
|
| if cancelled_count > 0: |
| log.info(f"Cancelled {cancelled_count} pending tasks") |
|
|
| |
| if self._tasks: |
| try: |
| await asyncio.wait_for( |
| asyncio.gather(*self._tasks, return_exceptions=True), timeout=timeout |
| ) |
| except asyncio.TimeoutError: |
| log.warning(f"Some tasks did not complete within {timeout}s timeout") |
|
|
| |
| cleaned_resources = 0 |
| failed_resources = 0 |
| for resource_ref in list(self._resources): |
| resource = resource_ref() |
| if resource is not None: |
| try: |
| if hasattr(resource, "close"): |
| if asyncio.iscoroutinefunction(resource.close): |
| await resource.close() |
| else: |
| resource.close() |
| elif hasattr(resource, "aclose"): |
| await resource.aclose() |
| cleaned_resources += 1 |
| except Exception as e: |
| log.warning(f"Failed to close resource {type(resource).__name__}: {e}") |
| failed_resources += 1 |
| |
|
|
| if cleaned_resources > 0: |
| log.info(f"Cleaned up {cleaned_resources} resources") |
| if failed_resources > 0: |
| log.warning(f"Failed to clean {failed_resources} resources") |
|
|
| self._tasks.clear() |
| self._resources.clear() |
| log.info("TaskManager shutdown completed") |
|
|
| @property |
| def is_shutdown(self) -> bool: |
| """检查是否已经开始关闭""" |
| return self._shutdown_event.is_set() |
|
|
| def get_stats(self) -> Dict[str, int]: |
| """获取任务管理统计信息""" |
| return { |
| "active_tasks": len(self._tasks), |
| "registered_resources": len(self._resources), |
| "is_shutdown": self.is_shutdown, |
| } |
|
|
|
|
| |
| task_manager = TaskManager() |
|
|
|
|
| def create_managed_task(coro, *, name: str = None) -> asyncio.Task: |
| """创建一个被管理的异步任务的便捷函数""" |
| return task_manager.create_task(coro, name=name) |
|
|
|
|
| def register_resource(resource: Any) -> Any: |
| """注册资源的便捷函数""" |
| return task_manager.register_resource(resource) |
|
|
|
|
| async def shutdown_all_tasks(timeout: float = 30.0): |
| """关闭所有任务的便捷函数""" |
| await task_manager.shutdown(timeout) |
|
|