diff --git a/pytorch_lightning/accelerators/ddp_backend.py b/pytorch_lightning/accelerators/ddp_backend.py
index e499feda651d9..a679e1085184a 100644
--- a/pytorch_lightning/accelerators/ddp_backend.py
+++ b/pytorch_lightning/accelerators/ddp_backend.py
@@ -11,20 +11,22 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License
-
+import atexit
 import os
+import socket
+
+import torch
+import torch.distributed
 import subprocess
 import sys
-from os.path import abspath
 from time import sleep
-from typing import Optional
-
 import numpy as np
-import torch
 
+from os.path import abspath
+from pytorch_lightning.utilities import NATIVE_AMP_AVALAIBLE
+from pytorch_lightning.utilities.distributed import rank_zero_only, rank_zero_debug
 from pytorch_lightning import _logger as log
-from pytorch_lightning.utilities import AMPType
-from pytorch_lightning.utilities.distributed import rank_zero_only
+from typing import Optional
 
 try:
     from hydra.utils import to_absolute_path, get_original_cwd
@@ -37,7 +39,9 @@
 try:
     from apex import amp
 except ImportError:
-    amp = None
+    APEX_AVAILABLE = False
+else:
+    APEX_AVAILABLE = True
 
 
 class DDPBackend(object):
@@ -45,6 +49,7 @@ class DDPBackend(object):
     def __init__(self, trainer):
         self.trainer = trainer
         self.task_idx = None
+        self.distributed_connection = DistributedConnection(trainer)
 
     def slurm_setup(self):
         self.task_idx = int(os.environ['SLURM_LOCALID'])
@@ -56,19 +61,15 @@ def train(self, model):
         self.ddp_train(process_idx=self.task_idx, mp_queue=None, model=model)
 
     def spawn_ddp_children(self, model):
-        port = os.environ['MASTER_PORT']
+        assert self.trainer.global_rank == 0
 
-        master_address = '127.0.0.1' if 'MASTER_ADDR' not in os.environ else os.environ['MASTER_ADDR']
-        os.environ['MASTER_PORT'] = f'{port}'
+        master_address = os.environ.get('MASTER_ADDR', '127.0.0.1')
         os.environ['MASTER_ADDR'] = f'{master_address}'
 
         # allow the user to pass the node rank
         node_rank = '0'
-        if 'NODE_RANK' in os.environ:
-            node_rank = os.environ['NODE_RANK']
-        if 'GROUP_RANK' in os.environ:
-            node_rank = os.environ['GROUP_RANK']
-
+        node_rank = os.environ.get('NODE_RANK', node_rank)
+        node_rank = os.environ.get('GROUP_RANK', node_rank)
         os.environ['NODE_RANK'] = node_rank
         os.environ['LOCAL_RANK'] = '0'
 
@@ -153,11 +154,8 @@ def ddp_train(self, process_idx, mp_queue, model, is_master=False, proc_offset=0
         # try to init for 20 times at max in case ports are taken
         # where to store ip_table
         model.trainer = self.trainer
-        model.init_ddp_connection(
-            self.trainer.global_rank,
-            self.trainer.world_size,
-            self.trainer.is_slurm_managing_tasks
-        )
+
+        self.distributed_connection.reset_connection(self.trainer, model)
 
         # call setup after the ddp process has connected
         self.trainer.call_setup_hook(model)
@@ -176,6 +174,8 @@ def ddp_train(self, process_idx, mp_queue, model, is_master=False, proc_offset=0
         self.trainer.lr_schedulers = lr_schedulers
         self.trainer.optimizer_frequencies = optimizer_frequencies
 
+        print('here 1')
+
         # call sync_bn before .cuda(), configure_apex and configure_ddp
         if self.trainer.sync_batchnorm:
             model = model.configure_sync_batchnorm(model)
@@ -193,15 +193,20 @@ def ddp_train(self, process_idx, mp_queue, model, is_master=False, proc_offset=0
                 available_gpus = os.environ['CUDA_VISIBLE_DEVICES'].split(',')
                 gpu_idx = int(available_gpus[self.trainer.local_rank])
 
+            print('here 2')
             self.trainer.root_gpu = gpu_idx
            torch.cuda.set_device(self.trainer.root_gpu)
            model.cuda(self.trainer.root_gpu)
 
+        print('here 3')
+
         # set model properties before going into wrapper
         self.trainer.copy_trainer_model_properties(model)
 
-        # AMP - run through amp wrapper before going to distributed DP
-        if self.trainer.amp_type == AMPType.APEX:
+        # AMP
+        # run through amp wrapper before going to distributed DP
+        # TODO: remove with dropping NVIDIA AMP support
+        if self.trainer.use_amp and not NATIVE_AMP_AVALAIBLE:
             model, optimizers = model.configure_apex(amp, model, self.trainer.optimizers, self.trainer.amp_level)
             self.trainer.optimizers = optimizers
             self.trainer.reinit_scheduler_properties(self.trainer.optimizers, self.trainer.lr_schedulers)
@@ -212,12 +217,18 @@ def ddp_train(self, process_idx, mp_queue, model, is_master=False, proc_offset=0
         else:  # includes ddp_cpu
             device_ids = None
 
+        print('here 4')
+
         # allow user to configure ddp
         model = model.configure_ddp(model, device_ids)
 
+        print('here 5')
+
         # continue training routine
         results = self.trainer.run_pretrain_routine(model)
 
+        print('here 6')
+
         # get original model
         model = self.trainer.get_model()
 
@@ -229,3 +240,98 @@ def ddp_train(self, process_idx, mp_queue, model, is_master=False, proc_offset=0
 
         if self.trainer.global_rank == 0 and self.trainer.distributed_backend not in ['ddp_spawn', 'ddp_cpu']:
             return results
+
+
+class DistributedConnection:
+
+    def __init__(self, trainer):
+        super().__init__()
+        self.trainer = trainer
+        if trainer.num_nodes == 1:
+            # select or forcibly set an initial port before ddp connection is initialized
+            self._set_master_port(port=self._get_master_port())
+
+    def reset_connection(self, trainer, model):
+        if not torch.distributed.is_initialized():
+            print('init ddp', 'rank', trainer.global_rank, 'port', self._get_master_port())
+            model.init_ddp_connection(trainer.global_rank, trainer.world_size, trainer.is_slurm_managing_tasks)
+
+    def reset_connection_old(self, trainer, model):
+
+        if not torch.distributed.is_initialized():
+            print('init ddp', 'rank', trainer.global_rank, 'port', self._get_master_port())
+            model.init_ddp_connection(trainer.global_rank, trainer.world_size, trainer.is_slurm_managing_tasks)
+            print('init ddp', 'rank', trainer.global_rank, 'port', self._get_master_port(), 'done')
+
+        new_port = torch.tensor([int(self._get_master_port())], dtype=torch.int, device='cuda')
+        if torch.distributed.is_initialized() and trainer.global_rank == 0:
+            print(trainer.global_rank, "DDP connection already initialized. Reinitializing on new port...")
+
+            # model.init_ddp_connection(trainer.global_rank, trainer.world_size, trainer.is_slurm_managing_tasks)
+
+            # torch.distributed.barrier()
+
+            # if trainer.global_rank == 0:
+            port = find_open_network_port()
+            new_port[0] = port
+
+        torch.distributed.broadcast(new_port, src=0)
+        new_port = int(new_port.item())
+        print('recv new port', 'rank', trainer.global_rank, 'port', new_port)
+
+        if int(self._get_master_port()) != new_port:
+            print('need to update port')
+            torch.distributed.destroy_process_group()  # destroy connections on old port
+            print('destroy group', 'rank', trainer.global_rank, 'port', self._get_master_port())
+            print('set port', 'rank', trainer.global_rank, 'port', self._get_master_port())
+            self._set_master_port(port=new_port)
+
+            model.init_ddp_connection(trainer.global_rank, trainer.world_size, trainer.is_slurm_managing_tasks)
+
+        print('exit')
+
+        # s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        # # print('shutdown', self._get_master_address(), int(self._get_master_port()))
+        # s.connect((self._get_master_address(), int(self._get_master_port())))
+        # s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        # # s.shutdown(socket.SHUT_RDWR)
+        # s.close()
+        # # sleep(10)
+
+        def exit_handler():
+            if torch.distributed.is_initialized() and trainer.global_rank > 0:
+                print('destroying on ', trainer.global_rank)
+                torch.distributed.destroy_process_group()
+
+        atexit.register(exit_handler)
+
+    def _get_master_port(self):
+        return os.environ.get('MASTER_PORT')
+
+    def _get_master_address(self):
+        return os.environ.get('MASTER_ADDR')
+
+    def _set_master_port(self, port: int = None):
+        """
+        Sets the `MASTER_PORT` environment variable in single-node DDP training.
+
+        Args:
+            port: If provided, sets the environment variable MASTER_PORT to this value; otherwise
+                an attempt is made to find an unused open port.
+
+        Return:
+            The port that was set.
+        """
+        assert self.trainer.num_nodes == 1, 'random port can only be called from single node training'
+        os.environ['MASTER_PORT'] = str(port or find_open_network_port())
+        return port
+
+
+def find_open_network_port():
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    s.bind(("", 0))
+    s.listen(1)
+    port = s.getsockname()[1]
+    s.close()
+    return port
diff --git a/pytorch_lightning/accelerators/ddp_spawn_backend.py b/pytorch_lightning/accelerators/ddp_spawn_backend.py
index 9ed68f66083ad..3eea10017dd61 100644
--- a/pytorch_lightning/accelerators/ddp_spawn_backend.py
+++ b/pytorch_lightning/accelerators/ddp_spawn_backend.py
@@ -15,14 +15,16 @@
 import torch
 import torch.multiprocessing as mp
 
-from pytorch_lightning import _logger as log
-from pytorch_lightning.utilities import AMPType
+from pytorch_lightning.accelerators.ddp_backend import DistributedConnection
 from pytorch_lightning.utilities.distributed import rank_zero_only
+from pytorch_lightning import _logger as log
 
 try:
     from apex import amp
 except ImportError:
-    amp = None
+    APEX_AVAILABLE = False
+else:
+    APEX_AVAILABLE = True
 
 
 class DDPSpawnBackend(object):
@@ -30,10 +32,9 @@ class DDPSpawnBackend(object):
     def __init__(self, trainer):
         self.trainer = trainer
         self.mp_queue = None
+        self.distributed_connection = DistributedConnection(trainer)
 
     def setup(self):
-        self.trainer.set_random_port()
-
         # pass in a state q
         smp = mp.get_context('spawn')
         self.mp_queue = smp.SimpleQueue()
@@ -94,11 +95,8 @@ def ddp_train(self, process_idx, mp_queue, model):
         # try to init for 20 times at max in case ports are taken
         # where to store ip_table
         model.trainer = self.trainer
-        model.init_ddp_connection(
-            self.trainer.global_rank,
-            self.trainer.world_size,
-            self.trainer.is_slurm_managing_tasks
-        )
+
+        self.distributed_connection.reset_connection(self.trainer, model)
 
         # call setup after the ddp process has connected
         self.trainer.call_setup_hook(model)
@@ -132,9 +130,11 @@ def ddp_train(self, process_idx, mp_queue, model):
         # set model properties before going into wrapper
         self.trainer.copy_trainer_model_properties(model)
 
-        # AMP
-
+        # AMP
         # run through amp wrapper before going to distributed DP
-        if self.trainer.amp_type == AMPType.APEX:
+        # TODO: remove with dropping NVIDIA AMP support
+        native_amp_available = hasattr(torch.cuda, "amp") and hasattr(torch.cuda.amp, "autocast")
+        if self.trainer.use_amp and not native_amp_available:
             model, optimizers = model.configure_apex(amp, model, self.trainer.optimizers, self.trainer.amp_level)
             self.trainer.optimizers = optimizers
             self.trainer.reinit_scheduler_properties(self.trainer.optimizers, self.trainer.lr_schedulers)
diff --git a/pytorch_lightning/core/lightning.py b/pytorch_lightning/core/lightning.py
index d23cde63f450e..8826b84e0d3f7 100644
--- a/pytorch_lightning/core/lightning.py
+++ b/pytorch_lightning/core/lightning.py
@@ -954,7 +954,7 @@ def init_ddp_connection(self, global_rank: int, world_size: int, is_slurm_managi
             )
 
         torch_backend = "nccl" if self.trainer.on_gpu else "gloo"
-        log.info(f"initializing ddp: GLOBAL_RANK: {global_rank}, MEMBER: {global_rank+1}/{world_size}")
+        log.info(f"initializing ddp: GLOBAL_RANK: {global_rank}, MEMBER: {global_rank+1}/{world_size}, ADDR: {os.environ['MASTER_ADDR']}")
         torch_distrib.init_process_group(torch_backend, rank=global_rank, world_size=world_size)
 
     def configure_sync_batchnorm(self, model: 'LightningModule') -> 'LightningModule':
diff --git a/pytorch_lightning/trainer/distrib_data_parallel.py b/pytorch_lightning/trainer/distrib_data_parallel.py
index c550fb648f0ca..f9fc6602f4181 100644
--- a/pytorch_lightning/trainer/distrib_data_parallel.py
+++ b/pytorch_lightning/trainer/distrib_data_parallel.py
@@ -131,13 +131,7 @@ def train_fx(trial_hparams, cluster_manager, _):
 import re
 from abc import ABC, abstractmethod
 from distutils.version import LooseVersion
-from typing import Union, List, Optional, Callable, Tuple
-import subprocess
-import sys
-from time import sleep
-import numpy as np
-from os.path import abspath
-from pkg_resources import parse_version
+from typing import Union, List, Optional, Tuple
 
 import torch
 from pytorch_lightning import _logger as log
@@ -168,10 +162,6 @@ def train_fx(trial_hparams, cluster_manager, _):
 else:
     XLA_AVAILABLE = True
 
-PID = os.getpid()
-RNG1 = np.random.RandomState(PID)
-RANDOM_PORTS = RNG1.randint(10000, 19999, 1000)
-
 
 class TrainerDDPMixin(ABC):
 
@@ -397,22 +387,6 @@ def set_nvidia_flags(self, is_slurm_managing_tasks, data_parallel_device_ids):
         # don't make this debug... this is good UX
         rank_zero_info(f'CUDA_VISIBLE_DEVICES: [{os.environ["CUDA_VISIBLE_DEVICES"]}]')
 
-    def set_random_port(self, force=False):
-        """
-        When running DDP NOT managed by SLURM, the ports might collide
-        """
-        # pick a random port first
-        assert self.num_nodes == 1, 'random port can only be called from single node training'
-        global RANDOM_PORTS
-        default_port = RANDOM_PORTS[-1]
-        RANDOM_PORTS = RANDOM_PORTS[:-1]
-
-        # when not forced, use the user port
-        if not force:
-            default_port = os.environ.get('MASTER_PORT', default_port)
-
-        os.environ['MASTER_PORT'] = str(default_port)
-
     def transfer_distrib_spawn_state_on_fit_end(self, model, mp_queue, results):
         if self.distributed_backend.lower() not in ['ddp_spawn', 'ddp_cpu', 'tpu']:
             return
diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py
index edbba05813ad1..ee3082734e4f6 100644
--- a/pytorch_lightning/trainer/trainer.py
+++ b/pytorch_lightning/trainer/trainer.py
@@ -14,6 +14,7 @@
 
 import inspect
 import os
+import subprocess
 import warnings
 from argparse import ArgumentParser, Namespace
 from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
@@ -1034,7 +1035,6 @@ def fit(
 
         # ddp
         elif self.distributed_backend == 'ddp':
-            self.set_random_port()
             self.accelerator_backend = DDPBackend(self)
             results = self.accelerator_backend.spawn_ddp_children(model)
 
@@ -1321,6 +1321,7 @@ def test(
         self.verbose_test = verbose
 
         if self.global_rank != 0:
+            # do nothing, rank 0 process will launch new processes for testing
             return
 
         # If you supply a datamodule you can't supply train_dataloader or val_dataloaders
@@ -1339,6 +1340,20 @@ def test(
 
         self.teardown('test')
 
+        if self.global_rank == 0:
+            for proc in self.interactive_ddp_procs:
+                subprocess.Popen.kill(proc)
+
+        # clean up dist group
+        # if self.use_ddp or self.use_ddp2:
+        #     torch_distrib.destroy_process_group()
+
+        # clear mem
+        # if self.on_gpu:
+        #     model = self.get_model()
+        #     model.cpu()
+        #     torch.cuda.empty_cache()
+
         return results
 
     def __test_using_best_weights(self, ckpt_path, test_dataloaders):
@@ -1372,7 +1387,6 @@ def __test_using_best_weights(self, ckpt_path, test_dataloaders):
 
         # run tests
         self.tested_ckpt_path = ckpt_path
-        self.set_random_port(force=True)
         self.testing = True
         os.environ['PL_TESTING_MODE'] = '1'
         self.model = model
@@ -1395,7 +1409,6 @@ def __test_given_model(self, model, test_dataloaders):
 
         # run test
         # sets up testing so we short circuit to eval
-        self.set_random_port(force=True)
         self.testing = True
         self.model = model
         results = self.fit(model)
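For context on why the patch is careful about when MASTER_ADDR and MASTER_PORT are set: LightningModule.init_ddp_connection (core/lightning.py above) calls torch.distributed.init_process_group without an explicit init_method, so PyTorch falls back to the env:// rendezvous and reads both variables from the environment. A minimal single-process sketch (assumptions: gloo backend, world_size=1, and 12910 is an arbitrary example port; none of these values come from the patch):

# Standalone sketch: the env:// rendezvous that init_ddp_connection relies on.
import os

import torch.distributed as dist

# These must be set before init_process_group when no init_method is passed.
os.environ.setdefault('MASTER_ADDR', '127.0.0.1')
os.environ.setdefault('MASTER_PORT', '12910')  # arbitrary example port

dist.init_process_group('gloo', rank=0, world_size=1)
print('initialized:', dist.is_initialized())
dist.destroy_process_group()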
diff --git a/pytorch_lightning/trainer/training_loop.py b/pytorch_lightning/trainer/training_loop.py
index ec5bd0938d15c..1feb58c0a76bd 100644
--- a/pytorch_lightning/trainer/training_loop.py
+++ b/pytorch_lightning/trainer/training_loop.py
@@ -1144,14 +1144,14 @@ def run_training_teardown(self):
                 subprocess.Popen.kill(proc)
 
         # clean up dist group
-        if self.use_ddp or self.use_ddp2:
-            torch_distrib.destroy_process_group()
+        # if self.use_ddp or self.use_ddp2:
+        #     torch_distrib.destroy_process_group()
 
         # clear mem
-        if self.on_gpu:
-            model = self.get_model()
-            model.cpu()
-            torch.cuda.empty_cache()
+        # if self.on_gpu:
+        #     model = self.get_model()
+        #     model.cpu()
+        #     torch.cuda.empty_cache()
 
     def training_forward(self, batch, batch_idx, opt_idx, hiddens):
         """
diff --git a/tests/base/model_valid_epoch_ends.py b/tests/base/model_valid_epoch_ends.py
index 8974224409624..943227e738cae 100644
--- a/tests/base/model_valid_epoch_ends.py
+++ b/tests/base/model_valid_epoch_ends.py
@@ -21,7 +21,6 @@ def _mean(res, key):
             # recursive mean for multilevel dicts
             return torch.stack([x[key] if isinstance(x, dict) else _mean(x, key) for x in res]).mean()
 
-        print('in validation epoch end')
         val_loss_mean = _mean(outputs, 'val_loss')
         val_acc_mean = _mean(outputs, 'val_acc')
 
diff --git a/tests/models/data/ddp/train_test_variations.py b/tests/models/data/ddp/train_test_variations.py
new file mode 100644
index 0000000000000..40088e5f74520
--- /dev/null
+++ b/tests/models/data/ddp/train_test_variations.py
@@ -0,0 +1,72 @@
+"""
+Runs several combinations of `.fit()` and `.test()` on a single node across multiple gpus.
+"""
+from argparse import ArgumentParser
+
+from pytorch_lightning import Trainer, seed_everything
+from tests.base import EvalModelTemplate
+
+
+def variation_fit_test(trainer, model):
+    trainer.fit(model)
+    trainer.test(model)
+
+
+def variation_test_fit(trainer, model):
+    trainer.test(model)
+    trainer.fit(model)
+
+
+def variation_fit_fit(trainer, model):
+    trainer.fit(model)
+    trainer.fit(model)
+
+
+def variation_test_test(trainer, model):
+    trainer.test(model)
+    trainer.test(model)
+
+
+def variation_test_fit_test(trainer, model):
+    trainer.test(model)
+    trainer.fit(model)
+    trainer.test(model)
+
+
+def get_variations():
+    variations = [
+        "variation_fit_test",
+        "variation_test_fit",
+        "variation_fit_fit",
+        "variation_test_test",
+        "variation_test_fit_test",
+    ]
+    return variations
+
+
+def main():
+    seed_everything(1234)
+    parser = ArgumentParser(add_help=False)
+    parser = Trainer.add_argparse_args(parser)
+    parser.add_argument('--variation', default=variation_fit_test.__name__)
+    parser.set_defaults(gpus=2)
+    parser.set_defaults(distributed_backend="ddp")
+    args = parser.parse_args()
+
+    model = EvalModelTemplate()
+    trainer = Trainer.from_argparse_args(args)
+
+    # run the chosen variation
+    run_variation = globals()[args.variation]
+    run_variation(trainer, model)
+
+    # TODO
+    # remove this in https://github.com/PyTorchLightning/pytorch-lightning/pull/2165
+    # when we have proper signal handling working
+    # otherwise we will see zombie processes in CI, causing tests to hang
+    for p in trainer.interactive_ddp_procs:
+        p.kill()
+
+
+if __name__ == '__main__':
+    main()
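The new script can also be run by hand to reproduce a single variation outside pytest. A sketch of such an invocation (assumptions: it is launched from the repository root and two GPUs are visible; the flag values are examples mirroring the script's argparse defaults):

# Standalone sketch: running one fit/test variation directly, mirroring what the
# GPU test below does via subprocess.
import subprocess
import sys

cmd = [
    sys.executable, 'tests/models/data/ddp/train_test_variations.py',
    '--variation', 'variation_fit_test',
    '--gpus', '2',
    '--distributed_backend', 'ddp',
    '--max_epochs', '1',
]
subprocess.run(cmd, check=True)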
diff --git a/tests/models/test_gpu.py b/tests/models/test_gpu.py
index 7497a53083612..37e5398e6daf7 100644
--- a/tests/models/test_gpu.py
+++ b/tests/models/test_gpu.py
@@ -1,4 +1,9 @@
+import os
+import subprocess
+import sys
 from collections import namedtuple
+from pathlib import Path
+from unittest import mock
 
 import pytest
 import torch
@@ -6,11 +11,13 @@
 import tests.base.develop_pipelines as tpipes
 import tests.base.develop_utils as tutils
+import pytorch_lightning
 from pytorch_lightning import Trainer
 from pytorch_lightning.core import memory
 from pytorch_lightning.trainer.distrib_parts import _parse_gpu_ids, determine_root_gpu_device
 from pytorch_lightning.utilities.exceptions import MisconfigurationException
 from tests.base import EvalModelTemplate
+from tests.models.data.ddp import train_test_variations
 
 PRETEND_N_OF_GPUS = 16
 
@@ -93,6 +100,31 @@ def test_multi_gpu_model_dp(tmpdir):
     memory.get_memory_profile('min_max')
 
 
+@pytest.mark.parametrize('cli_args', [
+    pytest.param('--max_epochs 1 --gpus 2 --distributed_backend ddp'),
+])
+@pytest.mark.parametrize('variation', train_test_variations.get_variations())
+@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
+def test_multi_gpu_model_ddp(tmpdir, cli_args, variation):
+    file = Path(train_test_variations.__file__).absolute()
+    cli_args = cli_args.split(' ') if cli_args else []
+    cli_args += ['--default_root_dir', str(tmpdir)]
+    command = [sys.executable, str(file), '--variation', variation] + cli_args
+    env = os.environ.copy()
+    env['PYTHONPATH'] = f'{pytorch_lightning.__file__}:' + env.get('PYTHONPATH', '')
+    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
+
+    std, err = p.communicate(timeout=60)
+    std = std.decode('utf-8').strip()
+    err = err.decode('utf-8').strip()
+    assert std
+    if p.returncode:
+        print(std)
+        print(err)
+        print(command)
+        pytest.fail(err)
+
+
 @pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
 def test_multi_gpu_model_ddp_spawn(tmpdir):
     tutils.set_random_master_port()
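As a recap of the AMP gating used in the two backends earlier in the patch: apex counts as available only if it imports, and native AMP only if torch.cuda.amp.autocast exists. A standalone sketch of those checks (NATIVE_AMP_AVAILABLE here is a local stand-in for illustration, not the NATIVE_AMP_AVALAIBLE constant that the patch imports from pytorch_lightning.utilities):

# Standalone sketch: the availability checks that decide whether to route through
# NVIDIA apex or native torch.cuda.amp.
import torch

try:
    from apex import amp  # noqa: F401
except ImportError:
    APEX_AVAILABLE = False
else:
    APEX_AVAILABLE = True

NATIVE_AMP_AVAILABLE = hasattr(torch.cuda, "amp") and hasattr(torch.cuda.amp, "autocast")

print('native amp:', NATIVE_AMP_AVAILABLE, 'apex:', APEX_AVAILABLE)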