Skip to content

Commit

Permalink
a2a fix removed tp world size and group from init (NVIDIA#8944) (NVIDIA#8952)
Browse files Browse the repository at this point in the history

Signed-off-by: Anmol Gupta <[email protected]>
Co-authored-by: anmolgupt <[email protected]>
Co-authored-by: Eric Harper <[email protected]>
  • Loading branch information
3 people authored Jun 5, 2024
1 parent 3d4f1f0 commit a802187
Showing 1 changed file with 12 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,6 @@ def __init__(
self.alpha = alpha if alpha is not None else self.dim
self.input_is_parallel = input_is_parallel
self.dropout_position = dropout_position
self.tp_world_size = None
self.tp_group = None
self.use_a2a = a2a_experimental

# megatron_gpt_peft_models will provide this arg, but deprecated ones do not.
Expand Down Expand Up @@ -212,8 +210,6 @@ def __init__(
lin_out_gather_output = True if input_is_parallel else False
if self.use_a2a and input_is_parallel and self._sequence_parallel:
lin_out_gather_output = False
self.tp_world_size = get_tensor_model_parallel_world_size()
self.tp_group = get_tensor_model_parallel_group()
self.linear_out = ColumnParallelLinear(
dim,
out_features,
Expand Down Expand Up @@ -309,7 +305,7 @@ def forward(self, x):
# this function also handles the backward pass correctly
if self.use_a2a:
# all2all hidden_size / TP to seq_len / TP
x = all2all_hp2sp(x, self.tp_world_size, self.tp_group)
x = all2all_hp2sp(x)
else:
x = scatter_to_sequence_parallel_region(x)

Expand All @@ -333,9 +329,9 @@ class _All2AllHp2Sp(torch.autograd.Function):
"""

@staticmethod
def forward(ctx, input_, world_size, group):
ctx.world_size = world_size
ctx.group = group
def forward(ctx, input_):
world_size = get_tensor_model_parallel_world_size()
group = get_tensor_model_parallel_group()
send_list = list(input_.chunk(world_size, dim=0))
send_list = [tensor.contiguous() for tensor in send_list]
receive_list = [torch.empty_like(send_list[0]) for _ in range(world_size)]
Expand All @@ -345,16 +341,18 @@ def forward(ctx, input_, world_size, group):

@staticmethod
def backward(ctx, grad_output):
send_list = list(grad_output.chunk(ctx.world_size, dim=-1))
world_size = get_tensor_model_parallel_world_size()
group = get_tensor_model_parallel_group()
send_list = list(grad_output.chunk(world_size, dim=-1))
send_list = [tensor.contiguous() for tensor in send_list]
receive_list = [torch.empty_like(send_list[0]) for _ in range(ctx.world_size)]
torch.distributed.all_to_all(receive_list, send_list, group=ctx.group)
receive_list = [torch.empty_like(send_list[0]) for _ in range(world_size)]
torch.distributed.all_to_all(receive_list, send_list, group=group)
x = torch.cat(receive_list, dim=0)
return x, None, None
return x


def all2all_hp2sp(input_, world_size, group):
return _All2AllHp2Sp.apply(input_, world_size, group)
def all2all_hp2sp(input_):
return _All2AllHp2Sp.apply(input_)


@dataclass
Expand Down

0 comments on commit a802187

Please sign in to comment.