""" BatchMind OS: Layer 4 - Optuna Optimizer Multi-objective optimization for Quality Index and Energy Efficiency. """ import os import sys import pickle import pandas as pd import numpy as np import optuna from lightgbm import LGBMRegressor # Add parent directory to path for config import sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from config import CFG def calculate_quality_index(preds: dict) -> float: """Literal Quality Index formula from spec.""" diss = preds["Dissolution_Rate"] uni = preds["Content_Uniformity"] hard = preds["Hardness"] disin = preds["Disintegration_Time"] # QI = (0.40 * Diss + 0.30 * Uni + 0.15 * Hard/100 + 0.15 * (1/max(Disin,0.1))*10) qi = (0.40 * diss + 0.30 * uni + 0.15 * (hard / 100.0) + 0.15 * (1.0 / max(disin, 0.1)) * 10.0) return float(qi) def run_optimization(constraints: dict = None, n_trials: int = 80) -> pd.DataFrame: """Exposed function for real-time optimization. Returns Pareto front solutions.""" if constraints is None: constraints = CFG.DEFAULT_CONSTRAINTS # Load requirements with open(os.path.join(CFG.PROC_DIR, "X_final.pkl"), "rb") as f: X_train = pickle.load(f) with open(os.path.join(CFG.MODEL_DIR, "base_models_median.pkl"), "rb") as f: base_models = pickle.load(f) with open(os.path.join(CFG.MODEL_DIR, "meta_models.pkl"), "rb") as f: meta_models = pickle.load(f) feature_names = X_train.columns.tolist() base_vector = X_train.median() # Load Energy Model with open(os.path.join(CFG.PROC_DIR, "energy_model.pkl"), "rb") as f: energy_reg = pickle.load(f) energy_features = ["Compression_Force", "Machine_Speed", "Drying_Temp", "Drying_Time", "Granulation_Time"] def objective(trial): params = {} for name, (low, high) in CFG.PARAM_SPACE.items(): params[name] = trial.suggest_float(name, low, high) # Estimate Energy e_input = np.array([[params[f] for f in energy_features]]) energy_kwh = float(energy_reg.predict(e_input)[0]) # Build feature vector x_trial = base_vector.copy() for p_name, val in params.items(): matches = [f for f in feature_names if p_name in f] for m in matches: x_trial[m] = val X_input = pd.DataFrame([x_trial]) target_preds = {} for t in CFG.TARGET_COLS: # Match training: X_meta = pd.concat([X, base_target_pred], axis=1) b_pred = base_models[t].predict(X_input) X_meta = pd.concat([X_input.reset_index(drop=True), pd.Series(b_pred, name=f"base_{t}")], axis=1) X_meta.columns = [str(c) for c in X_meta.columns] target_preds[t] = meta_models[t].predict(X_meta.astype(float))[0] # Soft Constraints -> Graduated penalties (not flat 100) penalty = 0.0 hard_min = constraints.get("hardness_min", 80.0) fria_max = constraints.get("friability_max", 1.0) diss_min = constraints.get("dissolution_min", 85.0) energy_max = constraints.get("max_energy_kwh", 999.0) if target_preds["Hardness"] < hard_min: penalty += (hard_min - target_preds["Hardness"]) * 0.5 if target_preds["Friability"] > fria_max: penalty += (target_preds["Friability"] - fria_max) * 5.0 if target_preds["Dissolution_Rate"] < diss_min: penalty += (diss_min - target_preds["Dissolution_Rate"]) * 0.5 if energy_kwh > energy_max: penalty += (energy_kwh - energy_max) * 2.0 # Quality Index (0-100 range) qi = calculate_quality_index(target_preds) # Set attributes trial.set_user_attr("energy", energy_kwh) trial.set_user_attr("quality", qi) trial.set_user_attr("penalty", penalty) for t in CFG.TARGET_COLS: trial.set_user_attr(f"{t}_pred", target_preds[t]) return energy_kwh + penalty, -(qi - penalty) sampler = optuna.samplers.TPESampler(n_startup_trials=15, multivariate=True, seed=CFG.RANDOM_STATE) study = optuna.create_study(directions=["minimize", "minimize"], sampler=sampler) study.optimize(objective, n_trials=n_trials, timeout=CFG.OPTUNA_TIMEOUT_SEC) # Collect ALL completed trials, not just best_trials (which is too restrictive) all_rows = [] for trial in study.trials: if trial.state != optuna.trial.TrialState.COMPLETE: continue row = trial.params.copy() row.update(trial.user_attrs) all_rows.append(row) if not all_rows: return pd.DataFrame() all_df = pd.DataFrame(all_rows) # Filter to feasible solutions (low penalty) and build Pareto front # Keep trials with penalty < 5 (mostly feasible), or all if none qualify feasible = all_df[all_df["penalty"] < 5.0] if "penalty" in all_df.columns else all_df if feasible.empty: feasible = all_df.nsmallest(20, "penalty") # Extract Pareto-optimal front: non-dominated on (energy, -quality) solutions = [] sorted_df = feasible.sort_values("energy").reset_index(drop=True) best_quality = -1 for _, row in sorted_df.iterrows(): if row["quality"] > best_quality: best_quality = row["quality"] solutions.append(row.to_dict()) # If still too few, add top-quality diverse points if len(solutions) < 5 and len(feasible) >= 5: remaining = feasible[~feasible.index.isin([s.get('index', -1) for s in solutions])] extra = remaining.nlargest(min(10, len(remaining)), "quality") for _, row in extra.iterrows(): d = row.to_dict() if not any(abs(s["energy"] - d["energy"]) < 0.1 and abs(s["quality"] - d["quality"]) < 0.5 for s in solutions): solutions.append(d) if len(solutions) >= 15: break pareto_df = pd.DataFrame(solutions) # --- Hypervolume Indicator (2D Area Under Pareto) --- hypervolume = 0.0 if not pareto_df.empty and "energy" in pareto_df.columns and "quality" in pareto_df.columns: # Sort by energy ascending sorted_df = pareto_df.sort_values("energy") energies = sorted_df["energy"].values qualities = sorted_df["quality"].values # Reference point (Worst possible outcomes) # Assuming energy can go up to budget*1.2 and quality can go down to 0 ref_energy = max(energies) + 2.0 ref_quality = 0.0 # Calculate area of rectangles formed by Pareto points relative to ref point # For a 2D minimization/maximization: # Here we minimize Energy (x) and maximize Quality (y). # Shift quality to be minimized: q_min = 100 - quality q_min = 100.0 - qualities q_ref = 100.0 - ref_quality # Now we have two minimization objectives: energies, q_min # Reference: [ref_energy, q_ref] area = 0.0 prev_e = ref_energy for i in range(len(energies)): # Width = distance to previous energy (or ref) # Since sorted by energy, energies[i] is the best (smallest) energy in this slice width = prev_e - energies[i] height = q_ref - q_min[i] area += width * height prev_e = energies[i] # Normalize by total possible area ([0, ref_e], [0, 100]) hypervolume = area / (ref_energy * 100.0) print(f" Pareto Hypervolume (Normalized): {hypervolume:.6f}") if not pareto_df.empty: pareto_df.to_pickle(os.path.join(CFG.PROC_DIR, "last_pareto.pkl")) import json meta = {"hypervolume": float(hypervolume), "n_solutions": len(pareto_df)} with open(os.path.join(CFG.PROC_DIR, "pareto_meta.json"), "w") as f: json.dump(meta, f) return pareto_df def main(): print(">>> Starting Layer 4: Optuna Pareto Optimization") pareto = run_optimization(n_trials=CFG.OPTUNA_TRIALS_DEMO) print(f"Optimal solutions found: {len(pareto)}") print("="*60) print(f"✅ LAYER 4 COMPLETE") print("="*60) if __name__ == "__main__": main()