# PaperX / app.py
# snym04's picture
# Update app.py
# 50455ba verified
import gradio as gr
import os
import yaml
import shutil
import subprocess
import sys
from datetime import datetime
# Initialise the working paths; everything is anchored at this script's directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
PAPERS_DIR, CONFIG_PATH, OUTPUT_DIR = (
    os.path.join(BASE_DIR, leaf)
    for leaf in ("papers", "config.yaml", "mineru_outputs")
)

# Create the upload directory up front (no error if it already exists).
os.makedirs(PAPERS_DIR, exist_ok=True)
def get_debug_info():
    """Return a timestamped, human-readable snapshot of the working directories.

    The snapshot lists the entries of ``PAPERS_DIR`` and summarises the files
    found recursively under ``OUTPUT_DIR``. Both listings are sorted because
    ``os.listdir`` and ``os.walk`` yield entries in arbitrary order, which
    would otherwise make the report (and its five-item preview) change
    between refreshes.

    Returns:
        str: Two report lines separated by a blank line, each prefixed with
        the current local time formatted as ``HH:MM:SS``.
    """
    now = datetime.now().strftime("%H:%M:%S")

    # EAFP: a missing path, or a path that is not a directory, is reported in
    # the snapshot instead of raising inside a UI callback.
    try:
        files = sorted(os.listdir(PAPERS_DIR))
    except (FileNotFoundError, NotADirectoryError):
        files = "Directory missing"

    # Walk the output directory recursively to see what was actually produced.
    output_detail = "Not generated"
    if os.path.isdir(OUTPUT_DIR):
        all_output_items = sorted(
            # normpath drops the "./" prefix that relpath yields for top-level files.
            os.path.normpath(os.path.join(os.path.relpath(root, OUTPUT_DIR), name))
            for root, _dirs, names in os.walk(OUTPUT_DIR)
            for name in names
        )
        if all_output_items:
            preview = all_output_items[:5]
            # Only show an ellipsis when the preview really is truncated.
            suffix = "..." if len(all_output_items) > len(preview) else ""
            output_detail = f"Found {len(all_output_items)} files: {preview}{suffix}"
        else:
            output_detail = "Directory exists but is EMPTY"

    return f"[{now}] 📁 papers/ 内容: {files}\n\n[{now}] 📂 mineru_outputs 状态: {output_detail}"
def save_pdf(file):
    """Copy an uploaded PDF into ``PAPERS_DIR`` under its original base name.

    Args:
        file: The uploaded file, given either as a path (``str`` or
            ``os.PathLike``) or as an object whose ``name`` attribute holds
            the path of the file on disk. ``None`` means nothing was selected.

    Returns:
        tuple[str, str]: A status message and a fresh ``get_debug_info()``
        snapshot. Failures are reported through the status message rather
        than raised.
    """
    if file is None:
        return "❌ 请先选择 PDF", get_debug_info()
    try:
        # Check for real paths first: ``pathlib.Path.name`` is only the base
        # name, so reading ``.name`` from a path object would lose its directory.
        if isinstance(file, (str, os.PathLike)):
            source = os.fspath(file)
        else:
            source = file.name
        filename = os.path.basename(source)
        shutil.copy(source, os.path.join(PAPERS_DIR, filename))
        return f"✅ 已保存: {filename}", get_debug_info()
    except Exception as e:
        # UI boundary: surface any failure in the status box instead of crashing.
        return f"❌ 出错: {str(e)}", get_debug_info()
def save_api_key(api_key):
    """Store the API key under ``api_keys.gemini_api_key`` in ``CONFIG_PATH``.

    The existing YAML file (if any) is loaded with ``yaml.safe_load``, the key
    is set, and the whole mapping is written back with ``yaml.dump``.

    Args:
        api_key: The key text. Surrounding whitespace is removed; an empty or
            whitespace-only value is rejected.

    Returns:
        tuple[str, str]: A status message and a fresh ``get_debug_info()``
        snapshot. Failures are reported through the status message rather
        than raised.
    """
    # Pasted keys often carry a stray space or newline; store the clean value.
    if isinstance(api_key, str):
        api_key = api_key.strip()
    if not api_key:
        return "❌ Key 不能为空", get_debug_info()
    try:
        config = {}
        if os.path.exists(CONFIG_PATH):
            with open(CONFIG_PATH, "r", encoding="utf-8") as f:
                # An empty file loads as None; fall back to an empty mapping.
                config = yaml.safe_load(f) or {}
        # An ``api_keys:`` entry with no value loads as None, which
        # ``setdefault`` would hand back unchanged; treat it as an empty mapping.
        if config.get("api_keys") is None:
            config["api_keys"] = {}
        config["api_keys"]["gemini_api_key"] = api_key
        with open(CONFIG_PATH, "w", encoding="utf-8") as f:
            yaml.dump(config, f, allow_unicode=True)
        return "✅ Key 已保存", get_debug_info()
    except Exception as e:
        # UI boundary: surface any failure in the status box instead of crashing.
        return f"❌ 出错: {str(e)}", get_debug_info()
def _decode_output(stream):
    """Return captured process output as ``str`` (``""`` when there is none)."""
    if stream is None:
        return ""
    if isinstance(stream, bytes):
        # TimeoutExpired may carry raw bytes even when text=True was requested.
        return stream.decode("utf-8", errors="replace")
    return stream


def run_mineru_parsing(timeout=300):
    """Run the ``mineru`` CLI over ``PAPERS_DIR`` and capture its full log.

    Args:
        timeout: Maximum number of seconds to wait for the command before
            giving up. Defaults to 300 (five minutes) so a hung run cannot
            block forever.

    Returns:
        tuple[str, str, str]: A status message, a fresh ``get_debug_info()``
        snapshot, and the execution log (captured stdout/stderr, or a
        description of the exception that stopped the run).
    """
    # Match the extension case-insensitively so files such as "PAPER.PDF" count.
    if not os.path.isdir(PAPERS_DIR) or not any(
        name.lower().endswith(".pdf") for name in os.listdir(PAPERS_DIR)
    ):
        return "❌ 未发现 PDF 文件", get_debug_info(), "No execution logs."
    try:
        # The child process inherits the current environment plus these
        # MINERU_* settings.
        env = os.environ.copy()
        env["MINERU_FORMULA_ENABLE"] = "false"
        env["MINERU_TABLE_ENABLE"] = "false"
        env["MINERU_DEVICE_MODE"] = "cpu"
        env["MINERU_VIRTUAL_VRAM_SIZE"] = "8"
        command = ["mineru", "-p", PAPERS_DIR, "-o", OUTPUT_DIR]
        # Capture both streams as text so the complete log can be displayed.
        result = subprocess.run(
            command,
            env=env,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        full_log = f"--- STDOUT ---\n{result.stdout}\n\n--- STDERR ---\n{result.stderr}"
        if result.returncode == 0:
            status = "✅ PDF解析完成"
        else:
            status = f"❌ 解析失败 (Exit Code: {result.returncode})"
        return status, get_debug_info(), full_log
    except subprocess.TimeoutExpired as e:
        # Keep whatever the command printed before it was killed; that partial
        # output is usually what explains the hang.
        error_log = (
            f"Exception occurred during execution:\n{str(e)}"
            f"\n\n--- STDOUT ---\n{_decode_output(e.stdout)}"
            f"\n\n--- STDERR ---\n{_decode_output(e.stderr)}"
        )
        return "❌ 运行异常", get_debug_info(), error_log
    except Exception as e:
        # UI boundary: e.g. the executable is missing; report instead of crashing.
        error_log = f"Exception occurred during execution:\n{str(e)}"
        return "❌ 运行异常", get_debug_info(), error_log
# --- UI ---
# NOTE(review): the nesting below is reconstructed from the order of the layout
# context managers — confirm it against the rendered page.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 📑 Mineru PDF 解析调试器")
    with gr.Row():
        # First column: API key entry, PDF upload and the parse trigger.
        with gr.Column(scale=1):
            with gr.Group():
                gr.Markdown("### 1. 配置 & 上传")
                key_input = gr.Textbox(label="API Key", type="password")
                key_btn = gr.Button("保存 Key")
                gr.Markdown("---")
                pdf_input = gr.File(label="选择 PDF", file_types=[".pdf"])
                pdf_btn = gr.Button("保存 PDF")
            with gr.Group():
                gr.Markdown("### 2. 执行解析")
                parse_btn = gr.Button("🚀 Run Mineru (CPU Mode)", variant="primary")
                # Shared status box: every save/parse callback writes its message here.
                parse_status = gr.Textbox(label="运行状态")
        # Second column: read-only file-system snapshot and command log.
        with gr.Column(scale=1):
            gr.Markdown("### 🔍 实时系统监控")
            # The initial value is computed once, when the layout is built.
            debug_view = gr.Textbox(label="文件系统快照", value=get_debug_info(), lines=8, interactive=False)
            gr.Markdown("### 📜 Mineru 终端输出日志")
            cmd_logs = gr.Textbox(
                label="Command Output (Stdout/Stderr)",
                placeholder="等待解析任务开始...",
                lines=15,
                interactive=False
            )
            refresh_btn = gr.Button("🔄 刷新状态")
    # Event wiring: each callback's return values map onto `outputs` in order.
    key_btn.click(save_api_key, inputs=key_input, outputs=[parse_status, debug_view])
    pdf_btn.click(save_pdf, inputs=pdf_input, outputs=[parse_status, debug_view])
    # The parse button updates three places: status, file-system snapshot and detailed log.
    parse_btn.click(
        fn=run_mineru_parsing,
        outputs=[parse_status, debug_view, cmd_logs]
    )
    refresh_btn.click(get_debug_info, outputs=debug_view)

# Start the Gradio server only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()