PyTorch test that uses torchvision #130

Merged
28 commits merged into main from pytorch on Jul 30, 2024

Changes from 20 commits

Commits (28)
2e7bf95
Initial commit for an EESSI PyTorch test that uses torchvision models
Mar 27, 2024
e089760
Add option to assign number of tasks and cpus per task based on the a…
Mar 29, 2024
a675fc9
We don't need to first calculate cpus_per_socket, it is available dir…
Mar 29, 2024
a6e53bc
Define constant for numa node as a compute unit
Mar 29, 2024
8bb9bbd
Fix num_cores_per_numa_node, use it from the current_partition info
Mar 29, 2024
a6bf34d
Work around the issue of not being able to export variables in a laun…
Mar 29, 2024
fce2a45
Change order of imports so that initialization only happens after req…
Apr 9, 2024
b868cc1
Add comment on explicit assumption for computing the local rank
May 3, 2024
760cd59
Merge branch 'fix_compact_process_binding' into pytorch
May 3, 2024
fed8906
Use EESSI prefix to name test
May 6, 2024
357f649
Child classes should also be renamed and inherit from renamed class
May 6, 2024
887f7b3
Merge branch 'main' into pytorch
May 6, 2024
6b1e36a
Remove stray blank line
May 6, 2024
2d33141
Rephrased comment, some changes to make the linter happy
May 6, 2024
4cb7b36
Fix some more linter issues
May 6, 2024
2f0bea2
Fix some more linter issues
May 6, 2024
fc067b2
Fix linter issues
May 6, 2024
4ddfe23
Fix linter issues
May 6, 2024
73b7e84
Can't combine generators with plus, so use chain
May 7, 2024
07b2c1b
Merge branch 'main' into pytorch
Jun 13, 2024
11146ef
Merge branch 'main' into pytorch
Jul 1, 2024
8298e6a
Address review comments from Sam
Jul 1, 2024
af30b64
Make linter happy
Jul 1, 2024
7ddeedb
Remove trailing whitespace
Jul 1, 2024
d62443b
Add set_omp_num_threads hook from https://github.com/EESSI/test-suite…
Jul 25, 2024
00fca31
Call hook to set OMP_NUM_THREADS
Jul 25, 2024
a69e2d3
Revert using the hook, it doesn't make sense to set OMP_NUM_THREADS c…
Jul 25, 2024
4c5c3e7
test is not defined, should be 'self'
Jul 25, 2024
2 changes: 2 additions & 0 deletions eessi/testsuite/constants.py
@@ -6,6 +6,7 @@
CI = 'CI'
CPU = 'CPU'
CPU_SOCKET = 'CPU_SOCKET'
NUMA_NODE = 'NUMA_NODE'
GPU = 'GPU'
GPU_VENDOR = 'GPU_VENDOR'
INTEL = 'INTEL'
@@ -21,6 +22,7 @@
COMPUTE_UNIT = {
CPU: 'cpu',
CPU_SOCKET: 'cpu_socket',
NUMA_NODE: 'numa_node',
GPU: 'gpu',
NODE: 'node',
}
68 changes: 59 additions & 9 deletions eessi/testsuite/hooks.py
@@ -58,7 +58,7 @@ def _assign_default_num_gpus_per_node(test: rfm.RegressionTest):

def assign_tasks_per_compute_unit(test: rfm.RegressionTest, compute_unit: str, num_per: int = 1):
"""
-    Assign one task per compute unit (COMPUTE_UNIT[CPU], COMPUTE_UNIT[CPU_SOCKET] or COMPUTE_UNIT[GPU]).
+    Assign one task per compute unit.
Review comment (Collaborator):
Suggested change:
-    Assign one task per compute unit.
+    Assign one task per compute unit. More than 1 task per compute unit can be assigned with num_per for compute units that support it.

please also update line 83 to:

 if num_per != 1 and compute_unit not in [COMPUTE_UNIT[NODE]]:
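For context, a minimal sketch of how such a guard might behave inside assign_tasks_per_compute_unit (hypothetical: the exception type and message are assumptions, not code from this PR):

    # Hypothetical guard: reject num_per != 1 for compute units that don't support it
    if num_per != 1 and compute_unit not in [COMPUTE_UNIT[NODE]]:
        raise NotImplementedError(
            f'num_per != 1 is not implemented for compute_unit {compute_unit}'
        )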

Automatically sets num_tasks, num_tasks_per_node, num_cpus_per_task, and num_gpus_per_node,
based on the current scale and the current partition’s num_cpus, max_avail_gpus_per_node and num_nodes.
For GPU tests, one task per GPU is set, and num_cpus_per_task is based on the ratio of CPU-cores/GPUs.
@@ -110,6 +110,8 @@ def assign_tasks_per_compute_unit(test: rfm.RegressionTest, compute_unit: str, num_per: int = 1):
_assign_one_task_per_cpu(test)
elif compute_unit == COMPUTE_UNIT[CPU_SOCKET]:
_assign_one_task_per_cpu_socket(test)
elif compute_unit == COMPUTE_UNIT[NUMA_NODE]:
_assign_one_task_per_numa_node(test)
elif compute_unit == COMPUTE_UNIT[NODE]:
_assign_num_tasks_per_node(test, num_per)
else:
@@ -185,22 +187,70 @@ def _assign_one_task_per_cpu_socket(test: rfm.RegressionTest):
test.num_tasks_per_node * test.num_cpus_per_task == test.default_num_cpus_per_node.

Default resources requested:
-    - num_tasks_per_node = default_num_cpus_per_node
+    - num_tasks_per_node = default_num_cpus_per_node / num_cpus_per_socket
- num_cpus_per_task = default_num_cpus_per_node / num_tasks_per_node
"""
# neither num_tasks_per_node nor num_cpus_per_task are set
if not test.num_tasks_per_node and not test.num_cpus_per_task:
-        check_proc_attribute_defined(test, 'num_cpus')
-        check_proc_attribute_defined(test, 'num_sockets')
-        num_cpus_per_socket = test.current_partition.processor.num_cpus / test.current_partition.processor.num_sockets
-        test.num_tasks_per_node = math.ceil(test.default_num_cpus_per_node / num_cpus_per_socket)
+        check_proc_attribute_defined(test, 'num_cores_per_socket')
+        test.num_tasks_per_node = math.ceil(
+            test.default_num_cpus_per_node / test.current_partition.processor.num_cores_per_socket
+        )
test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)

# num_tasks_per_node is not set, but num_cpus_per_task is
elif not test.num_tasks_per_node:
test.num_tasks_per_node = int(test.default_num_cpus_per_node / test.num_cpus_per_task)

# num_cpus_per_task is not set, but num_tasks_per_node is
elif not test.num_cpus_per_task:
test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)

else:
pass # both num_tasks_per_node and num_cpus_per_task are already set

test.num_tasks = test.num_nodes * test.num_tasks_per_node
log(f'Number of tasks per node set to: {test.num_tasks_per_node}')
log(f'Number of cpus per task set to {test.num_cpus_per_task}')
log(f'num_tasks set to {test.num_tasks}')
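To make the arithmetic concrete, here is a standalone sketch with a made-up partition (2 sockets of 64 cores each); the numbers are illustrative only:

    import math

    # Hypothetical partition: 2 sockets x 64 cores = 128 cores per node
    num_cores_per_socket = 64
    default_num_cpus_per_node = 64  # e.g. a half-node (node_part=2) job

    num_tasks_per_node = math.ceil(default_num_cpus_per_node / num_cores_per_socket)  # 1
    num_cpus_per_task = int(default_num_cpus_per_node / num_tasks_per_node)  # 64

    # The invariant from the docstring holds:
    assert num_tasks_per_node * num_cpus_per_task == default_num_cpus_per_node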


def _assign_one_task_per_numa_node(test: rfm.RegressionTest):
"""
Determines the number of tasks per node by dividing the default_num_cpus_per_node by
the number of cpus available per numa node, and rounding up. The result is that for full-node jobs the default
will spawn one task per numa node, with a number of cpus per task equal to the number of cpus per numa node.
Other examples:
- half a node (i.e. node_part=2) on a system with 4 numa nodes would result in 2 tasks per node,
with number of cpus per task equal to the number of cpus per numa node.
- a quarter node (i.e. node_part=4) on a system with 2 numa nodes would result in 1 task per node,
with number of cpus equal to half a numa node.
- 2 cores (i.e. default_num_cpus_per_node=2) on a system with 4 cores per numa node would result in
1 task per node, with 2 cpus per task
- 8 cores (i.e. default_num_cpus_per_node=8) on a system with 4 cores per numa node would result in
2 tasks per node, with 4 cpus per task

This default is set unless the test is run with:
--setvar num_tasks_per_node=<x> and/or
--setvar num_cpus_per_task=<y>.
In those cases, those take precedence, and the remaining variable (num_cpus_per_task or
num_tasks_per_node respectively) is calculated based on the equality
test.num_tasks_per_node * test.num_cpus_per_task == test.default_num_cpus_per_node.

Default resources requested:
- num_tasks_per_node = default_num_cpus_per_node / num_cores_per_numa_node
- num_cpus_per_task = default_num_cpus_per_node / num_tasks_per_node
"""
# neither num_tasks_per_node nor num_cpus_per_task are set
if not test.num_tasks_per_node and not test.num_cpus_per_task:
check_proc_attribute_defined(test, 'num_cores_per_numa_node')
test.num_tasks_per_node = math.ceil(
test.default_num_cpus_per_node / test.current_partition.processor.num_cores_per_numa_node
)
test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)

    # num_tasks_per_node is not set, but num_cpus_per_task is
    elif not test.num_tasks_per_node:
        test.num_tasks_per_node = int(test.default_num_cpus_per_node / test.num_cpus_per_task)

# num_cpus_per_task is not set, but num_tasks_per_node is
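To sanity-check the documented examples, a standalone sketch that mirrors the hook's arithmetic (partition sizes are made up for illustration):

    import math

    def numa_defaults(default_num_cpus_per_node, num_cores_per_numa_node):
        # One task per NUMA node, rounded up; the cpus are split evenly per task
        num_tasks_per_node = math.ceil(default_num_cpus_per_node / num_cores_per_numa_node)
        num_cpus_per_task = int(default_num_cpus_per_node / num_tasks_per_node)
        return num_tasks_per_node, num_cpus_per_task

    # 2 cores on a system with 4 cores per numa node -> 1 task per node, 2 cpus per task
    assert numa_defaults(2, 4) == (1, 2)
    # 8 cores on a system with 4 cores per numa node -> 2 tasks per node, 4 cpus per task
    assert numa_defaults(8, 4) == (2, 4)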
147 changes: 147 additions & 0 deletions eessi/testsuite/tests/apps/PyTorch/PyTorch_torchvision.py
@@ -0,0 +1,147 @@
from itertools import chain

import reframe as rfm
import reframe.utility.sanity as sn
# Added only to make the linter happy
from reframe.core.builtins import parameter, variable, run_after, sanity_function, performance_function

from eessi.testsuite import hooks
from eessi.testsuite.constants import SCALES, TAGS, DEVICE_TYPES, COMPUTE_UNIT, CPU, NUMA_NODE, GPU, INVALID_SYSTEM
from eessi.testsuite.utils import find_modules


class EESSI_PyTorch_torchvision(rfm.RunOnlyRegressionTest):
nn_model = parameter(['vgg16', 'resnet50', 'resnet152', 'densenet121', 'mobilenet_v3_large'])
scale = parameter(SCALES.keys())
parallel_strategy = parameter([None, 'ddp'])
compute_device = variable(str)
# Both torchvision and PyTorch-bundle modules have everything needed to run this test
module_name = parameter(chain(find_modules('torchvision'), find_modules('PyTorch-bundle')))

descr = 'Benchmark that runs a selected torchvision model on synthetic data'

executable = 'python'

valid_prog_environs = ['default']
valid_systems = ['*']

time_limit = '30m'

@run_after('init')
def prepare_test(self):

# Set nn_model as executable option
self.executable_opts = ['pytorch_synthetic_benchmark.py --model %s' % self.nn_model]

# If not a GPU run, disable CUDA
if self.compute_device != DEVICE_TYPES[GPU]:
self.executable_opts += ['--no-cuda']

@run_after('init')
def apply_init_hooks(self):
# Filter on which scales are supported by the partitions defined in the ReFrame configuration
hooks.filter_supported_scales(self)

# Make sure that GPU tests run in partitions that support running on a GPU,
# and that CPU-only tests run in partitions that support running CPU-only.
# Also support setting valid_systems on the cmd line.
hooks.filter_valid_systems_by_device_type(self, required_device_type=self.compute_device)

# Support selecting modules on the cmd line.
hooks.set_modules(self)

# Support selecting scales on the cmd line via tags.
hooks.set_tag_scale(self)

@run_after('init')
def set_tag_ci(self):
if self.nn_model == 'resnet50':
self.tags.add(TAGS['CI'])

@run_after('setup')
def apply_setup_hooks(self):
if self.compute_device == DEVICE_TYPES[GPU]:
hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT[GPU])
else:
# Hybrid code, so launch 1 rank per socket.
# Probably, launching 1 task per NUMA domain is even better, but the current hook doesn't support it
Review comment (Collaborator): I guess these comments no longer apply, as we are now launching 1 rank per NUMA node?

hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT[NUMA_NODE])

# This is a hybrid test, binding is important for performance
hooks.set_compact_process_binding(self)

@run_after('setup')
def set_ddp_env_vars(self):
Review comment (Collaborator):

Suggested change:
-    def set_ddp_env_vars(self):
+    def set_ddp_options(self):

# Set environment variables for PyTorch DDP
if self.parallel_strategy == 'ddp':
# Set additional options required by DDP
self.executable_opts += ["--master-port $(python python_get_free_socket.py)"]
Review comment (Collaborator): I was a bit confused by the name of this script. Can we call it get_free_port.py instead?

self.executable_opts += ["--master-address $(hostname --fqdn)"]
self.executable_opts += ["--world-size %s" % self.num_tasks]

@run_after('setup')
def filter_invalid_parameter_combinations(self):
# We cannot detect this situation before the setup phase, because it requires self.num_tasks.
# Thus, the core count of the node needs to be known, which is only the case after the setup phase.
msg = "Skipping test: parallel strategy is 'None',"
msg += f" but requested process count is larger than one ({self.num_tasks})."
self.skip_if(self.num_tasks > 1 and self.parallel_strategy is None, msg)
msg = f"Skipping test: parallel strategy is {self.parallel_strategy},"
msg += " but only one process is requested."
self.skip_if(self.num_tasks == 1 and self.parallel_strategy is not None, msg)

@run_after('setup')
def pass_parallel_strategy(self):
# Set parallelization strategy when using more than one process
if self.num_tasks != 1:
self.executable_opts += ['--use-%s' % self.parallel_strategy]

@run_after('setup')
def avoid_horovod_cpu_contention(self):
# Horovod had issues with CPU performance, see https://github.com/horovod/horovod/issues/2804
# The root cause is Horovod having two threads with very high utilization, which interferes with
# the compute threads. It was fixed, but seems to be broken again in Horovod 0.28.1
# The easiest workaround is to reduce the number of compute threads by 2
if self.compute_device == DEVICE_TYPES[CPU] and self.parallel_strategy == 'horovod':
self.env_vars['OMP_NUM_THREADS'] = max(self.num_cpus_per_task - 2, 2) # Never go below 2 compute threads
Review comment (Collaborator): What if you have only 1 or 2 cores? Is it still better to have 2 compute threads in that case?

Reply (Collaborator Author): I'll take it out. We don't have parallel_strategy=horovod in this test anyway; that was a remnant of our (internal) test that I ported.

If we ever deploy Horovod (with PyTorch support), we could bring this back. But then it'd be a Horovod test, not a PyTorch test :)


@sanity_function
def assert_num_ranks(self):
'''Assert that the number of reported CPUs/GPUs used is correct'''
return sn.assert_found(r'Total img/sec on %s .PU\(s\):.*' % self.num_tasks, self.stdout)

@performance_function('img/sec')
def total_throughput(self):
'''Total training throughput, aggregated over all CPUs/GPUs'''
return sn.extractsingle(r'Total img/sec on [0-9]+ .PU\(s\):\s+(?P<perf>\S+)', self.stdout, 'perf', float)

@performance_function('img/sec')
def througput_per_CPU(self):
'''Training throughput per CPU (per GPU for GPU runs)'''
if self.compute_device == DEVICE_TYPES[CPU]:
return sn.extractsingle(r'Img/sec per CPU:\s+(?P<perf_per_cpu>\S+)', self.stdout, 'perf_per_cpu', float)
else:
return sn.extractsingle(r'Img/sec per GPU:\s+(?P<perf_per_gpu>\S+)', self.stdout, 'perf_per_gpu', float)
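A sample stdout line that these patterns would match (the benchmark's output format is inferred from the regexes here, so treat it as an assumption):

    import re

    # Hypothetical benchmark output line, inferred from the patterns above
    line = 'Total img/sec on 4 CPU(s): 1234.5'
    match = re.search(r'Total img/sec on [0-9]+ .PU\(s\):\s+(?P<perf>\S+)', line)
    assert match is not None and float(match.group('perf')) == 1234.5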


@rfm.simple_test
class EESSI_PyTorch_torchvision_CPU(EESSI_PyTorch_torchvision):
compute_device = DEVICE_TYPES[CPU]


@rfm.simple_test
class EESSI_PyTorch_torchvision_GPU(EESSI_PyTorch_torchvision):
compute_device = DEVICE_TYPES[GPU]
precision = parameter(['default', 'mixed'])

@run_after('init')
def prepare_gpu_test(self):
# Set precision
if self.precision == 'mixed':
self.executable_opts += ['--use-amp']

@run_after('init')
def skip_hvd_plus_amp(self):
'''Skip the combination of horovod and AMP; it does not work, see https://github.com/horovod/horovod/issues/1417'''
if self.parallel_strategy == 'horovod' and self.precision == 'mixed':
self.valid_systems = [INVALID_SYSTEM]
python_get_free_socket.py (new file)
@@ -0,0 +1,8 @@
# Based on https://unix.stackexchange.com/a/132524
import socket

# Bind to port 0 so the OS assigns a free ephemeral port, then print it
# so the caller (the ReFrame test) can pass it as --master-port.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', 0))
addr = s.getsockname()
print(addr[1])  # the assigned port number
s.close()