runner.py
"""Main library entrypoint."""
import copy
import os
import sys
import random
import math
import subprocess
import time
import tempfile
import yaml
import numpy as np
import tensorflow as tf
from opennmt import evaluation
from opennmt import inference
from opennmt import models
from opennmt import training as training_util
from opennmt.utils import checkpoint as checkpoint_util
from opennmt.utils import misc
from opennmt.version import __version__


# These options require a value but we can fall back to a default one.
_CONFIG_FALLBACK = {
    "params": {},
    "train": {
        "batch_type": "examples",
        "length_bucket_width": 1,
        "sample_buffer_size": 500000,
        "save_summary_steps": 100
    },
    "eval": {
        "length_bucket_width": None,
        "batch_type": "examples",
        "batch_size": 32
    },
    "infer": {
        "length_bucket_width": None,
        "batch_type": "examples",
        "batch_size": 16
    },
    "score": {
        "batch_size": 64
    }
}
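
# Illustrative sketch (added commentary, not part of the original file): a
# minimal user configuration that relies on the fallbacks above. Only
# "model_dir" and "data" are supplied here; batch sizes, bucket widths, etc.
# come from _CONFIG_FALLBACK. The file names are hypothetical:
#
#   config = {
#       "model_dir": "run/",
#       "data": {
#           "train_features_file": "train.src",
#           "train_labels_file": "train.tgt",
#           "source_vocabulary": "src.vocab",
#           "target_vocabulary": "tgt.vocab",
#       },
#   }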


class Runner(object):
    """Class for running and exporting models."""

    def __init__(self,
                 model,
                 config,
                 auto_config=False,
                 mixed_precision=False,
                 seed=None):
        """Initializes the runner parameters.

        Args:
          model: A :class:`opennmt.models.Model` instance to run or a callable
            that returns such an instance.
          config: The run configuration.
          auto_config: If ``True``, use automatic configuration values defined by
            :obj:`model`.
          mixed_precision: Enable mixed precision.
          seed: The random seed to set.

        Raises:
          TypeError: if :obj:`model` is not a :class:`opennmt.models.Model`
            instance or a callable.
        """
        if isinstance(model, models.Model):
            self._model = model
            self._model_fn = lambda: misc.clone_layer(model)
        elif callable(model):
            self._model = model()
            self._model_fn = model
        else:
            raise TypeError(
                "model should be an opennmt.models.Model instance or a callable")
        tf.get_logger().info("Using OpenNMT-tf version %s", __version__)
        tf.get_logger().info("Using model:\n%s", self._model)
        self._optimizer = None
        self._config = copy.deepcopy(config)
        self._auto_config = auto_config
        self._mixed_precision = mixed_precision
        if mixed_precision:
            tf.config.optimizer.set_experimental_options({"auto_mixed_precision": True})
        if seed is not None:
            np.random.seed(seed)
            random.seed(seed)
            tf.random.set_seed(seed)
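
    # Illustrative sketch (added commentary, not part of the original file):
    # the runner accepts either a built model or a zero-argument callable; the
    # callable form lets the runner create fresh clones when needed. The
    # model class and config contents are assumptions:
    #
    #   runner = Runner(models.TransformerBase(), config, auto_config=True)
    #   # or, equivalently, passing the class as a callable:
    #   runner = Runner(models.TransformerBase, config, auto_config=True)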

    @property
    def model(self):
        """The :class:`opennmt.models.Model` executed by this runner."""
        return self._model

    @property
    def model_dir(self):
        """The active model directory."""
        return self._config["model_dir"]

    def _finalize_config(self, training=False, num_replicas=1, num_devices=1):
        # Configuration priority: user config > auto config > default config.
        config = copy.deepcopy(_CONFIG_FALLBACK)
        if self._auto_config:
            model_config = self._model.auto_config(num_replicas=num_replicas)
            if not model_config:
                raise NotImplementedError(
                    "This model does not define any automatic configuration values")
            misc.merge_dict(config, model_config)
        misc.merge_dict(config, self._config)
        config["params"].setdefault("num_hypotheses", config["infer"].get("n_best", 1))
        config["params"].setdefault(
            "average_loss_in_time", config["train"]["batch_type"] == "tokens")
        if training:
            train_config = config["train"]
            batch_size = train_config.get("batch_size")
            # Auto tune batch size.
            if batch_size is None or batch_size == 0:
                if train_config["batch_type"] == "examples":
                    raise ValueError(
                        "Batch size autotuning is only supported for the "
                        "\"tokens\" batch type")
                max_batch_size = 16384
                if train_config.get("effective_batch_size") is not None:
                    max_batch_size = min(max_batch_size, train_config["effective_batch_size"])
                train_config["batch_size"] = _auto_tune_batch_size(
                    config,
                    max_batch_size=max_batch_size,
                    num_devices=num_devices,
                    mixed_precision=self._mixed_precision)
        tf.get_logger().info(
            "Using parameters:\n%s", yaml.dump(config, indent=2, default_flow_style=False))
        return config

    def _init_model(self, config):
        model = self._model_fn()
        model.initialize(config["data"], params=config["params"])
        return model

    def train(self,
              num_devices=1,
              with_eval=False,
              checkpoint_path=None,
              hvd=None,
              return_summary=False):
        """Runs the training loop.

        Args:
          num_devices: Number of devices to use for training.
          with_eval: Enable evaluation during training.
          checkpoint_path: The checkpoint path to load the model weights from.
          hvd: Optional Horovod module.
          return_summary: Return a summary of the training from this function.

        Returns:
          The path to the final model directory and, if :obj:`return_summary` is
          set, a dictionary with various training statistics.
        """
        if hvd is None:
            num_replicas = num_devices
            is_master = True
        else:
            num_replicas = hvd.size()
            is_master = hvd.rank() == 0
        config = self._finalize_config(
            training=True,
            num_replicas=num_replicas,
            num_devices=num_devices)
        model = self._init_model(config)
        optimizer = model.get_optimizer()
        data_config = config["data"]
        train_config = config["train"]
        eval_config = config["eval"]
        batch_type = train_config["batch_type"]
        if batch_type == "tokens" and self._mixed_precision:
            batch_size_multiple = 8
        else:
            batch_size_multiple = 1
        dataset_fn = lambda input_context: model.examples_inputter.make_training_dataset(
            data_config["train_features_file"],
            data_config.get("train_labels_file"),
            train_config["batch_size"],
            batch_type=batch_type,
            batch_size_multiple=batch_size_multiple,
            shuffle_buffer_size=train_config["sample_buffer_size"],
            length_bucket_width=train_config["length_bucket_width"],
            maximum_features_length=train_config.get("maximum_features_length"),
            maximum_labels_length=train_config.get("maximum_labels_length"),
            single_pass=train_config.get("single_pass", False),
            num_shards=input_context.num_input_pipelines,
            shard_index=input_context.input_pipeline_id,
            prefetch_buffer_size=train_config.get("prefetch_buffer_size"),
            cardinality_multiple=input_context.num_replicas_in_sync,
            weights=data_config.get("train_files_weights"))
        checkpoint = None
        evaluator = None
        if is_master:
            checkpoint = checkpoint_util.Checkpoint.from_config(
                config, model, optimizer=optimizer)
            checkpoint.restore(
                checkpoint_path=checkpoint_path, weights_only=checkpoint_path is not None)
            if with_eval:
                evaluator = evaluation.Evaluator.from_config(model, config)
        # Set gradients accumulation based on the requested effective batch size.
        if train_config.get("effective_batch_size") is not None:
            accum_steps = _count_batch_accum(
                train_config["batch_size"],
                train_config["effective_batch_size"],
                num_replicas=num_replicas)
            tf.get_logger().info(
                "Accumulate gradients of %d iterations to reach effective batch size of %d",
                accum_steps,
                train_config["effective_batch_size"])
        else:
            accum_steps = 1
        if hvd is not None:
            if num_devices > 1:
                raise ValueError(
                    "num_devices (or num_gpus) should be set to 1 when using Horovod")
            trainer = training_util.HorovodTrainer(
                model, optimizer, hvd, checkpoint=checkpoint)
        elif num_devices > 1:
            devices = misc.get_devices(count=num_devices)
            trainer = training_util.MirroredStrategyTrainer(
                model, optimizer, checkpoint=checkpoint, devices=devices)
        else:
            trainer = training_util.Trainer(model, optimizer, checkpoint=checkpoint)
        summary = trainer(
            dataset_fn,
            max_step=train_config.get("max_step"),
            accum_steps=accum_steps,
            report_steps=train_config.get("save_summary_steps", 100),
            save_steps=train_config.get("save_checkpoints_steps", 5000),
            evaluator=evaluator,
            eval_steps=eval_config.get("steps", 5000),
            moving_average_decay=train_config.get("moving_average_decay"))
        average_last_checkpoints = train_config.get("average_last_checkpoints", 0)
        if checkpoint is None:
            output_dir = None
        elif average_last_checkpoints > 0:
            output_dir = self.average_checkpoints(
                os.path.join(checkpoint.model_dir, "avg"),
                max_count=average_last_checkpoints)
        else:
            output_dir = checkpoint.model_dir
        if return_summary:
            return output_dir, summary
        return output_dir
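
    # Illustrative sketch (added commentary, not part of the original file):
    # a typical training call, assuming `config` holds valid "data" and
    # "model_dir" entries:
    #
    #   runner = Runner(model, config, auto_config=True)
    #   output_dir, summary = runner.train(
    #       num_devices=2, with_eval=True, return_summary=True)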

    def evaluate(self, features_file=None, labels_file=None, checkpoint_path=None):
        """Runs evaluation.

        Args:
          features_file: The input features file to evaluate. If not set, will load
            ``eval_features_file`` from the data configuration.
          labels_file: The output labels file to evaluate. If not set, will load
            ``eval_labels_file`` from the data configuration.
          checkpoint_path: The checkpoint path to load the model weights from.

        Returns:
          A dict of evaluation metrics.
        """
        config = self._finalize_config()
        model = self._init_model(config)
        checkpoint = checkpoint_util.Checkpoint.from_config(config, model)
        checkpoint_path = checkpoint.restore(
            checkpoint_path=checkpoint_path, weights_only=True)
        step = checkpoint_util.get_step_from_checkpoint_prefix(checkpoint_path)
        evaluator = evaluation.Evaluator.from_config(
            model,
            config,
            features_file=features_file,
            labels_file=labels_file)
        return evaluator(step)

    def average_checkpoints(self, output_dir, max_count=8):
        """Averages checkpoints.

        Args:
          output_dir: The directory that will contain the averaged checkpoint.
          max_count: The maximum number of checkpoints to average.

        Returns:
          The path to the directory containing the averaged checkpoint.
        """
        config = self._finalize_config()
        model = self._init_model(config)
        optimizer = model.get_optimizer()
        checkpoint = checkpoint_util.Checkpoint.from_config(config, model, optimizer=optimizer)
        checkpoint.restore()
        model.create_variables(optimizer=optimizer)
        trackables = dict(model=model, optimizer=optimizer)
        output_dir = checkpoint_util.average_checkpoints(
            checkpoint.model_dir,
            output_dir,
            trackables,
            max_count=max_count)
        self._config["model_dir"] = output_dir
        return output_dir

    def update_vocab(self, output_dir, src_vocab=None, tgt_vocab=None):
        """Updates model vocabularies.

        Args:
          output_dir: Directory where the updated checkpoint will be saved.
          src_vocab: Path to the new source vocabulary.
          tgt_vocab: Path to the new target vocabulary.

        Returns:
          Path to the new checkpoint directory.
        """
        if not isinstance(self._model, models.SequenceToSequence):
            raise ValueError(
                "Updating vocabularies is only supported for sequence to sequence models")
        config = self._finalize_config()
        if src_vocab is None and tgt_vocab is None:
            return config["model_dir"]
        model = self._init_model(config)
        optimizer = model.get_optimizer()
        cur_checkpoint = checkpoint_util.Checkpoint.from_config(
            config, model, optimizer=optimizer)
        cur_checkpoint.restore()
        model.create_variables(optimizer=optimizer)
        self._config["model_dir"] = output_dir
        if src_vocab is not None:
            self._config["data"]["source_vocabulary"] = src_vocab
        if tgt_vocab is not None:
            self._config["data"]["target_vocabulary"] = tgt_vocab
        new_config = self._finalize_config()
        new_model = self._init_model(new_config)
        new_optimizer = new_model.get_optimizer()
        new_checkpoint = checkpoint_util.Checkpoint.from_config(
            new_config, new_model, optimizer=new_optimizer)
        new_model.create_variables(optimizer=new_optimizer)
        model.transfer_weights(new_model, new_optimizer=new_optimizer, optimizer=optimizer)
        new_optimizer.iterations.assign(optimizer.iterations)
        new_checkpoint.save()
        return output_dir
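
    # Illustrative sketch (added commentary, not part of the original file):
    # replacing both vocabularies before continuing training on new data.
    # The directory and file names are hypothetical:
    #
    #   new_dir = runner.update_vocab(
    #       "run/updated_vocab",
    #       src_vocab="new.src.vocab",
    #       tgt_vocab="new.tgt.vocab")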

    def infer(self,
              features_file,
              predictions_file=None,
              checkpoint_path=None,
              log_time=False):
        """Runs inference.

        Args:
          features_file: The file(s) to infer from.
          predictions_file: If set, predictions are saved in this file, otherwise
            they are printed on the standard output.
          checkpoint_path: Path of a specific checkpoint to predict. If ``None``,
            the latest is used.
          log_time: If ``True``, several time metrics will be printed in the logs at
            the end of the inference loop.
        """
        config = self._finalize_config()
        model = self._init_model(config)
        checkpoint = checkpoint_util.Checkpoint.from_config(config, model)
        checkpoint.restore(checkpoint_path=checkpoint_path, weights_only=True)
        infer_config = config["infer"]
        dataset = model.examples_inputter.make_inference_dataset(
            features_file,
            infer_config["batch_size"],
            batch_type=infer_config["batch_type"],
            length_bucket_width=infer_config["length_bucket_width"],
            prefetch_buffer_size=infer_config.get("prefetch_buffer_size"))
        inference.predict_dataset(
            model,
            dataset,
            print_params=infer_config,
            predictions_file=predictions_file,
            log_time=log_time)
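
    # Illustrative sketch (added commentary, not part of the original file):
    # translating a test file with the latest checkpoint. File names are
    # hypothetical:
    #
    #   runner.infer("test.src", predictions_file="test.out", log_time=True)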

    def export(self, export_dir, checkpoint_path=None, exporter=None):
        """Exports a model.

        Args:
          export_dir: The export directory.
          checkpoint_path: The checkpoint path to export. If ``None``, the latest
            is used.
          exporter: A :class:`opennmt.utils.Exporter` instance. Defaults to
            :class:`opennmt.utils.SavedModelExporter`.
        """
        config = self._finalize_config()
        model = self._init_model(config)
        checkpoint = checkpoint_util.Checkpoint.from_config(config, model)
        checkpoint.restore(checkpoint_path=checkpoint_path, weights_only=True)
        model.export(export_dir, exporter=exporter)

    def score(self, features_file, predictions_file, checkpoint_path=None, output_file=None):
        """Scores existing predictions.

        Args:
          features_file: The input file.
          predictions_file: The predictions file to score.
          checkpoint_path: Path of a specific checkpoint to use. If ``None``,
            the latest is used.
          output_file: If set, the file where the scores are saved; otherwise
            they are printed on the standard output.
        """
        config = self._finalize_config()
        model = self._init_model(config)
        checkpoint = checkpoint_util.Checkpoint.from_config(config, model)
        checkpoint.restore(checkpoint_path=checkpoint_path, weights_only=True)
        score_config = config["score"]
        dataset = model.examples_inputter.make_evaluation_dataset(
            features_file,
            predictions_file,
            score_config["batch_size"],
            prefetch_buffer_size=score_config.get("prefetch_buffer_size"))
        inference.score_dataset(
            model, dataset, print_params=score_config, output_file=output_file)


def _count_batch_accum(batch_size, target_batch_size, num_replicas=1):
    """Given the current batch size, the number of replicas, and the requested
    effective batch size, returns the number of gradients to accumulate.
    """
    return int(math.ceil(float(target_batch_size) / (batch_size * num_replicas)))
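
# Illustrative worked example (added commentary, not part of the original
# file), with made-up numbers: batch_size=3125 tokens, target_batch_size=25000,
# num_replicas=2 -> ceil(25000 / (3125 * 2)) = 4 accumulation steps, giving an
# actual effective batch size of 3125 * 2 * 4 = 25000 tokens per update.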


def _auto_tune_batch_size(config,
                          min_batch_size=1024,
                          max_batch_size=16384,
                          min_range=256,
                          sample_iterations=10,
                          num_devices=1,
                          scaling_factor=0.8,
                          mixed_precision=False):
    """Finds the largest token-based batch size that can be used with this
    configuration.

    This function runs some training iterations and uses out-of-memory errors as
    search conditions. A binary search is used to converge to a suitable batch
    size.

    We prefer to run the iterations in a different process so that it does not
    alter the current context (OOM may not be safe to recover from, see for
    example https://stackoverflow.com/q/53820713/2529808).

    Args:
      config: The training configuration.
      min_batch_size: The smallest batch size to consider.
      max_batch_size: The largest batch size to consider.
      min_range: Continue searching while the difference between
        :obj:`max_batch_size` and :obj:`min_batch_size` is larger than this value.
      sample_iterations: The number of training iterations to run per trial.
      num_devices: The number of devices to use.
      scaling_factor: Scale the found batch size by this value.
      mixed_precision: If ``True``, run the autotuning with mixed precision.

    Returns:
      The autotuned batch size.
    """
    model_dir = config["model_dir"]
    with tempfile.TemporaryDirectory() as tmpdir:
        config = copy.deepcopy(config)
        config["model_dir"] = tmpdir
        config["train"]["save_checkpoints_steps"] = None
        config["train"]["average_last_checkpoints"] = 0
        config["train"]["max_step"] = sample_iterations
        config_path = os.path.join(config["model_dir"], "batch_size_autotuner.yml")
        model_description = os.path.join(model_dir, "model_description.py")
        args = [
            sys.executable or "python",
            "-m", "opennmt.bin.main",
            "--config", config_path,
            "--model", model_description,
            "--checkpoint_path", model_dir,
        ]
        if mixed_precision:
            args.extend(["--mixed_precision"])
        args.extend([
            "train",
            "--num_gpus", str(num_devices),
        ])
        tf.get_logger().info(
            "Searching the largest batch size between %d and %d with a precision of %d...",
            min_batch_size, max_batch_size, min_range)
        while max_batch_size - min_batch_size > min_range:
            batch_size = (max_batch_size + min_batch_size) // 2
            # Update the configuration with the current batch size and adjusted
            # gradients accumulation.
            config["train"]["batch_size"] = batch_size
            with tf.io.gfile.GFile(config_path, mode="wb") as config_file:
                yaml.dump(config, config_file)
            tf.get_logger().info("Trying training with batch size %d...", batch_size)
            time.sleep(1)
            with open(os.devnull, "w") as devnull:
                process = subprocess.Popen(args, stdout=devnull, stderr=devnull)
                exit_code = process.wait()
            if exit_code != 0:
                tf.get_logger().info("... failed.")
                max_batch_size = batch_size - 1
            else:
                tf.get_logger().info(
                    "... succeeded, continue until the search range is smaller than %d.",
                    min_range)
                min_batch_size = batch_size
    batch_size = int(scaling_factor * min_batch_size)
    tf.get_logger().info("Batch size auto tuned to %d.", batch_size)
    return batch_size
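

# Illustrative trace (added commentary, not part of the original file), with
# made-up outcomes: starting from the range [1024, 16384], the search first
# tries (1024 + 16384) // 2 = 8704; if that trial OOMs, the range shrinks to
# [1024, 8703] and 4863 is tried next; if that succeeds, the range becomes
# [4863, 8703], and so on until it is narrower than min_range. The largest
# working size is then scaled down by scaling_factor (0.8 by default),
# presumably to leave some memory headroom for a full training run.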