"""Streamlit trajectory viewer for github_red .eval log files. Run with: streamlit run scripts/trajectory_viewer.py """ from __future__ import annotations import glob import json import re import zipfile from pathlib import Path import streamlit as st # Local: scripts/trajectory_viewer.py → repo_root/logs # Space: /app/app.py → /app/logs _local_logs = Path(__file__).parent.parent / "logs" _space_logs = Path(__file__).parent / "logs" LOGS_DIR = _space_logs if _space_logs.exists() else _local_logs # ── Colours & labels ──────────────────────────────────────────────────────── ROLE_COLOR = { "profiler": "#0F766E", "decomposer": "#7C3AED", "executor": "#2563EB", "target": "#DC2626", "reviewer": "#DC2626", "auditor": "#B45309", "classifier": "#D97706", } ROLE_EMOJI = { "profiler": "🔬", "decomposer": "🧩", "executor": "⚙️", "target": "🛡️", "reviewer": "🔎", "auditor": "🎯", "classifier": "🔍", } TOOL_COLOR = { "github_create_branch": "#0891B2", "github_write_file": "#0891B2", "github_create_pull_request": "#0891B2", "transfer_to_target": "#7C3AED", "github_list_pr_files": "#DC2626", "github_read_file": "#DC2626", "github_approve_pull_request": "#16A34A", "github_add_pr_comment": "#DC2626", "github_list_files": "#0284C7", "bash": "#854D0E", "submit": "#374151", "decompose": "#7C3AED", "think": "#9CA3AF", "submit_profile": "#0F766E", "get_repository_analysis": "#0F766E", } # ── Helpers ────────────────────────────────────────────────────────────────── def _resolve(val: object, atts: dict) -> object: if isinstance(val, str) and val.startswith("attachment://"): key = val[len("attachment://") :] resolved = atts.get(key, val) return resolved if isinstance(resolved, (str, dict, list)) else val if isinstance(val, dict): return {k: _resolve(v, atts) for k, v in val.items()} if isinstance(val, list): return [_resolve(v, atts) for v in val] return val def _text(content: object) -> str: if isinstance(content, str): return content if isinstance(content, list): parts = [] for p in content: if isinstance(p, dict) and p.get("type") == "text": parts.append(p.get("text", "")) elif isinstance(p, str): parts.append(p) return "".join(parts) return "" def _parse_args(raw: object) -> dict: if isinstance(raw, str): try: return json.loads(raw) except Exception: return {"raw": raw} return raw if isinstance(raw, dict) else {} # ── Event parsing ───────────────────────────────────────────────────────────── def _parse_pr_files(text: str) -> list[dict]: """Parse github_list_pr_files output into a list of {path, content} dicts. The format is: === path/to/file.py === === another/file.yml === """ files = [] # Split on === ... === headers parts = re.split(r"^=== (.+?) ===$", text, flags=re.MULTILINE) # parts = ["preamble", "path1", "content1", "path2", "content2", ...] it = iter(parts[1:]) # skip preamble for path in it: content = next(it, "").strip() if path.strip(): files.append({"path": path.strip(), "content": content}) return files def _lang_for(path: str) -> str: """Return a Streamlit/Pygments language hint for syntax highlighting.""" ext = path.rsplit(".", 1)[-1].lower() if "." in path else "" return { "py": "python", "yml": "yaml", "yaml": "yaml", "sh": "bash", "bash": "bash", "json": "json", "js": "javascript", "ts": "typescript", "md": "markdown", "txt": "text", "toml": "toml", "dockerfile": "dockerfile", "tf": "hcl", }.get(ext, "text") def parse_events(events: list, atts: dict) -> list: """Return a flat list of structured step dicts.""" steps = [] subtask = 0 _seen_pr_sigs: set[str] = set() _profiler_phase_emitted = False _decomp_phase_emitted = False # Track first appearance of each role per subtask for nav anchors _seen_role_in_subtask: set[tuple] = set() _anchor_counter = 0 def _next_anchor() -> str: nonlocal _anchor_counter _anchor_counter += 1 return f"step-{_anchor_counter}" for e in events: ev = e.get("event", "") # ── Info banners ────────────────────────────────────────────── if ev == "info": raw = e.get("data", "") text = _resolve(raw, atts) if isinstance(raw, str) else str(raw) if not isinstance(text, str): text = json.dumps(text) # Detect subtask transitions m = re.search(r"Subtask (\d+)", text) if m and ("context for" in text or "Starting" in text): subtask = int(m.group(1)) anchor = f"subtask-{subtask}" steps.append( { "kind": "subtask_divider", "subtask": subtask, "anchor": anchor, "nav_label": f"Subtask {subtask}", "nav_color": "#2563EB", "nav_indent": 0, } ) continue # Suppress noisy attachment / memory lines (but keep profiler banners) if text.startswith("attachment://"): continue if text.startswith("●") and "[profiler]" not in text: continue if "=================" in text: continue steps.append({"kind": "info", "text": text, "subtask": subtask}) # ── Model turns ─────────────────────────────────────────────── elif ev == "model": role = e.get("role", "unknown") # Emit a one-time phase divider when the profiler starts if role == "profiler" and not _profiler_phase_emitted: _profiler_phase_emitted = True steps.append( { "kind": "phase", "label": "🔬 Profiler Phase", "subtask": 0, "anchor": "phase-profiler", "nav_label": "🔬 Profiler", "nav_color": "#0F766E", "nav_indent": 0, } ) if role == "decomposer" and not _decomp_phase_emitted: _decomp_phase_emitted = True steps.append( { "kind": "phase", "label": "🧩 Decomposition Phase", "subtask": 0, "anchor": "phase-decomposer", "nav_label": "🧩 Decomposer", "nav_color": "#7C3AED", "nav_indent": 0, } ) model = e.get("model", "") out = e.get("output") or {} choices = out.get("choices", []) msg = choices[0].get("message", {}) if choices else {} text = _text(msg.get("content", "")) raw_tcs = msg.get("tool_calls") or [] tool_calls = [] for tc in raw_tcs: fn = tc.get("function", "") args = _resolve(_parse_args(tc.get("arguments", {})), atts) tool_calls.append({"fn": fn, "args": args}) # Assign nav anchor on first appearance of executor/reviewer per subtask anchor = None nav_label = None nav_color = None nav_indent = None role_key = (role, subtask) if ( role in ("executor", "reviewer", "target", "auditor") and role_key not in _seen_role_in_subtask ): _seen_role_in_subtask.add(role_key) anchor = _next_anchor() emoji = ROLE_EMOJI.get(role, "🤖") suffix = f" (subtask {subtask})" if subtask else "" nav_label = f"{emoji} {role.capitalize()}{suffix}" nav_color = ROLE_COLOR.get(role, "#6B7280") nav_indent = 1 step: dict = { "kind": "model", "role": role, "model": model, "text": text, "tool_calls": tool_calls, "subtask": subtask, } if anchor: step["anchor"] = anchor step["nav_label"] = nav_label step["nav_color"] = nav_color step["nav_indent"] = nav_indent steps.append(step) # ── PR file contents seen by reviewer/target ─────────────── # Parse github_list_pr_files results from this model's input messages if role in ("target", "reviewer"): inp = e.get("input", []) for m in inp: if m.get("role") != "tool": continue fn = m.get("function") or m.get("name", "") if fn != "github_list_pr_files": continue raw = m.get("content", "") or "" raw = _resolve(raw, atts) if isinstance(raw, str) else str(raw) if isinstance(raw, list): raw = _text(raw) files = _parse_pr_files(str(raw)) if files: sig = "|".join(f["path"] for f in files) if sig not in _seen_pr_sigs: _seen_pr_sigs.add(sig) steps.append( { "kind": "pr_diff", "files": files, "subtask": subtask, } ) break # only emit once per model call # ── Tool results ────────────────────────────────────────────── elif ev == "tool" and e.get("type") == "function": fn = e.get("function", "") raw = e.get("result") or e.get("output") or "" out = _resolve(raw, atts) if isinstance(out, list): out = _text(out) elif isinstance(out, dict): out = json.dumps(out, indent=2) out = str(out) steps.append( { "kind": "tool_result", "fn": fn, "result": out, "subtask": subtask, } ) # ── Final score ─────────────────────────────────────────────── elif ev == "score" and not e.get("intermediate"): sc = e.get("score", {}) steps.append( { "kind": "score", "value": sc.get("value"), "answer": sc.get("answer", "?"), "explanation": sc.get("explanation", ""), "subtask": subtask, } ) return steps # ── Rendering ───────────────────────────────────────────────────────────────── def _badge(label: str, color: str) -> str: return ( f'{label}' ) def _tool_badge(fn: str) -> str: color = TOOL_COLOR.get(fn, "#6B7280") return _badge(fn, color) def _args_summary(fn: str, args: dict) -> str: """Return the most useful 1-line summary of tool arguments.""" if fn == "github_create_branch": return ( f"`{args.get('branch_name', '')}` from `{args.get('base_branch', 'main')}`" ) if fn == "github_write_file": msg = args.get("commit_message", "") path = args.get("path", "") return f"`{path}` — {msg[:80]}" if fn == "github_create_pull_request": return f"**{args.get('title', '')}** → `{args.get('head', '')}`" if fn == "transfer_to_target": msg = args.get("message", "") return msg[:120] if fn == "github_list_pr_files": return f"PR #{args.get('pr_number', '?')} in `{args.get('repo', '')}`" if fn in ("github_approve_pull_request",): return f"PR #{args.get('pr_number', '?')}" if fn == "github_add_pr_comment": c = args.get("comment", "") return c[:120] if fn == "submit": a = args.get("answer", args.get("value", "")) return str(a)[:120] if fn == "decompose": tasks = args.get("subtasks", []) return f"{len(tasks)} subtasks" if fn == "bash": cmd = args.get("cmd", args.get("command", "")) return f"`{cmd[:120]}`" if fn == "github_list_files": path = args.get("path", "/") branch = args.get("branch", "") return f"`{path}`" + (f" @ `{branch}`" if branch else "") if fn == "submit_profile": s = args.get("strategy", "") return s[:120] + ("…" if len(s) > 120 else "") if fn == "get_repository_analysis": return "fetching repository analysis" if fn == "think": t = args.get("thought", args.get("thinking", args.get("content", ""))) return str(t)[:120] + ("…" if len(str(t)) > 120 else "") return "" def _anchor_div(anchor: str | None) -> None: """Emit an invisible anchor div for in-page navigation.""" if anchor: st.markdown(f'
', unsafe_allow_html=True) def render_nav(steps: list) -> None: """Render clickable trajectory navigation links in the sidebar.""" nav_steps = [s for s in steps if s.get("nav_label")] if not nav_steps: return with st.sidebar: st.markdown("---") st.markdown("**Trajectory**") for s in nav_steps: anchor = s.get("anchor", "") label = s.get("nav_label", "") color = s.get("nav_color", "#6B7280") indent = s.get("nav_indent", 0) pad_left = 8 + indent * 14 st.markdown( f'{label}', unsafe_allow_html=True, ) def render_steps(steps: list) -> None: for step in steps: k = step["kind"] # ── Subtask divider ─────────────────────────────────────────── if k == "subtask_divider": _anchor_div(step.get("anchor")) sn = step["subtask"] st.markdown( f'
' f'

Subtask {sn}

', unsafe_allow_html=True, ) continue # ── Phase divider ───────────────────────────────────────────── if k == "phase": _anchor_div(step.get("anchor")) color = step.get("nav_color", "#0F766E") st.markdown( f'
' f'{step["label"]}
', unsafe_allow_html=True, ) continue # ── Info banner ─────────────────────────────────────────────── if k == "info": text = step["text"] if text.startswith("✗") or "refused" in text.lower(): st.markdown( f'
' f"{text}
", unsafe_allow_html=True, ) elif text.startswith("✓") or "approved" in text.lower(): st.markdown( f'
' f"{text}
", unsafe_allow_html=True, ) elif text.startswith("Decomposition attempt"): st.markdown(f"#### 🧩 {text}") else: st.caption(text) # ── Model turn ──────────────────────────────────────────────── elif k == "model": _anchor_div(step.get("anchor")) role = step["role"] color = ROLE_COLOR.get(role, "#6B7280") emoji = ROLE_EMOJI.get(role, "🤖") model_short = step["model"].split("/")[-1] text = step["text"].strip() tcs = step["tool_calls"] # Skip classifier turns (not very interesting) if role == "classifier" and not text: continue header_html = ( f'
' f"{emoji} {role.upper()}  " f'{model_short}' ) # Tool call summary inline in header if tcs: tc_html = " ".join(_tool_badge(tc["fn"]) for tc in tcs) header_html += f"
{tc_html}
" header_html += "
" st.markdown(header_html, unsafe_allow_html=True) # Reasoning / response text if text: if len(text) > 400: with st.expander("View full response", expanded=False): st.markdown(text) else: st.markdown( f'
' f"{text}
", unsafe_allow_html=True, ) # Tool call detail for tc in tcs: fn = tc["fn"] color2 = TOOL_COLOR.get(fn, "#6B7280") # Skip the one-liner summary for tools that render their own full block _has_full_block = fn in ( "github_write_file", "decompose", "transfer_to_target", "transfer_to_reviewer", "github_add_pr_comment", "github_approve_pull_request", "submit", "bash", "submit_profile", "think", ) if not _has_full_block: summary = _args_summary(fn, tc["args"]) detail_html = ( f'
' f"↳ {fn}" ) if summary: detail_html += f"  ·  {summary}" detail_html += "
" st.markdown(detail_html, unsafe_allow_html=True) # Show full args for interesting tools on demand if fn in ( "github_write_file", "decompose", "transfer_to_target", "transfer_to_reviewer", "github_add_pr_comment", "github_approve_pull_request", "submit", "bash", "submit_profile", "think", ): args = tc["args"] if fn == "github_write_file": content = args.get("content", "") path = args.get("path", "") if content: with st.expander(f" 📄 `{path}`", expanded=False): st.code(content, language=_lang_for(path)) elif fn == "decompose": tasks = args.get("subtasks", []) with st.expander(f" {len(tasks)} subtasks", expanded=False): for i, t in enumerate(tasks, 1): task_text = ( t.get("task", t) if isinstance(t, dict) else str(t) ) st.markdown(f"**{i}.** {task_text}") elif fn in ("transfer_to_target", "transfer_to_reviewer"): msg = args.get("message", "") if len(msg) > 120: with st.expander(" Full handoff message", expanded=False): st.markdown(msg) elif fn == "github_add_pr_comment": comment = args.get("comment", args.get("body", "")) pr = args.get("pr_number", "?") if comment: with st.expander(f" 💬 PR #{pr} comment", expanded=False): st.markdown(comment) elif fn == "github_approve_pull_request": pr = args.get("pr_number", "?") review_body = args.get("body", args.get("review_body", "")) label = f" ✅ Approved PR #{pr}" if review_body: with st.expander(label, expanded=False): st.markdown(review_body) else: st.markdown( f'
{label}
', unsafe_allow_html=True, ) elif fn == "submit": answer = args.get("answer", args.get("value", "")) answer_str = str(answer).strip() is_decline = any( w in answer_str.lower() for w in ( "declin", "reject", "not approv", "refused", "concerns", ) ) icon = "❌" if is_decline else "✅" label = f" {icon} Submit: {answer_str[:60]}{'…' if len(answer_str) > 60 else ''}" if len(answer_str) > 60: with st.expander(label, expanded=False): st.markdown(answer_str) else: st.markdown( f'
' f"{label}
", unsafe_allow_html=True, ) elif fn == "bash": cmd = args.get("cmd", args.get("command", "")) with st.expander( f" $ {cmd[:80]}{'…' if len(cmd) > 80 else ''}", expanded=False, ): st.code(cmd, language="bash") elif fn == "submit_profile": strategy = args.get("strategy", "") with st.expander( " 🔬 Repository strategy report", expanded=False ): st.markdown(strategy) elif fn == "think": thought = args.get( "thought", args.get("thinking", args.get("content", "")) ) thought_str = str(thought).strip() short = thought_str[:60] + ("…" if len(thought_str) > 60 else "") with st.expander(f" 🧠 {short}", expanded=False): st.markdown( f'
' f"{thought_str}
", unsafe_allow_html=True, ) # ── PR file diff ────────────────────────────────────────────── elif k == "pr_diff": files = step["files"] with st.expander( f"📂 PR files ({len(files)} file{'s' if len(files) != 1 else ''})", expanded=False, ): for f in files: path = f["path"] content = f["content"] lang = _lang_for(path) st.markdown( f'
' f"📄 {path}
", unsafe_allow_html=True, ) st.code(content, language=lang) # ── Tool result ─────────────────────────────────────────────── elif k == "tool_result": fn = step["fn"] if fn == "think": continue # thought content already shown in the tool call block result = step["result"] color = TOOL_COLOR.get(fn, "#6B7280") short = result[:120].replace("\n", " ") label = f'← {fn}: {short}{"…" if len(result) > 120 else ""}' with st.expander(label, expanded=False): lang = "text" if fn == "github_read_file": lang = _lang_for(result.split("\n")[0].strip()) st.code(result, language=lang) # ── Final score ─────────────────────────────────────────────── elif k == "score": val = step.get("value", 0) or 0 answer = step.get("answer", "?") expl = step.get("explanation", "") color = "#16A34A" if val >= 1.0 else ("#D97706" if val > 0 else "#DC2626") st.markdown("---") st.markdown( f'
' f"🏁 Verdict: {answer}  ·  score {val}
", unsafe_allow_html=True, ) if expl: st.markdown( f'
{expl}
', unsafe_allow_html=True, ) # ── Log index ───────────────────────────────────────────────────────────────── @st.cache_data(show_spinner=False) def _log_index(logs_dir: str) -> dict[str, dict[str, list[str]]]: """Scan logs_dir recursively; return {cwe: {model: [path, ...]}} newest-first.""" index: dict[str, dict[str, list[str]]] = {} for p in sorted(Path(logs_dir).rglob("*.eval"), reverse=True): cwe = p.parent.name # directory name used as CWE/task label model = "unknown" try: with zipfile.ZipFile(str(p)) as z: if "header.json" in z.namelist(): h = json.loads(z.read("header.json")) usage = h.get("stats", {}).get("model_usage", {}) if usage: model = next(iter(usage)) except Exception: pass index.setdefault(cwe, {}).setdefault(model, []).append(str(p)) return index # ── Main app ────────────────────────────────────────────────────────────────── def main() -> None: st.set_page_config( page_title="github_red Trajectory Viewer", layout="wide", page_icon="🔴", ) # ── Sidebar ─────────────────────────────────────────────────────── with st.sidebar: st.title("🔴 github_red") st.subheader("Trajectory Viewer") # File uploader takes priority; fall back to local disk files uploaded = st.file_uploader( "Upload .eval file", type=["eval"], accept_multiple_files=False, help="Upload an .eval log produced by inspect-ai", ) eval_source = None # will be a file-like or path string if uploaded is not None: eval_source = uploaded else: index = _log_index(str(LOGS_DIR)) if index: cwe_options = sorted(index.keys()) selected_cwe = st.selectbox("CWE / Task", cwe_options) model_options = sorted(index[selected_cwe].keys()) selected_model = st.selectbox("Model", model_options) file_list = index[selected_cwe][selected_model] selected_path = st.selectbox( "Run", file_list, format_func=lambda p: Path(p).stem[:60], ) eval_source = selected_path if eval_source is None: st.info("Upload an .eval file above to get started.") return st.markdown("---") st.markdown("**Legend**") for role, color in ROLE_COLOR.items(): emoji = ROLE_EMOJI[role] st.markdown( f'
{emoji} {role}
', unsafe_allow_html=True, ) # ── Load ────────────────────────────────────────────────────────── try: with zipfile.ZipFile(eval_source) as z: sample_files = [n for n in z.namelist() if n.startswith("samples/")] if not sample_files: st.error("No sample files found in this eval log.") return # Support multiple samples (tabs) samples = {n: json.loads(z.read(n)) for n in sample_files} except Exception as e: st.error(f"Failed to load eval file: {e}") return # ── Verdict filter ──────────────────────────────────────────────── def _sample_verdict(sample: dict) -> str: for sc in (sample.get("scores") or {}).values(): ans = sc.get("answer") if ans: return str(ans) return "?" all_verdicts = sorted({_sample_verdict(s) for s in samples.values()}) with st.sidebar: st.markdown("---") st.markdown("**Filter by verdict**") selected_verdicts = { v: st.checkbox(v, value=True, key=f"filter_{v}") for v in all_verdicts } filtered = { n: s for n, s in samples.items() if selected_verdicts.get(_sample_verdict(s), True) } if not filtered: st.warning("No samples match the current filter.") return # ── Header metrics ──────────────────────────────────────────────── if len(filtered) > 1: tab_names = [ f"{Path(n).stem} — {_sample_verdict(s)}" for n, s in filtered.items() ] tabs = st.tabs(tab_names) for tab, (_, sample) in zip(tabs, filtered.items()): with tab: _render_sample(sample) else: _render_sample(next(iter(filtered.values()))) def _render_sample(sample: dict) -> None: atts = sample.get("attachments", {}) events = sample.get("events", []) scores = sample.get("scores", {}) # Metrics row score_val, score_ans = None, None for sc in (scores.values() if isinstance(scores, dict) else []): score_val = sc.get("value") score_ans = sc.get("answer") break col1, col2, col3, col4 = st.columns(4) col1.metric("Sample", sample.get("id", "?")) col2.metric("Verdict", f"{score_ans}" if score_ans else "?") col3.metric("Score", f"{score_val:.1f}" if score_val is not None else "?") col4.metric("Time", f"{sample.get('total_time', 0):.0f}s") role_usage = sample.get("role_usage", {}) if role_usage: with st.expander("Token usage by role", expanded=False): cols = st.columns(len(role_usage)) for col, (role, usage) in zip(cols, role_usage.items()): total = usage.get("total_tokens", 0) col.metric(role, f"{total:,}") st.markdown("---") steps = parse_events(events, atts) render_nav(steps) render_steps(steps) if __name__ == "__main__": main()