Spaces:
Sleeping
Sleeping
| """错误处理和容错机制模块 | |
| 提供统一的错误处理、重试逻辑和异常恢复功能。 | |
| """ | |
import asyncio
import functools
import time
from enum import Enum
from typing import Any, Callable, Dict, Optional, Type, Union

from ..core.config import get_config
from ..utils.logger import get_task_logger
class ErrorCode(Enum):
    """Machine-readable error codes; the value prefix names the subsystem."""

    # --- File problems (FILE_xxx) ---
    FILE_NOT_FOUND = "FILE_001"
    FILE_TOO_LARGE = "FILE_002"
    FILE_FORMAT_UNSUPPORTED = "FILE_003"
    FILE_CORRUPTED = "FILE_004"

    # --- Network problems (NET_xxx) ---
    NETWORK_TIMEOUT = "NET_001"
    NETWORK_CONNECTION_ERROR = "NET_002"
    NETWORK_DNS_ERROR = "NET_003"

    # --- External API problems (API_xxx) ---
    API_KEY_INVALID = "API_001"
    API_QUOTA_EXCEEDED = "API_002"
    API_SERVICE_UNAVAILABLE = "API_003"
    API_RATE_LIMITED = "API_004"

    # --- OSS object-storage problems (OSS_xxx) ---
    OSS_ACCESS_DENIED = "OSS_001"
    OSS_BUCKET_NOT_FOUND = "OSS_002"
    OSS_UPLOAD_FAILED = "OSS_003"

    # --- Host system problems (SYS_xxx) ---
    SYSTEM_OUT_OF_MEMORY = "SYS_001"
    SYSTEM_DISK_FULL = "SYS_002"
    SYSTEM_PERMISSION_DENIED = "SYS_003"

    # --- Generic fallbacks (GEN_xxx) ---
    UNKNOWN_ERROR = "GEN_001"
    TIMEOUT_ERROR = "GEN_002"
    VALIDATION_ERROR = "GEN_003"
class TranscriptServiceError(Exception):
    """Base class for the service's custom exceptions.

    Every instance carries an ``ErrorCode``, a ``details`` mapping and the
    time it was created, and can be turned into a plain dict via ``to_dict``.
    """

    def __init__(
        self,
        message: str,
        error_code: ErrorCode = ErrorCode.UNKNOWN_ERROR,
        details: Optional[Dict[str, Any]] = None,
    ):
        """Initialize the exception.

        Args:
            message: Human-readable error message (also what ``str(exc)`` returns).
            error_code: Machine-readable error code.
            details: Extra context; an empty dict is stored when omitted or falsy.
        """
        super().__init__(message)
        self.message = message
        self.error_code = error_code
        self.details = details or {}
        # Creation time from time.time(), i.e. seconds since the epoch.
        self.timestamp = time.time()

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain dict with the error code value, message, details and timestamp."""
        return {
            'error_code': self.error_code.value,
            'message': self.message,
            'details': self.details,
            'timestamp': self.timestamp
        }
class FileValidationError(TranscriptServiceError):
    """File validation failure, e.g. a missing input file."""
class NetworkError(TranscriptServiceError):
    """Network-level failure such as a timeout or a broken connection."""
class APIError(TranscriptServiceError):
    """Failure of an external API call (e.g. invalid key or exhausted quota)."""
class OSSError(TranscriptServiceError):
    """Failure of an OSS object-storage operation (bucket access, uploads)."""
# NOTE(review): this name shadows the builtin ``SystemError`` inside this
# module (and in any module importing it by name). It is kept unchanged for
# backward compatibility with existing callers.
class SystemError(TranscriptServiceError):
    """Host-system failure such as a permission problem."""
class RetryStrategy:
    """Exponential-backoff retry policy with optional jitter.

    The delay before retry number ``attempt`` (1-based) is
    ``min(base_delay * exponential_base ** (attempt - 1), max_delay)``.
    With ``jitter`` enabled it is then scaled by a random factor in
    ``[0.5, 1.0)``.
    """

    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        """Initialize the retry strategy.

        Args:
            max_attempts: Total number of attempts, including the first call; must be >= 1.
            base_delay: Delay in seconds before the first retry; must be >= 0.
            max_delay: Upper bound in seconds for any single delay; must be >= 0.
            exponential_base: Multiplicative growth factor per additional attempt.
            jitter: Whether to randomly shrink each delay by up to 50%.

        Raises:
            ValueError: If ``max_attempts`` < 1 or a delay bound is negative.
        """
        # With zero attempts the retry decorators would never call the wrapped
        # function at all, so reject that configuration up front.
        if max_attempts < 1:
            raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")
        # Negative delays would make time.sleep() raise during a retry.
        if base_delay < 0 or max_delay < 0:
            raise ValueError(
                f"base_delay and max_delay must be non-negative, got {base_delay} and {max_delay}"
            )
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    def calculate_delay(self, attempt: int) -> float:
        """Compute the delay to wait after a failed attempt.

        Args:
            attempt: Number of the attempt that just failed (1-based).

        Returns:
            Delay in seconds, never greater than ``max_delay``.
        """
        try:
            delay = self.base_delay * (self.exponential_base ** (attempt - 1))
        except OverflowError:
            # Huge attempt numbers overflow float exponentiation; the result
            # would be clamped to max_delay anyway (or stay 0 for a zero base).
            delay = self.max_delay if self.base_delay > 0 else 0.0
        delay = min(delay, self.max_delay)
        if self.jitter:
            import random
            # Scale by a factor in [0.5, 1.0): shortens the delay by up to 50%
            # so concurrent clients do not retry in lockstep.
            delay *= (0.5 + random.random() * 0.5)
        return delay
class ErrorHandler:
    """Classify, log and judge the retryability of exceptions.

    Raw exceptions are wrapped into ``TranscriptServiceError`` subclasses that
    carry an ``ErrorCode``. ``is_retryable`` then uses that code to decide
    whether a failure is worth retrying.
    """

    def __init__(self):
        """Initialize the handler: load config, create a logger and build lookup tables."""
        self.config = get_config()
        self.logger = get_task_logger(logger_name="transcript_service.error")

        # Built-in exception type -> (service exception class, error code).
        # Exact types are matched first. Subclasses that no message heuristic
        # recognises fall back to their nearest mapped ancestor.
        self.error_mapping = {
            # File / system errors
            FileNotFoundError: (FileValidationError, ErrorCode.FILE_NOT_FOUND),
            PermissionError: (SystemError, ErrorCode.SYSTEM_PERMISSION_DENIED),
            MemoryError: (SystemError, ErrorCode.SYSTEM_OUT_OF_MEMORY),
            # Network errors. Before Python 3.11 the builtin TimeoutError is a
            # different class from asyncio.TimeoutError, so both are mapped.
            asyncio.TimeoutError: (NetworkError, ErrorCode.NETWORK_TIMEOUT),
            TimeoutError: (NetworkError, ErrorCode.NETWORK_TIMEOUT),
            ConnectionError: (NetworkError, ErrorCode.NETWORK_CONNECTION_ERROR),
            # Generic errors
            ValueError: (TranscriptServiceError, ErrorCode.VALIDATION_ERROR),
            RuntimeError: (TranscriptServiceError, ErrorCode.UNKNOWN_ERROR),
        }

        # Error codes considered transient, and therefore safe to retry.
        self.retryable_errors = {
            ErrorCode.NETWORK_TIMEOUT,
            ErrorCode.NETWORK_CONNECTION_ERROR,
            ErrorCode.API_RATE_LIMITED,
            ErrorCode.OSS_UPLOAD_FAILED,
            ErrorCode.API_SERVICE_UNAVAILABLE
        }

    def classify_error(self, error: Exception) -> TranscriptServiceError:
        """Wrap ``error`` in the matching ``TranscriptServiceError`` subclass.

        Resolution order:

        1. Errors that are already service errors are returned unchanged.
        2. The exact exception type is looked up in ``error_mapping``.
        3. The lower-cased message is matched against keyword heuristics.
        4. The nearest mapped base class is used.
        5. Anything else becomes ``ErrorCode.UNKNOWN_ERROR``.

        Args:
            error: The original exception.

        Returns:
            The classified service exception. For newly wrapped errors,
            ``__cause__`` is set to ``error``.
        """
        if isinstance(error, TranscriptServiceError):
            return error

        error_type = type(error)
        mapped = self.error_mapping.get(error_type)
        if mapped is None:
            mapped = self._classify_by_message(str(error).lower())
        if mapped is None:
            # e.g. ConnectionRefusedError / ConnectionResetError -> ConnectionError
            mapped = self._lookup_ancestor_mapping(error_type)
        if mapped is None:
            mapped = (TranscriptServiceError, ErrorCode.UNKNOWN_ERROR)

        exception_class, error_code = mapped
        # Some exceptions (e.g. a bare asyncio timeout) have an empty message;
        # fall back to the class name so log lines stay informative.
        classified = exception_class(str(error) or error_type.__name__, error_code)
        # Keep the original exception (and its traceback) reachable for debugging.
        classified.__cause__ = error
        return classified

    @staticmethod
    def _classify_by_message(error_msg: str) -> Optional[tuple]:
        """Map a lower-cased error message to (exception class, error code), or None."""
        if "timeout" in error_msg or "timed out" in error_msg:
            return NetworkError, ErrorCode.NETWORK_TIMEOUT
        if "permission denied" in error_msg:
            return SystemError, ErrorCode.SYSTEM_PERMISSION_DENIED
        if "api key" in error_msg:
            return APIError, ErrorCode.API_KEY_INVALID
        if "quota" in error_msg:
            return APIError, ErrorCode.API_QUOTA_EXCEEDED
        # Rate limiting is transient (retryable). Check it before the broad
        # "limit" match, which would otherwise classify it as quota exhaustion.
        if "rate limit" in error_msg or "too many requests" in error_msg:
            return APIError, ErrorCode.API_RATE_LIMITED
        if "limit" in error_msg:
            return APIError, ErrorCode.API_QUOTA_EXCEEDED
        return None

    def _lookup_ancestor_mapping(self, error_type: type) -> Optional[tuple]:
        """Return the mapping of the nearest mapped base class of ``error_type``, or None."""
        for ancestor in error_type.__mro__[1:]:
            mapped = self.error_mapping.get(ancestor)
            if mapped is not None:
                return mapped
        return None

    def is_retryable(self, error: TranscriptServiceError) -> bool:
        """Return True when the error's code is in the retryable set.

        Args:
            error: A classified service exception.

        Returns:
            Whether retrying the failed operation makes sense.
        """
        return error.error_code in self.retryable_errors

    def handle_error(self, error: Exception, context: str = "") -> TranscriptServiceError:
        """Classify ``error``, log it, and return the classified exception.

        Unknown and out-of-memory errors are logged via ``logger.exception``,
        which includes the traceback of the exception currently being handled,
        so call this from inside an ``except`` block. Other errors are logged
        with ``logger.error``.

        Args:
            error: The original exception.
            context: Short description of where the error happened (used in the log line).

        Returns:
            The classified service exception.
        """
        classified_error = self.classify_error(error)

        log_msg = f"错误处理 - {context}: {classified_error.message}"
        if classified_error.error_code in (ErrorCode.UNKNOWN_ERROR, ErrorCode.SYSTEM_OUT_OF_MEMORY):
            self.logger.exception(log_msg)
        else:
            self.logger.error(log_msg)

        return classified_error
# Shared module-wide handler, used by the retry decorators and the safe_execute
# helpers. It is created at import time, so importing this module runs
# ErrorHandler.__init__ (get_config() and get_task_logger()).
error_handler: ErrorHandler = ErrorHandler()
def retry_async(
    strategy: Optional[RetryStrategy] = None,
    exceptions: tuple = (Exception,),
    context: str = ""
):
    """Decorator factory that retries an ``async`` function.

    Exceptions matching ``exceptions`` are classified with the module-level
    ``error_handler``. Retryable ones are retried after
    ``strategy.calculate_delay(attempt)`` seconds (non-blocking
    ``asyncio.sleep``). A non-retryable error, or the failure of the last
    attempt, is raised as the classified ``TranscriptServiceError``, chained
    to the original exception. Exceptions not matching ``exceptions``
    propagate unchanged.

    Args:
        strategy: Retry policy; defaults to ``RetryStrategy()``.
        exceptions: Exception types eligible for classification/retry.
        context: Label used in log messages; defaults to the wrapped
            function's qualified name when empty.
    """
    if strategy is None:
        strategy = RetryStrategy()

    def decorator(func: Callable):
        label = context or getattr(func, "__qualname__", repr(func))

        # Preserve the wrapped function's name, docstring and signature metadata.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            logger = get_task_logger(logger_name="transcript_service.retry")
            for attempt in range(1, strategy.max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    classified_error = error_handler.classify_error(e)
                    # Give up on the last attempt or on non-transient errors.
                    if attempt == strategy.max_attempts or not error_handler.is_retryable(classified_error):
                        logger.error(f"{label} 最终失败 (尝试 {attempt}/{strategy.max_attempts}): {str(e)}")
                        if classified_error is e:
                            # Already a service error: re-raise it as-is
                            # instead of chaining it to itself.
                            raise
                        raise classified_error from e
                    delay = strategy.calculate_delay(attempt)
                    logger.warning(f"{label} 第 {attempt} 次尝试失败,{delay:.1f}秒后重试: {str(e)}")
                    await asyncio.sleep(delay)
            # Every loop iteration returns or raises; this line is reached only
            # if strategy.max_attempts < 1, i.e. no attempt was made at all.
            raise TranscriptServiceError("重试逻辑异常", ErrorCode.UNKNOWN_ERROR)

        return wrapper

    return decorator
def retry_sync(
    strategy: Optional[RetryStrategy] = None,
    exceptions: tuple = (Exception,),
    context: str = ""
):
    """Decorator factory that retries a synchronous function.

    Exceptions matching ``exceptions`` are classified with the module-level
    ``error_handler``. Retryable ones are retried after
    ``strategy.calculate_delay(attempt)`` seconds; the wait uses blocking
    ``time.sleep``. A non-retryable error, or the failure of the last
    attempt, is raised as the classified ``TranscriptServiceError``, chained
    to the original exception. Exceptions not matching ``exceptions``
    propagate unchanged.

    Args:
        strategy: Retry policy; defaults to ``RetryStrategy()``.
        exceptions: Exception types eligible for classification/retry.
        context: Label used in log messages; defaults to the wrapped
            function's qualified name when empty.
    """
    if strategy is None:
        strategy = RetryStrategy()

    def decorator(func: Callable):
        label = context or getattr(func, "__qualname__", repr(func))

        # Preserve the wrapped function's name, docstring and signature metadata.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            logger = get_task_logger(logger_name="transcript_service.retry")
            for attempt in range(1, strategy.max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    classified_error = error_handler.classify_error(e)
                    # Give up on the last attempt or on non-transient errors.
                    if attempt == strategy.max_attempts or not error_handler.is_retryable(classified_error):
                        logger.error(f"{label} 最终失败 (尝试 {attempt}/{strategy.max_attempts}): {str(e)}")
                        if classified_error is e:
                            # Already a service error: re-raise it as-is
                            # instead of chaining it to itself.
                            raise
                        raise classified_error from e
                    delay = strategy.calculate_delay(attempt)
                    logger.warning(f"{label} 第 {attempt} 次尝试失败,{delay:.1f}秒后重试: {str(e)}")
                    time.sleep(delay)
            # Every loop iteration returns or raises; this line is reached only
            # if strategy.max_attempts < 1, i.e. no attempt was made at all.
            raise TranscriptServiceError("重试逻辑异常", ErrorCode.UNKNOWN_ERROR)

        return wrapper

    return decorator
def safe_execute(func: Callable, *args, **kwargs) -> tuple[bool, Any, Optional[TranscriptServiceError]]:
    """Call ``func(*args, **kwargs)`` and capture any ``Exception`` instead of raising.

    Args:
        func: The callable to run (function, partial, callable object, ...).
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        ``(True, result, None)`` on success. On failure, returns
        ``(False, None, error)``, where ``error`` is the classified exception
        from ``error_handler.handle_error``, which also logs it.
    """
    try:
        result = func(*args, **kwargs)
    except Exception as e:
        # Callables such as functools.partial objects have no __name__.
        # Reading it directly would raise AttributeError here and mask the
        # real error.
        func_name = getattr(func, "__name__", repr(func))
        error = error_handler.handle_error(e, f"执行 {func_name}")
        return False, None, error
    return True, result, None
async def safe_execute_async(func: Callable, *args, **kwargs) -> tuple[bool, Any, Optional[TranscriptServiceError]]:
    """Await ``func(*args, **kwargs)`` and capture any ``Exception`` instead of raising.

    Args:
        func: A callable returning an awaitable (coroutine function, partial, ...).
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        ``(True, result, None)`` on success. On failure, returns
        ``(False, None, error)``, where ``error`` is the classified exception
        from ``error_handler.handle_error``, which also logs it.
    """
    try:
        result = await func(*args, **kwargs)
    except Exception as e:
        # Callables such as functools.partial objects have no __name__.
        # Reading it directly would raise AttributeError here and mask the
        # real error.
        func_name = getattr(func, "__name__", repr(func))
        error = error_handler.handle_error(e, f"执行 {func_name}")
        return False, None, error
    return True, result, None
def get_error_handler() -> ErrorHandler:
    """Return the shared ``ErrorHandler`` created when this module was imported."""
    return error_handler