Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| # ===== 你的自定义模块 ===== | |
| from config import APP_TITLE, CHATBOT_HEIGHT, MIN_WIDTH_LEFT, MIN_WIDTH_RIGHT | |
| from styles import MAHJONG_THEME_CSS | |
| # Extra CSS to force dark & counter browser auto-invert | |
| EXTRA_FIX_CSS = '''/* === Force dark site and neutralize browser-side auto-invert === */ | |
| html, body{ background-color:#0b1220 !important; color-scheme: dark !important; } | |
| /* When browser/extension applies global invert, add a counter-invert on our app root */ | |
| html.force-dark-fix #app-root{ filter: invert(1) hue-rotate(180deg) !important; }''' | |
| # Redirect to ?__theme=dark | |
| FORCE_DARK_REDIRECT = '''<script> | |
| (function(){ | |
| try{ | |
| const url=new URL(window.location.href); | |
| if(url.searchParams.get("__theme")!=="dark"){ | |
| url.searchParams.set("__theme","dark"); | |
| window.location.replace(url.toString()); | |
| } | |
| }catch(e){} | |
| })(); | |
| </script>''' | |
| # Detect browser force-dark and set html class | |
| DETECT_FORCE_DARK = '''<script> | |
| (function(){ | |
| try{ | |
| const html=document.documentElement; | |
| const cs=getComputedStyle(html); | |
| const hasGlobalFilter=(cs.filter && cs.filter!=="none"); | |
| const darkReaderOn=!!document.querySelector('style#dark-reader-style')||!!document.querySelector('meta[name="darkreader"]'); | |
| if(hasGlobalFilter||darkReaderOn){ html.classList.add('force-dark-fix'); } | |
| }catch(e){} | |
| })(); | |
| </script>''' | |
| from ai_service import design_mahjong_game | |
| from reference_retriever import match_variants_in_text, load_variant_md | |
| # 优先尝试原生流式;没有则自动回退为伪流式 | |
| try: | |
| from ai_service import design_mahjong_game_stream | |
| except Exception: | |
| design_mahjong_game_stream = None | |
| from output_validator import validate_mahjong_response, format_issues_for_llm | |
| from design_state import ( | |
| extract_design_state, | |
| extract_ready_to_generate, | |
| summarize_design_state, | |
| diff_keys, | |
| diff_mechanics, | |
| is_change_within_scope, | |
| generate_diff_summary, | |
| generate_diff_summary_compact, | |
| # 版本历史相关 | |
| create_empty_history, | |
| add_to_history, | |
| rollback_history, | |
| get_history_choices, | |
| parse_version_from_choice, | |
| # 多阶段交互相关 | |
| InteractionPhase, | |
| extract_proposals, | |
| extract_clarify_questions, | |
| detect_interaction_phase, | |
| format_proposals_for_display, | |
| generate_phase_prompt_hint, | |
| create_phase_state, | |
| update_phase_state, | |
| get_phase_display_name, | |
| # 细粒度范围控制相关 | |
| CONTROLLABLE_FIELDS, | |
| ScopeConstraint, | |
| create_scope_config, | |
| get_scope_preset_options, | |
| validate_scope_compliance, | |
| format_scope_violations, | |
| get_mechanics_from_state, | |
| generate_scope_prompt_hint, | |
| ) | |
| # ====== 🔧 Hotfix: 兼容 gradio_client 对 additionalProperties(bool) 的解析 ====== | |
| try: | |
| import gradio_client.utils as _gc_utils | |
| _orig_get_type = _gc_utils.get_type | |
| def _safe_get_type(schema): | |
| if isinstance(schema, bool): | |
| return "any" if schema else "never" | |
| return _orig_get_type(schema) | |
| _gc_utils.get_type = _safe_get_type | |
| _orig_json2py = _gc_utils._json_schema_to_python_type | |
| def _safe_json2py(schema, defs): | |
| if isinstance(schema, bool): | |
| return "any" if schema else "never" | |
| if isinstance(schema, dict) and "additionalProperties" in schema: | |
| ap = schema["additionalProperties"] | |
| if isinstance(ap, bool): | |
| inner = "any" if ap else "never" | |
| return f"dict[str, {inner}]" | |
| return _orig_json2py(schema, defs) | |
| _gc_utils._json_schema_to_python_type = _safe_json2py | |
| _orig_json_schema_to_python_type = _gc_utils.json_schema_to_python_type | |
| def _safe_json_schema_to_python_type(schema): | |
| if isinstance(schema, bool): | |
| return "any" if schema else "never" | |
| return _orig_json_schema_to_python_type(schema) | |
| _gc_utils.json_schema_to_python_type = _safe_json_schema_to_python_type | |
| except Exception: | |
| pass | |
| # ====== 🔧 Hotfix 结束 ====== | |
| # ==================== 工具函数 ==================== | |
| def _messages_to_tuples(history): | |
| """ | |
| 将 Chatbot 的 messages 格式([{role, content}, ...])转换为 [(user, bot), ...]。 | |
| 若已是 tuples,则原样返回。 | |
| """ | |
| if not history: | |
| return [] | |
| if isinstance(history, list) and history and isinstance(history[0], dict): | |
| pairs = [] | |
| last_user = None | |
| for msg in history: | |
| role = msg.get("role") | |
| content = msg.get("content", "") | |
| if role == "user": | |
| last_user = content | |
| elif role == "assistant": | |
| pairs.append((last_user or "", content)) | |
| last_user = None | |
| return pairs | |
| return history | |
| def _chunk_fake_stream(text, step=40): | |
| """把整段文本切成小块,伪流式输出。""" | |
| s = str(text or "") | |
| for i in range(0, len(s), step): | |
| yield s[i:i + step] | |
| def _is_design_intent(text: str) -> bool: | |
| """判断是否明确进入“新玩法设计/融合/生成”路径。""" | |
| s = (text or "").strip() | |
| if not s: | |
| return False | |
| keywords = [ | |
| "设计", "生成", "做一个", "做个", "新玩法", "创新", "融合", "组合", "改造", "迭代", | |
| "玩法设计", "规则设计", "番型设计", "出一套", "给我一套", | |
| ] | |
| return any(k in s for k in keywords) | |
| def _is_rules_query(text: str) -> bool: | |
| """判断是否为“询问现有玩法细节/规则”的问题。""" | |
| s = (text or "").strip() | |
| if not s: | |
| return False | |
| keywords = [ | |
| "规则", "玩法", "番型", "细节", "说明", "机制", "怎么打", "怎么胡", "如何胡", "计分", "流程", | |
| "介绍", "讲讲", "是什么", "有哪些", | |
| ] | |
| return any(k in s for k in keywords) | |
| def _normalize_question(text: str) -> str: | |
| s = str(text or "").strip().lower() | |
| if not s: | |
| return "" | |
| punct = "??!!。;;::、,,..()()[]【】<>《》\"'“”‘’ " | |
| for ch in punct: | |
| s = s.replace(ch, "") | |
| return s | |
| def _merge_asked_questions(existing, new_items): | |
| merged = [str(q).strip() for q in (existing or []) if str(q).strip()] | |
| seen = {_normalize_question(q) for q in merged if _normalize_question(q)} | |
| for q in (new_items or []): | |
| q_text = str(q).strip() | |
| if not q_text: | |
| continue | |
| key = _normalize_question(q_text) | |
| if key and key not in seen: | |
| merged.append(q_text) | |
| seen.add(key) | |
| return merged | |
| def _load_base64_image(path): | |
| """读取本地图片并返回 data URI""" | |
| import base64 | |
| import pathlib | |
| p = pathlib.Path(path) | |
| if not p.exists(): | |
| return "" | |
| try: | |
| data = base64.b64encode(p.read_bytes()).decode("ascii") | |
| suffix = p.suffix.lower() | |
| mime = "image/png" if suffix == ".png" else "image/jpeg" | |
| return f"data:{mime};base64,{data}" | |
| except Exception: | |
| return "" | |
| HERO_TILE_PATHS = ["UI00001.jpg", "UI00002.jpg", "UI00003.jpg", "UI00004.jpg"] | |
| HERO_TILE_IMAGES = [_load_base64_image(p) for p in HERO_TILE_PATHS] | |
| def _render_hero_tiles(): | |
| tiles = [ | |
| f'<span class="hero-photo" style="background-image:url(\'{src}\');" title="参考图 {idx + 1}"></span>' | |
| for idx, src in enumerate(HERO_TILE_IMAGES) | |
| if src | |
| ] | |
| return "".join(tiles) | |
| def clear_cache(): | |
| """清空缓存""" | |
| from cache_manager import file_cache, request_cache | |
| file_cache.clear() | |
| request_cache.clear() | |
| return "✅ 缓存已清空" | |
| def clear_files(): | |
| """清空文件上传""" | |
| return None | |
| def render_validation_md(text: str) -> str: | |
| issues = validate_mahjong_response(text or "") | |
| if not issues: | |
| return "✅ 输出校验:未发现静态问题(可导出)" | |
| errors = [i for i in issues if i.get("level") == "error"] | |
| warnings = [i for i in issues if i.get("level") == "warning"] | |
| parts = ["⚠️ 输出校验:发现潜在问题(建议先让模型按最小修改修复后再导出)"] | |
| if errors: | |
| parts.append("\n**错误(必须修复)**\n") | |
| parts.append(format_issues_for_llm(errors)) | |
| if warnings: | |
| parts.append("\n**警告(建议修复)**\n") | |
| parts.append(format_issues_for_llm(warnings)) | |
| return "\n".join(parts).strip() | |
| def update_file_status(files): | |
| """更新文件状态显示""" | |
| if not files: | |
| return "📁 文件状态:未上传" | |
| file_list = files if isinstance(files, (list, tuple)) else [files] | |
| names = [] | |
| for f in file_list: | |
| if isinstance(f, str): | |
| names.append(os.path.basename(f)) | |
| else: | |
| names.append(os.path.basename(getattr(f, "name", str(f)))) | |
| head = "\n".join(f" • {n}" for n in names[:3]) | |
| tail = "\n ..." if len(names) > 3 else "" | |
| return f"📁 文件状态:已上传 {len(names)} 个文件\n{head}{tail}" | |
| def export_history_to_markdown(history): | |
| """将聊天历史导出为 Markdown 文件,并返回文件路径供下载""" | |
| import time | |
| import pathlib | |
| from datetime import datetime | |
| pairs = _messages_to_tuples(history) | |
| lines = ["# 对话记录", ""] | |
| lines.append(f"- 导出时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
| lines.append("") | |
| for idx, (user_msg, bot_msg) in enumerate(pairs, start=1): | |
| lines.append(f"## 轮次 {idx}") | |
| if user_msg: | |
| lines.append("**用户**:\n") | |
| lines.append(user_msg) | |
| lines.append("") | |
| if bot_msg: | |
| lines.append("**助手**:\n") | |
| lines.append(bot_msg) | |
| lines.append("") | |
| content = "\n".join(lines).strip() + "\n" | |
| export_dir = pathlib.Path("exports") | |
| export_dir.mkdir(parents=True, exist_ok=True) | |
| filename = export_dir / f"chat_history_{int(time.time())}.md" | |
| with open(filename, "w", encoding="utf-8") as f: | |
| f.write(content) | |
| return str(filename) | |
| # ==================== 新增:提取 GDL 和自然语言描述 ==================== | |
| def extract_gdl_and_narrative(content): | |
| """提取 GDL 和自然语言部分(支持多种格式变体)""" | |
| import re | |
| # 定义多种可能的标记格式(按优先级排序,支持更多变体) | |
| gdl_patterns = [ | |
| r"##\s*m?GDL\s*描述", # ## GDL描述 / ## mGDL 描述 | |
| r"m?GDL\s*描述", # GDL描述 / mGDL 描述 | |
| r"##\s*GDL\s*Description", # ## GDL Description(英文) | |
| r"##\s*m?GDL", # ## GDL / ## mGDL | |
| r"m?GDL\s*规则代码", # mGDL 规则代码 | |
| r"m?GDL\s*规则", # mGDL 规则 | |
| ] | |
| narrative_patterns = [ | |
| r"##\s*自然语言规则说明", # ## 自然语言规则说明 | |
| r"##\s*自然语言规则", # ## 自然语言规则 | |
| r"自然语言规则说明", # 自然语言规则说明 | |
| r"自然语言规则", # 自然语言规则 | |
| r"##\s*规则说明", # ## 规则说明 | |
| r"规则说明", # 规则说明 | |
| ] | |
| # 尝试查找 GDL 部分(支持大小写不敏感) | |
| gdl_start = -1 | |
| for pattern in gdl_patterns: | |
| match = re.search(pattern, content, re.IGNORECASE) | |
| if match: | |
| gdl_start = match.start() | |
| break | |
| # 尝试查找自然语言部分(支持大小写不敏感) | |
| narrative_start = -1 | |
| for pattern in narrative_patterns: | |
| match = re.search(pattern, content, re.IGNORECASE) | |
| if match: | |
| narrative_start = match.start() | |
| break | |
| if gdl_start != -1 and narrative_start != -1: | |
| # 确保顺序正确(GDL应该在自然语言之前) | |
| if gdl_start >= narrative_start: | |
| gdl_start, narrative_start = narrative_start, gdl_start | |
| # 获取 GDL 部分(从标记开始到自然语言标记之前) | |
| gdl_content = content[gdl_start:narrative_start].strip() | |
| # 获取自然语言部分(从标记开始到结尾) | |
| narrative_content = content[narrative_start:].strip() | |
| return gdl_content, narrative_content | |
| elif gdl_start != -1: | |
| # 只找到GDL,将后面全部作为GDL | |
| print(f"⚠️ 仅找到GDL标记,将其后内容作为GDL") | |
| return content[gdl_start:].strip(), "" | |
| elif narrative_start != -1: | |
| # 只找到自然语言,将后面全部作为自然语言 | |
| print(f"⚠️ 仅找到自然语言标记,将其后内容作为自然语言") | |
| return "", content[narrative_start:].strip() | |
| else: | |
| # 都没找到,返回空 | |
| print(f"⚠️ 提取警告: 未找到GDL和自然语言标记") | |
| return "", "" | |
| def save_gdl_and_narrative(gdl_content, narrative_content): | |
| """保存 GDL 和自然语言内容到文件""" | |
| # 定义文件存储路径 | |
| export_dir = os.path.join("exports") | |
| os.makedirs(export_dir, exist_ok=True) | |
| gdl_file_path = os.path.join(export_dir, "gdl_output.txt") | |
| narrative_file_path = os.path.join(export_dir, "narrative_output.txt") | |
| # 保存 GDL | |
| with open(gdl_file_path, "w", encoding="utf-8") as f: | |
| f.write(gdl_content) | |
| # 保存自然语言 | |
| with open(narrative_file_path, "w", encoding="utf-8") as f: | |
| f.write(narrative_content) | |
| return gdl_file_path, narrative_file_path | |
| def extract_design_log(content): | |
| """ | |
| 提取"设计日志(创新推演摘要)"段落,用于单独导出。 | |
| 规则:从标题行开始,截到下一个同级标题(###)或文件结尾。 | |
| """ | |
| import re | |
| if not content: | |
| return "" | |
| # 修复:原始字符串中 \s 即可匹配空白,不需要 \\s | |
| m = re.search(r"^###\s*设计日志(创新推演摘要)\s*$", content, re.MULTILINE) | |
| if not m: | |
| # 兼容变体格式:设计日志/思维日志/Design Log | |
| m = re.search(r"^###\s*(设计日志|思维日志|Design\s*Log).*$", content, re.MULTILINE | re.IGNORECASE) | |
| if not m: | |
| return "" | |
| start = m.start() | |
| m2 = re.search(r"^###\s+.+$", content[m.end():], re.MULTILINE) | |
| end = (m.end() + m2.start()) if m2 else len(content) | |
| return content[start:end].strip() | |
| def save_design_log(design_log_content): | |
| export_dir = os.path.join("exports") | |
| os.makedirs(export_dir, exist_ok=True) | |
| design_log_file_path = os.path.join(export_dir, "design_log_output.txt") | |
| with open(design_log_file_path, "w", encoding="utf-8") as f: | |
| f.write(design_log_content or "") | |
| return design_log_file_path | |
| # ==================== Gradio 界面(Mahjong Skin) ==================== | |
| with gr.Blocks( | |
| theme=gr.themes.Soft(primary_hue='green', neutral_hue='slate'), | |
| title=APP_TITLE, | |
| css=MAHJONG_THEME_CSS + EXTRA_FIX_CSS, | |
| elem_id='app-root' | |
| ) as demo: | |
| gr.HTML(FORCE_DARK_REDIRECT) | |
| gr.HTML(DETECT_FORCE_DARK) | |
| # 顶部 Hero | |
| hero_tiles_html = _render_hero_tiles() | |
| gr.HTML(f""" | |
| <div class="hero"> | |
| <div style="display:flex;align-items:center;gap:12px;"> | |
| <div style="font-size:30px;">🀄️ 麻将玩法灵感工坊 · KOI Lab</div> | |
| </div> | |
| <div style="margin-top:6px;opacity:.92;">与 AI 麻将策划智囊对话,快速生成番型体系与玩法流程</div> | |
| <div class="hero-icons" aria-hidden="true"> | |
| {hero_tiles_html or '<span class="hero-photo"></span>'} | |
| </div> | |
| </div> | |
| """) | |
| with gr.Row(equal_height=True): | |
| # 左侧:设置区 | |
| with gr.Column(scale=1, min_width=MIN_WIDTH_LEFT): | |
| with gr.Group(elem_classes="side-card"): | |
| gr.Markdown("### 输入与约束") | |
| gr.Markdown("✅ **系统已预加载 Mahjong GDL 规范与提示词**,可直接开局。", elem_classes="hint") | |
| file_uploader = gr.File( | |
| label="上传自定义 Mahjong GDL/番型示例(.txt,可多选)", | |
| file_types=[".txt"], | |
| file_count="multiple", | |
| type="filepath", | |
| interactive=True, | |
| show_label=True, | |
| container=True, | |
| scale=1 | |
| ) | |
| custom_prompt_box = gr.Textbox( | |
| label="自定义 System Prompt(可选)", | |
| placeholder="系统已经载入麻将玩法专家提示词。如需特殊规则,可在此粘贴。", | |
| lines=10, | |
| max_lines=20, | |
| show_copy_button=True | |
| ) | |
| prompt_mode = gr.Radio( | |
| choices=["覆盖默认SYSTEM_PROMPT", "合并(默认在前)"], | |
| value="覆盖默认SYSTEM_PROMPT", | |
| label="自定义 Prompt 的使用方式" | |
| ) | |
| gr.Markdown("💡 提示:系统默认包含麻将通用语法、番型模板与 Prompt;仅当你要替换地方规则时再上传或输入。", elem_classes="hint") | |
| with gr.Group(elem_classes="side-card"): | |
| gr.Markdown("### 使用说明") | |
| gr.Markdown( | |
| "- ✅ **快速开始**:直接描述你想要的麻将玩法主题或目标。\n" | |
| "- 📁 **可选上传**:导入地方番型表或自定义 GDL 模板以便引用。\n" | |
| "- ✏️ **可选自定义**:针对特殊风格重写 System Prompt。\n" | |
| "- 🔄 **清空缓存**:切换规则后建议刷新缓存,避免残留设定。" | |
| ) | |
| with gr.Group(elem_classes="side-card"): | |
| gr.Markdown("### 快捷操作") | |
| with gr.Row(): | |
| clear_cache_btn = gr.Button("清空缓存", variant="secondary") | |
| clear_files_btn = gr.Button("清空文件", variant="secondary") | |
| cache_status = gr.Markdown("💾 缓存状态:正常", elem_classes="hint") | |
| file_status = gr.Markdown("📁 文件状态:使用默认麻将配置(GDL + Prompt 已预加载)", elem_classes="hint") | |
| # 右侧:聊天区(手动事件绑定 + 流式输出) | |
| with gr.Column(scale=2, min_width=MIN_WIDTH_RIGHT): | |
| with gr.Group(elem_classes="table"): | |
| chatbot = gr.Chatbot( | |
| height=CHATBOT_HEIGHT, | |
| type="messages", | |
| elem_classes="custom-chatbot", | |
| avatar_images=("landlord.png", "bot.png"), | |
| ) | |
| # 用 State 保存 messages 历史 | |
| chat_state = gr.State([]) # list[dict]: [{"role":"user","content":...}, {"role":"assistant","content":...}] | |
| gdl_file_path = gr.State("") | |
| narrative_file_path = gr.State("") | |
| design_log_file_path = gr.State("") | |
| design_state_raw = gr.State("") # 上一次可机读 DesignState(JSON) 原文 | |
| design_state_obj = gr.State({}) # 解析后的 dict(用于显示与对比) | |
| design_state_version = gr.State(0) # 版本号(每次成功提取 +1) | |
| ready_to_generate = gr.State(False) | |
| design_state_history = gr.State(create_empty_history()) # 版本历史 | |
| interaction_phase_state = gr.State(create_phase_state()) # 多阶段交互状态 | |
| asked_questions_state = gr.State([]) # Analyse 模式:已问过的问题 | |
| with gr.Group(elem_classes="side-card"): | |
| gr.Markdown("### 可控迭代(推荐)") | |
| iterative_mode = gr.Checkbox( | |
| label="启用可控迭代:基于上轮 DesignState 做最小修改", | |
| value=True, | |
| ) | |
| analyse_mode = gr.Checkbox( | |
| label="Analyse 模式:单问题迭代完善需求(不生成mGDL)", | |
| value=False, | |
| ) | |
| ready_status = gr.Markdown("READY_TO_GENERATE:未知", elem_classes="hint") | |
| iteration_scope = gr.Dropdown( | |
| label="本轮允许修改范围(预设)", | |
| choices=get_scope_preset_options(), | |
| value="仅优化创新机制", | |
| ) | |
| # 细粒度范围控制(新增) | |
| with gr.Accordion("🔒 细粒度范围控制", open=False) as scope_accordion: | |
| scope_mode = gr.Radio( | |
| choices=["预设模式", "自定义模式"], | |
| value="预设模式", | |
| label="控制模式", | |
| ) | |
| constraint_level = gr.Radio( | |
| choices=["软约束(提示)", "硬约束(阻断)"], | |
| value="软约束(提示)", | |
| label="约束级别", | |
| ) | |
| with gr.Group(visible=False) as custom_scope_group: | |
| gr.Markdown("**锁定字段**(选中的字段不允许修改)", elem_classes="hint") | |
| locked_fields = gr.CheckboxGroup( | |
| choices=[ | |
| ("玩法名称", "new_variant_name"), | |
| ("底座玩法", "base_variants"), | |
| ("融合玩法", "fusion_variants"), | |
| ("游戏模式", "game_variant"), | |
| ("玩家人数", "players"), | |
| ("计分模式", "scoring_mode"), | |
| ("牌组配置", "tileset"), | |
| ], | |
| value=[], | |
| label="", | |
| ) | |
| gr.Markdown("**锁定机制**(选中的机制不允许修改)", elem_classes="hint") | |
| locked_mechanics = gr.CheckboxGroup( | |
| choices=[], | |
| value=[], | |
| label="", | |
| ) | |
| refresh_mechanics_btn = gr.Button("刷新机制列表", size="sm") | |
| scope_status = gr.Markdown("_范围约束: 预设模式_", elem_classes="hint") | |
| # 保存范围配置的 State | |
| scope_config_state = gr.State(create_scope_config()) | |
| generate_from_state_btn = gr.Button("基于当前 DesignState 生成完整玩法", variant="primary") | |
| design_state_status = gr.Markdown("DesignState:未建立(先生成一版玩法)", elem_classes="hint") | |
| # 差异可视化区域(新增) | |
| with gr.Accordion("📊 DesignState 变更详情", open=False) as diff_accordion: | |
| diff_summary_display = gr.Markdown("_尚无变更记录_", elem_classes="diff-summary") | |
| # 版本历史与回滚(新增) | |
| with gr.Accordion("🕐 版本历史与回滚", open=False) as history_accordion: | |
| version_dropdown = gr.Dropdown( | |
| label="选择版本", | |
| choices=[], | |
| value=None, | |
| interactive=True, | |
| ) | |
| with gr.Row(): | |
| rollback_btn = gr.Button("回滚到所选版本", variant="secondary", size="sm") | |
| refresh_history_btn = gr.Button("刷新列表", variant="secondary", size="sm") | |
| history_status = gr.Markdown("_无版本历史_", elem_classes="hint") | |
| # 多阶段交互与方案选择(新增) | |
| with gr.Accordion("🎯 方案选择与交互引导", open=True) as phase_accordion: | |
| phase_status = gr.Markdown("**当前阶段**: 🚀 初始", elem_classes="hint") | |
| proposals_display = gr.Markdown("_等待生成方案..._", visible=False) | |
| with gr.Row(visible=False) as proposal_select_row: | |
| proposal_dropdown = gr.Dropdown( | |
| label="选择方案", | |
| choices=[], | |
| value=None, | |
| interactive=True, | |
| scale=3, | |
| ) | |
| select_proposal_btn = gr.Button("确认选择", variant="primary", size="sm", scale=1) | |
| with gr.Row(visible=False) as diverge_action_row: | |
| request_more_btn = gr.Button("🔄 重新发散", variant="secondary", size="sm") | |
| request_elaborate_btn = gr.Button("📝 直接深入展开", variant="secondary", size="sm") | |
| validation_status = gr.Markdown("输出校验:未运行", elem_classes="hint") | |
| user_input = gr.Textbox( | |
| placeholder="例如:为4人竞速推倒胡设计番型与流程;或“做一个双人合作麻将 roguelike” …", | |
| show_label=False, | |
| max_lines=5, | |
| lines=3, | |
| show_copy_button=True, | |
| autofocus=True, | |
| ) | |
| with gr.Row(): | |
| send_btn = gr.Button("发送", variant="primary") | |
| stop_info = gr.Markdown("", visible=False) | |
| # ====== 核心:流式提交回调(生成器) ====== | |
| #此函数为工具核心,处理用户发送消息的完整流程(模式判断-模型调用-解析与更新) | |
| def on_submit( | |
| user_text, | |
| history_msgs, | |
| files, | |
| custom_prompt, | |
| mode, | |
| iterative_enabled, | |
| analyse_enabled, | |
| scope, | |
| ds_raw_prev, | |
| ds_obj_prev, | |
| ds_ver_prev, | |
| ready_prev, | |
| history_data, | |
| asked_questions, | |
| ): | |
| user_text = (user_text or "").strip() | |
| diff_md_prev = "_尚无变更记录_" # 保留上次的差异摘要 | |
| cur_choices = get_history_choices(history_data) | |
| asked_questions = asked_questions or [] | |
| if not user_text: | |
| # 不提交空消息:输出不变 | |
| yield history_msgs, "", history_msgs, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:未变更", ready_prev, "READY_TO_GENERATE:未知", "输出校验:未运行", diff_md_prev, history_data, gr.update(choices=cur_choices), "_无版本历史_" if not cur_choices else f"共 {len(cur_choices)} 个版本", asked_questions | |
| return | |
| # 立即显示"用户消息" | |
| history = list(history_msgs or []) | |
| history.append({"role": "user", "content": user_text}) | |
| yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:处理中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_处理中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions | |
| # 添加空的助手气泡,用于逐步填充 | |
| history.append({"role": "assistant", "content": ""}) | |
| yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:处理中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_处理中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions | |
| # ====== 现有玩法细节查询:直接返回本地玩法库内容 ====== | |
| if (not analyse_enabled) and (not iterative_enabled) and _is_rules_query(user_text) and (not _is_design_intent(user_text)): | |
| matched = match_variants_in_text(user_text) | |
| if matched: | |
| parts = [] | |
| for name in matched: | |
| md_text = (load_variant_md(name) or "").strip() | |
| if md_text: | |
| parts.append(f"### {name}\n\n{md_text}") | |
| else: | |
| parts.append(f"### {name}\n\n(未找到该玩法的本地规则文件)") | |
| history[-1]["content"] = "\n\n---\n\n".join(parts).strip() | |
| yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:未变更", ready_prev, "READY_TO_GENERATE:未知", "输出校验:未运行", "_未生成设计_", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions | |
| return | |
| # 流式生成内容 | |
| tuples_hist = _messages_to_tuples(history) | |
| # 可控迭代:基于上轮 DesignState 做“最小修改”,并尽量减少历史噪声 | |
| iterative_enabled = bool(iterative_enabled) | |
| analyse_enabled = bool(analyse_enabled) | |
| scope = (scope or "").strip() | |
| effective_user_text = user_text | |
| tuples_to_send = tuples_hist | |
| status_hint = "DesignState:未变更" | |
| ready_to_gen = ready_prev if isinstance(ready_prev, bool) else False | |
| ready_md = "READY_TO_GENERATE:未知" | |
| if analyse_enabled: | |
| tuples_to_send = [] # Analyse 模式不吃长history,避免漂移 | |
| asked_block = "" | |
| if asked_questions: | |
| asked_block = "已问过的问题(禁止重复):\n" + "\n".join( | |
| f"- {q}" for q in asked_questions[:20] | |
| ) + "\n\n" | |
| effective_user_text = ( | |
| "<ANALYSE_MODE>true</ANALYSE_MODE>\n" | |
| "你正在进行【Analyse 模式】(单问题迭代)。\n" | |
| "目标:通过多轮问答把需求场景收敛到可生成完整玩法。\n" | |
| "限制:本轮禁止输出 mGDL/完整规则/自检报告。\n\n" | |
| "{asked}" | |
| "当前已有 DesignState(如为空表示尚未建立初版):\n" | |
| "<DESIGN_STATE_JSON>\n{ds}\n</DESIGN_STATE_JSON>\n\n" | |
| "用户本轮补充信息:\n{req}\n" | |
| "要求:本轮只能问 1 个问题;不得重复已问过的问题;若信息已足够则直接 READY_TO_GENERATE: true。\n" | |
| ).format(asked=asked_block, ds=(ds_raw_prev or "{}").strip(), req=user_text) | |
| status_hint = "DesignState:Analyse模式中…" | |
| elif iterative_enabled and (ds_raw_prev or "").strip(): | |
| tuples_to_send = [] # 用“当前 DesignState”替代长历史,减少漂移 | |
| effective_user_text = ( | |
| "你正在进行【可控迭代】。请基于下方 DesignState(JSON) 对方案做【最小修改】。\n" | |
| "本轮允许修改范围:{scope}\n" | |
| "硬约束:\n" | |
| "1) 除允许范围外的字段一律保持不变;若必须改动范围外字段,先提出澄清问题并停止输出 mGDL。\n" | |
| "2) 必须输出新的 DesignState(JSON)(合法JSON、无注释)+ 变更摘要(列出改动点)。\n" | |
| "3) 仍需按 m_prompt 输出完整结果(规则 + mGDL + 自检)。\n\n" | |
| "<DESIGN_STATE_JSON>\n{ds}\n</DESIGN_STATE_JSON>\n\n" | |
| "【用户本轮优化要求】\n{req}\n" | |
| ).format(scope=scope or "自由迭代(仍保最小修改)", ds=ds_raw_prev.strip(), req=user_text) | |
| status_hint = "DesignState:可控迭代中…" | |
| if design_mahjong_game_stream is not None: | |
| try: | |
| for piece in design_mahjong_game_stream(effective_user_text, tuples_to_send, files, custom_prompt, mode): | |
| if not piece: | |
| continue | |
| history[-1]["content"] += str(piece) | |
| yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, status_hint, ready_to_gen, ready_md, "输出校验:运行中…", "_处理中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions | |
| except Exception as e: | |
| history[-1]["content"] += f"\n(流式出错){type(e).__name__}: {e}" | |
| yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:流式出错(未变更)", ready_to_gen, "READY_TO_GENERATE:未知", "输出校验:未运行", "_处理出错_", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions | |
| else: | |
| try: | |
| full = design_mahjong_game(effective_user_text, tuples_to_send, files, custom_prompt, mode) | |
| except Exception as e: | |
| full = f"(出错){type(e).__name__}: {e}" | |
| for piece in _chunk_fake_stream(str(full), step=40): | |
| history[-1]["content"] += piece | |
| yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, status_hint, ready_to_gen, ready_md, "输出校验:运行中…", "_处理中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions | |
| # 提取 GDL 和自然语言描述并保存 | |
| new_ds_obj, new_ds_raw = extract_design_state(history[-1]["content"]) | |
| ready_flag = extract_ready_to_generate(history[-1]["content"]) | |
| ds_ver = int(ds_ver_prev or 0) | |
| ds_status = "DesignState:未变更" | |
| diff_summary_md = "_无变更_" | |
| updated_history_data = history_data | |
| if new_ds_obj and new_ds_raw: | |
| ds_ver = ds_ver + 1 | |
| ds_status = "DesignState:v{0} 已更新 | {1}".format(ds_ver, summarize_design_state(new_ds_obj)) | |
| if isinstance(ready_flag, bool): | |
| ready_to_gen = ready_flag | |
| ready_md = "READY_TO_GENERATE:{0}".format("true" if ready_flag else "false") | |
| # 生成差异摘要 | |
| diff_summary_md = generate_diff_summary( | |
| ds_obj_prev if isinstance(ds_obj_prev, dict) else None, | |
| new_ds_obj, | |
| int(ds_ver_prev or 0), | |
| ds_ver | |
| ) | |
| # 添加到版本历史 | |
| compact_summary = generate_diff_summary_compact(ds_obj_prev if isinstance(ds_obj_prev, dict) else None, new_ds_obj) | |
| updated_history_data = add_to_history(history_data, ds_ver, new_ds_obj, new_ds_raw, compact_summary) | |
| # 可控迭代:做一个"范围越界"软检查(不阻断,只提示) | |
| if iterative_enabled and ds_obj_prev and isinstance(ds_obj_prev, dict) and scope: | |
| changed = diff_keys(ds_obj_prev, new_ds_obj) | |
| mech_changes = diff_mechanics(ds_obj_prev, new_ds_obj) | |
| if not is_change_within_scope(changed, scope): | |
| history[-1]["content"] += ( | |
| "\n\n---\n⚠️ 可控迭代检查:检测到可能超出允许范围的改动。\n" | |
| "允许范围:{scope}\n" | |
| "顶层变更字段:{keys}\n" | |
| "机制变更:{mech}\n" | |
| "建议:请重新执行一次迭代,明确只改允许范围内字段,或切换为「自由迭代」。\n" | |
| ).format( | |
| scope=scope, | |
| keys=",".join(changed) if changed else "(无)", | |
| mech="; ".join(mech_changes) if mech_changes else "(无)", | |
| ) | |
| try: | |
| # 更新版本下拉选项 | |
| new_choices = get_history_choices(updated_history_data) | |
| history_status_md = f"共 {len(new_choices)} 个版本" if new_choices else "_无版本历史_" | |
| # Analyse 模式不产出完整规则,不导出GDL/自然语言文件 | |
| if analyse_enabled: | |
| new_questions = extract_clarify_questions(history[-1]["content"]) | |
| asked_questions = _merge_asked_questions(asked_questions, new_questions) | |
| yield history, "", history, "", "", "", (new_ds_raw or ds_raw_prev), (new_ds_obj or ds_obj_prev), ds_ver, ds_status, ready_to_gen, ready_md, "输出校验:Analyse 模式跳过", diff_summary_md, updated_history_data, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), history_status_md, asked_questions | |
| return | |
| gdl_content, narrative_content = extract_gdl_and_narrative(history[-1]["content"]) | |
| design_log_content = extract_design_log(history[-1]["content"]) | |
| # 保存 GDL 和自然语言文件 | |
| gdl_path, narrative_path = save_gdl_and_narrative(gdl_content, narrative_content) | |
| design_log_path = save_design_log(design_log_content) | |
| # 返回文件路径,以便下载 | |
| yield history, "", history, gdl_path, narrative_path, design_log_path, (new_ds_raw or ds_raw_prev), (new_ds_obj or ds_obj_prev), ds_ver, ds_status, ready_to_gen, ready_md, render_validation_md(history[-1]["content"]), diff_summary_md, updated_history_data, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), history_status_md, asked_questions | |
| except Exception as e: | |
| print(f"保存GDL和自然语言文件时出错: {e}") | |
| yield history, "", history, "", "", "", (new_ds_raw or ds_raw_prev), (new_ds_obj or ds_obj_prev), ds_ver, ds_status, ready_to_gen, ready_md, render_validation_md(history[-1]["content"]), diff_summary_md, updated_history_data, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), history_status_md, asked_questions | |
| def on_generate_from_state( | |
| history_msgs, | |
| files, | |
| custom_prompt, | |
| mode, | |
| ds_raw_prev, | |
| ds_obj_prev, | |
| ds_ver_prev, | |
| ready_prev, | |
| history_data, | |
| ): | |
| cur_choices = get_history_choices(history_data) | |
| if not (ds_raw_prev or "").strip(): | |
| yield history_msgs, history_msgs, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:未建立(先生成一版玩法)", ready_prev, "READY_TO_GENERATE:未知", "输出校验:未运行", "_尚无变更记录_", history_data, gr.update(choices=cur_choices), "_无版本历史_" if not cur_choices else f"共 {len(cur_choices)} 个版本" | |
| return | |
| history = list(history_msgs or []) | |
| user_text = "基于当前 DesignState 生成完整玩法(自然语言规则 + mGDL + 自检报告)。" | |
| history.append({"role": "user", "content": user_text}) | |
| yield history, history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:生成中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_生成中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_" | |
| history.append({"role": "assistant", "content": ""}) | |
| yield history, history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:生成中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_生成中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_" | |
| effective_user_text = ( | |
| "请基于下方 DesignState(JSON) 生成完整的麻将新玩法交付物:\n" | |
| "1) 自然语言规则说明(含机制声明表)\n" | |
| "2) 完整 mGDL v1.3(按 m_prompt 格式)\n" | |
| "3) 自检报告\n" | |
| "要求:保持 DesignState 的核心设定不漂移;若发现 DesignState 信息不足,请先提出澄清问题并停止输出 mGDL。\n\n" | |
| "<DESIGN_STATE_JSON>\n{ds}\n</DESIGN_STATE_JSON>\n" | |
| ).format(ds=ds_raw_prev.strip()) | |
| tuples_to_send = [] | |
| buf = [] | |
| try: | |
| if design_mahjong_game_stream is not None: | |
| for piece in design_mahjong_game_stream(effective_user_text, tuples_to_send, files, custom_prompt, mode): | |
| if not piece: | |
| continue | |
| s = str(piece) | |
| buf.append(s) | |
| history[-1]["content"] += s | |
| yield history, history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:生成中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_生成中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_" | |
| else: | |
| full = design_mahjong_game(effective_user_text, tuples_to_send, files, custom_prompt, mode) | |
| buf.append(str(full)) | |
| for piece in _chunk_fake_stream(str(full), step=40): | |
| history[-1]["content"] += piece | |
| yield history, history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:生成中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_生成中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_" | |
| except Exception as e: | |
| history[-1]["content"] += f"\n(生成出错){type(e).__name__}: {e}" | |
| yield history, history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:生成出错", ready_prev, "READY_TO_GENERATE:未知", "输出校验:未运行", "_生成出错_", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_" | |
| return | |
| new_ds_obj, new_ds_raw = extract_design_state(history[-1]["content"]) | |
| ds_ver = int(ds_ver_prev or 0) | |
| ds_status = "DesignState:未变更" | |
| diff_summary_md = "_无变更_" | |
| updated_history_data = history_data | |
| if new_ds_obj and new_ds_raw: | |
| ds_ver += 1 | |
| ds_status = "DesignState:v{0} 已更新 | {1}".format(ds_ver, summarize_design_state(new_ds_obj)) | |
| # 生成差异摘要 | |
| diff_summary_md = generate_diff_summary( | |
| ds_obj_prev if isinstance(ds_obj_prev, dict) else None, | |
| new_ds_obj, | |
| int(ds_ver_prev or 0), | |
| ds_ver | |
| ) | |
| # 添加到版本历史 | |
| compact_summary = generate_diff_summary_compact(ds_obj_prev if isinstance(ds_obj_prev, dict) else None, new_ds_obj) | |
| updated_history_data = add_to_history(history_data, ds_ver, new_ds_obj, new_ds_raw, compact_summary) | |
| # 更新版本下拉选项 | |
| new_choices = get_history_choices(updated_history_data) | |
| history_status_md = f"共 {len(new_choices)} 个版本" if new_choices else "_无版本历史_" | |
| try: | |
| gdl_content, narrative_content = extract_gdl_and_narrative(history[-1]["content"]) | |
| design_log_content = extract_design_log(history[-1]["content"]) | |
| gdl_path, narrative_path = save_gdl_and_narrative(gdl_content, narrative_content) | |
| design_log_path = save_design_log(design_log_content) | |
| yield history, history, gdl_path, narrative_path, design_log_path, (new_ds_raw or ds_raw_prev), (new_ds_obj or ds_obj_prev), ds_ver, ds_status, ready_prev, "READY_TO_GENERATE:未知", render_validation_md(history[-1]["content"]), diff_summary_md, updated_history_data, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), history_status_md | |
| except Exception: | |
| yield history, history, "", "", "", (new_ds_raw or ds_raw_prev), (new_ds_obj or ds_obj_prev), ds_ver, ds_status, ready_prev, "READY_TO_GENERATE:未知", render_validation_md(history[-1]["content"]), diff_summary_md, updated_history_data, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), history_status_md | |
| # 绑定:回车提交(Enter=提交;Shift+Enter=换行由浏览器处理) | |
| user_input.submit( | |
| fn=on_submit, | |
| inputs=[ | |
| user_input, | |
| chat_state, | |
| file_uploader, | |
| custom_prompt_box, | |
| prompt_mode, | |
| iterative_mode, | |
| analyse_mode, | |
| iteration_scope, | |
| design_state_raw, | |
| design_state_obj, | |
| design_state_version, | |
| ready_to_generate, | |
| design_state_history, | |
| asked_questions_state, | |
| ], | |
| outputs=[ | |
| chatbot, | |
| user_input, | |
| chat_state, | |
| gdl_file_path, | |
| narrative_file_path, | |
| design_log_file_path, | |
| design_state_raw, | |
| design_state_obj, | |
| design_state_version, | |
| design_state_status, | |
| ready_to_generate, | |
| ready_status, | |
| validation_status, | |
| diff_summary_display, | |
| design_state_history, | |
| version_dropdown, | |
| history_status, | |
| asked_questions_state, | |
| ], | |
| preprocess=True, | |
| ) | |
| # 绑定:点击"发送" | |
| send_btn.click( | |
| fn=on_submit, | |
| inputs=[ | |
| user_input, | |
| chat_state, | |
| file_uploader, | |
| custom_prompt_box, | |
| prompt_mode, | |
| iterative_mode, | |
| analyse_mode, | |
| iteration_scope, | |
| design_state_raw, | |
| design_state_obj, | |
| design_state_version, | |
| ready_to_generate, | |
| design_state_history, | |
| asked_questions_state, | |
| ], | |
| outputs=[ | |
| chatbot, | |
| user_input, | |
| chat_state, | |
| gdl_file_path, | |
| narrative_file_path, | |
| design_log_file_path, | |
| design_state_raw, | |
| design_state_obj, | |
| design_state_version, | |
| design_state_status, | |
| ready_to_generate, | |
| ready_status, | |
| validation_status, | |
| diff_summary_display, | |
| design_state_history, | |
| version_dropdown, | |
| history_status, | |
| asked_questions_state, | |
| ], | |
| preprocess=True, | |
| ) | |
| generate_from_state_btn.click( | |
| fn=on_generate_from_state, | |
| inputs=[ | |
| chat_state, | |
| file_uploader, | |
| custom_prompt_box, | |
| prompt_mode, | |
| design_state_raw, | |
| design_state_obj, | |
| design_state_version, | |
| ready_to_generate, | |
| design_state_history, | |
| ], | |
| outputs=[ | |
| chatbot, | |
| chat_state, | |
| gdl_file_path, | |
| narrative_file_path, | |
| design_log_file_path, | |
| design_state_raw, | |
| design_state_obj, | |
| design_state_version, | |
| design_state_status, | |
| ready_to_generate, | |
| ready_status, | |
| validation_status, | |
| diff_summary_display, | |
| design_state_history, | |
| version_dropdown, | |
| history_status, | |
| ], | |
| preprocess=True, | |
| ) | |
| # 版本回滚功能 | |
| def on_rollback(selected_version, history_data, ds_obj_prev, ds_ver_prev): | |
| """回滚到选定版本""" | |
| if not selected_version: | |
| return ( | |
| ds_obj_prev, "", ds_ver_prev, | |
| "DesignState:未选择版本", "_请选择要回滚的版本_", | |
| history_data, gr.update(), f"共 {len(get_history_choices(history_data))} 个版本" if get_history_choices(history_data) else "_无版本历史_" | |
| ) | |
| version = parse_version_from_choice(selected_version) | |
| if version <= 0: | |
| return ( | |
| ds_obj_prev, "", ds_ver_prev, | |
| "DesignState:版本解析失败", "_版本号无效_", | |
| history_data, gr.update(), f"共 {len(get_history_choices(history_data))} 个版本" if get_history_choices(history_data) else "_无版本历史_" | |
| ) | |
| updated_history, state_obj, state_raw = rollback_history(history_data, version) | |
| if state_obj is None: | |
| return ( | |
| ds_obj_prev, "", ds_ver_prev, | |
| f"DesignState:回滚失败(v{version} 不存在)", "_回滚失败_", | |
| history_data, gr.update(), f"共 {len(get_history_choices(history_data))} 个版本" if get_history_choices(history_data) else "_无版本历史_" | |
| ) | |
| new_choices = get_history_choices(updated_history) | |
| ds_status = "DesignState:v{0} | {1}".format(version, summarize_design_state(state_obj)) | |
| diff_md = f"**已回滚到 v{version}**\n\n{summarize_design_state(state_obj)}" | |
| return ( | |
| state_obj, state_raw, version, | |
| ds_status, diff_md, | |
| updated_history, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), f"✅ 已回滚到 v{version},共 {len(new_choices)} 个版本" | |
| ) | |
| rollback_btn.click( | |
| fn=on_rollback, | |
| inputs=[version_dropdown, design_state_history, design_state_obj, design_state_version], | |
| outputs=[ | |
| design_state_obj, | |
| design_state_raw, | |
| design_state_version, | |
| design_state_status, | |
| diff_summary_display, | |
| design_state_history, | |
| version_dropdown, | |
| history_status, | |
| ], | |
| ) | |
| # 刷新历史列表 | |
| def on_refresh_history(history_data): | |
| choices = get_history_choices(history_data) | |
| return gr.update(choices=choices, value=choices[-1] if choices else None), f"共 {len(choices)} 个版本" if choices else "_无版本历史_" | |
| refresh_history_btn.click( | |
| fn=on_refresh_history, | |
| inputs=[design_state_history], | |
| outputs=[version_dropdown, history_status], | |
| ) | |
| # ====== 多阶段交互:阶段检测与方案选择 ====== | |
| def update_phase_from_chat(history_msgs, ds_obj, phase_state): | |
| """根据最新聊天内容更新阶段状态和方案显示""" | |
| if not history_msgs: | |
| return ( | |
| "**当前阶段**: 🚀 初始", | |
| gr.update(visible=False), | |
| gr.update(choices=[], value=None), | |
| gr.update(visible=False), | |
| gr.update(visible=False), | |
| phase_state, | |
| ) | |
| # 获取最后一条助手消息 | |
| last_bot_msg = "" | |
| for msg in reversed(history_msgs): | |
| if isinstance(msg, dict) and msg.get("role") == "assistant": | |
| last_bot_msg = msg.get("content", "") | |
| break | |
| if not last_bot_msg: | |
| return ( | |
| "**当前阶段**: 🚀 初始", | |
| gr.update(visible=False), | |
| gr.update(choices=[], value=None), | |
| gr.update(visible=False), | |
| gr.update(visible=False), | |
| phase_state, | |
| ) | |
| # 检测阶段 | |
| detected_phase = detect_interaction_phase(last_bot_msg, ds_obj) | |
| phase_display = get_phase_display_name(detected_phase) | |
| # 提取方案 | |
| proposals = extract_proposals(last_bot_msg) | |
| questions = extract_clarify_questions(last_bot_msg) | |
| # 更新阶段状态 | |
| new_phase_state = update_phase_state(phase_state, detected_phase, proposals=proposals) | |
| # 根据阶段决定 UI 显示 | |
| if detected_phase == InteractionPhase.DIVERGE and len(proposals) >= 2: | |
| # 有多个方案,显示方案选择 UI | |
| proposal_choices = [f"方案 {p['id']}: {p['title']}" for p in proposals] | |
| proposals_md = format_proposals_for_display(proposals) | |
| return ( | |
| f"**当前阶段**: {phase_display}\n\n发现 **{len(proposals)}** 个候选方案,请选择一个深入展开。", | |
| gr.update(value=proposals_md, visible=True), | |
| gr.update(choices=proposal_choices, value=None, visible=True), | |
| gr.update(visible=True), | |
| gr.update(visible=True), | |
| new_phase_state, | |
| ) | |
| elif detected_phase == InteractionPhase.UNDERSTAND and questions: | |
| # 有确认问题 | |
| questions_md = "\n".join([f"❓ {q}" for q in questions]) | |
| return ( | |
| f"**当前阶段**: {phase_display}\n\n请回答以下确认问题:", | |
| gr.update(value=questions_md, visible=True), | |
| gr.update(choices=[], value=None), | |
| gr.update(visible=False), | |
| gr.update(visible=False), | |
| new_phase_state, | |
| ) | |
| else: | |
| # 其他阶段 | |
| return ( | |
| f"**当前阶段**: {phase_display}", | |
| gr.update(visible=False), | |
| gr.update(choices=[], value=None), | |
| gr.update(visible=False), | |
| gr.update(visible=False), | |
| new_phase_state, | |
| ) | |
| # 当聊天内容变化时更新阶段 | |
| chatbot.change( | |
| fn=update_phase_from_chat, | |
| inputs=[chatbot, design_state_obj, interaction_phase_state], | |
| outputs=[ | |
| phase_status, | |
| proposals_display, | |
| proposal_dropdown, | |
| proposal_select_row, | |
| diverge_action_row, | |
| interaction_phase_state, | |
| ], | |
| ) | |
| # 确认选择方案 | |
| def on_select_proposal(selected, phase_state, history_msgs): | |
| """用户选择方案后,生成相应的用户消息""" | |
| if not selected: | |
| return "", history_msgs, phase_state | |
| # 提取方案 ID | |
| proposal_id = selected.split(":")[0].replace("方案", "").strip() | |
| # 生成用户确认消息 | |
| confirm_msg = f"我选择 **方案 {proposal_id}**,请对这个方案进行深入展开设计。" | |
| # 更新阶段状态 | |
| new_phase_state = update_phase_state(phase_state, InteractionPhase.SELECT, selected=proposal_id) | |
| return confirm_msg, history_msgs, new_phase_state | |
| select_proposal_btn.click( | |
| fn=on_select_proposal, | |
| inputs=[proposal_dropdown, interaction_phase_state, chat_state], | |
| outputs=[user_input, chat_state, interaction_phase_state], | |
| ) | |
| # 重新发散按钮 | |
| def on_request_more(): | |
| return "请给出更多不同方向的机制组合方案,我想看看其他可能性。" | |
| request_more_btn.click( | |
| fn=on_request_more, | |
| outputs=[user_input], | |
| ) | |
| # 直接深入展开按钮 | |
| def on_request_elaborate(phase_state): | |
| proposals = phase_state.get("proposals", []) | |
| if proposals: | |
| first = proposals[0] | |
| return f"请直接对方案 {first['id']}({first['title']})进行深入展开设计。" | |
| return "请直接对当前方案进行深入展开设计,生成完整的玩法规则和 mGDL。" | |
| request_elaborate_btn.click( | |
| fn=on_request_elaborate, | |
| inputs=[interaction_phase_state], | |
| outputs=[user_input], | |
| ) | |
| # ====== 细粒度范围控制:事件绑定 ====== | |
| def on_scope_mode_change(mode): | |
| """切换控制模式时显示/隐藏自定义选项""" | |
| is_custom = mode == "自定义模式" | |
| status_text = "_范围约束: 自定义模式_" if is_custom else "_范围约束: 预设模式_" | |
| return gr.update(visible=is_custom), status_text | |
| scope_mode.change( | |
| fn=on_scope_mode_change, | |
| inputs=[scope_mode], | |
| outputs=[custom_scope_group, scope_status], | |
| ) | |
| def on_refresh_mechanics(ds_obj): | |
| """刷新当前 DesignState 中的机制列表""" | |
| if not ds_obj or not isinstance(ds_obj, dict): | |
| return gr.update(choices=[], value=[]) | |
| mechanics = get_mechanics_from_state(ds_obj) | |
| choices = [(m, m) for m in mechanics] | |
| return gr.update(choices=choices, value=[]) | |
| refresh_mechanics_btn.click( | |
| fn=on_refresh_mechanics, | |
| inputs=[design_state_obj], | |
| outputs=[locked_mechanics], | |
| ) | |
| def on_scope_config_update(mode, constraint, locked_flds, locked_mechs, preset_scope): | |
| """更新范围配置状态""" | |
| is_preset = mode == "预设模式" | |
| constraint_level_val = ScopeConstraint.SOFT if "软" in constraint else ScopeConstraint.HARD | |
| config = { | |
| "mode": "preset" if is_preset else "custom", | |
| "preset": preset_scope if is_preset else "", | |
| "constraint_level": constraint_level_val, | |
| "locked_fields": locked_flds or [], | |
| "allowed_fields": [], | |
| "locked_mechanics": locked_mechs or [], | |
| "allowed_mechanics": [], | |
| } | |
| # 生成状态提示 | |
| if is_preset: | |
| status = f"_范围约束: 预设模式 ({preset_scope})_" | |
| else: | |
| locked_count = len(locked_flds or []) + len(locked_mechs or []) | |
| constraint_text = "软约束" if constraint_level_val == ScopeConstraint.SOFT else "硬约束" | |
| status = f"_范围约束: 自定义模式 | {constraint_text} | 锁定 {locked_count} 项_" | |
| return config, status | |
| # 当任意范围控制项变化时更新配置 | |
| for scope_input in [scope_mode, constraint_level, locked_fields, locked_mechanics, iteration_scope]: | |
| scope_input.change( | |
| fn=on_scope_config_update, | |
| inputs=[scope_mode, constraint_level, locked_fields, locked_mechanics, iteration_scope], | |
| outputs=[scope_config_state, scope_status], | |
| ) | |
| # 导出对话 | |
| with gr.Row(): | |
| export_btn = gr.Button("导出对话(Markdown)", variant="secondary") | |
| export_file = gr.File(label="点击下载导出文件", interactive=False) | |
| # 清空对话 | |
| with gr.Row(): | |
| clear_dialog_btn = gr.Button("清空对话", variant="secondary") | |
| def _clear_chat(): | |
| return ( | |
| [], "", [], "", "", "", "", {}, 0, | |
| "DesignState:未建立(先生成一版玩法)", False, "READY_TO_GENERATE:未知", | |
| "输出校验:未运行", "_尚无变更记录_", | |
| create_empty_history(), gr.update(choices=[], value=None), "_无版本历史_", | |
| create_phase_state(), "**当前阶段**: 🚀 初始", | |
| gr.update(visible=False), gr.update(choices=[], value=None), | |
| gr.update(visible=False), gr.update(visible=False), | |
| create_scope_config(), "_范围约束: 预设模式_", | |
| [], | |
| ) | |
| clear_dialog_btn.click( | |
| fn=_clear_chat, | |
| inputs=None, | |
| outputs=[ | |
| chatbot, | |
| user_input, | |
| chat_state, | |
| gdl_file_path, | |
| narrative_file_path, | |
| design_log_file_path, | |
| design_state_raw, | |
| design_state_obj, | |
| design_state_version, | |
| design_state_status, | |
| ready_to_generate, | |
| ready_status, | |
| validation_status, | |
| diff_summary_display, | |
| design_state_history, | |
| version_dropdown, | |
| history_status, | |
| interaction_phase_state, | |
| phase_status, | |
| proposals_display, | |
| proposal_dropdown, | |
| proposal_select_row, | |
| diverge_action_row, | |
| scope_config_state, | |
| scope_status, | |
| asked_questions_state, | |
| ], | |
| ) | |
| # ==================== 下载按钮部分 ==================== | |
| with gr.Row(): | |
| gr.Markdown("### 下载输出文件") | |
| download_gdl_btn = gr.Button("下载 GDL 文件", variant="secondary") # 下载 GDL 按钮 | |
| download_narrative_btn = gr.Button("下载自然语言文件", variant="secondary") # 下载自然语言按钮 | |
| download_design_log_btn = gr.Button("下载思维日志文件", variant="secondary") | |
| download_gdl_file = gr.File(label="GDL 文件", interactive=False) # 文件下载区域 | |
| download_narrative_file = gr.File(label="自然语言文件", interactive=False) # 文件下载区域 | |
| download_design_log_file = gr.File(label="思维日志文件", interactive=False) | |
| # 🟩【修改】绑定下载按钮与文件路径 - 修复版本 | |
| def get_gdl_file(gdl_path): | |
| if gdl_path and os.path.exists(gdl_path): | |
| return gdl_path | |
| return None | |
| def get_narrative_file(narrative_path): | |
| if narrative_path and os.path.exists(narrative_path): | |
| return narrative_path | |
| return None | |
| def get_design_log_file(design_log_path): | |
| if design_log_path and os.path.exists(design_log_path): | |
| return design_log_path | |
| return None | |
| # 绑定下载按钮与文件路径 | |
| download_gdl_btn.click( | |
| fn=get_gdl_file, | |
| inputs=[gdl_file_path], # 🟩【修改】接收State中的路径 | |
| outputs=[download_gdl_file] | |
| ) | |
| download_narrative_btn.click( | |
| fn=get_narrative_file, | |
| inputs=[narrative_file_path], # 🟩【修改】接收State中的路径 | |
| outputs=[download_narrative_file] | |
| ) | |
| download_design_log_btn.click( | |
| fn=get_design_log_file, | |
| inputs=[design_log_file_path], | |
| outputs=[download_design_log_file], | |
| ) | |
| # ==================== 事件绑定(左侧) ==================== | |
| clear_cache_btn.click(fn=clear_cache, outputs=[cache_status]) | |
| clear_files_btn.click(fn=clear_files, outputs=[file_uploader]) | |
| file_uploader.change(fn=update_file_status, inputs=[file_uploader], outputs=[file_status]) | |
| # 导出按钮 | |
| def _export_wrapper(chat_history): | |
| try: | |
| path = export_history_to_markdown(chat_history) | |
| return path | |
| except Exception as e: | |
| import tempfile | |
| tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".md") | |
| with open(tmp.name, 'w', encoding='utf-8') as f: | |
| f.write(f"导出失败:{type(e).__name__}: {e}\n") | |
| return tmp.name | |
| export_btn.click(fn=_export_wrapper, inputs=[chatbot], outputs=[export_file]) | |
| # ==================== 启动应用 ==================== | |
| if __name__ == "__main__": | |
| # 兼容不同 gradio 版本: | |
| # - 有的版本 queue() 不接受 concurrency_count/status_update_rate | |
| # - 有的版本甚至不需要手动 queue() | |
| app = demo | |
| try: | |
| app = demo.queue() # 不带参数,启用队列以支持生成器流式 | |
| except TypeError: | |
| # 某些版本 queue() 可能参数或签名不同,直接跳过即可 | |
| app = demo | |
| # HF Spaces 自带公开/私有托管,share=True 会尝试创建外部公网链接,常导致启动失败 | |
| is_space = bool(os.getenv("SPACE_ID") or os.getenv("HF_SPACE_ID")) | |
| allow_share = (os.getenv("GRADIO_SHARE") or "0").strip() == "1" | |
| app.launch(share=(allow_share and not is_space), show_api=False) | |