Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated LSF scheduler to accept number of nodes #5153

Merged
merged 3 commits into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 30 additions & 21 deletions aiida/schedulers/plugins/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,7 @@ class LsfJobResource(JobResource):
See https://www-01.ibm.com/support/knowledgecenter/SSETD4_9.1.2/lsf_command_ref/bsub.1.dita?lang=en
for more details about the parallel environment definition (the -m option of bsub).
"""
_default_fields = (
'parallel_env',
'tot_num_mpiprocs',
'default_mpiprocs_per_machine',
)
_default_fields = ('parallel_env', 'tot_num_mpiprocs', 'default_mpiprocs_per_machine', 'num_machines')

@classmethod
def validate_resources(cls, **kwargs):
Expand All @@ -116,23 +112,33 @@ def validate_resources(cls, **kwargs):
from aiida.common.exceptions import ConfigurationError

resources = AttributeDict()

resources.parallel_env = kwargs.pop('parallel_env', '')
resources.use_num_machines = kwargs.pop('use_num_machines', False)
num_machines = kwargs.pop('num_machines', None)
default_mpiprocs_per_machine = kwargs.pop('default_mpiprocs_per_machine', None)

if not isinstance(resources.parallel_env, str):
raise TypeError("When specified, 'parallel_env' must be a string")
raise TypeError('`parallel_env` must be a string')

if default_mpiprocs_per_machine is not None:
raise ConfigurationError('`default_mpiprocs_per_machine` cannot be set.')

if not resources.use_num_machines and num_machines is not None:
raise ConfigurationError('`num_machines` cannot be set unless `use_num_machines` is `True`.')

if resources.use_num_machines and num_machines is None:
raise ConfigurationError('must set `num_machines` when `use_num_machines` is `True`.')

if resources.use_num_machines and num_machines is not None:
try:
resources.num_machines = int(num_machines)
except (KeyError, ValueError):
raise TypeError('`num_machines` must be an integer')

try:
resources.tot_num_mpiprocs = int(kwargs.pop('tot_num_mpiprocs'))
except (KeyError, ValueError) as exc:
raise TypeError('tot_num_mpiprocs must be specified and must be an integer') from exc

default_mpiprocs_per_machine = kwargs.pop('default_mpiprocs_per_machine', None)
if default_mpiprocs_per_machine is not None:
raise ConfigurationError('default_mpiprocs_per_machine cannot be set for LSF scheduler')

num_machines = resources.pop('num_machines', None)
if num_machines is not None:
raise ConfigurationError('num_machines cannot be set for LSF scheduler')
raise TypeError('`tot_num_mpiprocs` must be specified and must be an integer') from exc

if resources.tot_num_mpiprocs <= 0:
raise ValueError('tot_num_mpiprocs must be >= 1')
Expand Down Expand Up @@ -387,11 +393,14 @@ def _get_submit_script_header(self, job_tmpl):
if not job_tmpl.job_resource:
raise ValueError('Job resources (as the tot_num_mpiprocs) are required for the LSF scheduler plugin')

lines.append(f'#BSUB -n {job_tmpl.job_resource.get_tot_num_mpiprocs()}')
# Note: make sure that PARALLEL_SCHED_BY_SLOT=Y is NOT
# defined in lsb.params (you can check with the output of bparams -l).
# Note: the -n option of bsub can also contain a maximum number of
# procs to be used
if job_tmpl.job_resource.use_num_machines:
lines.append(f'#BSUB -nnodes {job_tmpl.job_resource.num_machines}')
else:
lines.append(f'#BSUB -n {job_tmpl.job_resource.get_tot_num_mpiprocs()}')
# Note: make sure that PARALLEL_SCHED_BY_SLOT=Y is NOT
# defined in lsb.params (you can check with the output of bparams -l).
# Note: the -n option of bsub can also contain a maximum number of
# procs to be used
if job_tmpl.job_resource.parallel_env:
lines.append(f'#BSUB -m "{job_tmpl.job_resource.parallel_env}"')

Expand Down
35 changes: 17 additions & 18 deletions tests/schedulers/test_lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import pytest

from aiida.common.exceptions import ConfigurationError
from aiida.schedulers.datastructures import JobState
from aiida.schedulers.plugins.lsf import LsfScheduler
from aiida.schedulers.scheduler import SchedulerError
Expand Down Expand Up @@ -165,32 +166,30 @@ def test_submit_script_rerunnable():
assert '#BSUB -rn' in submit_script_text


def test_create_job_resource():
# yapf: disable
@pytest.mark.parametrize('kwargs, exception, message', (
({'tot_num_mpiprocs': 'Not-a-Number'}, TypeError, ''),
({'parallel_env': 0}, TypeError, 'parallel_env` must be a string'),
({'num_machines': 1}, ConfigurationError, '`num_machines` cannot be set unless `use_num_machines` is `True`.'),
({'use_num_machines': True}, ConfigurationError, 'must set `num_machines` when `use_num_machines` is `True`.'),
({'num_machines': 'string', 'use_num_machines': True}, TypeError, '`num_machines` must be an integer'),
({}, TypeError, '`tot_num_mpiprocs` must be specified and must be an integer'),
({'tot_num_mpiprocs': 'string'}, TypeError, '`tot_num_mpiprocs` must be specified and must be an integer'),
({'tot_num_mpiprocs': 0}, ValueError, 'tot_num_mpiprocs must be >= 1'),
({'default_mpiprocs_per_machine': 1}, ConfigurationError, '`default_mpiprocs_per_machine` cannot be set.'),
))
# yapf: enable
def test_create_job_resource(kwargs, exception, message):
"""
Test to verify that script fails in the following cases:
* if we specify only num_machines
* if tot_num_mpiprocs is not an int (and can't be cast to one)
* if parallel_env is not a str
"""
from aiida.schedulers.datastructures import JobTemplate

scheduler = LsfScheduler()
job_tmpl = JobTemplate()

with pytest.raises(TypeError):
job_tmpl.job_resource = scheduler.create_job_resource(tot_num_mpiprocs='Not-a-Number')

with pytest.raises(TypeError):
job_tmpl.job_resource = scheduler.create_job_resource(
num_machines=1,
num_mpiprocs_per_machine=1,
)

with pytest.raises(TypeError):
job_tmpl.job_resource = scheduler.create_job_resource(
tot_num_mpiprocs=2,
parallel_env=0,
)
with pytest.raises(exception, match=message):
scheduler.create_job_resource(**kwargs)


def test_submit_output():
Expand Down