Spaces:
Running
Running
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """Ghostexec Environment Client.""" | |
| from typing import Any, Dict | |
| try: | |
| # OpenEnv newer layout. | |
| from openenv.client import EnvClient | |
| except ImportError: | |
| try: | |
| # Some builds expose the class one level deeper. | |
| from openenv.client.client import EnvClient | |
| except ImportError: | |
| # Backward compatibility with older OpenEnv versions. | |
| from openenv.core import EnvClient | |
| from openenv.core.client_types import StepResult | |
| from openenv.core.env_server.types import State | |
| from .models import GhostexecAction, GhostexecObservation | |
class GhostexecEnv(
    EnvClient[GhostexecAction, GhostexecObservation, State]
):
    """
    Client for the Ghostexec Environment.

    This client maintains a persistent WebSocket connection to the environment server,
    enabling efficient multi-step interactions with lower latency.
    Each client instance has its own dedicated environment session on the server.
    """

    def _step_payload(self, action: GhostexecAction) -> Dict[str, Any]:
        """Serialize ``action`` into the JSON-compatible dict sent for a step.

        Args:
            action: The action to serialize via ``model_dump(mode="json")``.

        Returns:
            The dumped action. The ``"metadata"`` key is removed when its
            value is missing or falsy (e.g. ``None`` or an empty dict).
        """
        payload = action.model_dump(mode="json")
        # Drop empty metadata instead of transmitting a meaningless key.
        if not payload.get("metadata"):
            payload.pop("metadata", None)
        return payload

    def _parse_result(self, payload: Dict[str, Any]) -> StepResult[GhostexecObservation]:
        """Build a ``StepResult`` from a step/reset response payload.

        Args:
            payload: Decoded response dict. Reads the ``"observation"``,
                ``"reward"`` and ``"done"`` keys; all of them are optional.

        Returns:
            A ``StepResult`` whose observation, reward and done flag are
            taken from ``payload`` (``done`` defaults to ``False``, ``reward``
            to ``None``).
        """
        # ``dict.get(key, {})`` only falls back when the key is absent; an
        # explicit JSON ``null`` would come back as ``None`` and break the
        # ``.get`` calls below, so coerce any falsy observation to ``{}``.
        obs_data = payload.get("observation") or {}
        reward = payload.get("reward")
        done = payload.get("done", False)

        # NOTE(review): ``echoed_message`` / ``message_length`` look like
        # fields from an echo-style template — confirm GhostexecObservation
        # actually declares them.
        observation = GhostexecObservation(
            echoed_message=obs_data.get("echoed_message", ""),
            message_length=obs_data.get("message_length", 0),
            done=done,
            reward=reward,
            metadata=obs_data.get("metadata", {}),
        )
        return StepResult(
            observation=observation,
            reward=reward,
            done=done,
        )

    def _parse_state(self, payload: Dict[str, Any]) -> State:
        """Build a ``State`` from a state response payload.

        Args:
            payload: Decoded response dict. Reads ``"episode_id"`` (defaults
                to ``None``) and ``"step_count"`` (defaults to ``0``).

        Returns:
            A ``State`` populated with those two fields.
        """
        return State(
            episode_id=payload.get("episode_id"),
            step_count=payload.get("step_count", 0),
        )