| import os |
| from transformers import ( |
| PretrainedConfig, |
| PreTrainedModel, |
| AutoConfig, |
| ) |
| from huggingface_hub import snapshot_download |
| from typing import Optional |
| from transformers.utils import is_flash_attn_2_available |
| from .utils import ( |
| get_decoder_model, |
| prepare_tokenizer, |
| splade_max, |
| similarity, |
| encode, |
| ) |
| from peft import PeftModel |
|
|
|
|
class SpladeConfig(PretrainedConfig):
    """Configuration for the SPLADE sparse retrieval model.

    Args:
        model_name_or_path: Hub id or local path of the decoder backbone.
        attn_implementation: Requested attention backend; the model may
            replace it at construction time based on what is available.
        bidirectional: Whether the decoder should use bidirectional attention.
        padding_side: Tokenizer padding side ("left" or "right").
        **kwargs: Forwarded verbatim to ``PretrainedConfig``.
    """

    model_type = "splade"

    def __init__(
        self,
        model_name_or_path: str = "Qwen/Qwen3-0.6B",
        attn_implementation: str = "flash_attention_2",
        bidirectional: bool = True,
        padding_side: str = "left",
        **kwargs,
    ):
        super().__init__(**kwargs)
        # Persist every SPLADE-specific option on the config object.
        for attr_name, attr_value in (
            ("model_name_or_path", model_name_or_path),
            ("attn_implementation", attn_implementation),
            ("bidirectional", bidirectional),
            ("padding_side", padding_side),
        ):
            setattr(self, attr_name, attr_value)
|
|
|
|
class Splade(PreTrainedModel):
    """SPLADE-style sparse lexical model wrapping a decoder backbone.

    Produces vocabulary-sized sparse representations via ``splade_max``
    pooling over the backbone's logits.
    """

    config_class = SpladeConfig

    # Helper callables mixed in from .utils; bound as class attributes so
    # they behave like ordinary methods on instances.
    similarity = similarity
    encode = encode

    def __init__(self, config, weights_path=None, token=None):
        """Build the tokenizer and decoder backbone.

        Args:
            config: A ``SpladeConfig`` instance.
            weights_path: Optional explicit checkpoint path; falls back to
                ``config.model_name_or_path`` when not given.
            token: Optional HF auth token for gated/private checkpoints.
        """
        super().__init__(config)
        self.name = "splade"

        # Resolve the attention implementation BEFORE constructing the base
        # config so that base_cfg and the decoder agree. (Previously the
        # config was built first, so it could embed "flash_attention_2" on
        # machines where flash-attention 2 is unavailable.)
        if is_flash_attn_2_available():
            config.attn_implementation = "flash_attention_2"
        else:
            config.attn_implementation = "sdpa"

        base_cfg = AutoConfig.from_pretrained(
            config.model_name_or_path,
            attn_implementation=config.attn_implementation,
            torch_dtype="auto",
            # Forward the auth token so gated/private models load.
            token=token,
        )

        self.tokenizer = prepare_tokenizer(
            config.model_name_or_path, padding_side=config.padding_side
        )

        # Prefer an explicit weights path (set by from_pretrained) over the
        # configured base checkpoint.
        source = weights_path or config.model_name_or_path

        self.model = get_decoder_model(
            model_name_or_path=source,
            attn_implementation=config.attn_implementation,
            bidirectional=getattr(config, "bidirectional", False),
            base_cfg=base_cfg,
            token=token,
        )

    def save_pretrained(self, save_directory, *args, **kwargs):
        """Save adapter weights under ``<save_directory>/lora`` plus the config.

        Extra positional/keyword arguments are accepted for interface
        compatibility but intentionally not forwarded.
        """
        self.model.save_pretrained(os.path.join(save_directory, "lora"))
        self.config.save_pretrained(save_directory)

    @classmethod
    def from_pretrained(cls, model_name_or_path, *args, **kwargs):
        """Load a saved SPLADE model (config + weights) from a directory/hub id."""
        token = kwargs.get("token", None)

        config = SpladeConfig.from_pretrained(
            model_name_or_path,
            token=token,
        )

        model = cls(config, weights_path=model_name_or_path, token=token)

        # Inverse vocabulary (id -> token string) used when decoding sparse
        # representations back to readable terms.
        model.reverse_voc = {v: k for k, v in model.tokenizer.vocab.items()}
        return model

    def forward(self, **tokens):
        """Run the backbone and pool logits into a sparse SPLADE vector.

        Returns a one-tuple ``(splade_reps,)`` where ``splade_reps`` has
        vocabulary width (see ``get_width``).
        """
        output = self.model(**tokens)
        splade_reps, _ = splade_max(output.logits, tokens["attention_mask"])
        return (splade_reps,)

    def get_width(self):
        """Dimensionality of the sparse representation (vocab size)."""
        return self.model.config.vocab_size

    def create_batch_dict(self, input_texts, max_length):
        """Tokenize a batch of texts into padded PyTorch tensors."""
        return self.tokenizer(
            input_texts,
            add_special_tokens=True,
            padding="longest",
            truncation=True,
            max_length=max_length,
            return_attention_mask=True,
            return_tensors="pt",
        )