Skip to content

Commit

Permalink
Remove unused codepath from HighThroughputExecutor scale_in (#3115)
Browse files Browse the repository at this point in the history
The codepath would be used when scale_in is called with:

  force=False
  max_idletime=None

and would pick blocks from the list of blocks which are currently idle.

This PR removes that unused codepath, merging the choice of
forced/non-forced scale-in and idle time specification into a single
parameter that indicates "do not scale in blocks that have not been
idle this long".

The two remaining cases from force=True/False, max_idletime=None/number
are:


* max_idletime=None, previously: force=True, max_idletime=None

This means that the requested number of blocks should be scaled in, even
if the blocks are not idle. The use case for this path is "parsl is shutting
down, so we need to tidy up everything. If there are tasks still running, we
don't care because terminating in-progress tasks is part of parsl shutdown."

* max_idletime=some time, previously: force=False, max_idletime=some time

This means that the scaling code has decided to apply downwards pressure on
the number of blocks: there are more blocks than needed. However, this
pressure should not disrupt already running tasks, and it is less urgent to
cancel blocks, because the same call will happen every 5 seconds to keep
applying that pressure.
  • Loading branch information
benclifford authored Mar 1, 2024
1 parent cdfe5b7 commit 39ecb9c
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 26 deletions.
36 changes: 15 additions & 21 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ def create_monitoring_info(self, status):
def workers_per_node(self) -> Union[int, float]:
return self._workers_per_node

def scale_in(self, blocks, force=True, max_idletime=None):
def scale_in(self, blocks, max_idletime=None):
"""Scale in the number of active blocks by specified amount.
The scale in method here is very rude. It doesn't give the workers
Expand All @@ -681,18 +681,14 @@ def scale_in(self, blocks, force=True, max_idletime=None):
blocks : int
Number of blocks to terminate and scale_in by
force : Bool
Used along with blocks to indicate whether blocks should be terminated by force.
When force = True, we will kill blocks regardless of the blocks being busy
max_idletime: float
A time to indicate how long a block should be idle to be a
candidate for scaling in.
When force = False, only idle blocks will be terminated. If the
number of idle blocks < ``blocks``, then fewer than ``blocks``
blocks will be terminated.
If None then blocks will be force scaled in even if they are busy.
max_idletime: float
A time to indicate how long a block can be idle.
Used along with force = False to kill blocks that have been idle for that long.
If a float, then only idle blocks will be terminated, which may be less than
the requested number.
Returns
-------
Expand All @@ -712,18 +708,16 @@ def scale_in(self, blocks, force=True, max_idletime=None):

sorted_blocks = sorted(block_info.items(), key=lambda item: (item[1][1], item[1][0]))
logger.debug(f"Scale in selecting from {len(sorted_blocks)} blocks")
if force is True:
if max_idletime is None:
block_ids_to_kill = [x[0] for x in sorted_blocks[:blocks]]
else:
if not max_idletime:
block_ids_to_kill = [x[0] for x in sorted_blocks if x[1][0] == 0][:blocks]
else:
block_ids_to_kill = []
for x in sorted_blocks:
if x[1][1] > max_idletime and x[1][0] == 0:
block_ids_to_kill.append(x[0])
if len(block_ids_to_kill) == blocks:
break
block_ids_to_kill = []
for x in sorted_blocks:
if x[1][1] > max_idletime and x[1][0] == 0:
block_ids_to_kill.append(x[0])
if len(block_ids_to_kill) == blocks:
break

logger.debug("Selected idle block ids to kill: {}".format(
block_ids_to_kill))
if len(block_ids_to_kill) < blocks:
Expand Down
12 changes: 9 additions & 3 deletions parsl/jobs/job_status_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,17 @@ def status(self) -> Dict[str, JobStatus]:
def executor(self) -> BlockProviderExecutor:
return self._executor

def scale_in(self, n, force=True, max_idletime=None):
if force and not max_idletime:
def scale_in(self, n, max_idletime=None):

if max_idletime is None:
block_ids = self._executor.scale_in(n)
else:
block_ids = self._executor.scale_in(n, force=force, max_idletime=max_idletime)
# This is a HighThroughputExecutor-specific interface violation.
# This code hopes, through pan-codebase reasoning, that this
# scale_in method really does come from HighThroughputExecutor,
# and so does have an extra max_idletime parameter not present
# in the executor interface.
block_ids = self._executor.scale_in(n, max_idletime=max_idletime)
if block_ids is not None:
new_status = {}
for block_id in block_ids:
Expand Down
4 changes: 2 additions & 2 deletions parsl/jobs/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ def _general_strategy(self, status_list, *, strategy_type):
excess_slots = math.ceil(active_slots - (active_tasks * parallelism))
excess_blocks = math.ceil(float(excess_slots) / (tasks_per_node * nodes_per_block))
excess_blocks = min(excess_blocks, active_blocks - min_blocks)
logger.debug(f"Requesting scaling in by {excess_blocks} blocks")
exec_status.scale_in(excess_blocks, force=False, max_idletime=self.max_idletime)
logger.debug(f"Requesting scaling in by {excess_blocks} blocks with idle time {self.max_idletime}s")
exec_status.scale_in(excess_blocks, max_idletime=self.max_idletime)
else:
logger.error("This strategy does not support scaling in except for HighThroughputExecutor - taking no action")
else:
Expand Down

0 comments on commit 39ecb9c

Please sign in to comment.