File size: 8,277 Bytes
ff293b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# End-to-end stack test: FastAPI/OpenEnv HTTP + WebSocket, GhostExec env,
# and (optionally) GhostexecEnv client over ASGI TestClient.

from __future__ import annotations

import json
import os
import shutil
import subprocess
import sys
from pathlib import Path
import pytest
from fastapi.testclient import TestClient

from ghostexec.models import GhostexecAction
from ghostexec.server.app import app
from ghostexec.server.ghostexec_environment import GhostexecEnvironment

# Parent of the directory that contains this test module.
ROOT = Path(__file__).resolve().parents[1]
# Scenario fixtures read by the in-process environment tests below.
SCENARIO = ROOT.joinpath("scenarios", "phase2_core.json")
MONDAY = ROOT.joinpath("scenarios", "monday_morning.json")


def _http_paths(client: TestClient) -> set[str]:
    """Return the set of non-empty string ``path`` values found on ``app.routes``.

    ``client`` is not read by this helper; routes are taken from the
    module-level ``app``. Routes whose ``path`` attribute is missing, empty,
    or not a string are skipped.
    """
    candidates = (getattr(route, "path", None) for route in app.routes)
    return {path for path in candidates if isinstance(path, str) and path}


def test_server_app_import_matches_uvicorn_server_string() -> None:
    """`import server.app` must succeed in a fresh interpreter whose cwd is ``ROOT``.

    This mirrors the `uvicorn server.app:app` target string, which has no
    `ghostexec.` prefix.
    """
    probe = "import server.app; assert server.app.app is not None"
    completed = subprocess.run(
        [sys.executable, "-c", probe],
        cwd=str(ROOT),
        check=False,  # inspect the exit status ourselves for a clearer failure message
    )
    assert completed.returncode == 0, "import server.app must work from ghostexec repo root"


def test_openapi_docs_and_schema_discovery() -> None:
    """`/openapi.json` returns a populated spec and `/docs` + `/redoc` both render."""
    with TestClient(app, raise_server_exceptions=True) as client:
        schema_resp = client.get("/openapi.json")
        assert schema_resp.status_code == 200

        spec = schema_resp.json()
        assert spec.get("openapi")
        assert "paths" in spec
        assert spec["paths"]

        # Both documentation UIs should answer with a non-trivial body.
        for doc_url in ("/docs", "/redoc"):
            page = client.get(doc_url)
            assert page.status_code == 200
            assert len(page.text) > 100


def test_openapi_examples_match_ghostexec_observation_shape() -> None:
    """The 200-response examples for `/reset` and `/step` use the Ghostexec observation keys."""
    spec_paths = app.openapi()["paths"]
    for route in ("/reset", "/step"):
        ok_response = spec_paths[route]["post"]["responses"]["200"]
        example = ok_response["content"]["application/json"]["example"]
        observation = example["observation"]

        # Expected observation fields are present ...
        assert "echoed_message" in observation
        assert "message_length" in observation
        # ... and these two keys must not appear in it.
        assert "status" not in observation
        assert "data" not in observation

        assert "reward" in example
        assert "done" in example


def test_openapi_info_documents_http_vs_websocket_episode() -> None:
    """The OpenAPI ``info.description`` mentions the HTTP API, `/ws`, and WebSocket."""
    description = app.openapi().get("info", {}).get("description") or ""
    for needle in ("Ghostexec / OpenEnv HTTP", "/ws", "WebSocket"):
        assert needle in description


def test_all_registered_get_post_routes_smoke() -> None:
    """Check the core routes are registered and the GET endpoints return the expected keys."""
    expected_routes = (
        "/health",
        "/metadata",
        "/schema",
        "/state",
        "/reset",
        "/step",
        "/ws",
        "/mcp",
    )
    with TestClient(app, raise_server_exceptions=True) as client:
        registered = _http_paths(client)
        for route in expected_routes:
            assert route in registered

        health_resp = client.get("/health")
        assert health_resp.status_code == 200
        assert health_resp.json().get("status") == "healthy"

        metadata_resp = client.get("/metadata")
        assert metadata_resp.status_code == 200
        metadata = metadata_resp.json()
        assert metadata.get("name") in ("ghostexec", "GhostexecEnvironment")
        assert "description" in metadata

        state_resp = client.get("/state")
        assert state_resp.status_code == 200
        assert "step_count" in state_resp.json()

        schema_resp = client.get("/schema")
        assert schema_resp.status_code == 200
        schema = schema_resp.json()
        for section in ("action", "observation", "state"):
            assert section in schema
        # The action schema must carry either a title or a properties map.
        action_schema = schema["action"]
        assert action_schema.get("title") or action_schema.get("properties")


def test_http_reset_and_step_return_valid_payloads() -> None:
    """
    POST /reset returns a briefing observation and POST /step returns a reward.

    HTTP is stateless here: each request builds a fresh env (OpenEnv design), so
    POST /step on a new instance loads the scenario and then applies the action.
    """
    reply_action = {
        "action_type": "reply_email",
        "email_id": "e05",
        "message_body": "On it.",
    }
    with TestClient(app, raise_server_exceptions=True) as client:
        reset_resp = client.post("/reset", json={})
        assert reset_resp.status_code == 200
        reset_body = reset_resp.json()
        assert "observation" in reset_body
        briefing = reset_body["observation"]
        assert "echoed_message" in briefing
        assert "GHOSTEXEC BRIEFING" in (briefing.get("echoed_message") or "")

        step_resp = client.post("/step", json={"action": reply_action})
        assert step_resp.status_code == 200
        step_body = step_resp.json()
        assert "observation" in step_body
        # The reward may be reported at the top level or inside the observation.
        assert not (
            step_body.get("reward") is None
            and step_body["observation"].get("reward") is None
        )


def test_http_step_invalid_action_422() -> None:
    """POST /step with a string in place of the action object yields HTTP 422."""
    malformed = {"action": "not-an-object"}
    with TestClient(app, raise_server_exceptions=True) as client:
        response = client.post("/step", json=malformed)
        assert response.status_code == 422


def test_mcp_jsonrpc_tools_list() -> None:
    """POST /mcp with a JSON-RPC `tools/list` call returns 200 and a result or error member."""
    request = dict(jsonrpc="2.0", id=1, method="tools/list", params={})
    with TestClient(app, raise_server_exceptions=True) as client:
        response = client.post("/mcp", json=request)
        assert response.status_code == 200
        reply = response.json()
        assert any(member in reply for member in ("result", "error"))


def test_websocket_full_episode_reset_step_state_close() -> None:
    """Drive one `/ws` session through reset, a step, a state query, and close."""
    reschedule = {
        "action_type": "reschedule_meeting",
        "meeting_id": "m02",
        "new_time": "2026-04-21T18:00:00",
    }
    with TestClient(app, raise_server_exceptions=True) as client, client.websocket_connect("/ws") as ws:
        # 1) reset -> observation carrying the briefing text
        ws.send_json({"type": "reset", "data": {}})
        reset_msg = ws.receive_json()
        assert reset_msg.get("type") == "observation"
        reset_data = reset_msg.get("data") or {}
        assert "observation" in reset_data
        briefing = reset_data["observation"]
        assert "echoed_message" in briefing
        assert "GHOSTEXEC BRIEFING" in briefing.get("echoed_message", "")

        # 2) step -> observation with a reward
        ws.send_json({"type": "step", "data": reschedule})
        step_msg = ws.receive_json()
        assert step_msg.get("type") == "observation"
        step_data = step_msg.get("data") or {}
        assert step_data.get("reward") is not None

        # 3) state -> step counter reflects the step just taken
        ws.send_json({"type": "state"})
        state_msg = ws.receive_json()
        assert state_msg.get("type") == "state", state_msg
        state_data = state_msg.get("data") or {}
        assert state_data.get("step_count", 0) >= 1

        # 4) close message is sent last; no reply is read for it
        ws.send_json({"type": "close", "data": {}})


def test_inprocess_env_matches_ws_briefing_shape() -> None:
    """An in-process env on ``SCENARIO`` gives a briefing on reset and a rewarded, OK step."""
    env = GhostexecEnvironment(SCENARIO)
    initial = env.reset()
    assert "BRIEFING" in initial.echoed_message

    reschedule = GhostexecAction(
        action_type="reschedule_meeting",
        meeting_id="m02",
        new_time="2026-04-21T18:00:00",
    )
    after_step = env.step(reschedule)
    assert after_step.reward is not None
    assert after_step.metadata.get("step_ok") is True


def test_monday_morning_scenario_reward_signal() -> None:
    """The ``MONDAY`` scenario file exists and a `do_nothing` step yields a float reward."""
    assert MONDAY.is_file()
    env = GhostexecEnvironment(MONDAY)
    env.reset()
    outcome = env.step(GhostexecAction(action_type="do_nothing"))
    assert isinstance(outcome.reward, float)


def test_ghostexec_env_client_against_live_url_if_set() -> None:
    """
    Exercise the ``GhostexecEnv`` client against a live server, when one is configured.

    GhostexecEnv opens a real TCP WebSocket, and Starlette's TestClient uses the
    host ``testserver``, which is not resolvable on some platforms. The test is
    therefore skipped unless ``GHOSTEXEC_WS_BASE_URL`` points at a running
    server (e.g. local uvicorn).
    """
    configured = os.environ.get("GHOSTEXEC_WS_BASE_URL", "")
    base = configured.strip().rstrip("/")
    if not base:
        pytest.skip("Set GHOSTEXEC_WS_BASE_URL (e.g. http://127.0.0.1:8000) to test GhostexecEnv client.")

    # Imported after the skip check, so the client module is only loaded when the test runs.
    from ghostexec.client import GhostexecEnv

    live = GhostexecEnv(base_url=base).sync()
    with live:
        after_reset = live.reset()
        assert after_reset.observation.echoed_message
        after_step = live.step(GhostexecAction(action_type="do_nothing"))
        assert after_step.observation.echoed_message