---
title: Cloud Queue Env Environment Server
emoji: 🖨️
colorFrom: pink
colorTo: blue
sdk: docker
pinned: false
app_port: 8000
base_path: /web
tags:
- openenv
---
# Cloud Queue Env Environment
A real-world queue-operations benchmark for OpenEnv.
This environment simulates the service-operations decisions that humans make in production systems:
- Admission and rejection under load
- Queue routing and dispatching
- Priority handling for urgent traffic
- Capacity scaling under infrastructure cost constraints
The benchmark includes three deterministic tasks, each with a grader that awards partial credit in [0, 1]:
- easy: single-queue stability
- medium: multi-server priority routing
- hard: two-stage queue network with scaling
## Quick Start
Use the CloudQueueEnv client to connect to a running server or container:
```python
from cloud_queue_env import CloudQueueAction, CloudQueueEnv

# Create the client before the try block so `env` is always bound in `finally`
env = CloudQueueEnv.from_docker_image("cloud_queue_env-env:latest")
try:
    # Configure task + seed, then reset into that deterministic episode
    env.reset()
    env.step(CloudQueueAction(action_type="configure_task", task_id="easy", seed=11))
    result = env.reset()

    for _ in range(20):
        obs = result.observation
        # Admit an arriving job if there is one; otherwise dispatch from queue 0
        if obs.incoming_job_present:
            action = CloudQueueAction(action_type="admit", target_queue=0)
        else:
            action = CloudQueueAction(action_type="dispatch", target_queue=0)
        result = env.step(action)
        print(
            f"step={obs.sim_time} queues={obs.queue_lengths} "
            f"reward={result.reward:.3f} done={result.done}"
        )
        if result.done:
            break

    # The episode score is stored in the observation metadata
    final_score = result.observation.metadata.get("episode_score", 0.0)
    print(f"episode_score={final_score:.3f}")
finally:
    env.close()
```
The CloudQueueEnv.from_docker_image() method handles:
- Starting the Docker container
- Waiting for the server to be ready
- Connecting to the environment
- Container cleanup when you call `close()`
## Building the Docker Image
Before using the environment, you need to build the Docker image:
```bash
# From project root
docker build -t cloud_queue_env-env:latest -f server/Dockerfile .
```
## Deploying to Hugging Face Spaces
You can deploy your OpenEnv environment to Hugging Face Spaces with the `openenv push` command:
```bash
# From the environment directory (where openenv.yaml is located)
openenv push
# Or specify options
openenv push --namespace my-org --private
```
The `openenv push` command will:
1. Validate that the directory is an OpenEnv environment (checks for `openenv.yaml`)
2. Prepare a custom build for a Hugging Face Docker Space (enables the web interface)
3. Upload to Hugging Face (ensuring you're logged in)
### Prerequisites
- Authenticate with Hugging Face: the command prompts for login if you are not already authenticated
### Options
- `--directory`, `-d`: Directory containing the OpenEnv environment (defaults to current directory)
- `--repo-id`, `-r`: Repository ID in format 'username/repo-name' (defaults to 'username/env-name' from openenv.yaml)
- `--base-image`, `-b`: Base Docker image to use (overrides Dockerfile FROM)
- `--private`: Deploy the space as private (default: public)
### Examples
```bash
# Push to your personal namespace (defaults to username/env-name from openenv.yaml)
openenv push
# Push to a specific repository
openenv push --repo-id my-org/my-env
# Push with a custom base image
openenv push --base-image ghcr.io/meta-pytorch/openenv-base:latest
# Push as a private space
openenv push --private
# Combine options
openenv push --repo-id my-org/my-env --base-image custom-base:latest --private
```
After deployment, your space will be available at:
`https://huggingface.co/spaces/<repo-id>`
The deployed space includes:
- **Web Interface** at `/web` - Interactive UI for exploring the environment
- **API Documentation** at `/docs` - Full OpenAPI/Swagger interface
- **Health Check** at `/health` - Container health monitoring
- **WebSocket** at `/ws` - Persistent session endpoint for low-latency interactions
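
The endpoint list above can be turned into a small helper for smoke-testing a deployment. The sketch below is illustrative only: `endpoint_urls` and `is_healthy` are hypothetical helper names, and treating an HTTP 200 from `/health` as "healthy" is an assumption, since the response format is not described here.

```python
import urllib.request
from typing import Dict

# Paths taken from the endpoint list above.
ENDPOINT_PATHS = {"web": "/web", "docs": "/docs", "health": "/health", "ws": "/ws"}


def endpoint_urls(base_url: str) -> Dict[str, str]:
    """Build the full URL of each documented endpoint from a server base URL."""
    root = base_url.rstrip("/")
    urls = {name: root + path for name, path in ENDPOINT_PATHS.items()}
    # WebSocket endpoints use the ws:// or wss:// scheme instead of http(s)://
    if urls["ws"].startswith("https://"):
        urls["ws"] = "wss://" + urls["ws"][len("https://"):]
    elif urls["ws"].startswith("http://"):
        urls["ws"] = "ws://" + urls["ws"][len("http://"):]
    return urls


def is_healthy(base_url: str, timeout: float = 5.0) -> bool:
    """Return True if GET /health answers with HTTP 200 (assumed success signal)."""
    try:
        with urllib.request.urlopen(endpoint_urls(base_url)["health"], timeout=timeout) as response:
            return response.status == 200
    except OSError:
        # Covers connection errors, timeouts, and HTTP error statuses
        return False


print(endpoint_urls("http://localhost:8000"))
```

Calling `is_healthy("http://localhost:8000")` before starting an episode is one way to fail fast when the container is still booting.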
## Environment Details
### Action
CloudQueueAction fields:
- action_type: one of configure_task, admit, reject, route, dispatch, scale, reprioritize, noop
- target_queue: queue index for route/dispatch/admit
- target_server: optional server index
- scale_delta: server delta for scale action
- new_priority: new priority value for reprioritize
- task_id: easy/medium/hard (used with configure_task)
- seed: deterministic task seed (used with configure_task)
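
As a rough guide to which fields go with which `action_type`, the field list above can be expressed as a lookup table. This is a sketch for client-side sanity checks, not the server's validation logic: the `REQUIRED_FIELDS` mapping and the `missing_fields` helper are hypothetical, and whether the server treats these fields as strictly required or falls back to defaults is an assumption.

```python
from typing import Any, Dict, List

# Fields each action_type relies on, read off the field list above.
# ASSUMPTION: fields not mentioned for an action_type are simply ignored.
REQUIRED_FIELDS: Dict[str, List[str]] = {
    "configure_task": ["task_id", "seed"],
    "admit": ["target_queue"],
    "reject": [],
    "route": ["target_queue"],
    "dispatch": ["target_queue"],
    "scale": ["scale_delta"],
    "reprioritize": ["new_priority"],
    "noop": [],
}


def missing_fields(action: Dict[str, Any]) -> List[str]:
    """Return the names of fields that the given action dict still needs."""
    action_type = action.get("action_type")
    if action_type not in REQUIRED_FIELDS:
        raise ValueError(f"unknown action_type: {action_type!r}")
    return [name for name in REQUIRED_FIELDS[action_type] if action.get(name) is None]


print(missing_fields({"action_type": "scale"}))
print(missing_fields({"action_type": "admit", "target_queue": 0}))
```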
### Observation
CloudQueueObservation includes:
- task_id, sim_time, horizon
- queue_lengths, queue_wait_ema
- server_busy, server_remaining_service, utilization
- incoming_job_present, incoming_job_size, incoming_job_priority, incoming_job_deadline, incoming_job_type
- sla_violation_rate, abandonment_rate, throughput_recent, energy_cost_rate
- level, optional_history, action_mask
- reward, done, metadata
### Reward
Per-step reward is dense and multi-objective:
$$
r_t = 0.35R_{wait} + 0.20R_{throughput} + 0.20R_{sla} + 0.15R_{cost} + 0.05R_{fair} + 0.05R_{safe}
$$
Properties:
- Partial progress signal over the full trajectory
- Penalties for invalid actions and unsafe/noop behavior under congestion
- Bounded reward values for stability
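
To make the weighting concrete, the sketch below combines six component values using the coefficients from the formula above. How each component is computed inside the environment is not described here, so the inputs are assumed to be already-bounded scalars, and `step_reward` is a hypothetical helper name.

```python
from typing import Dict

# Weights copied from the reward formula above; they sum to 1.
REWARD_WEIGHTS: Dict[str, float] = {
    "wait": 0.35,
    "throughput": 0.20,
    "sla": 0.20,
    "cost": 0.15,
    "fair": 0.05,
    "safe": 0.05,
}


def step_reward(components: Dict[str, float]) -> float:
    """Weighted sum of the six reward components for a single step."""
    missing = set(REWARD_WEIGHTS) - set(components)
    if missing:
        raise KeyError(f"missing reward components: {sorted(missing)}")
    return sum(weight * components[name] for name, weight in REWARD_WEIGHTS.items())


# Illustrative inputs only: a good wait term, but a poor cost term
example = {"wait": 0.9, "throughput": 0.5, "sla": 0.8, "cost": 0.2, "fair": 1.0, "safe": 1.0}
print(f"r_t = {step_reward(example):.3f}")
```

Because the wait term carries the largest weight, a policy that keeps queues short earns most of the available per-step reward even when its cost term is weak.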
### Deterministic Graders
Each task returns a deterministic episode_score in [0, 1], stored in observation metadata.
- easy score uses avg wait, throughput, rejection rate, and SLA violations
- medium score uses urgent/normal p95 waits, urgent SLA, throughput, and action cost
- hard score uses end-to-end p95, abandonment, SLA, throughput, infra cost, and fairness gap
If the invalid-action rate exceeds a threshold, the score is capped.
## Tasks
1. easy (single queue stability)
   - one queue, one server
   - objective: low wait with acceptable throughput and low rejection
2. medium (priority routing)
   - two queues and multiple servers
   - objective: protect urgent traffic while maintaining total performance
3. hard (queue network + scaling)
   - two-stage queue network with bursty arrivals and heavy-tailed service times
   - objective: balance latency/SLA/abandonment against infra cost and fairness
## Baseline Inference
Run baseline inference across easy/medium/hard:
```bash
API_KEY=your_provider_key python inference.py
```
Optional variables:
- API_KEY (OpenAI-compatible provider key for model calls)
- API_BASE_URL (default: https://router.huggingface.co/v1)
- MODEL_NAME (default: Qwen/Qwen2.5-72B-Instruct)
- BASE_URL (if using deployed space)
- IMAGE_NAME (if launching local docker image)
- USE_HEURISTIC_ONLY (true/false)
- DISABLE_MODEL_ON_FIRST_ERROR (true/false)
- MAX_STEPS_OVERRIDE (integer quick-test cap)
- TASK_SEEDS_JSON (JSON map for multi-seed runs)
- ACTION_TRACE_FILE (JSON replay file keyed by task:seed)
- REPORT_JSON_PATH (write seed/task report JSON)
- REPORT_CSV_PATH (write per-seed report CSV)
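
One way to drive a quick heuristic-only run from Python is to build the environment variables programmatically. The sketch below is hedged in several places: `baseline_env` and `run_baseline` are hypothetical helpers, the `TASK_SEEDS_JSON` shape (task id mapped to a list of seeds) is an assumption about the expected schema, and the `report.json` path is only an example.

```python
import json
import os
import subprocess
import sys
from typing import Dict, List


def baseline_env(task_seeds: Dict[str, List[int]], max_steps: int) -> Dict[str, str]:
    """Copy the current environment and add variables for a short heuristic run."""
    env = dict(os.environ)
    env["USE_HEURISTIC_ONLY"] = "true"
    env["MAX_STEPS_OVERRIDE"] = str(max_steps)
    # ASSUMPTION: TASK_SEEDS_JSON maps each task id to a list of integer seeds
    env["TASK_SEEDS_JSON"] = json.dumps(task_seeds)
    # Example output path; any writable location should work
    env["REPORT_JSON_PATH"] = "report.json"
    return env


def run_baseline(env: Dict[str, str]) -> int:
    """Run inference.py with the given environment and return its exit code."""
    completed = subprocess.run([sys.executable, "inference.py"], env=env, check=False)
    return completed.returncode


quick_env = baseline_env({"easy": [11, 12], "medium": [11]}, max_steps=20)
print(quick_env["TASK_SEEDS_JSON"])
# To launch the run from the project root: run_baseline(quick_env)
```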
Output includes required line types:
- [START]
- [STEP]
- [END]
And final aggregate summary:
- [SUMMARY] easy=<...> medium=<...> hard=<...> final=<...>
V2 reporting also includes:
- [REPORT_SEED] task=<task_id> seed=<seed> score=<score> steps=<n> trace=<digest>
- [REPORT] task=<task_id> seeds=<n> mean=<score> std=<score> ci95=<score>
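
The `[REPORT_SEED]` line format above is regular enough to parse with a single pattern. The following sketch collects scores per task and recomputes a mean, standard deviation, and 95% interval half-width. It assumes the sample standard deviation and a normal approximation (`1.96 * std / sqrt(n)`); the script's own `std` and `ci95` definitions are not stated here and may differ. The log lines in the demo are made up for illustration.

```python
import math
import re
import statistics
from collections import defaultdict
from typing import Dict, Iterable, List

REPORT_SEED_RE = re.compile(
    r"^\[REPORT_SEED\] task=(\S+) seed=(\S+) score=(\S+) steps=(\S+) trace=(\S+)$"
)


def scores_by_task(lines: Iterable[str]) -> Dict[str, List[float]]:
    """Collect per-seed scores from [REPORT_SEED] lines, grouped by task id."""
    scores: Dict[str, List[float]] = defaultdict(list)
    for line in lines:
        match = REPORT_SEED_RE.match(line.strip())
        if match:
            scores[match.group(1)].append(float(match.group(3)))
    return dict(scores)


def summarize(scores: List[float]) -> Dict[str, float]:
    """Mean, sample std, and normal-approximation 95% half-width (assumed formulas)."""
    n = len(scores)
    mean = statistics.fmean(scores)
    std = statistics.stdev(scores) if n > 1 else 0.0
    return {"mean": mean, "std": std, "ci95": 1.96 * std / math.sqrt(n)}


def final_score(by_task: Dict[str, List[float]]) -> float:
    """Mean of the per-task means, matching the 'final' row of the score table."""
    return statistics.fmean(statistics.fmean(values) for values in by_task.values())


# Made-up log lines, for illustration only
demo_log = [
    "[REPORT_SEED] task=easy seed=11 score=0.50 steps=150 trace=aaaa",
    "[REPORT_SEED] task=easy seed=12 score=0.70 steps=150 trace=bbbb",
    "[REPORT_SEED] task=medium seed=11 score=0.40 steps=200 trace=cccc",
]
for task, values in scores_by_task(demo_log).items():
    print(task, summarize(values))
```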
## Baseline Scores
Current reproducible heuristic-only baseline (deployed runtime, single seed per task):
| Task | Seed Count | Mean Score |
|---|---:|---:|
| easy | 1 | 0.000 |
| medium | 1 | 0.000 |
| hard | 1 | 0.000 |
| final (mean of task means) | - | 0.000 |
Notes:
- These values are from heuristic fallback mode and are expected to be low.
- Model-based scores depend on provider/model availability and should be recorded from a successful funded run.
- Keep this table updated with your latest official benchmark run before final submission.
## Advanced Usage
### Connecting to an Existing Server
If you already have a Cloud Queue Env environment server running, you can connect directly:
```python
from cloud_queue_env import CloudQueueAction, CloudQueueEnv
# Connect to existing server
env = CloudQueueEnv(base_url="<ENV_HTTP_URL_HERE>")
# Use as normal
result = env.reset()
result = env.step(CloudQueueAction(action_type="dispatch", target_queue=0))
```
Note: When connecting to an existing server, `env.close()` will NOT stop the server.
### Using the Context Manager
The client supports context manager usage for automatic connection management:
```python
from cloud_queue_env import CloudQueueAction, CloudQueueEnv

# Connect with context manager (auto-connects and closes)
with CloudQueueEnv(base_url="http://localhost:8000") as env:
    result = env.reset()
    print(f"Initial queues: {result.observation.queue_lengths}")
    # Multiple steps with low latency
    for _ in range(10):
        result = env.step(CloudQueueAction(action_type="noop"))
        print(f"Reward: {result.reward:.3f}")
```
The client uses WebSocket connections for:
- **Lower latency**: No HTTP connection overhead per request
- **Persistent session**: Server maintains your environment state
- **Efficient for episodes**: Better for many sequential steps
### Concurrent WebSocket Sessions
The server supports multiple concurrent WebSocket connections. To enable this,
modify `server/app.py` to use factory mode:
```python
# In server/app.py - use factory mode for concurrent sessions
app = create_app(
    CloudQueueEnvironment,  # Pass class, not instance
    CloudQueueAction,
    CloudQueueObservation,
    max_concurrent_envs=4,  # Allow 4 concurrent sessions
)
```
Then multiple clients can connect simultaneously:
```python
from concurrent.futures import ThreadPoolExecutor

from cloud_queue_env import CloudQueueAction, CloudQueueEnv


def run_episode(client_id: int):
    # Each client opens its own session against the shared server
    with CloudQueueEnv(base_url="http://localhost:8000") as env:
        result = env.reset()
        for i in range(10):
            result = env.step(CloudQueueAction(action_type="dispatch", target_queue=i % 2))
        return client_id, result.observation.queue_lengths


# Run 4 episodes concurrently
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(run_episode, range(4)))
```
## Development & Testing
### Direct Environment Testing
Core files:
- models: typed action/observation schema
- server environment: queue simulation, reward shaping, grading
- inference script: task sweep and benchmark logging
### Running Locally
Run the server locally for development:
```bash
uvicorn server.app:app --reload
```
## Project Structure
```
cloud_queue_env/
├── .dockerignore
├── __init__.py
├── README.md
├── openenv.yaml
├── pyproject.toml
├── client.py
├── models.py
├── inference.py
├── IMPLEMENTATION_ROADMAP.md
└── server/
    ├── __init__.py
    ├── cloud_queue_env_environment.py
    ├── app.py
    └── Dockerfile
```
## Task Summary
### Task A: easy (150 steps)
- Scenario: 1 queue, 1 server (M/M/1), only admit/reject/dispatch
- Objective: keep wait low while maintaining throughput
- Grader: `score = 0.40×(1 - avg_wait/6) + 0.30×(throughput/70) + 0.15×(1 - rejection_rate/0.3) + 0.15×(1 - sla_breaches/0.3)`
### Task B: medium (200 steps)
- Scenario: 2 queues, 3 servers, 28% urgent jobs → route + reprioritize
- Objective: protect urgent SLA while not starving normal jobs
- Grader: `score = 0.35×urgent_wait_score + 0.25×urgent_sla_score + 0.15×normal_wait_score + 0.15×throughput + 0.10×cost`
### Task C: hard (250 steps)
- Scenario: 2-stage pipeline, 1–6 servers, heavy-tail service, abandonments
- Objective: maximize quality under budget with fairness
- Grader: `score = 0.25×e2e_latency + 0.20×abandonment + 0.20×sla + 0.15×throughput + 0.10×cost + 0.10×fairness`
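
The grader formulas above translate almost directly into code. The sketch below is an illustration rather than the environment's grader: clamping each term to [0, 1] is an assumption made to keep the score in range, the medium and hard components are taken as pre-normalized sub-scores, and the helper names are hypothetical.

```python
from typing import Dict


def clamp01(value: float) -> float:
    """Clamp a value into [0, 1]."""
    return max(0.0, min(1.0, value))


def easy_score(avg_wait: float, throughput: float, rejection_rate: float, sla_breaches: float) -> float:
    """Task A formula; per-term clamping is an assumption, not stated in the source."""
    return (
        0.40 * clamp01(1 - avg_wait / 6)
        + 0.30 * clamp01(throughput / 70)
        + 0.15 * clamp01(1 - rejection_rate / 0.3)
        + 0.15 * clamp01(1 - sla_breaches / 0.3)
    )


# Weights copied from the Task B and Task C grader formulas
MEDIUM_WEIGHTS: Dict[str, float] = {
    "urgent_wait_score": 0.35,
    "urgent_sla_score": 0.25,
    "normal_wait_score": 0.15,
    "throughput": 0.15,
    "cost": 0.10,
}
HARD_WEIGHTS: Dict[str, float] = {
    "e2e_latency": 0.25,
    "abandonment": 0.20,
    "sla": 0.20,
    "throughput": 0.15,
    "cost": 0.10,
    "fairness": 0.10,
}


def weighted_score(weights: Dict[str, float], components: Dict[str, float]) -> float:
    """Weighted sum of sub-scores, each assumed to be normalized to [0, 1]."""
    return sum(weight * clamp01(components[name]) for name, weight in weights.items())


# Illustrative inputs only
print(f"easy: {easy_score(avg_wait=3.0, throughput=35.0, rejection_rate=0.15, sla_breaches=0.15):.3f}")
print(f"hard: {weighted_score(HARD_WEIGHTS, {name: 0.5 for name in HARD_WEIGHTS}):.3f}")
```

Under these assumptions, the easy task reaches its maximum only when average wait is zero, throughput is at least 70, and there are no rejections or SLA breaches, which shows how the four terms trade off against each other.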