Spaces:
Sleeping
Sleeping
| """ | |
| 文件处理模块 - 处理文件上传和读取 | |
| """ | |
| import os | |
| from config import MAX_FILE_CHARS, SUPPORTED_FILE_TYPES | |
| from cache_manager import file_cache | |
def load_gdl_text(uploaded_files, max_chars=MAX_FILE_CHARS):
    """Read uploaded GDL text files and merge them into one annotated string.

    Every file that is read contributes a ``# FILE: <name>`` header followed
    by its content. A file that cannot be used contributes a one-line
    ``# ...`` marker instead (not found, unsupported type, too large, skipped,
    or failed to read), so problems are reported in-band and the function
    does not raise for per-file failures.

    Args:
        uploaded_files: A single upload or a list/tuple of uploads. Each item
            is either an object whose ``name`` attribute holds the file path,
            or a value whose ``str()`` is the path.
        max_chars: Budget for the combined file content. It is also used as
            the per-file on-disk size cap. Section headers are not counted
            against the budget.

    Returns:
        str: The sections joined by newlines with surrounding whitespace
        stripped, or ``""`` when nothing was uploaded.
    """
    if not uploaded_files:
        return ""

    files = uploaded_files if isinstance(uploaded_files, (list, tuple)) else [uploaded_files]
    parts = []
    total_size = 0

    for f in files:
        # Fallback label: the handlers below must never depend on a name that
        # is unbound (first file) or left over from the previous iteration.
        label = "<unknown>"
        try:
            path = getattr(f, "name", None) or str(f)
            label = os.path.basename(path)

            # Skip files that are missing on disk.
            if not os.path.exists(path):
                parts.append(f"\n# FILE_NOT_FOUND: {label}\n")
                continue

            # Only accept extensions listed in SUPPORTED_FILE_TYPES.
            file_ext = os.path.splitext(path)[1].lower()
            if file_ext not in SUPPORTED_FILE_TYPES:
                parts.append(
                    f"\n# UNSUPPORTED_FILE_TYPE: {label} (expected: {', '.join(SUPPORTED_FILE_TYPES)})\n"
                )
                continue

            # Cheap pre-check before reading. This compares the size in bytes
            # with a character budget, so it is conservative for multi-byte
            # text: a file may be skipped although its character count fits.
            file_size = os.path.getsize(path)
            if file_size > max_chars:
                parts.append(f"\n# FILE_TOO_LARGE: {label} ({file_size} bytes, skipped)\n")
                continue

            # Try the cache first; read from disk only on a miss.
            # NOTE(review): the cache appears to be keyed by path only, so an
            # edited file may be served stale -- confirm in cache_manager.
            txt = file_cache.get(path)
            if txt is None:
                with open(path, "r", encoding="utf-8", errors="ignore") as fh:
                    txt = fh.read()
                file_cache.set(path, txt)

            # Enforce the cumulative budget across all files.
            if len(txt) + total_size > max_chars:
                remaining = max_chars - total_size
                if remaining > 100:  # keep a fragment only if it is at least 100 chars
                    txt = txt[:remaining] + "\n[...TRUNCATED...]"
                else:
                    parts.append(f"\n# FILE_SKIPPED: {label} (would exceed limit)\n")
                    continue

            parts.append(f"\n# FILE: {label}\n{txt}\n")
            total_size += len(txt)

        except PermissionError:
            parts.append(f"\n# FILE_PERMISSION_ERROR: {label} (access denied)\n")
        except UnicodeDecodeError as e:
            # The open() above uses errors="ignore", so a direct read does not
            # raise this; the branch is kept as a safeguard.
            parts.append(f"\n# FILE_ENCODING_ERROR: {label} ({e})\n")
        except Exception as e:
            # Best-effort boundary: report any other failure in-band and
            # continue with the remaining files.
            parts.append(f"\n# FILE_READ_ERROR: {label} ({type(e).__name__}: {e})\n")

    text = "\n".join(parts).strip()
    return text
def validate_file_upload(files):
    """Validate uploaded files before they are read.

    Each file is checked, in order, for existence on disk, a supported
    extension, and an on-disk size within ``MAX_FILE_CHARS``. Checking stops
    at the first file that fails.

    Args:
        files: A single upload or a list/tuple of uploads. Each item is either
            an object whose ``name`` attribute holds the file path, or a value
            whose ``str()`` is the path.

    Returns:
        tuple: ``(is_valid, error_message)``. ``(True, "")`` when nothing was
        uploaded or every file passes; otherwise ``(False, message)``
        describing the first failing file.
    """
    if not files:
        return True, ""

    candidates = files if isinstance(files, (list, tuple)) else [files]

    for item in candidates:
        location = getattr(item, "name", None) or str(item)

        # 1) The file has to be present on disk.
        if not os.path.exists(location):
            return False, f"文件不存在: {os.path.basename(location)}"

        # 2) Its (case-insensitive) extension has to be supported.
        _, suffix = os.path.splitext(location)
        if suffix.lower() not in SUPPORTED_FILE_TYPES:
            shown = os.path.basename(location)
            allowed = ', '.join(SUPPORTED_FILE_TYPES)
            return False, f"不支持的文件类型: {shown} (支持: {allowed})"

        # 3) Its size in bytes must not exceed the configured limit.
        n_bytes = os.path.getsize(location)
        if n_bytes > MAX_FILE_CHARS:
            shown = os.path.basename(location)
            return False, f"文件过大: {shown} ({n_bytes} bytes, 最大: {MAX_FILE_CHARS})"

    return True, ""