gnn_es_index.py
##########################################
#Kamer Ali Yuksel linkedin.com/in/kyuksel#
##########################################

import torch
import torch.nn as nn
import pandas as pd
# sparsemax is assumed to come from the `entmax` package (which also provides entmax15)
from entmax import sparsemax
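# ---------------------------------------------------------------------------
# The code below relies on `args`, `device`, `assets`, `valid_data`, `index`,
# `test_size` and `calculate_reward`, which are not defined in this file and
# are presumably provided elsewhere in the repository. What follows is only a
# hypothetical placeholder sketch (synthetic data, arbitrary hyper-parameters,
# and a simple tracking-error loss) so the script can be run in isolation;
# replace it with the repository's actual data pipeline and reward.
# ---------------------------------------------------------------------------
from types import SimpleNamespace

args = SimpleNamespace(noise=16,    # dimension of the input noise vectors (placeholder)
                       cnndim=32,   # latent dimension of the Extractor (placeholder)
                       batch=64,    # candidate portfolios sampled per step (placeholder)
                       iter=1000)   # number of training epochs (placeholder)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

assets = ['ASSET_%02d' % i for i in range(50)]                     # placeholder tickers
valid_data = 0.01 * torch.randn(1250, len(assets), device=device)  # synthetic asset returns, shape (T, n_assets)
index = valid_data.mean(dim=1)                                     # synthetic benchmark index returns, shape (T,)
test_size = 250                                                    # trailing periods held out for validation

def calculate_reward(weights, returns, benchmark, training=False):
    # Hypothetical stand-in, NOT the author's reward: mean squared tracking
    # error of each candidate portfolio against the benchmark index; the loop
    # below minimises it, so lower is better. `training` only matches the call site.
    portfolio = returns @ weights.t()                              # (T, n_candidates)
    return ((portfolio - benchmark.unsqueeze(1)) ** 2).mean(dim=0)
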
def init_weights(model):
    # Xavier-initialise linear layers; reset batch-norm scales and linear biases.
    for m in model.modules():
        if isinstance(m, nn.BatchNorm1d):
            m.weight.data.fill_(1)
        elif isinstance(m, nn.Linear):
            nn.init.xavier_uniform_(m.weight, gain=5/3)
            if hasattr(m, 'bias') and m.bias is not None: m.bias.data.zero_()

class LSTMModule(nn.Module):
    # Stateful LSTM: the hidden and cell states are carried across forward calls.
    def __init__(self, input_size=1, hidden_size=1, num_layers=2):
        super(LSTMModule, self).__init__()
        self.rnn = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.h = torch.zeros(num_layers, 1, hidden_size, requires_grad=True).to(device)
        self.c = torch.zeros(num_layers, 1, hidden_size, requires_grad=True).to(device)

    def forward(self, x):
        self.rnn.flatten_parameters()
        out, (h_end, c_end) = self.rnn(x, (self.h, self.c))
        # keep the final states (data only, detached from the graph) for the next call
        self.h.data = h_end.data
        self.c.data = c_end.data
        return out[:, -1, :].flatten()

class Extractor(nn.Module):
    # Summarises the whole batch of noise vectors into a shared latent code
    # (conv -> batch-norm/Mish -> average pooling -> LSTM) and concatenates
    # that code onto every individual noise vector.
    def __init__(self, latent_dim, ks=5):
        super(Extractor, self).__init__()
        self.conv = nn.Conv1d(args.noise, latent_dim,
            bias=False, kernel_size=ks, padding=(ks // 2) + 1)
        self.conv.weight.data.normal_(0, 0.01)
        self.activation = nn.Sequential(nn.BatchNorm1d(
            latent_dim, track_running_stats=False), nn.Mish())
        self.gap = nn.AvgPool1d(kernel_size=args.batch, padding=1)
        self.rnn = LSTMModule(hidden_size=latent_dim)

    def forward(self, x):
        y = x.unsqueeze(0).permute(0, 2, 1)
        y = self.rnn(self.gap(self.activation(self.conv(y))))
        return torch.cat([x, y.repeat(args.batch, 1)], dim=1)

class Generator(nn.Module):
    def __init__(self, noise_dim=0):
        super(Generator, self).__init__()
        def block(in_feat, out_feat):
            return [nn.Linear(in_feat, out_feat), nn.Tanh()]
        self.model = nn.Sequential(
            *block(noise_dim + args.cnndim, 512), *block(512, 1024), nn.Linear(1024, len(assets)))
        init_weights(self)
        self.extract = Extractor(args.cnndim)
        # learnable per-asset noise scale used to perturb the generated weights
        self.std_weight = nn.Parameter(torch.zeros(len(assets)).to(device))

    def forward(self, x):
        mu = self.model(self.extract(x))
        # return both the deterministic weights and a noise-perturbed copy
        return mu, mu + (self.std_weight * torch.randn_like(mu))

actor = Generator(args.noise).to(device)
opt = torch.optim.AdamW(filter(lambda p: p.requires_grad, actor.parameters()), lr=1e-3)

best_reward = None
for epoch in range(args.iter):
    torch.cuda.empty_cache()
    # dweights is a noisy version of the weights; use weights for validation
    weights, dweights = actor(torch.randn((args.batch, args.noise)).to(device))
    # robustness of the portfolio candidates against dropping 75% of their weights:
    # the candidates should remain good even when most of their weights are dropped
    dweights = nn.functional.dropout(dweights, p=0.75).softmax(dim=1)
    loss = calculate_reward(dweights, valid_data[:-test_size], index[:-test_size], True).mean()
    opt.zero_grad()
    loss.backward()
    nn.utils.clip_grad_norm_(actor.parameters(), 1.0)
    opt.step()
    with torch.no_grad():
        # take the mean of the quality-diversity candidates and sparsify it with sparsemax
        # (entmax15 actually performs better, but the resulting portfolios are less sparse)
        weights = sparsemax(weights.mean(dim=0), dim=0)
        test_reward = calculate_reward(weights.unsqueeze(0),
            valid_data[-test_size:], index[-test_size:])[0]
        if best_reward is None: best_reward = test_reward
        if test_reward < best_reward:
            best_reward = test_reward
            print('epoch: %i v_loss: %f' % (epoch, best_reward))
            # save the non-zero weights, sorted by absolute value
            bw = weights.detach().cpu().numpy()
            bw = pd.DataFrame(bw).set_index([assets])
            bw = bw.loc[~(bw == 0).all(axis=1)]
            bw = bw.reindex(bw[0].abs().sort_values(ascending=False).index)
            bw.to_csv('best_weights.csv', header=False)
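
# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original script): whenever the
# validation loss improves, the loop above writes the sparsified portfolio to
# 'best_weights.csv' as `<asset>,<weight>` rows, already sorted by absolute
# weight. A minimal way to load it back for inspection could look like this.
# ---------------------------------------------------------------------------
import os
if os.path.exists('best_weights.csv'):
    best = pd.read_csv('best_weights.csv', header=None,
                       index_col=0, names=['asset', 'weight'])
    print(best)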