""" api.routers.simulate ==================== Bulk battery lifecycle simulation endpoint - vectorized ML-driven. Performance design (O(1) Python overhead per battery regardless of step count): 1. SEI impedance growth - numpy cumsum (no Python loop) 2. Feature matrix build - numpy column_stack -> (N_steps, n_features) 3. ML prediction - single model.predict() call via predict_array() 4. RUL / EOL - numpy diff / cumsum / searchsorted 5. Classify / colorize - numpy searchsorted on pre-built label arrays Scaler dispatch mirrors training exactly: Tree models (RF / ET / XGB / LGB / GB) -> raw numpy (no scaler) Linear / SVR / KNN -> standard_scaler.joblib.transform(X) best_ensemble -> per-component dispatch (same rules) Deep sequence models (PyTorch / Keras) -> not batchable, falls back to physics """ from __future__ import annotations import logging from pathlib import Path from typing import List, Optional import joblib import numpy as np from fastapi import APIRouter from pydantic import BaseModel, Field from api.model_registry import ( FEATURE_COLS_SCALAR, V3_FEATURE_COLS, classify_degradation, soh_to_color, registry_v3 as registry_v2, ) log = logging.getLogger(__name__) router = APIRouter(prefix="/api/v3", tags=["simulation"]) _Q_NOM = 2.0 # NASA PCoE nominal capacity (Ah) _TIME_UNIT_SECONDS: dict[str, float | None] = { "cycle": None, "second": 1.0, "minute": 60.0, "hour": 3_600.0, "day": 86_400.0, "week": 604_800.0, "month": 2_592_000.0, "year": 31_536_000.0, } _TIME_UNIT_LABELS: dict[str, str] = { "cycle": "Cycles", "second": "Seconds", "minute": "Minutes", "hour": "Hours", "day": "Days", "week": "Weeks", "month": "Months", "year": "Years", } # Column index map - must stay in sync with FEATURE_COLS_SCALAR (12 features) _F = {col: idx for idx, col in enumerate(FEATURE_COLS_SCALAR)} # Column index map for V3_FEATURE_COLS (18 features) _F3 = {col: idx for idx, col in enumerate(V3_FEATURE_COLS)} # Pre-built label/color arrays for O(1) numpy-vectorized classification 
_SOH_BINS = np.array([70.0, 80.0, 90.0])  # searchsorted thresholds
_DEG_LABELS = np.array(["End-of-Life", "Degraded", "Moderate", "Healthy"], dtype=object)
_COLOR_HEX = np.array(["#ef4444", "#f97316", "#eab308", "#22c55e"], dtype=object)

# Optional learned Re/Rct progression model (trained in NB03).
_RE_RCT_MODEL_PATH = Path("artifacts") / "v3" / "models" / "classical" / "re_rct_progression.joblib"
try:
    _re_rct_model = joblib.load(_RE_RCT_MODEL_PATH)
except FileNotFoundError:
    # The artifact is optional: without it _ml_re_rct uses fixed linear slopes.
    log.info(
        "Re/Rct progression model not found at %s - using linear impedance fallback",
        _RE_RCT_MODEL_PATH,
    )
    _re_rct_model = None
except Exception as exc:
    # Import-time boundary: a corrupt or incompatible artifact must not stop
    # the router from loading, but the failure should be visible in the logs.
    log.warning(
        "Could not load Re/Rct progression model from %s (%s) - using linear impedance fallback",
        _RE_RCT_MODEL_PATH,
        exc,
    )
    _re_rct_model = None


def _vec_classify(soh: np.ndarray) -> list[str]:
    """Map each SOH value to its degradation label with one searchsorted call.

    ``side="left"`` puts a value that is exactly equal to a threshold into the
    lower band, e.g. 70.0 -> "End-of-Life" and 90.0 -> "Moderate".
    """
    return _DEG_LABELS[np.searchsorted(_SOH_BINS, soh, side="left")].tolist()


def _vec_color(soh: np.ndarray) -> list[str]:
    """Map each SOH value to a hex color, using the same bands as _vec_classify."""
    return _COLOR_HEX[np.searchsorted(_SOH_BINS, soh, side="left")].tolist()


# -- Schemas ------------------------------------------------------------------
class BatterySimConfig(BaseModel):
    """Per-battery simulation input; every field except battery_id has a default."""

    battery_id: str
    label: Optional[str] = None
    # State of health at the start of the simulation, in percent of _Q_NOM.
    initial_soh: float = Field(default=100.0, ge=0.0, le=100.0)
    start_cycle: int = Field(default=1, ge=1)
    ambient_temperature: float = Field(default=24.0)
    peak_voltage: float = Field(default=4.19)
    min_voltage: float = Field(default=2.61)
    avg_current: float = Field(default=1.82)
    avg_temp: float = Field(default=32.6)
    temp_rise: float = Field(default=14.7)
    # Seconds per cycle: the endpoint divides the time-unit length by this.
    cycle_duration: float = Field(default=3690.0)
    Re: float = Field(default=0.045)
    Rct: float = Field(default=0.069)
    # Capacity change per cycle in Ah (added to the Ah capacity estimate).
    delta_capacity: float = Field(default=-0.005)


class SimulateRequest(BaseModel):
    """Request body for the bulk simulation endpoint."""

    batteries: List[BatterySimConfig]
    steps: int = Field(default=200, ge=1, le=10_000)
    # Unknown units are replaced by "day" in the endpoint.
    time_unit: str = Field(default="day")
    eol_threshold: float = Field(default=70.0, ge=0.0, le=100.0)
    model_name: Optional[str] = Field(default=None)
    use_ml: bool = Field(default=True)


class BatterySimResult(BaseModel):
    """Simulated trajectory and summary values for one battery."""

    battery_id: str
    label: Optional[str]
    soh_history: List[float]
    rul_history: List[float]
    rul_time_history: List[float]
    re_history: List[float]
    rct_history: List[float]
    cycle_history: List[int]
    time_history: List[float]
    degradation_history: List[str]
    color_history: List[str]
    eol_cycle: Optional[int]
    eol_time: Optional[float]
    final_soh: float
    final_rul: float
    deg_rate_avg: float
    model_used: str = "physics"


class SimulateResponse(BaseModel):
    """Response body: one BatterySimResult per requested battery."""

    results: List[BatterySimResult]
    time_unit: str
    time_unit_label: str
    steps: int
    model_used: str = "physics"


# -- Helpers ------------------------------------------------------------------
def _rolling_mean(arr: np.ndarray, window: int = 5) -> np.ndarray:
    """Return the trailing rolling mean of *arr* with min_periods=1 semantics.

    Element i is the mean of the last ``min(i + 1, window)`` values up to and
    including i. It is computed from one cumulative sum, without a Python loop.
    """
    x = np.asarray(arr, dtype=np.float64)
    n = len(x)
    csum = np.cumsum(np.concatenate([[0.0], x]))
    idx = np.arange(1, n + 1)
    start = np.maximum(idx - window, 0)
    count = np.minimum(idx, window)
    return (csum[idx] - csum[start]) / count


def _linear_soh(b: BatterySimConfig, elapsed_cycles: np.ndarray) -> np.ndarray:
    """Return the linear (non-ML) SOH trajectory in percent, clipped to [0, 100].

    The per-cycle loss is ``|delta_capacity|`` as a percentage of ``_Q_NOM``.
    The first step already includes one cycle of degradation, hence the ``+ 1``.
    """
    deg_pct_per_cycle = abs(b.delta_capacity) / _Q_NOM * 100.0
    return np.clip(b.initial_soh - deg_pct_per_cycle * (elapsed_cycles + 1.0), 0.0, 100.0)


def _is_deep_model(model_name: str) -> bool:
    """Return True when the registry metadata marks *model_name* as a deep model.

    A model with no metadata entry or no "family" key counts as "classical".
    """
    family = registry_v2.model_meta.get(model_name, {}).get("family", "classical")
    return family in ("deep_pytorch", "deep_keras")


def _build_feature_matrix(
    b: BatterySimConfig,
    cycle_arr: np.ndarray,
    re_arr: np.ndarray,
    rct_arr: np.ndarray,
    soh_rolling_override: np.ndarray | None = None,
) -> np.ndarray:
    """Build the (steps, n_features) matrix in ``registry_v2.feature_cols`` order.

    The 12 base features come from the battery config (constant per step) plus
    the cycle, Re and Rct arrays. Seven engineered columns are estimated on top:

    - capacity_retention: estimated capacity / initial capacity
    - cumulative_energy: running sum of the per-step capacity estimate (Ah)
    - dRe_dn / dRct_dn: step-to-step difference of Re / Rct
    - soh_rolling_mean: *soh_rolling_override* if given, otherwise a 10-step
      rolling mean of a linear SOH estimate
    - voltage_slope: constant 0 in simulation
    - coulombic_efficiency: constant 0

    Only columns named in ``registry_v2.feature_cols`` are emitted; a name with
    no entry here becomes an all-zero column.

    Args:
        b: Battery configuration supplying the constant features.
        cycle_arr: Cycle number per step; must be non-empty.
        re_arr: Re value per step.
        rct_arr: Rct value per step.
        soh_rolling_override: Optional replacement for the soh_rolling_mean
            column, e.g. one derived from a first-pass model prediction.

    Returns:
        Array of shape (len(cycle_arr), len(registry_v2.feature_cols)).
    """
    cycles = np.asarray(cycle_arr, dtype=np.float64)
    n_rows = len(cycles)
    steps_elapsed = np.maximum(cycles - cycles[0], 0.0)

    # ---- 12 base features (always present) ---------------------------------
    feat_dict: dict[str, np.ndarray] = {
        "cycle_number": cycles,
        "ambient_temperature": np.full(n_rows, b.ambient_temperature),
        "peak_voltage": np.full(n_rows, b.peak_voltage),
        "min_voltage": np.full(n_rows, b.min_voltage),
        "voltage_range": np.full(n_rows, b.peak_voltage - b.min_voltage),
        "avg_current": np.full(n_rows, b.avg_current),
        "avg_temp": np.full(n_rows, b.avg_temp),
        "temp_rise": np.full(n_rows, b.temp_rise),
        "cycle_duration": np.full(n_rows, b.cycle_duration),
        "Re": re_arr,
        "Rct": rct_arr,
        "delta_capacity": np.full(n_rows, b.delta_capacity),
    }

    # ---- engineered features (estimated from physics) ----------------------
    # Floor the initial capacity so the retention ratio never divides by zero.
    initial_cap = max(b.initial_soh / 100.0 * _Q_NOM, 1e-6)  # Ah
    cap_per_step = np.maximum(initial_cap + b.delta_capacity * steps_elapsed, 0.0)
    cap_retention = np.clip(cap_per_step / initial_cap, 0.0, None)
    cum_energy = np.cumsum(cap_per_step)

    # Prepending the configured start value makes the first difference relative
    # to the user's initial impedance.
    dRe_dn = np.diff(re_arr, prepend=b.Re)
    dRct_dn = np.diff(rct_arr, prepend=b.Rct)

    if soh_rolling_override is not None:
        soh_rolling = np.asarray(soh_rolling_override, dtype=np.float64)
    else:
        # No prediction available yet: use a linear SOH estimate as a proxy.
        deg_pct_per_cycle = abs(b.delta_capacity) / _Q_NOM * 100.0
        soh_est = np.maximum(b.initial_soh - deg_pct_per_cycle * steps_elapsed, 0.0)
        soh_rolling = _rolling_mean(soh_est, window=10)

    feat_dict.update({
        "capacity_retention": cap_retention,
        "cumulative_energy": cum_energy,
        "dRe_dn": dRe_dn,
        "dRct_dn": dRct_dn,
        "soh_rolling_mean": soh_rolling,
        "voltage_slope": np.zeros(n_rows),
        "coulombic_efficiency": np.zeros(n_rows),  # always 0 in training data
    })

    # Build matrix in registry's feature_cols order; unknown cols default to 0
    feat_cols = registry_v2.feature_cols
    return np.column_stack([feat_dict.get(col, np.zeros(n_rows)) for col in feat_cols])


def _ml_re_rct(
    re0: float,
    rct0: float,
    cycle_arr: np.ndarray,
    elapsed_cycles: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
    """Return per-step (Re, Rct) arrays anchored to the user's initial values.

    With the learned helper model loaded, both curves are predicted from the
    cycle number and shifted so the first element equals ``re0`` / ``rct0``,
    then clipped to [0, 2] and [0, 3]. Without it, fixed linear slopes over
    *elapsed_cycles* are used, capped at 2.0 and 3.0.
    """
    if _re_rct_model is None:
        # Minimal linear fallback if helper model is unavailable.
        n = np.asarray(elapsed_cycles, dtype=np.float64)
        re_arr = np.minimum(re0 + 0.00012 * n, 2.0)
        rct_arr = np.minimum(rct0 + 0.00018 * n, 3.0)
        return re_arr, rct_arr

    # The helper is indexed by "re_model" / "rct_model" and each entry is
    # called with a single-column (n_steps, 1) matrix of cycle numbers.
    x = np.asarray(cycle_arr, dtype=np.float64).reshape(-1, 1)
    re_pred = _re_rct_model["re_model"].predict(x)
    rct_pred = _re_rct_model["rct_model"].predict(x)
    re_arr = np.clip(re_pred + (re0 - float(re_pred[0])), 0.0, 2.0)
    rct_arr = np.clip(rct_pred + (rct0 - float(rct_pred[0])), 0.0, 3.0)
    return re_arr, rct_arr


def _compute_rul_and_eol(
    soh_arr: np.ndarray,
    initial_soh: float,
    eol_thr: float,
    cycle_start: int,
    cycle_arr: np.ndarray,
    elapsed_cycles: np.ndarray,
    cycle_dur: float,
    tu_sec: float | None,
) -> tuple[np.ndarray, np.ndarray, Optional[int], Optional[float]]:
    """Compute vectorized RUL and EOL from an SOH trajectory.

    RUL at each step is the remaining SOH margin above *eol_thr* divided by the
    average degradation rate so far (cumulative SOH loss / elapsed cycles), and
    is 0 once SOH is at or below the threshold.

    Args:
        soh_arr: SOH per step, in percent.
        initial_soh: SOH before the first step, used for the first step's loss.
        eol_thr: End-of-life SOH threshold.
        cycle_start: Cycle number at which the simulation starts.
        cycle_arr: Cycle number per step.
        elapsed_cycles: Cycles elapsed since the start, per step.
        cycle_dur: Duration of one cycle in seconds.
        tu_sec: Seconds per output time unit, or None to report in cycles.

    Returns:
        ``(rul_cycles, rul_time, eol_cycle, eol_time)``. The last two are None
        when SOH never reaches the threshold. In cycle mode ``eol_time`` is the
        EOL cycle number itself.
    """
    cycles = np.asarray(cycle_arr, dtype=np.int64)
    elapsed = np.asarray(elapsed_cycles, dtype=np.float64)

    # Only SOH decreases count as degradation; increases contribute 0.
    soh_prev = np.concatenate([[initial_soh], soh_arr[:-1]])
    step_deg = np.maximum(0.0, soh_prev - soh_arr)
    cum_deg = np.cumsum(step_deg)
    # Floor both the divisor and the rate so the RUL division is always finite.
    avg_rate = np.maximum(cum_deg / np.maximum(elapsed, 1.0), 1e-6)

    rul_cycles = np.where(soh_arr > eol_thr, (soh_arr - eol_thr) / avg_rate, 0.0)
    if tu_sec is not None:
        rul_time = rul_cycles * cycle_dur / tu_sec
    else:
        rul_time = rul_cycles.copy()

    # EOL: first step where SOH <= threshold
    below = soh_arr <= eol_thr
    eol_cycle: Optional[int] = None
    eol_time: Optional[float] = None
    if below.any():
        idx = int(np.argmax(below))
        eol_cycle = int(cycles[idx])
        elapsed_s = max(0.0, float(eol_cycle - cycle_start) * cycle_dur)
        eol_time = round((elapsed_s / tu_sec) if tu_sec else float(eol_cycle), 3)

    return rul_cycles, rul_time, eol_cycle, eol_time


# -- Endpoint -----------------------------------------------------------------
@router.post(
    "/simulate",
    response_model=SimulateResponse,
    summary="Bulk battery lifecycle simulation (vectorized, ML-driven)",
)
async def simulate_batteries(req: SimulateRequest):
    """
    Vectorized simulation: builds all N feature rows at once per battery,
    dispatches to the ML model as a batch predict_array() call, then
    post-processes with numpy. The only Python loop is the one over batteries.

    Scaler usage mirrors NB03 training exactly:
      - Tree models (RF/ET/XGB/LGB/GB): raw numpy X, no scaler
      - Linear/SVR/KNN: standard_scaler.joblib.transform(X)
      - best_ensemble: per-component family dispatch

    An unknown ``time_unit`` is replaced by "day". When ML prediction is
    disabled, unavailable, or raises, the battery falls back to a linear SOH
    model and reports ``model_used="linear_fallback"``. The response-level
    ``model_used`` is the value from the last battery processed.
    """
    time_unit = req.time_unit.lower()
    if time_unit not in _TIME_UNIT_SECONDS:
        time_unit = "day"
    tu_sec = _TIME_UNIT_SECONDS[time_unit]
    tu_label = _TIME_UNIT_LABELS[time_unit]
    eol_thr = req.eol_threshold
    n_steps = req.steps

    requested_model = req.model_name or registry_v2.default_model or "best_ensemble"

    # Resolve to a batchable loaded model once (to avoid per-battery fallback spam).
    # Priority: requested -> registry default -> first loaded classical model.
    model_name = requested_model
    if requested_model == "best_ensemble":
        ensemble_components = registry_v2.model_meta.get("best_ensemble", {}).get("components", [])
        if not ensemble_components:
            model_name = registry_v2.default_model or ""

    is_deep = _is_deep_model(model_name)
    # NOTE(review): if the registry does not list "best_ensemble" in `models`,
    # this branch replaces it with the first classical model even when the
    # ensemble has components - confirm against the registry implementation.
    if (model_name not in registry_v2.models) or is_deep:
        first_classical = next(
            (
                name
                for name, meta in registry_v2.model_meta.items()
                if name in registry_v2.models and meta.get("family") == "classical"
            ),
            None,
        )
        if first_classical is not None:
            model_name = first_classical
            is_deep = _is_deep_model(model_name)

    ml_batchable = (
        req.use_ml
        and not is_deep
        and (model_name == "best_ensemble" or model_name in registry_v2.models)
    )

    # Determine scaler note for logging (mirrors training decision exactly)
    if registry_v2.model_meta.get(model_name, {}).get("requires_scaling", False):
        scaler_note = "standard_scaler"
    elif model_name == "best_ensemble":
        scaler_note = "per-component (tree=none / linear=standard_scaler)"
    else:
        scaler_note = "none (tree)"

    # Reported as the response-level model when the battery list is empty.
    effective_model = "linear_fallback"

    log.info(
        "simulate: %d batteries x %d steps | requested=%s | effective=%s | batchable=%s | scaler=%s | unit=%s",
        len(req.batteries), n_steps, requested_model, model_name, ml_batchable, scaler_note, time_unit,
    )

    results: list[BatterySimResult] = []

    for b in req.batteries:
        # One step is one cycle in "cycle" mode; otherwise it spans however many
        # cycles fit into one time unit. The floor guards against a zero or
        # negative cycle_duration.
        cycles_per_step = 1.0 if tu_sec is None else (tu_sec / max(b.cycle_duration, 1e-6))
        elapsed_cycles = np.arange(n_steps, dtype=np.float64) * cycles_per_step
        cycle_arr_float = b.start_cycle + elapsed_cycles
        cycle_arr = np.maximum(b.start_cycle, np.floor(cycle_arr_float).astype(np.int64))

        # 1. Re/Rct progression - prefer learned model, fallback to simple linear slopes.
        re_arr, rct_arr = _ml_re_rct(b.Re, b.Rct, cycle_arr, elapsed_cycles)

        # 2. SOH prediction with two-pass soh_rolling_mean correction: pass 1
        #    uses the physics proxy for soh_rolling_mean, pass 2 replaces it
        #    with a rolling mean of the pass-1 prediction.
        if ml_batchable:
            X_pass1 = _build_feature_matrix(b, cycle_arr, re_arr, rct_arr)
            try:
                soh_pass1, effective_model = registry_v2.predict_array(X_pass1, model_name)
                soh_roll = _rolling_mean(soh_pass1, window=5)
                X_pass2 = _build_feature_matrix(
                    b, cycle_arr, re_arr, rct_arr, soh_rolling_override=soh_roll,
                )
                soh_arr, effective_model = registry_v2.predict_array(X_pass2, model_name)
            except Exception as exc:
                # Request boundary: one failing prediction must not fail the
                # whole batch, so log it and degrade to the linear model.
                log.warning(
                    "predict_array failed for %s (%s) - falling back to linear",
                    b.battery_id, exc,
                )
                soh_arr = _linear_soh(b, elapsed_cycles)
                effective_model = "linear_fallback"
        else:
            soh_arr = _linear_soh(b, elapsed_cycles)
            effective_model = "linear_fallback"

        soh_arr = np.clip(soh_arr, 0.0, 100.0)

        # 3. RUL + EOL - vectorized
        rul_cycles, rul_time, eol_cycle, eol_time = _compute_rul_and_eol(
            soh_arr, b.initial_soh, eol_thr,
            b.start_cycle, cycle_arr, elapsed_cycles, b.cycle_duration, tu_sec,
        )

        # 4. Time axis - step index in time-unit mode, cycle number in cycle mode
        if tu_sec is not None:
            time_arr = np.arange(n_steps, dtype=np.float64)
        else:
            time_arr = cycle_arr.astype(np.float64)

        # 5. Labels + colors - fully vectorized via numpy searchsorted
        deg_h = _vec_classify(soh_arr)
        color_h = _vec_color(soh_arr)

        # Mean per-step SOH loss, counting only decreases.
        avg_dr = float(np.mean(np.maximum(0.0, -np.diff(soh_arr, prepend=b.initial_soh))))

        # 6. Build result - numpy round + .tolist() (no per-element Python conversion)
        results.append(BatterySimResult(
            battery_id=b.battery_id,
            label=b.label or b.battery_id,
            soh_history=np.round(soh_arr, 3).tolist(),
            rul_history=np.round(rul_cycles, 1).tolist(),
            rul_time_history=np.round(rul_time, 2).tolist(),
            re_history=np.round(re_arr, 6).tolist(),
            rct_history=np.round(rct_arr, 6).tolist(),
            cycle_history=cycle_arr.tolist(),
            time_history=np.round(time_arr, 3).tolist(),
            degradation_history=deg_h,
            color_history=color_h,
            eol_cycle=eol_cycle,
            eol_time=eol_time,
            final_soh=round(float(soh_arr[-1]), 3),
            final_rul=round(float(rul_cycles[-1]), 1),
            deg_rate_avg=round(avg_dr, 6),
            model_used=effective_model,
        ))

    return SimulateResponse(
        results=results,
        time_unit=time_unit,
        time_unit_label=tu_label,
        steps=n_steps,
        model_used=effective_model,
    )