File size: 4,319 Bytes
96bb363
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""Tests for SimulationClock: is_due alignment and live-mode lag detection."""

from __future__ import annotations

import warnings
from fractions import Fraction
from unittest.mock import patch

import pytest

from openg2g.clock import SimulationClock


def test_time_starts_at_zero():
    clock = SimulationClock(tick_s=Fraction(1, 10))
    assert clock.time_s == 0.0
    assert clock.step == 0


def test_advance_increments():
    clock = SimulationClock(tick_s=Fraction(1, 2))
    t = clock.advance()
    assert t == 0.5
    assert clock.step == 1
    t = clock.advance()
    assert t == 1.0
    assert clock.step == 2


def test_is_due_every_tick():
    clock = SimulationClock(tick_s=Fraction(1, 10))
    assert clock.is_due(Fraction(1, 10))
    clock.advance()
    assert clock.is_due(Fraction(1, 10))


def test_is_due_multi_rate():
    clock = SimulationClock(tick_s=Fraction(1, 10))
    # At step 0: everything is due
    assert clock.is_due(Fraction(1, 10))
    assert clock.is_due(Fraction(1))
    assert clock.is_due(Fraction(60))

    # Advance 1 tick (step=1, t=0.1s)
    clock.advance()
    assert clock.is_due(Fraction(1, 10))
    assert not clock.is_due(Fraction(1))
    assert not clock.is_due(Fraction(60))

    # Advance to step=10 (t=1.0s)
    for _ in range(9):
        clock.advance()
    assert clock.step == 10
    assert clock.is_due(Fraction(1, 10))
    assert clock.is_due(Fraction(1))
    assert not clock.is_due(Fraction(60))

    # Advance to step=600 (t=60.0s)
    for _ in range(590):
        clock.advance()
    assert clock.step == 600
    assert clock.is_due(Fraction(1, 10))
    assert clock.is_due(Fraction(1))
    assert clock.is_due(Fraction(60))


def test_is_due_non_power_of_ten():
    """Period 3/10 is an exact multiple of tick 1/10 → 3 ticks."""
    clock = SimulationClock(tick_s=Fraction(1, 10))
    assert clock.is_due(Fraction(3, 10))  # step 0
    clock.advance()  # step 1
    assert not clock.is_due(Fraction(3, 10))
    clock.advance()  # step 2
    assert not clock.is_due(Fraction(3, 10))
    clock.advance()  # step 3
    assert clock.is_due(Fraction(3, 10))


def test_is_due_period_not_multiple_of_tick_raises():
    """If period is not an exact multiple of tick, raise ValueError."""
    clock = SimulationClock(tick_s=Fraction(1))
    with pytest.raises(ValueError, match="not an exact multiple"):
        clock.is_due(Fraction(1, 100))


def test_tick_s_must_be_fraction():
    """Passing a float for tick_s raises TypeError."""
    with pytest.raises(TypeError, match="must be a Fraction"):
        SimulationClock(tick_s=0.1)  # type: ignore[invalid-argument-type]


def test_tick_s_must_be_positive():
    with pytest.raises(ValueError, match="must be positive"):
        SimulationClock(tick_s=Fraction(0))
    with pytest.raises(ValueError, match="must be positive"):
        SimulationClock(tick_s=Fraction(-1, 10))


def test_live_mode_lag_warning():
    """Live mode warns when computation takes longer than a tick."""
    clock = SimulationClock(tick_s=Fraction(1, 10), live=True)

    # Simulate wall time progressing faster than real-time
    # First advance sets _wall_t0
    times = iter([0.0, 0.0, 0.5])  # t0=0, then check at 0, then lagging at 0.5

    with patch("openg2g.clock.time") as mock_time:
        mock_time.monotonic = lambda: next(times)
        mock_time.sleep = lambda _: None

        clock.advance()  # step 1, t=0.1s. Sets _wall_t0=0.0, now=0.0 -> ahead, sleep
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter("always")
            clock.advance()  # step 2, t=0.2s. now=0.5 -> lag of 0.3 > tick_s
            assert len(w) == 1
            assert "Clock lag" in str(w[0].message)


def test_live_mode_sleeps_when_ahead():
    """Live mode sleeps when ahead of wall time."""
    clock = SimulationClock(tick_s=Fraction(1), live=True)

    sleep_calls = []

    with patch("openg2g.clock.time") as mock_time:
        # First advance: sets wall_t0=100.0, then checks expected=101.0 vs now=100.0
        mock_time.monotonic = lambda: 100.0
        mock_time.sleep = lambda dt: sleep_calls.append(dt)

        clock.advance()
        # Expected wall = 100.0 + 1.0 = 101.0, now = 100.0 -> sleep(1.0)
        assert len(sleep_calls) == 1
        assert abs(sleep_calls[0] - 1.0) < 0.01