"""日志管理模块 提供结构化日志记录功能,支持任务跟踪和状态记录。 """ import logging import logging.config import logging.handlers import uuid from pathlib import Path from typing import Any, Dict, Optional import yaml try: from rich.console import Console from rich.logging import RichHandler RICH_AVAILABLE = True except ImportError: RICH_AVAILABLE = False from ..core.config import get_config class TaskContextFilter(logging.Filter): """任务上下文过滤器 为日志记录添加任务ID上下文信息。 """ def __init__(self): super().__init__() self.task_id = 'system' def filter(self, record): """添加任务ID到日志记录""" # 确保所有记录都有task_id字段 if not hasattr(record, 'task_id'): record.task_id = getattr(self, 'task_id', 'system') elif getattr(record, 'task_id', None) is None: record.task_id = getattr(self, 'task_id', 'system') return True class Logger: """日志管理器""" def __init__(self, name: str = "transcript_service"): """初始化日志管理器 Args: name: 日志器名称 """ self.name = name self.config = get_config() self._setup_logging() self.logger = logging.getLogger(name) self.task_filter = TaskContextFilter() # 为所有处理器添加任务过滤器 for handler in self.logger.handlers: handler.addFilter(self.task_filter) # 同时为根日志器的处理器添加过滤器 root_logger = logging.getLogger() for handler in root_logger.handlers: if not any(isinstance(f, TaskContextFilter) for f in handler.filters): handler.addFilter(self.task_filter) def _setup_logging(self): """设置日志配置""" # 确保日志目录存在 logs_dir = self.config.get_logs_dir() # 加载日志配置文件 config_file = self.config.get_project_root() / "config" / "logging.yaml" if config_file.exists(): with open(config_file, 'r', encoding='utf-8') as file: logging_config = yaml.safe_load(file) # 更新文件路径为绝对路径 for handler_name, handler_config in logging_config.get('handlers', {}).items(): if 'filename' in handler_config: handler_config['filename'] = str(logs_dir / Path(handler_config['filename']).name) logging.config.dictConfig(logging_config) else: # 使用默认配置 self._setup_default_logging() def _setup_default_logging(self): """设置默认日志配置""" # 控制台处理器 if RICH_AVAILABLE: console = Console() console_handler = RichHandler( console=console, show_time=True, show_path=True, markup=True ) else: console_handler = logging.StreamHandler() console_formatter = logging.Formatter( '[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) console_handler.setFormatter(console_formatter) console_handler.setLevel(logging.DEBUG if self.config.app.debug else logging.INFO) # 文件处理器 log_file = self.config.get_logs_dir() / "app.log" file_handler = logging.handlers.RotatingFileHandler( log_file, maxBytes=10*1024*1024, # 10MB backupCount=5, encoding='utf-8' ) file_handler.setLevel(logging.INFO) # 格式化器(简化版本) formatter = logging.Formatter( '[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) file_handler.setFormatter(formatter) # 配置根日志器 root_logger = logging.getLogger() root_logger.setLevel(logging.DEBUG if self.config.app.debug else logging.INFO) root_logger.addHandler(console_handler) root_logger.addHandler(file_handler) def set_task_id(self, task_id: str): """设置当前任务ID Args: task_id: 任务ID """ self.task_filter.task_id = task_id def clear_task_id(self): """清除当前任务ID""" self.task_filter.task_id = 'system' def debug(self, message: str, **kwargs): """记录调试信息""" self.logger.debug(message, extra=kwargs) def info(self, message: str, **kwargs): """记录一般信息""" self.logger.info(message, extra=kwargs) def warning(self, message: str, **kwargs): """记录警告信息""" self.logger.warning(message, extra=kwargs) def error(self, message: str, **kwargs): """记录错误信息""" self.logger.error(message, extra=kwargs) def critical(self, message: str, **kwargs): """记录严重错误""" self.logger.critical(message, extra=kwargs) def exception(self, message: str, **kwargs): """记录异常信息(包含堆栈跟踪)""" self.logger.exception(message, extra=kwargs) class TaskLogger: """任务日志记录器 为特定任务提供上下文日志记录。 """ def __init__(self, task_id: Optional[str] = None, logger_name: str = "transcript_service"): """初始化任务日志记录器 Args: task_id: 任务ID,如果为None则自动生成 logger_name: 基础日志器名称 """ self.task_id = task_id or str(uuid.uuid4())[:8] self.logger = Logger(logger_name) self.logger.set_task_id(self.task_id) def __enter__(self): """进入上下文管理器""" return self def __exit__(self, exc_type, exc_val, exc_tb): """退出上下文管理器""" self.logger.clear_task_id() def debug(self, message: str, **kwargs): """记录调试信息""" self.logger.debug(message, **kwargs) def info(self, message: str, **kwargs): """记录一般信息""" self.logger.info(message, **kwargs) def warning(self, message: str, **kwargs): """记录警告信息""" self.logger.warning(message, **kwargs) def error(self, message: str, **kwargs): """记录错误信息""" self.logger.error(message, **kwargs) def critical(self, message: str, **kwargs): """记录严重错误""" self.logger.critical(message, **kwargs) def exception(self, message: str, **kwargs): """记录异常信息""" self.logger.exception(message, **kwargs) def set_task_id(self, task_id: str): """设置当前任务ID Args: task_id: 任务ID """ self.logger.set_task_id(task_id) def clear_task_id(self): """清除当前任务ID""" self.logger.clear_task_id() # 全局日志实例 logger = Logger() def get_logger(name: str = "transcript_service") -> Logger: """获取日志实例 Args: name: 日志器名称 Returns: 日志实例 """ return Logger(name) def get_task_logger(task_id: Optional[str] = None, logger_name: str = "transcript_service") -> TaskLogger: """获取任务日志实例 Args: task_id: 任务ID logger_name: 日志器名称 Returns: 任务日志实例 """ return TaskLogger(task_id, logger_name)