"""Shared transcript rendering used by both UIs. Both ``local_ui.py`` (driving a raw ``opencode serve``) and the deployed ``server/gradio_ui.py`` (driving an in-sandbox ``opencode serve`` through the env's MCP tools) consume the same opencode message+parts shape: messages: [ { "info": {id, role, sessionID, time, ...}, "parts": [ {"type": "step-start", ...}, {"type": "reasoning", "text": ..., "id": ...}, {"type": "text", "text": ..., "id": ...}, {"type": "tool", "tool": "...", "state": {status, input, output}, ...}, {"type": "step-finish", "tokens": {...}, ...}, ], }, ... ] or the flat SSE form: events: [{"type": "message.part.updated", "properties": {"part": {...}}}, ...] Both reduce to an ordered list of parts keyed on ``part.id``. """ from __future__ import annotations import html as _html import json from typing import Any # ── Part collection ──────────────────────────────────────────────────────── def collect_parts_from_events(events: list[dict[str, Any]]) -> list[dict[str, Any]]: """Reduce SSE ``message.part.updated`` frames to latest snapshot per ``part.id``. Used by ``local_ui.py`` (direct SSE consumer). """ order: list[str] = [] latest: dict[str, dict[str, Any]] = {} for ev in events: if ev.get("type") != "message.part.updated": continue p = (ev.get("properties") or {}).get("part") or {} pid = p.get("id") if not pid: continue if pid not in latest: order.append(pid) latest[pid] = p return [latest[i] for i in order] def collect_parts_from_messages( messages: list[dict[str, Any]], ) -> list[dict[str, Any]]: """Flatten the ``GET /session/:id/message`` shape into an ordered parts list. Used by the deployed Gradio UI which polls via ``get_messages`` MCP tool. Message order is preserved; within a message the server returns parts in emission order so no further sorting is needed. """ parts: list[dict[str, Any]] = [] for m in messages or []: if not isinstance(m, dict): continue for p in m.get("parts") or []: if isinstance(p, dict): parts.append(p) return parts # ── Rendering ────────────────────────────────────────────────────────────── def _esc(s: Any) -> str: return _html.escape("" if s is None else str(s)) def _cap(s: str, n: int = 6000) -> str: if len(s) <= n: return s return s[:n] + f"\n… ({len(s) - n} chars hidden)" def _todo_icon(status: str | None) -> str: return {"completed": "✅", "in_progress": "🔄"}.get(status or "", "⏳") def fmt_tool(name: str, state: dict[str, Any], raw: dict[str, Any]) -> str: """Per-tool card — mirrors opencode's own UI shapes.""" status = (state or {}).get("status") or "?" inp = (state or {}).get("input") or raw.get("input") or {} out = (state or {}).get("output") or raw.get("output") or "" badge = {"completed": "ok", "error": "err", "running": "run"}.get(status, "") if name == "read": summary = f"📖 read {_esc(inp.get('filePath') or inp.get('path'))}" body = f"
{_esc(_cap(str(out)))}
" elif name == "write": path = inp.get("filePath") or inp.get("path") content = inp.get("content") or "" summary = f"✍️ write {_esc(path)} ({len(content)} chars)" body = f"
{_esc(_cap(content))}
" elif name == "edit": path = inp.get("filePath") or inp.get("path") old = inp.get("oldString") or "" new = inp.get("newString") or "" summary = f"✏️ edit {_esc(path)}" body = ( f"
- old
{_esc(_cap(old, 3000))}
" f"
+ new
{_esc(_cap(new, 3000))}
" ) if out: body += f"
output
{_esc(_cap(str(out), 2000))}
" elif name == "bash": cmd = inp.get("command") or inp.get("cmd") or "" summary = f"⚡ bash {_esc(cmd[:160])}" body = f"
{_esc(_cap(str(out)))}
" elif name in ("glob", "find"): pattern = inp.get("pattern") or inp.get("query") or "" summary = f"🔎 {name} {_esc(pattern)}" body = f"
{_esc(_cap(str(out), 4000))}
" elif name == "grep": pattern = inp.get("pattern") or "" path = inp.get("path") or "" summary = f"🔎 grep {_esc(pattern)}" + ( f" in {_esc(path)}" if path else "" ) body = f"
{_esc(_cap(str(out), 4000))}
" elif name == "todowrite": todos = inp.get("todos") or [] summary = f"📝 todowrite ({len(todos)} items)" body = "" elif name == "task": desc = inp.get("description") or inp.get("prompt") or "" summary = f"🧩 task — {_esc(desc[:160])}" body = f"
{_esc(_cap(str(out), 4000))}
" elif name == "webfetch": summary = f"🌐 webfetch {_esc(inp.get('url'))}" body = f"
{_esc(_cap(str(out), 4000))}
" else: summary = f"🔧 {_esc(name)}" body = ( f"
input
{_esc(_cap(json.dumps(inp, indent=2, default=str), 4000))}
" f"
output
{_esc(_cap(str(out), 4000))}
" ) return ( "
" f"{summary} {_esc(status)}" f"
{body}
" "
" ) def render_transcript( parts: list[dict[str, Any]], errors: list[str] | None = None ) -> str: """Render a parts list as HTML cards. Emits wrapped CSS-friendly markup. Consumers should inject the CSS from :data:`TRANSCRIPT_CSS`. """ out: list[str] = [] if errors: out.append( "
⚠️ errors
" ) if not parts: out.append("
waiting for first part…
") return "".join(out) out.append("
") for p in parts: t = p.get("type") if t == "step-start": out.append("
── new step ──
") elif t == "reasoning": txt = (p.get("text") or "").strip() if txt: out.append( "
🧠 reasoning" f"
{_esc(_cap(txt, 4000))}
" ) elif t == "text": txt = (p.get("text") or "").strip() if txt: out.append(f"
{_esc(txt)}
") elif t == "tool": out.append(fmt_tool(p.get("tool") or "?", p.get("state") or {}, p)) elif t == "step-finish": tokens = p.get("tokens") or (p.get("state") or {}).get("tokens") or {} if tokens: out.append( f"
tokens: " f"{_esc(json.dumps(tokens, default=str))}
" ) out.append("
") return "".join(out) TRANSCRIPT_CSS = """ .chat { font-size:14px; } .assistant pre { background:#0e1013; padding:10px; border-radius:8px; white-space:pre-wrap; color:#eee; margin:6px 0; } .reasoning { opacity:0.8; margin:4px 0; } .reasoning pre { background:#0a0b0d; color:#aab; padding:8px; white-space:pre-wrap; } .tool { border:1px solid #2a2f3a; border-radius:8px; padding:6px 10px; margin:6px 0; background:#12161c; } .tool summary { cursor:pointer; color:#ddd; } .tool code { background:#222; color:#9cf; padding:1px 4px; border-radius:3px; } .tbody { margin-top:6px; } .tbody pre { background:#0a0b0d; padding:8px; border-radius:4px; white-space:pre-wrap; max-height:400px; overflow:auto; font-size:12px; color:#ddd; margin:2px 0; } .tbody pre.add { border-left:3px solid #2e6; } .tbody pre.del { border-left:3px solid #e53; } .tbody .lbl { color:#888; font-size:11px; margin-top:6px; } .badge { padding:1px 6px; border-radius:8px; font-size:11px; background:#333; color:#ddd; } .badge.ok { background:#1f6f43; color:white; } .badge.err { background:#7a1e1e; color:white; } .badge.run { background:#7a5c1e; color:white; } .step { color:#555; text-align:center; margin:10px 0; font-size:11px; } .stepfin { color:#666; font-size:11px; margin:4px 0 12px; } .empty { color:#666; font-style:italic; padding:12px; } .errbox { background:#2a1414; border:1px solid #7a1e1e; border-radius:6px; padding:6px 10px; margin:6px 0; color:#f88; font-size:13px; } .errbox ul { margin:2px 0 0 18px; } """