fix pylayer py39 mem leak (#56722)
wanghuancoder authored Aug 29, 2023
1 parent 1851543 commit 69db934
Showing 1 changed file with 53 additions and 59 deletions.
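
Summary of the change (paraphrased from the diff): the `c_identity_eager` and `mp_allreduce_eager` PyLayer subclasses are hoisted out of `_c_identity` and `_mp_allreduce` to module scope. Previously a fresh subclass was defined on every call, with `forward`/`backward` closing over the enclosing `group` (and, for the allreduce path, `op` and `ring_id`); per the commit title, this leaked memory under Python 3.9. The module-level versions instead take `group` as an argument and keep whatever `backward` needs on `ctx`.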
112 changes: 53 additions & 59 deletions python/paddle/distributed/fleet/layers/mpu/mp_ops.py
@@ -23,6 +23,27 @@
from ....communication.reduce import ReduceOp, _get_reduce_op


class c_identity_eager(paddle.autograd.PyLayer):
    @staticmethod
    def forward(ctx, tensor, group=None):
        ctx.group = group
        return _legacy_C_ops.c_identity(
            tensor,
            'use_calc_stream',
            True,
            'ring_id',
            group.id,
            'use_model_parallel',
            True,
        )

    @staticmethod
    def backward(ctx, dy):
        op_type = _get_reduce_op(ReduceOp.SUM, "_c_identity")
        ctx.group.process_group.all_reduce_on_calc_stream(dy, op_type)
        return dy


def _c_identity(tensor, group=None):
"""
Return a copy of the tensor, mainly used with model parallel.
@@ -40,28 +61,7 @@ def _c_identity(tensor, group=None):
    ring_id = 0 if group is None else group.id

    if in_dynamic_mode():
        from paddle.autograd import PyLayer

        class c_identity_eager(PyLayer):
            @staticmethod
            def forward(ctx, tensor):
                return _legacy_C_ops.c_identity(
                    tensor,
                    'use_calc_stream',
                    True,
                    'ring_id',
                    group.id,
                    'use_model_parallel',
                    True,
                )

            @staticmethod
            def backward(ctx, dy):
                op_type = _get_reduce_op(ReduceOp.SUM, "_c_identity")
                group.process_group.all_reduce_on_calc_stream(dy, op_type)
                return dy

        return c_identity_eager.apply(tensor)
        return c_identity_eager.apply(tensor, group)
    else:
        op_type = 'c_identity'
        helper = LayerHelper(op_type, **locals())
@@ -215,6 +215,37 @@ def _c_split(tensor, group=None):
    return out


class mp_allreduce_eager(paddle.autograd.PyLayer):
    @staticmethod
    def forward(ctx, tensor, group, use_calc_stream, use_model_parallel):
        ctx.ring_id = group.id

        if use_calc_stream:
            op_type = _get_reduce_op(ReduceOp.SUM, "_mp_allreduce")
            group.process_group.all_reduce_on_calc_stream(tensor, op_type)
            return tensor
        else:
            return _legacy_C_ops.c_allreduce_sum_(
                tensor,
                'use_calc_stream',
                use_calc_stream,
                'ring_id',
                group.id,
            )

    @staticmethod
    def backward(ctx, dy):
        return _legacy_C_ops.c_identity(
            dy,
            'use_calc_stream',
            True,
            'ring_id',
            ctx.ring_id,
            'use_model_parallel',
            True,
        )


def _mp_allreduce(
    tensor,
    op=ReduceOp.SUM,
Expand All @@ -229,43 +260,6 @@ def _mp_allreduce(
    if in_dynamic_mode():
        group = collective._get_default_group() if group is None else group
        assert op == ReduceOp.SUM, f"Unknown parameter: {op}."

        from paddle.autograd import PyLayer

        class mp_allreduce_eager(PyLayer):
            @staticmethod
            def forward(
                ctx, tensor, group, use_calc_stream, use_model_parallel
            ):
                ctx.ring_id = group.id

                if use_calc_stream:
                    op_type = _get_reduce_op(op, "_mp_allreduce")
                    group.process_group.all_reduce_on_calc_stream(
                        tensor, op_type
                    )
                    return tensor
                else:
                    return _legacy_C_ops.c_allreduce_sum_(
                        tensor,
                        'use_calc_stream',
                        use_calc_stream,
                        'ring_id',
                        ring_id,
                    )

            @staticmethod
            def backward(ctx, dy):
                return _legacy_C_ops.c_identity(
                    dy,
                    'use_calc_stream',
                    True,
                    'ring_id',
                    ctx.ring_id,
                    'use_model_parallel',
                    True,
                )

        return mp_allreduce_eager.apply(
            tensor, group, use_calc_stream, use_model_parallel
        )
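
For context, a minimal, hypothetical sketch (the scale_per_call / _ScaleOnce names are illustrative, not from this diff) contrasting the per-call PyLayer pattern this commit removes with the module-level pattern it adopts: a subclass defined inside the function is re-created on every call and its forward/backward close over the enclosing frame, while a single module-level subclass receives state through forward and stores it on ctx.

import paddle

# Pattern removed by this commit: a new PyLayer subclass per call, whose
# methods close over `scale` from the enclosing function.
def scale_per_call(x, scale):
    class _Scale(paddle.autograd.PyLayer):  # hypothetical example class
        @staticmethod
        def forward(ctx, t):
            return t * scale

        @staticmethod
        def backward(ctx, dy):
            return dy * scale

    return _Scale.apply(x)

# Pattern adopted by this commit: one module-level subclass; anything
# backward needs is passed into forward and stored on ctx.
class _ScaleOnce(paddle.autograd.PyLayer):  # hypothetical example class
    @staticmethod
    def forward(ctx, t, scale):
        ctx.scale = scale
        return t * scale

    @staticmethod
    def backward(ctx, dy):
        return dy * ctx.scale

x = paddle.ones([2], dtype='float32')
x.stop_gradient = False
y = _ScaleOnce.apply(x, 3.0)
y.sum().backward()  # x.grad == [3.0, 3.0]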