Spaces:
Sleeping
Sleeping
File size: 11,739 Bytes
"""Error handling and fault-tolerance module.

Provides unified error handling, retry logic and exception recovery.
"""
import asyncio
import functools
import time
from typing import Any, Callable, Dict, Optional, Type, Union
from enum import Enum
from ..core.config import get_config
from ..utils.logger import get_task_logger
class ErrorCode(Enum):
    """Error codes.

    Each member's value is a short string made of a category prefix
    (FILE, NET, API, OSS, SYS, GEN) and a three-digit number.
    """
    # File-related errors
    FILE_NOT_FOUND = "FILE_001"
    FILE_TOO_LARGE = "FILE_002"
    FILE_FORMAT_UNSUPPORTED = "FILE_003"
    FILE_CORRUPTED = "FILE_004"
    # Network-related errors
    NETWORK_TIMEOUT = "NET_001"
    NETWORK_CONNECTION_ERROR = "NET_002"
    NETWORK_DNS_ERROR = "NET_003"
    # API-related errors
    API_KEY_INVALID = "API_001"
    API_QUOTA_EXCEEDED = "API_002"
    API_SERVICE_UNAVAILABLE = "API_003"
    API_RATE_LIMITED = "API_004"
    # OSS-related errors
    OSS_ACCESS_DENIED = "OSS_001"
    OSS_BUCKET_NOT_FOUND = "OSS_002"
    OSS_UPLOAD_FAILED = "OSS_003"
    # System-related errors
    SYSTEM_OUT_OF_MEMORY = "SYS_001"
    SYSTEM_DISK_FULL = "SYS_002"
    SYSTEM_PERMISSION_DENIED = "SYS_003"
    # General errors
    UNKNOWN_ERROR = "GEN_001"
    TIMEOUT_ERROR = "GEN_002"
    VALIDATION_ERROR = "GEN_003"
class TranscriptServiceError(Exception):
    """Base class for the service's own exceptions.

    Besides the message passed to ``Exception``, every instance records an
    ``ErrorCode``, a details dictionary and its creation time.
    """

    def __init__(self, message: str, error_code: ErrorCode = ErrorCode.UNKNOWN_ERROR, details: Dict = None):
        """Build the exception.

        Args:
            message: Human-readable error message.
            error_code: Code describing the failure category.
            details: Extra information; any falsy value is replaced by a
                new empty dict.
        """
        super().__init__(message)
        self.message = message
        self.error_code = error_code
        if details:
            self.details = details
        else:
            self.details = {}
        # Creation time in seconds since the epoch, as returned by time.time().
        self.timestamp = time.time()

    def to_dict(self) -> Dict[str, Any]:
        """Return the error as a plain dictionary.

        Returns:
            A dict with the keys ``error_code`` (the enum member's value),
            ``message``, ``details`` and ``timestamp``.
        """
        payload: Dict[str, Any] = {}
        payload['error_code'] = self.error_code.value
        payload['message'] = self.message
        payload['details'] = self.details
        payload['timestamp'] = self.timestamp
        return payload
class FileValidationError(TranscriptServiceError):
    """File validation error."""
    pass
class NetworkError(TranscriptServiceError):
    """Network-related error."""
    pass
class APIError(TranscriptServiceError):
    """API call error."""
    pass
class OSSError(TranscriptServiceError):
    """OSS operation error."""
    pass
# NOTE(review): this name shadows Python's built-in ``SystemError`` for the
# rest of the module; renaming it would break importers, so it is left as is.
class SystemError(TranscriptServiceError):
    """System error."""
    pass
class RetryStrategy:
    """Exponential-backoff retry policy.

    The delay before the retry that follows attempt ``n`` is
    ``base_delay * exponential_base ** (n - 1)``, capped at ``max_delay`` and
    optionally scaled by a random jitter factor.
    """

    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        """Initialise the retry strategy.

        Args:
            max_attempts: Maximum number of attempts.
            base_delay: Base delay in seconds.
            max_delay: Upper bound for the delay in seconds.
            exponential_base: Base of the exponential backoff.
            jitter: Whether to apply random jitter to the delay.
        """
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    def calculate_delay(self, attempt: int) -> float:
        """Compute the delay to wait after a failed attempt.

        Args:
            attempt: Number of the attempt that just failed (starting at 1).

        Returns:
            Delay in seconds, never larger than ``max_delay``.
        """
        try:
            delay = self.base_delay * (self.exponential_base ** (attempt - 1))
        except OverflowError:
            # The exponential term no longer fits in a float (for example
            # 2.0 ** 1024); such a value is far beyond the cap, so use the cap.
            delay = self.max_delay
        delay = min(delay, self.max_delay)
        if self.jitter:
            import random
            # Scale the delay by a random factor in [0.5, 1.0): jitter only
            # shortens the wait, by at most 50%.
            delay *= (0.5 + random.random() * 0.5)
        return delay
class ErrorHandler:
    """Classifies arbitrary exceptions into service errors and logs them."""

    def __init__(self):
        """Initialise the error handler.

        Stores the result of ``get_config()``, creates the
        ``transcript_service.error`` logger and builds the two lookup tables
        used for classification and retry decisions.
        """
        self.config = get_config()
        self.logger = get_task_logger(logger_name="transcript_service.error")
        # Built-in exception type -> (service exception class, error code).
        self.error_mapping = {
            # File errors
            FileNotFoundError: (FileValidationError, ErrorCode.FILE_NOT_FOUND),
            PermissionError: (SystemError, ErrorCode.SYSTEM_PERMISSION_DENIED),
            # Network errors
            asyncio.TimeoutError: (NetworkError, ErrorCode.NETWORK_TIMEOUT),
            ConnectionError: (NetworkError, ErrorCode.NETWORK_CONNECTION_ERROR),
            # General errors
            ValueError: (TranscriptServiceError, ErrorCode.VALIDATION_ERROR),
            RuntimeError: (TranscriptServiceError, ErrorCode.UNKNOWN_ERROR),
        }
        # Error codes for which a retry may succeed.
        self.retryable_errors = {
            ErrorCode.NETWORK_TIMEOUT,
            ErrorCode.NETWORK_CONNECTION_ERROR,
            ErrorCode.API_RATE_LIMITED,
            ErrorCode.OSS_UPLOAD_FAILED,
            ErrorCode.API_SERVICE_UNAVAILABLE
        }

    def classify_error(self, error: Exception) -> TranscriptServiceError:
        """Classify and wrap an exception.

        Args:
            error: The original exception.

        Returns:
            ``error`` itself when it already is a ``TranscriptServiceError``;
            otherwise a new service exception whose ``__cause__`` is ``error``.
        """
        if isinstance(error, TranscriptServiceError):
            return error
        text = str(error)
        wrapped = None
        # Walk the MRO so that subclasses of a mapped type (for example
        # ConnectionResetError -> ConnectionError) are classified like their
        # base class; the most specific mapped type wins.
        for klass in type(error).__mro__:
            if klass in self.error_mapping:
                exception_class, error_code = self.error_mapping[klass]
                wrapped = exception_class(text, error_code)
                break
        if wrapped is None:
            wrapped = self._classify_by_message(text)
        # Keep the original exception (and its traceback) reachable.
        wrapped.__cause__ = error
        return wrapped

    def _classify_by_message(self, text: str) -> TranscriptServiceError:
        """Pick a service exception from keywords in the error message."""
        error_msg = text.lower()
        if "timeout" in error_msg or "timed out" in error_msg:
            return NetworkError(text, ErrorCode.NETWORK_TIMEOUT)
        if "permission denied" in error_msg:
            return SystemError(text, ErrorCode.SYSTEM_PERMISSION_DENIED)
        if "api key" in error_msg:
            return APIError(text, ErrorCode.API_KEY_INVALID)
        # Checked before the generic "limit" test: rate limiting is a
        # retryable condition, an exhausted quota is not.
        if "rate limit" in error_msg:
            return APIError(text, ErrorCode.API_RATE_LIMITED)
        if "quota" in error_msg or "limit" in error_msg:
            return APIError(text, ErrorCode.API_QUOTA_EXCEEDED)
        return TranscriptServiceError(text, ErrorCode.UNKNOWN_ERROR)

    def is_retryable(self, error: TranscriptServiceError) -> bool:
        """Tell whether an error is worth retrying.

        Args:
            error: The service exception.

        Returns:
            True when the error's code is in ``retryable_errors``.
        """
        return error.error_code in self.retryable_errors

    def handle_error(self, error: Exception, context: str = "") -> TranscriptServiceError:
        """Classify an exception and write it to the log.

        Args:
            error: The original exception.
            context: Description of what was being done when it occurred.

        Returns:
            The classified service exception.
        """
        classified_error = self.classify_error(error)
        log_msg = f"错误处理 - {context}: {classified_error.message}"
        # Unknown and out-of-memory errors go through logger.exception();
        # every other category is logged with logger.error().
        if classified_error.error_code in (ErrorCode.UNKNOWN_ERROR, ErrorCode.SYSTEM_OUT_OF_MEMORY):
            self.logger.exception(log_msg)
        else:
            self.logger.error(log_msg)
        return classified_error
# Global error handler instance, created once when the module is imported.
error_handler = ErrorHandler()
def retry_async(
    strategy: Optional[RetryStrategy] = None,
    exceptions: tuple = (Exception,),
    context: str = ""
):
    """Decorator that retries an async function.

    An exception is retried only when it matches ``exceptions`` AND the
    module-level ``error_handler`` classifies it as retryable; otherwise the
    classified service exception is raised immediately.

    Args:
        strategy: Retry strategy; a default ``RetryStrategy()`` when None.
        exceptions: Exception types that are intercepted.
        context: Text prepended to the log messages.

    Raises:
        TranscriptServiceError: The classified error of the last attempt, or
            of the first non-retryable failure.
    """
    if strategy is None:
        strategy = RetryStrategy()

    def decorator(func: Callable):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            logger = get_task_logger(logger_name="transcript_service.retry")
            # A strategy with max_attempts < 1 must still call the function
            # once instead of failing without ever running it.
            total_attempts = max(1, strategy.max_attempts)
            for attempt in range(1, total_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    classified_error = error_handler.classify_error(e)
                    # Give up on the last attempt or on a non-retryable error.
                    if attempt == total_attempts or not error_handler.is_retryable(classified_error):
                        logger.error(f"{context} 最终失败 (尝试 {attempt}/{total_attempts}): {str(e)}")
                        if classified_error is e:
                            # Already a service error: re-raise it unchanged.
                            raise
                        raise classified_error from e
                    delay = strategy.calculate_delay(attempt)
                    logger.warning(f"{context} 第 {attempt} 次尝试失败,{delay:.1f}秒后重试: {str(e)}")
                    await asyncio.sleep(delay)
            # Defensive: the loop above always returns or raises.
            raise TranscriptServiceError("重试逻辑异常", ErrorCode.UNKNOWN_ERROR)
        return wrapper
    return decorator
def retry_sync(
    strategy: Optional[RetryStrategy] = None,
    exceptions: tuple = (Exception,),
    context: str = ""
):
    """Decorator that retries a synchronous function.

    An exception is retried only when it matches ``exceptions`` AND the
    module-level ``error_handler`` classifies it as retryable; otherwise the
    classified service exception is raised immediately. The wait between
    attempts blocks the calling thread with ``time.sleep``.

    Args:
        strategy: Retry strategy; a default ``RetryStrategy()`` when None.
        exceptions: Exception types that are intercepted.
        context: Text prepended to the log messages.

    Raises:
        TranscriptServiceError: The classified error of the last attempt, or
            of the first non-retryable failure.
    """
    if strategy is None:
        strategy = RetryStrategy()

    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            logger = get_task_logger(logger_name="transcript_service.retry")
            # A strategy with max_attempts < 1 must still call the function
            # once instead of failing without ever running it.
            total_attempts = max(1, strategy.max_attempts)
            for attempt in range(1, total_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    classified_error = error_handler.classify_error(e)
                    # Give up on the last attempt or on a non-retryable error.
                    if attempt == total_attempts or not error_handler.is_retryable(classified_error):
                        logger.error(f"{context} 最终失败 (尝试 {attempt}/{total_attempts}): {str(e)}")
                        if classified_error is e:
                            # Already a service error: re-raise it unchanged.
                            raise
                        raise classified_error from e
                    delay = strategy.calculate_delay(attempt)
                    logger.warning(f"{context} 第 {attempt} 次尝试失败,{delay:.1f}秒后重试: {str(e)}")
                    time.sleep(delay)
            # Defensive: the loop above always returns or raises.
            raise TranscriptServiceError("重试逻辑异常", ErrorCode.UNKNOWN_ERROR)
        return wrapper
    return decorator
def safe_execute(func: Callable, *args, **kwargs) -> tuple[bool, Any, Optional[TranscriptServiceError]]:
    """Call a function and report failure as a value instead of raising.

    Args:
        func: The callable to run.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        ``(True, result, None)`` on success, or ``(False, None, error)`` where
        ``error`` is the service exception produced by the module-level
        ``error_handler``.
    """
    try:
        result = func(*args, **kwargs)
    except Exception as e:
        # functools.partial objects and callable instances have no __name__;
        # fall back to repr() so building the log context cannot itself raise.
        func_name = getattr(func, "__name__", repr(func))
        error = error_handler.handle_error(e, f"执行 {func_name}")
        return False, None, error
    return True, result, None
async def safe_execute_async(func: Callable, *args, **kwargs) -> tuple[bool, Any, Optional[TranscriptServiceError]]:
    """Await an async function and report failure as a value instead of raising.

    Args:
        func: The async callable to run; ``func(*args, **kwargs)`` is awaited.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        ``(True, result, None)`` on success, or ``(False, None, error)`` where
        ``error`` is the service exception produced by the module-level
        ``error_handler``.
    """
    try:
        result = await func(*args, **kwargs)
    except Exception as e:
        # functools.partial objects and callable instances have no __name__;
        # fall back to repr() so building the log context cannot itself raise.
        func_name = getattr(func, "__name__", repr(func))
        error = error_handler.handle_error(e, f"执行 {func_name}")
        return False, None, error
    return True, result, None
def get_error_handler() -> ErrorHandler:
    """Return the error handler instance.

    Returns:
        The module-level ``error_handler`` object.
    """
    return error_handler