-
Notifications
You must be signed in to change notification settings - Fork 0
/
vi_and_pi.py
233 lines (212 loc) · 6.75 KB
/
vi_and_pi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
### MDP Value Iteration and Policy Iteration
import argparse
import numpy as np
import gym
import time
from lake_envs import *
np.set_printoptions(precision=3)
parser = argparse.ArgumentParser(description='A program to run assignment 1 implementations.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--env",
help="The name of the environment to run your algorithm on.",
choices=["Deterministic-4x4-FrozenLake-v0","Stochastic-4x4-FrozenLake-v0"],
default="Deterministic-4x4-FrozenLake-v0")
"""
For policy_evaluation, policy_improvement, policy_iteration and value_iteration,
the parameters P, nS, nA, gamma are defined as follows:
P: nested dictionary
From gym.core.Environment
For each pair of states in [1, nS] and actions in [1, nA], P[state][action] is a
tuple of the form (probability, nextstate, reward, terminal) where
- probability: float
the probability of transitioning from "state" to "nextstate" with "action"
- nextstate: int
denotes the state we transition to (in range [0, nS - 1])
- reward: int
either 0 or 1, the reward for transitioning from "state" to
"nextstate" with "action"
- terminal: bool
True when "nextstate" is a terminal state (hole or goal), False otherwise
nS: int
number of states in the environment
nA: int
number of actions in the environment
gamma: float
Discount factor. Number in range [0, 1)
"""
def policy_evaluation(P, nS, nA, policy, gamma=0.9, tol=1e-3):
"""Evaluate the value function from a given policy.
Parameters
----------
P, nS, nA, gamma:
defined at beginning of file
policy: np.array[nS]
The policy to evaluate. Maps states to actions.
tol: float
Terminate policy evaluation when
max |value_function(s) - prev_value_function(s)| < tol
Returns
-------
value_function: np.ndarray[nS]
The value function of the given policy, where value_function[s] is
the value of state s
"""
value_function = np.zeros(nS)
while True:
error = 0
for s in range(nS):
v = 0
for a, action_prob in enumerate(policy[s]):
for prob, next_state, reward, terminal in P[s][a]:
v += action_prob * prob * (reward + gamma * value_function[next_state])
error = max(error, np.abs(v - value_function[s]))
value_function[s] = v
if error < tol:
break
return value_function
def policy_improvement(P, nS, nA, value_from_policy, policy, gamma=0.9):
"""Given the value function from policy improve the policy.
Parameters
----------
P, nS, nA, gamma:
defined at beginning of file
value_from_policy: np.ndarray
The value calculated from the policy
policy: np.array
The previous policy.
Returns
-------
new_policy: np.ndarray[nS]
An array of integers. Each integer is the optimal action to take
in that state according to the environment dynamics and the
given value function.
"""
# new_policy = np.zeros(nS, dtype='int')
############################
# YOUR IMPLEMENTATION HERE #
def compute_ahead(state, v):
A = np.zeros(nA)
for a in range(nA):
for prob, next_state, reward, terminal in P[state][a]:
A[a] += prob * (reward + gamma * v[next_state])
return A
policy = np.ones([nS, nA]) / nA
while True:
value_from_policy = policy_evaluation(P, nS, nA, policy, gamma, 1e-3)
policy_stable = True
for s in range(nS):
chosen_a = np.argmax(policy[s])
action_values = compute_ahead(s, value_from_policy)
best_a = np.argmax(action_values)
if chosen_a != best_a:
policy_stable = False
policy[s] = np.eye(nA)[best_a]
############################
return policy
def policy_iteration(P, nS, nA, gamma=0.9, tol=10e-3):
"""Runs policy iteration.
You should call the policy_evaluation() and policy_improvement() methods to
implement this method.
Parameters
----------
P, nS, nA, gamma:
defined at beginning of file
tol: float
tol parameter used in policy_evaluation()
' Returns:
----------
' value_function: np.ndarray[nS]
' policy: np.ndarray[nS]
"""
value_function = np.zeros(nS)
policy = np.zeros(nS, dtype=int)
############################
# YOUR IMPLEMENTATION HERE #
value_function = policy_evaluation(P, nS, nA, policy, gamma, tol)
policy = policy_improvement(P, nS, nA, value_function, policy, gamma)
############################
return value_function, policy
def value_iteration(P, nS, nA, gamma=0.9, tol=1e-3):
"""
Learn value function and policy by using value iteration method for a given
gamma and environment.
Parameters:
----------
P, nS, nA, gamma:
defined at beginning of file
tol: float
Terminate value iteration when
max |value_function(s) - prev_value_function(s)| < tol
Returns:
----------
value_function: np.ndarray[nS]
policy: np.ndarray[nS]
"""
value_function = np.zeros(nS)
policy = np.zeros(nS, dtype=int)
############################
# YOUR IMPLEMENTATION HERE #
def one_step_lookahead(state, V):
A = np.zeros(nA)
for a in range(nA):
for prob, next_state, reward, terminal in P[state][a]:
A[a] += prob * (reward + gamma * V[next_state])
return A
V = np.zeros(nS)
while True:
delta = 0
for s in range(nS):
A = one_step_lookahead(s, V)
best_action_value = np.max(A)
delta = max(delta, np.abs(best_action_value - V[s]))
V[s] = best_action_value
if delta < tol:
break
############################
policy = np.zeros([nS, nA])
for s in range(nS):
A = one_step_lookahead(s, V)
best_action = np.argmax(A)
policy[s, best_action] = 1.0
return value_function, policy
def render_single(env, policy, max_steps=100):
"""
This function does not need to be modified
Renders policy once on environment. Watch your agent play!
Parameters
----------
env: gym.core.Environment
Environment to play on. Must have nS, nA, and P as
attributes.
# Policy: np.array of shape [env.nS]
The action to take at a given state
"""
episode_reward = 0
ob = env.reset()
for t in range(max_steps):
env.render()
time.sleep(0.25)
a = policy[ob]
ob, rew, done, _ = env.step(a)
episode_reward += rew
if done:
break
env.render()
if not done:
print("The agent didn't reach a terminal state in {} steps.".format(max_steps))
else:
print("Episode reward: %f" % episode_reward)
# Edit below to run policy and value iteration on different environments and
# visualize the resulting policies in action!
# You may change the parameters in the functions below
if __name__ == "__main__":
# read in script argument
args = parser.parse_args()
# Make gym environment
env = gym.make(args.env)
print("\n" + "-"*25 + "\nBeginning Policy Iteration\n" + "-"*25)
V_pi, p_pi = policy_iteration(env.P, env.nS, env.nA, gamma=0.9, tol=1e-3)
render_single(env, p_pi, 100)
print("\n" + "-"*25 + "\nBeginning Value Iteration\n" + "-"*25)
V_vi, p_vi = value_iteration(env.P, env.nS, env.nA, gamma=0.9, tol=1e-3)
render_single(env, p_vi, 100)