Spaces:
Paused
Paused
| """PayOps Environment Client.""" | |
| from typing import Dict, Any | |
| from openenv.core import EnvClient | |
| from openenv.core.client_types import StepResult | |
| from openenv.core.env_server.types import State | |
| from payops_env.models import PayOpsAction, PayOpsObservation | |
| class PayOpsEnv(EnvClient[PayOpsAction, PayOpsObservation]): | |
| """ | |
| Client for the PayOps Payment Operations Incident Response Environment. | |
| Maintains a persistent WebSocket connection to the environment server, | |
| enabling efficient multi-step interactions. | |
| Example:: | |
| with PayOpsEnv(base_url="http://localhost:7860") as client: | |
| result = client.reset() | |
| while not result.done: | |
| action = PayOpsAction( | |
| action_type="approve", | |
| transaction_id=result.observation.transaction_id, | |
| ) | |
| result = client.step(action) | |
| print("Score:", result.observation.cumulative_reward) | |
| """ | |
| def _step_payload(self, action: PayOpsAction) -> Dict[str, Any]: | |
| """Convert PayOpsAction to the official openenv wire format.""" | |
| return { | |
| "action": action.model_dump(exclude_none=True), | |
| } | |
| def _parse_result(self, payload: Dict[str, Any]) -> StepResult[PayOpsObservation]: | |
| """Parse server response into StepResult[PayOpsObservation].""" | |
| obs_data = payload.get("observation", payload) # support wrapped or flat | |
| observation = PayOpsObservation(**obs_data) | |
| return StepResult( | |
| observation=observation, | |
| reward=payload.get("reward", obs_data.get("reward")), | |
| done=payload.get("done", obs_data.get("done", False)), | |
| ) | |
| def _parse_state(self, payload: Dict[str, Any]) -> State: | |
| """Parse server state response into State object.""" | |
| return State( | |
| episode_id=payload.get("episode_id"), | |
| step_count=payload.get("step_count", 0), | |
| ) | |