lw_benchhub_env / env.py
tianheng.wu
update env.py
93562b2
from __future__ import annotations
import logging
from typing import Any
import importlib
import importlib.util
import logging
import os
import sys
# Hub repository that hosts this environment's helper modules; passed as
# ``repo_id`` to ``hf_hub_download`` when those files have to be downloaded.
HUB_REPO_ID = "LightwheelAI/lw_benchhub_env"
def _download_and_import(filename: str):
    """Download a Python source file from the Hub and load it as a module.

    The file is fetched from ``HUB_REPO_ID`` with ``hf_hub_download``, its
    directory is put at the front of ``sys.path``, the two known relative
    imports are rewritten to absolute ones, and the rewritten source is
    executed inside a fresh module that is registered in ``sys.modules``.

    Args:
        filename: Path of the file inside the Hub repository, e.g.
            ``"errors.py"``.

    Returns:
        The loaded module object.

    Raises:
        ImportError: If no import spec can be created for the downloaded file.
    """
    from huggingface_hub import hf_hub_download

    # NOTE(review): no ``revision`` is pinned, so whatever is currently on the
    # repo's default branch gets executed below -- consider pinning a commit.
    local_path = hf_hub_download(repo_id=HUB_REPO_ID, filename=filename)
    module_dir = os.path.dirname(local_path)

    # Add directory to sys.path so the downloaded modules can import each other
    if module_dir not in sys.path:
        sys.path.insert(0, module_dir)

    # Module name = file name without directory and without a trailing ".py"
    # (a blanket str.replace would also eat ".py" in the middle of the name).
    module_name = os.path.basename(filename)
    if module_name.endswith(".py"):
        module_name = module_name[: -len(".py")]

    # Python source is UTF-8 by default; do not depend on the locale encoding.
    with open(local_path, encoding="utf-8") as f:
        content = f.read()

    # Replace relative imports (e.g., "from .errors" -> "from errors"): the
    # module is loaded top-level, so it has no parent package to resolve them.
    content = content.replace("from .errors import", "from errors import")
    content = content.replace(
        "from .isaaclab_env_wrapper import", "from isaaclab_env_wrapper import"
    )

    spec = importlib.util.spec_from_file_location(module_name, local_path)
    if spec is None:
        raise ImportError(f"Cannot create an import spec for {local_path!r}")
    module = importlib.util.module_from_spec(spec)

    # Register before executing so code that imports this module by name while
    # it is loading finds it; undo the registration if loading fails so a
    # half-initialised module is never left behind.
    previous = sys.modules.get(module_name)
    sys.modules[module_name] = module
    try:
        code = compile(content, local_path, "exec")
        # SECURITY: this runs code downloaded from the Hub in-process.
        exec(code, module.__dict__)  # noqa: S102
    except BaseException:
        if previous is None:
            sys.modules.pop(module_name, None)
        else:
            sys.modules[module_name] = previous
        raise
    return module
# Prefer the package-relative modules; when this file is loaded outside its
# package (the relative import raises ImportError), download them instead.
try:
    from .errors import IsaacLabArenaCameraKeyError, IsaacLabArenaStateKeyError
    from .isaaclab_env_wrapper import IsaacLabEnvWrapper, cleanup_isaaclab
except ImportError:
    # errors.py is loaded first: the wrapper module imports from it.
    _errors = _download_and_import("errors.py")
    _isaaclab_wrapper = _download_and_import("isaaclab_env_wrapper.py")

    # Re-export the same four names the relative imports would have bound.
    IsaacLabEnvWrapper, cleanup_isaaclab = (
        _isaaclab_wrapper.IsaacLabEnvWrapper,
        _isaaclab_wrapper.cleanup_isaaclab,
    )
    IsaacLabArenaCameraKeyError, IsaacLabArenaStateKeyError = (
        _errors.IsaacLabArenaCameraKeyError,
        _errors.IsaacLabArenaStateKeyError,
    )
def make_env(n_envs: int = 1, use_async_envs: bool = False, cfg: Any = None):
    """Build the environment described by ``cfg.config_path`` and wrap it.

    The environment is created by ``export_env_for_envhub`` from the config
    file and then wrapped in ``IsaacLabEnvWrapper``.

    Args:
        n_envs: Ignored; the number of environments comes from the config
            file. Kept for interface compatibility.
        use_async_envs: Ignored, kept for interface compatibility.
        cfg: Environment configuration object; must expose a non-empty
            ``config_path`` attribute (set via ``--env.kwargs``).

    Returns:
        A nested dict ``{environment: {0: wrapped_env}}`` where ``environment``
        is the value returned by ``export_env_for_envhub`` and ``wrapped_env``
        is the ``IsaacLabEnvWrapper`` instance.

    Raises:
        ValueError: If ``cfg`` is ``None`` or has no usable ``config_path``.
    """
    # Get config_path from cfg.config_path (set via --env.kwargs). A missing
    # attribute and an unset (None / empty) value are both treated as absent,
    # so the caller gets the actionable message rather than a downstream error.
    config_path = getattr(cfg, "config_path", None)
    if not config_path:
        raise ValueError(
            "Please set config_path via --env.kwargs='{\"config_path\": \"...\"}'")

    logging.warning(
        "Ignore n_envs and use_async_envs in parameter, we use the config path to create the environment")

    # Imported lazily so that merely importing this module does not require
    # lw_benchhub to be importable.
    from lw_benchhub.utils.envhub_utils import export_env_for_envhub

    raw_env, environment, task, render_mode, episode_length, app_launcher = export_env_for_envhub(
        config_path=config_path
    )

    wrapped_env = IsaacLabEnvWrapper(
        raw_env,
        episode_length=episode_length,
        task=task,
        render_mode=render_mode,
        simulation_app=app_launcher,
    )

    # Lazy %-formatting: the message is only built if INFO is enabled.
    logging.info("Created: %s with %s envs", environment, wrapped_env.num_envs)
    return {environment: {0: wrapped_env}}