import json

import numpy as np
import triton_python_backend_utils as pb_utils
from transformers import AutoTokenizer


class TritonPythonModel:
    """Your Python model must use the same class name. Every Python model
    that is created must have "TritonPythonModel" as the class name.
    """

    def initialize(self, args):
        """`initialize` is called only once when the model is being loaded.
        Implementing the `initialize` function is optional. It allows the
        model to initialize any state associated with this model.

        Parameters
        ----------
        args : dict
          Both keys and values are strings. The dictionary keys and values are:
          * model_config: A JSON string containing the model configuration
          * model_instance_kind: A string containing the model instance kind
          * model_instance_device_id: A string containing the model instance device ID
          * model_repository: Model repository path
          * model_version: Model version
          * model_name: Model name
        """
        model_config = json.loads(args['model_config'])
        tokenizer_dir = model_config['parameters']['tokenizer_dir'][
            'string_value']
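
        # 'tokenizer_dir' comes from the model's config.pbtxt. A minimal,
        # illustrative stanza (the path below is a hypothetical example):
        #   parameters {
        #     key: "tokenizer_dir"
        #     value: { string_value: "/models/tokenizer" }
        #   }

        # Parse the optional 'skip_special_tokens' parameter; default to
        # True when it is missing or not a recognizable boolean.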
        skip_special_tokens = model_config['parameters'].get(
            'skip_special_tokens')
        if skip_special_tokens is not None:
            skip_special_tokens_str = skip_special_tokens[
                'string_value'].lower()
            if skip_special_tokens_str in [
                    'true', 'false', '1', '0', 't', 'f', 'y', 'n', 'yes', 'no'
            ]:
                self.skip_special_tokens = skip_special_tokens_str in [
                    'true', '1', 't', 'y', 'yes'
                ]
            else:
                print(
                    f"[TensorRT-LLM][WARNING] Invalid value for 'skip_special_tokens' (got '{skip_special_tokens['string_value']}'). Defaulting to True."
                )
                self.skip_special_tokens = True
        else:
            print(
                "[TensorRT-LLM][WARNING] 'skip_special_tokens' is not set. Defaulting to True."
            )
            self.skip_special_tokens = True

        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_dir,
                                                       legacy=False,
                                                       padding_side='left',
                                                       trust_remote_code=True)
        if not self.tokenizer.pad_token:
            self.tokenizer.pad_token = self.tokenizer.eos_token
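
        # Parse the model output configuration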
        output_config = pb_utils.get_output_config_by_name(
            model_config, "OUTPUT")
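
        # Convert the Triton type string to the corresponding numpy dtype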
        self.output_dtype = pb_utils.triton_string_to_numpy(
            output_config['data_type'])

    def execute(self, requests):
        """`execute` must be implemented in every Python model. The `execute`
        function receives a list of pb_utils.InferenceRequest as its only
        argument. This function is called when an inference is requested
        for this model. Depending on the batching configuration (e.g. dynamic
        batching), `requests` may contain multiple requests. Every Python
        model must create one pb_utils.InferenceResponse for every
        pb_utils.InferenceRequest in `requests`. If there is an error, you can
        set the error argument when creating a pb_utils.InferenceResponse.

        Parameters
        ----------
        requests : list
          A list of pb_utils.InferenceRequest

        Returns
        -------
        list
          A list of pb_utils.InferenceResponse. The length of this list must
          be the same as `requests`.
        """
        responses = []
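
        # Iterate over every request and create one
        # pb_utils.InferenceResponse for each of them.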
        for request in requests:
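            # Get input tensors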
            tokens_batch = pb_utils.get_input_tensor_by_name(
                request, 'TOKENS_BATCH').as_numpy()
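
            # Valid token count per beam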
            sequence_lengths = pb_utils.get_input_tensor_by_name(
                request, 'SEQUENCE_LENGTH').as_numpy()
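
            # Optional: cumulative log probabilities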
            cum_log_probs = pb_utils.get_input_tensor_by_name(
                request, 'CUM_LOG_PROBS')
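
            # Optional: per-token output log probabilities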
            output_log_probs = pb_utils.get_input_tensor_by_name(
                request, 'OUTPUT_LOG_PROBS')
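
            # Optional: context logits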
            context_logits = pb_utils.get_input_tensor_by_name(
                request, 'CONTEXT_LOGITS')
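
            # Optional: generation logits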
            generation_logits = pb_utils.get_input_tensor_by_name(
                request, 'GENERATION_LOGITS')
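
            # Postprocess the output token IDs into strings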
            outputs = self._postprocessing(tokens_batch, sequence_lengths)
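
            # Create output tensors. pb_utils.Tensor objects are required
            # to create a pb_utils.InferenceResponse.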
            output_tensor = pb_utils.Tensor(
                'OUTPUT',
                np.array(outputs).astype(self.output_dtype))

            output_tensors = [output_tensor]
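
            # Forward each optional tensor when present; otherwise emit a
            # zero-filled placeholder so the output signature stays fixed.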
            if cum_log_probs is not None:
                out_cum_log_probs = pb_utils.Tensor('OUT_CUM_LOG_PROBS',
                                                    cum_log_probs.as_numpy())
            else:
                out_cum_log_probs = pb_utils.Tensor(
                    'OUT_CUM_LOG_PROBS', np.array([[0.0]], dtype=np.float32))
            output_tensors.append(out_cum_log_probs)

            if output_log_probs is not None:
                out_output_log_probs = pb_utils.Tensor(
                    'OUT_OUTPUT_LOG_PROBS', output_log_probs.as_numpy())
            else:
                out_output_log_probs = pb_utils.Tensor(
                    'OUT_OUTPUT_LOG_PROBS',
                    np.array([[[0.0]]], dtype=np.float32))
            output_tensors.append(out_output_log_probs)

            if context_logits is not None:
                out_context_logits = pb_utils.Tensor(
                    'OUT_CONTEXT_LOGITS', context_logits.as_numpy())
            else:
                out_context_logits = pb_utils.Tensor(
                    'OUT_CONTEXT_LOGITS',
                    np.array([[[0.0]]], dtype=np.float32))
            output_tensors.append(out_context_logits)

            if generation_logits is not None:
                out_generation_logits = pb_utils.Tensor(
                    'OUT_GENERATION_LOGITS', generation_logits.as_numpy())
            else:
                out_generation_logits = pb_utils.Tensor(
                    'OUT_GENERATION_LOGITS',
                    np.array([[[[0.0]]]], dtype=np.float32))
            output_tensors.append(out_generation_logits)
            inference_response = pb_utils.InferenceResponse(
                output_tensors=output_tensors)
            responses.append(inference_response)
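
        # One response per request, in the same order.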
        return responses

    def finalize(self):
        """`finalize` is called only once when the model is being unloaded.
        Implementing the `finalize` function is optional. It allows the model
        to perform any necessary clean-ups before exit.
        """
        print('Cleaning up...')

    def _postprocessing(self, tokens_batch, sequence_lengths):
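        """Decode each beam of each batch entry into a UTF-8 string.

        Assumes `tokens_batch` has shape [batch_size, beam_width, seq_len]
        and `sequence_lengths` has shape [batch_size, beam_width].
        """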
        outputs = []
        for batch_idx, beam_tokens in enumerate(tokens_batch):
            for beam_idx, tokens in enumerate(beam_tokens):
                seq_len = sequence_lengths[batch_idx][beam_idx]
                output = self.tokenizer.decode(
                    tokens[:seq_len],
                    skip_special_tokens=self.skip_special_tokens)
                outputs.append(output.encode('utf8'))
        return outputs