# batchmind-os / batchmind_os / layer4_optimizer / optuna_optimizer.py
# (repo header, scrape artifact: commit 038ee19 by 23f3002638,
#  "Initial commit with LFS tracking")
"""
BatchMind OS: Layer 4 - Optuna Optimizer
Multi-objective optimization for Quality Index and Energy Efficiency.
"""
import os
import sys
import pickle
import pandas as pd
import numpy as np
import optuna
from lightgbm import LGBMRegressor
# Add parent directory to path for config import
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import CFG
def calculate_quality_index(preds: dict) -> float:
    """Combine predicted tablet targets into the spec's Quality Index.

    Weights: 40% dissolution rate, 30% content uniformity, 15% hardness
    scaled by 100, and 15% inverse disintegration time (clamped to 0.1
    to avoid division by zero) scaled by 10.
    """
    safe_disin = max(preds["Disintegration_Time"], 0.1)
    components = (
        0.40 * preds["Dissolution_Rate"],
        0.30 * preds["Content_Uniformity"],
        0.15 * (preds["Hardness"] / 100.0),
        0.15 * (1.0 / safe_disin) * 10.0,
    )
    return float(sum(components))
def _extract_pareto_solutions(feasible: pd.DataFrame) -> pd.DataFrame:
    """Build the (min energy, max quality) non-dominated front from trial rows.

    Sweeping in ascending energy, a row is kept iff its quality strictly
    exceeds every lower-energy row's quality.  When fewer than 5 front points
    exist (and at least 5 rows are available), the front is padded with
    top-quality, non-duplicate rows, capped at 15 solutions.
    """
    solutions = []
    chosen_idx = []  # DataFrame indices already placed on the front
    best_quality = float("-inf")  # was -1; -inf is safe even for negative QI
    for idx, row in feasible.sort_values("energy").iterrows():
        if row["quality"] > best_quality:
            best_quality = row["quality"]
            solutions.append(row.to_dict())
            chosen_idx.append(idx)
    # Pad with diverse high-quality points.  BUGFIX: the previous filter used
    # s.get('index', -1), but row.to_dict() never contains an 'index' key, so
    # front rows were never excluded; we now drop them explicitly.
    if len(solutions) < 5 and len(feasible) >= 5:
        remaining = feasible.drop(index=chosen_idx)
        extra = remaining.nlargest(min(10, len(remaining)), "quality")
        for _, row in extra.iterrows():
            cand = row.to_dict()
            duplicate = any(
                abs(s["energy"] - cand["energy"]) < 0.1
                and abs(s["quality"] - cand["quality"]) < 0.5
                for s in solutions
            )
            if not duplicate:
                solutions.append(cand)
            if len(solutions) >= 15:
                break
    return pd.DataFrame(solutions)


def _normalized_hypervolume(pareto_df: pd.DataFrame) -> float:
    """2-D hypervolume (area dominated by the front), normalized to [0, 1].

    Energy is minimized and quality maximized; quality is flipped to a
    minimization objective (100 - quality, QI assumed on a 0-100 scale) so
    both axes minimize, then the dominated area relative to the reference
    point is accumulated with a left-to-right staircase sweep.

    BUGFIX: the previous implementation walked points in ascending energy
    but measured each width against the *reference* energy first, so every
    width after the first point was negative and the area was corrupted.
    """
    if pareto_df.empty or "energy" not in pareto_df.columns or "quality" not in pareto_df.columns:
        return 0.0
    sorted_df = pareto_df.sort_values("energy")
    energies = sorted_df["energy"].to_numpy(dtype=float)
    qualities = sorted_df["quality"].to_numpy(dtype=float)
    # Reference point: slightly worse than the worst observed energy, and a
    # quality floor of 0.
    ref_energy = float(energies.max()) + 2.0
    ref_quality = 0.0
    q_min = 100.0 - qualities          # flipped: smaller is better
    q_ref = 100.0 - ref_quality
    area = 0.0
    best_height = 0.0  # running best: dominated height at/below current energy
    n = len(energies)
    for i in range(n):
        # Each slice [energies[i], next edge) is dominated by the best point
        # seen so far; the running max also guards against dominated rows
        # introduced by the diversity padding.
        right_edge = energies[i + 1] if i + 1 < n else ref_energy
        best_height = max(best_height, q_ref - q_min[i])
        area += (right_edge - energies[i]) * best_height
    # Normalize by the total attainable area ([0, ref_energy] x [0, 100]).
    return area / (ref_energy * 100.0)


def run_optimization(constraints: dict = None, n_trials: int = 80) -> pd.DataFrame:
    """Run multi-objective (energy vs. quality) Optuna optimization.

    Args:
        constraints: Soft-constraint thresholds (keys: hardness_min,
            friability_max, dissolution_min, max_energy_kwh).  Falls back to
            CFG.DEFAULT_CONSTRAINTS when None.
        n_trials: Maximum Optuna trials; the run is also bounded by
            CFG.OPTUNA_TIMEOUT_SEC.

    Returns:
        DataFrame of Pareto-front parameter sets with predicted targets plus
        energy / quality / penalty columns, or an empty DataFrame when no
        trial completed.

    Side effects:
        Persists last_pareto.pkl and pareto_meta.json under CFG.PROC_DIR and
        prints the normalized hypervolume.
    """
    if constraints is None:
        constraints = CFG.DEFAULT_CONSTRAINTS
    # Load artifacts produced by earlier layers.
    # NOTE(review): pickle.load assumes these are trusted local files; do not
    # point these paths at untrusted data.
    with open(os.path.join(CFG.PROC_DIR, "X_final.pkl"), "rb") as f:
        X_train = pickle.load(f)
    with open(os.path.join(CFG.MODEL_DIR, "base_models_median.pkl"), "rb") as f:
        base_models = pickle.load(f)
    with open(os.path.join(CFG.MODEL_DIR, "meta_models.pkl"), "rb") as f:
        meta_models = pickle.load(f)
    feature_names = X_train.columns.tolist()
    base_vector = X_train.median()  # neutral defaults for non-tuned features
    # Load the energy-consumption regressor.
    with open(os.path.join(CFG.PROC_DIR, "energy_model.pkl"), "rb") as f:
        energy_reg = pickle.load(f)
    # Order must match the energy model's training feature order — TODO confirm.
    energy_features = ["Compression_Force", "Machine_Speed", "Drying_Temp", "Drying_Time", "Granulation_Time"]

    def objective(trial):
        """Return (energy + penalty, -(quality - penalty)); both minimized."""
        params = {}
        for name, (low, high) in CFG.PARAM_SPACE.items():
            params[name] = trial.suggest_float(name, low, high)
        # Estimate energy consumption for this parameter set.
        e_input = np.array([[params[f] for f in energy_features]])
        energy_kwh = float(energy_reg.predict(e_input)[0])
        # Build the model input: start from the median row and overwrite every
        # feature whose name contains a tuned parameter name (this substring
        # match also covers derived/encoded columns).
        x_trial = base_vector.copy()
        for p_name, val in params.items():
            for feat in [f for f in feature_names if p_name in f]:
                x_trial[feat] = val
        X_input = pd.DataFrame([x_trial])
        target_preds = {}
        for t in CFG.TARGET_COLS:
            # Match training: X_meta = pd.concat([X, base_target_pred], axis=1)
            b_pred = base_models[t].predict(X_input)
            X_meta = pd.concat([X_input.reset_index(drop=True), pd.Series(b_pred, name=f"base_{t}")], axis=1)
            X_meta.columns = [str(c) for c in X_meta.columns]
            target_preds[t] = meta_models[t].predict(X_meta.astype(float))[0]
        # Soft constraints -> graduated penalties (not a flat 100).
        penalty = 0.0
        hard_min = constraints.get("hardness_min", 80.0)
        fria_max = constraints.get("friability_max", 1.0)
        diss_min = constraints.get("dissolution_min", 85.0)
        energy_max = constraints.get("max_energy_kwh", 999.0)
        if target_preds["Hardness"] < hard_min:
            penalty += (hard_min - target_preds["Hardness"]) * 0.5
        if target_preds["Friability"] > fria_max:
            penalty += (target_preds["Friability"] - fria_max) * 5.0
        if target_preds["Dissolution_Rate"] < diss_min:
            penalty += (diss_min - target_preds["Dissolution_Rate"]) * 0.5
        if energy_kwh > energy_max:
            penalty += (energy_kwh - energy_max) * 2.0
        # Quality Index (0-100 range).
        qi = calculate_quality_index(target_preds)
        # Attach diagnostics so the Pareto table can be rebuilt from trials.
        trial.set_user_attr("energy", energy_kwh)
        trial.set_user_attr("quality", qi)
        trial.set_user_attr("penalty", penalty)
        for t in CFG.TARGET_COLS:
            trial.set_user_attr(f"{t}_pred", target_preds[t])
        return energy_kwh + penalty, -(qi - penalty)

    sampler = optuna.samplers.TPESampler(n_startup_trials=15, multivariate=True, seed=CFG.RANDOM_STATE)
    study = optuna.create_study(directions=["minimize", "minimize"], sampler=sampler)
    study.optimize(objective, n_trials=n_trials, timeout=CFG.OPTUNA_TIMEOUT_SEC)
    # Collect ALL completed trials, not just study.best_trials (too restrictive).
    all_rows = []
    for trial in study.trials:
        if trial.state != optuna.trial.TrialState.COMPLETE:
            continue
        row = trial.params.copy()
        row.update(trial.user_attrs)
        all_rows.append(row)
    if not all_rows:
        return pd.DataFrame()
    all_df = pd.DataFrame(all_rows)
    # Keep mostly-feasible trials (penalty < 5); if none qualify, fall back to
    # the 20 least-penalized trials so a front can still be produced.
    feasible = all_df[all_df["penalty"] < 5.0] if "penalty" in all_df.columns else all_df
    if feasible.empty:
        feasible = all_df.nsmallest(20, "penalty")
    pareto_df = _extract_pareto_solutions(feasible)
    hypervolume = _normalized_hypervolume(pareto_df)
    print(f" Pareto Hypervolume (Normalized): {hypervolume:.6f}")
    if not pareto_df.empty:
        pareto_df.to_pickle(os.path.join(CFG.PROC_DIR, "last_pareto.pkl"))
        import json
        meta = {"hypervolume": float(hypervolume), "n_solutions": len(pareto_df)}
        with open(os.path.join(CFG.PROC_DIR, "pareto_meta.json"), "w") as f:
            json.dump(meta, f)
    return pareto_df
def main():
    """CLI entry point: run the demo-sized optimization and report results."""
    print(">>> Starting Layer 4: Optuna Pareto Optimization")
    pareto = run_optimization(n_trials=CFG.OPTUNA_TRIALS_DEMO)
    print(f"Optimal solutions found: {len(pareto)}")
    print("=" * 60)
    # Was an f-string with no placeholders; plain literal is the idiom.
    print("✅ LAYER 4 COMPLETE")
    print("=" * 60)
# Allow running this layer directly as a script.
if __name__ == "__main__":
    main()