diff --git a/examples/data/hotu/024_amp_2.npy b/examples/data/hotu/024_amp_2.npy new file mode 100644 index 00000000..b702952a Binary files /dev/null and b/examples/data/hotu/024_amp_2.npy differ diff --git a/examples/data/hotu/024_amp_3.npy b/examples/data/hotu/024_amp_3.npy new file mode 100644 index 00000000..adf881f2 Binary files /dev/null and b/examples/data/hotu/024_amp_3.npy differ diff --git a/examples/data/hotu/024_tpose.npy b/examples/data/hotu/024_tpose.npy new file mode 100644 index 00000000..e0769b60 Binary files /dev/null and b/examples/data/hotu/024_tpose.npy differ diff --git a/examples/learning_rl/IsaacGym_RofuncRL/example_HumanoidASE_ViewMotion.py b/examples/learning_rl/IsaacGym_RofuncRL/example_HumanoidASE_ViewMotion.py index 66cd8f5f..213a3af0 100644 --- a/examples/learning_rl/IsaacGym_RofuncRL/example_HumanoidASE_ViewMotion.py +++ b/examples/learning_rl/IsaacGym_RofuncRL/example_HumanoidASE_ViewMotion.py @@ -60,7 +60,7 @@ def inference(custom_args): parser = argparse.ArgumentParser() parser.add_argument("--config_name", type=str, default="HumanoidSpoonPanSimple") - parser.add_argument("--motion_file", type=str, default="../hotu/024_amp.npy") + parser.add_argument("--motion_file", type=str, default="../hotu/024_amp_3.npy") custom_args = parser.parse_args() inference(custom_args) diff --git a/rofunc/learning/RofuncRL/tasks/isaacgym/base/vec_task.py b/rofunc/learning/RofuncRL/tasks/isaacgym/base/vec_task.py index 13c81332..178ac9c1 100644 --- a/rofunc/learning/RofuncRL/tasks/isaacgym/base/vec_task.py +++ b/rofunc/learning/RofuncRL/tasks/isaacgym/base/vec_task.py @@ -1,969 +1,969 @@ -# Copyright (c) 2018-2022, NVIDIA Corporation -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# 3. Neither the name of the copyright holder nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -from typing import Dict, Any, Tuple - -import gym -from gym import spaces - -from isaacgym import gymapi -from rofunc.learning.RofuncRL.tasks.isaacgym.base.dr_utils import ( - get_property_setter_map, - get_property_getter_map, - get_default_setter_args, - apply_random_samples, - check_buckets, - generate_random_samples, -) - -import torch -import numpy as np -import operator, random -from copy import deepcopy -import sys - -import abc -from abc import ABC - -EXISTING_SIM = None -SCREEN_CAPTURE_RESOLUTION = (1027, 768) - - -def _create_sim_once(gym, *args, **kwargs): - global EXISTING_SIM - if EXISTING_SIM is not None: - return EXISTING_SIM - else: - EXISTING_SIM = gym.create_sim(*args, **kwargs) - return EXISTING_SIM - - -class Env(ABC): - def __init__( - self, - config: Dict[str, Any], - rl_device: str, - sim_device: str, - graphics_device_id: int, - headless: bool, - ): - """Initialise the env. - - Args: - config: the configuration dictionary. - sim_device: the device to simulate physics on. eg. 'cuda:0' or 'cpu' - graphics_device_id: the device ID to render with. - headless: Set to False to disable viewer rendering. - """ - - split_device = sim_device.split(":") - self.device_type = split_device[0] - self.device_id = int(split_device[1]) if len(split_device) > 1 else 0 - - self.device = "cpu" - if config["sim"]["use_gpu_pipeline"]: - if self.device_type.lower() == "cuda" or self.device_type.lower() == "gpu": - self.device = "cuda" + ":" + str(self.device_id) - else: - print( - "GPU Pipeline can only be used with GPU simulation. Forcing CPU Pipeline." - ) - config["sim"]["use_gpu_pipeline"] = False - - self.rl_device = rl_device - - # Rendering - # if training in a headless mode - self.headless = headless - - enable_camera_sensors = config.get("enableCameraSensors", False) - self.graphics_device_id = graphics_device_id - if enable_camera_sensors == False and self.headless == True: - self.graphics_device_id = -1 - - self.num_environments = config["env"]["numEnvs"] - self.num_agents = config["env"].get( - "numAgents", 1 - ) # used for multi-agent environments - self.num_observations = config["env"]["numObservations"] - self.num_states = config["env"].get("numStates", 0) - self.num_actions = config["env"]["numActions"] - - self.control_freq_inv = config["env"].get("controlFrequencyInv", 1) - - self.obs_space = spaces.Box( - np.ones(self.num_obs, dtype=np.float32) * -np.Inf, - np.ones(self.num_obs, dtype=np.float32) * np.Inf, - ) - self.state_space = spaces.Box( - np.ones(self.num_states, dtype=np.float32) * -np.Inf, - np.ones(self.num_states, dtype=np.float32) * np.Inf, - ) - - self.act_space = spaces.Box( - np.ones(self.num_actions, dtype=np.float32) * -1.0, - np.ones(self.num_actions, dtype=np.float32) * 1.0, - ) - - self.clip_obs = config["env"].get("clipObservations", np.Inf) - self.clip_actions = config["env"].get("clipActions", np.Inf) - - @abc.abstractmethod - def allocate_buffers(self): - """Create torch buffers for observations, rewards, actions dones and any additional data.""" - - @abc.abstractmethod - def step( - self, actions: torch.Tensor - ) -> Tuple[Dict[str, torch.Tensor], torch.Tensor, torch.Tensor, Dict[str, Any]]: - """Step the physics of the environment. - - Args: - actions: actions to apply - Returns: - Observations, rewards, resets, info - Observations are dict of observations (currently only one member called 'obs') - """ - - @abc.abstractmethod - def reset(self) -> Dict[str, torch.Tensor]: - """Reset the environment. - Returns: - Observation dictionary - """ - - @abc.abstractmethod - def reset_idx(self, env_ids: torch.Tensor): - """Reset environments having the provided indices. - Args: - env_ids: environments to reset - """ - - @property - def observation_space(self) -> gym.Space: - """Get the environment's observation space.""" - return self.obs_space - - @property - def action_space(self) -> gym.Space: - """Get the environment's action space.""" - return self.act_space - - @property - def num_envs(self) -> int: - """Get the number of environments.""" - return self.num_environments - - @property - def num_acts(self) -> int: - """Get the number of actions in the environment.""" - return self.num_actions - - @property - def num_obs(self) -> int: - """Get the number of observations in the environment.""" - return self.num_observations - - -class VecTask(Env): - metadata = {"render.modes": ["human", "rgb_array"], "video.frames_per_second": 24} - - def __init__( - self, - config, - rl_device, - sim_device, - graphics_device_id, - headless, - virtual_screen_capture: bool = False, - force_render: bool = False, - ): - """Initialise the `VecTask`. - - Args: - config: config dictionary for the environment. - sim_device: the device to simulate physics on. eg. 'cuda:0' or 'cpu' - graphics_device_id: the device ID to render with. - headless: Set to False to disable viewer rendering. - virtual_screen_capture: Set to True to allow the users get captured screen in RGB array via `env.render(mode='rgb_array')`. - force_render: Set to True to always force rendering in the steps (if the `control_freq_inv` is greater than 1 we suggest stting this arg to True) - """ - super().__init__(config, rl_device, sim_device, graphics_device_id, headless) - self.virtual_screen_capture = virtual_screen_capture - self.virtual_display = None - if self.virtual_screen_capture: - from pyvirtualdisplay.smartdisplay import SmartDisplay - - self.virtual_display = SmartDisplay(size=SCREEN_CAPTURE_RESOLUTION) - self.virtual_display.start() - self.force_render = force_render - - self.sim_params = self.__parse_sim_params( - self.cfg["physics_engine"], self.cfg["sim"] - ) - if self.cfg["physics_engine"] == "physx": - self.physics_engine = gymapi.SIM_PHYSX - elif self.cfg["physics_engine"] == "flex": - self.physics_engine = gymapi.SIM_FLEX - else: - msg = f"Invalid physics engine backend: {self.cfg['physics_engine']}" - raise ValueError(msg) - - # optimization flags for pytorch JIT - torch._C._jit_set_profiling_mode(False) - torch._C._jit_set_profiling_executor(False) - - self.gym = gymapi.acquire_gym() - - self.first_randomization = True - self.original_props = {} - self.dr_randomizations = {} - self.actor_params_generator = None - self.extern_actor_params = {} - self.last_step = -1 - self.last_rand_step = -1 - for env_id in range(self.num_envs): - self.extern_actor_params[env_id] = None - - # create envs, sim and viewer - self.sim_initialized = False - self.create_sim() - self.gym.prepare_sim(self.sim) - self.sim_initialized = True - - self.set_viewer() - self.allocate_buffers() - - self.obs_dict = {} - - def set_viewer(self): - """Create the viewer.""" - - # todo: read from config - self.enable_viewer_sync = True - self.viewer = None - - # if running with a viewer, set up keyboard shortcuts and camera - if self.headless == False: - # subscribe to keyboard shortcuts - self.viewer = self.gym.create_viewer(self.sim, gymapi.CameraProperties()) - self.gym.subscribe_viewer_keyboard_event( - self.viewer, gymapi.KEY_ESCAPE, "QUIT" - ) - self.gym.subscribe_viewer_keyboard_event( - self.viewer, gymapi.KEY_V, "toggle_viewer_sync" - ) - - # set the camera position based on up axis - sim_params = self.gym.get_sim_params(self.sim) - if sim_params.up_axis == gymapi.UP_AXIS_Z: - cam_pos = gymapi.Vec3(-3.0, -3.0, 3.0) - cam_target = gymapi.Vec3(10.0, 15.0, 0.0) - else: - cam_pos = gymapi.Vec3(-3.0, 3.0, -3.0) - cam_target = gymapi.Vec3(10.0, 0.0, 15.0) - - self.gym.viewer_camera_look_at(self.viewer, None, cam_pos, cam_target) - - def allocate_buffers(self): - """Allocate the observation, states, etc. buffers. - - These are what is used to set observations and states in the environment classes which - inherit from this one, and are read in `step` and other related functions. - - """ - - # allocate buffers - self.obs_buf = torch.zeros( - (self.num_envs, self.num_obs), device=self.device, dtype=torch.float - ) - self.states_buf = torch.zeros( - (self.num_envs, self.num_states), device=self.device, dtype=torch.float - ) - self.rew_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.float) - self.reset_buf = torch.ones(self.num_envs, device=self.device, dtype=torch.long) - self.timeout_buf = torch.zeros( - self.num_envs, device=self.device, dtype=torch.long - ) - self.progress_buf = torch.zeros( - self.num_envs, device=self.device, dtype=torch.long - ) - self.randomize_buf = torch.zeros( - self.num_envs, device=self.device, dtype=torch.long - ) - self.extras = {} - - def set_sim_params_up_axis(self, sim_params: gymapi.SimParams, axis: str) -> int: - """Set gravity based on up axis and return axis index. - - Args: - sim_params: sim params to modify the axis for. - axis: axis to set sim params for. - Returns: - axis index for up axis. - """ - if axis == "z": - sim_params.up_axis = gymapi.UP_AXIS_Z - sim_params.gravity.x = 0 - sim_params.gravity.y = 0 - sim_params.gravity.z = -9.81 - return 2 - return 1 - - def create_sim( - self, - compute_device: int, - graphics_device: int, - physics_engine, - sim_params: gymapi.SimParams, - ): - """Create an Isaac Gym sim object. - - Args: - compute_device: ID of compute device to use. - graphics_device: ID of graphics device to use. - physics_engine: physics engine to use (`gymapi.SIM_PHYSX` or `gymapi.SIM_FLEX`) - sim_params: sim params to use. - Returns: - the Isaac Gym sim object. - """ - sim = _create_sim_once( - self.gym, compute_device, graphics_device, physics_engine, sim_params - ) - if sim is None: - print("*** Failed to create sim") - quit() - - return sim - - def get_state(self): - """Returns the state buffer of the environment (the privileged observations for asymmetric training).""" - return torch.clamp(self.states_buf, -self.clip_obs, self.clip_obs).to( - self.rl_device - ) - - @abc.abstractmethod - def pre_physics_step(self, actions: torch.Tensor): - """Apply the actions to the environment (eg by setting torques, position targets). - - Args: - actions: the actions to apply - """ - - @abc.abstractmethod - def post_physics_step(self): - """Compute reward and observations, reset any environments that require it.""" - - def step( - self, actions: torch.Tensor - ) -> Tuple[Dict[str, torch.Tensor], torch.Tensor, torch.Tensor, Dict[str, Any]]: - """Step the physics of the environment. - - Args: - actions: actions to apply - Returns: - Observations, rewards, resets, info - Observations are dict of observations (currently only one member called 'obs') - """ - - # randomize actions - if self.dr_randomizations.get("actions", None): - actions = self.dr_randomizations["actions"]["noise_lambda"](actions) - - action_tensor = torch.clamp(actions, -self.clip_actions, self.clip_actions) - # apply actions - self.pre_physics_step(action_tensor) - - # step physics and render each frame - for i in range(self.control_freq_inv): - if self.force_render: - self.render() - self.gym.simulate(self.sim) - - # to fix! - if self.device == "cpu": - self.gym.fetch_results(self.sim, True) - - # fill time out buffer # TODO - self.timeout_buf = torch.where( - self.progress_buf >= self.max_episode_length - 1, - torch.ones_like(self.timeout_buf), - torch.zeros_like(self.timeout_buf), - ) - - # compute observations, rewards, resets, ... - self.post_physics_step() - - # fill time out buffer: set to 1 if we reached the max episode length AND the reset buffer is 1. - # Timeout == 1 makes sense only if the reset buffer is 1. - self.timeout_buf = (self.progress_buf >= self.max_episode_length - 1) & ( - self.reset_buf != 0 - ) - - # randomize observations - if self.dr_randomizations.get("observations", None): - self.obs_buf = self.dr_randomizations["observations"]["noise_lambda"]( - self.obs_buf - ) - - self.extras["time_outs"] = self.timeout_buf.to(self.rl_device) - - self.obs_dict["obs"] = torch.clamp( - self.obs_buf, -self.clip_obs, self.clip_obs - ).to(self.rl_device) - - # asymmetric actor-critic - if self.num_states > 0: - self.obs_dict["states"] = self.get_state() - - return ( - self.obs_dict, - self.rew_buf.to(self.rl_device), - self.reset_buf.to(self.rl_device), - self.extras, - ) - - def zero_actions(self) -> torch.Tensor: - """Returns a buffer with zero actions. - - Returns: - A buffer of zero torch actions - """ - actions = torch.zeros( - [self.num_envs, self.num_actions], - dtype=torch.float32, - device=self.rl_device, - ) - - return actions - - def reset_idx(self, env_idx): - """Reset environment with indces in env_idx. - Should be implemented in an environment class inherited from VecTask. - """ - pass - - def reset(self): - """Is called only once when environment starts to provide the first observations. - Doesn't calculate observations. Actual reset and observation calculation need to be implemented by user. - Returns: - Observation dictionary - """ - zero_actions = self.zero_actions() - - # step the simulator - self.step(zero_actions) - - self.obs_dict["obs"] = torch.clamp( - self.obs_buf, -self.clip_obs, self.clip_obs - ).to(self.rl_device) - - # asymmetric actor-critic - if self.num_states > 0: - self.obs_dict["states"] = self.get_state() - - return self.obs_dict - - def reset_done(self): - """Reset the environment. - Returns: - Observation dictionary, indices of environments being reset - """ - done_env_ids = self.reset_buf.nonzero(as_tuple=False).flatten() - if len(done_env_ids) > 0: - self.reset_idx(done_env_ids) - - self.obs_dict["obs"] = torch.clamp( - self.obs_buf, -self.clip_obs, self.clip_obs - ).to(self.rl_device) - - # asymmetric actor-critic - if self.num_states > 0: - self.obs_dict["states"] = self.get_state() - - return self.obs_dict, done_env_ids - - def render(self, mode="rgb_array"): - """Draw the frame to the viewer, and check for keyboard events.""" - if self.viewer: - # check for window closed - if self.gym.query_viewer_has_closed(self.viewer): - sys.exit() - - # check for keyboard events - for evt in self.gym.query_viewer_action_events(self.viewer): - if evt.action == "QUIT" and evt.value > 0: - sys.exit() - elif evt.action == "toggle_viewer_sync" and evt.value > 0: - self.enable_viewer_sync = not self.enable_viewer_sync - - # fetch results - if self.device != "cpu": - self.gym.fetch_results(self.sim, True) - - # step graphics - if self.enable_viewer_sync: - self.gym.step_graphics(self.sim) - self.gym.draw_viewer(self.viewer, self.sim, True) - - # Wait for dt to elapse in real time. - # This synchronizes the physics simulation with the rendering rate. - self.gym.sync_frame_time(self.sim) - - else: - self.gym.poll_viewer_events(self.viewer) - - if self.virtual_display and mode == "rgb_array": - img = self.virtual_display.grab() - return np.array(img) - - def __parse_sim_params( - self, physics_engine: str, config_sim: Dict[str, Any] - ) -> gymapi.SimParams: - """Parse the config dictionary for physics stepping settings. - - Args: - physics_engine: which physics engine to use. "physx" or "flex" - config_sim: dict of sim configuration parameters - Returns - IsaacGym SimParams object with updated settings. - """ - sim_params = gymapi.SimParams() - - # check correct up-axis - if config_sim["up_axis"] not in ["z", "y"]: - msg = f"Invalid physics up-axis: {config_sim['up_axis']}" - print(msg) - raise ValueError(msg) - - # assign general sim parameters - sim_params.dt = config_sim["dt"] - sim_params.num_client_threads = config_sim.get("num_client_threads", 0) - sim_params.use_gpu_pipeline = config_sim["use_gpu_pipeline"] - sim_params.substeps = config_sim.get("substeps", 2) - - # assign up-axis - if config_sim["up_axis"] == "z": - sim_params.up_axis = gymapi.UP_AXIS_Z - else: - sim_params.up_axis = gymapi.UP_AXIS_Y - - # assign gravity - sim_params.gravity = gymapi.Vec3(*config_sim["gravity"]) - - # configure physics parameters - if physics_engine == "physx": - # set the parameters - if "physx" in config_sim: - for opt in config_sim["physx"].keys(): - if opt == "contact_collection": - setattr( - sim_params.physx, - opt, - gymapi.ContactCollection(config_sim["physx"][opt]), - ) - else: - setattr(sim_params.physx, opt, config_sim["physx"][opt]) - else: - # set the parameters - if "flex" in config_sim: - for opt in config_sim["flex"].keys(): - setattr(sim_params.flex, opt, config_sim["flex"][opt]) - - # return the configured params - return sim_params - - """ - Domain Randomization methods - """ - - def get_actor_params_info(self, dr_params: Dict[str, Any], env): - """Generate a flat array of actor params, their names and ranges. - - Returns: - The array - """ - - if "actor_params" not in dr_params: - return None - params = [] - names = [] - lows = [] - highs = [] - param_getters_map = get_property_getter_map(self.gym) - for actor, actor_properties in dr_params["actor_params"].items(): - handle = self.gym.find_actor_handle(env, actor) - for prop_name, prop_attrs in actor_properties.items(): - if prop_name == "color": - continue # this is set randomly - props = param_getters_map[prop_name](env, handle) - if not isinstance(props, list): - props = [props] - for prop_idx, prop in enumerate(props): - for attr, attr_randomization_params in prop_attrs.items(): - name = prop_name + "_" + str(prop_idx) + "_" + attr - lo_hi = attr_randomization_params["range"] - distr = attr_randomization_params["distribution"] - if "uniform" not in distr: - lo_hi = (-1.0 * float("Inf"), float("Inf")) - if isinstance(prop, np.ndarray): - for attr_idx in range(prop[attr].shape[0]): - params.append(prop[attr][attr_idx]) - names.append(name + "_" + str(attr_idx)) - lows.append(lo_hi[0]) - highs.append(lo_hi[1]) - else: - params.append(getattr(prop, attr)) - names.append(name) - lows.append(lo_hi[0]) - highs.append(lo_hi[1]) - return params, names, lows, highs - - def apply_randomizations(self, dr_params): - """Apply domain randomizations to the environment. - - Note that currently we can only apply randomizations only on resets, due to current PhysX limitations - - Args: - dr_params: parameters for domain randomization to use. - """ - - # If we don't have a randomization frequency, randomize every step - rand_freq = dr_params.get("frequency", 1) - - # First, determine what to randomize: - # - non-environment parameters when > frequency steps have passed since the last non-environment - # - physical environments in the reset buffer, which have exceeded the randomization frequency threshold - # - on the first call, randomize everything - self.last_step = self.gym.get_frame_count(self.sim) - if self.first_randomization: - do_nonenv_randomize = True - env_ids = list(range(self.num_envs)) - else: - do_nonenv_randomize = (self.last_step - self.last_rand_step) >= rand_freq - rand_envs = torch.where( - self.randomize_buf >= rand_freq, - torch.ones_like(self.randomize_buf), - torch.zeros_like(self.randomize_buf), - ) - rand_envs = torch.logical_and(rand_envs, self.reset_buf) - env_ids = torch.nonzero(rand_envs, as_tuple=False).squeeze(-1).tolist() - self.randomize_buf[rand_envs] = 0 - - if do_nonenv_randomize: - self.last_rand_step = self.last_step - - param_setters_map = get_property_setter_map(self.gym) - param_setter_defaults_map = get_default_setter_args(self.gym) - param_getters_map = get_property_getter_map(self.gym) - - # On first iteration, check the number of buckets - if self.first_randomization: - check_buckets(self.gym, self.envs, dr_params) - - for nonphysical_param in ["observations", "actions"]: - if nonphysical_param in dr_params and do_nonenv_randomize: - dist = dr_params[nonphysical_param]["distribution"] - op_type = dr_params[nonphysical_param]["operation"] - sched_type = ( - dr_params[nonphysical_param]["schedule"] - if "schedule" in dr_params[nonphysical_param] - else None - ) - sched_step = ( - dr_params[nonphysical_param]["schedule_steps"] - if "schedule" in dr_params[nonphysical_param] - else None - ) - op = operator.add if op_type == "additive" else operator.mul - - if sched_type == "linear": - sched_scaling = 1.0 / sched_step * min(self.last_step, sched_step) - elif sched_type == "constant": - sched_scaling = 0 if self.last_step < sched_step else 1 - else: - sched_scaling = 1 - - if dist == "gaussian": - mu, var = dr_params[nonphysical_param]["range"] - mu_corr, var_corr = dr_params[nonphysical_param].get( - "range_correlated", [0.0, 0.0] - ) - - if op_type == "additive": - mu *= sched_scaling - var *= sched_scaling - mu_corr *= sched_scaling - var_corr *= sched_scaling - elif op_type == "scaling": - var = var * sched_scaling # scale up var over time - mu = mu * sched_scaling + 1.0 * ( - 1.0 - sched_scaling - ) # linearly interpolate - - var_corr = var_corr * sched_scaling # scale up var over time - mu_corr = mu_corr * sched_scaling + 1.0 * ( - 1.0 - sched_scaling - ) # linearly interpolate - - def noise_lambda(tensor, param_name=nonphysical_param): - params = self.dr_randomizations[param_name] - corr = params.get("corr", None) - if corr is None: - corr = torch.randn_like(tensor) - params["corr"] = corr - corr = corr * params["var_corr"] + params["mu_corr"] - return op( - tensor, - corr - + torch.randn_like(tensor) * params["var"] - + params["mu"], - ) - - self.dr_randomizations[nonphysical_param] = { - "mu": mu, - "var": var, - "mu_corr": mu_corr, - "var_corr": var_corr, - "noise_lambda": noise_lambda, - } - - elif dist == "uniform": - lo, hi = dr_params[nonphysical_param]["range"] - lo_corr, hi_corr = dr_params[nonphysical_param].get( - "range_correlated", [0.0, 0.0] - ) - - if op_type == "additive": - lo *= sched_scaling - hi *= sched_scaling - lo_corr *= sched_scaling - hi_corr *= sched_scaling - elif op_type == "scaling": - lo = lo * sched_scaling + 1.0 * (1.0 - sched_scaling) - hi = hi * sched_scaling + 1.0 * (1.0 - sched_scaling) - lo_corr = lo_corr * sched_scaling + 1.0 * (1.0 - sched_scaling) - hi_corr = hi_corr * sched_scaling + 1.0 * (1.0 - sched_scaling) - - def noise_lambda(tensor, param_name=nonphysical_param): - params = self.dr_randomizations[param_name] - corr = params.get("corr", None) - if corr is None: - corr = torch.randn_like(tensor) - params["corr"] = corr - corr = ( - corr * (params["hi_corr"] - params["lo_corr"]) - + params["lo_corr"] - ) - return op( - tensor, - corr - + torch.rand_like(tensor) * (params["hi"] - params["lo"]) - + params["lo"], - ) - - self.dr_randomizations[nonphysical_param] = { - "lo": lo, - "hi": hi, - "lo_corr": lo_corr, - "hi_corr": hi_corr, - "noise_lambda": noise_lambda, - } - - if "sim_params" in dr_params and do_nonenv_randomize: - prop_attrs = dr_params["sim_params"] - prop = self.gym.get_sim_params(self.sim) - - if self.first_randomization: - self.original_props["sim_params"] = { - attr: getattr(prop, attr) for attr in dir(prop) - } - - for attr, attr_randomization_params in prop_attrs.items(): - apply_random_samples( - prop, - self.original_props["sim_params"], - attr, - attr_randomization_params, - self.last_step, - ) - - self.gym.set_sim_params(self.sim, prop) - - # If self.actor_params_generator is initialized: use it to - # sample actor simulation params. This gives users the - # freedom to generate samples from arbitrary distributions, - # e.g. use full-covariance distributions instead of the DR's - # default of treating each simulation parameter independently. - extern_offsets = {} - if self.actor_params_generator is not None: - for env_id in env_ids: - self.extern_actor_params[env_id] = self.actor_params_generator.sample() - extern_offsets[env_id] = 0 - - # randomise all attributes of each actor (hand, cube etc..) - # actor_properties are (stiffness, damping etc..) - - # Loop over actors, then loop over envs, then loop over their props - # and lastly loop over the ranges of the params - - for actor, actor_properties in dr_params["actor_params"].items(): - # Loop over all envs as this part is not tensorised yet - for env_id in env_ids: - env = self.envs[env_id] - handle = self.gym.find_actor_handle(env, actor) - extern_sample = self.extern_actor_params[env_id] - - # randomise dof_props, rigid_body, rigid_shape properties - # all obtained from the YAML file - # EXAMPLE: prop name: dof_properties, rigid_body_properties, rigid_shape properties - # prop_attrs: - # {'damping': {'range': [0.3, 3.0], 'operation': 'scaling', 'distribution': 'loguniform'} - # {'stiffness': {'range': [0.75, 1.5], 'operation': 'scaling', 'distribution': 'loguniform'} - for prop_name, prop_attrs in actor_properties.items(): - if prop_name == "color": - num_bodies = self.gym.get_actor_rigid_body_count(env, handle) - for n in range(num_bodies): - self.gym.set_rigid_body_color( - env, - handle, - n, - gymapi.MESH_VISUAL, - gymapi.Vec3( - random.uniform(0, 1), - random.uniform(0, 1), - random.uniform(0, 1), - ), - ) - continue - if prop_name == "scale": - setup_only = prop_attrs.get("setup_only", False) - if (setup_only and not self.sim_initialized) or not setup_only: - attr_randomization_params = prop_attrs - sample = generate_random_samples( - attr_randomization_params, 1, self.last_step, None - ) - og_scale = 1 - if attr_randomization_params["operation"] == "scaling": - new_scale = og_scale * sample - elif attr_randomization_params["operation"] == "additive": - new_scale = og_scale + sample - self.gym.set_actor_scale(env, handle, new_scale) - continue - - prop = param_getters_map[prop_name](env, handle) - set_random_properties = True - - if isinstance(prop, list): - if self.first_randomization: - self.original_props[prop_name] = [ - {attr: getattr(p, attr) for attr in dir(p)} - for p in prop - ] - for p, og_p in zip(prop, self.original_props[prop_name]): - for attr, attr_randomization_params in prop_attrs.items(): - setup_only = attr_randomization_params.get( - "setup_only", False - ) - if ( - setup_only and not self.sim_initialized - ) or not setup_only: - smpl = None - if self.actor_params_generator is not None: - ( - smpl, - extern_offsets[env_id], - ) = get_attr_val_from_sample( - extern_sample, - extern_offsets[env_id], - p, - attr, - ) - apply_random_samples( - p, - og_p, - attr, - attr_randomization_params, - self.last_step, - smpl, - ) - else: - set_random_properties = False - else: - if self.first_randomization: - self.original_props[prop_name] = deepcopy(prop) - for attr, attr_randomization_params in prop_attrs.items(): - setup_only = attr_randomization_params.get( - "setup_only", False - ) - if ( - setup_only and not self.sim_initialized - ) or not setup_only: - smpl = None - if self.actor_params_generator is not None: - ( - smpl, - extern_offsets[env_id], - ) = get_attr_val_from_sample( - extern_sample, - extern_offsets[env_id], - prop, - attr, - ) - apply_random_samples( - prop, - self.original_props[prop_name], - attr, - attr_randomization_params, - self.last_step, - smpl, - ) - else: - set_random_properties = False - - if set_random_properties: - setter = param_setters_map[prop_name] - default_args = param_setter_defaults_map[prop_name] - setter(env, handle, prop, *default_args) - - if self.actor_params_generator is not None: - for env_id in env_ids: # check that we used all dims in sample - if extern_offsets[env_id] > 0: - extern_sample = self.extern_actor_params[env_id] - if extern_offsets[env_id] != extern_sample.shape[0]: - print( - "env_id", - env_id, - "extern_offset", - extern_offsets[env_id], - "vs extern_sample.shape", - extern_sample.shape, - ) - raise Exception("Invalid extern_sample size") - - self.first_randomization = False +# Copyright (c) 2018-2022, NVIDIA Corporation +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from typing import Dict, Any, Tuple + +import gym +from gym import spaces + +from isaacgym import gymapi +from rofunc.learning.RofuncRL.tasks.isaacgym.base.dr_utils import ( + get_property_setter_map, + get_property_getter_map, + get_default_setter_args, + apply_random_samples, + check_buckets, + generate_random_samples, +) + +import torch +import numpy as np +import operator, random +from copy import deepcopy +import sys + +import abc +from abc import ABC + +EXISTING_SIM = None +SCREEN_CAPTURE_RESOLUTION = (1027, 768) + + +def _create_sim_once(gym, *args, **kwargs): + global EXISTING_SIM + if EXISTING_SIM is not None: + return EXISTING_SIM + else: + EXISTING_SIM = gym.create_sim(*args, **kwargs) + return EXISTING_SIM + + +class Env(ABC): + def __init__( + self, + config: Dict[str, Any], + rl_device: str, + sim_device: str, + graphics_device_id: int, + headless: bool, + ): + """Initialise the env. + + Args: + config: the configuration dictionary. + sim_device: the device to simulate physics on. eg. 'cuda:0' or 'cpu' + graphics_device_id: the device ID to render with. + headless: Set to False to disable viewer rendering. + """ + + split_device = sim_device.split(":") + self.device_type = split_device[0] + self.device_id = int(split_device[1]) if len(split_device) > 1 else 0 + + self.device = "cpu" + if config["sim"]["use_gpu_pipeline"]: + if self.device_type.lower() == "cuda" or self.device_type.lower() == "gpu": + self.device = "cuda" + ":" + str(self.device_id) + else: + print( + "GPU Pipeline can only be used with GPU simulation. Forcing CPU Pipeline." + ) + config["sim"]["use_gpu_pipeline"] = False + + self.rl_device = rl_device + + # Rendering + # if training in a headless mode + self.headless = headless + + enable_camera_sensors = config.get("enableCameraSensors", False) + self.graphics_device_id = graphics_device_id + if enable_camera_sensors == False and self.headless == True: + self.graphics_device_id = -1 + + self.num_environments = config["env"]["numEnvs"] + self.num_agents = config["env"].get( + "numAgents", 1 + ) # used for multi-agent environments + self.num_observations = config["env"]["numObservations"] + self.num_states = config["env"].get("numStates", 0) + self.num_actions = config["env"]["numActions"] + + self.control_freq_inv = config["env"].get("controlFrequencyInv", 1) + + self.obs_space = spaces.Box( + np.ones(self.num_obs, dtype=np.float32) * -np.Inf, + np.ones(self.num_obs, dtype=np.float32) * np.Inf, + ) + self.state_space = spaces.Box( + np.ones(self.num_states, dtype=np.float32) * -np.Inf, + np.ones(self.num_states, dtype=np.float32) * np.Inf, + ) + + self.act_space = spaces.Box( + np.ones(self.num_actions, dtype=np.float32) * -1.0, + np.ones(self.num_actions, dtype=np.float32) * 1.0, + ) + + self.clip_obs = config["env"].get("clipObservations", np.Inf) + self.clip_actions = config["env"].get("clipActions", np.Inf) + + @abc.abstractmethod + def allocate_buffers(self): + """Create torch buffers for observations, rewards, actions dones and any additional data.""" + + @abc.abstractmethod + def step( + self, actions: torch.Tensor + ) -> Tuple[Dict[str, torch.Tensor], torch.Tensor, torch.Tensor, Dict[str, Any]]: + """Step the physics of the environment. + + Args: + actions: actions to apply + Returns: + Observations, rewards, resets, info + Observations are dict of observations (currently only one member called 'obs') + """ + + @abc.abstractmethod + def reset(self) -> Dict[str, torch.Tensor]: + """Reset the environment. + Returns: + Observation dictionary + """ + + @abc.abstractmethod + def reset_idx(self, env_ids: torch.Tensor): + """Reset environments having the provided indices. + Args: + env_ids: environments to reset + """ + + @property + def observation_space(self) -> gym.Space: + """Get the environment's observation space.""" + return self.obs_space + + @property + def action_space(self) -> gym.Space: + """Get the environment's action space.""" + return self.act_space + + @property + def num_envs(self) -> int: + """Get the number of environments.""" + return self.num_environments + + @property + def num_acts(self) -> int: + """Get the number of actions in the environment.""" + return self.num_actions + + @property + def num_obs(self) -> int: + """Get the number of observations in the environment.""" + return self.num_observations + + +class VecTask(Env): + metadata = {"render.modes": ["human", "rgb_array"], "video.frames_per_second": 24} + + def __init__( + self, + config, + rl_device, + sim_device, + graphics_device_id, + headless, + virtual_screen_capture: bool = False, + force_render: bool = False, + ): + """Initialise the `VecTask`. + + Args: + config: config dictionary for the environment. + sim_device: the device to simulate physics on. eg. 'cuda:0' or 'cpu' + graphics_device_id: the device ID to render with. + headless: Set to False to disable viewer rendering. + virtual_screen_capture: Set to True to allow the users get captured screen in RGB array via `env.render(mode='rgb_array')`. + force_render: Set to True to always force rendering in the steps (if the `control_freq_inv` is greater than 1 we suggest stting this arg to True) + """ + super().__init__(config, rl_device, sim_device, graphics_device_id, headless) + self.virtual_screen_capture = virtual_screen_capture + self.virtual_display = None + if self.virtual_screen_capture: + from pyvirtualdisplay.smartdisplay import SmartDisplay + + self.virtual_display = SmartDisplay(size=SCREEN_CAPTURE_RESOLUTION) + self.virtual_display.start() + self.force_render = force_render + + self.sim_params = self.__parse_sim_params( + self.cfg["physics_engine"], self.cfg["sim"] + ) + if self.cfg["physics_engine"] == "physx": + self.physics_engine = gymapi.SIM_PHYSX + elif self.cfg["physics_engine"] == "flex": + self.physics_engine = gymapi.SIM_FLEX + else: + msg = f"Invalid physics engine backend: {self.cfg['physics_engine']}" + raise ValueError(msg) + + # optimization flags for pytorch JIT + torch._C._jit_set_profiling_mode(False) + torch._C._jit_set_profiling_executor(False) + + self.gym = gymapi.acquire_gym() + + self.first_randomization = True + self.original_props = {} + self.dr_randomizations = {} + self.actor_params_generator = None + self.extern_actor_params = {} + self.last_step = -1 + self.last_rand_step = -1 + for env_id in range(self.num_envs): + self.extern_actor_params[env_id] = None + + # create envs, sim and viewer + self.sim_initialized = False + self.create_sim() + self.gym.prepare_sim(self.sim) + self.sim_initialized = True + + self.set_viewer() + self.allocate_buffers() + + self.obs_dict = {} + + def set_viewer(self): + """Create the viewer.""" + + # todo: read from config + self.enable_viewer_sync = True + self.viewer = None + + # if running with a viewer, set up keyboard shortcuts and camera + if self.headless == False: + # subscribe to keyboard shortcuts + self.viewer = self.gym.create_viewer(self.sim, gymapi.CameraProperties()) + self.gym.subscribe_viewer_keyboard_event( + self.viewer, gymapi.KEY_ESCAPE, "QUIT" + ) + self.gym.subscribe_viewer_keyboard_event( + self.viewer, gymapi.KEY_V, "toggle_viewer_sync" + ) + + # set the camera position based on up axis + sim_params = self.gym.get_sim_params(self.sim) + if sim_params.up_axis == gymapi.UP_AXIS_Z: + cam_pos = gymapi.Vec3(-3.0, -3.0, 3.0) + cam_target = gymapi.Vec3(10.0, 15.0, 0.0) + else: + cam_pos = gymapi.Vec3(-3.0, 3.0, -3.0) + cam_target = gymapi.Vec3(10.0, 0.0, 15.0) + + self.gym.viewer_camera_look_at(self.viewer, None, cam_pos, cam_target) + + def allocate_buffers(self): + """Allocate the observation, states, etc. buffers. + + These are what is used to set observations and states in the environment classes which + inherit from this one, and are read in `step` and other related functions. + + """ + + # allocate buffers + self.obs_buf = torch.zeros( + (self.num_envs, self.num_obs), device=self.device, dtype=torch.float + ) + self.states_buf = torch.zeros( + (self.num_envs, self.num_states), device=self.device, dtype=torch.float + ) + self.rew_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.float) + self.reset_buf = torch.ones(self.num_envs, device=self.device, dtype=torch.long) + self.timeout_buf = torch.zeros( + self.num_envs, device=self.device, dtype=torch.long + ) + self.progress_buf = torch.zeros( + self.num_envs, device=self.device, dtype=torch.long + ) + self.randomize_buf = torch.zeros( + self.num_envs, device=self.device, dtype=torch.long + ) + self.extras = {} + + def set_sim_params_up_axis(self, sim_params: gymapi.SimParams, axis: str) -> int: + """Set gravity based on up axis and return axis index. + + Args: + sim_params: sim params to modify the axis for. + axis: axis to set sim params for. + Returns: + axis index for up axis. + """ + if axis == "z": + sim_params.up_axis = gymapi.UP_AXIS_Z + sim_params.gravity.x = 0 + sim_params.gravity.y = 0 + sim_params.gravity.z = -9.81 + return 2 + return 1 + + def create_sim( + self, + compute_device: int, + graphics_device: int, + physics_engine, + sim_params: gymapi.SimParams, + ): + """Create an Isaac Gym sim object. + + Args: + compute_device: ID of compute device to use. + graphics_device: ID of graphics device to use. + physics_engine: physics engine to use (`gymapi.SIM_PHYSX` or `gymapi.SIM_FLEX`) + sim_params: sim params to use. + Returns: + the Isaac Gym sim object. + """ + sim = _create_sim_once( + self.gym, compute_device, graphics_device, physics_engine, sim_params + ) + if sim is None: + print("*** Failed to create sim") + quit() + + return sim + + def get_state(self): + """Returns the state buffer of the environment (the privileged observations for asymmetric training).""" + return torch.clamp(self.states_buf, -self.clip_obs, self.clip_obs).to( + self.rl_device + ) + + @abc.abstractmethod + def pre_physics_step(self, actions: torch.Tensor): + """Apply the actions to the environment (eg by setting torques, position targets). + + Args: + actions: the actions to apply + """ + + @abc.abstractmethod + def post_physics_step(self): + """Compute reward and observations, reset any environments that require it.""" + + def step( + self, actions: torch.Tensor + ) -> Tuple[Dict[str, torch.Tensor], torch.Tensor, torch.Tensor, Dict[str, Any]]: + """Step the physics of the environment. + + Args: + actions: actions to apply + Returns: + Observations, rewards, resets, info + Observations are dict of observations (currently only one member called 'obs') + """ + + # randomize actions + if self.dr_randomizations.get("actions", None): + actions = self.dr_randomizations["actions"]["noise_lambda"](actions) + + action_tensor = torch.clamp(actions, -self.clip_actions, self.clip_actions) + # apply actions + self.pre_physics_step(action_tensor) + + # step physics and render each frame + for i in range(self.control_freq_inv): + if self.force_render: + self.render() + self.gym.simulate(self.sim) + + # to fix! + if self.device == "cpu": + self.gym.fetch_results(self.sim, True) + + # fill time out buffer # TODO + self.timeout_buf = torch.where( + self.progress_buf >= self.max_episode_length - 1, + torch.ones_like(self.timeout_buf), + torch.zeros_like(self.timeout_buf), + ) + + # compute observations, rewards, resets, ... + self.post_physics_step() + + # fill time out buffer: set to 1 if we reached the max episode length AND the reset buffer is 1. + # Timeout == 1 makes sense only if the reset buffer is 1. + self.timeout_buf = (self.progress_buf >= self.max_episode_length - 1) & ( + self.reset_buf != 0 + ) + + # randomize observations + if self.dr_randomizations.get("observations", None): + self.obs_buf = self.dr_randomizations["observations"]["noise_lambda"]( + self.obs_buf + ) + + self.extras["time_outs"] = self.timeout_buf.to(self.rl_device) + + self.obs_dict["obs"] = torch.clamp( + self.obs_buf, -self.clip_obs, self.clip_obs + ).to(self.rl_device) + + # asymmetric actor-critic + if self.num_states > 0: + self.obs_dict["states"] = self.get_state() + + return ( + self.obs_dict, + self.rew_buf.to(self.rl_device), + self.reset_buf.to(self.rl_device), + self.extras, + ) + + def zero_actions(self) -> torch.Tensor: + """Returns a buffer with zero actions. + + Returns: + A buffer of zero torch actions + """ + actions = torch.zeros( + [self.num_envs, self.num_actions], + dtype=torch.float32, + device=self.rl_device, + ) + + return actions + + def reset_idx(self, env_idx): + """Reset environment with indces in env_idx. + Should be implemented in an environment class inherited from VecTask. + """ + pass + + def reset(self): + """Is called only once when environment starts to provide the first observations. + Doesn't calculate observations. Actual reset and observation calculation need to be implemented by user. + Returns: + Observation dictionary + """ + zero_actions = self.zero_actions() + + # step the simulator + self.step(zero_actions) + + self.obs_dict["obs"] = torch.clamp( + self.obs_buf, -self.clip_obs, self.clip_obs + ).to(self.rl_device) + + # asymmetric actor-critic + if self.num_states > 0: + self.obs_dict["states"] = self.get_state() + + return self.obs_dict + + def reset_done(self): + """Reset the environment. + Returns: + Observation dictionary, indices of environments being reset + """ + done_env_ids = self.reset_buf.nonzero(as_tuple=False).flatten() + if len(done_env_ids) > 0: + self.reset_idx(done_env_ids) + + self.obs_dict["obs"] = torch.clamp( + self.obs_buf, -self.clip_obs, self.clip_obs + ).to(self.rl_device) + + # asymmetric actor-critic + if self.num_states > 0: + self.obs_dict["states"] = self.get_state() + + return self.obs_dict, done_env_ids + + def render(self, mode="rgb_array"): + """Draw the frame to the viewer, and check for keyboard events.""" + if self.viewer: + # check for window closed + if self.gym.query_viewer_has_closed(self.viewer): + sys.exit() + + # check for keyboard events + for evt in self.gym.query_viewer_action_events(self.viewer): + if evt.action == "QUIT" and evt.value > 0: + sys.exit() + elif evt.action == "toggle_viewer_sync" and evt.value > 0: + self.enable_viewer_sync = not self.enable_viewer_sync + + # fetch results + if self.device != "cpu": + self.gym.fetch_results(self.sim, True) + + # step graphics + if self.enable_viewer_sync: + self.gym.step_graphics(self.sim) + self.gym.draw_viewer(self.viewer, self.sim, True) + + # Wait for dt to elapse in real time. + # This synchronizes the physics simulation with the rendering rate. + self.gym.sync_frame_time(self.sim) + + else: + self.gym.poll_viewer_events(self.viewer) + + if self.virtual_display and mode == "rgb_array": + img = self.virtual_display.grab() + return np.array(img) + + def __parse_sim_params( + self, physics_engine: str, config_sim: Dict[str, Any] + ) -> gymapi.SimParams: + """Parse the config dictionary for physics stepping settings. + + Args: + physics_engine: which physics engine to use. "physx" or "flex" + config_sim: dict of sim configuration parameters + Returns + IsaacGym SimParams object with updated settings. + """ + sim_params = gymapi.SimParams() + + # check correct up-axis + if config_sim["up_axis"] not in ["z", "y"]: + msg = f"Invalid physics up-axis: {config_sim['up_axis']}" + print(msg) + raise ValueError(msg) + + # assign general sim parameters + sim_params.dt = config_sim["dt"] + sim_params.num_client_threads = config_sim.get("num_client_threads", 0) + sim_params.use_gpu_pipeline = config_sim["use_gpu_pipeline"] + sim_params.substeps = config_sim.get("substeps", 2) + + # assign up-axis + if config_sim["up_axis"] == "z": + sim_params.up_axis = gymapi.UP_AXIS_Z + else: + sim_params.up_axis = gymapi.UP_AXIS_Y + + # assign gravity + sim_params.gravity = gymapi.Vec3(*config_sim["gravity"]) + + # configure physics parameters + if physics_engine == "physx": + # set the parameters + if "physx" in config_sim: + for opt in config_sim["physx"].keys(): + if opt == "contact_collection": + setattr( + sim_params.physx, + opt, + gymapi.ContactCollection(config_sim["physx"][opt]), + ) + else: + setattr(sim_params.physx, opt, config_sim["physx"][opt]) + else: + # set the parameters + if "flex" in config_sim: + for opt in config_sim["flex"].keys(): + setattr(sim_params.flex, opt, config_sim["flex"][opt]) + + # return the configured params + return sim_params + + """ + Domain Randomization methods + """ + + def get_actor_params_info(self, dr_params: Dict[str, Any], env): + """Generate a flat array of actor params, their names and ranges. + + Returns: + The array + """ + + if "actor_params" not in dr_params: + return None + params = [] + names = [] + lows = [] + highs = [] + param_getters_map = get_property_getter_map(self.gym) + for actor, actor_properties in dr_params["actor_params"].items(): + handle = self.gym.find_actor_handle(env, actor) + for prop_name, prop_attrs in actor_properties.items(): + if prop_name == "color": + continue # this is set randomly + props = param_getters_map[prop_name](env, handle) + if not isinstance(props, list): + props = [props] + for prop_idx, prop in enumerate(props): + for attr, attr_randomization_params in prop_attrs.items(): + name = prop_name + "_" + str(prop_idx) + "_" + attr + lo_hi = attr_randomization_params["range"] + distr = attr_randomization_params["distribution"] + if "uniform" not in distr: + lo_hi = (-1.0 * float("Inf"), float("Inf")) + if isinstance(prop, np.ndarray): + for attr_idx in range(prop[attr].shape[0]): + params.append(prop[attr][attr_idx]) + names.append(name + "_" + str(attr_idx)) + lows.append(lo_hi[0]) + highs.append(lo_hi[1]) + else: + params.append(getattr(prop, attr)) + names.append(name) + lows.append(lo_hi[0]) + highs.append(lo_hi[1]) + return params, names, lows, highs + + def apply_randomizations(self, dr_params): + """Apply domain randomizations to the environment. + + Note that currently we can only apply randomizations only on resets, due to current PhysX limitations + + Args: + dr_params: parameters for domain randomization to use. + """ + + # If we don't have a randomization frequency, randomize every step + rand_freq = dr_params.get("frequency", 1) + + # First, determine what to randomize: + # - non-environment parameters when > frequency steps have passed since the last non-environment + # - physical environments in the reset buffer, which have exceeded the randomization frequency threshold + # - on the first call, randomize everything + self.last_step = self.gym.get_frame_count(self.sim) + if self.first_randomization: + do_nonenv_randomize = True + env_ids = list(range(self.num_envs)) + else: + do_nonenv_randomize = (self.last_step - self.last_rand_step) >= rand_freq + rand_envs = torch.where( + self.randomize_buf >= rand_freq, + torch.ones_like(self.randomize_buf), + torch.zeros_like(self.randomize_buf), + ) + rand_envs = torch.logical_and(rand_envs, self.reset_buf) + env_ids = torch.nonzero(rand_envs, as_tuple=False).squeeze(-1).tolist() + self.randomize_buf[rand_envs] = 0 + + if do_nonenv_randomize: + self.last_rand_step = self.last_step + + param_setters_map = get_property_setter_map(self.gym) + param_setter_defaults_map = get_default_setter_args(self.gym) + param_getters_map = get_property_getter_map(self.gym) + + # On first iteration, check the number of buckets + if self.first_randomization: + check_buckets(self.gym, self.envs, dr_params) + + for nonphysical_param in ["observations", "actions"]: + if nonphysical_param in dr_params and do_nonenv_randomize: + dist = dr_params[nonphysical_param]["distribution"] + op_type = dr_params[nonphysical_param]["operation"] + sched_type = ( + dr_params[nonphysical_param]["schedule"] + if "schedule" in dr_params[nonphysical_param] + else None + ) + sched_step = ( + dr_params[nonphysical_param]["schedule_steps"] + if "schedule" in dr_params[nonphysical_param] + else None + ) + op = operator.add if op_type == "additive" else operator.mul + + if sched_type == "linear": + sched_scaling = 1.0 / sched_step * min(self.last_step, sched_step) + elif sched_type == "constant": + sched_scaling = 0 if self.last_step < sched_step else 1 + else: + sched_scaling = 1 + + if dist == "gaussian": + mu, var = dr_params[nonphysical_param]["range"] + mu_corr, var_corr = dr_params[nonphysical_param].get( + "range_correlated", [0.0, 0.0] + ) + + if op_type == "additive": + mu *= sched_scaling + var *= sched_scaling + mu_corr *= sched_scaling + var_corr *= sched_scaling + elif op_type == "scaling": + var = var * sched_scaling # scale up var over time + mu = mu * sched_scaling + 1.0 * ( + 1.0 - sched_scaling + ) # linearly interpolate + + var_corr = var_corr * sched_scaling # scale up var over time + mu_corr = mu_corr * sched_scaling + 1.0 * ( + 1.0 - sched_scaling + ) # linearly interpolate + + def noise_lambda(tensor, param_name=nonphysical_param): + params = self.dr_randomizations[param_name] + corr = params.get("corr", None) + if corr is None: + corr = torch.randn_like(tensor) + params["corr"] = corr + corr = corr * params["var_corr"] + params["mu_corr"] + return op( + tensor, + corr + + torch.randn_like(tensor) * params["var"] + + params["mu"], + ) + + self.dr_randomizations[nonphysical_param] = { + "mu": mu, + "var": var, + "mu_corr": mu_corr, + "var_corr": var_corr, + "noise_lambda": noise_lambda, + } + + elif dist == "uniform": + lo, hi = dr_params[nonphysical_param]["range"] + lo_corr, hi_corr = dr_params[nonphysical_param].get( + "range_correlated", [0.0, 0.0] + ) + + if op_type == "additive": + lo *= sched_scaling + hi *= sched_scaling + lo_corr *= sched_scaling + hi_corr *= sched_scaling + elif op_type == "scaling": + lo = lo * sched_scaling + 1.0 * (1.0 - sched_scaling) + hi = hi * sched_scaling + 1.0 * (1.0 - sched_scaling) + lo_corr = lo_corr * sched_scaling + 1.0 * (1.0 - sched_scaling) + hi_corr = hi_corr * sched_scaling + 1.0 * (1.0 - sched_scaling) + + def noise_lambda(tensor, param_name=nonphysical_param): + params = self.dr_randomizations[param_name] + corr = params.get("corr", None) + if corr is None: + corr = torch.randn_like(tensor) + params["corr"] = corr + corr = ( + corr * (params["hi_corr"] - params["lo_corr"]) + + params["lo_corr"] + ) + return op( + tensor, + corr + + torch.rand_like(tensor) * (params["hi"] - params["lo"]) + + params["lo"], + ) + + self.dr_randomizations[nonphysical_param] = { + "lo": lo, + "hi": hi, + "lo_corr": lo_corr, + "hi_corr": hi_corr, + "noise_lambda": noise_lambda, + } + + if "sim_params" in dr_params and do_nonenv_randomize: + prop_attrs = dr_params["sim_params"] + prop = self.gym.get_sim_params(self.sim) + + if self.first_randomization: + self.original_props["sim_params"] = { + attr: getattr(prop, attr) for attr in dir(prop) + } + + for attr, attr_randomization_params in prop_attrs.items(): + apply_random_samples( + prop, + self.original_props["sim_params"], + attr, + attr_randomization_params, + self.last_step, + ) + + self.gym.set_sim_params(self.sim, prop) + + # If self.actor_params_generator is initialized: use it to + # sample actor simulation params. This gives users the + # freedom to generate samples from arbitrary distributions, + # e.g. use full-covariance distributions instead of the DR's + # default of treating each simulation parameter independently. + extern_offsets = {} + if self.actor_params_generator is not None: + for env_id in env_ids: + self.extern_actor_params[env_id] = self.actor_params_generator.sample() + extern_offsets[env_id] = 0 + + # randomise all attributes of each actor (hand, cube etc..) + # actor_properties are (stiffness, damping etc..) + + # Loop over actors, then loop over envs, then loop over their props + # and lastly loop over the ranges of the params + + for actor, actor_properties in dr_params["actor_params"].items(): + # Loop over all envs as this part is not tensorised yet + for env_id in env_ids: + env = self.envs[env_id] + handle = self.gym.find_actor_handle(env, actor) + extern_sample = self.extern_actor_params[env_id] + + # randomise dof_props, rigid_body, rigid_shape properties + # all obtained from the YAML file + # EXAMPLE: prop name: dof_properties, rigid_body_properties, rigid_shape properties + # prop_attrs: + # {'damping': {'range': [0.3, 3.0], 'operation': 'scaling', 'distribution': 'loguniform'} + # {'stiffness': {'range': [0.75, 1.5], 'operation': 'scaling', 'distribution': 'loguniform'} + for prop_name, prop_attrs in actor_properties.items(): + if prop_name == "color": + num_bodies = self.gym.get_actor_rigid_body_count(env, handle) + for n in range(num_bodies): + self.gym.set_rigid_body_color( + env, + handle, + n, + gymapi.MESH_VISUAL, + gymapi.Vec3( + random.uniform(0, 1), + random.uniform(0, 1), + random.uniform(0, 1), + ), + ) + continue + if prop_name == "scale": + setup_only = prop_attrs.get("setup_only", False) + if (setup_only and not self.sim_initialized) or not setup_only: + attr_randomization_params = prop_attrs + sample = generate_random_samples( + attr_randomization_params, 1, self.last_step, None + ) + og_scale = 1 + if attr_randomization_params["operation"] == "scaling": + new_scale = og_scale * sample + elif attr_randomization_params["operation"] == "additive": + new_scale = og_scale + sample + self.gym.set_actor_scale(env, handle, new_scale) + continue + + prop = param_getters_map[prop_name](env, handle) + set_random_properties = True + + if isinstance(prop, list): + if self.first_randomization: + self.original_props[prop_name] = [ + {attr: getattr(p, attr) for attr in dir(p)} + for p in prop + ] + for p, og_p in zip(prop, self.original_props[prop_name]): + for attr, attr_randomization_params in prop_attrs.items(): + setup_only = attr_randomization_params.get( + "setup_only", False + ) + if ( + setup_only and not self.sim_initialized + ) or not setup_only: + smpl = None + if self.actor_params_generator is not None: + ( + smpl, + extern_offsets[env_id], + ) = get_attr_val_from_sample( + extern_sample, + extern_offsets[env_id], + p, + attr, + ) + apply_random_samples( + p, + og_p, + attr, + attr_randomization_params, + self.last_step, + smpl, + ) + else: + set_random_properties = False + else: + if self.first_randomization: + self.original_props[prop_name] = deepcopy(prop) + for attr, attr_randomization_params in prop_attrs.items(): + setup_only = attr_randomization_params.get( + "setup_only", False + ) + if ( + setup_only and not self.sim_initialized + ) or not setup_only: + smpl = None + if self.actor_params_generator is not None: + ( + smpl, + extern_offsets[env_id], + ) = get_attr_val_from_sample( + extern_sample, + extern_offsets[env_id], + prop, + attr, + ) + apply_random_samples( + prop, + self.original_props[prop_name], + attr, + attr_randomization_params, + self.last_step, + smpl, + ) + else: + set_random_properties = False + + if set_random_properties: + setter = param_setters_map[prop_name] + default_args = param_setter_defaults_map[prop_name] + setter(env, handle, prop, *default_args) + + if self.actor_params_generator is not None: + for env_id in env_ids: # check that we used all dims in sample + if extern_offsets[env_id] > 0: + extern_sample = self.extern_actor_params[env_id] + if extern_offsets[env_id] != extern_sample.shape[0]: + print( + "env_id", + env_id, + "extern_offset", + extern_offsets[env_id], + "vs extern_sample.shape", + extern_sample.shape, + ) + raise Exception("Invalid extern_sample size") + + self.first_randomization = False diff --git a/rofunc/utils/datalab/poselib/fbx_to_amp_npy.py b/rofunc/utils/datalab/poselib/fbx_to_amp_npy.py index df1aac64..cdfcf8c1 100644 --- a/rofunc/utils/datalab/poselib/fbx_to_amp_npy.py +++ b/rofunc/utils/datalab/poselib/fbx_to_amp_npy.py @@ -234,6 +234,8 @@ def motion_retargeting(retarget_cfg, source_motion, visualize=False): root_translation = target_motion.root_translation local_rotation = local_rotation[frame_beg:frame_end, ...] root_translation = root_translation[frame_beg:frame_end, ...] + avg_root_translation = root_translation.mean(axis=0) + root_translation[1:] -= avg_root_translation new_sk_state = SkeletonState.from_rotation_and_root_translation(target_motion.skeleton_tree, local_rotation, root_translation, is_local=True) @@ -279,10 +281,10 @@ def amp_npy_from_fbx(fbx_file): """ config = { - "target_motion_path": "/home/ubuntu/data/fbx/xsense_amp.npy", - "source_tpose": "/home/ubuntu/poselib/data/clover_tpose.npy", - "target_tpose": "/home/skylark/Github/Knowledge-Universe/Robotics/Roadmap-for-robot-science/rofunc/utils/datalab/poselib/data/amp_humanoid_tpose.npy", - "joint_mapping": { + "target_motion_path": "/home/ubuntu/Github/Rofunc/examples/data/hotu/024_amp_3.npy", + "source_tpose": "/home/ubuntu/Github/Rofunc/examples/data/hotu/024_tpose.npy", + "target_tpose": "/home/ubuntu/Github/Rofunc/rofunc/utils/datalab/poselib/data/amp_humanoid_tpose.npy", + "joint_mapping": { # Left: Xsens, Right: MJCF "Hips": "pelvis", "LeftUpLeg": "left_thigh", "LeftLeg": "left_shin", @@ -301,19 +303,23 @@ def amp_npy_from_fbx(fbx_file): }, # "rotation": [0.707, 0, 0, 0.707], xyzw "rotation": [0.5, 0.5, 0.5, 0.5], - "scale": 0.1, + "scale": 0.001, "root_height_offset": 0.0, "trim_frame_beg": 0, "trim_frame_end": -1 } - motion = motion_from_fbx(fbx_file, root_joint="Reference", fps=60, visualize=False) - config["target_motion_path"] = fbx_file.replace('.fbx', '_amp.npy') - motion_retargeting(config, motion, visualize=True) + motion = motion_from_fbx(fbx_file, root_joint="Hips", fps=60, visualize=False) + # config["target_motion_path"] = fbx_file.replace('.fbx', '_amp.npy') + motion_retargeting(config, motion, visualize=False) if __name__ == '__main__': - fbx_dir = "/home/skylark/Github/Knowledge-Universe/Robotics/Roadmap-for-robot-science/examples/data/hotu" + from rofunc.utils.oslab.path import get_rofunc_path + import os + + rofunc_path = get_rofunc_path() + fbx_dir = os.path.join(rofunc_path, "../examples/data/hotu") fbx_files = rf.oslab.list_absl_path(fbx_dir, suffix='.fbx') parallel = False diff --git a/rofunc/utils/datalab/poselib/generate_tpose_from_xsens_fbx.py b/rofunc/utils/datalab/poselib/generate_tpose_from_xsens_fbx.py index 273c45fc..67128ad2 100644 --- a/rofunc/utils/datalab/poselib/generate_tpose_from_xsens_fbx.py +++ b/rofunc/utils/datalab/poselib/generate_tpose_from_xsens_fbx.py @@ -36,8 +36,10 @@ def get_tpose_from_fbx(fbx_file_path, save_path, verbose=True): def main(): import rofunc as rf + import os - data_dir = "/home/skylark/Github/Knowledge-Universe/Robotics/Roadmap-for-robot-science/examples/data/hotu" + rofunc_path = rf.oslab.get_rofunc_path() + data_dir = os.path.join(rofunc_path, "../examples/data/hotu") fbx_files = rf.oslab.list_absl_path(data_dir, suffix='.fbx') for fbx in fbx_files: fbx_name = os.path.basename(fbx).split('.')[0] diff --git a/rofunc/utils/datalab/poselib/generate_xsens_tpose.py b/rofunc/utils/datalab/poselib/generate_xsens_tpose.py new file mode 100644 index 00000000..6c498764 --- /dev/null +++ b/rofunc/utils/datalab/poselib/generate_xsens_tpose.py @@ -0,0 +1,52 @@ +# Copyright 2023, Junjia LIU, jjliu@mae.cuhk.edu.hk +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import click + +from rofunc.utils.datalab.poselib.poselib.skeleton.skeleton3d import SkeletonState, SkeletonMotion +from rofunc.utils.datalab.poselib.poselib.visualization.common import plot_skeleton_state + + +def get_tpose_from_fbx(fbx_file_path, save_path, verbose=False): + motion = SkeletonMotion.from_fbx( + fbx_file_path=fbx_file_path, + root_joint="Hips", + fps=60 + ) + + source_tpose = SkeletonState.from_rotation_and_root_translation(motion.skeleton_tree, motion.rotation[0], + motion.root_translation[0], is_local=True) + + source_tpose.to_file(save_path) + + if verbose: + plot_skeleton_state(source_tpose) + + +@click.command() +@click.argument("fbx_name") +def main(fbx_name): + data_dir = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "../../../../data" + ) + os.makedirs(data_dir, exist_ok=True) + fbx_files = [os.path.join(data_dir, f"{fbx_name}.fbx")] + for fbx in fbx_files: + save_path = os.path.join(data_dir, "new_tpose.npy") + get_tpose_from_fbx(fbx, save_path, verbose=True) + + +if __name__ == '__main__': + main() diff --git a/rofunc/utils/datalab/poselib_ase/data/01_01_cmu.npy b/rofunc/utils/datalab/poselib_ase/data/01_01_cmu.npy new file mode 100644 index 00000000..c8e1dcb8 Binary files /dev/null and b/rofunc/utils/datalab/poselib_ase/data/01_01_cmu.npy differ diff --git a/rofunc/utils/datalab/poselib_ase/xsens/xsens_fbx_importer.py b/rofunc/utils/datalab/poselib_ase/xsens/xsens_fbx_importer.py new file mode 100644 index 00000000..84e5edd1 --- /dev/null +++ b/rofunc/utils/datalab/poselib_ase/xsens/xsens_fbx_importer.py @@ -0,0 +1,52 @@ +# Copyright (c) 2018-2022, NVIDIA Corporation +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import os +import json +from rofunc.utils.oslab.path import get_rofunc_path + +from rofunc.utils.datalab.poselib.poselib.skeleton.skeleton3d import SkeletonTree, SkeletonState, SkeletonMotion +from rofunc.utils.datalab.poselib.poselib.visualization.common import plot_skeleton_state, plot_skeleton_motion_interactive + +# source fbx file path +rofunc_path = get_rofunc_path() +fbx_file = os.path.join(rofunc_path, "../examples/data/hotu/024.fbx") + +# import fbx file - make sure to provide a valid joint name for root_joint +motion = SkeletonMotion.from_fbx( + fbx_file_path=fbx_file, + root_joint="Hips", + fps=60 +) + +# save motion in npy format +# motion.to_file("data/01_01_cmu.npy") + +# visualize motion +plot_skeleton_motion_interactive(motion)