"""Phase 4: reward sub-scores, aggregation, logging, schema drift."""

import json
import random
import statistics
from pathlib import Path

import pytest

from ghostexec.models import GhostexecAction
from ghostexec.server import reward as reward_mod
from ghostexec.server.reward import aggregate_scores
from ghostexec.server.ghostexec_environment import GhostexecEnvironment

ROOT = Path(__file__).resolve().parents[1]
SCENARIO = ROOT / "scenarios" / "phase2_core.json"
DRIFT = ROOT / "scenarios" / "schema_drift_test.json"


def _reschedule_m02_action():
    """Build the reschedule of meeting m02 to 18:00 that several tests use as the 'helpful' step."""
    return GhostexecAction(
        action_type="reschedule_meeting",
        meeting_id="m02",
        new_time="2026-04-21T18:00:00",
    )


def test_reward_weights_and_aggregator_helpers():
    """Check that ``weighted_base`` equals ``WEIGHTED_OUTPUT_SCALE`` times the weighted inner sum."""
    world = GhostexecEnvironment.load_world_from_json(SCENARIO)
    conflict, relationship, task = 1.0, -1.0, 2.5
    inner = (
        reward_mod.W_CONFLICT * conflict
        + reward_mod.W_REL * relationship
        + reward_mod.W_TASK * task
    )
    # Every bonus/shaping input is zeroed so only the weighted sum is supplied.
    zeroed_terms = dict.fromkeys(
        (
            "critical_queue_bonus",
            "shaping_synergy",
            "shaping_tradeoff",
            "shaping_potential",
            "shaping_scaffold",
            "shaping_quality",
        ),
        0.0,
    )

    breakdown = aggregate_scores(
        conflict,
        relationship,
        task,
        conflict_raw=conflict,
        weighted_inner=inner,
        weighted_base_only=inner,
        action_ok=True,
        episode_done=False,
        world_after=world,
        **zeroed_terms,
    )

    expected_base = reward_mod.WEIGHTED_OUTPUT_SCALE * inner
    assert breakdown.weighted_base == pytest.approx(expected_base)


def test_catastrophic_and_completion_bonuses_only_when_episode_done():
    """Check the completion bonus and catastrophic penalty are zero mid-episode and set at episode end.

    The terminal step uses an after-world in which Marcus Webb's mood is
    "furious"; the expected terminal values are 10.0 and -15.0.
    """
    pristine = GhostexecEnvironment.load_world_from_json(SCENARIO)
    before = pristine.model_copy(deep=True)
    before.stress = 30
    unchanged_after = before.model_copy(deep=True)
    idle = GhostexecAction(action_type="do_nothing")

    mid_episode = reward_mod.compute_step_reward(
        before, unchanged_after, idle, action_ok=True, episode_done=False
    )
    assert mid_episode.episode_completion_bonus == 0.0
    assert mid_episode.catastrophic_penalty == 0.0

    furious_after = before.model_copy(deep=True)
    # Only the first contact named Marcus Webb is replaced; no-op if absent.
    marcus_idx = next(
        (
            idx
            for idx, contact in enumerate(furious_after.contacts)
            if contact.name == "Marcus Webb"
        ),
        None,
    )
    if marcus_idx is not None:
        marcus = furious_after.contacts[marcus_idx]
        furious_after.contacts[marcus_idx] = marcus.model_copy(update={"mood": "furious"})

    terminal = reward_mod.compute_step_reward(
        before, furious_after, idle, action_ok=True, episode_done=True
    )
    assert terminal.episode_completion_bonus == pytest.approx(10.0)
    assert terminal.catastrophic_penalty == pytest.approx(-15.0)


def test_invalid_step_matches_do_nothing_subscores_plus_invalid_addon():
    """Compare an invalid reply_email step against a valid do_nothing step on the same world."""
    world = GhostexecEnvironment.load_world_from_json(SCENARIO)
    idle_action = GhostexecAction(action_type="do_nothing")
    invalid_action = GhostexecAction(
        action_type="reply_email", email_id="missing", message_body="x"
    )

    idle_bd = reward_mod.compute_step_reward(
        world, world, idle_action, action_ok=True, episode_done=False
    )
    invalid_bd = reward_mod.compute_step_reward(
        world, world, invalid_action, action_ok=False, episode_done=False
    )

    invalid_penalty = 0.25
    idle_floor = 0.15
    assert invalid_bd.invalid_step_adjustment == pytest.approx(-invalid_penalty)
    # do_nothing carries an additional strict additive floor (-0.15) not applied to invalid non-idle actions.
    assert invalid_bd.final == pytest.approx(idle_bd.final - (invalid_penalty - idle_floor))


def test_scripted_episode_reward_direction_and_log(tmp_path, monkeypatch):
    """Run a reschedule then an idle step; check reward ordering and the JSONL reward log."""
    log_path = tmp_path / "rewards.jsonl"
    env = GhostexecEnvironment(SCENARIO)
    env.reset()
    # Redirect the environment's reward log into the per-test temp directory.
    monkeypatch.setattr(env, "_reward_log_path", log_path)

    helpful_obs = env.step(_reschedule_m02_action())
    idle_obs = env.step(GhostexecAction(action_type="do_nothing"))

    assert helpful_obs.metadata.get("step_ok") is True
    assert idle_obs.metadata.get("step_ok") is True
    assert (helpful_obs.reward or 0) > (idle_obs.reward or 0)

    assert log_path.is_file()
    log_lines = log_path.read_text(encoding="utf-8").strip().splitlines()
    assert len(log_lines) >= 2

    # The first logged row corresponds to the first step (the reschedule).
    first_row = json.loads(log_lines[0])
    assert first_row.keys() >= {"reward", "episode_id"}
    assert first_row.get("action_type") == "reschedule_meeting"
    assert first_row.keys() >= {"conflict_raw", "step_ok"}
    assert first_row.keys() >= {"shaping_total", "shaping_to_base_ratio"}
    assert "shaping_scaffold" in first_row
    assert first_row.get("reward_mode") == "full"


def test_reward_mode_base_turns_off_shaping_terms():
    """Check that synergy, tradeoff and potential shaping are ~0 when reward_mode is "base"."""
    env = GhostexecEnvironment(SCENARIO, reward_mode="base")
    env.reset()
    obs = env.step(_reschedule_m02_action())

    # Missing metadata, a missing breakdown, or a missing/None term all count as 0.0.
    breakdown = (obs.metadata or {}).get("reward_breakdown") or {}
    for term in ("shaping_synergy", "shaping_tradeoff", "shaping_potential"):
        assert float(breakdown.get(term) or 0.0) == pytest.approx(0.0)


def test_schema_drift_events_mutate_world():
    """Take three idle steps with the drift scenario loaded and check the world state after each."""
    env = GhostexecEnvironment(SCENARIO, schema_drift_events_path=DRIFT)
    env.reset()

    def idle_step():
        return env.step(GhostexecAction(action_type="do_nothing"))

    # After step 1: a drift entry shows up in the action log.
    assert idle_step().metadata.get("step_ok") is True
    assert any("schema drift: shifted" in entry for entry in env.world.action_log)

    # After step 2: Sarah Chen's communication preference is "text".
    idle_step()
    sarah = env.get_contact("Sarah Chen")
    assert sarah is not None
    assert sarah.communication_preference == "text"

    # After step 3: task t02's deadline and the reply-suppression set are checked.
    idle_step()
    task_t02 = next(task for task in env.world.tasks if task.id == "t02")
    assert task_t02.deadline == "2026-04-21T07:00:00"
    assert "Marcus Webb" in env._reply_relationship_suppressed  # noqa: SLF001


def test_rewards_differ_between_helpful_and_idle_steps():
    """Check that a reschedule step and a following idle step yield different, non-None rewards."""
    env = GhostexecEnvironment(SCENARIO)
    env.reset()

    helpful_reward = env.step(_reschedule_m02_action()).reward
    idle_reward = env.step(GhostexecAction(action_type="do_nothing")).reward

    assert helpful_reward is not None and idle_reward is not None
    assert helpful_reward != idle_reward


# Whitelisted reschedules (known non-overlapping targets for phase2_core at 08:00).
_SAFE_RESCHEDULES: list[tuple[str, str]] = [
    ("m02", "2026-04-21T18:00:00"),
    ("m03", "2026-04-21T18:30:00"),
    ("m06", "2026-04-21T20:00:00"),
    ("m09", "2026-04-21T21:00:00"),
]


def test_seeded_stochastic_policy_reward_spread():
    """Run a seeded random policy for 80 steps and require a minimum spread in step rewards.

    Each step draws ``u`` from the global RNG and picks do_nothing,
    archive_email, reschedule_meeting or send_message by fixed thresholds.
    The test then checks the population standard deviation and the
    5th-to-95th percentile range of the collected rewards.
    """
    # NOTE(review): this seeds the process-wide RNG; left as-is because the
    # environment may also draw from it — confirm before switching to a
    # private random.Random instance.
    random.seed(1234)
    K = 80
    archive_ids = [f"e{i:02d}" for i in range(1, 31)]
    contacts = ["Jordan Lee", "Jamie Liu", "Marcus Webb", "Sarah Chen"]
    env = GhostexecEnvironment(SCENARIO)
    env.reset()
    rewards: list[float] = []
    # `ai` is shared by the archive and send_message branches; `ri` cycles
    # through the whitelisted reschedules.
    ai = ri = 0
    for _ in range(K):
        u = random.random()
        if u < 0.32:
            obs = env.step(GhostexecAction(action_type="do_nothing"))
        elif u < 0.58:
            eid = archive_ids[ai % len(archive_ids)]
            ai += 1
            obs = env.step(GhostexecAction(action_type="archive_email", email_id=eid))
        elif u < 0.78:
            mid, nt = _SAFE_RESCHEDULES[ri % len(_SAFE_RESCHEDULES)]
            ri += 1
            obs = env.step(
                GhostexecAction(action_type="reschedule_meeting", meeting_id=mid, new_time=nt)
            )
        else:
            cname = contacts[ai % len(contacts)]
            ai += 1
            obs = env.step(
                GhostexecAction(
                    action_type="send_message",
                    contact_name=cname,
                    message_body="Quick sync on priorities.",
                )
            )
        assert obs.reward is not None
        rewards.append(float(obs.reward))
    std = statistics.pstdev(rewards)
    sr = sorted(rewards)
    # Percentiles by truncated index into the sorted rewards, clamped to valid bounds.
    p5 = sr[max(0, int(0.05 * (len(sr) - 1)))]
    p95 = sr[min(len(sr) - 1, int(0.95 * (len(sr) - 1)))]
    assert std > 0.06
    assert (p95 - p5) > 0.09


def test_good_script_beats_do_nothing_spam_on_mean_reward():
    """Check a five-action scripted episode beats an equally long idle episode by > 0.2 mean reward.

    Each episode runs in its own freshly reset environment. Rewards are
    asserted non-None before averaging so a missing reward fails with a
    clear assertion instead of a ``TypeError`` from ``float(None)``.
    """
    good = GhostexecEnvironment(SCENARIO)
    good.reset()
    good_actions = [
        _reschedule
        for _reschedule in (
            GhostexecAction(
                action_type="reschedule_meeting",
                meeting_id="m02",
                new_time="2026-04-21T18:00:00",
            ),
            GhostexecAction(
                action_type="reply_email",
                email_id="e01",
                message_body="Drafting revised figures now.",
            ),
            GhostexecAction(action_type="archive_email", email_id="e09"),
            GhostexecAction(
                action_type="send_message",
                contact_name="Jordan Lee",
                message_body="Standup notes attached.",
            ),
            GhostexecAction(action_type="complete_task", task_id="t06"),
        )
    ]
    g_rewards = [good.step(a).reward for a in good_actions]
    assert all(r is not None for r in g_rewards), "scripted step returned no reward"
    g_mean = sum(float(x) for x in g_rewards) / len(g_rewards)

    bad = GhostexecEnvironment(SCENARIO)
    bad.reset()
    # Idle baseline runs for as many steps as the scripted episode so the means are comparable.
    b_rewards = [
        bad.step(GhostexecAction(action_type="do_nothing")).reward
        for _ in range(len(good_actions))
    ]
    assert all(r is not None for r in b_rewards), "idle step returned no reward"
    b_mean = sum(float(x) for x in b_rewards) / len(b_rewards)

    assert g_mean > b_mean + 0.2