| | import asyncio |
| | import time |
| | from datetime import datetime, timedelta |
| | from typing import Optional, Callable, Dict, Any |
| | from services.logging_service import get_logger |
| |
|
| |
|
class LongRunningTaskMonitor:
    """
    Monitor for long-running tasks that periodically emits heartbeats while
    MCP tools are executing.

    Tasks are registered with ``start_monitoring`` and removed with
    ``stop_monitoring``. ``monitor_all_tasks`` is a background loop that, on
    every check, emits a heartbeat for each registered task whose last
    heartbeat is at least ``heartbeat_interval`` seconds old.
    """

    # Upper bound (seconds) on how long monitor_all_tasks sleeps between checks.
    DEFAULT_CHECK_INTERVAL = 60

    def __init__(self, heartbeat_interval: int = 300):
        """
        Args:
            heartbeat_interval: minimum number of seconds between two
                heartbeats for the same task.
        """
        self.heartbeat_interval = heartbeat_interval
        self.logger = get_logger()
        # task_id -> bookkeeping dict (keys are set in start_monitoring).
        self.active_tasks: Dict[str, Dict[str, Any]] = {}

    def start_monitoring(self, task_id: str, task_name: str, chat_id: Optional[str] = None,
                         heartbeat_callback: Optional[Callable] = None):
        """
        Start monitoring a long-running task.

        Registering an already-known ``task_id`` replaces its previous entry.

        Args:
            task_id: unique task identifier.
            task_name: task name, used in log messages.
            chat_id: optional chat identifier associated with the task.
            heartbeat_callback: optional callable invoked as
                ``callback(task_id, task_info)`` on every heartbeat. It may be
                a plain function or a coroutine function; an awaitable result
                is awaited.
        """
        # Take a single timestamp so start_time and last_heartbeat agree exactly.
        now = time.time()
        self.active_tasks[task_id] = {
            'task_name': task_name,
            'chat_id': chat_id,
            'start_time': now,
            'heartbeat_callback': heartbeat_callback,
            'last_heartbeat': now,
            'heartbeat_count': 0
        }

        self.logger.log_system_status(
            f"Started monitoring long-running task: {task_name}",
            {'task_id': task_id, 'chat_id': chat_id}
        )

    def stop_monitoring(self, task_id: str):
        """
        Stop monitoring a task and log its total duration.

        Unknown ``task_id`` values are ignored.

        Args:
            task_id: unique task identifier.
        """
        task_info = self.active_tasks.pop(task_id, None)
        if task_info is None:
            return

        duration = time.time() - task_info['start_time']
        self.logger.log_long_running_task(
            task_info['task_name'],
            duration,
            task_info['chat_id']
        )

    async def send_heartbeat(self, task_id: str):
        """
        Emit a heartbeat for ``task_id`` if one is due.

        A heartbeat is due when at least ``heartbeat_interval`` seconds have
        passed since the previous one. Emitting it updates the task's
        bookkeeping, logs a status record, and invokes the task's callback
        (if any). Unknown task ids are ignored.

        Args:
            task_id: unique task identifier.
        """
        task_info = self.active_tasks.get(task_id)
        if task_info is None:
            return

        current_time = time.time()
        if current_time - task_info['last_heartbeat'] < self.heartbeat_interval:
            return  # not due yet

        task_info['last_heartbeat'] = current_time
        task_info['heartbeat_count'] += 1
        duration = current_time - task_info['start_time']

        self.logger.log_system_status(
            f"Heartbeat for long-running task: {task_info['task_name']}",
            {
                'task_id': task_id,
                'chat_id': task_info['chat_id'],
                'duration_seconds': duration,
                'heartbeat_count': task_info['heartbeat_count']
            }
        )

        callback = task_info['heartbeat_callback']
        if callback:
            await self._invoke_heartbeat_callback(callback, task_id, task_info)

    async def _invoke_heartbeat_callback(self, callback: Callable, task_id: str,
                                         task_info: Dict[str, Any]):
        """Call ``callback``, awaiting its result if awaitable; log (not raise) failures."""
        import inspect

        try:
            result = callback(task_id, task_info)
            # Support both sync callbacks and coroutine functions; the original
            # unconditional ``await`` raised TypeError for sync callbacks.
            if inspect.isawaitable(result):
                await result
        except asyncio.CancelledError:
            # Never swallow cancellation (it subclasses Exception before Python 3.8).
            raise
        except Exception as e:
            self.logger.log_error(
                "HeartbeatCallbackError",
                str(e),
                {'task_id': task_id, 'task_name': task_info['task_name']}
            )

    def _default_check_interval(self) -> float:
        """Polling period: at most DEFAULT_CHECK_INTERVAL, shorter if heartbeats are more frequent."""
        if self.heartbeat_interval and self.heartbeat_interval > 0:
            return min(self.DEFAULT_CHECK_INTERVAL, self.heartbeat_interval)
        return self.DEFAULT_CHECK_INTERVAL

    async def monitor_all_tasks(self, check_interval: Optional[float] = None):
        """
        Loop forever, emitting due heartbeats for all active tasks.

        Errors raised while processing tasks are logged and the loop keeps
        going; cancellation of the surrounding asyncio task stops the loop.

        Args:
            check_interval: seconds to sleep between checks. Defaults to
                ``min(DEFAULT_CHECK_INTERVAL, heartbeat_interval)`` so that
                short heartbeat intervals are honoured.
        """
        if check_interval is None:
            check_interval = self._default_check_interval()

        while True:
            try:
                # Snapshot the ids: tasks may be added or removed while we await.
                for task_id in list(self.active_tasks):
                    await self.send_heartbeat(task_id)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                self.logger.log_error(
                    "TaskMonitorError",
                    str(e),
                    {'active_tasks_count': len(self.active_tasks)}
                )
            await asyncio.sleep(check_interval)

    def get_active_tasks_info(self) -> Dict[str, Dict[str, Any]]:
        """
        Return a snapshot describing every active task.

        Returns:
            Mapping of task_id to a dict with ``task_name``, ``chat_id``,
            ``duration_seconds``, ``heartbeat_count`` and
            ``last_heartbeat_seconds_ago``.
        """
        current_time = time.time()
        return {
            task_id: {
                'task_name': task_info['task_name'],
                'chat_id': task_info['chat_id'],
                'duration_seconds': current_time - task_info['start_time'],
                'heartbeat_count': task_info['heartbeat_count'],
                'last_heartbeat_seconds_ago': current_time - task_info['last_heartbeat']
            }
            for task_id, task_info in self.active_tasks.items()
        }
| |
|
| |
|
| | |
# Module-level singleton returned by get_task_monitor(). It is created at
# import time with the class's default heartbeat interval, so its __init__
# (including the get_logger() call) runs when this module is first imported.
task_monitor = LongRunningTaskMonitor()
| |
|
| |
|
def get_task_monitor():
    """
    Return the process-wide task monitor.

    This is the module-level ``task_monitor`` instance; every call returns
    the same object.
    """
    monitor = task_monitor
    return monitor
| |
|
| |
|
async def start_task_monitoring():
    """
    Run the heartbeat loop of the global task monitor.

    Delegates to ``monitor_all_tasks`` on the shared monitor. That loop does
    not exit on its own, so this coroutine keeps running until the task
    awaiting it is cancelled.
    """
    await get_task_monitor().monitor_all_tasks()