# /*---------------------------------------------------------------------------------------------
#  * Copyright (c) 2022-2023 STMicroelectronics.
#  * All rights reserved.
#  *
#  * This software is licensed under terms that can be found in the LICENSE file in
#  * the root directory of this software component.
#  * If no LICENSE file comes with this software, it is provided AS-IS.
#  *--------------------------------------------------------------------------------------------*/
import os
from pathlib import Path
from typing import Dict, Optional, Tuple, List

import mlflow
import numpy as np
import onnxruntime
import sklearn
import sklearn.metrics  # 'import sklearn' alone does not expose the 'metrics' submodule
import tensorflow as tf
import torch
from onnx import ModelProto
from tabulate import tabulate
from tensorflow.keras.models import Model
from tensorflow.python.keras.utils.layer_utils import count_params

from common.utils import log_to_file


def ai_interp_input_quant(ai_interp, data: np.ndarray, file_extension: str):
    """
    Converts float input data to the dtype expected by the first input of an
    AiRunner interpreter, applying the input quantization parameters when the
    model input is int8/uint8.

    Args:
        ai_interp: AiRunner interpreter; its `get_inputs()[0]` must expose
            `dtype`, `scale` and `zero_point`.
        data (np.ndarray): Input data.
            # NOTE(review): the rescaling below assumes `data` is already in the
            # model's float input range — confirm against the preprocessing.
        file_extension (str): Model file extension ('.tflite' or '.onnx').
            Currently unused (the layout transposition below is disabled).

    Returns:
        np.ndarray: Data cast to int8/uint8 (quantized inputs) or float32.
    """
    ai_runner_input_details = ai_interp.get_inputs()[0]  # input
    if ai_runner_input_details.dtype in [np.uint8, np.int8]:
        # Rescale the data with the input scale and zero-point (maps to [0, 255])
        resc_data = (data / ai_runner_input_details.scale[0]) + ai_runner_input_details.zero_point[0]
        # Cast to the integer dtype expected by the model input
        if ai_runner_input_details.dtype == np.int8:
            # Shift the [0, 255] range down to [-128, 127] before the int8 cast
            out_data = (resc_data - 128).astype(np.int8)
        elif ai_runner_input_details.dtype == np.uint8:
            out_data = resc_data.astype(np.uint8)
    else:
        out_data = data.astype(np.float32)
    # if ai_runner_input_details.shape[1:] != out_data.shape[1:]:
    #     if file_extension == '.tflite':
    #         out_data = np.transpose(out_data,[0,3,1,2]) # chlast -> chfirst
    #     elif file_extension == '.onnx':
    #         out_data = np.transpose(out_data,[0,2,3,1]) # chfirst -> chlast
    return out_data


def ai_interp_outputs_dequant(ai_interp, predictions: np.ndarray):
    """
    Dequantizes the raw outputs of an AiRunner interpreter back to float32.

    Args:
        ai_interp: AiRunner interpreter; its `get_outputs()` entries must
            expose `scale` and `zero_point`.
        predictions (np.ndarray): Raw model outputs, one array per output,
            in the same order as `ai_interp.get_outputs()`.

    Returns:
        list: One float32 np.ndarray per output. Outputs with both a zero
        scale and a zero zero-point are treated as non-quantized and are
        only cast to float32.
    """
    ai_runner_output_details = ai_interp.get_outputs()  # outputs
    out_predictions = []
    for i, ai_out in enumerate(ai_runner_output_details):
        if ai_out.scale[0] != 0 or ai_out.zero_point[0] != 0:
            # Standard affine dequantization: float = scale * (q - zero_point)
            out_predictions.append(
                ai_out.scale[0] * (predictions[i].astype(np.float32) - ai_out.zero_point[0]))
        else:
            out_predictions.append(predictions[i].astype(np.float32))
    return out_predictions


def ai_runner_interp(target: str, name_model: str):
    """Returns an interpreter for ST Edge AI on-target/host inference.

    Args:
        target (str): Target to run the model on ('stedgeai_host' for the
            emulated host runtime, 'stedgeai_n6'/'stedgeai_h7p' for a board
            over serial). Any other value yields None.
        name_model (str): Name of the model, used in the error message if the
            connection to the target fails.

    Returns:
        The connected AiRunner interpreter, or None when `target` is not an
        ST Edge AI target.

    Raises:
        TypeError: If the AiRunner cannot connect to the target.
    """
    if target in ['stedgeai_host', 'stedgeai_n6', 'stedgeai_h7p']:
        print(f"Loading {target} for ST Edge AI inference of {name_model}")
        # Imported lazily so non-ST-Edge-AI flows don't need the runner package
        from common.stm_ai_runner import AiRunner
        if target == 'stedgeai_host':
            ai_runner_desc = 'st_ai_ws'
        if target in ['stedgeai_n6', 'stedgeai_h7p']:
            ai_runner_desc = 'serial:921600'
        ai_runner_interpreter = AiRunner()
        if not ai_runner_interpreter.connect(ai_runner_desc):
            raise TypeError(f"model='{name_model}' unable to load the model")
        ai_runner_input_details = ai_runner_interpreter.get_inputs()    # inputs
        ai_runner_output_details = ai_runner_interpreter.get_outputs()  # outputs
        for detail in ai_runner_input_details:
            print(f"  I: {detail.name} {detail.shape} {detail.dtype} {detail.scale} {detail.zero_point}")
        for detail in ai_runner_output_details:
            print(f"  O: {detail.name} {detail.shape} {detail.dtype} {detail.scale} {detail.zero_point}")
    else:
        ai_runner_interpreter = None
    return ai_runner_interpreter


def get_model_name(model_type: str, input_shape: int, project_name: str) -> str:
    """Returns a string representation of the model name.

    Args:
        model_type (str): Type of the model.
        input_shape (int): Input shape of the model.
        project_name (str): Name of the project.

    Returns:
        str: String representation of the model name
        ("<model_type>_<input_shape>_<project_name>").
    """
    # Combine strings to form model name
    strings = [model_type, str(input_shape), project_name]
    name = '_'.join([str(i) for i in strings])
    return name


def get_model_name_and_its_input_shape(model_path: str = None,
                                       custom_objects: Dict = None) -> Tuple:
    """
    Load a model from a given file path and return the model name and
    its input shape. Supported model formats are .h5, .keras, .tflite
    and .onnx. The basename of the model file is used as the model name.
    The input shape is extracted from the model.

    Args:
        model_path (str): A path to an .h5, .keras, .tflite or .onnx model file.
        custom_objects (Dict): A dictionary containing custom objects from the
            model (only used for .h5/.keras files).

    Returns:
        Tuple: A tuple containing the loaded model name and its input shape.
        The input shape is a tuple of length 3.

    Raises:
        RuntimeError: If the model file extension is not supported, or if the
            input shape of the model cannot be found.
    """
    # We use the file basename as the model name.
    model_name = Path(model_path).stem
    file_extension = Path(model_path).suffix
    if file_extension in [".h5", ".keras"]:
        # When we resume a training, the model includes the preprocessing layers
        # (augmented model). Therefore, we need to declare the custom data
        # augmentation layer as a custom object to be able to load the model.
        model = tf.keras.models.load_model(
            model_path,
            custom_objects=custom_objects,
            compile=False)
        try:
            input_shape = tuple(model.input.shape[1:])
        except Exception:
            # `model.input` is unavailable (e.g. multi-input model); fall back
            # to the first entry of the inputs list.
            input_shape = tuple(model.inputs[0].shape[1:])
    elif file_extension == ".tflite":
        try:
            # Load the tflite model
            interpreter = tf.lite.Interpreter(model_path=model_path)
            interpreter.allocate_tensors()
            # Get the input details
            input_details = interpreter.get_input_details()
            input_shape = input_details[0]['shape']
            input_shape = tuple(input_shape)[-3:]
        except RuntimeError as error:
            raise RuntimeError("\nUnable to extract input shape from .tflite model file\n"
                               f"Received path {model_path}") from error
    elif file_extension == ".onnx":
        try:
            # Load the model
            onx = ModelProto()
            with open(model_path, "rb") as f:
                content = f.read()
                onx.ParseFromString(content)
            sess = onnxruntime.InferenceSession(model_path)
            # Get the model input shape
            input_shape = sess.get_inputs()[0].shape
            input_shape = tuple(input_shape)[-3:]
        except RuntimeError as error:
            raise RuntimeError("\nUnable to extract input shape from .onnx model file\n"
                               f"Received path {model_path}") from error
    else:
        raise RuntimeError(f"\nUnknown/unsupported model file type.\nExpected `.tflite`, `.h5`, `.keras`, or `.onnx`."
                           f"\nReceived path {model_path}")
    return model_name, input_shape


def check_model_support(model_name: str,
                        version: Optional[str] = None,
                        supported_models: Dict = None,
                        message: Optional[str] = None) -> None:
    """
    Check if a model name and version are supported based on a dictionary
    of supported models and versions.

    Args:
        model_name(str): The name of the model to check.
        version(str): The version of the model to check. May be set to None by the caller.
        supported_models(Dict[str, List[str]]): A dictionary of supported models and their versions.
        message(str): An error message to print.

    Raises:
        ValueError: If the model name or version is not in the list of
            supported models or versions, or if the version attribute is
            missing or not applicable for the given model.
    """
    # Avoid formatting the literal string "None" into the error messages
    message = message or ""
    if model_name not in supported_models:
        x = list(supported_models.keys())
        raise ValueError("\nSupported model names are {}. Received {}.{}".format(x, model_name, message))
    model_versions = supported_models[model_name]
    if model_versions:
        # There are different versions of the model.
        if not version:
            # The version is missing.
            raise ValueError("\nMissing `version` attribute for `{}` model.{}".format(model_name, message))
        if version not in model_versions:
            # The version is not a supported version.
            raise ValueError("\nSupported versions for `{}` model are {}. "
                             "Received {}.{}".format(model_name, model_versions, version, message))
    else:
        if version:
            # A version was given but there is no version for this model.
            raise ValueError("\nThe `version` attribute is not applicable "
                             "to '{}' model.{}".format(model_name, message))


def check_attribute_value(attribute_value: str,
                          values: List[str] = None,
                          name: str = None,
                          message: str = None) -> None:
    """
    Check if an attribute value is valid based on a list of supported values.

    Args:
        attribute_value(str): The value of the attribute to check.
        values(List[str]): A list of supported values.
        name(str): The name of the attribute being checked.
        message(str): A message to print if the attribute is not supported.

    Raises:
        ValueError: If the attribute value is not in the list of supported values.
    """
    # Avoid formatting the literal string "None" into the error message
    message = message or ""
    if attribute_value not in values:
        raise ValueError(f"\nSupported values for `{name}` attribute are {values}. "
                         f"Received {attribute_value}.{message}")


def transfer_pretrained_weights(target_model: tf.keras.Model,
                                source_model_path: str = None,
                                end_layer_index: int = None,
                                target_model_name: str = None) -> None:
    # NOTE : Unused in AED for now.
    # When it's ready to use, call it after loading model in get_model.
    """
    Copy the weights of a source model to a target model. Only the backbone
    weights are copied as the two models can have different classifiers.

    Args:
        target_model (tf.keras.Model): The target model.
        source_model_path (str): Path to the source model file (h5 or keras file).
        end_layer_index (int): Index of the last backbone layer (the first layer
            of the model has index 0).
        target_model_name (str): The name of the target model.

    Raises:
        ValueError: The source model file cannot be found, or the two models
            are incompatible because they have different backbones.
    """
    if source_model_path:
        if not os.path.isfile(source_model_path):
            raise ValueError("Unable to find pretrained model file.\nReceived "
                             f"model path {source_model_path}")
        source_model = tf.keras.models.load_model(source_model_path, compile=False)
        message = f"\nUnable to transfer to model `{target_model_name}` "
        message += f"the weights from model {source_model_path}\n"
        message += "Models are incompatible (backbones are different)."
        if len(source_model.layers) < end_layer_index + 1:
            raise ValueError(message)
        for i in range(end_layer_index + 1):
            weights = source_model.layers[i].get_weights()
            try:
                target_model.layers[i].set_weights(weights)
            except ValueError as error:
                # Raising a bare string is a TypeError; wrap it in ValueError.
                raise ValueError(message) from error


def model_summary(model):
    """
    This function displays a model summary.

    It is similar to a Keras model summary with the additional information:
    - Indices of layers
    - Trainable/non-trainable status of layers
    - Total number of layers
    - Number of trainable layers
    - Number of non-trainable layers
    """
    # Create the summary table
    num_layers = len(model.layers)
    trainable_layers = 0
    table = []
    for i, layer in enumerate(model.layers):
        layer_type = layer.__class__.__name__
        if layer_type == "InputLayer":
            # InputLayer has no meaningful output shape attribute; use the model input
            layer_shape = model.input.shape
        else:
            layer_shape = layer.output.shape
        is_trainable = bool(layer.trainable)
        num_params = layer.count_params()
        if layer.trainable:
            trainable_layers += 1
        table.append([i, is_trainable, layer.name, layer_type, num_params, layer_shape])

    # Display the table
    print(108 * '=')
    print(" Model:", model.name)
    print(108 * '=')
    print(tabulate(table, headers=["Layer index", "Trainable", "Name", "Type", "Params#", "Output shape"]))
    print(108 * '-')
    print("Total params:", model.count_params())
    print("Trainable params: ", count_params(model.trainable_weights))
    print("Non-trainable params: ", count_params(model.non_trainable_weights))
    print(108 * '-')
    print("Total layers:", num_layers)
    print("Trainable layers:", trainable_layers)
    print("Non-trainable layers:", num_layers - trainable_layers)
    print(108 * '=')


def count_h5_parameters(output_dir: str = None,
                        model: tf.keras.Model = None):
    """
    Logs the parameter count of a float Keras model to MLflow, to the run log
    file and to stdout.

    Args:
        output_dir (str): Directory of the log file (passed to log_to_file).
        model (tf.keras.Model): The model whose parameters are counted.
    """
    total_params = model.count_params()
    mlflow.log_metric("nb_params", total_params)
    log_to_file(output_dir, f"Nb params of float model : {total_params}")
    print(f"[INFO] : Nb params of float model : {total_params}")


def count_tflite_parameters(output_dir: str = None,
                            model_path: str = None,
                            num_threads: Optional[int] = 1):
    """
    Logs a parameter count for a TFLite model to MLflow, to the run log file
    and to stdout.

    NOTE(review): this sums the element counts of ALL tensors reported by the
    interpreter, which includes intermediate activation tensors, not only
    weights — so the figure can overcount the true parameter number. Kept
    as-is since the logged metric value is depended upon; confirm intent.

    Args:
        output_dir (str): Directory of the log file (passed to log_to_file).
        model_path (str): Path to the .tflite model file.
        num_threads (Optional[int]): Number of interpreter threads.
    """
    interpreter = tf.lite.Interpreter(model_path=model_path, num_threads=num_threads)

    # Get all tensor details
    tensor_details = interpreter.get_tensor_details()
    total_params = 0
    for tensor in tensor_details:
        shape = tensor['shape']
        if shape is not None:
            num_params = 1
            for dim in shape:
                num_params *= dim
            total_params += num_params
    mlflow.log_metric("quantized_nb_params", total_params)
    log_to_file(output_dir, f"Nb params of quantized model : {total_params}")
    print(f"[INFO] : Nb params of quantized model : {total_params}")


def tf_dataset_to_np_array(input_ds, nchw=True, labels_included=True):
    """
    Converts a TensorFlow dataset into two NumPy arrays containing the data and labels.

    This function iterates over the provided TensorFlow dataset, casts the image
    data to float32, and then converts the images and labels into NumPy arrays.
    The images and labels from all batches are concatenated along the first axis
    (batch dimension) to form two unified arrays.

    Parameters:
    - input_ds (tf.data.Dataset): A TensorFlow dataset object that yields batches
      of (images, labels) tuples, or of images only, when iterated over.
    - nchw (bool): If True, the image array is transposed to channels-first
      ([n, h, w, c] -> [n, c, h, w] for 4-D data, [n, h, c] -> [n, c, h] for 3-D).
    - labels_included (bool): Whether the dataset yields (images, labels) tuples
      (True) or just images (False).

    Returns:
    - tuple: A tuple containing two NumPy arrays:
        - The first array contains the image data from the dataset.
        - The second array contains the corresponding labels
          (an empty list when labels_included=False).

    Raises:
    - ValueError: If the image data is neither 3-D nor 4-D when nchw=True.

    Note:
    - The function will fail if the input dataset does not yield data in the
      expected format (or is empty).
    """
    batch_data = []
    batch_labels = []
    if labels_included:
        for images, labels in input_ds:
            images = tf.cast(images, dtype=tf.float32).numpy()
            batch_data.append(images)
            batch_labels.append(labels)
        batch_labels = np.concatenate(batch_labels, axis=0)
    else:
        for images in input_ds:
            images = tf.cast(images, dtype=tf.float32).numpy()
            batch_data.append(images)
    batch_data = np.concatenate(batch_data, axis=0)

    # Convert image to input data
    if nchw and batch_data is not None:
        if batch_data.ndim == 4:
            # For a 4D array with shape [n, h, w, c], the new order will be [n, c, h, w]
            axes_order = (0, 3, 1, 2)
        elif batch_data.ndim == 3:
            # For a 3D array with shape [n, h, c], the new order will be [n, c, h]
            axes_order = (0, 2, 1)
        else:
            raise ValueError("The input array must have either 3 or 4 dimensions.")
        batch_data = np.transpose(batch_data, axes_order)

    return batch_data, batch_labels


def torch_dataset_to_np_array(input_loader, nchw=True, labels_included=True, device="cpu"):
    """
    Converts a PyTorch DataLoader into two NumPy arrays containing the data and labels.

    Parameters:
    - input_loader (torch.utils.data.DataLoader): A PyTorch DataLoader yielding
      batches of (images, labels) or (images,) tensors.
    - nchw (bool): If True and the image array is 4-D, the array is transposed
      from NCHW (the usual PyTorch layout) to NHWC (channels last).
      # NOTE(review): the parameter name suggests the opposite direction —
      # behavior kept as-is, confirm with callers.
    - labels_included (bool): Whether the dataset includes labels.
    - device (str): Device to move tensors to before converting (default: 'cpu').

    Returns:
    - tuple: (images_np, labels_np)
        - images_np: numpy.ndarray containing all image tensors concatenated along batch dim.
        - labels_np: numpy.ndarray containing all labels (if labels_included=True), else None.
    """
    all_images = []
    all_labels = []

    # Ensure no gradients interfere
    with torch.no_grad():
        for batch in input_loader:
            if labels_included:
                images, labels = batch
                images = images.to(device)
                all_labels.append(labels.cpu().numpy())
            else:
                images = batch[0] if isinstance(batch, (list, tuple)) else batch
                images = images.to(device)
            all_images.append(images.cpu().numpy())

    # Concatenate all batches
    images_np = np.concatenate(all_images, axis=0)
    labels_np = np.concatenate(all_labels, axis=0) if labels_included else None

    # Reorder channels if needed
    if nchw and images_np.ndim == 4:
        # Convert from NCHW -> NHWC
        images_np = np.transpose(images_np, (0, 2, 3, 1))

    return images_np, labels_np


def compute_confusion_matrix(test_set: tf.data.Dataset = None,
                             model: Model = None) -> Tuple[np.ndarray, np.float32]:
    """
    Computes the confusion matrix and the accuracy of a model on a dataset.

    Args:
        test_set (tf.data.Dataset): The test dataset to evaluate the model on,
            yielding (images, labels) batches.
        model (tf.keras.models.Model): The trained model to evaluate.

    Returns:
        Tuple: (confusion_matrix, accuracy), where accuracy is a percentage
        rounded to 2 decimal places.
    """
    test_pred = []
    test_labels = []
    for data in test_set:
        test_pred_score = model.predict_on_batch(data[0])
        if test_pred_score.shape[1] > 1:
            # Multi-label classification: take the arg-max class
            test_pred.append(np.argmax(test_pred_score, axis=1))
        else:
            # Binary classification: threshold the score at 0.5
            test_pred_score = np.where(test_pred_score < 0.5, 0, 1)
            test_pred.append(np.squeeze(test_pred_score))
        # handle both types of the ground truth labels (one-hotcoded or integer)
        batch_labels = np.argmax(data[1], axis=1) if len(data[1].shape) > 1 else data[1]
        test_labels.append(batch_labels)
    labels = np.concatenate(test_labels, axis=0)
    logits = np.concatenate(test_pred, axis=0)
    test_accuracy = round((np.sum(labels == logits) * 100) / len(labels), 2)

    # Calculate the confusion matrix.
    cm = sklearn.metrics.confusion_matrix(labels, logits)
    return cm, test_accuracy