"""音频转文字服务主应用程序 基于Gradio的音频转文字Web服务应用程序入口。 """ import asyncio import sys import signal import time from pathlib import Path from typing import Optional # 添加项目根目录到Python路径 project_root = Path(__file__).parent sys.path.insert(0, str(project_root)) # 加载环境变量 from dotenv import load_dotenv load_dotenv(project_root / ".env") from src.core.config import get_config, reload_config from src.utils.logger import get_logger from src.api.gradio_interface import get_gradio_interface from src.core.task_manager import get_task_manager, TaskStatus class TranscriptServiceApp: """音频转文字服务应用程序""" def __init__(self, environment: Optional[str] = None): """初始化应用程序 Args: environment: 运行环境 (development/production) """ # 加载配置 if environment: self.config = reload_config(environment) else: self.config = get_config() # 初始化日志 self.logger = get_logger("transcript_service.app") # 初始化界面 self.gradio_interface = get_gradio_interface() # 添加健康检查端点 self._setup_health_endpoint() # 运行状态 self.is_running = False self.logger.info(f"应用程序初始化完成 - 环境: {self.config.environment}") def _setup_health_endpoint(self): """设置健康检查端点""" try: import gradio as gr def health_check(): """健康检查函数""" import json import time health_data = { "status": "healthy", "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "environment": self.config.environment, "version": self.config.app.version, "uptime": time.time() - getattr(self, '_start_time', time.time()), "services": { "oss": self._check_oss_connection(), "dashscope": self._check_dashscope_connection() } } return json.dumps(health_data, indent=2, ensure_ascii=False) # 在Gradio应用中添加健康检查端点 if hasattr(self.gradio_interface, 'app'): from fastapi.responses import JSONResponse @self.gradio_interface.app.get("/health") async def health_endpoint(): health_data = { "status": "healthy", "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "environment": self.config.environment, "version": self.config.app.version, "uptime": time.time() - getattr(self, '_start_time', time.time()), "services": { "oss": self._check_oss_connection(), "dashscope": self._check_dashscope_connection() } } return JSONResponse(content=health_data) except Exception as e: self.logger.warning(f"设置健康检查端点失败: {e}") def _check_oss_connection(self) -> bool: """检查OSS连接""" try: if not (self.config.oss.access_key_id and self.config.oss.access_key_secret): return False import oss2 auth = oss2.Auth(self.config.oss.access_key_id, self.config.oss.access_key_secret) service = oss2.Service(auth, "https://oss-cn-beijing.aliyuncs.com") # 简单的连接测试 list(service.list_buckets(max_keys=1)) return True except Exception: return False def _check_dashscope_connection(self) -> bool: """检查DashScope连接""" try: if not self.config.dashscope.api_key: return False # 简单的API key格式检查 return self.config.dashscope.api_key.startswith("sk-") except Exception: return False def setup_signal_handlers(self): """设置信号处理器""" # 移除优雅关闭功能,允许应用直接终止 pass def validate_environment(self) -> bool: """验证运行环境 Returns: 环境是否有效 """ try: # 检查必要的环境变量 missing_vars = [] if not self.config.oss.access_key_id: missing_vars.append("OSS_ACCESS_KEY_ID") if not self.config.oss.access_key_secret: missing_vars.append("OSS_ACCESS_KEY_SECRET") if not self.config.dashscope.api_key: missing_vars.append("DASHSCOPE_API_KEY") if missing_vars: self.logger.error(f"缺少必要的环境变量: {missing_vars}") return False # 检查目录权限 logs_dir = self.config.get_logs_dir() temp_dir = self.config.get_temp_dir() for directory in [logs_dir, temp_dir]: if not directory.exists(): directory.mkdir(parents=True, exist_ok=True) # 测试写权限 test_file = directory / ".write_test" try: test_file.write_text("test") test_file.unlink() except Exception as e: self.logger.error(f"目录权限检查失败 {directory}: {str(e)}") return False self.logger.info("环境验证通过") return True except Exception as e: self.logger.exception(f"环境验证失败: {str(e)}") return False def run(self, **launch_kwargs): """启动应用程序 Args: **launch_kwargs: Gradio启动参数 """ try: # 设置信号处理器 self.setup_signal_handlers() # 验证环境 if not self.validate_environment(): self.logger.error("环境验证失败,应用程序无法启动") sys.exit(1) # 启动应用 self.is_running = True self._start_time = time.time() # 记录启动时间 self.logger.info("正在启动音频转文字服务...") # 启动Gradio界面 self.gradio_interface.launch(**launch_kwargs) except OSError as e: if "address already in use" in str(e).lower(): port = launch_kwargs.get('server_port', self.config.app.port) self.logger.warning(f"端口 {port} 已被占用。正在尝试使用一个可用的随机端口...") # 显式设置 server_port=None 来让 Gradio 自动查找可用端口 launch_kwargs['server_port'] = None try: # 再次尝试启动 self.gradio_interface.launch(**launch_kwargs) except Exception as final_e: self.logger.exception(f"尝试使用随机端口后,应用程序启动仍然失败: {str(final_e)}") sys.exit(1) else: self.logger.exception(f"启动时发生未处理的网络错误: {str(e)}") sys.exit(1) except KeyboardInterrupt: self.logger.info("接收到键盘中断信号") self.shutdown() except Exception as e: self.logger.exception(f"应用程序启动失败: {str(e)}") sys.exit(1) def shutdown(self): """关闭应用程序""" if not self.is_running: return self.logger.info("开始关闭应用程序...") self.is_running = False try: # 清理任务管理器 task_manager = get_task_manager() # 取消所有待处理的任务 pending_tasks = task_manager.get_tasks_by_status(TaskStatus.PENDING) for task in pending_tasks: try: loop = asyncio.get_running_loop() asyncio.create_task(task_manager.cancel_task(task.id)) except RuntimeError: # No running loop asyncio.run(task_manager.cancel_task(task.id)) # 等待正在处理的任务完成(最多等待30秒) active_tasks = ( task_manager.get_tasks_by_status(TaskStatus.VALIDATING) + task_manager.get_tasks_by_status(TaskStatus.UPLOADING) + task_manager.get_tasks_by_status(TaskStatus.TRANSCRIBING) ) if active_tasks: self.logger.info(f"等待 {len(active_tasks)} 个活跃任务完成...") # 这里可以添加更复杂的等待逻辑, 但为简单起见, 我们直接继续 # 清理临时文件 self.cleanup_temp_files() self.logger.info("应用程序已安全关闭") except Exception as e: self.logger.exception(f"关闭应用程序时发生错误: {str(e)}") def cleanup_temp_files(self): """清理临时文件""" try: temp_dir = self.config.get_temp_dir() if temp_dir.exists(): for file_path in temp_dir.glob("*"): if file_path.is_file(): file_path.unlink() self.logger.info("临时文件清理完成") except Exception as e: self.logger.warning(f"清理临时文件失败: {str(e)}") def get_app_info(self) -> dict: """获取应用程序信息 Returns: 应用程序信息字典 """ return { "name": self.config.app.name, "version": self.config.app.version, "environment": self.config.environment, "debug": self.config.app.debug, "host": self.config.app.host, "port": self.config.app.port, "is_running": self.is_running } def create_app(environment: Optional[str] = None) -> TranscriptServiceApp: """创建应用程序实例 Args: environment: 运行环境 Returns: 应用程序实例 """ return TranscriptServiceApp(environment) def main(): """主函数入口""" import argparse import os parser = argparse.ArgumentParser(description="音频转文字服务") parser.add_argument( "--env", choices=["development", "production"], default=None, # 改为None,从环境变量读取 help="运行环境" ) parser.add_argument( "--host", default=None, help="服务主机地址" ) parser.add_argument( "--port", type=int, default=None, help="服务端口" ) parser.add_argument( "--share", action="store_true", help="启用Gradio分享链接" ) parser.add_argument( "--debug", action="store_true", help="启用调试模式" ) args = parser.parse_args() # 从环境变量或命令行参数确定运行环境 environment = args.env or os.getenv('ENVIRONMENT', 'production') # 创建应用 app = create_app(environment) # 准备启动参数 launch_kwargs = { 'share': False, # 生产环境禁用share 'server_name': '0.0.0.0', # Hugging Face Spaces 需要监听所有接口 'server_port': 7860 # Hugging Face Spaces 默认端口 } # 命令行参数可以覆盖默认值 if args.host: launch_kwargs['server_name'] = args.host if args.port: launch_kwargs['server_port'] = args.port if args.share: launch_kwargs['share'] = True # 如果用户明确要求share if args.debug: launch_kwargs['debug'] = True # 启动应用 app.run(**launch_kwargs) if __name__ == "__main__": main()