File size: 8,656 Bytes
ff293b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ee21104
 
 
 
 
 
ff293b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ee21104
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ff293b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
"""Phase 4: reward sub-scores, aggregation, logging, schema drift."""

import json
import random
import statistics
from pathlib import Path

import pytest

from ghostexec.models import GhostexecAction
from ghostexec.server import reward as reward_mod
from ghostexec.server.reward import aggregate_scores
from ghostexec.server.ghostexec_environment import GhostexecEnvironment

ROOT = Path(__file__).resolve().parents[1]
SCENARIO = ROOT / "scenarios" / "phase2_core.json"
DRIFT = ROOT / "scenarios" / "schema_drift_test.json"


def test_reward_weights_and_aggregator_helpers():
    w = GhostexecEnvironment.load_world_from_json(SCENARIO)
    c, r, t = 1.0, -1.0, 2.5
    weighted_inner = reward_mod.W_CONFLICT * c + reward_mod.W_REL * r + reward_mod.W_TASK * t
    bd = aggregate_scores(
        c,
        r,
        t,
        conflict_raw=c,
        critical_queue_bonus=0.0,
        weighted_inner=weighted_inner,
        weighted_base_only=weighted_inner,
        shaping_synergy=0.0,
        shaping_tradeoff=0.0,
        shaping_potential=0.0,
        shaping_scaffold=0.0,
        shaping_quality=0.0,
        action_ok=True,
        episode_done=False,
        world_after=w,
    )
    assert bd.weighted_base == pytest.approx(reward_mod.WEIGHTED_OUTPUT_SCALE * weighted_inner)


def test_catastrophic_and_completion_bonuses_only_when_episode_done():
    w0 = GhostexecEnvironment.load_world_from_json(SCENARIO)
    w1 = w0.model_copy(deep=True)
    w1.stress = 30
    w2 = w1.model_copy(deep=True)
    action = GhostexecAction(action_type="do_nothing")
    mid = reward_mod.compute_step_reward(w1, w2, action, action_ok=True, episode_done=False)
    assert mid.episode_completion_bonus == 0.0
    assert mid.catastrophic_penalty == 0.0

    w_bad = w1.model_copy(deep=True)
    for i, c in enumerate(w_bad.contacts):
        if c.name == "Marcus Webb":
            w_bad.contacts[i] = c.model_copy(update={"mood": "furious"})
            break
    end = reward_mod.compute_step_reward(w1, w_bad, action, action_ok=True, episode_done=True)
    assert end.episode_completion_bonus == pytest.approx(10.0)
    assert end.catastrophic_penalty == pytest.approx(-15.0)


def test_invalid_step_matches_do_nothing_subscores_plus_invalid_addon():
    w = GhostexecEnvironment.load_world_from_json(SCENARIO)
    noop = GhostexecAction(action_type="do_nothing")
    bad = GhostexecAction(action_type="reply_email", email_id="missing", message_body="x")
    bd_ok = reward_mod.compute_step_reward(w, w, noop, action_ok=True, episode_done=False)
    bd_bad = reward_mod.compute_step_reward(w, w, bad, action_ok=False, episode_done=False)
    assert bd_bad.invalid_step_adjustment == pytest.approx(-0.25)
    # do_nothing carries an additional strict additive floor (-0.15) not applied to invalid non-idle actions.
    assert bd_bad.final == pytest.approx(bd_ok.final - (0.25 - 0.15))


def test_scripted_episode_reward_direction_and_log(tmp_path, monkeypatch):
    logf = tmp_path / "rewards.jsonl"
    env = GhostexecEnvironment(SCENARIO)
    env.reset()
    monkeypatch.setattr(env, "_reward_log_path", logf)

    r_resolve = env.step(
        GhostexecAction(
            action_type="reschedule_meeting",
            meeting_id="m02",
            new_time="2026-04-21T18:00:00",
        )
    )
    r_bad = env.step(GhostexecAction(action_type="do_nothing"))

    assert r_resolve.metadata.get("step_ok") is True
    assert r_bad.metadata.get("step_ok") is True
    assert (r_resolve.reward or 0) > (r_bad.reward or 0)

    assert logf.is_file()
    lines = logf.read_text(encoding="utf-8").strip().splitlines()
    assert len(lines) >= 2
    row = json.loads(lines[0])
    assert "reward" in row and "episode_id" in row
    assert row.get("action_type") == "reschedule_meeting"
    assert "conflict_raw" in row and "step_ok" in row
    assert "shaping_total" in row and "shaping_to_base_ratio" in row
    assert "shaping_scaffold" in row
    assert row.get("reward_mode") == "full"


def test_reward_mode_base_turns_off_shaping_terms():
    env = GhostexecEnvironment(SCENARIO, reward_mode="base")
    env.reset()
    obs = env.step(
        GhostexecAction(
            action_type="reschedule_meeting",
            meeting_id="m02",
            new_time="2026-04-21T18:00:00",
        )
    )
    bd = (obs.metadata or {}).get("reward_breakdown") or {}
    assert float(bd.get("shaping_synergy") or 0.0) == pytest.approx(0.0)
    assert float(bd.get("shaping_tradeoff") or 0.0) == pytest.approx(0.0)
    assert float(bd.get("shaping_potential") or 0.0) == pytest.approx(0.0)


def test_schema_drift_events_mutate_world():
    env = GhostexecEnvironment(SCENARIO, schema_drift_events_path=DRIFT)
    env.reset()
    assert env.step(GhostexecAction(action_type="do_nothing")).metadata.get("step_ok") is True
    assert any("schema drift: shifted" in x for x in env.world.action_log)
    env.step(GhostexecAction(action_type="do_nothing"))
    sarah = env.get_contact("Sarah Chen")
    assert sarah is not None
    assert sarah.communication_preference == "text"
    env.step(GhostexecAction(action_type="do_nothing"))
    t02 = next(t for t in env.world.tasks if t.id == "t02")
    assert t02.deadline == "2026-04-21T07:00:00"
    assert "Marcus Webb" in env._reply_relationship_suppressed  # noqa: SLF001


def test_rewards_differ_between_helpful_and_idle_steps():
    env = GhostexecEnvironment(SCENARIO)
    env.reset()
    r_help = env.step(
        GhostexecAction(
            action_type="reschedule_meeting",
            meeting_id="m02",
            new_time="2026-04-21T18:00:00",
        )
    ).reward
    r_idle = env.step(GhostexecAction(action_type="do_nothing")).reward
    assert r_help is not None and r_idle is not None
    assert r_help != r_idle


# Whitelisted reschedules (known non-overlapping targets for phase2_core at 08:00).
_SAFE_RESCHEDULES: list[tuple[str, str]] = [
    ("m02", "2026-04-21T18:00:00"),
    ("m03", "2026-04-21T18:30:00"),
    ("m06", "2026-04-21T20:00:00"),
    ("m09", "2026-04-21T21:00:00"),
]


def test_seeded_stochastic_policy_reward_spread():
    random.seed(1234)
    K = 80
    archive_ids = [f"e{i:02d}" for i in range(1, 31)]
    contacts = ["Jordan Lee", "Jamie Liu", "Marcus Webb", "Sarah Chen"]
    env = GhostexecEnvironment(SCENARIO)
    env.reset()
    rewards: list[float] = []
    ai = ri = 0
    for _ in range(K):
        u = random.random()
        if u < 0.32:
            obs = env.step(GhostexecAction(action_type="do_nothing"))
        elif u < 0.58:
            eid = archive_ids[ai % len(archive_ids)]
            ai += 1
            obs = env.step(GhostexecAction(action_type="archive_email", email_id=eid))
        elif u < 0.78:
            mid, nt = _SAFE_RESCHEDULES[ri % len(_SAFE_RESCHEDULES)]
            ri += 1
            obs = env.step(
                GhostexecAction(action_type="reschedule_meeting", meeting_id=mid, new_time=nt)
            )
        else:
            cname = contacts[ai % len(contacts)]
            ai += 1
            obs = env.step(
                GhostexecAction(
                    action_type="send_message",
                    contact_name=cname,
                    message_body="Quick sync on priorities.",
                )
            )
        assert obs.reward is not None
        rewards.append(float(obs.reward))

    std = statistics.pstdev(rewards)
    sr = sorted(rewards)
    p5 = sr[max(0, int(0.05 * (len(sr) - 1)))]
    p95 = sr[min(len(sr) - 1, int(0.95 * (len(sr) - 1)))]
    assert std > 0.06
    assert (p95 - p5) > 0.09


def test_good_script_beats_do_nothing_spam_on_mean_reward():
    good = GhostexecEnvironment(SCENARIO)
    good.reset()
    good_actions = [
        GhostexecAction(
            action_type="reschedule_meeting",
            meeting_id="m02",
            new_time="2026-04-21T18:00:00",
        ),
        GhostexecAction(action_type="reply_email", email_id="e01", message_body="Drafting revised figures now."),
        GhostexecAction(action_type="archive_email", email_id="e09"),
        GhostexecAction(
            action_type="send_message",
            contact_name="Jordan Lee",
            message_body="Standup notes attached.",
        ),
        GhostexecAction(action_type="complete_task", task_id="t06"),
    ]
    g_rewards = [good.step(a).reward for a in good_actions]
    g_mean = sum(float(x) for x in g_rewards) / len(g_rewards)

    bad = GhostexecEnvironment(SCENARIO)
    bad.reset()
    b_rewards = [bad.step(GhostexecAction(action_type="do_nothing")).reward for _ in range(5)]
    b_mean = sum(float(x) for x in b_rewards) / len(b_rewards)

    assert g_mean > b_mean + 0.2