import gradio as gr import os # ===== 你的自定义模块 ===== from config import APP_TITLE, CHATBOT_HEIGHT, MIN_WIDTH_LEFT, MIN_WIDTH_RIGHT from styles import MAHJONG_THEME_CSS # Extra CSS to force dark & counter browser auto-invert EXTRA_FIX_CSS = '''/* === Force dark site and neutralize browser-side auto-invert === */ html, body{ background-color:#0b1220 !important; color-scheme: dark !important; } /* When browser/extension applies global invert, add a counter-invert on our app root */ html.force-dark-fix #app-root{ filter: invert(1) hue-rotate(180deg) !important; }''' # Redirect to ?__theme=dark FORCE_DARK_REDIRECT = '''''' # Detect browser force-dark and set html class DETECT_FORCE_DARK = '''''' from ai_service import design_mahjong_game from reference_retriever import match_variants_in_text, load_variant_md # 优先尝试原生流式;没有则自动回退为伪流式 try: from ai_service import design_mahjong_game_stream except Exception: design_mahjong_game_stream = None from output_validator import validate_mahjong_response, format_issues_for_llm from design_state import ( extract_design_state, extract_ready_to_generate, summarize_design_state, diff_keys, diff_mechanics, is_change_within_scope, generate_diff_summary, generate_diff_summary_compact, # 版本历史相关 create_empty_history, add_to_history, rollback_history, get_history_choices, parse_version_from_choice, # 多阶段交互相关 InteractionPhase, extract_proposals, extract_clarify_questions, detect_interaction_phase, format_proposals_for_display, generate_phase_prompt_hint, create_phase_state, update_phase_state, get_phase_display_name, # 细粒度范围控制相关 CONTROLLABLE_FIELDS, ScopeConstraint, create_scope_config, get_scope_preset_options, validate_scope_compliance, format_scope_violations, get_mechanics_from_state, generate_scope_prompt_hint, ) # ====== 🔧 Hotfix: 兼容 gradio_client 对 additionalProperties(bool) 的解析 ====== try: import gradio_client.utils as _gc_utils _orig_get_type = _gc_utils.get_type def _safe_get_type(schema): if isinstance(schema, bool): return "any" if schema else "never" return _orig_get_type(schema) _gc_utils.get_type = _safe_get_type _orig_json2py = _gc_utils._json_schema_to_python_type def _safe_json2py(schema, defs): if isinstance(schema, bool): return "any" if schema else "never" if isinstance(schema, dict) and "additionalProperties" in schema: ap = schema["additionalProperties"] if isinstance(ap, bool): inner = "any" if ap else "never" return f"dict[str, {inner}]" return _orig_json2py(schema, defs) _gc_utils._json_schema_to_python_type = _safe_json2py _orig_json_schema_to_python_type = _gc_utils.json_schema_to_python_type def _safe_json_schema_to_python_type(schema): if isinstance(schema, bool): return "any" if schema else "never" return _orig_json_schema_to_python_type(schema) _gc_utils.json_schema_to_python_type = _safe_json_schema_to_python_type except Exception: pass # ====== 🔧 Hotfix 结束 ====== # ==================== 工具函数 ==================== def _messages_to_tuples(history): """ 将 Chatbot 的 messages 格式([{role, content}, ...])转换为 [(user, bot), ...]。 若已是 tuples,则原样返回。 """ if not history: return [] if isinstance(history, list) and history and isinstance(history[0], dict): pairs = [] last_user = None for msg in history: role = msg.get("role") content = msg.get("content", "") if role == "user": last_user = content elif role == "assistant": pairs.append((last_user or "", content)) last_user = None return pairs return history def _chunk_fake_stream(text, step=40): """把整段文本切成小块,伪流式输出。""" s = str(text or "") for i in range(0, len(s), step): yield s[i:i + step] def _is_design_intent(text: str) -> bool: """判断是否明确进入“新玩法设计/融合/生成”路径。""" s = (text or "").strip() if not s: return False keywords = [ "设计", "生成", "做一个", "做个", "新玩法", "创新", "融合", "组合", "改造", "迭代", "玩法设计", "规则设计", "番型设计", "出一套", "给我一套", ] return any(k in s for k in keywords) def _is_rules_query(text: str) -> bool: """判断是否为“询问现有玩法细节/规则”的问题。""" s = (text or "").strip() if not s: return False keywords = [ "规则", "玩法", "番型", "细节", "说明", "机制", "怎么打", "怎么胡", "如何胡", "计分", "流程", "介绍", "讲讲", "是什么", "有哪些", ] return any(k in s for k in keywords) def _normalize_question(text: str) -> str: s = str(text or "").strip().lower() if not s: return "" punct = "??!!。;;::、,,..()()[]【】<>《》\"'“”‘’ " for ch in punct: s = s.replace(ch, "") return s def _merge_asked_questions(existing, new_items): merged = [str(q).strip() for q in (existing or []) if str(q).strip()] seen = {_normalize_question(q) for q in merged if _normalize_question(q)} for q in (new_items or []): q_text = str(q).strip() if not q_text: continue key = _normalize_question(q_text) if key and key not in seen: merged.append(q_text) seen.add(key) return merged def _load_base64_image(path): """读取本地图片并返回 data URI""" import base64 import pathlib p = pathlib.Path(path) if not p.exists(): return "" try: data = base64.b64encode(p.read_bytes()).decode("ascii") suffix = p.suffix.lower() mime = "image/png" if suffix == ".png" else "image/jpeg" return f"data:{mime};base64,{data}" except Exception: return "" HERO_TILE_PATHS = ["UI00001.jpg", "UI00002.jpg", "UI00003.jpg", "UI00004.jpg"] HERO_TILE_IMAGES = [_load_base64_image(p) for p in HERO_TILE_PATHS] def _render_hero_tiles(): tiles = [ f'' for idx, src in enumerate(HERO_TILE_IMAGES) if src ] return "".join(tiles) def clear_cache(): """清空缓存""" from cache_manager import file_cache, request_cache file_cache.clear() request_cache.clear() return "✅ 缓存已清空" def clear_files(): """清空文件上传""" return None def render_validation_md(text: str) -> str: issues = validate_mahjong_response(text or "") if not issues: return "✅ 输出校验:未发现静态问题(可导出)" errors = [i for i in issues if i.get("level") == "error"] warnings = [i for i in issues if i.get("level") == "warning"] parts = ["⚠️ 输出校验:发现潜在问题(建议先让模型按最小修改修复后再导出)"] if errors: parts.append("\n**错误(必须修复)**\n") parts.append(format_issues_for_llm(errors)) if warnings: parts.append("\n**警告(建议修复)**\n") parts.append(format_issues_for_llm(warnings)) return "\n".join(parts).strip() def update_file_status(files): """更新文件状态显示""" if not files: return "📁 文件状态:未上传" file_list = files if isinstance(files, (list, tuple)) else [files] names = [] for f in file_list: if isinstance(f, str): names.append(os.path.basename(f)) else: names.append(os.path.basename(getattr(f, "name", str(f)))) head = "\n".join(f" • {n}" for n in names[:3]) tail = "\n ..." if len(names) > 3 else "" return f"📁 文件状态:已上传 {len(names)} 个文件\n{head}{tail}" def export_history_to_markdown(history): """将聊天历史导出为 Markdown 文件,并返回文件路径供下载""" import time import pathlib from datetime import datetime pairs = _messages_to_tuples(history) lines = ["# 对话记录", ""] lines.append(f"- 导出时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") lines.append("") for idx, (user_msg, bot_msg) in enumerate(pairs, start=1): lines.append(f"## 轮次 {idx}") if user_msg: lines.append("**用户**:\n") lines.append(user_msg) lines.append("") if bot_msg: lines.append("**助手**:\n") lines.append(bot_msg) lines.append("") content = "\n".join(lines).strip() + "\n" export_dir = pathlib.Path("exports") export_dir.mkdir(parents=True, exist_ok=True) filename = export_dir / f"chat_history_{int(time.time())}.md" with open(filename, "w", encoding="utf-8") as f: f.write(content) return str(filename) # ==================== 新增:提取 GDL 和自然语言描述 ==================== def extract_gdl_and_narrative(content): """提取 GDL 和自然语言部分(支持多种格式变体)""" import re # 定义多种可能的标记格式(按优先级排序,支持更多变体) gdl_patterns = [ r"##\s*m?GDL\s*描述", # ## GDL描述 / ## mGDL 描述 r"m?GDL\s*描述", # GDL描述 / mGDL 描述 r"##\s*GDL\s*Description", # ## GDL Description(英文) r"##\s*m?GDL", # ## GDL / ## mGDL r"m?GDL\s*规则代码", # mGDL 规则代码 r"m?GDL\s*规则", # mGDL 规则 ] narrative_patterns = [ r"##\s*自然语言规则说明", # ## 自然语言规则说明 r"##\s*自然语言规则", # ## 自然语言规则 r"自然语言规则说明", # 自然语言规则说明 r"自然语言规则", # 自然语言规则 r"##\s*规则说明", # ## 规则说明 r"规则说明", # 规则说明 ] # 尝试查找 GDL 部分(支持大小写不敏感) gdl_start = -1 for pattern in gdl_patterns: match = re.search(pattern, content, re.IGNORECASE) if match: gdl_start = match.start() break # 尝试查找自然语言部分(支持大小写不敏感) narrative_start = -1 for pattern in narrative_patterns: match = re.search(pattern, content, re.IGNORECASE) if match: narrative_start = match.start() break if gdl_start != -1 and narrative_start != -1: # 确保顺序正确(GDL应该在自然语言之前) if gdl_start >= narrative_start: gdl_start, narrative_start = narrative_start, gdl_start # 获取 GDL 部分(从标记开始到自然语言标记之前) gdl_content = content[gdl_start:narrative_start].strip() # 获取自然语言部分(从标记开始到结尾) narrative_content = content[narrative_start:].strip() return gdl_content, narrative_content elif gdl_start != -1: # 只找到GDL,将后面全部作为GDL print(f"⚠️ 仅找到GDL标记,将其后内容作为GDL") return content[gdl_start:].strip(), "" elif narrative_start != -1: # 只找到自然语言,将后面全部作为自然语言 print(f"⚠️ 仅找到自然语言标记,将其后内容作为自然语言") return "", content[narrative_start:].strip() else: # 都没找到,返回空 print(f"⚠️ 提取警告: 未找到GDL和自然语言标记") return "", "" def save_gdl_and_narrative(gdl_content, narrative_content): """保存 GDL 和自然语言内容到文件""" # 定义文件存储路径 export_dir = os.path.join("exports") os.makedirs(export_dir, exist_ok=True) gdl_file_path = os.path.join(export_dir, "gdl_output.txt") narrative_file_path = os.path.join(export_dir, "narrative_output.txt") # 保存 GDL with open(gdl_file_path, "w", encoding="utf-8") as f: f.write(gdl_content) # 保存自然语言 with open(narrative_file_path, "w", encoding="utf-8") as f: f.write(narrative_content) return gdl_file_path, narrative_file_path def extract_design_log(content): """ 提取"设计日志(创新推演摘要)"段落,用于单独导出。 规则:从标题行开始,截到下一个同级标题(###)或文件结尾。 """ import re if not content: return "" # 修复:原始字符串中 \s 即可匹配空白,不需要 \\s m = re.search(r"^###\s*设计日志(创新推演摘要)\s*$", content, re.MULTILINE) if not m: # 兼容变体格式:设计日志/思维日志/Design Log m = re.search(r"^###\s*(设计日志|思维日志|Design\s*Log).*$", content, re.MULTILINE | re.IGNORECASE) if not m: return "" start = m.start() m2 = re.search(r"^###\s+.+$", content[m.end():], re.MULTILINE) end = (m.end() + m2.start()) if m2 else len(content) return content[start:end].strip() def save_design_log(design_log_content): export_dir = os.path.join("exports") os.makedirs(export_dir, exist_ok=True) design_log_file_path = os.path.join(export_dir, "design_log_output.txt") with open(design_log_file_path, "w", encoding="utf-8") as f: f.write(design_log_content or "") return design_log_file_path # ==================== Gradio 界面(Mahjong Skin) ==================== with gr.Blocks( theme=gr.themes.Soft(primary_hue='green', neutral_hue='slate'), title=APP_TITLE, css=MAHJONG_THEME_CSS + EXTRA_FIX_CSS, elem_id='app-root' ) as demo: gr.HTML(FORCE_DARK_REDIRECT) gr.HTML(DETECT_FORCE_DARK) # 顶部 Hero hero_tiles_html = _render_hero_tiles() gr.HTML(f"""
🀄️ 麻将玩法灵感工坊 · KOI Lab
与 AI 麻将策划智囊对话,快速生成番型体系与玩法流程
""") with gr.Row(equal_height=True): # 左侧:设置区 with gr.Column(scale=1, min_width=MIN_WIDTH_LEFT): with gr.Group(elem_classes="side-card"): gr.Markdown("### 输入与约束") gr.Markdown("✅ **系统已预加载 Mahjong GDL 规范与提示词**,可直接开局。", elem_classes="hint") file_uploader = gr.File( label="上传自定义 Mahjong GDL/番型示例(.txt,可多选)", file_types=[".txt"], file_count="multiple", type="filepath", interactive=True, show_label=True, container=True, scale=1 ) custom_prompt_box = gr.Textbox( label="自定义 System Prompt(可选)", placeholder="系统已经载入麻将玩法专家提示词。如需特殊规则,可在此粘贴。", lines=10, max_lines=20, show_copy_button=True ) prompt_mode = gr.Radio( choices=["覆盖默认SYSTEM_PROMPT", "合并(默认在前)"], value="覆盖默认SYSTEM_PROMPT", label="自定义 Prompt 的使用方式" ) gr.Markdown("💡 提示:系统默认包含麻将通用语法、番型模板与 Prompt;仅当你要替换地方规则时再上传或输入。", elem_classes="hint") with gr.Group(elem_classes="side-card"): gr.Markdown("### 使用说明") gr.Markdown( "- ✅ **快速开始**:直接描述你想要的麻将玩法主题或目标。\n" "- 📁 **可选上传**:导入地方番型表或自定义 GDL 模板以便引用。\n" "- ✏️ **可选自定义**:针对特殊风格重写 System Prompt。\n" "- 🔄 **清空缓存**:切换规则后建议刷新缓存,避免残留设定。" ) with gr.Group(elem_classes="side-card"): gr.Markdown("### 快捷操作") with gr.Row(): clear_cache_btn = gr.Button("清空缓存", variant="secondary") clear_files_btn = gr.Button("清空文件", variant="secondary") cache_status = gr.Markdown("💾 缓存状态:正常", elem_classes="hint") file_status = gr.Markdown("📁 文件状态:使用默认麻将配置(GDL + Prompt 已预加载)", elem_classes="hint") # 右侧:聊天区(手动事件绑定 + 流式输出) with gr.Column(scale=2, min_width=MIN_WIDTH_RIGHT): with gr.Group(elem_classes="table"): chatbot = gr.Chatbot( height=CHATBOT_HEIGHT, type="messages", elem_classes="custom-chatbot", avatar_images=("landlord.png", "bot.png"), ) # 用 State 保存 messages 历史 chat_state = gr.State([]) # list[dict]: [{"role":"user","content":...}, {"role":"assistant","content":...}] gdl_file_path = gr.State("") narrative_file_path = gr.State("") design_log_file_path = gr.State("") design_state_raw = gr.State("") # 上一次可机读 DesignState(JSON) 原文 design_state_obj = gr.State({}) # 解析后的 dict(用于显示与对比) design_state_version = gr.State(0) # 版本号(每次成功提取 +1) ready_to_generate = gr.State(False) design_state_history = gr.State(create_empty_history()) # 版本历史 interaction_phase_state = gr.State(create_phase_state()) # 多阶段交互状态 asked_questions_state = gr.State([]) # Analyse 模式:已问过的问题 with gr.Group(elem_classes="side-card"): gr.Markdown("### 可控迭代(推荐)") iterative_mode = gr.Checkbox( label="启用可控迭代:基于上轮 DesignState 做最小修改", value=True, ) analyse_mode = gr.Checkbox( label="Analyse 模式:单问题迭代完善需求(不生成mGDL)", value=False, ) ready_status = gr.Markdown("READY_TO_GENERATE:未知", elem_classes="hint") iteration_scope = gr.Dropdown( label="本轮允许修改范围(预设)", choices=get_scope_preset_options(), value="仅优化创新机制", ) # 细粒度范围控制(新增) with gr.Accordion("🔒 细粒度范围控制", open=False) as scope_accordion: scope_mode = gr.Radio( choices=["预设模式", "自定义模式"], value="预设模式", label="控制模式", ) constraint_level = gr.Radio( choices=["软约束(提示)", "硬约束(阻断)"], value="软约束(提示)", label="约束级别", ) with gr.Group(visible=False) as custom_scope_group: gr.Markdown("**锁定字段**(选中的字段不允许修改)", elem_classes="hint") locked_fields = gr.CheckboxGroup( choices=[ ("玩法名称", "new_variant_name"), ("底座玩法", "base_variants"), ("融合玩法", "fusion_variants"), ("游戏模式", "game_variant"), ("玩家人数", "players"), ("计分模式", "scoring_mode"), ("牌组配置", "tileset"), ], value=[], label="", ) gr.Markdown("**锁定机制**(选中的机制不允许修改)", elem_classes="hint") locked_mechanics = gr.CheckboxGroup( choices=[], value=[], label="", ) refresh_mechanics_btn = gr.Button("刷新机制列表", size="sm") scope_status = gr.Markdown("_范围约束: 预设模式_", elem_classes="hint") # 保存范围配置的 State scope_config_state = gr.State(create_scope_config()) generate_from_state_btn = gr.Button("基于当前 DesignState 生成完整玩法", variant="primary") design_state_status = gr.Markdown("DesignState:未建立(先生成一版玩法)", elem_classes="hint") # 差异可视化区域(新增) with gr.Accordion("📊 DesignState 变更详情", open=False) as diff_accordion: diff_summary_display = gr.Markdown("_尚无变更记录_", elem_classes="diff-summary") # 版本历史与回滚(新增) with gr.Accordion("🕐 版本历史与回滚", open=False) as history_accordion: version_dropdown = gr.Dropdown( label="选择版本", choices=[], value=None, interactive=True, ) with gr.Row(): rollback_btn = gr.Button("回滚到所选版本", variant="secondary", size="sm") refresh_history_btn = gr.Button("刷新列表", variant="secondary", size="sm") history_status = gr.Markdown("_无版本历史_", elem_classes="hint") # 多阶段交互与方案选择(新增) with gr.Accordion("🎯 方案选择与交互引导", open=True) as phase_accordion: phase_status = gr.Markdown("**当前阶段**: 🚀 初始", elem_classes="hint") proposals_display = gr.Markdown("_等待生成方案..._", visible=False) with gr.Row(visible=False) as proposal_select_row: proposal_dropdown = gr.Dropdown( label="选择方案", choices=[], value=None, interactive=True, scale=3, ) select_proposal_btn = gr.Button("确认选择", variant="primary", size="sm", scale=1) with gr.Row(visible=False) as diverge_action_row: request_more_btn = gr.Button("🔄 重新发散", variant="secondary", size="sm") request_elaborate_btn = gr.Button("📝 直接深入展开", variant="secondary", size="sm") validation_status = gr.Markdown("输出校验:未运行", elem_classes="hint") user_input = gr.Textbox( placeholder="例如:为4人竞速推倒胡设计番型与流程;或“做一个双人合作麻将 roguelike” …", show_label=False, max_lines=5, lines=3, show_copy_button=True, autofocus=True, ) with gr.Row(): send_btn = gr.Button("发送", variant="primary") stop_info = gr.Markdown("", visible=False) # ====== 核心:流式提交回调(生成器) ====== #此函数为工具核心,处理用户发送消息的完整流程(模式判断-模型调用-解析与更新) def on_submit( user_text, history_msgs, files, custom_prompt, mode, iterative_enabled, analyse_enabled, scope, ds_raw_prev, ds_obj_prev, ds_ver_prev, ready_prev, history_data, asked_questions, ): user_text = (user_text or "").strip() diff_md_prev = "_尚无变更记录_" # 保留上次的差异摘要 cur_choices = get_history_choices(history_data) asked_questions = asked_questions or [] if not user_text: # 不提交空消息:输出不变 yield history_msgs, "", history_msgs, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:未变更", ready_prev, "READY_TO_GENERATE:未知", "输出校验:未运行", diff_md_prev, history_data, gr.update(choices=cur_choices), "_无版本历史_" if not cur_choices else f"共 {len(cur_choices)} 个版本", asked_questions return # 立即显示"用户消息" history = list(history_msgs or []) history.append({"role": "user", "content": user_text}) yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:处理中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_处理中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions # 添加空的助手气泡,用于逐步填充 history.append({"role": "assistant", "content": ""}) yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:处理中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_处理中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions # ====== 现有玩法细节查询:直接返回本地玩法库内容 ====== if (not analyse_enabled) and (not iterative_enabled) and _is_rules_query(user_text) and (not _is_design_intent(user_text)): matched = match_variants_in_text(user_text) if matched: parts = [] for name in matched: md_text = (load_variant_md(name) or "").strip() if md_text: parts.append(f"### {name}\n\n{md_text}") else: parts.append(f"### {name}\n\n(未找到该玩法的本地规则文件)") history[-1]["content"] = "\n\n---\n\n".join(parts).strip() yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:未变更", ready_prev, "READY_TO_GENERATE:未知", "输出校验:未运行", "_未生成设计_", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions return # 流式生成内容 tuples_hist = _messages_to_tuples(history) # 可控迭代:基于上轮 DesignState 做“最小修改”,并尽量减少历史噪声 iterative_enabled = bool(iterative_enabled) analyse_enabled = bool(analyse_enabled) scope = (scope or "").strip() effective_user_text = user_text tuples_to_send = tuples_hist status_hint = "DesignState:未变更" ready_to_gen = ready_prev if isinstance(ready_prev, bool) else False ready_md = "READY_TO_GENERATE:未知" if analyse_enabled: tuples_to_send = [] # Analyse 模式不吃长history,避免漂移 asked_block = "" if asked_questions: asked_block = "已问过的问题(禁止重复):\n" + "\n".join( f"- {q}" for q in asked_questions[:20] ) + "\n\n" effective_user_text = ( "true\n" "你正在进行【Analyse 模式】(单问题迭代)。\n" "目标:通过多轮问答把需求场景收敛到可生成完整玩法。\n" "限制:本轮禁止输出 mGDL/完整规则/自检报告。\n\n" "{asked}" "当前已有 DesignState(如为空表示尚未建立初版):\n" "\n{ds}\n\n\n" "用户本轮补充信息:\n{req}\n" "要求:本轮只能问 1 个问题;不得重复已问过的问题;若信息已足够则直接 READY_TO_GENERATE: true。\n" ).format(asked=asked_block, ds=(ds_raw_prev or "{}").strip(), req=user_text) status_hint = "DesignState:Analyse模式中…" elif iterative_enabled and (ds_raw_prev or "").strip(): tuples_to_send = [] # 用“当前 DesignState”替代长历史,减少漂移 effective_user_text = ( "你正在进行【可控迭代】。请基于下方 DesignState(JSON) 对方案做【最小修改】。\n" "本轮允许修改范围:{scope}\n" "硬约束:\n" "1) 除允许范围外的字段一律保持不变;若必须改动范围外字段,先提出澄清问题并停止输出 mGDL。\n" "2) 必须输出新的 DesignState(JSON)(合法JSON、无注释)+ 变更摘要(列出改动点)。\n" "3) 仍需按 m_prompt 输出完整结果(规则 + mGDL + 自检)。\n\n" "\n{ds}\n\n\n" "【用户本轮优化要求】\n{req}\n" ).format(scope=scope or "自由迭代(仍保最小修改)", ds=ds_raw_prev.strip(), req=user_text) status_hint = "DesignState:可控迭代中…" if design_mahjong_game_stream is not None: try: for piece in design_mahjong_game_stream(effective_user_text, tuples_to_send, files, custom_prompt, mode): if not piece: continue history[-1]["content"] += str(piece) yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, status_hint, ready_to_gen, ready_md, "输出校验:运行中…", "_处理中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions except Exception as e: history[-1]["content"] += f"\n(流式出错){type(e).__name__}: {e}" yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:流式出错(未变更)", ready_to_gen, "READY_TO_GENERATE:未知", "输出校验:未运行", "_处理出错_", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions else: try: full = design_mahjong_game(effective_user_text, tuples_to_send, files, custom_prompt, mode) except Exception as e: full = f"(出错){type(e).__name__}: {e}" for piece in _chunk_fake_stream(str(full), step=40): history[-1]["content"] += piece yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, status_hint, ready_to_gen, ready_md, "输出校验:运行中…", "_处理中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions # 提取 GDL 和自然语言描述并保存 new_ds_obj, new_ds_raw = extract_design_state(history[-1]["content"]) ready_flag = extract_ready_to_generate(history[-1]["content"]) ds_ver = int(ds_ver_prev or 0) ds_status = "DesignState:未变更" diff_summary_md = "_无变更_" updated_history_data = history_data if new_ds_obj and new_ds_raw: ds_ver = ds_ver + 1 ds_status = "DesignState:v{0} 已更新 | {1}".format(ds_ver, summarize_design_state(new_ds_obj)) if isinstance(ready_flag, bool): ready_to_gen = ready_flag ready_md = "READY_TO_GENERATE:{0}".format("true" if ready_flag else "false") # 生成差异摘要 diff_summary_md = generate_diff_summary( ds_obj_prev if isinstance(ds_obj_prev, dict) else None, new_ds_obj, int(ds_ver_prev or 0), ds_ver ) # 添加到版本历史 compact_summary = generate_diff_summary_compact(ds_obj_prev if isinstance(ds_obj_prev, dict) else None, new_ds_obj) updated_history_data = add_to_history(history_data, ds_ver, new_ds_obj, new_ds_raw, compact_summary) # 可控迭代:做一个"范围越界"软检查(不阻断,只提示) if iterative_enabled and ds_obj_prev and isinstance(ds_obj_prev, dict) and scope: changed = diff_keys(ds_obj_prev, new_ds_obj) mech_changes = diff_mechanics(ds_obj_prev, new_ds_obj) if not is_change_within_scope(changed, scope): history[-1]["content"] += ( "\n\n---\n⚠️ 可控迭代检查:检测到可能超出允许范围的改动。\n" "允许范围:{scope}\n" "顶层变更字段:{keys}\n" "机制变更:{mech}\n" "建议:请重新执行一次迭代,明确只改允许范围内字段,或切换为「自由迭代」。\n" ).format( scope=scope, keys=",".join(changed) if changed else "(无)", mech="; ".join(mech_changes) if mech_changes else "(无)", ) try: # 更新版本下拉选项 new_choices = get_history_choices(updated_history_data) history_status_md = f"共 {len(new_choices)} 个版本" if new_choices else "_无版本历史_" # Analyse 模式不产出完整规则,不导出GDL/自然语言文件 if analyse_enabled: new_questions = extract_clarify_questions(history[-1]["content"]) asked_questions = _merge_asked_questions(asked_questions, new_questions) yield history, "", history, "", "", "", (new_ds_raw or ds_raw_prev), (new_ds_obj or ds_obj_prev), ds_ver, ds_status, ready_to_gen, ready_md, "输出校验:Analyse 模式跳过", diff_summary_md, updated_history_data, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), history_status_md, asked_questions return gdl_content, narrative_content = extract_gdl_and_narrative(history[-1]["content"]) design_log_content = extract_design_log(history[-1]["content"]) # 保存 GDL 和自然语言文件 gdl_path, narrative_path = save_gdl_and_narrative(gdl_content, narrative_content) design_log_path = save_design_log(design_log_content) # 返回文件路径,以便下载 yield history, "", history, gdl_path, narrative_path, design_log_path, (new_ds_raw or ds_raw_prev), (new_ds_obj or ds_obj_prev), ds_ver, ds_status, ready_to_gen, ready_md, render_validation_md(history[-1]["content"]), diff_summary_md, updated_history_data, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), history_status_md, asked_questions except Exception as e: print(f"保存GDL和自然语言文件时出错: {e}") yield history, "", history, "", "", "", (new_ds_raw or ds_raw_prev), (new_ds_obj or ds_obj_prev), ds_ver, ds_status, ready_to_gen, ready_md, render_validation_md(history[-1]["content"]), diff_summary_md, updated_history_data, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), history_status_md, asked_questions def on_generate_from_state( history_msgs, files, custom_prompt, mode, ds_raw_prev, ds_obj_prev, ds_ver_prev, ready_prev, history_data, ): cur_choices = get_history_choices(history_data) if not (ds_raw_prev or "").strip(): yield history_msgs, history_msgs, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:未建立(先生成一版玩法)", ready_prev, "READY_TO_GENERATE:未知", "输出校验:未运行", "_尚无变更记录_", history_data, gr.update(choices=cur_choices), "_无版本历史_" if not cur_choices else f"共 {len(cur_choices)} 个版本" return history = list(history_msgs or []) user_text = "基于当前 DesignState 生成完整玩法(自然语言规则 + mGDL + 自检报告)。" history.append({"role": "user", "content": user_text}) yield history, history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:生成中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_生成中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_" history.append({"role": "assistant", "content": ""}) yield history, history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:生成中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_生成中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_" effective_user_text = ( "请基于下方 DesignState(JSON) 生成完整的麻将新玩法交付物:\n" "1) 自然语言规则说明(含机制声明表)\n" "2) 完整 mGDL v1.3(按 m_prompt 格式)\n" "3) 自检报告\n" "要求:保持 DesignState 的核心设定不漂移;若发现 DesignState 信息不足,请先提出澄清问题并停止输出 mGDL。\n\n" "\n{ds}\n\n" ).format(ds=ds_raw_prev.strip()) tuples_to_send = [] buf = [] try: if design_mahjong_game_stream is not None: for piece in design_mahjong_game_stream(effective_user_text, tuples_to_send, files, custom_prompt, mode): if not piece: continue s = str(piece) buf.append(s) history[-1]["content"] += s yield history, history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:生成中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_生成中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_" else: full = design_mahjong_game(effective_user_text, tuples_to_send, files, custom_prompt, mode) buf.append(str(full)) for piece in _chunk_fake_stream(str(full), step=40): history[-1]["content"] += piece yield history, history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:生成中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_生成中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_" except Exception as e: history[-1]["content"] += f"\n(生成出错){type(e).__name__}: {e}" yield history, history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:生成出错", ready_prev, "READY_TO_GENERATE:未知", "输出校验:未运行", "_生成出错_", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_" return new_ds_obj, new_ds_raw = extract_design_state(history[-1]["content"]) ds_ver = int(ds_ver_prev or 0) ds_status = "DesignState:未变更" diff_summary_md = "_无变更_" updated_history_data = history_data if new_ds_obj and new_ds_raw: ds_ver += 1 ds_status = "DesignState:v{0} 已更新 | {1}".format(ds_ver, summarize_design_state(new_ds_obj)) # 生成差异摘要 diff_summary_md = generate_diff_summary( ds_obj_prev if isinstance(ds_obj_prev, dict) else None, new_ds_obj, int(ds_ver_prev or 0), ds_ver ) # 添加到版本历史 compact_summary = generate_diff_summary_compact(ds_obj_prev if isinstance(ds_obj_prev, dict) else None, new_ds_obj) updated_history_data = add_to_history(history_data, ds_ver, new_ds_obj, new_ds_raw, compact_summary) # 更新版本下拉选项 new_choices = get_history_choices(updated_history_data) history_status_md = f"共 {len(new_choices)} 个版本" if new_choices else "_无版本历史_" try: gdl_content, narrative_content = extract_gdl_and_narrative(history[-1]["content"]) design_log_content = extract_design_log(history[-1]["content"]) gdl_path, narrative_path = save_gdl_and_narrative(gdl_content, narrative_content) design_log_path = save_design_log(design_log_content) yield history, history, gdl_path, narrative_path, design_log_path, (new_ds_raw or ds_raw_prev), (new_ds_obj or ds_obj_prev), ds_ver, ds_status, ready_prev, "READY_TO_GENERATE:未知", render_validation_md(history[-1]["content"]), diff_summary_md, updated_history_data, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), history_status_md except Exception: yield history, history, "", "", "", (new_ds_raw or ds_raw_prev), (new_ds_obj or ds_obj_prev), ds_ver, ds_status, ready_prev, "READY_TO_GENERATE:未知", render_validation_md(history[-1]["content"]), diff_summary_md, updated_history_data, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), history_status_md # 绑定:回车提交(Enter=提交;Shift+Enter=换行由浏览器处理) user_input.submit( fn=on_submit, inputs=[ user_input, chat_state, file_uploader, custom_prompt_box, prompt_mode, iterative_mode, analyse_mode, iteration_scope, design_state_raw, design_state_obj, design_state_version, ready_to_generate, design_state_history, asked_questions_state, ], outputs=[ chatbot, user_input, chat_state, gdl_file_path, narrative_file_path, design_log_file_path, design_state_raw, design_state_obj, design_state_version, design_state_status, ready_to_generate, ready_status, validation_status, diff_summary_display, design_state_history, version_dropdown, history_status, asked_questions_state, ], preprocess=True, ) # 绑定:点击"发送" send_btn.click( fn=on_submit, inputs=[ user_input, chat_state, file_uploader, custom_prompt_box, prompt_mode, iterative_mode, analyse_mode, iteration_scope, design_state_raw, design_state_obj, design_state_version, ready_to_generate, design_state_history, asked_questions_state, ], outputs=[ chatbot, user_input, chat_state, gdl_file_path, narrative_file_path, design_log_file_path, design_state_raw, design_state_obj, design_state_version, design_state_status, ready_to_generate, ready_status, validation_status, diff_summary_display, design_state_history, version_dropdown, history_status, asked_questions_state, ], preprocess=True, ) generate_from_state_btn.click( fn=on_generate_from_state, inputs=[ chat_state, file_uploader, custom_prompt_box, prompt_mode, design_state_raw, design_state_obj, design_state_version, ready_to_generate, design_state_history, ], outputs=[ chatbot, chat_state, gdl_file_path, narrative_file_path, design_log_file_path, design_state_raw, design_state_obj, design_state_version, design_state_status, ready_to_generate, ready_status, validation_status, diff_summary_display, design_state_history, version_dropdown, history_status, ], preprocess=True, ) # 版本回滚功能 def on_rollback(selected_version, history_data, ds_obj_prev, ds_ver_prev): """回滚到选定版本""" if not selected_version: return ( ds_obj_prev, "", ds_ver_prev, "DesignState:未选择版本", "_请选择要回滚的版本_", history_data, gr.update(), f"共 {len(get_history_choices(history_data))} 个版本" if get_history_choices(history_data) else "_无版本历史_" ) version = parse_version_from_choice(selected_version) if version <= 0: return ( ds_obj_prev, "", ds_ver_prev, "DesignState:版本解析失败", "_版本号无效_", history_data, gr.update(), f"共 {len(get_history_choices(history_data))} 个版本" if get_history_choices(history_data) else "_无版本历史_" ) updated_history, state_obj, state_raw = rollback_history(history_data, version) if state_obj is None: return ( ds_obj_prev, "", ds_ver_prev, f"DesignState:回滚失败(v{version} 不存在)", "_回滚失败_", history_data, gr.update(), f"共 {len(get_history_choices(history_data))} 个版本" if get_history_choices(history_data) else "_无版本历史_" ) new_choices = get_history_choices(updated_history) ds_status = "DesignState:v{0} | {1}".format(version, summarize_design_state(state_obj)) diff_md = f"**已回滚到 v{version}**\n\n{summarize_design_state(state_obj)}" return ( state_obj, state_raw, version, ds_status, diff_md, updated_history, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), f"✅ 已回滚到 v{version},共 {len(new_choices)} 个版本" ) rollback_btn.click( fn=on_rollback, inputs=[version_dropdown, design_state_history, design_state_obj, design_state_version], outputs=[ design_state_obj, design_state_raw, design_state_version, design_state_status, diff_summary_display, design_state_history, version_dropdown, history_status, ], ) # 刷新历史列表 def on_refresh_history(history_data): choices = get_history_choices(history_data) return gr.update(choices=choices, value=choices[-1] if choices else None), f"共 {len(choices)} 个版本" if choices else "_无版本历史_" refresh_history_btn.click( fn=on_refresh_history, inputs=[design_state_history], outputs=[version_dropdown, history_status], ) # ====== 多阶段交互:阶段检测与方案选择 ====== def update_phase_from_chat(history_msgs, ds_obj, phase_state): """根据最新聊天内容更新阶段状态和方案显示""" if not history_msgs: return ( "**当前阶段**: 🚀 初始", gr.update(visible=False), gr.update(choices=[], value=None), gr.update(visible=False), gr.update(visible=False), phase_state, ) # 获取最后一条助手消息 last_bot_msg = "" for msg in reversed(history_msgs): if isinstance(msg, dict) and msg.get("role") == "assistant": last_bot_msg = msg.get("content", "") break if not last_bot_msg: return ( "**当前阶段**: 🚀 初始", gr.update(visible=False), gr.update(choices=[], value=None), gr.update(visible=False), gr.update(visible=False), phase_state, ) # 检测阶段 detected_phase = detect_interaction_phase(last_bot_msg, ds_obj) phase_display = get_phase_display_name(detected_phase) # 提取方案 proposals = extract_proposals(last_bot_msg) questions = extract_clarify_questions(last_bot_msg) # 更新阶段状态 new_phase_state = update_phase_state(phase_state, detected_phase, proposals=proposals) # 根据阶段决定 UI 显示 if detected_phase == InteractionPhase.DIVERGE and len(proposals) >= 2: # 有多个方案,显示方案选择 UI proposal_choices = [f"方案 {p['id']}: {p['title']}" for p in proposals] proposals_md = format_proposals_for_display(proposals) return ( f"**当前阶段**: {phase_display}\n\n发现 **{len(proposals)}** 个候选方案,请选择一个深入展开。", gr.update(value=proposals_md, visible=True), gr.update(choices=proposal_choices, value=None, visible=True), gr.update(visible=True), gr.update(visible=True), new_phase_state, ) elif detected_phase == InteractionPhase.UNDERSTAND and questions: # 有确认问题 questions_md = "\n".join([f"❓ {q}" for q in questions]) return ( f"**当前阶段**: {phase_display}\n\n请回答以下确认问题:", gr.update(value=questions_md, visible=True), gr.update(choices=[], value=None), gr.update(visible=False), gr.update(visible=False), new_phase_state, ) else: # 其他阶段 return ( f"**当前阶段**: {phase_display}", gr.update(visible=False), gr.update(choices=[], value=None), gr.update(visible=False), gr.update(visible=False), new_phase_state, ) # 当聊天内容变化时更新阶段 chatbot.change( fn=update_phase_from_chat, inputs=[chatbot, design_state_obj, interaction_phase_state], outputs=[ phase_status, proposals_display, proposal_dropdown, proposal_select_row, diverge_action_row, interaction_phase_state, ], ) # 确认选择方案 def on_select_proposal(selected, phase_state, history_msgs): """用户选择方案后,生成相应的用户消息""" if not selected: return "", history_msgs, phase_state # 提取方案 ID proposal_id = selected.split(":")[0].replace("方案", "").strip() # 生成用户确认消息 confirm_msg = f"我选择 **方案 {proposal_id}**,请对这个方案进行深入展开设计。" # 更新阶段状态 new_phase_state = update_phase_state(phase_state, InteractionPhase.SELECT, selected=proposal_id) return confirm_msg, history_msgs, new_phase_state select_proposal_btn.click( fn=on_select_proposal, inputs=[proposal_dropdown, interaction_phase_state, chat_state], outputs=[user_input, chat_state, interaction_phase_state], ) # 重新发散按钮 def on_request_more(): return "请给出更多不同方向的机制组合方案,我想看看其他可能性。" request_more_btn.click( fn=on_request_more, outputs=[user_input], ) # 直接深入展开按钮 def on_request_elaborate(phase_state): proposals = phase_state.get("proposals", []) if proposals: first = proposals[0] return f"请直接对方案 {first['id']}({first['title']})进行深入展开设计。" return "请直接对当前方案进行深入展开设计,生成完整的玩法规则和 mGDL。" request_elaborate_btn.click( fn=on_request_elaborate, inputs=[interaction_phase_state], outputs=[user_input], ) # ====== 细粒度范围控制:事件绑定 ====== def on_scope_mode_change(mode): """切换控制模式时显示/隐藏自定义选项""" is_custom = mode == "自定义模式" status_text = "_范围约束: 自定义模式_" if is_custom else "_范围约束: 预设模式_" return gr.update(visible=is_custom), status_text scope_mode.change( fn=on_scope_mode_change, inputs=[scope_mode], outputs=[custom_scope_group, scope_status], ) def on_refresh_mechanics(ds_obj): """刷新当前 DesignState 中的机制列表""" if not ds_obj or not isinstance(ds_obj, dict): return gr.update(choices=[], value=[]) mechanics = get_mechanics_from_state(ds_obj) choices = [(m, m) for m in mechanics] return gr.update(choices=choices, value=[]) refresh_mechanics_btn.click( fn=on_refresh_mechanics, inputs=[design_state_obj], outputs=[locked_mechanics], ) def on_scope_config_update(mode, constraint, locked_flds, locked_mechs, preset_scope): """更新范围配置状态""" is_preset = mode == "预设模式" constraint_level_val = ScopeConstraint.SOFT if "软" in constraint else ScopeConstraint.HARD config = { "mode": "preset" if is_preset else "custom", "preset": preset_scope if is_preset else "", "constraint_level": constraint_level_val, "locked_fields": locked_flds or [], "allowed_fields": [], "locked_mechanics": locked_mechs or [], "allowed_mechanics": [], } # 生成状态提示 if is_preset: status = f"_范围约束: 预设模式 ({preset_scope})_" else: locked_count = len(locked_flds or []) + len(locked_mechs or []) constraint_text = "软约束" if constraint_level_val == ScopeConstraint.SOFT else "硬约束" status = f"_范围约束: 自定义模式 | {constraint_text} | 锁定 {locked_count} 项_" return config, status # 当任意范围控制项变化时更新配置 for scope_input in [scope_mode, constraint_level, locked_fields, locked_mechanics, iteration_scope]: scope_input.change( fn=on_scope_config_update, inputs=[scope_mode, constraint_level, locked_fields, locked_mechanics, iteration_scope], outputs=[scope_config_state, scope_status], ) # 导出对话 with gr.Row(): export_btn = gr.Button("导出对话(Markdown)", variant="secondary") export_file = gr.File(label="点击下载导出文件", interactive=False) # 清空对话 with gr.Row(): clear_dialog_btn = gr.Button("清空对话", variant="secondary") def _clear_chat(): return ( [], "", [], "", "", "", "", {}, 0, "DesignState:未建立(先生成一版玩法)", False, "READY_TO_GENERATE:未知", "输出校验:未运行", "_尚无变更记录_", create_empty_history(), gr.update(choices=[], value=None), "_无版本历史_", create_phase_state(), "**当前阶段**: 🚀 初始", gr.update(visible=False), gr.update(choices=[], value=None), gr.update(visible=False), gr.update(visible=False), create_scope_config(), "_范围约束: 预设模式_", [], ) clear_dialog_btn.click( fn=_clear_chat, inputs=None, outputs=[ chatbot, user_input, chat_state, gdl_file_path, narrative_file_path, design_log_file_path, design_state_raw, design_state_obj, design_state_version, design_state_status, ready_to_generate, ready_status, validation_status, diff_summary_display, design_state_history, version_dropdown, history_status, interaction_phase_state, phase_status, proposals_display, proposal_dropdown, proposal_select_row, diverge_action_row, scope_config_state, scope_status, asked_questions_state, ], ) # ==================== 下载按钮部分 ==================== with gr.Row(): gr.Markdown("### 下载输出文件") download_gdl_btn = gr.Button("下载 GDL 文件", variant="secondary") # 下载 GDL 按钮 download_narrative_btn = gr.Button("下载自然语言文件", variant="secondary") # 下载自然语言按钮 download_design_log_btn = gr.Button("下载思维日志文件", variant="secondary") download_gdl_file = gr.File(label="GDL 文件", interactive=False) # 文件下载区域 download_narrative_file = gr.File(label="自然语言文件", interactive=False) # 文件下载区域 download_design_log_file = gr.File(label="思维日志文件", interactive=False) # 🟩【修改】绑定下载按钮与文件路径 - 修复版本 def get_gdl_file(gdl_path): if gdl_path and os.path.exists(gdl_path): return gdl_path return None def get_narrative_file(narrative_path): if narrative_path and os.path.exists(narrative_path): return narrative_path return None def get_design_log_file(design_log_path): if design_log_path and os.path.exists(design_log_path): return design_log_path return None # 绑定下载按钮与文件路径 download_gdl_btn.click( fn=get_gdl_file, inputs=[gdl_file_path], # 🟩【修改】接收State中的路径 outputs=[download_gdl_file] ) download_narrative_btn.click( fn=get_narrative_file, inputs=[narrative_file_path], # 🟩【修改】接收State中的路径 outputs=[download_narrative_file] ) download_design_log_btn.click( fn=get_design_log_file, inputs=[design_log_file_path], outputs=[download_design_log_file], ) # ==================== 事件绑定(左侧) ==================== clear_cache_btn.click(fn=clear_cache, outputs=[cache_status]) clear_files_btn.click(fn=clear_files, outputs=[file_uploader]) file_uploader.change(fn=update_file_status, inputs=[file_uploader], outputs=[file_status]) # 导出按钮 def _export_wrapper(chat_history): try: path = export_history_to_markdown(chat_history) return path except Exception as e: import tempfile tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".md") with open(tmp.name, 'w', encoding='utf-8') as f: f.write(f"导出失败:{type(e).__name__}: {e}\n") return tmp.name export_btn.click(fn=_export_wrapper, inputs=[chatbot], outputs=[export_file]) # ==================== 启动应用 ==================== if __name__ == "__main__": # 兼容不同 gradio 版本: # - 有的版本 queue() 不接受 concurrency_count/status_update_rate # - 有的版本甚至不需要手动 queue() app = demo try: app = demo.queue() # 不带参数,启用队列以支持生成器流式 except TypeError: # 某些版本 queue() 可能参数或签名不同,直接跳过即可 app = demo # HF Spaces 自带公开/私有托管,share=True 会尝试创建外部公网链接,常导致启动失败 is_space = bool(os.getenv("SPACE_ID") or os.getenv("HF_SPACE_ID")) allow_share = (os.getenv("GRADIO_SHARE") or "0").strip() == "1" app.launch(share=(allow_share and not is_space), show_api=False)