"""Model validator for SentinelEdge Hub.

Validates new global model weights against a held-out validation set
derived from public FTC fraud report data.

Implements Byzantine fault tolerance by rejecting model updates that
degrade F1 by more than 2%, and by rejecting weight vectors that contain
non-finite values.
"""

import logging
import os

import numpy as np

logger = logging.getLogger(__name__)

# Number of features the model expects (phone call fraud features).
DEFAULT_N_FEATURES = 25

# Default maximum absolute F1 drop tolerated between consecutive rounds.
DEFAULT_MAX_F1_DROP = 0.02

# Slack for the acceptance comparison.  Metrics are rounded to 4 decimals,
# so this only absorbs floating-point noise from the subtraction.
_F1_COMPARE_EPS = 1e-9

# Mean shifts applied to the leading features of synthetic fraud samples:
# call duration anomaly, time-of-day anomaly, caller-id mismatch score,
# rapid callback pattern, geographic anomaly.
_FRAUD_FEATURE_SHIFTS = np.array([2.0, 1.5, 1.8, 1.2, 1.0])


def _sigmoid(x: np.ndarray) -> np.ndarray:
    """Numerically stable sigmoid.

    Only ``exp(-|x|)`` is ever evaluated, so the exponential cannot
    overflow for either sign of ``x`` and no NumPy overflow/invalid
    warnings are emitted for large-magnitude logits.

    Args:
        x: Array of logits.

    Returns:
        Array of the same shape with values in [0, 1].
    """
    x = np.asarray(x, dtype=np.float64)
    z = np.exp(-np.abs(x))  # always in (0, 1]; may underflow to 0.0
    return np.where(x >= 0, 1.0 / (1.0 + z), z / (1.0 + z))


def _generate_synthetic_validation_data(
    n_samples: int = 2000,
    n_features: int = DEFAULT_N_FEATURES,
    fraud_ratio: float = 0.15,
    seed: int = 42,
) -> tuple[np.ndarray, np.ndarray]:
    """Generate a synthetic validation set mimicking FTC fraud report patterns.

    This produces a reproducible dataset with realistic class imbalance
    (~15% fraud) and feature distributions inspired by phone call fraud
    indicators: call duration, time of day, caller ID mismatch, etc.

    Args:
        n_samples: Total number of validation samples.
        n_features: Number of features per sample.
        fraud_ratio: Fraction of positive (fraud) samples, in [0, 1].
        seed: Random seed for reproducibility.

    Returns:
        Tuple of (X, y) where X is (n_samples, n_features) and y is
        (n_samples,).

    Raises:
        ValueError: If ``fraud_ratio`` is outside [0, 1].
    """
    if not 0.0 <= fraud_ratio <= 1.0:
        raise ValueError(f"fraud_ratio must be in [0, 1], got {fraud_ratio}")

    # NOTE: the legacy RandomState API and the exact order of the draws
    # below are kept on purpose -- changing either would change the data.
    rng = np.random.RandomState(seed)

    n_fraud = int(n_samples * fraud_ratio)
    n_legit = n_samples - n_fraud

    # Legitimate calls: features drawn from a "normal" distribution.
    X_legit = rng.randn(n_legit, n_features) * 0.8

    # Fraud calls: wider distribution with elevated leading features.
    X_fraud = rng.randn(n_fraud, n_features) * 1.0

    # Shift only the indicator columns that exist, so n_features < 5 works.
    n_shifted = min(n_features, len(_FRAUD_FEATURE_SHIFTS))
    X_fraud[:, :n_shifted] += _FRAUD_FEATURE_SHIFTS[:n_shifted]

    X = np.vstack([X_legit, X_fraud])
    y = np.concatenate([np.zeros(n_legit), np.ones(n_fraud)])

    # Shuffle so the classes are interleaved.
    perm = rng.permutation(n_samples)
    return X[perm], y[perm]


class ModelValidator:
    """Validates global model against held-out validation data.

    If a validation_data_path is provided and exists, loads it as a numpy
    .npz file with keys 'X' and 'y'. Otherwise, generates a reproducible
    synthetic dataset.
    """

    def __init__(
        self,
        validation_data_path: str = "validation_data.npz",
        n_features: int = DEFAULT_N_FEATURES,
    ):
        """Load or generate validation dataset.

        A file that exists but cannot be loaded (corrupt archive, missing
        keys, inconsistent shapes) is not fatal: a warning is logged and
        the synthetic dataset is used instead.

        Args:
            validation_data_path: Path to .npz file with 'X' and 'y' keys.
            n_features: Expected number of features (used for synthetic
                data).
        """
        self.n_features = n_features
        self.X: np.ndarray
        self.y: np.ndarray

        dataset = None
        if os.path.exists(validation_data_path):
            try:
                dataset = self._load_validation_file(validation_data_path)
            except Exception as e:  # deliberate best-effort fallback
                logger.warning(
                    "Failed to load %s (%s), generating synthetic data",
                    validation_data_path,
                    e,
                )
            else:
                logger.info(
                    "Loaded validation data from %s: %d samples",
                    validation_data_path,
                    len(dataset[1]),
                )
        else:
            logger.info(
                "Validation data file not found at %s, "
                "generating synthetic validation set",
                validation_data_path,
            )

        if dataset is None:
            dataset = _generate_synthetic_validation_data(
                n_features=n_features
            )
        self.X, self.y = dataset

        # Guard the mean so an empty dataset does not emit a NumPy warning.
        fraud_pct = 100.0 * float(np.mean(self.y)) if len(self.y) else 0.0
        logger.info(
            "Validation set: %d samples, %.1f%% fraud",
            len(self.y),
            fraud_pct,
        )

    @staticmethod
    def _load_validation_file(path: str) -> tuple[np.ndarray, np.ndarray]:
        """Read 'X' and 'y' from an .npz archive and sanity-check shapes.

        Args:
            path: Path to the .npz file.

        Returns:
            Tuple (X, y) with X two-dimensional and y flattened to 1-D.

        Raises:
            ValueError: If X is not 2-D or X and y disagree on sample count.
            Exception: Whatever ``np.load`` / key lookup raises for an
                unreadable or malformed file.
        """
        # The context manager closes the underlying archive handle.
        with np.load(path) as data:
            X = np.asarray(data["X"], dtype=np.float64)
            y = np.asarray(data["y"]).ravel()
        if X.ndim != 2 or X.shape[0] != y.shape[0]:
            raise ValueError(
                f"inconsistent shapes X={X.shape}, y={y.shape}"
            )
        return X, y

    def _logits(
        self, model_weights: np.ndarray, *, warn_on_mismatch: bool = True
    ) -> np.ndarray:
        """Compute raw linear scores ``X @ w (+ bias)`` for the dataset.

        The weight vector is flattened first, so column vectors of shape
        (n, 1) are handled like 1-D vectors. A vector of length n_features
        is used as-is; length n_features + 1 treats the last element as the
        bias; any other length is truncated or zero-padded to n_features.

        Args:
            model_weights: Weight vector.
            warn_on_mismatch: Whether to log a warning when the weights
                have to be truncated or padded.

        Returns:
            1-D array of logits, one per validation sample.
        """
        w = np.asarray(model_weights, dtype=np.float64).ravel()
        n_feat = self.X.shape[1]

        if w.size == n_feat + 1:
            # Last element is bias.
            return self.X @ w[:-1] + w[-1]
        if w.size == n_feat:
            return self.X @ w

        # Dimension mismatch: truncate or pad with zeros.
        if warn_on_mismatch:
            logger.warning(
                "Weight dimension %d != feature dimension %d, adjusting",
                w.size,
                n_feat,
            )
        adjusted = np.zeros(n_feat, dtype=np.float64)
        n = min(w.size, n_feat)
        adjusted[:n] = w[:n]
        return self.X @ adjusted

    def _predict(
        self, model_weights: np.ndarray, threshold: float = 0.5
    ) -> np.ndarray:
        """Run logistic regression prediction with the given weights.

        The model is a simple linear model: y_hat = sigmoid(X @ w).
        Weights vector is expected to be of length n_features (no bias)
        or n_features+1 (last element is bias).

        Args:
            model_weights: Weight vector.
            threshold: Classification threshold.

        Returns:
            Binary predictions array (float64 zeros and ones).
        """
        probs = _sigmoid(self._logits(model_weights))
        return (probs >= threshold).astype(np.float64)

    def validate(
        self,
        model_weights: np.ndarray,
        previous_f1: float,
        max_allowed_drop: float = DEFAULT_MAX_F1_DROP,
    ) -> tuple[float, bool]:
        """Test model, return (f1_score, should_accept).

        Reject if F1 drops more than ``max_allowed_drop`` (2% by default)
        from the previous round (Byzantine fault tolerance). Weight vectors
        containing NaN or Inf are always rejected: they would otherwise
        score F1 = 0 and slip through whenever ``previous_f1`` is itself
        close to zero (for example on the first round).

        Args:
            model_weights: New global model weight vector.
            previous_f1: F1 score from the previous round.
            max_allowed_drop: Largest tolerated absolute decrease in F1.

        Returns:
            Tuple of (f1_score, should_accept).
        """
        w = np.asarray(model_weights, dtype=np.float64)
        if not np.all(np.isfinite(w)):
            logger.warning(
                "Model REJECTED: weights contain non-finite values"
            )
            return 0.0, False

        f1 = self.compute_metrics(w)["f1"]

        # Accept if F1 is within the allowed drop of previous, or improved.
        should_accept = bool(
            f1 >= previous_f1 - max_allowed_drop - _F1_COMPARE_EPS
        )

        if should_accept:
            logger.info(
                "Model ACCEPTED: F1=%.4f (previous=%.4f, delta=%+.4f)",
                f1,
                previous_f1,
                f1 - previous_f1,
            )
        else:
            logger.warning(
                "Model REJECTED: F1=%.4f, previous=%.4f, drop=%.4f > %.4f",
                f1,
                previous_f1,
                previous_f1 - f1,
                max_allowed_drop,
            )
        return f1, should_accept

    def compute_metrics(self, model_weights: np.ndarray) -> dict:
        """Compute full classification metrics.

        Probabilities are computed once and shared between the thresholded
        metrics (at 0.5) and the AUC estimate.

        Args:
            model_weights: Model weight vector.

        Returns:
            Dict with keys: accuracy, precision, recall, f1, auc, each
            rounded to 4 decimals.
        """
        probs = _sigmoid(self._logits(model_weights))
        y_true = np.asarray(self.y).ravel()

        pred_pos = probs >= 0.5
        actual_pos = y_true == 1
        actual_neg = y_true == 0

        tp = float(np.sum(pred_pos & actual_pos))
        fp = float(np.sum(pred_pos & actual_neg))
        fn = float(np.sum(~pred_pos & actual_pos))
        tn = float(np.sum(~pred_pos & actual_neg))

        # The max() guards keep degenerate cases (no predictions, no
        # positives, empty data) at 0.0 instead of dividing by zero.
        accuracy = (tp + tn) / max(tp + tn + fp + fn, 1)
        precision = tp / max(tp + fp, 1e-10)
        recall = tp / max(tp + fn, 1e-10)
        f1 = 2 * precision * recall / max(precision + recall, 1e-10)

        auc = self._auc_from_probs(probs)

        return {
            "accuracy": round(accuracy, 4),
            "precision": round(precision, 4),
            "recall": round(recall, 4),
            "f1": round(f1, 4),
            "auc": round(auc, 4),
        }

    def _compute_auc(self, model_weights: np.ndarray) -> float:
        """Compute approximate AUC-ROC for the given weights.

        Args:
            model_weights: Model weight vector.

        Returns:
            Approximate area under the ROC curve, clamped to [0, 1].
        """
        probs = _sigmoid(
            self._logits(model_weights, warn_on_mismatch=False)
        )
        return self._auc_from_probs(probs)

    def _auc_from_probs(self, probs: np.ndarray) -> float:
        """Approximate AUC-ROC from predicted probabilities.

        Builds the ROC curve on a grid of 101 evenly spaced thresholds in
        [0, 1], anchors it at (0, 0), and integrates with the trapezoidal
        rule.

        Args:
            probs: Predicted fraud probabilities, one per sample.

        Returns:
            Approximate area under the ROC curve, clamped to [0, 1].
        """
        y_true = np.asarray(self.y).ravel()
        is_pos = y_true == 1
        is_neg = y_true == 0
        total_pos = max(int(np.sum(is_pos)), 1)
        total_neg = max(int(np.sum(is_neg)), 1)

        thresholds = np.linspace(0.0, 1.0, 101)
        # Row i holds the predictions at thresholds[i].
        pred = probs[np.newaxis, :] >= thresholds[:, np.newaxis]
        tpr = np.sum(pred & is_pos, axis=1) / total_pos
        fpr = np.sum(pred & is_neg, axis=1) / total_neg

        # Rates are non-increasing in the threshold, so reversing yields a
        # curve ordered by FPR with ties kept in a consistent TPR order
        # (an unstable argsort could scramble them).  The explicit (0, 0)
        # anchor covers probabilities saturated at exactly 1.0, where the
        # highest threshold still predicts positives.
        fpr = np.concatenate(([0.0], fpr[::-1]))
        tpr = np.concatenate(([0.0], tpr[::-1]))

        # Explicit trapezoidal rule: independent of the NumPy version
        # (np.trapezoid is only available from NumPy 2.0).
        area = float(np.sum(np.diff(fpr) * (tpr[1:] + tpr[:-1]) * 0.5))
        return max(0.0, min(1.0, area))