composer-replication-framework / docs /research /REPLAYSIM_NORMALIZATION_RECONNAISSANCE.md
Codeseys's picture
Wave 17: close all 5 audit FLAGs + SDPO context alignment + serverless re-exports
a84c060
# Replaysim Normalization Reconnaissance
**Status:** Recon · **Feeds:** ADR-004, V5 "replaysim with normalization"
**Author:** subagent (delegated audit) · **Date:** 2026-05-25
**Sources:** GitHub REST API metadata + DeepWiki structured indexes of each repo's primary source. All repo metadata cited below was pulled from `api.github.com/repos/<owner>/<name>` directly.
## TL;DR
| Library | License | Last push | ★ | Verdict |
|---|---|---|---|---|
| **data-juicer** | Apache-2.0 | **2026-05-25** | 6.4k | ✅ **RECOMMENDED** — the only candidate with a class-based op-graph that *natively* understands `messages: [{role, content}]`, multi-turn dialog, and DPO-pair (`chosen`/`rejected`) preference samples as **first-class data formats**, with a `pair_preference_mapper` operator that maps directly onto our `extract_dpo_pairs` output. |
| **distilabel** | Apache-2.0 | 2026-05-25 | 3.2k | Strong runner-up. DAG pipeline, native chat-message format, built-in `FormatChatGenerationDPO`. But it is primarily a *generation orchestrator* and would force us to rewrite our existing OpenRouter teacher orchestration as Distilabel `LLM` subclasses. Larger refactor surface. |
| **datatrove** | Apache-2.0 | 2026-05-06 | 3.1k | ❌ **Deal-breaker.** `Document` dataclass is `text: str + metadata: dict`. All filters/dedup operate on flat `doc.text`. Multi-turn is only supported in the *generation* (`InferenceRunner.rollout_fn`) path, not the normalization/filter path. Forces lossy chat→string flattening. |
| **NeMo-Curator** | Apache-2.0 | 2026-05-25 | 1.6k | Strong on scale (Ray + Xenna + GPU), supports streaming and DPO via `generate_two_turn_prompt`. But: semantic dedup, fuzzy dedup, and classifier filters all *require GPUs*; CPU-only install drops most of the differentiating ops. Heavy framework for the size of replaysim. |
| **lilac** | Apache-2.0 | **archived 2024-03-19** | 1.1k | ❌ **Dead.** `databricks/lilac` repo `"archived": true`. The current `lilacai/lilac` is a 2-star squatter stub created Nov 2025. Do not adopt. |
**Recommendation:** Adopt **data-juicer** as the normalization op-graph layer wrapped around `replay_trace``extract_dpo_pairs`. Estimated integration cost: **~250–400 LOC** in `composer_replication.replaysim` for an adapter + 1 YAML recipe.
**Critical chat-template question answered:** data-juicer is the only audited library whose *filtering and normalization operators* (not just its generation operators) operate directly on a structured `messages: [{role, content}]` format and on `chosen`/`rejected` preference-pair format. The other three candidates either flatten to text (datatrove), only handle chat in the generation path (datatrove again), or treat chat as a generation output to be assembled rather than a structured object to be filtered (NeMo-Curator, distilabel partly).
---
## 1. Audit Methodology
For each candidate, primary-source data was collected from:
1. `https://api.github.com/repos/<owner>/<name>` for license, `pushed_at`, `archived`, stars, forks, topics — these are authoritative GitHub metadata, not scraped.
2. DeepWiki structured indexes of each repo's source tree for: op model, data structures (`Document` / `Sample` / `Step`), conversation/DPO support in filtering vs. generation paths, GPU dependencies.
3. README confirmation through the GitHub API for transferred-org redirects.
No secondary sources, no marketing pages, no blog posts.
Two facts to flag up front because they materially change the candidate set:
- `modelscope/data-juicer` redirects to **`datajuicer/data-juicer`**. The team spun out of ModelScope into a dedicated `datajuicer` org. Same code, just a transferred name — `pushed_at` is current.
- `NVIDIA/NeMo-Curator` redirects to **`NVIDIA-NeMo/Curator`**. Same situation — moved into the dedicated `NVIDIA-NeMo` org in 2025.
---
## 2. Per-Candidate Audit
### 2.1 datatrove (huggingface)
| Dimension | Value |
|---|---|
| Repo | `huggingface/datatrove` |
| License | Apache-2.0 |
| Created | 2023-06-14 |
| Last push | **2026-05-06** |
| Stars / Forks | 3068 / 266 |
| Commits | 725 (default branch) |
| Maturity | Production. Used to build FineWeb. Active. |
**Op model.** Class-based **linear pipeline** of `PipelineStep` instances. `PipelineStep.run(data: DocumentsPipeline, rank: int, world_size: int) -> DocumentsPipeline` where `DocumentsPipeline` is an iterator of `Document` objects. Steps are composed by Python list concatenation, not a DAG — branching/joining requires manual orchestration.
**Multi-turn / chat-template support — DEAL-BREAKER.**
The `Document` dataclass (`src/datatrove/data.py`) is:
```python
@dataclass
class Document:
text: str
id: str
media: list[Media] # placeholder, "for future uses, currently not used"
metadata: dict
```
There is **no `messages` field**. Every built-in filter (e.g., `C4QualityFilter`, `LanguageFilter`, `GopherQualityFilter`) and every built-in dedup op (`MinhashDedup*`, `SentenceDedup*`, `BloomFilter`) operates on `doc.text` as a flat string.
Multi-turn does appear, but **only in the generation path** (`InferenceRunner` + user-supplied `rollout_fn(doc, generate)`), where the user constructs `{"messages": [{"role": ..., "content": ...}]}` payloads themselves. Once the generation completes, the result is stuffed back into `doc.text` (or `doc.metadata`) and downstream filters again see flat text.
For our use case — normalizing already-generated multi-turn DPO pairs with `chosen`/`rejected` chat structures and tool calls — this means we'd have to:
1. Serialize `messages` into a flat string (`<|im_start|>user...`).
2. Run datatrove filters on the serialized string.
3. Re-parse back into `messages` afterward.
Tool-call structure (`{"role": "tool", "tool_call_id": ...}`, `tool_calls: [...]`) does not survive that round-trip cleanly without custom serialization on both sides. Per the user's hard requirement — "if only flat text, that's a deal-breaker" — datatrove fails here.
**Streaming.** Yes. `HuggingFaceDatasetReader(streaming=True)` and the iterator-based `PipelineStep.run` mean we can pipe documents through during generation. Streaming is fine.
**GPU.** None of the *normalization* ops require GPU. MinHash dedup is CPU. Only the `InferenceRunner` path needs a GPU (vLLM/SGLang backend) and we don't need that — we'd be calling OpenRouter, not running local models.
**Integration cost.** Moot — the chat-template gap is the deal-breaker.
---
### 2.2 data-juicer (datajuicer org, formerly modelscope)
| Dimension | Value |
|---|---|
| Repo | `datajuicer/data-juicer` (redirect target of the legacy `modelscope/data-juicer`) |
| License | Apache-2.0 |
| Created | 2023-08-01 |
| Last push | **2026-05-25** (most recent of all candidates) |
| Stars / Forks | 6444 / 373 |
| Maturity | Production. Active core team (Alibaba/ModelScope-spinout). Most stars of the candidate set. Has its own conference papers and a docs site at `datajuicer.github.io/data-juicer`. |
**Op model.** Class-based DAG of **operators ("Ops")** organized as **mappers**, **filters**, **deduplicators**, and **selectors**. Each Op is a Python class subclassing `Mapper`/`Filter`/`Deduplicator`. Pipelines are declared as YAML recipes (`process: [- op_name: { args }, ...]`) and executed by the `Executor` (default Ray-distributed; also a local Pandas-backed mode). Conditional branching through `OpFusion` and `Adapter` modules is supported, and there is a Ray-Data executor for true streaming.
**Multi-turn / chat-template support — NATIVE.** This is the discriminator.
Data-juicer has a **first-class conversation schema**, supporting *both*:
1. OpenAI-style `messages: [{role, content}]`
2. A "Data-Juicer format" `{query, response, history: [[q, r], ...]}`
It exposes operators that are *purpose-built* for dialog/preference data:
- `dialog_intent_detection_mapper`
- `dialog_sentiment_detection_mapper`
- `dialog_sentiment_intensity_mapper`
- `dialog_topic_detection_mapper`
- `pair_preference_mapper`**directly relevant**: ingests a `(prompt, chosen)` and synthesizes/refines a `rejected_response` plus a `reason` field. This is exactly the schema produced by our `extract_dpo_pairs`.
- `query_intent_detection_mapper`, `query_sentiment_detection_mapper`, `query_topic_detection_mapper`
- `optimize_qa_mapper`, `optimize_query_mapper`, `optimize_response_mapper` — refine individual fields without flattening the whole conversation.
Tool-call structure: data-juicer's conversation schema preserves arbitrary keys per message (because it operates on dict-of-lists Arrow tables), so `tool_call_id`, `tool_calls`, `name`, etc. survive through filters as long as no operator explicitly drops them. This is structurally safe — confirmed by the operator code only reading `role`/`content` and forwarding the rest.
**Streaming.** Partial. The default executor is batch on Arrow/HF datasets, but data-juicer integrated with **Ray Data** for distributed/streaming processing, and the README references "streaming JSON reader patches integrated by Apache Arrow." For our scale (≤100k DPO pairs per run), batch is fine; for true online normalization during multi-teacher generation, the Ray executor handles it — but a simpler approach is to wrap each `replay_trace` rollout's output into a tiny in-memory dataset and run the recipe per-batch (mini-batch streaming).
**GPU.** Only needed for image/video/multi-modal ops and for the LLM-API mappers when configured to run a *local* model. Every op we care about for replaysim — `pair_preference_mapper`, dialog detection mappers, `text_length_filter`, `language_id_score_filter`, MinHash dedup, etc. — is CPU-OK or calls a remote API (which is exactly our existing OpenRouter pattern). Importantly, **MinHash and exact dedup in data-juicer do not require GPU**, unlike NeMo-Curator's fuzzy/semantic dedup.
**Integration cost into `composer_replication.replaysim`.** Estimated ~250–400 LOC, breakdown:
- Adapter `replaysim/normalize.py`: ~80–120 LOC. Wraps a `DJDataset` (data-juicer's dataset abstraction), exposes `normalize_dpo_batch(pairs: list[DPOPair]) -> list[DPOPair]`.
- YAML recipe `replaysim/recipes/dpo_normalize.yaml`: ~40 LOC declarative.
- Hook in `teacher_replay.py` after `extract_dpo_pairs` and before final write: ~20 LOC.
- New tests `tests/replaysim/test_normalize.py`: ~80–120 LOC.
- ADR-004 update + module docs: ~20 LOC.
Dependency footprint: `pip install py-data-juicer` pulls in `datasets`, `pyarrow`, `loguru`, `jsonargparse`, optionally `ray`. We already have `datasets`/`pyarrow` indirectly from HF stack.
---
### 2.3 NeMo-Curator (NVIDIA-NeMo)
| Dimension | Value |
|---|---|
| Repo | `NVIDIA-NeMo/Curator` (redirect target of `NVIDIA/NeMo-Curator`) |
| License | Apache-2.0 |
| Created | 2024-03-14 |
| Last push | **2026-05-25** |
| Stars / Forks | 1584 / 274 |
| Maturity | Production at NVIDIA scale. Built for pre-training-corpus curation (Nemotron / Nemotron-4). |
**Op model.** Task-centric distributed processing, built on **Ray** + the **Xenna** executor. Stages are class-based, composed into pipelines, executed by `XennaExecutor` in either `streaming` or `batch` mode. Closer to Spark/Ray-Data than to a Python list of steps.
**Multi-turn / chat-template support — partial, generation-side only.** Curator has model-specific formatters (`Mixtral8x7BFormatter`, `NemotronFormatter`) that *render* multi-turn dialogue into a flat prompt string for the target model's chat template. There is `generate_dialogue` for multi-turn synthesis and `generate_two_turn_prompt` for DPO-style preference pairs. **But**: like datatrove, the *filtering* and *deduplication* stages do not have first-class conversation/preference operators — they treat the data as text after rendering. Tool-call preservation is not addressed in the public API.
**Streaming.** Yes — `XennaExecutor(execution_mode="streaming")` is a first-class option.
**GPU — significant cost.** Curator's discriminating features all require GPUs:
- **Semantic deduplication** — GPU-only, embedding generation + clustering. "Not supported for CPU-only processing."
- **Fuzzy deduplication** (MinHash + LSH) — GPU backend (cuDF/cuML), not CPU.
- **Classifier filters** (domain / quality / safety via `DistributedDataClassifier`) — GPU clusters.
- **Image curation modules** — GPU.
CPU-only install supports basic text filters and exact dedup, but *that's the same surface area we'd get from data-juicer without the dependency weight*. If we are not running on a GPU cluster, NeMo-Curator's value proposition collapses.
**Integration cost.** ~600–900 LOC plus operational cost: a Ray cluster setup, GPU nodes if we want the differentiating features. For replaysim's scale (a few thousand DPO pairs per run), this is overkill.
---
### 2.4 distilabel (argilla-io)
| Dimension | Value |
|---|---|
| Repo | `argilla-io/distilabel` |
| License | Apache-2.0 |
| Created | 2023-10-16 |
| Last push | **2026-05-25** |
| Stars / Forks | 3230 / 242 |
| Maturity | Production. Argilla is now part of HF; project remains active under argilla-io. |
**Op model.** **DAG pipeline** of `Step` and `Task` (Task = Step with an LLM). Each step declares `inputs: list[str]`, `outputs: list[str]`, and `process(*inputs) -> Generator[outputs]`. Steps are wired via `>>` operator. Resource declarations (`StepResources(replicas=N, gpus=M)`) handle scaling, optionally on Ray.
**Multi-turn / chat-template support — NATIVE on the generation side, partial on the normalization side.**
- `ChatGeneration` task accepts OpenAI-format `messages: [{role, content}]` natively.
- `FormatTextGenerationDPO` and `FormatChatGenerationDPO` produce the exact `{prompt, chosen, rejected, ratings, reason}` schema we want.
- `UltraFeedback` task is the canonical preference-rating step.
- `DeitaFiltering` and `MinHashDedup` are the only filtering/dedup steps; they operate on text fields rather than on structured `messages`. Tool-call structure is preserved as long as no step explicitly normalizes it (like data-juicer, by virtue of dict-of-fields semantics) — but there isn't a `pair_preference_mapper` analogue that operates on `messages` directly.
**Streaming.** Supports streaming generation per LLM (e.g., `AnthropicLLM` streams tokens). Pipeline-level execution is batch-of-batches; you can `.run(parameters={...})` and consume outputs as they materialize.
**GPU.** Only when steps choose to run a local LLM (vLLM, transformers). API-based steps (OpenAI, Anthropic, Mistral, OpenRouter via OpenAI-compat) are CPU-only.
**Integration cost — large but high overlap.** Distilabel would *replace* much of `teacher_replay.py`, not just normalize after it:
- Rewrite multi-teacher OpenRouter calls as a `Pipeline` of `Task`s subclassing distilabel's `LLM` interface (or use the `OpenAILLM` wrapper pointed at OpenRouter): ~300–500 LOC delta.
- Re-express `extract_dpo_pairs` as a custom `Task` or use `FormatChatGenerationDPO`: ~100–150 LOC.
- Migrate trace plumbing into distilabel's `GeneratorStep`/`Task` DAG: ~150 LOC.
- Tests + docs: ~150 LOC.
Total **~700–900 LOC** and a meaningful refactor of teacher orchestration. The win is that we'd get a real DAG runtime, retries, caching, and Argilla-integration for free. The lose is that we get *coupled* to distilabel's `LLM`/`Task` abstractions for the entire generation pipeline, not just a normalization op-graph wrapped around it.
This is a strategic decision the user phrased as: "see if we can leverage [a normalization library] to **normalize the data while also making the replaysim dataset generation**." Distilabel takes the broader interpretation — replace replaysim's generation with a distilabel pipeline. That is a bigger commitment than this recon was scoped to recommend.
---
### 2.5 lilac
**STATUS: dead. Do not adopt.**
- `databricks/lilac`: `"archived": true`, last push **2024-03-19**, license Apache-2.0. Repo says "Curate better data for LLMs." The Databricks acquisition (April 2024) absorbed it into Databricks Mosaic AI; the OSS project was archived shortly after.
- `lilacai/lilac`: created **2025-11-14** by a user account `lilacai`, 2 stars, 0 forks, no license, description says "Thee Eclipse - Hackerone: @theeeclipse." This is a **squatter / unrelated stub**, not the original lilac.
- No actively maintained successor with the original lilac code base outside Databricks' proprietary platform.
---
## 3. Recommendation: data-juicer
### 3.1 Why
1. **Only candidate with native conversation + preference-pair operators in the *normalization* path**, not just the generation path. `pair_preference_mapper` is a near-perfect fit for the output of `extract_dpo_pairs`.
2. **Tool-call structure is preserved** because operators read specific fields and forward the rest of the dict — confirmed by the operator schema design.
3. **No GPU required** for the operators we'd actually use (preference, dialog, length, language-id, MinHash dedup). Matches our OpenRouter-API-driven, CPU-friendly architecture.
4. **YAML-recipe style** lets us version the normalization graph as a config artifact alongside the recon doc, instead of as Python code that drifts.
5. **Lowest integration cost** of the viable candidates — wraps around our existing pipeline rather than replacing it.
6. **Maturity**: 6.4k stars, last push today, dedicated org, paper-backed.
### 3.2 Why not the others (one-liners)
- **datatrove**: flat-text `Document`, lossy round-trip on chat structure → deal-breaker.
- **distilabel**: would force a rewrite of teacher orchestration — too broad a refactor for "wrap normalization around the existing pipeline."
- **NeMo-Curator**: best ops require GPUs; without them it offers no advantage over data-juicer.
- **lilac**: archived.
### 3.3 Risk register
| Risk | Severity | Mitigation |
|---|---|---|
| Data-juicer YAML recipe drift between dev and CI | M | Pin `py-data-juicer` version; commit recipe under `replaysim/recipes/` and load via `importlib.resources`. |
| Some ops silently coerce conversation structure | M | Add a round-trip test: `pair → normalize → pair` must preserve `messages`, `tool_calls`, and arbitrary metadata. |
| Ray executor bloat if user enables it | L | Default to local Pandas executor; gate Ray behind an explicit flag. |
| `pair_preference_mapper` calls an LLM by default to synthesize `rejected` | H | We *already have* `rejected` from disagreement. Configure the mapper as a pass-through filter / use it only for refinement; if it can't be made non-LLM, fall back to a custom Mapper that just runs length/language/dedup checks on the existing pair. **Verify in spike before locking in.** |
| Apache-2.0 inbound license compatibility | L | Our framework is Apache-2.0. Compatible. |
| Op-graph executes per batch, not per sample, so a single bad pair stalls a batch | L | Use small Ray-Data batches (e.g. 64) so a stall is bounded. |
### 3.4 Open spike question (must verify before merge)
The single risk worth a 1-day spike: **does `pair_preference_mapper` accept a pre-existing `rejected` and *only* run validation/length/language filters, or does it *always* call an LLM to (re)synthesize a rejected response?** Read the operator source in `data_juicer/ops/mapper/pair_preference_mapper.py` and confirm. If the latter, we wire our pre-existing `rejected` through `optimize_response_mapper` (refinement, not regeneration) plus a custom no-op preference validator. Either way, the integration shape below stands; only the recipe content changes.
---
## 4. Integration Sketch
### 4.1 Current pipeline (today)
```
TraceState
▼ (per-trace, multi-teacher OpenRouter call)
replay_trace(state, teachers=[m1, m2, m3])
▼ (returns: list[TeacherCompletion] keyed by model_id)
disagreement_score(completions)
▼ (if score > τ)
extract_dpo_pairs(completions, state)
▼ (yields)
DPOPair { prompt: messages[], chosen: messages[], rejected: messages[], state, meta }
write_jsonl(out_path)
```
### 4.2 Proposed pipeline (with data-juicer normalization op-graph)
```
TraceState
replay_trace(state, teachers) ← unchanged
disagreement_score(completions) ← unchanged
extract_dpo_pairs(completions, state) ← unchanged
[NEW] DJNormalizer.normalize_batch(dpo_pairs) ──── loads recipe from
│ replaysim/recipes/dpo_normalize.yaml
│ data-juicer op-graph runs:
│ 1. text_length_filter (on chosen + rejected separately)
│ 2. language_id_score_filter (en-only or configured)
│ 3. dialog_topic_detection_mapper (annotates meta, no drop)
│ 4. minhash_deduplicator (on prompt+chosen serialization)
│ 5. (optional) optimize_response_mapper to clean trailing whitespace, code-block fences
│ 6. custom PreferenceValidator op (chosen != rejected, both non-empty,
│ tool_calls structurally valid)
write_jsonl(out_path) ← unchanged consumer
```
The op-graph is a **wrapper around** `extract_dpo_pairs`, not a replacement. `replay_trace` and `extract_dpo_pairs` keep their current signatures. The only call-site change in `teacher_replay.py` is one line:
```python
# before:
pairs = list(extract_dpo_pairs(completions, state))
write_jsonl(out_path, pairs)
# after:
pairs = list(extract_dpo_pairs(completions, state))
pairs = DJNormalizer.from_recipe("dpo_normalize.yaml").normalize_batch(pairs)
write_jsonl(out_path, pairs)
```
### 4.3 Adapter shape (`replaysim/normalize.py`)
> **Realised in v0.1 (Wave 17 update):** ADR-004 shipped with a different
> public surface than the sketch below. The actual API:
>
> ```python
> from composer_replication.replaysim import (
> replay_and_normalize_trace, # convenience wrapper
> DJNormalizer, # the normalizer class
> DPOPair, # input TypedDict (from teacher_replay)
> NormalizedDPOPair, # output TypedDict
> replay_trace, extract_dpo_pairs, # re-exports of upstream stages
> )
> ```
>
> Key shape differences from the sketch:
>
> 1. **`DPOPair` is a TypedDict, not a dataclass.** Its actual fields
> are `{state_id: str, state_messages: list[dict], chosen: str,
> rejected: str, n_teachers_agreeing: int}` (defined in
> `composer_replication/teacher_replay.py:99`) — **not**
> `{prompt, chosen, rejected, state, meta}`. The `_to_dj`/`_from_dj`
> sketch round-trip below would not type-check against the realised
> TypedDict.
> 2. **Recipe path is `composer_replication/recipes/replaysim/default.yaml`**,
> not `composer_replication/replaysim/recipes/dpo_normalize.yaml`.
> There is no `replaysim/recipes/` subpackage; recipes live under
> the top-level `recipes/` tree.
> 3. **No `composer_replication/replaysim/ops/` subpackage exists.**
> The custom op file `preference_validator.py` was not created;
> data-juicer's stock ops + the framework's own validation in
> `DJNormalizer` covered the requirement.
> 4. **The integration hook is `replay_and_normalize_trace(...)`** in
> `composer_replication/replaysim/__init__.py` (re-exported from
> `normalize.py`). It wraps the existing `replay_trace` +
> `extract_dpo_pairs` flow without modifying `teacher_replay.py`.
> There is no separate `composer_replication/replaysim/teacher_replay.py`
> — `teacher_replay` lives at top-level `composer_replication/teacher_replay.py`.
>
> The pre-spike sketch below is preserved as historical proposal context.
> It documents the shape of thinking that fed ADR-004; the realised code
> is the source of truth for the adapter contract.
```python
# composer_replication/replaysim/normalize.py
from __future__ import annotations
from dataclasses import asdict
from importlib.resources import files
from typing import Iterable
from data_juicer.config import init_configs
from data_juicer.core.executor import DefaultExecutor
from data_juicer.format import load_formatter
from .types import DPOPair
class DJNormalizer:
"""Wraps a data-juicer op-graph as a batch normalization step over
DPOPair samples produced by extract_dpo_pairs.
The recipe (YAML) declares the op sequence. Operators consume and
produce the data-juicer conversation schema, which we convert to
and from our internal DPOPair on the boundary.
"""
def __init__(self, recipe_path: str):
cfg = init_configs(["--config", recipe_path])
self._executor = DefaultExecutor(cfg)
@classmethod
def from_recipe(cls, name: str) -> "DJNormalizer":
recipe = files("composer_replication.replaysim.recipes") / name
return cls(str(recipe))
@staticmethod
def _to_dj(p: DPOPair) -> dict:
# data-juicer preference schema:
# {"prompt": str-or-messages, "chosen": str-or-messages,
# "rejected": str-or-messages, "meta": {...}}
return {
"prompt": p.prompt, # messages[]
"chosen": p.chosen, # messages[]
"rejected": p.rejected, # messages[]
"meta": {
"trace_id": p.state.trace_id,
"teachers": p.meta.get("teachers", []),
"disagreement": p.meta.get("disagreement"),
**p.meta,
},
}
@staticmethod
def _from_dj(s: dict) -> DPOPair:
return DPOPair(
prompt=s["prompt"],
chosen=s["chosen"],
rejected=s["rejected"],
state=..., # rehydrate from meta.trace_id + cache
meta=s.get("meta", {}),
)
def normalize_batch(self, pairs: Iterable[DPOPair]) -> list[DPOPair]:
in_records = [self._to_dj(p) for p in pairs]
# Build an in-memory DJDataset from records (no disk round-trip).
ds = self._executor.formatter.load_dataset_from_records(in_records)
ds = self._executor.run(dataset=ds)
out_records = ds.to_list()
return [self._from_dj(r) for r in out_records]
```
### 4.4 Recipe (`replaysim/recipes/dpo_normalize.yaml`)
```yaml
# data-juicer recipe for normalizing replaysim DPO output
project_name: replaysim_dpo_normalize
executor_type: default # local Pandas; switch to 'ray' for distributed
np: 4
# Conversation/preference schema mode
text_keys: ['chosen', 'rejected'] # ops scan both response variants
suffixes: ['.jsonl']
process:
# 1. Length sanity on each response variant
- text_length_filter:
text_key: chosen
min_len: 10
max_len: 16384
- text_length_filter:
text_key: rejected
min_len: 10
max_len: 16384
# 2. Language gate (configurable; default English-only)
- language_id_score_filter:
text_key: chosen
lang: en
min_score: 0.6
# 3. Dialog topic annotation (no drop, just attaches meta.topic)
- dialog_topic_detection_mapper:
api_or_hf_model: openrouter:openai/gpt-4o-mini
mode: annotate
# 4. Near-duplicate removal across the batch on (prompt + chosen)
- document_minhash_deduplicator:
tokenization: space
window_size: 5
num_permutations: 256
jaccard_threshold: 0.85
text_key: chosen
# 5. Custom preference validator (chosen != rejected, structural integrity)
- preference_validator_filter: # module: composer_replication.replaysim.ops
check_distinct: true
check_tool_calls_valid: true
```
A custom op `preference_validator_filter` lives in `composer_replication/replaysim/ops/preference_validator.py` and is registered via data-juicer's plugin entry point.
### 4.5 Hook into `teacher_replay.py`
```python
# composer_replication/replaysim/teacher_replay.py (delta)
from .normalize import DJNormalizer
def run_replay(traces, teachers, out_path, *, normalize: bool = True):
pairs: list[DPOPair] = []
for state in traces:
completions = replay_trace(state, teachers=teachers)
if disagreement_score(completions) <= TAU:
continue
pairs.extend(extract_dpo_pairs(completions, state))
if normalize:
norm = DJNormalizer.from_recipe("dpo_normalize.yaml")
pairs = norm.normalize_batch(pairs)
write_jsonl(out_path, pairs)
```
The `normalize=True` flag keeps the old code-path one negation away during initial rollout.
### 4.6 Test plan (`tests/replaysim/test_normalize.py`)
1. **Round-trip preservation**: synthesize a DPOPair with `tool_calls`, run through `DJNormalizer.normalize_batch`, assert tool-call structure and arbitrary `meta` keys are preserved.
2. **Length filter**: a pair with empty `chosen` is dropped.
3. **Language filter**: a non-English `chosen` (Cyrillic) below the score threshold is dropped.
4. **Near-duplicate**: two pairs with identical `chosen` collapse to one.
5. **Distinctness**: a pair where `chosen == rejected` is dropped by `preference_validator_filter`.
6. **Multi-turn**: a 3-turn conversation in `prompt` survives end-to-end with role+content intact.
7. **Recipe loading**: `DJNormalizer.from_recipe("dpo_normalize.yaml")` works with `importlib.resources` regardless of install location.
---
## 5. ADR-004 Implications
ADR-004 (the umbrella ADR for "replaysim with normalization") should record:
- **Decision**: adopt data-juicer (`datajuicer/data-juicer`, Apache-2.0) as the normalization op-graph layer.
- **Status**: proposed; promote to accepted after the spike on `pair_preference_mapper`.
- **Consequences**:
- New runtime dependency: `py-data-juicer` (transitively pulls `pyarrow`, `datasets`, `loguru`, `jsonargparse`).
- Optional `ray` extra for distributed execution; not enabled by default.
- `replaysim/recipes/*.yaml` becomes a versioned config artifact; recipe changes must accompany behavioral-test updates.
- Tool-call and multi-turn structure preserved through normalization — verified by round-trip test.
- **Alternatives considered**: distilabel (too broad — would replace generation orchestration), datatrove (flat-text only — deal-breaker), NeMo-Curator (GPU-bound), lilac (archived).
---
## 6. Primary-source citations
| Claim | Source |
|---|---|
| datatrove license, last push, archived state | `https://api.github.com/repos/huggingface/datatrove` (`license.spdx_id`, `pushed_at`, `archived`) |
| datatrove `Document` is text+metadata, no `messages` field; built-in filters operate on `doc.text` | DeepWiki index of `huggingface/datatrove`, `src/datatrove/data.py`, `src/datatrove/pipeline/filters/c4_filters.py` |
| datatrove multi-turn only via `InferenceRunner.rollout_fn` | DeepWiki index of `huggingface/datatrove`, `src/datatrove/pipeline/inference/run_inference.py` |
| data-juicer license, last push, redirect to `datajuicer/data-juicer` | `https://api.github.com/repos/modelscope/data-juicer` (resolves to `datajuicer/data-juicer`) |
| data-juicer supports `messages: [{role, content}]` and Data-Juicer dialog format `{query, response, history}` | DeepWiki index of `modelscope/data-juicer` |
| `pair_preference_mapper` synthesizes `rejected_response` and `reason` | DeepWiki index of `modelscope/data-juicer`, `data_juicer/ops/mapper/pair_preference_mapper.py` |
| data-juicer GPU-required ops are tagged `🚀GPU` (image/video/multi-modal); core text + dialog mappers are CPU-OK | DeepWiki index of `modelscope/data-juicer` |
| NeMo-Curator license, last push, redirect to `NVIDIA-NeMo/Curator` | `https://api.github.com/repos/NVIDIA/NeMo-Curator` |
| NeMo-Curator semantic dedup is GPU-only; CPU install drops differentiating ops | DeepWiki index of `NVIDIA/NeMo-Curator` |
| distilabel license, last push, DAG model, `FormatChatGenerationDPO`, `MinHashDedup`, `DeitaFiltering` | `https://api.github.com/repos/argilla-io/distilabel`; DeepWiki index of `argilla-io/distilabel` |
| `databricks/lilac` archived 2024-03-19 | `https://api.github.com/repos/databricks/lilac` (`archived: true`, `pushed_at: "2024-03-19T12:41:30Z"`) |
| `lilacai/lilac` is a 2-star squatter stub created 2025-11-14 | `https://api.github.com/repos/lilacai/lilac` |
---
## 7. Confirmed output path
**File:** `/home/codeseys/.hermes/hermes-agent/docs/research/REPLAYSIM_NORMALIZATION_RECONNAISSANCE.md`
**Length:** ≤600 lines (this file).