"""Small reference decoder for bigOEIS `.packed` files. On disk, each token is stored in a 4-bit nibble: 0..9 decimal digits 10 term separator, i.e. comma 11 negative sign 14 final padding nibble, if the file has an odd nibble count 15 sequence delimiter / EOS Note that the packed disk codes are intentionally compact and are not exactly the model vocabulary ids: the model uses NEG=10 and SEP=11. Use `iter_model_token_rows()` when you want rows in model-token-id space. """ from __future__ import annotations import argparse import json from pathlib import Path from typing import Iterator PACKED_SEP = 10 PACKED_NEG = 11 PACKED_PAD = 14 PACKED_DELIM = 15 MODEL_NEG = 10 MODEL_SEP = 11 MODEL_BOS = 12 MODEL_EOS = 13 def iter_packed_nibbles(path: str | Path, chunk_size: int = 8 * 1024 * 1024) -> Iterator[int]: """Yield high nibble then low nibble for every byte in `path`.""" with Path(path).open("rb") as f: while True: chunk = f.read(chunk_size) if not chunk: return for byte in chunk: yield byte >> 4 yield byte & 0x0F def iter_model_token_rows( path: str | Path, *, include_bos_eos: bool = True, max_rows: int | None = None, strict: bool = True, ) -> Iterator[list[int]]: """Yield each packed sequence as model-token ids. By default rows include BOS/EOS, matching `tokenize_utils.tokenize_sequence`. Set `include_bos_eos=False` to get only the content tokens. """ row: list[int] = [MODEL_BOS] if include_bos_eos else [] yielded = 0 seen_pad = False for nib in iter_packed_nibbles(path): if nib == PACKED_PAD: seen_pad = True continue if seen_pad: if strict: raise ValueError("Found non-pad nibble after final packed padding.") continue if 0 <= nib <= 9: row.append(nib) elif nib == PACKED_SEP: row.append(MODEL_SEP) elif nib == PACKED_NEG: row.append(MODEL_NEG) elif nib == PACKED_DELIM: if include_bos_eos: row.append(MODEL_EOS) yield row yielded += 1 if max_rows is not None and yielded >= max_rows: return row = [MODEL_BOS] if include_bos_eos else [] else: if strict: raise ValueError(f"Invalid packed nibble: {nib}") empty = [MODEL_BOS] if include_bos_eos else [] if strict and row != empty: raise ValueError("Packed file ended with an unterminated sequence.") def iter_integer_sequences( path: str | Path, *, as_ints: bool = False, max_rows: int | None = None, strict: bool = True, ) -> Iterator[list[int] | list[str]]: """Yield decoded OEIS rows. Values are strings by default so enormous integers round-trip exactly through JSON. Pass `as_ints=True` if Python integers are more convenient. """ terms: list[str] = [] chars: list[str] = [] yielded = 0 seen_pad = False def finish_term() -> None: if chars: terms.append("".join(chars)) chars.clear() elif strict: raise ValueError("Encountered an empty term in packed sequence.") for nib in iter_packed_nibbles(path): if nib == PACKED_PAD: seen_pad = True continue if seen_pad: if strict: raise ValueError("Found non-pad nibble after final packed padding.") continue if 0 <= nib <= 9: chars.append(str(nib)) elif nib == PACKED_NEG: if strict and chars: raise ValueError("Found a negative sign after term digits had started.") chars.append("-") elif nib == PACKED_SEP: finish_term() elif nib == PACKED_DELIM: if chars: finish_term() row = [int(term) for term in terms] if as_ints else list(terms) yield row yielded += 1 if max_rows is not None and yielded >= max_rows: return terms.clear() else: if strict: raise ValueError(f"Invalid packed nibble: {nib}") if strict and (terms or chars): raise ValueError("Packed file ended with an unterminated sequence.") def _main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("packed_file", help="Path to a .packed file") parser.add_argument("-n", "--max-rows", type=int, default=5) parser.add_argument("--tokens", action="store_true", help="Print model-token rows instead of decoded terms") parser.add_argument("--content-only", action="store_true", help="Omit BOS/EOS when printing token rows") parser.add_argument("--ints", action="store_true", help="Emit decoded terms as JSON numbers instead of strings") parser.add_argument("--no-strict", action="store_true", help="Ignore invalid/trailing data instead of raising") args = parser.parse_args() strict = not args.no_strict if args.tokens: rows = iter_model_token_rows( args.packed_file, include_bos_eos=not args.content_only, max_rows=args.max_rows, strict=strict, ) for row in rows: print(json.dumps({"tokens": row}, separators=(",", ":"))) else: rows = iter_integer_sequences( args.packed_file, as_ints=args.ints, max_rows=args.max_rows, strict=strict, ) for row in rows: print(json.dumps({"seq": row}, separators=(",", ":"))) if __name__ == "__main__": _main()