rufimelo's picture
Add CWE/model sidebar selectors + scan logs/ subdirs
af55c58 verified
"""Streamlit trajectory viewer for github_red .eval log files.
Run with:
streamlit run scripts/trajectory_viewer.py
"""
from __future__ import annotations

import glob
import html
import json
import re
import zipfile
from pathlib import Path

import streamlit as st
# Local: scripts/trajectory_viewer.py → repo_root/logs
# Space: /app/app.py → /app/logs
# Two candidate log directories: "logs" beside this file, or "logs" one level up.
# The sibling directory wins whenever it exists on disk.
_space_logs = Path(__file__).parent / "logs"
_local_logs = Path(__file__).parent.parent / "logs"
if _space_logs.exists():
    LOGS_DIR = _space_logs
else:
    LOGS_DIR = _local_logs
# ── Colours & labels ────────────────────────────────────────────────────────
# Accent colour (hex string) for each agent role. Used for model-turn headers,
# sidebar navigation links and the legend; roles not listed here fall back to
# grey at the lookup sites. "target" and "reviewer" intentionally share one red.
ROLE_COLOR = {
    "profiler": "#0F766E",
    "decomposer": "#7C3AED",
    "executor": "#2563EB",
    "target": "#DC2626",
    "reviewer": "#DC2626",
    "auditor": "#B45309",
    "classifier": "#D97706",
}
# Emoji shown next to each agent role in headers, the legend and the sidebar
# navigation. The keys mirror ROLE_COLOR (the legend indexes this table with
# ROLE_COLOR's keys). The literals were previously stored as mojibake
# (UTF-8 bytes decoded with a single-byte codec); they are real emoji again.
ROLE_EMOJI = {
    "profiler": "🔬",
    "decomposer": "🧩",
    "executor": "⚙️",
    "target": "🛡️",
    "reviewer": "🔎",
    "auditor": "🎯",
    "classifier": "🔍",
}
# Accent colour (hex string) for each known tool name; used for tool badges and
# the one-line tool-call summaries. Unknown tools fall back to grey at the
# lookup sites.
TOOL_COLOR = {
    "github_create_branch": "#0891B2",
    "github_write_file": "#0891B2",
    "github_create_pull_request": "#0891B2",
    "transfer_to_target": "#7C3AED",
    # Rendered exactly like transfer_to_target elsewhere in this file, so it
    # gets the same colour instead of the grey fallback.
    "transfer_to_reviewer": "#7C3AED",
    "github_list_pr_files": "#DC2626",
    "github_read_file": "#DC2626",
    "github_approve_pull_request": "#16A34A",
    "github_add_pr_comment": "#DC2626",
    "github_list_files": "#0284C7",
    "bash": "#854D0E",
    "submit": "#374151",
    "decompose": "#7C3AED",
    "think": "#9CA3AF",
    "submit_profile": "#0F766E",
    "get_repository_analysis": "#0F766E",
}
# ── Helpers ──────────────────────────────────────────────────────────────────
def _resolve(val: object, atts: dict) -> object:
if isinstance(val, str) and val.startswith("attachment://"):
key = val[len("attachment://") :]
resolved = atts.get(key, val)
return resolved if isinstance(resolved, (str, dict, list)) else val
if isinstance(val, dict):
return {k: _resolve(v, atts) for k, v in val.items()}
if isinstance(val, list):
return [_resolve(v, atts) for v in val]
return val
def _text(content: object) -> str:
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for p in content:
if isinstance(p, dict) and p.get("type") == "text":
parts.append(p.get("text", ""))
elif isinstance(p, str):
parts.append(p)
return "".join(parts)
return ""
def _parse_args(raw: object) -> dict:
if isinstance(raw, str):
try:
return json.loads(raw)
except Exception:
return {"raw": raw}
return raw if isinstance(raw, dict) else {}
# ── Event parsing ─────────────────────────────────────────────────────────────
def _parse_pr_files(text: str) -> list[dict]:
"""Parse github_list_pr_files output into a list of {path, content} dicts.
The format is:
=== path/to/file.py ===
<file content>
=== another/file.yml ===
<file content>
"""
files = []
# Split on === ... === headers
parts = re.split(r"^=== (.+?) ===$", text, flags=re.MULTILINE)
# parts = ["preamble", "path1", "content1", "path2", "content2", ...]
it = iter(parts[1:]) # skip preamble
for path in it:
content = next(it, "").strip()
if path.strip():
files.append({"path": path.strip(), "content": content})
return files
def _lang_for(path: str) -> str:
"""Return a Streamlit/Pygments language hint for syntax highlighting."""
ext = path.rsplit(".", 1)[-1].lower() if "." in path else ""
return {
"py": "python",
"yml": "yaml",
"yaml": "yaml",
"sh": "bash",
"bash": "bash",
"json": "json",
"js": "javascript",
"ts": "typescript",
"md": "markdown",
"txt": "text",
"toml": "toml",
"dockerfile": "dockerfile",
"tf": "hcl",
}.get(ext, "text")
def parse_events(events: list, atts: dict) -> list:
    """Flatten raw eval-log events into a list of structured step dicts.

    Args:
        events: Event dicts of one sample; each is dispatched on its
            ``"event"`` key (``info``, ``model``, ``tool`` or ``score``).
            Other event types are ignored.
        atts: Attachment table used to expand ``attachment://`` references.

    Returns:
        Step dicts in event order. Every step carries ``"kind"`` (one of
        ``subtask_divider``, ``phase``, ``info``, ``model``, ``pr_diff``,
        ``tool_result``, ``score``) and the ``"subtask"`` number that was
        current when it was produced. Steps that should appear in the sidebar
        navigation additionally carry ``anchor``, ``nav_label``, ``nav_color``
        and ``nav_indent``.
    """
    steps = []
    subtask = 0
    # Signatures (joined file paths) of PR listings already emitted.
    _seen_pr_sigs: set[str] = set()
    _profiler_phase_emitted = False
    _decomp_phase_emitted = False
    # Track first appearance of each role per subtask for nav anchors.
    _seen_role_in_subtask: set[tuple] = set()
    _anchor_counter = 0

    def _next_anchor() -> str:
        nonlocal _anchor_counter
        _anchor_counter += 1
        return f"step-{_anchor_counter}"

    for e in events:
        ev = e.get("event", "")

        # ── Info banners ──────────────────────────────────────────────
        if ev == "info":
            raw = e.get("data", "")
            text = _resolve(raw, atts) if isinstance(raw, str) else str(raw)
            if not isinstance(text, str):
                # An attachment may resolve to a dict/list; show it as JSON.
                text = json.dumps(text)

            # Detect subtask transitions.
            match = re.search(r"Subtask (\d+)", text)
            if match and ("context for" in text or "Starting" in text):
                subtask = int(match.group(1))
                steps.append(
                    {
                        "kind": "subtask_divider",
                        "subtask": subtask,
                        "anchor": f"subtask-{subtask}",
                        "nav_label": f"Subtask {subtask}",
                        "nav_color": "#2563EB",
                        "nav_indent": 0,
                    }
                )
                continue

            # Suppress noisy attachment / memory lines (but keep profiler banners).
            if text.startswith("attachment://"):
                continue
            if text.startswith("●") and "[profiler]" not in text:
                continue
            if "=================" in text:
                continue
            steps.append({"kind": "info", "text": text, "subtask": subtask})

        # ── Model turns ───────────────────────────────────────────────
        elif ev == "model":
            role = e.get("role", "unknown")

            # Emit a one-time phase divider when the profiler starts.
            if role == "profiler" and not _profiler_phase_emitted:
                _profiler_phase_emitted = True
                steps.append(
                    {
                        "kind": "phase",
                        "label": "🔬 Profiler Phase",
                        "subtask": 0,
                        "anchor": "phase-profiler",
                        "nav_label": "🔬 Profiler",
                        "nav_color": "#0F766E",
                        "nav_indent": 0,
                    }
                )
            # Same for the first decomposer turn.
            if role == "decomposer" and not _decomp_phase_emitted:
                _decomp_phase_emitted = True
                steps.append(
                    {
                        "kind": "phase",
                        "label": "🧩 Decomposition Phase",
                        "subtask": 0,
                        "anchor": "phase-decomposer",
                        "nav_label": "🧩 Decomposer",
                        "nav_color": "#7C3AED",
                        "nav_indent": 0,
                    }
                )

            model = e.get("model", "")
            out = e.get("output") or {}
            choices = out.get("choices", [])
            # Only the first choice is rendered; tolerate a null message.
            msg = (choices[0].get("message") or {}) if choices else {}
            text = _text(msg.get("content", ""))
            raw_tcs = msg.get("tool_calls") or []
            tool_calls = []
            for tc in raw_tcs:
                fn = tc.get("function", "")
                args = _resolve(_parse_args(tc.get("arguments", {})), atts)
                tool_calls.append({"fn": fn, "args": args})

            # Assign a nav anchor on the first appearance of these roles per subtask.
            anchor = None
            nav_label = None
            nav_color = None
            nav_indent = None
            role_key = (role, subtask)
            if (
                role in ("executor", "reviewer", "target", "auditor")
                and role_key not in _seen_role_in_subtask
            ):
                _seen_role_in_subtask.add(role_key)
                anchor = _next_anchor()
                emoji = ROLE_EMOJI.get(role, "🤖")
                suffix = f" (subtask {subtask})" if subtask else ""
                nav_label = f"{emoji} {role.capitalize()}{suffix}"
                nav_color = ROLE_COLOR.get(role, "#6B7280")
                nav_indent = 1

            step: dict = {
                "kind": "model",
                "role": role,
                "model": model,
                "text": text,
                "tool_calls": tool_calls,
                "subtask": subtask,
            }
            if anchor:
                step["anchor"] = anchor
                step["nav_label"] = nav_label
                step["nav_color"] = nav_color
                step["nav_indent"] = nav_indent
            steps.append(step)

            # ── PR file contents seen by reviewer/target ───────────────
            # Parse github_list_pr_files results from this model's input messages.
            if role in ("target", "reviewer"):
                for in_msg in e.get("input") or []:
                    if in_msg.get("role") != "tool":
                        continue
                    fn = in_msg.get("function") or in_msg.get("name", "")
                    if fn != "github_list_pr_files":
                        continue
                    # Resolve first, then flatten: list-valued content must go
                    # through _text() rather than being repr()'d, otherwise the
                    # "=== path ===" headers are never found.
                    listing = _resolve(in_msg.get("content") or "", atts)
                    if isinstance(listing, list):
                        listing = _text(listing)
                    files = _parse_pr_files(str(listing))
                    if files:
                        sig = "|".join(f["path"] for f in files)
                        if sig not in _seen_pr_sigs:
                            _seen_pr_sigs.add(sig)
                            steps.append(
                                {
                                    "kind": "pr_diff",
                                    "files": files,
                                    "subtask": subtask,
                                }
                            )
                            break  # only emit once per model call

        # ── Tool results ──────────────────────────────────────────────
        elif ev == "tool" and e.get("type") == "function":
            fn = e.get("function", "")
            raw = e.get("result") or e.get("output") or ""
            out = _resolve(raw, atts)
            if isinstance(out, list):
                out = _text(out)
            elif isinstance(out, dict):
                out = json.dumps(out, indent=2)
            steps.append(
                {
                    "kind": "tool_result",
                    "fn": fn,
                    "result": str(out),
                    "subtask": subtask,
                }
            )

        # ── Final score ───────────────────────────────────────────────
        elif ev == "score" and not e.get("intermediate"):
            sc = e.get("score") or {}
            steps.append(
                {
                    "kind": "score",
                    "value": sc.get("value"),
                    "answer": sc.get("answer", "?"),
                    "explanation": sc.get("explanation", ""),
                    "subtask": subtask,
                }
            )
    return steps
# ── Rendering ─────────────────────────────────────────────────────────────────
def _badge(label: str, color: str) -> str:
return (
f'<span style="background:{color}22;color:{color};border:1px solid {color}44;'
f'border-radius:4px;padding:1px 7px;font-size:0.78em;font-weight:600">{label}</span>'
)
def _tool_badge(fn: str) -> str:
    """Return the badge HTML for tool *fn*, grey when the tool has no colour."""
    return _badge(fn, TOOL_COLOR.get(fn, "#6B7280"))
def _args_summary(fn: str, args: dict) -> str:
"""Return the most useful 1-line summary of tool arguments."""
if fn == "github_create_branch":
return (
f"`{args.get('branch_name', '')}` from `{args.get('base_branch', 'main')}`"
)
if fn == "github_write_file":
msg = args.get("commit_message", "")
path = args.get("path", "")
return f"`{path}` β€” {msg[:80]}"
if fn == "github_create_pull_request":
return f"**{args.get('title', '')}** β†’ `{args.get('head', '')}`"
if fn == "transfer_to_target":
msg = args.get("message", "")
return msg[:120]
if fn == "github_list_pr_files":
return f"PR #{args.get('pr_number', '?')} in `{args.get('repo', '')}`"
if fn in ("github_approve_pull_request",):
return f"PR #{args.get('pr_number', '?')}"
if fn == "github_add_pr_comment":
c = args.get("comment", "")
return c[:120]
if fn == "submit":
a = args.get("answer", args.get("value", ""))
return str(a)[:120]
if fn == "decompose":
tasks = args.get("subtasks", [])
return f"{len(tasks)} subtasks"
if fn == "bash":
cmd = args.get("cmd", args.get("command", ""))
return f"`{cmd[:120]}`"
if fn == "github_list_files":
path = args.get("path", "/")
branch = args.get("branch", "")
return f"`{path}`" + (f" @ `{branch}`" if branch else "")
if fn == "submit_profile":
s = args.get("strategy", "")
return s[:120] + ("…" if len(s) > 120 else "")
if fn == "get_repository_analysis":
return "fetching repository analysis"
if fn == "think":
t = args.get("thought", args.get("thinking", args.get("content", "")))
return str(t)[:120] + ("…" if len(str(t)) > 120 else "")
return ""
def _anchor_div(anchor: str | None) -> None:
"""Emit an invisible anchor div for in-page navigation."""
if anchor:
st.markdown(f'<div id="{anchor}"></div>', unsafe_allow_html=True)
def render_nav(steps: list) -> None:
    """Draw the "Trajectory" link list in the sidebar.

    Only steps with a truthy ``nav_label`` produce a link; each link targets
    the step's ``anchor`` and is indented by its ``nav_indent`` level. Nothing
    is drawn when no step qualifies.
    """
    entries = list(filter(lambda item: item.get("nav_label"), steps))
    if len(entries) == 0:
        return
    with st.sidebar:
        st.markdown("---")
        st.markdown("**Trajectory**")
        for entry in entries:
            href = entry.get("anchor", "")
            caption = entry.get("nav_label", "")
            tint = entry.get("nav_color", "#6B7280")
            # 8px base padding plus 14px per indent level.
            left_px = 8 + entry.get("nav_indent", 0) * 14
            link = (
                f'<a href="#{href}" style="display:block;padding:3px 8px 3px {left_px}px;'
                f"font-size:0.83em;color:{tint};text-decoration:none;"
                f"border-left:2px solid {tint}55;margin:1px 0;"
                f'border-radius:0 4px 4px 0">{caption}</a>'
            )
            st.markdown(link, unsafe_allow_html=True)
# Tools whose arguments get a dedicated detail block instead of the generic
# one-line summary.
_FULL_BLOCK_TOOLS = frozenset(
    {
        "github_write_file",
        "decompose",
        "transfer_to_target",
        "transfer_to_reviewer",
        "github_add_pr_comment",
        "github_approve_pull_request",
        "submit",
        "bash",
        "submit_profile",
        "think",
    }
)


def _esc(value: object) -> str:
    """HTML-escape *value* for use as element content in raw-HTML markdown."""
    return html.escape(str(value), quote=False)


def _render_info_step(step: dict) -> None:
    """Render an info banner, coloured red/green for refusals/approvals."""
    text = step["text"]
    lowered = text.lower()
    if text.startswith("✗") or "refused" in lowered:
        st.markdown(
            f'<div style="background:#FEE2E222;border-left:4px solid #DC2626;'
            f'padding:6px 12px;border-radius:4px;margin:4px 0;color:#991B1B">'
            f"{_esc(text)}</div>",
            unsafe_allow_html=True,
        )
    elif text.startswith("✓") or "approved" in lowered:
        st.markdown(
            f'<div style="background:#DCFCE722;border-left:4px solid #16A34A;'
            f'padding:6px 12px;border-radius:4px;margin:4px 0;color:#166534">'
            f"{_esc(text)}</div>",
            unsafe_allow_html=True,
        )
    elif text.startswith("Decomposition attempt"):
        st.markdown(f"#### 🧩 {text}")
    else:
        st.caption(text)


def _render_tool_summary_line(fn: str, summary: str) -> None:
    """Render the generic one-line "↳ tool · summary" row for a tool call."""
    color = TOOL_COLOR.get(fn, "#6B7280")
    detail_html = (
        f'<div style="padding:2px 14px 2px 18px;font-size:0.85em;color:{color}">'
        f"↳ <b>{_esc(fn)}</b>"
    )
    if summary:
        detail_html += f" &nbsp;·&nbsp; {_esc(summary)}"
    detail_html += "</div>"
    st.markdown(detail_html, unsafe_allow_html=True)


def _render_tool_detail(fn: str, args: dict) -> bool:
    """Render the dedicated block for tool *fn*; return False if nothing was shown."""
    if fn == "github_write_file":
        content = args.get("content", "")
        path = str(args.get("path") or "")
        if not content:
            return False
        with st.expander(f" 📄 `{path}`", expanded=False):
            st.code(content, language=_lang_for(path))
        return True

    if fn == "decompose":
        tasks = args.get("subtasks") or []
        with st.expander(f" {len(tasks)} subtasks", expanded=False):
            for i, t in enumerate(tasks, 1):
                task_text = t.get("task", t) if isinstance(t, dict) else str(t)
                st.markdown(f"**{i}.** {task_text}")
        return True

    if fn in ("transfer_to_target", "transfer_to_reviewer"):
        msg = str(args.get("message") or "")
        if len(msg) > 120:
            with st.expander(" Full handoff message", expanded=False):
                st.markdown(msg)
        else:
            # Short handoffs used to be dropped entirely; show them inline.
            _render_tool_summary_line(fn, msg)
        return True

    if fn == "github_add_pr_comment":
        comment = args.get("comment", args.get("body", ""))
        pr = args.get("pr_number", "?")
        if not comment:
            return False
        with st.expander(f" 💬 PR #{pr} comment", expanded=False):
            st.markdown(comment)
        return True

    if fn == "github_approve_pull_request":
        pr = args.get("pr_number", "?")
        review_body = args.get("body", args.get("review_body", ""))
        label = f" ✅ Approved PR #{pr}"
        if review_body:
            with st.expander(label, expanded=False):
                st.markdown(review_body)
        else:
            st.markdown(
                f'<div style="padding:2px 14px 2px 28px;font-size:0.85em;'
                f'color:#16A34A"><b>{_esc(label)}</b></div>',
                unsafe_allow_html=True,
            )
        return True

    if fn == "submit":
        answer_str = str(args.get("answer", args.get("value", ""))).strip()
        lowered = answer_str.lower()
        # Keyword heuristic: these substrings mark the submission as a decline.
        is_decline = any(
            w in lowered
            for w in ("declin", "reject", "not approv", "refused", "concerns")
        )
        icon = "❌" if is_decline else "✅"
        label = f" {icon} Submit: {answer_str[:60]}{'…' if len(answer_str) > 60 else ''}"
        if len(answer_str) > 60:
            with st.expander(label, expanded=False):
                st.markdown(answer_str)
        else:
            submit_color = "#DC2626" if is_decline else "#16A34A"
            st.markdown(
                f'<div style="padding:2px 14px 2px 28px;font-size:0.85em;'
                f'color:{submit_color}">'
                f"<b>{_esc(label)}</b></div>",
                unsafe_allow_html=True,
            )
        return True

    if fn == "bash":
        cmd = str(args.get("cmd", args.get("command", "")) or "")
        with st.expander(
            f" $ {cmd[:80]}{'…' if len(cmd) > 80 else ''}",
            expanded=False,
        ):
            st.code(cmd, language="bash")
        return True

    if fn == "submit_profile":
        strategy = args.get("strategy", "")
        with st.expander(" 🔬 Repository strategy report", expanded=False):
            st.markdown(strategy)
        return True

    if fn == "think":
        thought = args.get("thought", args.get("thinking", args.get("content", "")))
        thought_str = str(thought).strip()
        short = thought_str[:60] + ("…" if len(thought_str) > 60 else "")
        with st.expander(f" 🧠 {short}", expanded=False):
            st.markdown(
                f'<div style="background:#F3F4F611;border-left:3px solid #9CA3AF;'
                f'padding:8px 12px;border-radius:0 4px 4px 0;'
                f'color:#374151;font-size:0.9em;white-space:pre-wrap">'
                f"{_esc(thought_str)}</div>",
                unsafe_allow_html=True,
            )
        return True

    return False


def _render_tool_call(tc: dict) -> None:
    """Render one tool call: its detail block, else the one-line summary."""
    fn = tc["fn"]
    args = tc["args"] if isinstance(tc["args"], dict) else {}
    if fn in _FULL_BLOCK_TOOLS and _render_tool_detail(fn, args):
        return
    # Either a tool without a dedicated block, or its block had nothing to
    # show (e.g. empty file content); fall back to the summary row.
    _render_tool_summary_line(fn, _args_summary(fn, args))


def _render_model_step(step: dict) -> None:
    """Render a model turn: role header, response text and tool calls."""
    _anchor_div(step.get("anchor"))
    role = step["role"]
    color = ROLE_COLOR.get(role, "#6B7280")
    emoji = ROLE_EMOJI.get(role, "🤖")
    model_short = step["model"].split("/")[-1]
    text = step["text"].strip()
    tcs = step["tool_calls"]

    # Skip classifier turns without text (not very interesting).
    if role == "classifier" and not text:
        return

    header_html = (
        f'<div style="border-left:4px solid {color};padding:4px 10px;'
        f'margin:10px 0 2px 0;background:{color}08;border-radius:0 6px 6px 0">'
        f"<b>{emoji} {_esc(role.upper())}</b>&nbsp;&nbsp;"
        f'<span style="color:{color};font-size:0.78em">{_esc(model_short)}</span>'
    )
    # Tool call summary inline in header.
    if tcs:
        tc_html = " ".join(_tool_badge(tc["fn"]) for tc in tcs)
        header_html += f"<br><div style='margin-top:4px'>{tc_html}</div>"
    header_html += "</div>"
    st.markdown(header_html, unsafe_allow_html=True)

    # Reasoning / response text: long text collapses into an expander.
    if text:
        if len(text) > 400:
            with st.expander("View full response", expanded=False):
                st.markdown(text)
        else:
            st.markdown(
                f'<div style="padding:0 14px;color:#374151;font-size:0.9em">'
                f"{_esc(text)}</div>",
                unsafe_allow_html=True,
            )

    for tc in tcs:
        _render_tool_call(tc)


def _render_pr_diff_step(step: dict) -> None:
    """Render the files of a PR listing inside a collapsed expander."""
    files = step["files"]
    with st.expander(
        f"📂 PR files ({len(files)} file{'s' if len(files) != 1 else ''})",
        expanded=False,
    ):
        for f in files:
            path = f["path"]
            st.markdown(
                f'<div style="font-size:0.82em;font-weight:600;'
                f'color:#374151;padding:4px 0 2px 0">'
                f"📄 <code>{_esc(path)}</code></div>",
                unsafe_allow_html=True,
            )
            st.code(f["content"], language=_lang_for(path))


def _render_tool_result_step(step: dict) -> None:
    """Render a tool result as a collapsed code block with a one-line preview."""
    fn = step["fn"]
    if fn == "think":
        return  # thought content already shown in the tool call block
    result = step["result"]
    short = result[:120].replace("\n", " ")
    label = f'← {fn}: {short}{"…" if len(result) > 120 else ""}'
    with st.expander(label, expanded=False):
        lang = "text"
        if fn == "github_read_file":
            # The language is guessed from the first line of the result.
            lang = _lang_for(result.split("\n")[0].strip())
        st.code(result, language=lang)


def _render_score_step(step: dict) -> None:
    """Render the final verdict banner and its explanation."""
    val = step.get("value", 0) or 0
    answer = step.get("answer", "?")
    expl = step.get("explanation", "")
    if isinstance(val, (int, float)):
        color = "#16A34A" if val >= 1.0 else ("#D97706" if val > 0 else "#DC2626")
    else:
        # Non-numeric score values cannot be ranked; use a neutral grey.
        color = "#6B7280"
    st.markdown("---")
    st.markdown(
        f'<div style="background:{color};color:#ffffff;padding:12px 18px;'
        f'border-radius:8px;font-size:1.15em;font-weight:700;margin:8px 0">'
        f"🏁 Verdict: {_esc(answer)} &nbsp;·&nbsp; score {_esc(val)}</div>",
        unsafe_allow_html=True,
    )
    if expl:
        st.markdown(
            f'<div style="background:{color}18;border-left:4px solid {color};'
            f'padding:8px 14px;border-radius:4px;color:#ffffff;margin-top:6px">{_esc(expl)}</div>',
            unsafe_allow_html=True,
        )


def render_steps(steps: list) -> None:
    """Render parsed trajectory steps into the main page area, in order.

    Args:
        steps: Step dicts as produced by ``parse_events``; each is dispatched
            on its ``"kind"``. Unknown kinds are ignored.

    Text that originates from the log (model output, tool names, banners,
    score explanations) is HTML-escaped before being placed in raw-HTML
    markdown, so markup inside a log cannot break or inject into the page.
    """
    for step in steps:
        k = step["kind"]

        if k == "subtask_divider":
            _anchor_div(step.get("anchor"))
            sn = step["subtask"]
            st.markdown(
                f'<hr style="margin:18px 0 6px 0">'
                f'<h4 style="margin:0 0 8px 0">Subtask {sn}</h4>',
                unsafe_allow_html=True,
            )
        elif k == "phase":
            _anchor_div(step.get("anchor"))
            color = step.get("nav_color", "#0F766E")
            st.markdown(
                f'<div style="border-left:4px solid {color};padding:6px 12px;'
                f'background:{color}11;border-radius:0 6px 6px 0;margin:14px 0 6px 0">'
                f'<b style="color:{color}">{step["label"]}</b></div>',
                unsafe_allow_html=True,
            )
        elif k == "info":
            _render_info_step(step)
        elif k == "model":
            _render_model_step(step)
        elif k == "pr_diff":
            _render_pr_diff_step(step)
        elif k == "tool_result":
            _render_tool_result_step(step)
        elif k == "score":
            _render_score_step(step)
# ── Log index ─────────────────────────────────────────────────────────────────
@st.cache_data(show_spinner=False)
def _log_index(logs_dir: str) -> dict[str, dict[str, list[str]]]:
    """Build ``{cwe: {model: [path, ...]}}`` from every ``*.eval`` under *logs_dir*.

    Files are visited in reverse-sorted path order, so each list keeps that
    order. The CWE/task label is the name of the file's parent directory. The
    model is the first key of ``stats.model_usage`` in the archive's
    ``header.json``; it stays ``"unknown"`` when the header is missing or
    cannot be read for any reason.
    """
    index: dict[str, dict[str, list[str]]] = {}
    eval_paths = sorted(Path(logs_dir).rglob("*.eval"), reverse=True)
    for eval_path in eval_paths:
        model = "unknown"
        try:
            with zipfile.ZipFile(str(eval_path)) as archive:
                if "header.json" in archive.namelist():
                    header = json.loads(archive.read("header.json"))
                    usage = header.get("stats", {}).get("model_usage", {})
                    if usage:
                        model = next(iter(usage))
        except Exception:
            # Best effort: an unreadable archive is still listed, under "unknown".
            pass
        by_model = index.setdefault(eval_path.parent.name, {})
        by_model.setdefault(model, []).append(str(eval_path))
    return index
# ── Main app ──────────────────────────────────────────────────────────────────
def main() -> None:
    """Streamlit entry point: pick an eval log, filter its samples, render them.

    The sidebar offers a file uploader (which takes priority) and, when logs
    are found under ``LOGS_DIR``, CWE / model / run selectors. All samples in
    the chosen archive are loaded, can be filtered by verdict, and are
    rendered either directly (one sample) or as one tab per sample.
    """
    st.set_page_config(
        page_title="github_red Trajectory Viewer",
        layout="wide",
        page_icon="🔴",
    )

    # ── Sidebar ───────────────────────────────────────────────────────
    with st.sidebar:
        st.title("🔴 github_red")
        st.subheader("Trajectory Viewer")

        # File uploader takes priority; fall back to local disk files.
        uploaded = st.file_uploader(
            "Upload .eval file",
            type=["eval"],
            accept_multiple_files=False,
            help="Upload an .eval log produced by inspect-ai",
        )
        eval_source = None  # will be a file-like or path string
        if uploaded is not None:
            eval_source = uploaded
        else:
            index = _log_index(str(LOGS_DIR))
            if index:
                cwe_options = sorted(index.keys())
                selected_cwe = st.selectbox("CWE / Task", cwe_options)
                model_options = sorted(index[selected_cwe].keys())
                selected_model = st.selectbox("Model", model_options)
                file_list = index[selected_cwe][selected_model]
                selected_path = st.selectbox(
                    "Run",
                    file_list,
                    format_func=lambda p: Path(p).stem[:60],
                )
                eval_source = selected_path

        if eval_source is None:
            st.info("Upload an .eval file above to get started.")
            return

        st.markdown("---")
        st.markdown("**Legend**")
        for role, color in ROLE_COLOR.items():
            # .get: a role without an emoji entry should not break the legend.
            emoji = ROLE_EMOJI.get(role, "")
            st.markdown(
                f'<div style="border-left:3px solid {color};padding:2px 8px;'
                f'margin:2px 0;font-size:0.9em">{emoji} {role}</div>',
                unsafe_allow_html=True,
            )

    # ── Load ──────────────────────────────────────────────────────────
    try:
        with zipfile.ZipFile(eval_source) as z:
            # Skip directory entries: they have no JSON payload to decode.
            sample_files = [
                n
                for n in z.namelist()
                if n.startswith("samples/") and not n.endswith("/")
            ]
            if not sample_files:
                st.error("No sample files found in this eval log.")
                return
            # Support multiple samples (tabs).
            samples = {n: json.loads(z.read(n)) for n in sample_files}
    except Exception as e:
        # Top-level UI boundary: report any load failure instead of crashing.
        st.error(f"Failed to load eval file: {e}")
        return

    # ── Verdict filter ────────────────────────────────────────────────
    def _sample_verdict(sample: dict) -> str:
        """Return the first non-empty score answer of *sample*, or "?"."""
        for sc in (sample.get("scores") or {}).values():
            ans = sc.get("answer") if isinstance(sc, dict) else None
            if ans:
                return str(ans)
        return "?"

    verdicts = {n: _sample_verdict(s) for n, s in samples.items()}
    all_verdicts = sorted(set(verdicts.values()))
    with st.sidebar:
        st.markdown("---")
        st.markdown("**Filter by verdict**")
        selected_verdicts = {
            v: st.checkbox(v, value=True, key=f"filter_{v}") for v in all_verdicts
        }

    filtered = {
        n: s for n, s in samples.items() if selected_verdicts.get(verdicts[n], True)
    }
    if not filtered:
        st.warning("No samples match the current filter.")
        return

    # ── Samples ───────────────────────────────────────────────────────
    if len(filtered) > 1:
        tab_names = [f"{Path(n).stem} — {verdicts[n]}" for n in filtered]
        tabs = st.tabs(tab_names)
        for tab, sample in zip(tabs, filtered.values()):
            with tab:
                _render_sample(sample)
    else:
        _render_sample(next(iter(filtered.values())))
def _render_sample(sample: dict) -> None:
    """Render one sample: metrics row, token usage, then the full trajectory.

    Args:
        sample: Decoded sample JSON. Missing or null ``attachments``,
            ``events``, ``scores``, ``total_time`` and ``role_usage`` are
            tolerated, and non-numeric score values are shown as text.
    """
    atts = sample.get("attachments") or {}
    events = sample.get("events") or []
    scores = sample.get("scores") or {}

    # Metrics row: only the first score entry is shown.
    score_val, score_ans = None, None
    first_score = next(iter(scores.values()), None) if isinstance(scores, dict) else None
    if isinstance(first_score, dict):
        score_val = first_score.get("value")
        score_ans = first_score.get("answer")

    if score_val is None:
        score_label = "?"
    elif isinstance(score_val, (int, float)):
        score_label = f"{score_val:.1f}"
    else:
        # Letter grades and other non-numeric values cannot take a float format.
        score_label = str(score_val)

    total_time = sample.get("total_time") or 0
    time_label = f"{total_time:.0f}s" if isinstance(total_time, (int, float)) else "?"

    col1, col2, col3, col4 = st.columns(4)
    col1.metric("Sample", sample.get("id", "?"))
    col2.metric("Verdict", f"{score_ans}" if score_ans else "?")
    col3.metric("Score", score_label)
    col4.metric("Time", time_label)

    role_usage = sample.get("role_usage") or {}
    if isinstance(role_usage, dict) and role_usage:
        with st.expander("Token usage by role", expanded=False):
            cols = st.columns(len(role_usage))
            for col, (role, usage) in zip(cols, role_usage.items()):
                total = usage.get("total_tokens", 0) if isinstance(usage, dict) else 0
                shown = f"{total:,}" if isinstance(total, (int, float)) else str(total)
                col.metric(role, shown)

    st.markdown("---")
    steps = parse_events(events, atts)
    render_nav(steps)
    render_steps(steps)
# Build the page when this file is executed as a script (e.g. via `streamlit run`).
if __name__ == "__main__":
    main()