import torch
import torch.nn as nn

from transformers import PreTrainedModel, PretrainedConfig
from typing import Union

from .config import MoondreamConfig
from .moondream import MoondreamModel

# Star imports re-export the submodules' symbols so that they are available
# when this file is loaded standalone (e.g. via `trust_remote_code`).
from .image_crops import *
from .vision import *
from .text import *
from .region import *
from .utils import *


def extract_question(text):
    """Strip the legacy prompt template, returning the bare question or None."""
    prefix = "<image>\n\nQuestion: "
    suffix = "\n\nAnswer:"

    if text.startswith(prefix) and text.endswith(suffix):
        return text[len(prefix) : -len(suffix)]
    else:
        return None
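
# For example, the legacy template wraps the question in a fixed prefix and
# suffix:
#   extract_question("<image>\n\nQuestion: What color is the cat?\n\nAnswer:")
#   -> "What color is the cat?"
# A prompt that does not match the template yields None.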


class HfConfig(PretrainedConfig):
    _auto_class = "AutoConfig"
    model_type = "moondream1"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.config = {}
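
# The checkpoint's config.json is expected to carry the nested Moondream
# settings under the "config" key, which HfMoondream.__init__ below forwards
# to MoondreamConfig.from_dict. Schematically (field names are illustrative):
#   {"model_type": "moondream1", "config": {"vision": {...}, "text": {...}}}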


class HfMoondream(PreTrainedModel):
    _auto_class = "AutoModelForCausalLM"
    config_class = HfConfig

    def __init__(self, config):
        super().__init__(config)
        self.model = MoondreamModel(
            MoondreamConfig.from_dict(config.config), setup_caches=False
        )
        self._is_kv_cache_setup = False

    def _setup_caches(self):
        # Build the KV caches lazily, once, on first use of the public API.
        if not self._is_kv_cache_setup:
            self.model._setup_caches()
            self._is_kv_cache_setup = True

    @property
    def encode_image(self):
        self._setup_caches()
        return self.model.encode_image

    @property
    def query(self):
        self._setup_caches()
        return self.model.query

    @property
    def caption(self):
        self._setup_caches()
        return self.model.caption

    @property
    def detect(self):
        self._setup_caches()
        return self.model.detect

    @property
    def point(self):
        self._setup_caches()
        return self.model.point

    @property
    def detect_gaze(self):
        self._setup_caches()
        return self.model.detect_gaze
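
    # The properties above delegate to MoondreamModel after ensuring the KV
    # caches exist. A typical call pattern (a sketch; `image` is a PIL image
    # and the exact return keys come from MoondreamModel):
    #   encoded = model.encode_image(image)
    #   answer = model.query(encoded, "What is in this image?")["answer"]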

    def answer_question(
        self,
        image_embeds,
        question,
        tokenizer=None,
        chat_history="",
        result_queue=None,
        max_new_tokens=256,
        **kwargs
    ):
        # Legacy entry point: only `image_embeds` and `question` are used; the
        # remaining parameters are accepted for signature compatibility.
        answer = self.query(image_embeds, question)["answer"].strip()

        if result_queue is not None:
            result_queue.put(answer)
        return answer

    def batch_answer(self, images, prompts, tokenizer=None, **kwargs):
        answers = []
        for image, prompt in zip(images, prompts):
            answers.append(self.query(image, prompt)["answer"].strip())
        return answers

    def _unsupported_exception(self):
        raise NotImplementedError(
            "This method is not supported in the latest version of moondream. "
            "Consider upgrading to the updated API spec, or alternately pin "
            "to 'revision=2024-08-26'."
        )

    def generate(self, image_embeds, prompt, tokenizer, max_new_tokens=128, **kwargs):
        """
        Function signature remains unchanged for backwards compatibility.
        Be aware that `tokenizer` and `kwargs` are ignored, and that
        `max_new_tokens` only applies when the prompt does not match the
        legacy question template.
        """
        prompt_extracted = extract_question(prompt)
        if prompt_extracted is not None:
            answer = self.model.query(
                image=image_embeds, question=prompt_extracted, stream=False
            )["answer"]
        else:
            image_embeds = self.encode_image(image_embeds)
            prompt_tokens = torch.tensor(
                [self.model.tokenizer.encode(prompt).ids],
                device=self.device,
            )

            def generator():
                for token in self.model._generate_answer(
                    prompt_tokens,
                    image_embeds.kv_cache,
                    image_embeds.pos,
                    max_new_tokens,
                ):
                    yield token

            answer = "".join(list(generator()))

        return [answer]
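
    # For back-compat, callers receive a single-element list:
    #   answer = model.generate(image, prompt, tokenizer=None)[0]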

    def get_input_embeddings(self) -> nn.Embedding:
        """
        Lazily wrap the raw parameter `self.model.text.wte` in a real
        `nn.Embedding` layer so that HF mix-ins recognise it. The wrapper
        shares the weight tensor; no copy is made.
        """
        if not hasattr(self, "_input_embeddings"):
            self._input_embeddings = nn.Embedding.from_pretrained(
                self.model.text.wte,
                freeze=True,
            )
        return self._input_embeddings
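
    # Because the wrapper shares storage with `model.text.wte`, both views
    # point at the same memory (a quick check on a loaded instance):
    #   emb = model.get_input_embeddings()
    #   assert emb.weight.data_ptr() == model.model.text.wte.data_ptr()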

    def set_input_embeddings(self, value: Union[nn.Embedding, nn.Module]) -> None:
        """
        Lets HF functions (e.g. `resize_token_embeddings`) replace or resize
        the embeddings and keeps everything tied to `self.model.text.wte`.
        """
        self.model.text.wte = value.weight
        self._input_embeddings = value
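
    # HF mix-ins drive resizing through the two accessors above; schematically
    # (assuming the rest of the HF plumbing supports this model):
    #   model.resize_token_embeddings(new_vocab_size)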

    def input_embeds(
        self,
        input_ids: Union[torch.LongTensor, list, tuple],
        *,
        device: torch.device | None = None
    ) -> torch.FloatTensor:
        """
        Back-compat wrapper that turns token IDs into embeddings.

        Example:
            ids = torch.tensor([[1, 2, 3]])
            embeds = model.input_embeds(ids)  # (1, 3, hidden_dim)
        """
        if not torch.is_tensor(input_ids):
            input_ids = torch.as_tensor(input_ids)
        if device is not None:
            input_ids = input_ids.to(device)

        return self.get_input_embeddings()(input_ids)
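

# Sketch of end-to-end use through the auto classes registered above (the
# repo id is a placeholder; loading custom code requires trust_remote_code):
#
#   from transformers import AutoModelForCausalLM
#   model = AutoModelForCausalLM.from_pretrained(
#       "<repo-id>", trust_remote_code=True
#   )
#   answer = model.query(image, "Describe this image.")["answer"]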