| | from __future__ import annotations |
| | import logging |
| | from typing import Any |
| |
|
| | import importlib |
| | import importlib.util |
| | import logging |
| | import os |
| | import sys |
| |
|
| |
|
| | |
| | HUB_REPO_ID = "LightwheelAI/lw_benchhub_env" |
| |
|
| |
|
def _download_and_import(filename: str):
    """Download a Python source file from the Hub repo and load it as a module.

    The file is fetched from ``HUB_REPO_ID`` with ``hf_hub_download``. Its
    package-relative imports of ``errors`` and ``isaaclab_env_wrapper`` are
    rewritten to absolute imports, and the resulting source is executed into
    a fresh module that is registered in ``sys.modules``.

    Args:
        filename: Name of the file inside the Hub repo, e.g. ``"errors.py"``.

    Returns:
        The loaded module object.

    Raises:
        ImportError: If no import spec can be created for the downloaded file.
    """
    from huggingface_hub import hf_hub_download

    local_path = hf_hub_download(repo_id=HUB_REPO_ID, filename=filename)

    # Put the download directory on sys.path so files next to this one can be
    # imported by their plain module name.
    module_dir = os.path.dirname(local_path)
    if module_dir not in sys.path:
        sys.path.insert(0, module_dir)

    # Strip only a trailing ".py"; str.replace would also remove a ".py" that
    # happens to occur in the middle of the name.
    module_name = filename[:-3] if filename.endswith(".py") else filename

    # Python source defaults to UTF-8; do not depend on the locale encoding.
    with open(local_path, encoding="utf-8") as f:
        content = f.read()

    # The file is executed as a top-level module (no parent package), so its
    # relative imports are turned into absolute ones.
    content = content.replace("from .errors import", "from errors import")
    content = content.replace(
        "from .isaaclab_env_wrapper import", "from isaaclab_env_wrapper import"
    )

    spec = importlib.util.spec_from_file_location(module_name, local_path)
    if spec is None:
        raise ImportError(f"Cannot create an import spec for {local_path!r}")
    module = importlib.util.module_from_spec(spec)

    # Register before executing so imports of this name during execution see
    # the module; remember any previous entry so a failure can be rolled back.
    previous = sys.modules.get(module_name)
    sys.modules[module_name] = module
    try:
        # NOTE(security): this executes code downloaded from the Hub with full
        # interpreter privileges; only use with a repository you trust.
        code = compile(content, local_path, "exec")
        exec(code, module.__dict__)
    except BaseException:
        # Do not leave a half-initialised module behind in sys.modules.
        if previous is not None:
            sys.modules[module_name] = previous
        else:
            sys.modules.pop(module_name, None)
        raise

    return module
| |
|
| |
|
# Use the package-relative modules when they can be imported; otherwise fetch
# the same two files from the Hub and bind the same four names from them.
try:
    from .errors import IsaacLabArenaCameraKeyError, IsaacLabArenaStateKeyError
    from .isaaclab_env_wrapper import IsaacLabEnvWrapper, cleanup_isaaclab
except ImportError:
    # errors.py is loaded before the wrapper; presumably the wrapper imports
    # from it (its "from .errors import" line is rewritten on load).
    _errors = _download_and_import("errors.py")
    _isaaclab_wrapper = _download_and_import("isaaclab_env_wrapper.py")

    IsaacLabEnvWrapper, cleanup_isaaclab = (
        _isaaclab_wrapper.IsaacLabEnvWrapper,
        _isaaclab_wrapper.cleanup_isaaclab,
    )
    IsaacLabArenaCameraKeyError, IsaacLabArenaStateKeyError = (
        _errors.IsaacLabArenaCameraKeyError,
        _errors.IsaacLabArenaStateKeyError,
    )
| |
|
| |
|
def make_env(n_envs: int = 1, use_async_envs: bool = False, cfg: Any = None):
    """Build the wrapped IsaacLab environment described by ``cfg.config_path``.

    Args:
        n_envs: Ignored; a warning is logged and the config file decides.
        use_async_envs: Ignored, for the same reason as ``n_envs``.
        cfg: Configuration object that exposes a ``config_path`` attribute.

    Returns:
        A nested dict ``{environment: {0: wrapped_env}}``, where
        ``environment`` is the second value returned by
        ``export_env_for_envhub`` and ``wrapped_env`` is the
        ``IsaacLabEnvWrapper`` built around the exported raw environment.

    Raises:
        ValueError: If ``cfg`` is ``None`` or has no ``config_path`` attribute.
    """
    has_config_path = cfg is not None and hasattr(cfg, 'config_path')
    if not has_config_path:
        raise ValueError(
            "Please set config_path via --env.kwargs='{\"config_path\": \"...\"}'")

    cfg_path = cfg.config_path
    logging.warning(
        "Ignore n_envs and use_async_envs in parameter, we use the config path to create the environment")

    # Imported lazily so the module can be loaded without lw_benchhub present.
    from lw_benchhub.utils.envhub_utils import export_env_for_envhub

    exported = export_env_for_envhub(config_path=cfg_path)
    raw_env, environment, task, render_mode, episode_length, app_launcher = exported

    wrapper_options = {
        "episode_length": episode_length,
        "task": task,
        "render_mode": render_mode,
        "simulation_app": app_launcher,
    }
    wrapped_env = IsaacLabEnvWrapper(raw_env, **wrapper_options)
    logging.info(f"Created: {environment} with {wrapped_env.num_envs} envs")

    # Single entry keyed by index 0 under the exported environment identifier.
    envs_by_index = {0: wrapped_env}
    return {environment: envs_by_index}
| |
|