Spaces:
Running
Running
| """ | |
| Layer 0: Preprocessing | |
| - Load Excel data | |
| - Clean and normalize time-series signal data | |
| - Record phase boundaries and flag outliers | |
| - Save cleaned data for feature extraction | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import os | |
| import json | |
| import pickle | |
| from sklearn.preprocessing import StandardScaler | |
| import sys | |
| # Add parent directory to path for config import | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from config import CFG | |
# NOTE: a redundant batch-loading function was removed; preprocess_single_batch()
# below is the implementation used during inference.
def get_phase_boundaries(batch_df: pd.DataFrame) -> dict:
    """Return per-phase timing info for a single batch.

    For every phase in ``CFG.PHASES``, finds the rows of ``batch_df`` whose
    ``Phase`` column matches and records the first/last ``Time_Minutes`` value
    plus their span.  Phases with no rows map to
    ``{"start": None, "end": None, "duration": 0}``.
    """
    result = {}
    for phase_name in CFG.PHASES:
        times = batch_df.loc[batch_df["Phase"] == phase_name, "Time_Minutes"]
        if times.empty:
            result[phase_name] = {"start": None, "end": None, "duration": 0}
            continue
        result[phase_name] = {
            "start": int(times.min()),
            "end": int(times.max()),
            "duration": int(times.max() - times.min()),
        }
    return result
def flag_outliers(batch_df: pd.DataFrame) -> dict:
    """Count extreme readings per (phase, signal) with a 3*IQR fence.

    Returns a dict keyed ``"<phase>__<signal>"`` mapping to the number of rows
    whose value lies outside [Q1 - 3*IQR, Q3 + 3*IQR].  Combinations with fewer
    than 4 non-null points, a zero IQR (constant signal), or zero outliers are
    omitted from the result.
    """
    report = {}
    for phase_name in CFG.PHASES:
        phase_rows = batch_df[batch_df["Phase"] == phase_name]
        if phase_rows.empty:
            continue
        for signal in CFG.SIGNAL_COLS:
            if signal not in phase_rows.columns:
                continue
            values = phase_rows[signal].dropna()
            if len(values) < 4:
                # Too few points for a meaningful quartile estimate.
                continue
            q1 = values.quantile(0.25)
            q3 = values.quantile(0.75)
            spread = q3 - q1
            if spread == 0:
                continue  # constant signal — no meaningful fence
            fence_lo = q1 - 3 * spread
            fence_hi = q3 + 3 * spread
            is_outlier = (phase_rows[signal] < fence_lo) | (phase_rows[signal] > fence_hi)
            n_out = int(is_outlier.sum())
            if n_out > 0:
                report[f"{phase_name}__{signal}"] = n_out
    return report
def preprocess_single_batch(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Clean and scale one new batch's raw time-series for inference.

    Mirrors the training-time cleaning in main(): de-duplicates on
    ``Time_Minutes``, coerces signal columns to numeric, forward/backward-fills
    gaps, then applies the scaler persisted by main().  If no scaler file
    exists, the cleaned-but-unscaled frame is returned with a warning.
    """
    frame = raw_df.copy()

    # De-duplicate timestamps and force them to int (NaN -> 0).
    if "Time_Minutes" in frame.columns:
        frame = frame.drop_duplicates(subset=["Time_Minutes"])
        frame["Time_Minutes"] = frame["Time_Minutes"].fillna(0).astype(int)

    # Coerce each known signal column to numeric; unparseable cells become NaN.
    for signal in CFG.SIGNAL_COLS:
        if signal in frame.columns:
            frame[signal] = pd.to_numeric(frame[signal], errors='coerce')

    # Fill remaining gaps forward, then backward, across the whole frame.
    frame = frame.ffill().bfill()

    scaler_path = os.path.join(CFG.PROC_DIR, "scaler.pkl")
    if not os.path.exists(scaler_path):
        print("Warning: Scaler not found. Returning unscaled data.")
        return frame

    # NOTE(review): pickle.load is acceptable only because this file is
    # produced locally by main(); never point it at untrusted input.
    with open(scaler_path, "rb") as f:
        scaler = pickle.load(f)

    # The scaler expects every configured signal column; back-fill absent
    # columns with a 0.0 default before transforming.
    for signal in CFG.SIGNAL_COLS:
        if signal not in frame.columns:
            frame[signal] = 0.0
    frame[CFG.SIGNAL_COLS] = scaler.transform(frame[CFG.SIGNAL_COLS])
    return frame
def _load_process_data() -> pd.DataFrame:
    """Read all per-batch sheets from the process workbook into one frame.

    Skips any sheet whose name contains "Summary", derives ``Batch_ID`` from
    the sheet name (handles "Batch T011", "Batch_T011", "Batch T011 "), and
    de-duplicates on (Batch_ID, Time_Minutes).

    Raises:
        ValueError: if the workbook contains no batch sheets.
    """
    xls = pd.ExcelFile(CFG.PROCESS_FILE)
    batch_dfs = []
    for sheet in xls.sheet_names:
        if "Summary" in sheet:
            continue  # summary sheets are not batch time-series
        df = pd.read_excel(xls, sheet_name=sheet)
        # Normalize "Batch T011" / "Batch_T011" / "Batch T011 " -> "T011".
        batch_id = sheet.replace("Batch", "").replace("_", "").replace(" ", "").strip()
        df["Batch_ID"] = batch_id
        # Standardize Time_Minutes as int (NaN -> 0).
        if "Time_Minutes" in df.columns:
            df["Time_Minutes"] = df["Time_Minutes"].fillna(0).astype(int)
        batch_dfs.append(df)
    if not batch_dfs:
        # Fail fast with a clear message instead of a cryptic concat error.
        raise ValueError(f"No batch sheets found in {CFG.PROCESS_FILE}")
    full_process = pd.concat(batch_dfs, ignore_index=True)
    # A (batch, timestamp) pair must be unique.
    return full_process.drop_duplicates(subset=["Batch_ID", "Time_Minutes"])


def _load_production_data() -> pd.DataFrame:
    """Read the per-batch production table and normalize its Batch_ID column."""
    prod_df = pd.read_excel(CFG.PROD_FILE, sheet_name="BatchData")
    if "Batch" in prod_df.columns:
        prod_df = prod_df.rename(columns={"Batch": "Batch_ID"})
    # Same normalization as the sheet names so the two sources can be joined.
    prod_df["Batch_ID"] = (
        prod_df["Batch_ID"].astype(str)
        .str.replace("Batch", "")
        .str.replace("_", "")
        .str.replace(" ", "")
        .str.strip()
    )
    return prod_df


def main():
    """Run Layer 0: ingest, clean, QA-report, split, scale, persist artifacts."""
    CFG.make_dirs()
    print(">>> Starting Layer 0: Preprocessing")

    # 1. Ingest process (time-series) data.
    print(f"Reading process data from {CFG.PROCESS_FILE}...")
    full_process = _load_process_data()

    # 2. Ingest production (per-batch outcome) data.
    print(f"Reading production data from {CFG.PROD_FILE}...")
    prod_df = _load_production_data()

    # 3. Clean per batch and collect QA reports.
    clean_batches = []
    phase_boundaries = {}
    outlier_report = {}
    quality_report = []
    batch_ids = full_process["Batch_ID"].unique()
    print(f"Processing {len(batch_ids)} batches...")
    # FIX: flag a batch when MORE THAN ONE phase is missing, derived from the
    # configured phase list instead of the previously hard-coded constant 7.
    min_phases_for_pass = len(CFG.PHASES) - 1
    for bid in batch_ids:
        batch_df = (
            full_process[full_process["Batch_ID"] == bid]
            .sort_values("Time_Minutes")
            .copy()
        )
        boundaries = get_phase_boundaries(batch_df)
        phase_boundaries[bid] = boundaries
        # Quality check: how many configured phases actually appear.
        present_phases = [p for p, v in boundaries.items() if v["start"] is not None]
        status = "PASS" if len(present_phases) >= min_phases_for_pass else "FLAG"
        quality_report.append(
            {"Batch_ID": bid, "Status": status, "Phases_Found": len(present_phases)}
        )
        outlier_report[bid] = flag_outliers(batch_df)
        # Coerce signals to numeric; unparseable cells become NaN.
        for col in CFG.SIGNAL_COLS:
            if col in batch_df.columns:
                batch_df[col] = pd.to_numeric(batch_df[col], errors='coerce')
        # Fill gaps within this batch only, so values never bleed across batches.
        batch_df = batch_df.ffill().bfill()
        clean_batches.append(batch_df)
    clean_process = pd.concat(clean_batches, ignore_index=True)

    # 4. Global median fill for any NaNs that survived per-batch filling.
    for col in CFG.SIGNAL_COLS:
        if col in clean_process.columns:
            median_val = clean_process[col].median()
            clean_process[col] = clean_process[col].fillna(median_val)

    # 5. Batch-level split; fit the scaler on train rows only (no leakage).
    # FIX: fail fast with an explicit message if any configured signal column
    # is absent, instead of an opaque KeyError from the fit/transform below
    # (the median-fill loop above silently skipped missing columns).
    missing_sigs = [c for c in CFG.SIGNAL_COLS if c not in clean_process.columns]
    if missing_sigs:
        raise KeyError(f"Signal columns missing from process data: {missing_sigs}")
    np.random.seed(CFG.RANDOM_STATE)
    test_size = int(len(batch_ids) * CFG.TEST_SPLIT)
    test_ids = list(np.random.choice(batch_ids, size=test_size, replace=False))
    train_ids = [b for b in batch_ids if b not in test_ids]
    scaler = StandardScaler()
    train_df = clean_process[clean_process["Batch_ID"].isin(train_ids)]
    scaler.fit(train_df[CFG.SIGNAL_COLS])
    # Transform all rows (train + test) with the train-fitted scaler.
    clean_process[CFG.SIGNAL_COLS] = scaler.transform(clean_process[CFG.SIGNAL_COLS])

    # 6. Save artifacts for the downstream feature-extraction layer.
    print(f"Saving outputs to {CFG.PROC_DIR}...")
    with open(os.path.join(CFG.PROC_DIR, "process_clean.pkl"), "wb") as f:
        pickle.dump(clean_process, f)
    # production_clean.pkl keeps only Batch_ID + targets + parameters.
    cols_prod = ["Batch_ID"] + CFG.TARGET_COLS + CFG.PARAM_COLS
    prod_clean = prod_df[prod_df["Batch_ID"].isin(batch_ids)][cols_prod]
    with open(os.path.join(CFG.PROC_DIR, "production_clean.pkl"), "wb") as f:
        pickle.dump(prod_clean, f)
    pd.DataFrame(quality_report).to_csv(
        os.path.join(CFG.PROC_DIR, "quality_report.csv"), index=False
    )
    with open(os.path.join(CFG.PROC_DIR, "phase_boundaries.json"), "w") as f:
        json.dump(phase_boundaries, f, indent=4)
    with open(os.path.join(CFG.PROC_DIR, "outlier_report.json"), "w") as f:
        json.dump(outlier_report, f, indent=4)
    with open(os.path.join(CFG.PROC_DIR, "scaler.pkl"), "wb") as f:
        pickle.dump(scaler, f)
    with open(os.path.join(CFG.PROC_DIR, "train_batch_ids.pkl"), "wb") as f:
        pickle.dump(train_ids, f)
    with open(os.path.join(CFG.PROC_DIR, "test_batch_ids.pkl"), "wb") as f:
        pickle.dump(test_ids, f)

    print(f"Layer 0 complete. Processed {len(batch_ids)} batches.")
    print("="*60)
    print(f"LAYER 0 COMPLETE")
    print(f" Output shape: {clean_process.shape}")
    print(f" NaN count: {clean_process.isna().sum().sum()}")
    print("="*60)


if __name__ == "__main__":
    main()