Skip to content

Commit

Permalink
Deprecate max_workers for max_workers_per_node (#3117)
Browse files Browse the repository at this point in the history
The latter is less ambiguous than the former. We will support both for
the time being but raise a deprecation warning for `max_workers`. Also,
`max_workers_per_node` will take precedence if both are defined.
  • Loading branch information
rjmello authored Mar 4, 2024
1 parent d43201a commit af1d901
Show file tree
Hide file tree
Showing 46 changed files with 100 additions and 61 deletions.
2 changes: 1 addition & 1 deletion docs/teaching_scripts/test_apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from parsl import python_app, HighThroughputExecutor, Config
import parsl

parsl.load(Config(executors=[HighThroughputExecutor(label='htex_spawn', max_workers=1, address='127.0.0.1')]))
parsl.load(Config(executors=[HighThroughputExecutor(label='htex_spawn', max_workers_per_node=1, address='127.0.0.1')]))


# Part 1: Explain imports
Expand Down
2 changes: 1 addition & 1 deletion docs/userguide/configuring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ used by the infiniband interface with ``address_by_interface('ib0')``
HighThroughputExecutor(
label="frontera_htex",
address=address_by_interface('ib0'),
max_workers=56,
max_workers_per_node=56,
provider=SlurmProvider(
channel=LocalChannel(),
nodes_per_block=128,
Expand Down
2 changes: 1 addition & 1 deletion docs/userguide/monitoring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ configuration. Here the `parsl.monitoring.MonitoringHub` is specified to use por
HighThroughputExecutor(
label="local_htex",
cores_per_worker=1,
max_workers=4,
max_workers_per_node=4,
address=address_by_hostname(),
)
],
Expand Down
2 changes: 1 addition & 1 deletion parsl/configs/ASPIRE1.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
heartbeat_period=15,
heartbeat_threshold=120,
worker_debug=True,
max_workers=4,
max_workers_per_node=4,
address=address_by_interface('ib0'),
provider=PBSProProvider(
launcher=MpiRunLauncher(),
Expand Down
2 changes: 1 addition & 1 deletion parsl/configs/ad_hoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
executors=[
HighThroughputExecutor(
label='remote_htex',
max_workers=2,
max_workers_per_node=2,
worker_logdir_root=user_opts['adhoc']['script_dir'],
provider=AdHocProvider(
# Command to be run before starting a worker, such as:
Expand Down
2 changes: 1 addition & 1 deletion parsl/configs/bridges.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
HighThroughputExecutor(
label='Bridges_HTEX_multinode',
address=address_by_interface('ens3f0'),
max_workers=1,
max_workers_per_node=1,
provider=SlurmProvider(
'YOUR_PARTITION_NAME', # Specify Partition / QOS, for eg. RM-small
nodes_per_block=2,
Expand Down
2 changes: 1 addition & 1 deletion parsl/configs/cc_in2p3.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
executors=[
HighThroughputExecutor(
label='cc_in2p3_htex',
max_workers=2,
max_workers_per_node=2,
provider=GridEngineProvider(
channel=LocalChannel(),
nodes_per_block=1,
Expand Down
2 changes: 1 addition & 1 deletion parsl/configs/expanse.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
executors=[
HighThroughputExecutor(
label='Expanse_CPU_Multinode',
max_workers=32,
max_workers_per_node=32,
provider=SlurmProvider(
'compute',
account='YOUR_ALLOCATION_ON_EXPANSE',
Expand Down
2 changes: 1 addition & 1 deletion parsl/configs/frontera.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
executors=[
HighThroughputExecutor(
label="frontera_htex",
max_workers=1, # Set number of workers per node
max_workers_per_node=1, # Set number of workers per node
provider=SlurmProvider(
cmd_timeout=60, # Add extra time for slow scheduler responses
channel=LocalChannel(),
Expand Down
2 changes: 1 addition & 1 deletion parsl/configs/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
HighThroughputExecutor(
label='kube-htex',
cores_per_worker=1,
max_workers=1,
max_workers_per_node=1,
worker_logdir_root='YOUR_WORK_DIR',

# Address for the pod worker to connect back
Expand Down
2 changes: 1 addition & 1 deletion parsl/configs/midway.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
label='Midway_HTEX_multinode',
address=address_by_interface('bond0'),
worker_debug=False,
max_workers=2,
max_workers_per_node=2,
provider=SlurmProvider(
'YOUR_PARTITION', # Partition name, e.g 'broadwl'
launcher=SrunLauncher(),
Expand Down
2 changes: 1 addition & 1 deletion parsl/configs/osg.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
executors=[
HighThroughputExecutor(
label='OSG_HTEX',
max_workers=1,
max_workers_per_node=1,
provider=CondorProvider(
nodes_per_block=1,
init_blocks=4,
Expand Down
2 changes: 1 addition & 1 deletion parsl/configs/stampede2.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
HighThroughputExecutor(
label='Stampede2_HTEX',
address=address_by_interface('em3'),
max_workers=2,
max_workers_per_node=2,
provider=SlurmProvider(
nodes_per_block=2,
init_blocks=1,
Expand Down
44 changes: 35 additions & 9 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Dict, Sequence
from typing import List, Optional, Tuple, Union, Callable
import math
import warnings

import parsl.launchers
from parsl.serialize import pack_res_spec_apply_message, deserialize
Expand Down Expand Up @@ -39,7 +40,7 @@

logger = logging.getLogger(__name__)

DEFAULT_LAUNCH_CMD = ("process_worker_pool.py {debug} {max_workers} "
DEFAULT_LAUNCH_CMD = ("process_worker_pool.py {debug} {max_workers_per_node} "
"-a {addresses} "
"-p {prefetch_capacity} "
"-c {cores_per_worker} "
Expand Down Expand Up @@ -154,7 +155,10 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin):
the there's sufficient memory for each worker. Default: None
max_workers : int
Caps the number of workers launched per node. Default: infinity
Deprecated. Please use max_workers_per_node instead.
max_workers_per_node : int
Caps the number of workers launched per node. Default: None
cpu_affinity: string
Whether or how each worker process sets thread affinity. Options include "none" to forgo
Expand Down Expand Up @@ -228,7 +232,8 @@ def __init__(self,
worker_debug: bool = False,
cores_per_worker: float = 1.0,
mem_per_worker: Optional[float] = None,
max_workers: Union[int, float] = float('inf'),
max_workers: Optional[Union[int, float]] = None,
max_workers_per_node: Optional[Union[int, float]] = None,
cpu_affinity: str = 'none',
available_accelerators: Union[int, Sequence[str]] = (),
prefetch_capacity: int = 0,
Expand All @@ -251,7 +256,6 @@ def __init__(self,
self.working_dir = working_dir
self.cores_per_worker = cores_per_worker
self.mem_per_worker = mem_per_worker
self.max_workers = max_workers
self.prefetch_capacity = prefetch_capacity
self.address = address
self.address_probe_timeout = address_probe_timeout
Expand All @@ -260,8 +264,12 @@ def __init__(self,
else:
self.all_addresses = ','.join(get_all_addresses())

mem_slots = max_workers
cpu_slots = max_workers
if max_workers:
self._warn_deprecated("max_workers", "max_workers_per_node")
self.max_workers_per_node = max_workers_per_node or max_workers or float("inf")

mem_slots = self.max_workers_per_node
cpu_slots = self.max_workers_per_node
if hasattr(self.provider, 'mem_per_node') and \
self.provider.mem_per_node is not None and \
mem_per_worker is not None and \
Expand All @@ -278,7 +286,7 @@ def __init__(self,
self.available_accelerators = list(available_accelerators)

# Determine the number of workers per node
self._workers_per_node = min(max_workers, mem_slots, cpu_slots)
self._workers_per_node = min(self.max_workers_per_node, mem_slots, cpu_slots)
if len(self.available_accelerators) > 0:
self._workers_per_node = min(self._workers_per_node, len(available_accelerators))
if self._workers_per_node == float('inf'):
Expand Down Expand Up @@ -316,6 +324,24 @@ def __init__(self,

radio_mode = "htex"

def _warn_deprecated(self, old: str, new: str):
warnings.warn(
f"{old} is deprecated and will be removed in a future release. "
"Please use {new} instead.",
DeprecationWarning,
stacklevel=2
)

@property
def max_workers(self):
    """Deprecated alias for ``max_workers_per_node``.

    Reading this attribute emits a DeprecationWarning and returns the
    value of ``max_workers_per_node``.
    """
    self._warn_deprecated("max_workers", "max_workers_per_node")
    return self.max_workers_per_node

@max_workers.setter
def max_workers(self, val: Union[int, float]):
    # Deprecated setter: warns, then forwards the assignment to the
    # replacement attribute so old configs keep working unchanged.
    self._warn_deprecated("max_workers", "max_workers_per_node")
    self.max_workers_per_node = val

@property
def logdir(self):
return "{}/{}".format(self.run_dir, self.label)
Expand All @@ -330,7 +356,7 @@ def initialize_scaling(self):
"""Compose the launch command and scale out the initial blocks.
"""
debug_opts = "--debug" if self.worker_debug else ""
max_workers = "" if self.max_workers == float('inf') else "--max_workers={}".format(self.max_workers)
max_workers_per_node = "" if self.max_workers_per_node == float('inf') else "--max_workers_per_node={}".format(self.max_workers_per_node)
enable_mpi_opts = "--enable_mpi_mode " if self.enable_mpi_mode else ""

address_probe_timeout_string = ""
Expand All @@ -345,7 +371,7 @@ def initialize_scaling(self):
result_port=self.worker_result_port,
cores_per_worker=self.cores_per_worker,
mem_per_worker=self.mem_per_worker,
max_workers=max_workers,
max_workers_per_node=max_workers_per_node,
nodes_per_block=self.provider.nodes_per_block,
heartbeat_period=self.heartbeat_period,
heartbeat_threshold=self.heartbeat_threshold,
Expand Down
18 changes: 9 additions & 9 deletions parsl/executors/high_throughput/process_worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(self, *,
result_port,
cores_per_worker,
mem_per_worker,
max_workers,
max_workers_per_node,
prefetch_capacity,
uid,
block_id,
Expand Down Expand Up @@ -100,8 +100,8 @@ def __init__(self, *,
the there's sufficient memory for each worker. If set to None, memory on node is not
considered in the determination of workers to be launched on node by the manager.
max_workers : int
caps the maximum number of workers that can be launched.
max_workers_per_node : int | float
Caps the maximum number of workers that can be launched.
prefetch_capacity : int
Number of tasks that could be prefetched over available worker capacity.
Expand Down Expand Up @@ -190,15 +190,15 @@ def __init__(self, *,
else:
available_mem_on_node = round(psutil.virtual_memory().available / (2**30), 1)

self.max_workers = max_workers
self.max_workers_per_node = max_workers_per_node
self.prefetch_capacity = prefetch_capacity

mem_slots = max_workers
mem_slots = max_workers_per_node
# Avoid a divide by 0 error.
if mem_per_worker and mem_per_worker > 0:
mem_slots = math.floor(available_mem_on_node / mem_per_worker)

self.worker_count: int = min(max_workers,
self.worker_count: int = min(max_workers_per_node,
mem_slots,
math.floor(cores_on_node / cores_per_worker))

Expand Down Expand Up @@ -793,7 +793,7 @@ def start_file_logger(filename, rank, name='parsl', level=logging.DEBUG, format_
help="GB of memory assigned to each worker process. Default=0, no assignment")
parser.add_argument("-t", "--task_port", required=True,
help="REQUIRED: Task port for receiving tasks from the interchange")
parser.add_argument("--max_workers", default=float('inf'),
parser.add_argument("--max_workers_per_node", default=float('inf'),
help="Caps the maximum workers that can be launched, default:infinity")
parser.add_argument("-p", "--prefetch_capacity", default=0,
help="Number of tasks that can be prefetched to the manager. Default is 0.")
Expand Down Expand Up @@ -847,7 +847,7 @@ def strategyorlist(s: str):
logger.info("task_port: {}".format(args.task_port))
logger.info("result_port: {}".format(args.result_port))
logger.info("addresses: {}".format(args.addresses))
logger.info("max_workers: {}".format(args.max_workers))
logger.info("max_workers_per_node: {}".format(args.max_workers_per_node))
logger.info("poll_period: {}".format(args.poll))
logger.info("address_probe_timeout: {}".format(args.address_probe_timeout))
logger.info("Prefetch capacity: {}".format(args.prefetch_capacity))
Expand All @@ -866,7 +866,7 @@ def strategyorlist(s: str):
block_id=args.block_id,
cores_per_worker=float(args.cores_per_worker),
mem_per_worker=None if args.mem_per_worker == 'None' else float(args.mem_per_worker),
max_workers=args.max_workers if args.max_workers == float('inf') else int(args.max_workers),
max_workers_per_node=args.max_workers_per_node if args.max_workers_per_node == float('inf') else int(args.max_workers_per_node),
prefetch_capacity=int(args.prefetch_capacity),
heartbeat_threshold=int(args.hb_threshold),
heartbeat_period=int(args.hb_period),
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/configs/ad_hoc_cluster_htex.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
executors=[
HighThroughputExecutor(
label='remote_htex',
max_workers=2,
max_workers_per_node=2,
worker_logdir_root=user_opts['adhoc']['script_dir'],
encrypted=True,
provider=AdHocProvider(
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/configs/bluewaters.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def fresh_config():
label="bw_htex",
cores_per_worker=1,
worker_debug=False,
max_workers=1,
max_workers_per_node=1,
encrypted=True,
provider=TorqueProvider(
queue='normal',
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/configs/bridges.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def fresh_config():
# This is the network interface on the login node to
# which compute nodes can communicate
# address=address_by_interface('bond0.144'),
max_workers=1,
max_workers_per_node=1,
encrypted=True,
provider=SlurmProvider(
user_opts['bridges']['partition'], # Partition / QOS
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/configs/cc_in2p3.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def fresh_config():
executors=[
HighThroughputExecutor(
label='cc_in2p3_htex',
max_workers=1,
max_workers_per_node=1,
encrypted=True,
provider=GridEngineProvider(
channel=LocalChannel(),
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/configs/comet.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def fresh_config():
executors=[
HighThroughputExecutor(
label='Comet_HTEX_multinode',
max_workers=1,
max_workers_per_node=1,
encrypted=True,
provider=SlurmProvider(
'debug',
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/configs/frontera.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def fresh_config():
executors=[
HighThroughputExecutor(
label="frontera_htex",
max_workers=1,
max_workers_per_node=1,
encrypted=True,
provider=SlurmProvider(
cmd_timeout=60, # Add extra time for slow scheduler responses
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/configs/midway.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def fresh_config():
HighThroughputExecutor(
label='Midway_HTEX_multinode',
worker_debug=False,
max_workers=1,
max_workers_per_node=1,
encrypted=True,
provider=SlurmProvider(
'broadwl', # Partition name, e.g 'broadwl'
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/configs/nscc_singapore.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def fresh_config():
heartbeat_period=15,
heartbeat_threshold=120,
worker_debug=False,
max_workers=1,
max_workers_per_node=1,
address=address_by_interface('ib0'),
encrypted=True,
provider=PBSProProvider(
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/configs/osg_htex.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
executors=[
HighThroughputExecutor(
label='OSG_HTEX',
max_workers=1,
max_workers_per_node=1,
encrypted=True,
provider=CondorProvider(
nodes_per_block=1,
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/configs/petrelkube.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def fresh_config():
HighThroughputExecutor(
label='kube-htex',
cores_per_worker=1,
max_workers=1,
max_workers_per_node=1,
worker_logdir_root='.',

# Address for the pod worker to connect back
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/configs/summit.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def fresh_config():

# address=address_by_interface('ib0'), # This assumes Parsl is running on login node
worker_port_range=(50000, 55000),
max_workers=1,
max_workers_per_node=1,
encrypted=True,
provider=LSFProvider(
launcher=JsrunLauncher(),
Expand Down
Loading

0 comments on commit af1d901

Please sign in to comment.