[Auto] fit time limit (#1615)

* time_limit in training loops

* test time_limit

* fix

* fix pass args

* increase tic frequency

* fix precise tics

* fix speed tic

* fix timelimit

* per-trial based cache dir

* fix

* fix

* fix

* fix

* final_fit

* debug

* fix

* debug

* debug

* dd

* dd

* fix path

* debug

* fix

* fix log file path if deleted

* fix self config overwrite

* fix

* results

* fix all

* try reload mxnet in training

* fix

* fix timeout

* fix

* fix reload

* fix

* import os

* fix task id

* fix

* fix doc

* fix overhead

* fix

* fix lint

* fix lint

* fix tests

* fix

* disable hpo in tests
zhreshold authored Feb 17, 2021
1 parent 5db0943 commit 4280cb1
Showing 12 changed files with 421 additions and 148 deletions.
52 changes: 35 additions & 17 deletions gluoncv/auto/estimators/base_estimator.py
@@ -1,5 +1,6 @@
"""Base Estimator"""
import os
import math
import pickle
import io
import logging
@@ -28,9 +29,7 @@ def _apply(cls):
" Config used to override default configurations. \n"
" If `str`, assume config file (.yml, .yaml) is used. \n"
"logger : logger, default is `None`.\n"
" If not `None`, will use default logging object.\n"
"logdir : str, default is None.\n"
" Directory for saving logs. If `None`, current working directory is used.\n")
" If not `None`, will use default logging object.\n")
cls.__doc__ += '\nDefault configurations: \n--------------------\n'
sio = io.StringIO()
cfg.save(sio)
@@ -80,22 +79,22 @@ def __init__(self, config, logger=None, reporter=None, name=None):
self.dataset = 'auto'

# logdir
logdir = config.pop('logdir', None) if isinstance(config, dict) else None
self._logdir = os.path.abspath(logdir) if logdir else os.getcwd()
logdir = config.pop('log_dir', None) if isinstance(config, dict) else None
if logdir:
self._logdir = os.path.abspath(logdir)
else:
self._logdir = os.path.join(os.getcwd(), name.lower() + datetime.now().strftime("-%m-%d-%Y"))

# finalize config
cfg = self._default_cfg.merge(config) # config can be dict or yaml file
diffs = self._default_cfg.diff(cfg)
if diffs:
self._logger.info('modified configs: {')
self._logger.info('modified configs(<old> != <new>): {')
for diff in diffs:
self._logger.info(diff)
self._logger.info('}')
self._cfg = cfg

prefix = name.lower() + datetime.now().strftime("-%m-%d-%Y")
self._logdir = os.path.join(self._logdir, prefix)
# r.config['logdir'] = self._logdir
os.makedirs(self._logdir, exist_ok=True)
config_file = os.path.join(self._logdir, 'config.yaml')
# log file
@@ -111,7 +110,9 @@ def __init__(self, config, logger=None, reporter=None, name=None):
seed = self._cfg.get('seed', np.random.randint(1000000))
_random.seed(seed)

def fit(self, train_data, val_data=None, train_size=0.9, random_state=None, resume=False):
def fit(self, train_data, val_data=None, train_size=0.9, random_state=None,
resume=False, time_limit=None):

"""Fit with train/validation data.
Parameters
@@ -127,12 +128,22 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None, resu
Random state for splitting, for `np.random.seed`.
resume : bool
Whether resume from previous `fit`(if possible) or start as fresh.
time_limit : int, default is None
The wall clock time limit (in seconds) for the fit process; if `None`, no time limit is enforced.
If `fit` takes longer than `time_limit`, the process will terminate early and return the
model prematurely.
Due to callbacks and additional validation functions, the `time_limit` may not be very precise
(allow a few minutes of slack), but you can use it to safeguard a very long training session.
Returns
-------
None
"""
if time_limit is None:
time_limit = math.inf
elif not isinstance(time_limit, (int, float)):
raise TypeError(f'Invalid type `time_limit={time_limit}`, int/float or None expected')
if not resume:
self.classes = train_data.classes
self.num_class = len(self.classes)
@@ -141,7 +152,8 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None, resu
assert val_data is not None, \
"Please provide `val_data` as we do not know how to split `train_data` of type: \
{}".format(type(train_data))
return self._fit(train_data, val_data) if not resume else self._resume_fit(train_data, val_data)
return self._fit(train_data, val_data, time_limit=time_limit) if not resume \
else self._resume_fit(train_data, val_data, time_limit=time_limit)

if val_data is None:
assert 0 <= train_size <= 1.0
@@ -152,9 +164,11 @@ def fit(self, train_data, val_data=None, train_size=0.9, random_state=None, resu
val = train_data[~split_mask]
self._logger.info('Randomly split train_data into train[%d]/validation[%d] splits.',
len(train), len(val))
return self._fit(train, val) if not resume else self._resume_fit(train, val)
return self._fit(train, val, time_limit=time_limit) if not resume else \
self._resume_fit(train, val, time_limit=time_limit)

return self._fit(train_data, val_data) if not resume else self._resume_fit(train_data, val_data)
return self._fit(train_data, val_data, time_limit=time_limit) if not resume else \
self._resume_fit(train_data, val_data, time_limit=time_limit)

def evaluate(self, val_data):
"""Evaluate estimator on validation data.
@@ -195,10 +209,10 @@ def _predict(self, x):
def _predict_feature(self, x):
raise NotImplementedError

def _fit(self, train_data, val_data):
def _fit(self, train_data, val_data, time_limit=math.inf):
raise NotImplementedError

def _resume_fit(self, train_data, val_data):
def _resume_fit(self, train_data, val_data, time_limit=math.inf):
raise NotImplementedError

def _evaluate(self, val_data):
@@ -265,8 +279,12 @@ def __setstate__(self, state):
# logger
self._logger = logging.getLogger(state.get('_name', self.__class__.__name__))
self._logger.setLevel(logging.ERROR)
fh = logging.FileHandler(self._log_file)
self._logger.addHandler(fh)
try:
fh = logging.FileHandler(self._log_file)
self._logger.addHandler(fh)
#pylint: disable=bare-except
except:
pass
try:
import mxnet as _
net_params = state['net']
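For reference, a minimal usage sketch of the new `time_limit` argument on `fit`. The estimator choice, import path, and config keys below are illustrative assumptions, not something this commit prescribes; `train_data` is assumed to be an ObjectDetectionDataset (with a `classes` attribute) built elsewhere.

# Hypothetical usage sketch; import path and config keys are assumptions.
from gluoncv.auto.estimators import CenterNetEstimator

estimator = CenterNetEstimator({'train': {'epochs': 50}})

# Cap the whole fit (training loop plus validation callbacks) at roughly one hour.
# As the docstring notes, enforcement is coarse: allow a few minutes of slack.
summary = estimator.fit(train_data, train_size=0.9, time_limit=3600)

# The training loop returns a summary dict; keys follow the diff above.
print(summary.get('train_map'), summary.get('valid_map'), summary.get('time'))
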
47 changes: 33 additions & 14 deletions gluoncv/auto/estimators/center_net/center_net.py
@@ -1,6 +1,7 @@
"""CenterNet Estimator"""
# pylint: disable=unused-variable,missing-function-docstring,abstract-method
# pylint: disable=unused-variable,missing-function-docstring,abstract-method,logging-format-interpolation
import os
import math
import time
import warnings
from collections import OrderedDict
@@ -23,6 +24,7 @@
from ....utils.metrics import VOCMApMetric, VOC07MApMetric
from .default import CenterNetCfg
from ...data.dataset import ObjectDetectionDataset
from ..conf import _BEST_CHECKPOINT_FILE

__all__ = ['CenterNetEstimator']

@@ -79,7 +81,8 @@ def _predict_merge(x):
valid_df = df[df['predict_score'] > 0].reset_index(drop=True)
return valid_df

def _fit(self, train_data, val_data):
def _fit(self, train_data, val_data, time_limit=math.inf):
tic = time.time()
self._best_map = 0
self.epoch = 0
self._time_elapsed = 0
@@ -90,9 +93,11 @@ def _fit(self, train_data, val_data):
else:
self.last_train = train_data
self._init_trainer()
return self._resume_fit(train_data, val_data)
self._time_elapsed += time.time() - tic
return self._resume_fit(train_data, val_data, time_limit=time_limit)

def _resume_fit(self, train_data, val_data):
def _resume_fit(self, train_data, val_data, time_limit=math.inf):
tic = time.time()
if max(self._cfg.train.start_epoch, self.epoch) >= self._cfg.train.epochs:
return {'time': self._time_elapsed}
if not self.classes or not self.num_class:
@@ -119,10 +124,11 @@ def _resume_fit(self, train_data, val_data):
train_dataset.transform(CenterNetDefaultValTransform(width, height)),
self._cfg.valid.batch_size, False, batchify_fn=val_batchify_fn, last_batch='keep',
num_workers=self._cfg.valid.num_workers)
self._time_elapsed += time.time() - tic
return self._train_loop(train_loader, val_loader, train_eval_loader, time_limit=time_limit)

return self._train_loop(train_loader, val_loader, train_eval_loader)

def _train_loop(self, train_data, val_data, train_eval_data):
def _train_loop(self, train_data, val_data, train_eval_data, time_limit=math.inf):
start_tic = time.time()
wh_loss = MaskedL1Loss(weight=self._cfg.center_net.wh_weight)
heatmap_loss = HeatmapFocalLoss(from_logits=True)
center_reg_loss = MaskedL1Loss(weight=self._cfg.center_net.center_reg_weight)
@@ -131,19 +137,27 @@ def _train_loop(self, train_data, val_data, train_eval_data):
center_reg_metric = mx.metric.Loss('CenterRegL1')

self._logger.info('Start training from [Epoch %d]', max(self._cfg.train.start_epoch, self.epoch))
mean_ap = [-1]
cp_name = ''
self._time_elapsed += time.time() - start_tic
for self.epoch in range(max(self._cfg.train.start_epoch, self.epoch), self._cfg.train.epochs):
epoch = self.epoch
tic = time.time()
last_tic = time.time()
if self._best_map >= 1.0:
self._logger.info('[Epoch %d] Early stopping as mAP is reaching 1.0', epoch)
break
wh_metric.reset()
center_reg_metric.reset()
heatmap_loss_metric.reset()
tic = time.time()
btic = time.time()
self.net.hybridize()

for i, batch in enumerate(train_data):
btic = time.time()
if self._time_elapsed > time_limit:
self._logger.warning(f'`time_limit={time_limit}` reached, exit early...')
return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map,
'time': self._time_elapsed, 'checkpoint': cp_name}
split_data = [
gluon.utils.split_and_load(batch[ind], ctx_list=self.ctx, batch_axis=0, even_split=False) for ind
in range(6)]
@@ -180,10 +194,12 @@ def _train_loop(self, train_data, val_data, train_eval_data):
self._logger.info(
'[Epoch {}][Batch {}], Speed: {:.3f} samples/sec, '
'LR={}, {}={:.3f}, {}={:.3f}, {}={:.3f}'.format(
epoch, i, batch_size / (time.time() - btic),
epoch, i, batch_size / (time.time() - last_tic),
self.trainer.learning_rate, name2, loss2, name3, loss3, name4, loss4))
btic = time.time()
last_tic = time.time()
self._time_elapsed += time.time() - btic

post_tic = time.time()
name2, loss2 = wh_metric.get()
name3, loss3 = center_reg_metric.get()
name4, loss4 = heatmap_loss_metric.get()
@@ -197,17 +213,20 @@ def _train_loop(self, train_data, val_data, train_eval_data):
self._logger.info('[Epoch %d] Validation: \n%s', epoch, val_msg)
current_map = float(mean_ap[-1])
if current_map > self._best_map:
cp_name = os.path.join(self._logdir, 'best_checkpoint.pkl')
cp_name = os.path.join(self._logdir, _BEST_CHECKPOINT_FILE)
self._logger.info('[Epoch %d] Current best map: %f vs previous %f, saved to %s',
self.epoch, current_map, self._best_map, cp_name)
self.save(cp_name)
self._best_map = current_map
if self._reporter:
self._reporter(epoch=epoch, map_reward=current_map)
self._time_elapsed += time.time() - btic
self._time_elapsed += time.time() - post_tic
# map on train data
tic = time.time()
map_name, mean_ap = self._evaluate(train_eval_data)
return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map, 'time': self._time_elapsed}
self._time_elapsed += time.time() - tic
return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map,
'time': self._time_elapsed, 'checkpoint': cp_name}

def _evaluate(self, val_data):
"""Test on validation dataset."""
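The same wall-clock accounting pattern recurs in every estimator this commit touches: take a tic at the start of each phase, fold the elapsed time into `self._time_elapsed`, and check the running total against `time_limit` before paying for the next batch. A stripped-down, self-contained sketch of that idea (generic names, not the estimator's actual attributes):

import math
import time

def train_loop(num_epochs, batches_per_epoch, time_limit=math.inf,
               step=lambda: time.sleep(0.01)):
    """Accumulate per-phase wall-clock time and exit early once the budget is spent."""
    time_elapsed = 0.0
    for epoch in range(num_epochs):
        for i in range(batches_per_epoch):
            btic = time.time()
            # Check the budget before running the next batch, mirroring the diff.
            if time_elapsed > time_limit:
                return {'time': time_elapsed, 'stopped_early': True}
            step()  # forward/backward/update would happen here
            time_elapsed += time.time() - btic
        post_tic = time.time()
        # validation, checkpointing and reporting would happen here
        time_elapsed += time.time() - post_tic
    return {'time': time_elapsed, 'stopped_early': False}

print(train_loop(num_epochs=2, batches_per_epoch=5, time_limit=0.03))
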
2 changes: 2 additions & 0 deletions gluoncv/auto/estimators/conf.py
@@ -0,0 +1,2 @@
"""Shared configs"""
_BEST_CHECKPOINT_FILE = 'best_checkpoint.pkl'
47 changes: 33 additions & 14 deletions gluoncv/auto/estimators/faster_rcnn/faster_rcnn.py
@@ -1,6 +1,7 @@
"""Faster RCNN Estimator."""
# pylint: disable=logging-not-lazy,abstract-method,unused-variable
# pylint: disable=logging-not-lazy,abstract-method,unused-variable,logging-format-interpolation
import os
import math
import time
import warnings

@@ -21,6 +22,7 @@
from ..base_estimator import BaseEstimator, set_default
from .utils import _get_lr_at_iter, _get_dataloader, _split_and_load
from ...data.dataset import ObjectDetectionDataset
from ..conf import _BEST_CHECKPOINT_FILE

try:
import horovod.mxnet as hvd
@@ -59,8 +61,9 @@ def __init__(self, config, logger=None, reporter=None):
super(FasterRCNNEstimator, self).__init__(config, logger, reporter)
self.batch_size = self._cfg.train.batch_size

def _fit(self, train_data, val_data):
def _fit(self, train_data, val_data, time_limit=math.inf):
"""Fit Faster R-CNN model."""
tic = time.time()
self._best_map = 0
self.epoch = 0
self._time_elapsed = 0
@@ -71,9 +74,11 @@ def _fit(self, train_data, val_data):
self.net.collect_params().setattr('grad_req', 'null')
self.net.collect_train_params().setattr('grad_req', 'write')
self._init_trainer()
return self._resume_fit(train_data, val_data)
self._time_elapsed += time.time() - tic
return self._resume_fit(train_data, val_data, time_limit=time_limit)

def _resume_fit(self, train_data, val_data):
def _resume_fit(self, train_data, val_data, time_limit=math.inf):
tic = time.time()
if max(self._cfg.train.start_epoch, self.epoch) >= self._cfg.train.epochs:
return {'time': self._time_elapsed}
if not self.classes or not self.num_class:
@@ -87,10 +92,11 @@ def _resume_fit(self, train_data, val_data):
train_loader, val_loader, train_eval_loader = _get_dataloader(
self.net, train_dataset, val_dataset, FasterRCNNDefaultTrainTransform,
FasterRCNNDefaultValTransform, self.batch_size, len(self.ctx), self._cfg)
self._time_elapsed += time.time() - tic
return self._train_loop(train_loader, val_loader, train_eval_loader, time_limit=time_limit)

return self._train_loop(train_loader, val_loader, train_eval_loader)

def _train_loop(self, train_data, val_data, train_eval_data):
def _train_loop(self, train_data, val_data, train_eval_data, time_limit=math.inf):
start_tic = time.time()
# loss and metric
rpn_cls_loss = gluon.loss.SigmoidBinaryCrossEntropyLoss(from_sigmoid=False)
rpn_box_loss = gluon.loss.HuberLoss(rho=self._cfg.train.rpn_smoothl1_rho) # == smoothl1
@@ -120,12 +126,16 @@ def _train_loop(self, train_data, val_data, train_eval_data):
self.net.collect_params().reset_ctx(self.ctx)
self.net.target_generator.collect_params().reset_ctx(self.ctx)

mean_ap = [-1]
cp_name = ''
self._time_elapsed += time.time() - start_tic
for self.epoch in range(max(self._cfg.train.start_epoch, self.epoch), self._cfg.train.epochs):
epoch = self.epoch
tic = time.time()
last_tic = time.time()
if self._best_map >= 1.0:
self._logger.info('[Epoch %d] Early stopping as mAP is reaching 1.0', epoch)
break
btic = time.time()
rcnn_task = ForwardBackwardTask(self.net, self.trainer, rpn_cls_loss, rpn_box_loss,
rcnn_cls_loss, rcnn_box_loss, mix_ratio=1.0,
amp_enabled=self._cfg.faster_rcnn.amp)
@@ -149,10 +159,14 @@ def _train_loop(self, train_data, val_data, train_eval_data):
self._logger.info("[Epoch %d] Set learning rate to %f", epoch, new_lr)
for metric in metrics:
metric.reset()
tic = time.time()
base_lr = self.trainer.learning_rate
rcnn_task.mix_ratio = mix_ratio
for i, batch in enumerate(train_data):
btic = time.time()
if self._time_elapsed > time_limit:
self._logger.warning(f'`time_limit={time_limit}` reached, exit early...')
return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map,
'time': self._time_elapsed, 'checkpoint': cp_name}
if epoch == 0 and i <= lr_warmup:
# adjust based on real percentage
new_lr = base_lr * _get_lr_at_iter(i / lr_warmup,
@@ -194,9 +208,11 @@ def _train_loop(self, train_data, val_data, train_eval_data):
self._logger.info('[Epoch {}][Batch {}], Speed: {:.3f} samples/sec, {}'.format(
epoch, i,
self._cfg.train.log_interval * self.batch_size / (
time.time() - btic), msg))
btic = time.time()
time.time() - last_tic), msg))
last_tic = time.time()
self._time_elapsed += time.time() - btic

post_tic = time.time()
if not self._cfg.horovod or hvd.rank() == 0:
msg = ','.join(['{}={:.3f}'.format(*metric.get()) for metric in metrics])
# pylint: disable=logging-format-interpolation
@@ -209,17 +225,20 @@ def _train_loop(self, train_data, val_data, train_eval_data):
self._logger.info('[Epoch {}] Validation: \n{}'.format(epoch, val_msg))
current_map = float(mean_ap[-1])
if current_map > self._best_map:
cp_name = os.path.join(self._logdir, 'best_checkpoint.pkl')
cp_name = os.path.join(self._logdir, _BEST_CHECKPOINT_FILE)
self._logger.info('[Epoch %d] Current best map: %f vs previous %f, saved to %s',
self.epoch, current_map, self._best_map, cp_name)
self.save(cp_name)
self._best_map = current_map
if self._reporter:
self._reporter(epoch=epoch, map_reward=current_map)
self._time_elapsed += time.time() - btic
self._time_elapsed += time.time() - post_tic
# map on train data
tic = time.time()
map_name, mean_ap = self._evaluate(train_eval_data)
return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map, 'time': self._time_elapsed}
self._time_elapsed += time.time() - tic
return {'train_map': float(mean_ap[-1]), 'valid_map': self._best_map,
'time': self._time_elapsed, 'checkpoint': cp_name}

def _evaluate(self, val_data):
"""Evaluate on validation dataset."""
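Both `_train_loop` implementations now also return the best checkpoint path under a 'checkpoint' key, built from the shared `_BEST_CHECKPOINT_FILE` name in conf.py. A small sketch of consuming that return value; the `summary` dict here is a made-up stand-in for what `fit` would return, not output from this commit:

import os

# Made-up stand-in for the dict returned by `fit`/`_train_loop`.
summary = {'train_map': 0.42, 'valid_map': 0.40, 'time': 512.3,
           'checkpoint': os.path.join('centernet-02-17-2021', 'best_checkpoint.pkl')}

if summary['checkpoint'] and os.path.isfile(summary['checkpoint']):
    print('best checkpoint saved at:', summary['checkpoint'])
else:
    # An empty 'checkpoint' means no epoch improved on the initial best mAP of 0,
    # e.g. when `time_limit` expired before the first validation pass completed.
    print('no checkpoint written; best valid mAP so far:', summary['valid_map'])
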
(diffs for the remaining 8 changed files not shown)
