"""Error handling and fault-tolerance module.

Provides unified error classification, retry logic (sync and async
decorators) and "safe execute" helpers that convert exceptions into
return values.
"""

import asyncio
import functools
import random
import time
from typing import Any, Callable, Dict, Optional, Type, Union
from enum import Enum

from ..core.config import get_config
from ..utils.logger import get_task_logger


class ErrorCode(Enum):
    """Stable string codes identifying each category of failure."""

    # File-related errors
    FILE_NOT_FOUND = "FILE_001"
    FILE_TOO_LARGE = "FILE_002"
    FILE_FORMAT_UNSUPPORTED = "FILE_003"
    FILE_CORRUPTED = "FILE_004"

    # Network-related errors
    NETWORK_TIMEOUT = "NET_001"
    NETWORK_CONNECTION_ERROR = "NET_002"
    NETWORK_DNS_ERROR = "NET_003"

    # API-related errors
    API_KEY_INVALID = "API_001"
    API_QUOTA_EXCEEDED = "API_002"
    API_SERVICE_UNAVAILABLE = "API_003"
    API_RATE_LIMITED = "API_004"

    # OSS-related errors
    OSS_ACCESS_DENIED = "OSS_001"
    OSS_BUCKET_NOT_FOUND = "OSS_002"
    OSS_UPLOAD_FAILED = "OSS_003"

    # System-related errors
    SYSTEM_OUT_OF_MEMORY = "SYS_001"
    SYSTEM_DISK_FULL = "SYS_002"
    SYSTEM_PERMISSION_DENIED = "SYS_003"

    # Generic errors
    UNKNOWN_ERROR = "GEN_001"
    TIMEOUT_ERROR = "GEN_002"
    VALIDATION_ERROR = "GEN_003"


class TranscriptServiceError(Exception):
    """Base class for all service-defined exceptions."""

    def __init__(
        self,
        message: str,
        error_code: ErrorCode = ErrorCode.UNKNOWN_ERROR,
        details: Optional[Dict[str, Any]] = None,
    ):
        """Initialise the exception.

        Args:
            message: Human-readable error message.
            error_code: Code identifying the failure category.
            details: Optional extra details; stored as an empty dict when
                omitted or falsy.
        """
        super().__init__(message)
        self.message = message
        self.error_code = error_code
        self.details = details or {}
        # Creation time as seconds since the epoch (time.time()).
        self.timestamp = time.time()

    def to_dict(self) -> Dict[str, Any]:
        """Return the error as a plain dictionary."""
        return {
            'error_code': self.error_code.value,
            'message': self.message,
            'details': self.details,
            'timestamp': self.timestamp
        }


class FileValidationError(TranscriptServiceError):
    """File validation error."""
    pass


class NetworkError(TranscriptServiceError):
    """Network-related error."""
    pass


class APIError(TranscriptServiceError):
    """API call error."""
    pass


class OSSError(TranscriptServiceError):
    """OSS operation error."""
    pass


# NOTE: this name shadows the builtin ``SystemError`` inside this module.
# It is kept because it is part of the module's public interface.
class SystemError(TranscriptServiceError):
    """System-level error."""
    pass


class RetryStrategy:
    """Exponential back-off retry strategy with optional jitter."""

    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        """Initialise the retry strategy.

        Args:
            max_attempts: Maximum number of attempts.
            base_delay: Base delay in seconds.
            max_delay: Upper bound for the (pre-jitter) delay in seconds.
            exponential_base: Base of the exponential back-off.
            jitter: Whether to apply random jitter to the delay.
        """
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    def calculate_delay(self, attempt: int) -> float:
        """Compute the delay to wait after a failed attempt.

        Args:
            attempt: Current attempt number (starting from 1).

        Returns:
            Delay in seconds.
        """
        delay = self.base_delay * (self.exponential_base ** (attempt - 1))
        delay = min(delay, self.max_delay)

        if self.jitter:
            # Scale by a random factor in [0.5, 1.0): jitter only ever
            # shortens the delay, by up to 50%.
            delay *= (0.5 + random.random() * 0.5)

        return delay


class ErrorHandler:
    """Classifies arbitrary exceptions into service errors and logs them."""

    def __init__(self):
        """Initialise the handler with its config, logger and lookup tables."""
        self.config = get_config()
        self.logger = get_task_logger(logger_name="transcript_service.error")

        # Maps builtin exception types to (service exception class, code).
        self.error_mapping = {
            # File errors
            FileNotFoundError: (FileValidationError, ErrorCode.FILE_NOT_FOUND),
            PermissionError: (SystemError, ErrorCode.SYSTEM_PERMISSION_DENIED),

            # Network errors
            asyncio.TimeoutError: (NetworkError, ErrorCode.NETWORK_TIMEOUT),
            ConnectionError: (NetworkError, ErrorCode.NETWORK_CONNECTION_ERROR),

            # Generic errors
            ValueError: (TranscriptServiceError, ErrorCode.VALIDATION_ERROR),
            RuntimeError: (TranscriptServiceError, ErrorCode.UNKNOWN_ERROR),
        }

        # Error codes for which a retry is worthwhile.
        self.retryable_errors = {
            ErrorCode.NETWORK_TIMEOUT,
            ErrorCode.NETWORK_CONNECTION_ERROR,
            ErrorCode.API_RATE_LIMITED,
            ErrorCode.OSS_UPLOAD_FAILED,
            ErrorCode.API_SERVICE_UNAVAILABLE
        }

    def classify_error(self, error: Exception) -> TranscriptServiceError:
        """Classify and wrap an exception.

        Args:
            error: The original exception.

        Returns:
            ``error`` itself when it already is a ``TranscriptServiceError``;
            otherwise a new service exception carrying ``str(error)`` and
            the error code selected by type mapping or message heuristics.
        """
        if isinstance(error, TranscriptServiceError):
            return error

        # Walk the MRO (most specific class first) so that subclasses of a
        # mapped type -- e.g. ConnectionResetError for ConnectionError --
        # are classified like their parent instead of falling through to
        # the message heuristics and ending up as UNKNOWN_ERROR.
        for klass in type(error).__mro__:
            mapped = self.error_mapping.get(klass)
            if mapped is not None:
                exception_class, error_code = mapped
                return exception_class(str(error), error_code)

        # Fall back to classifying by the content of the error message.
        # NOTE(review): no branch here yields API_RATE_LIMITED; messages
        # containing "limit" are classified as API_QUOTA_EXCEEDED, which is
        # not retryable -- confirm this is intended.
        error_msg = str(error).lower()

        if "timeout" in error_msg:
            return NetworkError(str(error), ErrorCode.NETWORK_TIMEOUT)
        elif "permission denied" in error_msg:
            return SystemError(str(error), ErrorCode.SYSTEM_PERMISSION_DENIED)
        elif "api key" in error_msg:
            return APIError(str(error), ErrorCode.API_KEY_INVALID)
        elif "quota" in error_msg or "limit" in error_msg:
            return APIError(str(error), ErrorCode.API_QUOTA_EXCEEDED)
        else:
            return TranscriptServiceError(str(error), ErrorCode.UNKNOWN_ERROR)

    def is_retryable(self, error: TranscriptServiceError) -> bool:
        """Tell whether an error is worth retrying.

        Args:
            error: A service exception.

        Returns:
            True when the error's code is in ``retryable_errors``.
        """
        return error.error_code in self.retryable_errors

    def handle_error(self, error: Exception, context: str = "") -> TranscriptServiceError:
        """Classify an exception and log it.

        Args:
            error: The original exception.
            context: Description of where the error occurred; included in
                the log message.

        Returns:
            The classified service exception.
        """
        classified_error = self.classify_error(error)

        log_msg = f"错误处理 - {context}: {classified_error.message}"

        # Unknown and out-of-memory errors are logged via ``exception``;
        # all other codes via ``error``.
        if classified_error.error_code in [ErrorCode.UNKNOWN_ERROR, ErrorCode.SYSTEM_OUT_OF_MEMORY]:
            self.logger.exception(log_msg)
        else:
            self.logger.error(log_msg)

        return classified_error


# Global error handler instance
error_handler = ErrorHandler()


def _next_retry_delay(
    error: BaseException,
    attempt: int,
    max_attempts: int,
    strategy: RetryStrategy,
    context: str,
    logger: Any,
) -> float:
    """Decide whether to retry after ``error``; shared by both decorators.

    Returns the delay (seconds) to wait before the next attempt, or raises
    the classified error when the attempts are exhausted or the error is
    not retryable. Must be called from inside the ``except`` block that
    caught ``error``.
    """
    classified_error = error_handler.classify_error(error)

    if attempt >= max_attempts or not error_handler.is_retryable(classified_error):
        logger.error(f"{context} 最终失败 (尝试 {attempt}/{max_attempts}): {str(error)}")
        if classified_error is error:
            # Already a service error: re-raise as-is (chaining it to
            # itself would create a self-referential __cause__).
            raise classified_error
        raise classified_error from error

    delay = strategy.calculate_delay(attempt)
    logger.warning(f"{context} 第 {attempt} 次尝试失败,{delay:.1f}秒后重试: {str(error)}")
    return delay


def _callable_name(func: Callable) -> str:
    """Return a printable name for ``func``.

    ``functools.partial`` objects and callable instances have no
    ``__name__``; fall back to ``repr`` so error reporting never fails.
    """
    return getattr(func, "__name__", None) or repr(func)


def retry_async(
    strategy: Optional[RetryStrategy] = None,
    exceptions: tuple = (Exception,),
    context: str = ""
):
    """Retry decorator for coroutine functions.

    The wrapped function is always called at least once. A caught
    exception is classified; it is retried only while attempts remain and
    the classified error is retryable, otherwise the classified
    ``TranscriptServiceError`` is raised.

    Args:
        strategy: Retry strategy; a default ``RetryStrategy()`` when None.
        exceptions: Exception types that are caught and considered for
            retry.
        context: Context text prefixed to log messages.
    """
    if strategy is None:
        strategy = RetryStrategy()

    def decorator(func: Callable):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            logger = get_task_logger(logger_name="transcript_service.retry")
            # Guard against max_attempts <= 0, which would otherwise skip
            # the call entirely.
            max_attempts = max(1, strategy.max_attempts)

            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    delay = _next_retry_delay(
                        e, attempt, max_attempts, strategy, context, logger
                    )
                await asyncio.sleep(delay)

            # Defensive guard; the loop above always returns or raises.
            raise TranscriptServiceError("重试逻辑异常", ErrorCode.UNKNOWN_ERROR)

        return wrapper
    return decorator


def retry_sync(
    strategy: Optional[RetryStrategy] = None,
    exceptions: tuple = (Exception,),
    context: str = ""
):
    """Retry decorator for synchronous functions.

    The wrapped function is always called at least once. A caught
    exception is classified; it is retried only while attempts remain and
    the classified error is retryable, otherwise the classified
    ``TranscriptServiceError`` is raised. Waits between attempts use
    ``time.sleep`` and therefore block the calling thread.

    Args:
        strategy: Retry strategy; a default ``RetryStrategy()`` when None.
        exceptions: Exception types that are caught and considered for
            retry.
        context: Context text prefixed to log messages.
    """
    if strategy is None:
        strategy = RetryStrategy()

    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            logger = get_task_logger(logger_name="transcript_service.retry")
            # Guard against max_attempts <= 0, which would otherwise skip
            # the call entirely.
            max_attempts = max(1, strategy.max_attempts)

            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    delay = _next_retry_delay(
                        e, attempt, max_attempts, strategy, context, logger
                    )
                time.sleep(delay)

            # Defensive guard; the loop above always returns or raises.
            raise TranscriptServiceError("重试逻辑异常", ErrorCode.UNKNOWN_ERROR)

        return wrapper
    return decorator


def safe_execute(func: Callable, *args, **kwargs) -> tuple[bool, Any, Optional[TranscriptServiceError]]:
    """Call ``func`` and report the outcome instead of raising.

    Args:
        func: The function to call.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        ``(True, result, None)`` on success, or ``(False, None, error)``
        where ``error`` is the classified (and logged) service exception.
    """
    try:
        result = func(*args, **kwargs)
        return True, result, None
    except Exception as e:
        error = error_handler.handle_error(e, f"执行 {_callable_name(func)}")
        return False, None, error


async def safe_execute_async(func: Callable, *args, **kwargs) -> tuple[bool, Any, Optional[TranscriptServiceError]]:
    """Await ``func`` and report the outcome instead of raising.

    Args:
        func: The coroutine function to call.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        ``(True, result, None)`` on success, or ``(False, None, error)``
        where ``error`` is the classified (and logged) service exception.
    """
    try:
        result = await func(*args, **kwargs)
        return True, result, None
    except Exception as e:
        error = error_handler.handle_error(e, f"执行 {_callable_name(func)}")
        return False, None, error


def get_error_handler() -> ErrorHandler:
    """Return the module-level error handler instance.

    Returns:
        The shared ``ErrorHandler`` instance.
    """
    return error_handler