-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcar_collector.py
123 lines (105 loc) · 3.31 KB
/
car_collector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# while True:
# action = raw_input()
# print action
import numpy as np
from pynput.keyboard import Key, Listener
import pickle
import os
import gym
import time
# Make sure the 'models' folder exists next to this script and keep the
# recording inside that same folder.  The folder used to be created beside the
# script while the pickle path was resolved against the current working
# directory, so launching the script from any other directory failed on save.
models_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
if not os.path.isdir(models_dir):
    os.makedirs(models_dir)
save_path = os.path.join(models_dir, 'mimic_car.pickle')

# State shared between the keyboard callbacks and the recording loop.
observations = []   # recorded steps, one row per environment step
observation = None  # latest observation returned by the environment
max_episode = 15    # number of episodes to record
episode = 0         # episode counter
current_action = 1  # action sent to the env until a key press changes it
def on_press(key):
    """Keyboard-listener callback: turn a key press into the current action.

    'a' selects action 0, 's' selects action 1 and 'd' selects action 2.  The
    selected value is stored in the module-level ``current_action``; any other
    key leaves ``current_action`` unchanged.

    Args:
        key: the key object handed over by the pynput listener.
    """
    global current_action

    # Character keys carry the typed character in ``.char``; special keys
    # (esc, shift, ...) have no such attribute.  Reading ``.char`` replaces the
    # old comparison of ``str(key)`` with the Python 2 repr ``u's'``, which
    # never matched on Python 3 where ``str(key)`` is ``'s'``.
    char = getattr(key, 'char', None)
    action = {'s': 1, 'd': 2, 'a': 0}.get(char)

    print(action)
    print('{0} pressed'.format(key))
    if action is not None:
        current_action = action
def on_release(key):
    """Keyboard-listener callback for key releases.

    Prints the released key.  Returns False when that key is Esc, which stops
    the listener; returns None for every other key.
    """
    message = '{0} release'.format(key)
    print(message)
    # A False return value from a callback is what stops the listener.
    return False if key == Key.esc else None
def get_action():
    """Reset the environment and start the keyboard listener.

    Stores the observation returned by ``env.reset()`` in the module-level
    ``observation``, then starts a pynput ``Listener`` wired to ``on_press``
    and ``on_release``.  Nothing is returned; key presses reach the rest of
    the script through the ``current_action`` global that ``on_press`` sets.
    """
    global observation
    observation = env.reset()

    callbacks = {'on_press': on_press, 'on_release': on_release}
    Listener(**callbacks).start()
def _save_history(history, path):
    """Pickle *history* to *path*, then reload it and print whether it matches."""
    with open(path, 'wb') as f:
        pickle.dump(history, f, protocol=pickle.HIGHEST_PROTOCOL)
    print('save successful!')

    with open(path, 'rb') as f:
        verify = pickle.load(f)
    # Compare as plain lists rather than arrays (kept from the original code,
    # which did this to sidestep a numpy issue when run from a child thread).
    if history.tolist() == verify.tolist():
        print('verify success')
    else:
        print('failed!')


if __name__ == "__main__":
    # Record `max_episode` episodes of MountainCar driven from the keyboard.
    env = gym.make('MountainCar-v0')
    # NOTE(review): presumably unwrapped to drop gym's episode step limit, so
    # an episode only finishes when the env itself reports done -- confirm.
    env = env.unwrapped
    get_action()

    try:
        for ep in range(max_episode):
            observation = env.reset()
            done = False
            while not done:
                env.render()
                # Throttle the loop; on_press updates current_action between steps.
                time.sleep(0.02)
                observation_, reward, done, _ = env.step(current_action)
                # Row layout: observation, action, reward, next observation.
                observations.append(
                    np.hstack((observation, current_action, reward, observation_)))
                # NOTE(review): the original computed a shaped reward
                # abs(position - (-0.5)) *after* the row above was stored, so
                # it never reached the dataset.  The dead computation is
                # removed and the env's own reward is still what gets saved;
                # if shaped rewards are wanted, compute them before the append.
                observation = observation_
    finally:
        # Always release the render window, even if recording is interrupted.
        env.close()

    _save_history(np.array(observations), save_path)
    print('env ended!')