"""
Main environment class for the PLL Cyberattack Detection OpenEnv.
Implements step(), reset(), get_state(), and compute_reward().
Manages the PLL simulation, attack injection, observation windowing,
episode history, and grading.
"""
import uuid
import numpy as np
from typing import Tuple, Dict, Any, List, Optional
from collections import deque
from src.models import Observation, Action, Reward, State
from src.pll_sim import SRFPLLSimulator, OMEGA0
from src.attacks import (
AttackGenerator,
sample_sinusoidal_params,
sample_ramp_params,
sample_pulse_params,
sample_stealthy_params,
sample_attack_start,
get_attack_type_id,
)
from src.graders import grade_task_easy, grade_task_medium, grade_task_hard
from src.detector import AdaptiveDetector
WINDOW_SIZE = 20
MAX_STEPS = 500
LOCK_LOSS_THRESHOLD = 0.0873 # 5 degrees in radians
DETECTION_THRESHOLD = 2.0
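# NOTE: DETECTION_THRESHOLD is not referenced in this module; it is assumed to
# be consumed by the AdaptiveDetector in src.detector.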
EARLY_DETECTION_WINDOW = 100
FALSE_ALARM_PENALTY = -0.2
TRUE_POSITIVE_REWARD = 0.1
TRUE_NEGATIVE_REWARD = 0.05
MISSED_DETECTION_PENALTY = -0.05
CLASSIFICATION_BONUS = 0.05
LOCK_LOSS_PENALTY = -2.0
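# Worked per-step arithmetic: during an active attack on task 1, a correct
# detection with the correct type label earns
# TRUE_POSITIVE_REWARD + CLASSIFICATION_BONUS = 0.10 + 0.05 = 0.15 per step,
# while a false alarm on a quiet step costs 0.20 — the most expensive
# recurring outcome (LOCK_LOSS_PENALTY is larger but fires at most once).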
class PLLAttackEnv:
"""OpenEnv-compliant PLL cyberattack detection environment."""
def __init__(self):
self.pll = SRFPLLSimulator()
self.rng: Optional[np.random.Generator] = None
self.task_id = 0
self.step_count = 0
self.episode_id = ""
self.done = False
# Attack state
self.attack_generator: Optional[AttackGenerator] = None
self.attack_active = False
self.attack_type = 0
self.attack_params: Dict[str, Any] = {}
self.attack_start_step = 0
self.true_attack_type = 0
# Detection tracking
self.first_detection_recorded = False
self.first_detection_step = 0
# Lock loss tracking (Task 2 / hard)
self.lock_lost = False
self.lock_loss_step: Optional[int] = None
self.lock_loss_penalized = False
# Observation windows
self.vq_window: deque = deque(maxlen=WINDOW_SIZE)
self.vd_window: deque = deque(maxlen=WINDOW_SIZE)
self.omega_window: deque = deque(maxlen=WINDOW_SIZE)
self.omega_deviation_window: deque = deque(maxlen=WINDOW_SIZE)
# Detector
self.detector = AdaptiveDetector()
# Episode history for grading
self.history: List[Dict[str, Any]] = []
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def reset(self, task_id: int = 0, seed: Optional[int] = None) -> Observation:
"""
Reset the environment for a new episode.
Args:
task_id: 0=easy (sinusoidal), 1=medium (multi-type),
2=hard (stealthy).
seed: Optional RNG seed for reproducibility.
Returns:
Initial Observation with non-zero raw_voltages.
"""
self.rng = np.random.default_rng(seed) # seed=None → random
self.task_id = task_id
self.step_count = 0
self.episode_id = str(uuid.uuid4())
self.done = False
# Reset PLL simulator
self.pll.reset()
# Reset detection tracking
self.first_detection_recorded = False
self.first_detection_step = 0
# Reset lock-loss tracking
self.lock_lost = False
self.lock_loss_step = None
self.lock_loss_penalized = False
# Reset history
self.history = []
# Reset observation windows
self.vq_window = deque(maxlen=WINDOW_SIZE)
self.vd_window = deque(maxlen=WINDOW_SIZE)
self.omega_window = deque(maxlen=WINDOW_SIZE)
self.omega_deviation_window = deque(maxlen=WINDOW_SIZE)
# Reset detector
self.detector = AdaptiveDetector()
# Sample attack for this episode
self._setup_attack()
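        # Warm-up: pre-fill the observation windows with clean (attack-free)
        # samples so the first observation the agent sees is fully populated.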
for _ in range(WINDOW_SIZE):
pll_out = self.pll.step(0.0) # no attack during warm-up
omega_norm = (pll_out["omega_hat"] - OMEGA0) / OMEGA0
omega_dev = pll_out["omega_hat"] - OMEGA0
self.vq_window.append(pll_out["vq"])
self.vd_window.append(pll_out["vd"])
self.omega_window.append(omega_norm)
self.omega_deviation_window.append(omega_dev)
# step_count stays at 0 — warm-up steps are invisible to the agent
return self._get_observation()
def step(self, action: Action) -> Tuple[Observation, Reward, bool, Dict[str, Any]]:
"""
Advance the environment by one step.
Args:
action: Agent's Action for this step.
Returns:
(observation, reward, done, info)
"""
if self.done:
return (
self._get_observation(),
Reward(
total=0.0, detection_reward=0.0, classification_bonus=0.0,
early_detection_bonus=0.0, false_alarm_penalty=0.0,
lock_loss_penalty=0.0,
),
True,
{"message": "Episode already done. Call /reset to start a new episode."},
)
# --- Attack signal ------------------------------------------------
# attack_active uses is_active() (step-based). It does NOT depend on the instantaneous
# signal value, because the attack signal can cross zero even while the attack is active.
attack_signal = self.attack_generator.get_signal(self.step_count, self.pll.t)
self.attack_active = self.attack_generator.is_active(self.step_count)
# --- Advance PLL --------------------------------------------------
pll_out = self.pll.step(attack_signal)
        # --- Update observation windows -----------------------------------
omega_norm = (pll_out["omega_hat"] - OMEGA0) / OMEGA0
omega_dev = pll_out["omega_hat"] - OMEGA0 # raw deviation (rad/s)
self.vq_window.append(pll_out["vq"])
self.vd_window.append(pll_out["vd"])
self.omega_window.append(omega_norm)
self.omega_deviation_window.append(omega_dev)
# --- Lock-loss check (Task 2) -------------------------
PLL_CONVERGENCE_STEPS = 60 # PLL transient settles by ~step 50, using 60 for margin
if (
self.task_id == 2
and not self.lock_lost
and self.step_count > self.attack_start_step
and self.step_count > PLL_CONVERGENCE_STEPS # guard against startup transient
):
if abs(pll_out["theta_err"]) > LOCK_LOSS_THRESHOLD:
self.lock_lost = True
self.lock_loss_step = self.step_count
# --- Reward -------------------------------------------------------
reward = self.compute_reward(action)
# --- Record history entry for graders ----------------------------
self.history.append({
"step": self.step_count,
"attack_active": self.attack_active,
"attack_detected": action.attack_detected,
"true_attack_type": self.true_attack_type,
"agent_attack_type": action.attack_type,
"theta_err": pll_out["theta_err"],
})
# --- Advance step counter ----------------------------------------
self.step_count += 1
        # Terminate on the step cap; on task 2, also terminate early upon
        # losing lock to save computational steps.
        if self.step_count >= MAX_STEPS:
            self.done = True
        elif self.task_id == 2 and self.lock_lost:
            self.done = True
# --- Physics-informed detector (evaluation/debug only) ------------
detector_output = self.detector.detect(self._get_observation())
# --- Build info --------------------------------------------------
info: Dict[str, Any] = {
"detector": detector_output,
"detector_features": {"step": self.step_count, "raw_score": detector_output.get("score")}
}
if self.done:
info["grader_score"] = self._compute_grader_score()
info["episode_id"] = self.episode_id
info["total_steps"] = self.step_count
info["lock_lost"] = self.lock_lost
return self._get_observation(), reward, self.done, info
def compute_reward(self, action: Action) -> Reward:
"""
        Compute the dense reward signal for the current step.
Reward components:
detection_reward: +0.10 true positive (per step)
+0.05 true negative (per step)
-0.05 missed detection (per step)
false_alarm_penalty: -0.20 per false-positive step
classification_bonus: +0.05 per step correct type (task 1 only)
early_detection_bonus: one-time sparse, scaled by detection speed
lock_loss_penalty: -2.00 one-time on lock loss (task 2 only)
"""
detection_reward = 0.0
false_alarm_penalty = 0.0
classification_bonus = 0.0
early_detection_bonus = 0.0
lock_loss_penalty = 0.0
if self.attack_active:
if action.attack_detected:
detection_reward = TRUE_POSITIVE_REWARD
# One-time early detection bonus on first correct detection
if not self.first_detection_recorded:
self.first_detection_step = self.step_count
self.first_detection_recorded = True
# Relative steps since attack started
t = self.first_detection_step - self.attack_start_step
early_detection_bonus = max(0.0, 1.0 - t / EARLY_DETECTION_WINDOW)
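                    # Worked example: attack starts at step 120, first correct
                    # detection at step 145 → t = 25, bonus = 1 - 25/100 = 0.75.
                    # Detections 100+ steps after onset earn no bonus.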
else:
detection_reward = MISSED_DETECTION_PENALTY
else:
if action.attack_detected:
false_alarm_penalty = FALSE_ALARM_PENALTY
else:
detection_reward = TRUE_NEGATIVE_REWARD
# Task 1 (medium): per-step classification bonus
if self.task_id == 1 and self.attack_active:
if action.attack_type == self.true_attack_type:
classification_bonus = CLASSIFICATION_BONUS
# Task 2 (hard): one-time lock-loss penalty
if self.task_id == 2 and self.lock_lost and not self.lock_loss_penalized:
lock_loss_penalty = LOCK_LOSS_PENALTY
self.lock_loss_penalized = True
total = (
detection_reward
+ false_alarm_penalty
+ classification_bonus
+ early_detection_bonus
+ lock_loss_penalty
)
return Reward(
total=total,
detection_reward=detection_reward,
classification_bonus=classification_bonus,
early_detection_bonus=early_detection_bonus,
false_alarm_penalty=false_alarm_penalty,
lock_loss_penalty=lock_loss_penalty,
)
def get_state(self) -> State:
"""Returning full internal state for debugging / GET /state endpoint."""
return State(
theta_true=self.pll.theta_true,
theta_hat=self.pll.theta_hat,
omega_hat=self.pll.omega_hat,
vq_integral=self.pll.vq_integral,
attack_active=self.attack_active,
attack_type=self.attack_type,
attack_params=self.attack_params,
attack_start_step=self.attack_start_step,
lock_lost=self.lock_lost,
step=self.step_count,
episode_id=self.episode_id,
task_id=self.task_id,
)
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
def _setup_attack(self) -> None:
"""Sample attack type and parameters based on current task_id."""
self.attack_start_step = sample_attack_start(self.rng)
if self.task_id == 0:
# Easy: sinusoidal FDI only
self.attack_params = sample_sinusoidal_params(self.rng)
self.true_attack_type = 1
elif self.task_id == 1:
# Medium: random choice of sinusoidal / ramp / pulse
choice = int(self.rng.integers(0, 3))
if choice == 0:
self.attack_params = sample_sinusoidal_params(self.rng)
self.true_attack_type = 1
elif choice == 1:
self.attack_params = sample_ramp_params(self.rng)
self.true_attack_type = 2
else:
self.attack_params = sample_pulse_params(self.rng)
self.true_attack_type = 3
elif self.task_id == 2:
# Hard: stealthy low-and-slow
self.attack_params = sample_stealthy_params(self.rng)
self.true_attack_type = 4
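        # Numeric attack-type ids used throughout this module:
        # 0 = none, 1 = sinusoidal, 2 = ramp, 3 = pulse, 4 = stealthy
        # (the string-to-id mapping itself lives in src.attacks).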
self.attack_type = get_attack_type_id(self.attack_params.get("type", "none"))
self.attack_generator = AttackGenerator(self.attack_params, self.attack_start_step)
def _get_observation(self) -> Observation:
"""
Building the current Observation from internal windows.
"""
return Observation(
vq_window=list(self.vq_window),
vd_window=list(self.vd_window),
omega_window=list(self.omega_window),
omega_deviation_window=list(self.omega_deviation_window),
raw_voltages=[self.pll.va_m, self.pll.vb_m, self.pll.vc_m],
task_id=self.task_id,
step=self.step_count,
)
def _compute_grader_score(self) -> float:
"""Running the appropriate grader at episode end."""
if self.task_id == 0:
return grade_task_easy(self.history, self.attack_start_step)
elif self.task_id == 1:
return grade_task_medium(self.history, self.attack_start_step)
elif self.task_id == 2:
return grade_task_hard(
self.history,
self.lock_loss_step,
self.attack_start_step,
)
return 0.0
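# ----------------------------------------------------------------------
# Smoke test
# ----------------------------------------------------------------------
# A minimal sketch of how the environment is driven, assuming Action can be
# constructed as Action(attack_detected=..., attack_type=...); the actual
# constructor signature lives in src.models and is not confirmed here.
if __name__ == "__main__":
    env = PLLAttackEnv()
    env.reset(task_id=0, seed=42)
    done = False
    episode_return = 0.0
    info: Dict[str, Any] = {}
    while not done:
        # Trivial baseline policy: never report an attack.
        action = Action(attack_detected=False, attack_type=0)
        _, reward, done, info = env.step(action)
        episode_return += reward.total
    print(
        f"episode {env.episode_id}: steps={env.step_count}, "
        f"return={episode_return:.2f}, grader_score={info.get('grader_score')}"
    )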