Spaces:
Sleeping
Sleeping
File size: 4,925 Bytes
"""
Adaptive physics-informed cyberattack detector for the PLL OpenEnv.

Uses residual-based and pattern-based features derived from the
observation windows to detect and classify attacks and to recommend
protective actions. The detector builds a baseline from the first 20
observations (the warm-up window) of the episode and normalizes each
feature individually against that baseline.
"""
import numpy as np
from typing import Dict, Any
from src.models import Observation
class AdaptiveDetector:
    """Residual-based attack detector that calibrates itself per episode.

    During the warm-up window (``step < WARMUP_STEPS``) the detector only
    records four features of each observation. On the first observation
    after warm-up it freezes the mean and standard deviation of every
    feature, and from then on scores each observation by a weighted sum
    of the features' z-scores against that baseline.

    Public state kept for backward compatibility: ``r1_history`` ..
    ``r5_history`` (warm-up samples), ``mean_R*`` / ``std_R*`` (frozen
    baseline statistics) and ``is_calibrated``.
    """

    # Number of leading episode steps used only to build the baseline.
    WARMUP_STEPS = 20
    # Weighted z-score above which an attack is reported.
    DETECTION_THRESHOLD = 5.0
    # Floor for baseline standard deviations so a perfectly flat warm-up
    # window cannot cause a division by zero.
    MIN_STD = 1e-6

    def __init__(self):
        self.reset()

    def reset(self) -> None:
        """Discard the collected baseline so a new episode can calibrate."""
        # Warm-up samples, one list per normalized feature.
        self.r1_history = []  # vq/vd magnitude ratio
        self.r3_history = []  # variance of the omega window
        self.r4_history = []  # variance of the vd window
        self.r5_history = []  # three-phase symmetry ratio
        # Baseline statistics; overwritten by _calibrate().
        self.mean_R1 = 0.0
        self.std_R1 = self.MIN_STD
        self.mean_R3 = 0.0
        self.std_R3 = self.MIN_STD
        self.mean_R4 = 0.0
        self.std_R4 = self.MIN_STD
        self.mean_R5 = 0.0
        self.std_R5 = self.MIN_STD
        self.is_calibrated = False

    @staticmethod
    def _idle_result() -> Dict[str, Any]:
        """Return the "no attack" result reported while warming up."""
        return {
            "attack_detected": False,
            "attack_type": 0,
            "confidence": 0.0,
            "protective_action": 0,
            "score": 0.0,
            "baseline_score": 0.0,
        }

    def _baseline(self, samples) -> tuple:
        """Return ``(mean, floored std)`` of one feature's warm-up samples."""
        return float(np.mean(samples)), max(float(np.std(samples)), self.MIN_STD)

    def _calibrate(self) -> None:
        """Freeze the per-feature baseline from the warm-up histories."""
        self.mean_R1, self.std_R1 = self._baseline(self.r1_history)
        self.mean_R3, self.std_R3 = self._baseline(self.r3_history)
        self.mean_R4, self.std_R4 = self._baseline(self.r4_history)
        self.mean_R5, self.std_R5 = self._baseline(self.r5_history)
        self.is_calibrated = True

    def detect(self, observation) -> Dict[str, Any]:
        """Run physics-informed anomaly detection on the current observation.

        Args:
            observation: Object exposing ``vq_window``, ``vd_window`` and
                ``omega_window`` (numeric sequences), ``raw_voltages``
                (three phase values) and ``step`` (episode step index).

        Returns:
            Dict with keys ``attack_detected`` (bool), ``attack_type``
            (0 none, 1 sinusoidal, 2 ramp, 3 pulse, 4 stealthy),
            ``confidence`` (float in [0, 1]), ``protective_action``
            (int 0-3; 0 means no action), ``score`` (weighted z-score)
            and ``baseline_score`` (always 0.0).
        """
        vq = np.asarray(observation.vq_window, dtype=np.float64)
        vd = np.asarray(observation.vd_window, dtype=np.float64)
        omega = np.asarray(observation.omega_window, dtype=np.float64)
        va, vb, vc = observation.raw_voltages

        # ---- Step 1: Feature Extraction --------------------------------
        vq_mean = float(np.mean(np.abs(vq)))
        vd_mean = float(np.mean(np.abs(vd)))
        vq_ratio = vq_mean / (vd_mean + 1e-6)
        omega_var = float(np.var(omega))
        vd_var = float(np.var(vd))
        abs_v_sum = abs(va) + abs(vb) + abs(vc) + 1e-6
        symmetry_ratio = float(abs(va + vb + vc) / abs_v_sum)
        # Pattern features, used only to classify a detected attack.
        vq_diff = np.diff(vq) if len(vq) > 1 else np.array([0.0])
        vq_trend = float(np.mean(vq_diff))
        vq_spike = float(np.max(np.abs(vq)))

        step = observation.step

        # ---- Step 2: Baseline Calibration ------------------------------
        # A warm-up step arriving after calibration means a new episode has
        # started; drop the stale baseline instead of scoring against it.
        if self.is_calibrated and step < self.WARMUP_STEPS:
            self.reset()

        if not self.is_calibrated:
            # Keep collecting while inside the warm-up window, and also when
            # no sample exists yet (detector attached mid-episode):
            # calibrating on an empty history would yield NaN statistics and
            # silently disable detection.
            if step < self.WARMUP_STEPS or not self.r1_history:
                self.r1_history.append(vq_ratio)
                self.r3_history.append(omega_var)
                self.r4_history.append(vd_var)
                self.r5_history.append(symmetry_ratio)
                return self._idle_result()
            self._calibrate()

        # ---- Step 3: Normalized Features ------------------------------
        R1 = (vq_ratio - self.mean_R1) / self.std_R1
        R3 = (omega_var - self.mean_R3) / self.std_R3
        R4 = (vd_var - self.mean_R4) / self.std_R4
        R5 = (symmetry_ratio - self.mean_R5) / self.std_R5

        # ---- Step 4: Score --------------------------------------------
        score = 0.4 * R1 + 0.2 * R3 + 0.2 * R5 + 0.2 * R4

        # ---- Step 5: Detection ----------------------------------------
        attack_detected = score > self.DETECTION_THRESHOLD
        # NOTE(review): detection requires score > threshold, so this ratio
        # always exceeds 1 and confidence saturates at 1.0 on every
        # detection. Kept as-is; confirm the intended scaling.
        confidence = (
            min(1.0, score / self.DETECTION_THRESHOLD) if attack_detected else 0.0
        )

        # ---- Step 6: Classification -----------------------------------
        if not attack_detected:
            attack_type = 0
        elif R3 > 2:
            attack_type = 1  # sinusoidal
        elif abs(vq_trend) > 0.01:
            attack_type = 2  # ramp
        elif vq_spike > 0.1:
            attack_type = 3  # pulse
        else:
            attack_type = 4  # stealthy

        # ---- Step 7: Protective Action --------------------------------
        # NOTE(review): with the default threshold of 5.0 only levels 2 and 3
        # are reachable; the lower rungs matter only if DETECTION_THRESHOLD
        # is lowered (e.g. in a subclass).
        if not attack_detected:
            protective_action = 0
        elif score > 6:
            protective_action = 3
        elif score > 3:
            protective_action = 2
        else:
            protective_action = 1

        return {
            "attack_detected": bool(attack_detected),
            "attack_type": int(attack_type),
            "confidence": float(confidence),
            "protective_action": int(protective_action),
            "score": float(score),
            "baseline_score": 0.0,
        }
|