from __future__ import print_function from __future__ import division from __future__ import unicode_literals from argparse import ArgumentParser from time import time import numpy as np import matplotlib as mpl mpl.use('TkAgg') # fixes my macOS bug import matplotlib.pyplot as plt import matplotlib.colors as colors P = 0.1 # Slip probability ALPHA = 0.8 # Discount factor A2 = np.array([ # Action index to action mapping [-1, 0], # Up [ 1, 0], # Down [ 0, -1], # Left [ 0, 1], # Right [ 0, 0], # Idle ]) # Global state MAZE = None # Map of the environment S_TO_IJ = None # Mapping of state vector to coordinates SN = None # Number of states U_OF_X = None # The allowed action space matrix representation PW_OF_X_U = None # The probability distribution of disturbance G1_X = None # The cost function vector representation G2_X = None # The second cost function vector representation F_X_U_W = None # The System Equation def _valid_target(target): return ( 0 <= target[0] < MAZE.shape[0] and 0 <= target[1] < MAZE.shape[1] and MAZE[tuple(target)] != '1' ) def init_global(maze_filename): global MAZE, SN, S_TO_IJ global U_OF_X, PW_OF_X_U, F_X_U_W, G1_X, G2_X # Basic maze structure initialization MAZE = np.genfromtxt( maze_filename, dtype='|S1', ) state_mask = (MAZE != '1') S_TO_IJ = np.indices(MAZE.shape).transpose(1, 2, 0)[state_mask] SN = len(S_TO_IJ) ij_to_s = np.zeros(MAZE.shape, dtype=np.int32) ij_to_s[state_mask] = np.arange(SN) # One step cost functions initialization maze_cost = np.zeros(MAZE.shape, dtype=np.float64) maze_cost[MAZE == '1'] = np.nan maze_cost[(MAZE == '0') | (MAZE == 'S')] = 0 maze_cost[MAZE == 'T'] = 50 maze_cost[MAZE == 'G'] = -1 G1_X = maze_cost.copy()[state_mask] maze_cost[(MAZE=='0') | (MAZE=='S') | (MAZE=='G')] += 1 G2_X = maze_cost.copy()[state_mask] # Actual environment modelling U_OF_X = np.zeros((SN, len(A2)), dtype=np.bool) PW_OF_X_U = np.zeros((SN, len(A2), len(A2)), dtype=np.float64) F_X_U_W = np.zeros(PW_OF_X_U.shape, dtype=np.int32) for ix, x in enumerate(S_TO_IJ): for iu, u in enumerate(A2): if _valid_target(x + u): U_OF_X[ix, iu] = True if iu in (0, 1): # (Up, Down) possible_iw = [2, 3] # [Left, Right] elif iu in (2, 3): # (Left, Right) possible_iw = [0, 1] # [Up, Down] for iw in possible_iw: if _valid_target(x + u + A2[iw]): PW_OF_X_U[ix, iu, iw] = P F_X_U_W[ix, iu, iw] = ij_to_s[tuple(x + u + A2[iw])] # Idle w is always possible PW_OF_X_U[ix, iu, -1] = 1 - PW_OF_X_U[ix, iu].sum() F_X_U_W[ix, iu, -1] = ij_to_s[tuple(x + u)] # Forbid to leave the goal state # (could've been done through cost function though) goal_idx = ij_to_s[np.where(MAZE == b'G')][0] U_OF_X[goal_idx] = False U_OF_X[goal_idx, -1] = True PW_OF_X_U[goal_idx] = 0 PW_OF_X_U[goal_idx, -1, -1] = 1 F_X_U_W[goal_idx] = 0 F_X_U_W[goal_idx, -1, -1] = goal_idx def h_matrix(j, g): h_x_u = (PW_OF_X_U * (g[F_X_U_W] + ALPHA*j[F_X_U_W])).sum(axis=2) h_x_u[~U_OF_X] = np.inf # discard invalid policies return h_x_u def _policy_improvement(j, g): h_mat = h_matrix(j, g) return h_mat.argmin(axis=1), h_mat.min(axis=1) def _evaluate_policy(policy, g): pw_pi = PW_OF_X_U[np.arange(SN), policy] # p(w) given policy for all x targs = F_X_U_W[np.arange(SN), policy] # all f(x, u(x), w(x, u(x))) G = (pw_pi * g[targs]).sum(axis=1) # Expected one-step cost vector # Markov matrix for given deterministic policy M = np.zeros((SN, SN), dtype=np.float64) x_from = [x_ff for x_f, nz in zip(np.arange(SN), np.count_nonzero(pw_pi, axis=1)) for x_ff in [x_f] * nz] M[x_from, targs[pw_pi > 0]] = pw_pi[pw_pi > 0] return np.linalg.solve(np.eye(SN) - ALPHA*M, G) def value_iteration(g, j, **_): return _policy_improvement(j, g) def policy_iteration(g, policy, **_): j = _evaluate_policy(policy, g) policy, _ = _policy_improvement(j, g) return policy, j def _terminate_pi(j, j_old, policy, policy_old): return np.all(policy == policy_old) def _terminate_vi(j, j_old, policy, policy_old): eps = ALPHA**SN return np.abs(j - j_old).max() < eps def dynamic_programming(optimizer_step, g, terminator, return_history=False): j = np.zeros(SN, dtype=np.float64) policy = np.full(SN, len(A2) - 1, dtype=np.int32) # idle policy history = [] while True: j_old = j policy_old = policy policy, j = optimizer_step(g, j=j, policy=policy) if return_history: history.append(j) if terminator(j, j_old, policy, policy_old): break if not return_history: return j, policy else: history = np.array(history) # cover some edgy cases if (history[-1] == history[-2]).all(): history = history[:-1] return history def plot_j_policy_on_maze(j, policy, normalize=True): heatmap = np.full(MAZE.shape, np.nan, dtype=np.float64) if normalize: # Non-linear, but a discrete representation of different costs norm = colors.BoundaryNorm(boundaries=np.sort(j)[1:-1], ncolors=256) vmin = 0 vmax = 256 else: norm = lambda x: x vmin = None vmax = None heatmap[S_TO_IJ[:, 0], S_TO_IJ[:, 1]] = norm(j) cmap = mpl.cm.get_cmap('coolwarm') cmap.set_bad(color='black') plt.imshow( heatmap, vmin=vmin, vmax=vmax, cmap=cmap, ) # quiver has some weird behavior, the arrow y component must be flipped plt.quiver(S_TO_IJ[:, 1], S_TO_IJ[:, 0], A2[policy, 1], -A2[policy, 0]) plt.gca().get_xaxis().set_visible(False) plt.tick_params(axis='y', which='both', left=False, labelleft=False) def plot_cost_history(hist): error = np.log10( np.sqrt(np.square(hist[:-1] - hist[-1]).mean(axis=1)) ) plt.xticks(np.arange(0, len(error), len(error) // 5)) plt.yticks(np.linspace(error.min(), error.max(), 5)) plt.plot(error) if __name__ == '__main__': # Argument Parsing ap = ArgumentParser() ap.add_argument('maze_file', help='Path to maze file') args = ap.parse_args() # Initialization start = time() init_global(args.maze_file) # J / policy for both algorithms for both cost functions for 3 alphas costs = {'g1': G1_X, 'g2': G2_X} optimizers = {'Value Iteration': value_iteration, 'Policy Iteration': policy_iteration} terminators = {'Value Iteration': _terminate_vi, 'Policy Iteration': _terminate_pi} # cost_transform = {'g1': _neg_log_neg, 'g2': _gamma} for normalize in [False, True]: for a in [0.9, 0.5, 0.01]: plt.figure(figsize=(9, 7)) plt.subplots_adjust(top=0.9, bottom=0.05, left=0.1, right=0.95, wspace=0.1) plt.suptitle('Discount: {}'.format(a) + ('\nNormalized view' if normalize else '')) i = 1 for opt in ['Value Iteration', 'Policy Iteration']: for cost in ['g1', 'g2']: name = '{} / {}'.format(opt, cost) ALPHA = a j, policy = dynamic_programming(optimizers[opt], costs[cost], terminators[opt]) plt.subplot(2, 2, i) plot_j_policy_on_maze(j, policy, normalize=normalize) if i <= 2: plt.gca().set_title('Cost: {}'.format(cost), fontsize='x-large') if (i - 1) % 2 == 0: plt.ylabel(opt, fontsize='x-large') i += 1 # Error graphs for opt in ['Value Iteration', 'Policy Iteration']: plt.figure(figsize=(6, 10)) plt.figtext(0.5, 0.04, 'Number of iterations', ha='center', fontsize='large') plt.figtext(0.01, 0.5, 'Logarithm of cost RMSE', va='center', rotation='vertical', fontsize='large') plt.subplots_adjust(wspace=0.38, hspace=0.35, left=0.205, right=0.98, top=0.9) plt.suptitle(opt) i = 1 for a in [0.99, 0.7, 0.1]: for cost in ['g1', 'g2']: ALPHA = a history = dynamic_programming(optimizers[opt], costs[cost], terminators[opt], return_history=True) plt.subplot(3, 2, i) plot_cost_history(history) if i <= 2: plt.gca().set_title('Cost: {}'.format(cost)) if (i - 1) % 2 == 0: plt.ylabel('Discount: {}'.format(a), fontsize='large') i += 1 print('I ran in {} seconds'.format(time() - start)) plt.show()