import os.path as osp

import numpy as np
from gym.envs.mujoco.mujoco_env import MujocoEnv

from serializable import Serializable

from softlearning.misc.utils import PROJECT_PATH
from softlearning.environments.helpers import random_point_in_circle


class Pusher2dEnv(Serializable, MujocoEnv):
    """Two-dimensional Pusher environment

    Pusher2dEnv is a two-dimensional 3-DoF manipulator. Task is to slide a
    cylinder-shaped object, or a 'puck', to a target coordinates.

    Note: Serializable has to be the first super class for classes extending
    MujocoEnv (or at least occur before MujocoEnv). Otherwise MujocoEnv calls
    Serializable.__init__ (from MujocoEnv.__init__), and the Serializable
    attributes (_Serializable__args and _Serializable__kwargs) will get
    overwritten.
    """
    MODEL_PATH = osp.abspath(
        osp.join(PROJECT_PATH, 'models', 'pusher_2d.xml'))

    JOINT_INDS = list(range(0, 3))
    PUCK_INDS = list(range(3, 5))
    TARGET_INDS = list(range(5, 7))
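    # Indices into `qpos`: the three arm joints, then the puck's (x, y), and
    # finally the target's (x, y).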

    # TODO.before_release Fix target visualization (right now the target is
    # always drawn at (-1, 0), regardless of the actual goal).

    def __init__(self,
                 goal=(0, -1),
                 arm_object_distance_cost_coeff=0,
                 goal_object_distance_cost_coeff=1.0,
                 ctrl_cost_coeff=0.1):
        """
        goal (`list`): List of two elements denoting the x and y coordinates of
            the goal location. Either of the coordinate can also be a string
            'any' to make the reward not to depend on the corresponding
            coordinate.
        arm_distance_coeff ('float'): Coefficient for the arm-to-object distance
            cost.
        goal_distance_coeff ('float'): Coefficient for the object-to-goal
            distance cost.
        """
        self._Serializable__initialize(locals())

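        # Coordinates given as the string 'any' are masked out here, so the
        # goal-distance term of the reward ignores them.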
        self._goal_mask = [coordinate != 'any' for coordinate in goal]
        self._goal = np.array(goal)[self._goal_mask].astype(np.float32)

        self._arm_object_distance_cost_coeff = arm_object_distance_cost_coeff
        self._goal_object_distance_cost_coeff = goal_object_distance_cost_coeff
        self._ctrl_cost_coeff = ctrl_cost_coeff

        MujocoEnv.__init__(self, model_path=self.MODEL_PATH, frame_skip=5)

        self.model.stat.extent = 10

    def step(self, action):
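        # The reward is computed from the observation *before* the simulation
        # step is applied.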
        reward, info = self.compute_reward(self._get_obs(), action)

        self.do_simulation(action, self.frame_skip)
        observation = self._get_obs()
        done = False

        return observation, reward, done, info

    def compute_reward(self, observations, actions):
        is_batch = True
        if observations.ndim == 1:
            observations = observations[None]
            actions = actions[None]
            is_batch = False

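        # Observation layout (see `_get_obs`): sin/cos of the joint angles,
        # joint velocities, then the Cartesian positions of the arm tip
        # ("distal_4") and the object, i.e. the last six entries.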
        arm_pos = observations[:, -6:-3]
        obj_pos = observations[:, -3:]
        obj_pos_masked = obj_pos[:, :2][:, self._goal_mask]

        goal_object_distances = np.linalg.norm(
            self._goal[None] - obj_pos_masked, axis=1)
        arm_object_distances = np.linalg.norm(arm_pos - obj_pos, axis=1)
        ctrl_costs = np.sum(actions**2, axis=1)

        costs = (
            + self._arm_object_distance_cost_coeff * arm_object_distances
            + self._goal_object_distance_cost_coeff * goal_object_distances
            + self._ctrl_cost_coeff * ctrl_costs)

        rewards = -costs

        if not is_batch:
            rewards = rewards.squeeze()
            arm_object_distances = arm_object_distances.squeeze()
            goal_object_distances = goal_object_distances.squeeze()

        return rewards, {
            'arm_object_distance': arm_object_distances,
            'goal_object_distance': goal_object_distances
        }

    def viewer_setup(self):
        self.viewer.cam.trackbodyid = 0
        cam_pos = np.array([0, 0, 0, 4, -45, 0])
        for i in range(3):
            self.viewer.cam.lookat[i] = cam_pos[i]
        self.viewer.cam.distance = cam_pos[3]
        self.viewer.cam.elevation = cam_pos[4]
        self.viewer.cam.azimuth = cam_pos[5]
        self.viewer.cam.trackbodyid = -1

    def reset_model(self, qpos=None, qvel=None):
        if qpos is None:
            qpos = np.random.uniform(
                low=-0.1, high=0.1, size=self.model.nq
            ) + self.init_qpos.squeeze()
            qpos[self.TARGET_INDS] = self.init_qpos.squeeze()[self.TARGET_INDS]

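            # The target keeps its initial position from the model; only the
            # puck is randomized, inside a box in front of the arm.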
            puck_position = np.random.uniform(
                low=[0.3, -1.0], high=[1.0, -0.4])

            qpos[self.PUCK_INDS] = puck_position

        if qvel is None:
            qvel = self.init_qvel.copy().squeeze()
            qvel[self.PUCK_INDS] = 0
            qvel[self.TARGET_INDS] = 0

        # TODO: remnants from rllab -> gym conversion
        # qacc = np.zeros(self.sim.data.qacc.shape[0])
        # ctrl = np.zeros(self.sim.data.ctrl.shape[0])
        # full_state = np.concatenate((qpos, qvel, qacc, ctrl))

        self.set_state(np.array(qpos), np.array(qvel))

        return self._get_obs()

    def _get_obs(self):
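        # Joint angles are encoded as (sin, cos) pairs; the observation ends
        # with the Cartesian positions of the arm tip ("distal_4") and the
        # pushed object.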
        return np.concatenate([
            np.sin(self.sim.data.qpos.flat[self.JOINT_INDS]),
            np.cos(self.sim.data.qpos.flat[self.JOINT_INDS]),
            self.sim.data.qvel.flat[self.JOINT_INDS],
            self.get_body_com("distal_4"),
            self.get_body_com("object"),
        ]).reshape(-1)


class ForkReacherEnv(Pusher2dEnv):
    def __init__(self,
                 arm_goal_distance_cost_coeff=1.0,
                 arm_object_distance_cost_coeff=0.0,
                 *args,
                 **kwargs):
        self._Serializable__initialize(locals())

        self._arm_goal_distance_cost_coeff = arm_goal_distance_cost_coeff
        self._arm_object_distance_cost_coeff = arm_object_distance_cost_coeff

        super(ForkReacherEnv, self).__init__(*args, **kwargs)

    def compute_reward(self, observations, actions):
        is_batch = True
        if observations.ndim == 1:
            observations = observations[None]
            actions = actions[None]
            is_batch = False
        else:
            raise NotImplementedError('Might be broken.')

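        # The ForkReacher observation appends the goal's xy position to the
        # parent observation, so the slices below pick out the arm tip xy
        # (-8:-6), the object xy (-5:-3), and the goal xy (-2:).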
        arm_pos = observations[:, -8:-6]
        goal_pos = observations[:, -2:]
        object_pos = observations[:, -5:-3]

        arm_goal_dists = np.linalg.norm(arm_pos - goal_pos, axis=1)
        arm_object_dists = np.linalg.norm(arm_pos - object_pos, axis=1)
        ctrl_costs = np.sum(actions**2, axis=1)

        costs = (
            + self._arm_goal_distance_cost_coeff * arm_goal_dists
            + self._arm_object_distance_cost_coeff * arm_object_dists
            + self._ctrl_cost_coeff * ctrl_costs)

        rewards = -costs

        if not is_batch:
            rewards = rewards.squeeze()
            arm_goal_dists = arm_goal_dists.squeeze()
            arm_object_dists = arm_object_dists.squeeze()

        return rewards, {
            'arm_goal_distance': arm_goal_dists,
            'arm_object_distance': arm_object_dists,
        }

    def reset_model(self, qpos=None, qvel=None):
        if qpos is None:
            qpos = np.random.uniform(
                low=-0.1, high=0.1, size=self.model.nq
            ) + self.init_qpos.squeeze()

            # qpos[self.JOINT_INDS[0]] = np.random.uniform(-np.pi, np.pi)
            # qpos[self.JOINT_INDS[1]] = np.random.uniform(
            #     -np.pi/2, np.pi/2) + np.pi/4
            # qpos[self.JOINT_INDS[2]] = np.random.uniform(
            #     -np.pi/2, np.pi/2) + np.pi/2

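            # Sample the target on an annulus of radius 0.6-1.2 around the
            # origin, then shift it forward along y.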
            target_position = np.array(random_point_in_circle(
                angle_range=(0, 2*np.pi), radius=(0.6, 1.2)))
            target_position[1] += 1.0

            qpos[self.TARGET_INDS] = target_position
            # qpos[self.TARGET_INDS] = [1.0, 2.0]
            # qpos[self.TARGET_INDS] = self.init_qpos.squeeze()[self.TARGET_INDS]

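            # Sample the puck away from the origin: each coordinate keeps its
            # sign but is pushed to a magnitude of at least 0.5.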
            puck_position = np.random.uniform(low=-1.0, high=1.0, size=2)
            puck_position = (
                np.sign(puck_position)
                * np.maximum(np.abs(puck_position), 1/2))
            puck_position[np.where(puck_position == 0)] = 1.0
            # puck_position[1] += 1.0
            # puck_position = np.random.uniform(
            #     low=[0.3, -1.0], high=[1.0, -0.4]),

            qpos[self.PUCK_INDS] = puck_position

        if qvel is None:
            qvel = self.init_qvel.copy().squeeze()
            qvel[self.PUCK_INDS] = 0
            qvel[self.TARGET_INDS] = 0

        # TODO: remnants from rllab -> gym conversion
        # qacc = np.zeros(self.sim.data.qacc.shape[0])
        # ctrl = np.zeros(self.sim.data.ctrl.shape[0])
        # full_state = np.concatenate((qpos, qvel, qacc, ctrl))

        # super(Pusher2dEnv, self).reset(full_state)

        self.set_state(np.array(qpos), np.array(qvel))

        return self._get_obs()

    def _get_obs(self):
        super_observation = super(ForkReacherEnv, self)._get_obs()
        observation = np.concatenate([
            super_observation, self.get_body_com('goal')[:2]
        ])
        return observation
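

# A minimal smoke-test sketch, assuming the MuJoCo model XML exists at
# MODEL_PATH and that the old gym 4-tuple `step` API applies, as it does for
# the classes above.
if __name__ == '__main__':
    env = Pusher2dEnv(goal=(0, -1))
    observation = env.reset()
    for _ in range(100):
        # Sample a random action from the actuator-derived action space.
        action = env.action_space.sample()
        observation, reward, done, info = env.step(action)
        if done:
            observation = env.reset()
    print('final reward:', reward)
    print('final info:', info)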