# (removed non-source residue from page extraction: "Spaces: Running Running")
| """ | |
| BatchMind OS: Layer 4 - Optuna Optimizer | |
| Multi-objective optimization for Quality Index and Energy Efficiency. | |
| """ | |
| import os | |
| import sys | |
| import pickle | |
| import pandas as pd | |
| import numpy as np | |
| import optuna | |
| from lightgbm import LGBMRegressor | |
| # Add parent directory to path for config import | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from config import CFG | |
def calculate_quality_index(preds: dict) -> float:
    """Literal Quality Index formula from spec.

    Weighted blend of the four predicted quality targets:
    40% dissolution rate, 30% content uniformity, 15% hardness scaled
    to [0, 1], and 15% of ten times the inverse disintegration time
    (the time is clamped at 0.1 to avoid division by zero).
    """
    # Disintegration enters inverted: faster disintegration scores higher.
    inv_disin = 1.0 / max(preds["Disintegration_Time"], 0.1)
    components = (
        0.40 * preds["Dissolution_Rate"],
        0.30 * preds["Content_Uniformity"],
        0.15 * (preds["Hardness"] / 100.0),
        0.15 * inv_disin * 10.0,
    )
    return float(sum(components))
def _pareto_hypervolume(energies, qualities) -> float:
    """Normalized 2D hypervolume for (minimize energy, maximize quality).

    Quality (0-100 scale) is mapped to a minimization objective via
    q_min = 100 - quality. The reference (worst) point is
    (max(energy) + 2.0, quality = 0); the dominated area is normalized by
    the full rectangle ref_energy x 100, so the result lies in [0, 1].

    Parameters
    ----------
    energies : array-like of float
        Energy objective values (lower is better).
    qualities : array-like of float
        Quality Index values, assumed on a 0-100 scale (higher is better).

    Returns
    -------
    float
        Normalized hypervolume; 0.0 for empty input.
    """
    es = np.asarray(energies, dtype=float)
    if es.size == 0:
        return 0.0
    q_min = 100.0 - np.asarray(qualities, dtype=float)
    ref_energy = float(es.max()) + 2.0
    q_ref = 100.0
    order = np.argsort(es)
    es, q_min = es[order], q_min[order]
    # Sweep energies in ascending order. The dominated height of the slice
    # [e_i, e_{i+1}) (last slice ends at the reference energy) is determined
    # by the best (smallest) q_min seen so far. NOTE: the previous version
    # iterated ascending but subtracted as if descending, producing negative
    # slice widths after the first point — fixed here.
    area = 0.0
    best_q = q_ref
    for i in range(es.size):
        best_q = min(best_q, q_min[i])
        e_next = es[i + 1] if i + 1 < es.size else ref_energy
        area += (e_next - es[i]) * (q_ref - best_q)
    return area / (ref_energy * 100.0)


def run_optimization(constraints: dict = None, n_trials: int = 80) -> pd.DataFrame:
    """Exposed function for real-time optimization. Returns Pareto front solutions.

    Parameters
    ----------
    constraints : dict, optional
        Soft-constraint thresholds (keys: hardness_min, friability_max,
        dissolution_min, max_energy_kwh). Defaults to CFG.DEFAULT_CONSTRAINTS.
    n_trials : int
        Number of Optuna trials (also bounded by CFG.OPTUNA_TIMEOUT_SEC).

    Returns
    -------
    pd.DataFrame
        Pareto-front solutions (plus a few diverse near-optimal points), or an
        empty DataFrame when no trial completed. Side effects: persists the
        front and a hypervolume summary under CFG.PROC_DIR.
    """
    if constraints is None:
        constraints = CFG.DEFAULT_CONSTRAINTS

    # Load training matrix plus the stacked (base + meta) prediction models.
    with open(os.path.join(CFG.PROC_DIR, "X_final.pkl"), "rb") as f:
        X_train = pickle.load(f)
    with open(os.path.join(CFG.MODEL_DIR, "base_models_median.pkl"), "rb") as f:
        base_models = pickle.load(f)
    with open(os.path.join(CFG.MODEL_DIR, "meta_models.pkl"), "rb") as f:
        meta_models = pickle.load(f)

    feature_names = X_train.columns.tolist()
    # Median row serves as the baseline feature vector for non-tuned features.
    base_vector = X_train.median()

    # Load Energy Model
    with open(os.path.join(CFG.PROC_DIR, "energy_model.pkl"), "rb") as f:
        energy_reg = pickle.load(f)
    energy_features = ["Compression_Force", "Machine_Speed", "Drying_Temp", "Drying_Time", "Granulation_Time"]

    def objective(trial):
        """One Optuna trial: sample process params, predict targets, score.

        Returns a 2-tuple of minimized objectives:
        (energy + penalty, -(quality - penalty)).
        """
        params = {
            name: trial.suggest_float(name, low, high)
            for name, (low, high) in CFG.PARAM_SPACE.items()
        }

        # Estimate energy consumption from the raw process parameters.
        e_input = np.array([[params[f] for f in energy_features]])
        energy_kwh = float(energy_reg.predict(e_input)[0])

        # Build the full feature vector: start from the training median and
        # overwrite every engineered feature whose name contains the param name.
        x_trial = base_vector.copy()
        for p_name, val in params.items():
            for feat in (f for f in feature_names if p_name in f):
                x_trial[feat] = val
        X_input = pd.DataFrame([x_trial])

        # Stacked prediction, matching training:
        # X_meta = pd.concat([X, base_target_pred], axis=1)
        target_preds = {}
        for t in CFG.TARGET_COLS:
            b_pred = base_models[t].predict(X_input)
            X_meta = pd.concat([X_input.reset_index(drop=True), pd.Series(b_pred, name=f"base_{t}")], axis=1)
            X_meta.columns = [str(c) for c in X_meta.columns]
            target_preds[t] = meta_models[t].predict(X_meta.astype(float))[0]

        # Soft constraints -> graduated penalties (not a flat 100).
        penalty = 0.0
        hard_min = constraints.get("hardness_min", 80.0)
        fria_max = constraints.get("friability_max", 1.0)
        diss_min = constraints.get("dissolution_min", 85.0)
        energy_max = constraints.get("max_energy_kwh", 999.0)
        if target_preds["Hardness"] < hard_min:
            penalty += (hard_min - target_preds["Hardness"]) * 0.5
        if target_preds["Friability"] > fria_max:
            penalty += (target_preds["Friability"] - fria_max) * 5.0
        if target_preds["Dissolution_Rate"] < diss_min:
            penalty += (diss_min - target_preds["Dissolution_Rate"]) * 0.5
        if energy_kwh > energy_max:
            penalty += (energy_kwh - energy_max) * 2.0

        # Quality Index (0-100 range)
        qi = calculate_quality_index(target_preds)

        # Expose raw metrics for post-hoc analysis of every trial.
        trial.set_user_attr("energy", energy_kwh)
        trial.set_user_attr("quality", qi)
        trial.set_user_attr("penalty", penalty)
        for t in CFG.TARGET_COLS:
            trial.set_user_attr(f"{t}_pred", target_preds[t])
        return energy_kwh + penalty, -(qi - penalty)

    sampler = optuna.samplers.TPESampler(n_startup_trials=15, multivariate=True, seed=CFG.RANDOM_STATE)
    study = optuna.create_study(directions=["minimize", "minimize"], sampler=sampler)
    study.optimize(objective, n_trials=n_trials, timeout=CFG.OPTUNA_TIMEOUT_SEC)

    # Collect ALL completed trials, not just best_trials (which is too restrictive).
    all_rows = []
    for trial in study.trials:
        if trial.state != optuna.trial.TrialState.COMPLETE:
            continue
        row = trial.params.copy()
        row.update(trial.user_attrs)
        all_rows.append(row)
    if not all_rows:
        return pd.DataFrame()
    all_df = pd.DataFrame(all_rows)

    # Filter to feasible solutions (penalty < 5, mostly feasible); if none
    # qualify fall back to the 20 least-penalized trials.
    feasible = all_df[all_df["penalty"] < 5.0] if "penalty" in all_df.columns else all_df
    if feasible.empty:
        feasible = all_df.nsmallest(20, "penalty")

    # Extract the Pareto-optimal front: scanning by ascending energy, a point
    # is non-dominated iff its quality beats every cheaper point.
    solutions = []
    picked_idx = []
    best_quality = -1
    for idx, row in feasible.sort_values("energy").iterrows():
        if row["quality"] > best_quality:
            best_quality = row["quality"]
            picked_idx.append(idx)
            solutions.append(row.to_dict())

    # If still too few, add top-quality diverse points. BUGFIX: the previous
    # filter used row.to_dict().get('index', -1), which never matched any real
    # index, so picked rows were never excluded; now we drop them explicitly.
    if len(solutions) < 5 and len(feasible) >= 5:
        remaining = feasible.drop(index=picked_idx)
        extra = remaining.nlargest(min(10, len(remaining)), "quality")
        for _, row in extra.iterrows():
            d = row.to_dict()
            # Skip near-duplicates of points already on the front.
            if not any(abs(s["energy"] - d["energy"]) < 0.1 and abs(s["quality"] - d["quality"]) < 0.5 for s in solutions):
                solutions.append(d)
            if len(solutions) >= 15:
                break

    pareto_df = pd.DataFrame(solutions)

    # --- Hypervolume Indicator (2D area dominated by the Pareto front) ---
    hypervolume = 0.0
    if not pareto_df.empty and "energy" in pareto_df.columns and "quality" in pareto_df.columns:
        hypervolume = _pareto_hypervolume(pareto_df["energy"].values, pareto_df["quality"].values)
    print(f" Pareto Hypervolume (Normalized): {hypervolume:.6f}")

    if not pareto_df.empty:
        pareto_df.to_pickle(os.path.join(CFG.PROC_DIR, "last_pareto.pkl"))
        import json
        meta = {"hypervolume": float(hypervolume), "n_solutions": len(pareto_df)}
        with open(os.path.join(CFG.PROC_DIR, "pareto_meta.json"), "w") as f:
            json.dump(meta, f)
    return pareto_df
def main():
    """CLI entry point: run the demo-sized optimization and report results."""
    banner = "=" * 60
    print(">>> Starting Layer 4: Optuna Pareto Optimization")
    front = run_optimization(n_trials=CFG.OPTUNA_TRIALS_DEMO)
    print(f"Optimal solutions found: {len(front)}")
    print(banner)
    print(f"✅ LAYER 4 COMPLETE")
    print(banner)


if __name__ == "__main__":
    main()