Image-Text-to-Text
Transformers
Safetensors
kimi_k25
feature-extraction
compressed-tensors
conversational
custom_code
Instructions to use Pinaster/Kimi-K2.5-5layer with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use Pinaster/Kimi-K2.5-5layer with Transformers:
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("image-text-to-text", model="Pinaster/Kimi-K2.5-5layer", trust_remote_code=True)
messages = [
    {
        "role": "user",
        "content": [
            {"type": "image", "url": "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/p-blog/candy.JPG"},
            {"type": "text", "text": "What animal is on the candy?"}
        ]
    },
]
pipe(text=messages)

# Load model directly
from transformers import AutoModel

model = AutoModel.from_pretrained("Pinaster/Kimi-K2.5-5layer", trust_remote_code=True, dtype="auto")
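To control generation length and inspect the result, the pipeline call above can be extended as below. This is a minimal sketch that reuses `pipe` and `messages` from the snippet above; the exact output structure (including the "generated_text" field) can differ between Transformers versions, so treat the indexing as an assumption to verify locally.

# Sketch: reuse `pipe` and `messages` from the snippet above with explicit
# generation settings. The output is typically a list of dicts; the
# "generated_text" key is an assumption to check against your Transformers version.
outputs = pipe(text=messages, max_new_tokens=64)
print(outputs)                       # full pipeline output
print(outputs[0]["generated_text"])  # decoded answer, if present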
- Notebooks
- Google Colab
- Kaggle
- Local Apps
- vLLM
How to use Pinaster/Kimi-K2.5-5layer with vLLM:
Install from pip and serve the model
# Install vLLM from pip:
pip install vllm

# Start the vLLM server:
vllm serve "Pinaster/Kimi-K2.5-5layer"

# Call the server using curl (OpenAI-compatible API):
curl -X POST "http://localhost:8000/v1/chat/completions" \
  -H "Content-Type: application/json" \
  --data '{
    "model": "Pinaster/Kimi-K2.5-5layer",
    "messages": [
      {
        "role": "user",
        "content": [
          {
            "type": "text",
            "text": "Describe this image in one sentence."
          },
          {
            "type": "image_url",
            "image_url": {
              "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg"
            }
          }
        ]
      }
    ]
  }'
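The same endpoint can also be called from Python. A minimal sketch using the OpenAI client, assuming the vLLM server started above is listening on localhost:8000 (the api_key value is a placeholder; vLLM only checks it if you configure an API key):

from openai import OpenAI

# Point the OpenAI-compatible client at the local vLLM server started above.
client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")  # api_key is a placeholder

response = client.chat.completions.create(
    model="Pinaster/Kimi-K2.5-5layer",
    messages=[
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Describe this image in one sentence."},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg"
                    },
                },
            ],
        }
    ],
)
print(response.choices[0].message.content)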
Use Docker

docker model run hf.co/Pinaster/Kimi-K2.5-5layer
- SGLang
How to use Pinaster/Kimi-K2.5-5layer with SGLang:
Install from pip and serve the model
# Install SGLang from pip:
pip install sglang

# Start the SGLang server:
python3 -m sglang.launch_server \
  --model-path "Pinaster/Kimi-K2.5-5layer" \
  --host 0.0.0.0 \
  --port 30000

# Call the server using curl (OpenAI-compatible API):
curl -X POST "http://localhost:30000/v1/chat/completions" \
  -H "Content-Type: application/json" \
  --data '{
    "model": "Pinaster/Kimi-K2.5-5layer",
    "messages": [
      {
        "role": "user",
        "content": [
          {
            "type": "text",
            "text": "Describe this image in one sentence."
          },
          {
            "type": "image_url",
            "image_url": {
              "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg"
            }
          }
        ]
      }
    ]
  }'
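The curl request above maps one-to-one onto a Python call. A minimal sketch with the requests library, assuming the SGLang server from the previous step is running on port 30000:

import requests

# Same OpenAI-compatible payload as the curl example above.
payload = {
    "model": "Pinaster/Kimi-K2.5-5layer",
    "messages": [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Describe this image in one sentence."},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg"
                    },
                },
            ],
        }
    ],
}

resp = requests.post("http://localhost:30000/v1/chat/completions", json=payload, timeout=300)
resp.raise_for_status()
print(resp.json()["choices"][0]["message"]["content"])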
Use Docker images

docker run --gpus all \
  --shm-size 32g \
  -p 30000:30000 \
  -v ~/.cache/huggingface:/root/.cache/huggingface \
  --env "HF_TOKEN=<secret>" \
  --ipc=host \
  lmsysorg/sglang:latest \
  python3 -m sglang.launch_server \
    --model-path "Pinaster/Kimi-K2.5-5layer" \
    --host 0.0.0.0 \
    --port 30000

# Call the server using curl (OpenAI-compatible API):
curl -X POST "http://localhost:30000/v1/chat/completions" \
  -H "Content-Type: application/json" \
  --data '{
    "model": "Pinaster/Kimi-K2.5-5layer",
    "messages": [
      {
        "role": "user",
        "content": [
          {
            "type": "text",
            "text": "Describe this image in one sentence."
          },
          {
            "type": "image_url",
            "image_url": {
              "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg"
            }
          }
        ]
      }
    ]
  }'
- Docker Model Runner

How to use Pinaster/Kimi-K2.5-5layer with Docker Model Runner:
docker model run hf.co/Pinaster/Kimi-K2.5-5layer
| """Image processor class for Kimi-K2.5. | |
| """ | |
| import json | |
| from typing import Any, Dict, Optional, Union | |
| import numpy as np | |
| import torch | |
| from PIL import Image | |
| from transformers.image_processing_utils import (BaseImageProcessor, | |
| BatchFeature) | |
| from transformers.utils import TensorType | |
| from .media_utils import (MediaInput, VideoChunkInput, _to_tensor, | |
| ensure_media_type, get_video_meta, image_to_np, | |
| navit_patchify, navit_resize_image, | |
| navit_resize_video, normalize, | |
| real_sample_fps_and_max_num_frames, timestamp_as_str) | |
| try: | |
| from mecord import VideoReader | |
| except ImportError: | |
| VideoReader = None | |
| def resampling(video_bytes: bytes, | |
| sample_indices: list[int], | |
| key_indices=None, | |
| frame_time_info=None, | |
| num_threads=4) -> str: | |
| video = VideoReader(video_bytes, | |
| num_threads=num_threads, | |
| frame_time_info=frame_time_info, | |
| key_indices=key_indices) | |
| # extract target frames | |
| frames = video[sample_indices] | |
| frames = [Image.fromarray(frame) for frame in frames] | |
| return frames | |
| class KimiK25VisionProcessor(BaseImageProcessor): | |
| model_type = "kimi_k25" | |
| def __init__( | |
| self, | |
| media_proc_cfg: dict, | |
| **kwargs, | |
| ): | |
| super().__init__(**kwargs) | |
| self.media_proc_cfg = media_proc_cfg | |
| self.num_frames_per_chunk = media_proc_cfg[ | |
| 'temporal_merge_kernel_size'] | |
| def media_tokens_calculator(self, media: MediaInput): | |
| media = ensure_media_type(media) | |
| ret = self.get_resize_config(media) | |
| return ret['num_tokens'] | |
| def make_chunk_prompt(cls, timestamp_text: str) -> str: | |
| return f"{timestamp_text}<|media_begin|>video<|media_content|><|media_pad|><|media_end|>" | |
| def split_video_chunks(self, | |
| video_url: str | bytes) -> list[list[Image.Image]]: | |
| # video_url should be base64 str or bytes | |
| video_spec = get_video_meta(video_url) | |
| sample_fps = min(self.media_proc_cfg['sample_fps'], video_spec.fps) | |
| sampled_nframes = max( | |
| round(video_spec.num_frames * sample_fps / video_spec.fps), 1) | |
| frame_inds = np.linspace(0, video_spec.num_frames - 1, | |
| sampled_nframes).round().astype(int) | |
| frame_inds = frame_inds.tolist() | |
| sampled_frame_ids = [] | |
| temporal_merge_kernel_size = self.media_proc_cfg[ | |
| "temporal_merge_kernel_size"] | |
| num_chunks = 0 | |
| chunk_timestamp = [] | |
| for i in range(0, len(frame_inds), temporal_merge_kernel_size): | |
| sampled_frame_ids.extend(frame_inds[i:i + | |
| temporal_merge_kernel_size]) | |
| start_time = frame_inds[i] / float(video_spec.fps) | |
| timestamp_text = timestamp_as_str( | |
| start_time, self.media_proc_cfg["timestamp_mode"]) | |
| chunk_timestamp.append(timestamp_text) | |
| num_chunks += 1 | |
| sampled_frames = resampling(video_url, sampled_frame_ids) | |
| chunks = [] | |
| for chunk_id in range(num_chunks): | |
| chunk = sampled_frames[chunk_id * | |
| temporal_merge_kernel_size:(chunk_id + 1) * | |
| temporal_merge_kernel_size] | |
| chunks.append( | |
| VideoChunkInput(type="video_chunk", | |
| video_chunk=chunk, | |
| prompt=self.make_chunk_prompt( | |
| chunk_timestamp[chunk_id]))) | |
| return chunks | |
| def get_resize_config(self, media_input: MediaInput) -> dict: | |
| if media_input['type'] == 'image': | |
| w, h = media_input['image'].size | |
| ret = navit_resize_image( | |
| w, h, self.media_proc_cfg['patch_size'], | |
| self.media_proc_cfg['merge_kernel_size'], | |
| self.media_proc_cfg['in_patch_limit'], | |
| self.media_proc_cfg['patch_limit_on_one_side'], | |
| self.media_proc_cfg['fixed_output_tokens']) | |
| return ret | |
| elif media_input['type'] == 'video_chunk': | |
| frame = media_input['video_chunk'][0] | |
| width, height = frame.size | |
| num_frames = len(media_input["video_chunk"]) | |
| fps = 1.0 | |
| sample_fps, max_num_frames_each_video = real_sample_fps_and_max_num_frames( | |
| media_input["type"], | |
| self.media_proc_cfg['sample_fps'], | |
| self.media_proc_cfg['max_num_frames_each_video'], | |
| ) | |
| in_patch_limit_each_frame = self.media_proc_cfg[ | |
| 'in_patch_limit_each_frame'] | |
| if in_patch_limit_each_frame is None: | |
| in_patch_limit_each_frame = self.media_proc_cfg[ | |
| 'in_patch_limit'] | |
| ret = navit_resize_video( | |
| width, | |
| height, | |
| num_frames, | |
| fps, | |
| sample_fps, | |
| self.media_proc_cfg['patch_size'], | |
| self.media_proc_cfg['merge_kernel_size'], | |
| in_patch_limit_each_frame, | |
| self.media_proc_cfg['patch_limit_on_one_side'], | |
| self.media_proc_cfg['in_patch_limit_video'], | |
| max_num_frames_each_video, | |
| self.media_proc_cfg['fixed_output_tokens'], | |
| ) | |
| return ret | |
| else: | |
| raise ValueError("Unsupported type: {}".format( | |
| media_input['type'])) | |
| def resize_image(self, image: Image.Image, new_width: int, new_height: int, | |
| pad_width: int, pad_height: int) -> np.ndarray: | |
| image_np = image_to_np(image, (new_width, new_height), "resize") | |
| image_np = np.pad( | |
| image_np, | |
| ((0, pad_height), (0, pad_width), (0, 0)), | |
| mode="constant", | |
| constant_values=0, | |
| ) | |
| return image_np | |
| def preprocess( | |
| self, | |
| medias: list[MediaInput], | |
| return_tensors: Optional[Union[str, TensorType]] = None, | |
| ) -> BatchFeature: | |
| """ | |
| Preprocess a atom vision input (images/video_chunk) into model-ready tensors. | |
| Args: | |
| medias: List of MediaInput. | |
| return_tensors: Desired output format ('pt', 'np', 'tf', or None). | |
| Returns: | |
| BatchFeature containing 'pixel_values' and 'grid_thws' tensors. | |
| """ | |
| if not isinstance(medias, list): | |
| medias = [medias] | |
| if medias: | |
| pixel_values = [] | |
| for item in medias: | |
| item = ensure_media_type(item) | |
| resize_config = self.get_resize_config(item) | |
| new_width, new_height, pad_width, pad_height = resize_config[ | |
| 'new_width'], resize_config['new_height'], resize_config[ | |
| 'pad_width'], resize_config['pad_height'] | |
| if item['type'] == 'image': | |
| image = item['image'] | |
| image_np = self.resize_image(image, new_width, new_height, | |
| pad_width, pad_height) | |
| pixel_values.append(np.expand_dims(image_np, axis=0)) | |
| elif item['type'] == 'video_chunk': | |
| pixels = [] | |
| for frame in item['video_chunk']: | |
| frame_np = self.resize_image(frame, new_width, | |
| new_height, pad_width, | |
| pad_height) | |
| pixels.append(frame_np) | |
| pixel_values.append(np.stack(pixels, axis=0)) | |
| else: | |
| raise ValueError("Unsupported type: {}".format( | |
| item['type'])) | |
| normalized_pixel_values = [] | |
| image_std_inv = 1.0 / np.array(self.media_proc_cfg['image_std']) | |
| image_mean = np.array(self.media_proc_cfg['image_mean']) | |
| for pixels in pixel_values: | |
| pixels = normalize(pixels, image_mean, image_std_inv) | |
| pixels_and_thw = navit_patchify( | |
| pixels, | |
| self.media_proc_cfg['patch_size'], | |
| ) | |
| normalized_pixel_values.append(pixels_and_thw) | |
| pixel_values = torch.cat([ | |
| _to_tensor(pixel_value['pixel_values']) | |
| for pixel_value in normalized_pixel_values | |
| ]) | |
| grid_thws = torch.cat([ | |
| _to_tensor(pixel_value['grid_thw'], | |
| dtype=torch.int64).unsqueeze(0) | |
| for pixel_value in normalized_pixel_values | |
| ]) | |
| data = { | |
| 'pixel_values': pixel_values, | |
| 'grid_thws': grid_thws, | |
| } | |
| else: | |
| data = {} | |
| return BatchFeature(data=data, tensor_type=return_tensors) | |
| def __repr__(self): | |
| return f"KimiK25VisionProcessor(media_proc_cfg={self.media_proc_cfg})" | |
| def to_dict(self) -> Dict[str, Any]: | |
| output = super().to_dict() | |
| output["media_proc_cfg"] = self.media_proc_cfg | |
| if "media_processor" in output: | |
| del output["media_processor"] | |
| return output | |
| def from_dict(cls, config_dict: Dict[str, Any], **kwargs): | |
| config = config_dict.copy() | |
| media_proc_cfg = config.pop("media_proc_cfg", {}) | |
| return cls(media_proc_cfg=media_proc_cfg, **config, **kwargs) | |
| def to_json_string(self): | |
| dictionary = self.to_dict() | |
| for key, value in dictionary.items(): | |
| if hasattr(value, 'tolist'): | |
| dictionary[key] = value.tolist() | |
| return json.dumps(dictionary, indent=2, sort_keys=True) + "\n" | |
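For reference, a minimal sketch of driving this processor directly on a single image. Everything marked as a placeholder is an assumption: the media_proc_cfg values below are illustrative only (the real settings ship in the repository's preprocessor config), and the dict-style media input with "type" and "image" keys is inferred from get_resize_config/preprocess. In normal use the processor is loaded for you via the pipeline or AutoProcessor with trust_remote_code=True.

# Illustrative usage only; config values are placeholders, not the model's real settings.
from PIL import Image

media_proc_cfg = {
    "patch_size": 14,                 # placeholder
    "merge_kernel_size": 2,           # placeholder
    "temporal_merge_kernel_size": 4,  # placeholder
    "in_patch_limit": 4096,           # placeholder
    "in_patch_limit_each_frame": None,
    "in_patch_limit_video": 16384,    # placeholder
    "patch_limit_on_one_side": 512,   # placeholder
    "fixed_output_tokens": None,
    "sample_fps": 2,                  # placeholder
    "max_num_frames_each_video": 64,  # placeholder
    "timestamp_mode": "sec",          # assumed mode name
    "image_mean": [0.5, 0.5, 0.5],    # placeholder
    "image_std": [0.5, 0.5, 0.5],     # placeholder
}

processor = KimiK25VisionProcessor(media_proc_cfg=media_proc_cfg)

# The {"type": "image", "image": <PIL.Image>} shape mirrors what
# get_resize_config/preprocess read; ensure_media_type may accept other forms too.
image = Image.open("example.jpg").convert("RGB")
features = processor.preprocess([{"type": "image", "image": image}], return_tensors="pt")
print(features["pixel_values"].shape, features["grid_thws"])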