# -*- coding: utf-8 -*-
"""
Created on Wed Jan 15 10:25:34 2025
@author: Ashmitha
"""
#-----------------------------------------------------------Libraries----------------------------------------------------------------------------
#! pip install scikit-learn xgboost tensorflow gradio
import io
import pandas as pd
import numpy as np
import gradio as gr
from scipy.stats import pearsonr
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectFromModel
from xgboost import XGBRegressor
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (GRU, Conv1D, MaxPooling1D, Dense, Flatten,
                                     Dropout, BatchNormalization, LeakyReLU)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import regularizers
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
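# Expected input layout (as consumed by the functions below): the training and
# testing CSVs carry sample IDs in column 1, a 'phenotypes' column, and genotype
# markers from column 3 onward; the additive and dominance CSVs carry an ID
# column first, which is dropped before merging.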
#--------------------------------Random Forest for Feature selection-------------------------------------------
def RandomForestFeatureSelection(trainX, trainy, num_features=60):
    rf = RandomForestRegressor(n_estimators=1000, random_state=50)
    rf.fit(trainX, trainy)
    importances = rf.feature_importances_
    # Indices of the num_features most important features (ascending by importance)
    indices = np.argsort(importances)[-num_features:]
    return indices
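# Usage sketch for the helper above (hypothetical marker matrix X of shape
# (samples, markers) and phenotype vector y):
#   idx = RandomForestFeatureSelection(X, y, num_features=60)
#   X_selected = X[:, idx]
# Note: the pipeline below actually uses SelectFromModel with the "mean"
# importance threshold rather than this fixed-count helper.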
#------------------------------------------------------------------GRU model--------------------------------------------------
def GRUModel(trainX, trainy, testX, testy, epochs=1000, batch_size=64, learning_rate=0.0001,
             l1_reg=0.001, l2_reg=0.001, dropout_rate=0.2, feature_selection=True):
    if feature_selection:
        rf = RandomForestRegressor(n_estimators=100, random_state=42)
        rf.fit(trainX, trainy)
        selector = SelectFromModel(rf, threshold="mean", prefit=True)
        trainX = selector.transform(trainX)
        if testX is not None:
            testX = selector.transform(testX)
        print(f"Selected {trainX.shape[1]} features based on feature importance")
    # Scale inputs and target to [0, 1]
    scaler = MinMaxScaler()
    trainX_scaled = scaler.fit_transform(trainX)
    if testX is not None:
        testX_scaled = scaler.transform(testX)
    target_scaler = MinMaxScaler()
    trainy_scaled = target_scaler.fit_transform(trainy.reshape(-1, 1))
    # Reshape to (samples, time steps, features) with a single time step
    trainX = trainX_scaled.reshape((trainX.shape[0], 1, trainX.shape[1]))
    if testX is not None:
        testX = testX_scaled.reshape((testX.shape[0], 1, testX.shape[1]))
    model = Sequential()
    model.add(GRU(512, input_shape=(trainX.shape[1], trainX.shape[2]), return_sequences=False,
                  kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(Dense(256, kernel_initializer='he_normal', kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(BatchNormalization())
    model.add(Dropout(dropout_rate))
    model.add(LeakyReLU(alpha=0.1))
    model.add(Dense(128, kernel_initializer='he_normal', kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(BatchNormalization())
    model.add(Dropout(dropout_rate))
    model.add(LeakyReLU(alpha=0.1))
    model.add(Dense(64, kernel_initializer='he_normal', kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(BatchNormalization())
    model.add(Dropout(dropout_rate))
    model.add(LeakyReLU(alpha=0.1))
    model.add(Dense(32, kernel_initializer='he_normal', kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(BatchNormalization())
    model.add(Dropout(dropout_rate))
    model.add(LeakyReLU(alpha=0.1))
    model.add(Dense(1, activation="relu"))
    model.compile(loss="mse", optimizer=Adam(learning_rate=learning_rate), metrics=["mse"])
    learning_rate_reduction = ReduceLROnPlateau(monitor="val_loss", patience=10, verbose=1, factor=0.5, min_lr=1e-6)
    early_stopping = EarlyStopping(monitor='val_loss', verbose=1, restore_best_weights=True, patience=10)
    history = model.fit(trainX, trainy_scaled, epochs=epochs, batch_size=batch_size, validation_split=0.1, verbose=1,
                        callbacks=[learning_rate_reduction, early_stopping])
    predicted_train = model.predict(trainX).flatten()
    predicted_test = model.predict(testX).flatten() if testX is not None else None
    # Undo the target scaling so predictions are on the original phenotype scale
    predicted_train = target_scaler.inverse_transform(predicted_train.reshape(-1, 1)).flatten()
    if predicted_test is not None:
        predicted_test = target_scaler.inverse_transform(predicted_test.reshape(-1, 1)).flatten()
    return predicted_train, predicted_test, history
#----------------------------------------------------CNN-----------------------------------------------
def CNNModel(trainX, trainy, testX, testy, epochs=1000, batch_size=64, learning_rate=0.0001,
             l1_reg=0.0001, l2_reg=0.0001, dropout_rate=0.3, feature_selection=True):
    if feature_selection:
        rf = RandomForestRegressor(n_estimators=100, random_state=42)
        rf.fit(trainX, trainy)
        selector = SelectFromModel(rf, threshold="mean", prefit=True)
        trainX = selector.transform(trainX)
        if testX is not None:
            testX = selector.transform(testX)
        print(f"Selected {trainX.shape[1]} features based on feature importance")
    scaler = MinMaxScaler()
    trainX_scaled = scaler.fit_transform(trainX)
    if testX is not None:
        testX_scaled = scaler.transform(testX)
    # Reshape to (samples, features, 1) so Conv1D slides along the marker axis
    trainX = trainX_scaled.reshape((trainX.shape[0], trainX.shape[1], 1))
    if testX is not None:
        testX = testX_scaled.reshape((testX.shape[0], testX.shape[1], 1))
    model = Sequential()
    model.add(Conv1D(512, kernel_size=3, activation='relu', input_shape=(trainX.shape[1], 1),
                     kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(MaxPooling1D(pool_size=2))
    model.add(Dropout(dropout_rate))
    model.add(Conv1D(256, kernel_size=3, activation='relu',
                     kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(MaxPooling1D(pool_size=2))
    model.add(Dropout(dropout_rate))
    model.add(Conv1D(128, kernel_size=3, activation='relu',
                     kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(MaxPooling1D(pool_size=2))
    model.add(Dropout(dropout_rate))
    model.add(Flatten())
    model.add(Dense(64, kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(LeakyReLU(alpha=0.1))
    model.add(Dropout(dropout_rate))
    model.add(Dense(1, activation='linear'))
    model.compile(loss='mse', optimizer=Adam(learning_rate=learning_rate), metrics=['mse'])
    learning_rate_reduction = ReduceLROnPlateau(monitor='val_loss', patience=5, verbose=1, factor=0.5, min_lr=1e-6)
    early_stopping = EarlyStopping(monitor='val_loss', verbose=1, restore_best_weights=True, patience=10)
    history = model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, validation_split=0.1, verbose=1,
                        callbacks=[learning_rate_reduction, early_stopping])
    predicted_train = model.predict(trainX).flatten()
    predicted_test = model.predict(testX).flatten() if testX is not None else None
    return predicted_train, predicted_test, history
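# Shape note: the CNN treats each marker as a step along the sequence axis,
# feeding (samples, features, 1), whereas the GRU above packs all markers into
# a single time step, feeding (samples, 1, features).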
#-------------------------------------------------------------------RFModel---------------------------------------------------------
def RFModel(trainX, trainy, testX, testy, n_estimators=100, max_depth=None, feature_selection=True):
    if feature_selection:
        rf = RandomForestRegressor(n_estimators=100, random_state=42)
        rf.fit(trainX, trainy)
        selector = SelectFromModel(rf, threshold="mean", prefit=True)
        trainX = selector.transform(trainX)
        if testX is not None:
            testX = selector.transform(testX)
        print(f"Selected {trainX.shape[1]} features based on feature importance")
    scaler = MinMaxScaler()
    trainX_scaled = scaler.fit_transform(trainX)
    if testX is not None:
        testX_scaled = scaler.transform(testX)
    rf_model = RandomForestRegressor(n_estimators=n_estimators, max_depth=max_depth, random_state=42)
    # fit() returns the fitted estimator itself; kept as "history" for a uniform interface
    history = rf_model.fit(trainX_scaled, trainy)
    predicted_train = rf_model.predict(trainX_scaled)
    predicted_test = rf_model.predict(testX_scaled) if testX is not None else None
    return predicted_train, predicted_test, history
#------------------------------------------------------------------------------XGboost---------------------------------------------------------------
def XGBoostModel(trainX, trainy, testX, testy, learning_rate=0.01, min_child_weight=5,
                 feature_selection=True, n_estimators=100, max_depth=10):
    if feature_selection:
        rf = RandomForestRegressor(n_estimators=100, random_state=42)
        rf.fit(trainX, trainy)
        selector = SelectFromModel(rf, threshold="mean", prefit=True)
        trainX = selector.transform(trainX)
        if testX is not None:
            testX = selector.transform(testX)
        print(f"Selected {trainX.shape[1]} features based on feature importance")
    scaler = MinMaxScaler()
    trainX_scaled = scaler.fit_transform(trainX)
    if testX is not None:
        testX_scaled = scaler.transform(testX)
    xgb_model = XGBRegressor(objective="reg:squarederror", random_state=42,
                             learning_rate=learning_rate, min_child_weight=min_child_weight,
                             n_estimators=n_estimators, max_depth=max_depth)
    # Fit on the same scaled matrix that is used for prediction
    history = xgb_model.fit(trainX_scaled, trainy)
    # Predictions
    predicted_train = xgb_model.predict(trainX_scaled)
    predicted_test = xgb_model.predict(testX_scaled) if testX is not None else None
    return predicted_train, predicted_test, history
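# The fixed hyperparameters above could be tuned instead; a minimal sketch
# using scikit-learn's GridSearchCV (not wired into the app, grid values are
# illustrative):
#   from sklearn.model_selection import GridSearchCV
#   grid = GridSearchCV(
#       XGBRegressor(objective="reg:squarederror", random_state=42),
#       param_grid={"learning_rate": [0.01, 0.1], "max_depth": [5, 10],
#                   "min_child_weight": [1, 5]},
#       cv=3, scoring="neg_mean_squared_error")
#   grid.fit(trainX_scaled, trainy)
#   xgb_model = grid.best_estimator_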
#----------------------------------------reading file----------------------------------------------------------------------------------------
def read_csv_file(uploaded_file):
    if uploaded_file is not None:
        # Gradio file objects expose either raw bytes (.data) or a temp-file path (.name)
        if hasattr(uploaded_file, 'data'):
            return pd.read_csv(io.BytesIO(uploaded_file.data))
        elif hasattr(uploaded_file, 'name'):
            return pd.read_csv(uploaded_file.name)
    return None
#-----------------------------------------------------------------calculate topsis score--------------------------------------------------------
def calculate_topsis_score(df):
    metrics = df[['Train_MSE', 'Train_RMSE', 'Train_R2', 'Train_Corr']].dropna()  # Ensure no NaN values
    norm_metrics = metrics / np.sqrt((metrics ** 2).sum(axis=0))
    ideal_best = pd.Series(index=norm_metrics.columns, dtype=float)
    ideal_worst = pd.Series(index=norm_metrics.columns, dtype=float)
    for col in ['Train_MSE', 'Train_RMSE']:
        ideal_best[col] = norm_metrics[col].min()
        ideal_worst[col] = norm_metrics[col].max()
    for col in ['Train_R2', 'Train_Corr']:
        ideal_best[col] = norm_metrics[col].max()
        ideal_worst[col] = norm_metrics[col].min()
    dist_to_best = np.sqrt(((norm_metrics - ideal_best) ** 2).sum(axis=1))
    dist_to_worst = np.sqrt(((norm_metrics - ideal_worst) ** 2).sum(axis=1))
    topsis_score = dist_to_worst / (dist_to_best + dist_to_worst)
    df['TOPSIS_Score'] = np.nan
    df.loc[metrics.index, 'TOPSIS_Score'] = topsis_score  # Assign TOPSIS scores
    return df
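# TOPSIS in brief: each metric column is vector-normalised, then every model is
# scored by its closeness to the ideal point,
#   score = dist_to_worst / (dist_to_best + dist_to_worst),
# so a score of 1 coincides with the ideal-best and 0 with the ideal-worst.
# MSE/RMSE are cost criteria (ideal = column minimum); R2 and correlation are
# benefit criteria (ideal = column maximum).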
#--------------------------------------------------- Nested Cross validation---------------------------------------------------------------------------
def NestedKFoldCrossValidation(training_data, training_additive, testing_data, testing_additive,
                               training_dominance, testing_dominance, epochs, learning_rate, min_child_weight, batch_size=64,
                               outer_n_splits=2, inner_n_splits=2, output_file='cross_validation_results.csv',
                               predicted_phenotype_file='predicted_phenotype.csv', feature_selection=True):
    # Note: inner_n_splits is accepted for API symmetry, but only the outer loop is run here.
    if 'phenotypes' not in training_data.columns:
        raise ValueError("Training data does not contain the 'phenotypes' column.")
    # Drop the ID column from the additive and dominance component files
    training_additive = training_additive.iloc[:, 1:]
    testing_additive = testing_additive.iloc[:, 1:]
    training_dominance = training_dominance.iloc[:, 1:]
    testing_dominance = testing_dominance.iloc[:, 1:]
    # Merge training and testing data with additive and dominance components
    training_data_merged = pd.concat([training_data, training_additive, training_dominance], axis=1)
    testing_data_merged = pd.concat([testing_data, testing_additive, testing_dominance], axis=1)
    phenotypic_info = training_data['phenotypes'].values
    phenotypic_test_info = testing_data['phenotypes'].values if 'phenotypes' in testing_data.columns else None
    sample_ids = testing_data.iloc[:, 0].values
    training_genotypic_data_merged = training_data_merged.iloc[:, 2:].values
    testing_genotypic_data_merged = testing_data_merged.iloc[:, 2:].values
    if feature_selection:
        rf = RandomForestRegressor(n_estimators=100, random_state=42)
        rf.fit(training_genotypic_data_merged, phenotypic_info)
        selector = SelectFromModel(rf, threshold="mean", prefit=True)
        training_genotypic_data_merged = selector.transform(training_genotypic_data_merged)
        testing_genotypic_data_merged = selector.transform(testing_genotypic_data_merged)
        print(f"Selected {training_genotypic_data_merged.shape[1]} features based on importance.")
    scaler = StandardScaler()
    training_genotypic_data_merged = scaler.fit_transform(training_genotypic_data_merged)
    testing_genotypic_data_merged = scaler.transform(testing_genotypic_data_merged)
    outer_kf = KFold(n_splits=outer_n_splits)
    results = []
    all_predicted_phenotypes = []
    def calculate_metrics(true_values, predicted_values):
        mse = mean_squared_error(true_values, predicted_values)
        rmse = np.sqrt(mse)
        r2 = r2_score(true_values, predicted_values)
        corr = pearsonr(true_values, predicted_values)[0]
        return mse, rmse, r2, corr
    models = [
        ('GRUModel', GRUModel),
        ('CNNModel', CNNModel),
        ('RFModel', RFModel),
        ('XGBoostModel', XGBoostModel)
    ]
    for outer_fold, (outer_train_index, outer_test_index) in enumerate(outer_kf.split(phenotypic_info), 1):
        outer_trainX = training_genotypic_data_merged[outer_train_index]
        outer_trainy = phenotypic_info[outer_train_index]
        outer_testX = testing_genotypic_data_merged
        outer_testy = phenotypic_test_info
        for model_name, model_func in models:
            print(f"Running model: {model_name} for fold {outer_fold}")
            if model_name in ['GRUModel', 'CNNModel']:
                predicted_train, predicted_test, history = model_func(outer_trainX, outer_trainy, outer_testX, outer_testy,
                                                                      epochs=epochs, batch_size=batch_size)
            elif model_name == 'RFModel':
                predicted_train, predicted_test, history = model_func(outer_trainX, outer_trainy, outer_testX, outer_testy)
            else:
                predicted_train, predicted_test, history = model_func(outer_trainX, outer_trainy, outer_testX, outer_testy,
                                                                      learning_rate, min_child_weight)
            mse_train, rmse_train, r2_train, corr_train = calculate_metrics(outer_trainy, predicted_train)
            mse_test, rmse_test, r2_test, corr_test = calculate_metrics(outer_testy, predicted_test) if outer_testy is not None else (None, None, None, None)
            results.append({
                'Model': model_name,
                'Fold': outer_fold,
                'Train_MSE': mse_train,
                'Train_RMSE': rmse_train,
                'Train_R2': r2_train,
                'Train_Corr': corr_train,
                'Test_MSE': mse_test,
                'Test_RMSE': rmse_test,
                'Test_R2': r2_test,
                'Test_Corr': corr_test
            })
            if predicted_test is not None:
                predicted_test_df = pd.DataFrame({
                    'Sample_ID': sample_ids,
                    'Predicted_Phenotype': predicted_test,
                    'Model': model_name
                })
                all_predicted_phenotypes.append(predicted_test_df)
    results_df = pd.DataFrame(results)
    avg_results_df = results_df.groupby('Model').agg({
        'Train_MSE': 'mean',
        'Train_RMSE': 'mean',
        'Train_R2': 'mean',
        'Train_Corr': 'mean',
        'Test_MSE': 'mean',
        'Test_RMSE': 'mean',
        'Test_R2': 'mean',
        'Test_Corr': 'mean'
    }).reset_index()
    # Local TOPSIS variant: min-max normalisation, every metric column treated as a benefit criterion
    def calculate_topsis_score(df):
        norm_df = (df.iloc[:, 1:] - df.iloc[:, 1:].min()) / (df.iloc[:, 1:].max() - df.iloc[:, 1:].min())
        ideal_positive = norm_df.max(axis=0)
        ideal_negative = norm_df.min(axis=0)
        dist_positive = np.sqrt(((norm_df - ideal_positive) ** 2).sum(axis=1))
        dist_negative = np.sqrt(((norm_df - ideal_negative) ** 2).sum(axis=1))
        topsis_score = dist_negative / (dist_positive + dist_negative)
        df['TOPSIS_Score'] = topsis_score
        return df
    avg_results_df = calculate_topsis_score(avg_results_df)
    avg_results_df.to_csv(output_file, index=False)
    if all_predicted_phenotypes:
        predicted_all_df = pd.concat(all_predicted_phenotypes, axis=0, ignore_index=True)
        predicted_all_df.to_csv(predicted_phenotype_file, index=False)
        return avg_results_df, predicted_all_df
    return avg_results_df, None
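# Standalone usage sketch (hypothetical file names, small epoch count for a
# quick smoke test):
#   results, preds = NestedKFoldCrossValidation(
#       pd.read_csv("train.csv"), pd.read_csv("train_additive.csv"),
#       pd.read_csv("test.csv"), pd.read_csv("test_additive.csv"),
#       pd.read_csv("train_dominance.csv"), pd.read_csv("test_dominance.csv"),
#       epochs=5, learning_rate=0.001, min_child_weight=5)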
#--------------------------------------------------------------------Gradio interface---------------------------------------------------------------
def run_cross_validation(training_file, training_additive_file, testing_file, testing_additive_file,
                         training_dominance_file, testing_dominance_file, feature_selection,
                         learning_rate=0.001, min_child_weight=5):
    epochs = 1000
    batch_size = 64
    outer_n_splits = 2
    inner_n_splits = 2
    training_data = pd.read_csv(training_file.name)
    training_additive = pd.read_csv(training_additive_file.name)
    testing_data = pd.read_csv(testing_file.name)
    testing_additive = pd.read_csv(testing_additive_file.name)
    training_dominance = pd.read_csv(training_dominance_file.name)
    testing_dominance = pd.read_csv(testing_dominance_file.name)
    results, predicted_phenotypes = NestedKFoldCrossValidation(
        training_data=training_data,
        training_additive=training_additive,
        testing_data=testing_data,
        testing_additive=testing_additive,
        training_dominance=training_dominance,
        testing_dominance=testing_dominance,
        epochs=epochs,
        batch_size=batch_size,
        outer_n_splits=outer_n_splits,
        inner_n_splits=inner_n_splits,
        learning_rate=learning_rate,
        min_child_weight=min_child_weight,
        feature_selection=feature_selection
    )
    results_file = "cross_validation_results.csv"
    predicted_file = "predicted_phenotype.csv"
    results.to_csv(results_file, index=False)
    if predicted_phenotypes is not None:
        predicted_phenotypes.to_csv(predicted_file, index=False)
    return results_file, predicted_file
with gr.Blocks() as interface:
    gr.Markdown("# DeepMap - An Integrated GUI for Genotype to Phenotype Prediction")
    with gr.Row():
        training_file = gr.File(label="Upload Training Data (CSV)")
        training_additive_file = gr.File(label="Upload Training Additive Data (CSV)")
        training_dominance_file = gr.File(label="Upload Training Dominance Data (CSV)")
    with gr.Row():
        testing_file = gr.File(label="Upload Testing Data (CSV)")
        testing_additive_file = gr.File(label="Upload Testing Additive Data (CSV)")
        testing_dominance_file = gr.File(label="Upload Testing Dominance Data (CSV)")
    with gr.Row():
        feature_selection = gr.Checkbox(label="Enable Feature Selection", value=True)
    output1 = gr.File(label="Cross-Validation Results (CSV)")
    output2 = gr.File(label="Predicted Phenotypes (CSV)")
    submit_btn = gr.Button("Run DeepMap")
    submit_btn.click(
        run_cross_validation,
        inputs=[
            training_file, training_additive_file, testing_file,
            testing_additive_file, training_dominance_file, testing_dominance_file,
            feature_selection
        ],
        outputs=[output1, output2]
    )
interface.launch()