-
Notifications
You must be signed in to change notification settings - Fork 4
/
dqn_sin_stability.py
194 lines (153 loc) · 5.63 KB
/
dqn_sin_stability.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import numpy as np
x = np.arange(0,2*np.pi,0.01)
def f(x, noise):
return np.sin(x) + noise
# f = lambda x: np.sin(x)
# import matplotlib.pyplot as plt
# noise = np.random.choice([0.025, -0.025, 0.05, -0.05], size=x.shape[0])
# states = np.array([x, f(x, noise)])
# plt.plot(states[0], states[1], label='original', color='b')
def step(states, actions):
states[1] += actions
return states
# actions = -noise
# states = step(states, actions)
# plt.plot(states[0], states[1], label='smoothened', color='g')
# plt.legend()
# plt.show()
# In[]:
class RandomVariable():
class ActionSpace():
def __init__(self, action_space):
self.actions = action_space
self.n = self.actions.shape[0]
def sample(self):
return np.random.choice(self.n)
def __init__(self, errepsilon, noise_levels, x_increment, x_range, action_space):
self.y = 0
self.x = 0
self.x_increment = x_increment
self.x_range = x_range
self.noise_levels = noise_levels
# TODO : Change these values and check
self.errepsilon = errepsilon
self.observation_space = np.array([2,2])
self.action_space = self.ActionSpace(action_space)
def select_noise(self):
return np.random.choice(self.noise_levels)
def step(self, action):
current_reward = self.get_reward(action)
self.x = ( self.x + self.x_increment ) % self.x_range
self.last_noise = self.select_noise()
self.y = self.f() + self.last_noise
return np.array([self.x, self.y]), current_reward
def f(self):
return np.sin(self.x)
def get_reward(self, action):
# if the noise cancellation is within self.errepsilon then reward something
# check is -> noisy + corrective action == actual value?
if np.abs((self.y + self.action_space.actions[action]) - self.f()) < self.errepsilon:
# TODO : Change the reward values and check
return +10
else:
return -1
def reset(self):
self.x = 0
self.last_noise = self.select_noise()
self.y = self.f() + self.last_noise
return np.array([self.x, self.y])
# In[]:
from dqn import DQNPolicy, ReplayBuffer
def run_current_policy(policy, env, cur_state, epsilon, max_iterations):
total_reward = 0
function_history = list()
timesteps = 0
for iterations in range(max_iterations):
action = policy.select_action(cur_state.reshape(1,-1), epsilon)
next_state, reward = env.step(action)
function_history.append(cur_state)
total_reward += reward
timesteps += 1
cur_state = next_state
print('{} timesteps taken and collected {} reward'.format(timesteps, total_reward))
return total_reward, timesteps, np.array(function_history)
# In[]:
noise = [0.05]
# TODO : Can change these parameters
lr = 0.001
# TODO : Need to do the epsilon decay
epsilon = 0.1
# epsilon_decay = 0.05
epsilon_min = 0.01
gamma = 0.99
hidden_dim = 24
mod_episode = 10
max_iterations = 250
x_range = 10
x_increment = 0.01
max_x = x_increment * max_iterations
action_space = np.array([-0.05])
# action_space = np.linspace(0, 1, 3) # 3 unique actions out of which one would be the noise
env = RandomVariable(0.001, noise, x_increment, x_range, action_space)
env_policy = DQNPolicy(env, lr, gamma, hidden_dim)
replay_buffer = ReplayBuffer()
total_train_episodes = 50
# play with a random policy
# run_current_policy(env_policy, env, env.reset(), max_iterations)
# In[]:
history = dict({'reward':list(), 'timesteps':list(), 'episodes':list()})
import matplotlib.pyplot as plt
plt.ion()
fig, ax = plt.subplots()
noise_pl = np.random.choice([0.025, -0.025, 0.05, -0.05], size=x.shape[0])
states_pl = np.array([x, f(x, noise_pl)])
sc = ax.scatter(states_pl[0], states_pl[1])
plt.xlim(0, max_x)
plt.ylim(-1.75, 1.75)
plt.draw()
for episode in range(1, total_train_episodes):
done = False
# print('Epoch :', episode + 1)
ep_reward = 0
ep_timesteps = 0
cur_state = env.reset()
epsilon = max(epsilon, epsilon_min)
for iterations in range(max_iterations):
action = env_policy.select_action(cur_state.reshape(1, -1), epsilon)
next_state, reward = env.step(action)
if reward == -1:
print('wow')
replay_buffer.add(cur_state, action, next_state, reward, done)
# TODO : Change the sample size and check any improvements
sampled_transitions = replay_buffer.sample()
# the q updation occurs for all transitions in all episodes, just like TD updates
env_policy.update_policy(**sampled_transitions)
ep_reward += reward
ep_timesteps += 1
cur_state = next_state
history['reward'].append(ep_reward)
history['timesteps'].append(ep_timesteps)
history['episodes'].append(episode+1)
if episode % mod_episode == 0:
# Get last 100 points from replay buffer
states = np.array(replay_buffer.cur_states[:-100])
print('Epoch : {} Avg Reward : {} Timesteps : {}'.format(
episode, history['reward'][-1], history['timesteps'][-1]))
# plt.figure()
sc.set_offsets(np.c_[states[:, 0], states[:, 1]])
fig.canvas.draw_idle()
plt.pause(0.1)
# TODO : Note the removed the epsilon decay
# decay the epsilon after every episode
# epsilon -= epsilon_decay
plt.ioff()
plt.show()
# In[]:
# Now play again
_, _, states = run_current_policy(env_policy, env, env.reset(), epsilon, max_iterations)
plt.scatter(states[:, 0], states[:, 1])
# In[]:
# import matplotlib
# matplotlib.use('Qt5Agg')
from plot_functions import plot_timesteps_and_rewards
plot_timesteps_and_rewards(history)