Skip to content

Commit

Permalink
Merge pull request #4245 from grondo/node-exclusive
Browse files Browse the repository at this point in the history
support node exclusive allocations
  • Loading branch information
mergify[bot] authored Mar 31, 2022
2 parents d483acd + 8f32ad3 commit 41cb8ea
Show file tree
Hide file tree
Showing 16 changed files with 474 additions and 197 deletions.
11 changes: 9 additions & 2 deletions doc/man1/flux-mini.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ as a parallel job, while **batch** and **alloc** submit a script or launch
a command as the initial program of a new Flux instance.

If *--ntasks* is unspecified, a value of *N=1* is assumed. Commands that
take *--nslots* have no default and require that *--nslots* be explicitly
specified.
take *--nslots* have no default and require that *--nslots* or *--nodes*
be specified.

The **submit** and **batch** commands enqueue the job and print its numerical
Job ID on standard output.
Expand Down Expand Up @@ -98,6 +98,13 @@ following additional job parameters:
than there are tasks. If unspecified, the number of nodes will be chosen
by the scheduler.

**--exclusive**
Indicate to the scheduler that nodes should be exclusively allocated to
this job. It is an error to specify this option without also using
*-N, --nodes*. If *--nodes* is specified without *--nslots* or *--ntasks*,
then this option will be enabled by default and the number of tasks
or slots will be set to the number of requested nodes.

**-t, --time-limit=FSD**
Set a time limit for the job in Flux standard duration (RFC 23).
FSD is a floating point number with a single character units suffix
Expand Down
24 changes: 21 additions & 3 deletions src/bindings/python/flux/job/Jobspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ def _validate_system_attributes(system):
_validate_constraint(system["constraints"])

@staticmethod
def _create_resource(res_type, count, with_child=None):
def _create_resource(res_type, count, with_child=None, exclusive=False):
if with_child is not None and not isinstance(with_child, abc.Sequence):
raise TypeError("child resource must None or a sequence")
if with_child is not None and isinstance(with_child, str):
Expand All @@ -318,6 +318,9 @@ def _create_resource(res_type, count, with_child=None):

res = {"type": res_type, "count": count}

if exclusive:
res["exclusive"] = True

if with_child:
res["with"] = with_child
return res
Expand Down Expand Up @@ -639,8 +642,15 @@ def _v1_validate(resources, tasks, kwargs):
raise ValueError("attributes.system.duration must be a number")

@classmethod
# pylint: disable=too-many-branches
def from_command(
cls, command, num_tasks=1, cores_per_task=1, gpus_per_task=None, num_nodes=None
cls,
command,
num_tasks=1,
cores_per_task=1,
gpus_per_task=None,
num_nodes=None,
exclusive=False,
):
"""
Factory function that builds the minimum legal v1 jobspec.
Expand All @@ -666,6 +676,8 @@ def from_command(
raise ValueError("node count must be an integer >= 1 (if set)")
if num_nodes > num_tasks:
raise ValueError("node count must not be greater than task count")
elif exclusive:
raise ValueError("exclusive can only be set with a node count")
children = [cls._create_resource("core", cores_per_task)]
if gpus_per_task not in (None, 0):
children.append(cls._create_resource("gpu", gpus_per_task))
Expand All @@ -677,7 +689,9 @@ def from_command(
else:
task_count_dict = {"per_slot": 1}
slot = cls._create_slot("task", num_slots, children)
resource_section = cls._create_resource("node", num_nodes, [slot])
resource_section = cls._create_resource(
"node", num_nodes, [slot], exclusive
)
else:
task_count_dict = {"per_slot": 1}
slot = cls._create_slot("task", num_tasks, children)
Expand All @@ -699,6 +713,7 @@ def from_batch_command(
gpus_per_slot=None,
num_nodes=None,
broker_opts=None,
exclusive=False,
):
"""Create a Jobspec describing a nested Flux instance controlled by a script.
Expand Down Expand Up @@ -738,6 +753,7 @@ def from_batch_command(
cores_per_task=cores_per_slot,
gpus_per_task=gpus_per_slot,
num_nodes=num_nodes,
exclusive=exclusive,
)
jobspec.setattr_shell_option("per-resource.type", "node")
# Copy script contents into jobspec
Expand All @@ -755,6 +771,7 @@ def from_nest_command(
gpus_per_slot=None,
num_nodes=None,
broker_opts=None,
exclusive=False,
):
"""Create a Jobspec describing a nested Flux instance controlled by `command`.
Expand Down Expand Up @@ -783,6 +800,7 @@ def from_nest_command(
cores_per_task=cores_per_slot,
gpus_per_task=gpus_per_slot,
num_nodes=num_nodes,
exclusive=exclusive,
)
jobspec.setattr_shell_option("per-resource.type", "node")
return jobspec
7 changes: 4 additions & 3 deletions src/bindings/python/flux/resource/Rlist.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,10 @@ def copy_constraint(self, constraint):
error = ffi.new("flux_error_t *")
if not isinstance(constraint, str):
constraint = json.dumps(constraint)
handle = self.pimpl.copy_constraint_string(constraint, error)
if not handle:
try:
handle = self.pimpl.copy_constraint_string(constraint, error)
except OSError as exc:
raise ValueError(
"copy_constraint: " + ffi.string(error.text).decode("utf-8")
)
) from exc
return Rlist(handle=handle)
38 changes: 35 additions & 3 deletions src/cmd/flux-mini.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,6 @@ def __init__(self):
"-n",
"--ntasks",
metavar="N",
default="1",
help="Number of tasks to start",
)
self.parser.add_argument(
Expand All @@ -725,6 +724,11 @@ def __init__(self):
metavar="N",
help="Number of GPUs to allocate per task",
)
self.parser.add_argument(
"--exclusive",
action="store_true",
help="With -N, --nodes, allocate nodes exclusively",
)
self.parser.add_argument(
"-v",
"--verbose",
Expand All @@ -737,6 +741,15 @@ def init_jobspec(self, args):
if not args.command:
raise ValueError("job command and arguments are missing")

# If ntasks not set, then set it to either node count, with
# exclusive flag enabled, or to 1 (the default).
if not args.ntasks:
if args.nodes:
args.ntasks = args.nodes
args.exclusive = True
else:
args.ntasks = 1

# Ensure integer args are converted to int() here.
# This is done because we do not use type=int in argparse in order
# to allow these options to be mutable for bulksubmit:
Expand All @@ -756,6 +769,7 @@ def init_jobspec(self, args):
cores_per_task=args.cores_per_task,
gpus_per_task=args.gpus_per_task,
num_nodes=args.nodes,
exclusive=args.exclusive,
)

def run_and_exit(self):
Expand Down Expand Up @@ -1445,6 +1459,11 @@ def add_batch_alloc_args(parser):
metavar="N",
help="Distribute allocated resource slots across N individual nodes",
)
parser.add_argument(
"--exclusive",
action="store_true",
help="With --nodes, allocate nodes exclusively",
)


def list_split(opts):
Expand Down Expand Up @@ -1499,8 +1518,14 @@ def read_script(args):

def init_jobspec(self, args):
# If no script (reading from stdin), then use "flux" as arg[0]

# If number of slots not specified, then set it to node count
# if set, otherwise raise an error.
if not args.nslots:
raise ValueError("Number of slots to allocate must be specified")
if not args.nodes:
raise ValueError("Number of slots to allocate must be specified")
args.nslots = args.nodes
args.exclusive = True

jobspec = JobspecV1.from_batch_command(
script=self.read_script(args),
Expand All @@ -1511,6 +1536,7 @@ def init_jobspec(self, args):
gpus_per_slot=args.gpus_per_slot,
num_nodes=args.nodes,
broker_opts=list_split(args.broker_opts),
exclusive=args.exclusive,
)

# Default output is flux-{{jobid}}.out
Expand Down Expand Up @@ -1544,8 +1570,13 @@ def __init__(self):

def init_jobspec(self, args):

# If number of slots not specified, then set it to node count
# if set, otherwise raise an error.
if not args.nslots:
raise ValueError("Number of slots to allocate must be specified")
if not args.nodes:
raise ValueError("Number of slots to allocate must be specified")
args.nslots = args.nodes
args.exclusive = True

jobspec = JobspecV1.from_nest_command(
command=args.COMMAND,
Expand All @@ -1554,6 +1585,7 @@ def init_jobspec(self, args):
gpus_per_slot=args.gpus_per_slot,
num_nodes=args.nodes,
broker_opts=list_split(args.broker_opts),
exclusive=args.exclusive,
)
if sys.stdin.isatty():
jobspec.setattr_shell_option("pty.interactive", 1)
Expand Down
Loading

0 comments on commit 41cb8ea

Please sign in to comment.