Skip to content

Commit

Permalink
Grep for potential errors in standalone tests (#15341)
Browse files Browse the repository at this point in the history
Co-authored-by: awaelchli <[email protected]>
(cherry picked from commit 12d6e44)
  • Loading branch information
carmocca authored and Borda committed Nov 8, 2022
1 parent b5f64de commit 904c467
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 9 deletions.
4 changes: 2 additions & 2 deletions src/pytorch_lightning/strategies/fully_sharded.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ def _setup_model(self, model: torch.nn.Module) -> FullyShardedDataParallel:
log.detail(f"setting up `Fairscale FSDP` model with device id: {self.root_device.index}.")

rank_zero_info(
"When using FairScale FSDP auto-wrap, make sure to initalize your model using trainer else"
" you will get an error.\ntorch.optim.Optimizer(self.trainer.model.parameters(), ...)"
"When using FairScale FSDP auto-wrap, make sure to initialize your model using trainer: "
"`torch.optim.Optimizer(self.trainer.model.parameters(), ...)`"
)

return FullyShardedDataParallel(
Expand Down
5 changes: 4 additions & 1 deletion tests/tests_lite/strategies/test_fairscale_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ def run(self, tmpdir, with_fairscale_oss=False):
self.model.cpu()

checkpoint_path = os.path.join(tmpdir, "checkpoint.ckpt")
# need to broadcast because tmpdir is different on each process
checkpoint_path = self.broadcast(checkpoint_path)

checkpoint = {"model": self.model.state_dict(), "optimizer": self.optimizer.state_dict()}
self.save(checkpoint, checkpoint_path)

self.barrier()
self.barrier() # ensure the checkpoint is saved before load

loaded_checkpoint = self.load(checkpoint_path)
new_model = self.get_model()
Expand Down
10 changes: 8 additions & 2 deletions tests/tests_pytorch/run_standalone_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ set -e
# Batch size for testing: Determines how many standalone test invocations run in parallel
# It can be set through the env variable PL_STANDALONE_TESTS_BATCH_SIZE and defaults to 6 if not set
test_batch_size="${PL_STANDALONE_TESTS_BATCH_SIZE:-6}"
source="${PL_STANDALONE_TESTS_SOURCE}"
source="${PL_STANDALONE_TESTS_SOURCE:-"pytorch_lightning"}"

# this environment variable allows special tests to run
export PL_RUN_STANDALONE_TESTS=1
# python arguments
defaults="-m coverage run --source $source --append -m pytest --no-header"
defaults="-m coverage run --source $source --append -m pytest --no-header -v -s"

# find tests marked as `@RunIf(standalone=True)`. done manually instead of with pytest because it is faster
grep_output=$(grep --recursive --word-regexp . --regexp 'standalone=True' --include '*.py')
Expand All @@ -49,6 +49,12 @@ rm -f standalone_test_output.txt # in case it exists, remove it
function show_batched_output {
if [ -f standalone_test_output.txt ]; then # if exists
cat standalone_test_output.txt
# heuristic: stop if there's mentions of errors. this can prevent false negatives when only some of the ranks fail
if grep --quiet --ignore-case --extended-regexp 'error|exception|traceback|failed' standalone_test_output.txt; then
echo "Potential error! Stopping."
rm standalone_test_output.txt
exit 1
fi
rm standalone_test_output.txt
fi
}
Expand Down
2 changes: 1 addition & 1 deletion tests/tests_pytorch/strategies/test_colossalai.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def test_gradient_clip_algorithm_error(tmpdir):


@RunIf(min_cuda_gpus=1, standalone=True, colossalai=True)
def test_gradient_accumulation_error(tmpdir):
def test_gradient_accumulation_raises(tmpdir):
model = ModelParallelBoringModel()
trainer = Trainer(
default_root_dir=tmpdir,
Expand Down
8 changes: 7 additions & 1 deletion tests/tests_pytorch/strategies/test_ddp_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,14 +267,17 @@ def configure_optimizers(self):
@RunIf(min_cuda_gpus=2, fairscale=True)
@pytest.mark.parametrize("strategy", (pytest.param("ddp", marks=RunIf(standalone=True)), "ddp_spawn"))
def test_ddp_strategy_checkpoint_multi_gpu_fairscale_optimizer(tmpdir, strategy):
"""Test to ensure that checkpoint is saved correctly when using faircale optimizer."""
"""Test to ensure that checkpoint is saved correctly when using fairscale optimizer."""
model = BoringFairScaleOptimizerModel()
trainer = Trainer(accelerator="gpu", devices=2, strategy=strategy, max_steps=1)

trainer.fit(model)

checkpoint_path = os.path.join(tmpdir, "model.pt")
# need to broadcast because tmpdir is different on each process
checkpoint_path = trainer.strategy.broadcast(checkpoint_path)
trainer.save_checkpoint(checkpoint_path)
trainer.strategy.barrier() # ensure the checkpoint is saved before load
saved_model = BoringModel.load_from_checkpoint(checkpoint_path)

# Assert model parameters are identical after loading
Expand All @@ -297,7 +300,10 @@ def test_ddp_strategy_checkpoint_zero_redundancy_optimizer(tmpdir, strategy):
trainer.fit(model)

checkpoint_path = os.path.join(tmpdir, "model.pt")
# need to broadcast because tmpdir is different on each process
checkpoint_path = trainer.strategy.broadcast(checkpoint_path)
trainer.save_checkpoint(checkpoint_path)
trainer.strategy.barrier() # ensure the checkpoint is saved before load
saved_model = BoringModel.load_from_checkpoint(checkpoint_path)

# Assert model parameters are identical after loading
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ def test_post_local_sgd_model_averaging(average_parameters_mock, tmpdir):

@RunIf(skip_windows=True, min_torch="1.10.0", min_cuda_gpus=2, standalone=True)
@mock.patch("torch.distributed.algorithms.model_averaging.averagers.PeriodicModelAverager.average_parameters")
def test_post_local_sgd_model_averaging_value_error(average_parameters_mock, tmpdir):
"""Test that when using DDP with post-localSGD a ValueError is thrown when the optmizer is
def test_post_local_sgd_model_averaging_raises(average_parameters_mock, tmpdir):
"""Test that when using DDP with post-localSGD a ValueError is thrown when the optimizer is
ZeroRedundancyOptimizer."""
from torch.distributed.optim import ZeroRedundancyOptimizer

Expand Down
3 changes: 3 additions & 0 deletions tests/tests_pytorch/strategies/test_sharded_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,10 @@ def test_ddp_sharded_strategy_checkpoint_multi_gpu_fairscale_optimizer(tmpdir, s
trainer.fit(model)

checkpoint_path = os.path.join(tmpdir, "model.pt")
# need to broadcast because tmpdir is different on each process
checkpoint_path = trainer.strategy.broadcast(checkpoint_path)
trainer.save_checkpoint(checkpoint_path)
trainer.strategy.barrier() # ensure the checkpoint is saved before load
saved_model = BoringModel.load_from_checkpoint(checkpoint_path)

# Assert model parameters are identical after loading
Expand Down

0 comments on commit 904c467

Please sign in to comment.