Error handling for accelerator="mps" and ddp strategy pairing #16153

Merged: 33 commits from shenoy/mps-distributed-check into master, Jan 12, 2023
Changes from 31 commits
Commits
d5f4a56
Added misconfigurationexception for incorrect accelerator/strategy pa…
Dec 21, 2022
6332a42
Improved check and added test
Dec 21, 2022
daa3599
updated test
Dec 21, 2022
2aec459
modified test to correct MPS
Dec 21, 2022
d8dc01b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 21, 2022
b4c5087
decorated with runif(mps=true)
Dec 21, 2022
9f8e4f8
Apply suggestions from code review
Borda Dec 21, 2022
1e6b914
Apply suggestions from code review
Borda Dec 21, 2022
511557a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 21, 2022
a0370de
Apply suggestions from code review
Borda Dec 21, 2022
d10250c
Merge branch 'master' into shenoy/mps-distributed-check
justusschock Dec 21, 2022
35a37e5
Added edge cases, improved error message and improved tests
Dec 21, 2022
3db458c
Added hpu, tpu, ipu false backend condition
Dec 21, 2022
eca8ad8
Merge branch 'master' into shenoy/mps-distributed-check
shenoynikhil Dec 21, 2022
38a2b17
Merge branch 'Lightning-AI:master' into shenoy/mps-distributed-check
shenoynikhil Dec 21, 2022
10e6d75
commented out gpu mps setting
Dec 21, 2022
c3296e4
checking if removing not tpu not hpu checks work
Dec 21, 2022
a3d3028
fixed comments
Dec 21, 2022
603c8f4
Merge branch 'master' into shenoy/mps-distributed-check
shenoynikhil Dec 22, 2022
90827dc
Merge branch 'master' into shenoy/mps-distributed-check
shenoynikhil Dec 22, 2022
91e2746
Merge branch 'master' into shenoy/mps-distributed-check
shenoynikhil Dec 24, 2022
8ccfbdd
Merge branch 'master' into shenoy/mps-distributed-check
shenoynikhil Dec 26, 2022
424b07c
Merge branch 'master' into shenoy/mps-distributed-check
shenoynikhil Jan 1, 2023
58289db
Merge branch 'master' into shenoy/mps-distributed-check
shenoynikhil Jan 3, 2023
4151e74
Merge branch 'master' into shenoy/mps-distributed-check
shenoynikhil Jan 4, 2023
f5fdcd3
handle all edge cases and adapt connector tests
awaelchli Jan 11, 2023
9124343
parametrize test
awaelchli Jan 11, 2023
891017d
update tests
awaelchli Jan 11, 2023
728929c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 11, 2023
0877766
Merge branch 'master' into shenoy/mps-distributed-check
awaelchli Jan 11, 2023
c47c6be
simplification
awaelchli Jan 11, 2023
7de3e63
drop special case of custom strategy
awaelchli Jan 12, 2023
fdcb87f
add edge case for deepspeed
awaelchli Jan 12, 2023
@@ -69,6 +69,7 @@
HorovodStrategy,
HPUParallelStrategy,
IPUStrategy,
ParallelStrategy,
SingleDeviceStrategy,
SingleHPUStrategy,
SingleTPUStrategy,
@@ -284,6 +285,22 @@ def _check_config_and_set_final_flags(
f" Available names are: {', '.join(self._accelerator_types)}."
)

# MPS accelerator is incompatible with DDP family of strategies. It supports single-device operation only.
is_ddp_str = isinstance(strategy, str) and "ddp" in strategy
is_dp_str = isinstance(strategy, str) and "dp" in strategy
is_parallel_strategy = isinstance(strategy, ParallelStrategy) or is_ddp_str or is_dp_str
is_mps_accelerator = MPSAccelerator.is_available() and (
accelerator in ("mps", "auto", "gpu", None)
or isinstance(accelerator, MPSAccelerator)
or isinstance(self._strategy_flag, Strategy)
and isinstance(self._strategy_flag.accelerator, MPSAccelerator)
)
if is_mps_accelerator and is_parallel_strategy:
raise ValueError(
f"You set `strategy={strategy}` but strategies from the DDP family are not supported on the"
f" MPS accelerator. Either explicitly set `accelerator='cpu'` or change the strategy."
)

self._accelerator_flag = accelerator

supported_precision = get_args(_PRECISION_INPUT_STR) + get_args(_PRECISION_INPUT_INT)
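A minimal sketch of the behaviour the new check introduces, assuming it runs on an Apple Silicon machine where MPSAccelerator.is_available() returns True; the workarounds are the ones named in the error message above:

import pytest

from pytorch_lightning import Trainer

# Pairing the MPS accelerator with any DDP-family strategy now fails fast at Trainer construction.
with pytest.raises(ValueError, match="strategies from the DDP family are not supported"):
    Trainer(accelerator="mps", strategy="ddp")

# Workarounds from the error message: either run the DDP strategy on the CPU accelerator ...
trainer_cpu = Trainer(accelerator="cpu", strategy="ddp")
# ... or keep the MPS accelerator and stay on a single device.
trainer_mps = Trainer(accelerator="mps", devices=1)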
6 changes: 4 additions & 2 deletions tests/tests_pytorch/plugins/test_cluster_integration.py
@@ -56,12 +56,13 @@ def environment_combinations():
yield environment, variables, expected


@RunIf(mps=False)
@pytest.mark.parametrize(
"strategy_cls",
[DDPStrategy, DDPShardedStrategy, pytest.param(DeepSpeedStrategy, marks=RunIf(deepspeed=True))],
)
@mock.patch("pytorch_lightning.accelerators.cuda.CUDAAccelerator.is_available", return_value=True)
def test_ranks_available_manual_strategy_selection(mock_gpu_acc_available, strategy_cls):
def test_ranks_available_manual_strategy_selection(_, strategy_cls):
"""Test that the rank information is readily available after Trainer initialization."""
num_nodes = 2
for cluster, variables, expected in environment_combinations():
@@ -77,6 +78,7 @@ def test_ranks_available_manual_strategy_selection(mock_gpu_acc_available, strat
assert trainer.world_size == expected["world_size"]


@RunIf(mps=False)
@pytest.mark.parametrize(
"trainer_kwargs",
[
@@ -86,7 +88,7 @@ def test_ranks_available_manual_strategy_selection(mock_gpu_acc_available, strat
dict(strategy="ddp_spawn", accelerator="gpu", devices=[1, 2]),
],
)
def test_ranks_available_automatic_strategy_selection(mps_count_4, cuda_count_4, trainer_kwargs):
def test_ranks_available_automatic_strategy_selection(cuda_count_4, trainer_kwargs):
"""Test that the rank information is readily available after Trainer initialization."""
num_nodes = 2
trainer_kwargs.update(num_nodes=num_nodes)
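For context on the @RunIf(mps=False) markers and the mps_count_* fixture changes in these tests: on machines where MPS is available, accelerator="gpu" (as well as "auto" and None) resolves to the MPS accelerator, so the DDP parametrizations would now hit the new ValueError; the tests are therefore skipped there or pinned to a CUDA-only configuration. A hedged sketch of that interaction, assuming an Apple Silicon host without CUDA:

from pytorch_lightning import Trainer

# accelerator="gpu" selects MPSAccelerator on this host, so a DDP strategy raises the new error.
try:
    Trainer(accelerator="gpu", strategy="ddp_spawn")
except ValueError as err:
    assert "strategies from the DDP family are not supported" in str(err)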
@@ -126,6 +126,7 @@ def creates_processes_externally(self) -> bool:
assert isinstance(trainer.strategy.cluster_environment, CustomCluster)


@RunIf(mps=False)
@mock.patch.dict(
os.environ,
{
@@ -231,7 +232,8 @@ def test_fallback_from_ddp_spawn_to_ddp_on_cluster(_, __, env_vars, expected_env
assert isinstance(trainer.strategy.cluster_environment, expected_environment)


def test_interactive_incompatible_backend_error(mps_count_2, cuda_count_2, monkeypatch):
@RunIf(mps=False)
def test_interactive_incompatible_backend_error(cuda_count_2, monkeypatch):
monkeypatch.setattr(pytorch_lightning.trainer.connectors.accelerator_connector, "_IS_INTERACTIVE", True)
with pytest.raises(MisconfigurationException, match=r"strategy='ddp'\)`.*is not compatible"):
Trainer(strategy="ddp", accelerator="gpu", devices=2)
@@ -247,7 +249,7 @@ def test_interactive_incompatible_backend_error(mps_count_2, cuda_count_2, monke
Trainer(strategy="dp")


def test_interactive_compatible_dp_strategy_gpu(cuda_count_2, monkeypatch):
def test_interactive_compatible_dp_strategy_gpu(mps_count_0, cuda_count_2, monkeypatch):
monkeypatch.setattr(pytorch_lightning.trainer.connectors.accelerator_connector, "_IS_INTERACTIVE", True)
trainer = Trainer(strategy="dp", accelerator="gpu")
assert trainer.strategy.launcher is None
@@ -358,7 +360,7 @@ def test_set_devices_if_none_cpu():

def test_unsupported_strategy_types_on_cpu_and_fallback():
with pytest.warns(UserWarning, match="is not supported on CPUs, hence setting `strategy='ddp"):
trainer = Trainer(strategy="dp", num_processes=2)
trainer = Trainer(accelerator="cpu", strategy="dp", num_processes=2)
assert isinstance(trainer.strategy, DDPStrategy)


@@ -369,6 +371,32 @@ def test_exception_invalid_strategy():
Trainer(strategy="tpu_spawn")


@pytest.mark.parametrize(
["strategy", "strategy_class"],
(
("ddp_spawn", DDPSpawnStrategy),
("ddp_spawn_find_unused_parameters_false", DDPSpawnStrategy),
("ddp", DDPStrategy),
("ddp_find_unused_parameters_false", DDPStrategy),
("dp", DataParallelStrategy),
("ddp_sharded", DDPShardedStrategy),
("ddp_sharded_spawn", DDPSpawnShardedStrategy),
),
)
@pytest.mark.parametrize("accelerator", ["mps", "auto", "gpu", None, MPSAccelerator()])
def test_invalid_ddp_strategy_with_mps(accelerator, strategy, strategy_class, mps_count_1, cuda_count_0):
with pytest.raises(ValueError, match="strategies from the DDP family are not supported"):
Trainer(accelerator=accelerator, strategy=strategy)

with pytest.raises(ValueError, match="strategies from the DDP family are not supported"):
Trainer(accelerator="mps", strategy=strategy_class())

strategy = strategy_class()
strategy.accelerator = MPSAccelerator()
with pytest.raises(ValueError, match="strategies from the DDP family are not supported"):
Trainer(strategy=strategy)


@pytest.mark.parametrize(
["strategy", "strategy_class"],
[
@@ -475,14 +503,6 @@ def test_strategy_choice_ddp_cuda(strategy, expected_cls, mps_count_0, cuda_coun
assert isinstance(trainer.strategy.cluster_environment, LightningEnvironment)


@pytest.mark.parametrize("strategy,expected_cls", [("ddp", DDPStrategy), ("ddp_spawn", DDPSpawnStrategy)])
def test_strategy_choice_ddp_mps(strategy, expected_cls, mps_count_1, cuda_count_0):
trainer = Trainer(fast_dev_run=True, strategy=strategy, accelerator="gpu", devices=1)
assert isinstance(trainer.accelerator, MPSAccelerator)
assert isinstance(trainer.strategy, expected_cls)
assert isinstance(trainer.strategy.cluster_environment, LightningEnvironment)


@pytest.mark.parametrize("job_name,expected_env", [("some_name", SLURMEnvironment), ("bash", LightningEnvironment)])
@pytest.mark.parametrize("strategy", ["ddp", DDPStrategy])
def test_strategy_choice_ddp_slurm(cuda_count_2, strategy, job_name, expected_env):
@@ -704,9 +724,9 @@ def test_deterministic_init(deterministic):
(False, [Mock(spec=LayerSync)], LayerSync),
],
)
def test_sync_batchnorm_set(tmpdir, sync_batchnorm, plugins, expected):
def test_sync_batchnorm_set(sync_batchnorm, plugins, expected):
"""Test valid combinations of the sync_batchnorm Trainer flag and the plugins list of layer-sync plugins."""
trainer = Trainer(sync_batchnorm=sync_batchnorm, plugins=plugins, strategy="ddp")
trainer = Trainer(accelerator="cpu", sync_batchnorm=sync_batchnorm, plugins=plugins, strategy="ddp")
assert isinstance(trainer._accelerator_connector._layer_sync, expected)
assert isinstance(trainer.strategy._layer_sync, expected)

@@ -733,7 +753,7 @@ def __init__(self, **kwargs):

strategy = CustomParallelStrategy()
assert strategy._layer_sync is None
Trainer(strategy=strategy, sync_batchnorm=True)
Trainer(accelerator="cpu", strategy=strategy, sync_batchnorm=True)
assert isinstance(strategy._layer_sync, NativeSyncBatchNorm)


@@ -809,12 +829,12 @@ def test_accelerator_specific_checkpoint_io(*_):
)
def test_ddp_fork_on_unsupported_platform(_, strategy):
with pytest.raises(ValueError, match="process forking is not supported on this platform"):
Trainer(strategy=strategy)
Trainer(accelerator="cpu", strategy=strategy)


@pytest.mark.parametrize(
["strategy", "strategy_cls"], [("DDP", DDPStrategy), ("DDP_FIND_UNUSED_PARAMETERS_FALSE", DDPStrategy)]
)
def test_strategy_str_passed_being_case_insensitive(strategy, strategy_cls):
trainer = Trainer(strategy=strategy)
trainer = Trainer(accelerator="cpu", strategy=strategy)
assert isinstance(trainer.strategy, strategy_cls)
2 changes: 1 addition & 1 deletion tests/tests_pytorch/trainer/test_supporters.py
@@ -316,7 +316,7 @@ def test_nested_calc_num_data(input_data, compute_func, expected_length):
@mock.patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,1"})
@pytest.mark.parametrize("use_fault_tolerant", [False, True])
@pytest.mark.parametrize("replace_sampler_ddp", [False, True])
def test_combined_data_loader_validation_test(mps_count_2, cuda_count_2, use_fault_tolerant, replace_sampler_ddp):
def test_combined_data_loader_validation_test(mps_count_0, cuda_count_2, use_fault_tolerant, replace_sampler_ddp):
"""This test makes sure distributed sampler has been properly injected in dataloaders when using
CombinedLoader."""

12 changes: 7 additions & 5 deletions tests/tests_pytorch/trainer/test_trainer.py
@@ -1921,7 +1921,7 @@ def on_exception(self, *_):
self.exceptions += 1


@pytest.mark.parametrize("strategy", [None, pytest.param("ddp_spawn", marks=RunIf(skip_windows=True))])
@pytest.mark.parametrize("strategy", [None, pytest.param("ddp_spawn", marks=RunIf(skip_windows=True, mps=False))])
def test_error_handling_all_stages(tmpdir, strategy):
model = TrainerStagesErrorsModel()
counter = ExceptionCounter()
@@ -2017,9 +2017,11 @@ def training_step(self, batch, batch_idx):
["trainer_kwargs", "strategy_cls", "strategy_name", "accelerator_cls", "devices"],
[
({"strategy": None}, SingleDeviceStrategy, "single_device", CPUAccelerator, 1),
({"strategy": "dp"}, DDPStrategy, "ddp", CPUAccelerator, 1),
({"strategy": "ddp"}, DDPStrategy, "ddp", CPUAccelerator, 1),
({"strategy": "ddp", "num_nodes": 2}, DDPStrategy, "ddp", CPUAccelerator, 1),
pytest.param({"strategy": "dp"}, DDPStrategy, "ddp", CPUAccelerator, 1, marks=RunIf(mps=False)),
pytest.param({"strategy": "ddp"}, DDPStrategy, "ddp", CPUAccelerator, 1, marks=RunIf(mps=False)),
pytest.param(
{"strategy": "ddp", "num_nodes": 2}, DDPStrategy, "ddp", CPUAccelerator, 1, marks=RunIf(mps=False)
),
(
{"strategy": None, "accelerator": "cuda", "devices": 1},
SingleDeviceStrategy,
@@ -2075,7 +2077,7 @@ def training_step(self, batch, batch_idx):
CUDAAccelerator,
2,
),
({"strategy": DDPStrategy()}, DDPStrategy, "ddp", CPUAccelerator, 1),
pytest.param({"strategy": DDPStrategy()}, DDPStrategy, "ddp", CPUAccelerator, 1, marks=RunIf(mps=False)),
({"strategy": DDPStrategy(), "accelerator": "cuda", "devices": 2}, DDPStrategy, "ddp", CUDAAccelerator, 2),
(
{"strategy": DataParallelStrategy(), "accelerator": "cuda", "devices": 2},