MVA-2021 / reinforcement_learning / hw1_dynamic_programming / code_assignment1 / vipi.py
vipi.py
Raw
import numpy as np
import matplotlib.pyplot as plt
import math
from cliffwalk import CliffWalk
import time


def policy_evaluation(P, R, policy, gamma=0.9, tol=1e-2):
    """
    Args:
        P: np.array
            transition matrix (NsxNaxNs)
        R: np.array
            reward matrix (NsxNa)
        policy: np.array
            matrix mapping states to action (Ns)
        gamma: float
            discount factor
        tol: float
            precision of the solution
    Return:
        value_function: np.array
            The value function of the given policy
    """
    Ns, Na = R.shape

    # according to bellman equation Vpi = Rpi + gamma*PpiVpi
    rpi = np.array([R[i,policy[i]] for i in range(Ns)])
    Ppi = np.array([P[i,policy[i],:] for i in range(Ns)])
    I = np.eye(Ns)
    return np.linalg.solve(I-gamma*Ppi, rpi)

def policy_iteration(P, R, gamma=0.9, tol=1e-3):
    """
    Args:
        P: np.array
            transition matrix (NsxNaxNs)
        R: np.array
            reward matrix (NsxNa)
        gamma: float
            discount factor
        tol: float
            precision of the solution
    Return:
        policy: np.array
            the final policy
        V: np.array
            the value function associated to the final policy
    """
    Ns, Na = R.shape
    V = np.zeros(Ns)
    policy = np.zeros(Ns, dtype=np.int)
    old_policy = np.ones(Ns, dtype=np.int)

    while np.linalg.norm(old_policy - policy) > tol: # stopping criterion
        old_policy = policy
        V = policy_evaluation(P, R, policy, env.gamma) # policy evaluation
        policy = np.argmax( R + gamma * P @ V, axis=1) # policy improvement

    return policy, V

def value_iteration(P, R, gamma=0.9, tol=1e-3):
    """
    Args:
        P: np.array
            transition matrix (NsxNaxNs)
        R: np.array
            reward matrix (NsxNa)
        gamma: float
            discount factor
        tol: float
            precision of the solution
    Return:
        Q: final Q-function (at iteration n)
        greedy_policy: greedy policy wrt Qn
        Qfs: all Q-functions generated by the algorithm (for visualization)
    """
    Ns, Na = R.shape
    Q = np.zeros((Ns, Na))
    Qfs = [Q]

    while True:
        Qmax = np.max(Q, axis=1) 
        Q = R + gamma * P @ Qmax # apply optimal Bellman operator to previous Q
        Qfs.append(Q)
        if np.linalg.norm(Qfs[-1] - Qfs[-2], ord=math.inf) < tol:
            break

    greedy_policy = np.argmax(Q, axis=1)
    return Q, greedy_policy, Qfs


# Edit below to run policy and value iteration on different environments and
# visualize the resulting policies in action!
# You may change the parameters in the functions below
if __name__ == "__main__":
    tol =1e-5
    env = CliffWalk(proba_succ=0.5)
    print(env.R.shape)
    print(env.P.shape)
    env.render()
    print(env.P[0])

    # run value iteration to obtain Q-values
    VI_Q, VI_greedypol, all_qfunctions = value_iteration(env.P, env.R, gamma=env.gamma, tol=tol)

    # render the policy
    print("[VI]Greedy policy: ")
    env.render_policy(VI_greedypol)

    # compute the value function of the greedy policy using matrix inversion
    greedy_V = policy_evaluation(env.P, env.R, VI_greedypol, env.gamma)

    # show the error between the computed V-functions and the final V-function
    # (that should be the optimal one, if correctly implemented)
    # as a function of time
    norms = [ np.linalg.norm(q.max(axis=1) - greedy_V) for q in all_qfunctions]
    plt.plot(norms)
    plt.xlabel('Iteration')
    plt.ylabel('Error')
    plt.title("Value iteration: convergence")

    #### POLICY ITERATION ####
    PI_policy, PI_V = policy_iteration(env.P, env.R, gamma=env.gamma, tol=tol)
    print("\n[PI]final policy: ")
    env.render_policy(PI_policy)

    # control that everything is correct
    assert np.allclose(PI_policy, VI_greedypol),\
        "You should check the code, the greedy policy computed by VI is not equal to the solution of PI"
    assert np.allclose(PI_V, greedy_V),\
        "Since the policies are equal, even the value function should be"

    
    # for visualizing the execution of a policy, you can use the following code
    # state = env.reset()
    # env.render()
    # for i in range(15):
    #     action = VI_greedypol[state]
    #     state, reward, done, _ = env.step(action)
    #     env.render()


    plt.show()