Spaces:
Sleeping
Sleeping
| """日志管理模块 | |
| 提供结构化日志记录功能,支持任务跟踪和状态记录。 | |
| """ | |
| import logging | |
| import logging.config | |
| import logging.handlers | |
| import uuid | |
| from pathlib import Path | |
| from typing import Any, Dict, Optional | |
| import yaml | |
| try: | |
| from rich.console import Console | |
| from rich.logging import RichHandler | |
| RICH_AVAILABLE = True | |
| except ImportError: | |
| RICH_AVAILABLE = False | |
| from ..core.config import get_config | |
| class TaskContextFilter(logging.Filter): | |
| """任务上下文过滤器 | |
| 为日志记录添加任务ID上下文信息。 | |
| """ | |
| def __init__(self): | |
| super().__init__() | |
| self.task_id = 'system' | |
| def filter(self, record): | |
| """添加任务ID到日志记录""" | |
| # 确保所有记录都有task_id字段 | |
| if not hasattr(record, 'task_id'): | |
| record.task_id = getattr(self, 'task_id', 'system') | |
| elif getattr(record, 'task_id', None) is None: | |
| record.task_id = getattr(self, 'task_id', 'system') | |
| return True | |
| class Logger: | |
| """日志管理器""" | |
| def __init__(self, name: str = "transcript_service"): | |
| """初始化日志管理器 | |
| Args: | |
| name: 日志器名称 | |
| """ | |
| self.name = name | |
| self.config = get_config() | |
| self._setup_logging() | |
| self.logger = logging.getLogger(name) | |
| self.task_filter = TaskContextFilter() | |
| # 为所有处理器添加任务过滤器 | |
| for handler in self.logger.handlers: | |
| handler.addFilter(self.task_filter) | |
| # 同时为根日志器的处理器添加过滤器 | |
| root_logger = logging.getLogger() | |
| for handler in root_logger.handlers: | |
| if not any(isinstance(f, TaskContextFilter) for f in handler.filters): | |
| handler.addFilter(self.task_filter) | |
| def _setup_logging(self): | |
| """设置日志配置""" | |
| # 确保日志目录存在 | |
| logs_dir = self.config.get_logs_dir() | |
| # 加载日志配置文件 | |
| config_file = self.config.get_project_root() / "config" / "logging.yaml" | |
| if config_file.exists(): | |
| with open(config_file, 'r', encoding='utf-8') as file: | |
| logging_config = yaml.safe_load(file) | |
| # 更新文件路径为绝对路径 | |
| for handler_name, handler_config in logging_config.get('handlers', {}).items(): | |
| if 'filename' in handler_config: | |
| handler_config['filename'] = str(logs_dir / Path(handler_config['filename']).name) | |
| logging.config.dictConfig(logging_config) | |
| else: | |
| # 使用默认配置 | |
| self._setup_default_logging() | |
| def _setup_default_logging(self): | |
| """设置默认日志配置""" | |
| # 控制台处理器 | |
| if RICH_AVAILABLE: | |
| console = Console() | |
| console_handler = RichHandler( | |
| console=console, | |
| show_time=True, | |
| show_path=True, | |
| markup=True | |
| ) | |
| else: | |
| console_handler = logging.StreamHandler() | |
| console_formatter = logging.Formatter( | |
| '[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s', | |
| datefmt='%Y-%m-%d %H:%M:%S' | |
| ) | |
| console_handler.setFormatter(console_formatter) | |
| console_handler.setLevel(logging.DEBUG if self.config.app.debug else logging.INFO) | |
| # 文件处理器 | |
| log_file = self.config.get_logs_dir() / "app.log" | |
| file_handler = logging.handlers.RotatingFileHandler( | |
| log_file, | |
| maxBytes=10*1024*1024, # 10MB | |
| backupCount=5, | |
| encoding='utf-8' | |
| ) | |
| file_handler.setLevel(logging.INFO) | |
| # 格式化器(简化版本) | |
| formatter = logging.Formatter( | |
| '[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s', | |
| datefmt='%Y-%m-%d %H:%M:%S' | |
| ) | |
| file_handler.setFormatter(formatter) | |
| # 配置根日志器 | |
| root_logger = logging.getLogger() | |
| root_logger.setLevel(logging.DEBUG if self.config.app.debug else logging.INFO) | |
| root_logger.addHandler(console_handler) | |
| root_logger.addHandler(file_handler) | |
| def set_task_id(self, task_id: str): | |
| """设置当前任务ID | |
| Args: | |
| task_id: 任务ID | |
| """ | |
| self.task_filter.task_id = task_id | |
| def clear_task_id(self): | |
| """清除当前任务ID""" | |
| self.task_filter.task_id = 'system' | |
| def debug(self, message: str, **kwargs): | |
| """记录调试信息""" | |
| self.logger.debug(message, extra=kwargs) | |
| def info(self, message: str, **kwargs): | |
| """记录一般信息""" | |
| self.logger.info(message, extra=kwargs) | |
| def warning(self, message: str, **kwargs): | |
| """记录警告信息""" | |
| self.logger.warning(message, extra=kwargs) | |
| def error(self, message: str, **kwargs): | |
| """记录错误信息""" | |
| self.logger.error(message, extra=kwargs) | |
| def critical(self, message: str, **kwargs): | |
| """记录严重错误""" | |
| self.logger.critical(message, extra=kwargs) | |
| def exception(self, message: str, **kwargs): | |
| """记录异常信息(包含堆栈跟踪)""" | |
| self.logger.exception(message, extra=kwargs) | |
| class TaskLogger: | |
| """任务日志记录器 | |
| 为特定任务提供上下文日志记录。 | |
| """ | |
| def __init__(self, task_id: Optional[str] = None, logger_name: str = "transcript_service"): | |
| """初始化任务日志记录器 | |
| Args: | |
| task_id: 任务ID,如果为None则自动生成 | |
| logger_name: 基础日志器名称 | |
| """ | |
| self.task_id = task_id or str(uuid.uuid4())[:8] | |
| self.logger = Logger(logger_name) | |
| self.logger.set_task_id(self.task_id) | |
| def __enter__(self): | |
| """进入上下文管理器""" | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| """退出上下文管理器""" | |
| self.logger.clear_task_id() | |
| def debug(self, message: str, **kwargs): | |
| """记录调试信息""" | |
| self.logger.debug(message, **kwargs) | |
| def info(self, message: str, **kwargs): | |
| """记录一般信息""" | |
| self.logger.info(message, **kwargs) | |
| def warning(self, message: str, **kwargs): | |
| """记录警告信息""" | |
| self.logger.warning(message, **kwargs) | |
| def error(self, message: str, **kwargs): | |
| """记录错误信息""" | |
| self.logger.error(message, **kwargs) | |
| def critical(self, message: str, **kwargs): | |
| """记录严重错误""" | |
| self.logger.critical(message, **kwargs) | |
| def exception(self, message: str, **kwargs): | |
| """记录异常信息""" | |
| self.logger.exception(message, **kwargs) | |
| def set_task_id(self, task_id: str): | |
| """设置当前任务ID | |
| Args: | |
| task_id: 任务ID | |
| """ | |
| self.logger.set_task_id(task_id) | |
| def clear_task_id(self): | |
| """清除当前任务ID""" | |
| self.logger.clear_task_id() | |
| # 全局日志实例 | |
| logger = Logger() | |
| def get_logger(name: str = "transcript_service") -> Logger: | |
| """获取日志实例 | |
| Args: | |
| name: 日志器名称 | |
| Returns: | |
| 日志实例 | |
| """ | |
| return Logger(name) | |
| def get_task_logger(task_id: Optional[str] = None, logger_name: str = "transcript_service") -> TaskLogger: | |
| """获取任务日志实例 | |
| Args: | |
| task_id: 任务ID | |
| logger_name: 日志器名称 | |
| Returns: | |
| 任务日志实例 | |
| """ | |
| return TaskLogger(task_id, logger_name) |