"""
Layer 0: Preprocessing
- Load Excel data
- Clean and normalize time-series signal data
- Record phase boundaries and flag outliers
- Save cleaned data for feature extraction
"""
import json
import os
import pickle
import sys

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

# Add parent directory to path for config import
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import CFG


def get_phase_boundaries(batch_df: pd.DataFrame) -> dict:
    """Extract start/end/duration minutes for each phase of one batch.

    Args:
        batch_df: Time-series rows for a single batch, with "Phase" and
            "Time_Minutes" columns.

    Returns:
        Dict keyed by phase name. Present phases map to
        {"start": int, "end": int, "duration": int}; phases absent from
        the batch map to {"start": None, "end": None, "duration": 0}.
    """
    boundaries = {}
    for phase in CFG.PHASES:
        p_data = batch_df[batch_df["Phase"] == phase]
        if p_data.empty:
            boundaries[phase] = {"start": None, "end": None, "duration": 0}
        else:
            # Compute min/max once instead of twice each.
            t_start = int(p_data["Time_Minutes"].min())
            t_end = int(p_data["Time_Minutes"].max())
            boundaries[phase] = {
                "start": t_start,
                "end": t_end,
                "duration": t_end - t_start,
            }
    return boundaries


def flag_outliers(batch_df: pd.DataFrame) -> dict:
    """Flag outliers using the 3xIQR rule, per phase per signal.

    Args:
        batch_df: Time-series rows for a single batch.

    Returns:
        {"<phase>__<signal>": outlier_count} for every phase/signal pair
        that has at least one outlier. Signals with fewer than 4 valid
        points or zero IQR are skipped.
    """
    flags = {}
    for phase in CFG.PHASES:
        p_df = batch_df[batch_df["Phase"] == phase]
        if p_df.empty:
            continue
        for sig in CFG.SIGNAL_COLS:
            if sig not in p_df.columns:
                continue
            # Coerce to numeric first: this may run before the caller has
            # converted signal columns, and object-dtype comparisons
            # (e.g. str < float) would raise TypeError.
            values = pd.to_numeric(p_df[sig], errors="coerce")
            data = values.dropna()
            if len(data) < 4:
                continue
            q1 = data.quantile(0.25)
            q3 = data.quantile(0.75)
            iqr = q3 - q1
            if iqr == 0:
                continue  # Skip constant signals
            lower = q1 - 3 * iqr
            upper = q3 + 3 * iqr
            n_outliers = int(((values < lower) | (values > upper)).sum())
            if n_outliers > 0:
                flags[f"{phase}__{sig}"] = n_outliers
    return flags


def preprocess_single_batch(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Real-time handling for a single new batch's raw time-series data.

    Applicable during inference: dedupes on Time_Minutes, coerces signal
    columns to numeric, fills gaps, and applies the scaler saved by main().

    Args:
        raw_df: Raw time-series frame for one batch.

    Returns:
        Cleaned copy of the input. Signals are scaled when the saved
        scaler exists; otherwise a warning is printed and unscaled data
        is returned.
    """
    df = raw_df.copy()
    # Deduplication
    if "Time_Minutes" in df.columns:
        df = df.drop_duplicates(subset=["Time_Minutes"])
        df["Time_Minutes"] = df["Time_Minutes"].fillna(0).astype(int)
    # Standardize signals to numeric
    for col in CFG.SIGNAL_COLS:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')
    # ffill().bfill() for missing values (bfill covers leading gaps)
    df = df.ffill().bfill()
    # Load and apply scaler
    scaler_path = os.path.join(CFG.PROC_DIR, "scaler.pkl")
    if os.path.exists(scaler_path):
        with open(scaler_path, "rb") as f:
            scaler = pickle.load(f)
        # Handle cases where some signal columns might be missing in input
        for col in CFG.SIGNAL_COLS:
            if col not in df.columns:
                df[col] = 0.0  # Default fallback
        # An all-NaN column survives ffill/bfill; zero-fill so the scaler
        # never sees NaN input (consistent with the 0.0 fallback above).
        df[CFG.SIGNAL_COLS] = df[CFG.SIGNAL_COLS].fillna(0.0)
        df[CFG.SIGNAL_COLS] = scaler.transform(df[CFG.SIGNAL_COLS])
    else:
        print("Warning: Scaler not found. Returning unscaled data.")
    return df


def _load_process_data() -> pd.DataFrame:
    """Ingest every per-batch sheet of the process workbook into one frame."""
    print(f"Reading process data from {CFG.PROCESS_FILE}...")
    xls = pd.ExcelFile(CFG.PROCESS_FILE)
    batch_dfs = []
    for sheet in xls.sheet_names:
        if "Summary" in sheet:
            continue
        df = pd.read_excel(xls, sheet_name=sheet)
        # Handle "Batch T011", "Batch_T011", "Batch T011 "
        batch_id = sheet.replace("Batch", "").replace("_", "").replace(" ", "").strip()
        df["Batch_ID"] = batch_id
        # Standardize Time_Minutes as int
        if "Time_Minutes" in df.columns:
            df["Time_Minutes"] = df["Time_Minutes"].fillna(0).astype(int)
        batch_dfs.append(df)
    full_process = pd.concat(batch_dfs, ignore_index=True)
    # Deduplication on (Batch_ID, Time_Minutes)
    return full_process.drop_duplicates(subset=["Batch_ID", "Time_Minutes"])


def _load_production_data() -> pd.DataFrame:
    """Ingest the production workbook and normalize its Batch_ID values."""
    print(f"Reading production data from {CFG.PROD_FILE}...")
    prod_df = pd.read_excel(CFG.PROD_FILE, sheet_name="BatchData")
    # Rename 'Batch' to 'Batch_ID' if needed
    if "Batch" in prod_df.columns:
        prod_df = prod_df.rename(columns={"Batch": "Batch_ID"})
    # Same normalization as the process-sheet names so IDs join cleanly.
    prod_df["Batch_ID"] = (
        prod_df["Batch_ID"].astype(str)
        .str.replace("Batch", "")
        .str.replace("_", "")
        .str.replace(" ", "")
        .str.strip()
    )
    return prod_df


def _clean_batches(full_process: pd.DataFrame):
    """Clean each batch and profile it.

    Returns:
        (clean_process, phase_boundaries, outlier_report, quality_report)
    """
    clean_batches = []
    phase_boundaries = {}
    outlier_report = {}
    quality_report = []
    batch_ids = full_process["Batch_ID"].unique()
    print(f"Processing {len(batch_ids)} batches...")
    # PASS unless more than one phase is missing (was a hard-coded 7,
    # which silently assumed exactly 8 phases in CFG.PHASES).
    min_phases = len(CFG.PHASES) - 1
    for bid in batch_ids:
        batch_df = (
            full_process[full_process["Batch_ID"] == bid]
            .sort_values("Time_Minutes")
            .copy()
        )
        # Extract boundaries
        boundaries = get_phase_boundaries(batch_df)
        phase_boundaries[bid] = boundaries
        # Quality check: count phases
        present_phases = [p for p, v in boundaries.items() if v["start"] is not None]
        status = "PASS" if len(present_phases) >= min_phases else "FLAG"
        quality_report.append(
            {"Batch_ID": bid, "Status": status, "Phases_Found": len(present_phases)}
        )
        # Flag outliers (flag_outliers coerces to numeric internally)
        outlier_report[bid] = flag_outliers(batch_df)
        # Convert signals to numeric
        for col in CFG.SIGNAL_COLS:
            if col in batch_df.columns:
                batch_df[col] = pd.to_numeric(batch_df[col], errors='coerce')
        # ffill/bfill per batch, so values never leak across batches
        batch_df = batch_df.ffill().bfill()
        clean_batches.append(batch_df)
    clean_process = pd.concat(clean_batches, ignore_index=True)
    # Global median fill for any remaining NaNs
    for col in CFG.SIGNAL_COLS:
        if col in clean_process.columns:
            median_val = clean_process[col].median()
            clean_process[col] = clean_process[col].fillna(median_val)
    return clean_process, phase_boundaries, outlier_report, quality_report


def _split_and_scale(clean_process: pd.DataFrame, batch_ids):
    """Batch-level train/test split; fit the scaler on train rows only.

    Mutates clean_process in place (signal columns become scaled).

    Returns:
        (train_ids, test_ids, fitted scaler)
    """
    np.random.seed(CFG.RANDOM_STATE)
    test_size = int(len(batch_ids) * CFG.TEST_SPLIT)
    # Plain str for clean, portable artifacts (np.str_ subclasses str,
    # so membership tests behave identically).
    test_ids = [str(b) for b in np.random.choice(batch_ids, size=test_size, replace=False)]
    train_ids = [b for b in batch_ids if b not in test_ids]
    scaler = StandardScaler()
    train_df = clean_process[clean_process["Batch_ID"].isin(train_ids)]
    # Fit on training batches only to avoid test-set leakage; transform all.
    scaler.fit(train_df[CFG.SIGNAL_COLS])
    clean_process[CFG.SIGNAL_COLS] = scaler.transform(clean_process[CFG.SIGNAL_COLS])
    return train_ids, test_ids, scaler


def _save_artifacts(clean_process, prod_df, batch_ids, quality_report,
                    phase_boundaries, outlier_report, scaler, train_ids, test_ids):
    """Persist all Layer 0 outputs into CFG.PROC_DIR."""
    print(f"Saving outputs to {CFG.PROC_DIR}...")

    def _dump_pickle(name, obj):
        # Small helper: pickle one artifact into the processing dir.
        with open(os.path.join(CFG.PROC_DIR, name), "wb") as f:
            pickle.dump(obj, f)

    _dump_pickle("process_clean.pkl", clean_process)
    # production_clean.pkl: only use TARGET_COLS + PARAM_COLS + Batch_ID
    cols_prod = ["Batch_ID"] + CFG.TARGET_COLS + CFG.PARAM_COLS
    prod_clean = prod_df[prod_df["Batch_ID"].isin(batch_ids)][cols_prod]
    _dump_pickle("production_clean.pkl", prod_clean)
    pd.DataFrame(quality_report).to_csv(
        os.path.join(CFG.PROC_DIR, "quality_report.csv"), index=False
    )
    with open(os.path.join(CFG.PROC_DIR, "phase_boundaries.json"), "w") as f:
        json.dump(phase_boundaries, f, indent=4)
    with open(os.path.join(CFG.PROC_DIR, "outlier_report.json"), "w") as f:
        json.dump(outlier_report, f, indent=4)
    _dump_pickle("scaler.pkl", scaler)
    _dump_pickle("train_batch_ids.pkl", train_ids)
    _dump_pickle("test_batch_ids.pkl", test_ids)


def main():
    """Run the full Layer 0 preprocessing pipeline and save its artifacts."""
    CFG.make_dirs()
    print(">>> Starting Layer 0: Preprocessing")
    # 1. Ingest process data (one sheet per batch)
    full_process = _load_process_data()
    # 2. Ingest production data
    prod_df = _load_production_data()
    # 3. Clean and profile per batch
    clean_process, phase_boundaries, outlier_report, quality_report = _clean_batches(full_process)
    batch_ids = full_process["Batch_ID"].unique()
    # 4/5. Split into train/test batches and normalize signals
    train_ids, test_ids, scaler = _split_and_scale(clean_process, batch_ids)
    # 6. Save artifacts
    _save_artifacts(clean_process, prod_df, batch_ids, quality_report,
                    phase_boundaries, outlier_report, scaler, train_ids, test_ids)
    print(f"Layer 0 complete. Processed {len(batch_ids)} batches.")
    print("=" * 60)
    print("LAYER 0 COMPLETE")
    print(f" Output shape: {clean_process.shape}")
    print(f" NaN count: {clean_process.isna().sum().sum()}")
    print("=" * 60)


if __name__ == "__main__":
    main()