| """Streamlit trajectory viewer for github_red .eval log files. |
| |
| Run with: |
| streamlit run scripts/trajectory_viewer.py |
| """ |
|
|
| from __future__ import annotations |
|
|
import glob
import html
import json
import re
import zipfile
from pathlib import Path

import streamlit as st
|
|
| |
| |
# Two candidate log folders: ``logs/`` one level above this script's
# directory, and ``logs/`` directly beside the script.
_local_logs = Path(__file__).parent.parent / "logs"
_space_logs = Path(__file__).parent / "logs"

# The folder beside the script wins whenever it exists.
if _space_logs.exists():
    LOGS_DIR = _space_logs
else:
    LOGS_DIR = _local_logs
|
|
| |
|
|
# Accent colour per agent role. Insertion order is significant: the sidebar
# legend iterates this dict and looks every key up in ROLE_EMOJI.
ROLE_COLOR = {
    "profiler": "#0F766E",
    "decomposer": "#7C3AED",
    "executor": "#2563EB",
    "target": "#DC2626",
    "reviewer": "#DC2626",
    "auditor": "#B45309",
    "classifier": "#D97706",
}

# Icon per agent role; must contain every key of ROLE_COLOR.
# NOTE(review): these glyphs look mis-encoded (probably emoji decoded with the
# wrong charset); confirm the intended characters against the upstream file.
ROLE_EMOJI = {
    "profiler": "π¬",
    "decomposer": "π§©",
    "executor": "βοΈ",
    "target": "π‘οΈ",
    "reviewer": "π",
    "auditor": "π―",
    "classifier": "π",
}

# Accent colour per tool name; tools not listed fall back to grey at the
# call sites.
TOOL_COLOR = {
    "github_create_branch": "#0891B2",
    "github_write_file": "#0891B2",
    "github_create_pull_request": "#0891B2",
    "transfer_to_target": "#7C3AED",
    # Handled together with transfer_to_target by the step renderer, so it
    # shares that colour instead of falling back to grey.
    "transfer_to_reviewer": "#7C3AED",
    "github_list_pr_files": "#DC2626",
    "github_read_file": "#DC2626",
    "github_approve_pull_request": "#16A34A",
    "github_add_pr_comment": "#DC2626",
    "github_list_files": "#0284C7",
    "bash": "#854D0E",
    "submit": "#374151",
    "decompose": "#7C3AED",
    "think": "#9CA3AF",
    "submit_profile": "#0F766E",
    "get_repository_analysis": "#0F766E",
}
|
|
| |
|
|
|
|
| def _resolve(val: object, atts: dict) -> object: |
| if isinstance(val, str) and val.startswith("attachment://"): |
| key = val[len("attachment://") :] |
| resolved = atts.get(key, val) |
| return resolved if isinstance(resolved, (str, dict, list)) else val |
| if isinstance(val, dict): |
| return {k: _resolve(v, atts) for k, v in val.items()} |
| if isinstance(val, list): |
| return [_resolve(v, atts) for v in val] |
| return val |
|
|
|
|
| def _text(content: object) -> str: |
| if isinstance(content, str): |
| return content |
| if isinstance(content, list): |
| parts = [] |
| for p in content: |
| if isinstance(p, dict) and p.get("type") == "text": |
| parts.append(p.get("text", "")) |
| elif isinstance(p, str): |
| parts.append(p) |
| return "".join(parts) |
| return "" |
|
|
|
|
| def _parse_args(raw: object) -> dict: |
| if isinstance(raw, str): |
| try: |
| return json.loads(raw) |
| except Exception: |
| return {"raw": raw} |
| return raw if isinstance(raw, dict) else {} |
|
|
|
|
| |
|
|
|
|
| def _parse_pr_files(text: str) -> list[dict]: |
| """Parse github_list_pr_files output into a list of {path, content} dicts. |
| |
| The format is: |
| === path/to/file.py === |
| <file content> |
| |
| === another/file.yml === |
| <file content> |
| """ |
| files = [] |
| |
| parts = re.split(r"^=== (.+?) ===$", text, flags=re.MULTILINE) |
| |
| it = iter(parts[1:]) |
| for path in it: |
| content = next(it, "").strip() |
| if path.strip(): |
| files.append({"path": path.strip(), "content": content}) |
| return files |
|
|
|
|
| def _lang_for(path: str) -> str: |
| """Return a Streamlit/Pygments language hint for syntax highlighting.""" |
| ext = path.rsplit(".", 1)[-1].lower() if "." in path else "" |
| return { |
| "py": "python", |
| "yml": "yaml", |
| "yaml": "yaml", |
| "sh": "bash", |
| "bash": "bash", |
| "json": "json", |
| "js": "javascript", |
| "ts": "typescript", |
| "md": "markdown", |
| "txt": "text", |
| "toml": "toml", |
| "dockerfile": "dockerfile", |
| "tf": "hcl", |
| }.get(ext, "text") |
|
|
|
|
def parse_events(events: list, atts: dict) -> list:
    """Flatten raw eval-log events into an ordered list of renderable steps.

    Args:
        events: Event dicts of one sample; each is dispatched on its
            ``"event"`` key (``info``, ``model``, ``tool``, ``score``).
        atts: Attachment table used to resolve ``attachment://`` references.

    Returns:
        Step dicts, each with a ``"kind"`` of ``subtask_divider``, ``phase``,
        ``info``, ``model``, ``pr_diff``, ``tool_result`` or ``score``. Steps
        that belong in the sidebar navigation also carry ``anchor``,
        ``nav_label``, ``nav_color`` and ``nav_indent``.
    """
    steps: list = []
    subtask = 0  # number of the subtask currently being replayed (0 = none yet)
    seen_pr_sigs: set = set()  # file-path signatures of PR listings already emitted
    emitted_phases: set = set()  # phase anchors already emitted
    seen_role_in_subtask: set = set()  # (role, subtask) pairs that already have a nav entry
    anchor_counter = 0

    def phase_step(label: str, anchor: str, nav_label: str, color: str) -> dict:
        return {
            "kind": "phase",
            "label": label,
            "subtask": 0,
            "anchor": anchor,
            "nav_label": nav_label,
            "nav_color": color,
            "nav_indent": 0,
        }

    for e in events:
        ev = e.get("event", "")

        if ev == "info":
            raw = e.get("data", "")
            text = _resolve(raw, atts) if isinstance(raw, str) else str(raw)
            if not isinstance(text, str):
                text = json.dumps(text)

            # "Subtask N" announcements open a new subtask section.
            m = re.search(r"Subtask (\d+)", text)
            if m and ("context for" in text or "Starting" in text):
                subtask = int(m.group(1))
                steps.append(
                    {
                        "kind": "subtask_divider",
                        "subtask": subtask,
                        "anchor": f"subtask-{subtask}",
                        "nav_label": f"Subtask {subtask}",
                        "nav_color": "#2563EB",
                        "nav_indent": 0,
                    }
                )
                continue

            # Drop noise: unresolved attachment references, separator rules,
            # and glyph-prefixed lines that do not come from the profiler.
            # NOTE(review): the prefix literal below looks mis-encoded;
            # confirm the intended glyph.
            if text.startswith("attachment://"):
                continue
            if text.startswith("β") and "[profiler]" not in text:
                continue
            if "=================" in text:
                continue

            steps.append({"kind": "info", "text": text, "subtask": subtask})

        elif ev == "model":
            role = e.get("role", "unknown")

            # The first profiler / decomposer call opens the matching phase.
            if role == "profiler" and "phase-profiler" not in emitted_phases:
                emitted_phases.add("phase-profiler")
                steps.append(
                    phase_step(
                        "π¬ Profiler Phase", "phase-profiler", "π¬ Profiler", "#0F766E"
                    )
                )
            if role == "decomposer" and "phase-decomposer" not in emitted_phases:
                emitted_phases.add("phase-decomposer")
                steps.append(
                    phase_step(
                        "π§© Decomposition Phase",
                        "phase-decomposer",
                        "π§© Decomposer",
                        "#7C3AED",
                    )
                )

            out = e.get("output") or {}
            choices = out.get("choices") or []
            msg = (choices[0].get("message") or {}) if choices else {}

            tool_calls = [
                {
                    "fn": tc.get("function", ""),
                    "args": _resolve(_parse_args(tc.get("arguments", {})), atts),
                }
                for tc in (msg.get("tool_calls") or [])
            ]

            step: dict = {
                "kind": "model",
                "role": role,
                "model": e.get("model", ""),
                "text": _text(msg.get("content", "")),
                "tool_calls": tool_calls,
                "subtask": subtask,
            }

            # The first call of these roles within a subtask gets a nav entry.
            role_key = (role, subtask)
            if (
                role in ("executor", "reviewer", "target", "auditor")
                and role_key not in seen_role_in_subtask
            ):
                seen_role_in_subtask.add(role_key)
                anchor_counter += 1
                suffix = f" (subtask {subtask})" if subtask else ""
                step["anchor"] = f"step-{anchor_counter}"
                step["nav_label"] = (
                    f"{ROLE_EMOJI.get(role, 'π€')} {role.capitalize()}{suffix}"
                )
                step["nav_color"] = ROLE_COLOR.get(role, "#6B7280")
                step["nav_indent"] = 1

            steps.append(step)

            # Surface the PR file listing the reviewing model was shown. The
            # listing is read from the tool messages in this call's input and
            # de-duplicated on the set of file paths.
            if role in ("target", "reviewer"):
                for im in e.get("input") or []:
                    if not isinstance(im, dict) or im.get("role") != "tool":
                        continue
                    fn = im.get("function") or im.get("name", "")
                    if fn != "github_list_pr_files":
                        continue
                    # Resolve first, then flatten: content may be a plain
                    # string, an attachment reference, or a list of parts.
                    # Calling str() on a list would turn it into a repr that
                    # the header pattern can never match.
                    content = _resolve(im.get("content") or "", atts)
                    if isinstance(content, list):
                        content = _text(content)
                    files = _parse_pr_files(str(content))
                    if not files:
                        continue
                    sig = "|".join(f["path"] for f in files)
                    if sig in seen_pr_sigs:
                        continue
                    seen_pr_sigs.add(sig)
                    steps.append(
                        {"kind": "pr_diff", "files": files, "subtask": subtask}
                    )
                    # At most one new listing per model call.
                    break

        elif ev == "tool" and e.get("type") == "function":
            out = _resolve(e.get("result") or e.get("output") or "", atts)
            if isinstance(out, list):
                out = _text(out)
            elif isinstance(out, dict):
                out = json.dumps(out, indent=2)
            steps.append(
                {
                    "kind": "tool_result",
                    "fn": e.get("function", ""),
                    "result": str(out),
                    "subtask": subtask,
                }
            )

        elif ev == "score" and not e.get("intermediate"):
            sc = e.get("score") or {}
            steps.append(
                {
                    "kind": "score",
                    "value": sc.get("value"),
                    "answer": sc.get("answer", "?"),
                    "explanation": sc.get("explanation", ""),
                    "subtask": subtask,
                }
            )

    return steps
|
|
|
|
| |
|
|
|
|
| def _badge(label: str, color: str) -> str: |
| return ( |
| f'<span style="background:{color}22;color:{color};border:1px solid {color}44;' |
| f'border-radius:4px;padding:1px 7px;font-size:0.78em;font-weight:600">{label}</span>' |
| ) |
|
|
|
|
def _tool_badge(fn: str) -> str:
    """Return a badge for tool *fn*, tinted from TOOL_COLOR (grey if unlisted)."""
    return _badge(fn, TOOL_COLOR.get(fn, "#6B7280"))
|
|
|
|
| def _args_summary(fn: str, args: dict) -> str: |
| """Return the most useful 1-line summary of tool arguments.""" |
| if fn == "github_create_branch": |
| return ( |
| f"`{args.get('branch_name', '')}` from `{args.get('base_branch', 'main')}`" |
| ) |
| if fn == "github_write_file": |
| msg = args.get("commit_message", "") |
| path = args.get("path", "") |
| return f"`{path}` β {msg[:80]}" |
| if fn == "github_create_pull_request": |
| return f"**{args.get('title', '')}** β `{args.get('head', '')}`" |
| if fn == "transfer_to_target": |
| msg = args.get("message", "") |
| return msg[:120] |
| if fn == "github_list_pr_files": |
| return f"PR #{args.get('pr_number', '?')} in `{args.get('repo', '')}`" |
| if fn in ("github_approve_pull_request",): |
| return f"PR #{args.get('pr_number', '?')}" |
| if fn == "github_add_pr_comment": |
| c = args.get("comment", "") |
| return c[:120] |
| if fn == "submit": |
| a = args.get("answer", args.get("value", "")) |
| return str(a)[:120] |
| if fn == "decompose": |
| tasks = args.get("subtasks", []) |
| return f"{len(tasks)} subtasks" |
| if fn == "bash": |
| cmd = args.get("cmd", args.get("command", "")) |
| return f"`{cmd[:120]}`" |
| if fn == "github_list_files": |
| path = args.get("path", "/") |
| branch = args.get("branch", "") |
| return f"`{path}`" + (f" @ `{branch}`" if branch else "") |
| if fn == "submit_profile": |
| s = args.get("strategy", "") |
| return s[:120] + ("β¦" if len(s) > 120 else "") |
| if fn == "get_repository_analysis": |
| return "fetching repository analysis" |
| if fn == "think": |
| t = args.get("thought", args.get("thinking", args.get("content", ""))) |
| return str(t)[:120] + ("β¦" if len(str(t)) > 120 else "") |
| return "" |
|
|
|
|
def _anchor_div(anchor: str | None) -> None:
    """Write an empty ``<div id=...>`` as an in-page link target.

    Does nothing when *anchor* is ``None`` or empty.
    """
    if not anchor:
        return
    st.markdown(f'<div id="{anchor}"></div>', unsafe_allow_html=True)
|
|
|
|
def render_nav(steps: list) -> None:
    """Add a "Trajectory" list of in-page links to the sidebar.

    Only steps that carry a ``nav_label`` are listed; when there are none the
    sidebar is left untouched. Each link targets the step's ``anchor`` and is
    indented according to its ``nav_indent``.
    """
    entries = [step for step in steps if step.get("nav_label")]
    if entries:
        with st.sidebar:
            st.markdown("---")
            st.markdown("**Trajectory**")
            for entry in entries:
                anchor = entry.get("anchor", "")
                label = entry.get("nav_label", "")
                color = entry.get("nav_color", "#6B7280")
                # 14px of extra left padding per indent level.
                pad_left = 8 + entry.get("nav_indent", 0) * 14
                link_html = (
                    f'<a href="#{anchor}" style="display:block;padding:3px 8px 3px {pad_left}px;'
                    f"font-size:0.83em;color:{color};text-decoration:none;"
                    f"border-left:2px solid {color}55;margin:1px 0;"
                    f'border-radius:0 4px 4px 0">{label}</a>'
                )
                st.markdown(link_html, unsafe_allow_html=True)
|
|
|
|
# Tool calls whose arguments get a dedicated block (expander or inline note)
# instead of the generic one-line summary.
_FULL_BLOCK_TOOLS = frozenset(
    {
        "github_write_file",
        "decompose",
        "transfer_to_target",
        "transfer_to_reviewer",
        "github_add_pr_comment",
        "github_approve_pull_request",
        "submit",
        "bash",
        "submit_profile",
        "think",
    }
)


def _esc(value: object) -> str:
    """Escape *value* for interpolation into markup rendered as raw HTML."""
    return html.escape(str(value))


def _render_info(text: str) -> None:
    """Render an info line: red box, green box, heading, or plain caption."""
    lowered = text.lower()
    # NOTE(review): the two status-prefix literals below look identical here,
    # most likely distinct glyphs damaged by mis-encoding; confirm upstream.
    if text.startswith("β") or "refused" in lowered:
        st.markdown(
            f'<div style="background:#FEE2E222;border-left:4px solid #DC2626;'
            f'padding:6px 12px;border-radius:4px;margin:4px 0;color:#991B1B">'
            f"{_esc(text)}</div>",
            unsafe_allow_html=True,
        )
    elif text.startswith("β") or "approved" in lowered:
        st.markdown(
            f'<div style="background:#DCFCE722;border-left:4px solid #16A34A;'
            f'padding:6px 12px;border-radius:4px;margin:4px 0;color:#166534">'
            f"{_esc(text)}</div>",
            unsafe_allow_html=True,
        )
    elif text.startswith("Decomposition attempt"):
        st.markdown(f"#### π§© {text}")
    else:
        st.caption(text)


def _render_tool_summary(fn: str, summary: str) -> None:
    """Render the generic one-line "tool name plus summary" row."""
    color = TOOL_COLOR.get(fn, "#6B7280")
    line = (
        f'<div style="padding:2px 14px 2px 18px;font-size:0.85em;color:{color}">'
        f"β³ <b>{_esc(fn)}</b>"
    )
    if summary:
        line += f" Β· {_esc(summary)}"
    line += "</div>"
    st.markdown(line, unsafe_allow_html=True)


def _render_inline_note(label: str, color: str) -> None:
    """Render a bold one-line note, used when there is nothing to expand."""
    st.markdown(
        f'<div style="padding:2px 14px 2px 28px;font-size:0.85em;'
        f'color:{color}"><b>{_esc(label)}</b></div>',
        unsafe_allow_html=True,
    )


def _render_tool_call(tc: dict) -> None:
    """Render the arguments of one tool call issued by a model step."""
    fn = tc["fn"]
    args = tc["args"] if isinstance(tc["args"], dict) else {}

    if fn not in _FULL_BLOCK_TOOLS:
        _render_tool_summary(fn, _args_summary(fn, args))
        return

    if fn == "github_write_file":
        content = args.get("content", "")
        path = str(args.get("path", ""))
        if content:
            with st.expander(f" π `{path}`", expanded=False):
                st.code(content, language=_lang_for(path))
        else:
            # Nothing to expand: still show which file the call targeted.
            _render_tool_summary(fn, _args_summary(fn, args))
    elif fn == "decompose":
        tasks = args.get("subtasks") or []
        with st.expander(f" {len(tasks)} subtasks", expanded=False):
            for i, t in enumerate(tasks, 1):
                task_text = t.get("task", t) if isinstance(t, dict) else str(t)
                st.markdown(f"**{i}.** {task_text}")
    elif fn in ("transfer_to_target", "transfer_to_reviewer"):
        msg = str(args.get("message") or "")
        if len(msg) > 120:
            with st.expander(" Full handoff message", expanded=False):
                st.markdown(msg)
        elif msg:
            # Short hand-offs need no expander but must still be visible.
            _render_tool_summary(fn, msg)
    elif fn == "github_add_pr_comment":
        comment = args.get("comment", args.get("body", ""))
        pr = args.get("pr_number", "?")
        if comment:
            with st.expander(f" π¬ PR #{pr} comment", expanded=False):
                st.markdown(comment)
    elif fn == "github_approve_pull_request":
        pr = args.get("pr_number", "?")
        review_body = args.get("body", args.get("review_body", ""))
        label = f" β Approved PR #{pr}"
        if review_body:
            with st.expander(label, expanded=False):
                st.markdown(review_body)
        else:
            _render_inline_note(label, "#16A34A")
    elif fn == "submit":
        answer_str = str(args.get("answer", args.get("value", ""))).strip()
        lowered = answer_str.lower()
        is_decline = any(
            w in lowered
            for w in ("declin", "reject", "not approv", "refused", "concerns")
        )
        # NOTE(review): both icon literals look identical here, most likely
        # distinct glyphs damaged by mis-encoding; confirm upstream.
        icon = "β" if is_decline else "β"
        label = (
            f" {icon} Submit: {answer_str[:60]}"
            f"{'β¦' if len(answer_str) > 60 else ''}"
        )
        if len(answer_str) > 60:
            with st.expander(label, expanded=False):
                st.markdown(answer_str)
        else:
            _render_inline_note(label, "#DC2626" if is_decline else "#16A34A")
    elif fn == "bash":
        cmd = str(args.get("cmd", args.get("command", "")) or "")
        with st.expander(
            f" $ {cmd[:80]}{'β¦' if len(cmd) > 80 else ''}",
            expanded=False,
        ):
            st.code(cmd, language="bash")
    elif fn == "submit_profile":
        with st.expander(" π¬ Repository strategy report", expanded=False):
            st.markdown(str(args.get("strategy", "")))
    elif fn == "think":
        thought = args.get("thought", args.get("thinking", args.get("content", "")))
        thought_str = str(thought).strip()
        short = thought_str[:60] + ("β¦" if len(thought_str) > 60 else "")
        with st.expander(f" π§ {short}", expanded=False):
            st.markdown(
                f'<div style="background:#F3F4F611;border-left:3px solid #9CA3AF;'
                f"padding:8px 12px;border-radius:0 4px 4px 0;"
                f'color:#374151;font-size:0.9em;white-space:pre-wrap">'
                f"{_esc(thought_str)}</div>",
                unsafe_allow_html=True,
            )


def _render_model(step: dict) -> None:
    """Render one model turn: header with tool badges, text, then tool calls."""
    _anchor_div(step.get("anchor"))
    role = step["role"]
    color = ROLE_COLOR.get(role, "#6B7280")
    emoji = ROLE_EMOJI.get(role, "π€")
    model_short = str(step["model"] or "").split("/")[-1]
    text = step["text"].strip()
    tcs = step["tool_calls"]

    # Classifier turns without any text carry nothing worth showing.
    if role == "classifier" and not text:
        return

    header_html = (
        f'<div style="border-left:4px solid {color};padding:4px 10px;'
        f'margin:10px 0 2px 0;background:{color}08;border-radius:0 6px 6px 0">'
        f"<b>{emoji} {_esc(str(role).upper())}</b> "
        f'<span style="color:{color};font-size:0.78em">{_esc(model_short)}</span>'
    )
    if tcs:
        tc_html = " ".join(_tool_badge(_esc(tc["fn"])) for tc in tcs)
        header_html += f"<br><div style='margin-top:4px'>{tc_html}</div>"
    header_html += "</div>"
    st.markdown(header_html, unsafe_allow_html=True)

    if text:
        if len(text) > 400:
            with st.expander("View full response", expanded=False):
                st.markdown(text)
        else:
            st.markdown(
                f'<div style="padding:0 14px;color:#374151;font-size:0.9em">'
                f"{_esc(text)}</div>",
                unsafe_allow_html=True,
            )

    for tc in tcs:
        _render_tool_call(tc)


def _render_pr_diff(files: list) -> None:
    """Render the files of a pull request inside one collapsed expander."""
    with st.expander(
        f"π PR files ({len(files)} file{'s' if len(files) != 1 else ''})",
        expanded=False,
    ):
        for f in files:
            path = f["path"]
            st.markdown(
                f'<div style="font-size:0.82em;font-weight:600;'
                f'color:#374151;padding:4px 0 2px 0">'
                f"π <code>{_esc(path)}</code></div>",
                unsafe_allow_html=True,
            )
            st.code(f["content"], language=_lang_for(path))


def _render_tool_result(step: dict) -> None:
    """Render a tool result as a collapsed expander (``think`` is skipped)."""
    fn = step["fn"]
    if fn == "think":
        return
    result = step["result"]
    short = result[:120].replace("\n", " ")
    label = f'β {fn}: {short}{"β¦" if len(result) > 120 else ""}'
    with st.expander(label, expanded=False):
        lang = "text"
        if fn == "github_read_file":
            # The first line of the result is used as the path hint.
            lang = _lang_for(result.split("\n")[0].strip())
        st.code(result, language=lang)


def _render_score(step: dict) -> None:
    """Render the verdict banner and, if present, its explanation."""
    val = step.get("value", 0) or 0
    # Score values are not guaranteed to be numeric; pick the colour from a
    # numeric reading when there is one and fall back to grey otherwise.
    try:
        numeric = float(val)
    except (TypeError, ValueError):
        numeric = None
    if numeric is None:
        color = "#6B7280"
    elif numeric >= 1.0:
        color = "#16A34A"
    elif numeric > 0:
        color = "#D97706"
    else:
        color = "#DC2626"

    st.markdown("---")
    st.markdown(
        f'<div style="background:{color};color:#ffffff;padding:12px 18px;'
        f'border-radius:8px;font-size:1.15em;font-weight:700;margin:8px 0">'
        f"π Verdict: {_esc(step.get('answer', '?'))} Β· score {_esc(val)}</div>",
        unsafe_allow_html=True,
    )
    expl = step.get("explanation", "")
    if expl:
        st.markdown(
            f'<div style="background:{color}18;border-left:4px solid {color};'
            f'padding:8px 14px;border-radius:4px;color:#ffffff;margin-top:6px">'
            f"{_esc(expl)}</div>",
            unsafe_allow_html=True,
        )


def render_steps(steps: list) -> None:
    """Render parsed trajectory steps, in order, into the Streamlit page.

    Each step is dispatched on its ``"kind"``. Text that originates from the
    log is HTML-escaped before being placed in raw-HTML markup, so model or
    tool output cannot inject markup into the page.

    Args:
        steps: Step dicts as produced by ``parse_events``.
    """
    for step in steps:
        kind = step["kind"]
        if kind == "subtask_divider":
            _anchor_div(step.get("anchor"))
            st.markdown(
                f'<hr style="margin:18px 0 6px 0">'
                f'<h4 style="margin:0 0 8px 0">Subtask {_esc(step["subtask"])}</h4>',
                unsafe_allow_html=True,
            )
        elif kind == "phase":
            _anchor_div(step.get("anchor"))
            color = step.get("nav_color", "#0F766E")
            st.markdown(
                f'<div style="border-left:4px solid {color};padding:6px 12px;'
                f'background:{color}11;border-radius:0 6px 6px 0;margin:14px 0 6px 0">'
                f'<b style="color:{color}">{step["label"]}</b></div>',
                unsafe_allow_html=True,
            )
        elif kind == "info":
            _render_info(step["text"])
        elif kind == "model":
            _render_model(step)
        elif kind == "pr_diff":
            _render_pr_diff(step["files"])
        elif kind == "tool_result":
            _render_tool_result(step)
        elif kind == "score":
            _render_score(step)
|
|
|
|
| |
|
|
|
|
@st.cache_data(show_spinner=False)
def _log_index(logs_dir: str) -> dict[str, dict[str, list[str]]]:
    """Index every ``*.eval`` file under *logs_dir* as ``{cwe: {model: [path, ...]}}``.

    The first-level key is the name of the file's parent directory; the model
    is the first key of ``stats.model_usage`` in the archive's ``header.json``
    and ``"unknown"`` when that cannot be read. Paths are visited in
    reverse-sorted order, so each list is in descending path order.
    """
    index: dict[str, dict[str, list[str]]] = {}
    eval_paths = sorted(Path(logs_dir).rglob("*.eval"), reverse=True)
    for eval_path in eval_paths:
        model_name = "unknown"
        try:
            with zipfile.ZipFile(str(eval_path)) as archive:
                if "header.json" in archive.namelist():
                    header = json.loads(archive.read("header.json"))
                    usage = header.get("stats", {}).get("model_usage", {})
                    if usage:
                        model_name = next(iter(usage))
        except Exception:
            # Best effort: an unreadable or malformed archive is still listed,
            # filed under the "unknown" model.
            pass
        by_model = index.setdefault(eval_path.parent.name, {})
        by_model.setdefault(model_name, []).append(str(eval_path))
    return index
|
|
|
|
| |
|
|
|
|
def main() -> None:
    """Build the page: pick an eval log, filter its samples, render them.

    The sidebar offers a file upload and, when nothing is uploaded, a
    CWE / model / run picker over the logs found under ``LOGS_DIR``. The
    chosen archive's ``samples/`` entries are loaded, filtered by verdict via
    sidebar checkboxes, and rendered one per tab (or directly when only one
    sample remains).
    """
    st.set_page_config(
        page_title="github_red Trajectory Viewer",
        layout="wide",
        page_icon="π΄",
    )

    with st.sidebar:
        st.title("π΄ github_red")
        st.subheader("Trajectory Viewer")

        upload = st.file_uploader(
            "Upload .eval file",
            type=["eval"],
            accept_multiple_files=False,
            help="Upload an .eval log produced by inspect-ai",
        )

        # An uploaded file takes precedence over the on-disk log index.
        source = upload
        if source is None:
            log_index = _log_index(str(LOGS_DIR))
            if log_index:
                cwe = st.selectbox("CWE / Task", sorted(log_index.keys()))
                model_name = st.selectbox("Model", sorted(log_index[cwe].keys()))
                source = st.selectbox(
                    "Run",
                    log_index[cwe][model_name],
                    format_func=lambda p: Path(p).stem[:60],
                )

        if source is None:
            st.info("Upload an .eval file above to get started.")
            return

        st.markdown("---")
        st.markdown("**Legend**")
        for role, color in ROLE_COLOR.items():
            emoji = ROLE_EMOJI[role]
            st.markdown(
                f'<div style="border-left:3px solid {color};padding:2px 8px;'
                f'margin:2px 0;font-size:0.9em">{emoji} {role}</div>',
                unsafe_allow_html=True,
            )

    # Load every entry under samples/ from the archive.
    try:
        with zipfile.ZipFile(source) as archive:
            sample_names = [
                name for name in archive.namelist() if name.startswith("samples/")
            ]
            if not sample_names:
                st.error("No sample files found in this eval log.")
                return
            samples = {name: json.loads(archive.read(name)) for name in sample_names}
    except Exception as exc:
        st.error(f"Failed to load eval file: {exc}")
        return

    def verdict_of(sample: dict) -> str:
        # First truthy "answer" among the sample's scores, "?" when none.
        answers = (
            str(sc.get("answer"))
            for sc in (sample.get("scores") or {}).values()
            if sc.get("answer")
        )
        return next(answers, "?")

    verdicts = sorted({verdict_of(sample) for sample in samples.values()})
    with st.sidebar:
        st.markdown("---")
        st.markdown("**Filter by verdict**")
        enabled = {
            verdict: st.checkbox(verdict, value=True, key=f"filter_{verdict}")
            for verdict in verdicts
        }

    visible = {
        name: sample
        for name, sample in samples.items()
        if enabled.get(verdict_of(sample), True)
    }
    if not visible:
        st.warning("No samples match the current filter.")
        return

    if len(visible) == 1:
        _render_sample(next(iter(visible.values())))
        return

    titles = [
        f"{Path(name).stem} β {verdict_of(sample)}"
        for name, sample in visible.items()
    ]
    for tab, sample in zip(st.tabs(titles), visible.values()):
        with tab:
            _render_sample(sample)
|
|
|
|
def _render_sample(sample: dict) -> None:
    """Render one sample: headline metrics, token usage, then the trajectory.

    Missing or null fields (``attachments``, ``events``, ``scores``,
    ``total_time``, ``role_usage``) are tolerated, and a score value that is
    not a number is shown as-is instead of being formatted as a float.

    Args:
        sample: One decoded entry from the archive's ``samples/`` folder.
    """
    atts = sample.get("attachments") or {}
    events = sample.get("events") or []
    scores = sample.get("scores") or {}

    # Headline verdict and score come from the first score entry, if any.
    score_val, score_ans = None, None
    if isinstance(scores, dict):
        first = next(iter(scores.values()), None)
        if isinstance(first, dict):
            score_val = first.get("value")
            score_ans = first.get("answer")

    if score_val is None:
        score_text = "?"
    elif isinstance(score_val, (int, float)):
        score_text = f"{score_val:.1f}"
    else:
        # Score values may be strings or other non-numeric objects, which the
        # ".1f" format would reject.
        score_text = str(score_val)

    total_time = sample.get("total_time")
    if not isinstance(total_time, (int, float)):
        total_time = 0

    col1, col2, col3, col4 = st.columns(4)
    col1.metric("Sample", sample.get("id", "?"))
    col2.metric("Verdict", f"{score_ans}" if score_ans else "?")
    col3.metric("Score", score_text)
    col4.metric("Time", f"{total_time:.0f}s")

    role_usage = sample.get("role_usage") or {}
    if role_usage:
        with st.expander("Token usage by role", expanded=False):
            cols = st.columns(len(role_usage))
            for col, (role, usage) in zip(cols, role_usage.items()):
                total = (usage or {}).get("total_tokens") or 0
                col.metric(role, f"{total:,}")

    st.markdown("---")
    steps = parse_events(events, atts)
    render_nav(steps)
    render_steps(steps)
|
|
|
|
# Script entry point: build the page when this file is executed directly,
# e.g. via the ``streamlit run`` command given in the module docstring.
if __name__ == "__main__":
    main()
|
|