import json
import os
from typing import List, Optional, Tuple

import fire
import joblib
import numpy as np
import pandas as pd
import pywt
from sklearn import metrics
from sklearn.ensemble import RandomForestRegressor
from sklearn.utils import shuffle
from tqdm import tqdm

from utils import calculate_metrics, get_classes, CLASSES
| |
|
| |
|
# Number of samples per split; files are named 0.npz .. N-1.npz (see load_data).
TRAIN_SIZE = 1732
TEST_SIZE = 1154
# Directories holding the per-sample .npz arrays (simulated data).
TRAIN_DIR = "train_data_simulated/"
TEST_DIR = "test_data_simulated/"
| |
|
| |
|
def load_data() -> Tuple[List, pd.DataFrame, List, pd.DataFrame]:
    """Return (train sample paths, train labels, test sample paths, test labels).

    Sample paths point at ``{i}.npz`` files; labels are read from the
    ground-truth CSVs in the working directory.
    """

    def _sample_paths(directory: str, count: int) -> List[str]:
        # Samples are stored as 0.npz .. count-1.npz inside *directory*.
        return [os.path.join(directory, f"{idx}.npz") for idx in range(count)]

    train_paths = _sample_paths(TRAIN_DIR, TRAIN_SIZE)
    test_paths = _sample_paths(TEST_DIR, TEST_SIZE)
    train_labels = pd.read_csv("train_gt.csv")
    test_labels = pd.read_csv("test_gt.csv")
    return train_paths, train_labels, test_paths, test_labels
| |
|
| |
|
class SpectralCurveFiltering:
    """Collapse the two trailing (spatial) axes of a cube into one value per
    leading-axis entry, producing a 1-D curve."""

    def __init__(self, merge_function=np.mean):
        # Aggregation applied over axes 1 and 2; must accept an ``axis`` kwarg.
        self.merge_function = merge_function

    def __call__(self, sample: np.ndarray) -> np.ndarray:
        """Reduce *sample* over its spatial axes (1, 2)."""
        merged = self.merge_function(sample, axis=(1, 2))
        return merged
| |
|
| |
|
class BaselineRegressor:
    """Trivial baseline: always predicts the per-target mean of the
    training labels, regardless of the input features."""

    def __init__(self):
        # Scalar placeholder until fit() replaces it with a per-target vector.
        self.mean = 0

    def fit(self, X_train: np.ndarray, y_train: np.ndarray):
        """Memorize the column-wise mean of ``y_train``; returns self."""
        self.mean = np.mean(y_train, axis=0)
        self.classes_count = y_train.shape[1]
        return self

    def predict(self, X_test: np.ndarray) -> np.ndarray:
        """Repeat the stored mean vector once per row of ``X_test``."""
        return np.tile(self.mean, (len(X_test), 1))
| |
|
| |
|
def preprocess(samples_lst: List[str], features: List[str]) -> Tuple:
    """Load every .npz sample and turn it into a flat feature vector.

    Parameters:
        samples_lst: paths to ``.npz`` files, each storing a masked array
            with one leading (spectral) axis and two trailing spatial axes.
        features: feature-group names to concatenate, any subset of
            {"spatial", "fft", "gradient", "mean", "dwt"}.

    Returns:
        ``(X, names)`` where ``X`` is a (len(samples_lst), n_features) array
        and ``names`` holds, per sample, the group label of each feature
        column.

    Fix vs. original: results are accumulated in a fresh list instead of
    overwriting the caller's ``samples_lst`` entries in place (mutation of an
    input argument); the unused detail-coefficient concatenations were removed.
    """

    def _shape_pad(data: np.ndarray) -> np.ndarray:
        """
        This sub-function makes padding to have square fields sizes.
        Not mandatory but eliminates the risk of calculation error
        in singular value decomposition.
        Padding by warping also improves the performance slightly.
        """
        max_edge = np.max(data.shape[1:])
        shape = (max_edge, max_edge)
        padded = np.pad(
            data,
            ((0, 0), (0, (shape[0] - data.shape[1])), (0, (shape[1] - data.shape[2]))),
            "wrap",
        )
        return padded

    filtering = SpectralCurveFiltering()
    w1 = pywt.Wavelet("sym3")
    w2 = pywt.Wavelet("dmey")

    processed = []
    all_feature_names = []

    for sample_path in tqdm(samples_lst, total=len(samples_lst)):
        with np.load(sample_path) as npz:
            data = np.ma.MaskedArray(**npz)
        data = _shape_pad(data)

        # Leading singular values of each channel summarize spatial structure.
        s = np.linalg.svd(data, full_matrices=False, compute_uv=False)
        s0 = s[:, 0]
        s1 = s[:, 1]
        s2 = s[:, 2]
        s3 = s[:, 3]
        s4 = s[:, 4]
        # Ratio of the two dominant singular values; eps guards division by 0.
        dXds1 = s0 / (s1 + np.finfo(float).eps)
        ffts = np.fft.fft(s0)
        reals = np.real(ffts)
        imags = np.imag(ffts)

        # Collapse spatial dimensions into one mean curve per channel.
        data = filtering(data)

        # "dmey" wavelet cascade over hand-picked coefficient windows; only
        # the approximation coefficients are kept as features.
        cA0, _cD0 = pywt.dwt(data, wavelet=w2, mode="constant")
        cAx, _cDx = pywt.dwt(cA0[12:92], wavelet=w2, mode="constant")
        cAy, _cDy = pywt.dwt(cAx[15:55], wavelet=w2, mode="constant")
        cAz, _cDz = pywt.dwt(cAy[15:35], wavelet=w2, mode="constant")
        cAw2 = np.concatenate((cA0[12:92], cAx[15:55], cAy[15:35], cAz[15:25]), -1)

        # "sym3" wavelet cascade, trimming one coefficient at each end per level.
        cA0, _cD0 = pywt.dwt(data, wavelet=w1, mode="constant")
        cAx, _cDx = pywt.dwt(cA0[1:-1], wavelet=w1, mode="constant")
        cAy, _cDy = pywt.dwt(cAx[1:-1], wavelet=w1, mode="constant")
        cAz, _cDz = pywt.dwt(cAy[1:-1], wavelet=w1, mode="constant")
        cAw1 = np.concatenate((cA0, cAx, cAy, cAz), -1)

        # First three numerical derivatives of the spectral curve.
        dXdl = np.gradient(data, axis=0)
        d2Xdl2 = np.gradient(dXdl, axis=0)
        d3Xdl3 = np.gradient(d2Xdl2, axis=0)

        # Fourier transform of the spectral curve.
        fft = np.fft.fft(data)
        real = np.real(fft)
        imag = np.imag(fft)

        features_to_select = {
            "spatial": (dXds1, s0, s1, s2, s3, s4, reals, imags),
            "fft": (real, imag),
            "gradient": (dXdl, d2Xdl2, d3Xdl3),
            "mean": (data,),
            "dwt": (cAw1, cAw2),
        }

        sample_features = []
        sample_feature_names = []
        for feature_name in features:
            sample_features.extend(features_to_select[feature_name])
            sample_feature_names.extend(
                [feature_name]
                * len(np.concatenate(features_to_select[feature_name]))
            )

        processed.append(np.concatenate(sample_features, -1))
        all_feature_names.append(sample_feature_names)

    return np.vstack(processed), all_feature_names
| |
|
| |
|
def runner(features: Optional[List[str]] = None):
    """Train a RandomForestRegressor on the selected feature groups, write
    the model, predictions and metrics to disk, and return the custom score.

    Parameters:
        features: feature-group names understood by ``preprocess``; ``None``
            (the default) selects all five groups. Using ``None`` as sentinel
            fixes the mutable-default-argument (list) of the original
            signature while keeping the effective default identical.

    Returns:
        float: mean over targets of model MSE divided by a
        predict-the-training-mean baseline MSE (lower is better).
    """
    if features is None:
        features = "spatial,fft,dwt,gradient,mean".split(",")

    X_train, y_train, X_test, y_test = load_data()

    # Per-column feature-group names are not used downstream; discard them.
    X_train, _ = preprocess(X_train, features)
    X_test, _ = preprocess(X_test, features)

    X_train, y_train = shuffle(X_train, y_train, random_state=2023)

    model = RandomForestRegressor(random_state=2023)
    print(f"Training model on {X_train.shape} features...")
    model = model.fit(X_train, y_train[CLASSES].values)

    joblib.dump(model, f"RF_model_{'-'.join(features)}.joblib")

    submission_df = pd.DataFrame(data=model.predict(X_test), columns=CLASSES)
    submission_df.to_csv(",".join(features) + ".csv", index_label="sample_index")

    # Challenge-style score: model MSE normalized per target by the MSE of a
    # baseline that always predicts the training mean.
    baseline_reg = BaselineRegressor()
    baseline_reg = baseline_reg.fit(X_train, y_train[CLASSES].values)
    baselines_mse = np.mean(
        (y_test[CLASSES].values - baseline_reg.predict(X_test)) ** 2, axis=0
    )

    mse = np.mean((y_test[CLASSES].values - submission_df[CLASSES].values) ** 2, axis=0)
    scores = mse / baselines_mse
    final_score = np.mean(scores)

    # Per-target regression metrics ("raw_values" keeps one value per column).
    r2 = metrics.r2_score(
        y_true=y_test[CLASSES].values,
        y_pred=submission_df[CLASSES].values,
        multioutput="raw_values",
    )
    mse = metrics.mean_squared_error(
        y_true=y_test[CLASSES].values,
        y_pred=submission_df[CLASSES].values,
        multioutput="raw_values",
    )
    mae = metrics.mean_absolute_error(
        y_true=y_test[CLASSES].values,
        y_pred=submission_df[CLASSES].values,
        multioutput="raw_values",
    )
    all_metrics = calculate_metrics(
        y_pred=get_classes(submission_df[CLASSES]),
        y_true=get_classes(y_test[CLASSES]),
    )
    # Key the per-target values by name (presumably the four soil targets
    # in CLASSES order — verify against utils.CLASSES).
    mse = {k + "_mse": v for k, v in zip(["P", "K", "Mg", "pH"], mse.tolist())}
    r2 = {k + "_r2": v for k, v in zip(["P", "K", "Mg", "pH"], r2.tolist())}
    mae = {k + "_mae": v for k, v in zip(["P", "K", "Mg", "pH"], mae.tolist())}

    all_metrics["custom"] = final_score
    all_metrics = pd.DataFrame.from_dict({**all_metrics, **r2, **mse, **mae})
    # Fix: was an f-string with no replacement fields.
    all_metrics.to_csv("all_metrics.csv", index=False)

    with open("all_metrics.json", "w", encoding="utf-8") as f:
        json.dump(all_metrics.to_dict(), f, ensure_ascii=True, indent=4)

    print(f"Custom score: {final_score}")
    return final_score
| |
|
| |
|
| | if __name__ == "__main__": |
| | fire.Fire(runner) |
| | model = joblib.load( |
| | f"RF_model_{'-'.join('spatial,fft,dwt,gradient,mean'.split(','))}.joblib" |
| | ) |
| | import sklearn |
| |
|
| | assert isinstance(model, sklearn.ensemble._forest.RandomForestRegressor) |
| |
|