import os import zipfile import io from pathlib import Path import numpy as np import pandas as pd import matplotlib.pyplot as plt import tensorflow as tf from tensorflow import keras from tensorflow.keras import layers, regularizers # Load Kinect movement data JOINTS = ["head", "left_shoulder", "left_elbow", "right_shoulder", "right_elbow", "left_hand", "right_hand", "left_hip", "right_hip", "left_knee", "right_knee", "left_foot", "right_foot", ] N_JOINTS = len(JOINTS) # total number of body joints N_INPUT = N_JOINTS * 2 # input for each joint(x and y) N_OUTPUT = N_JOINTS * 1 # output/target z coordiante print(f'Input: {N_INPUT} \tOutput:{N_OUTPUT}') print(f'Joints: {N_JOINTS}') # Loads single csv file and splits into input and target array def load_single_csv(filepath_or_bytes): if isinstance(filepath_or_bytes, (str, os.PathLike)): df = pd.read_csv(filepath_or_bytes) else: df = pd.read_csv(io.BytesIO(filepath_or_bytes)) df.columns = df.columns.str.strip() x_cols = [f"{j}_x" for j in JOINTS] y_cols = [f"{j}_y" for j in JOINTS] z_cols = [f"{j}_z" for j in JOINTS] xy_cols = [] for j in JOINTS: xy_cols += [f"{j}_x", f"{j}_y"] X = df[xy_cols].values.astype(np.float32) # input y = df[z_cols].values.astype(np.float32) # Target return X, y # load all csv file from the folder def load_all_sequences(folder_path): sequences, file_names = [], [] # Get all CSV files in the folder csv_files = [f for f in os.listdir(folder_path) if f.endswith('.csv')] csv_files.sort() print(f"Found {len(csv_files)} CSV files in folder.") for name in csv_files: file_path = os.path.join(folder_path, name) with open(file_path, 'rb') as f: raw = f.read() X, y = load_single_csv(raw) sequences.append((X, y)) # stores as (input,target) file_names.append(name) return sequences, file_names # For Dense MLP model, which treats each frame independently def flatten_sequences(sequences): X_flat = np.concatenate([s[0] for s in sequences], axis=0) y_flat = np.concatenate([s[1] for s in sequences], axis=0) return X_flat, y_flat # Create fixed-length windows of consecutive frames from each session for conv1d, lstm and gru def make_windowed_sequences(sequences, window_size=30, stride=1): X_list, y_list = [], [] for X, y in sequences: n = len(X) for start in range(0, n - window_size + 1, stride): X_list.append(X[start : start + window_size]) y_list.append(y[start : start + window_size]) X_seq = np.array(X_list, dtype=np.float32) # (N, window, 26) y_seq = np.array(y_list, dtype=np.float32) # (N, window, 13) return X_seq, y_seq REPO_ROOT = os.path.abspath(os.path.join(os.getcwd(), '..')) DATA_DIR = os.path.join(REPO_ROOT, 'Datasets_all') KINECT_DATA_PATH = os.path.join(DATA_DIR, 'kinect_good_preprocessed') # sequences contain list of tuples (X,y) sequences, file_names = load_all_sequences(KINECT_DATA_PATH) # Frame-level flat data (for Dense models) X_flat, y_flat = flatten_sequences(sequences) print(f"\nFlat dataset: X={X_flat.shape} y={y_flat.shape}") # Windowed sequences (for Conv1D / LSTM / GRU models) WINDOW_SIZE = 30 X_seq, y_seq = make_windowed_sequences(sequences, window_size=WINDOW_SIZE, stride=5) y_seq_last = y_seq[:, -1, :] # (N, 13) print(f"Windowed dataset: X={X_seq.shape} y_last={y_seq_last.shape}") # Define Deep Learning network architectures # DEnse MLP def build_dense_model( hidden_units=(128, 64), activation="relu", dropout_rate=0.2, l2_reg=1e-4,optimizer="adam", loss="mse", ): inputs = keras.Input(shape=(N_INPUT,), name="xy_input") x = inputs for i, units in enumerate(hidden_units): x = layers.Dense( units, activation=activation, kernel_regularizer=regularizers.l2(l2_reg) if l2_reg else None, name=f"dense_{i+1}", )(x) if dropout_rate > 0: x = layers.Dropout(dropout_rate, name=f"dropout_{i+1}")(x) outputs = layers.Dense(N_OUTPUT, activation="linear", name="z_output")(x) model = keras.Model(inputs, outputs, name="DenseModel") return model # Conv1D CNN def build_conv1d_model(filters=(64, 128), kernel_size=3, pool_size=2, dense_units=(64,), activation="relu", dropout_rate=0.2,optimizer="adam", loss="mse", ): inputs = keras.Input(shape=(WINDOW_SIZE, N_INPUT), name="xy_seq_input") x = inputs for i, f in enumerate(filters): x = layers.Conv1D(f, kernel_size, activation=activation, padding="same", name=f"conv_{i+1}")(x) x = layers.MaxPooling1D(pool_size, padding="same", name=f"pool_{i+1}")(x) if dropout_rate > 0: x = layers.Dropout(dropout_rate, name=f"drop_conv_{i+1}")(x) x = layers.GlobalAveragePooling1D(name="gap")(x) for i, units in enumerate(dense_units): x = layers.Dense(units, activation=activation, name=f"fc_{i+1}")(x) if dropout_rate > 0: x = layers.Dropout(dropout_rate, name=f"drop_fc_{i+1}")(x) outputs = layers.Dense(N_OUTPUT, activation="linear", name="z_output")(x) model = keras.Model(inputs, outputs, name="Conv1DModel") return model # layers.LSTM def build_lstm_model(lstm_units=(64, 32), dense_units=(32,), activation="tanh", dropout_rate=0.2, recurrent_dropout=0.0, optimizer="adam", loss="mse", ): inputs = keras.Input(shape=(WINDOW_SIZE, N_INPUT), name="xy_seq_input") x = inputs for i, units in enumerate(lstm_units): return_sequences = (i < len(lstm_units) - 1) x = layers.LSTM( units, return_sequences=return_sequences, dropout=dropout_rate, recurrent_dropout=recurrent_dropout, name=f"lstm_{i+1}", )(x) for i, units in enumerate(dense_units): x = layers.Dense(units, activation="relu", name=f"fc_{i+1}")(x) if dropout_rate > 0: x = layers.Dropout(dropout_rate, name=f"drop_fc_{i+1}")(x) outputs = layers.Dense(N_OUTPUT, activation="linear", name="z_output")(x) model = keras.Model(inputs, outputs, name="LSTMModel") return model # layers.GRU def build_gru_model(gru_units=(64, 32), dense_units=(32,), dropout_rate=0.2, recurrent_dropout=0.0, optimizer="adam", loss="mse",): inputs = keras.Input(shape=(WINDOW_SIZE, N_INPUT), name="xy_seq_input") x = inputs for i, units in enumerate(gru_units): return_sequences = (i < len(gru_units) - 1) x = layers.GRU( units, return_sequences=return_sequences, dropout=dropout_rate, recurrent_dropout=recurrent_dropout, name=f"gru_{i+1}", )(x) for i, units in enumerate(dense_units): x = layers.Dense(units, activation="relu", name=f"fc_{i+1}")(x) if dropout_rate > 0: x = layers.Dropout(dropout_rate, name=f"drop_fc_{i+1}")(x) outputs = layers.Dense(N_OUTPUT, activation="linear", name="z_output")(x) model = keras.Model(inputs, outputs, name="GRUModel") return model