nmi-val / softlearning / replay_pools / trajectory_replay_pool.py
trajectory_replay_pool.py
Raw
from collections import deque
import gzip
import pickle
from itertools import islice

import numpy as np

from softlearning.utils.numpy import softmax
from .replay_pool import ReplayPool


def random_int_with_variable_range(mins, maxs):
    result = np.floor(np.random.uniform(mins, maxs)).astype(int)
    return result


class TrajectoryReplayPool(ReplayPool):
    def __init__(self,
                 observation_space,
                 action_space,
                 max_size):
        super(TrajectoryReplayPool, self).__init__()

        max_size = int(max_size)
        self._max_size = max_size

        self._trajectories = deque(maxlen=max_size)
        self._trajectory_lengths = deque(maxlen=max_size)
        self._num_samples = 0
        self._trajectories_since_save = 0

    @property
    def num_trajectories(self):
        return len(self._trajectories)

    @property
    def size(self):
        return sum(self._trajectory_lengths)

    @property
    def num_samples(self):
        return self._num_samples

    def add_paths(self, trajectories):
        self._trajectories += trajectories
        self._trajectory_lengths += [
            trajectory[next(iter(trajectory.keys()))].shape[0]
            for trajectory in trajectories
        ]
        self._trajectories_since_save += len(trajectories)

    def add_path(self, trajectory):
        self.add_paths([trajectory])

    def add_sample(self, sample):
        raise NotImplementedError(
            f"{self.__class__.__name__} only supports adding full paths at"
            " once.")

    def add_samples(self, samples):
        raise NotImplementedError(
            f"{self.__class__.__name__} only supports adding full paths at"
            " once.")

    def batch_by_indices(self,
                         episode_indices,
                         step_indices,
                         field_name_filter=None):
        assert len(episode_indices) == len(step_indices)

        batch_size = len(episode_indices)
        trajectories = [self._trajectories[i] for i in episode_indices]

        batch = {
            field_name: np.empty(
                (batch_size, *values.shape[1:]), dtype=values.dtype)
            for field_name, values in trajectories[0].items()
        }

        for i, episode in enumerate(trajectories):
            for field_name, episode_values in episode.items():
                batch[field_name][i] = episode_values[step_indices[i]]

        return batch

    def random_batch(self, batch_size, *args, **kwargs):
        num_trajectories = len(self._trajectories)
        if num_trajectories < 1:
            return {}

        trajectory_lengths = np.array(self._trajectory_lengths)
        trajectory_weights = trajectory_lengths / np.sum(trajectory_lengths)
        trajectory_probabilities = softmax(trajectory_weights)

        trajectory_indices = np.random.choice(
            np.arange(num_trajectories),
            size=batch_size,
            replace=True,
            p=trajectory_probabilities)
        first_key = next(iter(
            self._trajectories[trajectory_indices[0]].keys()))
        trajectory_lengths = np.array([
            self._trajectories[trajectory_index][first_key].shape[0]
            for trajectory_index in trajectory_indices
        ])

        step_indices = random_int_with_variable_range(
            np.zeros_like(trajectory_lengths, dtype=np.int64),
            trajectory_lengths)

        batch = self.batch_by_indices(trajectory_indices, step_indices)

        return batch

    def last_n_batch(self, last_n, field_name_filter=None, **kwargs):
        num_trajectories = len(self._trajectories)
        if num_trajectories < 1:
            return {}

        trajectory_indices = []
        step_indices = []

        trajectory_lengths = 0
        for trajectory_index in range(num_trajectories-1, -1, -1):
            trajectory = self._trajectories[trajectory_index]
            trajectory_length = trajectory[list(trajectory.keys())[0]].shape[0]

            steps_from_this_episode = min(trajectory_length, last_n - trajectory_lengths)
            step_indices += list(range(
                trajectory_length-1,
                trajectory_length - steps_from_this_episode - 1,
                -1))
            trajectory_indices += [trajectory_index] * steps_from_this_episode

            trajectory_lengths += trajectory_length

            if trajectory_lengths >= last_n:
                break

        trajectory_indices = trajectory_indices[::-1]
        step_indices = step_indices[::-1]

        batch = self.batch_by_indices(trajectory_indices, step_indices)

        return batch

    def save_latest_experience(self, pickle_path):
        # deque doesn't support direct slicing, thus need to use islice
        num_trajectories = self.num_trajectories
        start_index = max(num_trajectories - self._trajectories_since_save, 0)
        end_index = num_trajectories

        latest_trajectories = tuple(islice(
            self._trajectories, start_index, end_index))

        with gzip.open(pickle_path, 'wb') as f:
            pickle.dump(latest_trajectories, f)

        self._trajectories_since_save = 0

    def load_experience(self, experience_path):
        with gzip.open(experience_path, 'rb') as f:
            latest_trajectories = pickle.load(f)

        self.add_paths(latest_trajectories)
        self._trajectories_since_save = 0