composer-replication-framework / docs /research /REPLAYSIM_NORMALIZATION_RECONNAISSANCE.md
Codeseys's picture
Wave 17: close all 5 audit FLAGs + SDPO context alignment + serverless re-exports
a84c060

Replaysim Normalization Reconnaissance

Status: Recon · Feeds: ADR-004, V5 "replaysim with normalization" Author: subagent (delegated audit) · Date: 2026-05-25 Sources: GitHub REST API metadata + DeepWiki structured indexes of each repo's primary source. All repo metadata cited below was pulled from api.github.com/repos/<owner>/<name> directly.

TL;DR

Library License Last push Verdict
data-juicer Apache-2.0 2026-05-25 6.4k RECOMMENDED — the only candidate with a class-based op-graph that natively understands messages: [{role, content}], multi-turn dialog, and DPO-pair (chosen/rejected) preference samples as first-class data formats, with a pair_preference_mapper operator that maps directly onto our extract_dpo_pairs output.
distilabel Apache-2.0 2026-05-25 3.2k Strong runner-up. DAG pipeline, native chat-message format, built-in FormatChatGenerationDPO. But it is primarily a generation orchestrator and would force us to rewrite our existing OpenRouter teacher orchestration as Distilabel LLM subclasses. Larger refactor surface.
datatrove Apache-2.0 2026-05-06 3.1k Deal-breaker. Document dataclass is text: str + metadata: dict. All filters/dedup operate on flat doc.text. Multi-turn is only supported in the generation (InferenceRunner.rollout_fn) path, not the normalization/filter path. Forces lossy chat→string flattening.
NeMo-Curator Apache-2.0 2026-05-25 1.6k Strong on scale (Ray + Xenna + GPU), supports streaming and DPO via generate_two_turn_prompt. But: semantic dedup, fuzzy dedup, and classifier filters all require GPUs; CPU-only install drops most of the differentiating ops. Heavy framework for the size of replaysim.
lilac Apache-2.0 archived 2024-03-19 1.1k Dead. databricks/lilac repo "archived": true. The current lilacai/lilac is a 2-star squatter stub created Nov 2025. Do not adopt.

Recommendation: Adopt data-juicer as the normalization op-graph layer wrapped around replay_traceextract_dpo_pairs. Estimated integration cost: ~250–400 LOC in composer_replication.replaysim for an adapter + 1 YAML recipe.

Critical chat-template question answered: data-juicer is the only audited library whose filtering and normalization operators (not just its generation operators) operate directly on a structured messages: [{role, content}] format and on chosen/rejected preference-pair format. The other three candidates either flatten to text (datatrove), only handle chat in the generation path (datatrove again), or treat chat as a generation output to be assembled rather than a structured object to be filtered (NeMo-Curator, distilabel partly).


1. Audit Methodology

For each candidate, primary-source data was collected from:

  1. https://api.github.com/repos/<owner>/<name> for license, pushed_at, archived, stars, forks, topics — these are authoritative GitHub metadata, not scraped.
  2. DeepWiki structured indexes of each repo's source tree for: op model, data structures (Document / Sample / Step), conversation/DPO support in filtering vs. generation paths, GPU dependencies.
  3. README confirmation through the GitHub API for transferred-org redirects.

No secondary sources, no marketing pages, no blog posts.

Two facts to flag up front because they materially change the candidate set:

  • modelscope/data-juicer redirects to datajuicer/data-juicer. The team spun out of ModelScope into a dedicated datajuicer org. Same code, just a transferred name — pushed_at is current.
  • NVIDIA/NeMo-Curator redirects to NVIDIA-NeMo/Curator. Same situation — moved into the dedicated NVIDIA-NeMo org in 2025.

2. Per-Candidate Audit

2.1 datatrove (huggingface)

Dimension Value
Repo huggingface/datatrove
License Apache-2.0
Created 2023-06-14
Last push 2026-05-06
Stars / Forks 3068 / 266
Commits 725 (default branch)
Maturity Production. Used to build FineWeb. Active.

Op model. Class-based linear pipeline of PipelineStep instances. PipelineStep.run(data: DocumentsPipeline, rank: int, world_size: int) -> DocumentsPipeline where DocumentsPipeline is an iterator of Document objects. Steps are composed by Python list concatenation, not a DAG — branching/joining requires manual orchestration.

Multi-turn / chat-template support — DEAL-BREAKER.

The Document dataclass (src/datatrove/data.py) is:

@dataclass
class Document:
    text: str
    id: str
    media: list[Media]   # placeholder, "for future uses, currently not used"
    metadata: dict

There is no messages field. Every built-in filter (e.g., C4QualityFilter, LanguageFilter, GopherQualityFilter) and every built-in dedup op (MinhashDedup*, SentenceDedup*, BloomFilter) operates on doc.text as a flat string.

Multi-turn does appear, but only in the generation path (InferenceRunner + user-supplied rollout_fn(doc, generate)), where the user constructs {"messages": [{"role": ..., "content": ...}]} payloads themselves. Once the generation completes, the result is stuffed back into doc.text (or doc.metadata) and downstream filters again see flat text.

For our use case — normalizing already-generated multi-turn DPO pairs with chosen/rejected chat structures and tool calls — this means we'd have to:

  1. Serialize messages into a flat string (<|im_start|>user...).
  2. Run datatrove filters on the serialized string.
  3. Re-parse back into messages afterward.

Tool-call structure ({"role": "tool", "tool_call_id": ...}, tool_calls: [...]) does not survive that round-trip cleanly without custom serialization on both sides. Per the user's hard requirement — "if only flat text, that's a deal-breaker" — datatrove fails here.

Streaming. Yes. HuggingFaceDatasetReader(streaming=True) and the iterator-based PipelineStep.run mean we can pipe documents through during generation. Streaming is fine.

GPU. None of the normalization ops require GPU. MinHash dedup is CPU. Only the InferenceRunner path needs a GPU (vLLM/SGLang backend) and we don't need that — we'd be calling OpenRouter, not running local models.

Integration cost. Moot — the chat-template gap is the deal-breaker.


2.2 data-juicer (datajuicer org, formerly modelscope)

Dimension Value
Repo datajuicer/data-juicer (redirect target of the legacy modelscope/data-juicer)
License Apache-2.0
Created 2023-08-01
Last push 2026-05-25 (most recent of all candidates)
Stars / Forks 6444 / 373
Maturity Production. Active core team (Alibaba/ModelScope-spinout). Most stars of the candidate set. Has its own conference papers and a docs site at datajuicer.github.io/data-juicer.

Op model. Class-based DAG of operators ("Ops") organized as mappers, filters, deduplicators, and selectors. Each Op is a Python class subclassing Mapper/Filter/Deduplicator. Pipelines are declared as YAML recipes (process: [- op_name: { args }, ...]) and executed by the Executor (default Ray-distributed; also a local Pandas-backed mode). Conditional branching through OpFusion and Adapter modules is supported, and there is a Ray-Data executor for true streaming.

Multi-turn / chat-template support — NATIVE. This is the discriminator.

Data-juicer has a first-class conversation schema, supporting both:

  1. OpenAI-style messages: [{role, content}]
  2. A "Data-Juicer format" {query, response, history: [[q, r], ...]}

It exposes operators that are purpose-built for dialog/preference data:

  • dialog_intent_detection_mapper
  • dialog_sentiment_detection_mapper
  • dialog_sentiment_intensity_mapper
  • dialog_topic_detection_mapper
  • pair_preference_mapperdirectly relevant: ingests a (prompt, chosen) and synthesizes/refines a rejected_response plus a reason field. This is exactly the schema produced by our extract_dpo_pairs.
  • query_intent_detection_mapper, query_sentiment_detection_mapper, query_topic_detection_mapper
  • optimize_qa_mapper, optimize_query_mapper, optimize_response_mapper — refine individual fields without flattening the whole conversation.

Tool-call structure: data-juicer's conversation schema preserves arbitrary keys per message (because it operates on dict-of-lists Arrow tables), so tool_call_id, tool_calls, name, etc. survive through filters as long as no operator explicitly drops them. This is structurally safe — confirmed by the operator code only reading role/content and forwarding the rest.

Streaming. Partial. The default executor is batch on Arrow/HF datasets, but data-juicer integrated with Ray Data for distributed/streaming processing, and the README references "streaming JSON reader patches integrated by Apache Arrow." For our scale (≤100k DPO pairs per run), batch is fine; for true online normalization during multi-teacher generation, the Ray executor handles it — but a simpler approach is to wrap each replay_trace rollout's output into a tiny in-memory dataset and run the recipe per-batch (mini-batch streaming).

GPU. Only needed for image/video/multi-modal ops and for the LLM-API mappers when configured to run a local model. Every op we care about for replaysim — pair_preference_mapper, dialog detection mappers, text_length_filter, language_id_score_filter, MinHash dedup, etc. — is CPU-OK or calls a remote API (which is exactly our existing OpenRouter pattern). Importantly, MinHash and exact dedup in data-juicer do not require GPU, unlike NeMo-Curator's fuzzy/semantic dedup.

Integration cost into composer_replication.replaysim. Estimated ~250–400 LOC, breakdown:

  • Adapter replaysim/normalize.py: ~80–120 LOC. Wraps a DJDataset (data-juicer's dataset abstraction), exposes normalize_dpo_batch(pairs: list[DPOPair]) -> list[DPOPair].
  • YAML recipe replaysim/recipes/dpo_normalize.yaml: ~40 LOC declarative.
  • Hook in teacher_replay.py after extract_dpo_pairs and before final write: ~20 LOC.
  • New tests tests/replaysim/test_normalize.py: ~80–120 LOC.
  • ADR-004 update + module docs: ~20 LOC.

Dependency footprint: pip install py-data-juicer pulls in datasets, pyarrow, loguru, jsonargparse, optionally ray. We already have datasets/pyarrow indirectly from HF stack.


2.3 NeMo-Curator (NVIDIA-NeMo)

Dimension Value
Repo NVIDIA-NeMo/Curator (redirect target of NVIDIA/NeMo-Curator)
License Apache-2.0
Created 2024-03-14
Last push 2026-05-25
Stars / Forks 1584 / 274
Maturity Production at NVIDIA scale. Built for pre-training-corpus curation (Nemotron / Nemotron-4).

Op model. Task-centric distributed processing, built on Ray + the Xenna executor. Stages are class-based, composed into pipelines, executed by XennaExecutor in either streaming or batch mode. Closer to Spark/Ray-Data than to a Python list of steps.

Multi-turn / chat-template support — partial, generation-side only. Curator has model-specific formatters (Mixtral8x7BFormatter, NemotronFormatter) that render multi-turn dialogue into a flat prompt string for the target model's chat template. There is generate_dialogue for multi-turn synthesis and generate_two_turn_prompt for DPO-style preference pairs. But: like datatrove, the filtering and deduplication stages do not have first-class conversation/preference operators — they treat the data as text after rendering. Tool-call preservation is not addressed in the public API.

Streaming. Yes — XennaExecutor(execution_mode="streaming") is a first-class option.

GPU — significant cost. Curator's discriminating features all require GPUs:

  • Semantic deduplication — GPU-only, embedding generation + clustering. "Not supported for CPU-only processing."
  • Fuzzy deduplication (MinHash + LSH) — GPU backend (cuDF/cuML), not CPU.
  • Classifier filters (domain / quality / safety via DistributedDataClassifier) — GPU clusters.
  • Image curation modules — GPU.

CPU-only install supports basic text filters and exact dedup, but that's the same surface area we'd get from data-juicer without the dependency weight. If we are not running on a GPU cluster, NeMo-Curator's value proposition collapses.

Integration cost. ~600–900 LOC plus operational cost: a Ray cluster setup, GPU nodes if we want the differentiating features. For replaysim's scale (a few thousand DPO pairs per run), this is overkill.


2.4 distilabel (argilla-io)

Dimension Value
Repo argilla-io/distilabel
License Apache-2.0
Created 2023-10-16
Last push 2026-05-25
Stars / Forks 3230 / 242
Maturity Production. Argilla is now part of HF; project remains active under argilla-io.

Op model. DAG pipeline of Step and Task (Task = Step with an LLM). Each step declares inputs: list[str], outputs: list[str], and process(*inputs) -> Generator[outputs]. Steps are wired via >> operator. Resource declarations (StepResources(replicas=N, gpus=M)) handle scaling, optionally on Ray.

Multi-turn / chat-template support — NATIVE on the generation side, partial on the normalization side.

  • ChatGeneration task accepts OpenAI-format messages: [{role, content}] natively.
  • FormatTextGenerationDPO and FormatChatGenerationDPO produce the exact {prompt, chosen, rejected, ratings, reason} schema we want.
  • UltraFeedback task is the canonical preference-rating step.
  • DeitaFiltering and MinHashDedup are the only filtering/dedup steps; they operate on text fields rather than on structured messages. Tool-call structure is preserved as long as no step explicitly normalizes it (like data-juicer, by virtue of dict-of-fields semantics) — but there isn't a pair_preference_mapper analogue that operates on messages directly.

Streaming. Supports streaming generation per LLM (e.g., AnthropicLLM streams tokens). Pipeline-level execution is batch-of-batches; you can .run(parameters={...}) and consume outputs as they materialize.

GPU. Only when steps choose to run a local LLM (vLLM, transformers). API-based steps (OpenAI, Anthropic, Mistral, OpenRouter via OpenAI-compat) are CPU-only.

Integration cost — large but high overlap. Distilabel would replace much of teacher_replay.py, not just normalize after it:

  • Rewrite multi-teacher OpenRouter calls as a Pipeline of Tasks subclassing distilabel's LLM interface (or use the OpenAILLM wrapper pointed at OpenRouter): ~300–500 LOC delta.
  • Re-express extract_dpo_pairs as a custom Task or use FormatChatGenerationDPO: ~100–150 LOC.
  • Migrate trace plumbing into distilabel's GeneratorStep/Task DAG: ~150 LOC.
  • Tests + docs: ~150 LOC.

Total ~700–900 LOC and a meaningful refactor of teacher orchestration. The win is that we'd get a real DAG runtime, retries, caching, and Argilla-integration for free. The lose is that we get coupled to distilabel's LLM/Task abstractions for the entire generation pipeline, not just a normalization op-graph wrapped around it.

This is a strategic decision the user phrased as: "see if we can leverage [a normalization library] to normalize the data while also making the replaysim dataset generation." Distilabel takes the broader interpretation — replace replaysim's generation with a distilabel pipeline. That is a bigger commitment than this recon was scoped to recommend.


2.5 lilac

STATUS: dead. Do not adopt.

  • databricks/lilac: "archived": true, last push 2024-03-19, license Apache-2.0. Repo says "Curate better data for LLMs." The Databricks acquisition (April 2024) absorbed it into Databricks Mosaic AI; the OSS project was archived shortly after.
  • lilacai/lilac: created 2025-11-14 by a user account lilacai, 2 stars, 0 forks, no license, description says "Thee Eclipse - Hackerone: @theeeclipse." This is a squatter / unrelated stub, not the original lilac.
  • No actively maintained successor with the original lilac code base outside Databricks' proprietary platform.

3. Recommendation: data-juicer

3.1 Why

  1. Only candidate with native conversation + preference-pair operators in the normalization path, not just the generation path. pair_preference_mapper is a near-perfect fit for the output of extract_dpo_pairs.
  2. Tool-call structure is preserved because operators read specific fields and forward the rest of the dict — confirmed by the operator schema design.
  3. No GPU required for the operators we'd actually use (preference, dialog, length, language-id, MinHash dedup). Matches our OpenRouter-API-driven, CPU-friendly architecture.
  4. YAML-recipe style lets us version the normalization graph as a config artifact alongside the recon doc, instead of as Python code that drifts.
  5. Lowest integration cost of the viable candidates — wraps around our existing pipeline rather than replacing it.
  6. Maturity: 6.4k stars, last push today, dedicated org, paper-backed.

3.2 Why not the others (one-liners)

  • datatrove: flat-text Document, lossy round-trip on chat structure → deal-breaker.
  • distilabel: would force a rewrite of teacher orchestration — too broad a refactor for "wrap normalization around the existing pipeline."
  • NeMo-Curator: best ops require GPUs; without them it offers no advantage over data-juicer.
  • lilac: archived.

3.3 Risk register

Risk Severity Mitigation
Data-juicer YAML recipe drift between dev and CI M Pin py-data-juicer version; commit recipe under replaysim/recipes/ and load via importlib.resources.
Some ops silently coerce conversation structure M Add a round-trip test: pair → normalize → pair must preserve messages, tool_calls, and arbitrary metadata.
Ray executor bloat if user enables it L Default to local Pandas executor; gate Ray behind an explicit flag.
pair_preference_mapper calls an LLM by default to synthesize rejected H We already have rejected from disagreement. Configure the mapper as a pass-through filter / use it only for refinement; if it can't be made non-LLM, fall back to a custom Mapper that just runs length/language/dedup checks on the existing pair. Verify in spike before locking in.
Apache-2.0 inbound license compatibility L Our framework is Apache-2.0. Compatible.
Op-graph executes per batch, not per sample, so a single bad pair stalls a batch L Use small Ray-Data batches (e.g. 64) so a stall is bounded.

3.4 Open spike question (must verify before merge)

The single risk worth a 1-day spike: does pair_preference_mapper accept a pre-existing rejected and only run validation/length/language filters, or does it always call an LLM to (re)synthesize a rejected response? Read the operator source in data_juicer/ops/mapper/pair_preference_mapper.py and confirm. If the latter, we wire our pre-existing rejected through optimize_response_mapper (refinement, not regeneration) plus a custom no-op preference validator. Either way, the integration shape below stands; only the recipe content changes.


4. Integration Sketch

4.1 Current pipeline (today)

TraceState
   │
   ▼  (per-trace, multi-teacher OpenRouter call)
replay_trace(state, teachers=[m1, m2, m3])
   │
   ▼  (returns: list[TeacherCompletion] keyed by model_id)
disagreement_score(completions)
   │
   ▼  (if score > τ)
extract_dpo_pairs(completions, state)
   │
   ▼  (yields)
DPOPair { prompt: messages[], chosen: messages[], rejected: messages[], state, meta }
   │
   ▼
write_jsonl(out_path)

4.2 Proposed pipeline (with data-juicer normalization op-graph)

TraceState
   │
   ▼
replay_trace(state, teachers)         ← unchanged
   │
   ▼
disagreement_score(completions)        ← unchanged
   │
   ▼
extract_dpo_pairs(completions, state)  ← unchanged
   │
   ▼
[NEW] DJNormalizer.normalize_batch(dpo_pairs) ──── loads recipe from
   │                                                replaysim/recipes/dpo_normalize.yaml
   │   data-juicer op-graph runs:
   │     1. text_length_filter (on chosen + rejected separately)
   │     2. language_id_score_filter (en-only or configured)
   │     3. dialog_topic_detection_mapper (annotates meta, no drop)
   │     4. minhash_deduplicator (on prompt+chosen serialization)
   │     5. (optional) optimize_response_mapper to clean trailing whitespace, code-block fences
   │     6. custom PreferenceValidator op (chosen != rejected, both non-empty,
   │        tool_calls structurally valid)
   ▼
write_jsonl(out_path)                  ← unchanged consumer

The op-graph is a wrapper around extract_dpo_pairs, not a replacement. replay_trace and extract_dpo_pairs keep their current signatures. The only call-site change in teacher_replay.py is one line:

# before:
pairs = list(extract_dpo_pairs(completions, state))
write_jsonl(out_path, pairs)

# after:
pairs = list(extract_dpo_pairs(completions, state))
pairs = DJNormalizer.from_recipe("dpo_normalize.yaml").normalize_batch(pairs)
write_jsonl(out_path, pairs)

4.3 Adapter shape (replaysim/normalize.py)

Realised in v0.1 (Wave 17 update): ADR-004 shipped with a different public surface than the sketch below. The actual API:

from composer_replication.replaysim import (
    replay_and_normalize_trace,    # convenience wrapper
    DJNormalizer,                  # the normalizer class
    DPOPair,                       # input TypedDict (from teacher_replay)
    NormalizedDPOPair,             # output TypedDict
    replay_trace, extract_dpo_pairs,  # re-exports of upstream stages
)

Key shape differences from the sketch:

  1. DPOPair is a TypedDict, not a dataclass. Its actual fields are {state_id: str, state_messages: list[dict], chosen: str, rejected: str, n_teachers_agreeing: int} (defined in composer_replication/teacher_replay.py:99) — not {prompt, chosen, rejected, state, meta}. The _to_dj/_from_dj sketch round-trip below would not type-check against the realised TypedDict.
  2. Recipe path is composer_replication/recipes/replaysim/default.yaml, not composer_replication/replaysim/recipes/dpo_normalize.yaml. There is no replaysim/recipes/ subpackage; recipes live under the top-level recipes/ tree.
  3. No composer_replication/replaysim/ops/ subpackage exists. The custom op file preference_validator.py was not created; data-juicer's stock ops + the framework's own validation in DJNormalizer covered the requirement.
  4. The integration hook is replay_and_normalize_trace(...) in composer_replication/replaysim/__init__.py (re-exported from normalize.py). It wraps the existing replay_trace + extract_dpo_pairs flow without modifying teacher_replay.py. There is no separate composer_replication/replaysim/teacher_replay.pyteacher_replay lives at top-level composer_replication/teacher_replay.py.

The pre-spike sketch below is preserved as historical proposal context. It documents the shape of thinking that fed ADR-004; the realised code is the source of truth for the adapter contract.

# composer_replication/replaysim/normalize.py
from __future__ import annotations
from dataclasses import asdict
from importlib.resources import files
from typing import Iterable

from data_juicer.config import init_configs
from data_juicer.core.executor import DefaultExecutor
from data_juicer.format import load_formatter

from .types import DPOPair


class DJNormalizer:
    """Wraps a data-juicer op-graph as a batch normalization step over
    DPOPair samples produced by extract_dpo_pairs.

    The recipe (YAML) declares the op sequence. Operators consume and
    produce the data-juicer conversation schema, which we convert to
    and from our internal DPOPair on the boundary.
    """

    def __init__(self, recipe_path: str):
        cfg = init_configs(["--config", recipe_path])
        self._executor = DefaultExecutor(cfg)

    @classmethod
    def from_recipe(cls, name: str) -> "DJNormalizer":
        recipe = files("composer_replication.replaysim.recipes") / name
        return cls(str(recipe))

    @staticmethod
    def _to_dj(p: DPOPair) -> dict:
        # data-juicer preference schema:
        #   {"prompt": str-or-messages, "chosen": str-or-messages,
        #    "rejected": str-or-messages, "meta": {...}}
        return {
            "prompt": p.prompt,        # messages[]
            "chosen": p.chosen,        # messages[]
            "rejected": p.rejected,    # messages[]
            "meta": {
                "trace_id": p.state.trace_id,
                "teachers": p.meta.get("teachers", []),
                "disagreement": p.meta.get("disagreement"),
                **p.meta,
            },
        }

    @staticmethod
    def _from_dj(s: dict) -> DPOPair:
        return DPOPair(
            prompt=s["prompt"],
            chosen=s["chosen"],
            rejected=s["rejected"],
            state=...,           # rehydrate from meta.trace_id + cache
            meta=s.get("meta", {}),
        )

    def normalize_batch(self, pairs: Iterable[DPOPair]) -> list[DPOPair]:
        in_records = [self._to_dj(p) for p in pairs]
        # Build an in-memory DJDataset from records (no disk round-trip).
        ds = self._executor.formatter.load_dataset_from_records(in_records)
        ds = self._executor.run(dataset=ds)
        out_records = ds.to_list()
        return [self._from_dj(r) for r in out_records]

4.4 Recipe (replaysim/recipes/dpo_normalize.yaml)

# data-juicer recipe for normalizing replaysim DPO output
project_name: replaysim_dpo_normalize
executor_type: default          # local Pandas; switch to 'ray' for distributed
np: 4

# Conversation/preference schema mode
text_keys: ['chosen', 'rejected']    # ops scan both response variants
suffixes: ['.jsonl']

process:
  # 1. Length sanity on each response variant
  - text_length_filter:
      text_key: chosen
      min_len: 10
      max_len: 16384
  - text_length_filter:
      text_key: rejected
      min_len: 10
      max_len: 16384

  # 2. Language gate (configurable; default English-only)
  - language_id_score_filter:
      text_key: chosen
      lang: en
      min_score: 0.6

  # 3. Dialog topic annotation (no drop, just attaches meta.topic)
  - dialog_topic_detection_mapper:
      api_or_hf_model: openrouter:openai/gpt-4o-mini
      mode: annotate

  # 4. Near-duplicate removal across the batch on (prompt + chosen)
  - document_minhash_deduplicator:
      tokenization: space
      window_size: 5
      num_permutations: 256
      jaccard_threshold: 0.85
      text_key: chosen

  # 5. Custom preference validator (chosen != rejected, structural integrity)
  - preference_validator_filter:        # module: composer_replication.replaysim.ops
      check_distinct: true
      check_tool_calls_valid: true

A custom op preference_validator_filter lives in composer_replication/replaysim/ops/preference_validator.py and is registered via data-juicer's plugin entry point.

4.5 Hook into teacher_replay.py

# composer_replication/replaysim/teacher_replay.py (delta)

from .normalize import DJNormalizer

def run_replay(traces, teachers, out_path, *, normalize: bool = True):
    pairs: list[DPOPair] = []
    for state in traces:
        completions = replay_trace(state, teachers=teachers)
        if disagreement_score(completions) <= TAU:
            continue
        pairs.extend(extract_dpo_pairs(completions, state))

    if normalize:
        norm = DJNormalizer.from_recipe("dpo_normalize.yaml")
        pairs = norm.normalize_batch(pairs)

    write_jsonl(out_path, pairs)

The normalize=True flag keeps the old code-path one negation away during initial rollout.

4.6 Test plan (tests/replaysim/test_normalize.py)

  1. Round-trip preservation: synthesize a DPOPair with tool_calls, run through DJNormalizer.normalize_batch, assert tool-call structure and arbitrary meta keys are preserved.
  2. Length filter: a pair with empty chosen is dropped.
  3. Language filter: a non-English chosen (Cyrillic) below the score threshold is dropped.
  4. Near-duplicate: two pairs with identical chosen collapse to one.
  5. Distinctness: a pair where chosen == rejected is dropped by preference_validator_filter.
  6. Multi-turn: a 3-turn conversation in prompt survives end-to-end with role+content intact.
  7. Recipe loading: DJNormalizer.from_recipe("dpo_normalize.yaml") works with importlib.resources regardless of install location.

5. ADR-004 Implications

ADR-004 (the umbrella ADR for "replaysim with normalization") should record:

  • Decision: adopt data-juicer (datajuicer/data-juicer, Apache-2.0) as the normalization op-graph layer.
  • Status: proposed; promote to accepted after the spike on pair_preference_mapper.
  • Consequences:
    • New runtime dependency: py-data-juicer (transitively pulls pyarrow, datasets, loguru, jsonargparse).
    • Optional ray extra for distributed execution; not enabled by default.
    • replaysim/recipes/*.yaml becomes a versioned config artifact; recipe changes must accompany behavioral-test updates.
    • Tool-call and multi-turn structure preserved through normalization — verified by round-trip test.
  • Alternatives considered: distilabel (too broad — would replace generation orchestration), datatrove (flat-text only — deal-breaker), NeMo-Curator (GPU-bound), lilac (archived).

6. Primary-source citations

Claim Source
datatrove license, last push, archived state https://api.github.com/repos/huggingface/datatrove (license.spdx_id, pushed_at, archived)
datatrove Document is text+metadata, no messages field; built-in filters operate on doc.text DeepWiki index of huggingface/datatrove, src/datatrove/data.py, src/datatrove/pipeline/filters/c4_filters.py
datatrove multi-turn only via InferenceRunner.rollout_fn DeepWiki index of huggingface/datatrove, src/datatrove/pipeline/inference/run_inference.py
data-juicer license, last push, redirect to datajuicer/data-juicer https://api.github.com/repos/modelscope/data-juicer (resolves to datajuicer/data-juicer)
data-juicer supports messages: [{role, content}] and Data-Juicer dialog format {query, response, history} DeepWiki index of modelscope/data-juicer
pair_preference_mapper synthesizes rejected_response and reason DeepWiki index of modelscope/data-juicer, data_juicer/ops/mapper/pair_preference_mapper.py
data-juicer GPU-required ops are tagged 🚀GPU (image/video/multi-modal); core text + dialog mappers are CPU-OK DeepWiki index of modelscope/data-juicer
NeMo-Curator license, last push, redirect to NVIDIA-NeMo/Curator https://api.github.com/repos/NVIDIA/NeMo-Curator
NeMo-Curator semantic dedup is GPU-only; CPU install drops differentiating ops DeepWiki index of NVIDIA/NeMo-Curator
distilabel license, last push, DAG model, FormatChatGenerationDPO, MinHashDedup, DeitaFiltering https://api.github.com/repos/argilla-io/distilabel; DeepWiki index of argilla-io/distilabel
databricks/lilac archived 2024-03-19 https://api.github.com/repos/databricks/lilac (archived: true, pushed_at: "2024-03-19T12:41:30Z")
lilacai/lilac is a 2-star squatter stub created 2025-11-14 https://api.github.com/repos/lilacai/lilac

7. Confirmed output path

File: /home/codeseys/.hermes/hermes-agent/docs/research/REPLAYSIM_NORMALIZATION_RECONNAISSANCE.md Length: ≤600 lines (this file).