| """ |
| Bob - ABC Burgers AI Assistant (Toy Prototype) |
| |
| Requires: |
| pip install gradio transformers torch accelerate |
| |
| To run with a real model: |
| HF_MODEL=google/gemma-2b-it python bob_abc_burgers.py |
| |
| Requires a configured HF model via HF_MODEL. |
| """ |
|
|
| import base64 |
| import os |
| import random |
| import re |
| import json |
| import html |
| from typing import Any |
| import uuid |
| import gradio as gr |
| import threading |
| from pathlib import Path |
| from bob_resources import ( |
| CLARIFY_OPTIONS, |
| ENCODED_SYSTEM_PROMPT, |
| TOOL_CATALOG, |
| apply_discount, |
| connect, |
| clarify_intent, |
| competitor_mentions, |
| emergency_crisis, |
| food_safety_endpoint, |
| legal_endpoint, |
| loyalty_program, |
| sample_assistants, |
| store_app_website, |
| store_information, |
| store_policy, |
| take_order, |
| validate, |
| skip, |
| ) |
| from bob_agents import ( |
| _translate_clarify_text, translate_to_detector_language, |
| build_unfulfillable_response_stream, |
| ) |
| from bob_utils import ( |
| generate_response_stream, _sanitize_display_text, _clean_tool_text, |
| _strip_trailing_malformed_tool_tokens, |
| _strip_tool_call_markup, |
| detect_jailbreak, detect_preferred_language, |
| detect_prompt_injection, SUPPORTED_GEMMA_LANGS, |
| _processor, |
| HF_MODEL, JAILBREAK_MODEL, PROMPT_INJECTION_MODEL, REFUSAL_LANGUAGE_MODEL, |
| ) |
|
|
def get_system_prompt(assistant_list: list) -> str:
    """Decode the base64-encoded system prompt and substitute the roster.

    The `{assistant_list}` placeholder in the decoded prompt is replaced
    with a comma-separated rendering of `assistant_list`.
    """
    decoded_prompt = base64.b64decode(ENCODED_SYSTEM_PROMPT).decode()
    roster = ", ".join(assistant_list)
    return decoded_prompt.replace("{assistant_list}", roster)
|
|
|
|
# Canned safe-refusal messages used for language steering, keyed by
# language code.  Only English ("EN") is provisioned so far.
LANGUAGE_STEER_MESSAGES = {
    "EN": "I’m sorry, I don’t understand this request clearly enough to help safely.",
}
|
|
| |
| |
| |
|
|
# Matches a complete tool invocation emitted by the model, e.g.
# "<|tool_call|>connect{...}<eos>".  Tolerant of missing pipe characters in
# the sentinels and of several different closing tokens; named groups
# capture the tool name, its brace-delimited args, and the closing marker.
TOOL_CALL_RE = re.compile(
    r"(?:<\|?tool_call\|?>|^)\s*"
    r"(?:call:)?(?P<name>[a-zA-Z_][a-zA-Z0-9_\-\s]*?)\s*"
    r"\{(?P<args>.*)\}\s*"
    r"(?P<close><\|?tool_call\|?>|<eos>|<end_of_turn>|<turn\|?>|</s>|<\|?channel\|?>|$)",
    re.DOTALL,
)


# Coarser pattern used to scrub any tool-call markup out of display text.
TOOL_CALL_MARKUP_RE = re.compile(
    r"<\|?tool_call\|?>.*?(?:<\|?tool_call\|?>|<eos>|$)",
    re.DOTALL,
)


# A fully delimited "thought" channel block: open marker through the
# "<channel|>" close marker.
THOUGHT_BLOCK_RE = re.compile(
    r"<\|channel\|?>thought\s*.*?<channel\|>",
    re.DOTALL,
)


# Just the opening marker of a thought channel; the close marker may not
# have arrived yet while the model is still streaming.
THOUGHT_OPEN_RE = re.compile(r"<\|?channel\|?>thought", re.DOTALL)


# Matches only the *head* of a tool call (name plus opening brace/paren).
# The argument span is then located with _find_matching_brace so nested
# braces inside JSON arguments are handled correctly.
TOOL_CALL_TOKEN_RE = re.compile(
    r"(?:<\|?tool_call\|?>|^)\s*"
    r"(?:call:)?(?P<name>[a-zA-Z_][a-zA-Z0-9_\-\s]*?)\s*"
    r"(?P<brace>[\{\(])",
    re.DOTALL,
)
|
|
|
|
def _strip_thought_channel_markup(text: str) -> str:
    """Remove streamed "thought" channel markup, keeping only answer text.

    When a thought marker is present: if a close marker exists, keep only
    the text after the *last* close marker; otherwise the thought is still
    streaming and nothing is displayable yet, so return "".
    """
    cleaned = (text or "").replace("\r", "")
    if THOUGHT_OPEN_RE.search(cleaned):
        if "<channel|>" in cleaned:
            # Keep only what follows the final close marker.
            cleaned = cleaned.rsplit("<channel|>", 1)[1]
        else:
            # Unterminated thought: no answer text available yet.
            return ""
    cleaned = THOUGHT_BLOCK_RE.sub("", cleaned)
    # Scrub stray marker fragments that survive the regex pass.
    cleaned = cleaned.replace("<|channel>thought", "").replace("<channel|>", "")
    return cleaned.strip()
|
|
|
|
def _split_thinking_and_answer(text: str) -> tuple[str, str, bool]:
    """Split raw model output into (thinking, answer, thinking_active).

    `thinking_active` is True while a thought block has opened but its
    "<channel|>" close marker has not arrived yet (mid-stream).  Any text
    before the thought marker counts as answer text; tool-call markup is
    stripped from the answer via _strip_tool_call_markup.
    """
    cleaned = (text or "").replace("\r", "")
    thought_start = cleaned.find("<|channel>thought")
    if thought_start == -1:
        thought_start = cleaned.find("<channel>thought")
    if thought_start == -1:
        # No thought markup at all: everything is answer text.
        return "", _strip_tool_call_markup(cleaned), False

    pre_thought = cleaned[:thought_start]
    after_start = cleaned[thought_start:]
    end_marker = after_start.find("<channel|>")
    if end_marker == -1:
        # Thought opened but not closed: still streaming.
        thought_body = after_start.replace("<|channel>thought", "").replace("<channel>thought", "")
        return thought_body.strip(), _strip_tool_call_markup(pre_thought).strip(), True

    thought_body = after_start[:end_marker]
    thought_body = thought_body.replace("<|channel>thought", "").replace("<channel>thought", "")
    answer_body = after_start[end_marker + len("<channel|>") :]

    # Stitch pre-thought and post-thought answer text back together.
    combined_answer = pre_thought
    if answer_body:
        combined_answer += "\n" + answer_body
    return thought_body.strip(), _strip_tool_call_markup(combined_answer).strip(), False
|
|
|
|
| def _format_thinking_bubble(thinking: str, answer: str, thinking_active: bool) -> str: |
| def _blockquote(text: str) -> str: |
| lines = [line.rstrip() for line in text.splitlines()] |
| return "\n".join(f"> {line}" if line else ">" for line in lines) |
|
|
| parts = [] |
| if thinking: |
| parts.append("**Thinking**") |
| parts.append(_blockquote(thinking)) |
| elif thinking_active: |
| parts.append("**Thinking**") |
| parts.append("> Working...") |
| if answer: |
| if parts: |
| parts.append("") |
| parts.append(answer) |
| return "\n".join(parts).strip() |
|
|
|
|
| def _format_live_thinking(thinking: str, thinking_active: bool) -> str: |
| if thinking: |
| lines = [line.rstrip() for line in thinking.splitlines()] |
| body = "\n".join(f"> {line}" if line else ">" for line in lines) |
| return f"**Thinking**\n{body}".strip() |
| if thinking_active: |
| return "**Thinking**\n> Working..." |
| return "" |
|
|
|
|
| def _extract_reasoning(text: str) -> tuple[str, bool]: |
| cleaned = (text or "").replace("\r", "") |
| thought_start = cleaned.find("<|channel>thought") |
| if thought_start == -1: |
| thought_start = cleaned.find("<channel>thought") |
| if thought_start == -1: |
| return "", False |
| after_start = cleaned[thought_start:] |
| end_marker = after_start.find("<channel|>") |
| if end_marker == -1: |
| thought_body = after_start.replace("<|channel>thought", "").replace("<channel>thought", "") |
| return thought_body.strip(), True |
| thought_body = after_start[:end_marker] |
| thought_body = thought_body.replace("<|channel>thought", "").replace("<channel>thought", "") |
| return thought_body.strip(), False |
|
|
|
|
| def _find_matching_brace(text: str, start_index: int, open_char: str) -> int: |
| close_char = "}" if open_char == "{" else ")" |
| depth = 0 |
| in_string = False |
| escape = False |
| for idx in range(start_index, len(text)): |
| ch = text[idx] |
| if escape: |
| escape = False |
| continue |
| if ch == "\\" and in_string: |
| escape = True |
| continue |
| if ch == '"': |
| in_string = not in_string |
| continue |
| if in_string: |
| continue |
| if ch == open_char: |
| depth += 1 |
| elif ch == close_char: |
| depth -= 1 |
| if depth == 0: |
| return idx |
| return -1 |
|
|
|
|
def _trigger_clarify_intent_flow(
    user_message: str,
    history: list,
    session_state: dict,
    user_language: str,
    msg_interactive: bool,
    send_btn_interactive: bool,
):
    """Generator that opens the clarify-intent menu for a flagged message.

    Appends the user message plus a translated "Clarify intent" label to
    the chat history, then yields Gradio updates that lock the textbox and
    send button and show the translated clarify-options widget.  When the
    clarify payload is malformed JSON, apologizes and restores the prior
    interactivity instead.

    Yield order: history, session_state, textbox update, send-button
    update, clarify-options update, clarify-container update, debug panel.
    """
    session_state["pending_clarify"] = True

    history.append({"role": "user", "content": user_message})

    clarify_result_json = clarify_intent()
    try:
        parsed_result = json.loads(clarify_result_json)
        options_keys = parsed_result.get("options", [])

        # Present both the label and each option in the user's language.
        translated_options_keys = [
            _translate_clarify_text(key, user_language)
            for key in options_keys
        ]
        translated_label = _translate_clarify_text(
            "Clarify intent", user_language
        )

        history.append({"role": "assistant", "content": translated_label})

        yield history, session_state, gr.update(
            value="", interactive=False
        ), gr.update(
            interactive=False
        ), gr.update(
            label=translated_label,
            choices=translated_options_keys,
            visible=True,
            interactive=True
        ), gr.update(
            visible=True
        ), _debug_state(session_state)

    except json.JSONDecodeError:
        # clarify_intent() returned malformed JSON; fail gracefully and
        # restore whatever interactivity the caller had.
        history.append({"role": "assistant", "content": "I'm sorry, I encountered an issue trying to clarify your intent."})
        yield history, session_state, gr.update(value="", interactive=msg_interactive), gr.update(interactive=send_btn_interactive), gr.update(visible=False), gr.update(visible=False), _debug_state(session_state)
|
|
|
|
def _open_clarify_intent_menu(history: list, session_state: dict):
    """Generator that opens the clarify-intent menu directly (English UI).

    Same widget wiring as _trigger_clarify_intent_flow but without adding a
    user message and always translating to "EN".  On a malformed clarify
    payload it re-enables input and hides the clarify widgets.
    """
    session_state["pending_clarify"] = True
    clarify_result_json = clarify_intent()
    try:
        parsed_result = json.loads(clarify_result_json)
        options_keys = parsed_result.get("options", [])
        translated_options_keys = [
            _translate_clarify_text(key, "EN")
            for key in options_keys
        ]
        translated_label = _translate_clarify_text("Clarify intent", "EN")
        yield history or [], session_state, gr.update(value="", interactive=False), gr.update(interactive=False), gr.update(
            label=translated_label,
            choices=translated_options_keys,
            visible=True,
            interactive=True,
        ), gr.update(visible=True), _debug_state(session_state)
    except json.JSONDecodeError:
        # Malformed payload: restore input and hide the clarify widgets.
        yield history or [], session_state, gr.update(value="", interactive=True), gr.update(interactive=True), gr.update(visible=False), gr.update(visible=False), _debug_state(session_state)
|
|
|
|
def _format_tool_catalog() -> str:
    """Render TOOL_CATALOG as an HTML unordered list.

    NOTE(review): tool names/descriptions are interpolated without HTML
    escaping — assumed to be trusted static strings; confirm.
    """
    items = [
        f"<li><code>{tool}</code> - {desc}</li>"
        for tool, desc in TOOL_CATALOG.items()
    ]
    return "\n".join(["<ul>", *items, "</ul>"])
|
|
|
|
def _render_tool_result_for_display(result: str) -> str:
    """Pretty-print a JSON tool result as a Markdown bullet list.

    Non-JSON (or non-object JSON) results are returned unchanged.  Any
    "instructions" key is rendered last, wrapped in <SYSTEM> markers so it
    stands out from the ordinary fields.
    """
    try:
        parsed = json.loads(result)
    except json.JSONDecodeError:
        return result

    if not isinstance(parsed, dict):
        return result

    lines = []
    for key, value in parsed.items():
        if key == "instructions":
            continue  # rendered separately below
        if isinstance(value, list):
            lines.append(f"- **{key}**")
            for item in value:
                lines.append(f" - {item}")
        elif isinstance(value, dict):
            lines.append(f"- **{key}**")
            for sub_key, sub_value in value.items():
                lines.append(f" - {sub_key}: {sub_value}")
        else:
            lines.append(f"- **{key}**: {value}")

    if "instructions" in parsed:
        lines.append("<SYSTEM>")
        instructions = parsed["instructions"]
        if isinstance(instructions, list):
            for item in instructions:
                if isinstance(item, dict):
                    lines.append(f" - {item.get('kind', 'instruction')}: {item.get('text', item)}")
                else:
                    lines.append(f" - {item}")
        elif isinstance(instructions, dict):
            for key, value in instructions.items():
                lines.append(f" - {key}: {value}")
        else:
            lines.append(f" - {instructions}")
        lines.append("</SYSTEM>")
    # Fall back to the raw result if nothing rendered.
    return "\n".join(lines).strip() or result
|
|
|
|
# Dispatch table: tool name the model may emit -> implementation (all
# imported from bob_resources).  connect/validate/skip, clarify_intent and
# take_order receive special argument handling in _call_tool_function.
TOOL_FUNCTIONS = {
    "connect": connect,
    "validate": validate,
    "skip": skip,
    "clarify_intent": clarify_intent,
    "store_policy": store_policy,
    "store_information": store_information,
    "store_app_website": store_app_website,
    "food_safety_endpoint": food_safety_endpoint,
    "legal_endpoint": legal_endpoint,
    "emergency_crisis": emergency_crisis,
    "apply_discount": apply_discount,
    "loyalty_program": loyalty_program,
    "competitor_mentions": competitor_mentions,
    "take_order": take_order,
}
|
|
|
|
def _parse_agent_output(raw: str) -> tuple[str, list[dict]]:
    """Split raw model output into (display_text, tool_calls).

    Tool-call heads are located with TOOL_CALL_TOKEN_RE; the argument span
    is closed with _find_matching_brace so nested braces inside JSON args
    work.  A call whose braces never balance is captured as a malformed
    call (args = cleaned tail) and parsing stops.  When no tool calls are
    found, a JSON list payload of the form [{"text": ...}] is unwrapped,
    otherwise the cleaned text itself is returned.
    """
    text = raw.strip()
    tool_calls: list[dict] = []

    def _clean_tool_args(value: str) -> str:
        # Normalize tool text and drop trailing malformed tool tokens.
        cleaned = _clean_tool_text(value or "")
        cleaned = _strip_trailing_malformed_tool_tokens(cleaned)
        return cleaned.strip()

    cursor = 0
    while cursor < len(text):
        call_match = TOOL_CALL_TOKEN_RE.search(text, cursor)
        if not call_match:
            break
        name = call_match.group("name")
        brace = call_match.group("brace")
        args_start = call_match.end()
        args_end = _find_matching_brace(text, args_start - 1, brace)
        if args_end == -1:
            # Unbalanced braces: keep the tail (up to any tool response
            # marker) as this call's args and stop scanning.
            malformed_tail = text[call_match.start():]
            response_marker = malformed_tail.find("<|tool_response|>")
            if response_marker == -1:
                response_marker = malformed_tail.find("<tool_response>")
            if response_marker != -1:
                malformed_tail = malformed_tail[:response_marker]
            tool_calls.append({
                "name": name,
                "args": _clean_tool_args(malformed_tail),
            })
            break
        args_str = text[args_start:args_end].strip().replace("<|\"|>", '"')
        tool_calls.append({
            "name": name,
            "args": _clean_tool_args(args_str),
        })
        cursor = args_end + 1
        while cursor < len(text) and text[cursor].isspace():
            cursor += 1
        # BUG FIX: the original sliced text[cursor:cursor + 12] and asked
        # whether that 12-char slice starts with the 13-char literal
        # "<|tool_call|>" — impossible, so chained piped tool calls were
        # never recognized.  str.startswith with a start offset (and a
        # tuple of markers) checks both sentinels correctly.
        if text.startswith(("<|tool_call|>", "<tool_call>"), cursor):
            continue
        if tool_calls:
            # Next token is not another tool call: the remainder (up to a
            # tool response marker) is the display text.
            remaining_text = text[cursor:].strip()
            response_marker = remaining_text.find("<|tool_response|>")
            if response_marker == -1:
                response_marker = remaining_text.find("<tool_response>")
            if response_marker != -1:
                remaining_text = remaining_text[:response_marker]
            normalized_text = _clean_tool_args(remaining_text)
            return normalized_text, tool_calls

    # No (well-formed) tool calls consumed the text: unwrap a JSON list
    # payload of the form [{"text": ...}] if present.
    try:
        parsed_json = json.loads(text)
        if isinstance(parsed_json, list) and len(parsed_json) > 0 and isinstance(parsed_json[0], dict) and "text" in parsed_json[0]:
            text_content = parsed_json[0]["text"]
            normalized = _clean_tool_text(text_content)
            normalized = _strip_trailing_malformed_tool_tokens(normalized)
            return normalized, tool_calls
    except json.JSONDecodeError:
        pass

    normalized = (
        _clean_tool_text(text)
    )
    normalized = _strip_trailing_malformed_tool_tokens(normalized)

    return normalized, tool_calls
|
|
|
|
def _normalize_persistent_text(text: str, system_prompt: str | None = None) -> str:
    """Sanitize text before persisting it in chat history, trimming whitespace."""
    sanitized = _sanitize_display_text(text, system_prompt)
    return sanitized.strip()
|
|
|
|
def _count_tokens(text_or_messages) -> int:
    """Count tokens for either a chat-message list or a plain string.

    Message lists are first rendered through the tokenizer's chat template
    (without a generation prompt) so the count matches what the model sees.
    """
    tokenizer = _processor.tokenizer
    if isinstance(text_or_messages, list):
        rendered = tokenizer.apply_chat_template(
            text_or_messages,
            tokenize=False,
            add_generation_prompt=False,
        )
        return len(tokenizer.encode(rendered, add_special_tokens=False))
    return len(tokenizer.encode(str(text_or_messages), add_special_tokens=False))
|
|
|
|
| def _parse_bool(value): |
| if isinstance(value, bool): |
| return value |
| if value is None: |
| return False |
| return str(value).strip().lower() in {"1", "true", "yes", "y"} |
|
|
|
|
def _parse_tool_args(args):
    """Best-effort parse of tool arguments into a dict.

    Accepts: a dict (returned as-is); a JSON object string, with or without
    the outer braces; or free-form "key: value" text from which the known
    keys (name / request / emergency) are scraped heuristically.  Any other
    input type yields {}.
    """
    if isinstance(args, dict):
        return args
    if not isinstance(args, str):
        return {}

    # First attempt strict JSON, wrapping bare `"k": v` pairs in braces.
    try:
        wrapped = args.strip()
        if not wrapped.startswith("{"):
            wrapped = f"{{{wrapped}}}"
        parsed_json = json.loads(wrapped)
        if isinstance(parsed_json, dict):
            return parsed_json
    except json.JSONDecodeError:
        pass

    def _extract_value(text: str, key: str, next_keys: tuple[str, ...]) -> str:
        # Scrape `key`'s value from loosely formatted text, stopping at the
        # earliest following known key or a closing brace.
        start = -1
        for marker in (f'"{key}":', f"'{key}':", f"{key}:", f"{key}="):
            idx = text.find(marker)
            if idx != -1:
                start = idx + len(marker)
                break
        if start == -1:
            return ""
        end = len(text)
        for next_key in next_keys:
            for token in (f",{next_key}:", f" {next_key}:", f",{next_key}=", f" {next_key}=", f",\"{next_key}\":", f",'{next_key}':"):
                idx = text.find(token, start)
                if idx != -1:
                    end = min(end, idx)
        closing = text.find("}", start)
        if closing != -1:
            end = min(end, closing)

        value = text[start:end].strip()
        # Strip symmetric surrounding quotes, then any trailing comma.
        if value.startswith(("\"", "'")) and value.endswith(("\"", "'")) and len(value) >= 2:
            value = value[1:-1]
        value = value.strip()
        if value.endswith(","):
            value = value[:-1].rstrip()
        return value

    parsed = {}
    parsed["name"] = _extract_value(args, "name", ("request", "request_append", "context_append", "emergency"))
    parsed["request"] = _extract_value(args, "request", ("request_append", "context_append", "emergency"))
    parsed["emergency"] = _extract_value(args, "emergency", ())
    # Drop keys that were not found.
    return {key: value for key, value in parsed.items() if value != ""}
|
|
|
|
def _call_tool_function(name: str, args, session_state: dict) -> str:
    """Invoke tool `name` with loosely-parsed `args`; return its JSON string.

    Routing tools (connect/validate/skip) receive a parsed assistant name,
    defaulting to a random assistant from the session pool (or "Alice").
    clarify_intent flags the session as pending clarification; take_order
    merges the session's draft order into the tool payload and marks it
    submitted.  Unknown or malformed tool names return a safe fallback
    instruction payload instead of raising.
    """
    # The original repeated this branch (and a redundant in-function
    # `import random`) once per routing tool; `random` is already imported
    # at module level, and a small dispatch removes the triplication.
    routing_tools = {"connect": connect, "validate": validate, "skip": skip}
    if name in routing_tools:
        parsed = _parse_tool_args(args)
        assistant_name = str(parsed.get("name", "")).strip()
        if not assistant_name:
            # No usable name supplied: pick one of the session's assistants.
            pool = session_state.get("assistants", [])
            assistant_name = random.choice(pool) if pool else "Alice"
        return routing_tools[name](
            name=assistant_name,
            emergency=_parse_bool(parsed.get("emergency", False)),
        )
    if name == "clarify_intent":
        session_state["pending_clarify"] = True
        return clarify_intent()
    if name == "take_order":
        # Seed a draft order once per session, then mark it submitted and
        # attach the self-service URLs.
        order = session_state.setdefault("order", {
            "status": "draft",
            "items": [],
            "subtotal": 0.0,
            "tax": 0.0,
            "total": 0.0,
            "order_id": f"ABC-{uuid.uuid4().hex[:8].upper()}",
            "refund_policy_url": "abcburgers.com/orders",
            "changes_url": "abcburgers.com/orders",
        })
        payload = json.loads(take_order())
        payload["order"].update(order)
        payload["order"]["status"] = "submitted"
        payload["order"]["status_page"] = "abcburgers.com/orders/status"
        payload["order"]["changes_page"] = "abcburgers.com/orders/changes"
        payload["order"]["refunds_page"] = "abcburgers.com/orders/refunds"
        return json.dumps(payload)
    fn = TOOL_FUNCTIONS.get(name)
    if fn is None:
        # Unknown/malformed tool: benign payload steering back on-topic.
        return json.dumps({
            "status": "ok",
            "output": "Fallback: the requested tool was malformed or unknown.",
            "instructions": [
                {
                    "kind": "free_text",
                    "text": "Ask a brief clarifying question and continue safely with ABC Burgers support.",
                }
            ],
        })
    return fn()
|
|
|
|
| |
| def _format_instruction_block(instructions: Any) -> str: |
| if isinstance(instructions, str): |
| return instructions |
| return json.dumps(instructions, indent=2, sort_keys=True) |
|
|
|
|
def _execute_tool_calls(tool_calls: list[dict], session_state: dict) -> list[dict]:
    """Execute parsed tool calls and collect display/replay payloads.

    Side effects on session_state: routing tools update
    routing_trigger_counts and the bounded routing_trigger_events log; any
    tool-provided "instructions" are joined into current_turn_instructions
    (removed when no tool supplied instructions this turn).

    Each output dict carries the raw result, a rendered "full" Markdown
    form, and a "replay" form (routing tools replay only their
    next_turn_summary).
    """
    outputs = []
    current_turn_instructions = []
    for call in tool_calls:
        name = str(call.get("name", "")).strip()
        args = call.get("args", "")

        # A bare assistant name (or anything with spaces/dashes that is not
        # a known tool) is treated as an implicit connect(name=...) call.
        if name not in TOOL_FUNCTIONS and (" " in name or "-" in name or name in session_state.get("assistants", [])):
            args = {"name": name}
            name = "connect"
            call["name"] = name
            call["args"] = args

        # Decode JSON-looking string args; keep the raw string on failure.
        if isinstance(args, str):
            stripped = args.strip()
            if stripped.startswith("{") or stripped.startswith("["):
                try:
                    args = json.loads(stripped)
                except json.JSONDecodeError:
                    args = stripped
        if _is_routing_tool(name):
            # Track per-assistant routing stats, with a bounded event log
            # (limit configurable via ROUTING_TRIGGER_LIMIT).
            parsed_args = args if isinstance(args, dict) else _parse_tool_args(args)
            assistant_name = _assistant_classification(str(parsed_args.get("name", "")).strip() or "Alice")
            counts = dict(session_state.get("routing_trigger_counts", {}))
            counts[assistant_name] = int(counts.get(assistant_name, 0)) + 1
            session_state["routing_trigger_counts"] = counts
            session_state["routing_trigger_events"] = _bounded_append(
                session_state.get("routing_trigger_events", []),
                {
                    "tool": name,
                    "assistant": assistant_name,
                    "emergency": _parse_bool(parsed_args.get("emergency", False)),
                },
                int(os.environ.get("ROUTING_TRIGGER_LIMIT", 12)),
            )
        result = _call_tool_function(name, args, session_state)

        # Harvest any "instructions" the tool wants injected next round.
        try:
            parsed_result = json.loads(result)
            if "instructions" in parsed_result:
                current_turn_instructions.append(_format_instruction_block(parsed_result["instructions"]))
        except json.JSONDecodeError:
            pass
        replay_text = result
        if _is_routing_tool(name):
            # Routing tools replay only their next-turn summary to the model.
            try:
                parsed_result = json.loads(result)
            except json.JSONDecodeError:
                parsed_result = {}
            replay_text = str(parsed_result.get("next_turn_summary", result))
        outputs.append({
            "name": name,
            "args": args,
            "result": result,
            "full": f"*[{name}({args})]*\n\n{_render_tool_result_for_display(result)}",
            "replay": replay_text,
        })
    if current_turn_instructions:
        session_state["current_turn_instructions"] = "\n".join(current_turn_instructions)
    else:
        session_state.pop("current_turn_instructions", None)
    return outputs
|
|
|
|
| def _tool_message_name(tool_call: dict) -> str: |
| return str(tool_call.get("name", "")).strip() |
|
|
|
|
def _append_tool_messages(messages: list, tool_calls: list[dict], tool_outputs: list[Any]) -> list:
    """Return a copy of `messages` extended with an assistant tool-call
    message plus a tool-result message for each executed call."""
    updated = list(messages)
    for call, output in zip(tool_calls, tool_outputs):
        tool_name = _tool_message_name(call)
        raw_args = call.get("args", "")
        arguments = raw_args if isinstance(raw_args, dict) else _parse_tool_args(raw_args)
        content = str(output.get("result", output.get("full", "")))
        if _is_routing_tool(tool_name):
            # Routing tools replay a short summary instead of the raw payload.
            content = str(output.get("replay", content))
        updated.append({
            "role": "assistant",
            "content": "",
            "tool_calls": [{
                "type": "function",
                "function": {
                    "name": tool_name,
                    "arguments": arguments,
                },
            }],
        })
        updated.append({
            "role": "tool",
            "name": tool_name,
            "content": content,
        })
    return updated
|
|
|
|
| def _compact_message_view(messages: list) -> list[dict]: |
| compact = [] |
| for item in messages or []: |
| entry = {"role": item.get("role"), "content": html.escape(str(item.get("content", "")))} |
| if "name" in item: |
| entry["name"] = html.escape(str(item["name"])) |
| compact.append(entry) |
| return compact |
|
|
|
|
| def _history_tool_message(tool_output: dict) -> str: |
| return str(tool_output.get("replay") or tool_output.get("full") or "") |
|
|
|
|
| def _history_tool_is_routing(tool_content: str) -> bool: |
| text = (tool_content or "").lower() |
| return "*[connect(" in text or "*[validate(" in text or "*[skip(" in text |
|
|
|
|
| def _is_routing_tool(name: str) -> bool: |
| return name in {"connect", "validate", "skip"} |
|
|
|
|
| def _assistant_classification(name: str) -> str: |
| cleaned = " ".join(str(name or "").strip().split()) |
| if not cleaned: |
| return "assistant" |
| return cleaned.split()[0] |
|
|
|
|
| def _sandbox_tool_message(tool_output: dict) -> str: |
| message = str(tool_output.get("replay") or tool_output.get("result") or "").strip() |
| if message: |
| return message |
| return str(tool_output.get("full") or "").strip() |
|
|
|
|
| def _bounded_append(items: list, item, limit: int) -> list: |
| if limit <= 0: |
| return [] |
| updated = list(items or []) |
| updated.append(item) |
| if len(updated) > limit: |
| updated = updated[-limit:] |
| return updated |
|
|
|
|
| def process_turn(user_message: str, history: list, session_state: dict): |
| current_normalized_message = " ".join(str(user_message or "").split()).strip() |
| last_seen_message = " ".join(str(session_state.get("last_processed_user_message") or "").split()).strip() |
| if current_normalized_message and current_normalized_message == last_seen_message: |
| yield history, session_state, gr.update(value="", interactive=not session_state.get("pending_clarify", False)), gr.update(interactive=not session_state.get("pending_clarify", False)), gr.update(visible=session_state.get("pending_clarify", False)), gr.update(visible=True), _debug_state(session_state) |
| return |
|
|
| if session_state.get("terminated"): |
| history = history + [ |
| {"role": "user", "content": user_message}, |
| {"role": "assistant", "content": "This session has been terminated."}, |
| ] |
| yield history, session_state, gr.update(value="", interactive=False), gr.update(interactive=False), gr.update(visible=False), gr.update(visible=True), _debug_state(session_state) |
| return |
|
|
| |
| is_pending_clarify = session_state.get("pending_clarify", False) |
| msg_interactive = not is_pending_clarify |
| send_btn_interactive = not is_pending_clarify |
|
|
| |
| if session_state.get("terminated"): |
| |
| yield history, session_state, gr.update(value="", interactive=False), gr.update(interactive=False), gr.update(visible=False), gr.update(visible=True), _debug_state(session_state) |
| return |
|
|
| last_assistant_message = "" |
| for item in reversed(history): |
| if isinstance(item, dict) and item.get("role") == "assistant": |
| last_assistant_message = str(item.get("content", "")) |
| break |
| elif hasattr(item, "role") and getattr(item, "role") == "assistant": |
| last_assistant_message = str(getattr(item, "content", "")) |
| break |
| elif isinstance(item, (list, tuple)) and len(item) == 2: |
| if item[1]: |
| last_assistant_message = str(item[1]) |
| break |
|
|
| context_for_detection = f"{last_assistant_message}\n{user_message}" if last_assistant_message else user_message |
| user_language = detect_preferred_language(context_for_detection) |
| session_state["active_language"] = user_language |
| session_state["last_processed_user_message"] = user_message |
| session_state["current_stage"] = "language_detection" |
| _set_decision_path(session_state, "language_detected") |
| if user_language not in SUPPORTED_GEMMA_LANGS: |
| session_state["current_stage"] = "language_not_supported" |
| session_state["translation_status"] = "steer" |
| _set_decision_path(session_state, "language_detected", "steer") |
| history = history + [ |
| {"role": "user", "content": user_message}, |
| {"role": "assistant", "content": ""}, |
| ] |
| assistant_index = len(history) - 1 |
| for chunk in build_unfulfillable_response_stream(user_message, session_state, "language_not_supported"): |
| history[assistant_index]["content"] += chunk |
| yield history, session_state, gr.update(value="", interactive=msg_interactive), gr.update(interactive=send_btn_interactive), gr.update(visible=is_pending_clarify), gr.update(visible=True), _debug_state(session_state) |
| yield history, session_state, gr.update(value="", interactive=msg_interactive), gr.update(interactive=send_btn_interactive), gr.update(visible=is_pending_clarify), gr.update(visible=True), _debug_state(session_state) |
| return |
|
|
| safety_text, is_refused, refusal_reason = translate_to_detector_language(user_message, user_language) |
| session_state["translation_status"] = "translated" if not is_refused else "refused" |
| _set_decision_path(session_state, "language_detected", "translate") |
| if is_refused: |
| session_state["current_stage"] = "translation_refused" |
| _set_decision_path(session_state, "language_detected", "translate", "refusal") |
| session_state["terminated"] = True |
| session_state["last_jailbreak_score"] = 1.0 |
| session_state["last_jailbreak_predicted_label"] = "unsafe" |
| session_state["last_refusal_reason"] = refusal_reason |
| history = history + [ |
| {"role": "user", "content": user_message}, |
| {"role": "assistant", "content": ""}, |
| ] |
| assistant_index = len(history) - 1 |
| for chunk in build_unfulfillable_response_stream(user_message, session_state, "translation_refused", refusal_reason): |
| history[assistant_index]["content"] += chunk |
| yield history, session_state, gr.update(value="", interactive=msg_interactive), gr.update(interactive=send_btn_interactive), gr.update(visible=is_pending_clarify), gr.update(visible=True), _debug_state(session_state) |
| yield history, session_state, gr.update(value="", interactive=msg_interactive), gr.update(interactive=send_btn_interactive), gr.update(visible=is_pending_clarify), gr.update(visible=True), _debug_state(session_state) |
| return |
|
|
| jailbreak = detect_jailbreak(safety_text) |
| session_state["current_stage"] = "jailbreak_check" |
| _set_decision_path(session_state, "language_detected", "translate", "jailbreak_check") |
| session_state["last_jailbreak_score"] = jailbreak["score"] |
| session_state["last_jailbreak_predicted_label"] = jailbreak["predicted_label"] |
| prompt_injection = None |
| if user_language == "EN": |
| prompt_injection = detect_prompt_injection(safety_text) |
| session_state["last_prompt_injection_score"] = prompt_injection["score"] |
| session_state["last_prompt_injection_predicted_label"] = prompt_injection["predicted_label"] |
| if (jailbreak["blocked"] or (prompt_injection and prompt_injection["blocked"])): |
| session_state["current_stage"] = "blocked_or_clarify" |
| if random.random() < 0.5: |
| |
| session_state["routing_status"] = "clarify_intent" |
| _set_decision_path(session_state, "language_detected", "translate", "jailbreak_check", "clarify_intent") |
| yield from _trigger_clarify_intent_flow( |
| user_message, history, session_state, user_language, msg_interactive, send_btn_interactive |
| ) |
| return |
| else: |
| session_state["routing_status"] = "sandbox_refusal" |
| _set_decision_path(session_state, "language_detected", "translate", "jailbreak_check", "sandbox_refusal") |
| session_state["terminated"] = True |
| history = history + [ |
| {"role": "user", "content": user_message}, |
| {"role": "assistant", "content": ""}, |
| ] |
| assistant_index = len(history) - 1 |
| for chunk in build_unfulfillable_response_stream(user_message, session_state, "jailbreak_detected"): |
| history[assistant_index]["content"] += chunk |
| yield history, session_state, gr.update(value="", interactive=msg_interactive), gr.update(interactive=send_btn_interactive), gr.update(visible=is_pending_clarify), gr.update(visible=True), _debug_state(session_state) |
| yield history, session_state, gr.update(value="", interactive=msg_interactive), gr.update(interactive=send_btn_interactive), gr.update(visible=is_pending_clarify), gr.update(visible=True), _debug_state(session_state) |
| return |
|
|
| if "assistants" not in session_state: |
| session_state["assistants"] = sample_assistants() |
| session_state["active_agent"] = "Bob" |
| _set_decision_path(session_state, "language_detected", "translate", "jailbreak_check", "bob_turn") |
| system_prompt = get_system_prompt(session_state["assistants"]) |
| session_state["system_prompt_tokens"] = _count_tokens(system_prompt) |
| session_state["current_user_message"] = user_message |
| session_state.setdefault("assistant_memory", []) |
|
|
| assistant_memory = list(session_state.get("assistant_memory", [])) |
| if len(assistant_memory) > 1: |
| assistant_memory = assistant_memory[-1:] |
| session_state["assistant_memory"] = assistant_memory |
|
|
| messages = [] |
| for item in assistant_memory: |
| |
| if isinstance(item, dict): |
| normalized_item = dict(item) |
| if "content" in normalized_item: |
| normalized_item["content"] = _normalize_persistent_text(str(normalized_item.get("content", ""))) |
| messages.append(normalized_item) |
|
|
| |
| for item in history: |
| if isinstance(item, dict): |
| role = item.get("role") |
| content = item.get("content") |
| if role and content is not None: |
| if str(role) == "tool" and not _history_tool_is_routing(str(content)): |
| continue |
| messages.append({"role": str(role), "content": _normalize_persistent_text(str(content))}) |
| elif hasattr(item, "role") and hasattr(item, "content"): |
| role = getattr(item, "role") |
| content = getattr(item, "content") |
| if role and content is not None: |
| if str(role) == "tool" and not _history_tool_is_routing(str(content)): |
| continue |
| messages.append({"role": str(role), "content": _normalize_persistent_text(str(content))}) |
| elif isinstance(item, (list, tuple)) and len(item) == 2: |
| user_text, assistant_text = item |
| if user_text: |
| messages.append({"role": "user", "content": _normalize_persistent_text(str(user_text))}) |
| if assistant_text: |
| messages.append({"role": "assistant", "content": _normalize_persistent_text(str(assistant_text))}) |
| messages.append({"role": "user", "content": user_message}) |
| session_state["current_turn_tokens"] = _count_tokens( |
| [{"role": "system", "content": system_prompt}] + messages |
| ) |
| session_state["current_turn_characters"] = sum( |
| len(str(item.get("content", ""))) for item in ([{"role": "system", "content": system_prompt}] + messages) |
| ) |
|
|
| history = history + [{"role": "user", "content": user_message}, {"role": "assistant", "content": ""}] |
| assistant_index = len(history) - 1 |
| max_rounds = 3 |
| session_state["last_input_messages"] = _compact_message_view(messages) |
| session_state["last_raw_output"] = None |
| session_state["last_parsed_text"] = None |
| session_state["last_tool_calls"] = [] |
| session_state["pre_tool_call_assistant_message"] = "" |
| session_state.pop("current_turn_instructions", None) |
| session_state["last_tool_outputs"] = [] |
| session_state["tool_path"] = "generation" |
| session_state["routing_status"] = "none" |
| session_state["thinking_active"] = False |
| turn_raw_prefix = "" |
|
|
| |
| |
| session_state.pop("current_turn_instructions", None) |
|
|
| for round_index in range(max_rounds): |
| raw = "" |
| previously_yielded_thinking_view = "" |
| session_state.pop("current_turn_instructions", None) |
| for chunk in generate_response_stream( |
| messages, |
| system_prompt, |
| enable_thinking=False, |
| ): |
| raw += chunk |
| thought_text, thinking_active = _extract_reasoning(raw) |
| _, answer_text, _ = _split_thinking_and_answer(raw) |
| session_state["thinking_active"] = thinking_active |
| current_display_output = _format_live_thinking(thought_text, thinking_active) |
| if answer_text: |
| if current_display_output: |
| current_display_output += "\n\n" |
| current_display_output += answer_text |
|
|
| if len(current_display_output) > len(previously_yielded_thinking_view): |
| new_content_part = current_display_output[len(previously_yielded_thinking_view):] |
| history[assistant_index]["content"] += new_content_part |
| previously_yielded_thinking_view = current_display_output |
|
|
| |
| current_round_system_prompt = system_prompt |
| if "current_turn_instructions" in session_state: |
| current_round_system_prompt = session_state["current_turn_instructions"] + "\n\n" + system_prompt |
|
|
| session_state["last_raw_output"] = turn_raw_prefix + raw |
| yield history, session_state, gr.update(value="", interactive=msg_interactive), gr.update(interactive=send_btn_interactive), gr.update(visible=is_pending_clarify), gr.update(visible=True), _debug_state(session_state) |
|
|
| turn_raw_prefix += raw + "\n" |
| session_state["thinking_active"] = False |
|
|
| final_thought, final_answer, _ = _split_thinking_and_answer(raw) |
| finalized_display = _format_thinking_bubble( |
| final_thought, |
| _clean_tool_text(_normalize_persistent_text(final_answer, system_prompt)), |
| False, |
| ) |
| history[assistant_index]["content"] = finalized_display |
| try: |
| text, tool_calls = _parse_agent_output(raw) |
| except json.JSONDecodeError: |
| text, tool_calls = raw, [] |
|
|
| if text: |
| normalized_text = _normalize_persistent_text(text, system_prompt) |
| session_state["last_parsed_text"] = (str(session_state.get("last_parsed_text") or "") + "\n" + normalized_text).strip() |
| if tool_calls: |
| |
| |
| |
| session_state["last_tool_calls"].extend(tool_calls) |
|
|
| |
| session_state["pre_tool_call_assistant_message"] = _strip_thought_channel_markup( |
| str(history[assistant_index]["content"]) |
| ) |
|
|
| |
| if not tool_calls: |
| |
| yield history, session_state, gr.update(value="", interactive=msg_interactive), gr.update(interactive=send_btn_interactive), gr.update(visible=is_pending_clarify), gr.update(visible=True), _debug_state(session_state) |
| return |
|
|
| tool_outputs = _execute_tool_calls(tool_calls, session_state) |
| session_state["last_tool_outputs"].extend(tool_outputs) |
| session_state["tool_path"] = ",".join(sorted({str(tc.get("name", "")).strip() for tc in tool_calls if str(tc.get("name", "")).strip()})) |
| normalized_text = _normalize_persistent_text(text, system_prompt) |
| messages = _append_tool_messages(messages + [{"role": "assistant", "content": normalized_text}], tool_calls, tool_outputs) |
|
|
| tool_display = "\n\n".join(item["full"] for item in tool_outputs).strip() |
| called_tools = [call.get("name") for call in tool_calls] |
| if tool_display: |
| history.append({ |
| "role": "tool", |
| "content": tool_display, |
| }) |
| yield history, session_state, gr.update(value="", interactive=msg_interactive), gr.update(interactive=send_btn_interactive), gr.update(visible=is_pending_clarify), gr.update(visible=True), _debug_state(session_state) |
| |
| if "clarify_intent" in called_tools: |
| session_state["current_stage"] = "clarify_menu" |
| session_state["routing_status"] = "clarify_intent" |
| _set_decision_path(session_state, "language_detected", "translate", "jailbreak_check", "clarify_intent") |
| clarify_output = next( |
| ( |
| output |
| for output in tool_outputs |
| if output.get("name") == "clarify_intent" |
| ), |
| None, |
| ) |
| if clarify_output: |
| try: |
| parsed_result = json.loads(clarify_output["result"]) |
| options_keys = parsed_result.get( |
| "options", [] |
| ) |
| emergency_info = parsed_result.get( |
| "emergency_options", "" |
| ) |
|
|
| translated_options_keys = [ |
| _translate_clarify_text(key, user_language) |
| for key in options_keys |
| ] |
| translated_label = _translate_clarify_text( |
| "Clarify intent", user_language |
| ) |
|
|
| |
| yield history, session_state, gr.update(value="", interactive=False), gr.update(interactive=False), gr.update( |
| label=translated_label, |
| |
| interactive=True, |
| choices=translated_options_keys, |
| visible=True, |
| ), gr.update(visible=True), _debug_state(session_state) |
| return |
| except json.JSONDecodeError: |
| pass |
|
|
| if "connect" in called_tools or "validate" in called_tools or "skip" in called_tools: |
| session_state["current_stage"] = "sandboxed_redirect" |
| session_state["routing_status"] = "call_or_validate" |
| _set_decision_path(session_state, "language_detected", "translate", "jailbreak_check", "tool_routing", "sandboxed_redirect") |
| target_tc = next(tc for tc in tool_calls if _is_routing_tool(tc.get("name", ""))) |
| target_tc = next((tc for tc in tool_calls if _is_routing_tool(tc.get("name", ""))), {}) |
| parsed = _parse_tool_args(target_tc.get("args", "")) |
| assistant_name = _assistant_classification(str(parsed.get("name", "")).strip() or "Alice") |
| user_msg = session_state.get("current_user_message", "").lower() |
|
|
| |
| session_state.pop("current_turn_instructions", None) |
| |
| |
| safe_tool_results = [] |
| for tool_output in tool_outputs: |
| if not _is_routing_tool(tool_output.get("name", "")): |
| result_str = str(tool_output.get("result", "")) |
| try: |
| parsed = json.loads(result_str) |
| if isinstance(parsed, dict) and "instructions" in parsed: |
| del parsed["instructions"] |
| safe_tool_results.append(f"{tool_output.get('name')}: {json.dumps(parsed)}") |
| except json.JSONDecodeError: |
| safe_tool_results.append(f"{tool_output.get('name')}: {result_str}") |
| sandbox_tool_context = "\n".join(safe_tool_results) if safe_tool_results else None |
|
|
| |
| session_state["routing_status"] = "sandbox_refusal" |
| _set_decision_path(session_state, "language_detected", "translate", "jailbreak_check", "tool_routing", "sandbox_refusal") |
| history.append({"role": "assistant", "content": ""}) |
| assistant_index_for_redirect = len(history) - 1 |
| redirect_buffer = "" |
| for chunk in build_unfulfillable_response_stream( |
| user_msg, |
| session_state, |
| "out_of_scope_tool_call", |
| assistant_name, |
| pre_tool_call_assistant_message=session_state["pre_tool_call_assistant_message"], |
| sandbox_tool_context=sandbox_tool_context, |
| assistant_classification=assistant_name, |
| ): |
| redirect_buffer += chunk |
| session_state["last_redirect_output"] = redirect_buffer |
| history[assistant_index_for_redirect]["content"] = ( |
| _format_live_thinking("", True) + "\n\n" + redirect_buffer |
| ).strip() |
| yield history, session_state, gr.update(value="", interactive=msg_interactive), gr.update(interactive=send_btn_interactive), gr.update(visible=is_pending_clarify), gr.update(visible=True), _debug_state(session_state) |
| session_state["last_redirect_output"] = redirect_buffer |
| history[assistant_index_for_redirect]["content"] = redirect_buffer.strip() |
| |
|
|
| for tool_output in tool_outputs: |
| if _is_routing_tool(tool_output.get("name", "")): |
| replay_text = _history_tool_message(tool_output) |
| if replay_text: |
| session_state["assistant_memory"] = _bounded_append( |
| session_state.get("assistant_memory", []), |
| {"role": "assistant", "content": _normalize_persistent_text(replay_text)}, |
| int(os.environ.get("ASSISTANT_MEMORY_LIMIT", 1)), |
| ) |
|
|
| yield history, session_state, gr.update(value="", interactive=msg_interactive), gr.update(interactive=send_btn_interactive), gr.update(visible=is_pending_clarify), gr.update(visible=True), _debug_state(session_state) |
| return |
|
|
| if round_index < max_rounds - 1: |
| history.append({"role": "assistant", "content": ""}) |
| assistant_index = len(history) - 1 |
|
|
| if tool_outputs: |
| for tool_output in tool_outputs: |
| if _is_routing_tool(tool_output.get("name", "")): |
| replay_text = _history_tool_message(tool_output) |
| if replay_text: |
| session_state["assistant_memory"] = _bounded_append( |
| session_state.get("assistant_memory", []), |
| {"role": "assistant", "content": _normalize_persistent_text(replay_text)}, |
| int(os.environ.get("ASSISTANT_MEMORY_LIMIT", 1)), |
| ) |
| yield history, session_state, gr.update(value="", interactive=not is_pending_clarify), gr.update(interactive=not is_pending_clarify), gr.update(visible=is_pending_clarify), gr.update(visible=True), _debug_state(session_state) |
| return |
|
|
|
|
def resolve_clarify_choice(choice: str, history: list, session_state: dict):
    """Resolve a clarify-menu selection and continue the conversation.

    Yields the same 7-tuple as the other Gradio handlers: (chat history,
    session state, msg update, send-button update, clarify-radio update,
    clarify-button update, dashboard HTML).

    Fixes vs. previous version: removed locals that were computed but never
    used, guarded the emergency branch against a None ``history`` (the other
    branches already used ``history or []``), and replaced the if/elif chain
    with a lookup table.
    """
    # A terminated session accepts no further input of any kind.
    if session_state.get("terminated"):
        yield history, session_state, gr.update(value="", interactive=False), gr.update(interactive=False), gr.update(visible=False), gr.update(visible=False), _debug_state(session_state)
        return

    # Ignore stray selections when no clarify menu is actually pending.
    if not session_state.get("pending_clarify"):
        yield history or [], session_state, gr.update(value="", interactive=True), gr.update(interactive=True), gr.update(visible=False), gr.update(visible=True), _debug_state(session_state)
        return

    session_state.pop("pending_clarify", None)

    normalized = (choice or "").strip().lower()
    if normalized == "emergency":
        # Emergencies terminate the session immediately with crisis info.
        result = emergency_crisis()
        session_state["terminated"] = True
        history = (history or []) + [
            {"role": "user", "content": "emergency"},
            {"role": "assistant", "content": result},
        ]
        yield history, session_state, gr.update(value="", interactive=False), gr.update(interactive=False), gr.update(visible=False), gr.update(visible=True), _debug_state(session_state)
        return

    # Map each menu choice onto a canonical user message for the next turn;
    # anything unrecognized falls back to a generic help request.
    choice_messages = {
        "what bob does": "What can Bob help with?",
        "app support": "I need app support.",
        "store info": "I need store info.",
        "food safety": "I have a food safety question.",
        "legal": "I have a legal question.",
        "order": "I want to place or modify an order.",
    }
    user_message = choice_messages.get(normalized, "I need help.")

    # Hide the clarify widgets, then stream the turn as a normal message.
    yield history or [], session_state, gr.update(value="", interactive=False), gr.update(interactive=False), gr.update(visible=False), gr.update(visible=False), _debug_state(session_state)
    yield from process_turn(user_message, history or [], session_state)
|
|
|
|
def _debug_state(state):
    """Build the flat dashboard dict from session state and render it to HTML.

    HTML-escapes the raw model/redirect outputs so they display as text
    rather than markup inside the dashboard panel.

    Fix: the previous fallback ``decision_path.replace(" -> ", " -> ")`` was
    a no-op (identical search and replacement strings); the plain path string
    is used directly instead.
    """
    decision_path = state.get("decision_path") or "idle"
    # Prefer the prebuilt ASCII graph; fall back to the flat path string.
    decision_graph = state.get("decision_graph") or decision_path
    dashboard_state = {
        "terminated": state.get("terminated", False),
        "pending_clarify": state.get("pending_clarify", False),
        "current_stage": state.get("current_stage"),
        "active_agent": state.get("active_agent"),
        "active_language": state.get("active_language"),
        "translation_status": state.get("translation_status"),
        "routing_status": state.get("routing_status"),
        "tool_path": state.get("tool_path"),
        "last_jailbreak_score": state.get("last_jailbreak_score"),
        "last_jailbreak_predicted_label": state.get("last_jailbreak_predicted_label"),
        "last_prompt_injection_score": state.get("last_prompt_injection_score"),
        "last_prompt_injection_predicted_label": state.get("last_prompt_injection_predicted_label"),
        "last_refusal_reason": state.get("last_refusal_reason"),
        "assistants_pool_sample": state.get("assistants", [])[:6],
        "tool_catalog_size": len(TOOL_CATALOG),
        "last_input_messages": state.get("last_input_messages", []),
        "last_raw_output": html.escape(str(state.get("last_raw_output", ""))),
        "last_parsed_text": html.escape(str(state.get("last_parsed_text", ""))),
        "last_redirect_output": html.escape(str(state.get("last_redirect_output", ""))),
        "thinking_active": state.get("thinking_active", False),
        "last_tool_calls": state.get("last_tool_calls", []),
        "last_tool_outputs": state.get("last_tool_outputs", []),
        "routing_trigger_counts": state.get("routing_trigger_counts", {}),
        "routing_trigger_events": state.get("routing_trigger_events", []),
        "system_prompt_tokens": state.get("system_prompt_tokens"),
        "current_turn_tokens": state.get("current_turn_tokens"),
        "current_turn_characters": state.get("current_turn_characters"),
        "decision_path": decision_path,
        "decision_graph": decision_graph,
    }
    return _render_dashboard_html(dashboard_state)
|
|
|
|
| def _set_decision_path(session_state: dict, *steps: str) -> None: |
| compact = " -> ".join(step for step in steps if step) |
| session_state["decision_path"] = compact or "idle" |
| if compact: |
| session_state["decision_graph"] = "\n".join([ |
| "┌─ decision path", |
| *(f"│ {step}" for step in compact.split(" -> ")), |
| "└─ end", |
| ]) |
| else: |
| session_state["decision_graph"] = "┌─ decision path\n│ idle\n└─ end" |
|
|
|
|
def _render_dashboard_html(state: dict) -> str:
    """Render the live debug dashboard as a self-contained HTML fragment.

    Produces an inline SVG flow chart of the decision path, a grid of badge
    widgets for headline session fields, routing-trigger summaries, the
    thinking-state flag, and collapsible raw-debug sections.

    Args:
        state: Flat dict of dashboard fields (as assembled by the debug-state
            builder); missing keys fall back to blanks/empties.

    Returns:
        HTML string suitable for a Gradio HTML component.
    """
    path = str(state.get("decision_path") or "idle")
    # Split "a -> b -> c" into step names; show a lone "idle" node when empty.
    steps = [step for step in path.split(" -> ") if step] or ["idle"]
    # Node fill colors for known steps; unknown steps use the gray fallback
    # passed to colors.get() below.
    colors = {
        "language_detected": "#2b6cb0",
        "translate": "#805ad5",
        "jailbreak_check": "#c05621",
        "clarify_intent": "#2f855a",
        "sandbox_refusal": "#c53030",
        "tool_routing": "#d69e2e",
        "sandboxed_redirect": "#2c7a7b",
        "sanitized_reprocess": "#718096",
        "bob_turn": "#1a202c",
        "idle": "#718096",
    }
    # Nodes are 112px wide, laid out left-to-right on 140px centers.
    width = max(240, 150 * len(steps))
    nodes = []
    for idx, step in enumerate(steps):
        x = 40 + idx * 140
        fill = colors.get(step, "#4a5568")
        nodes.append(
            f'<g><rect x="{x}" y="34" rx="12" ry="12" width="112" height="44" fill="{fill}" opacity="0.92" />'
            f'<text x="{x + 56}" y="61" text-anchor="middle" font-size="12" fill="#fff" font-family="ui-sans-serif, system-ui, sans-serif">{html.escape(step)}</text></g>'
        )
        # Connecting arrow to the next node, except after the last one.
        if idx < len(steps) - 1:
            arrow_x1 = x + 112
            arrow_x2 = x + 140
            nodes.append(
                f'<line x1="{arrow_x1}" y1="56" x2="{arrow_x2}" y2="56" stroke="#94a3b8" stroke-width="3" marker-end="url(#arrowhead)" />'
            )
    svg = (
        f'<svg viewBox="0 0 {width} 112" width="100%" height="112" xmlns="http://www.w3.org/2000/svg" role="img" aria-label="Decision path chart">'
        '<defs><marker id="arrowhead" markerWidth="8" markerHeight="8" refX="6" refY="3" orient="auto">'
        '<path d="M0,0 L6,3 L0,6 Z" fill="#94a3b8" /></marker></defs>'
        + "".join(nodes)
        + "</svg>"
    )

    def badge(label: str, value: Any) -> str:
        """Render one label/value badge; None values display as blank."""
        return (
            '<div class="dash-badge"><span class="dash-label">'
            + html.escape(label)
            + '</span><span class="dash-value">'
            + html.escape(str(value if value is not None else ""))
            + "</span></div>"
        )

    trigger_counts = state.get("routing_trigger_counts") or {}
    trigger_events = state.get("routing_trigger_events") or []
    # Sort triggers by descending hit count, then case-insensitive name.
    sorted_triggers = sorted(
        ((str(name), int(count)) for name, count in trigger_counts.items()),
        key=lambda item: (-item[1], item[0].lower()),
    )
    if sorted_triggers:
        trigger_rows = "".join(
            f'<div class="dash-trigger-row"><span>{html.escape(name)}</span><strong>{count}</strong></div>'
            for name, count in sorted_triggers
        )
    else:
        trigger_rows = '<div class="dash-empty">No `connect` / `validate` / `skip` triggers yet.</div>'

    # Most recent trigger events are listed first.
    if trigger_events:
        trigger_history_parts = []
        for item in reversed(trigger_events):
            emergency_tag = ' <span class="dash-muted">(emergency)</span>' if item.get("emergency") else ""
            trigger_history_parts.append(
                f'<li><code>{html.escape(str(item.get("tool", "")))}</code> '
                f'→ <strong>{html.escape(str(item.get("assistant", "")))}</strong>'
                f"{emergency_tag}</li>"
            )
        trigger_history = "".join(trigger_history_parts)
    else:
        trigger_history = '<li class="dash-empty">Nothing recorded yet.</li>'

    # NOTE(review): json.dumps below assumes every value in `state` is
    # JSON-serializable — confirm callers never place raw objects here.
    return f"""
<div class="dashboard-panel">
<div class="dashboard-title">Live dashboard</div>
<div class="dashboard-grid">
{badge("Stage", state.get("current_stage"))}
{badge("Agent", state.get("active_agent"))}
{badge("Lang", state.get("active_language"))}
{badge("Route", state.get("routing_status"))}
{badge("Tools", state.get("tool_path"))}
{badge("Turn tokens", state.get("current_turn_tokens"))}
{badge("Prompt tokens", state.get("system_prompt_tokens"))}
{badge("Chars", state.get("current_turn_characters"))}
{badge("Terminated", state.get("terminated", False))}
{badge("Redirect Active", "Yes" if state.get("last_redirect_output") else "No")}
</div>
<div class="dashboard-section">
<div class="dashboard-subtitle">Routing triggers</div>
<div class="dashboard-trigger-list">{trigger_rows}</div>
</div>
<div class="dashboard-section">
<div class="dashboard-subtitle">Thinking state</div>
<div class="dash-badge"><span class="dash-label">Active</span><span class="dash-value">{html.escape(str(state.get("thinking_active", False)))}</span></div>
</div>
<div class="dashboard-section">
<div class="dashboard-subtitle">Recent hits</div>
<ul class="dashboard-trigger-history">{trigger_history}</ul>
</div>
<div class="dashboard-path">{html.escape(path)}</div>
<div class="dashboard-svg">{svg}</div>
<details class="dashboard-details">
<summary>Raw debug</summary>
<pre>{html.escape(json.dumps(state, indent=2, sort_keys=True))}</pre>
</details>
<details class="dashboard-details">
<summary>Redirect trace</summary>
<pre>{html.escape(str(state.get("last_redirect_output", "")))}</pre>
</details>
</div>
"""
|
|
|
|
| |
| |
| |
|
|
# Custom stylesheet for the Gradio UI: header banner, side panels
# (probe/catalog/model), the live-dashboard widgets, and the collapsible
# "thinking" bubble styling referenced by _render_dashboard_html.
CSS = """
.bob-header { text-align: center; padding: 1.2rem 0 0.4rem; }
.bob-header h1 { font-size: 2rem; font-weight: 800; color: #c84b11; margin: 0; }
.bob-header p { color: #888; font-size: 0.88rem; margin: 0.2rem 0 0; }
.probe-panel { font-size: 0.82rem; line-height: 1.7;
border-left: 3px solid #e74c3c;
padding: 0.75rem 1rem;
background: var(--block-background-fill);
border-radius: 6px; }
.probe-panel strong { color: #c0392b; }
.probe-panel em { color: #555; }
.catalog-panel { font-size: 0.82rem; line-height: 1.55;
border-left: 3px solid #d97706;
padding: 0.75rem 1rem;
background: var(--block-background-fill);
border-radius: 6px; }
.model-panel { font-size: 0.82rem; line-height: 1.55;
border-left: 3px solid #3b82f6;
padding: 0.75rem 1rem; margin-bottom: 0.75rem;
background: var(--block-background-fill);
border-radius: 6px; }
.catalog-panel code { font-size: 0.78rem; }
.dashboard-panel { font-size: 0.82rem; line-height: 1.45; }
.dashboard-title { font-weight: 800; margin-bottom: 0.5rem; color: #1f2937; }
.dashboard-section { margin: 0.75rem 0; padding: 0.65rem 0.7rem; border-radius: 0.65rem; background: rgba(248,250,252,0.88); border: 1px solid rgba(148,163,184,0.22); }
.dashboard-subtitle { font-size: 0.72rem; font-weight: 800; text-transform: uppercase; letter-spacing: 0.06em; color: #475569; margin-bottom: 0.45rem; }
.dashboard-trigger-list { display: grid; gap: 0.35rem; }
.dash-trigger-row { display: flex; align-items: center; justify-content: space-between; gap: 0.5rem; padding: 0.35rem 0.45rem; border-radius: 0.45rem; background: rgba(255,255,255,0.82); }
.dash-trigger-row span { font-weight: 600; color: #1e293b; }
.dash-trigger-row strong { color: #b45309; }
.dashboard-trigger-history { margin: 0; padding-left: 1rem; color: #334155; }
.dashboard-trigger-history li { margin: 0.2rem 0; }
.dash-muted { color: #64748b; font-size: 0.75rem; }
.dash-empty { color: #64748b; font-style: italic; }
.dashboard-grid { display: grid; grid-template-columns: repeat(2, minmax(0, 1fr)); gap: 0.4rem; margin-bottom: 0.7rem; }
.dash-badge { padding: 0.45rem 0.55rem; border-radius: 0.55rem; background: rgba(255,255,255,0.7); border: 1px solid rgba(0,0,0,0.08); }
.dash-label { display: block; font-size: 0.69rem; text-transform: uppercase; letter-spacing: 0.04em; color: #6b7280; }
.dash-value { display: block; margin-top: 0.15rem; font-weight: 700; color: #111827; word-break: break-word; }
.dashboard-path { font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace; padding: 0.4rem 0.55rem; border-radius: 0.55rem; background: rgba(241,245,249,0.95); margin-bottom: 0.6rem; color: #334155; }
.dashboard-svg svg { display: block; margin: 0.25rem 0 0.75rem; }
.dashboard-details pre { white-space: pre-wrap; max-height: 220px; overflow: auto; }
.thinking-panel { margin: 0 0 0.55rem 0; padding: 0.55rem 0.7rem; border-radius: 0.7rem; background: rgba(148,163,184,0.12); border: 1px solid rgba(148,163,184,0.25); color: #334155; }
.thinking-panel summary { cursor: pointer; font-size: 0.72rem; font-weight: 800; letter-spacing: 0.05em; text-transform: uppercase; color: #64748b; }
.thinking-panel summary::-webkit-details-marker { display: none; }
.thinking-body { margin-top: 0.45rem; padding-top: 0.45rem; border-top: 1px solid rgba(148,163,184,0.18); white-space: pre-wrap; }
.thinking-pulse { font-style: italic; opacity: 0.75; }
.thinking-divider { height: 1px; margin: 0.55rem 0; background: rgba(148,163,184,0.18); }
"""
|
|
|
|
def build_ui():
    """Assemble and return the Gradio Blocks app for Bob.

    Lays out the chat column (chatbot, input row, clarify widgets), the
    side panel (active models, tool catalog, live dashboard), and wires
    every event through the shared streaming turn handlers.  Does not
    launch the server; the caller does that.
    """
    with gr.Blocks(title="Bob — ABC Burgers AI", theme=gr.themes.Soft(primary_hue="orange"), css=CSS) as demo:
        gr.HTML("""
<div class="bob-header">
<h1>Bob</h1>
<p>ABC Burgers AI Assistant</p>
</div>
""")

        with gr.Row():
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(label="", height=500)
                with gr.Row():
                    msg = gr.Textbox(
                        placeholder="Talk to Bob...",
                        label="",
                        scale=5,
                        lines=1,
                        autofocus=True,
                        max_length=600,
                    )
                    send_btn = gr.Button("Send", variant="primary", scale=1)
                clarify_btn = gr.Button("Clarify: Food Safety, Orders, Legal Inquiry, Store Information, and App Support", variant="secondary")
                clarify_choice = gr.Radio(
                    choices=CLARIFY_OPTIONS,
                    label="Clarify intent",
                    visible=False,
                    interactive=True,
                )
                clarify_submit = gr.Button("Use selection", variant="secondary", visible=False)
                clear_btn = gr.Button("New session", size="sm", variant="secondary")

            with gr.Column(scale=1, min_width=220):
                gr.HTML(f"""
<div class="model-panel">
<strong>Active Models</strong><br>
<ul style="margin: 0.4rem 0 0; padding-left: 1.2rem;">
<li><strong>LLM:</strong> <code>{HF_MODEL}</code></li>
<li><strong>Safety 1:</strong> <code>{JAILBREAK_MODEL}</code></li>
<li><strong>Safety 2 (EN):</strong> <code>{PROMPT_INJECTION_MODEL}</code></li>
<li><strong>Language:</strong> <code>{REFUSAL_LANGUAGE_MODEL}</code></li>
</ul>
</div>
""")
                gr.HTML("""
<div class="catalog-panel">
<strong>Tool catalog</strong><br><br>
""")
                gr.HTML(_format_tool_catalog())
                gr.HTML("</div>")
                session_info = gr.HTML(value=_render_dashboard_html({
                    "decision_path": "idle",
                    "decision_graph": "┌─ decision path\n│ idle\n└─ end",
                }))

        session_state = gr.State({})

        def on_send(text, chat_log, state):
            pending = state.get("pending_clarify", False)
            if not text.strip():
                # Empty input: just refresh the widget states, no turn.
                yield chat_log or [], state, gr.update(value="", interactive=not pending), gr.update(interactive=not pending), gr.update(visible=pending), gr.update(visible=True), _debug_state(state)
                return
            # Lock the input while the response streams, then run the turn.
            yield chat_log or [], state, gr.update(value="", interactive=False), gr.update(interactive=False), gr.update(visible=pending), gr.update(visible=True), _debug_state(state)
            yield from process_turn(text, chat_log or [], state)

        def on_clarify(selection, chat_log, state):
            yield from resolve_clarify_choice(selection, chat_log or [], state)

        def on_open_clarify(chat_log, state):
            yield from _open_clarify_intent_menu(chat_log or [], state)

        def on_clear():
            # Reset every output component to its initial, interactive state.
            return [], {}, gr.update(value="", interactive=True), gr.update(interactive=True), gr.update(visible=False), gr.update(visible=False), ""

        # Every handler updates the same set of components, so share the lists.
        turn_inputs = [msg, chatbot, session_state]
        turn_outputs = [chatbot, session_state, msg, send_btn, clarify_choice, clarify_btn, session_info]

        send_btn.click(on_send, turn_inputs, turn_outputs)
        msg.submit(on_send, turn_inputs, turn_outputs)
        clarify_btn.click(on_open_clarify, [chatbot, session_state], turn_outputs)
        clarify_choice.change(on_clarify, [clarify_choice, chatbot, session_state], turn_outputs)
        clarify_submit.click(on_clarify, [clarify_choice, chatbot, session_state], turn_outputs)
        clear_btn.click(on_clear, [], turn_outputs)

    return demo
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    # Build the Gradio app and serve it.
    demo = build_ui()
    # NOTE(review): share=True publishes a temporary public Gradio link —
    # confirm that is intended before running outside a demo environment.
    demo.launch(
        server_name="0.0.0.0",  # listen on all interfaces
        server_port=int(os.environ.get("PORT", 7860)),
        share=True,
        show_error=True,
    )
|
|