zhongchuyi
feature:优化 2026_01_13
e43bcf1
import gradio as gr
import os
# ===== 你的自定义模块 =====
from config import APP_TITLE, CHATBOT_HEIGHT, MIN_WIDTH_LEFT, MIN_WIDTH_RIGHT
from styles import MAHJONG_THEME_CSS
# Extra CSS to force dark & counter browser auto-invert
EXTRA_FIX_CSS = '''/* === Force dark site and neutralize browser-side auto-invert === */
html, body{ background-color:#0b1220 !important; color-scheme: dark !important; }
/* When browser/extension applies global invert, add a counter-invert on our app root */
html.force-dark-fix #app-root{ filter: invert(1) hue-rotate(180deg) !important; }'''
# Redirect to ?__theme=dark
FORCE_DARK_REDIRECT = '''<script>
(function(){
try{
const url=new URL(window.location.href);
if(url.searchParams.get("__theme")!=="dark"){
url.searchParams.set("__theme","dark");
window.location.replace(url.toString());
}
}catch(e){}
})();
</script>'''
# Detect browser force-dark and set html class
DETECT_FORCE_DARK = '''<script>
(function(){
try{
const html=document.documentElement;
const cs=getComputedStyle(html);
const hasGlobalFilter=(cs.filter && cs.filter!=="none");
const darkReaderOn=!!document.querySelector('style#dark-reader-style')||!!document.querySelector('meta[name="darkreader"]');
if(hasGlobalFilter||darkReaderOn){ html.classList.add('force-dark-fix'); }
}catch(e){}
})();
</script>'''
from ai_service import design_mahjong_game
from reference_retriever import match_variants_in_text, load_variant_md
# 优先尝试原生流式;没有则自动回退为伪流式
try:
from ai_service import design_mahjong_game_stream
except Exception:
design_mahjong_game_stream = None
from output_validator import validate_mahjong_response, format_issues_for_llm
from design_state import (
extract_design_state,
extract_ready_to_generate,
summarize_design_state,
diff_keys,
diff_mechanics,
is_change_within_scope,
generate_diff_summary,
generate_diff_summary_compact,
# 版本历史相关
create_empty_history,
add_to_history,
rollback_history,
get_history_choices,
parse_version_from_choice,
# 多阶段交互相关
InteractionPhase,
extract_proposals,
extract_clarify_questions,
detect_interaction_phase,
format_proposals_for_display,
generate_phase_prompt_hint,
create_phase_state,
update_phase_state,
get_phase_display_name,
# 细粒度范围控制相关
CONTROLLABLE_FIELDS,
ScopeConstraint,
create_scope_config,
get_scope_preset_options,
validate_scope_compliance,
format_scope_violations,
get_mechanics_from_state,
generate_scope_prompt_hint,
)
# ====== 🔧 Hotfix: 兼容 gradio_client 对 additionalProperties(bool) 的解析 ======
try:
import gradio_client.utils as _gc_utils
_orig_get_type = _gc_utils.get_type
def _safe_get_type(schema):
if isinstance(schema, bool):
return "any" if schema else "never"
return _orig_get_type(schema)
_gc_utils.get_type = _safe_get_type
_orig_json2py = _gc_utils._json_schema_to_python_type
def _safe_json2py(schema, defs):
if isinstance(schema, bool):
return "any" if schema else "never"
if isinstance(schema, dict) and "additionalProperties" in schema:
ap = schema["additionalProperties"]
if isinstance(ap, bool):
inner = "any" if ap else "never"
return f"dict[str, {inner}]"
return _orig_json2py(schema, defs)
_gc_utils._json_schema_to_python_type = _safe_json2py
_orig_json_schema_to_python_type = _gc_utils.json_schema_to_python_type
def _safe_json_schema_to_python_type(schema):
if isinstance(schema, bool):
return "any" if schema else "never"
return _orig_json_schema_to_python_type(schema)
_gc_utils.json_schema_to_python_type = _safe_json_schema_to_python_type
except Exception:
pass
# ====== 🔧 Hotfix 结束 ======
# ==================== 工具函数 ====================
def _messages_to_tuples(history):
"""
将 Chatbot 的 messages 格式([{role, content}, ...])转换为 [(user, bot), ...]。
若已是 tuples,则原样返回。
"""
if not history:
return []
if isinstance(history, list) and history and isinstance(history[0], dict):
pairs = []
last_user = None
for msg in history:
role = msg.get("role")
content = msg.get("content", "")
if role == "user":
last_user = content
elif role == "assistant":
pairs.append((last_user or "", content))
last_user = None
return pairs
return history
def _chunk_fake_stream(text, step=40):
"""把整段文本切成小块,伪流式输出。"""
s = str(text or "")
for i in range(0, len(s), step):
yield s[i:i + step]
def _is_design_intent(text: str) -> bool:
"""判断是否明确进入“新玩法设计/融合/生成”路径。"""
s = (text or "").strip()
if not s:
return False
keywords = [
"设计", "生成", "做一个", "做个", "新玩法", "创新", "融合", "组合", "改造", "迭代",
"玩法设计", "规则设计", "番型设计", "出一套", "给我一套",
]
return any(k in s for k in keywords)
def _is_rules_query(text: str) -> bool:
"""判断是否为“询问现有玩法细节/规则”的问题。"""
s = (text or "").strip()
if not s:
return False
keywords = [
"规则", "玩法", "番型", "细节", "说明", "机制", "怎么打", "怎么胡", "如何胡", "计分", "流程",
"介绍", "讲讲", "是什么", "有哪些",
]
return any(k in s for k in keywords)
def _normalize_question(text: str) -> str:
s = str(text or "").strip().lower()
if not s:
return ""
punct = "??!!。;;::、,,..()()[]【】<>《》\"'“”‘’ "
for ch in punct:
s = s.replace(ch, "")
return s
def _merge_asked_questions(existing, new_items):
merged = [str(q).strip() for q in (existing or []) if str(q).strip()]
seen = {_normalize_question(q) for q in merged if _normalize_question(q)}
for q in (new_items or []):
q_text = str(q).strip()
if not q_text:
continue
key = _normalize_question(q_text)
if key and key not in seen:
merged.append(q_text)
seen.add(key)
return merged
def _load_base64_image(path):
"""读取本地图片并返回 data URI"""
import base64
import pathlib
p = pathlib.Path(path)
if not p.exists():
return ""
try:
data = base64.b64encode(p.read_bytes()).decode("ascii")
suffix = p.suffix.lower()
mime = "image/png" if suffix == ".png" else "image/jpeg"
return f"data:{mime};base64,{data}"
except Exception:
return ""
HERO_TILE_PATHS = ["UI00001.jpg", "UI00002.jpg", "UI00003.jpg", "UI00004.jpg"]
HERO_TILE_IMAGES = [_load_base64_image(p) for p in HERO_TILE_PATHS]
def _render_hero_tiles():
tiles = [
f'<span class="hero-photo" style="background-image:url(\'{src}\');" title="参考图 {idx + 1}"></span>'
for idx, src in enumerate(HERO_TILE_IMAGES)
if src
]
return "".join(tiles)
def clear_cache():
"""清空缓存"""
from cache_manager import file_cache, request_cache
file_cache.clear()
request_cache.clear()
return "✅ 缓存已清空"
def clear_files():
"""清空文件上传"""
return None
def render_validation_md(text: str) -> str:
issues = validate_mahjong_response(text or "")
if not issues:
return "✅ 输出校验:未发现静态问题(可导出)"
errors = [i for i in issues if i.get("level") == "error"]
warnings = [i for i in issues if i.get("level") == "warning"]
parts = ["⚠️ 输出校验:发现潜在问题(建议先让模型按最小修改修复后再导出)"]
if errors:
parts.append("\n**错误(必须修复)**\n")
parts.append(format_issues_for_llm(errors))
if warnings:
parts.append("\n**警告(建议修复)**\n")
parts.append(format_issues_for_llm(warnings))
return "\n".join(parts).strip()
def update_file_status(files):
"""更新文件状态显示"""
if not files:
return "📁 文件状态:未上传"
file_list = files if isinstance(files, (list, tuple)) else [files]
names = []
for f in file_list:
if isinstance(f, str):
names.append(os.path.basename(f))
else:
names.append(os.path.basename(getattr(f, "name", str(f))))
head = "\n".join(f" • {n}" for n in names[:3])
tail = "\n ..." if len(names) > 3 else ""
return f"📁 文件状态:已上传 {len(names)} 个文件\n{head}{tail}"
def export_history_to_markdown(history):
"""将聊天历史导出为 Markdown 文件,并返回文件路径供下载"""
import time
import pathlib
from datetime import datetime
pairs = _messages_to_tuples(history)
lines = ["# 对话记录", ""]
lines.append(f"- 导出时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
lines.append("")
for idx, (user_msg, bot_msg) in enumerate(pairs, start=1):
lines.append(f"## 轮次 {idx}")
if user_msg:
lines.append("**用户**:\n")
lines.append(user_msg)
lines.append("")
if bot_msg:
lines.append("**助手**:\n")
lines.append(bot_msg)
lines.append("")
content = "\n".join(lines).strip() + "\n"
export_dir = pathlib.Path("exports")
export_dir.mkdir(parents=True, exist_ok=True)
filename = export_dir / f"chat_history_{int(time.time())}.md"
with open(filename, "w", encoding="utf-8") as f:
f.write(content)
return str(filename)
# ==================== 新增:提取 GDL 和自然语言描述 ====================
def extract_gdl_and_narrative(content):
"""提取 GDL 和自然语言部分(支持多种格式变体)"""
import re
# 定义多种可能的标记格式(按优先级排序,支持更多变体)
gdl_patterns = [
r"##\s*m?GDL\s*描述", # ## GDL描述 / ## mGDL 描述
r"m?GDL\s*描述", # GDL描述 / mGDL 描述
r"##\s*GDL\s*Description", # ## GDL Description(英文)
r"##\s*m?GDL", # ## GDL / ## mGDL
r"m?GDL\s*规则代码", # mGDL 规则代码
r"m?GDL\s*规则", # mGDL 规则
]
narrative_patterns = [
r"##\s*自然语言规则说明", # ## 自然语言规则说明
r"##\s*自然语言规则", # ## 自然语言规则
r"自然语言规则说明", # 自然语言规则说明
r"自然语言规则", # 自然语言规则
r"##\s*规则说明", # ## 规则说明
r"规则说明", # 规则说明
]
# 尝试查找 GDL 部分(支持大小写不敏感)
gdl_start = -1
for pattern in gdl_patterns:
match = re.search(pattern, content, re.IGNORECASE)
if match:
gdl_start = match.start()
break
# 尝试查找自然语言部分(支持大小写不敏感)
narrative_start = -1
for pattern in narrative_patterns:
match = re.search(pattern, content, re.IGNORECASE)
if match:
narrative_start = match.start()
break
if gdl_start != -1 and narrative_start != -1:
# 确保顺序正确(GDL应该在自然语言之前)
if gdl_start >= narrative_start:
gdl_start, narrative_start = narrative_start, gdl_start
# 获取 GDL 部分(从标记开始到自然语言标记之前)
gdl_content = content[gdl_start:narrative_start].strip()
# 获取自然语言部分(从标记开始到结尾)
narrative_content = content[narrative_start:].strip()
return gdl_content, narrative_content
elif gdl_start != -1:
# 只找到GDL,将后面全部作为GDL
print(f"⚠️ 仅找到GDL标记,将其后内容作为GDL")
return content[gdl_start:].strip(), ""
elif narrative_start != -1:
# 只找到自然语言,将后面全部作为自然语言
print(f"⚠️ 仅找到自然语言标记,将其后内容作为自然语言")
return "", content[narrative_start:].strip()
else:
# 都没找到,返回空
print(f"⚠️ 提取警告: 未找到GDL和自然语言标记")
return "", ""
def save_gdl_and_narrative(gdl_content, narrative_content):
"""保存 GDL 和自然语言内容到文件"""
# 定义文件存储路径
export_dir = os.path.join("exports")
os.makedirs(export_dir, exist_ok=True)
gdl_file_path = os.path.join(export_dir, "gdl_output.txt")
narrative_file_path = os.path.join(export_dir, "narrative_output.txt")
# 保存 GDL
with open(gdl_file_path, "w", encoding="utf-8") as f:
f.write(gdl_content)
# 保存自然语言
with open(narrative_file_path, "w", encoding="utf-8") as f:
f.write(narrative_content)
return gdl_file_path, narrative_file_path
def extract_design_log(content):
"""
提取"设计日志(创新推演摘要)"段落,用于单独导出。
规则:从标题行开始,截到下一个同级标题(###)或文件结尾。
"""
import re
if not content:
return ""
# 修复:原始字符串中 \s 即可匹配空白,不需要 \\s
m = re.search(r"^###\s*设计日志(创新推演摘要)\s*$", content, re.MULTILINE)
if not m:
# 兼容变体格式:设计日志/思维日志/Design Log
m = re.search(r"^###\s*(设计日志|思维日志|Design\s*Log).*$", content, re.MULTILINE | re.IGNORECASE)
if not m:
return ""
start = m.start()
m2 = re.search(r"^###\s+.+$", content[m.end():], re.MULTILINE)
end = (m.end() + m2.start()) if m2 else len(content)
return content[start:end].strip()
def save_design_log(design_log_content):
export_dir = os.path.join("exports")
os.makedirs(export_dir, exist_ok=True)
design_log_file_path = os.path.join(export_dir, "design_log_output.txt")
with open(design_log_file_path, "w", encoding="utf-8") as f:
f.write(design_log_content or "")
return design_log_file_path
# ==================== Gradio 界面(Mahjong Skin) ====================
with gr.Blocks(
theme=gr.themes.Soft(primary_hue='green', neutral_hue='slate'),
title=APP_TITLE,
css=MAHJONG_THEME_CSS + EXTRA_FIX_CSS,
elem_id='app-root'
) as demo:
gr.HTML(FORCE_DARK_REDIRECT)
gr.HTML(DETECT_FORCE_DARK)
# 顶部 Hero
hero_tiles_html = _render_hero_tiles()
gr.HTML(f"""
<div class="hero">
<div style="display:flex;align-items:center;gap:12px;">
<div style="font-size:30px;">🀄️ 麻将玩法灵感工坊 · KOI Lab</div>
</div>
<div style="margin-top:6px;opacity:.92;">与 AI 麻将策划智囊对话,快速生成番型体系与玩法流程</div>
<div class="hero-icons" aria-hidden="true">
{hero_tiles_html or '<span class="hero-photo"></span>'}
</div>
</div>
""")
with gr.Row(equal_height=True):
# 左侧:设置区
with gr.Column(scale=1, min_width=MIN_WIDTH_LEFT):
with gr.Group(elem_classes="side-card"):
gr.Markdown("### 输入与约束")
gr.Markdown("✅ **系统已预加载 Mahjong GDL 规范与提示词**,可直接开局。", elem_classes="hint")
file_uploader = gr.File(
label="上传自定义 Mahjong GDL/番型示例(.txt,可多选)",
file_types=[".txt"],
file_count="multiple",
type="filepath",
interactive=True,
show_label=True,
container=True,
scale=1
)
custom_prompt_box = gr.Textbox(
label="自定义 System Prompt(可选)",
placeholder="系统已经载入麻将玩法专家提示词。如需特殊规则,可在此粘贴。",
lines=10,
max_lines=20,
show_copy_button=True
)
prompt_mode = gr.Radio(
choices=["覆盖默认SYSTEM_PROMPT", "合并(默认在前)"],
value="覆盖默认SYSTEM_PROMPT",
label="自定义 Prompt 的使用方式"
)
gr.Markdown("💡 提示:系统默认包含麻将通用语法、番型模板与 Prompt;仅当你要替换地方规则时再上传或输入。", elem_classes="hint")
with gr.Group(elem_classes="side-card"):
gr.Markdown("### 使用说明")
gr.Markdown(
"- ✅ **快速开始**:直接描述你想要的麻将玩法主题或目标。\n"
"- 📁 **可选上传**:导入地方番型表或自定义 GDL 模板以便引用。\n"
"- ✏️ **可选自定义**:针对特殊风格重写 System Prompt。\n"
"- 🔄 **清空缓存**:切换规则后建议刷新缓存,避免残留设定。"
)
with gr.Group(elem_classes="side-card"):
gr.Markdown("### 快捷操作")
with gr.Row():
clear_cache_btn = gr.Button("清空缓存", variant="secondary")
clear_files_btn = gr.Button("清空文件", variant="secondary")
cache_status = gr.Markdown("💾 缓存状态:正常", elem_classes="hint")
file_status = gr.Markdown("📁 文件状态:使用默认麻将配置(GDL + Prompt 已预加载)", elem_classes="hint")
# 右侧:聊天区(手动事件绑定 + 流式输出)
with gr.Column(scale=2, min_width=MIN_WIDTH_RIGHT):
with gr.Group(elem_classes="table"):
chatbot = gr.Chatbot(
height=CHATBOT_HEIGHT,
type="messages",
elem_classes="custom-chatbot",
avatar_images=("landlord.png", "bot.png"),
)
# 用 State 保存 messages 历史
chat_state = gr.State([]) # list[dict]: [{"role":"user","content":...}, {"role":"assistant","content":...}]
gdl_file_path = gr.State("")
narrative_file_path = gr.State("")
design_log_file_path = gr.State("")
design_state_raw = gr.State("") # 上一次可机读 DesignState(JSON) 原文
design_state_obj = gr.State({}) # 解析后的 dict(用于显示与对比)
design_state_version = gr.State(0) # 版本号(每次成功提取 +1)
ready_to_generate = gr.State(False)
design_state_history = gr.State(create_empty_history()) # 版本历史
interaction_phase_state = gr.State(create_phase_state()) # 多阶段交互状态
asked_questions_state = gr.State([]) # Analyse 模式:已问过的问题
with gr.Group(elem_classes="side-card"):
gr.Markdown("### 可控迭代(推荐)")
iterative_mode = gr.Checkbox(
label="启用可控迭代:基于上轮 DesignState 做最小修改",
value=True,
)
analyse_mode = gr.Checkbox(
label="Analyse 模式:单问题迭代完善需求(不生成mGDL)",
value=False,
)
ready_status = gr.Markdown("READY_TO_GENERATE:未知", elem_classes="hint")
iteration_scope = gr.Dropdown(
label="本轮允许修改范围(预设)",
choices=get_scope_preset_options(),
value="仅优化创新机制",
)
# 细粒度范围控制(新增)
with gr.Accordion("🔒 细粒度范围控制", open=False) as scope_accordion:
scope_mode = gr.Radio(
choices=["预设模式", "自定义模式"],
value="预设模式",
label="控制模式",
)
constraint_level = gr.Radio(
choices=["软约束(提示)", "硬约束(阻断)"],
value="软约束(提示)",
label="约束级别",
)
with gr.Group(visible=False) as custom_scope_group:
gr.Markdown("**锁定字段**(选中的字段不允许修改)", elem_classes="hint")
locked_fields = gr.CheckboxGroup(
choices=[
("玩法名称", "new_variant_name"),
("底座玩法", "base_variants"),
("融合玩法", "fusion_variants"),
("游戏模式", "game_variant"),
("玩家人数", "players"),
("计分模式", "scoring_mode"),
("牌组配置", "tileset"),
],
value=[],
label="",
)
gr.Markdown("**锁定机制**(选中的机制不允许修改)", elem_classes="hint")
locked_mechanics = gr.CheckboxGroup(
choices=[],
value=[],
label="",
)
refresh_mechanics_btn = gr.Button("刷新机制列表", size="sm")
scope_status = gr.Markdown("_范围约束: 预设模式_", elem_classes="hint")
# 保存范围配置的 State
scope_config_state = gr.State(create_scope_config())
generate_from_state_btn = gr.Button("基于当前 DesignState 生成完整玩法", variant="primary")
design_state_status = gr.Markdown("DesignState:未建立(先生成一版玩法)", elem_classes="hint")
# 差异可视化区域(新增)
with gr.Accordion("📊 DesignState 变更详情", open=False) as diff_accordion:
diff_summary_display = gr.Markdown("_尚无变更记录_", elem_classes="diff-summary")
# 版本历史与回滚(新增)
with gr.Accordion("🕐 版本历史与回滚", open=False) as history_accordion:
version_dropdown = gr.Dropdown(
label="选择版本",
choices=[],
value=None,
interactive=True,
)
with gr.Row():
rollback_btn = gr.Button("回滚到所选版本", variant="secondary", size="sm")
refresh_history_btn = gr.Button("刷新列表", variant="secondary", size="sm")
history_status = gr.Markdown("_无版本历史_", elem_classes="hint")
# 多阶段交互与方案选择(新增)
with gr.Accordion("🎯 方案选择与交互引导", open=True) as phase_accordion:
phase_status = gr.Markdown("**当前阶段**: 🚀 初始", elem_classes="hint")
proposals_display = gr.Markdown("_等待生成方案..._", visible=False)
with gr.Row(visible=False) as proposal_select_row:
proposal_dropdown = gr.Dropdown(
label="选择方案",
choices=[],
value=None,
interactive=True,
scale=3,
)
select_proposal_btn = gr.Button("确认选择", variant="primary", size="sm", scale=1)
with gr.Row(visible=False) as diverge_action_row:
request_more_btn = gr.Button("🔄 重新发散", variant="secondary", size="sm")
request_elaborate_btn = gr.Button("📝 直接深入展开", variant="secondary", size="sm")
validation_status = gr.Markdown("输出校验:未运行", elem_classes="hint")
user_input = gr.Textbox(
placeholder="例如:为4人竞速推倒胡设计番型与流程;或“做一个双人合作麻将 roguelike” …",
show_label=False,
max_lines=5,
lines=3,
show_copy_button=True,
autofocus=True,
)
with gr.Row():
send_btn = gr.Button("发送", variant="primary")
stop_info = gr.Markdown("", visible=False)
# ====== 核心:流式提交回调(生成器) ======
#此函数为工具核心,处理用户发送消息的完整流程(模式判断-模型调用-解析与更新)
def on_submit(
user_text,
history_msgs,
files,
custom_prompt,
mode,
iterative_enabled,
analyse_enabled,
scope,
ds_raw_prev,
ds_obj_prev,
ds_ver_prev,
ready_prev,
history_data,
asked_questions,
):
user_text = (user_text or "").strip()
diff_md_prev = "_尚无变更记录_" # 保留上次的差异摘要
cur_choices = get_history_choices(history_data)
asked_questions = asked_questions or []
if not user_text:
# 不提交空消息:输出不变
yield history_msgs, "", history_msgs, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:未变更", ready_prev, "READY_TO_GENERATE:未知", "输出校验:未运行", diff_md_prev, history_data, gr.update(choices=cur_choices), "_无版本历史_" if not cur_choices else f"共 {len(cur_choices)} 个版本", asked_questions
return
# 立即显示"用户消息"
history = list(history_msgs or [])
history.append({"role": "user", "content": user_text})
yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:处理中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_处理中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions
# 添加空的助手气泡,用于逐步填充
history.append({"role": "assistant", "content": ""})
yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:处理中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_处理中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions
# ====== 现有玩法细节查询:直接返回本地玩法库内容 ======
if (not analyse_enabled) and (not iterative_enabled) and _is_rules_query(user_text) and (not _is_design_intent(user_text)):
matched = match_variants_in_text(user_text)
if matched:
parts = []
for name in matched:
md_text = (load_variant_md(name) or "").strip()
if md_text:
parts.append(f"### {name}\n\n{md_text}")
else:
parts.append(f"### {name}\n\n(未找到该玩法的本地规则文件)")
history[-1]["content"] = "\n\n---\n\n".join(parts).strip()
yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:未变更", ready_prev, "READY_TO_GENERATE:未知", "输出校验:未运行", "_未生成设计_", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions
return
# 流式生成内容
tuples_hist = _messages_to_tuples(history)
# 可控迭代:基于上轮 DesignState 做“最小修改”,并尽量减少历史噪声
iterative_enabled = bool(iterative_enabled)
analyse_enabled = bool(analyse_enabled)
scope = (scope or "").strip()
effective_user_text = user_text
tuples_to_send = tuples_hist
status_hint = "DesignState:未变更"
ready_to_gen = ready_prev if isinstance(ready_prev, bool) else False
ready_md = "READY_TO_GENERATE:未知"
if analyse_enabled:
tuples_to_send = [] # Analyse 模式不吃长history,避免漂移
asked_block = ""
if asked_questions:
asked_block = "已问过的问题(禁止重复):\n" + "\n".join(
f"- {q}" for q in asked_questions[:20]
) + "\n\n"
effective_user_text = (
"<ANALYSE_MODE>true</ANALYSE_MODE>\n"
"你正在进行【Analyse 模式】(单问题迭代)。\n"
"目标:通过多轮问答把需求场景收敛到可生成完整玩法。\n"
"限制:本轮禁止输出 mGDL/完整规则/自检报告。\n\n"
"{asked}"
"当前已有 DesignState(如为空表示尚未建立初版):\n"
"<DESIGN_STATE_JSON>\n{ds}\n</DESIGN_STATE_JSON>\n\n"
"用户本轮补充信息:\n{req}\n"
"要求:本轮只能问 1 个问题;不得重复已问过的问题;若信息已足够则直接 READY_TO_GENERATE: true。\n"
).format(asked=asked_block, ds=(ds_raw_prev or "{}").strip(), req=user_text)
status_hint = "DesignState:Analyse模式中…"
elif iterative_enabled and (ds_raw_prev or "").strip():
tuples_to_send = [] # 用“当前 DesignState”替代长历史,减少漂移
effective_user_text = (
"你正在进行【可控迭代】。请基于下方 DesignState(JSON) 对方案做【最小修改】。\n"
"本轮允许修改范围:{scope}\n"
"硬约束:\n"
"1) 除允许范围外的字段一律保持不变;若必须改动范围外字段,先提出澄清问题并停止输出 mGDL。\n"
"2) 必须输出新的 DesignState(JSON)(合法JSON、无注释)+ 变更摘要(列出改动点)。\n"
"3) 仍需按 m_prompt 输出完整结果(规则 + mGDL + 自检)。\n\n"
"<DESIGN_STATE_JSON>\n{ds}\n</DESIGN_STATE_JSON>\n\n"
"【用户本轮优化要求】\n{req}\n"
).format(scope=scope or "自由迭代(仍保最小修改)", ds=ds_raw_prev.strip(), req=user_text)
status_hint = "DesignState:可控迭代中…"
if design_mahjong_game_stream is not None:
try:
for piece in design_mahjong_game_stream(effective_user_text, tuples_to_send, files, custom_prompt, mode):
if not piece:
continue
history[-1]["content"] += str(piece)
yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, status_hint, ready_to_gen, ready_md, "输出校验:运行中…", "_处理中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions
except Exception as e:
history[-1]["content"] += f"\n(流式出错){type(e).__name__}: {e}"
yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:流式出错(未变更)", ready_to_gen, "READY_TO_GENERATE:未知", "输出校验:未运行", "_处理出错_", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions
else:
try:
full = design_mahjong_game(effective_user_text, tuples_to_send, files, custom_prompt, mode)
except Exception as e:
full = f"(出错){type(e).__name__}: {e}"
for piece in _chunk_fake_stream(str(full), step=40):
history[-1]["content"] += piece
yield history, "", history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, status_hint, ready_to_gen, ready_md, "输出校验:运行中…", "_处理中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_", asked_questions
# 提取 GDL 和自然语言描述并保存
new_ds_obj, new_ds_raw = extract_design_state(history[-1]["content"])
ready_flag = extract_ready_to_generate(history[-1]["content"])
ds_ver = int(ds_ver_prev or 0)
ds_status = "DesignState:未变更"
diff_summary_md = "_无变更_"
updated_history_data = history_data
if new_ds_obj and new_ds_raw:
ds_ver = ds_ver + 1
ds_status = "DesignState:v{0} 已更新 | {1}".format(ds_ver, summarize_design_state(new_ds_obj))
if isinstance(ready_flag, bool):
ready_to_gen = ready_flag
ready_md = "READY_TO_GENERATE:{0}".format("true" if ready_flag else "false")
# 生成差异摘要
diff_summary_md = generate_diff_summary(
ds_obj_prev if isinstance(ds_obj_prev, dict) else None,
new_ds_obj,
int(ds_ver_prev or 0),
ds_ver
)
# 添加到版本历史
compact_summary = generate_diff_summary_compact(ds_obj_prev if isinstance(ds_obj_prev, dict) else None, new_ds_obj)
updated_history_data = add_to_history(history_data, ds_ver, new_ds_obj, new_ds_raw, compact_summary)
# 可控迭代:做一个"范围越界"软检查(不阻断,只提示)
if iterative_enabled and ds_obj_prev and isinstance(ds_obj_prev, dict) and scope:
changed = diff_keys(ds_obj_prev, new_ds_obj)
mech_changes = diff_mechanics(ds_obj_prev, new_ds_obj)
if not is_change_within_scope(changed, scope):
history[-1]["content"] += (
"\n\n---\n⚠️ 可控迭代检查:检测到可能超出允许范围的改动。\n"
"允许范围:{scope}\n"
"顶层变更字段:{keys}\n"
"机制变更:{mech}\n"
"建议:请重新执行一次迭代,明确只改允许范围内字段,或切换为「自由迭代」。\n"
).format(
scope=scope,
keys=",".join(changed) if changed else "(无)",
mech="; ".join(mech_changes) if mech_changes else "(无)",
)
try:
# 更新版本下拉选项
new_choices = get_history_choices(updated_history_data)
history_status_md = f"共 {len(new_choices)} 个版本" if new_choices else "_无版本历史_"
# Analyse 模式不产出完整规则,不导出GDL/自然语言文件
if analyse_enabled:
new_questions = extract_clarify_questions(history[-1]["content"])
asked_questions = _merge_asked_questions(asked_questions, new_questions)
yield history, "", history, "", "", "", (new_ds_raw or ds_raw_prev), (new_ds_obj or ds_obj_prev), ds_ver, ds_status, ready_to_gen, ready_md, "输出校验:Analyse 模式跳过", diff_summary_md, updated_history_data, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), history_status_md, asked_questions
return
gdl_content, narrative_content = extract_gdl_and_narrative(history[-1]["content"])
design_log_content = extract_design_log(history[-1]["content"])
# 保存 GDL 和自然语言文件
gdl_path, narrative_path = save_gdl_and_narrative(gdl_content, narrative_content)
design_log_path = save_design_log(design_log_content)
# 返回文件路径,以便下载
yield history, "", history, gdl_path, narrative_path, design_log_path, (new_ds_raw or ds_raw_prev), (new_ds_obj or ds_obj_prev), ds_ver, ds_status, ready_to_gen, ready_md, render_validation_md(history[-1]["content"]), diff_summary_md, updated_history_data, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), history_status_md, asked_questions
except Exception as e:
print(f"保存GDL和自然语言文件时出错: {e}")
yield history, "", history, "", "", "", (new_ds_raw or ds_raw_prev), (new_ds_obj or ds_obj_prev), ds_ver, ds_status, ready_to_gen, ready_md, render_validation_md(history[-1]["content"]), diff_summary_md, updated_history_data, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), history_status_md, asked_questions
def on_generate_from_state(
history_msgs,
files,
custom_prompt,
mode,
ds_raw_prev,
ds_obj_prev,
ds_ver_prev,
ready_prev,
history_data,
):
cur_choices = get_history_choices(history_data)
if not (ds_raw_prev or "").strip():
yield history_msgs, history_msgs, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:未建立(先生成一版玩法)", ready_prev, "READY_TO_GENERATE:未知", "输出校验:未运行", "_尚无变更记录_", history_data, gr.update(choices=cur_choices), "_无版本历史_" if not cur_choices else f"共 {len(cur_choices)} 个版本"
return
history = list(history_msgs or [])
user_text = "基于当前 DesignState 生成完整玩法(自然语言规则 + mGDL + 自检报告)。"
history.append({"role": "user", "content": user_text})
yield history, history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:生成中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_生成中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_"
history.append({"role": "assistant", "content": ""})
yield history, history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:生成中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_生成中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_"
effective_user_text = (
"请基于下方 DesignState(JSON) 生成完整的麻将新玩法交付物:\n"
"1) 自然语言规则说明(含机制声明表)\n"
"2) 完整 mGDL v1.3(按 m_prompt 格式)\n"
"3) 自检报告\n"
"要求:保持 DesignState 的核心设定不漂移;若发现 DesignState 信息不足,请先提出澄清问题并停止输出 mGDL。\n\n"
"<DESIGN_STATE_JSON>\n{ds}\n</DESIGN_STATE_JSON>\n"
).format(ds=ds_raw_prev.strip())
tuples_to_send = []
buf = []
try:
if design_mahjong_game_stream is not None:
for piece in design_mahjong_game_stream(effective_user_text, tuples_to_send, files, custom_prompt, mode):
if not piece:
continue
s = str(piece)
buf.append(s)
history[-1]["content"] += s
yield history, history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:生成中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_生成中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_"
else:
full = design_mahjong_game(effective_user_text, tuples_to_send, files, custom_prompt, mode)
buf.append(str(full))
for piece in _chunk_fake_stream(str(full), step=40):
history[-1]["content"] += piece
yield history, history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:生成中…", ready_prev, "READY_TO_GENERATE:未知", "输出校验:运行中…", "_生成中..._", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_"
except Exception as e:
history[-1]["content"] += f"\n(生成出错){type(e).__name__}: {e}"
yield history, history, "", "", "", ds_raw_prev, ds_obj_prev, ds_ver_prev, "DesignState:生成出错", ready_prev, "READY_TO_GENERATE:未知", "输出校验:未运行", "_生成出错_", history_data, gr.update(choices=cur_choices), f"共 {len(cur_choices)} 个版本" if cur_choices else "_无版本历史_"
return
new_ds_obj, new_ds_raw = extract_design_state(history[-1]["content"])
ds_ver = int(ds_ver_prev or 0)
ds_status = "DesignState:未变更"
diff_summary_md = "_无变更_"
updated_history_data = history_data
if new_ds_obj and new_ds_raw:
ds_ver += 1
ds_status = "DesignState:v{0} 已更新 | {1}".format(ds_ver, summarize_design_state(new_ds_obj))
# 生成差异摘要
diff_summary_md = generate_diff_summary(
ds_obj_prev if isinstance(ds_obj_prev, dict) else None,
new_ds_obj,
int(ds_ver_prev or 0),
ds_ver
)
# 添加到版本历史
compact_summary = generate_diff_summary_compact(ds_obj_prev if isinstance(ds_obj_prev, dict) else None, new_ds_obj)
updated_history_data = add_to_history(history_data, ds_ver, new_ds_obj, new_ds_raw, compact_summary)
# 更新版本下拉选项
new_choices = get_history_choices(updated_history_data)
history_status_md = f"共 {len(new_choices)} 个版本" if new_choices else "_无版本历史_"
try:
gdl_content, narrative_content = extract_gdl_and_narrative(history[-1]["content"])
design_log_content = extract_design_log(history[-1]["content"])
gdl_path, narrative_path = save_gdl_and_narrative(gdl_content, narrative_content)
design_log_path = save_design_log(design_log_content)
yield history, history, gdl_path, narrative_path, design_log_path, (new_ds_raw or ds_raw_prev), (new_ds_obj or ds_obj_prev), ds_ver, ds_status, ready_prev, "READY_TO_GENERATE:未知", render_validation_md(history[-1]["content"]), diff_summary_md, updated_history_data, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), history_status_md
except Exception:
yield history, history, "", "", "", (new_ds_raw or ds_raw_prev), (new_ds_obj or ds_obj_prev), ds_ver, ds_status, ready_prev, "READY_TO_GENERATE:未知", render_validation_md(history[-1]["content"]), diff_summary_md, updated_history_data, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), history_status_md
# 绑定:回车提交(Enter=提交;Shift+Enter=换行由浏览器处理)
user_input.submit(
fn=on_submit,
inputs=[
user_input,
chat_state,
file_uploader,
custom_prompt_box,
prompt_mode,
iterative_mode,
analyse_mode,
iteration_scope,
design_state_raw,
design_state_obj,
design_state_version,
ready_to_generate,
design_state_history,
asked_questions_state,
],
outputs=[
chatbot,
user_input,
chat_state,
gdl_file_path,
narrative_file_path,
design_log_file_path,
design_state_raw,
design_state_obj,
design_state_version,
design_state_status,
ready_to_generate,
ready_status,
validation_status,
diff_summary_display,
design_state_history,
version_dropdown,
history_status,
asked_questions_state,
],
preprocess=True,
)
# 绑定:点击"发送"
send_btn.click(
fn=on_submit,
inputs=[
user_input,
chat_state,
file_uploader,
custom_prompt_box,
prompt_mode,
iterative_mode,
analyse_mode,
iteration_scope,
design_state_raw,
design_state_obj,
design_state_version,
ready_to_generate,
design_state_history,
asked_questions_state,
],
outputs=[
chatbot,
user_input,
chat_state,
gdl_file_path,
narrative_file_path,
design_log_file_path,
design_state_raw,
design_state_obj,
design_state_version,
design_state_status,
ready_to_generate,
ready_status,
validation_status,
diff_summary_display,
design_state_history,
version_dropdown,
history_status,
asked_questions_state,
],
preprocess=True,
)
generate_from_state_btn.click(
fn=on_generate_from_state,
inputs=[
chat_state,
file_uploader,
custom_prompt_box,
prompt_mode,
design_state_raw,
design_state_obj,
design_state_version,
ready_to_generate,
design_state_history,
],
outputs=[
chatbot,
chat_state,
gdl_file_path,
narrative_file_path,
design_log_file_path,
design_state_raw,
design_state_obj,
design_state_version,
design_state_status,
ready_to_generate,
ready_status,
validation_status,
diff_summary_display,
design_state_history,
version_dropdown,
history_status,
],
preprocess=True,
)
# 版本回滚功能
def on_rollback(selected_version, history_data, ds_obj_prev, ds_ver_prev):
"""回滚到选定版本"""
if not selected_version:
return (
ds_obj_prev, "", ds_ver_prev,
"DesignState:未选择版本", "_请选择要回滚的版本_",
history_data, gr.update(), f"共 {len(get_history_choices(history_data))} 个版本" if get_history_choices(history_data) else "_无版本历史_"
)
version = parse_version_from_choice(selected_version)
if version <= 0:
return (
ds_obj_prev, "", ds_ver_prev,
"DesignState:版本解析失败", "_版本号无效_",
history_data, gr.update(), f"共 {len(get_history_choices(history_data))} 个版本" if get_history_choices(history_data) else "_无版本历史_"
)
updated_history, state_obj, state_raw = rollback_history(history_data, version)
if state_obj is None:
return (
ds_obj_prev, "", ds_ver_prev,
f"DesignState:回滚失败(v{version} 不存在)", "_回滚失败_",
history_data, gr.update(), f"共 {len(get_history_choices(history_data))} 个版本" if get_history_choices(history_data) else "_无版本历史_"
)
new_choices = get_history_choices(updated_history)
ds_status = "DesignState:v{0} | {1}".format(version, summarize_design_state(state_obj))
diff_md = f"**已回滚到 v{version}**\n\n{summarize_design_state(state_obj)}"
return (
state_obj, state_raw, version,
ds_status, diff_md,
updated_history, gr.update(choices=new_choices, value=new_choices[-1] if new_choices else None), f"✅ 已回滚到 v{version},共 {len(new_choices)} 个版本"
)
rollback_btn.click(
fn=on_rollback,
inputs=[version_dropdown, design_state_history, design_state_obj, design_state_version],
outputs=[
design_state_obj,
design_state_raw,
design_state_version,
design_state_status,
diff_summary_display,
design_state_history,
version_dropdown,
history_status,
],
)
# 刷新历史列表
def on_refresh_history(history_data):
choices = get_history_choices(history_data)
return gr.update(choices=choices, value=choices[-1] if choices else None), f"共 {len(choices)} 个版本" if choices else "_无版本历史_"
refresh_history_btn.click(
fn=on_refresh_history,
inputs=[design_state_history],
outputs=[version_dropdown, history_status],
)
# ====== 多阶段交互:阶段检测与方案选择 ======
def update_phase_from_chat(history_msgs, ds_obj, phase_state):
"""根据最新聊天内容更新阶段状态和方案显示"""
if not history_msgs:
return (
"**当前阶段**: 🚀 初始",
gr.update(visible=False),
gr.update(choices=[], value=None),
gr.update(visible=False),
gr.update(visible=False),
phase_state,
)
# 获取最后一条助手消息
last_bot_msg = ""
for msg in reversed(history_msgs):
if isinstance(msg, dict) and msg.get("role") == "assistant":
last_bot_msg = msg.get("content", "")
break
if not last_bot_msg:
return (
"**当前阶段**: 🚀 初始",
gr.update(visible=False),
gr.update(choices=[], value=None),
gr.update(visible=False),
gr.update(visible=False),
phase_state,
)
# 检测阶段
detected_phase = detect_interaction_phase(last_bot_msg, ds_obj)
phase_display = get_phase_display_name(detected_phase)
# 提取方案
proposals = extract_proposals(last_bot_msg)
questions = extract_clarify_questions(last_bot_msg)
# 更新阶段状态
new_phase_state = update_phase_state(phase_state, detected_phase, proposals=proposals)
# 根据阶段决定 UI 显示
if detected_phase == InteractionPhase.DIVERGE and len(proposals) >= 2:
# 有多个方案,显示方案选择 UI
proposal_choices = [f"方案 {p['id']}: {p['title']}" for p in proposals]
proposals_md = format_proposals_for_display(proposals)
return (
f"**当前阶段**: {phase_display}\n\n发现 **{len(proposals)}** 个候选方案,请选择一个深入展开。",
gr.update(value=proposals_md, visible=True),
gr.update(choices=proposal_choices, value=None, visible=True),
gr.update(visible=True),
gr.update(visible=True),
new_phase_state,
)
elif detected_phase == InteractionPhase.UNDERSTAND and questions:
# 有确认问题
questions_md = "\n".join([f"❓ {q}" for q in questions])
return (
f"**当前阶段**: {phase_display}\n\n请回答以下确认问题:",
gr.update(value=questions_md, visible=True),
gr.update(choices=[], value=None),
gr.update(visible=False),
gr.update(visible=False),
new_phase_state,
)
else:
# 其他阶段
return (
f"**当前阶段**: {phase_display}",
gr.update(visible=False),
gr.update(choices=[], value=None),
gr.update(visible=False),
gr.update(visible=False),
new_phase_state,
)
# 当聊天内容变化时更新阶段
chatbot.change(
fn=update_phase_from_chat,
inputs=[chatbot, design_state_obj, interaction_phase_state],
outputs=[
phase_status,
proposals_display,
proposal_dropdown,
proposal_select_row,
diverge_action_row,
interaction_phase_state,
],
)
# 确认选择方案
def on_select_proposal(selected, phase_state, history_msgs):
"""用户选择方案后,生成相应的用户消息"""
if not selected:
return "", history_msgs, phase_state
# 提取方案 ID
proposal_id = selected.split(":")[0].replace("方案", "").strip()
# 生成用户确认消息
confirm_msg = f"我选择 **方案 {proposal_id}**,请对这个方案进行深入展开设计。"
# 更新阶段状态
new_phase_state = update_phase_state(phase_state, InteractionPhase.SELECT, selected=proposal_id)
return confirm_msg, history_msgs, new_phase_state
select_proposal_btn.click(
fn=on_select_proposal,
inputs=[proposal_dropdown, interaction_phase_state, chat_state],
outputs=[user_input, chat_state, interaction_phase_state],
)
# 重新发散按钮
def on_request_more():
return "请给出更多不同方向的机制组合方案,我想看看其他可能性。"
request_more_btn.click(
fn=on_request_more,
outputs=[user_input],
)
# 直接深入展开按钮
def on_request_elaborate(phase_state):
proposals = phase_state.get("proposals", [])
if proposals:
first = proposals[0]
return f"请直接对方案 {first['id']}{first['title']})进行深入展开设计。"
return "请直接对当前方案进行深入展开设计,生成完整的玩法规则和 mGDL。"
request_elaborate_btn.click(
fn=on_request_elaborate,
inputs=[interaction_phase_state],
outputs=[user_input],
)
# ====== 细粒度范围控制:事件绑定 ======
def on_scope_mode_change(mode):
"""切换控制模式时显示/隐藏自定义选项"""
is_custom = mode == "自定义模式"
status_text = "_范围约束: 自定义模式_" if is_custom else "_范围约束: 预设模式_"
return gr.update(visible=is_custom), status_text
scope_mode.change(
fn=on_scope_mode_change,
inputs=[scope_mode],
outputs=[custom_scope_group, scope_status],
)
def on_refresh_mechanics(ds_obj):
"""刷新当前 DesignState 中的机制列表"""
if not ds_obj or not isinstance(ds_obj, dict):
return gr.update(choices=[], value=[])
mechanics = get_mechanics_from_state(ds_obj)
choices = [(m, m) for m in mechanics]
return gr.update(choices=choices, value=[])
refresh_mechanics_btn.click(
fn=on_refresh_mechanics,
inputs=[design_state_obj],
outputs=[locked_mechanics],
)
def on_scope_config_update(mode, constraint, locked_flds, locked_mechs, preset_scope):
"""更新范围配置状态"""
is_preset = mode == "预设模式"
constraint_level_val = ScopeConstraint.SOFT if "软" in constraint else ScopeConstraint.HARD
config = {
"mode": "preset" if is_preset else "custom",
"preset": preset_scope if is_preset else "",
"constraint_level": constraint_level_val,
"locked_fields": locked_flds or [],
"allowed_fields": [],
"locked_mechanics": locked_mechs or [],
"allowed_mechanics": [],
}
# 生成状态提示
if is_preset:
status = f"_范围约束: 预设模式 ({preset_scope})_"
else:
locked_count = len(locked_flds or []) + len(locked_mechs or [])
constraint_text = "软约束" if constraint_level_val == ScopeConstraint.SOFT else "硬约束"
status = f"_范围约束: 自定义模式 | {constraint_text} | 锁定 {locked_count} 项_"
return config, status
# 当任意范围控制项变化时更新配置
for scope_input in [scope_mode, constraint_level, locked_fields, locked_mechanics, iteration_scope]:
scope_input.change(
fn=on_scope_config_update,
inputs=[scope_mode, constraint_level, locked_fields, locked_mechanics, iteration_scope],
outputs=[scope_config_state, scope_status],
)
# 导出对话
with gr.Row():
export_btn = gr.Button("导出对话(Markdown)", variant="secondary")
export_file = gr.File(label="点击下载导出文件", interactive=False)
# 清空对话
with gr.Row():
clear_dialog_btn = gr.Button("清空对话", variant="secondary")
def _clear_chat():
return (
[], "", [], "", "", "", "", {}, 0,
"DesignState:未建立(先生成一版玩法)", False, "READY_TO_GENERATE:未知",
"输出校验:未运行", "_尚无变更记录_",
create_empty_history(), gr.update(choices=[], value=None), "_无版本历史_",
create_phase_state(), "**当前阶段**: 🚀 初始",
gr.update(visible=False), gr.update(choices=[], value=None),
gr.update(visible=False), gr.update(visible=False),
create_scope_config(), "_范围约束: 预设模式_",
[],
)
clear_dialog_btn.click(
fn=_clear_chat,
inputs=None,
outputs=[
chatbot,
user_input,
chat_state,
gdl_file_path,
narrative_file_path,
design_log_file_path,
design_state_raw,
design_state_obj,
design_state_version,
design_state_status,
ready_to_generate,
ready_status,
validation_status,
diff_summary_display,
design_state_history,
version_dropdown,
history_status,
interaction_phase_state,
phase_status,
proposals_display,
proposal_dropdown,
proposal_select_row,
diverge_action_row,
scope_config_state,
scope_status,
asked_questions_state,
],
)
# ==================== 下载按钮部分 ====================
with gr.Row():
gr.Markdown("### 下载输出文件")
download_gdl_btn = gr.Button("下载 GDL 文件", variant="secondary") # 下载 GDL 按钮
download_narrative_btn = gr.Button("下载自然语言文件", variant="secondary") # 下载自然语言按钮
download_design_log_btn = gr.Button("下载思维日志文件", variant="secondary")
download_gdl_file = gr.File(label="GDL 文件", interactive=False) # 文件下载区域
download_narrative_file = gr.File(label="自然语言文件", interactive=False) # 文件下载区域
download_design_log_file = gr.File(label="思维日志文件", interactive=False)
# 🟩【修改】绑定下载按钮与文件路径 - 修复版本
def get_gdl_file(gdl_path):
if gdl_path and os.path.exists(gdl_path):
return gdl_path
return None
def get_narrative_file(narrative_path):
if narrative_path and os.path.exists(narrative_path):
return narrative_path
return None
def get_design_log_file(design_log_path):
if design_log_path and os.path.exists(design_log_path):
return design_log_path
return None
# 绑定下载按钮与文件路径
download_gdl_btn.click(
fn=get_gdl_file,
inputs=[gdl_file_path], # 🟩【修改】接收State中的路径
outputs=[download_gdl_file]
)
download_narrative_btn.click(
fn=get_narrative_file,
inputs=[narrative_file_path], # 🟩【修改】接收State中的路径
outputs=[download_narrative_file]
)
download_design_log_btn.click(
fn=get_design_log_file,
inputs=[design_log_file_path],
outputs=[download_design_log_file],
)
# ==================== 事件绑定(左侧) ====================
clear_cache_btn.click(fn=clear_cache, outputs=[cache_status])
clear_files_btn.click(fn=clear_files, outputs=[file_uploader])
file_uploader.change(fn=update_file_status, inputs=[file_uploader], outputs=[file_status])
# 导出按钮
def _export_wrapper(chat_history):
try:
path = export_history_to_markdown(chat_history)
return path
except Exception as e:
import tempfile
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".md")
with open(tmp.name, 'w', encoding='utf-8') as f:
f.write(f"导出失败:{type(e).__name__}: {e}\n")
return tmp.name
export_btn.click(fn=_export_wrapper, inputs=[chatbot], outputs=[export_file])
# ==================== 启动应用 ====================
if __name__ == "__main__":
# 兼容不同 gradio 版本:
# - 有的版本 queue() 不接受 concurrency_count/status_update_rate
# - 有的版本甚至不需要手动 queue()
app = demo
try:
app = demo.queue() # 不带参数,启用队列以支持生成器流式
except TypeError:
# 某些版本 queue() 可能参数或签名不同,直接跳过即可
app = demo
# HF Spaces 自带公开/私有托管,share=True 会尝试创建外部公网链接,常导致启动失败
is_space = bool(os.getenv("SPACE_ID") or os.getenv("HF_SPACE_ID"))
allow_share = (os.getenv("GRADIO_SHARE") or "0").strip() == "1"
app.launch(share=(allow_share and not is_space), show_api=False)