Spaces:
Sleeping
Sleeping
| """OSS云存储服务模块 | |
| 提供阿里云OSS文件上传、下载和管理功能。 | |
| """ | |
| import os | |
| import uuid | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from typing import List, Optional, Tuple | |
| import asyncio | |
| import aiohttp | |
| import oss2 | |
| from oss2.exceptions import OssError | |
| from ..core.config import get_config | |
| from ..utils.logger import get_task_logger | |
class OSSService:
    """Alibaba Cloud OSS storage service.

    Wraps a single ``oss2.Bucket`` built from the application configuration
    and exposes helpers for uploading files, building access URLs, deleting
    objects, cleaning up expired temporary objects and reading metadata.
    """

    def __init__(self):
        """Build the OSS client and logger from the application configuration."""
        self.config = get_config()
        self.oss_config = self.config.oss

        # Initialise the OSS client.
        auth = oss2.Auth(
            self.oss_config.access_key_id,
            self.oss_config.access_key_secret,
        )
        self.bucket = oss2.Bucket(
            auth,
            self.oss_config.endpoint,
            self.oss_config.bucket_name,
        )
        self.logger = get_task_logger(logger_name="transcript_service.oss")

    def _generate_object_key(self, filename: str, task_id: str) -> str:
        """Generate the OSS object key for an uploaded file.

        The key has the shape
        ``<temp_prefix>/YYYY/MM/DD/<YYYYmmdd_HHMMSS>_<task_id>_<8 hex chars><ext>``.

        Args:
            filename: Original file name; only its extension is kept.
            task_id: Task ID embedded in the key.

        Returns:
            The OSS object key.
        """
        now = datetime.now()
        date_path = now.strftime("%Y/%m/%d")
        timestamp = now.strftime("%Y%m%d_%H%M%S")

        # Keep only the extension; the random suffix avoids key collisions
        # between files uploaded in the same second for the same task.
        file_ext = Path(filename).suffix
        safe_filename = f"{timestamp}_{task_id}_{uuid.uuid4().hex[:8]}{file_ext}"
        return f"{self.oss_config.temp_prefix}/{date_path}/{safe_filename}"

    async def upload_file(self, file_path: Path, task_id: str) -> Tuple[bool, str, Optional[str]]:
        """Upload a local file to OSS.

        The blocking SDK calls run in the event loop's default executor so
        that the loop is not stalled while the transfer is in progress.

        Args:
            file_path: Path of the local file.
            task_id: Task ID used when building the object key.

        Returns:
            ``(success, url_or_error_message, object_key)``; ``object_key`` is
            ``None`` when the upload failed.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._upload_file_sync, file_path, task_id
        )

    def _upload_file_sync(self, file_path: Path, task_id: str) -> Tuple[bool, str, Optional[str]]:
        """Blocking implementation of :meth:`upload_file`; never raises."""
        try:
            self.logger.info(f"开始上传文件到OSS: {file_path.name}")

            object_key = self._generate_object_key(file_path.name, task_id)

            # Upload first: a failure here is a real upload failure and is
            # reported by the outer handlers.
            self.bucket.put_object_from_file(object_key, str(file_path))

            try:
                # Make the object publicly readable and hand out a plain URL.
                self.bucket.put_object_acl(object_key, oss2.OBJECT_ACL_PUBLIC_READ)
                url = self._generate_public_url(object_key)
            except OssError as acl_err:
                # The object is already stored; if the ACL cannot be set,
                # fall back to a time-limited signed URL instead of
                # reporting the whole upload as failed.
                self.logger.warning(f"ACL设置失败,使用签名URL: {acl_err}")
                url = self._generate_signed_url(object_key)

            self.logger.info(f"文件上传成功: {object_key}, URL: {url}")
            return True, url, object_key
        except OssError as e:
            error_msg = f"OSS错误: {str(e)}"
            self.logger.error(error_msg)
            return False, error_msg, None
        except Exception as e:
            error_msg = f"上传文件时发生未知错误: {str(e)}"
            self.logger.exception(error_msg)
            return False, error_msg, None

    async def upload_multiple_files(self, file_paths: List[Path], task_id: str) -> List[Tuple[str, bool, str, Optional[str]]]:
        """Upload several files to OSS concurrently.

        Args:
            file_paths: Paths of the local files.
            task_id: Task ID used when building the object keys.

        Returns:
            ``[(file_name, success, url_or_error_message, object_key), ...]``
            in the same order as ``file_paths``.
        """
        # gather() schedules every upload at once and preserves input order;
        # awaiting bare coroutines one by one would run them sequentially.
        outcomes = await asyncio.gather(
            *(self._upload_single_file_async(path, task_id) for path in file_paths)
        )
        return [
            (path.name, success, url_or_error, object_key)
            for path, (success, url_or_error, object_key) in zip(file_paths, outcomes)
        ]

    async def _upload_single_file_async(self, file_path: Path, task_id: str) -> Tuple[bool, str, Optional[str]]:
        """Upload one file; kept as a thin wrapper around :meth:`upload_file`."""
        # upload_file already off-loads the blocking work to the executor,
        # so no nested event loop is needed here.
        return await self.upload_file(file_path, task_id)

    def _generate_public_url(self, object_key: str) -> str:
        """Build the unsigned public URL of an object.

        The result has the form ``https://<bucket>.<endpoint>/<object_key>``.
        Any ``http://`` / ``https://`` prefix and trailing ``/`` on the
        configured endpoint are removed first.

        Args:
            object_key: OSS object key.

        Returns:
            The public URL.
        """
        endpoint = self.oss_config.endpoint
        for scheme in ("http://", "https://"):
            if endpoint.startswith(scheme):
                endpoint = endpoint[len(scheme):]
                break
        # A trailing slash on the endpoint would otherwise produce "//key".
        endpoint = endpoint.rstrip("/")

        url = f"https://{self.oss_config.bucket_name}.{endpoint}/{object_key}"
        # Log the generated URL to help debugging.
        self.logger.debug(f"生成公网URL: {url}")
        return url

    def _generate_signed_url(self, object_key: str) -> str:
        """Build a time-limited signed GET URL (fallback when ACL fails).

        Args:
            object_key: OSS object key.

        Returns:
            A signed URL valid for ``oss_config.url_expire_hours`` hours.
        """
        # Bucket.sign_url expects the validity period in seconds relative to
        # now, not an absolute Unix timestamp.
        expires_in = int(timedelta(hours=self.oss_config.url_expire_hours).total_seconds())
        return self.bucket.sign_url('GET', object_key, expires_in)

    def delete_file(self, object_key: str) -> bool:
        """Delete an object from OSS.

        Args:
            object_key: OSS object key.

        Returns:
            ``True`` if the delete call succeeded, ``False`` on any error
            (the error is logged, never raised).
        """
        try:
            self.bucket.delete_object(object_key)
            self.logger.info(f"文件删除成功: {object_key}")
            return True
        except OssError as e:
            self.logger.error(f"删除文件失败: {object_key}, 错误: {str(e)}")
            return False
        except Exception as e:
            self.logger.exception(f"删除文件时发生未知错误: {object_key}, 错误: {str(e)}")
            return False

    @staticmethod
    def _is_expired(last_modified, cutoff: datetime) -> bool:
        """Return whether ``last_modified`` lies strictly before ``cutoff``.

        Args:
            last_modified: Unix timestamp (int/float), which is what the
                object listing yields, or a ``datetime``.
            cutoff: Naive local ``datetime`` marking the retention boundary.
        """
        if isinstance(last_modified, datetime):
            modified_ts = last_modified.timestamp()
        else:
            modified_ts = float(last_modified)
        return modified_ts < cutoff.timestamp()

    def cleanup_old_files(self, days: Optional[int] = None) -> int:
        """Delete expired temporary objects under ``oss_config.temp_prefix``.

        Args:
            days: Retention period in days. ``None`` (or ``0``) falls back to
                ``oss_config.auto_cleanup_days``.

        Returns:
            Number of objects deleted. If listing fails part-way, the count
            reached so far is returned and the error is logged.
        """
        cleanup_days = days or self.oss_config.auto_cleanup_days
        cutoff_date = datetime.now() - timedelta(days=cleanup_days)
        deleted_count = 0
        prefix = self.oss_config.temp_prefix
        try:
            # Walk every object under the temporary prefix.
            for obj in oss2.ObjectIterator(self.bucket, prefix=prefix):
                # The listing reports last_modified as a Unix timestamp, so
                # compare on timestamps rather than on datetime objects.
                if self._is_expired(obj.last_modified, cutoff_date) and self.delete_file(obj.key):
                    deleted_count += 1
            self.logger.info(f"清理完成,删除了 {deleted_count} 个过期文件")
            return deleted_count
        except Exception as e:
            self.logger.exception(f"清理过期文件时发生错误: {str(e)}")
            return deleted_count

    def get_file_info(self, object_key: str) -> Optional[dict]:
        """Fetch metadata of an object.

        Args:
            object_key: OSS object key.

        Returns:
            A dict with ``size``, ``last_modified``, ``etag`` and
            ``content_type``, or ``None`` if the request raised ``OssError``.
        """
        try:
            info = self.bucket.head_object(object_key)
            return {
                'size': info.content_length,
                'last_modified': info.last_modified,
                'etag': info.etag,
                'content_type': info.content_type,
            }
        except OssError as e:
            self.logger.error(f"获取文件信息失败: {object_key}, 错误: {str(e)}")
            return None

    def check_bucket_exists(self) -> bool:
        """Check whether the configured bucket exists.

        Returns:
            ``True`` if the bucket information could be fetched, ``False`` if
            the bucket does not exist or the check failed (failure is logged).
        """
        try:
            # Probe with get_bucket_info; NoSuchBucket means "does not exist".
            self.bucket.get_bucket_info()
            return True
        except oss2.exceptions.NoSuchBucket:
            return False
        except Exception as e:
            self.logger.error(f"检查存储桶失败: {str(e)}")
            return False

    def get_bucket_info(self) -> Optional[dict]:
        """Fetch basic information about the configured bucket.

        Returns:
            A dict with ``name``, ``location``, ``creation_date`` and
            ``storage_class``, or ``None`` on any error (the error is logged).
        """
        try:
            info = self.bucket.get_bucket_info()
            return {
                'name': info.name,
                'location': info.location,
                'creation_date': info.creation_date,
                'storage_class': info.storage_class,
            }
        except Exception as e:
            self.logger.error(f"获取存储桶信息失败: {str(e)}")
            return None
# Module-level OSS service instance, created once when this module is imported.
oss_service = OSSService()


def get_oss_service() -> OSSService:
    """Return the shared OSS service instance.

    Returns:
        The module-level ``OSSService`` object.
    """
    return oss_service