# transcript_service/src/utils/error_handler.py
# Uploaded by PCNUSMSE using huggingface_hub (commit 4e37375, verified)
"""错误处理和容错机制模块
提供统一的错误处理、重试逻辑和异常恢复功能。
"""
import asyncio
import functools
import time
from typing import Any, Callable, Dict, Optional, Type, Union
from enum import Enum
from ..core.config import get_config
from ..utils.logger import get_task_logger
class ErrorCode(Enum):
    """Machine-readable error codes used by the transcript service.

    Each value is a short string identifier whose prefix names its
    category (FILE, NET, API, OSS, SYS, GEN) followed by a sequence number.
    """
    # File-related errors
    FILE_NOT_FOUND = "FILE_001"
    FILE_TOO_LARGE = "FILE_002"
    FILE_FORMAT_UNSUPPORTED = "FILE_003"
    FILE_CORRUPTED = "FILE_004"
    # Network-related errors
    NETWORK_TIMEOUT = "NET_001"
    NETWORK_CONNECTION_ERROR = "NET_002"
    NETWORK_DNS_ERROR = "NET_003"
    # API-related errors
    API_KEY_INVALID = "API_001"
    API_QUOTA_EXCEEDED = "API_002"
    API_SERVICE_UNAVAILABLE = "API_003"
    API_RATE_LIMITED = "API_004"
    # OSS-related errors
    OSS_ACCESS_DENIED = "OSS_001"
    OSS_BUCKET_NOT_FOUND = "OSS_002"
    OSS_UPLOAD_FAILED = "OSS_003"
    # System-related errors
    SYSTEM_OUT_OF_MEMORY = "SYS_001"
    SYSTEM_DISK_FULL = "SYS_002"
    SYSTEM_PERMISSION_DENIED = "SYS_003"
    # Generic errors
    UNKNOWN_ERROR = "GEN_001"
    TIMEOUT_ERROR = "GEN_002"
    VALIDATION_ERROR = "GEN_003"
class TranscriptServiceError(Exception):
    """Base class for the service's custom exceptions.

    Carries a machine-readable ``ErrorCode``, optional structured details and
    the wall-clock time at which it was created; ``to_dict`` serializes all of
    these for logging or API responses.
    """

    def __init__(
        self,
        message: str,
        error_code: ErrorCode = ErrorCode.UNKNOWN_ERROR,
        details: Optional[Dict[str, Any]] = None,
    ):
        """Initialize the exception.

        Args:
            message: Human-readable error message (also passed to ``Exception``).
            error_code: Machine-readable error code.
            details: Extra structured information. Any falsy value (including
                ``None``) is replaced by a new empty dict, so instances never
                share a mutable default.
        """
        super().__init__(message)
        self.message = message
        self.error_code = error_code
        self.details = details or {}
        # Creation time in seconds since the epoch (time.time()).
        self.timestamp = time.time()

    def to_dict(self) -> Dict[str, Any]:
        """Return a dict representation of this error.

        Returns:
            A dict with keys ``error_code`` (the enum member's string value),
            ``message``, ``details`` (the instance's own dict, not a copy) and
            ``timestamp``.
        """
        return {
            'error_code': self.error_code.value,
            'message': self.message,
            'details': self.details,
            'timestamp': self.timestamp
        }
class FileValidationError(TranscriptServiceError):
    """Error raised when a file fails validation (e.g. it does not exist)."""
    pass


class NetworkError(TranscriptServiceError):
    """Network-related error (timeouts, connection failures)."""
    pass


class APIError(TranscriptServiceError):
    """Error raised while calling an external API."""
    pass


class OSSError(TranscriptServiceError):
    """Error raised by an OSS operation."""
    pass


# NOTE(review): this name shadows the built-in ``SystemError`` in this module
# and in any module that imports it by name. It is kept as-is because renaming
# it would break existing importers.
class SystemError(TranscriptServiceError):  # noqa: A001
    """System-level error (e.g. permission denied)."""
    pass
class RetryStrategy:
    """Exponential-backoff retry policy with optional jitter.

    The delay after failed attempt ``n`` (1-based) is
    ``base_delay * exponential_base ** (n - 1)``, capped at ``max_delay``.
    With jitter enabled, the capped delay is multiplied by a random factor in
    ``[0.5, 1.0)``. This keeps at least half of the backoff while spreading
    out concurrent retriers.
    """

    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        """Initialize the retry strategy.

        Args:
            max_attempts: Maximum total number of attempts, including the
                first call (``3`` means one call plus up to two retries).
            base_delay: Delay in seconds after the first failed attempt.
            max_delay: Upper bound in seconds applied before jitter.
            exponential_base: Growth factor of the delay per attempt.
            jitter: Whether to randomize each delay (see class docstring).
        """
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    def calculate_delay(self, attempt: int) -> float:
        """Compute how long to wait after a failed attempt.

        Args:
            attempt: Number of the attempt that just failed (1-based).

        Returns:
            Delay in seconds. It is capped at ``max_delay``, and jitter can
            only shrink a non-negative delay.
        """
        import random

        try:
            delay = self.base_delay * (self.exponential_base ** (attempt - 1))
        except OverflowError:
            # Very large attempt numbers overflow the float computation; the
            # result would be capped at max_delay anyway.
            delay = self.max_delay
        delay = min(delay, self.max_delay)
        if self.jitter:
            # Scale by a factor in [0.5, 1.0) (not +/-50%): the delay never
            # grows past the cap, but concurrent retriers are desynchronized.
            delay *= (0.5 + random.random() * 0.5)
        return delay
class ErrorHandler:
    """Classify arbitrary exceptions into service errors, judge retryability and log them."""

    def __init__(self):
        """Load the config, create the logger and build the classification tables."""
        self.config = get_config()
        self.logger = get_task_logger(logger_name="transcript_service.error")
        # Built-in exception type -> (service exception class, error code).
        # Lookup walks the raised exception's MRO, so subclasses (e.g.
        # ConnectionResetError -> ConnectionError) resolve to the nearest
        # mapped ancestor instead of falling through to message heuristics.
        self.error_mapping = {
            # File errors
            FileNotFoundError: (FileValidationError, ErrorCode.FILE_NOT_FOUND),
            PermissionError: (SystemError, ErrorCode.SYSTEM_PERMISSION_DENIED),
            # Network errors. On Python >= 3.11 asyncio.TimeoutError is the
            # built-in TimeoutError (the duplicate key maps to the same value);
            # on older versions both must be listed.
            asyncio.TimeoutError: (NetworkError, ErrorCode.NETWORK_TIMEOUT),
            TimeoutError: (NetworkError, ErrorCode.NETWORK_TIMEOUT),
            ConnectionError: (NetworkError, ErrorCode.NETWORK_CONNECTION_ERROR),
            # Generic errors
            ValueError: (TranscriptServiceError, ErrorCode.VALIDATION_ERROR),
            RuntimeError: (TranscriptServiceError, ErrorCode.UNKNOWN_ERROR),
        }
        # Error codes for which retrying may succeed.
        self.retryable_errors = {
            ErrorCode.NETWORK_TIMEOUT,
            ErrorCode.NETWORK_CONNECTION_ERROR,
            ErrorCode.API_RATE_LIMITED,
            ErrorCode.OSS_UPLOAD_FAILED,
            ErrorCode.API_SERVICE_UNAVAILABLE
        }

    def classify_error(self, error: Exception) -> TranscriptServiceError:
        """Classify and wrap an exception.

        Args:
            error: The original exception.

        Returns:
            ``error`` itself if it is already a ``TranscriptServiceError``.
            Otherwise a new service exception, chosen from ``error_mapping``
            by the nearest mapped type in ``error``'s MRO, or by keywords in
            its message. The new exception's ``__cause__`` is set to
            ``error``, so the original traceback stays reachable.
        """
        if isinstance(error, TranscriptServiceError):
            return error

        classified = self._classify_by_type(error)
        if classified is None:
            classified = self._classify_by_message(error)
        classified.__cause__ = error
        return classified

    def _classify_by_type(self, error: Exception) -> Optional[TranscriptServiceError]:
        """Wrap ``error`` using the nearest type in its MRO found in ``error_mapping``, else return None."""
        for error_type in type(error).__mro__:
            mapped = self.error_mapping.get(error_type)
            if mapped is not None:
                exception_class, error_code = mapped
                return exception_class(str(error), error_code)
        return None

    @staticmethod
    def _classify_by_message(error: Exception) -> TranscriptServiceError:
        """Wrap ``error`` based on case-insensitive keywords in its message."""
        text = str(error)
        error_msg = text.lower()
        if "timeout" in error_msg or "timed out" in error_msg:
            return NetworkError(text, ErrorCode.NETWORK_TIMEOUT)
        if "permission denied" in error_msg:
            return SystemError(text, ErrorCode.SYSTEM_PERMISSION_DENIED)
        if "api key" in error_msg:
            return APIError(text, ErrorCode.API_KEY_INVALID)
        # Must precede the generic "limit" check below: rate limiting is
        # transient (retryable) whereas quota exhaustion is not.
        if any(k in error_msg for k in ("rate limit", "ratelimit", "too many requests")):
            return APIError(text, ErrorCode.API_RATE_LIMITED)
        if "quota" in error_msg or "limit" in error_msg:
            return APIError(text, ErrorCode.API_QUOTA_EXCEEDED)
        return TranscriptServiceError(text, ErrorCode.UNKNOWN_ERROR)

    def is_retryable(self, error: TranscriptServiceError) -> bool:
        """Return True if ``error``'s code is in ``retryable_errors``.

        Args:
            error: A classified service exception.

        Returns:
            Whether the operation that raised it may be retried.
        """
        return error.error_code in self.retryable_errors

    def handle_error(self, error: Exception, context: str = "") -> TranscriptServiceError:
        """Classify ``error`` and log it.

        UNKNOWN_ERROR and SYSTEM_OUT_OF_MEMORY are logged with
        ``logger.exception``; everything else is logged with ``logger.error``.
        Presumably this should be called from inside an ``except`` block so
        ``logger.exception`` can attach the active traceback (assumes a
        stdlib-compatible logger; TODO confirm).

        Args:
            error: The original exception.
            context: Short description of the failing operation, used in the log line.

        Returns:
            The classified service exception (it is returned, not raised).
        """
        classified_error = self.classify_error(error)
        log_msg = f"错误处理 - {context}: {classified_error.message}"
        if classified_error.error_code in (ErrorCode.UNKNOWN_ERROR, ErrorCode.SYSTEM_OUT_OF_MEMORY):
            self.logger.exception(log_msg)
        else:
            self.logger.error(log_msg)
        return classified_error
# Module-level shared ErrorHandler instance, used by the retry decorators and
# safe_execute helpers below. It is created at import time, so ErrorHandler's
# constructor (get_config(), get_task_logger()) runs when this module is imported.
error_handler = ErrorHandler()
def retry_async(
    strategy: Optional[RetryStrategy] = None,
    exceptions: tuple = (Exception,),
    context: str = ""
):
    """Decorator that retries an async function according to a RetryStrategy.

    Exceptions whose type is in ``exceptions`` are classified with the
    module-level ``error_handler``. Retryable ones are retried after
    ``strategy.calculate_delay(attempt)`` seconds. A non-retryable error, or
    the error from the final attempt, is raised as the classified
    ``TranscriptServiceError`` chained to the original. Exceptions not listed
    in ``exceptions`` propagate unchanged.

    Args:
        strategy: Retry policy; ``RetryStrategy()`` is used when None.
        exceptions: Exception types eligible for classification and retry.
        context: Label prefixed to log messages.
    """
    if strategy is None:
        strategy = RetryStrategy()

    def decorator(func: Callable):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            logger = get_task_logger(logger_name="transcript_service.retry")
            # Always call the function at least once, even if max_attempts < 1.
            total_attempts = max(1, strategy.max_attempts)
            for attempt in range(1, total_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    classified_error = error_handler.classify_error(e)
                    if attempt == total_attempts or not error_handler.is_retryable(classified_error):
                        logger.error(f"{context} 最终失败 (尝试 {attempt}/{total_attempts}): {str(e)}")
                        if classified_error is e:
                            # Already a service error: re-raise without self-chaining.
                            raise
                        raise classified_error from e
                    delay = strategy.calculate_delay(attempt)
                    logger.warning(f"{context}{attempt} 次尝试失败,{delay:.1f}秒后重试: {str(e)}")
                    await asyncio.sleep(delay)
            # Unreachable: the final attempt either returns or raises.
            raise TranscriptServiceError("重试逻辑异常", ErrorCode.UNKNOWN_ERROR)
        return wrapper
    return decorator
def retry_sync(
    strategy: Optional[RetryStrategy] = None,
    exceptions: tuple = (Exception,),
    context: str = ""
):
    """Decorator that retries a synchronous function according to a RetryStrategy.

    Exceptions whose type is in ``exceptions`` are classified with the
    module-level ``error_handler``. Retryable ones are retried after a
    blocking ``time.sleep`` of ``strategy.calculate_delay(attempt)`` seconds.
    A non-retryable error, or the error from the final attempt, is raised as
    the classified ``TranscriptServiceError`` chained to the original.
    Exceptions not listed in ``exceptions`` propagate unchanged.

    Args:
        strategy: Retry policy; ``RetryStrategy()`` is used when None.
        exceptions: Exception types eligible for classification and retry.
        context: Label prefixed to log messages.
    """
    if strategy is None:
        strategy = RetryStrategy()

    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            logger = get_task_logger(logger_name="transcript_service.retry")
            # Always call the function at least once, even if max_attempts < 1.
            total_attempts = max(1, strategy.max_attempts)
            for attempt in range(1, total_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    classified_error = error_handler.classify_error(e)
                    if attempt == total_attempts or not error_handler.is_retryable(classified_error):
                        logger.error(f"{context} 最终失败 (尝试 {attempt}/{total_attempts}): {str(e)}")
                        if classified_error is e:
                            # Already a service error: re-raise without self-chaining.
                            raise
                        raise classified_error from e
                    delay = strategy.calculate_delay(attempt)
                    logger.warning(f"{context}{attempt} 次尝试失败,{delay:.1f}秒后重试: {str(e)}")
                    # time.sleep raises ValueError on negative input (unlike
                    # asyncio.sleep), so clamp misconfigured delays to zero.
                    time.sleep(max(0.0, delay))
            # Unreachable: the final attempt either returns or raises.
            raise TranscriptServiceError("重试逻辑异常", ErrorCode.UNKNOWN_ERROR)
        return wrapper
    return decorator
def safe_execute(func: Callable, *args, **kwargs) -> tuple[bool, Any, Optional[TranscriptServiceError]]:
    """Call ``func(*args, **kwargs)``, capturing any ``Exception`` instead of raising it.

    Args:
        func: Callable to invoke. It does not need a ``__name__`` attribute
            (e.g. ``functools.partial`` objects or callable instances work).
        *args: Positional arguments for ``func``.
        **kwargs: Keyword arguments for ``func``.

    Returns:
        ``(True, result, None)`` on success, or ``(False, None, error)`` where
        ``error`` is the result of ``error_handler.handle_error``.
    """
    try:
        result = func(*args, **kwargs)
    except Exception as e:
        # partial objects and callable instances lack __name__; fall back to
        # repr() so building the log context cannot itself raise here.
        func_name = getattr(func, "__name__", repr(func))
        error = error_handler.handle_error(e, f"执行 {func_name}")
        return False, None, error
    return True, result, None
async def safe_execute_async(func: Callable, *args, **kwargs) -> tuple[bool, Any, Optional[TranscriptServiceError]]:
    """Await ``func(*args, **kwargs)``, capturing any ``Exception`` instead of raising it.

    Args:
        func: Async callable to invoke. It does not need a ``__name__``
            attribute (e.g. ``functools.partial`` objects or callable instances work).
        *args: Positional arguments for ``func``.
        **kwargs: Keyword arguments for ``func``.

    Returns:
        ``(True, result, None)`` on success, or ``(False, None, error)`` where
        ``error`` is the result of ``error_handler.handle_error``.
    """
    try:
        result = await func(*args, **kwargs)
    except Exception as e:
        # partial objects and callable instances lack __name__; fall back to
        # repr() so building the log context cannot itself raise here.
        func_name = getattr(func, "__name__", repr(func))
        error = error_handler.handle_error(e, f"执行 {func_name}")
        return False, None, error
    return True, result, None
def get_error_handler() -> ErrorHandler:
    """Return the module-level shared error handler.

    Returns:
        The ``ErrorHandler`` instance created when this module was imported
        (the same object on every call).
    """
    return error_handler