metadata
title: Cloud Queue Env Environment Server
emoji: 🖨️
colorFrom: pink
colorTo: blue
sdk: docker
pinned: false
app_port: 8000
base_path: /web
tags:
  - openenv

Cloud Queue Env Environment

A real-world queue-operations benchmark for OpenEnv.

This environment simulates the service-operations decisions that humans make in production systems:

  • Admission and rejection under load
  • Queue routing and dispatching
  • Priority handling for urgent traffic
  • Capacity scaling under infrastructure cost constraints

The benchmark includes three deterministic tasks, each with a partial-credit grader that scores the episode in [0, 1]:

  • easy: single-queue stability
  • medium: multi-server priority routing
  • hard: two-stage queue network with scaling

Quick Start

Use the CloudQueueEnv client to connect to a running server or container:

from cloud_queue_env import CloudQueueAction, CloudQueueEnv

# Create the client before `try` so `env` always exists when `finally` runs
env = CloudQueueEnv.from_docker_image("cloud_queue_env-env:latest")
try:
    # Configure task + seed, then reset into that deterministic episode
    env.reset()
    env.step(CloudQueueAction(action_type="configure_task", task_id="easy", seed=11))
    result = env.reset()

    for _ in range(20):
        obs = result.observation
        # Admit the incoming job if there is one; otherwise dispatch from queue 0
        if obs.incoming_job_present:
            action = CloudQueueAction(action_type="admit", target_queue=0)
        else:
            action = CloudQueueAction(action_type="dispatch", target_queue=0)

        result = env.step(action)
        # Log the post-step state alongside the reward for that step
        next_obs = result.observation
        print(
            f"step={next_obs.sim_time} queues={next_obs.queue_lengths} "
            f"reward={result.reward:.3f} done={result.done}"
        )
        if result.done:
            break

    # The grader's episode score is stored in the observation metadata
    final_score = result.observation.metadata.get("episode_score", 0.0)
    print(f"episode_score={final_score:.3f}")
finally:
    env.close()

The CloudQueueEnv.from_docker_image() method handles:

  • Starting the Docker container
  • Waiting for the server to be ready
  • Connecting to the environment
  • Container cleanup when you call close()

Building the Docker Image

Before using the environment, you need to build the Docker image:

# From project root
docker build -t cloud_queue_env-env:latest -f server/Dockerfile .

Deploying to Hugging Face Spaces

Deploy the environment to Hugging Face Spaces with the openenv push command:

# From the environment directory (where openenv.yaml is located)
openenv push

# Or specify options
openenv push --namespace my-org --private

The openenv push command will:

  1. Validate that the directory is an OpenEnv environment (checks for openenv.yaml)
  2. Prepare a custom build for Hugging Face Docker space (enables web interface)
  3. Upload to Hugging Face (ensuring you're logged in)

Prerequisites

  • Authenticate with Hugging Face: The command will prompt for login if not already authenticated

Options

  • --directory, -d: Directory containing the OpenEnv environment (defaults to current directory)
  • --repo-id, -r: Repository ID in format 'username/repo-name' (defaults to 'username/env-name' from openenv.yaml)
  • --base-image, -b: Base Docker image to use (overrides Dockerfile FROM)
  • --private: Deploy the space as private (default: public)

Examples

# Push to your personal namespace (defaults to username/env-name from openenv.yaml)
openenv push

# Push to a specific repository
openenv push --repo-id my-org/my-env

# Push with a custom base image
openenv push --base-image ghcr.io/meta-pytorch/openenv-base:latest

# Push as a private space
openenv push --private

# Combine options
openenv push --repo-id my-org/my-env --base-image custom-base:latest --private

After deployment, your space will be available at: https://huggingface.co/spaces/<repo-id>

The deployed space includes:

  • Web Interface at /web - Interactive UI for exploring the environment
  • API Documentation at /docs - Full OpenAPI/Swagger interface
  • Health Check at /health - Container health monitoring
  • WebSocket at /ws - Persistent session endpoint for low-latency interactions

Environment Details

Action

CloudQueueAction fields:

  • action_type: one of configure_task, admit, reject, route, dispatch, scale, reprioritize, noop
  • target_queue: queue index for route/dispatch/admit
  • target_server: optional server index
  • scale_delta: change in server count for the scale action
  • new_priority: new priority value for reprioritize
  • task_id: easy/medium/hard (used with configure_task)
  • seed: deterministic task seed (used with configure_task)
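The field descriptions above imply which fields each action_type needs. As a minimal sketch, here is a client-side pre-check on plain dicts; the `REQUIRED_FIELDS` mapping and `missing_fields` helper are hypothetical, inferred only from those descriptions, and the real CloudQueueAction model may validate differently (for example, reprioritize may also need a target).

```python
from typing import Any, Dict, Set

# Hypothetical mapping inferred from the field descriptions above; the real
# CloudQueueAction model may accept or require different fields.
REQUIRED_FIELDS: Dict[str, Set[str]] = {
    "configure_task": {"task_id", "seed"},
    "admit": {"target_queue"},
    "reject": set(),
    "route": {"target_queue"},
    "dispatch": {"target_queue"},
    "scale": {"scale_delta"},
    "reprioritize": {"new_priority"},
    "noop": set(),
}


def missing_fields(action: Dict[str, Any]) -> Set[str]:
    """Return the fields this sketch expects that are absent or None."""
    action_type = action.get("action_type")
    if action_type not in REQUIRED_FIELDS:
        raise ValueError(f"unknown action_type: {action_type!r}")
    # `is None` (not falsiness) so that target_queue=0 counts as present
    return {name for name in REQUIRED_FIELDS[action_type] if action.get(name) is None}


# A scale action without scale_delta is flagged before it is sent
print(missing_fields({"action_type": "scale"}))  # prints {'scale_delta'}
```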

Observation

CloudQueueObservation includes:

  • task_id, sim_time, horizon
  • queue_lengths, queue_wait_ema
  • server_busy, server_remaining_service, utilization
  • incoming_job_present, incoming_job_size, incoming_job_priority, incoming_job_deadline, incoming_job_type
  • sla_violation_rate, abandonment_rate, throughput_recent, energy_cost_rate
  • level, optional_history, action_mask
  • reward, done, metadata

Reward

Per-step reward is dense and multi-objective:

$$r_t = 0.35R_{wait} + 0.20R_{throughput} + 0.20R_{sla} + 0.15R_{cost} + 0.05R_{fair} + 0.05R_{safe}$$
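The weighted sum above can be sketched as follows, assuming each component has already been normalized to [0, 1]. How the environment actually computes each component and applies its penalties is not described here, so the components are plain inputs to this hypothetical `step_reward` helper. Because the weights sum to 1.0, the result also lies in [0, 1] under that assumption.

```python
from typing import Dict

# Weights copied from the reward formula above; they sum to 1.0
REWARD_WEIGHTS: Dict[str, float] = {
    "wait": 0.35,
    "throughput": 0.20,
    "sla": 0.20,
    "cost": 0.15,
    "fair": 0.05,
    "safe": 0.05,
}


def step_reward(components: Dict[str, float]) -> float:
    """Weighted sum of reward components (hypothetical helper).

    Assumes each component is already normalized to [0, 1]; the real
    environment's component definitions and penalties may differ.
    """
    missing = set(REWARD_WEIGHTS) - set(components)
    if missing:
        raise ValueError(f"missing components: {sorted(missing)}")
    return sum(weight * components[name] for name, weight in REWARD_WEIGHTS.items())


# Example: good latency and throughput, weaker SLA and cost terms
r = step_reward(
    {"wait": 0.8, "throughput": 0.6, "sla": 0.5, "cost": 0.4, "fair": 1.0, "safe": 1.0}
)
print(f"{r:.3f}")  # prints 0.660
```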

Properties:

  • Partial progress signal over the full trajectory
  • Penalties for invalid actions and unsafe/noop behavior under congestion
  • Bounded reward values for stability

Deterministic Graders

Each task returns a deterministic episode_score in [0, 1], stored in observation metadata.

  • easy score uses avg wait, throughput, rejection rate, and SLA violations
  • medium score uses urgent/normal p95 waits, urgent SLA, throughput, and action cost
  • hard score uses end-to-end p95, abandonment, SLA, throughput, infra cost, and fairness gap

If the invalid-action rate exceeds a threshold, the episode score is capped.

Tasks

  1. easy (single-queue stability)
     • one queue, one server
     • objective: low wait with acceptable throughput and low rejection
  2. medium (priority routing)
     • two queues and multiple servers
     • objective: protect urgent traffic while maintaining total performance
  3. hard (queue network + scaling)
     • two-stage queue network with bursty arrivals and heavy-tailed service times
     • objective: balance latency/SLA/abandonment against infra cost and fairness
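The easy task's single queue with one server is the classic M/M/1 setting if arrivals are Poisson and service times are exponential (an assumption here; the simulator's exact distributions are not documented in this README). The standard steady-state formulas show why "stability" is the objective: queue wait grows sharply as utilization approaches 1, which is what admit/reject decisions trade against. `mm1_metrics` is an illustrative helper, not part of the environment.

```python
from typing import Dict


def mm1_metrics(arrival_rate: float, service_rate: float) -> Dict[str, float]:
    """Steady-state M/M/1 results; valid only when arrival_rate < service_rate."""
    if arrival_rate >= service_rate:
        raise ValueError("unstable: utilization >= 1, the queue grows without bound")
    rho = arrival_rate / service_rate  # server utilization
    return {
        "utilization": rho,
        "avg_in_system": rho / (1 - rho),  # L
        "avg_time_in_system": 1 / (service_rate - arrival_rate),  # W
        "avg_wait_in_queue": rho / (service_rate - arrival_rate),  # Wq
    }


# Queue wait versus load for a server that completes 1 job per time unit
for lam in (0.5, 0.8, 0.9, 0.95):
    m = mm1_metrics(lam, 1.0)
    print(f"rho={m['utilization']:.2f} Wq={m['avg_wait_in_queue']:.1f}")
```

Going from 50% to 95% utilization raises the mean queue wait from 1 to 19 service times, which is why rejecting some traffic under load can improve the wait term of the score.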

Baseline Inference

Run baseline inference across easy/medium/hard:

API_KEY=your_provider_key python inference.py

Optional variables:

  • API_KEY (OpenAI-compatible provider key for model calls)
  • API_BASE_URL (default: https://router.huggingface.co/v1)
  • MODEL_NAME (default: Qwen/Qwen2.5-72B-Instruct)
  • BASE_URL (if using deployed space)
  • IMAGE_NAME (if launching local docker image)
  • USE_HEURISTIC_ONLY (true/false)
  • DISABLE_MODEL_ON_FIRST_ERROR (true/false)
  • MAX_STEPS_OVERRIDE (integer quick-test cap)
  • TASK_SEEDS_JSON (JSON map for multi-seed runs)
  • ACTION_TRACE_FILE (JSON replay file keyed by task:seed)
  • REPORT_JSON_PATH (write seed/task report JSON)
  • REPORT_CSV_PATH (write per-seed report CSV)
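A minimal sketch of reading a subset of these variables into one config object. The defaults for API_BASE_URL and MODEL_NAME are copied from the list above; the false default for the boolean flags and the JSON shape of TASK_SEEDS_JSON (task name to list of seeds) are assumptions, and `InferenceConfig`/`load_config` are hypothetical names, not necessarily what inference.py uses.

```python
import json
import os
from dataclasses import dataclass, field
from typing import Dict, List, Optional


def env_bool(name: str, default: bool = False) -> bool:
    """Interpret a true/false environment variable, case-insensitively."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes"}


@dataclass
class InferenceConfig:
    """Hypothetical container for some of the variables listed above."""
    api_key: Optional[str]
    api_base_url: str
    model_name: str
    use_heuristic_only: bool
    disable_model_on_first_error: bool
    max_steps_override: Optional[int]
    task_seeds: Dict[str, List[int]] = field(default_factory=dict)


def load_config() -> InferenceConfig:
    max_steps = os.environ.get("MAX_STEPS_OVERRIDE")
    seeds_json = os.environ.get("TASK_SEEDS_JSON")
    return InferenceConfig(
        api_key=os.environ.get("API_KEY"),
        # Defaults copied from the variable list
        api_base_url=os.environ.get("API_BASE_URL", "https://router.huggingface.co/v1"),
        model_name=os.environ.get("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct"),
        # Assumed default: false when unset
        use_heuristic_only=env_bool("USE_HEURISTIC_ONLY"),
        disable_model_on_first_error=env_bool("DISABLE_MODEL_ON_FIRST_ERROR"),
        max_steps_override=int(max_steps) if max_steps else None,
        # Assumed shape, e.g. '{"easy": [11, 12], "hard": [7]}'
        task_seeds=json.loads(seeds_json) if seeds_json else {},
    )
```

Calling `load_config()` once at startup keeps the parsing rules (such as which strings count as true) in one place instead of scattered across the script.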

The output includes the required line types:

  • [START]
  • [STEP]
  • [END]

It ends with a final aggregate summary:

  • [SUMMARY] easy=<...> medium=<...> hard=<...> final=<...>

V2 reporting also includes:

  • [REPORT_SEED] task= seed= score= steps= trace=
  • [REPORT] task= seeds= mean= std= ci95=
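To post-process a log, the per-seed lines can be aggregated into the per-task and final numbers. This sketch assumes the `[REPORT_SEED]` fields are space-separated `key=value` pairs whose values contain no spaces, uses the sample standard deviation, and computes ci95 as a normal-approximation half-width (1.96·std/√n); the script's own definitions of std and ci95 may differ. The input lines are made-up illustrations, and the helper names are hypothetical.

```python
import math
import statistics
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple


def parse_report_seed(line: str) -> Optional[Dict[str, str]]:
    """Parse a '[REPORT_SEED] key=value ...' line into a dict; else None."""
    prefix = "[REPORT_SEED]"
    line = line.strip()
    if not line.startswith(prefix):
        return None
    # Assumes values contain no spaces
    return dict(part.partition("=")[::2] for part in line[len(prefix):].split())


def summarize(lines: Iterable[str]) -> Tuple[Dict[str, Dict[str, float]], float]:
    scores: Dict[str, List[float]] = defaultdict(list)
    for line in lines:
        fields = parse_report_seed(line)
        if fields is not None:
            scores[fields["task"]].append(float(fields["score"]))

    report: Dict[str, Dict[str, float]] = {}
    for task, values in scores.items():
        n = len(values)
        # Assumption: sample std and a normal-approximation 95% interval
        std = statistics.stdev(values) if n > 1 else 0.0
        report[task] = {
            "seeds": n,
            "mean": statistics.fmean(values),
            "std": std,
            "ci95": 1.96 * std / math.sqrt(n),
        }
    # final = mean of per-task means, as in the baseline table
    final = statistics.fmean(r["mean"] for r in report.values()) if report else 0.0
    return report, final


lines = [
    "[REPORT_SEED] task=easy seed=11 score=0.60 steps=150 trace=none",
    "[REPORT_SEED] task=easy seed=12 score=0.40 steps=150 trace=none",
    "[REPORT_SEED] task=hard seed=7 score=0.20 steps=250 trace=none",
    "[STEP] not a report line",
]
report, final = summarize(lines)
for task, r in report.items():
    print(f"[REPORT] task={task} seeds={r['seeds']} mean={r['mean']:.3f} "
          f"std={r['std']:.3f} ci95={r['ci95']:.3f}")
print(f"final={final:.3f}")
```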

Baseline Scores

Current reproducible heuristic-only baseline (deployed runtime, single seed per task):

| Task | Seed Count | Mean Score |
|---|---|---|
| easy | 1 | 0.000 |
| medium | 1 | 0.000 |
| hard | 1 | 0.000 |
| final (mean of task means) | - | 0.000 |

Notes:

  • These values are from heuristic fallback mode and are expected to be low.
  • Model-based scores depend on provider/model availability and should be recorded from a successful funded run.
  • Keep this table updated with your latest official benchmark run before final submission.

Advanced Usage

Connecting to an Existing Server

If you already have a Cloud Queue Env environment server running, you can connect directly:

from cloud_queue_env import CloudQueueAction, CloudQueueEnv

# Connect to an existing server (replace the placeholder with its URL)
env = CloudQueueEnv(base_url="<ENV_HTTP_URL_HERE>")

# Use as normal
result = env.reset()
result = env.step(CloudQueueAction(action_type="dispatch", target_queue=0))

# Close the client when finished
env.close()

Note: When connecting to an existing server, env.close() will NOT stop the server.

Using the Context Manager

The client supports context manager usage for automatic connection management:

from cloud_queue_env import CloudQueueAction, CloudQueueEnv

# Connect with context manager (auto-connects and closes)
with CloudQueueEnv(base_url="http://localhost:8000") as env:
    result = env.reset()
    print(f"Initial queues: {result.observation.queue_lengths}")
    # Multiple steps with low latency
    for _ in range(10):
        result = env.step(CloudQueueAction(action_type="noop"))
        print(f"Reward: {result.reward:.3f}")

The client uses WebSocket connections for:

  • Lower latency: No HTTP connection overhead per request
  • Persistent session: Server maintains your environment state
  • Efficient for episodes: Better for many sequential steps

Concurrent WebSocket Sessions

The server supports multiple concurrent WebSocket connections. To enable this, modify server/app.py to use factory mode:

# In server/app.py - use factory mode for concurrent sessions
app = create_app(
    CloudQueueEnvironment,  # Pass class, not instance
    CloudQueueAction,
    CloudQueueObservation,
    max_concurrent_envs=4,  # Allow 4 concurrent sessions
)

Then multiple clients can connect simultaneously:

from cloud_queue_env import CloudQueueAction, CloudQueueEnv
from concurrent.futures import ThreadPoolExecutor

def run_episode(client_id: int):
    with CloudQueueEnv(base_url="http://localhost:8000") as env:
        result = env.reset()
        for i in range(10):
            result = env.step(CloudQueueAction(action_type="dispatch", target_queue=i % 2))
        return client_id, result.observation.queue_lengths

# Run 4 episodes concurrently
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(run_episode, range(4)))

Development & Testing

Direct Environment Testing

Core files:

  • models.py: typed action/observation schema
  • server/cloud_queue_env_environment.py: queue simulation, reward shaping, grading
  • inference.py: task sweep and benchmark logging

Running Locally

Run the server locally for development:

uvicorn server.app:app --reload

Project Structure

cloud_queue_env/
├── .dockerignore
├── __init__.py
├── README.md
├── openenv.yaml
├── pyproject.toml
├── client.py
├── models.py
├── inference.py
├── IMPLEMENTATION_ROADMAP.md
└── server/
    ├── __init__.py
    ├── cloud_queue_env_environment.py
    ├── app.py
    └── Dockerfile

TASK A: Easy (150 steps)

  • Scenario: 1 queue, 1 server (M/M/1); only admit, reject, and dispatch actions
  • Objective: keep waits low while sustaining throughput
  • Grader: score = 0.40×(1 - avg_wait/6) + 0.30×(throughput/70) + 0.15×(1 - rejection_rate/0.3) + 0.15×(1 - sla_breaches/0.3)

TASK B: Medium (200 steps)

  • Scenario: 2 queues, 3 servers, 28% urgent jobs; calls for routing and reprioritization
  • Objective: protect the urgent SLA without starving normal jobs
  • Grader: score = 0.35×urgent_wait_score + 0.25×urgent_sla_score + 0.15×normal_wait_score + 0.15×throughput + 0.10×cost

TASK C: Hard (250 steps)

  • Scenario: 2-stage pipeline, 1–6 servers, heavy-tailed service times, abandonments
  • Objective: maximize service quality within budget while keeping fairness
  • Grader: score = 0.25×e2e_latency + 0.20×abandonment + 0.20×sla + 0.15×throughput + 0.10×cost + 0.10×fairness
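The Task A grader formula can be evaluated directly. This sketch assumes each normalized term is clamped to [0, 1] (so very long waits cannot drive the score negative) and that rejection_rate and sla_breaches are fractions; units for avg_wait and throughput are not stated here. `easy_score` is a hypothetical helper, not the environment's grader, and it omits the invalid-action cap because its threshold is not documented.

```python
def clamp01(x: float) -> float:
    """Clamp a value to [0, 1] (assumed; the real grader may not clamp)."""
    return max(0.0, min(1.0, x))


def easy_score(avg_wait: float, throughput: float,
               rejection_rate: float, sla_breaches: float) -> float:
    """Task A grader as written above, with each term clamped to [0, 1]."""
    return (
        0.40 * clamp01(1 - avg_wait / 6)
        + 0.30 * clamp01(throughput / 70)
        + 0.15 * clamp01(1 - rejection_rate / 0.3)
        + 0.15 * clamp01(1 - sla_breaches / 0.3)
    )


# Illustrative inputs: moderate wait, 80% of the throughput target
print(f"{easy_score(avg_wait=3, throughput=56, rejection_rate=0.06, sla_breaches=0.03):.3f}")
```

With these inputs the terms are 0.200 + 0.240 + 0.120 + 0.135, for a score of 0.695.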