---
title: Cloud Queue Env Environment Server
emoji: 🖨️
colorFrom: pink
colorTo: blue
sdk: docker
pinned: false
app_port: 8000
base_path: /web
tags:
- openenv
---
# Cloud Queue Env Environment
A real-world queue-operations benchmark for OpenEnv.
This environment simulates service operations decisions humans make in production systems:
- Admission and rejection under load
- Queue routing and dispatching
- Priority handling for urgent traffic
- Capacity scaling under infrastructure cost constraints
The benchmark includes three deterministic tasks. Each is scored by a grader that awards partial credit in [0, 1]:
- easy: single-queue stability
- medium: multi-server priority routing
- hard: two-stage queue network with scaling
## Quick Start
Use the `CloudQueueEnv` client to connect to a running server or container:
```python
from cloud_queue_env import CloudQueueAction, CloudQueueEnv

# Start the container before `try` so `env` is always defined when `finally` runs
env = CloudQueueEnv.from_docker_image("cloud_queue_env-env:latest")
try:
    # Configure task + seed, then reset into that deterministic episode
    env.reset()
    env.step(CloudQueueAction(action_type="configure_task", task_id="easy", seed=11))
    result = env.reset()

    for _ in range(20):
        obs = result.observation
        # Admit an arriving job into queue 0; otherwise dispatch from queue 0
        if obs.incoming_job_present:
            action = CloudQueueAction(action_type="admit", target_queue=0)
        else:
            action = CloudQueueAction(action_type="dispatch", target_queue=0)
        result = env.step(action)
        print(
            f"step={obs.sim_time} queues={obs.queue_lengths} "
            f"reward={result.reward:.3f} done={result.done}"
        )
        if result.done:
            break

    # The grader's score is reported in the observation metadata
    final_score = result.observation.metadata.get("episode_score", 0.0)
    print(f"episode_score={final_score:.3f}")
finally:
    env.close()
```
The `CloudQueueEnv.from_docker_image()` method handles:
- Starting the Docker container
- Waiting for the server to be ready
- Connecting to the environment
- Container cleanup when you call `close()`
## Building the Docker Image
Before using the environment, you need to build the Docker image:
```bash
# From project root
docker build -t cloud_queue_env-env:latest -f server/Dockerfile .
```
## Deploying to Hugging Face Spaces
Deploy your OpenEnv environment to Hugging Face Spaces with the `openenv push` command:
```bash
# From the environment directory (where openenv.yaml is located)
openenv push
# Or specify options
openenv push --namespace my-org --private
```
The `openenv push` command will:
1. Validate that the directory is an OpenEnv environment (checks for `openenv.yaml`)
2. Prepare a custom build for a Hugging Face Docker Space (enables the web interface)
3. Upload to Hugging Face (ensuring you're logged in)
### Prerequisites
- Authenticate with Hugging Face; the command prompts for login if you are not already authenticated
### Options
- `--directory`, `-d`: Directory containing the OpenEnv environment (defaults to current directory)
- `--repo-id`, `-r`: Repository ID in the format 'username/repo-name' (defaults to 'username/env-name' from openenv.yaml)
- `--base-image`, `-b`: Base Docker image to use (overrides Dockerfile FROM)
- `--private`: Deploy the space as private (default: public)
### Examples
```bash
# Push to your personal namespace (defaults to username/env-name from openenv.yaml)
openenv push
# Push to a specific repository
openenv push --repo-id my-org/my-env
# Push with a custom base image
openenv push --base-image ghcr.io/meta-pytorch/openenv-base:latest
# Push as a private space
openenv push --private
# Combine options
openenv push --repo-id my-org/my-env --base-image custom-base:latest --private
```
After deployment, your space will be available at:
`https://huggingface.co/spaces/<repo-id>`
The deployed space includes:
- **Web Interface** at `/web` - Interactive UI for exploring the environment
- **API Documentation** at `/docs` - Full OpenAPI/Swagger interface
- **Health Check** at `/health` - Container health monitoring
- **WebSocket** at `/ws` - Persistent session endpoint for low-latency interactions
## Environment Details
### Action
CloudQueueAction fields:
- action_type: one of configure_task, admit, reject, route, dispatch, scale, reprioritize, noop
- target_queue: queue index for route/dispatch/admit
- target_server: optional server index
- scale_delta: server delta for scale action
- new_priority: new priority value for reprioritize
- task_id: easy/medium/hard (used with configure_task)
- seed: deterministic task seed (used with configure_task)
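The field list above implies which fields each `action_type` needs. The sketch below encodes that reading as a client-side pre-check before sending an action. `REQUIRED_FIELDS` and `check_action` are hypothetical helpers, and the server's own validation may be stricter or looser; for example, the list does not say whether `seed` is mandatory for `configure_task`, so it is treated as optional here.

```python
from typing import Any

# Fields each action_type needs, inferred only from the field list above
# (assumption: the server may require more, e.g. target_server for some actions)
REQUIRED_FIELDS: dict[str, tuple[str, ...]] = {
    "configure_task": ("task_id",),  # seed treated as optional (assumption)
    "admit": ("target_queue",),
    "reject": (),
    "route": ("target_queue",),
    "dispatch": ("target_queue",),
    "scale": ("scale_delta",),
    "reprioritize": ("new_priority",),
    "noop": (),
}

VALID_TASKS = {"easy", "medium", "hard"}


def check_action(action: dict[str, Any]) -> list[str]:
    """Return a list of problems with an action payload; an empty list means it looks valid."""
    kind = action.get("action_type")
    if kind not in REQUIRED_FIELDS:
        return [f"unknown action_type: {kind!r}"]
    problems = [f"{kind} needs {name}" for name in REQUIRED_FIELDS[kind] if action.get(name) is None]
    task_id = action.get("task_id")
    if kind == "configure_task" and task_id is not None and task_id not in VALID_TASKS:
        problems.append(f"task_id must be one of {sorted(VALID_TASKS)}")
    return problems


print(check_action({"action_type": "admit", "target_queue": 0}))  # []
print(check_action({"action_type": "scale"}))  # ['scale needs scale_delta']
```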
### Observation
CloudQueueObservation includes:
- task_id, sim_time, horizon
- queue_lengths, queue_wait_ema
- server_busy, server_remaining_service, utilization
- incoming_job_present, incoming_job_size, incoming_job_priority, incoming_job_deadline, incoming_job_type
- sla_violation_rate, abandonment_rate, throughput_recent, energy_cost_rate
- level, optional_history, action_mask
- reward, done, metadata
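As an illustration of how observation fields map to action payloads, here is a minimal threshold policy. It assumes `queue_lengths` is a list of per-queue integer lengths and `incoming_job_present` is a boolean, as the field names suggest; `MAX_QUEUE_LENGTH` and `choose_action` are hypothetical, and this is not the heuristic used by `inference.py`.

```python
from typing import Any

MAX_QUEUE_LENGTH = 8  # hypothetical admission cap, not a documented environment constant


def choose_action(obs: dict[str, Any]) -> dict[str, Any]:
    """Pick an action payload from two observation fields.

    Assumptions: `queue_lengths` is a list of ints and `incoming_job_present` is a bool.
    """
    queues = obs["queue_lengths"]
    if obs["incoming_job_present"]:
        # Admit into the shortest queue unless it is already at the cap
        shortest = min(range(len(queues)), key=lambda i: queues[i])
        if queues[shortest] < MAX_QUEUE_LENGTH:
            return {"action_type": "admit", "target_queue": shortest}
        return {"action_type": "reject"}
    # No arrival: dispatch from the longest non-empty queue
    longest = max(range(len(queues)), key=lambda i: queues[i])
    if queues[longest] > 0:
        return {"action_type": "dispatch", "target_queue": longest}
    return {"action_type": "noop"}  # only idle when every queue is empty


print(choose_action({"queue_lengths": [3, 1], "incoming_job_present": True}))
# {'action_type': 'admit', 'target_queue': 1}
```

With the client, the returned dict should be usable as `CloudQueueAction(**payload)`, since the keys match the keyword arguments used in the Quick Start. Falling back to `noop` only on empty queues avoids the noop-under-congestion penalty described in the reward section.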
### Reward
Per-step reward is dense and multi-objective:
$$
r_t = 0.35R_{wait} + 0.20R_{throughput} + 0.20R_{sla} + 0.15R_{cost} + 0.05R_{fair} + 0.05R_{safe}
$$
Properties:
- Partial progress signal over the full trajectory
- Penalties for invalid actions and unsafe/noop behavior under congestion
- Bounded reward values for stability
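To make the weighting concrete, the sketch below combines per-objective terms into `r_t` using the weights from the formula, which sum to 1.0. It assumes each `R_*` term has already been computed and normalized by the environment; the dictionary keys and `step_reward` are illustrative names, and the invalid-action and congestion penalties listed above are not modeled.

```python
# Weights transcribed from the per-step reward formula above
REWARD_WEIGHTS = {
    "wait": 0.35,
    "throughput": 0.20,
    "sla": 0.20,
    "cost": 0.15,
    "fair": 0.05,
    "safe": 0.05,
}


def step_reward(components: dict[str, float]) -> float:
    """Weighted sum of per-objective terms.

    Assumption: each component is already normalized by the environment;
    penalties for invalid or unsafe actions are not included here.
    """
    missing = REWARD_WEIGHTS.keys() - components.keys()
    if missing:
        raise KeyError(f"missing reward components: {sorted(missing)}")
    return sum(weight * components[name] for name, weight in REWARD_WEIGHTS.items())


# Halving the wait term costs 0.35 * 0.5; a zero cost term costs 0.15
example = {"wait": 0.5, "throughput": 1.0, "sla": 1.0, "cost": 0.0, "fair": 1.0, "safe": 1.0}
print(round(step_reward(example), 3))  # 0.675
```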
### Deterministic Graders
Each task returns a deterministic `episode_score` in [0, 1], stored in `observation.metadata["episode_score"]`.
- easy score uses avg wait, throughput, rejection rate, and SLA violations
- medium score uses urgent/normal p95 waits, urgent SLA, throughput, and action cost
- hard score uses end-to-end p95, abandonment, SLA, throughput, infra cost, and fairness gap
If the invalid-action rate exceeds a threshold, the score is capped.
## Tasks
1. easy (single-queue stability)
   - one queue, one server
   - objective: low wait with acceptable throughput and low rejection
2. medium (priority routing)
   - two queues and three servers
   - objective: protect urgent traffic while maintaining overall performance
3. hard (queue network + scaling)
   - two-stage queue network with bursty arrivals and heavy-tailed service times
   - objective: balance latency, SLA compliance, and abandonment against infrastructure cost and fairness
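The easy task is described as an M/M/1 queue in the task notes at the end of this README. Textbook steady-state M/M/1 results show why admission control matters as load rises: with arrival rate λ and service rate μ, utilization is ρ = λ/μ, the queue is stable only when ρ < 1, and the mean wait before service is W_q = ρ/(μ - λ). The sketch below evaluates these formulas. The environment's actual arrival and service rates are not documented here and the simulator advances in discrete steps, so treat the numbers as illustrative only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MM1Metrics:
    utilization: float        # rho = lambda / mu
    mean_queue_length: float  # Lq: jobs waiting, not in service
    mean_wait: float          # Wq: time waiting before service starts
    mean_response: float      # W = Wq + 1/mu: waiting plus service


def mm1_metrics(arrival_rate: float, service_rate: float) -> MM1Metrics:
    """Textbook steady-state M/M/1 formulas; defined only when arrival_rate < service_rate."""
    if arrival_rate <= 0 or service_rate <= 0:
        raise ValueError("rates must be positive")
    rho = arrival_rate / service_rate
    if rho >= 1:
        raise ValueError(f"unstable queue: rho={rho:.2f} >= 1, waits grow without bound")
    lq = rho * rho / (1 - rho)
    wq = lq / arrival_rate  # Little's law: Lq = lambda * Wq
    return MM1Metrics(rho, lq, wq, wq + 1 / service_rate)


# Waits grow sharply as utilization approaches 1 (service rate fixed at 1.0)
for lam in (0.5, 0.8, 0.9, 0.95):
    m = mm1_metrics(lam, 1.0)
    print(f"rho={m.utilization:.2f} Wq={m.mean_wait:.1f}")
# rho=0.50 Wq=1.0, rho=0.80 Wq=4.0, rho=0.90 Wq=9.0, rho=0.95 Wq=19.0
```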
## Baseline Inference
Run baseline inference across easy/medium/hard:
```bash
API_KEY=your_provider_key python inference.py
```
Environment variables (optional unless noted):
- API_KEY (OpenAI-compatible provider key; needed for model calls)
- API_BASE_URL (default: https://router.huggingface.co/v1)
- MODEL_NAME (default: Qwen/Qwen2.5-72B-Instruct)
- BASE_URL (if using deployed space)
- IMAGE_NAME (if launching local docker image)
- USE_HEURISTIC_ONLY (true/false)
- DISABLE_MODEL_ON_FIRST_ERROR (true/false)
- MAX_STEPS_OVERRIDE (integer quick-test cap)
- TASK_SEEDS_JSON (JSON map for multi-seed runs)
- ACTION_TRACE_FILE (JSON replay file keyed by task:seed)
- REPORT_JSON_PATH (write seed/task report JSON)
- REPORT_CSV_PATH (write per-seed report CSV)
The output includes the required log line types:
- [START]
- [STEP]
- [END]
And final aggregate summary:
- [SUMMARY] easy=<...> medium=<...> hard=<...> final=<...>
V2 reporting also includes:
- [REPORT_SEED] task=<task_id> seed=<seed> score=<score> steps=<n> trace=<digest>
- [REPORT] task=<task_id> seeds=<n> mean=<score> std=<score> ci95=<score>
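The `[REPORT_SEED]` lines use a `key=value` layout, so per-task statistics can be recomputed from a saved log. The sketch below parses those lines and aggregates them into a mean, standard deviation, and 95% confidence half-width per task, plus the final mean of task means used in the baseline table. It assumes a sample standard deviation and a normal-approximation interval (1.96 · std / √n); `inference.py` may compute `std` and `ci95` differently. The function names are hypothetical and the sample lines are made-up inputs, not benchmark results.

```python
import math
import statistics
from collections import defaultdict


def parse_report_seed(line: str) -> dict[str, str]:
    """Split a '[REPORT_SEED] key=value ...' line into a dict of string fields."""
    tag, _, rest = line.partition(" ")
    if tag != "[REPORT_SEED]":
        raise ValueError(f"not a REPORT_SEED line: {line!r}")
    return dict(field.split("=", 1) for field in rest.split())


def summarize(lines: list[str]) -> tuple[dict[str, dict[str, float]], float]:
    """Aggregate per-seed scores into per-task stats and the final mean of task means."""
    scores: dict[str, list[float]] = defaultdict(list)
    for line in lines:
        if line.startswith("[REPORT_SEED]"):
            fields = parse_report_seed(line)
            scores[fields["task"]].append(float(fields["score"]))

    per_task: dict[str, dict[str, float]] = {}
    for task, values in scores.items():
        n = len(values)
        # Assumption: sample standard deviation; a single seed gets std = 0
        std = statistics.stdev(values) if n > 1 else 0.0
        per_task[task] = {
            "seeds": n,
            "mean": statistics.fmean(values),
            "std": std,
            # Assumption: normal-approximation 95% half-width
            "ci95": 1.96 * std / math.sqrt(n),
        }
    final = statistics.fmean(stats["mean"] for stats in per_task.values())
    return per_task, final


# Made-up lines for illustration only; they are not benchmark results
sample_lines = [
    "[REPORT_SEED] task=easy seed=11 score=0.400 steps=150 trace=abc123",
    "[REPORT_SEED] task=easy seed=12 score=0.600 steps=150 trace=def456",
    "[REPORT_SEED] task=hard seed=11 score=0.200 steps=250 trace=0a1b2c",
]
per_task, final = summarize(sample_lines)
for task, stats in per_task.items():
    print(f"{task}: seeds={stats['seeds']} mean={stats['mean']:.3f} ci95={stats['ci95']:.3f}")
print(f"final={final:.3f}")  # final=0.350
```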
## Baseline Scores
Current reproducible heuristic-only baseline (deployed runtime, single seed per task):
| Task | Seed Count | Mean Score |
|---|---:|---:|
| easy | 1 | 0.000 |
| medium | 1 | 0.000 |
| hard | 1 | 0.000 |
| final (mean of task means) | - | 0.000 |
Notes:
- These values are from heuristic fallback mode and are expected to be low.
- Model-based scores depend on provider/model availability and should be recorded from a successful funded run.
- Keep this table updated with your latest official benchmark run before final submission.
## Advanced Usage
### Connecting to an Existing Server
If you already have a Cloud Queue Env environment server running, you can connect directly:
```python
from cloud_queue_env import CloudQueueAction, CloudQueueEnv

# Connect to an existing server
env = CloudQueueEnv(base_url="<ENV_HTTP_URL_HERE>")
try:
    # Use as normal
    result = env.reset()
    result = env.step(CloudQueueAction(action_type="dispatch", target_queue=0))
finally:
    env.close()  # closes the client connection only
```
Note: When connecting to an existing server, `env.close()` will NOT stop the server.
### Using the Context Manager
The client supports context manager usage for automatic connection management:
```python
from cloud_queue_env import CloudQueueAction, CloudQueueEnv

# Connect with a context manager (auto-connects and closes)
with CloudQueueEnv(base_url="http://localhost:8000") as env:
    result = env.reset()
    print(f"Initial queues: {result.observation.queue_lengths}")

    # Multiple steps over the same persistent session
    for _ in range(10):
        result = env.step(CloudQueueAction(action_type="noop"))
        print(f"Reward: {result.reward:.3f}")
```
The client uses WebSocket connections for:
- **Lower latency**: No HTTP connection overhead per request
- **Persistent session**: Server maintains your environment state
- **Efficient for episodes**: Better for many sequential steps
### Concurrent WebSocket Sessions
The server can serve multiple concurrent WebSocket sessions. To enable this,
modify `server/app.py` to use factory mode:
```python
# In server/app.py: use factory mode for concurrent sessions
app = create_app(
    CloudQueueEnvironment,  # Pass the class, not an instance
    CloudQueueAction,
    CloudQueueObservation,
    max_concurrent_envs=4,  # Allow up to 4 concurrent sessions
)
```
Then multiple clients can connect simultaneously:
```python
from concurrent.futures import ThreadPoolExecutor

from cloud_queue_env import CloudQueueAction, CloudQueueEnv


def run_episode(client_id: int):
    # Each thread opens its own WebSocket session
    with CloudQueueEnv(base_url="http://localhost:8000") as env:
        result = env.reset()
        for i in range(10):
            # Alternates between queues 0 and 1; queue 1 exists only in multi-queue tasks
            result = env.step(CloudQueueAction(action_type="dispatch", target_queue=i % 2))
        return client_id, result.observation.queue_lengths


# Run 4 episodes concurrently
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(run_episode, range(4)))

for client_id, queue_lengths in results:
    print(f"client {client_id}: final queues {queue_lengths}")
```
## Development & Testing
### Direct Environment Testing
Core files:
- `models.py`: typed action/observation schema
- `server/cloud_queue_env_environment.py`: queue simulation, reward shaping, grading
- `inference.py`: task sweep and benchmark logging
### Running Locally
Run the server locally for development:
```bash
uvicorn server.app:app --reload
```
## Project Structure
```
cloud_queue_env/
├── .dockerignore
├── __init__.py
├── README.md
├── openenv.yaml
├── pyproject.toml
├── client.py
├── models.py
├── inference.py
├── IMPLEMENTATION_ROADMAP.md
└── server/
    ├── __init__.py
    ├── cloud_queue_env_environment.py
    ├── app.py
    └── Dockerfile
```
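## Task Graders

### Task A: Easy (150 steps)
- Scenario: 1 queue, 1 server (M/M/1); only admit, reject, and dispatch actions
- Objective: keep wait low while sustaining throughput

Grader:

$$
\text{score} = 0.40\left(1 - \frac{\text{avg\_wait}}{6}\right) + 0.30\,\frac{\text{throughput}}{70} + 0.15\left(1 - \frac{\text{rejection\_rate}}{0.3}\right) + 0.15\left(1 - \frac{\text{sla\_breaches}}{0.3}\right)
$$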
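A direct transcription of the Task A formula. The formula does not say how out-of-range values are handled (an average wait above 6, for example, would make the first term negative), so this sketch assumes each normalized term is clipped to [0, 1], which keeps the total in [0, 1] as the graders promise. It also assumes `sla_breaches` is a rate like `rejection_rate`. `clamp01` and `easy_score` are illustrative names, not functions from the environment.

```python
def clamp01(x: float) -> float:
    """Clip a value into [0, 1]."""
    return max(0.0, min(1.0, x))


def easy_score(avg_wait: float, throughput: float, rejection_rate: float, sla_breaches: float) -> float:
    """Task A grader, transcribed from the formula above.

    Assumptions: every normalized term is clipped to [0, 1], and `sla_breaches`
    is a rate in [0, 1] like `rejection_rate`.
    """
    wait_term = clamp01(1 - avg_wait / 6)
    throughput_term = clamp01(throughput / 70)
    rejection_term = clamp01(1 - rejection_rate / 0.3)
    sla_term = clamp01(1 - sla_breaches / 0.3)
    return 0.40 * wait_term + 0.30 * throughput_term + 0.15 * rejection_term + 0.15 * sla_term


# Each metric at half its normalizer gives 0.5 on every term
print(round(easy_score(avg_wait=3.0, throughput=35, rejection_rate=0.15, sla_breaches=0.15), 3))  # 0.5
```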
### Task B: Medium (200 steps)
- Scenario: 2 queues, 3 servers, 28% urgent jobs; key actions are route and reprioritize
- Objective: protect urgent SLA without starving normal jobs

Grader:

$$
\text{score} = 0.35\,\text{urgent\_wait\_score} + 0.25\,\text{urgent\_sla\_score} + 0.15\,\text{normal\_wait\_score} + 0.15\,\text{throughput} + 0.10\,\text{cost}
$$
### Task C: Hard (250 steps)
- Scenario: two-stage pipeline, 1–6 servers, heavy-tailed service times, job abandonment
- Objective: maximize quality under budget while maintaining fairness

Grader:

$$
\text{score} = 0.25\,\text{e2e\_latency} + 0.20\,\text{abandonment} + 0.20\,\text{sla} + 0.15\,\text{throughput} + 0.10\,\text{cost} + 0.10\,\text{fairness}
$$
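The Task B and Task C graders are weighted sums whose weights each total 1.0, so a run that maxes every term scores exactly 1.0. The sketch below transcribes those weights and combines per-term subscores. How raw metrics (p95 waits, abandonment, infra cost, fairness gap) are normalized into subscores is not documented here, so the function assumes the subscores already lie in [0, 1]. Term names follow the formulas; `GRADER_WEIGHTS` and `task_score` are hypothetical.

```python
import math

# Grader weights transcribed from the Task B and Task C formulas above
GRADER_WEIGHTS = {
    "medium": {
        "urgent_wait": 0.35,
        "urgent_sla": 0.25,
        "normal_wait": 0.15,
        "throughput": 0.15,
        "cost": 0.10,
    },
    "hard": {
        "e2e_latency": 0.25,
        "abandonment": 0.20,
        "sla": 0.20,
        "throughput": 0.15,
        "cost": 0.10,
        "fairness": 0.10,
    },
}


def task_score(task: str, subscores: dict[str, float]) -> float:
    """Weighted sum of subscores (assumption: each subscore is already normalized to [0, 1])."""
    weights = GRADER_WEIGHTS[task]
    if set(subscores) != set(weights):
        raise KeyError(f"{task} expects terms {sorted(weights)}")
    if any(not 0.0 <= v <= 1.0 for v in subscores.values()):
        raise ValueError("subscores must be normalized to [0, 1]")
    return sum(weights[name] * subscores[name] for name in weights)


# Each weight set sums to 1, so the weighted sum stays in [0, 1]
for task, weights in GRADER_WEIGHTS.items():
    assert math.isclose(sum(weights.values()), 1.0), task

hard_example = {"e2e_latency": 0.5, "abandonment": 0.5, "sla": 0.5,
                "throughput": 0.5, "cost": 0.5, "fairness": 1.0}
print(round(task_score("hard", hard_example), 3))  # 0.55
```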