PCNUSMSE's picture
Upload folder using huggingface_hub
4e37375 verified
"""日志管理模块
提供结构化日志记录功能,支持任务跟踪和状态记录。
"""
import logging
import logging.config
import logging.handlers
import uuid
from pathlib import Path
from typing import Any, Dict, Optional
import yaml
try:
from rich.console import Console
from rich.logging import RichHandler
RICH_AVAILABLE = True
except ImportError:
RICH_AVAILABLE = False
from ..core.config import get_config
class TaskContextFilter(logging.Filter):
"""任务上下文过滤器
为日志记录添加任务ID上下文信息。
"""
def __init__(self):
super().__init__()
self.task_id = 'system'
def filter(self, record):
"""添加任务ID到日志记录"""
# 确保所有记录都有task_id字段
if not hasattr(record, 'task_id'):
record.task_id = getattr(self, 'task_id', 'system')
elif getattr(record, 'task_id', None) is None:
record.task_id = getattr(self, 'task_id', 'system')
return True
class Logger:
"""日志管理器"""
def __init__(self, name: str = "transcript_service"):
"""初始化日志管理器
Args:
name: 日志器名称
"""
self.name = name
self.config = get_config()
self._setup_logging()
self.logger = logging.getLogger(name)
self.task_filter = TaskContextFilter()
# 为所有处理器添加任务过滤器
for handler in self.logger.handlers:
handler.addFilter(self.task_filter)
# 同时为根日志器的处理器添加过滤器
root_logger = logging.getLogger()
for handler in root_logger.handlers:
if not any(isinstance(f, TaskContextFilter) for f in handler.filters):
handler.addFilter(self.task_filter)
def _setup_logging(self):
"""设置日志配置"""
# 确保日志目录存在
logs_dir = self.config.get_logs_dir()
# 加载日志配置文件
config_file = self.config.get_project_root() / "config" / "logging.yaml"
if config_file.exists():
with open(config_file, 'r', encoding='utf-8') as file:
logging_config = yaml.safe_load(file)
# 更新文件路径为绝对路径
for handler_name, handler_config in logging_config.get('handlers', {}).items():
if 'filename' in handler_config:
handler_config['filename'] = str(logs_dir / Path(handler_config['filename']).name)
logging.config.dictConfig(logging_config)
else:
# 使用默认配置
self._setup_default_logging()
def _setup_default_logging(self):
"""设置默认日志配置"""
# 控制台处理器
if RICH_AVAILABLE:
console = Console()
console_handler = RichHandler(
console=console,
show_time=True,
show_path=True,
markup=True
)
else:
console_handler = logging.StreamHandler()
console_formatter = logging.Formatter(
'[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
console_handler.setFormatter(console_formatter)
console_handler.setLevel(logging.DEBUG if self.config.app.debug else logging.INFO)
# 文件处理器
log_file = self.config.get_logs_dir() / "app.log"
file_handler = logging.handlers.RotatingFileHandler(
log_file,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
file_handler.setLevel(logging.INFO)
# 格式化器(简化版本)
formatter = logging.Formatter(
'[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(formatter)
# 配置根日志器
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG if self.config.app.debug else logging.INFO)
root_logger.addHandler(console_handler)
root_logger.addHandler(file_handler)
def set_task_id(self, task_id: str):
"""设置当前任务ID
Args:
task_id: 任务ID
"""
self.task_filter.task_id = task_id
def clear_task_id(self):
"""清除当前任务ID"""
self.task_filter.task_id = 'system'
def debug(self, message: str, **kwargs):
"""记录调试信息"""
self.logger.debug(message, extra=kwargs)
def info(self, message: str, **kwargs):
"""记录一般信息"""
self.logger.info(message, extra=kwargs)
def warning(self, message: str, **kwargs):
"""记录警告信息"""
self.logger.warning(message, extra=kwargs)
def error(self, message: str, **kwargs):
"""记录错误信息"""
self.logger.error(message, extra=kwargs)
def critical(self, message: str, **kwargs):
"""记录严重错误"""
self.logger.critical(message, extra=kwargs)
def exception(self, message: str, **kwargs):
"""记录异常信息(包含堆栈跟踪)"""
self.logger.exception(message, extra=kwargs)
class TaskLogger:
"""任务日志记录器
为特定任务提供上下文日志记录。
"""
def __init__(self, task_id: Optional[str] = None, logger_name: str = "transcript_service"):
"""初始化任务日志记录器
Args:
task_id: 任务ID,如果为None则自动生成
logger_name: 基础日志器名称
"""
self.task_id = task_id or str(uuid.uuid4())[:8]
self.logger = Logger(logger_name)
self.logger.set_task_id(self.task_id)
def __enter__(self):
"""进入上下文管理器"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出上下文管理器"""
self.logger.clear_task_id()
def debug(self, message: str, **kwargs):
"""记录调试信息"""
self.logger.debug(message, **kwargs)
def info(self, message: str, **kwargs):
"""记录一般信息"""
self.logger.info(message, **kwargs)
def warning(self, message: str, **kwargs):
"""记录警告信息"""
self.logger.warning(message, **kwargs)
def error(self, message: str, **kwargs):
"""记录错误信息"""
self.logger.error(message, **kwargs)
def critical(self, message: str, **kwargs):
"""记录严重错误"""
self.logger.critical(message, **kwargs)
def exception(self, message: str, **kwargs):
"""记录异常信息"""
self.logger.exception(message, **kwargs)
def set_task_id(self, task_id: str):
"""设置当前任务ID
Args:
task_id: 任务ID
"""
self.logger.set_task_id(task_id)
def clear_task_id(self):
"""清除当前任务ID"""
self.logger.clear_task_id()
# 全局日志实例
logger = Logger()
def get_logger(name: str = "transcript_service") -> Logger:
"""获取日志实例
Args:
name: 日志器名称
Returns:
日志实例
"""
return Logger(name)
def get_task_logger(task_id: Optional[str] = None, logger_name: str = "transcript_service") -> TaskLogger:
"""获取任务日志实例
Args:
task_id: 任务ID
logger_name: 日志器名称
Returns:
任务日志实例
"""
return TaskLogger(task_id, logger_name)