| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import logging |
| import os |
| import sys |
| import warnings |
| import subprocess |
| import platform |
| from hydra.core.hydra_config import HydraConfig |
| from omegaconf import DictConfig |
| import tensorflow as tf |
| import shutil |
| from typing import Optional |
| from pathlib import Path |
|
|
| import common.stm32ai_local as stmaic |
| from common.benchmarking import cloud_connect, cloud_analyze, benchmark_model |
| from common.stm32ai_dc import (CliLibraryIde, CliLibrarySerie, CliParameters) |
| from .external_memory_mgt import update_activation_c_code |
|
|
| import json |
| import re |
| from typing import Dict, List |
|
|
def check_submodule(c_project_path: str):
    """
    Verify that the git submodule holding the C project is checked out and in sync.

    The enclosing repository is located by searching upwards from the directory
    of this file. The submodule whose path equals ``c_project_path`` (once any
    leading/trailing ``.`` and ``/`` characters are stripped) is then inspected.

    Args:
        c_project_path (str): Path to the C project.

    Returns:
        bool: False when the submodule is not initialized or is not at the commit
            recorded by the parent repository; True otherwise. Uncommitted
            changes in the submodule only produce a warning.

    Raises:
        InvalidGitRepositoryError: If no git repository is found from this file's directory.
        ValueError: If no submodule matches the stripped project path.
    """
    from git import Repo, InvalidGitRepositoryError

    bold_yellow = "\033[1;33m"
    bold_red = "\033[1;31m"
    reset = "\033[0m"
    error_tag = f"{reset}{bold_red}[ERROR]:{reset}"
    warning_tag = f"{reset}{bold_yellow}[WARNING]:{reset}"

    # The repository is resolved from this module's location, not from the cwd.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    try:
        repo = Repo(script_dir, search_parent_directories=True)
    except InvalidGitRepositoryError:
        raise InvalidGitRepositoryError(f"No git repository found from path {script_dir}")

    # Character-set strip: removes every leading/trailing '.' and '/'.
    submodule_path = c_project_path.lstrip('./').rstrip("./")

    # Walk the registered submodules until the one matching the project path shows up.
    for candidate in repo.submodules:
        print(candidate.path)
        if candidate.path == submodule_path:
            break
    else:
        raise ValueError(f"Submodule '{submodule_path}' not found.")

    # An uninitialized submodule has no repository of its own to open.
    try:
        checkout = candidate.module()
    except InvalidGitRepositoryError:
        print(f"{error_tag} Submodule '{submodule_path}' is not initialized. Please run 'git submodule update --init {submodule_path}'.")
        return False

    expected_sha = candidate.hexsha
    actual_sha = checkout.head.commit.hexsha
    if expected_sha != actual_sha:
        print(f"{error_tag} Submodule '{submodule_path}' is not at the expected commit.")
        print(f" Main repo expects commit {expected_sha} but submodule is at commit {actual_sha}.")
        print(f" Please run 'git submodule update --init {submodule_path}' to sync the submodule.")
        return False

    # Local modifications are tolerated but reported.
    if checkout.is_dirty(untracked_files=True):
        print(f"{warning_tag} Submodule '{submodule_path}' has uncommitted changes. Please commit or stash them.")
    return True
|
|
def _keep_internal_weights(path_network_data_params: str):
    """
    Tag every weight array of a generated network data file for internal flash.

    The file is rewritten in place (through a temporary sibling file named
    ``network_data_params_modify.c``):
      * the ``AI_INTERNAL_FLASH`` section macro is defined right before the
        ``#include "network_data_params.h"`` line;
      * each line declaring a ``const ai_u<N> <name>[<size>]`` array is preceded
        by an ``AI_INTERNAL_FLASH`` line.

    Args:
        path_network_data_params (str): Path to the generated C file to patch.
    """
    # Raw string: "\d" and "\[" are regex escapes, not Python string escapes.
    weight_declaration = re.compile(r"const ai_u(?:\d+) (.*)\[(?:\d+)\]")
    header_include = '#include "network_data_params.h"\n'
    section_define = '#define AI_INTERNAL_FLASH __attribute__((section(".InternalFlashSection")))\n'
    tmp_path = os.path.join(os.path.dirname(path_network_data_params), 'network_data_params_modify.c')

    with open(path_network_data_params, 'r') as source, open(tmp_path, 'w') as patched:
        for line in source:
            if line == header_include:
                line = section_define + line
            if weight_declaration.search(line):
                line = 'AI_INTERNAL_FLASH\n' + line
            patched.write(line)

    # Swap the patched copy over the original only once it is fully written.
    os.replace(tmp_path, path_network_data_params)
|
|
|
|
def _dispatch_weights(internalFlashSizeFlash_KB: str,
                      kernelFlash_KB: str,
                      applicationSizeFlash_KB: str,
                      path_network_c_info: str,
                      path_network_data_params: str):
    """
    Split the weight arrays of a generated network between internal and external flash.

    The read-only (``ACC_READ``) memory pools listed in the network C info JSON
    are sorted by decreasing size and greedily assigned to internal flash while
    they strictly fit in the space left after the kernel and the application;
    the remaining ones go to external flash. The generated data file is then
    rewritten in place: both section macros are defined before the
    ``#include "network_data_params.h"`` line and each matching array
    declaration is preceded by ``AI_INTERNAL_FLASH`` or ``AI_EXTERNAL_FLASH``.

    Args:
        internalFlashSizeFlash_KB (str): Internal flash size, e.g. "2048KB".
        kernelFlash_KB (str): Flash used by the runtime library, e.g. "150KB".
        applicationSizeFlash_KB (str): Flash used by the application, e.g. "100KB".
        path_network_c_info (str): Path to the network C info JSON file.
        path_network_data_params (str): Path to the generated C data file to patch.

    Raises:
        ValueError: If one of the size strings contains no digits.
    """

    def _kb_to_bytes(size: str) -> int:
        # The first run of digits is the size; 1 KB is counted as 1000 bytes.
        digits = re.search(r'\d+', size)
        if digits is None:
            raise ValueError(f"Cannot parse a size in KB from '{size}'")
        return int(digits.group()) * 10**3

    with open(path_network_c_info, 'r') as f:
        graph = json.load(f)

    # Only read-only pools hold weights. Every pool is examined, including the
    # first one, so that a writable pool can never consume flash budget.
    weight_pools = [pool for pool in graph["memory_pools"] if pool.get("rights") == "ACC_READ"]
    weight_pools.sort(key=lambda pool: pool['used_size_bytes'], reverse=True)

    free_internal_flash = (_kb_to_bytes(internalFlashSizeFlash_KB)
                           - _kb_to_bytes(kernelFlash_KB)
                           - _kb_to_bytes(applicationSizeFlash_KB))

    internal_weights = set()
    external_weights = set()
    for pool in weight_pools:
        pool_size = pool["used_size_bytes"]
        # Strict comparison: a pool that would fill the flash exactly goes external.
        if free_internal_flash - pool_size > 0:
            internal_weights.add(pool["name"])
            free_internal_flash -= pool_size
        else:
            external_weights.add(pool["name"])

    header_include = '#include "network_data_params.h"\n'
    section_defines = ('#define AI_EXTERNAL_FLASH __attribute__((section(".ExternalFlashSection")))\n'
                       '#define AI_INTERNAL_FLASH __attribute__((section(".InternalFlashSection")))\n')
    # Captures the pool name embedded in the C identifier of each weight array.
    weight_declaration = re.compile(r"const ai_u(?:\d+) \D_network_(.*)_\D(?:\d+)\[(?:\d+)\]")
    tmp_path = os.path.join(os.path.dirname(path_network_data_params), 'network_data_params_modify.c')

    with open(path_network_data_params, 'r') as source, open(tmp_path, 'w') as patched:
        for line in source:
            if line == header_include:
                line = section_defines + line
            declaration = weight_declaration.search(line)
            if declaration:
                pool_name = declaration.group(1)
                if pool_name in internal_weights:
                    line = 'AI_INTERNAL_FLASH\n' + line
                elif pool_name in external_weights:
                    line = 'AI_EXTERNAL_FLASH\n' + line
            patched.write(line)

    # Swap the patched copy over the original only once it is fully written.
    os.replace(tmp_path, path_network_data_params)
|
|
|
|
def stm32ai_deploy(target: bool = False,
                   stlink_serial_number: str = None,
                   stedgeai_core_version: str = None,
                   c_project_path: str = None,
                   output_dir: str = None,
                   stm32ai_output: str = None,
                   optimization: str = None,
                   path_to_stm32ai: str = None,
                   path_to_cube_ide: str = None,
                   additional_files: list = None,
                   stmaic_conf_filename: str = 'stmaic_c_project.conf',
                   verbosity: int = None,
                   debug: bool = False,
                   model_path: str = None,
                   get_model_name_output: str = None,
                   stm32ai_ide: str = None,
                   stm32ai_serie: str = None,
                   credentials: list[str] = None,
                   on_cloud: bool = False,
                   check_large_model: bool = False,
                   cfg = None,
                   custom_objects: Dict = None) -> None:
    """
    Generate the C code of a model, then build the STM32 C project with it.

    The model is compiled either through the cloud service (when ``on_cloud`` is
    set and the login succeeds) or with the local tools. When
    ``check_large_model`` is set, the model footprint is first compared with the
    board memories so that activations and weights can be dispatched between
    internal and external memories.

    Args:
        target (bool): Not used by this function.
        stlink_serial_number (str): Serial number forwarded to the build step.
        stedgeai_core_version (str): Version of the STEdgeAI Core to use.
        c_project_path (str): Path to the STM32CubeIDE C project.
        output_dir (str): Path to the output directory (used as session workspace).
        stm32ai_output (str): Path to the STM32Cube.AI output directory.
        optimization (str): Optimization level for the STM32Cube.AI compiler.
        path_to_stm32ai (str): Path to the STM32Cube.AI compiler executable.
        path_to_cube_ide (str): Path to the STM32CubeIDE executable.
        additional_files (list): Extra files, relative to ``output_dir``, to copy in the C application.
        stmaic_conf_filename (str): Name of the configuration file, inside ``c_project_path``,
            used to build the C application. Defaults to 'stmaic_c_project.conf'.
        verbosity (int): When not None (and ``debug`` is False), the driver log level is set to 'info'.
        debug (bool): Whether to set the driver log level to 'debug'.
        model_path (str): Path to the AI model file.
        get_model_name_output (str): Forwarded to the code generation / benchmarking helpers.
        stm32ai_ide (str): IDE to generate code for.
        stm32ai_serie (str): STM32 series to generate code for.
        credentials (list[str]): User credentials used to connect to the cloud.
        on_cloud (bool): Whether to generate the code using the cloud. Defaults to False.
        check_large_model (bool): Analyze the model to check whether it fits in internal
            memory; if not, dispatch it between internal and external memories.
        cfg: Configuration object; only ``cfg.preprocessing.resizing.aspect_ratio`` is read.
        custom_objects (Dict): Forwarded to the activation C code update.

    Returns:
        None

    Raises:
        ValueError: If ``check_large_model`` is set and the model weights or
            activations cannot fit in the board memories.
    """

    def _kb_to_bytes(size: str) -> int:
        """Convert a size string such as '2048KB' to bytes (1 KB counted as 1000 bytes)."""
        return int(re.split(r'(\d+)', size)[1]) * 10**3

    def _mempool_size_bytes(memory_pool: dict, pool_name: str) -> int:
        """Return the size in bytes of the named pool of the board memory pool description."""
        pool = next(item for item in memory_pool['memory']['mempools'] if item["name"] == pool_name)
        return int(pool["size"]["value"]) * 10**3

    def _plan_memory_split(needed_rom: int, needed_ram: int, board_label: str):
        """
        Compare the model footprint with the board memories.

        Returns the tuple (split_weights, split_ram, available_default_ram) and
        raises ValueError when the model cannot fit even with external memories.
        """
        with open(os.path.join(board.config.memory_pool_path), 'r') as f:
            memory_pool = json.load(f)

        available_default_ram = _mempool_size_bytes(memory_pool, "AXIRAM")
        external_ram_bytes = _mempool_size_bytes(memory_pool, "SDRAM")

        internal_flash_bytes = _kb_to_bytes(board.config.internalFlash_size)
        external_flash_bytes = _kb_to_bytes(board.config.externalFlash_size)
        application_flash_bytes = _kb_to_bytes(board.config.application_size)

        if needed_rom > external_flash_bytes + internal_flash_bytes - application_flash_bytes:
            raise ValueError(f"\033[31m The Model is too large (too much weights) to fit in the {board_label}. It won't be compiled\033[39m")
        if needed_ram > external_ram_bytes + available_default_ram:
            raise ValueError(f"\033[31m The Model is too large (too much activations) to fit in the {board_label}. It won't be compiled\033[39m")

        split_ram = available_default_ram < needed_ram
        split_weights = needed_rom > (internal_flash_bytes - application_flash_bytes)
        return split_weights, split_ram, available_default_ram

    def _select_local_tools(session):
        """Point the session to the local compiler and clear any previous output."""
        os.environ["STM32_AI_EXE"] = path_to_stm32ai
        tools = stmaic.STMAiTools()
        session.set_tools(tools)
        print("[INFO] : Offline CubeAI used; Selected tools: ", tools, flush=True)
        shutil.rmtree(stm32ai_output, ignore_errors=True)

    def _stmaic_local_call(session):
        """
        Compile the AI model using the local STM32Cube.AI compiler.

        Args:
            session (stmaic.STMAiSession): The STM32Cube.AI session object.

        Returns:
            None
        """
        if not check_large_model:
            _select_local_tools(session)
            opt = stmaic.STMAiCompileOptions(no_inputs_allocation=False, no_outputs_allocation=False)
            opt.optimization = optimization
            stmaic.compile(session, opt)
            return

        # Benchmark first: the report must be read before the output directory is cleared.
        benchmark_model(optimization=optimization, model_path=model_path,
                        path_to_stm32ai=path_to_stm32ai, stm32ai_output=stm32ai_output,
                        stedgeai_core_version=stedgeai_core_version, get_model_name_output=get_model_name_output)
        with open(os.path.join(stm32ai_output, 'network_c_info.json'), 'r') as f:
            report = json.load(f)

        split_weights, split_ram, available_default_ram = _plan_memory_split(
            needed_rom=report["memory_footprint"]["weights"],
            needed_ram=report["memory_footprint"]["activations"],
            board_label="Board")

        _select_local_tools(session)

        opt = stmaic.STMAiCompileOptions(no_inputs_allocation=False, no_outputs_allocation=False, split_weights=split_weights)
        opt.optimization = optimization

        if split_ram:
            print("[INFO] : Dispatch activations in different ram pools to fit the large model")
            stmaic.compile(session=session, options=opt, target=session._board_config)
        else:
            stmaic.compile(session=session, options=opt)

        path_network_c_info = os.path.join(session.workspace, "network_c_info.json")
        update_activation_c_code(c_project_path, model_path=model_path, path_network_c_info=path_network_c_info,
                                 available_AXIRAM=available_default_ram, aspect_ratio=aspect_ratio,
                                 custom_objects=custom_objects)

        if split_weights:
            print("[INFO] : Dispatch weights between internal and external flash to fit the large model")
            _dispatch_weights(internalFlashSizeFlash_KB=board.config.internalFlash_size,
                              kernelFlash_KB=board.config.lib_size,
                              applicationSizeFlash_KB=board.config.application_size,
                              path_network_c_info=path_network_c_info,
                              path_network_data_params=os.path.join(session.generated_dir, "network_data.c"))
        else:
            print("[INFO] : Weights fit in internal flash")
            _keep_internal_weights(
                path_network_data_params=os.path.join(session.generated_dir, "network_data.c"))

    os.environ["STM32_CUBE_IDE_EXE"] = path_to_cube_ide

    if debug:
        stmaic.set_log_level('debug')
    elif verbosity is not None:
        stmaic.set_log_level('info')

    # Load the model in a session whose workspace is the output directory.
    session = stmaic.load(model_path, workspace_dir=output_dir)

    board_conf = os.path.join(c_project_path, stmaic_conf_filename)
    board = stmaic.STMAiBoardConfig(board_conf)
    session.set_board(board)
    print("[INFO] : Selected board : ", board, flush=True)

    user_files = []

    # Best effort: the preprocessing section is optional. The name is always
    # bound so the large-model path does not fail with a NameError.
    # NOTE(review): assumes update_activation_c_code accepts aspect_ratio=None — confirm.
    try:
        aspect_ratio = cfg.preprocessing.resizing.aspect_ratio
    except Exception:
        aspect_ratio = None

    print("[INFO] : Compiling the model and generating optimized C code + Lib/Inc files: ", model_path, flush=True)
    if on_cloud:
        login_success, ai, _ = cloud_connect(stedgeai_core_version=stedgeai_core_version, credentials=credentials)
        if login_success:
            if not check_large_model:
                ai.generate(CliParameters(model=model_path, output=stm32ai_output, fromModel=get_model_name_output,
                                          includeLibraryForSerie=CliLibrarySerie(stm32ai_serie.upper()),
                                          includeLibraryForIde=CliLibraryIde(stm32ai_ide.lower())))
            else:
                results = cloud_analyze(ai=ai, model_path=model_path, optimization=optimization,
                                        get_model_name_output=get_model_name_output)
                needed_ram = int(results["activations_size"])
                needed_rom = int(results["weights"])

                split_weights, split_ram, available_default_ram = _plan_memory_split(
                    needed_rom=needed_rom, needed_ram=needed_ram, board_label="Disco Board")

                # The memory pool description is only passed when activations must be split.
                memory_pool_path = board.config.memory_pool_path if split_ram else None

                ai.generate(CliParameters(model=model_path, output=stm32ai_output, fromModel=get_model_name_output,
                                          includeLibraryForSerie=CliLibrarySerie(stm32ai_serie.upper()),
                                          splitWeights=split_weights, target_info=memory_pool_path,
                                          includeLibraryForIde=CliLibraryIde(stm32ai_ide.lower())))

                path_network_c_info = os.path.join(session.generated_dir, "network_c_info.json")
                update_activation_c_code(c_project_path, model_path=model_path, path_network_c_info=path_network_c_info,
                                         available_AXIRAM=available_default_ram, aspect_ratio=aspect_ratio,
                                         custom_objects=custom_objects)

                if split_weights:
                    _dispatch_weights(internalFlashSizeFlash_KB=board.config.internalFlash_size,
                                      kernelFlash_KB=board.config.lib_size,
                                      applicationSizeFlash_KB="10KB",
                                      path_network_c_info=path_network_c_info,
                                      path_network_data_params=os.path.join(stm32ai_output, "network_data.c"))
                else:
                    print("[INFO] : Weights fit in internal flash")
                    _keep_internal_weights(
                        path_network_data_params=os.path.join(session.generated_dir, "network_data.c"))

            # NOTE(review): when the cloud output directory does not exist, no
            # local fallback is attempted here — confirm this is intended.
            if os.path.exists(stm32ai_output):
                # Move the cloud output under "<output_dir>/generated" unless it is already there.
                generated_dir = os.path.join(output_dir, "generated")
                if not stm32ai_output.lower() == generated_dir.lower():
                    shutil.move(stm32ai_output, generated_dir)
                    stm32ai_output = generated_dir

                # Fall back to the local tools when either 'Lib' or 'Inc' is missing
                # (this also covers an empty directory).
                if not {'Lib', 'Inc'}.issubset(os.listdir(stm32ai_output)):
                    _stmaic_local_call(session)
        else:
            _stmaic_local_call(session)
    else:
        _stmaic_local_call(session)

    print("[INFO] : Optimized C code + Lib/Inc files generation done.")

    print("[INFO] : Building the STM32 c-project..", flush=True)

    user_files.append(os.path.join(output_dir, "C_header/ai_model_config.h"))
    if additional_files:
        user_files.extend(os.path.join(output_dir, f) for f in additional_files)

    stmaic.build(session, user_files=user_files, serial_number=stlink_serial_number)
|
|
def stm32ai_deploy_stm32n6(target: bool = False,
                           stlink_serial_number: str = None,
                           stedgeai_core_version: str = None,
                           c_project_path: str = None,
                           output_dir: str = None,
                           stm32ai_output: str = None,
                           optimization: str = None,
                           path_to_stm32ai: str = None,
                           path_to_cube_ide: str = None,
                           additional_files: list = None,
                           stmaic_conf_filename: str = 'stmaic_c_project.conf',
                           verbosity: int = None,
                           debug: bool = False,
                           model_path: str = None,
                           get_model_name_output: str = None,
                           stm32ai_ide: str = None,
                           stm32ai_serie: str = None,
                           credentials: list[str] = None,
                           on_cloud: bool = False,
                           check_large_model: bool = False,
                           build_conf: str = None,
                           cfg = None,
                           custom_objects: Dict = None,
                           input_data_type: str = '',
                           output_data_type: str = '',
                           inputs_ch_position: str = '',
                           outputs_ch_position: str = '',
                           name: str = 'network',
                           no_inputs_allocation: bool = False,
                           no_outputs_allocation: bool = False) -> None:
    """
    Generate the C code of a model for an STM32N6 target, then build the C project.

    The project submodule is checked first and the process exits with status 1
    when that check fails. The model is then compiled either through the cloud
    service (when ``on_cloud`` is set and the login succeeds) or with the local
    tools, and the C project is built.

    Args:
        target (bool): Not used by this function.
        stlink_serial_number (str): Serial number forwarded to the build step.
        stedgeai_core_version (str): Version of the STEdgeAI Core to use for the cloud connection.
        c_project_path (str): Path to the STM32CubeIDE C project (also the submodule to check).
        output_dir (str): Path to the output directory (used as session workspace).
        stm32ai_output (str): Path to the STM32Cube.AI output directory.
        optimization (str): Not used by this function.
        path_to_stm32ai (str): Path to the STM32Cube.AI compiler executable.
        path_to_cube_ide (str): Path to the STM32CubeIDE executable.
        additional_files (list): Extra files, relative to ``output_dir``, to copy in the C application.
        stmaic_conf_filename (str): Name of the board configuration file inside ``c_project_path``.
        verbosity (int): When not None (and ``debug`` is False), the driver log level is set to 'info'.
        debug (bool): Whether to set the driver log level to 'debug'.
        model_path (str): Path to the AI model file.
        get_model_name_output (str): Forwarded to the cloud code generation.
        stm32ai_ide (str): IDE to generate code for.
        stm32ai_serie (str): STM32 series to generate code for.
        credentials (list[str]): User credentials used to connect to the cloud.
        on_cloud (bool): Whether to generate the code using the cloud. Defaults to False.
        check_large_model (bool): Not used by this function.
        build_conf (str): Build configuration forwarded to the board configuration.
        cfg: Not used by this function.
        custom_objects (Dict): Not used by this function.
        input_data_type (str): Forwarded to the local compile options.
        output_data_type (str): Forwarded to the local compile options.
        inputs_ch_position (str): Forwarded to the local compile options.
        outputs_ch_position (str): Forwarded to the local compile options.
        name (str): Network name forwarded to the local compile options.
        no_inputs_allocation (bool): Forwarded to the local compile options.
        no_outputs_allocation (bool): Forwarded to the local compile options.

    Returns:
        None
    """

    def _stmaic_local_call(session):
        """
        Compile the AI model using the local STM32Cube.AI compiler.

        Args:
            session (stmaic.STMAiSession): The STM32Cube.AI session object.

        Returns:
            None
        """
        os.environ["STM32_AI_EXE"] = path_to_stm32ai

        tools = stmaic.STMAiTools()
        session.set_tools(tools)
        print("[INFO] : Offline CubeAI used; Selected tools: ", tools, flush=True)

        # Start from a clean output directory.
        shutil.rmtree(stm32ai_output, ignore_errors=True)

        # The Neural-ART option is "<profile>@<user config path>", both read from the board config.
        board_settings = session._board_config.config
        neural_art_path = board_settings.profile + "@" + board_settings.neuralart_user_path
        opt = stmaic.STMAiCompileOptions(st_neural_art=neural_art_path, input_data_type=input_data_type,
                                         inputs_ch_position=inputs_ch_position,
                                         output_data_type=output_data_type, outputs_ch_position=outputs_ch_position,
                                         name=name, no_outputs_allocation=no_outputs_allocation,
                                         no_inputs_allocation=no_inputs_allocation)

        # The board configuration is rebuilt and set again right before compiling.
        board_conf = os.path.join(c_project_path, stmaic_conf_filename)
        board = stmaic.STMAiBoardConfig(board_conf, build_conf)
        session.set_board(board)

        stmaic.compile(session=session, options=opt, target=session._board_config)

    os.environ["STM32_CUBE_IDE_EXE"] = path_to_cube_ide

    if debug:
        stmaic.set_log_level('debug')
    elif verbosity is not None:
        stmaic.set_log_level('info')

    # Nothing can be built when the C project submodule is missing or out of sync.
    if not check_submodule(c_project_path):
        sys.exit(1)

    session = stmaic.load(model_path, workspace_dir=output_dir)

    board_conf = os.path.join(c_project_path, stmaic_conf_filename)
    board = stmaic.STMAiBoardConfig(board_conf, build_conf)
    session.set_board(board)
    print("[INFO] : Selected board : ", board, flush=True)

    user_files = []
    print("[INFO] : Compiling the model and generating optimized C code + Lib/Inc files: ", model_path, flush=True)

    if on_cloud:
        login_success, ai, _ = cloud_connect(stedgeai_core_version=stedgeai_core_version, credentials=credentials)

        if login_success:
            # Options of the default Neural-ART profile, re-prefixed for the cloud command line.
            with open(session._board_config.config.neuralart_user_path) as file:
                neuralart_options = json.load(file)
            neuralart_options = neuralart_options['Profiles']['default']["options"].replace('--', "--atonnOptions.")

            ai.generate(CliParameters(model=model_path, output=stm32ai_output, fromModel=get_model_name_output,
                                      target="stm32n6", stNeuralArt="default",
                                      allocateInputs=False, allocateOutputs=False, mpool=board._conf.mpool,
                                      extraCommandLineArguments=neuralart_options,
                                      includeLibraryForSerie=CliLibrarySerie(stm32ai_serie.upper()),
                                      includeLibraryForIde=CliLibraryIde(stm32ai_ide.lower())))

            # NOTE(review): when the cloud output directory does not exist, no
            # local fallback is attempted here — confirm this is intended.
            if os.path.exists(stm32ai_output):
                # Move the cloud output under "<output_dir>/generated" unless it is already there.
                generated_dir = os.path.join(output_dir, "generated")
                if stm32ai_output != generated_dir:
                    shutil.move(stm32ai_output, generated_dir)
                    stm32ai_output = generated_dir

                # Fall back to the local tools when either 'Lib' or 'Inc' is missing
                # (this also covers an empty directory).
                if not {'Lib', 'Inc'}.issubset(os.listdir(stm32ai_output)):
                    _stmaic_local_call(session)
        else:
            _stmaic_local_call(session)
    else:
        _stmaic_local_call(session)

    print("[INFO] : Optimized C code + Lib/Inc files generation done.")

    print("[INFO] : Building the STM32 c-project..", flush=True)

    user_files.append(os.path.join(output_dir, "C_header/app_config.h"))
    user_files.append(os.path.join(output_dir, "C_header/ai_model_config.h"))
    if additional_files:
        user_files.extend(os.path.join(output_dir, f) for f in additional_files)

    stmaic.build(session, user_files=user_files, serial_number=stlink_serial_number)
|
|
|
|
def stm32ai_deploy_mpu(target: bool = False,
                       board_ip_address: str = None,
                       board_deploy: str = None,
                       class_names: List = None,
                       c_project_path: str = None,
                       verbosity: int = None,
                       debug: bool = False,
                       model_path: str = None,
                       cfg = None) -> bool:
    """
    Copy the application, its resources and the model to an MPU board over SSH.

    Steps: ping the board, create the deployment directory with ssh, write or
    copy the class names file into the project ``Resources`` folder, copy
    ``Application``, ``Resources`` and the model with scp, copy the
    board-specific ``*.sh`` scripts, make the remote scripts executable and
    finally print the command lines that launch the application.

    Args:
        target: Target name; when it is a string containing "STM32MP2" the
            STM32MP2 scripts and the "nbg" model extension are used, otherwise
            the STM32MP1 scripts and "tflite".
        board_ip_address (str): IP address of the board (connections use the ``root`` user).
        board_deploy (str): Deployment directory on the board.
        class_names (List): List of class name strings, or path to a ``.txt`` file holding them.
        c_project_path (str): Path to the application project on the host.
        verbosity (int): Not used by this function.
        debug (bool): Not used by this function.
        model_path (str): Path to the model file to copy on the board.
        cfg: Not used by this function.

    Returns:
        bool: True when the deployment succeeded, False otherwise.
    """
    import glob
    import posixpath

    if board_ip_address is None:
        print("Board IP address is missing, unable to deploy on target")
        return False
    if not board_deploy:
        print("[FAIL] : Board deploy directory is missing, unable to deploy on target")
        return False

    # --- Check that the board answers to ping ---------------------------------
    count = 5
    timeout = 100
    subprocess_timeout = 5
    on_windows = platform.system().lower() == 'windows'
    count_params = '-n' if on_windows else '-c'
    timeout_params = '-w' if on_windows else '-W'

    cmd = ['ping', count_params, str(count), timeout_params, str(timeout), board_ip_address]
    try:
        res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             timeout=subprocess_timeout, text=True)
    except subprocess.TimeoutExpired:
        print(f"[FAIL] : Board is not reachable, ping command timed out after {subprocess_timeout} seconds.")
        return False
    except Exception as e:
        print(f"[FAIL] : Verification of the IP failed : {e}.")
        return False

    if res.returncode != 0:
        print(f"[FAIL] : Board is not reachable at {board_ip_address} address")
        return False
    print(f"[INFO] : Board is reachable at {board_ip_address} address")

    # Commands are passed as argument lists so that no local shell parses them.
    remote_host = "root@" + board_ip_address
    ssh_base = ["ssh", "-o", "StrictHostKeyChecking no", remote_host]

    def _stderr_text(completed) -> str:
        # stderr is captured as bytes; decode it so that messages stay readable.
        return completed.stderr.decode(errors="replace").strip()

    # --- Create the deployment directory on the board ---------------------------
    ssh = subprocess.run(ssh_base + ["mkdir -p " + board_deploy],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=300)
    if ssh.returncode != 0:
        print(f"[FAIL] deploy directory creation failed, code: {ssh.returncode}")
        return False

    path_to_application = os.path.join(c_project_path, "Application/")
    path_to_resources = os.path.join(c_project_path, "Resources/")

    # --- Provide the class names to the application ------------------------------
    label_file = os.path.join(path_to_resources, 'class_names.txt')
    if isinstance(class_names, list) and all(isinstance(name, str) for name in class_names):
        with open(label_file, 'w') as file:
            file.writelines(class_name + '\n' for class_name in class_names)
    elif isinstance(class_names, str) and class_names.endswith('.txt'):
        shutil.copy(class_names, label_file)

    # --- Copy application, resources and model ------------------------------------
    deploy_res = subprocess.run(
        ["scp", "-r", path_to_application, path_to_resources, model_path, remote_host + ":" + board_deploy],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=300)
    if deploy_res.returncode != 0:
        print(f"[FAIL] : Application code deployment failed : {_stderr_text(deploy_res)} ")
        return False
    print("[INFO] : Application code successfully installed on target")

    # --- Copy the board-specific scripts ------------------------------------------
    # `target` defaults to a bool, which does not support the `in` operator.
    if isinstance(target, str) and "STM32MP2" in target:
        board_family = "STM32MP2"
        model_extension = "nbg"
    else:
        board_family = "STM32MP1"
        model_extension = "tflite"

    # The pattern is expanded here because no local shell is involved any more.
    target_scripts = sorted(glob.glob(os.path.join(c_project_path, board_family, "*.sh")))
    if not target_scripts:
        print(f"[FAIL] : Application code deployment failed : no script found in {board_family} folder ")
        return False

    deploy_spe_res = subprocess.run(
        ["scp", "-r", "-p", *target_scripts, remote_host + ":" + board_deploy + "/Resources"],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=300)
    if deploy_spe_res.returncode != 0:
        print(f"[FAIL] : Application code deployment failed : {_stderr_text(deploy_spe_res)} ")
        return False

    # --- Make the remote scripts executable (failures only warn) -------------------
    for folder in ("Application", "Resources"):
        # Remote paths are POSIX paths whatever the host operating system is.
        chmod_cmd = f"chmod +x {posixpath.join(board_deploy, folder)}/*.sh"
        chmod_res = subprocess.run(ssh_base + [chmod_cmd],
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=60)
        if chmod_res.returncode != 0:
            print(f"[WARN] : chmod +x on {folder}/*.sh failed: {_stderr_text(chmod_res)}")

    # --- Locate the launch script and print how to start the application ----------
    launch_script = next(
        (item for item in os.listdir(path_to_application)
         if Path(item).suffix == ".sh" and "launch_" in item),
        None)
    if launch_script is None:
        print("[FAIL] : Launch script not found in Application folder")
        return False

    command = board_deploy + "/Application/" + launch_script + " " + model_extension + " " + board_deploy
    print(f"[INFO] : To launch application directly on the target please run : {command}")
    command = "ssh -o \"StrictHostKeyChecking no\" root@" + board_ip_address + " \"" + command + "\""
    print(f"[INFO] : To launch application from your host computer please run : {command}")

    return True