# BNNexample.py
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_datasets as tfds
import tensorflow_probability as tfp
FEATURE_NAMES = [
    "fixed acidity",
    "volatile acidity",
    "citric acid",
    "residual sugar",
    "chlorides",
    "free sulfur dioxide",
    "total sulfur dioxide",
    "density",
    "pH",
    "sulphates",
    "alcohol",
]
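
# The names above are the eleven physicochemical input features of the TFDS
# "wine_quality" dataset; the regression target is the integer quality score.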
hidden_units = [8, 8]
learning_rate = 0.001

def create_model_inputs():
    inputs = {}
    for feature_name in FEATURE_NAMES:
        inputs[feature_name] = layers.Input(
            name=feature_name, shape=(1,), dtype=tf.float32
        )
    return inputs
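
# The dict of Input layers mirrors the feature dict that the dataset emits,
# so (features, target) batches from tf.data can be fed to model.fit directly.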

def get_train_and_test_splits(train_size, batch_size=1):
    # We prefetch with a buffer the same size as the dataset because the
    # dataset is very small and fits into memory. Note that `dataset_size`
    # is a module-level global defined further down.
    dataset = (
        tfds.load(name="wine_quality", as_supervised=True, split="train")
        .map(lambda x, y: (x, tf.cast(y, tf.float32)))
        .prefetch(buffer_size=dataset_size)
        .cache()
    )
    # We shuffle with a buffer the same size as the training split.
    train_dataset = (
        dataset.take(train_size).shuffle(buffer_size=train_size).batch(batch_size)
    )
    test_dataset = dataset.skip(train_size).batch(batch_size)
    return train_dataset, test_dataset
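
# A minimal usage sketch (not part of the original flow; the sizes are
# illustrative): each element of the returned datasets is a (features, target)
# pair, where `features` is a dict keyed by FEATURE_NAMES.
#
#   train_ds, test_ds = get_train_and_test_splits(train_size=100, batch_size=4)
#   for features, target in train_ds.take(1):
#       print(features["alcohol"].shape, target.shape)  # (4,) and (4,)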

def run_experiment(model, loss, train_dataset, test_dataset):
    model.compile(
        optimizer=keras.optimizers.RMSprop(learning_rate=learning_rate),
        loss=loss,
        metrics=[keras.metrics.RootMeanSquaredError()],
    )
    print("Start training the model...")
    # `num_epochs` is a module-level global defined below.
    model.fit(train_dataset, epochs=num_epochs, validation_data=test_dataset)
    print("Model training finished.")
    _, rmse = model.evaluate(train_dataset, verbose=0)
    print(f"Train RMSE: {round(rmse, 3)}")
    print("Evaluating model performance...")
    _, rmse = model.evaluate(test_dataset, verbose=0)
    print(f"Test RMSE: {round(rmse, 3)}")

# Experiment configuration.
dataset_size = 4898
batch_size = 256
num_epochs = 100
sample = 10
train_size = int(dataset_size * 0.85)
mse_loss = keras.losses.MeanSquaredError()

# Split the dataset into train and test sets.
train_dataset, test_dataset = get_train_and_test_splits(train_size, batch_size)

# Sample a few example (features, target) pairs from the test dataset.
examples, targets = list(test_dataset.unbatch().shuffle(batch_size * 10).batch(sample))[
    0
]

def prior(kernel_size, bias_size, dtype=None):
    # Fixed (non-trainable) prior: a standard multivariate normal over all
    # n weights of the layer.
    n = kernel_size + bias_size
    prior_model = keras.Sequential(
        [
            tfp.layers.DistributionLambda(
                lambda t: tfp.distributions.MultivariateNormalDiag(
                    loc=tf.zeros(n), scale_diag=tf.ones(n)
                )
            )
        ]
    )
    return prior_model

def posterior(kernel_size, bias_size, dtype=None):
    # Learnable posterior: a multivariate normal whose mean and full
    # covariance (via a lower-triangular factor) are trained.
    n = kernel_size + bias_size
    posterior_model = keras.Sequential(
        [
            tfp.layers.VariableLayer(
                tfp.layers.MultivariateNormalTriL.params_size(n), dtype=dtype
            ),
            tfp.layers.MultivariateNormalTriL(n),
        ]
    )
    return posterior_model
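
# Sanity-check sketch (an addition; the sizes are hypothetical): both factories
# return a model that maps a dummy input to a distribution over all
# n = kernel_size + bias_size weights of one DenseVariational layer.
#
#   dist = prior(kernel_size=16, bias_size=8)(tf.zeros((1, 1)))
#   print(dist.event_shape)  # [24]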

def create_bnn_model(train_size):
    inputs = create_model_inputs()
    features = keras.layers.concatenate(list(inputs.values()))
    features = layers.BatchNormalization()(features)
    # Create hidden layers with weight uncertainty using the DenseVariational layer.
    for units in hidden_units:
        features = tfp.layers.DenseVariational(
            units=units,
            make_prior_fn=prior,
            make_posterior_fn=posterior,
            kl_weight=1 / train_size,
            activation="sigmoid",
        )(features)
    # The output is deterministic: a single point estimate.
    outputs = layers.Dense(units=1)(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model
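
# Note on kl_weight: DenseVariational adds the KL divergence between posterior
# and prior to the loss. Scaling it by 1 / train_size spreads that penalty
# across the per-example losses, so one pass over the training set accounts
# for the full KL term roughly once.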

def compute_predictions(model, iterations=100):
    predicted = []
    for _ in range(iterations):
        predicted.append(model(examples).numpy())
    predicted = np.concatenate(predicted, axis=1)
    prediction_mean = np.mean(predicted, axis=1).tolist()
    prediction_min = np.min(predicted, axis=1).tolist()
    prediction_max = np.max(predicted, axis=1).tolist()
    prediction_range = (np.max(predicted, axis=1) - np.min(predicted, axis=1)).tolist()
    for idx in range(sample):
        print(
            f"Predictions mean: {round(prediction_mean[idx], 2)}, "
            f"min: {round(prediction_min[idx], 2)}, "
            f"max: {round(prediction_max[idx], 2)}, "
            f"range: {round(prediction_range[idx], 2)} - "
            f"Actual: {targets[idx]}"
        )
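
# Because DenseVariational resamples weights from the learned posterior on
# every forward pass, the spread between min and max over the `iterations`
# calls above reflects the model's epistemic (weight) uncertainty.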

# Override the epoch count for a quick demonstration run.
num_epochs = 3
bnn_model_full = create_bnn_model(train_size)
run_experiment(bnn_model_full, mse_loss, train_dataset, test_dataset)
compute_predictions(bnn_model_full)