# 23f3002638
# Initial commit with LFS tracking
# 038ee19
"""
Layer 0: Preprocessing
- Load Excel data
- Clean and normalize time-series signal data
- Record phase boundaries and flag outliers
- Save cleaned data for feature extraction
"""
import pandas as pd
import numpy as np
import os
import json
import pickle
from sklearn.preprocessing import StandardScaler
import sys
# Add parent directory to path for config import
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import CFG
# Removed redundant function - preprocess_single_batch() below is the implementation used during inference.
def get_phase_boundaries(batch_df: pd.DataFrame) -> dict:
    """Return start/end/duration (minutes) of every phase in one batch.

    For each phase in CFG.PHASES, takes the min/max of "Time_Minutes" over
    the rows whose "Phase" column matches. Phases not present in the batch
    map to {"start": None, "end": None, "duration": 0}.
    """
    result = {}
    for phase_name in CFG.PHASES:
        times = batch_df.loc[batch_df["Phase"] == phase_name, "Time_Minutes"]
        if times.empty:
            result[phase_name] = {"start": None, "end": None, "duration": 0}
            continue
        t_start, t_end = int(times.min()), int(times.max())
        result[phase_name] = {
            "start": t_start,
            "end": t_end,
            "duration": t_end - t_start,
        }
    return result
def flag_outliers(batch_df: pd.DataFrame) -> dict:
    """Count 3xIQR outliers per (phase, signal) pair.

    Quantiles are computed on non-NaN values only; phases/signals with
    fewer than 4 usable points or a zero IQR are skipped. Returns a dict
    keyed "<phase>__<signal>" -> outlier count, omitting clean pairs.
    """
    report = {}
    for phase_name in CFG.PHASES:
        phase_df = batch_df[batch_df["Phase"] == phase_name]
        if phase_df.empty:
            continue
        for signal in CFG.SIGNAL_COLS:
            if signal not in phase_df.columns:
                continue
            values = phase_df[signal].dropna()
            if len(values) < 4:
                continue  # too few points for a meaningful IQR
            q1, q3 = values.quantile(0.25), values.quantile(0.75)
            spread = q3 - q1
            if spread == 0:
                continue  # Skip constant signals
            mask = (phase_df[signal] < q1 - 3 * spread) | (phase_df[signal] > q3 + 3 * spread)
            n_outliers = int(mask.sum())
            if n_outliers > 0:
                report[f"{phase_name}__{signal}"] = n_outliers
    return report
def preprocess_single_batch(raw_df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean one new batch's raw time-series at inference time.

    Mirrors the training-time cleaning (dedupe on time, numeric coercion,
    ffill/bfill) and applies the scaler artifact saved by main(). If the
    scaler file is absent, prints a warning and returns unscaled data.
    """
    df = raw_df.copy()
    # Deduplicate the time axis and normalize it to integer minutes.
    if "Time_Minutes" in df.columns:
        df = df.drop_duplicates(subset=["Time_Minutes"])
        df["Time_Minutes"] = df["Time_Minutes"].fillna(0).astype(int)
    # Coerce every known signal column to numeric (bad cells -> NaN).
    for signal in CFG.SIGNAL_COLS:
        if signal in df.columns:
            df[signal] = pd.to_numeric(df[signal], errors='coerce')
    # Fill gaps forward then backward, matching the training pipeline.
    df = df.ffill().bfill()
    scaler_path = os.path.join(CFG.PROC_DIR, "scaler.pkl")
    if not os.path.exists(scaler_path):
        print("Warning: Scaler not found. Returning unscaled data.")
        return df
    with open(scaler_path, "rb") as fh:
        scaler = pickle.load(fh)
    # Zero-fill any signal column the caller omitted so the scaler sees
    # the full feature set it was fitted on.
    for signal in CFG.SIGNAL_COLS:
        if signal not in df.columns:
            df[signal] = 0.0  # Default fallback
    df[CFG.SIGNAL_COLS] = scaler.transform(df[CFG.SIGNAL_COLS])
    return df
def _load_process_data() -> pd.DataFrame:
    """Read every per-batch sheet of the process workbook into one frame."""
    xls = pd.ExcelFile(CFG.PROCESS_FILE)
    batch_dfs = []
    for sheet in xls.sheet_names:
        if "Summary" in sheet:
            continue  # summary sheets are not batch time series
        df = pd.read_excel(xls, sheet_name=sheet)
        # Handle "Batch T011", "Batch_T011", "Batch T011 "
        df["Batch_ID"] = sheet.replace("Batch", "").replace("_", "").replace(" ", "").strip()
        # Standardize Time_Minutes as int
        if "Time_Minutes" in df.columns:
            df["Time_Minutes"] = df["Time_Minutes"].fillna(0).astype(int)
        batch_dfs.append(df)
    full_process = pd.concat(batch_dfs, ignore_index=True)
    # Deduplication on (Batch_ID, Time_Minutes)
    return full_process.drop_duplicates(subset=["Batch_ID", "Time_Minutes"])


def _load_production_data() -> pd.DataFrame:
    """Read batch-level production data; normalize its Batch_ID column."""
    prod_df = pd.read_excel(CFG.PROD_FILE, sheet_name="BatchData")
    # Rename 'Batch' to 'Batch_ID' if needed
    if "Batch" in prod_df.columns:
        prod_df = prod_df.rename(columns={"Batch": "Batch_ID"})
    prod_df["Batch_ID"] = prod_df["Batch_ID"].astype(str).str.replace("Batch", "").str.replace("_", "").str.replace(" ", "").str.strip()
    return prod_df


def _clean_batches(full_process: pd.DataFrame, batch_ids) -> tuple:
    """Clean each batch and collect boundaries/outlier/quality reports.

    Returns (clean_process, phase_boundaries, outlier_report, quality_report).
    """
    clean_batches = []
    phase_boundaries = {}
    outlier_report = {}
    quality_report = []
    for bid in batch_ids:
        batch_df = full_process[full_process["Batch_ID"] == bid].sort_values("Time_Minutes").copy()
        boundaries = get_phase_boundaries(batch_df)
        phase_boundaries[bid] = boundaries
        # Quality check: a batch passes when at most one phase is absent.
        # NOTE(review): the hard-coded 7 assumes len(CFG.PHASES) == 8 — confirm.
        present_phases = [p for p, v in boundaries.items() if v["start"] is not None]
        status = "PASS" if len(present_phases) >= 7 else "FLAG"
        quality_report.append({"Batch_ID": bid, "Status": status, "Phases_Found": len(present_phases)})
        outlier_report[bid] = flag_outliers(batch_df)
        # Coerce signals to numeric, then fill gaps per batch.
        for col in CFG.SIGNAL_COLS:
            if col in batch_df.columns:
                batch_df[col] = pd.to_numeric(batch_df[col], errors='coerce')
        batch_df = batch_df.ffill().bfill()
        clean_batches.append(batch_df)
    clean_process = pd.concat(clean_batches, ignore_index=True)
    return clean_process, phase_boundaries, outlier_report, quality_report


def _split_and_scale(clean_process: pd.DataFrame, batch_ids) -> tuple:
    """Split batch IDs into train/test, fit the scaler on train rows only,
    and scale clean_process's signal columns in place.

    Returns (train_ids, test_ids, scaler).
    """
    np.random.seed(CFG.RANDOM_STATE)
    test_size = int(len(batch_ids) * CFG.TEST_SPLIT)
    test_ids = list(np.random.choice(batch_ids, size=test_size, replace=False))
    test_id_set = set(test_ids)  # O(1) membership instead of O(n) list scans
    train_ids = [b for b in batch_ids if b not in test_id_set]
    scaler = StandardScaler()
    train_df = clean_process[clean_process["Batch_ID"].isin(train_ids)]
    scaler.fit(train_df[CFG.SIGNAL_COLS])
    # Transform all rows (train and test) with the train-fitted scaler.
    clean_process[CFG.SIGNAL_COLS] = scaler.transform(clean_process[CFG.SIGNAL_COLS])
    return train_ids, test_ids, scaler


def _save_artifacts(clean_process, prod_df, batch_ids, quality_report,
                    phase_boundaries, outlier_report, scaler, train_ids, test_ids):
    """Persist every Layer-0 output under CFG.PROC_DIR."""
    def _dump_pickle(name, obj):
        with open(os.path.join(CFG.PROC_DIR, name), "wb") as f:
            pickle.dump(obj, f)

    def _dump_json(name, obj):
        with open(os.path.join(CFG.PROC_DIR, name), "w") as f:
            json.dump(obj, f, indent=4)

    _dump_pickle("process_clean.pkl", clean_process)
    # production_clean.pkl: only use TARGET_COLS + PARAM_COLS + Batch_ID
    cols_prod = ["Batch_ID"] + CFG.TARGET_COLS + CFG.PARAM_COLS
    prod_clean = prod_df[prod_df["Batch_ID"].isin(batch_ids)][cols_prod]
    _dump_pickle("production_clean.pkl", prod_clean)
    pd.DataFrame(quality_report).to_csv(os.path.join(CFG.PROC_DIR, "quality_report.csv"), index=False)
    _dump_json("phase_boundaries.json", phase_boundaries)
    _dump_json("outlier_report.json", outlier_report)
    _dump_pickle("scaler.pkl", scaler)
    _dump_pickle("train_batch_ids.pkl", train_ids)
    _dump_pickle("test_batch_ids.pkl", test_ids)


def main():
    """Run Layer 0: ingest, clean, split/normalize, and save all artifacts."""
    CFG.make_dirs()
    print(">>> Starting Layer 0: Preprocessing")
    # 1. Ingest Process Data (60 sheets)
    print(f"Reading process data from {CFG.PROCESS_FILE}...")
    full_process = _load_process_data()
    # 2. Ingest Production Data
    print(f"Reading production data from {CFG.PROD_FILE}...")
    prod_df = _load_production_data()
    # 3. Clean and process per batch
    batch_ids = full_process["Batch_ID"].unique()
    print(f"Processing {len(batch_ids)} batches...")
    clean_process, phase_boundaries, outlier_report, quality_report = _clean_batches(full_process, batch_ids)
    # 4. Global median fill for any NaNs that survive per-batch ffill/bfill.
    for col in CFG.SIGNAL_COLS:
        if col in clean_process.columns:
            clean_process[col] = clean_process[col].fillna(clean_process[col].median())
    # 5. Split and Normalize
    train_ids, test_ids, scaler = _split_and_scale(clean_process, batch_ids)
    # 6. Save Artifacts
    print(f"Saving outputs to {CFG.PROC_DIR}...")
    _save_artifacts(clean_process, prod_df, batch_ids, quality_report,
                    phase_boundaries, outlier_report, scaler, train_ids, test_ids)
    print(f"Layer 0 complete. Processed {len(batch_ids)} batches.")
    print("="*60)
    print(f"LAYER 0 COMPLETE")
    print(f" Output shape: {clean_process.shape}")
    print(f" NaN count: {clean_process.isna().sum().sum()}")
    print("="*60)


if __name__ == "__main__":
    main()