| | """ |
| | BitTransformerLM Dataset Builder & HuggingFace Integration |
| | |
| | Creates curated datasets optimized for bit-native transformer training with |
| | comprehensive safety benchmarks, scaling curricula, and progressive complexity. |
| | """ |
| |
|
| | import os |
| | import json |
| | import gzip |
| | import random |
| | from typing import List, Dict, Any, Optional, Tuple |
| | from pathlib import Path |
| | from datetime import datetime |
| | import tempfile |
| |
|
| | import torch |
| | import numpy as np |
| | from datasets import Dataset, DatasetDict |
| | from huggingface_hub import HfApi, login, create_repo |
| |
|
| | from .bit_io import text_to_bits, bits_to_text |
| | from .parity import enforce_parity as _enforce_parity_tensor |
| | from .compression import compress_bits |
| | |
| |
|
| | |
def compute_negentropy(bit_tensor: torch.Tensor) -> float:
    """Compute negentropy (departure from randomness) of a bit sequence."""
    if len(bit_tensor) == 0:
        return 0.0

    # Empirical bit probabilities, clamped away from 0/1 so log2 stays finite.
    p_1 = bit_tensor.float().mean()
    p_0 = 1.0 - p_1
    p_1 = torch.clamp(p_1, min=1e-7, max=1.0 - 1e-7)
    p_0 = torch.clamp(p_0, min=1e-7, max=1.0 - 1e-7)

    # Shannon entropy in bits; 1.0 is the maximum for a binary source.
    entropy = -(p_1 * torch.log2(p_1) + p_0 * torch.log2(p_0))

    max_entropy = 1.0
    negentropy = (max_entropy - entropy) / max_entropy

    return float(negentropy)
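
# Illustrative sanity checks (assumed usage, not part of the original module):
# a balanced sequence has p_1 = 0.5, giving entropy 1 bit and negentropy ~0,
# while a constant sequence is maximally ordered and scores ~1.
#   compute_negentropy(torch.tensor([0., 1., 0., 1.]))  # ~0.0
#   compute_negentropy(torch.ones(64))                  # ~1.0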


def compute_lz_complexity(bits: List[int]) -> float:
    """Compute a run-length approximation of Lempel-Ziv complexity."""
    if not bits:
        return 0.0

    # Count maximal runs of identical bits.
    runs = []
    current_run = 1
    for i in range(1, len(bits)):
        if bits[i] == bits[i - 1]:
            current_run += 1
        else:
            runs.append(current_run)
            current_run = 1
    runs.append(current_run)

    # Normalize: more runs per bit means higher complexity, capped at 1.0.
    complexity = len(runs) / len(bits)
    return min(1.0, complexity * 2)
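
# Worked example (illustrative): "0101" has 4 runs over 4 bits, so the score
# is min(1.0, 4/4 * 2) = 1.0; "0000" has 1 run over 4 bits, giving 0.5.
#   compute_lz_complexity([0, 1, 0, 1])  # -> 1.0
#   compute_lz_complexity([0, 0, 0, 0])  # -> 0.5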


def compute_symbiosis(bit_tensor1: torch.Tensor, bit_tensor2: torch.Tensor) -> float:
    """Compute the symbiosis score between two equal-length bit sequences."""
    if len(bit_tensor1) != len(bit_tensor2) or len(bit_tensor1) == 0:
        return 0.0

    # Pearson correlation between the two sequences.
    corr = torch.corrcoef(torch.stack([bit_tensor1.float(), bit_tensor2.float()]))[0, 1]

    # corrcoef is NaN when either sequence is constant; treat that as no signal.
    if torch.isnan(corr):
        return 0.0

    # Map correlation from [-1, 1] onto [0, 1].
    symbiosis = (corr + 1) / 2
    return float(symbiosis)
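
# Illustrative values (assumed usage): identical sequences correlate at +1 and
# score 1.0; exact complements correlate at -1 and score 0.0.
#   a = torch.tensor([0., 1., 1., 0.])
#   compute_symbiosis(a, a)      # -> 1.0
#   compute_symbiosis(a, 1 - a)  # -> 0.0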


def enforce_parity(bits: List[int]) -> List[int]:
    """List-based wrapper around the tensor parity enforcer."""
    if not bits:
        return bits

    # Copy, then zero-pad to a multiple of 9 bits so the sequence frames
    # cleanly for parity enforcement.
    bits = list(bits)
    while len(bits) % 9 != 0:
        bits.append(0)

    try:
        bits_tensor = torch.tensor(bits, dtype=torch.long)
        corrected_tensor, _ = _enforce_parity_tensor(bits_tensor)
        return corrected_tensor.tolist()
    except Exception:
        # Fall back to the padded (uncorrected) bits if enforcement fails.
        return bits
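
# Note (assumption): the multiple-of-9 framing suggests 8 data bits plus one
# parity bit per byte, matching the module's "parity_protected" encoding. For
# example, a 10-bit input is zero-padded to 18 bits before enforcement:
#   enforce_parity([1, 0, 1, 1, 0, 0, 1, 0, 1, 1])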


class BitTransformerDatasetBuilder:
    """
    Comprehensive dataset builder for BitTransformerLM training.

    Generates:
    - Binary sequences with parity protection
    - Progressive complexity curricula
    - Safety benchmark validation sets
    - Synthetic bit patterns for robustness
    - Compressed sequence variants
    """

    def __init__(self, hf_token: str, repo_id: str = "BitTransformerLM"):
        """Initialize with HuggingFace credentials."""
        self.hf_token = hf_token
        self.repo_id = repo_id
        self.api = HfApi()

        login(token=hf_token)

        # Dataset-level metadata recorded alongside the upload.
        self.config = {
            "version": "1.0.0",
            "created": datetime.now().isoformat(),
            "model_compatibility": "BitTransformerLM",
            "bit_encoding": "parity_protected",
            "max_sequence_length": 512,
            "total_samples": 50000,
            "safety_thresholds": {
                "min_negentropy": 0.1,
                "max_lz_complexity": 0.9,
                "min_symbiosis": 0.3
            }
        }

    def generate_text_to_bits_data(self, texts: List[str], max_len: int = 512) -> List[Dict]:
        """Convert text samples to parity-protected bit sequences."""
        samples = []

        for i, text in enumerate(texts):
            try:
                # Convert to bits, truncate, and add parity protection. Parity
                # padding rounds up to a multiple of 9 bits, so a sequence may
                # end up slightly longer than max_len.
                bits = text_to_bits(text)[:max_len]
                bits = enforce_parity(bits)

                # Zero-pad short sequences up to max_len.
                if len(bits) < max_len:
                    bits.extend([0] * (max_len - len(bits)))

                # Safety metrics for curriculum filtering.
                bit_tensor = torch.tensor(bits, dtype=torch.float32)
                negentropy = compute_negentropy(bit_tensor)
                lz_complexity = compute_lz_complexity(bits)

                sample = {
                    "id": f"text_to_bits_{i:06d}",
                    "original_text": text[:100] + "..." if len(text) > 100 else text,
                    "bit_sequence": bits,
                    # Approximate payload length: counts nonzero bits only.
                    "sequence_length": len([b for b in bits if b != 0]),
                    "negentropy": float(negentropy),
                    "lz_complexity": float(lz_complexity),
                    "has_parity": True,
                    "category": "text_conversion",
                    # Unused for this category; kept so all splits share one schema.
                    "pattern_type": None,
                    "safety_category": None,
                    "target_negentropy": None,
                    "target_complexity": None,
                    "original_id": None,
                    "compression_ratio": None,
                    "original_length": None
                }
                samples.append(sample)

            except Exception as e:
                print(f"Error processing text {i}: {e}")
                continue

        return samples

    def generate_synthetic_patterns(self, num_samples: int = 5000, max_len: int = 512) -> List[Dict]:
        """Generate synthetic bit patterns for robustness testing."""
        samples = []

        # "spiral" and "fractal" have no dedicated generator yet and fall back
        # to random bits in _generate_pattern.
        patterns = [
            "alternating",
            "blocks",
            "fibonacci",
            "prime_based",
            "random_walk",
            "spiral",
            "fractal",
        ]

        for i in range(num_samples):
            pattern_type = random.choice(patterns)
            bits = self._generate_pattern(pattern_type, max_len)
            bits = enforce_parity(bits)

            bit_tensor = torch.tensor(bits, dtype=torch.float32)
            negentropy = compute_negentropy(bit_tensor)
            lz_complexity = compute_lz_complexity(bits)

            sample = {
                "id": f"synthetic_{pattern_type}_{i:06d}",
                "bit_sequence": bits,
                "sequence_length": len([b for b in bits if b != 0]),
                "negentropy": float(negentropy),
                "lz_complexity": float(lz_complexity),
                "pattern_type": pattern_type,
                "has_parity": True,
                "category": "synthetic_pattern",
                # Schema placeholders for fields used by other categories.
                "original_text": None,
                "safety_category": None,
                "target_negentropy": None,
                "target_complexity": None,
                "original_id": None,
                "compression_ratio": None,
                "original_length": None
            }
            samples.append(sample)

        return samples

    def generate_safety_benchmarks(self, num_samples: int = 2000) -> List[Dict]:
        """Generate sequences specifically for safety metric validation."""
        samples = []

        # Target (negentropy, complexity) pairs spanning the safety spectrum.
        safety_targets = [
            ("low_entropy", {"target_negentropy": 0.05, "target_complexity": 0.2}),
            ("medium_entropy", {"target_negentropy": 0.5, "target_complexity": 0.5}),
            ("high_entropy", {"target_negentropy": 0.95, "target_complexity": 0.8}),
            ("edge_cases", {"target_negentropy": 0.99, "target_complexity": 0.99}),
        ]

        samples_per_target = num_samples // len(safety_targets)

        for safety_type, targets in safety_targets:
            for i in range(samples_per_target):
                bits = self._generate_safety_controlled_sequence(
                    targets["target_negentropy"],
                    targets["target_complexity"]
                )
                bits = enforce_parity(bits)

                # Record achieved metrics alongside the targets so validation
                # can measure how far each sample landed from its goal.
                bit_tensor = torch.tensor(bits, dtype=torch.float32)
                actual_negentropy = compute_negentropy(bit_tensor)
                actual_complexity = compute_lz_complexity(bits)

                sample = {
                    "id": f"safety_{safety_type}_{i:06d}",
                    "bit_sequence": bits,
                    "sequence_length": len(bits),
                    "negentropy": float(actual_negentropy),
                    "lz_complexity": float(actual_complexity),
                    "target_negentropy": targets["target_negentropy"],
                    "target_complexity": targets["target_complexity"],
                    "safety_category": safety_type,
                    "has_parity": True,
                    "category": "safety_benchmark",
                    # Schema placeholders for fields used by other categories.
                    "original_text": None,
                    "pattern_type": None,
                    "original_id": None,
                    "compression_ratio": None,
                    "original_length": None
                }
                samples.append(sample)

        return samples

    def generate_compression_variants(self, base_samples: List[Dict],
                                      compression_ratios: List[float] = [0.5, 0.7, 0.9]) -> List[Dict]:
        """Generate compressed variants of base sequences."""
        compressed_samples = []

        for ratio in compression_ratios:
            for sample in base_samples[:1000]:
                try:
                    original_bits = sample["bit_sequence"]

                    # compress_bits applies its own encoding; the ratio here is
                    # recorded as metadata rather than enforced on the output.
                    bits_tensor = torch.tensor(original_bits, dtype=torch.uint8)
                    compressed_tensor = compress_bits(bits_tensor)
                    compressed_bits = compressed_tensor.tolist()
                    compressed_bits = enforce_parity(compressed_bits)

                    bit_tensor = torch.tensor(compressed_bits, dtype=torch.float32)
                    negentropy = compute_negentropy(bit_tensor)
                    lz_complexity = compute_lz_complexity(compressed_bits)

                    compressed_sample = {
                        "id": f"{sample['id']}_compressed_{ratio}",
                        "original_id": sample["id"],
                        "bit_sequence": compressed_bits,
                        "sequence_length": len(compressed_bits),
                        "negentropy": float(negentropy),
                        "lz_complexity": float(lz_complexity),
                        "compression_ratio": ratio,
                        "original_length": len(original_bits),
                        "has_parity": True,
                        "category": "compressed_variant",
                        # Schema placeholders for fields used by other categories.
                        "original_text": None,
                        "pattern_type": None,
                        "safety_category": None,
                        "target_negentropy": None,
                        "target_complexity": None
                    }
                    compressed_samples.append(compressed_sample)

                except Exception:
                    # Skip samples that fail to compress cleanly.
                    continue

        return compressed_samples

    def _generate_pattern(self, pattern_type: str, length: int) -> List[int]:
        """Generate specific bit patterns."""
        if pattern_type == "alternating":
            return [i % 2 for i in range(length)]

        elif pattern_type == "blocks":
            # Runs of a random fixed width, toggling 0/1 each block.
            block_size = random.randint(3, 8)
            pattern = []
            current_bit = 0
            for i in range(length):
                if i % block_size == 0:
                    current_bit = 1 - current_bit
                pattern.append(current_bit)
            return pattern

        elif pattern_type == "fibonacci":
            # Fibonacci sequence reduced mod 2.
            fib = [0, 1]
            while len(fib) < length:
                fib.append((fib[-1] + fib[-2]) % 2)
            return fib[:length]

        elif pattern_type == "prime_based":
            # 1 where the (1-indexed) position is divisible by a small prime.
            primes = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31]
            pattern = []
            for i in range(length):
                is_prime_related = any((i + 1) % p == 0 for p in primes[:5])
                pattern.append(1 if is_prime_related else 0)
            return pattern

        elif pattern_type == "random_walk":
            # Sticky random walk: 70% chance to repeat the previous bit.
            pattern = [random.randint(0, 1)]
            for i in range(1, length):
                if random.random() < 0.7:
                    pattern.append(pattern[-1])
                else:
                    pattern.append(1 - pattern[-1])
            return pattern

        else:
            # Fallback (covers "spiral" and "fractal"): uniform random bits.
            return [random.randint(0, 1) for _ in range(length)]
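
    # Deterministic examples (illustrative):
    #   _generate_pattern("alternating", 6)  # -> [0, 1, 0, 1, 0, 1]
    #   _generate_pattern("fibonacci", 6)    # -> [0, 1, 1, 0, 1, 1]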

    def _generate_safety_controlled_sequence(self, target_negentropy: float,
                                             target_complexity: float, length: int = 256) -> List[int]:
        """Generate a bit sequence targeting specific safety metrics."""
        # Pick a base pattern whose order roughly matches the negentropy target.
        if target_negentropy < 0.3:
            base_pattern = [0] * (length // 2) + [1] * (length // 2)
        elif target_negentropy > 0.7:
            base_pattern = [random.randint(0, 1) for _ in range(length)]
        else:
            block_size = max(1, int(10 * (1 - target_complexity)))
            base_pattern = []
            current = 0
            for i in range(length):
                if i % block_size == 0:
                    current = random.randint(0, 1)
                base_pattern.append(current)

        # Flip bits with probability tied to the complexity target.
        noise_level = max(0.1, target_complexity)
        final_pattern = []
        for bit in base_pattern:
            if random.random() < noise_level:
                final_pattern.append(1 - bit)
            else:
                final_pattern.append(bit)

        return final_pattern

    def build_complete_dataset(self, source_texts: Optional[List[str]] = None) -> DatasetDict:
        """Build the complete BitTransformerLM dataset."""
        print("🚀 Building BitTransformerLM Dataset...")

        if source_texts is None:
            source_texts = self._get_default_texts()

        all_samples = []

        print("📝 Generating text-to-bits samples...")
        text_samples = self.generate_text_to_bits_data(source_texts[:10000])
        all_samples.extend(text_samples)

        print("🎨 Generating synthetic patterns...")
        synthetic_samples = self.generate_synthetic_patterns(7500)
        all_samples.extend(synthetic_samples)

        print("🛡️ Generating safety benchmarks...")
        safety_samples = self.generate_safety_benchmarks(5000)
        all_samples.extend(safety_samples)

        print("🗜️ Generating compression variants...")
        compression_samples = self.generate_compression_variants(text_samples[:1000])
        all_samples.extend(compression_samples)

        # Shuffle, then split 80/10/10 into train/validation/test.
        random.shuffle(all_samples)

        total = len(all_samples)
        train_split = int(0.8 * total)
        val_split = int(0.9 * total)

        train_data = all_samples[:train_split]
        val_data = all_samples[train_split:val_split]
        test_data = all_samples[val_split:]

        dataset_dict = DatasetDict({
            'train': Dataset.from_list(train_data),
            'validation': Dataset.from_list(val_data),
            'test': Dataset.from_list(test_data)
        })

        print(f"✅ Dataset built: {len(train_data)} train, {len(val_data)} val, {len(test_data)} test")
        return dataset_dict
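
    # Rough sample budget with the defaults (assuming no conversion failures):
    # ~522 text samples, 7500 synthetic, 5000 safety, and ~1566 compressed
    # variants, i.e. roughly 14.6k samples before the 80/10/10 split.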

    def _get_default_texts(self) -> List[str]:
        """Get the default text corpus for bit conversion."""
        texts = [
            "The quick brown fox jumps over the lazy dog.",
            "In the beginning was the Word, and the Word was with God.",
            "To be or not to be, that is the question.",
            "I think, therefore I am.",
            "The only thing we have to fear is fear itself.",
            "Ask not what your country can do for you.",
            "E = mc²",
            "The mitochondria is the powerhouse of the cell.",
            "SELECT * FROM users WHERE active = 1;",
            "def fibonacci(n): return n if n < 2 else fibonacci(n-1) + fibonacci(n-2)",
            "Binary trees are hierarchical data structures.",
            "The entropy of a system tends to increase over time.",
        ]

        # Expand the seed corpus by concatenating random combinations.
        expanded_texts = texts.copy()
        for i in range(500):
            combined = " ".join(random.sample(texts, random.randint(2, 4)))
            expanded_texts.append(combined)

            # Sprinkle in a few prefixed single-sentence variants.
            if i % 50 == 0:
                expanded_texts.append(f"Sample {i}: " + random.choice(texts))

        return expanded_texts

    def upload_to_huggingface(self, dataset: DatasetDict,
                              private: bool = True) -> str:
        """Upload dataset to HuggingFace Hub."""
        print(f"🌐 Uploading to HuggingFace: {self.repo_id}")

        try:
            create_repo(
                repo_id=self.repo_id,
                repo_type="dataset",
                private=private,
                exist_ok=True,
                token=self.hf_token
            )

            # Companion metadata file describing the dataset layout.
            dataset_info = {
                "dataset_info": self.config,
                "splits": {
                    "train": len(dataset["train"]),
                    "validation": len(dataset["validation"]),
                    "test": len(dataset["test"])
                },
                "features": {
                    "id": "string",
                    "bit_sequence": "list of integers (0/1)",
                    "sequence_length": "integer",
                    "negentropy": "float",
                    "lz_complexity": "float",
                    "category": "string",
                    "has_parity": "boolean"
                },
                "usage_notes": [
                    "Optimized for BitTransformerLM bit-native training",
                    "All sequences include parity protection",
                    "Safety metrics (K/C/S) computed for each sample",
                    "Supports progressive curriculum learning"
                ]
            }

            dataset.push_to_hub(
                repo_id=self.repo_id,
                token=self.hf_token,
                private=private
            )

            # Write metadata to a temp file, flushing before upload so the
            # contents are on disk when the API reads them.
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
                json.dump(dataset_info, f, indent=2)
                f.flush()
                self.api.upload_file(
                    path_or_fileobj=f.name,
                    path_in_repo="dataset_info.json",
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    token=self.hf_token
                )
            os.unlink(f.name)

            print(f"✅ Dataset uploaded successfully to: https://huggingface.co/datasets/{self.repo_id}")
            return f"https://huggingface.co/datasets/{self.repo_id}"

        except Exception as e:
            print(f"❌ Upload failed: {e}")
            raise


def create_bittransformerlm_dataset(hf_token: str,
                                    repo_id: str = "BitTransformerLM",
                                    source_texts: Optional[List[str]] = None) -> str:
    """
    Convenience function to create and upload the BitTransformerLM dataset.

    Args:
        hf_token: HuggingFace access token
        repo_id: Dataset repository ID
        source_texts: Optional list of source texts for conversion

    Returns:
        URL of the uploaded dataset
    """
    builder = BitTransformerDatasetBuilder(hf_token, repo_id)
    dataset = builder.build_complete_dataset(source_texts)
    return builder.upload_to_huggingface(dataset, private=True)
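

# Minimal usage sketch (assumed entry point, not part of the original module).
# Run as a module (e.g. `python -m <package>.dataset_builder`) so the relative
# imports resolve; HF_TOKEN is an assumed environment variable name.
if __name__ == "__main__":
    token = os.environ.get("HF_TOKEN")
    if not token:
        raise SystemExit("Set HF_TOKEN to run this demo.")
    url = create_bittransformerlm_dataset(token, repo_id="BitTransformerLM")
    print(f"Dataset available at: {url}")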