File size: 4,332 Bytes
81e328b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
01f8cd5
81e328b
 
 
 
 
 
 
 
 
 
 
 
 
01f8cd5
81e328b
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
Attack injection logic for the PLL Cyberattack Detection OpenEnv.

Implements four attack types:
  1. Sinusoidal FDI (Easy)
  2. Ramp injection (Medium)
  3. Pulse/step bias (Medium)
  4. Stealthy low-and-slow phase drift (Hard)
"""

import math
import numpy as np
from typing import Dict, Any


def sample_sinusoidal_params(rng: np.random.Generator) -> Dict[str, Any]:
    """Sample parameters for a sinusoidal FDI attack."""
    return {
        "type": "sinusoidal",
        "amplitude": float(rng.uniform(0.05, 0.20)),
        "freq": float(rng.uniform(5.0, 20.0)),
        "phase": float(rng.uniform(0.0, 2.0 * math.pi)),
    }


def sample_ramp_params(rng: np.random.Generator) -> Dict[str, Any]:
    """Sample parameters for a ramp injection attack."""
    return {
        "type": "ramp",
        "rate": float(rng.uniform(0.0002, 0.001)),
    }


def sample_pulse_params(rng: np.random.Generator) -> Dict[str, Any]:
    """Sample parameters for a pulse/step bias attack."""
    return {
        "type": "pulse",
        "magnitude": float(rng.uniform(0.1, 0.3)),
        "duration": int(rng.integers(20, 81)),  # 20 to 80 steps inclusive
    }


def sample_stealthy_params(rng: np.random.Generator) -> Dict[str, Any]:
    """Sample parameters for a stealthy low-and-slow attack."""
    return {
        "type": "stealthy",
        "amplitude": 0.03,
        "drift_rate": float(rng.uniform(0.05, 0.2)),
    }


def sample_attack_start(rng: np.random.Generator) -> int:
    """Sample a random attack start step between 30 and 80 inclusive."""
    return int(rng.integers(30, 81))


class AttackGenerator:
    """Generates attack signals given parameters and current simulation state."""

    def __init__(self, attack_params: Dict[str, Any], attack_start_step: int):
        self.params = attack_params
        self.attack_start_step = attack_start_step
        self.attack_type_str = attack_params.get("type", "none")

        # For stealthy attack: track cumulative phase drift
        self.delta = 0.0

    def get_signal(self, current_step: int, sim_time: float) -> float:
        """
        Compute the attack signal value at the given step.
        Args:
            current_step: Current environment step (0-indexed).
            sim_time: Current simulation time in seconds.
        Returns:
            Attack signal value (pu). Returns 0.0 if attack not yet started.
        """
        if current_step < self.attack_start_step:
            return 0.0

        steps_since_start = current_step - self.attack_start_step
        dt = 1e-3  # time step

        if self.attack_type_str == "sinusoidal":
            A = self.params["amplitude"]
            fa = self.params["freq"]
            phi = self.params["phase"]
            return A * math.sin(2.0 * math.pi * fa * sim_time + phi)

        elif self.attack_type_str == "ramp":
            rate = self.params["rate"]
            return rate * steps_since_start

        elif self.attack_type_str == "pulse":
            mag = self.params["magnitude"]
            dur = self.params["duration"]
            if steps_since_start < dur:
                return mag
            else:
                return 0.0

        elif self.attack_type_str == "stealthy":
            A_s = self.params["amplitude"]
            drift_rate = self.params["drift_rate"]
            # δ(t) = δ(t-1) + drift_rate * Δt — accumulated each call
            self.delta += drift_rate * dt
            f0 = 50.0
            return A_s * math.sin(2.0 * math.pi * f0 * sim_time + self.delta)

        return 0.0

    def is_active(self, current_step: int) -> bool:
        """Check whether the attack is currently active at this specific step."""
        if current_step < self.attack_start_step:
            return False

        # Pulse attacks end after duration
        if self.attack_type_str == "pulse":
            steps_since_start = current_step - self.attack_start_step
            dur = self.params["duration"]
            return steps_since_start < dur

        return True


def get_attack_type_id(attack_type_str: str) -> int:
    """Map an attack type string to its corresponding integer ID."""
    mapping = {
        "none": 0,
        "sinusoidal": 1,
        "ramp": 2,
        "pulse": 3,
        "stealthy": 4,
    }
    return mapping.get(attack_type_str, 0)