diff --git a/main.py b/main.py index 23de9f0..f2b2ea6 100644 --- a/main.py +++ b/main.py @@ -19,15 +19,13 @@ EPSILON = 1e-8 # Convergence criterium # Global state MAZE = None # Map of the environment -STATE_MASK = None # Fields of maze belonging to state space S_TO_IJ = None # Mapping of state vector to coordinates -IJ_TO_S = None # Mapping of coordinates to state vector +SN = None # Number of states U_OF_X = None # The allowed action space matrix representation PW_OF_X_U = None # The probability distribution of disturbance G1_X = None # The cost function vector representation (depends only on state) G2_X = None # The second cost function vector representation F_X_U_W = None # The state function -SN = None # Number of states A2 = np.array([ [-1, 0], @@ -37,29 +35,6 @@ A2 = np.array([ [0, 0] ]) -ACTIONS = { - 'UP': (-1, 0), - 'DOWN': (1, 0), - 'LEFT': (0, -1), - 'RIGHT': (0, 1), - 'IDLE': (0, 0) -} - - -def _ij_to_s(ij): - return np.argwhere(np.all(ij == S_TO_IJ, axis=1)).flatten()[0] - - -# TODO: for all x and u in one go -def h_function(x, u, j, g): - """Return E_pi_w[g(x, pi(x), w) + alpha*J(f(x, pi(x), w))].""" - pw = pw_of_x_u(x, u) - expectation = sum( - pw[w] * (g(x, u, w) + ALPHA*j[_ij_to_s(f(x, u, w))]) - for w in pw - ) - return expectation - def h_matrix(j, g): result = (PW_OF_X_U * (g[F_X_U_W] + ALPHA*j[F_X_U_W])).sum(axis=2) @@ -67,32 +42,6 @@ def h_matrix(j, g): return result -def f(x, u, w): - return _move(_move(x, ACTIONS[u]), ACTIONS[w]) - - -def cost_treasure(x, u, w): - xt = f(x, u, w) - options = { - 'T': 50, - 'G': -1, - } - return options.get(MAZE[xt], 0) - - -def cost_energy(x, u, w): - xt = f(x, u, w) - options = { - 'T': 50, - 'G': 0 - } - return options.get(MAZE[xt], 1) - - -def _move(start, move): - return start[0] + move[0], start[1] + move[1] - - def _valid_target(target): return ( 0 <= target[0] < MAZE.shape[0] and @@ -101,8 +50,8 @@ def _valid_target(target): ) -def _init_global(maze_file): - global MAZE, STATE_MASK, SN, S_TO_IJ, IJ_TO_S +def init_global(maze_file): + global MAZE, SN, S_TO_IJ global U_OF_X, PW_OF_X_U, F_X_U_W, G1_X, G2_X # Basic maze structure initialization @@ -110,11 +59,11 @@ def _init_global(maze_file): maze_file, dtype=str, ) - STATE_MASK = (MAZE != '1') - S_TO_IJ = np.indices(MAZE.shape).transpose(1, 2, 0)[STATE_MASK] + state_mask = (MAZE != '1') + S_TO_IJ = np.indices(MAZE.shape).transpose(1, 2, 0)[state_mask] SN = len(S_TO_IJ) - IJ_TO_S = np.zeros(MAZE.shape, dtype=np.int32) - IJ_TO_S[STATE_MASK] = np.arange(SN) + ij_to_s = np.zeros(MAZE.shape, dtype=np.int32) + ij_to_s[state_mask] = np.arange(SN) # One step cost functions initialization maze_cost = np.zeros(MAZE.shape) @@ -122,13 +71,13 @@ def _init_global(maze_file): maze_cost[(MAZE == '0') | (MAZE == 'S')] = 0 maze_cost[MAZE == 'T'] = 50 maze_cost[MAZE == 'G'] = -1 - G1_X = maze_cost.copy()[STATE_MASK] + G1_X = maze_cost.copy()[state_mask] maze_cost[maze_cost < 1] += 1 # assert np.nan < whatever == True - G2_X = maze_cost.copy()[STATE_MASK] + G2_X = maze_cost.copy()[state_mask] # Actual environment modelling U_OF_X = np.zeros((SN, len(A2)), dtype=np.bool) - PW_OF_X_U = np.zeros((SN, len(A2), len(A2))) + PW_OF_X_U = np.zeros((SN, len(A2), len(A2)), dtype=np.float64) F_X_U_W = np.zeros(PW_OF_X_U.shape, dtype=np.int32) for ix, x in enumerate(S_TO_IJ): @@ -142,53 +91,15 @@ def _init_global(maze_file): for iw in possible_iw: if _valid_target(x + u + A2[iw]): PW_OF_X_U[ix, iu, iw] = P - F_X_U_W[ix, iu, iw] = IJ_TO_S[tuple(x + u + A2[iw])] + F_X_U_W[ix, iu, iw] = ij_to_s[tuple(x + u + A2[iw])] # IDLE w is always possible PW_OF_X_U[ix, iu, -1] = 1 - PW_OF_X_U[ix, iu].sum() - F_X_U_W[ix, iu, -1] = IJ_TO_S[tuple(x + u)] - - -def u_of_x(x): - """Return a list of allowed actions for the given state x.""" - return [u for u in ACTIONS if _valid_target(_move(x, ACTIONS[u]))] - - -def pw_of_x_u(x, u): - """Calculate probabilities of disturbances given state and action. - - Parameters - ---------- - x : tuple of ints - The state coordinate - (it is up to user to ensure this is a valid state). - u : str - The name of the action (again, up to the user to ensure validity). - - Returns - ------- - dict - A mapping of valid disturbances to their probabilities. - - """ - if u in ('LEFT', 'RIGHT'): - possible_w = ('UP', 'IDLE', 'DOWN') - elif u in ('UP', 'DOWN'): - possible_w = ('LEFT', 'IDLE', 'RIGHT') - else: # I assume that the IDLE action is deterministic - possible_w = ('IDLE',) - - allowed_w = [ - w for w in possible_w if - _valid_target(f(x, u, w)) - ] - probs = {w: P for w in allowed_w if w != 'IDLE'} - probs['IDLE'] = 1 - sum(probs.values()) - return probs + F_X_U_W[ix, iu, -1] = ij_to_s[tuple(x + u)] def plot_j_policy_on_maze(j, policy): - heatmap = np.ones(MAZE.shape) * np.nan # Ugly - heatmap[STATE_MASK] = j # Even uglier + heatmap = np.full(MAZE.shape, np.nan) + heatmap[S_TO_IJ[:,0], S_TO_IJ[:,1]] = j cmap = mpl.cm.get_cmap('coolwarm') cmap.set_bad(color='black') plt.imshow(heatmap, cmap=cmap) @@ -200,7 +111,7 @@ def plot_j_policy_on_maze(j, policy): def plot_cost_history(hist): - error = [((h - hist[-1])**2).sum()**0.5 for h in hist[:-1]] + error = np.sqrt(np.square(hist[:-1] - hist[-1]).mean(axis=1)) plt.xlabel('Number of iterations') plt.ylabel('Cost function error') plt.plot(error) @@ -208,28 +119,19 @@ def plot_cost_history(hist): def _policy_improvement(j, g): h_mat = h_matrix(j, g) - return np.argmin(h_mat, axis=1), h_mat.min(axis=1) + return h_mat.argmin(axis=1), h_mat.min(axis=1) def _evaluate_policy(policy, g): pw_pi = PW_OF_X_U[np.arange(SN), policy] # p(w) given policy for all x - targs = F_X_U_W[np.arange(SN), policy] # all f(x, u(x)) - G = (pw_pi * g[targs]).sum(axis=1) + targs = F_X_U_W[np.arange(SN), policy] # all f(x, u(x), w(x, u(x))) + G = (pw_pi * g[targs]).sum(axis=1) # Expected one-step cost vector - M = np.zeros((SN, SN)) # Markov matrix for given determ policy + M = np.zeros((SN, SN)) # Markov matrix for given deterministic policy x_from = [x_ff for x_f, nz in zip(np.arange(SN), np.count_nonzero(pw_pi, axis=1)) for x_ff in [x_f] * nz] M[x_from, targs[pw_pi > 0]] = pw_pi[pw_pi > 0] - # M[np.arange(SN), F_X_U_W[PW_OF_X_U > 0]] = PW_OF_X_U[PW_OF_X_U > 0] - # for x, u in zip(S_TO_IJ, policy): - # pw = pw_of_x_u(x, u) - # G.append(sum(pw[w] * g(x, u, w) for w in pw)) - # targets = [(_ij_to_s(f(x, u, w)), pw[w]) for w in pw] - # iox = _ij_to_s(x) - # for t, pww in targets: - # M[iox, t] = pww - # G = np.array(G) return np.linalg.solve(np.eye(SN) - ALPHA*M, G) @@ -248,24 +150,24 @@ def value_iteration(g, return_history=False): if not return_history: return j, policy else: - return history + return np.array(history) def policy_iteration(g, return_history=False): j = None - policy = np.full(SN, len(A2) - 1) + policy = np.full(SN, len(A2) - 1) # starting policy is IDLE history = [] while True: j_old = j j = _evaluate_policy(policy, g) history.append(j) - if j_old is not None and max(abs(j - j_old)) < EPSILON: + if j_old is not None and np.abs(j - j_old).max() < EPSILON: break policy, _ = _policy_improvement(j, g) if not return_history: return j, policy else: - return history + return np.array(history) if __name__ == '__main__': @@ -274,10 +176,9 @@ if __name__ == '__main__': ap.add_argument('maze_file', help='Path to maze file') args = ap.parse_args() - # start = time() # Initialization start = time() - _init_global(args.maze_file) + init_global(args.maze_file) # J / policy for both algorithms for both cost functions for 3 alphas costs = {'g1': G1_X, 'g2': G2_X}