
Commit 863897c

Merge branch 'master' of https://github.com/PaddlePaddle/hapi into fix-data-train

guoshengCS committed Mar 31, 2020
2 parents dd44668 + 4d22fee

Showing 5 changed files with 480 additions and 306 deletions.
22 changes: 14 additions & 8 deletions callbacks.py
@@ -16,7 +16,7 @@
import copy

from progressbar import ProgressBar
from distributed import get_local_rank
from paddle.fluid.dygraph.parallel import ParallelEnv


def config_callbacks(callbacks=None,
@@ -195,7 +195,7 @@ def on_epoch_begin(self, epoch=None, logs=None):
self.steps = self.params['steps']
self.epoch = epoch
self.train_step = 0
if self.verbose and self.epochs and get_local_rank() == 0:
if self.verbose and self.epochs and ParallelEnv().local_rank == 0:
print('Epoch %d/%d' % (epoch + 1, self.epochs))
self.train_progbar = ProgressBar(num=self.steps, verbose=self.verbose)

@@ -213,8 +213,8 @@ def on_train_batch_end(self, step, logs=None):
logs = logs or {}
self.train_step += 1

if self.train_step % self.log_freq == 0 and self.verbose and get_local_rank(
) == 0:
if self.train_step % self.log_freq == 0 and self.verbose and ParallelEnv(
).local_rank == 0:
# if steps is not None, last step will update in on_epoch_end
if self.steps and self.train_step < self.steps:
self._updates(logs, 'train')
@@ -223,7 +223,7 @@ def on_train_batch_end(self, step, logs=None):

def on_epoch_end(self, epoch, logs=None):
logs = logs or {}
if self.verbose and get_local_rank() == 0:
if self.verbose and ParallelEnv().local_rank == 0:
self._updates(logs, 'train')

def on_eval_begin(self, logs=None):
@@ -233,7 +233,7 @@ def on_eval_begin(self, logs=None):
self.evaled_samples = 0
self.eval_progbar = ProgressBar(
num=self.eval_steps, verbose=self.verbose)
if get_local_rank() == 0:
if ParallelEnv().local_rank == 0:
print('Eval begin...')

def on_eval_batch_end(self, step, logs=None):
@@ -242,9 +242,15 @@ def on_eval_batch_end(self, step, logs=None):
samples = logs.get('batch_size', 1)
self.evaled_samples += samples

if self.eval_step % self.log_freq == 0 and self.verbose and ParallelEnv(
).local_rank == 0:
# if steps is not None, last step will update in on_epoch_end
if self.eval_steps and self.eval_step < self.eval_steps:
self._updates(logs, 'eval')

def on_eval_end(self, logs=None):
logs = logs or {}
if self.verbose and get_local_rank() == 0:
if self.verbose and ParallelEnv().local_rank == 0:
self._updates(logs, 'eval')
print('Eval samples: %d' % (self.evaled_samples))

@@ -258,7 +264,7 @@ def on_epoch_begin(self, epoch=None, logs=None):
self.epoch = epoch

def _is_save(self):
return self.model and self.save_dir and get_local_rank() == 0
return self.model and self.save_dir and ParallelEnv().local_rank == 0

def on_epoch_end(self, epoch, logs=None):
if self._is_save() and self.epoch % self.save_freq == 0:
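The callbacks.py changes above replace the repo's get_local_rank() helper with Paddle's built-in ParallelEnv. As a minimal sketch of the resulting rank-0 guard pattern (assuming a PaddlePaddle version that ships paddle.fluid.dygraph.parallel.ParallelEnv; the helper name log_on_rank0 is illustrative, not part of the commit):

from paddle.fluid.dygraph.parallel import ParallelEnv

def log_on_rank0(message):
    # In a multi-GPU launch every process runs the callbacks; guarding on
    # local_rank == 0 keeps progress output and checkpoints to one process.
    if ParallelEnv().local_rank == 0:
        print(message)

log_on_rank0('Epoch 1/10')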
163 changes: 58 additions & 105 deletions distributed.py
@@ -13,30 +13,20 @@
# limitations under the License.
import os
import sys
import six
import time
import math
import socket
import contextlib
from contextlib import closing
from six import string_types
import numpy as np
from collections import OrderedDict
from paddle import fluid
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core

from paddle.fluid import framework
from paddle import fluid
from paddle.fluid.layers import collective
from paddle.fluid.dygraph import to_variable, no_grad, layers
from paddle.fluid.framework import Variable
from paddle.fluid.executor import global_scope
from paddle.fluid.dygraph.parallel import ParallelEnv, ParallelStrategy
from paddle.fluid.io import BatchSampler

from paddle.fluid.dygraph.parallel import Env, DataParallel, ParallelStrategy
from paddle.fluid.layers.collective import _c_allreduce, _c_allgather, _c_broadcast, \
_c_sync_comm_stream, _c_sync_calc_stream
from paddle.fluid.io import BatchSampler, DataLoader
_parallel_context_initialized = False

__parallel_context_init = False

class DistributedBatchSampler(BatchSampler):
"""Sampler that restricts data loading to a subset of the dataset.
@@ -71,11 +61,13 @@ def __init__(self, dataset, batch_size, shuffle=False, drop_last=False):
self.shuffle = shuffle
assert isinstance(drop_last, bool), \
"drop_last should be a boolean number"

self.drop_last = drop_last
self.nranks = get_nranks()
self.local_rank = get_local_rank()
self.nranks = ParallelEnv().nranks
self.local_rank = ParallelEnv().local_rank
self.epoch = 0
self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.nranks))
self.num_samples = int(
math.ceil(len(self.dataset) * 1.0 / self.nranks))
self.total_size = self.num_samples * self.nranks

def __iter__(self):
@@ -86,9 +78,28 @@ def __iter__(self):
if self.shuffle:
np.random.RandomState(self.epoch).shuffle(indices)
self.epoch += 1

# subsample
indices = indices[self.local_rank * self.num_samples:
(self.local_rank + 1) * self.num_samples]
def _get_indices_by_batch_size(indices):
subsampled_indices = []
last_batch_size = self.total_size % (self.batch_size * self.nranks)
assert last_batch_size % self.nranks == 0
last_local_batch_size = last_batch_size // self.nranks

for i in range(self.local_rank * self.batch_size,
len(indices) - last_batch_size,
self.batch_size * self.nranks):
subsampled_indices.extend(indices[i:i + self.batch_size])

indices = indices[len(indices) - last_batch_size:]
subsampled_indices.extend(indices[
self.local_rank * last_local_batch_size:(
self.local_rank + 1) * last_local_batch_size])
return subsampled_indices

if self.nranks > 1:
indices = _get_indices_by_batch_size(indices)

assert len(indices) == self.num_samples
_sample_iter = iter(indices)

@@ -106,46 +117,37 @@ def __len__(self):
num_samples += int(not self.drop_last) * (self.batch_size - 1)
return num_samples // self.batch_size


def _all_gather(x, nranks, ring_id=0, use_calc_stream=True):
return _c_allgather(x, nranks, ring_id=ring_id, use_calc_stream=use_calc_stream)


def get_local_rank():
return Env().local_rank
def set_epoch(self, epoch):
self.epoch = epoch


def get_nranks():
return Env().nranks
def _all_gather(x, nranks, ring_id=0, use_calc_stream=True):
return collective._c_allgather(
x, nranks, ring_id=ring_id, use_calc_stream=use_calc_stream)


def wait_server_ready(endpoints):
assert not isinstance(endpoints, string_types)
assert not isinstance(endpoints, six.string_types)
while True:
all_ok = True
not_ready_endpoints = []
for ep in endpoints:
ip_port = ep.split(":")
with closing(
socket.socket(socket.AF_INET,
socket.SOCK_STREAM)) as sock:
with contextlib.closing(
socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex((ip_port[0], int(ip_port[1])))
if result != 0:
all_ok = False
not_ready_endpoints.append(ep)
if not all_ok:
sys.stderr.write("server not ready, wait 3 sec to retry...\n")
sys.stderr.write("not ready endpoints:" + str(
not_ready_endpoints) + "\n")
sys.stderr.flush()
time.sleep(3)
else:
break


def init_communicator(program, rank, nranks, wait_port,
current_endpoint, endpoints):
def init_communicator(program, rank, nranks, wait_port, current_endpoint,
endpoints):
if nranks < 2:
return
other_endpoints = endpoints[:]
@@ -154,9 +156,9 @@ def init_communicator(program, rank, nranks, wait_port,
wait_server_ready(other_endpoints)
block = program.global_block()
nccl_id_var = block.create_var(
name=nameGen.generate('nccl_id'),
name=fluid.unique_name.generate('nccl_id'),
persistable=True,
type=core.VarDesc.VarType.RAW)
type=fluid.core.VarDesc.VarType.RAW)

block.append_op(
type='c_gen_nccl_id',
@@ -181,25 +183,28 @@ def init_communicator(program, rank, nranks, wait_port,

def prepare_distributed_context(place=None):
if place is None:
place = fluid.CUDAPlace(Env().dev_id) if Env().nranks > 1 \
place = fluid.CUDAPlace(ParallelEnv().dev_id) if ParallelEnv().nranks > 1 \
else fluid.CUDAPlace(0)

strategy = ParallelStrategy()
strategy.nranks = Env().nranks
strategy.local_rank = Env().local_rank
strategy.trainer_endpoints = Env().trainer_endpoints
strategy.current_endpoint = Env().current_endpoint
strategy.nranks = ParallelEnv().nranks
strategy.local_rank = ParallelEnv().local_rank
strategy.trainer_endpoints = ParallelEnv().trainer_endpoints
strategy.current_endpoint = ParallelEnv().current_endpoint

if strategy.nranks < 2:
return

global __parallel_context_init
global _parallel_context_initialized

if not _parallel_context_initialized and isinstance(place,
fluid.CUDAPlace):

if not __parallel_context_init and isinstance(place, core.CUDAPlace):
def _init_context():
communicator_prog = framework.Program()
init_communicator(communicator_prog, strategy.local_rank, strategy.nranks,
True, strategy.current_endpoint, strategy.trainer_endpoints)
communicator_prog = fluid.Program()
init_communicator(communicator_prog, strategy.local_rank,
strategy.nranks, True, strategy.current_endpoint,
strategy.trainer_endpoints)
exe = fluid.Executor(place)
exe.run(communicator_prog)

@@ -213,57 +218,5 @@ def _init_context():
else:
assert ("Only support CUDAPlace for now.")

__parallel_context_init = True
_parallel_context_initialized = True
return strategy


class DistributedDataParallel(DataParallel):
def __init__(self, layers, strategy=None):
if strategy is None:
strategy = ParallelStrategy()
strategy.nranks = Env().nranks
strategy.local_rank = Env().local_rank
strategy.trainer_endpoints = Env().trainer_endpoints
strategy.current_endpoint = Env().current_endpoint

super(DistributedDataParallel, self).__init__(layers, strategy)

@no_grad
def apply_collective_grads(self):
"""
AllReduce the Parameters' gradient.
"""
if not self._is_data_parallel_mode():
return

grad_var_set = set()
grad_vars = []
for param in self._layers.parameters():
# NOTE(zcd): The grad_ivar maybe no generated.
if param.trainable and param._grad_ivar():
g_var = param._grad_ivar()
grad_vars.append(g_var)
assert g_var not in grad_var_set
grad_var_set.add(g_var)

mega_bytes = 128 * 1024 * 1024
group_idx = 0
memory_counter = 0
grad_var_groups = OrderedDict()
dtype = grad_vars[0].dtype
for g_var in grad_vars:
# Note: the dtype of the same group should be the same.
bytes = np.prod(g_var.shape) * core.size_of_dtype(g_var.dtype)
if memory_counter < mega_bytes and dtype == g_var.dtype:
memory_counter += bytes
else:
memory_counter = bytes
group_idx += 1
grad_var_groups.setdefault(group_idx, []).append(g_var)

coalesced_grads_and_vars = self._coalesce_tensors(grad_var_groups)

for coalesced_grad, _, _ in coalesced_grads_and_vars:
collective._c_allreduce(coalesced_grad, coalesced_grad, use_calc_stream=True)

self._split_tensors(coalesced_grads_and_vars)
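The distributed.py changes rework DistributedBatchSampler: subsampling is now batch-aware (_get_indices_by_batch_size), a set_epoch method is added, and the old get_local_rank/get_nranks helpers and the DistributedDataParallel class are removed. A hedged usage sketch follows (RangeDataset is a stand-in dataset, not part of the commit, and the import assumes the repository root is on the Python path):

from distributed import DistributedBatchSampler

class RangeDataset(object):
    # Minimal map-style dataset: each item is just its own index.
    def __init__(self, n):
        self.n = n

    def __getitem__(self, idx):
        return idx

    def __len__(self):
        return self.n

sampler = DistributedBatchSampler(RangeDataset(10), batch_size=2, shuffle=True)

for epoch in range(3):
    # set_epoch (new in this commit) reseeds the shuffle so each epoch sees a
    # different order while all ranks stay consistent with one another.
    sampler.set_epoch(epoch)
    for batch_indices in sampler:
        pass  # hand batch_indices to a DataLoader / data feeder

With shuffle disabled, two ranks, and batch_size=2, the new _get_indices_by_batch_size splits indices 0-9 batch-wise: rank 0 draws [0, 1, 4, 5, 8] and rank 1 draws [2, 3, 6, 7, 9], so consecutive global batches alternate across ranks instead of each rank taking one contiguous half as before.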
15 changes: 12 additions & 3 deletions mnist.py
@@ -26,7 +26,7 @@
from paddle.fluid.dygraph.nn import Conv2D, Pool2D, Linear
from paddle.fluid.io import MNIST as MnistDataset

from model import Model, CrossEntropy, Input, init_context
from model import Model, CrossEntropy, Input, set_device
from metrics import Accuracy


@@ -106,7 +106,8 @@ def forward(self, inputs):


def main():
init_context('dynamic' if FLAGS.dynamic else 'static')
device = set_device(FLAGS.device)
fluid.enable_dygraph(device) if FLAGS.dynamic else None

train_dataset = MnistDataset(mode='train')
val_dataset = MnistDataset(mode='test')
@@ -118,7 +119,13 @@ def main():
optim = Momentum(
learning_rate=FLAGS.lr, momentum=.9, parameter_list=model.parameters())

model.prepare(optim, CrossEntropy(), Accuracy(topk=(1, 2)), inputs, labels)
model.prepare(
optim,
CrossEntropy(),
Accuracy(topk=(1, 2)),
inputs,
labels,
device=FLAGS.device)
if FLAGS.resume is not None:
model.load(FLAGS.resume)

@@ -131,6 +138,8 @@ def main():

if __name__ == '__main__':
parser = argparse.ArgumentParser("CNN training on MNIST")
parser.add_argument(
"--device", type=str, default='gpu', help="device to use, gpu or cpu")
parser.add_argument(
"-d", "--dynamic", action='store_true', help="enable dygraph mode")
parser.add_argument(
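In mnist.py the old init_context call is replaced by set_device plus fluid.enable_dygraph, and model.prepare now also receives the device. A minimal sketch of that flow (set_device comes from this repo's model.py; the literal 'gpu' mirrors the new --device flag's default, and dynamic_mode stands in for FLAGS.dynamic):

from paddle import fluid
from model import set_device

dynamic_mode = True                  # FLAGS.dynamic in the script
device = set_device('gpu')           # 'gpu' or 'cpu', matching FLAGS.device
if dynamic_mode:
    # enable_dygraph is only called for dynamic-graph runs, as in main()
    fluid.enable_dygraph(device)

# model.prepare(...) then takes the same device string, e.g.
# model.prepare(optim, CrossEntropy(), Accuracy(topk=(1, 2)), inputs, labels,
#               device='gpu')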