# Source: transcript_service/src/services/oss_service.py
# Uploaded by PCNUSMSE via huggingface_hub (commit 4e37375, verified)
"""OSS云存储服务模块
提供阿里云OSS文件上传、下载和管理功能。
"""
import os
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional, Tuple
import asyncio
import aiohttp
import oss2
from oss2.exceptions import OssError
from ..core.config import get_config
from ..utils.logger import get_task_logger
class OSSService:
    """Aliyun OSS cloud storage service.

    Wraps an ``oss2.Bucket`` built from the ``oss`` section of the application
    config and provides upload, URL generation, deletion, expired-file cleanup
    and metadata helpers.
    """

    def __init__(self):
        """Create the authenticated OSS bucket client from the application config."""
        self.config = get_config()
        self.oss_config = self.config.oss
        auth = oss2.Auth(
            self.oss_config.access_key_id,
            self.oss_config.access_key_secret,
        )
        self.bucket = oss2.Bucket(
            auth,
            self.oss_config.endpoint,
            self.oss_config.bucket_name,
        )
        self.logger = get_task_logger(logger_name="transcript_service.oss")

    def _generate_object_key(self, filename: str, task_id: str) -> str:
        """Build a unique object key for an upload.

        Layout: ``{temp_prefix}/YYYY/MM/DD/{YYYYmmdd_HHMMSS}_{task_id}_{8 hex}{ext}``.
        Only the extension of ``filename`` is kept, so user-supplied names never
        leak into the key.

        Args:
            filename: Original file name (only its suffix is used).
            task_id: Task identifier embedded in the key.

        Returns:
            The OSS object key.
        """
        now = datetime.now()
        date_path = now.strftime("%Y/%m/%d")
        timestamp = now.strftime("%Y%m%d_%H%M%S")
        file_ext = Path(filename).suffix
        # Random suffix keeps keys unique for same-task uploads within one second.
        safe_filename = f"{timestamp}_{task_id}_{uuid.uuid4().hex[:8]}{file_ext}"
        return f"{self.oss_config.temp_prefix}/{date_path}/{safe_filename}"

    async def upload_file(self, file_path: Path, task_id: str) -> Tuple[bool, str, Optional[str]]:
        """Upload a local file to OSS and return a URL for it.

        The object is uploaded first and then given a public-read ACL. If
        setting the ACL fails, the uploaded object is kept and a time-limited
        signed URL is returned instead of reporting failure.

        The oss2 calls are synchronous network I/O, so they run in the event
        loop's default executor instead of blocking the loop.

        Args:
            file_path: Local file to upload.
            task_id: Task identifier embedded in the object key.

        Returns:
            ``(success, url_or_error_message, object_key)``. ``object_key`` is
            ``None`` on failure. Errors are logged and returned, never raised.
        """
        loop = asyncio.get_running_loop()
        try:
            self.logger.info(f"开始上传文件到OSS: {file_path.name}")
            object_key = self._generate_object_key(file_path.name, task_id)

            await loop.run_in_executor(
                None, self.bucket.put_object_from_file, object_key, str(file_path)
            )

            # Only the ACL step may fall back: the upload has already succeeded
            # here, so a signed URL is still a valid way to reach the object.
            try:
                await loop.run_in_executor(
                    None, self.bucket.put_object_acl, object_key, oss2.OBJECT_ACL_PUBLIC_READ
                )
                url = self._generate_public_url(object_key)
            except OssError as acl_err:
                self.logger.warning(f"ACL设置失败,使用签名URL: {acl_err}")
                url = self._generate_signed_url(object_key)

            self.logger.info(f"文件上传成功: {object_key}, URL: {url}")
            return True, url, object_key
        except OssError as e:
            error_msg = f"OSS错误: {str(e)}"
            self.logger.error(error_msg)
            return False, error_msg, None
        except Exception as e:
            error_msg = f"上传文件时发生未知错误: {str(e)}"
            self.logger.exception(error_msg)
            return False, error_msg, None

    async def upload_multiple_files(self, file_paths: List[Path], task_id: str) -> List[Tuple[str, bool, str, Optional[str]]]:
        """Upload several files to OSS concurrently.

        Args:
            file_paths: Local file paths to upload.
            task_id: Task identifier embedded in every object key.

        Returns:
            One ``(filename, success, url_or_error_message, object_key)`` tuple
            per input path, in input order.
        """
        paths = list(file_paths)
        # gather() runs the uploads concurrently and preserves input order.
        # upload_file converts ordinary errors into result tuples, so one
        # failed file does not abort the others.
        outcomes = await asyncio.gather(
            *(self.upload_file(file_path, task_id) for file_path in paths)
        )
        return [
            (file_path.name, success, url_or_error, object_key)
            for file_path, (success, url_or_error, object_key) in zip(paths, outcomes)
        ]

    async def _upload_single_file_async(self, file_path: Path, task_id: str) -> Tuple[bool, str, Optional[str]]:
        """Upload one file. Kept for backward compatibility.

        ``upload_file`` already offloads its blocking work to an executor, so
        no extra thread or nested event loop is needed here.
        """
        return await self.upload_file(file_path, task_id)

    def _generate_public_url(self, object_key: str) -> str:
        """Build the unsigned public URL of an object.

        Format: ``https://{bucket_name}.{endpoint_host}/{object_key}``. Any
        ``http://`` / ``https://`` scheme and trailing ``/`` on the configured
        endpoint are removed first.

        Args:
            object_key: OSS object key.

        Returns:
            Public URL (reachable only if the object is public-read).
        """
        endpoint = self.oss_config.endpoint
        for scheme in ("http://", "https://"):
            if endpoint.startswith(scheme):
                endpoint = endpoint[len(scheme):]
                break
        endpoint = endpoint.rstrip("/")
        url = f"https://{self.oss_config.bucket_name}.{endpoint}/{object_key}"
        self.logger.debug(f"生成公网URL: {url}")
        return url

    def _generate_signed_url(self, object_key: str) -> str:
        """Build a time-limited signed GET URL (fallback when ACL setting fails).

        Args:
            object_key: OSS object key.

        Returns:
            Signed URL valid for ``url_expire_hours`` from now.
        """
        # oss2's sign_url expects the validity period in seconds *from now*,
        # not an absolute Unix timestamp.
        expires_in = int(self.oss_config.url_expire_hours * 3600)
        return self.bucket.sign_url('GET', object_key, expires_in)

    def delete_file(self, object_key: str) -> bool:
        """Delete an object from the bucket.

        Args:
            object_key: OSS object key.

        Returns:
            True on success. False on any error (the error is logged).
        """
        try:
            self.bucket.delete_object(object_key)
            self.logger.info(f"文件删除成功: {object_key}")
            return True
        except OssError as e:
            self.logger.error(f"删除文件失败: {object_key}, 错误: {str(e)}")
            return False
        except Exception as e:
            self.logger.exception(f"删除文件时发生未知错误: {object_key}, 错误: {str(e)}")
            return False

    @staticmethod
    def _is_expired(last_modified, cutoff_ts: float) -> bool:
        """Return True if ``last_modified`` is strictly older than ``cutoff_ts``.

        Args:
            last_modified: Modification time, either as a Unix timestamp (oss2's
                object listing reports it this way) or as a ``datetime``.
            cutoff_ts: Cutoff as a Unix timestamp.
        """
        if isinstance(last_modified, datetime):
            last_modified = last_modified.timestamp()
        return last_modified < cutoff_ts

    def cleanup_old_files(self, days: Optional[int] = None) -> int:
        """Delete temporary objects older than the retention period.

        Args:
            days: Retention in days. ``None`` uses ``auto_cleanup_days`` from
                the config. ``0`` removes every temporary object.

        Returns:
            Number of objects deleted. On a listing error, returns the count
            deleted so far; the error is logged.
        """
        cleanup_days = days if days is not None else self.oss_config.auto_cleanup_days
        cutoff_ts = (datetime.now() - timedelta(days=cleanup_days)).timestamp()
        deleted_count = 0
        # Mirror the "{temp_prefix}/..." key layout from _generate_object_key,
        # so sibling prefixes such as "{temp_prefix}2/" are never touched.
        prefix = f"{self.oss_config.temp_prefix}/"
        try:
            for obj in oss2.ObjectIterator(self.bucket, prefix=prefix):
                if self._is_expired(obj.last_modified, cutoff_ts) and self.delete_file(obj.key):
                    deleted_count += 1
            self.logger.info(f"清理完成,删除了 {deleted_count} 个过期文件")
            return deleted_count
        except Exception as e:
            self.logger.exception(f"清理过期文件时发生错误: {str(e)}")
            return deleted_count

    def get_file_info(self, object_key: str) -> Optional[dict]:
        """Fetch object metadata via a HEAD request.

        Args:
            object_key: OSS object key.

        Returns:
            Dict with ``size``, ``last_modified``, ``etag`` and ``content_type``.
            ``None`` on an OSS error (the error is logged).
        """
        try:
            info = self.bucket.head_object(object_key)
            return {
                'size': info.content_length,
                'last_modified': info.last_modified,
                'etag': info.etag,
                'content_type': info.content_type,
            }
        except OssError as e:
            self.logger.error(f"获取文件信息失败: {object_key}, 错误: {str(e)}")
            return None

    def check_bucket_exists(self) -> bool:
        """Check whether the configured bucket exists.

        Probes with ``get_bucket_info``, as recommended in the OSS Python SDK
        documentation, and treats ``NoSuchBucket`` as "does not exist".

        Returns:
            True if the bucket exists. False if it does not, or if the check
            failed (the failure is logged).
        """
        try:
            self.bucket.get_bucket_info()
            return True
        except oss2.exceptions.NoSuchBucket:
            return False
        except Exception as e:
            self.logger.error(f"检查存储桶失败: {str(e)}")
            return False

    def get_bucket_info(self) -> Optional[dict]:
        """Fetch bucket metadata.

        Returns:
            Dict with ``name``, ``location``, ``creation_date`` and
            ``storage_class``. ``None`` on any error (the error is logged).
        """
        try:
            info = self.bucket.get_bucket_info()
            return {
                'name': info.name,
                'location': info.location,
                'creation_date': info.creation_date,
                'storage_class': info.storage_class,
            }
        except Exception as e:
            self.logger.error(f"获取存储桶信息失败: {str(e)}")
            return None
# Shared OSSService instance, created once when this module is imported.
oss_service: OSSService = OSSService()
def get_oss_service() -> OSSService:
    """Return the shared module-level OSSService instance.

    Returns:
        The ``oss_service`` singleton created when this module was imported.
    """
    shared = oss_service
    return shared