Spaces:
Sleeping
Sleeping
File size: 8,333 Bytes
"""
BatchMind OS: Layer 4 - Optuna Optimizer
Multi-objective optimization for Quality Index and Energy Efficiency.
"""
import os
import sys
import pickle
import pandas as pd
import numpy as np
import optuna
from lightgbm import LGBMRegressor
# Add parent directory to path for config import
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import CFG
def calculate_quality_index(preds: dict) -> float:
    """Compute the weighted Quality Index from a dict of predicted targets.

    The index is the sum of four weighted terms:

    * 0.40 * ``Dissolution_Rate``
    * 0.30 * ``Content_Uniformity``
    * 0.15 * ``Hardness`` / 100
    * 0.15 * 10 / ``Disintegration_Time`` (the time is floored at 0.1 so the
      reciprocal stays finite)

    Args:
        preds: Mapping that must contain the keys ``"Dissolution_Rate"``,
            ``"Content_Uniformity"``, ``"Hardness"`` and
            ``"Disintegration_Time"``.

    Returns:
        The Quality Index as a plain ``float``.

    Raises:
        KeyError: If any of the four required keys is missing.
    """
    dissolution_term = 0.40 * preds["Dissolution_Rate"]
    uniformity_term = 0.30 * preds["Content_Uniformity"]
    hardness_term = 0.15 * (preds["Hardness"] / 100.0)
    # Faster disintegration scores higher; the 0.1 floor caps this term.
    floored_time = max(preds["Disintegration_Time"], 0.1)
    disintegration_term = 0.15 * (1.0 / floored_time) * 10.0
    return float(
        dissolution_term + uniformity_term + hardness_term + disintegration_term
    )
def _pareto_front_rows(candidates: pd.DataFrame) -> tuple:
    """Pick the non-dominated rows of ``candidates`` (min energy, max quality).

    Args:
        candidates: Frame with numeric ``"energy"`` and ``"quality"`` columns.

    Returns:
        ``(labels, rows)`` where ``labels`` are the index labels of the selected
        rows and ``rows`` are those rows converted to dicts, ordered by
        ascending energy.
    """
    # Break energy ties with the higher quality first, so a row that shares its
    # energy with a better row is never selected.
    ordered = candidates.sort_values(["energy", "quality"], ascending=[True, False])
    labels = []
    rows = []
    # -inf (not -1) so that fronts whose qualities are all <= -1 are still found.
    best_quality = float("-inf")
    for label, row in ordered.iterrows():
        if row["quality"] > best_quality:
            best_quality = row["quality"]
            labels.append(label)
            rows.append(row.to_dict())
    return labels, rows


def _normalized_hypervolume(energies, qualities) -> float:
    """Return the 2D hypervolume of (energy, quality) points, normalised to [0, 1]-ish.

    Energy is minimised and quality is maximised. The reference point is
    ``(max(energies) + 2.0, 0.0)`` and the area is divided by
    ``ref_energy * 100.0``. Dominated points may be passed in; they do not
    contribute to the area.

    Args:
        energies: 1-D array-like of energy values.
        qualities: 1-D array-like of quality values, same length.

    Returns:
        The normalised dominated area, or ``0.0`` when there are no finite
        points or the reference energy is not positive.
    """
    e = np.asarray(energies, dtype=float)
    q = np.asarray(qualities, dtype=float)
    finite = np.isfinite(e) & np.isfinite(q)
    e, q = e[finite], q[finite]
    if e.size == 0:
        return 0.0

    order = np.argsort(e, kind="stable")
    e, q = e[order], q[order]

    ref_energy = float(e.max()) + 2.0
    if ref_energy <= 0.0:
        return 0.0

    # For any energy budget x in [e[i], e[i+1]) the best reachable quality is
    # the running maximum of q[0..i]; qualities below the reference (0) add
    # nothing.
    best_quality = np.maximum.accumulate(np.clip(q, 0.0, None))
    # Slice widths run from each point to the next one (the last to ref_energy),
    # which keeps every width non-negative.
    widths = np.diff(np.append(e, ref_energy))
    area = float(np.sum(widths * best_quality))
    return area / (ref_energy * 100.0)


def run_optimization(constraints: dict = None, n_trials: int = 80) -> pd.DataFrame:
    """Run the multi-objective Optuna search and return the selected solutions.

    Each trial samples the parameters in ``CFG.PARAM_SPACE``, predicts energy
    with the pickled energy model and the targets in ``CFG.TARGET_COLS`` with
    the pickled base/meta models, and minimises
    ``(energy + penalty, -(quality - penalty))``.

    Args:
        constraints: Optional soft-constraint thresholds. Recognised keys are
            ``"hardness_min"``, ``"friability_max"``, ``"dissolution_min"``
            and ``"max_energy_kwh"``. Defaults to ``CFG.DEFAULT_CONSTRAINTS``.
        n_trials: Number of Optuna trials to request.

    Returns:
        A DataFrame with one row per selected solution (sampled parameters plus
        the ``energy``, ``quality``, ``penalty`` and ``<target>_pred`` trial
        attributes). The rows are the non-dominated trials among the mostly
        feasible ones; when fewer than five are found, additional high-quality
        (possibly dominated) trials are appended. An empty DataFrame is
        returned when no trial completed.

    Side effects:
        Prints the normalised hypervolume, and when solutions exist writes
        ``last_pareto.pkl`` and ``pareto_meta.json`` into ``CFG.PROC_DIR``.
    """
    import json

    if constraints is None:
        constraints = CFG.DEFAULT_CONSTRAINTS

    # NOTE(security): pickle.load executes arbitrary code. These files are
    # assumed to be trusted artifacts produced by earlier pipeline layers.
    with open(os.path.join(CFG.PROC_DIR, "X_final.pkl"), "rb") as f:
        X_train = pickle.load(f)
    with open(os.path.join(CFG.MODEL_DIR, "base_models_median.pkl"), "rb") as f:
        base_models = pickle.load(f)
    with open(os.path.join(CFG.MODEL_DIR, "meta_models.pkl"), "rb") as f:
        meta_models = pickle.load(f)
    with open(os.path.join(CFG.PROC_DIR, "energy_model.pkl"), "rb") as f:
        energy_reg = pickle.load(f)

    feature_names = X_train.columns.tolist()
    # Every feature not driven by a sampled parameter stays at its training median.
    base_vector = X_train.median()
    energy_features = ["Compression_Force", "Machine_Speed", "Drying_Temp", "Drying_Time", "Granulation_Time"]

    # Loop-invariant lookups, computed once instead of on every trial.
    # A parameter drives every feature column whose name contains its name.
    param_columns = {
        p_name: [col for col in feature_names if p_name in col]
        for p_name in CFG.PARAM_SPACE
    }
    hard_min = constraints.get("hardness_min", 80.0)
    fria_max = constraints.get("friability_max", 1.0)
    diss_min = constraints.get("dissolution_min", 85.0)
    energy_max = constraints.get("max_energy_kwh", 999.0)

    def objective(trial):
        """Score one trial; returns the two values to minimise."""
        params = {
            name: trial.suggest_float(name, low, high)
            for name, (low, high) in CFG.PARAM_SPACE.items()
        }

        # Estimate energy
        e_input = np.array([[params[name] for name in energy_features]])
        energy_kwh = float(energy_reg.predict(e_input)[0])

        # Build feature vector
        x_trial = base_vector.copy()
        for p_name, val in params.items():
            for col in param_columns[p_name]:
                x_trial[col] = val
        X_input = pd.DataFrame([x_trial])

        target_preds = {}
        for t in CFG.TARGET_COLS:
            # Match training: X_meta = pd.concat([X, base_target_pred], axis=1)
            b_pred = base_models[t].predict(X_input)
            X_meta = pd.concat(
                [X_input.reset_index(drop=True), pd.Series(b_pred, name=f"base_{t}")],
                axis=1,
            )
            X_meta.columns = [str(c) for c in X_meta.columns]
            # Cast to a plain float, consistent with energy_kwh above.
            target_preds[t] = float(meta_models[t].predict(X_meta.astype(float))[0])

        # Soft constraints -> graduated penalties proportional to the violation.
        penalty = 0.0
        if target_preds["Hardness"] < hard_min:
            penalty += (hard_min - target_preds["Hardness"]) * 0.5
        if target_preds["Friability"] > fria_max:
            penalty += (target_preds["Friability"] - fria_max) * 5.0
        if target_preds["Dissolution_Rate"] < diss_min:
            penalty += (diss_min - target_preds["Dissolution_Rate"]) * 0.5
        if energy_kwh > energy_max:
            penalty += (energy_kwh - energy_max) * 2.0

        qi = calculate_quality_index(target_preds)

        trial.set_user_attr("energy", energy_kwh)
        trial.set_user_attr("quality", qi)
        trial.set_user_attr("penalty", penalty)
        for t in CFG.TARGET_COLS:
            trial.set_user_attr(f"{t}_pred", target_preds[t])

        # Both directions are "minimize", so quality is negated.
        return energy_kwh + penalty, -(qi - penalty)

    sampler = optuna.samplers.TPESampler(n_startup_trials=15, multivariate=True, seed=CFG.RANDOM_STATE)
    study = optuna.create_study(directions=["minimize", "minimize"], sampler=sampler)
    study.optimize(objective, n_trials=n_trials, timeout=CFG.OPTUNA_TIMEOUT_SEC)

    # Collect ALL completed trials, not just best_trials (which is too restrictive).
    all_rows = [
        {**trial.params, **trial.user_attrs}
        for trial in study.trials
        if trial.state == optuna.trial.TrialState.COMPLETE
    ]
    if not all_rows:
        return pd.DataFrame()
    all_df = pd.DataFrame(all_rows)

    # Keep trials with penalty < 5 (mostly feasible); if none qualify, fall
    # back to the 20 least-penalised trials.
    feasible = all_df[all_df["penalty"] < 5.0] if "penalty" in all_df.columns else all_df
    if feasible.empty:
        feasible = all_df.nsmallest(20, "penalty")

    chosen_labels, solutions = _pareto_front_rows(feasible)

    # If the front is too small, pad it with high-quality points that are not
    # near-duplicates of something already selected.
    if len(solutions) < 5 and len(feasible) >= 5:
        remaining = feasible.drop(index=chosen_labels)
        extra = remaining.nlargest(min(10, len(remaining)), "quality")
        for _, row in extra.iterrows():
            candidate = row.to_dict()
            is_near_duplicate = any(
                abs(s["energy"] - candidate["energy"]) < 0.1
                and abs(s["quality"] - candidate["quality"]) < 0.5
                for s in solutions
            )
            if not is_near_duplicate:
                solutions.append(candidate)
            if len(solutions) >= 15:
                break

    pareto_df = pd.DataFrame(solutions)

    # --- Hypervolume indicator (2D area dominated by the front) ---
    hypervolume = 0.0
    if not pareto_df.empty and "energy" in pareto_df.columns and "quality" in pareto_df.columns:
        hypervolume = _normalized_hypervolume(
            pareto_df["energy"].values, pareto_df["quality"].values
        )
        print(f" Pareto Hypervolume (Normalized): {hypervolume:.6f}")

    if not pareto_df.empty:
        pareto_df.to_pickle(os.path.join(CFG.PROC_DIR, "last_pareto.pkl"))
        meta = {"hypervolume": float(hypervolume), "n_solutions": len(pareto_df)}
        with open(os.path.join(CFG.PROC_DIR, "pareto_meta.json"), "w") as f:
            json.dump(meta, f)

    return pareto_df
def main():
    """Run the Layer 4 optimization with the demo trial budget and print a summary."""
    banner = "=" * 60
    print(">>> Starting Layer 4: Optuna Pareto Optimization")
    pareto = run_optimization(n_trials=CFG.OPTUNA_TRIALS_DEMO)
    print(f"Optimal solutions found: {len(pareto)}")
    print(banner)
    print("✅ LAYER 4 COMPLETE")
    print(banner)


# Only run the optimization when this file is executed as a script.
if __name__ == "__main__":
    main()
|