-
Notifications
You must be signed in to change notification settings - Fork 0
/
mountain_car.py
140 lines (105 loc) · 4.1 KB
/
mountain_car.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from traceback import print_tb
import gym
import numpy as np
import matplotlib.pyplot as plt
from sympy import false
env = gym.make("MountainCar-v0")
## 1) Variáveis globais
DISCRETE_UNITS = 20
NMR_EPISODES = 500
DISCOUNT = 0.98
EPISODE_DISPLAY = 20
LEARNING_RATE = 0.3
EPSILON = 0.05
EPSILON_DECREMENTADOR = EPSILON/(NMR_EPISODES//4)
env._max_episode_steps = 1000
#Environment values
# print(env.observation_space.high) #[0.6 1.5]
# print(env.observation_space.low) #[-1.2 -1.5]
# print(env.action_space.n) #3
#Q-Table de tamanho DISCRETE_UNITS*DISCRETE_BUCKETS*env.action_space.n
#Q_TABLE = np.random.randn(DISCRETE_UNITS,DISCRETE_UNITS,env.action_space.n)
## 2) Criação das tabelas de qualidade -> array com número de tabelas correspondente ao número de estados discretizados
Q_TABLE = np.random.uniform(low = -1, high = 1,
size = (DISCRETE_UNITS, DISCRETE_UNITS,
env.action_space.n))
## 3) função para calcular o estado em tempo discreto
def discretizar_estado(estado):
tamanho_janela_discreta = (env.observation_space.high-env.observation_space.low)/[DISCRETE_UNITS]*len(env.observation_space.high)
estado_discreto = (estado-env.observation_space.low)//tamanho_janela_discreta
#integer tuple para usar mais tarde na extração dos valores da tabela Q
return tuple(estado_discreto.astype(int))
# Para estatisticas
array_it = list()
episodes_list = list()
ep_rewards = []
ep_list = []
x_graph = 0;
iteration = 0
## 4) Inicio do algoritmo
for episode in range(NMR_EPISODES):
episode_reward = 0
done = false
## 5) Discretizar o estado inicial
estado_discreto_atual = discretizar_estado(env.reset())
# print(estado_discreto_atual)
# calcular estado para renderizar o simulador
if episode % EPISODE_DISPLAY == 0:
render_state = True
else:
render_state = False
## 6) Ciclo que percorre as interações dentro de cada episódio
while not done:
# contar o número de iterações necessárias para atingir o objetivo
iteration+=1
## 7) condição para gerar uma ação random, para a ação do agente não ficar enviesada
if np.random.random() > EPSILON:
action = np.argmax(Q_TABLE[estado_discreto_atual])
else:
action = np.random.randint(0, env.action_space.n)
## 8) outputs do ambiente em função das ações do agente
novo_estado, reward, done, _ = env.step(action)
## 9) novo estado discreto
novo_estado_discreto = discretizar_estado(novo_estado)
# renderização do simulador
if render_state:
env.render()
## 9) Verificação se a ação já foi tomada
if not done:
## 10) cálculo da nova matriz Q
max_future_q = np.max(Q_TABLE[novo_estado_discreto])
current_q = Q_TABLE[estado_discreto_atual+(action,)]
new_q = current_q + LEARNING_RATE*(reward + DISCOUNT*max_future_q - current_q)
Q_TABLE[estado_discreto_atual+(action,)]=new_q
## 11) Verificar se o agente chegou à posição objetivo
elif novo_estado[0] >= env.goal_position:
# atualização da matriz Q, quando o agente chega ao objetivo
# if Q_TABLE[estado_discreto_atual + (action,)] == 0:
# print(episode)
Q_TABLE[estado_discreto_atual + (action,)] = 0
# estatisticas para o gráfico
x_graph += 1
array_it.append(iteration)
episodes_list.append(x_graph)
iteration = 0
## 12) atualização do esatdo discreto, para calcular a ação no inicio da nova iteração
estado_discreto_atual = novo_estado_discreto
episode_reward += reward
## 13) cálculo do novo epsilon
EPSILON = EPSILON - EPSILON_DECREMENTADOR
ep_rewards.append(episode_reward)
ep_list.append(episode)
if not episode % EPISODE_DISPLAY:
avg_reward = sum(ep_rewards[-EPISODE_DISPLAY:])/len(ep_rewards[-EPISODE_DISPLAY:])
print(f"Episode:{episode} avg:{avg_reward} min:{min(ep_rewards[-EPISODE_DISPLAY:])} max:{max(ep_rewards[-EPISODE_DISPLAY:])}")
env.close()
# plt.plot(ep_list, ep_rewards)
# plt.title('Mountain Car Q-Learning')
# plt.ylabel('Average reward/Episode')
# plt.xlabel('Episodes')
# plt.show()
plt.plot(np.array(episodes_list), np.array(array_it))
plt.title('Learning Trial')
plt.ylabel('Performance')
plt.xlabel('Trials')
plt.show()