# %% [markdown]
# # Asynchronous Bayesian Optimization
#
# In this notebook we demonstrate Trieste's ability to perform asynchronous Bayesian optimisation, suitable for scenarios where the objective function can be run for several points in parallel but where observations may return at different times. To avoid wasting resources waiting for the evaluation of the whole batch, we immediately request the next point asynchronously, taking into account points that are still being evaluated. Besides saving resources, an asynchronous approach can also potentially [improve sample efficiency](https://arxiv.org/abs/1901.10452) in comparison with synchronous batch strategies, although this is highly dependent on the use case.
#
# To contrast this approach with regular [batch optimization](batch_optimization.ipynb), this notebook also shows how to run a parallel synchronous batch approach.
# %%
# silence TF warnings and info messages, only print errors
# https://stackoverflow.com/questions/35911252/disable-tensorflow-debugging-information
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
import tensorflow as tf
tf.get_logger().setLevel("ERROR")
import numpy as np
import time
import timeit
# %% [markdown]
# First, let's define a simple objective that will emulate evaluations taking variable time. We will be using the classic Bayesian optimisation benchmark function [Branin](https://www.sfu.ca/~ssurjano/branin.html) with a sleep call inserted in the middle of the calculation to emulate delay. Our sleep delay is a scaled sum of all input values, to make sure the delays are uneven.
# %%
from trieste.objectives import ScaledBranin
def objective(points, sleep=True):
if points.shape[1] != 2:
raise ValueError(
f"Incorrect input shape, expected (*, 2), got {points.shape}"
)
observations = []
for point in points:
observation = ScaledBranin.objective(point)
if sleep:
# insert some artificial delay
# increases linearly with the absolute value of points
# which means our evaluations will take different time
delay = 3 * np.sum(point)
pid = os.getpid()
print(
f"Process {pid}: Objective: pretends like it's doing something for {delay:.2}s",
flush=True,
)
time.sleep(delay)
observations.append(observation)
return np.array(observations)
# test the defined objective function
objective(np.array([[0.1, 0.5]]), sleep=False)
# %% [markdown]
# As always, we need to prepare the model and some initial data to kick-start the optimization process.
# %%
from trieste.space import Box
from trieste.data import Dataset
search_space = Box([0, 0], [1, 1])
num_initial_points = 3
initial_query_points = search_space.sample(num_initial_points)
initial_observations = objective(initial_query_points.numpy(), sleep=False)
initial_data = Dataset(
query_points=initial_query_points,
observations=tf.constant(initial_observations, dtype=tf.float64),
)
import gpflow
from trieste.models.gpflow import GaussianProcessRegression, build_gpr
# We set the likelihood variance to a small number because
# we are dealing with a noise-free problem.
gpflow_model = build_gpr(initial_data, search_space, likelihood_variance=1e-7)
model = GaussianProcessRegression(gpflow_model)
# these imports will be used later for optimization
from trieste.acquisition import LocalPenalization
from trieste.acquisition.rule import (
AsynchronousGreedy,
EfficientGlobalOptimization,
)
from trieste.ask_tell_optimization import AskTellOptimizer
# %% [markdown]
# ## Multiprocessing setup
#
# To keep this notebook as reproducible as possible, we will only be using Python's multiprocessing package here. In this section we explain our setup and define some common code to be used later.
#
# In both the synchronous and asynchronous scenarios we will have a fixed set of worker processes performing observations, as well as a main process responsible for the optimization loop with Trieste. When Trieste suggests a new point, it is inserted into a points queue. One of the workers picks this point from the queue, performs the observation, and inserts the output into an observations queue. The main process then picks up the observation from that queue, at which moment it either waits for the rest of the points in the batch to come back (synchronous scenario) or immediately suggests a new point (asynchronous scenario). This continues either for a certain number of iterations or until we accumulate the necessary number of observations.
#
# The overall setup is illustrated in this diagram:
# ![multiprocessing setup](figures/async_bo.png)
# %%
# Necessary multiprocessing primitives
from multiprocessing import Manager, Process

# queue.Empty is the exception raised by non-blocking queue reads
from queue import Empty as QueueEmpty
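# %% [markdown]
# Before wiring these into Trieste, here is a toy round trip through a pair of queues to illustrate the protocol described above: the main process puts a point on the points queue, a worker reads it, and sends a result back on the observations queue. This is a minimal sketch, independent of the optimization code below; the `_echo_worker` helper and queue names are purely illustrative.
# %%
def _echo_worker(in_queue, out_queue):
    # read a single point and echo it back as a fake "observation"
    point = in_queue.get()
    out_queue.put((point, "observed"))

_manager = Manager()
_points_q, _obs_q = _manager.Queue(), _manager.Queue()
_proc = Process(target=_echo_worker, args=(_points_q, _obs_q))
_proc.start()
_points_q.put([0.1, 0.5])
print(_obs_q.get())  # prints ([0.1, 0.5], 'observed')
_proc.join()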
# %% [markdown]
# We now define several common functions to implement the described setup. First we define a worker function that runs a single observation in a separate process. The worker takes both queues as input, reads the next point from the points queue, makes an observation, and inserts the observed data into the observations queue.
# %%
def observer_proc(points_queue, observations_queue):
pid = os.getpid()
while True:
point_to_observe = points_queue.get()
if point_to_observe is None:
return
print(
f"Process {pid}: Observer : observing data at point {point_to_observe}",
flush=True,
)
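        # note: enable_sleep_delays is a global flag defined later in this notebook;
        # it must be assigned before the worker processes are created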
new_observation = objective(point_to_observe, sleep=enable_sleep_delays)
new_data = (point_to_observe, new_observation)
print(f"Process {pid}: Observer : observed data {new_data}", flush=True)
observations_queue.put(new_data)
# %% [markdown]
# Next we define two helper functions: one creates a given number of worker processes, and the other terminates them once we are done.
# %%
def create_worker_processes(n_workers, points_queue, observations_queue):
observer_processes = []
for i in range(n_workers):
worker_proc = Process(
            target=observer_proc, args=(points_queue, observations_queue)
)
worker_proc.daemon = True
worker_proc.start()
observer_processes.append(worker_proc)
return observer_processes
def terminate_processes(processes):
for prc in processes:
prc.terminate()
prc.join()
prc.close()
# %% [markdown]
# Finally we set some common parameters. See the comments below for an explanation of what each one means.
# %%
# Number of worker processes to run simultaneously
# Setting this to 1 will turn both setups into non-batch sequential optimization
num_workers = 3
# Number of iterations to run the synchronous scenario for
num_iterations = 10
# Number of observations to collect in the asynchronous scenario
num_observations = num_workers * num_iterations
# Set this flag to False to disable sleep delays in case you want the notebook to execute quickly
enable_sleep_delays = True
# %% [markdown]
# ## Asynchronous optimization
# This section runs the asynchronous optimization routine. We first set up the [ask/tell optimizer](ask_tell_optimization.ipynb), as we cannot hand over the evaluation of the objective to Trieste. Next we create thread-safe queues for points and observations, and run the optimization loop.
#
# Crucially, even though we are using the batch acquisition function Local Penalization, we specify a batch size of 1. This is because we don't really want a batch: since the number of workers we have is fixed, whenever we receive a new observation we only need one point back. However, this only works with acquisition functions that implement greedy batch collection strategies, because they are able to take into account points that are currently being observed (in Trieste we call them "pending"). Trieste currently provides two such functions: Local Penalization and GIBBON. Notice that we use the **AsynchronousGreedy** rule, specifically designed for using greedy batch acquisition functions in asynchronous scenarios.
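# %% [markdown]
# As an aside, swapping in GIBBON only requires changing the builder passed to the rule. Below is a minimal sketch, assuming the `GIBBON` builder exported by `trieste.acquisition` with its default parameters; we don't use this rule in the rest of the notebook.
# %%
from trieste.acquisition import GIBBON

# an alternative greedy batch builder, shown for illustration only
gibbon_acq = GIBBON(search_space)
gibbon_rule = AsynchronousGreedy(builder=gibbon_acq)  # type: ignore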
# %%
# setup Ask Tell BO
local_penalization_acq = LocalPenalization(search_space, num_samples=2000)
local_penalization_rule = AsynchronousGreedy(builder=local_penalization_acq) # type: ignore
async_bo = AskTellOptimizer(
search_space, initial_data, model, local_penalization_rule
)
# retrieve process id for nice logging
pid = os.getpid()
# create point and observation queues
m = Manager()
pq = m.Queue()
oq = m.Queue()
# keep track of all workers we have launched
observer_processes = []
# counter to keep track of collected observations
points_observed = 0
start = timeit.default_timer()
try:
observer_processes = create_worker_processes(num_workers, pq, oq)
# init the queue with first batch of points
for _ in range(num_workers):
point = async_bo.ask()
pq.put(np.atleast_2d(point.numpy()))
while points_observed < num_observations:
# keep asking queue for new observations until one arrives
try:
new_data = oq.get_nowait()
print(
f"Process {pid}: Main : received data {new_data}",
flush=True,
)
        except QueueEmpty:
            # nothing in the observations queue yet, poll again
            continue
# new_data is a tuple of (point, observation value)
        # here we turn it into a Dataset and tell Trieste about it
points_observed += 1
new_data = Dataset(
query_points=tf.constant(new_data[0], dtype=tf.float64),
observations=tf.constant(new_data[1], dtype=tf.float64),
)
async_bo.tell(new_data)
# now we can ask Trieste for one more point
# and feed that back into the points queue
point = async_bo.ask()
print(f"Process {pid}: Main : acquired point {point}", flush=True)
        pq.put(np.atleast_2d(point.numpy()))
finally:
terminate_processes(observer_processes)
stop = timeit.default_timer()
# Collect the observations, compute the running time
async_lp_observations = (
async_bo.to_result().try_get_final_dataset().observations
- ScaledBranin.minimum
)
async_lp_time = stop - start
print(f"Got {len(async_lp_observations)} observations in {async_lp_time:.2f}s")
# %% [markdown]
# ## Synchronous parallel optimization
#
# This section runs the synchronous parallel optimization with Trieste. We again use the Local Penalization acquisition function, but this time with a batch size equal to the number of workers we have available. Once Trieste suggests a batch, we add all of its points to the points queue, and the workers immediately pick them up, one point per worker. Therefore all points in the batch are evaluated in parallel.
# %%
# setup Ask Tell BO
gpflow_model = build_gpr(initial_data, search_space, likelihood_variance=1e-7)
model = GaussianProcessRegression(gpflow_model)
local_penalization_acq = LocalPenalization(search_space, num_samples=2000)
local_penalization_rule = EfficientGlobalOptimization( # type: ignore
num_query_points=num_workers, builder=local_penalization_acq
)
sync_bo = AskTellOptimizer(
search_space, initial_data, model, local_penalization_rule
)
# retrieve process id for nice logging
pid = os.getpid()
# create point and observation queues
m = Manager()
pq = m.Queue()
oq = m.Queue()
# keep track of all workers we have launched
observer_processes = []
start = timeit.default_timer()
try:
observer_processes = create_worker_processes(num_workers, pq, oq)
# BO loop starts here
for i in range(num_iterations):
print(f"Process {pid}: Main : iteration {i} starts", flush=True)
# get a batch of points from Trieste, send them to points queue
# each worker picks up a point and processes it
points = sync_bo.ask()
for point in points.numpy():
pq.put(point.reshape(1, -1)) # reshape is to make point a 2d array
# now we wait for all workers to finish
# we create an empty dataset and wait
        # until we have collected as many observations in it
# as there were points in the batch
all_new_data = Dataset(
tf.zeros((0, initial_data.query_points.shape[1]), tf.float64),
tf.zeros((0, initial_data.observations.shape[1]), tf.float64),
)
while len(all_new_data) < num_workers:
# this line blocks the process until new data is available in the queue
new_data = oq.get()
print(
f"Process {pid}: Main : received data {new_data}",
flush=True,
)
new_data = Dataset(
query_points=tf.constant(new_data[0], dtype=tf.float64),
observations=tf.constant(new_data[1], dtype=tf.float64),
)
all_new_data = all_new_data + new_data
# tell Trieste of new batch of observations
sync_bo.tell(all_new_data)
finally:
terminate_processes(observer_processes)
stop = timeit.default_timer()
# Collect the observations, compute the running time
sync_lp_observations = (
sync_bo.to_result().try_get_final_dataset().observations
- ScaledBranin.minimum
)
sync_lp_time = stop - start
print(f"Got {len(sync_lp_observations)} observations in {sync_lp_time:.2f}s")
# %% [markdown]
# ## Comparison
# To compare the outcomes of the sync and async runs, let's plot their respective regrets side by side and print the running times. For this toy problem we expect the async scenario to run a little faster on machines with multiple CPUs.
# %%
from trieste.experimental.plotting import plot_regret
import matplotlib.pyplot as plt
fig, ax = plt.subplots(1, 2)
sync_lp_min_idx = tf.squeeze(tf.argmin(sync_lp_observations, axis=0))
async_lp_min_idx = tf.squeeze(tf.argmin(async_lp_observations, axis=0))
plot_regret(
sync_lp_observations.numpy(),
ax[0],
num_init=len(initial_data),
idx_best=sync_lp_min_idx,
)
ax[0].set_yscale("log")
ax[0].set_ylabel("Regret")
ax[0].set_ylim(0.0000001, 100)
ax[0].set_xlabel("# evaluations")
ax[0].set_title(
f"Sync LP, {len(sync_lp_observations)} points, time {sync_lp_time:.2f}"
)
plot_regret(
async_lp_observations.numpy(),
ax[1],
num_init=len(initial_data),
idx_best=async_lp_min_idx,
)
ax[1].set_yscale("log")
ax[1].set_ylabel("Regret")
ax[1].set_ylim(0.0000001, 100)
ax[1].set_xlabel("# evaluations")
ax[1].set_title(
f"Async LP, {len(async_lp_observations)} points, time {async_lp_time:.2f}s"
)
fig.tight_layout()
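# %% [markdown]
# As a final check, we can print the two timings side by side. This simply reuses the values collected above; the exact ratio will vary between runs and machines.
# %%
print(f"Sync LP:  {len(sync_lp_observations)} observations in {sync_lp_time:.2f}s")
print(f"Async LP: {len(async_lp_observations)} observations in {async_lp_time:.2f}s")
print(f"Sync/async wall-clock ratio: {sync_lp_time / async_lp_time:.2f}")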