File size: 7,118 Bytes
ff293b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d815df7
 
ff293b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
98de6fc
 
 
 
 
 
 
 
 
 
 
 
 
 
60fe7cd
98de6fc
 
60fe7cd
98de6fc
 
 
 
 
 
 
 
d815df7
ff293b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
FastAPI application for the Ghostexec Environment.

This module creates an HTTP server that exposes the GhostexecEnvironment
over HTTP and WebSocket endpoints, compatible with EnvClient.

Endpoints:
    - POST /reset: Reset the environment
    - POST /step: Execute an action
    - GET /state: Get current environment state
    - GET /schema: Get action/observation schemas
    - WS /ws: WebSocket endpoint for persistent sessions

Usage:
    # Development (with auto-reload):
    uvicorn server.app:app --reload --host 0.0.0.0 --port 8000

    # Production:
    uvicorn server.app:app --host 0.0.0.0 --port 8000 --workers 4

    # Or run directly:
    python -m server.app
"""

import os

try:
    import openenv.core.env_server.http_server as _openenv_http
except Exception as e:  # pragma: no cover
    raise ImportError(
        "openenv is required for the web interface. Install dependencies with '\n    uv sync\n'"
    ) from e

# OpenEnv's serialize_observation drops `metadata` from the JSON body; Ghostexec
# trainers and live tests rely on step_ok / ids inside observation.metadata.
# Keep a reference to the stock serializer so the Ghostexec wrapper can delegate
# to it and only add the missing `metadata` field on top of its output.
_orig_serialize_observation = _openenv_http.serialize_observation


def _ghostexec_serialize_observation(observation):  # type: ignore[no-untyped-def]
    """Serialize ``observation`` with OpenEnv, then re-attach its ``metadata``.

    Delegates to the saved stock serializer and, when the resulting payload has a
    dict under the ``"observation"`` key, stores a JSON-serializable copy of
    ``observation.metadata`` (or ``{}`` when the attribute is missing or falsy)
    under ``"metadata"`` inside that dict.

    Returns:
        The payload produced by the stock serializer, mutated in place.
    """
    serialized = _orig_serialize_observation(observation)
    body = serialized.get("observation")
    if not isinstance(body, dict):
        # Nothing to attach metadata to; hand the stock payload back untouched.
        return serialized

    raw_metadata = getattr(observation, "metadata", None)
    body["metadata"] = _openenv_http._make_json_serializable(raw_metadata or {})
    return serialized


# Install the wrapper on the OpenEnv http_server module in place of the stock
# serializer.
_openenv_http.serialize_observation = _ghostexec_serialize_observation

# Late imports (E402 suppressed): these follow the monkeypatch above.
from openenv.core.env_server.http_server import create_app  # noqa: E402
import openenv.core.env_server.web_interface as _openenv_web  # noqa: E402

# Gradio renders readme_content as Markdown; embedding README.md shows YAML frontmatter
# as plain text. Point users at the Hub-rendered README instead.
# Keep a reference to the stock loader so the Ghostexec wrapper can delegate to it.
_orig_load_environment_metadata = _openenv_web.load_environment_metadata


def _ghostexec_load_environment_metadata(env, env_name=None):  # type: ignore[no-untyped-def]
    """Load environment metadata and replace its README with a short link card.

    Delegates to the saved stock loader, then overwrites ``readme_content`` with
    Markdown that links to the demo video, the README on Hugging Face, and the
    Space itself. The Space id is read from the ``SPACE_ID`` environment
    variable; when it is unset, blank, or starts with ``<``, the default
    ``modelbuilderhq/ghostexec`` is used instead.

    Returns:
        The metadata object from the stock loader with ``readme_content`` replaced.
    """
    metadata = _orig_load_environment_metadata(env, env_name)

    space_id = (os.environ.get("SPACE_ID") or "").strip()
    if space_id.startswith("<") or not space_id:
        space_id = "modelbuilderhq/ghostexec"

    hub_url = f"https://huggingface.co/spaces/{space_id}"
    readme_link = f"{hub_url}/blob/main/README.md"
    video_link = "https://youtu.be/g4IFZMEzfO8"

    # Each entry is one Markdown paragraph; paragraphs are separated by a blank line.
    paragraphs = [
        "### README",
        f"**Demo (&lt;2 min):** [**YouTube**]({video_link})",
        "Formatted documentation (Space card + full markdown): "
        f"[**README.md on Hugging Face**]({readme_link})",
        f"Space: [**{space_id}**]({hub_url})",
    ]
    metadata.readme_content = "\n\n".join(paragraphs)
    return metadata


# Install the wrapper on the OpenEnv web_interface module in place of the stock
# metadata loader.
_openenv_web.load_environment_metadata = _ghostexec_load_environment_metadata

# Resolve the Ghostexec models/environment under either import layout.
try:
    # Editable / normal install (package name `ghostexec`).
    from ghostexec.models import GhostexecAction, GhostexecObservation
    from ghostexec.server.ghostexec_environment import GhostexecEnvironment
except ImportError:
    # Plain `uvicorn server.app:app` from repo root: top-level `models` + `server` package.
    from models import GhostexecAction, GhostexecObservation
    from server.ghostexec_environment import GhostexecEnvironment


# Create the app with web interface and README integration
# `app` is the module-level ASGI application referenced as `server.app:app` in the
# uvicorn commands shown in the module docstring.
app = create_app(
    GhostexecEnvironment,
    GhostexecAction,
    GhostexecObservation,
    env_name="ghostexec",
    max_concurrent_envs=1,  # increase this number to allow more concurrent WebSocket sessions
)


def _patch_openapi_ghostexec_examples(schema: dict) -> None:
    """Replace OpenEnv's generic observation examples with GhostExec's plain-text briefing shape."""
    briefing = (
        "=== GHOSTEXEC BRIEFING — Tue 21 Apr 2026 08:00 ===\n\n"
        "UNREAD EMAILS (…): …\n\n"
        "CALENDAR CONFLICTS IN NEXT 4 HOURS: …\n\n"
        "CONTACTS TO WATCH: …\n\n"
        "OVERDUE OR DUE-SOON TASKS: …\n\n"
        "EXEC STRESS LEVEL: 52/100\n"
        "STEPS REMAINING: 48"
    )
    obs = {"echoed_message": briefing, "message_length": len(briefing)}
    reset_ex = {"observation": obs, "reward": 0.0, "done": False}
    step_ex = {"observation": obs, "reward": -0.42, "done": False}
    for path, example in (("/reset", reset_ex), ("/step", step_ex)):
        try:
            cell = (
                schema["paths"][path]["post"]["responses"]["200"]["content"]["application/json"]
            )
            if isinstance(cell, dict):
                cell["example"] = example
        except KeyError:
            continue


# Heading text of the note below. `_patch_openapi_ghostexec_http_note` also looks
# for this string in the existing description so the note is not appended twice.
_OPENAPI_HTTP_EPISODE_SENTINEL = "Ghostexec / OpenEnv HTTP"

# Markdown appended to the OpenAPI `info.description`: explains that plain HTTP
# requests do not share an episode and that `/ws` should be used for multi-step runs.
_OPENAPI_HTTP_EPISODE_NOTE = f"""
---
## {_OPENAPI_HTTP_EPISODE_SENTINEL}

Each `POST /reset` and `POST /step` may run on a **new** environment instance, so
separate HTTP requests do **not** share one in-memory episode across calls. A lone
`POST /step` still applies your action once (after internal scenario load). For
**many steps on the same episode**, use **WebSocket `/ws`**: open a connection,
reset once, then send many step messages on that same socket. See **ghostexec/README.md**
for details.
"""


def _patch_openapi_ghostexec_http_note(schema: dict) -> None:
    """Append the HTTP-vs-/ws episode note to the schema's ``info.description``.

    Mutates ``schema`` in place. Nothing happens when ``info`` is not a dict or
    when the description already contains the sentinel heading, so the note is
    added at most once. ``TypeError`` / ``KeyError`` raised along the way are
    swallowed and the schema is left as it was.
    """
    try:
        info_section = schema.get("info")
        if isinstance(info_section, dict):
            current_text = info_section.get("description") or ""
            if _OPENAPI_HTTP_EPISODE_SENTINEL not in current_text:
                info_section["description"] = current_text + _OPENAPI_HTTP_EPISODE_NOTE
    except (TypeError, KeyError):
        # Best-effort documentation tweak: never let it break schema generation.
        return


# Bind the class-level `openapi` implementation to this app instance. Taking it
# from `type(app)` rather than `app.openapi` keeps a handle on the original
# generator even after `app.openapi` is reassigned further down.
_fastapi_openapi = type(app).openapi.__get__(app, type(app))


def _ghostexec_openapi() -> dict:
    """Return the app's OpenAPI schema, applying the Ghostexec patches on first build.

    When ``app.openapi_schema`` is still ``None`` the original generator is run
    and the resulting schema gets the Ghostexec examples and the HTTP episode
    note. Later calls return the already-stored schema unchanged.
    """
    if app.openapi_schema is not None:
        return app.openapi_schema  # type: ignore[return-value]

    # First call: build with the original generator, then patch the stored schema.
    _fastapi_openapi()
    for apply_patch in (_patch_openapi_ghostexec_examples, _patch_openapi_ghostexec_http_note):
        apply_patch(app.openapi_schema)
    return app.openapi_schema  # type: ignore[return-value]


# Route schema generation for this app instance through the Ghostexec wrapper.
app.openapi = _ghostexec_openapi  # type: ignore[method-assign]


def main() -> None:
    """
    Parse ``--host`` / ``--port`` from the command line and serve ``app`` with uvicorn.

    Lets the server be started without Docker, for example:
        uv run --project . server
        uv run --project . server --port 8001
        python -m ghostexec.server.app

    Defaults are host ``0.0.0.0`` and port ``8000``. For production deployments,
    consider invoking uvicorn directly with multiple workers:
        uvicorn ghostexec.server.app:app --workers 4
    """
    import argparse

    import uvicorn

    cli = argparse.ArgumentParser(description="GhostExec OpenEnv HTTP server")
    # (flag, type, default, help) for every supported command-line option.
    option_specs = (
        ("--host", str, "0.0.0.0", "Bind address"),
        ("--port", int, 8000, "Listen port"),
    )
    for flag, value_type, default_value, help_text in option_specs:
        cli.add_argument(flag, type=value_type, default=default_value, help=help_text)

    options = cli.parse_args()
    uvicorn.run(app, host=options.host, port=options.port)


# Start the server only when this file is executed as a script or with `python -m`.
if __name__ == '__main__':
    main()