from __future__ import annotations import logging from typing import Any import importlib import importlib.util import logging import os import sys # Hub constants for downloading additional files HUB_REPO_ID = "LightwheelAI/lw_benchhub_env" def _download_and_import(filename: str): """Download file from Hub and import as module.""" from huggingface_hub import hf_hub_download local_path = hf_hub_download(repo_id=HUB_REPO_ID, filename=filename) module_dir = os.path.dirname(local_path) # Add directory to sys.path so modules can import each other if module_dir not in sys.path: sys.path.insert(0, module_dir) module_name = filename.replace(".py", "") # Read file content and replace relative imports with absolute imports with open(local_path) as f: content = f.read() # Replace relative imports (e.g., "from .errors" -> "from errors") content = content.replace("from .errors import", "from errors import") content = content.replace("from .isaaclab_env_wrapper import", "from isaaclab_env_wrapper import") # Create module and execute modified content module = importlib.util.module_from_spec(importlib.util.spec_from_file_location(module_name, local_path)) sys.modules[module_name] = module # Compile and execute the modified code code = compile(content, local_path, "exec") exec(code, module.__dict__) # noqa: S102 return module try: from .errors import IsaacLabArenaCameraKeyError, IsaacLabArenaStateKeyError from .isaaclab_env_wrapper import IsaacLabEnvWrapper, cleanup_isaaclab except ImportError: _errors = _download_and_import("errors.py") _isaaclab_wrapper = _download_and_import("isaaclab_env_wrapper.py") IsaacLabEnvWrapper = _isaaclab_wrapper.IsaacLabEnvWrapper cleanup_isaaclab = _isaaclab_wrapper.cleanup_isaaclab IsaacLabArenaCameraKeyError = _errors.IsaacLabArenaCameraKeyError IsaacLabArenaStateKeyError = _errors.IsaacLabArenaStateKeyError def make_env(n_envs: int = 1, use_async_envs: bool = False, cfg: Any = None): """ Create vectorized environments for leisaac task. Args: n_envs: Number of parallel environments use_async_envs: Whether to use AsyncVectorEnv or SyncVectorEnv (not use now) cfg: Environment configuration object (EnvConfig) Returns: ManagerBasedRLEnv or DirectRLEnv inherit from gym.Env implemented in IsaacLab """ # Get config_path from cfg.config_path (set via --env.kwargs) if cfg is None or not hasattr(cfg, 'config_path'): raise ValueError( "Please set config_path via --env.kwargs='{\"config_path\": \"...\"}'") config_path = cfg.config_path logging.warning( "Ignore n_envs and use_async_envs in parameter, we use the config path to create the environment") from lw_benchhub.utils.envhub_utils import export_env_for_envhub raw_env, environment, task, render_mode, episode_length, app_launcher = export_env_for_envhub( config_path=config_path ) wrapped_env = IsaacLabEnvWrapper( raw_env, episode_length=episode_length, task=task, render_mode=render_mode, simulation_app=app_launcher, ) logging.info(f"Created: {environment} with {wrapped_env.num_envs} envs") return {environment: {0: wrapped_env}}