|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| from onnxruntime.quantization import QuantType, CalibrationMethod
|
| import keras
|
| import onnx
|
| from onnx import numpy_helper
|
| import os
|
| import numpy as np
|
| from omegaconf import DictConfig
|
| import tensorflow
|
|
|
# Maps the bit-width names accepted in the configuration (e.g. "Int8") to the
# matching onnxruntime QuantType member (e.g. QuantType.QInt8).
dict_types = {
    type_name: getattr(QuantType, "Q" + type_name)
    for type_name in ("Int16", "UInt16", "Int8", "UInt8", "Int4", "UInt4")
}
|
|
|
|
|
def get_weights_activations_quant_type(cfg: DictConfig):
    """
    Resolve the ONNX quantization types to use for weights and activations.

    Inputs:
        cfg: configuration object. `cfg.quantization.onnx_quant_parameters` is read; when it is set,
             its `weightType` and `activType` entries (keys of `dict_types`) select the types.
    Returns:
        (weight_type, activation_type). QuantType.QInt8 is used for every entry that is not set.
    """
    quant_params = cfg.quantization.onnx_quant_parameters

    # No quantization parameters section at all: everything defaults to signed 8 bits.
    if not quant_params:
        return QuantType.QInt8, QuantType.QInt8

    weight_name = quant_params.weightType
    weight_type = dict_types[weight_name] if weight_name else QuantType.QInt8

    activation_name = quant_params.activType
    activation_type = dict_types[activation_name] if activation_name else QuantType.QInt8

    return weight_type, activation_type
|
|
|
|
|
def get_calibration_method(cfg: DictConfig):
    """
    Convert the calibration method given as a string into the onnxruntime CalibrationMethod member.

    Inputs:
        cfg: configuration object. `cfg.quantization.onnx_quant_parameters.calibrate_method` is read
             when the `onnx_quant_parameters` section is set.
    Returns:
        CalibrationMethod.MinMax when no method is configured or when it is "MinMax",
        CalibrationMethod.Entropy when it is "Entropy".
    Raises:
        ValueError: for any other configured value.
    """
    quant_params = cfg.quantization.onnx_quant_parameters
    calibration_param = quant_params.calibrate_method if quant_params else None

    # MinMax is both an explicit choice and the default when nothing is configured.
    if calibration_param is None or calibration_param == "MinMax":
        return CalibrationMethod.MinMax
    if calibration_param == "Entropy":
        return CalibrationMethod.Entropy

    # The first fragment ends with a space so that the two parts do not run together.
    raise ValueError(f"Unsupported calibration method: {calibration_param}. Review your config yaml file at section "
                     f"quantization_parameters. Only MinMax or Entropy are supported so far.")
|
|
|
|
|
def update_bit_width(tensor_type: str = None, order: int = None, direction: str = None):
    """
    Move a quantization type `order` steps along the 4 -> 8 -> 16 bit ladder, keeping its signedness.

    The result saturates at the ends of the ladder (e.g. 'Int8' moved up by 2 gives the 16-bit type).

    Inputs:
        tensor_type (str): Int4, UInt4, Int8, UInt8, Int16, or UInt16
        order (int): number of steps to move, a positive integer (1 or 2 in practice). It is not looked at
                     when the type already sits at the end of the ladder in the requested direction.
        direction (str): 'down' decreases the bit width; any other value increases it.

    Returns:
        The `dict_types` entry (QuantType) of the resulting type.

    Raises:
        ValueError: if tensor_type is not one of the supported names, or if a move is required and
                    order is not a positive integer.
    """
    error_message = (f"Unsupported 'update_bit_width' function parameters. Check tensor_type = {tensor_type}, "
                     f"order = {order} and direction = {direction}")

    ladders = (('Int4', 'Int8', 'Int16'), ('UInt4', 'UInt8', 'UInt16'))
    ladder = next((names for names in ladders if tensor_type in names), None)
    if ladder is None:
        raise ValueError(error_message)

    position = ladder.index(tensor_type)
    last = len(ladder) - 1
    going_down = direction == 'down'

    # Already at the narrowest / widest type for this direction: nothing to change.
    if position == (0 if going_down else last):
        return dict_types[tensor_type]

    # A real move is needed, so order must be a positive whole number. Rejecting it here avoids
    # silently returning None (order < 1) or failing with a TypeError (order is None).
    try:
        steps = int(order)
    except (TypeError, ValueError, OverflowError):
        steps = 0
    if steps < 1 or steps != order:
        raise ValueError(error_message)

    target = position - steps if going_down else position + steps
    return dict_types[ladder[min(max(target, 0), last)]]
|
|
|
|
|
def define_extra_options(cfg: dict = None, list_weights_tensors=None, list_activation_tensors=None, axis_list=None,
                         bit_width_w=None, bit_width_a=None):
    """
    Set ONNX quantizer extra options according to config file
    Inputs:
        cfg (dict): configuration parameters. `cfg.quantization.onnx_extra_options` and, when weight tensors
                    are overridden, `cfg.quantization.granularity` are read. cfg is not modified.
        list_weights_tensors (list of str): weights tensor names for which we want to override quantization
                                            parameters. If None or empty, ignored
        list_activation_tensors (list of str): activation tensor names for which we want to override
                                               quantization parameters. If None or empty, ignored
        axis_list (list): one entry per name in list_weights_tensors, stored as the "axis" of the override
                          when the granularity is 'per_channel'
        bit_width_w (QuantType): quant_type written in the override of every tensor in list_weights_tensors
        bit_width_a (QuantType): quant_type written in the override of every tensor in list_activation_tensors
    Returns:
        a dictionary with all extra options set. "TensorQuantOverrides" is filled from the tensor lists when
        at least one is given, otherwise from `weights_tensor_override` / `activations_tensor_override`
        of the configuration.
    """
    # Option name -> value used when the configuration does not set it.
    option_defaults = (
        ("WeightSymmetric", True),
        ("ActivationSymmetric", False),
        ("CalibMovingAverage", False),
        ("QuantizeBias", True),
        ("SmoothQuant", False),
        ("SmoothQuantAlpha", 0.5),
        ("SmoothQuantFolding", True),
    )

    user_options = cfg.quantization.onnx_extra_options

    extra_options = {}
    for option_name, default_value in option_defaults:
        configured = getattr(user_options, option_name) if user_options else None
        extra_options[option_name] = default_value if configured is None else configured

    tensor_overrides = {}
    extra_options["TensorQuantOverrides"] = tensor_overrides

    if list_weights_tensors:
        per_channel = cfg.quantization.granularity == 'per_channel'
        for idx, tensor_name in enumerate(list_weights_tensors):
            override = {'quant_type': bit_width_w}
            if per_channel:
                override["axis"] = axis_list[idx]
            tensor_overrides[tensor_name] = [override]

    if list_activation_tensors:
        for tensor_name in list_activation_tensors:
            tensor_overrides[tensor_name] = [{'quant_type': bit_width_a}]

    # Overrides from the configuration are only used when no tensor list was passed explicitly.
    if not list_weights_tensors and not list_activation_tensors and user_options:
        configured_overrides = (list(user_options.weights_tensor_override or [])
                                + list(user_options.activations_tensor_override or []))
        for entry in configured_overrides:
            # entry is (tensor name, settings). Work on a copy so the configuration keeps its
            # original string / list values and this function can be called again with it.
            override = dict(entry[1])
            override["quant_type"] = dict_types[override.get("quant_type")]
            if override.get("zero_point") is not None:
                override["zero_point"] = np.array(override["zero_point"])
            if override.get("scale") is not None:
                override["scale"] = np.array(override["scale"], dtype=np.float32)
            tensor_overrides[entry[0]] = [override]

    return extra_options
|
|
|
|
|
def count_weights(onnx_model_path_quant, w_tensor_name: str = None):
    """
    Count weights that are in 4 bits and weights that are in 8 bits in a quantized ONNX model

    Args:
        onnx_model_path_quant: the ONNX quantized model path
        w_tensor_name: name, or list of names, of the weight tensors to count. The initializers looked up
                       in the model are these names with the '_quantized' suffix appended.
                       None counts nothing.

    Returns:
        nb_weights_4bits, nb_weights_8bits, total_count_weights
    """
    if w_tensor_name is None:
        w_tensor_name = []
    elif isinstance(w_tensor_name, str):
        # A bare string would otherwise be iterated character by character.
        w_tensor_name = [w_tensor_name]

    # Set membership keeps the scan over the initializers linear.
    quantized_names = {name + '_quantized' for name in w_tensor_name}
    model_quant = onnx.load(onnx_model_path_quant)

    nb_weights_4bits = nb_weights_8bits = total_count_weights = 0
    for initializer in model_quant.graph.initializer:
        if initializer.name not in quantized_names:
            continue

        tensor_values = numpy_helper.to_array(initializer)
        n_weights = tensor_values.size
        if n_weights == 0:
            # min()/max() are undefined on an empty tensor and there is nothing to count.
            continue
        total_count_weights += n_weights

        # A tensor whose values span at most 16 levels is counted as 4 bits, whatever its storage dtype.
        dyn_range = int(tensor_values.max()) - int(tensor_values.min())
        if dyn_range <= 15:
            nb_weights_4bits += n_weights
        elif tensor_values.dtype in (np.int8, np.uint8):
            # Signed and unsigned 8-bit weights both count as 8 bits.
            nb_weights_8bits += n_weights

    return nb_weights_4bits, nb_weights_8bits, total_count_weights
|
|
|
|
|
def weights_based_layer_ranking(model, extension: str = None):
    """
    Count weights (not bias) per layer in a Keras or onnx model and rank the layers by that count

    Args:
        model: the keras or onnx model
        extension (str): model backend, 'keras' or 'onnx'. The file-extension spellings '.keras' and
                         '.onnx' are accepted as well. Any other value gives an empty result.

    Returns:
        list of (name, number of weights) tuples sorted by decreasing number of weights.
        For keras the name is the layer name, for onnx it is the weight initializer name.
    """
    # Accept both 'onnx' and '.onnx' style values.
    backend = extension.lstrip('.') if isinstance(extension, str) else extension

    layer_params = []
    if backend == 'keras':
        layer_params = [(layer.name, layer.get_weights()[0].size)
                        for layer in model.layers if hasattr(layer, "kernel")]
    elif backend == 'onnx':
        initializers = {init.name: init for init in model.graph.initializer}
        for node in model.graph.node:
            if node.op_type not in ('Conv', 'ConvTranspose', 'Gemm', 'MatMul'):
                continue
            if len(node.input) < 2:
                continue
            # The second input is only counted when it is a stored initializer; a MatMul between
            # two computed tensors has no weights to rank.
            initializer = initializers.get(node.input[1])
            if initializer is None:
                continue
            num_params = int(np.prod(list(initializer.dims)))
            layer_params.append((initializer.name, num_params))

    return sorted(layer_params, key=lambda item: item[1], reverse=True)
|
|
|
|
|
| def _get_initializer_tensor(model, name: str = None):
|
| """
|
| Report weights (not bias) corresponding to tensor 'name' if exists, in an onnx model
|
|
|
| Args:
|
| model: the onnx model
|
| name (str): tensor name
|
|
|
| Returns: weight tensor values
|
|
|
| """
|
| for tensor in model.graph.initializer:
|
| if tensor.name == name:
|
| return onnx.numpy_helper.to_array(tensor)
|
| return None
|
|
|
|
|
| def _onnx_node_identity_card(model, node):
|
| """
|
| Returns a list with node characteristic in order to make a ranking later-on
|
|
|
| Args:
|
| model: the onnx model
|
| node: node under consideration
|
|
|
| Returns:
|
| list of node characteristics
|
| """
|
|
|
| node_card_list = []
|
| group = 1
|
|
|
| if node.op_type == "Conv":
|
| for attr in node.attribute:
|
| if attr.name == "group":
|
| group = attr.i
|
| break
|
| weight_name = node.input[1]
|
| weight = _get_initializer_tensor(model, weight_name)
|
|
|
| out_channels, in_channels_per_group, kH, kW = weight.shape
|
| nparams = int(np.prod([out_channels, in_channels_per_group, kH, kW]))
|
| params_per_scale = int(np.prod([in_channels_per_group, kH, kW]))
|
|
|
| if in_channels_per_group == 1 and group == out_channels:
|
| layer_type = 0
|
| else:
|
| layer_type = 1
|
| node_card_list = [node.name, layer_type, nparams, params_per_scale]
|
| elif node.op_type in ["Gemm", "MatMul"]:
|
| weight_name = node.input[1]
|
| weight = _get_initializer_tensor(model, weight_name)
|
|
|
|
|
| if weight is not None:
|
| out_channels, in_channels = weight.shape
|
| nparams = out_channels * in_channels
|
| params_per_scale = out_channels
|
| node_card_list = [node.name, 2, nparams, params_per_scale]
|
| else:
|
| node_card_list = []
|
|
|
| return node_card_list
|
|
|
|
|
| def _keras_layer_identity_card(layer):
|
| """
|
| Returns a list with node characteristic in order to make a ranking later-on
|
|
|
| Args:
|
| layer: layer under consideration
|
|
|
| Returns:
|
| list of layers characteristics
|
| """
|
| layer_card_list = []
|
|
|
| if hasattr(layer, "kernel"):
|
| weight = layer.get_weights()[0]
|
| nparams = weight.size
|
| if isinstance(layer, tensorflow.keras.layers.DepthwiseConv2D):
|
| layer_type = 0
|
| kH, kW, in_ch, depth_mult = weight.shape
|
| params_per_scale = kH * kW
|
| elif isinstance(layer, tensorflow.keras.layers.Conv2D):
|
| layer_type = 1
|
| kH, kW, in_ch, out_ch = weight.shape
|
| params_per_scale = int(np.prod([kH, kW, in_ch]))
|
| elif isinstance(layer, tensorflow.keras.layers.Dense):
|
| layer_type = 2
|
| in_dim, out_dim = weight.shape
|
| params_per_scale = out_dim
|
|
|
| layer_card_list = [layer.name, layer_type, nparams, params_per_scale]
|
|
|
| return layer_card_list
|
|
|
|
|
def composite_score_layer_ranking(model, extension: str = None):
    """
    Rank the weight-carrying layers of a Keras or onnx model from their identity cards

    Args:
        model: the model either onnx or keras
        extension: model backend, '.keras' or '.onnx'. The spellings without the leading dot
                   ('keras', 'onnx') are accepted as well. Any other value gives an empty result.

    Returns:
        list of identity cards [name, layer type, number of weights, weights per scale], ordered by
        decreasing number of weights, then increasing weights per scale, then decreasing name
    """
    # Accept both '.onnx' and 'onnx' style values.
    backend = extension.lstrip('.') if isinstance(extension, str) else extension

    if backend == 'keras':
        cards = (_keras_layer_identity_card(layer) for layer in model.layers)
    elif backend == 'onnx':
        cards = (_onnx_node_identity_card(model, node) for node in model.graph.node)
    else:
        cards = ()

    # Layers / nodes without rankable weights come back as empty cards and are dropped.
    layer_params = [card for card in cards if card]

    # With reverse=True the negated weights-per-scale ends up sorted in increasing order.
    return sorted(layer_params, key=lambda card: (card[2], -card[3], card[0]), reverse=True)