import numpy as np
import matplotlib.pyplot as plt
import math
from cliffwalk import CliffWalk
import time


def policy_evaluation(P, R, policy, gamma=0.9, tol=1e-2):
    """
    Args:
        P: np.array transition matrix (Ns x Na x Ns)
        R: np.array reward matrix (Ns x Na)
        policy: np.array mapping states to actions (Ns)
        gamma: float discount factor
        tol: float precision of the solution
    Return:
        value_function: np.array
            The value function of the given policy
    """
    Ns, Na = R.shape
    # According to the Bellman equation, Vpi = rpi + gamma * Ppi @ Vpi,
    # so Vpi solves the linear system (I - gamma * Ppi) Vpi = rpi.
    rpi = np.array([R[s, policy[s]] for s in range(Ns)])
    Ppi = np.array([P[s, policy[s], :] for s in range(Ns)])
    I = np.eye(Ns)
    return np.linalg.solve(I - gamma * Ppi, rpi)


def policy_iteration(P, R, gamma=0.9, tol=1e-3):
    """
    Args:
        P: np.array transition matrix (Ns x Na x Ns)
        R: np.array reward matrix (Ns x Na)
        gamma: float discount factor
        tol: float precision of the solution
    Return:
        policy: np.array the final policy
        V: np.array the value function associated to the final policy
    """
    Ns, Na = R.shape
    V = np.zeros(Ns)
    policy = np.zeros(Ns, dtype=int)
    old_policy = np.ones(Ns, dtype=int)
    while np.linalg.norm(old_policy - policy) > tol:  # stop when the policy no longer changes
        old_policy = policy
        V = policy_evaluation(P, R, policy, gamma)  # policy evaluation
        policy = np.argmax(R + gamma * P @ V, axis=1)  # policy improvement
    return policy, V


def value_iteration(P, R, gamma=0.9, tol=1e-3):
    """
    Args:
        P: np.array transition matrix (Ns x Na x Ns)
        R: np.array reward matrix (Ns x Na)
        gamma: float discount factor
        tol: float precision of the solution
    Return:
        Q: final Q-function (at iteration n)
        greedy_policy: greedy policy wrt Qn
        Qfs: all Q-functions generated by the algorithm (for visualization)
    """
    Ns, Na = R.shape
    Q = np.zeros((Ns, Na))
    Qfs = [Q]
    while True:
        Qmax = np.max(Q, axis=1)
        Q = R + gamma * P @ Qmax  # apply the optimal Bellman operator to the previous Q
        Qfs.append(Q)
        if np.linalg.norm(Qfs[-1] - Qfs[-2], ord=math.inf) < tol:
            break
    greedy_policy = np.argmax(Q, axis=1)
    return Q, greedy_policy, Qfs
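
# The sanity check below is an illustrative sketch that is not part of the original
# script: it builds a tiny hand-made 2-state, 2-action MDP and verifies that the
# three solvers above agree on it. It is never called automatically.
def sanity_check_toy_mdp(gamma=0.9, tol=1e-8):
    # Deterministic toy MDP: action 0 stays in the current state, action 1 moves
    # to the other state. Staying in state 1 yields reward 1, everything else 0.
    P = np.zeros((2, 2, 2))
    P[0, 0, 0] = P[1, 0, 1] = 1.0  # action 0: stay
    P[0, 1, 1] = P[1, 1, 0] = 1.0  # action 1: switch
    R = np.array([[0.0, 0.0],
                  [1.0, 0.0]])
    Q, vi_policy, _ = value_iteration(P, R, gamma=gamma, tol=tol)
    pi_policy, pi_V = policy_iteration(P, R, gamma=gamma, tol=tol)
    # The optimal policy switches from state 0 to state 1 (action 1) and then
    # stays there (action 0); its value is (gamma, 1) / (1 - gamma) = (9, 10).
    assert np.array_equal(vi_policy, np.array([1, 0]))
    assert np.array_equal(pi_policy, vi_policy)
    assert np.allclose(pi_V, Q.max(axis=1), atol=1e-5)
    assert np.allclose(policy_evaluation(P, R, vi_policy, gamma), pi_V)
    print("Toy MDP sanity check passed.")
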
# Edit below to run policy and value iteration on different environments and
# visualize the resulting policies in action!
# You may change the parameters in the functions below.
if __name__ == "__main__":
    tol = 1e-5
    env = CliffWalk(proba_succ=0.5)
    print(env.R.shape)
    print(env.P.shape)
    env.render()
    print(env.P[0])

    #### VALUE ITERATION ####
    # run value iteration to obtain the Q-values
    VI_Q, VI_greedypol, all_qfunctions = value_iteration(env.P, env.R, gamma=env.gamma, tol=tol)

    # render the greedy policy
    print("[VI] Greedy policy: ")
    env.render_policy(VI_greedypol)

    # compute the value function of the greedy policy using matrix inversion
    greedy_V = policy_evaluation(env.P, env.R, VI_greedypol, env.gamma)

    # show the error between the intermediate V-functions and the final V-function
    # (which should be the optimal one, if correctly implemented)
    # as a function of the iteration number
    norms = [np.linalg.norm(q.max(axis=1) - greedy_V) for q in all_qfunctions]
    plt.plot(norms)
    plt.xlabel('Iteration')
    plt.ylabel('Error')
    plt.title("Value iteration: convergence")

    #### POLICY ITERATION ####
    PI_policy, PI_V = policy_iteration(env.P, env.R, gamma=env.gamma, tol=tol)
    print("\n[PI] Final policy: ")
    env.render_policy(PI_policy)

    # check that everything is consistent
    assert np.allclose(PI_policy, VI_greedypol), \
        "You should check the code, the greedy policy computed by VI is not equal to the solution of PI"
    assert np.allclose(PI_V, greedy_V), \
        "Since the policies are equal, the value functions should be equal as well"

    # for visualizing the execution of a policy, you can use the following code
    # state = env.reset()
    # env.render()
    # for i in range(15):
    #     action = VI_greedypol[state]
    #     state, reward, done, _ = env.step(action)
    #     env.render()

    plt.show()
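
    # A rough additional check (a sketch, not in the original script), using the same
    # gym-style interface as the commented rollout above: estimate the discounted
    # return of the greedy policy from a single rollout and compare it (only roughly,
    # since transitions are stochastic) with greedy_V at the start state.
    # s0 = state = env.reset()
    # total_return, discount = 0.0, 1.0
    # for t in range(200):
    #     state, reward, done, _ = env.step(VI_greedypol[state])
    #     total_return += discount * reward
    #     discount *= env.gamma
    #     if done:
    #         break
    # print("Empirical discounted return:", total_return)
    # print("Predicted value of the start state:", greedy_V[s0])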