# test_env.py
from mlagents_envs.environment import UnityEnvironment
import numpy as np
from mlagents_envs.base_env import ActionTuple, BaseEnv, DecisionSteps, TerminalSteps
# EngineConfigurationChannel can adjust engine settings such as time scale.
from mlagents_envs.side_channel.engine_configuration_channel import EngineConfigurationChannel
import keyboard  # polled below for manual control of agent 0

# Launch the SoccerTwos build (adjust file_name to your own build location).
envunity = UnityEnvironment(
    file_name="C://Users//ps//Documents//Academic//StudyInUofT//2022Fall//MIE1075//MIE1075_Soccer//buildmysoccer//SoccerTwos",
    seed=1,
    side_channels=[],
)
# Alternate build:
# envunity = UnityEnvironment(file_name="C://Users//ps//Documents//Academic//StudyInUofT//2022Fall//MIE1075//MIE1075_Soccer//osoccer//UnityEnvironment", seed=1, side_channels=[])
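# The EngineConfigurationChannel imported above is never attached to the
# environment. A minimal sketch of how it could be wired in (standard
# mlagents_envs API) to control simulation speed during manual play:
#
#     config_channel = EngineConfigurationChannel()
#     envunity = UnityEnvironment(file_name="<path to build>", seed=1,
#                                 side_channels=[config_channel])
#     config_channel.set_configuration_parameters(time_scale=1.0)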
class TransUnity2Gym():
    """Wraps a UnityEnvironment in a Gym-style reset()/step() interface."""

    def __init__(self, env):
        env.reset()
        self.environment = env
        self.calculate_agent_num()

    def calculate_agent_num(self):
        # Total the agents requesting decisions across all behaviors so the
        # obs/reward/done lists can be indexed directly by agent id.
        agent_sum = 0
        behavior_names = list(self.environment.behavior_specs)
        for behavior_name in behavior_names:
            decision_steps, terminal_steps = self.environment.get_steps(behavior_name)
            agent_sum += len(decision_steps)
        self.n = agent_sum
        print('self.n =', self.n)
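    # For reference, behavior_specs also describes what each behavior expects.
    # A quick inspection sketch (attribute names as in recent mlagents_envs;
    # older releases expose observation_shapes instead of observation_specs):
    #
    #     for name, spec in self.environment.behavior_specs.items():
    #         print(name, spec.action_spec.discrete_branches,
    #               [o.shape for o in spec.observation_specs])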
    def reset(self):
        env = self.environment
        env.reset()
        behavior_names = list(env.behavior_specs)
        obs_n = [0 for _ in range(self.n)]
        for behavior_name in behavior_names:
            decision_steps, terminal_steps = env.get_steps(behavior_name)
            # Each agent reports a list of observation arrays (one per
            # sensor); concatenate the two sensors into one flat vector.
            if len(terminal_steps.agent_id) > 0:
                for agent_id_terminated in terminal_steps:
                    obs = terminal_steps[agent_id_terminated].obs
                    obs_n[agent_id_terminated] = np.concatenate([obs[0], obs[1]])
            if len(decision_steps.agent_id) > 0:
                for agent_id_decision in decision_steps:
                    obs = decision_steps[agent_id_decision].obs
                    obs_n[agent_id_decision] = np.concatenate([obs[0], obs[1]])
        return obs_n
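    # Note: concatenating obs[0] and obs[1] assumes each agent reports exactly
    # two observation arrays, as the SoccerTwos build does. A shape-agnostic
    # variant (an assumption, not taken from the original code) would be:
    #
    #     obs_n[agent_id] = np.concatenate([o.ravel() for o in obs])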
    def step(self, action_n):
        env = self.environment
        behavior_names = list(env.behavior_specs)
        next_obs_n = [0 for _ in range(self.n)]
        reward_n = [0 for _ in range(self.n)]
        done_n = [False for _ in range(self.n)]
        info = [0 for _ in range(self.n)]

        # First pass: queue actions for each behavior that is still running,
        # then advance the simulation. If a behavior already has terminated
        # agents, record their final transitions instead and stop early.
        # Caveat: env.step() advances the whole simulation, so with several
        # behavior names the later ones receive default actions for that tick.
        for behavior_name in behavior_names:
            decision_steps, terminal_steps = env.get_steps(behavior_name)
            if len(terminal_steps.agent_id) == 0:
                action = np.array([action_n[i_d] for i_d in decision_steps.agent_id])
                action_tuple = ActionTuple()
                action_tuple.add_discrete(action)
                env.set_actions(behavior_name, action_tuple)
                env.step()
            else:
                for agent_id_terminated in terminal_steps:
                    obs = terminal_steps[agent_id_terminated].obs
                    done_n[agent_id_terminated] = terminal_steps[agent_id_terminated].interrupted
                    next_obs_n[agent_id_terminated] = np.concatenate([obs[0], obs[1]])
                    reward_n[agent_id_terminated] = terminal_steps[agent_id_terminated].reward
                break

        # Second pass: after the engine step, read back the new observations
        # and rewards for every agent.
        for behavior_name in behavior_names:
            decision_steps, terminal_steps = env.get_steps(behavior_name)
            if len(terminal_steps.agent_id) > 0:
                for agent_id_terminated in terminal_steps:
                    obs = terminal_steps[agent_id_terminated].obs
                    done_n[agent_id_terminated] = True
                    next_obs_n[agent_id_terminated] = np.concatenate([obs[0], obs[1]])
                    # At episode end, use the team-level group_reward rather
                    # than the per-agent reward.
                    reward_n[agent_id_terminated] = terminal_steps[agent_id_terminated].group_reward
            if len(decision_steps.agent_id) > 0:
                for agent_id_decision in decision_steps:
                    obs = decision_steps[agent_id_decision].obs
                    next_obs_n[agent_id_decision] = np.concatenate([obs[0], obs[1]])
                    reward_n[agent_id_decision] = decision_steps[agent_id_decision].group_reward
        if done_n[0]:
            print(len(next_obs_n), next_obs_n[0].shape, reward_n, done_n)
        return next_obs_n, reward_n, done_n, info
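# A minimal scripted rollout as an alternative to the keyboard loop below.
# This is a sanity-check sketch, assuming (as in SoccerTwos) that every
# discrete branch has three options (0 = idle, 1/2 = the two directions):
#
#     wrapper = TransUnity2Gym(envunity)
#     obs_n = wrapper.reset()
#     for _ in range(100):
#         acts = [np.random.randint(0, 3, size=3) for _ in range(wrapper.n)]
#         obs_n, rew_n, done_n, _ = wrapper.step(acts)
#         if done_n[0]:
#             obs_n = wrapper.reset()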
# Wrap the Unity build and drive the first agent with the keyboard.
# Discrete branches (from the original note): branch 0 is forward/backward
# (1 = forward, 2 = backward), branch 1 is right/left (1 = right, 2 = left).
env = TransUnity2Gym(envunity)
obs = env.reset()
idle = np.array([0, 0, 0])
while True:
    try:  # try/except so pressing unbound keys does not abort the loop
        if keyboard.is_pressed('q'):  # 'q' quits
            print('You pressed q, exiting.')
            break
        elif keyboard.is_pressed('w'):  # 'w': forward
            actions = [np.array([1, 0, 0]), idle, idle, idle]
        elif keyboard.is_pressed('s'):  # 's': backward
            actions = [np.array([2, 0, 0]), idle, idle, idle]
        elif keyboard.is_pressed('a'):  # 'a': left
            actions = [np.array([0, 2, 0]), idle, idle, idle]
        elif keyboard.is_pressed('d'):  # 'd': right
            actions = [np.array([0, 1, 0]), idle, idle, idle]
        else:  # no key pressed: every agent idles
            actions = [idle, idle, idle, idle]
        next_obs_n, reward_n, done_n, info = env.step(actions)
        if sum(reward_n) != 0:
            print(reward_n)
        if done_n[0]:
            print('reward_n =', reward_n)
            break
    except Exception:
        pass
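# Shut the Unity worker down once the loop ends; without an explicit close()
# the external Unity process can be left running. close() is part of the
# standard UnityEnvironment API.
envunity.close()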