"""
Layer 0: Preprocessing
- Load Excel data
- Clean and normalize time-series signal data
- Record phase boundaries and flag outliers
- Save cleaned data for feature extraction
"""
import pandas as pd
import numpy as np
import os
import json
import pickle
from sklearn.preprocessing import StandardScaler
import sys
# Add parent directory to path for config import
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import CFG
# Removed redundant function - see preprocess_single_batch() below for the implementation used during inference.
def get_phase_boundaries(batch_df: pd.DataFrame) -> dict:
    """Return per-phase timing info for one batch.

    For every phase in CFG.PHASES, produces
    ``{"start": min_minute, "end": max_minute, "duration": end - start}``
    (all ints). Phases absent from ``batch_df`` map to
    ``{"start": None, "end": None, "duration": 0}``.
    """
    result = {}
    for phase in CFG.PHASES:
        times = batch_df.loc[batch_df["Phase"] == phase, "Time_Minutes"]
        if times.empty:
            result[phase] = {"start": None, "end": None, "duration": 0}
        else:
            lo = times.min()
            hi = times.max()
            result[phase] = {
                "start": int(lo),
                "end": int(hi),
                "duration": int(hi - lo),
            }
    return result
def flag_outliers(batch_df: pd.DataFrame) -> dict:
    """Count outliers per (phase, signal) using a 3x IQR fence.

    Returns ``{"<phase>__<signal>": count}`` containing only combinations
    with at least one outlier. Groups with fewer than 4 valid points, and
    constant signals (IQR == 0), are skipped.
    """
    report = {}
    for phase in CFG.PHASES:
        phase_df = batch_df[batch_df["Phase"] == phase]
        if phase_df.empty:
            continue
        for signal in CFG.SIGNAL_COLS:
            if signal not in phase_df.columns:
                continue
            values = phase_df[signal].dropna()
            if len(values) < 4:
                continue
            q1 = values.quantile(0.25)
            q3 = values.quantile(0.75)
            spread = q3 - q1
            if spread == 0:
                # Constant signal: every point would be "inside" anyway.
                continue
            # NaN comparisons are False, so missing values never count as outliers.
            beyond_fence = (phase_df[signal] < q1 - 3 * spread) | (phase_df[signal] > q3 + 3 * spread)
            n_out = int(beyond_fence.sum())
            if n_out:
                report[f"{phase}__{signal}"] = n_out
    return report
def preprocess_single_batch(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Clean and scale one new batch's raw time-series for inference.

    Mirrors the training-time cleaning (dedup on Time_Minutes, numeric
    coercion of signal columns, ffill/bfill of gaps), then applies the
    StandardScaler persisted by main(). If no scaler artifact exists,
    prints a warning and returns the unscaled frame.
    """
    df = raw_df.copy()

    # Collapse duplicate timestamps and force integer minutes.
    if "Time_Minutes" in df.columns:
        df = df.drop_duplicates(subset=["Time_Minutes"])
        df["Time_Minutes"] = df["Time_Minutes"].fillna(0).astype(int)

    # Coerce every known signal column to numeric (bad cells become NaN).
    for sig in CFG.SIGNAL_COLS:
        if sig in df.columns:
            df[sig] = pd.to_numeric(df[sig], errors='coerce')

    # Forward- then backward-fill gaps, matching the training pipeline.
    df = df.ffill().bfill()

    scaler_path = os.path.join(CFG.PROC_DIR, "scaler.pkl")
    if not os.path.exists(scaler_path):
        print("Warning: Scaler not found. Returning unscaled data.")
        return df

    with open(scaler_path, "rb") as f:
        scaler = pickle.load(f)

    # Zero-fill any signal columns missing from the input so the scaler
    # always sees the full training-time column set.
    for sig in CFG.SIGNAL_COLS:
        if sig not in df.columns:
            df[sig] = 0.0
    df[CFG.SIGNAL_COLS] = scaler.transform(df[CFG.SIGNAL_COLS])
    return df
def main():
    """Layer 0 entry point: ingest, clean, split, normalize, persist.

    Pipeline:
      1. Read every per-batch sheet from the process workbook.
      2. Read the production (targets/parameters) workbook.
      3. Per batch: record phase boundaries, run quality/outlier checks,
         coerce signals to numeric, ffill/bfill gaps within the batch.
      4. Median-fill any still-missing signal values globally.
      5. Batch-level train/test split; fit StandardScaler on train rows
         only (avoids test leakage) and transform all rows.
      6. Persist cleaned frames, reports, scaler, and split IDs to CFG.PROC_DIR.

    Raises:
        ValueError: if the process workbook contains no batch sheets.
    """
    CFG.make_dirs()
    print(">>> Starting Layer 0: Preprocessing")

    # 1. Ingest process data (one sheet per batch; summary sheets skipped).
    print(f"Reading process data from {CFG.PROCESS_FILE}...")
    batch_dfs = []
    # Context manager closes the workbook handle when ingestion is done
    # (the original leaked it for the remainder of the run).
    with pd.ExcelFile(CFG.PROCESS_FILE) as xls:
        for sheet in xls.sheet_names:
            if "Summary" in sheet:
                continue
            df = pd.read_excel(xls, sheet_name=sheet)
            # Normalize "Batch T011", "Batch_T011", "Batch T011 " -> "T011".
            batch_id = sheet.replace("Batch", "").replace("_", "").replace(" ", "").strip()
            df["Batch_ID"] = batch_id
            # Standardize Time_Minutes as int.
            if "Time_Minutes" in df.columns:
                df["Time_Minutes"] = df["Time_Minutes"].fillna(0).astype(int)
            batch_dfs.append(df)
    if not batch_dfs:
        # Fail fast with a clear message instead of pd.concat's opaque error.
        raise ValueError(f"No batch sheets found in {CFG.PROCESS_FILE}")
    full_process = pd.concat(batch_dfs, ignore_index=True)
    # Keep one row per (batch, minute).
    full_process = full_process.drop_duplicates(subset=["Batch_ID", "Time_Minutes"])

    # 2. Ingest production data (targets + parameters).
    print(f"Reading production data from {CFG.PROD_FILE}...")
    prod_df = pd.read_excel(CFG.PROD_FILE, sheet_name="BatchData")
    if "Batch" in prod_df.columns:
        prod_df = prod_df.rename(columns={"Batch": "Batch_ID"})
    # Apply the same ID normalization used for sheet names so keys join.
    prod_df["Batch_ID"] = (
        prod_df["Batch_ID"].astype(str)
        .str.replace("Batch", "")
        .str.replace("_", "")
        .str.replace(" ", "")
        .str.strip()
    )

    # 3. Clean and process per batch.
    clean_batches = []
    phase_boundaries = {}
    outlier_report = {}
    quality_report = []
    batch_ids = full_process["Batch_ID"].unique()
    print(f"Processing {len(batch_ids)} batches...")
    for bid in batch_ids:
        batch_df = full_process[full_process["Batch_ID"] == bid].sort_values("Time_Minutes").copy()
        # Record phase timing.
        boundaries = get_phase_boundaries(batch_df)
        phase_boundaries[bid] = boundaries
        # Quality check: FLAG a batch when more than one expected phase is absent.
        present_phases = [p for p, v in boundaries.items() if v["start"] is not None]
        status = "PASS" if len(present_phases) >= 7 else "FLAG"
        quality_report.append({"Batch_ID": bid, "Status": status, "Phases_Found": len(present_phases)})
        # Outlier accounting (pre-normalization, on raw values).
        outlier_report[bid] = flag_outliers(batch_df)
        # Coerce signals to numeric (bad cells become NaN).
        for col in CFG.SIGNAL_COLS:
            if col in batch_df.columns:
                batch_df[col] = pd.to_numeric(batch_df[col], errors='coerce')
        # Fill gaps within the batch only (never across batch boundaries).
        batch_df = batch_df.ffill().bfill()
        clean_batches.append(batch_df)
    clean_process = pd.concat(clean_batches, ignore_index=True)

    # 4. Global median fill for signals still missing after per-batch fills.
    for col in CFG.SIGNAL_COLS:
        if col in clean_process.columns:
            clean_process[col] = clean_process[col].fillna(clean_process[col].median())

    # 5. Batch-level split; fit the scaler on TRAIN rows only (no leakage).
    np.random.seed(CFG.RANDOM_STATE)
    test_size = int(len(batch_ids) * CFG.TEST_SPLIT)
    test_ids = list(np.random.choice(batch_ids, size=test_size, replace=False))
    train_ids = [b for b in batch_ids if b not in test_ids]
    scaler = StandardScaler()
    train_df = clean_process[clean_process["Batch_ID"].isin(train_ids)]
    scaler.fit(train_df[CFG.SIGNAL_COLS])
    # Transform all rows (train + test) with the train-fitted scaler.
    clean_process[CFG.SIGNAL_COLS] = scaler.transform(clean_process[CFG.SIGNAL_COLS])

    # 6. Save artifacts.
    print(f"Saving outputs to {CFG.PROC_DIR}...")
    with open(os.path.join(CFG.PROC_DIR, "process_clean.pkl"), "wb") as f:
        pickle.dump(clean_process, f)
    # production_clean.pkl: only Batch_ID + TARGET_COLS + PARAM_COLS,
    # restricted to batches actually present in the process data.
    cols_prod = ["Batch_ID"] + CFG.TARGET_COLS + CFG.PARAM_COLS
    prod_clean = prod_df[prod_df["Batch_ID"].isin(batch_ids)][cols_prod]
    with open(os.path.join(CFG.PROC_DIR, "production_clean.pkl"), "wb") as f:
        pickle.dump(prod_clean, f)
    pd.DataFrame(quality_report).to_csv(os.path.join(CFG.PROC_DIR, "quality_report.csv"), index=False)
    with open(os.path.join(CFG.PROC_DIR, "phase_boundaries.json"), "w") as f:
        json.dump(phase_boundaries, f, indent=4)
    with open(os.path.join(CFG.PROC_DIR, "outlier_report.json"), "w") as f:
        json.dump(outlier_report, f, indent=4)
    with open(os.path.join(CFG.PROC_DIR, "scaler.pkl"), "wb") as f:
        pickle.dump(scaler, f)
    with open(os.path.join(CFG.PROC_DIR, "train_batch_ids.pkl"), "wb") as f:
        pickle.dump(train_ids, f)
    with open(os.path.join(CFG.PROC_DIR, "test_batch_ids.pkl"), "wb") as f:
        pickle.dump(test_ids, f)

    print(f"Layer 0 complete. Processed {len(batch_ids)} batches.")
    print("=" * 60)
    print("LAYER 0 COMPLETE")
    print(f"  Output shape: {clean_process.shape}")
    print(f"  NaN count: {clean_process.isna().sum().sum()}")
    print("=" * 60)


if __name__ == "__main__":
    main()