""" Data Cleaning Tools Tools for handling missing values, outliers, and data type issues. """ import polars as pl import numpy as np from typing import Dict, Any, List, Optional from pathlib import Path import sys import os # Add parent directory to path for imports sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from ..utils.polars_helpers import ( load_dataframe, save_dataframe, get_numeric_columns, get_categorical_columns, get_datetime_columns, detect_id_columns, ) from ..utils.validation import ( validate_file_exists, validate_file_format, validate_dataframe, validate_columns_exist, ) def clean_missing_values(file_path: str, strategy, output_path: str, threshold: float = 0.4) -> Dict[str, Any]: """ Handle missing values using appropriate strategies with smart threshold-based column dropping. Args: file_path: Path to CSV or Parquet file strategy: Either "auto" (string) to automatically decide strategies for all columns, or a dictionary mapping column names to strategies ('median', 'mean', 'mode', 'forward_fill', 'drop') output_path: Path to save cleaned dataset threshold: For "auto" strategy, drop columns with missing % > threshold (default: 0.4 = 40%) Returns: Dictionary with cleaning report Auto Strategy Behavior: 1. Drop columns with >threshold missing (default 40%) 2. Impute numeric columns with median 3. Impute categorical columns with mode 4. Forward-fill for time series columns """ # Validation validate_file_exists(file_path) validate_file_format(file_path) # Load data df = load_dataframe(file_path) validate_dataframe(df) # Get column type information numeric_cols = get_numeric_columns(df) categorical_cols = get_categorical_columns(df) datetime_cols = get_datetime_columns(df) id_cols = detect_id_columns(df) report = { "original_rows": len(df), "original_columns": len(df.columns), "columns_dropped": [], "columns_processed": {}, "rows_dropped": 0, "threshold_used": threshold } # Handle string strategy modes if isinstance(strategy, str): if strategy == "auto": # Step 1: Identify and drop high-missing columns (>threshold) cols_to_drop = [] for col in df.columns: null_count = df[col].null_count() null_pct = null_count / len(df) if len(df) > 0 else 0 if null_pct > threshold: cols_to_drop.append(col) report["columns_dropped"].append({ "column": col, "missing_percentage": round(null_pct * 100, 2), "reason": f"Missing >{threshold*100}% of values" }) # Drop high-missing columns if cols_to_drop: df = df.drop(cols_to_drop) print(f"đŸ—‘ī¸ Dropped {len(cols_to_drop)} columns with >{threshold*100}% missing:") for col_info in report["columns_dropped"]: print(f" - {col_info['column']} ({col_info['missing_percentage']}% missing)") # Step 2: Build strategy for remaining columns strategy = {} for col in df.columns: if df[col].null_count() > 0: if col in id_cols: strategy[col] = "drop" # Drop rows with missing IDs elif col in datetime_cols: strategy[col] = "forward_fill" # Forward fill for time series elif col in numeric_cols: strategy[col] = "median" # Median for numeric (robust to outliers) elif col in categorical_cols: strategy[col] = "mode" # Mode for categorical else: strategy[col] = "mode" # Default to mode print(f"🔧 Auto-detected strategies for {len(strategy)} remaining columns with missing values") elif strategy in ["median", "mean", "mode", "forward_fill", "drop"]: # Apply same strategy to all columns with missing values strategy_dict = {} for col in df.columns: if df[col].null_count() > 0: strategy_dict[col] = strategy strategy = strategy_dict print(f"🔧 
        elif strategy in ["median", "mean", "mode", "forward_fill", "drop"]:
            # Apply the same strategy to every column with missing values
            strategy_dict = {}
            for col in df.columns:
                if df[col].null_count() > 0:
                    strategy_dict[col] = strategy
            print(f"🔧 Applying '{strategy}' strategy to {len(strategy_dict)} columns with missing values")
            strategy = strategy_dict

        elif strategy in ["iterative", "mice"]:
            # MICE / iterative imputation using sklearn's IterativeImputer.
            # This imputes ALL numeric columns at once (multivariate imputation).
            print("🔧 Applying iterative (MICE) imputation to numeric columns...")
            try:
                from sklearn.experimental import enable_iterative_imputer  # noqa: F401
                from sklearn.impute import IterativeImputer
                from sklearn.linear_model import BayesianRidge
                import pandas as pd  # noqa: F401 - required by .to_pandas() below

                # Identify numeric columns with missing values
                numeric_cols_with_nulls = [
                    col for col in numeric_cols if df[col].null_count() > 0
                ]

                if not numeric_cols_with_nulls:
                    print("   â„šī¸ No numeric columns with missing values for MICE imputation")
                else:
                    # Record null counts BEFORE imputing so the report is accurate
                    nulls_before = {
                        col: int(df[col].null_count())
                        for col in numeric_cols_with_nulls
                    }

                    # Convert numeric columns to pandas for IterativeImputer
                    df_pd = df.select(numeric_cols).to_pandas()

                    # Fit and transform
                    imputer = IterativeImputer(
                        estimator=BayesianRidge(),
                        max_iter=10,
                        random_state=42,
                        missing_values=np.nan,
                    )
                    imputed_data = imputer.fit_transform(df_pd)

                    # Write the imputed columns back into the Polars DataFrame
                    for i, col_name in enumerate(numeric_cols):
                        df = df.with_columns(
                            pl.Series(col_name, imputed_data[:, i])
                        )

                    for col_name in numeric_cols_with_nulls:
                        report["columns_processed"][col_name] = {
                            "status": "success",
                            "strategy": "iterative_mice",
                            "nulls_before": nulls_before[col_name],
                            "nulls_after": int(df[col_name].null_count()),
                        }

                    print(f"   ✅ MICE imputed {len(numeric_cols_with_nulls)} numeric columns using {len(numeric_cols)} features")

                # Fall back to mode imputation for remaining non-numeric columns
                for col in df.columns:
                    if df[col].null_count() > 0 and col not in numeric_cols:
                        nulls_before_col = int(df[col].null_count())
                        mode_val = df[col].drop_nulls().mode().first()
                        if mode_val is not None:
                            df = df.with_columns(
                                pl.col(col).fill_null(mode_val).alias(col)
                            )
                            report["columns_processed"][col] = {
                                "status": "success",
                                "strategy": "mode (non-numeric fallback)",
                                "nulls_before": nulls_before_col,
                                "nulls_after": int(df[col].null_count()),
                            }
            except ImportError:
                return {
                    "success": False,
                    "error": (
                        "Iterative (MICE) imputation requires scikit-learn and "
                        "pandas. Install with: pip install scikit-learn pandas"
                    ),
                    "error_type": "MissingDependency",
                }

            # Skip per-column processing for MICE (already handled above)
            strategy = {}
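
        # For reference, the core of the MICE step above reduced to a standalone
        # sketch (the toy matrix below is illustrative, not from this pipeline):
        #
        #     import numpy as np
        #     from sklearn.experimental import enable_iterative_imputer  # noqa: F401
        #     from sklearn.impute import IterativeImputer
        #
        #     X = np.array([[1.0, 2.0], [np.nan, 4.0], [5.0, np.nan]])
        #     X_imputed = IterativeImputer(random_state=42).fit_transform(X)
        #     # Each missing entry is predicted from the other columns, iteratively.
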
        else:
            return {
                "success": False,
                "error": (
                    f"Invalid strategy '{strategy}'. Use 'auto', 'median', "
                    "'mean', 'mode', 'forward_fill', 'drop', 'iterative', "
                    "'mice', or provide a dictionary."
                ),
                "error_type": "ValueError",
            }

    # Process each column based on its strategy
    for col, strat in strategy.items():
        if col not in df.columns:
            report["columns_processed"][col] = {
                "status": "error",
                "message": "Column not found (may have been dropped)",
            }
            continue

        null_count_before = df[col].null_count()
        if null_count_before == 0:
            report["columns_processed"][col] = {
                "status": "skipped",
                "message": "No missing values",
            }
            continue

        # Don't impute ID columns - drop rows instead
        if col in id_cols and strat != "drop":
            report["columns_processed"][col] = {
                "status": "skipped",
                "message": "ID column - not imputed (use 'drop' to remove rows)",
            }
            continue

        # Apply the strategy
        try:
            rows_before = len(df)

            if strat == "median":
                if col in numeric_cols:
                    median_val = df[col].median()
                    df = df.with_columns(
                        pl.col(col).fill_null(median_val).alias(col)
                    )
                    report["columns_processed"][col] = {
                        "status": "success",
                        "strategy": "median",
                        "nulls_before": int(null_count_before),
                        "nulls_after": int(df[col].null_count()),
                        "fill_value": float(median_val),
                    }
                else:
                    report["columns_processed"][col] = {
                        "status": "error",
                        "message": "Cannot use median on non-numeric column",
                    }
                    continue

            elif strat == "mean":
                if col in numeric_cols:
                    mean_val = df[col].mean()
                    df = df.with_columns(
                        pl.col(col).fill_null(mean_val).alias(col)
                    )
                    report["columns_processed"][col] = {
                        "status": "success",
                        "strategy": "mean",
                        "nulls_before": int(null_count_before),
                        "nulls_after": int(df[col].null_count()),
                        "fill_value": float(mean_val),
                    }
                else:
                    report["columns_processed"][col] = {
                        "status": "error",
                        "message": "Cannot use mean on non-numeric column",
                    }
                    continue

            elif strat == "mode":
                mode_val = df[col].drop_nulls().mode().first()
                if mode_val is not None:
                    df = df.with_columns(
                        pl.col(col).fill_null(mode_val).alias(col)
                    )
                    report["columns_processed"][col] = {
                        "status": "success",
                        "strategy": "mode",
                        "nulls_before": int(null_count_before),
                        "nulls_after": int(df[col].null_count()),
                        "fill_value": str(mode_val),
                    }

            elif strat == "forward_fill":
                df = df.with_columns(
                    pl.col(col).forward_fill().alias(col)
                )
                report["columns_processed"][col] = {
                    "status": "success",
                    "strategy": "forward_fill",
                    "nulls_before": int(null_count_before),
                    "nulls_after": int(df[col].null_count()),
                }

            elif strat == "drop":
                df = df.filter(pl.col(col).is_not_null())
                rows_after = len(df)
                report["columns_processed"][col] = {
                    "status": "success",
                    "strategy": "drop",
                    "nulls_before": int(null_count_before),
                    "rows_dropped": rows_before - rows_after,
                }

            else:
                report["columns_processed"][col] = {
                    "status": "error",
                    "message": f"Unknown strategy: {strat}",
                }
                continue

        except Exception as e:
            report["columns_processed"][col] = {
                "status": "error",
                "message": str(e),
            }

    report["final_rows"] = len(df)
    report["final_columns"] = len(df.columns)
    report["rows_dropped"] = report["original_rows"] - report["final_rows"]
    report["columns_dropped_count"] = len(report["columns_dropped"])

    # Save the cleaned dataset
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    save_dataframe(df, output_path)
    report["output_path"] = output_path

    # Summary message
    n_success = len([
        c for c in report["columns_processed"].values()
        if c["status"] == "success"
    ])
    report["message"] = (
        f"Cleaned {report['original_rows']} rows → {report['final_rows']} rows. "
        f"Dropped {report['columns_dropped_count']} columns. "
        f"Processed {n_success} columns."
    )

    return report
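

# Example usage for clean_missing_values (a minimal sketch; the file paths
# below are hypothetical placeholders):
#
#     report = clean_missing_values(
#         file_path="data/raw_orders.csv",
#         strategy="auto",                    # or "median", or {"price": "mean"}
#         output_path="data/orders_clean.parquet",
#         threshold=0.4,                      # drop columns >40% missing
#     )
#     print(report["message"])
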


def handle_outliers(
    file_path: str,
    strategy: str,
    columns: List[str],
    output_path: str,
) -> Dict[str, Any]:
    """
    Detect and handle outliers in numeric columns using IQR bounds.

    Args:
        file_path: Path to CSV or Parquet file
        strategy: Method to handle outliers ('clip'/'cap', 'winsorize', 'remove')
        columns: List of columns to check, or ['all'] for all numeric columns
        output_path: Path to save cleaned dataset

    Returns:
        Dictionary with outlier handling report
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    if strategy not in ("clip", "cap", "winsorize", "remove"):
        return {
            "success": False,
            "error": f"Invalid strategy '{strategy}'. Use 'clip', 'cap', 'winsorize', or 'remove'.",
            "error_type": "ValueError",
        }

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)

    # Determine which columns to process
    numeric_cols = get_numeric_columns(df)

    if columns == ["all"]:
        target_cols = numeric_cols
    else:
        # Keep only existing numeric columns (auto-skip dropped columns)
        target_cols = []
        for col in columns:
            if col not in df.columns:
                print(f"âš ī¸ Skipping '{col}' - column was dropped in a previous step")
                continue
            if col not in numeric_cols:
                print(f"âš ī¸ Skipping '{col}' - not numeric")
                continue
            target_cols.append(col)

    # If no valid columns remain, return early
    if not target_cols:
        return {
            "success": False,
            "error": (
                "None of the requested columns exist in the dataset. "
                f"Available numeric columns: {', '.join(numeric_cols[:20])}"
            ),
            "error_type": "ValueError",
        }

    report = {
        "original_rows": len(df),
        "strategy": strategy,
        "columns_processed": {},
    }

    # Process each column
    for col in target_cols:
        col_data = df[col].drop_nulls()

        if len(col_data) == 0:
            report["columns_processed"][col] = {
                "status": "skipped",
                "message": "All values are null",
            }
            continue

        # Calculate IQR bounds
        q1 = col_data.quantile(0.25)
        q3 = col_data.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        # Count outliers; treat nulls as non-outliers so 'remove' keeps them
        outliers_mask = (
            (df[col] < lower_bound) | (df[col] > upper_bound)
        ).fill_null(False)
        outlier_count = outliers_mask.sum()

        if outlier_count == 0:
            report["columns_processed"][col] = {
                "status": "skipped",
                "message": "No outliers detected",
            }
            continue

        # Apply the strategy
        if strategy in ("clip", "cap"):
            # Clip values to the IQR bounds
            df = df.with_columns(
                pl.col(col).clip(lower_bound, upper_bound).alias(col)
            )
        elif strategy == "winsorize":
            # Winsorize: cap at the 1st and 99th percentiles
            p1 = col_data.quantile(0.01)
            p99 = col_data.quantile(0.99)
            df = df.with_columns(
                pl.col(col).clip(p1, p99).alias(col)
            )
        elif strategy == "remove":
            # Remove rows containing outliers
            df = df.filter(~outliers_mask)

        report["columns_processed"][col] = {
            "status": "success",
            "outliers_detected": int(outlier_count),
            "bounds": {
                "lower": float(lower_bound),
                "upper": float(upper_bound),
            },
        }

    report["final_rows"] = len(df)
    report["rows_dropped"] = report["original_rows"] - report["final_rows"]

    # Save the cleaned dataset
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    save_dataframe(df, output_path)
    report["output_path"] = output_path

    return report
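

# Example usage for handle_outliers (a minimal sketch; paths and column names
# are hypothetical):
#
#     report = handle_outliers(
#         file_path="data/orders_clean.parquet",
#         strategy="clip",                    # 'clip'/'cap', 'winsorize', or 'remove'
#         columns=["price", "quantity"],      # or ["all"] for every numeric column
#         output_path="data/orders_no_outliers.parquet",
#     )
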


def fix_data_types(
    file_path: str,
    type_mapping: Optional[Dict[str, str]] = None,
    output_path: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Auto-detect and fix incorrect data types.

    Args:
        file_path: Path to CSV or Parquet file
        type_mapping: Optional dictionary mapping columns to target types
            ('int', 'float', 'string', 'date', 'bool', 'category').
            Use 'auto' or None for automatic detection.
        output_path: Path to save the dataset with fixed types (required)

    Returns:
        Dictionary with type fixing report
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    if output_path is None:
        return {
            "success": False,
            "error": "output_path is required",
            "error_type": "ValueError",
        }

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)

    if type_mapping is None or type_mapping == {"auto": "auto"}:
        type_mapping = {}

    report = {"columns_processed": {}}

    for col in df.columns:
        original_dtype = str(df[col].dtype)

        # Take the target type from the mapping, or auto-detect it
        if col in type_mapping and type_mapping[col] != "auto":
            target_type = type_mapping[col]
        else:
            target_type = _auto_detect_type(df[col])
            if target_type is None:
                report["columns_processed"][col] = {
                    "status": "skipped",
                    "original_dtype": original_dtype,
                    "message": "Could not auto-detect type",
                }
                continue

        # Try to convert
        try:
            if target_type == "int":
                df = df.with_columns(
                    pl.col(col).cast(pl.Int64, strict=False).alias(col)
                )
            elif target_type == "float":
                df = df.with_columns(
                    pl.col(col).cast(pl.Float64, strict=False).alias(col)
                )
            elif target_type == "string":
                df = df.with_columns(
                    pl.col(col).cast(pl.Utf8).alias(col)
                )
            elif target_type == "date":
                df = df.with_columns(
                    pl.col(col).str.strptime(pl.Date, "%Y-%m-%d", strict=False).alias(col)
                )
            elif target_type == "bool":
                df = df.with_columns(
                    pl.col(col).cast(pl.Boolean, strict=False).alias(col)
                )
            elif target_type == "category":
                df = df.with_columns(
                    pl.col(col).cast(pl.Categorical).alias(col)
                )

            new_dtype = str(df[col].dtype)
            report["columns_processed"][col] = {
                "status": "success",
                "original_dtype": original_dtype,
                "new_dtype": new_dtype,
                "target_type": target_type,
            }
        except Exception as e:
            report["columns_processed"][col] = {
                "status": "error",
                "original_dtype": original_dtype,
                "target_type": target_type,
                "message": str(e),
            }

    # Save the dataset
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    save_dataframe(df, output_path)
    report["output_path"] = output_path

    return report


def _auto_detect_type(series: pl.Series) -> Optional[str]:
    """
    Auto-detect an appropriate type for a series.

    Args:
        series: Polars series

    Returns:
        Detected type string, or None if the current type should be kept
    """
    # Already a suitable type
    if series.dtype.is_numeric():
        return None
    if series.dtype in [pl.Date, pl.Datetime]:
        return None

    # Try to detect from string values
    if series.dtype == pl.Utf8:
        sample = series.drop_nulls().head(100)
        if len(sample) == 0:
            return None

        # Check for boolean-like values
        unique_vals = set(str(v).lower() for v in sample.to_list())
        if unique_vals.issubset({'true', 'false', '1', '0', 'yes', 'no', 't', 'f'}):
            return "bool"

        # Check for numeric values
        try:
            sample.cast(pl.Float64)
            # If no value contains a decimal point, treat the column as integer
            if all('.' not in str(v) for v in sample.to_list() if v is not None):
                return "int"
            return "float"
        except Exception:
            pass

        # Check for dates. Note: strptime with strict=False returns null for
        # unparseable values instead of raising, so verify the parse succeeded.
        parsed = sample.str.strptime(pl.Date, "%Y-%m-%d", strict=False)
        if parsed.null_count() == 0:
            return "date"

        # Low-cardinality strings are a good fit for Categorical
        n_unique = series.n_unique()
        if n_unique < len(series) * 0.5 and n_unique < 100:
            return "category"

    return None
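

# Example usage for fix_data_types (a minimal sketch; the path and the explicit
# mapping are hypothetical):
#
#     # Fully automatic detection:
#     report = fix_data_types(
#         file_path="data/orders_no_outliers.parquet",
#         output_path="data/orders_typed.parquet",
#     )
#
#     # Or pin specific columns and auto-detect the rest:
#     report = fix_data_types(
#         file_path="data/orders_no_outliers.parquet",
#         type_mapping={"order_date": "date", "segment": "category"},
#         output_path="data/orders_typed.parquet",
#     )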