Codeseys's picture
Wave 19: production-grade SDPO via ComposerDataCollator + adapter + collator fixes
03bf323
"""Tests for composer_replication.ingestion.trace_examples (Wave 19).
Pins the contract that:
1. ClaudeCodeIngester output → claude_states_to_trace_examples → list[TraceExample]
2. Tool errors in source JSONL (`is_error: true`) survive the ingester's
[TOOL_RESULT (ERROR)] tag → are detected by the adapter → mark the
subsequent assistant turn with tool_error
3. The default error classifier categorizes common error kinds
4. The output is a valid input to ComposerDataCollator with hint_generator
"""
from __future__ import annotations
from pathlib import Path
import pytest
from composer_replication.ingestion import (
ClaudeCodeIngester,
TOOL_ERROR_TAG,
claude_states_to_trace_examples,
default_classify_error,
)
HERE = Path(__file__).resolve().parent
FIXTURE_DIR = HERE.parent.parent.parent / "spikes" / "007-real-trace-ingestion" / "fixtures"
ERROR_FIXTURE = FIXTURE_DIR / "synthetic_session_with_error.jsonl"
OK_FIXTURE = FIXTURE_DIR / "synthetic_session.jsonl"
# ----------------------------------------------------------------------
# Error classifier
# ----------------------------------------------------------------------
def test_classify_file_not_found():
assert default_classify_error(
"Error: File does not exist: /etc/foo.yaml"
) == "file_not_found"
assert default_classify_error(
"no such file or directory: /tmp/x"
) == "file_not_found"
def test_classify_permission_denied():
assert default_classify_error("Permission denied") == "permission_denied"
def test_classify_command_not_found():
assert default_classify_error("bash: foo: command not found") == "command_not_found"
def test_classify_unknown_falls_back():
assert default_classify_error("something weird went wrong") == "tool_error"
# ----------------------------------------------------------------------
# Adapter — happy path with error site
# ----------------------------------------------------------------------
def test_adapter_emits_one_example_per_state():
ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
states = list(ingester.ingest(ERROR_FIXTURE))
examples = claude_states_to_trace_examples(states)
assert len(examples) == len(states)
def test_adapter_detects_tool_error_on_recovery_turn():
"""The assistant turn IMMEDIATELY AFTER a [TOOL_RESULT (ERROR)] user
turn must be marked with tool_error. Earlier assistant turns (before
any error) and assistant turns separated from the error by a
successful tool result must NOT be marked."""
ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
states = list(ingester.ingest(ERROR_FIXTURE))
examples = claude_states_to_trace_examples(states)
# Find the example with at least one error turn
error_examples = [
ex for ex in examples
if any(t.get("tool_error") for t in ex["turns"])
]
assert error_examples, (
f"Expected ≥1 example with a tool_error turn; got {len(error_examples)}. "
f"Per-example error turns: {[(ex['trace_id'], sum(1 for t in ex['turns'] if t.get('tool_error'))) for ex in examples]}"
)
# The error fixture has one error site; one of the late states should have exactly 1 error turn
err_counts = [
sum(1 for t in ex["turns"] if t.get("tool_error"))
for ex in examples
]
assert max(err_counts) == 1, (
f"Expected exactly 1 error turn in some state; counts: {err_counts}"
)
def test_adapter_classifies_file_not_found_in_fixture():
ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
states = list(ingester.ingest(ERROR_FIXTURE))
examples = claude_states_to_trace_examples(states)
error_turns = [t for ex in examples for t in ex["turns"] if t.get("tool_error")]
assert any(t["tool_error"] == "file_not_found" for t in error_turns), (
f"Expected 'file_not_found' classification on the fixture's "
f"non-existent-config error; got: "
f"{[t['tool_error'] for t in error_turns]}"
)
def test_adapter_no_errors_on_clean_fixture():
"""The original Spike 007 fixture has no is_error: true rows, so no
error turns should be detected."""
ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
states = list(ingester.ingest(OK_FIXTURE))
examples = claude_states_to_trace_examples(states)
err_turns = [t for ex in examples for t in ex["turns"] if t.get("tool_error")]
assert not err_turns, (
f"Clean fixture should have 0 error turns; got "
f"{len(err_turns)}: {[t['tool_error'] for t in err_turns]}"
)
def test_adapter_preserves_role_and_content():
"""Every output turn should have role + content from the input messages."""
ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
states = list(ingester.ingest(ERROR_FIXTURE))
examples = claude_states_to_trace_examples(states)
for ex in examples:
for turn in ex["turns"]:
assert "role" in turn
assert "content" in turn
assert turn["role"] in ("system", "user", "assistant", "tool")
def test_adapter_custom_error_kind_fn():
"""User-provided error_kind_fn should override default classification."""
ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
states = list(ingester.ingest(ERROR_FIXTURE))
def custom_kind(content: str) -> str:
return "custom_kind"
examples = claude_states_to_trace_examples(states, error_kind_fn=custom_kind)
error_turns = [t for ex in examples for t in ex["turns"] if t.get("tool_error")]
assert all(t["tool_error"] == "custom_kind" for t in error_turns)
def test_adapter_threads_final_reward():
ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
states = list(ingester.ingest(ERROR_FIXTURE))
examples = claude_states_to_trace_examples(states, final_reward=0.5)
assert all(ex["final_reward"] == 0.5 for ex in examples)
# ----------------------------------------------------------------------
# Tool error tag constant
# ----------------------------------------------------------------------
def test_tool_error_tag_matches_ingester_output():
"""The TOOL_ERROR_TAG constant must match what ClaudeCodeIngester
actually writes for is_error: true records."""
ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
states = list(ingester.ingest(ERROR_FIXTURE))
# Find a user-message containing an error tool_result
contents = [
m.get("content", "")
for s in states for m in s["messages"]
if m.get("role") == "user"
]
assert any(TOOL_ERROR_TAG in c for c in contents if isinstance(c, str)), (
f"TOOL_ERROR_TAG {TOOL_ERROR_TAG!r} not found in any user content; "
f"the constant has drifted from the ingester's output format."
)
# ----------------------------------------------------------------------
# Empty input
# ----------------------------------------------------------------------
def test_adapter_empty_input():
assert claude_states_to_trace_examples([]) == []
def test_adapter_state_with_no_messages():
"""A degenerate state with empty messages should be skipped silently."""
examples = claude_states_to_trace_examples([{"state_id": "empty", "messages": []}])
assert examples == []