Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Input batch sharding strategy BATCH #884

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

apoorvtintin
Copy link
Contributor

@apoorvtintin apoorvtintin commented Dec 11, 2024

Axlearn currently supports DataPartitionType strategies FULL and REPLICATED for input batches. This PR adds support for a third sharding strategy DataPartitionType.BATCH.

This sharding strategy shards the input batch on sharding axis inferred as "batch_axis". This ensures batches are replicated on the "model" dimension and thus avoids unnecessary collectives to reshard input batches when Tensor parallelism is used.

@apoorvtintin apoorvtintin force-pushed the mainline_upstream_input_sharding branch 2 times, most recently from 79a8c21 to 03052e4 Compare December 11, 2024 21:10
@@ -607,6 +610,7 @@ def host_to_global_device_array(
host_arrays: Nested[Union[np.ndarray, Tensor]],
*,
partition: DataPartitionType = DataPartitionType.FULL,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @markblee plans to remove the DataPartitionType enum and rely on https://jax.readthedocs.io/en/latest/_autosummary/jax.make_array_from_process_local_data.html to support flexible partition specs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks that sounds promising.

Hello @markblee, let me know if this PR is needed till you make your changes, or if you have your design in mind I can reshape the PR to make it compatible with your design.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think make_array_from_process_local_data is already merged: #781

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, support for make_array_from_process_local_data has been merged. There doesn't seem to be many remaining advantages of passing both a partition type and aux information like batch axis names; instead, we can take partition specs directly:

DataPartitionType.FULL -> input_partition_spec()
DataPartitionType.REPLICATED -> None
DataPartitionType.BATCH -> PartitionSpec(batch_axis_names)

This brings the API closer to jax native, but may require more changes internally.

@hanzhi713
Copy link
Member

@apoorvtintin With this new sharding strategy, how do you plan to support model parallelism across hosts (i.e. per host bs < 1)? If this is not a supported use case, then this change LGTM. I could imagine the support for model parallelism across hosts is not needed in the short term for neuron since the number of chips per host is large.

@kelvin-zou
Copy link
Contributor

kelvin-zou commented Jan 9, 2025

Sorry for the late review, I should've checked this earlier.
@apoorvtintin can you actually look into logical batch and use that instead (

- Convert a global physical batch to a global logical batch (physical_to_logical_batch).
)? From what I see here, this feature is a bit redundant (and less flexible as well) than the logical batch size, and with that, probably you don't need this PR if you just want to run something like batch_size=0.5 or 0.25 per chip with model parallelism.

@@ -423,6 +423,7 @@ def get_trainer_kwargs(
raise NotImplementedError(f"Unknown model size {model_size}.")
model_kwargs = trainer_kwargs.pop("model_kwargs")
model_kwargs.setdefault("vocab_size", vocab_size)
trainer_kwargs["input_partition_type"] = None if backend != "neuron" else DataPartitionType.BATCH
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please go with a configmodifier instead of hardcode in the code, otherwise it becomes hard for people to debug

),
out_shardings=dict(
replicated=None,
per_example=utils.input_partition_spec(),
per_example=utils.data_partition_type_to_spec( partition=self.config.input_partition_type, batch_axis_names=self.config.batch_axis_names),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you run through pylint? this line seems quite long.

"""Returns a PartitionSpec for the given partition type."""
if partition == DataPartitionType.FULL:
return input_partition_spec()
elif partition == DataPartitionType.REPLICATED:
return None
elif partition == DataPartitionType.BATCH:
return PartitionSpec(batch_axis_names)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of directly assigning PartitionSpec, maybe extend input_partition_spec with an argument batch_axis_names, with default None?

@@ -37,16 +36,15 @@ def is_supported(
)
)


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you using a different pylint style? Those lines should not be removed.

@hanzhi713
Copy link
Member

Sorry for the late review, I should've checked this earlier. @apoorvtintin can you actually look into logical batch and use that instead (

- Convert a global physical batch to a global logical batch (physical_to_logical_batch).

)? From what I see here, this feature is a bit redundant (and less flexible as well) than the logical batch size, and with that, probably you don't need this PR if you just want to run something like batch_size=0.5 or 0.25 per chip with model parallelism.

I think the primary purpose of this PR (as stated in the description) is to avoid the resharding cost from physical bs to logical bs when there's no need to shard data along model axis. This could remove the collective for inputs re-sharding and thus reduce step time.

@apoorvtintin May I know how much time do we save by removing this collective? If the benefit is not significant, we can stick to the old code.

@kelvin-zou
Copy link
Contributor

I think the primary purpose of this PR (as stated in the description) is to avoid the resharding cost from physical bs to logical bs when there's no need to shard data along model axis. This could remove the collective for inputs re-sharding and thus reduce step time.

But input collective is only triggered within host, which should be insignificant?

@hanzhi713
Copy link
Member

I think the primary purpose of this PR (as stated in the description) is to avoid the resharding cost from physical bs to logical bs when there's no need to shard data along model axis. This could remove the collective for inputs re-sharding and thus reduce step time.

But input collective is only triggered within host, which should be insignificant?

The collective is on-device.

@apoorvtintin
Copy link
Contributor Author

@apoorvtintin With this new sharding strategy, how do you plan to support model parallelism across hosts (i.e. per host bs < 1)? If this is not a supported use case, then this change LGTM. I could imagine the support for model parallelism across hosts is not needed in the short term for neuron since the number of chips per host is large.

@hanzhi713 Tensor parallelism is usually used within a device. But yes theoretically tensor parallelism can be across hosts with per host bs<1. To answer your question, yes this PR does not support that case.

@apoorvtintin
Copy link
Contributor Author

apoorvtintin commented Jan 15, 2025

Sorry for the late review, I should've checked this earlier. @apoorvtintin can you actually look into logical batch and use that instead (

- Convert a global physical batch to a global logical batch (physical_to_logical_batch).

)? From what I see here, this feature is a bit redundant (and less flexible as well) than the logical batch size, and with that, probably you don't need this PR if you just want to run something like batch_size=0.5 or 0.25 per chip with model parallelism.

@kelvin-zou I checked out fractional batches (logical to physical batches). This feature pads '0' batches to get to at least 1 batch per device. Please correct me if I missed something

For sequence parallelism we need all model axis ranks in a group to have replicated batches. Yes theoretically it means we have only 0.25 batches per device (when fsdp=16 and model=4 ==> 16 batches * 4 replicas = 64). If I use fractional batches with per device BS of 0.25, I would get 64 batches where the first 16 batches will be real inputs but the next 48 will be padded 0s.

@apoorvtintin
Copy link
Contributor Author

apoorvtintin commented Jan 15, 2025

Sorry for the late review, I should've checked this earlier. @apoorvtintin can you actually look into logical batch and use that instead (

- Convert a global physical batch to a global logical batch (physical_to_logical_batch).

)? From what I see here, this feature is a bit redundant (and less flexible as well) than the logical batch size, and with that, probably you don't need this PR if you just want to run something like batch_size=0.5 or 0.25 per chip with model parallelism.

I think the primary purpose of this PR (as stated in the description) is to avoid the resharding cost from physical bs to logical bs when there's no need to shard data along model axis. This could remove the collective for inputs re-sharding and thus reduce step time.

Correct me if I am wrong, Axlearn currently does not allow input batches to be sharded along the sequence dimension which can be sharded along the model axis. Which means without this PR the only choices are:

  • Fractional batches: May not be enough as described in my previous reply.
  • Sharding REPLICATED: Not feasible as it uses too much memory.
  • Sharding FULL: We shard the batch axis along all axis available including model. Which means GBS == global_device_count. For optimal sequence parallel training we want GBS == FSDP*DP ranks.

Which brings us to this Sharding BATCH which is implemented by this PR. This allows us to have GBS == FSDP*DP ranks where batches are replicated across model axis.

@apoorvtintin May I know how much time do we save by removing this collective? If the benefit is not significant, we can stick to the old code.

That depends on the sharding strategy chosen from the options above, if Sharding FULL is chosen then each model parallel group will have to run a reshard to sync batches (like a TP all gather). This example glosses over obvious problems with full sharding like higher batch size (4 times more than we can fit since model=4).

@apoorvtintin
Copy link
Contributor Author

But input collective is only triggered within host, which should be insignificant?

@kelvin-zou I assume your question assumes we use SHARDING FULL, which has a batch size problem even though we could theoretically achieve batches replicated along 'model' with an extra collective.

Copy link
Contributor

@markblee markblee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late review.

@@ -607,6 +610,7 @@ def host_to_global_device_array(
host_arrays: Nested[Union[np.ndarray, Tensor]],
*,
partition: DataPartitionType = DataPartitionType.FULL,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, support for make_array_from_process_local_data has been merged. There doesn't seem to be many remaining advantages of passing both a partition type and aux information like batch axis names; instead, we can take partition specs directly:

DataPartitionType.FULL -> input_partition_spec()
DataPartitionType.REPLICATED -> None
DataPartitionType.BATCH -> PartitionSpec(batch_axis_names)

This brings the API closer to jax native, but may require more changes internally.

@@ -1701,6 +1702,31 @@ def test_length(self):
class HostToGlobalArrayTest(TestCase):
"""Tests host_to_global_device_array."""

@pytest.mark.neuron
def test_partition_batch(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we able to pass the test_every_other_process only relying on make_array_from_process_local_data? If not, then it seems that input dispatch may still be needed.

@hanzhi713
Copy link
Member

@apoorvtintin With this new sharding strategy, how do you plan to support model parallelism across hosts (i.e. per host bs < 1)? If this is not a supported use case, then this change LGTM. I could imagine the support for model parallelism across hosts is not needed in the short term for neuron since the number of chips per host is large.

@hanzhi713 Tensor parallelism is usually used within a device. But yes theoretically tensor parallelism can be across hosts with per host bs<1. To answer your question, yes this PR does not support that case.

With fractional batches (logical batch), yes at first batches are padded to per device bs=1, but then it will be resharded to what the actual per device bs is here: https://github.com/apple/axlearn/blob/141239c3376638d551e10ff1094e6947e6a75f75/axlearn/common/trainer.py#L1139C38-L1139C59

This dispatch function will reshard physical batches to logical batches, e.g. ("data", "fsdp", "model") -> ("data", "fsdp") through an einsum. After this point, batches should be replicated along model parallel axis as you would expect.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants