Skip to content

Commit

Permalink
fix SingleProcessMultiThread mode, add fuse_allreduce support (Paddle…
Browse files Browse the repository at this point in the history
…Paddle#163)

fix SingleProcessMultiThread mode, add fuse_allreduce support
  • Loading branch information
qingshui authored and root committed Nov 28, 2022
1 parent 0611054 commit 8d73deb
Showing 1 changed file with 156 additions and 16 deletions.
172 changes: 156 additions & 16 deletions python/paddle/fluid/transpiler/collective.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,15 @@ def _insert_scale_loss_grad_ops(self):
for idx, op in reversed(list(enumerate(block.ops))):
if self._is_loss_grad_op(op):
loss_grad_var = block.vars[op.output_arg_names[0]]
block._insert_op(
idx + 1,
type='scale',
inputs={'X': loss_grad_var},
outputs={'Out': loss_grad_var},
attrs={
'scale': 1.0 / self.nranks,
self.op_role_key: OpRole.Backward,
},
)

block._insert_op(idx + 1,
type='scale',
inputs={'X': loss_grad_var},
outputs={'Out': loss_grad_var},
attrs={
'scale': 1.0 / self.nranks,
self.op_role_key: OpRole.Backward
})

def _insert_allreduce_ops(self):
block = self.main_program.global_block()
ring_id = -1
Expand Down Expand Up @@ -473,16 +471,23 @@ def _transpile_main_program(self):


class SingleProcessMultiThread(GradAllReduce):
'''
'''
"""
single process multi thread mode
"""
def __init__(self):
GradAllReduce.__init__(self, 1)
self.mode = "single_process_multi_thread"
self.fuse_allreduce = os.getenv("FLAGS_fuse_allreduce", False)
self.gpu_nums = os.getenv("FLAGS_selected_gpus",
"0,1,2,3,4,5,6,7,8").split(",")

def _transpile_startup_program(self):
# block = self.startup_program.global_block()
# block.append_op(type='c_comm_init_all', attrs={'ring_id': 0})
nodes_num = 0
if len(self.endpoints) > 1:
nodes_num = len(set([x.split(':')[0] for x in self.endpoints]))
# diffent ip num is multi node
if nodes_num > 1:
self.nranks = nodes_num
print("begin to _transpile_startup_program for multi-node")
print("current_endpoint: ", self.current_endpoint)
print("total endpoints: ", self.endpoints)
Expand All @@ -493,10 +498,145 @@ def _transpile_startup_program(self):
self.rank, ring_id, self.wait_port,
True)
else:
self.nranks = 1
print("begin to _transpile_startup_program for single-node")
block = self.startup_program.global_block()
block.append_op(type='c_comm_init_all', attrs={'ring_id': 0})


def _transpile_main_program(self):
if self._get_update_param_count() == 0:
return
# scale loss
self._insert_scale_loss_grad_ops()
# fuse allreduce
if self.fuse_allreduce:
print("begin used fuse_allreduce")
# use fuse allreduce
self._insert_fuse_allreduce_ops()
else:
self._insert_allreduce_ops()

def _get_update_param_count(self):
"""
get need update param count
"""
param_count = 0
block = self.main_program.global_block()
for idx, op in reversed(list(enumerate(block.ops))):
if not self._is_backward_op(op):
continue
if not self.op_role_var_key in op.attr_names:
continue
op_role_var = op.all_attrs()[self.op_role_var_key]
if len(op_role_var) == 0:
continue

assert len(op_role_var) % 2 == 0
for i in range(0, len(op_role_var), 2):
param = block.vars[op_role_var[i]]
if param.is_distributed:
continue
param_count = param_count + 1

return param_count

def _insert_scale_loss_grad_ops(self):
'''
In order to keep the learning rate consistent in different numbers of
training workers, we scale the loss grad by the number of workers
'''
scale = 1.0 / self.nranks / self.gpu_nums
block = self.main_program.global_block()
for idx, op in reversed(list(enumerate(block.ops))):
if not self._is_loss_grad_op(op):
continue
loss_grad_var = block.vars[op.output_arg_names[0]]
block._insert_op(idx + 1,
type='scale',
inputs={'X': loss_grad_var},
outputs={'Out': loss_grad_var},
attrs={
'scale': scale,
self.op_role_key: OpRole.Backward
})

def _insert_fuse_allreduce_ops(self):
"""
insert coalesce_tensor and all reduce ops
"""
block = self.main_program.global_block()
ring_id = -1
grad = None
input_grads = []
global_offset = 0 # find insert offset of fuse tensor, after the max dense grad offset
for idx, op in reversed(list(enumerate(block.ops))):
if self._is_backward_op(op) and \
self.op_role_var_key in op.attr_names:
op_role_var = op.all_attrs()[self.op_role_var_key]
if len(op_role_var) == 0:
continue
assert len(op_role_var) % 2 == 0
offset = idx
for i in range(0, len(op_role_var), 2):
param = block.vars[op_role_var[i]]
grad = block.vars[op_role_var[i + 1]]
if param.is_distributed:
continue
if offset == idx:
input_grads.append(grad)
global_offset = max(global_offset, offset + 1)
if grad is None:
return

# init output_grads
output_grads = input_grads
# init fused_output with temp shape, it will calculate real shape depend on inputs
fused_output = block.create_var(name="fused_output", shape=[1],
persistable=False,
dtype=core.VarDesc.VarType.FP32,
stop_gradient=True)
# fuse all grad tensors
coalesce_tensor_attrs = {
"copy_data": True,
"set_constant": False,
"dtype": core.VarDesc.VarType.FP32
}
block._insert_op(global_offset,
type='coalesce_tensor',
inputs={'Input': input_grads},
outputs={'Output': output_grads, 'FusedOutput': fused_output},
attrs=coalesce_tensor_attrs)
global_offset += 1
# grads aggregation of multi-gpus
block._insert_op(global_offset,
type='c_sync_calc_stream',
inputs={'X': fused_output},
outputs={'Out': fused_output},
attrs={self.op_role_key: OpRole.Backward})
global_offset += 1
ring_id = (ring_id + 1) % self.nrings
block._insert_op(
global_offset,
type='c_allreduce_sum',
inputs={'X': fused_output},
outputs={'Out': fused_output},
attrs={
'ring_id': ring_id,
self.op_role_key: OpRole.Backward
})
global_offset += 1

# sync before adam
block._insert_op(
global_offset,
type='c_sync_comm_stream',
inputs={'X': fused_output},
outputs={'Out': fused_output},
attrs={
'ring_id': ring_id,
self.op_role_key: OpRole.Backward
})
global_offset += 1

class MultiThread(GradAllReduce):
''' '''
Expand Down

0 comments on commit 8d73deb

Please sign in to comment.