"""File handling module - validates uploaded files and reads their text."""
import os

from config import MAX_FILE_CHARS, SUPPORTED_FILE_TYPES
from cache_manager import file_cache


def _as_file_list(uploaded_files):
    """Return *uploaded_files* unchanged if it is a list/tuple, else wrap it in a list."""
    if isinstance(uploaded_files, (list, tuple)):
        return uploaded_files
    return [uploaded_files]


def _resolve_path(file_obj):
    """Return the path of an upload: its ``name`` attribute if truthy, else ``str(file_obj)``."""
    return getattr(file_obj, "name", None) or str(file_obj)


def load_gdl_text(uploaded_files, max_chars=MAX_FILE_CHARS):
    """
    Read the uploaded GDL text files and merge them into one string.

    Each file that is read successfully contributes a ``# FILE: <name>``
    header followed by its text. A file that cannot be used contributes a
    single marker line instead (``# FILE_NOT_FOUND``,
    ``# UNSUPPORTED_FILE_TYPE``, ``# FILE_TOO_LARGE``, ``# FILE_SKIPPED``,
    ``# FILE_PERMISSION_ERROR``, ``# FILE_ENCODING_ERROR`` or
    ``# FILE_READ_ERROR``), so one bad file never aborts the whole load.

    Args:
        uploaded_files: A single upload or a list/tuple of uploads. Each
            item is either an object with a ``name`` attribute holding the
            file path, or something whose ``str()`` is the path.
        max_chars: Limit applied both to each file's on-disk size and to the
            running total of text collected across files.

    Returns:
        str: The merged text, stripped of leading/trailing whitespace, or an
        empty string when nothing was uploaded.
    """
    if not uploaded_files:
        return ""

    parts = []
    total_size = 0

    for f in _as_file_list(uploaded_files):
        # Resolve the path before entering the try block so the handlers
        # below always describe the file currently being processed (never a
        # stale value from a previous iteration, never an unbound name).
        path = _resolve_path(f)
        name = os.path.basename(path)

        try:
            # Must be an existing regular file; directories are rejected here
            # rather than failing later inside open().
            if not os.path.isfile(path):
                parts.append(f"\n# FILE_NOT_FOUND: {name}\n")
                continue

            # Check the file extension (case-insensitive).
            file_ext = os.path.splitext(path)[1].lower()
            if file_ext not in SUPPORTED_FILE_TYPES:
                parts.append(
                    f"\n# UNSUPPORTED_FILE_TYPE: {name} "
                    f"(expected: {', '.join(SUPPORTED_FILE_TYPES)})\n"
                )
                continue

            # Check the file size before reading anything.
            # NOTE: getsize() returns bytes while max_chars counts characters,
            # so this is a conservative pre-filter for multi-byte text.
            file_size = os.path.getsize(path)
            if file_size > max_chars:
                parts.append(
                    f"\n# FILE_TOO_LARGE: {name} ({file_size} bytes, skipped)\n"
                )
                continue

            # Try the cache first; a None result is treated as a cache miss.
            txt = file_cache.get(path)
            if txt is None:
                # Cache miss: read the file, dropping undecodable bytes.
                with open(path, "r", encoding="utf-8", errors="ignore") as fh:
                    txt = fh.read()
                # Store the text in the cache.
                file_cache.set(path, txt)

            # Enforce the overall budget across all files.
            if len(txt) + total_size > max_chars:
                remaining = max_chars - total_size
                if remaining > 100:
                    # Only truncate when more than 100 characters still fit.
                    txt = txt[:remaining] + "\n[...TRUNCATED...]"
                else:
                    parts.append(
                        f"\n# FILE_SKIPPED: {name} (would exceed limit)\n"
                    )
                    continue

            parts.append(f"\n# FILE: {name}\n{txt}\n")
            total_size += len(txt)

        except PermissionError:
            parts.append(f"\n# FILE_PERMISSION_ERROR: {name} (access denied)\n")
        except UnicodeDecodeError as e:
            # open(..., errors="ignore") does not raise this itself; the
            # handler is kept as a safeguard for the other calls in the block.
            parts.append(f"\n# FILE_ENCODING_ERROR: {name} ({str(e)})\n")
        except Exception as e:
            # Deliberate best-effort boundary: record the failure and move on
            # to the next file.
            parts.append(
                f"\n# FILE_READ_ERROR: {name} ({type(e).__name__}: {str(e)})\n"
            )

    return "\n".join(parts).strip()


def validate_file_upload(files):
    """
    Validate uploaded files before they are processed.

    Checks run per file, in order: the path is an existing regular file, its
    extension is in ``SUPPORTED_FILE_TYPES``, and its on-disk size does not
    exceed ``MAX_FILE_CHARS``. Validation stops at the first failure.

    Args:
        files: A single upload or a list/tuple of uploads. Each item is
            either an object with a ``name`` attribute holding the file
            path, or something whose ``str()`` is the path.

    Returns:
        tuple: ``(is_valid, error_message)``. ``error_message`` is an empty
        string when every file is valid or when nothing was uploaded.
    """
    if not files:
        return True, ""

    for f in _as_file_list(files):
        path = _resolve_path(f)
        name = os.path.basename(path)

        # A directory is not a valid upload, so require a regular file.
        if not os.path.isfile(path):
            return False, f"文件不存在: {name}"

        file_ext = os.path.splitext(path)[1].lower()
        if file_ext not in SUPPORTED_FILE_TYPES:
            return False, f"不支持的文件类型: {name} (支持: {', '.join(SUPPORTED_FILE_TYPES)})"

        file_size = os.path.getsize(path)
        if file_size > MAX_FILE_CHARS:
            return False, f"文件过大: {name} ({file_size} bytes, 最大: {MAX_FILE_CHARS})"

    return True, ""