Skip to content

Commit

Permalink
Merge pull request DIRACGrid#7194 from chrisburr/safer-set-nproc
Browse files Browse the repository at this point in the history
[v8.1] Make Job.setNumberOfProcessors safer to use
  • Loading branch information
fstagni authored Sep 12, 2023
2 parents 1a9a5e3 + 6066dee commit df6a89f
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 58 deletions.
86 changes: 38 additions & 48 deletions src/DIRAC/Interfaces/API/Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
COMPONENT_NAME = "/Interfaces/API/Job"


class BadJobParameterError(ValueError):
pass


class Job(API):
"""DIRAC jobs"""

Expand Down Expand Up @@ -517,7 +521,10 @@ def setDestination(self, destination):

#############################################################################
def setNumberOfProcessors(self, numberOfProcessors=None, minNumberOfProcessors=None, maxNumberOfProcessors=None):
"""Helper function.
"""Helper function to set the number of processors required by the job.
The DIRAC_JOB_PROCESSORS environment variable can be used by the job to
determine how many processors are actually assigned.
Example usage:
Expand Down Expand Up @@ -551,75 +558,50 @@ def setNumberOfProcessors(self, numberOfProcessors=None, minNumberOfProcessors=N
>>> job = Job()
>>> job.setNumberOfProcessors(numberOfProcessors=3, maxNumberOfProcessors=4)
will lead to ignore the second parameter
will result in a BadJobParameterError
>>> job = Job()
>>> job.setNumberOfProcessors(numberOfProcessors=3, minNumberOfProcessors=2)
will lead to ignore the second parameter
will result in a BadJobParameterError
:param int processors: number of processors required by the job (exact number, unless a min/max are set)
:param int minNumberOfProcessors: optional min number of processors the job applications can use
:param int maxNumberOfProcessors: optional max number of processors the job applications can use
:return: S_OK/S_ERROR
"""
if numberOfProcessors:
if not minNumberOfProcessors:
nProc = numberOfProcessors
else:
nProc = max(numberOfProcessors, minNumberOfProcessors)
if nProc > 1:
self._addParameter(
self.workflow, "NumberOfProcessors", "JDL", nProc, "Exact number of processors requested"
)
self._addParameter(
self.workflow,
"MaxNumberOfProcessors",
"JDL",
nProc,
"Max Number of processors the job applications may use",
)
return S_OK()
:return: S_OK
if maxNumberOfProcessors and not minNumberOfProcessors:
minNumberOfProcessors = 1
:raises BadJobParameterError: If the function arguments are not valid.
"""
# If min and max are the same then that's the same as setting numberOfProcessors
if minNumberOfProcessors and maxNumberOfProcessors and minNumberOfProcessors == maxNumberOfProcessors:
numberOfProcessors = minNumberOfProcessors
minNumberOfProcessors = maxNumberOfProcessors = None

if minNumberOfProcessors and maxNumberOfProcessors and minNumberOfProcessors >= maxNumberOfProcessors:
minNumberOfProcessors = maxNumberOfProcessors
if numberOfProcessors is not None:
if minNumberOfProcessors is not None:
raise BadJobParameterError("minNumberOfProcessors cannot be used with numberOfProcessors")
if maxNumberOfProcessors is not None:
raise BadJobParameterError("maxNumberOfProcessors cannot be used with numberOfProcessors")

if (
minNumberOfProcessors
and maxNumberOfProcessors
and minNumberOfProcessors == maxNumberOfProcessors
and minNumberOfProcessors > 1
):
self._addParameter(
self.workflow,
"NumberOfProcessors",
"JDL",
minNumberOfProcessors,
"Exact number of processors requested",
self.workflow, "NumberOfProcessors", "JDL", numberOfProcessors, "Exact number of processors requested"
)
self._addParameter(
self.workflow,
"MaxNumberOfProcessors",
"JDL",
minNumberOfProcessors,
numberOfProcessors,
"Max Number of processors the job applications may use",
)
return S_OK()

# By this point there should be a min
self._addParameter(
self.workflow,
"MinNumberOfProcessors",
"JDL",
minNumberOfProcessors,
"Min Number of processors the job applications may use",
)
if minNumberOfProcessors is None and maxNumberOfProcessors is None:
return S_OK()

# If not set, will be "all"
if maxNumberOfProcessors:
minNumberOfProcessors = minNumberOfProcessors or 1
if maxNumberOfProcessors is not None:
if maxNumberOfProcessors < minNumberOfProcessors:
raise BadJobParameterError("minNumberOfProcessors must be less than or equal to maxNumberOfProcessors")
self._addParameter(
self.workflow,
"MaxNumberOfProcessors",
Expand All @@ -628,6 +610,14 @@ def setNumberOfProcessors(self, numberOfProcessors=None, minNumberOfProcessors=N
"Max Number of processors the job applications may use",
)

self._addParameter(
self.workflow,
"MinNumberOfProcessors",
"JDL",
minNumberOfProcessors,
"Min Number of processors the job applications may use",
)

return S_OK()

#############################################################################
Expand Down
32 changes: 22 additions & 10 deletions src/DIRAC/Interfaces/API/test/Test_JobAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pytest

from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
from DIRAC.Interfaces.API.Job import Job
from DIRAC.Interfaces.API.Job import Job, BadJobParameterError


def test_basicJob():
Expand Down Expand Up @@ -74,31 +74,43 @@ def test_SimpleParametricJob():
"proc, minProc, maxProc, expectedProc, expectedMinProc, expectedMaxProc",
[
(4, None, None, 4, None, 4),
(4, 2, None, 4, None, 4),
(4, 2, 8, 4, None, 4),
(4, 8, 6, 8, None, 8), # non-sense
(None, 2, 8, None, 2, 8),
(None, 1, None, None, 1, None),
(None, None, 8, None, 1, 8),
(None, 8, 8, 8, None, 8),
(None, 12, 8, 8, None, 8), # non-sense
],
)
def test_setNumberOfProcessors(proc, minProc, maxProc, expectedProc, expectedMinProc, expectedMaxProc):
# Arrange
def test_setNumberOfProcessors_successful(proc, minProc, maxProc, expectedProc, expectedMinProc, expectedMaxProc):
job = Job()

# Act
res = job.setNumberOfProcessors(proc, minProc, maxProc)

# Assert
assert res["OK"], res["Message"]
jobDescription = ClassAd(f"[{job._toJDL()}]")
assert expectedProc == jobDescription.getAttributeInt("NumberOfProcessors")
assert expectedMinProc == jobDescription.getAttributeInt("MinNumberOfProcessors")
assert expectedMaxProc == jobDescription.getAttributeInt("MaxNumberOfProcessors")


@pytest.mark.parametrize(
"proc, minProc, maxProc",
[
(4, 2, None),
(4, 2, 8),
(4, 8, 6),
(None, 12, 8),
],
)
def test_setNumberOfProcessors_unsuccessful(proc, minProc, maxProc):
job = Job()
with pytest.raises(BadJobParameterError):
job.setNumberOfProcessors(proc, minProc, maxProc)

jobDescription = ClassAd(f"[{job._toJDL()}]")
assert jobDescription.getAttributeInt("NumberOfProcessors") is None
assert jobDescription.getAttributeInt("MinNumberOfProcessors") is None
assert jobDescription.getAttributeInt("MaxNumberOfProcessors") is None


@pytest.mark.parametrize(
"sites, expectedSites",
[
Expand Down

0 comments on commit df6a89f

Please sign in to comment.