| | from navsim.agents.abstract_agent import AbstractAgent |
| | from omegaconf import DictConfig |
| | import hydra |
| | from hydra.utils import instantiate |
| | import argparse |
| | import os |
| | from hugsim.visualize import to_video, save_frame |
| | from hugsim.dataparser import parse_raw |
| | import torch |
| | from hugsim_client import HugsimClient |
| |
|
| | CONFIG_PATH = "navsim/planning/script/config/HUGSIM" |
| | CONFIG_NAME = "transfuser" |
| |
|
| | hugsim_client = HugsimClient() |
| | hugsim_client.reset_env() |
| |
|
| | def get_opts(): |
| | parser = argparse.ArgumentParser() |
| | parser.add_argument('--output', type=str, required=True) |
| | return parser.parse_args() |
| |
|
| |
|
| | @hydra.main(config_path=CONFIG_PATH, config_name=CONFIG_NAME, version_base=None) |
| | def main(cfg: DictConfig) -> None: |
| | cfg.agent.config.latent = True |
| | cfg.agent.checkpoint_path = "./ckpts/ltf_seed_0.ckpt" |
| | print(cfg) |
| | agent: AbstractAgent = instantiate(cfg.agent) |
| | agent.initialize() |
| | agent = agent.cuda() |
| | |
| | print('Ready for recieving') |
| | cnt = 0 |
| | output_folder = os.path.join(cfg.output, 'ltf') |
| | os.makedirs(output_folder, exist_ok=True) |
| | |
| | env_state = hugsim_client.get_current_state() |
| | while True: |
| | if env_state.done: |
| | return |
| |
|
| | data = parse_raw((env_state.state.obs, env_state.state.info)) |
| | del data['raw_imgs'] |
| | |
| | try: |
| | with torch.no_grad(): |
| | traj = agent.compute_trajectory(data['input']) |
| | except RuntimeError as e: |
| | traj = None |
| | print(e) |
| |
|
| | if traj is None: |
| | print('Waiting for visualize tasks...') |
| | return |
| | |
| | |
| | imu_way_points = traj.poses[:, :3] |
| | way_points = imu_way_points[:, [1, 0, 2]] |
| | way_points[:, 0] *= -1 |
| |
|
| | env_state = hugsim_client.execute_action(way_points[:, :2]) |
| | print('sent') |
| | cnt += 1 |
| | if env_state.cur_scene_done: |
| | print('Scene done') |
| |
|
| |
|
| | if __name__ == '__main__': |
| | |
| | main() |