# functions.py (forked from lukearcus/GeneralSumOBL)
import logging

import numpy as np

from agents import learners
from agents.players import RL, fixed_pol


def play_game(players, game):
    """
    Play the chosen game once until it ends and return player 0's reward.
    """
    game.start_game()
    try:
        # Debug output; only available when the game and players expose
        # game_id / fict_game attributes.
        print(f"\n\n play_game started, game id: {game.game_id}, "
              f"fict id: {players[0].fict_game.game_id}, {players[1].fict_game.game_id}")
    except AttributeError:
        pass
    # Main loop: the current player observes the state and acts.
    while not game.ended:
        player = players[game.curr_player]
        player.observe(game.observe())
        game.action(player.action())
    # Give every player a final observation of the terminal state;
    # passing None as the action only advances the turn order.
    for _ in players:
        player = players[game.curr_player]
        player.observe(game.observe())
        game.action(None)
    reward = players[0].r
    for player in players:
        player.wipe_mem()
    return reward
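

# Editorial illustration, not part of the original repository: the stub
# classes below are hypothetical and exist only to document the duck-typed
# interface play_game relies on (start_game/ended/curr_player/observe/action
# on the game object; observe/action/r/wipe_mem on each player).
def _play_game_interface_sketch():
    """Run play_game against minimal stand-in game and player objects."""
    class _TwoTurnGame:
        def start_game(self):
            self.ended = False
            self.curr_player = 0
            self._turns = 0

        def observe(self):
            return self._turns          # any observation object will do

        def action(self, act):
            if act is not None:
                self._turns += 1
                self.ended = self._turns >= 2
            self.curr_player = (self.curr_player + 1) % 2

    class _PassivePlayer:
        r = 0.0                         # reward read back by play_game

        def observe(self, obs):
            pass

        def action(self):
            return 0

        def wipe_mem(self):
            pass

    return play_game([_PassivePlayer(), _PassivePlayer()], _TwoTurnGame())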


def play_to_convergence(players, game, max_iters=1000000, tol=1e-5):
    """
    Play the chosen game repeatedly until the players' policies have converged.

    Convergence is declared once every player's policy changes by less than
    tol (in infinity norm) between consecutive games, after a burn-in of 100
    games. Returns 0 on convergence and -1 otherwise.
    """
    old_pol = [None for _ in players]
    converged_itt = 0
    for i in range(max_iters):
        converged_itt += 1
        # Snapshot each policy so the post-game change can be measured.
        for k, p in enumerate(players):
            old_pol[k] = np.copy(p.opt_pol)
        play_game(players, game)
        converged = True
        for j, p in enumerate(players):
            pol_diff = np.linalg.norm(p.opt_pol - old_pol[j], ord=np.inf)
            logging.debug("Iteration %d diff %s", i, pol_diff)
            converged = converged and pol_diff <= tol
            if not converged:
                break
        if converged and i > 100:
            break
    if not converged:
        logging.warning("Did not converge")
        return -1
    else:
        logging.info("Converged after %d iterations", converged_itt)
        return 0
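

# Editorial illustration (not from the original repo): the stopping test above
# compares successive policies with np.linalg.norm(..., ord=np.inf), which for
# a 2-D policy array (states x actions) is the maximum absolute row sum, i.e.
# the largest total probability change in any single state.
def _convergence_check_sketch():
    """Show the per-player stopping test used in play_to_convergence."""
    tol = 1e-5
    old_pol = np.array([[0.70, 0.30],
                        [0.20, 0.80]])
    new_pol = np.array([[0.700004, 0.299996],
                        [0.20, 0.80]])
    pol_diff = np.linalg.norm(new_pol - old_pol, ord=np.inf)
    # Here the largest row change is about 8e-6, so the policy counts as
    # converged for tol = 1e-5.
    return pol_diff <= tol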


def calc_exploitability(pol, game, learner, num_iters=100000,
                        num_exploit_iters=10000, tol=1e-10, exploit_tol=1e-4):
    """
    Calculate the exploitability of the current policies, pol, using a learner
    (either an RL learner or a true best-response solver if available).
    """
    new_pols = []
    p_avg_exploitability = [0, 0]
    exploit_rewards = [[], []]
    # Best response to player 1's policy, computed for player 0.
    if isinstance(learner, (learners.kuhn_exact_solver, learners.bridge_kuhn_exact_solver)):
        # Exact solver available: compute the true best response directly.
        new_pols.append(learner.calc_opt(pol[1], 1))
        reward_hist = None
        V_1 = None
    else:
        # No exact solver: train an RL best response against the fixed policy.
        players = [RL(learner, 0), fixed_pol(pol[1])]
        reward_hist = [[], []]
        change = [[], []]
        i = 0
        while True:
            old_pol = np.copy(players[0].opt_pol)
            reward_hist[0].append(float(play_game(players, game)))
            change[0].append(np.linalg.norm(players[0].opt_pol - old_pol, ord=np.inf))
            i += 1
            if i == num_iters:
                break
            elif i > 100 and change[0][-1] <= tol:
                break
        # Break ties with a little noise, then keep only the argmax action in
        # each state, giving a deterministic (one-hot) best-response policy.
        converged_pol = players[0].opt_pol + np.random.random(players[0].opt_pol.shape) / 1000
        opt_deterministic = (converged_pol == np.max(converged_pol, axis=1, keepdims=True)).astype(float)
        new_pols.append(opt_deterministic)
        #new_pols.append(players[0].opt_pol)
        V_1 = learner.advantage_func.V
    # Monte-Carlo estimate of player 0's best-response value against pol[1].
    players = [fixed_pol(new_pols[0]), fixed_pol(pol[1])]
    i = 0
    while True:
        old_exploitability = p_avg_exploitability[0]
        exploit_rewards[0].append(float(play_game(players, game)))
        p_avg_exploitability[0] = sum(exploit_rewards[0]) / len(exploit_rewards[0])
        i += 1
        if i == num_exploit_iters:
            break
        elif i > 100 and np.abs(old_exploitability - p_avg_exploitability[0]) < exploit_tol:
            break
    p_avg_exploitability[0] = sum(exploit_rewards[0]) / len(exploit_rewards[0])
    # Best response to player 0's policy, computed for player 1.
    if isinstance(learner, (learners.kuhn_exact_solver, learners.bridge_kuhn_exact_solver)):
        new_pols.append(learner.calc_opt(pol[0], 2))
        V_2 = None
    else:
        # Reset the learner so player 1's best response starts from scratch.
        learner.reset()
        learner.wipe_memory()
        players = [fixed_pol(pol[0]), RL(learner, 1)]
        i = 0
        while True:
            old_pol = np.copy(players[1].opt_pol)
            # play_game returns player 0's reward; negate it to score from
            # player 1's perspective.
            reward_hist[1].append(-float(play_game(players, game)))
            change[1].append(np.linalg.norm(players[1].opt_pol - old_pol, ord=np.inf))
            i += 1
            if i == num_iters:
                break
            elif i > 100 and change[1][-1] <= tol:
                break
        V_2 = learner.advantage_func.V
        # As above: break ties with noise and keep the argmax action per state.
        converged_pol = players[1].opt_pol + np.random.random(players[1].opt_pol.shape) / 1000
        opt_deterministic = (converged_pol == np.max(converged_pol, axis=1, keepdims=True)).astype(float)
        new_pols.append(opt_deterministic)
        #new_pols.append(players[1].opt_pol)
        learner.reset()
        learner.wipe_memory()
    # Monte-Carlo estimate of player 1's best-response value against pol[0].
    players = [fixed_pol(pol[0]), fixed_pol(new_pols[1])]
    i = 0
    while True:
        old_exploitability = p_avg_exploitability[1]
        exploit_rewards[1].append(-float(play_game(players, game)))
        p_avg_exploitability[1] = sum(exploit_rewards[1]) / len(exploit_rewards[1])
        i += 1
        if i == num_exploit_iters:
            break
        elif i > 100 and np.abs(old_exploitability - p_avg_exploitability[1]) < exploit_tol:
            break
    p_avg_exploitability[1] = sum(exploit_rewards[1]) / len(exploit_rewards[1])
    # Total exploitability: the sum of both best responders' average rewards.
    avg_exploitability = sum(p_avg_exploitability)
    #import pdb; pdb.set_trace()
    return avg_exploitability, new_pols, reward_hist, (V_1, V_2)
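

# Editorial illustration (not from the original repo): calc_exploitability
# turns a converged stochastic policy into a deterministic one by adding a
# small random tie-breaking perturbation and keeping only the argmax action
# in each state as a one-hot row. The helper below reproduces that step on a
# toy policy array.
def _deterministic_policy_sketch():
    """Extract a one-hot argmax policy, as done inside calc_exploitability."""
    rng = np.random.default_rng(0)
    soft_pol = np.array([[0.55, 0.45],
                         [0.50, 0.50]])   # second state is an exact tie
    perturbed = soft_pol + rng.random(soft_pol.shape) / 1000
    one_hot = (perturbed == np.max(perturbed, axis=1, keepdims=True)).astype(float)
    return one_hot  # each row has a single 1.0 at the (tie-broken) argmax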