diff --git a/eessi/testsuite/constants.py b/eessi/testsuite/constants.py
index 0988e9be..bfb3805a 100644
--- a/eessi/testsuite/constants.py
+++ b/eessi/testsuite/constants.py
@@ -7,6 +7,7 @@
 HWTHREAD = 'HWTHREAD'
 CPU = 'CPU'
 CPU_SOCKET = 'CPU_SOCKET'
+NUMA_NODE = 'NUMA_NODE'
 GPU = 'GPU'
 GPU_VENDOR = 'GPU_VENDOR'
 INTEL = 'INTEL'
@@ -23,6 +24,7 @@
     HWTHREAD: 'hwthread',
     CPU: 'cpu',
     CPU_SOCKET: 'cpu_socket',
+    NUMA_NODE: 'numa_node',
     GPU: 'gpu',
     NODE: 'node',
 }
diff --git a/eessi/testsuite/hooks.py b/eessi/testsuite/hooks.py
index 413ccf79..553e4d7f 100644
--- a/eessi/testsuite/hooks.py
+++ b/eessi/testsuite/hooks.py
@@ -58,7 +58,8 @@ def _assign_default_num_gpus_per_node(test: rfm.RegressionTest):
 
 def assign_tasks_per_compute_unit(test: rfm.RegressionTest, compute_unit: str, num_per: int = 1):
     """
-    Assign one task per compute unit (COMPUTE_UNIT[CPU], COMPUTE_UNIT[CPU_SOCKET] or COMPUTE_UNIT[GPU]).
+    Assign one task per compute unit. More than 1 task per compute unit can be assigned with
+    num_per for compute units that support it.
     Automatically sets num_tasks, num_tasks_per_node, num_cpus_per_task, and num_gpus_per_node,
     based on the current scale and the current partition’s num_cpus, max_avail_gpus_per_node and num_nodes.
     For GPU tests, one task per GPU is set, and num_cpus_per_task is based on the ratio of CPU-cores/GPUs.
@@ -80,7 +81,7 @@ def assign_tasks_per_compute_unit(test: rfm.RegressionTest, compute_unit: str, n
     - assign_tasks_per_compute_unit(test, COMPUTE_UNIT[CPU_SOCKET]) will launch 2 tasks with 64 threads per task
     """
 
-    if num_per != 1 and compute_unit in [COMPUTE_UNIT[GPU], COMPUTE_UNIT[CPU], COMPUTE_UNIT[CPU_SOCKET]]:
+    if num_per != 1 and compute_unit not in [COMPUTE_UNIT[NODE]]:
         raise NotImplementedError(
             f'Non-default num_per {num_per} is not implemented for compute_unit {compute_unit}.')
 
@@ -123,6 +124,8 @@ def assign_tasks_per_compute_unit(test: rfm.RegressionTest, compute_unit: str, n
         _assign_one_task_per_cpu(test)
     elif compute_unit == COMPUTE_UNIT[CPU_SOCKET]:
         _assign_one_task_per_cpu_socket(test)
+    elif compute_unit == COMPUTE_UNIT[NUMA_NODE]:
+        _assign_one_task_per_numa_node(test)
     elif compute_unit == COMPUTE_UNIT[NODE]:
         _assign_num_tasks_per_node(test, num_per)
     else:
@@ -198,22 +201,70 @@ def _assign_one_task_per_cpu_socket(test: rfm.RegressionTest):
     test.num_tasks_per_node * test.num_cpus_per_task == test.default_num_cpus_per_node.
 
     Default resources requested:
-    - num_tasks_per_node = default_num_cpus_per_node
+    - num_tasks_per_node = default_num_cpus_per_node / num_cpus_per_socket
     - num_cpus_per_task = default_num_cpus_per_node / num_tasks_per_node
     """
     # neither num_tasks_per_node nor num_cpus_per_task are set
     if not test.num_tasks_per_node and not test.num_cpus_per_task:
-        check_proc_attribute_defined(test, 'num_cpus')
-        check_proc_attribute_defined(test, 'num_sockets')
-        num_cpus_per_socket = test.current_partition.processor.num_cpus / test.current_partition.processor.num_sockets
-        test.num_tasks_per_node = math.ceil(test.default_num_cpus_per_node / num_cpus_per_socket)
+        check_proc_attribute_defined(test, 'num_cores_per_socket')
+        test.num_tasks_per_node = math.ceil(
+            test.default_num_cpus_per_node / test.current_partition.processor.num_cores_per_socket
+        )
+        test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)
+
+    # num_tasks_per_node is not set, but num_cpus_per_task is
+    elif not test.num_tasks_per_node:
+        test.num_tasks_per_node = int(test.default_num_cpus_per_node / test.num_cpus_per_task)
+
+    # num_cpus_per_task is not set, but num_tasks_per_node is
+    elif not test.num_cpus_per_task:
+        test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)
+
+    else:
+        pass  # both num_tasks_per_node and num_cpus_per_task are already set
+
+    test.num_tasks = test.num_nodes * test.num_tasks_per_node
+    log(f'Number of tasks per node set to: {test.num_tasks_per_node}')
+    log(f'Number of cpus per task set to {test.num_cpus_per_task}')
+    log(f'num_tasks set to {test.num_tasks}')
+
+
+def _assign_one_task_per_numa_node(test: rfm.RegressionTest):
+    """
+    Determines the number of tasks per node by dividing the default_num_cpus_per_node by
+    the number of cpus available per numa node, and rounding up. The result is that for full-node jobs the default
+    will spawn one task per numa node, with a number of cpus per task equal to the number of cpus per numa node.
+    Other examples:
+    - half a node (i.e. node_part=2) on a system with 4 numa nodes would result in 2 tasks per node,
+    with number of cpus per task equal to the number of cpus per numa node.
+    - a quarter node (i.e. node_part=4) on a system with 2 numa nodes would result in 1 task per node,
+    with number of cpus per task equal to half a numa node.
+    - 2 cores (i.e. default_num_cpus_per_node=2) on a system with 4 cores per numa node would result in
+    1 task per node, with 2 cpus per task
+    - 8 cores (i.e. default_num_cpus_per_node=8) on a system with 4 cores per numa node would result in
+    2 tasks per node, with 4 cpus per task
+
+    This default is set unless the test is run with:
+    --setvar num_tasks_per_node=<x> and/or
+    --setvar num_cpus_per_task=<y>.
+    In those cases, those take precedence, and the remaining variable (num_cpus_per_task or
+    num_tasks_per_node respectively) is calculated based on the equality
+    test.num_tasks_per_node * test.num_cpus_per_task == test.default_num_cpus_per_node.
+
+    Default resources requested:
+    - num_tasks_per_node = default_num_cpus_per_node / num_cores_per_numa_node
+    - num_cpus_per_task = default_num_cpus_per_node / num_tasks_per_node
+    """
+    # neither num_tasks_per_node nor num_cpus_per_task are set
+    if not test.num_tasks_per_node and not test.num_cpus_per_task:
+        check_proc_attribute_defined(test, 'num_cores_per_numa_node')
+        test.num_tasks_per_node = math.ceil(
+            test.default_num_cpus_per_node / test.current_partition.processor.num_cores_per_numa_node
+        )
         test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)
 
     # num_tasks_per_node is not set, but num_cpus_per_task is
     elif not test.num_tasks_per_node:
-        check_proc_attribute_defined(test, 'num_cpus')
-        check_proc_attribute_defined(test, 'num_sockets')
-        num_cpus_per_socket = test.current_partition.processor.num_cpus / test.current_partition.processor.num_sockets
         test.num_tasks_per_node = int(test.default_num_cpus_per_node / test.num_cpus_per_task)
 
     # num_cpus_per_task is not set, but num_tasks_per_node is
diff --git a/eessi/testsuite/tests/apps/PyTorch/PyTorch_torchvision.py b/eessi/testsuite/tests/apps/PyTorch/PyTorch_torchvision.py
new file mode 100644
index 00000000..13171143
--- /dev/null
+++ b/eessi/testsuite/tests/apps/PyTorch/PyTorch_torchvision.py
@@ -0,0 +1,134 @@
+from itertools import chain
+
+import reframe as rfm
+import reframe.utility.sanity as sn
+# Added only to make the linter happy
+from reframe.core.builtins import parameter, variable, run_after, sanity_function, performance_function
+
+from eessi.testsuite import hooks
+from eessi.testsuite.constants import SCALES, TAGS, DEVICE_TYPES, COMPUTE_UNIT, CPU, NUMA_NODE, GPU
+from eessi.testsuite.utils import find_modules
+
+
+class EESSI_PyTorch_torchvision(rfm.RunOnlyRegressionTest):
+    nn_model = parameter(['vgg16', 'resnet50', 'resnet152', 'densenet121', 'mobilenet_v3_large'])
+    scale = parameter(SCALES.keys())
+    parallel_strategy = parameter([None, 'ddp'])
+    compute_device = variable(str)
+    # Both torchvision and PyTorch-bundle modules have everything needed to run this test
+    module_name = parameter(chain(find_modules('torchvision'), find_modules('PyTorch-bundle')))
+
+    descr = 'Benchmark that runs a selected torchvision model on synthetic data'
+
+    executable = 'python'
+
+    valid_prog_environs = ['default']
+    valid_systems = ['*']
+
+    time_limit = '30m'
+
+    @run_after('init')
+    def prepare_test(self):
+
+        # Set nn_model as executable option
+        self.executable_opts = ['pytorch_synthetic_benchmark.py --model %s' % self.nn_model]
+
+        # If not a GPU run, disable CUDA
+        if self.compute_device != DEVICE_TYPES[GPU]:
+            self.executable_opts += ['--no-cuda']
+
+    @run_after('init')
+    def apply_init_hooks(self):
+        # Filter on which scales are supported by the partitions defined in the ReFrame configuration
+        hooks.filter_supported_scales(self)
+
+        # Make sure that GPU tests run in partitions that support running on a GPU,
+        # and that CPU-only tests run in partitions that support running CPU-only.
+        # Also support setting valid_systems on the cmd line.
+        hooks.filter_valid_systems_by_device_type(self, required_device_type=self.compute_device)
+
+        # Support selecting modules on the cmd line.
+        hooks.set_modules(self)
+
+        # Support selecting scales on the cmd line.
+        hooks.set_tag_scale(self)
+
+    @run_after('init')
+    def set_tag_ci(self):
+        if self.nn_model == 'resnet50':
+            self.tags.add(TAGS['CI'])
+
+    @run_after('setup')
+    def apply_setup_hooks(self):
+        if self.compute_device == DEVICE_TYPES[GPU]:
+            hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT[GPU])
+        else:
+            # Hybrid code, for which launching one task per NUMA_NODE is typically the most efficient
+            hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT[NUMA_NODE])
+
+        # This is a hybrid test, binding is important for performance
+        hooks.set_compact_process_binding(self)
+
+        # Set OMP_NUM_THREADS based on the number of cores per task
+        self.env_vars["OMP_NUM_THREADS"] = self.num_cpus_per_task
+
+    @run_after('setup')
+    def set_ddp_options(self):
+        # Set environment variables for PyTorch DDP
+        if self.parallel_strategy == 'ddp':
+            # Set additional options required by DDP
+            self.executable_opts += ["--master-port $(python get_free_socket.py)"]
+            self.executable_opts += ["--master-address $(hostname --fqdn)"]
+            self.executable_opts += ["--world-size %s" % self.num_tasks]
+
+    @run_after('setup')
+    def filter_invalid_parameter_combinations(self):
+        # We cannot detect this situation before the setup phase, because it requires self.num_tasks.
+        # Thus, the core count of the node needs to be known, which is only the case after the setup phase.
+        msg = "Skipping test: parallel strategy is 'None',"
+        msg += f" but requested process count is larger than one ({self.num_tasks})."
+        self.skip_if(self.num_tasks > 1 and self.parallel_strategy is None, msg)
+        msg = f"Skipping test: parallel strategy is {self.parallel_strategy},"
+        msg += " but only one process is requested."
+        self.skip_if(self.num_tasks == 1 and self.parallel_strategy is not None, msg)
+
+    @run_after('setup')
+    def pass_parallel_strategy(self):
+        # Set parallelization strategy when using more than one process
+        if self.num_tasks != 1:
+            self.executable_opts += ['--use-%s' % self.parallel_strategy]
+
+    @sanity_function
+    def assert_num_ranks(self):
+        '''Assert that the number of reported CPUs/GPUs used is correct'''
+        return sn.assert_found(r'Total img/sec on %s .PU\(s\):.*' % self.num_tasks, self.stdout)
+
+    @performance_function('img/sec')
+    def total_throughput(self):
+        '''Total training throughput, aggregated over all CPUs/GPUs'''
+        return sn.extractsingle(r'Total img/sec on [0-9]+ .PU\(s\):\s+(?P<perf>\S+)', self.stdout, 'perf', float)
+
+    @performance_function('img/sec')
+    def throughput_per_CPU(self):
+        '''Training throughput per CPU (or per GPU)'''
+        if self.compute_device == DEVICE_TYPES[CPU]:
+            return sn.extractsingle(r'Img/sec per CPU:\s+(?P<perf_per_cpu>\S+)', self.stdout, 'perf_per_cpu', float)
+        else:
+            return sn.extractsingle(r'Img/sec per GPU:\s+(?P<perf_per_gpu>\S+)', self.stdout, 'perf_per_gpu', float)
+
+
+@rfm.simple_test
+class EESSI_PyTorch_torchvision_CPU(EESSI_PyTorch_torchvision):
+    compute_device = DEVICE_TYPES[CPU]
+
+
+@rfm.simple_test
+class EESSI_PyTorch_torchvision_GPU(EESSI_PyTorch_torchvision):
+    compute_device = DEVICE_TYPES[GPU]
+    precision = parameter(['default', 'mixed'])
+
+    @run_after('init')
+    def prepare_gpu_test(self):
+        # Set precision
+        if self.precision == 'mixed':
+            self.executable_opts += ['--use-amp']
diff --git a/eessi/testsuite/tests/apps/PyTorch/src/get_free_socket.py b/eessi/testsuite/tests/apps/PyTorch/src/get_free_socket.py
new file mode 100644
index 00000000..a2981304
--- /dev/null
+++ b/eessi/testsuite/tests/apps/PyTorch/src/get_free_socket.py
@@ -0,0 +1,8 @@
+# Based on https://unix.stackexchange.com/a/132524
+import socket
+
+s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+s.bind(('', 0))
+addr = s.getsockname()
+print(addr[1])
+s.close()
diff --git a/eessi/testsuite/tests/apps/PyTorch/src/pytorch_synthetic_benchmark.py b/eessi/testsuite/tests/apps/PyTorch/src/pytorch_synthetic_benchmark.py
new file mode 100644
index 00000000..373790b5
--- /dev/null
+++ b/eessi/testsuite/tests/apps/PyTorch/src/pytorch_synthetic_benchmark.py
@@ -0,0 +1,241 @@
+import argparse
+import timeit
+import os
+
+import numpy as np
+
+import torch.backends.cudnn as cudnn
+import torch.nn.functional as F
+import torch.optim as optim
+import torch.utils.data.distributed
+from torchvision import models
+
+
+# Benchmark settings
+parser = argparse.ArgumentParser(description='PyTorch Synthetic Benchmark',
+                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+parser.add_argument('--fp16-allreduce', action='store_true', default=False,
+                    help='use fp16 compression during allreduce')
+
+parser.add_argument('--model', type=str, default='resnet50',
+                    help='model to benchmark')
+parser.add_argument('--batch-size', type=int, default=32,
+                    help='input batch size')
+
+parser.add_argument('--num-warmup-batches', type=int, default=10,
+                    help='number of warm-up batches that don\'t count towards benchmark')
+parser.add_argument('--num-batches-per-iter', type=int, default=10,
+                    help='number of batches per benchmark iteration')
+parser.add_argument('--num-iters', type=int, default=10,
+                    help='number of benchmark iterations')
+
+parser.add_argument('--no-cuda', action='store_true', default=False,
+                    help='disables CUDA training')
+
+parser.add_argument('--use-adasum', action='store_true', default=False,
+                    help='use adasum algorithm to do reduction')
+parser.add_argument('--use-horovod', action='store_true', default=False)
+parser.add_argument('--use-ddp', action='store_true', default=False)
+
+parser.add_argument('--use-amp', action='store_true', default=False,
+                    help='Use PyTorch Automatic Mixed Precision (AMP)')
+parser.add_argument('--world-size', type=int, default=1,
+                    help='Define the world size for ddp')
+parser.add_argument('--master-port', type=int, default=False,
+                    help='Define a master port for ddp')
+parser.add_argument('--master-address', type=str, default='localhost',
+                    help='Define a master address for ddp')
+
+args = parser.parse_args()
+args.cuda = not args.no_cuda and torch.cuda.is_available()
+
+if args.use_horovod and args.use_ddp:
+    print("You can't specify to use both Horovod and Pytorch DDP, exiting...")
+    exit(1)
+
+# Set MASTER_ADDR and MASTER_PORT environment variables
+# By doing it as part of this python script, we don't need to have the launchers export them
+# This saves us from having to find a launcher-agnostic way of exporting variables
+os.environ['MASTER_ADDR'] = args.master_address
+os.environ['MASTER_PORT'] = '%s' % args.master_port
+
+# Set a default rank and world size, also for when ddp and horovod are not used
+rank = 0
+world_size = args.world_size
+if args.use_horovod:
+    import horovod.torch as hvd
+    hvd.init()
+    rank = hvd.local_rank()
+    world_size = hvd.size()
+
+    if args.cuda:
+        # If launched with srun, you are in a CGROUP with only 1 GPU, so you don't need to set it.
+        # If launched with mpirun, you see ALL local GPUs on the node, and you need to set which one
+        # this rank should use.
+        visible_gpus = torch.cuda.device_count()
+        # Horovod: pin GPU to local rank.
+        if visible_gpus > 1:
+            torch.cuda.set_device(hvd.local_rank())
+
+    # Should only be uncommented for debugging
+    # In ReFrame tests, a print from each rank can mess up the output file, causing
+    # performance and sanity patterns to not be found
+    # print(f"hvd.local_rank: {rank}", flush=True)
+
+
+if args.use_ddp:
+    from socket import gethostname
+    import torch.distributed as dist
+    from torch.nn.parallel import DistributedDataParallel as DDP
+
+    # world_size = int(os.environ["SLURM_NTASKS"])  ## No longer needed now we pass it as argument?
+    # If launched with mpirun, get rank from this
+    rank = int(os.environ.get("OMPI_COMM_WORLD_RANK", -1))
+    if rank == -1:
+        # Else it's launched with srun, get rank from this
+        rank = int(os.environ.get("SLURM_PROCID", -1))
+        if rank == -1:
+            err_msg = "ERROR: cannot determine local rank. This test currently only supports OpenMPI"
+            err_msg += " and srun as launchers. If you've configured a different launcher for your system"
+            err_msg += " this test will need to be extended with a method to get its local rank for that launcher."
+            print(err_msg)
+
+    # If launched with srun, you are in a CGROUP with only 1 GPU, so you don't need to set it.
+    # If launched with mpirun, you see ALL local GPUs on the node, and you need to set which one
+    # this rank should use.
+    visible_gpus = torch.cuda.device_count()
+    if visible_gpus > 1:
+        print("Listing visible devices")
+        for i in range(torch.cuda.device_count()):
+            print(f"Device {i}: {torch.cuda.device(i)}")
+        # This assumes compact mapping of ranks to available hardware
+        # e.g. rank 0-x to node 1, rank x-y to node 2, etc
+        # Assuming the set_compact_process_binding hook from the EESSI testsuite is called,
+        # this condition should be satisfied
+        local_rank = rank - visible_gpus * (rank // visible_gpus)
+        torch.cuda.set_device(local_rank)
+        print("Listing visible devices after setting one")
+        for i in range(torch.cuda.device_count()):
+            print(f"Device {i}: {torch.cuda.device(i)}")
+        # We should also set CUDA_VISIBLE_DEVICES, which gets respected by NCCL
+        os.environ['CUDA_VISIBLE_DEVICES'] = '%s' % local_rank
+        print(f"host: {gethostname()}, rank: {rank}, local_rank: {local_rank}")
+    else:
+        print(f"host: {gethostname()}, rank: {rank}")
+
+    def setup(rank, world_size):
+
+        # initialize the process group
+        if args.cuda:
+            dist.init_process_group("nccl", rank=rank, world_size=world_size)
+        else:
+            dist.init_process_group("gloo", rank=rank, world_size=world_size)
+
+    def cleanup():
+        # clean up the distributed environment
+        dist.destroy_process_group()
+
+    setup(rank, world_size)
+    if rank == 0:
+        print(f"Group initialized? {dist.is_initialized()}", flush=True)
+
+
+# This relies on the 'rank' set in the if args.use_horovod or args.use_ddp sections
+def log(s, nl=True):
+    if (args.use_horovod or args.use_ddp) and rank != 0:
+        return
+    print(s, end='\n' if nl else '', flush=True)
+
+
+log(f"World size: {world_size}")
+
+torch.set_num_threads(int(os.environ['OMP_NUM_THREADS']))
+torch.set_num_interop_threads(2)
+
+cudnn.benchmark = True
+
+# Set up standard model.
+model = getattr(models, args.model)()
+
+# By default, Adasum doesn't need scaling up learning rate.
+lr_scaler = hvd.size() if not args.use_adasum and args.use_horovod else 1
+
+if args.cuda:
+    # Move model to GPU.
+    model.cuda()
+    # If using GPU Adasum allreduce, scale learning rate by local_size.
+    if args.use_horovod and args.use_adasum and hvd.nccl_built():
+        lr_scaler = hvd.local_size()
+
+# If using DDP, wrap model
+if args.use_ddp:
+    model = DDP(model)
+
+optimizer = optim.SGD(model.parameters(), lr=0.01 * lr_scaler)
+
+# Horovod: (optional) compression algorithm.
+if args.use_horovod:
+    compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none
+
+# Horovod: wrap optimizer with DistributedOptimizer.
+if args.use_horovod:
+    optimizer = hvd.DistributedOptimizer(optimizer,
+                                         named_parameters=model.named_parameters(),
+                                         compression=compression,
+                                         op=hvd.Adasum if args.use_adasum else hvd.Average)
+
+    # Horovod: broadcast parameters & optimizer state.
+    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
+    hvd.broadcast_optimizer_state(optimizer, root_rank=0)
+
+# Set up fixed fake data
+data = torch.randn(args.batch_size, 3, 224, 224)
+target = torch.LongTensor(args.batch_size).random_() % 1000
+if args.cuda:
+    data, target = data.cuda(), target.cuda()
+
+# Create GradScaler for automatic mixed precision
+scaler = torch.cuda.amp.GradScaler(enabled=args.use_amp)
+
+# Set device_type for AMP
+if args.cuda:
+    device_type = "cuda"
+else:
+    device_type = "cpu"
+
+
+def benchmark_step():
+    optimizer.zero_grad()
+    with torch.autocast(device_type=device_type, enabled=args.use_amp):
+        output = model(data)
+        loss = F.cross_entropy(output, target)
+    scaler.scale(loss).backward()
+    scaler.step(optimizer)
+    scaler.update()
+
+
+log('Model: %s' % args.model)
+log('Batch size: %d' % args.batch_size)
+device = 'GPU' if args.cuda else 'CPU'
+if args.use_horovod:
+    log('Number of %ss: %d' % (device, hvd.size()))
+
+# Warm-up
+log('Running warmup...')
+timeit.timeit(benchmark_step, number=args.num_warmup_batches)
+
+# Benchmark
+log('Running benchmark...')
+img_secs = []
+for x in range(args.num_iters):
+    time = timeit.timeit(benchmark_step, number=args.num_batches_per_iter)
+    img_sec = args.batch_size * args.num_batches_per_iter / time
+    log('Iter #%d: %.1f img/sec per %s' % (x, img_sec, device))
+    img_secs.append(img_sec)
+
+# Results
+img_sec_mean = np.mean(img_secs)
+img_sec_conf = 1.96 * np.std(img_secs)
+log('Img/sec per %s: %.1f +-%.1f' % (device, img_sec_mean, img_sec_conf))
+log('Total img/sec on %d %s(s): %.1f +-%.1f' %
+    (world_size, device, world_size * img_sec_mean, world_size * img_sec_conf))
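A note on the default task layout computed by the new _assign_one_task_per_numa_node hook: the ceil/int arithmetic is easy to sanity-check in isolation. The sketch below mirrors that default branch outside of ReFrame; the node geometry (128 cores split over 8 NUMA domains of 16 cores) and the default_num_cpus_per_node values are made-up examples, not values taken from the test suite.

import math

def numa_node_defaults(default_num_cpus_per_node, num_cores_per_numa_node):
    # Mirrors the default branch of _assign_one_task_per_numa_node (sketch only)
    num_tasks_per_node = math.ceil(default_num_cpus_per_node / num_cores_per_numa_node)
    num_cpus_per_task = int(default_num_cpus_per_node / num_tasks_per_node)
    return num_tasks_per_node, num_cpus_per_task

# Hypothetical 128-core node with 8 NUMA domains of 16 cores each:
print(numa_node_defaults(128, 16))  # full node -> (8, 16): one task per NUMA domain
print(numa_node_defaults(64, 16))   # half node -> (4, 16)
print(numa_node_defaults(2, 16))    # 2 cores   -> (1, 2)
print(numa_node_defaults(24, 16))   # 24 cores  -> (2, 12): rounded up, cores split evenly over tasks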
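The local_rank computation in the DDP branch of pytorch_synthetic_benchmark.py relies on a compact (block) mapping of ranks to nodes, as the comments there note. A minimal standalone check of that arithmetic, with a made-up layout of 4 visible GPUs per node and 8 ranks:

visible_gpus = 4       # hypothetical: 4 GPUs visible to each rank (mpirun-style launch)
for rank in range(8):  # hypothetical: ranks 0-3 on the first node, 4-7 on the second
    local_rank = rank - visible_gpus * (rank // visible_gpus)
    assert local_rank == rank % visible_gpus  # same thing as a modulo
    print(f"global rank {rank} -> local GPU {local_rank}")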
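For reference, the sanity and performance patterns in PyTorch_torchvision.py are keyed to the final log lines printed by pytorch_synthetic_benchmark.py. A quick check of the aggregate pattern against an illustrative output line (the numbers are invented):

import re

# Format follows the benchmark's final log() call: 'Total img/sec on %d %s(s): %.1f +-%.1f'
sample = 'Total img/sec on 8 CPU(s): 1234.5 +-67.8'
match = re.search(r'Total img/sec on [0-9]+ .PU\(s\):\s+(?P<perf>\S+)', sample)
print(match.group('perf'))  # -> '1234.5'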