From 4280cb121364e5a6d3f15df26c6113dfeae58dd7 Mon Sep 17 00:00:00 2001 From: "Joshua Z. Zhang" Date: Wed, 17 Feb 2021 15:36:41 -0800 Subject: [PATCH] [Auto] fit time limit (#1615) * time_limit in training loops * test time_limit * fix * fix pass args * increase tic frequency * fix precise tics * fix speed tic * fix timelimit * per-trial based cache dir * fix * fix * fix * fix * final_fit * debug * fix * debug * debug * dd * dd * fix path * debug * fix * fix log file path if deleted * fix self config overwrite * fix * results * fix all * try reload mxnet in training * fix * fix timeout * fix * fix reload * fix * import os * fix task id * fix * fix doc * fix overhead * fix * fix lint * fix lint * fix tests * fix * disable hpo in tests --- gluoncv/auto/estimators/base_estimator.py | 52 ++++--- .../auto/estimators/center_net/center_net.py | 47 ++++-- gluoncv/auto/estimators/conf.py | 2 + .../estimators/faster_rcnn/faster_rcnn.py | 47 ++++-- .../image_classification.py | 41 ++++-- .../auto/estimators/mask_rcnn/mask_rcnn.py | 3 +- gluoncv/auto/estimators/ssd/ssd.py | 44 ++++-- gluoncv/auto/estimators/yolo/yolo.py | 42 ++++-- gluoncv/auto/tasks/image_classification.py | 136 +++++++++++++----- gluoncv/auto/tasks/object_detection.py | 133 ++++++++++++----- gluoncv/check.py | 2 +- tests/auto/test_auto_tasks.py | 20 +++ 12 files changed, 421 insertions(+), 148 deletions(-) create mode 100644 gluoncv/auto/estimators/conf.py diff --git a/gluoncv/auto/estimators/base_estimator.py b/gluoncv/auto/estimators/base_estimator.py index 5844f5cd7d..579c08c0f4 100644 --- a/gluoncv/auto/estimators/base_estimator.py +++ b/gluoncv/auto/estimators/base_estimator.py @@ -1,5 +1,6 @@ """Base Estimator""" import os +import math import pickle import io import logging @@ -28,9 +29,7 @@ def _apply(cls): " Config used to override default configurations. \n" " If `str`, assume config file (.yml, .yaml) is used. \n" "logger : logger, default is `None`.\n" - " If not `None`, will use default logging object.\n" - "logdir : str, default is None.\n" - " Directory for saving logs. 
If `None`, current working directory is used.\n") + " If not `None`, will use default logging object.\n") cls.__doc__ += '\nDefault configurations: \n--------------------\n' sio = io.StringIO() cfg.save(sio) @@ -80,22 +79,22 @@ def __init__(self, config, logger=None, reporter=None, name=None): self.dataset = 'auto' # logdir - logdir = config.pop('logdir', None) if isinstance(config, dict) else None - self._logdir = os.path.abspath(logdir) if logdir else os.getcwd() + logdir = config.pop('log_dir', None) if isinstance(config, dict) else None + if logdir: + self._logdir = os.path.abspath(logdir) + else: + self._logdir = os.path.join(os.getcwd(), name.lower() + datetime.now().strftime("-%m-%d-%Y")) # finalize config cfg = self._default_cfg.merge(config) # config can be dict or yaml file diffs = self._default_cfg.diff(cfg) if diffs: - self._logger.info('modified configs: {') + self._logger.info('modified configs( != ): {') for diff in diffs: self._logger.info(diff) self._logger.info('}') self._cfg = cfg - prefix = name.lower() + datetime.now().strftime("-%m-%d-%Y") - self._logdir = os.path.join(self._logdir, prefix) - # r.config['logdir'] = self._logdir os.makedirs(self._logdir, exist_ok=True) config_file = os.path.join(self._logdir, 'config.yaml') # log file @@ -111,7 +110,9 @@ def __init__(self, config, logger=None, reporter=None, name=None): seed = self._cfg.get('seed', np.random.randint(1000000)) _random.seed(seed) - def fit(self, train_data, val_data=None, train_size=0.9, random_state=None, resume=False): + def fit(self, train_data, val_data=None, train_size=0.9, random_state=None, + resume=False, time_limit=None): + """Fit with train/validation data. Parameters @@ -127,12 +128,22 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None, resu Random state for splitting, for `np.random.seed`. resume : bool Whether resume from previous `fit`(if possible) or start as fresh. + time_limit : int, default is None + The wall clock time limit(second) for fit process, if `None`, time limit is not enforced. + If `fit` takes longer than `time_limit`, the process will terminate early and return the + model prematurally. + Due to callbacks and additional validation functions, the `time_limit` may not be very precise + (few minutes allowance), but you can use it to safe-guard a very long training session. 
Returns ------- None """ + if time_limit is None: + time_limit = math.inf + elif not isinstance(time_limit, (int, float)): + raise TypeError(f'Invalid type `time_limit={time_limit}`, int/float or None expected') if not resume: self.classes = train_data.classes self.num_class = len(self.classes) @@ -141,7 +152,8 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None, resu assert val_data is not None, \ "Please provide `val_data` as we do not know how to split `train_data` of type: \ {}".format(type(train_data)) - return self._fit(train_data, val_data) if not resume else self._resume_fit(train_data, val_data) + return self._fit(train_data, val_data, time_limit=time_limit) if not resume \ + else self._resume_fit(train_data, val_data, time_limit=time_limit) if val_data is None: assert 0 <= train_size <= 1.0 @@ -152,9 +164,11 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None, resu val = train_data[~split_mask] self._logger.info('Randomly split train_data into train[%d]/validation[%d] splits.', len(train), len(val)) - return self._fit(train, val) if not resume else self._resume_fit(train, val) + return self._fit(train, val, time_limit=time_limit) if not resume else \ + self._resume_fit(train, val, time_limit=time_limit) - return self._fit(train_data, val_data) if not resume else self._resume_fit(train_data, val_data) + return self._fit(train_data, val_data, time_limit=time_limit) if not resume else \ + self._resume_fit(train_data, val_data, time_limit=time_limit) def evaluate(self, val_data): """Evaluate estimator on validation data. @@ -195,10 +209,10 @@ def _predict(self, x): def _predict_feature(self, x): raise NotImplementedError - def _fit(self, train_data, val_data): + def _fit(self, train_data, val_data, time_limit=math.inf): raise NotImplementedError - def _resume_fit(self, train_data, val_data): + def _resume_fit(self, train_data, val_data, time_limit=math.inf): raise NotImplementedError def _evaluate(self, val_data): @@ -265,8 +279,12 @@ def __setstate__(self, state): # logger self._logger = logging.getLogger(state.get('_name', self.__class__.__name__)) self._logger.setLevel(logging.ERROR) - fh = logging.FileHandler(self._log_file) - self._logger.addHandler(fh) + try: + fh = logging.FileHandler(self._log_file) + self._logger.addHandler(fh) + #pylint: disable=bare-except + except: + pass try: import mxnet as _ net_params = state['net'] diff --git a/gluoncv/auto/estimators/center_net/center_net.py b/gluoncv/auto/estimators/center_net/center_net.py index 7707b0d3bf..4a71265751 100644 --- a/gluoncv/auto/estimators/center_net/center_net.py +++ b/gluoncv/auto/estimators/center_net/center_net.py @@ -1,6 +1,7 @@ """CenterNet Estimator""" -# pylint: disable=unused-variable,missing-function-docstring,abstract-method +# pylint: disable=unused-variable,missing-function-docstring,abstract-method,logging-format-interpolation import os +import math import time import warnings from collections import OrderedDict @@ -23,6 +24,7 @@ from ....utils.metrics import VOCMApMetric, VOC07MApMetric from .default import CenterNetCfg from ...data.dataset import ObjectDetectionDataset +from ..conf import _BEST_CHECKPOINT_FILE __all__ = ['CenterNetEstimator'] @@ -79,7 +81,8 @@ def _predict_merge(x): valid_df = df[df['predict_score'] > 0].reset_index(drop=True) return valid_df - def _fit(self, train_data, val_data): + def _fit(self, train_data, val_data, time_limit=math.inf): + tic = time.time() self._best_map = 0 self.epoch = 0 self._time_elapsed = 0 @@ -90,9 +93,11 
@@ def _fit(self, train_data, val_data): else: self.last_train = train_data self._init_trainer() - return self._resume_fit(train_data, val_data) + self._time_elapsed += time.time() - tic + return self._resume_fit(train_data, val_data, time_limit=time_limit) - def _resume_fit(self, train_data, val_data): + def _resume_fit(self, train_data, val_data, time_limit=math.inf): + tic = time.time() if max(self._cfg.train.start_epoch, self.epoch) >= self._cfg.train.epochs: return {'time', self._time_elapsed} if not self.classes or not self.num_class: @@ -119,10 +124,11 @@ def _resume_fit(self, train_data, val_data): train_dataset.transform(CenterNetDefaultValTransform(width, height)), self._cfg.valid.batch_size, False, batchify_fn=val_batchify_fn, last_batch='keep', num_workers=self._cfg.valid.num_workers) + self._time_elapsed += time.time() - tic + return self._train_loop(train_loader, val_loader, train_eval_loader, time_limit=time_limit) - return self._train_loop(train_loader, val_loader, train_eval_loader) - - def _train_loop(self, train_data, val_data, train_eval_data): + def _train_loop(self, train_data, val_data, train_eval_data, time_limit=math.inf): + start_tic = time.time() wh_loss = MaskedL1Loss(weight=self._cfg.center_net.wh_weight) heatmap_loss = HeatmapFocalLoss(from_logits=True) center_reg_loss = MaskedL1Loss(weight=self._cfg.center_net.center_reg_weight) @@ -131,19 +137,27 @@ def _train_loop(self, train_data, val_data, train_eval_data): center_reg_metric = mx.metric.Loss('CenterRegL1') self._logger.info('Start training from [Epoch %d]', max(self._cfg.train.start_epoch, self.epoch)) + mean_ap = [-1] + cp_name = '' + self._time_elapsed += time.time() - start_tic for self.epoch in range(max(self._cfg.train.start_epoch, self.epoch), self._cfg.train.epochs): epoch = self.epoch + tic = time.time() + last_tic = time.time() if self._best_map >= 1.0: self._logger.info('[Epoch %d] Early stopping as mAP is reaching 1.0', epoch) break wh_metric.reset() center_reg_metric.reset() heatmap_loss_metric.reset() - tic = time.time() - btic = time.time() self.net.hybridize() for i, batch in enumerate(train_data): + btic = time.time() + if self._time_elapsed > time_limit: + self._logger.warning(f'`time_limit={time_limit}` reached, exit early...') + return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map, + 'time': self._time_elapsed, 'checkpoint': cp_name} split_data = [ gluon.utils.split_and_load(batch[ind], ctx_list=self.ctx, batch_axis=0, even_split=False) for ind in range(6)] @@ -180,10 +194,12 @@ def _train_loop(self, train_data, val_data, train_eval_data): self._logger.info( '[Epoch {}][Batch {}], Speed: {:.3f} samples/sec, ' 'LR={}, {}={:.3f}, {}={:.3f}, {}={:.3f}'.format( - epoch, i, batch_size / (time.time() - btic), + epoch, i, batch_size / (time.time() - last_tic), self.trainer.learning_rate, name2, loss2, name3, loss3, name4, loss4)) - btic = time.time() + last_tic = time.time() + self._time_elapsed += time.time() - btic + post_tic = time.time() name2, loss2 = wh_metric.get() name3, loss3 = center_reg_metric.get() name4, loss4 = heatmap_loss_metric.get() @@ -197,17 +213,20 @@ def _train_loop(self, train_data, val_data, train_eval_data): self._logger.info('[Epoch %d] Validation: \n%s', epoch, val_msg) current_map = float(mean_ap[-1]) if current_map > self._best_map: - cp_name = os.path.join(self._logdir, 'best_checkpoint.pkl') + cp_name = os.path.join(self._logdir, _BEST_CHECKPOINT_FILE) self._logger.info('[Epoch %d] Current best map: %f vs previous %f, saved to %s', self.epoch, 
current_map, self._best_map, cp_name) self.save(cp_name) self._best_map = current_map if self._reporter: self._reporter(epoch=epoch, map_reward=current_map) - self._time_elapsed += time.time() - btic + self._time_elapsed += time.time() - post_tic # map on train data + tic = time.time() map_name, mean_ap = self._evaluate(train_eval_data) - return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map, 'time': self._time_elapsed} + self._time_elapsed += time.time() - tic + return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map, + 'time': self._time_elapsed, 'checkpoint': cp_name} def _evaluate(self, val_data): """Test on validation dataset.""" diff --git a/gluoncv/auto/estimators/conf.py b/gluoncv/auto/estimators/conf.py new file mode 100644 index 0000000000..5518c85b2b --- /dev/null +++ b/gluoncv/auto/estimators/conf.py @@ -0,0 +1,2 @@ +"""Shared configs""" +_BEST_CHECKPOINT_FILE = 'best_checkpoint.pkl' diff --git a/gluoncv/auto/estimators/faster_rcnn/faster_rcnn.py b/gluoncv/auto/estimators/faster_rcnn/faster_rcnn.py index 1da5125673..59b7221147 100644 --- a/gluoncv/auto/estimators/faster_rcnn/faster_rcnn.py +++ b/gluoncv/auto/estimators/faster_rcnn/faster_rcnn.py @@ -1,6 +1,7 @@ """Faster RCNN Estimator.""" -# pylint: disable=logging-not-lazy,abstract-method,unused-variable +# pylint: disable=logging-not-lazy,abstract-method,unused-variable,logging-format-interpolation import os +import math import time import warnings @@ -21,6 +22,7 @@ from ..base_estimator import BaseEstimator, set_default from .utils import _get_lr_at_iter, _get_dataloader, _split_and_load from ...data.dataset import ObjectDetectionDataset +from ..conf import _BEST_CHECKPOINT_FILE try: import horovod.mxnet as hvd @@ -59,8 +61,9 @@ def __init__(self, config, logger=None, reporter=None): super(FasterRCNNEstimator, self).__init__(config, logger, reporter) self.batch_size = self._cfg.train.batch_size - def _fit(self, train_data, val_data): + def _fit(self, train_data, val_data, time_limit=math.inf): """Fit Faster R-CNN model.""" + tic = time.time() self._best_map = 0 self.epoch = 0 self._time_elapsed = 0 @@ -71,9 +74,11 @@ def _fit(self, train_data, val_data): self.net.collect_params().setattr('grad_req', 'null') self.net.collect_train_params().setattr('grad_req', 'write') self._init_trainer() - return self._resume_fit(train_data, val_data) + self._time_elapsed += time.time() - tic + return self._resume_fit(train_data, val_data, time_limit=time_limit) - def _resume_fit(self, train_data, val_data): + def _resume_fit(self, train_data, val_data, time_limit=math.inf): + tic = time.time() if max(self._cfg.train.start_epoch, self.epoch) >= self._cfg.train.epochs: return {'time', self._time_elapsed} if not self.classes or not self.num_class: @@ -87,10 +92,11 @@ def _resume_fit(self, train_data, val_data): train_loader, val_loader, train_eval_loader = _get_dataloader( self.net, train_dataset, val_dataset, FasterRCNNDefaultTrainTransform, FasterRCNNDefaultValTransform, self.batch_size, len(self.ctx), self._cfg) + self._time_elapsed += time.time() - tic + return self._train_loop(train_loader, val_loader, train_eval_loader, time_limit=time_limit) - return self._train_loop(train_loader, val_loader, train_eval_loader) - - def _train_loop(self, train_data, val_data, train_eval_data): + def _train_loop(self, train_data, val_data, train_eval_data, time_limit=math.inf): + start_tic = time.time() # loss and metric rpn_cls_loss = gluon.loss.SigmoidBinaryCrossEntropyLoss(from_sigmoid=False) rpn_box_loss = 
gluon.loss.HuberLoss(rho=self._cfg.train.rpn_smoothl1_rho) # == smoothl1 @@ -120,12 +126,16 @@ def _train_loop(self, train_data, val_data, train_eval_data): self.net.collect_params().reset_ctx(self.ctx) self.net.target_generator.collect_params().reset_ctx(self.ctx) + mean_ap = [-1] + cp_name = '' + self._time_elapsed += time.time() - start_tic for self.epoch in range(max(self._cfg.train.start_epoch, self.epoch), self._cfg.train.epochs): epoch = self.epoch + tic = time.time() + last_tic = time.time() if self._best_map >= 1.0: self._logger.info('[Epoch %d] Early stopping as mAP is reaching 1.0', epoch) break - btic = time.time() rcnn_task = ForwardBackwardTask(self.net, self.trainer, rpn_cls_loss, rpn_box_loss, rcnn_cls_loss, rcnn_box_loss, mix_ratio=1.0, amp_enabled=self._cfg.faster_rcnn.amp) @@ -149,10 +159,14 @@ def _train_loop(self, train_data, val_data, train_eval_data): self._logger.info("[Epoch %d] Set learning rate to %f", epoch, new_lr) for metric in metrics: metric.reset() - tic = time.time() base_lr = self.trainer.learning_rate rcnn_task.mix_ratio = mix_ratio for i, batch in enumerate(train_data): + btic = time.time() + if self._time_elapsed > time_limit: + self._logger.warning(f'`time_limit={time_limit}` reached, exit early...') + return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map, + 'time': self._time_elapsed, 'checkpoint': cp_name} if epoch == 0 and i <= lr_warmup: # adjust based on real percentage new_lr = base_lr * _get_lr_at_iter(i / lr_warmup, @@ -194,9 +208,11 @@ def _train_loop(self, train_data, val_data, train_eval_data): self._logger.info('[Epoch {}][Batch {}], Speed: {:.3f} samples/sec, {}'.format( epoch, i, self._cfg.train.log_interval * self.batch_size / ( - time.time() - btic), msg)) - btic = time.time() + time.time() - last_tic), msg)) + last_tic = time.time() + self._time_elapsed += time.time() - btic + post_tic = time.time() if not self._cfg.horovod or hvd.rank() == 0: msg = ','.join(['{}={:.3f}'.format(*metric.get()) for metric in metrics]) # pylint: disable=logging-format-interpolation @@ -209,17 +225,20 @@ def _train_loop(self, train_data, val_data, train_eval_data): self._logger.info('[Epoch {}] Validation: \n{}'.format(epoch, val_msg)) current_map = float(mean_ap[-1]) if current_map > self._best_map: - cp_name = os.path.join(self._logdir, 'best_checkpoint.pkl') + cp_name = os.path.join(self._logdir, _BEST_CHECKPOINT_FILE) self._logger.info('[Epoch %d] Current best map: %f vs previous %f, saved to %s', self.epoch, current_map, self._best_map, cp_name) self.save(cp_name) self._best_map = current_map if self._reporter: self._reporter(epoch=epoch, map_reward=current_map) - self._time_elapsed += time.time() - btic + self._time_elapsed += time.time() - post_tic # map on train data + tic = time.time() map_name, mean_ap = self._evaluate(train_eval_data) - return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map, 'time': self._time_elapsed} + self._time_elapsed += time.time() - tic + return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map, + 'time': self._time_elapsed, 'checkpoint': cp_name} def _evaluate(self, val_data): """Evaluate on validation dataset.""" diff --git a/gluoncv/auto/estimators/image_classification/image_classification.py b/gluoncv/auto/estimators/image_classification/image_classification.py index c1a016f59b..d1909842b0 100644 --- a/gluoncv/auto/estimators/image_classification/image_classification.py +++ b/gluoncv/auto/estimators/image_classification/image_classification.py @@ -21,6 +21,7 @@ from .utils 
import get_data_loader, get_data_rec, smooth from .default import ImageClassificationCfg from ...data.dataset import ImageClassificationDataset +from ..conf import _BEST_CHECKPOINT_FILE __all__ = ['ImageClassificationEstimator'] @@ -62,7 +63,8 @@ def __init__(self, config, logger=None, reporter=None, net=None, optimizer=None) assert isinstance(optimizer, Optimizer) self._optimizer = optimizer - def _fit(self, train_data, val_data): + def _fit(self, train_data, val_data, time_limit=math.inf): + tic = time.time() self._best_acc = 0 self.epoch = 0 self._time_elapsed = 0 @@ -73,9 +75,11 @@ def _fit(self, train_data, val_data): else: self.last_train = train_data self._init_trainer() - return self._resume_fit(train_data, val_data) + self._time_elapsed += time.time() - tic + return self._resume_fit(train_data, val_data, time_limit=time_limit) - def _resume_fit(self, train_data, val_data): + def _resume_fit(self, train_data, val_data, time_limit=math.inf): + tic = time.time() if max(self._cfg.train.start_epoch, self.epoch) >= self._cfg.train.epochs: return {'time', self._time_elapsed} if not self.classes or not self.num_class: @@ -100,9 +104,11 @@ def _resume_fit(self, train_data, val_data): self._cfg.train.crop_ratio, train_dataset=train_dataset, val_dataset=val_dataset) - return self._train_loop(train_loader, val_loader) + self._time_elapsed += time.time() - tic + return self._train_loop(train_loader, val_loader, time_limit=time_limit) - def _train_loop(self, train_data, val_data): + def _train_loop(self, train_data, val_data, time_limit=math.inf): + start_tic = time.time() if self._cfg.train.no_wd: for k, v in self.net.collect_params('.*beta|.*gamma|.*bias').items(): v.wd_mult = 0.0 @@ -127,20 +133,28 @@ def _train_loop(self, train_data, val_data): self.teacher.hybridize(static_alloc=True, static_shape=True) self._logger.info('Start training from [Epoch %d]', max(self._cfg.train.start_epoch, self.epoch)) + train_metric_score = -1 + cp_name = '' + self._time_elapsed += time.time() - start_tic for self.epoch in range(max(self._cfg.train.start_epoch, self.epoch), self._cfg.train.epochs): epoch = self.epoch if self._best_acc >= 1.0: self._logger.info('[Epoch {}] Early stopping as acc is reaching 1.0'.format(epoch)) break - mx.nd.waitall() tic = time.time() - btic = time.time() + last_tic = time.time() + mx.nd.waitall() if self._cfg.train.use_rec: train_data.reset() train_metric.reset() # pylint: disable=undefined-loop-variable for i, batch in enumerate(train_data): + btic = time.time() + if self._time_elapsed > time_limit: + self._logger.warning(f'`time_limit={time_limit}` reached, exit early...') + return {'train_acc': train_metric_score, 'valid_acc': self._best_acc, + 'time': self._time_elapsed, 'checkpoint': cp_name} data, label = self.batch_fn(batch, self.ctx) if self._cfg.train.mixup: @@ -192,10 +206,12 @@ def _train_loop(self, train_data, val_data): train_metric_name, train_metric_score = train_metric.get() self._logger.info('Epoch[%d] Batch [%d]\tSpeed: %f samples/sec\t%s=%f\tlr=%f', epoch, i, - self._cfg.train.batch_size*self._cfg.train.log_interval/(time.time()-btic), + self._cfg.train.batch_size*self._cfg.train.log_interval/(time.time()-last_tic), train_metric_name, train_metric_score, self.trainer.learning_rate) - btic = time.time() + last_tic = time.time() + self._time_elapsed += time.time() - btic + post_tic = time.time() train_metric_name, train_metric_score = train_metric.get() throughput = int(self.batch_size * i /(time.time() - tic)) @@ -206,15 +222,16 @@ def _train_loop(self, 
train_data, val_data): self._logger.info('[Epoch %d] validation: top1=%f top5=%f', epoch, top1_val, top5_val) if top1_val > self._best_acc: - cp_name = os.path.join(self._logdir, 'best_checkpoint.pkl') + cp_name = os.path.join(self._logdir, _BEST_CHECKPOINT_FILE) self._logger.info('[Epoch %d] Current best top-1: %f vs previous %f, saved to %s', self.epoch, top1_val, self._best_acc, cp_name) self.save(cp_name) self._best_acc = top1_val if self._reporter: self._reporter(epoch=epoch, acc_reward=top1_val) - self._time_elapsed += time.time() - btic - return {'train_acc': train_metric_score, 'valid_acc': self._best_acc, 'time': self._time_elapsed} + self._time_elapsed += time.time() - post_tic + return {'train_acc': train_metric_score, 'valid_acc': self._best_acc, + 'time': self._time_elapsed, 'checkpoint': cp_name} def _init_network(self): if not self.num_class: diff --git a/gluoncv/auto/estimators/mask_rcnn/mask_rcnn.py b/gluoncv/auto/estimators/mask_rcnn/mask_rcnn.py index bda35111fd..ce284c7320 100644 --- a/gluoncv/auto/estimators/mask_rcnn/mask_rcnn.py +++ b/gluoncv/auto/estimators/mask_rcnn/mask_rcnn.py @@ -1,6 +1,7 @@ """Mask RCNN Estimator.""" # pylint: disable=consider-using-enumerate,abstract-method import os +import math import time import logging from multiprocessing import Process @@ -282,7 +283,7 @@ def coco_eval_save_task(eval_metric, logger): async_eval_processes.append(p) p.start() - def _fit(self, train_data, val_data): + def _fit(self, train_data, val_data, time_limit=math.inf): """ Fit Mask R-CNN models. """ diff --git a/gluoncv/auto/estimators/ssd/ssd.py b/gluoncv/auto/estimators/ssd/ssd.py index 5bd4ea06e3..d881554cc3 100644 --- a/gluoncv/auto/estimators/ssd/ssd.py +++ b/gluoncv/auto/estimators/ssd/ssd.py @@ -1,6 +1,7 @@ """SSD Estimator.""" # pylint: disable=logging-format-interpolation,abstract-method import os +import math import time import warnings @@ -24,6 +25,7 @@ from ..base_estimator import BaseEstimator, set_default from .default import SSDCfg from ...data.dataset import ObjectDetectionDataset +from ..conf import _BEST_CHECKPOINT_FILE try: import horovod.mxnet as hvd @@ -69,8 +71,9 @@ def __init__(self, config, logger=None, reporter=None): if self._cfg.horovod: hvd.init() - def _fit(self, train_data, val_data): + def _fit(self, train_data, val_data, time_limit=math.inf): """Fit SSD model.""" + tic = time.time() self._best_map = 0 self.epoch = 0 self._time_elapsed = 0 @@ -78,9 +81,11 @@ def _fit(self, train_data, val_data): return {'time', self._time_elapsed} self.net.collect_params().reset_ctx(self.ctx) self._init_trainer() - return self._resume_fit(train_data, val_data) + self._time_elapsed += time.time() - tic + return self._resume_fit(train_data, val_data, time_limit=time_limit) - def _resume_fit(self, train_data, val_data): + def _resume_fit(self, train_data, val_data, time_limit=math.inf): + tic = time.time() if max(self._cfg.train.start_epoch, self.epoch) >= self._cfg.train.epochs: return {'time', self._time_elapsed} if not self.classes or not self.num_class: @@ -106,9 +111,11 @@ def _resume_fit(self, train_data, val_data): self.async_net, train_dataset, val_dataset, self._cfg.ssd.data_shape, self.batch_size, self._cfg.num_workers) - return self._train_loop(train_loader, val_loader, train_eval_loader) + self._time_elapsed += time.time() - tic + return self._train_loop(train_loader, val_loader, train_eval_loader, time_limit=time_limit) - def _train_loop(self, train_data, val_data, train_eval_data): + def _train_loop(self, train_data, val_data, 
train_eval_data, time_limit=math.inf): + start_tic = time.time() # fix seed for mxnet, numpy and python builtin random generator. gutils.random.seed(self._cfg.train.seed) # loss and metric @@ -123,8 +130,13 @@ def _train_loop(self, train_data, val_data, train_eval_data): self._logger.info('Start training from [Epoch %d]', max(self._cfg.train.start_epoch, self.epoch)) self.net.collect_params().reset_ctx(self.ctx) + mean_ap = [-1] + cp_name = '' + self._time_elapsed += time.time() - start_tic for self.epoch in range(max(self._cfg.train.start_epoch, self.epoch), self._cfg.train.epochs): epoch = self.epoch + tic = time.time() + last_tic = time.time() if self._best_map >= 1.0: self._logger.info('[Epoch {}] Early stopping as mAP is reaching 1.0'.format(epoch)) break @@ -135,11 +147,14 @@ def _train_loop(self, train_data, val_data, train_eval_data): self._logger.info("[Epoch {}] Set learning rate to {}".format(epoch, new_lr)) ce_metric.reset() smoothl1_metric.reset() - tic = time.time() - btic = time.time() self.net.hybridize(static_alloc=True, static_shape=True) for i, batch in enumerate(train_data): + btic = time.time() + if self._time_elapsed > time_limit: + self._logger.warning(f'`time_limit={time_limit}` reached, exit early...') + return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map, + 'time': self._time_elapsed, 'checkpoint': cp_name} if self._cfg.train.dali: # dali iterator returns a mxnet.io.DataBatch data = [d.data[0] for d in batch] @@ -178,10 +193,12 @@ def _train_loop(self, train_data, val_data, train_eval_data): name1, loss1 = ce_metric.get() name2, loss2 = smoothl1_metric.get() self._logger.info('[Epoch %d][Batch %d], Speed: %f samples/sec, %s=%f, %s=%f', - epoch, i, self._cfg.train.batch_size/(time.time()-btic), + epoch, i, self._cfg.train.batch_size/(time.time()-last_tic), name1, loss1, name2, loss2) - btic = time.time() + last_tic = time.time() + self._time_elapsed += time.time() - btic + post_tic = time.time() if not self._cfg.horovod or hvd.rank() == 0: name1, loss1 = ce_metric.get() name2, loss2 = smoothl1_metric.get() @@ -195,17 +212,20 @@ def _train_loop(self, train_data, val_data, train_eval_data): self._logger.info('[Epoch %d] Validation: \n%s', epoch, str(val_msg)) current_map = float(mean_ap[-1]) if current_map > self._best_map: - cp_name = os.path.join(self._logdir, 'best_checkpoint.pkl') + cp_name = os.path.join(self._logdir, _BEST_CHECKPOINT_FILE) self._logger.info('[Epoch %d] Current best map: %f vs previous %f, saved to %s', self.epoch, current_map, self._best_map, cp_name) self.save(cp_name) self._best_map = current_map if self._reporter: self._reporter(epoch=epoch, map_reward=current_map) - self._time_elapsed += time.time() - btic + self._time_elapsed += time.time() - post_tic # map on train data + tic = time.time() map_name, mean_ap = self._evaluate(train_eval_data) - return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map, 'time': self._time_elapsed} + self._time_elapsed += time.time() - tic + return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map, + 'time': self._time_elapsed, 'checkpoint': cp_name} def _evaluate(self, val_data): """Evaluate on validation dataset.""" diff --git a/gluoncv/auto/estimators/yolo/yolo.py b/gluoncv/auto/estimators/yolo/yolo.py index dd80bd4546..2174b2f2bb 100644 --- a/gluoncv/auto/estimators/yolo/yolo.py +++ b/gluoncv/auto/estimators/yolo/yolo.py @@ -1,6 +1,7 @@ """YOLO Estimator.""" # pylint: disable=logging-format-interpolation,abstract-method import os +import math import time import 
warnings import pandas as pd @@ -22,6 +23,7 @@ from ..base_estimator import BaseEstimator, set_default from .utils import _get_dataloader from ...data.dataset import ObjectDetectionDataset +from ..conf import _BEST_CHECKPOINT_FILE try: import horovod.mxnet as hvd @@ -66,8 +68,9 @@ def __init__(self, config, logger=None, reporter=None): raise SystemExit("Horovod not found, please check if you installed it correctly.") hvd.init() - def _fit(self, train_data, val_data): + def _fit(self, train_data, val_data, time_limit=math.inf): """Fit YOLO3 model.""" + tic = time.time() self._best_map = 0 self.epoch = 0 self._time_elapsed = 0 @@ -79,9 +82,11 @@ def _fit(self, train_data, val_data): self.last_train = train_data self.net.collect_params().reset_ctx(self.ctx) self._init_trainer() - return self._resume_fit(train_data, val_data) + self._time_elapsed += time.time() - tic + return self._resume_fit(train_data, val_data, time_limit=time_limit) - def _resume_fit(self, train_data, val_data): + def _resume_fit(self, train_data, val_data, time_limit=math.inf): + tic = time.time() if max(self._cfg.train.start_epoch, self.epoch) >= self._cfg.train.epochs: return {'time', self._time_elapsed} if not self.classes or not self.num_class: @@ -101,9 +106,11 @@ def _resume_fit(self, train_data, val_data): v.wd_mult = 0.0 if self._cfg.train.label_smooth: self.net._target_generator._label_smooth = True - return self._train_loop(train_loader, val_loader, train_eval_loader) + self._time_elapsed += time.time() - tic + return self._train_loop(train_loader, val_loader, train_eval_loader, time_limit=time_limit) - def _train_loop(self, train_data, val_data, train_eval_data): + def _train_loop(self, train_data, val_data, train_eval_data, time_limit=math.inf): + start_tic = time.time() # fix seed for mxnet, numpy and python builtin random generator. 
gutils.random.seed(self._cfg.train.seed) @@ -114,13 +121,16 @@ def _train_loop(self, train_data, val_data, train_eval_data): cls_metrics = mx.metric.Loss('ClassLoss') trainer = self.trainer self._logger.info('Start training from [Epoch %d]', max(self._cfg.train.start_epoch, self.epoch)) + mean_ap = [-1] + cp_name = '' + self._time_elapsed += time.time() - start_tic for self.epoch in range(max(self._cfg.train.start_epoch, self.epoch), self._cfg.train.epochs): epoch = self.epoch if self._best_map >= 1.0: self._logger.info('[Epoch {}] Early stopping as mAP is reaching 1.0'.format(epoch)) break tic = time.time() - btic = time.time() + last_tic = time.time() if self._cfg.train.mixup: # TODO(zhreshold): more elegant way to control mixup during runtime try: @@ -136,6 +146,11 @@ def _train_loop(self, train_data, val_data, train_eval_data): mx.nd.waitall() self.net.hybridize() for i, batch in enumerate(train_data): + btic = time.time() + if self._time_elapsed > time_limit: + self._logger.warning(f'`time_limit={time_limit}` reached, exit early...') + return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map, + 'time': self._time_elapsed, 'checkpoint': cp_name} data = gluon.utils.split_and_load(batch[0], ctx_list=self.ctx, batch_axis=0, even_split=False) # objectness, center_targets, scale_targets, weights, class_targets fixed_targets = [gluon.utils.split_and_load(batch[it], ctx_list=self.ctx, @@ -174,10 +189,12 @@ def _train_loop(self, train_data, val_data, train_eval_data): self._logger.info( '[Epoch {}][Batch {}], LR: {:.2E}, Speed: {:.3f} samples/sec,' ' {}={:.3f}, {}={:.3f}, {}={:.3f}, {}={:.3f}'.format( - epoch, i, trainer.learning_rate, self._cfg.train.batch_size / (time.time() - btic), + epoch, i, trainer.learning_rate, self._cfg.train.batch_size / (time.time() - last_tic), name1, loss1, name2, loss2, name3, loss3, name4, loss4)) - btic = time.time() + last_tic = time.time() + self._time_elapsed += time.time() - btic + post_tic = time.time() if (not self._cfg.horovod or hvd.rank() == 0): name1, loss1 = obj_metrics.get() name2, loss2 = center_metrics.get() @@ -192,18 +209,21 @@ def _train_loop(self, train_data, val_data, train_eval_data): self._logger.info('[Epoch {}] Validation: \n{}'.format(epoch, val_msg)) current_map = float(mean_ap[-1]) if current_map > self._best_map: - cp_name = os.path.join(self._logdir, 'best_checkpoint.pkl') + cp_name = os.path.join(self._logdir, _BEST_CHECKPOINT_FILE) self._logger.info('[Epoch %d] Current best map: %f vs previous %f, saved to %s', self.epoch, current_map, self._best_map, cp_name) self.save(cp_name) self._best_map = current_map if self._reporter: self._reporter(epoch=epoch, map_reward=current_map) - self._time_elapsed += time.time() - btic + self._time_elapsed += time.time() - post_tic # map on train data + tic = time.time() map_name, mean_ap = self._evaluate(train_eval_data) - return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map, 'time': self._time_elapsed} + self._time_elapsed += time.time() - tic + return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map, + 'time': self._time_elapsed, 'checkpoint': cp_name} def _evaluate(self, val_data): """Evaluate the current model on dataset.""" diff --git a/gluoncv/auto/tasks/image_classification.py b/gluoncv/auto/tasks/image_classification.py index 966f26398f..59e71e0060 100644 --- a/gluoncv/auto/tasks/image_classification.py +++ b/gluoncv/auto/tasks/image_classification.py @@ -1,16 +1,20 @@ """Auto pipeline for image classification task""" -# pylint: 
disable=bad-whitespace,missing-class-docstring +# pylint: disable=bad-whitespace,missing-class-docstring,bare-except import time +import os +import math import copy import logging import pprint import json import pickle from typing import Union, Tuple +import uuid +import shutil -from autocfg import dataclass import numpy as np import pandas as pd +from autocfg import dataclass import autogluon.core as ag from autogluon.core.decorator import sample_config from autogluon.core.scheduler.resource import get_cpu_count, get_gpu_count @@ -21,6 +25,7 @@ from ..estimators import ImageClassificationEstimator from .utils import config_to_nested from ..data.dataset import ImageClassificationDataset +from ..estimators.conf import _BEST_CHECKPOINT_FILE __all__ = ['ImageClassification'] @@ -58,9 +63,18 @@ def _train_image_classification(args, reporter): ---------- args: """ + tic = time.time() + try: + task_id = int(args.task_id) + except: + task_id = 0 + final_fit = args.pop('final_fit', False) # train, val data train_data = args.pop('train_data') val_data = args.pop('val_data') + # wall clock tick limit + wall_clock_tick = args.pop('wall_clock_tick') + log_dir = args.pop('log_dir', os.getcwd()) # exponential batch size for Int() space batch sizes try: exp_batch_size = args.pop('exp_batch_size') @@ -77,37 +91,67 @@ def _train_image_classification(args, reporter): # convert user defined config to nested form args = config_to_nested(args) - tic = time.time() + if wall_clock_tick < tic and not final_fit: + return {'traceback': 'timeout', 'args': str(args), + 'time': 0, 'train_acc': -1, 'valid_acc': -1} + try: + valid_summary_file = 'fit_summary_img_cls.ag' estimator_cls = args.pop('estimator', None) assert estimator_cls == ImageClassificationEstimator - custom_net = args.pop('custom_net', None) - custom_optimizer = args.pop('custom_optimizer', None) - estimator = estimator_cls(args, reporter=reporter, - net=custom_net, optimizer=custom_optimizer) - # training - result = estimator.fit(train_data=train_data, val_data=val_data) - # save config and result - if task is not None: - trial_log = {} - trial_log.update(args) - trial_log.update(result) - json_str = json.dumps(trial_log) - time_str = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) - json_file_name = task + '_dataset-' + dataset + '_trials-' + str(num_trials) + '_' + time_str + '.json' - with open(json_file_name, 'w') as json_file: - json_file.write(json_str) - logging.info('Config and result in this trial have been saved to %s.', json_file_name) - # pylint: disable=bare-except + if final_fit: + # load from previous dumps + estimator = None + if os.path.isdir(log_dir): + is_valid_dir_fn = lambda d : d.startswith('.trial_') and os.path.isdir(os.path.join(log_dir, d)) + trial_dirs = [d for d in os.listdir(log_dir) if is_valid_dir_fn(d)] + best_checkpoint = '' + best_acc = -1 + result = {} + for dd in trial_dirs: + try: + with open(os.path.join(log_dir, dd, valid_summary_file), 'r') as f: + result = json.load(f) + acc = result.get('valid_acc', -1) + if acc > best_acc and os.path.isfile(os.path.join(log_dir, dd, _BEST_CHECKPOINT_FILE)): + best_checkpoint = os.path.join(log_dir, dd, _BEST_CHECKPOINT_FILE) + best_acc = acc + except: + pass + if best_checkpoint: + estimator = estimator_cls.load(best_checkpoint) + if estimator is None: + result.update({'traceback': 'timeout'}) + else: + # create independent log_dir for each trial + trial_log_dir = os.path.join(log_dir, '.trial_{}'.format(task_id)) + args['log_dir'] = trial_log_dir + custom_net = 
args.pop('custom_net', None) + custom_optimizer = args.pop('custom_optimizer', None) + estimator = estimator_cls(args, reporter=reporter, + net=custom_net, optimizer=custom_optimizer) + # training + result = estimator.fit(train_data=train_data, val_data=val_data, time_limit=wall_clock_tick-tic) + with open(os.path.join(trial_log_dir, valid_summary_file), 'w') as f: + json.dump(result, f) + # save config and result + if task is not None: + trial_log = {} + trial_log.update(args) + trial_log.update(result) + json_str = json.dumps(trial_log) + time_str = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) + json_file_name = task + '_dataset-' + dataset + '_trials-' + str(num_trials) + '_' + time_str + '.json' + with open(json_file_name, 'w') as json_file: + json_file.write(json_str) + logging.info('Config and result in this trial have been saved to %s.', json_file_name) except: import traceback return {'traceback': traceback.format_exc(), 'args': str(args), 'time': time.time() - tic, 'train_acc': -1, 'valid_acc': -1} - # TODO: checkpointing needs to be done in a better way - # unique_checkpoint = 'train_image_classification_' + str(uuid.uuid4()) + '.pkl' - # estimator.save(unique_checkpoint) - result.update({'model_checkpoint': pickle.dumps(estimator)}) + if estimator: + result.update({'model_checkpoint': pickle.dumps(estimator)}) return result @@ -126,7 +170,7 @@ class ImageClassification(BaseTask): """ Dataset = ImageClassificationDataset - def __init__(self, config=None, estimator=None, logger=None): + def __init__(self, config=None, logger=None): super(ImageClassification, self).__init__() self._fit_summary = {} self._logger = logger if logger is not None else logging.getLogger(__name__) @@ -179,6 +223,8 @@ def __init__(self, config=None, estimator=None, logger=None): config['num_workers'] = nthreads_per_trial config['gpus'] = [int(i) for i in range(ngpus_per_trial)] config['seed'] = config.get('seed', np.random.randint(32,767)) + config['final_fit'] = False + self._cleanup_disk = config.get('cleanup_disk', True) self._config = config # scheduler options @@ -208,7 +254,7 @@ def __init__(self, config=None, estimator=None, logger=None): 'max_t': config.get('epochs', 50), 'grace_period': config.get('grace_period', config.get('epochs', 50) // 4)}) - def fit(self, train_data, val_data=None, train_size=0.9, random_state=None): + def fit(self, train_data, val_data=None, train_size=0.9, random_state=None, time_limit=None): """Fit auto estimator given the input data. Parameters @@ -222,6 +268,14 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None): The portion of train data split from original `train_data` if `val_data` is not provided. random_state : int Random state for splitting, for `np.random.seed`. + time_limit : int, default is None + The wall clock time limit(second) for fit process, if `None`, time limit is not enforced. + If `fit` takes longer than `time_limit`, the process will terminate early and return the + model prematurally. + Due to callbacks and additional validation functions, the `time_limit` may not be very precise + (few minutes allowance), but you can use it to safe-guard a very long training session. + If `time_limits` key set in __init__ with config, the `time_limit` value will overwrite configuration + if not `None`. Returns ------- @@ -229,6 +283,16 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None): The estimator obtained by training on the specified dataset. 
""" + config = self._config.copy() + if time_limit is None: + if config.get('time_limits', None): + time_limit = config['time_limits'] + else: + time_limit = math.inf + elif not isinstance(time_limit, int): + raise TypeError(f'Invalid type `time_limit={time_limit}`, int or None expected') + self.scheduler_options['time_out'] = time_limit + wall_clock_tick = time.time() + time_limit # split train/val before HPO to make fair comparisons if not isinstance(train_data, pd.DataFrame): assert val_data is not None, \ @@ -246,7 +310,7 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None): len(train), len(val)) train_data, val_data = train, val - estimator = self._config.get('estimator', None) + estimator = config.get('estimator', None) if estimator is None: estimator = [ImageClassificationEstimator] else: @@ -262,14 +326,15 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None): if not estimator: raise ValueError('Unable to determine the estimator for fit function.') if len(estimator) == 1: - self._config['estimator'] = estimator[0] + config['estimator'] = estimator[0] else: - self._config['estimator'] = ag.Categorical(*estimator) + config['estimator'] = ag.Categorical(*estimator) # register args - config = self._config.copy() config['train_data'] = train_data config['val_data'] = val_data + config['wall_clock_tick'] = wall_clock_tick + config['log_dir'] = os.path.join(config.get('log_dir', os.getcwd()), str(uuid.uuid4())[:8]) _train_image_classification.register_args(**config) start_time = time.time() @@ -286,6 +351,7 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None): 'valid_acc': results.get('valid_acc', -1), 'total_time': results.get('time', time.time() - start_time), 'best_config': best_config}) + self._results = self._fit_summary else: self._logger.info("Starting HPO experiments") results = self.run_fit(_train_image_classification, self.search_strategy, @@ -294,8 +360,7 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None): ks = ('best_reward', 'best_config', 'total_time', 'config_history', 'reward_attr') self._results.update({k: v for k, v in results.items() if k in ks}) end_time = time.time() - self._logger.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> finish model fitting") - self._logger.info("total runtime is %.2f s", end_time - start_time) + self._logger.info("Finished, total runtime is %.2f s", end_time - start_time) if config.get('num_trials', 1) > 1: best_config = sample_config(_train_image_classification.args, results['best_config']) # convert best config to nested form @@ -308,9 +373,12 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None): 'best_config': best_config}) self._logger.info(pprint.pformat(self._fit_summary, indent=2)) - # TODO: checkpointing needs to be done in a better way + if self._cleanup_disk: + shutil.rmtree(config['log_dir'], ignore_errors=True) model_checkpoint = results.get('model_checkpoint', None) if model_checkpoint is None: + if results.get('traceback', '') == 'timeout': + raise TimeoutError(f'Unable to fit a usable model given `time_limit={time_limit}`') raise RuntimeError(f'Unexpected error happened during fit: {pprint.pformat(results, indent=2)}') estimator = pickle.loads(results['model_checkpoint']) return estimator diff --git a/gluoncv/auto/tasks/object_detection.py b/gluoncv/auto/tasks/object_detection.py index fe661c6f1c..00ba7a211d 100644 --- a/gluoncv/auto/tasks/object_detection.py +++ b/gluoncv/auto/tasks/object_detection.py 
@@ -1,16 +1,20 @@ """Auto pipeline for object detection task""" -# pylint: disable=bad-whitespace,missing-class-docstring +# pylint: disable=bad-whitespace,missing-class-docstring,bare-except import time +import os +import math import copy import logging import pprint import json import pickle from typing import Union, Tuple +import uuid +import shutil -from autocfg import dataclass import numpy as np import pandas as pd +from autocfg import dataclass import autogluon.core as ag from autogluon.core.decorator import sample_config from autogluon.core.scheduler.resource import get_cpu_count, get_gpu_count @@ -21,6 +25,7 @@ from ..estimators import SSDEstimator, FasterRCNNEstimator, YOLOv3Estimator, CenterNetEstimator from .utils import auto_suggest, config_to_nested from ..data.dataset import ObjectDetectionDataset +from ..estimators.conf import _BEST_CHECKPOINT_FILE __all__ = ['ObjectDetection'] @@ -59,9 +64,18 @@ def _train_object_detection(args, reporter): ---------- args: """ + tic = time.time() + try: + task_id = int(args.task_id) + except: + task_id = 0 + final_fit = args.pop('final_fit', False) # train, val data train_data = args.pop('train_data') val_data = args.pop('val_data') + # wall clock tick limit + wall_clock_tick = args.pop('wall_clock_tick') + log_dir = args.pop('log_dir', os.getcwd()) # exponential batch size for Int() space batch sizes try: exp_batch_size = args.pop('exp_batch_size') @@ -75,41 +89,72 @@ def _train_object_detection(args, reporter): num_trials = args.pop('num_trials') except AttributeError: task = None + # convert user defined config to nested form args = config_to_nested(args) - tic = time.time() + if wall_clock_tick < tic and not final_fit: + return {'traceback': 'timeout', 'args': str(args), + 'time': 0, 'train_map': -1, 'valid_map': -1} + try: + valid_summary_file = 'fit_summary_obj_det.ag' estimator_cls = args.pop('estimator', None) if estimator_cls == FasterRCNNEstimator: # safe guard if too many GT in dataset train_dataset = train_data.to_mxnet() max_gt_count = max([y[1].shape[0] for y in train_dataset]) + 20 args['faster_rcnn']['max_num_gt'] = max_gt_count - estimator = estimator_cls(args, reporter=reporter) - # training - result = estimator.fit(train_data=train_data, val_data=val_data) - # save config and result - if task is not None: - trial_log = {} - trial_log.update(args) - trial_log.update(result) - json_str = json.dumps(trial_log) - time_str = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) - json_file_name = task + '_dataset-' + dataset + '_trials-' + str(num_trials) + '_' + time_str + '.json' - with open(json_file_name, 'w') as json_file: - json_file.write(json_str) - logging.info('Config and result in this trial have been saved to %s.', json_file_name) - # pylint: disable=bare-except + if final_fit: + # load from previous dumps + estimator = None + if os.path.isdir(log_dir): + is_valid_dir_fn = lambda d : d.startswith('.trial_') and os.path.isdir(os.path.join(log_dir, d)) + trial_dirs = [d for d in os.listdir(log_dir) if is_valid_dir_fn(d)] + best_checkpoint = '' + best_acc = -1 + result = {} + for dd in trial_dirs: + try: + with open(os.path.join(log_dir, dd, valid_summary_file), 'r') as f: + result = json.load(f) + acc = result.get('valid_map', -1) + if acc > best_acc and os.path.isfile(os.path.join(log_dir, dd, _BEST_CHECKPOINT_FILE)): + best_checkpoint = os.path.join(log_dir, dd, _BEST_CHECKPOINT_FILE) + best_acc = acc + except: + pass + if best_checkpoint: + estimator = estimator_cls.load(best_checkpoint) + if estimator is None: + 
result.update({'traceback': 'timeout'}) + else: + # create independent log_dir for each trial + trial_log_dir = os.path.join(log_dir, '.trial_{}'.format(task_id)) + args['log_dir'] = trial_log_dir + estimator = estimator_cls(args, reporter=reporter) + # training + result = estimator.fit(train_data=train_data, val_data=val_data, time_limit=wall_clock_tick-tic) + with open(os.path.join(trial_log_dir, valid_summary_file), 'w') as f: + json.dump(result, f) + # save config and result + if task is not None: + trial_log = {} + trial_log.update(args) + trial_log.update(result) + json_str = json.dumps(trial_log) + time_str = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) + json_file_name = task + '_dataset-' + dataset + '_trials-' + str(num_trials) + '_' + time_str + '.json' + with open(json_file_name, 'w') as json_file: + json_file.write(json_str) + logging.info('Config and result in this trial have been saved to %s.', json_file_name) except: import traceback return {'traceback': traceback.format_exc(), 'args': str(args), 'time': time.time() - tic, 'train_map': -1, 'valid_map': -1} - # TODO: checkpointing needs to be done in a better way - # unique_checkpoint = 'train_object_detection_' + str(uuid.uuid4()) + '.pkl' - # estimator.save(unique_checkpoint) - result.update({'model_checkpoint': pickle.dumps(estimator)}) + if estimator: + result.update({'model_checkpoint': pickle.dumps(estimator)}) return result class ObjectDetection(BaseTask): @@ -202,6 +247,8 @@ def __init__(self, config=None, logger=None): config['num_workers'] = nthreads_per_trial config['gpus'] = [int(i) for i in range(ngpus_per_trial)] config['seed'] = config.get('seed', np.random.randint(32,767)) + config['final_fit'] = False + self._cleanup_disk = config.get('cleanup_disk', True) self._config = config # scheduler options @@ -231,7 +278,7 @@ def __init__(self, config=None, logger=None): 'max_t': config.get('epochs', 50), 'grace_period': config.get('grace_period', config.get('epochs', 50) // 4)}) - def fit(self, train_data, val_data=None, train_size=0.9, random_state=None): + def fit(self, train_data, val_data=None, train_size=0.9, random_state=None, time_limit=None): """Fit auto estimator given the input data. Parameters @@ -245,6 +292,14 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None): The portion of train data split from original `train_data` if `val_data` is not provided. random_state : int Random state for splitting, for `np.random.seed`. + time_limit : int, default is None + The wall clock time limit(second) for fit process, if `None`, time limit is not enforced. + If `fit` takes longer than `time_limit`, the process will terminate early and return the + model prematurally. + Due to callbacks and additional validation functions, the `time_limit` may not be very precise + (few minutes allowance), but you can use it to safe-guard a very long training session. + If `time_limits` key set in __init__ with config, the `time_limit` value will overwrite configuration + if not `None`. Returns ------- @@ -252,6 +307,16 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None): The estimator obtained by training on the specified dataset. 
""" + config = self._config.copy() + if time_limit is None: + if config.get('time_limits', None): + time_limit = config['time_limits'] + else: + time_limit = math.inf + elif not isinstance(time_limit, int): + raise TypeError(f'Invalid type `time_limit={time_limit}`, int or None expected') + self.scheduler_options['time_out'] = time_limit + wall_clock_tick = time.time() + time_limit # split train/val before HPO to make fair comparisons if not isinstance(train_data, pd.DataFrame): assert val_data is not None, \ @@ -270,17 +335,18 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None): train_data, val_data = train, val # automatically suggest some hyperparameters based on the dataset statistics(experimental) - estimator = self._config.get('estimator', None) - transfer = self._config.get('transfer', None) + estimator = config.get('estimator', None) + transfer = config.get('transfer', None) if not transfer: - self._config['train_dataset'] = train_data - auto_suggest(self._config, estimator, self._logger) - self._config.pop('train_dataset') + config['train_dataset'] = train_data + auto_suggest(config, estimator, self._logger) + config.pop('train_dataset') # register args - config = self._config.copy() config['train_data'] = train_data config['val_data'] = val_data + config['wall_clock_tick'] = wall_clock_tick + config['log_dir'] = os.path.join(config.get('log_dir', os.getcwd()), str(uuid.uuid4())[:8]) _train_object_detection.register_args(**config) start_time = time.time() @@ -297,6 +363,7 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None): 'valid_map': results.get('valid_map', -1), 'total_time': results.get('time', time.time() - start_time), 'best_config': best_config}) + self._results = self._fit_summary else: self._logger.info("Starting HPO experiments") results = self.run_fit(_train_object_detection, self.search_strategy, @@ -305,8 +372,7 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None): ks = ('best_reward', 'best_config', 'total_time', 'config_history', 'reward_attr') self._results.update({k: v for k, v in results.items() if k in ks}) end_time = time.time() - self._logger.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> finish model fitting") - self._logger.info("total runtime is %.2f s", end_time - start_time) + self._logger.info("Finished, total runtime is %.2f s", end_time - start_time) if config.get('num_trials', 1) > 1: best_config = sample_config(_train_object_detection.args, results['best_config']) # convert best config to nested form @@ -319,9 +385,12 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None): 'best_config': best_config}) self._logger.info(pprint.pformat(self._fit_summary, indent=2)) - # TODO: checkpointing needs to be done in a better way + if self._cleanup_disk: + shutil.rmtree(config['log_dir'], ignore_errors=True) model_checkpoint = results.get('model_checkpoint', None) if model_checkpoint is None: + if results.get('traceback', '') == 'timeout': + raise TimeoutError(f'Unable to fit a usable model given `time_limit={time_limit}`') raise RuntimeError(f'Unexpected error happened during fit: {pprint.pformat(results, indent=2)}') estimator = pickle.loads(results['model_checkpoint']) return estimator diff --git a/gluoncv/check.py b/gluoncv/check.py index 656350141e..ec9b525d8c 100644 --- a/gluoncv/check.py +++ b/gluoncv/check.py @@ -32,7 +32,7 @@ def _require_pytorch_version(torch_version, max_torch_version='2.0.0'): version_str = '>={},<{}'.format(torch_version, 
max_torch_version) msg = ( "Legacy torch=={0} detected, some modules may not work properly. " - "torch{1} is required. You can use pip or conda to upgrade") + "torch{1} is required. You can use pip or conda to upgrade".format(torch.__version__, version_str)) raise RuntimeError(msg) except ImportError: raise ImportError( diff --git a/tests/auto/test_auto_tasks.py b/tests/auto/test_auto_tasks.py index edf9e5ea74..617a82e3cf 100644 --- a/tests/auto/test_auto_tasks.py +++ b/tests/auto/test_auto_tasks.py @@ -1,6 +1,7 @@ from gluoncv.auto.tasks import ImageClassification from gluoncv.auto.tasks import ObjectDetection import autogluon.core as ag +import time IMAGE_CLASS_DATASET, _, IMAGE_CLASS_TEST = ImageClassification.Dataset.from_folders( 'https://autogluon.s3.amazonaws.com/datasets/shopee-iet.zip') @@ -36,3 +37,22 @@ def test_object_detection_estimator_transfer(): detector = task.fit(OBJECT_DETECTION_TRAIN) assert task.fit_summary().get('valid_map', 0) > 0 test_result = detector.predict(OBJECT_DETECTION_TEST) + +def test_time_out_image_classification(): + time_limit = 30 + from gluoncv.auto.tasks import ImageClassification + task = ImageClassification({'num_trials': 1, 'epochs': 50}) + + tic = time.time() + classifier = task.fit(IMAGE_CLASS_DATASET, time_limit=time_limit) + # check time_limit with a little bit overhead + assert (time.time() - tic) < time_limit + 180 + +def test_time_out_detection(): + time_limit = 30 + from gluoncv.auto.tasks import ObjectDetection + task = ObjectDetection({'num_trials': 1, 'epochs': 50, 'time_limits': time_limit}) + tic = time.time() + detector = task.fit(OBJECT_DETECTION_TRAIN) + # check time_limit with a little bit overhead + assert (time.time() - tic) < time_limit + 180
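
Example usage of the `time_limit` option introduced by this patch — a minimal sketch only: the 600-second budget is an arbitrary example value, and the dataset is the same shopee-iet sample that the tests above download. Per the docstrings added here, the limit is a wall-clock budget in seconds and is enforced only approximately (a few minutes of allowance); the config-key form `{'time_limits': 600}` behaves the same way when no `time_limit` argument is passed, as in test_time_out_detection.

    import time
    from gluoncv.auto.tasks import ImageClassification

    # same sample dataset used by tests/auto/test_auto_tasks.py
    train_data, _, test_data = ImageClassification.Dataset.from_folders(
        'https://autogluon.s3.amazonaws.com/datasets/shopee-iet.zip')

    task = ImageClassification({'num_trials': 1, 'epochs': 50})
    tic = time.time()
    # stop fitting (and HPO scheduling) once roughly 600 s of wall clock time have elapsed
    classifier = task.fit(train_data, time_limit=600)
    print('elapsed: %.1f s' % (time.time() - tic))
    print(task.fit_summary())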