File size: 8,275 Bytes
038ee19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""
Layer 0: Preprocessing
- Load Excel data
- Clean and normalize time-series signal data
- Record phase boundaries and flag outliers
- Save cleaned data for feature extraction
"""

import pandas as pd
import numpy as np
import os
import json
import pickle
from sklearn.preprocessing import StandardScaler
import sys

# Add parent directory to path for config import
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import CFG

# Removed redundant cleaning helper — preprocess_single_batch() below is the implementation used during inference.

def get_phase_boundaries(batch_df: pd.DataFrame) -> dict:
    """Return per-phase start/end/duration (minutes) for one batch.

    For every phase listed in CFG.PHASES, the rows belonging to that
    phase are located and the min/max of Time_Minutes recorded. Phases
    absent from this batch get start/end of None and a duration of 0.
    """
    result = {}
    for phase_name in CFG.PHASES:
        times = batch_df.loc[batch_df["Phase"] == phase_name, "Time_Minutes"]
        if times.empty:
            result[phase_name] = {"start": None, "end": None, "duration": 0}
        else:
            t_start = int(times.min())
            t_end = int(times.max())
            result[phase_name] = {
                "start": t_start,
                "end": t_end,
                "duration": t_end - t_start,
            }
    return result

def flag_outliers(batch_df: pd.DataFrame) -> dict:
    """Count extreme readings per (phase, signal) using a 3xIQR fence.

    Returns a dict mapping "<phase>__<signal>" to the number of rows
    outside [Q1 - 3*IQR, Q3 + 3*IQR]. Entries appear only when at least
    one outlier exists; constant signals (IQR == 0) and sparse signals
    (<4 non-null samples) are skipped.
    """
    report = {}
    for phase_name in CFG.PHASES:
        phase_rows = batch_df[batch_df["Phase"] == phase_name]
        if phase_rows.empty:
            continue

        for signal in CFG.SIGNAL_COLS:
            if signal not in phase_rows.columns:
                continue
            values = phase_rows[signal].dropna()
            if len(values) < 4:
                continue

            q1 = values.quantile(0.25)
            q3 = values.quantile(0.75)
            spread = q3 - q1
            # A zero IQR means the signal is constant; the fence would
            # collapse to a single point, so skip it.
            if spread == 0:
                continue

            low_fence = q1 - 3 * spread
            high_fence = q3 + 3 * spread

            # NaN comparisons are False on both sides, so NaN rows are
            # never counted as outliers.
            extreme = phase_rows[(phase_rows[signal] < low_fence) | (phase_rows[signal] > high_fence)]
            if len(extreme) > 0:
                report[f"{phase_name}__{signal}"] = len(extreme)
    return report

def preprocess_single_batch(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Clean and scale one new batch's raw time-series for inference.

    Mirrors the training-time cleaning (dedup on Time_Minutes, numeric
    coercion of signal columns, ffill/bfill) and applies the scaler
    persisted by main(). If the scaler artifact is missing, the cleaned
    but unscaled frame is returned with a warning.
    """
    batch = raw_df.copy()

    # Deduplicate on the time axis and force integer minutes.
    if "Time_Minutes" in batch.columns:
        batch = batch.drop_duplicates(subset=["Time_Minutes"])
        batch["Time_Minutes"] = batch["Time_Minutes"].fillna(0).astype(int)

    # Coerce every known signal column to numeric (bad cells -> NaN).
    for signal in CFG.SIGNAL_COLS:
        if signal in batch.columns:
            batch[signal] = pd.to_numeric(batch[signal], errors='coerce')

    # Fill gaps forward first, then backward to catch leading NaNs.
    batch = batch.ffill().bfill()

    scaler_path = os.path.join(CFG.PROC_DIR, "scaler.pkl")
    if not os.path.exists(scaler_path):
        print("Warning: Scaler not found. Returning unscaled data.")
        return batch

    # NOTE(review): pickle.load is only safe on trusted artifacts; this
    # file is written by main() in this pipeline — confirm it cannot be
    # swapped by untrusted input.
    with open(scaler_path, "rb") as f:
        scaler = pickle.load(f)

    # Back-fill any signal columns absent from the input so the scaler
    # sees its full feature set (0.0 is the fallback used at training).
    for signal in CFG.SIGNAL_COLS:
        if signal not in batch.columns:
            batch[signal] = 0.0
    batch[CFG.SIGNAL_COLS] = scaler.transform(batch[CFG.SIGNAL_COLS])
    return batch

def main():
    """Run Layer 0: ingest Excel data, clean per batch, scale, and save artifacts.

    Pipeline:
      1. Read one sheet per batch from CFG.PROCESS_FILE (skipping summaries).
      2. Read production targets/params from CFG.PROD_FILE.
      3. Per batch: record phase boundaries, quality status, outlier flags,
         coerce signals to numeric, and ffill/bfill gaps.
      4. Global median-fill any remaining NaNs.
      5. Split batches into train/test and fit a StandardScaler on train only.
      6. Persist cleaned frames, reports, scaler, and split IDs to CFG.PROC_DIR.
    """
    CFG.make_dirs()
    print(">>> Starting Layer 0: Preprocessing")

    # 1. Ingest Process Data (one sheet per batch)
    print(f"Reading process data from {CFG.PROCESS_FILE}...")
    xls = pd.ExcelFile(CFG.PROCESS_FILE)
    batch_dfs = []
    for sheet in xls.sheet_names:
        if "Summary" in sheet: continue
        df = pd.read_excel(xls, sheet_name=sheet)
        # Handle "Batch T011", "Batch_T011", "Batch T011 "
        batch_id = sheet.replace("Batch", "").replace("_", "").replace(" ", "").strip()
        df["Batch_ID"] = batch_id
        # Standardize Time_Minutes as int
        if "Time_Minutes" in df.columns:
            df["Time_Minutes"] = df["Time_Minutes"].fillna(0).astype(int)
        batch_dfs.append(df)

    full_process = pd.concat(batch_dfs, ignore_index=True)
    # Deduplication on (Batch_ID, Time_Minutes)
    full_process = full_process.drop_duplicates(subset=["Batch_ID", "Time_Minutes"])

    # 2. Ingest Production Data
    print(f"Reading production data from {CFG.PROD_FILE}...")
    prod_df = pd.read_excel(CFG.PROD_FILE, sheet_name="BatchData")
    # Rename 'Batch' to 'Batch_ID' if needed
    if "Batch" in prod_df.columns:
        prod_df = prod_df.rename(columns={"Batch": "Batch_ID"})
    # Normalize batch IDs the same way as the process sheets so joins line up.
    prod_df["Batch_ID"] = prod_df["Batch_ID"].astype(str).str.replace("Batch", "").str.replace("_", "").str.replace(" ", "").str.strip()

    # 3. Clean and process per batch
    clean_batches = []
    phase_boundaries = {}
    outlier_report = {}
    quality_report = []

    batch_ids = full_process["Batch_ID"].unique()
    print(f"Processing {len(batch_ids)} batches...")

    for bid in batch_ids:
        batch_df = full_process[full_process["Batch_ID"] == bid].sort_values("Time_Minutes").copy()

        # Extract boundaries
        boundaries = get_phase_boundaries(batch_df)
        phase_boundaries[bid] = boundaries

        # Quality check: flag a batch when more than one phase is missing.
        # (Was a hardcoded ">= 7", which silently assumed exactly 8 phases.)
        present_phases = [p for p, v in boundaries.items() if v["start"] is not None]
        status = "PASS" if len(present_phases) >= len(CFG.PHASES) - 1 else "FLAG"
        quality_report.append({"Batch_ID": bid, "Status": status, "Phases_Found": len(present_phases)})

        # Flag outliers
        outlier_report[bid] = flag_outliers(batch_df)

        # Convert signals to numeric (bad cells -> NaN)
        for col in CFG.SIGNAL_COLS:
            if col in batch_df.columns:
                batch_df[col] = pd.to_numeric(batch_df[col], errors='coerce')

        # ffill then bfill per batch so values never leak across batches
        batch_df = batch_df.ffill().bfill()
        clean_batches.append(batch_df)

    clean_process = pd.concat(clean_batches, ignore_index=True)

    # 4. Global median fill for any remaining NaNs (all-NaN batches)
    for col in CFG.SIGNAL_COLS:
        if col in clean_process.columns:
            median_val = clean_process[col].median()
            clean_process[col] = clean_process[col].fillna(median_val)

    # 5. Split by batch (never by row, to avoid leakage) and normalize.
    np.random.seed(CFG.RANDOM_STATE)
    test_size = int(len(batch_ids) * CFG.TEST_SPLIT)
    test_ids = list(np.random.choice(batch_ids, size=test_size, replace=False))
    test_id_set = set(test_ids)  # O(1) membership instead of list scans
    train_ids = [b for b in batch_ids if b not in test_id_set]

    # Fit the scaler on training batches only, then transform everything.
    scaler = StandardScaler()
    train_df = clean_process[clean_process["Batch_ID"].isin(train_ids)]
    scaler.fit(train_df[CFG.SIGNAL_COLS])

    clean_process[CFG.SIGNAL_COLS] = scaler.transform(clean_process[CFG.SIGNAL_COLS])

    # 6. Save Artifacts
    print(f"Saving outputs to {CFG.PROC_DIR}...")
    with open(os.path.join(CFG.PROC_DIR, "process_clean.pkl"), "wb") as f:
        pickle.dump(clean_process, f)

    # production_clean.pkl: only Batch_ID + targets + params, restricted to
    # batches that actually have process data.
    cols_prod = ["Batch_ID"] + CFG.TARGET_COLS + CFG.PARAM_COLS
    prod_clean = prod_df[prod_df["Batch_ID"].isin(batch_ids)][cols_prod]
    with open(os.path.join(CFG.PROC_DIR, "production_clean.pkl"), "wb") as f:
        pickle.dump(prod_clean, f)

    pd.DataFrame(quality_report).to_csv(os.path.join(CFG.PROC_DIR, "quality_report.csv"), index=False)
    with open(os.path.join(CFG.PROC_DIR, "phase_boundaries.json"), "w") as f:
        json.dump(phase_boundaries, f, indent=4)
    with open(os.path.join(CFG.PROC_DIR, "outlier_report.json"), "w") as f:
        json.dump(outlier_report, f, indent=4)
    with open(os.path.join(CFG.PROC_DIR, "scaler.pkl"), "wb") as f:
        pickle.dump(scaler, f)
    with open(os.path.join(CFG.PROC_DIR, "train_batch_ids.pkl"), "wb") as f:
        pickle.dump(train_ids, f)
    with open(os.path.join(CFG.PROC_DIR, "test_batch_ids.pkl"), "wb") as f:
        pickle.dump(test_ids, f)

    print(f"Layer 0 complete. Processed {len(batch_ids)} batches.")
    print("="*60)
    print(f"LAYER 0 COMPLETE")
    print(f"   Output shape: {clean_process.shape}")
    print(f"   NaN count:    {clean_process.isna().sum().sum()}")
    print("="*60)

# Run the full Layer 0 preprocessing pipeline when executed as a script.
if __name__ == "__main__":
    main()