import os
import json
import joblib
import optuna
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Dict, Any, Tuple, Optional
from datasets import load_from_disk, DatasetDict
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.svm import SVR
import xgboost as xgb
from lightning.pytorch import seed_everything
import cupy as cp
from cuml.linear_model import ElasticNet as cuElasticNet
from scipy.stats import spearmanr

# Seed all RNGs (python/numpy/torch) via Lightning for reproducible trials.
seed_everything(1986)
| |
|
| |
|
| | |
| | |
| | |
def to_gpu(X: np.ndarray):
    """Return *X* as a float32 CuPy array on the GPU (no-op if already CuPy)."""
    return X if isinstance(X, cp.ndarray) else cp.asarray(X, dtype=cp.float32)
| |
|
def to_cpu(x):
    """Return *x* as a host-side NumPy array, copying from GPU when needed."""
    return cp.asnumpy(x) if isinstance(x, cp.ndarray) else np.asarray(x)
| |
|
| |
|
| | |
| | |
| | |
@dataclass
class SplitData:
    """Train/validation split: embedding matrices, labels, optional sequences."""
    X_train: np.ndarray            # embeddings for the train split
    y_train: np.ndarray            # float32 regression targets for train
    seq_train: Optional[np.ndarray]  # raw sequences for train, if present in the dataset
    X_val: np.ndarray              # embeddings for the validation split
    y_val: np.ndarray              # float32 regression targets for validation
    seq_val: Optional[np.ndarray]  # raw sequences for validation, if present
| |
|
| | def _stack_embeddings(col) -> np.ndarray: |
| | arr = np.asarray(col, dtype=np.float32) |
| | if arr.ndim != 2: |
| | arr = np.stack(col).astype(np.float32) |
| | return arr |
| |
|
def load_split_data(dataset_path: str) -> SplitData:
    """Load a saved HF dataset from disk and return its train/val arrays.

    Accepts either a DatasetDict with 'train'/'val' keys or a flat dataset
    carrying a 'split' column; both splits must have 'embedding' and 'label'.
    """
    ds = load_from_disk(dataset_path)

    if isinstance(ds, DatasetDict) and "train" in ds and "val" in ds:
        train_ds, val_ds = ds["train"], ds["val"]
    else:
        if "split" not in ds.column_names:
            raise ValueError("Dataset must be a DatasetDict(train/val) or have a 'split' column.")
        train_ds = ds.filter(lambda x: x["split"] == "train")
        val_ds = ds.filter(lambda x: x["split"] == "val")

    # Validate required columns on both splits up front.
    for split_name, split_ds in (("train", train_ds), ("val", val_ds)):
        for required in ("embedding", "label"):
            if required not in split_ds.column_names:
                raise ValueError(f"Missing column '{required}' in {split_name} split.")

    X_train = _stack_embeddings(train_ds["embedding"]).astype(np.float32)
    X_val = _stack_embeddings(val_ds["embedding"]).astype(np.float32)

    y_train = np.asarray(train_ds["label"], dtype=np.float32)
    y_val = np.asarray(val_ds["label"], dtype=np.float32)

    # Sequences are optional; keep them only when the column exists.
    seq_train = np.asarray(train_ds["sequence"]) if "sequence" in train_ds.column_names else None
    seq_val = np.asarray(val_ds["sequence"]) if "sequence" in val_ds.column_names else None

    return SplitData(X_train, y_train, seq_train, X_val, y_val, seq_val)
| |
|
| |
|
| | |
| | |
| | |
def safe_spearmanr(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Spearman rank correlation, mapping undefined results (None/NaN) to 0.0."""
    rho = spearmanr(y_true, y_pred).correlation
    return 0.0 if rho is None or np.isnan(rho) else float(rho)
| |
|
def eval_regression(y_true: np.ndarray, y_pred: np.ndarray) -> Dict[str, float]:
    """Compute standard regression metrics: RMSE, MAE, R^2, Spearman rho."""
    try:
        # sklearn >= 1.4 exposes RMSE directly; fall back to sqrt(MSE) otherwise.
        from sklearn.metrics import root_mean_squared_error
        rmse = root_mean_squared_error(y_true, y_pred)
    except Exception:
        rmse = float(np.sqrt(mean_squared_error(y_true, y_pred)))

    return {
        "rmse": rmse,
        "mae": float(mean_absolute_error(y_true, y_pred)),
        "r2": float(r2_score(y_true, y_pred)),
        "spearman_rho": float(safe_spearmanr(y_true, y_pred)),
    }
| |
|
| |
|
| | |
| | |
| | |
def train_xgb_reg(
    X_train, y_train, X_val, y_val, params: Dict[str, Any]
) -> Tuple[xgb.Booster, np.ndarray, np.ndarray]:
    """Train an XGBoost regressor with early stopping against the val split.

    NOTE: pops 'num_boost_round' and 'early_stopping_rounds' out of *params*,
    mutating the caller's dict; pass a copy if that matters.

    Returns:
        (booster, train predictions, val predictions)
    """
    num_boost_round = int(params.pop("num_boost_round"))
    early_stopping_rounds = int(params.pop("early_stopping_rounds"))

    dtrain = xgb.DMatrix(X_train, label=y_train)
    dval = xgb.DMatrix(X_val, label=y_val)

    booster = xgb.train(
        params=params,
        dtrain=dtrain,
        num_boost_round=num_boost_round,
        evals=[(dval, "val")],
        early_stopping_rounds=early_stopping_rounds,
        verbose_eval=False,
    )

    return booster, booster.predict(dtrain), booster.predict(dval)
| |
|
| |
|
def train_cuml_elasticnet_reg(
    X_train, y_train, X_val, y_val, params: Dict[str, Any]
):
    """Fit a cuML ElasticNet on the GPU.

    Returns:
        (model, train predictions, val predictions) with predictions on host.
    """
    gpu_train = to_gpu(X_train)
    gpu_val = to_gpu(X_val)
    gpu_labels = to_gpu(y_train).astype(cp.float32)

    model = cuElasticNet(
        alpha=float(params["alpha"]),
        l1_ratio=float(params["l1_ratio"]),
        fit_intercept=True,
        max_iter=int(params.get("max_iter", 5000)),
        tol=float(params.get("tol", 1e-4)),
        selection=params.get("selection", "cyclic"),
    )
    model.fit(gpu_train, gpu_labels)

    return model, to_cpu(model.predict(gpu_train)), to_cpu(model.predict(gpu_val))
| |
|
| |
|
def train_svr_reg(
    X_train, y_train, X_val, y_val, params: Dict[str, Any]
):
    """Fit a scikit-learn SVR on CPU.

    Returns:
        (model, train predictions, val predictions)
    """
    model = SVR(
        kernel=params["kernel"],
        C=float(params["C"]),
        epsilon=float(params["epsilon"]),
        gamma=params.get("gamma", "scale"),
    )
    model.fit(X_train, y_train)
    return model, model.predict(X_train), model.predict(X_val)
| |
|
| |
|
| | |
| | |
| | |
def save_predictions_csv(
    out_dir: str,
    split_name: str,
    y_true: np.ndarray,
    y_pred: np.ndarray,
    sequences: Optional[np.ndarray] = None,
):
    """Write '<split_name>_predictions.csv' with true/pred values and residuals."""
    os.makedirs(out_dir, exist_ok=True)
    table = {
        "y_true": y_true.astype(float),
        "y_pred": y_pred.astype(float),
        "residual": (y_true - y_pred).astype(float),
    }
    df = pd.DataFrame(table)
    if sequences is not None:
        # Put sequences first so the CSV reads naturally left-to-right.
        df.insert(0, "sequence", sequences)
    df.to_csv(os.path.join(out_dir, f"{split_name}_predictions.csv"), index=False)
| |
|
def plot_regression_diagnostics(out_dir: str, y_true: np.ndarray, y_pred: np.ndarray):
    """Save three diagnostic PNGs: pred-vs-true, residual hist, residual-vs-pred."""
    os.makedirs(out_dir, exist_ok=True)
    resid = y_true - y_pred

    def _scatter(x, y, xlabel, ylabel, title, fname):
        # Shared scatter-plot boilerplate for the two scatter diagnostics.
        plt.figure()
        plt.scatter(x, y, s=8, alpha=0.5)
        plt.xlabel(xlabel)
        plt.ylabel(ylabel)
        plt.title(title)
        plt.tight_layout()
        plt.savefig(os.path.join(out_dir, fname))
        plt.close()

    _scatter(y_true, y_pred, "y_true", "y_pred", "Predicted vs True", "pred_vs_true.png")

    plt.figure()
    plt.hist(resid, bins=50)
    plt.xlabel("residual (y_true - y_pred)")
    plt.ylabel("count")
    plt.title("Residual Histogram")
    plt.tight_layout()
    plt.savefig(os.path.join(out_dir, "residual_hist.png"))
    plt.close()

    _scatter(y_pred, resid, "y_pred", "residual", "Residuals vs Prediction", "residual_vs_pred.png")
| |
|
| |
|
| | |
| | |
| | |
def make_objective(model_name: str, data: SplitData):
    """Build an Optuna objective (maximizing validation Spearman rho).

    Supported model names: 'xgb_reg', 'enet_gpu', 'svr'. Each trial samples
    hyperparameters, trains the model, and records val metrics as user attrs.
    """
    Xtr, ytr, Xva, yva = data.X_train, data.y_train, data.X_val, data.y_val

    def _run_xgb(trial: optuna.Trial):
        params = {
            "objective": "reg:squarederror",
            "eval_metric": "rmse",
            "lambda": trial.suggest_float("lambda", 1e-10, 100.0, log=True),
            "alpha": trial.suggest_float("alpha", 1e-10, 100.0, log=True),
            "gamma": trial.suggest_float("gamma", 0.0, 10.0),
            "max_depth": trial.suggest_int("max_depth", 2, 16),
            "min_child_weight": trial.suggest_float("min_child_weight", 1e-3, 500.0, log=True),
            "subsample": trial.suggest_float("subsample", 0.5, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.3, 1.0),
            "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
            "tree_method": "hist",
            "device": "cuda",
        }
        params["num_boost_round"] = trial.suggest_int("num_boost_round", 50, 2000)
        params["early_stopping_rounds"] = trial.suggest_int("early_stopping_rounds", 20, 200)
        # Copy: train_xgb_reg pops keys out of the dict it receives.
        return train_xgb_reg(Xtr, ytr, Xva, yva, params.copy())

    def _run_enet(trial: optuna.Trial):
        params = {
            "alpha": trial.suggest_float("alpha", 1e-8, 10.0, log=True),
            "l1_ratio": trial.suggest_float("l1_ratio", 0.0, 1.0),
            "max_iter": trial.suggest_int("max_iter", 1000, 20000),
            "tol": trial.suggest_float("tol", 1e-6, 1e-2, log=True),
            "selection": trial.suggest_categorical("selection", ["cyclic", "random"]),
        }
        return train_cuml_elasticnet_reg(Xtr, ytr, Xva, yva, params)

    def _run_svr(trial: optuna.Trial):
        params = {
            "kernel": trial.suggest_categorical("kernel", ["rbf", "linear", "poly", "sigmoid"]),
            "C": trial.suggest_float("C", 1e-3, 1e3, log=True),
            "epsilon": trial.suggest_float("epsilon", 1e-4, 1.0, log=True),
        }
        # gamma is only meaningful for non-linear kernels.
        if params["kernel"] in ["rbf", "poly", "sigmoid"]:
            params["gamma"] = trial.suggest_float("gamma", 1e-6, 10.0, log=True)
        else:
            params["gamma"] = "scale"
        return train_svr_reg(Xtr, ytr, Xva, yva, params)

    runners = {"xgb_reg": _run_xgb, "enet_gpu": _run_enet, "svr": _run_svr}

    def objective(trial: optuna.Trial) -> float:
        if model_name not in runners:
            raise ValueError(f"Unknown model_name={model_name}")
        model, p_tr, p_va = runners[model_name](trial)

        metrics = eval_regression(yva, p_va)
        trial.set_user_attr("spearman_rho", metrics["spearman_rho"])
        trial.set_user_attr("rmse", metrics["rmse"])
        trial.set_user_attr("mae", metrics["mae"])
        trial.set_user_attr("r2", metrics["r2"])

        # Optimization target: rank correlation on the validation split.
        return metrics["spearman_rho"]

    return objective
| |
|
| |
|
| | |
| | |
| | |
def run_optuna_and_refit(
    dataset_path: str,
    out_dir: str,
    model_name: str,
    n_trials: int = 200,
    standardize_X: bool = True,
):
    """Run an Optuna search for *model_name*, refit the best trial, save artifacts.

    Writes into *out_dir*: scaler.joblib (if standardizing), study_trials.csv,
    the refit model, train/val prediction CSVs, diagnostic plots, and a text
    summary of the best trial.

    Args:
        dataset_path: path for load_split_data (DatasetDict or 'split' column).
        out_dir: output directory; created if missing.
        model_name: one of 'xgb_reg', 'enet_gpu', 'svr'.
        n_trials: number of Optuna trials.
        standardize_X: fit a StandardScaler on train X and apply to val X.

    Raises:
        ValueError: for an unknown model_name (raised here and inside trials).
    """
    os.makedirs(out_dir, exist_ok=True)

    data = load_split_data(dataset_path)
    print(f"[Data] Train: {data.X_train.shape}, Val: {data.X_val.shape}")

    # Fit the scaler on train only, then transform val — avoids leaking
    # validation statistics into preprocessing.
    if standardize_X:
        scaler = StandardScaler()
        data.X_train = scaler.fit_transform(data.X_train).astype(np.float32)
        data.X_val = scaler.transform(data.X_val).astype(np.float32)
        joblib.dump(scaler, os.path.join(out_dir, "scaler.joblib"))
        print("[Preprocess] Saved StandardScaler -> scaler.joblib")

    # Maximize validation Spearman rho (the objective's return value).
    study = optuna.create_study(
        direction="maximize",
        pruner=optuna.pruners.MedianPruner()
    )
    study.optimize(make_objective(model_name, data), n_trials=n_trials)

    # Persist the full trial history for later inspection.
    trials_df = study.trials_dataframe()
    trials_df.to_csv(os.path.join(out_dir, "study_trials.csv"), index=False)

    best = study.best_trial
    best_params = dict(best.params)

    # Metrics were stored as user attrs by the objective; fall back to the
    # trial value / NaN if absent.
    best_rho = float(best.user_attrs.get("spearman_rho", best.value))
    best_rmse = float(best.user_attrs.get("rmse", np.nan))
    best_mae = float(best.user_attrs.get("mae", np.nan))
    best_r2 = float(best.user_attrs.get("r2", np.nan))

    # Refit the winning configuration on the (already standardized) data and
    # save the model in a format appropriate to the library.
    if model_name == "xgb_reg":
        # Rebuild the full param dict: fixed keys + tuned values.
        params = {
            "objective": "reg:squarederror",
            "eval_metric": "rmse",
            "lambda": best_params["lambda"],
            "alpha": best_params["alpha"],
            "gamma": best_params["gamma"],
            "max_depth": best_params["max_depth"],
            "min_child_weight": best_params["min_child_weight"],
            "subsample": best_params["subsample"],
            "colsample_bytree": best_params["colsample_bytree"],
            "learning_rate": best_params["learning_rate"],
            "tree_method": "hist",
            "device": "cuda",
            "num_boost_round": best_params["num_boost_round"],
            "early_stopping_rounds": best_params["early_stopping_rounds"],
        }
        model, p_tr, p_va = train_xgb_reg(
            data.X_train, data.y_train, data.X_val, data.y_val, params
        )
        model_path = os.path.join(out_dir, "best_model.json")
        model.save_model(model_path)

    elif model_name == "enet_gpu":
        model, p_tr, p_va = train_cuml_elasticnet_reg(
            data.X_train, data.y_train, data.X_val, data.y_val, best_params
        )
        model_path = os.path.join(out_dir, "best_model_cuml_enet.joblib")
        joblib.dump(model, model_path)

    elif model_name == "svr":
        model, p_tr, p_va = train_svr_reg(
            data.X_train, data.y_train, data.X_val, data.y_val, best_params
        )
        model_path = os.path.join(out_dir, "best_model_svr.joblib")
        joblib.dump(model, model_path)

    else:
        raise ValueError(model_name)

    save_predictions_csv(out_dir, "train", data.y_train, p_tr, data.seq_train)
    save_predictions_csv(out_dir, "val", data.y_val, p_va, data.seq_val)

    plot_regression_diagnostics(out_dir, data.y_val, p_va)

    # Human-readable summary: printed and written to optimization_summary.txt.
    summary = [
        "=" * 72,
        f"MODEL: {model_name}",
        f"Best trial: {best.number}",
        f"Val Spearman rho (objective): {best_rho:.6f}",
        f"Val RMSE: {best_rmse:.6f}",
        f"Val MAE: {best_mae:.6f}",
        f"Val R2: {best_r2:.6f}",
        f"Model saved to: {model_path}",
        "Best params:",
        json.dumps(best_params, indent=2),
        "=" * 72,
    ]
    with open(os.path.join(out_dir, "optimization_summary.txt"), "w") as f:
        f.write("\n".join(summary))
    print("\n".join(summary))
| |
|
| |
|
if __name__ == "__main__":
    import argparse

    # CLI entry point: tune one model family on a saved embedding dataset.
    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset_path", type=str, required=True)
    parser.add_argument("--out_dir", type=str, required=True)
    parser.add_argument("--model", type=str, choices=["xgb_reg", "enet_gpu", "svr"], required=True)
    parser.add_argument("--n_trials", type=int, default=200)
    parser.add_argument("--no_standardize", action="store_true", help="Disable StandardScaler on X")
    cli = parser.parse_args()

    run_optuna_and_refit(
        dataset_path=cli.dataset_path,
        out_dir=cli.out_dir,
        model_name=cli.model,
        n_trials=cli.n_trials,
        standardize_X=(not cli.no_standardize),
    )
| |
|