# MTP-3 / tokenizer.py
# Provenance: uploaded by teszenofficial ("Upload 6 files", commit 563bb6a, verified)
import sentencepiece as spm
import os
import json
class MTPTokenizer:
    """Tokenizer using SentencePiece BPE, tuned for instruction-response corpora.

    Wraps a ``sentencepiece.SentencePieceProcessor`` and can train a BPE model
    from a JSONL corpus whose records carry ``instruction``, ``context`` and
    ``response`` string fields.
    """

    def __init__(self, model_path=None):
        """Create the tokenizer, optionally loading an existing model.

        Args:
            model_path: Path to a trained ``.model`` file. It is loaded only
                if it exists on disk; otherwise the tokenizer starts empty.
        """
        self.sp = None  # SentencePieceProcessor instance once a model is loaded
        self.model_path = model_path
        if model_path and os.path.exists(model_path):
            self.load(model_path)

    def _extract_texts(self, corpus_path):
        """Collect every non-empty text field from a JSONL corpus.

        Malformed JSON lines are skipped, and non-string field values
        (e.g. JSON ``null``) are ignored instead of crashing the run
        (the original code raised AttributeError on non-string values).

        Returns:
            list[str]: stripped, non-empty texts in file order.
        """
        texts = []
        with open(corpus_path, 'r', encoding='utf-8') as f:
            for raw_line in f:
                raw_line = raw_line.strip()
                if not raw_line:
                    continue
                try:
                    data = json.loads(raw_line)
                except json.JSONDecodeError:
                    continue  # tolerate malformed lines, same as before
                for field in ('instruction', 'context', 'response'):
                    value = data.get(field)
                    # Only non-empty string fields contribute training text.
                    if isinstance(value, str) and value.strip():
                        texts.append(value.strip())
        return texts

    def train(self, corpus_path, vocab_size=4000, model_prefix='mtp_tokenizer'):
        """Train a SentencePiece BPE tokenizer on a JSONL corpus.

        Args:
            corpus_path: Path to a JSONL file with ``instruction`` /
                ``context`` / ``response`` fields.
            vocab_size: Requested vocabulary size; may be reduced for small
                corpora (see heuristic below).
            model_prefix: Output prefix; produces ``<prefix>.model`` and
                ``<prefix>.vocab`` and loads the model into ``self``.

        Raises:
            ValueError: If the corpus yields no usable text.
        """
        print(f" → Procesando corpus para entrenar tokenizer...")
        texts = self._extract_texts(corpus_path)
        if not texts:
            raise ValueError("No se encontraron textos válidos en el corpus")

        # Write the corpus to a temporary plain-text file, one text per line.
        temp_file = 'temp_corpus.txt'
        try:
            with open(temp_file, 'w', encoding='utf-8') as f:
                for text in texts:
                    f.write(text + '\n')

            total_chars = sum(len(text) for text in texts)
            # Cap the vocab at ~15% of corpus characters (floor 256) so tiny
            # corpora don't request more pieces than SentencePiece can build.
            max_vocab = min(vocab_size, max(256, int(total_chars * 0.15)))
            print(f" → Corpus stats: {len(texts)} textos, {total_chars} caracteres")
            print(f" → Vocabulario ajustado: {max_vocab} (solicitado: {vocab_size})")

            # Training parameters chosen for Q&A-style text.
            spm.SentencePieceTrainer.train(
                input=temp_file,
                model_prefix=model_prefix,
                vocab_size=max_vocab,
                model_type='bpe',
                pad_id=0,
                unk_id=1,
                bos_id=2,
                eos_id=3,
                character_coverage=1.0,
                normalization_rule_name='identity',  # no normalization: keep text exactly as-is
                num_threads=4,
                split_digits=True,
                allow_whitespace_only_pieces=False,
                byte_fallback=False,
                max_sentencepiece_length=16,
                add_dummy_prefix=False,          # preserve the exact start of each text
                remove_extra_whitespaces=False,  # preserve exact spacing
            )
        finally:
            # Clean up even if training raises (the original leaked the file).
            if os.path.exists(temp_file):
                os.remove(temp_file)

        # Load the freshly trained model into this instance.
        self.model_path = f"{model_prefix}.model"
        self.load(self.model_path)
        print(f"✓ Tokenizer entrenado: {self.vocab_size()} tokens")
        print(f"✓ Modelo guardado: {self.model_path}")

    def load(self, model_path):
        """Load a trained SentencePiece model from ``model_path``."""
        self.sp = spm.SentencePieceProcessor()
        self.sp.load(model_path)
        self.model_path = model_path

    def _require_model(self):
        """Raise ValueError if no model has been trained or loaded yet."""
        if self.sp is None:
            raise ValueError("Tokenizer not loaded. Train or load a model first.")

    def encode(self, text):
        """Encode ``text`` into a list of token IDs.

        Raises:
            ValueError: If no model is loaded.
        """
        self._require_model()
        return self.sp.encode_as_ids(text)

    def decode(self, ids):
        """Decode a list of token IDs back into text.

        Raises:
            ValueError: If no model is loaded.
        """
        self._require_model()
        return self.sp.decode_ids(ids)

    def vocab_size(self):
        """Return the vocabulary size, or 0 when no model is loaded."""
        if self.sp is None:
            return 0
        return self.sp.get_piece_size()

    def bos_id(self):
        """Beginning-of-sentence token ID (raises ValueError if unloaded)."""
        self._require_model()
        return self.sp.bos_id()

    def eos_id(self):
        """End-of-sentence token ID (raises ValueError if unloaded)."""
        self._require_model()
        return self.sp.eos_id()

    def pad_id(self):
        """Padding token ID (raises ValueError if unloaded)."""
        self._require_model()
        return self.sp.pad_id()

    def unk_id(self):
        """Unknown token ID (raises ValueError if unloaded)."""
        self._require_model()
        return self.sp.unk_id()