"""Tests for composer_replication.ingestion.trace_examples (Wave 19).

Pins the contract that:
  1. ClaudeCodeIngester output → claude_states_to_trace_examples →
     list[TraceExample]
  2. Tool errors in source JSONL (`is_error: true`) survive the ingester's
     [TOOL_RESULT (ERROR)] tag → are detected by the adapter → mark the
     subsequent assistant turn with tool_error
  3. The default error classifier categorizes common error kinds
  4. The output is a valid input to ComposerDataCollator with hint_generator
"""
from __future__ import annotations

from pathlib import Path

import pytest

from composer_replication.ingestion import (
    ClaudeCodeIngester,
    TOOL_ERROR_TAG,
    claude_states_to_trace_examples,
    default_classify_error,
)

HERE = Path(__file__).resolve().parent
FIXTURE_DIR = HERE.parent.parent.parent / "spikes" / "007-real-trace-ingestion" / "fixtures"
ERROR_FIXTURE = FIXTURE_DIR / "synthetic_session_with_error.jsonl"
OK_FIXTURE = FIXTURE_DIR / "synthetic_session.jsonl"


# ----------------------------------------------------------------------
# Shared helpers
# ----------------------------------------------------------------------


def _ingest_states(fixture: Path) -> list:
    """Return every state ClaudeCodeIngester yields for *fixture*.

    Every test in this module uses the same ingester configuration
    (``skip_sidechain=True, strip_thinking=True``), so it is built here once
    instead of being repeated in each test body.
    """
    ingester = ClaudeCodeIngester(skip_sidechain=True, strip_thinking=True)
    return list(ingester.ingest(fixture))


def _error_turns(examples) -> list:
    """Flatten *examples* into the turns that carry a truthy ``tool_error``."""
    return [t for ex in examples for t in ex["turns"] if t.get("tool_error")]


# ----------------------------------------------------------------------
# Error classifier
# ----------------------------------------------------------------------


def test_classify_file_not_found():
    """Both common "missing file" phrasings classify as ``file_not_found``."""
    assert default_classify_error(
        "Error: File does not exist: /etc/foo.yaml"
    ) == "file_not_found"
    assert default_classify_error(
        "no such file or directory: /tmp/x"
    ) == "file_not_found"


def test_classify_permission_denied():
    """A permission error message classifies as ``permission_denied``."""
    assert default_classify_error("Permission denied") == "permission_denied"


def test_classify_command_not_found():
    """A shell "command not found" message classifies as ``command_not_found``."""
    assert default_classify_error("bash: foo: command not found") == "command_not_found"


def test_classify_unknown_falls_back():
    """Unrecognised messages fall back to the generic ``tool_error`` kind."""
    assert default_classify_error("something weird went wrong") == "tool_error"


# ----------------------------------------------------------------------
# Adapter — happy path with error site
# ----------------------------------------------------------------------


def test_adapter_emits_one_example_per_state():
    """The adapter returns exactly one example per ingested state."""
    states = _ingest_states(ERROR_FIXTURE)
    # Guard against a vacuous 0 == 0 pass if the fixture yields no states.
    assert states, "Error fixture produced no states; nothing was exercised."
    examples = claude_states_to_trace_examples(states)
    assert len(examples) == len(states)


def test_adapter_detects_tool_error_on_recovery_turn():
    """The adapter marks the fixture's single error site with ``tool_error``.

    Intended contract: the assistant turn IMMEDIATELY AFTER a
    [TOOL_RESULT (ERROR)] user turn must be marked with tool_error. Earlier
    assistant turns (before any error) and assistant turns separated from the
    error by a successful tool result must NOT be marked.

    What this test actually pins is the count side of that contract: at least
    one example contains an error turn, and no example contains more than one.
    """
    states = _ingest_states(ERROR_FIXTURE)
    examples = claude_states_to_trace_examples(states)

    # Find the example with at least one error turn
    error_examples = [
        ex for ex in examples
        if any(t.get("tool_error") for t in ex["turns"])
    ]
    assert error_examples, (
        f"Expected ≥1 example with a tool_error turn; got {len(error_examples)}. "
        f"Per-example error turns: {[(ex['trace_id'], sum(1 for t in ex['turns'] if t.get('tool_error'))) for ex in examples]}"
    )

    # The error fixture has one error site; one of the late states should
    # have exactly 1 error turn
    err_counts = [
        sum(1 for t in ex["turns"] if t.get("tool_error"))
        for ex in examples
    ]
    assert max(err_counts) == 1, (
        f"Expected exactly 1 error turn in some state; counts: {err_counts}"
    )


def test_adapter_classifies_file_not_found_in_fixture():
    """The fixture's error is classified as ``file_not_found`` by default."""
    states = _ingest_states(ERROR_FIXTURE)
    examples = claude_states_to_trace_examples(states)
    error_turns = _error_turns(examples)
    assert any(t["tool_error"] == "file_not_found" for t in error_turns), (
        f"Expected 'file_not_found' classification on the fixture's "
        f"non-existent-config error; got: "
        f"{[t['tool_error'] for t in error_turns]}"
    )


def test_adapter_no_errors_on_clean_fixture():
    """The original Spike 007 fixture has no is_error: true rows, so no
    error turns should be detected."""
    states = _ingest_states(OK_FIXTURE)
    examples = claude_states_to_trace_examples(states)
    err_turns = _error_turns(examples)
    assert not err_turns, (
        f"Clean fixture should have 0 error turns; got "
        f"{len(err_turns)}: {[t['tool_error'] for t in err_turns]}"
    )


def test_adapter_preserves_role_and_content():
    """Every output turn should have role + content from the input messages."""
    states = _ingest_states(ERROR_FIXTURE)
    examples = claude_states_to_trace_examples(states)
    # Without this guard the loops below pass trivially on an empty result.
    assert examples, "Adapter returned no examples for the error fixture."
    for ex in examples:
        for turn in ex["turns"]:
            assert "role" in turn
            assert "content" in turn
            assert turn["role"] in ("system", "user", "assistant", "tool")


def test_adapter_custom_error_kind_fn():
    """User-provided error_kind_fn should override default classification."""
    states = _ingest_states(ERROR_FIXTURE)

    def custom_kind(content: str) -> str:
        return "custom_kind"

    examples = claude_states_to_trace_examples(states, error_kind_fn=custom_kind)
    error_turns = _error_turns(examples)
    # all() over an empty list is True, so require that the override was
    # applied to at least one turn.
    assert error_turns, "Expected at least one error turn in the error fixture."
    assert all(t["tool_error"] == "custom_kind" for t in error_turns)


def test_adapter_threads_final_reward():
    """The ``final_reward`` argument is copied onto every emitted example."""
    states = _ingest_states(ERROR_FIXTURE)
    examples = claude_states_to_trace_examples(states, final_reward=0.5)
    # all() over an empty list is True; make sure something was checked.
    assert examples, "Adapter returned no examples for the error fixture."
    assert all(ex["final_reward"] == 0.5 for ex in examples)


# ----------------------------------------------------------------------
# Tool error tag constant
# ----------------------------------------------------------------------


def test_tool_error_tag_matches_ingester_output():
    """The TOOL_ERROR_TAG constant must match what ClaudeCodeIngester
    actually writes for is_error: true records."""
    states = _ingest_states(ERROR_FIXTURE)
    # Find a user-message containing an error tool_result
    contents = [
        m.get("content", "")
        for s in states
        for m in s["messages"]
        if m.get("role") == "user"
    ]
    # Only string contents can contain the tag; other content types are skipped.
    assert any(TOOL_ERROR_TAG in c for c in contents if isinstance(c, str)), (
        f"TOOL_ERROR_TAG {TOOL_ERROR_TAG!r} not found in any user content; "
        f"the constant has drifted from the ingester's output format."
    )


# ----------------------------------------------------------------------
# Empty input
# ----------------------------------------------------------------------


def test_adapter_empty_input():
    """An empty list of states adapts to an empty list of examples."""
    assert claude_states_to_trace_examples([]) == []


def test_adapter_state_with_no_messages():
    """A degenerate state with empty messages should be skipped silently."""
    examples = claude_states_to_trace_examples([{"state_id": "empty", "messages": []}])
    assert examples == []