diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index 0815cac4d2..47bd98ff08 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -668,7 +668,7 @@ def create_monitoring_info(self, status):
     def workers_per_node(self) -> Union[int, float]:
         return self._workers_per_node
 
-    def scale_in(self, blocks, force=True, max_idletime=None):
+    def scale_in(self, blocks, max_idletime=None):
         """Scale in the number of active blocks by specified amount.
 
         The scale in method here is very rude. It doesn't give the workers
@@ -681,18 +681,14 @@ def scale_in(self, blocks, force=True, max_idletime=None):
         blocks : int
              Number of blocks to terminate and scale_in by
 
-        force : Bool
-             Used along with blocks to indicate whether blocks should be terminated by force.
-
-             When force = True, we will kill blocks regardless of the blocks being busy
+        max_idletime: float
+             A time to indicate how long a block should be idle to be a
+             candidate for scaling in.
 
-             When force = False, only idle blocks will be terminated. If the
-             number of idle blocks < ``blocks``, then fewer than ``blocks``
-             blocks will be terminated.
+             If None then blocks will be force scaled in even if they are busy.
 
-        max_idletime: float
-             A time to indicate how long a block can be idle.
-             Used along with force = False to kill blocks that have been idle for that long.
+             If a float, then only idle blocks will be terminated, which may be less than
+             the requested number.
 
         Returns
         -------
@@ -712,18 +708,16 @@ def scale_in(self, blocks, force=True, max_idletime=None):
 
         sorted_blocks = sorted(block_info.items(), key=lambda item: (item[1][1], item[1][0]))
         logger.debug(f"Scale in selecting from {len(sorted_blocks)} blocks")
-        if force is True:
+        if max_idletime is None:
             block_ids_to_kill = [x[0] for x in sorted_blocks[:blocks]]
         else:
-            if not max_idletime:
-                block_ids_to_kill = [x[0] for x in sorted_blocks if x[1][0] == 0][:blocks]
-            else:
-                block_ids_to_kill = []
-                for x in sorted_blocks:
-                    if x[1][1] > max_idletime and x[1][0] == 0:
-                        block_ids_to_kill.append(x[0])
-                        if len(block_ids_to_kill) == blocks:
-                            break
+            block_ids_to_kill = []
+            for x in sorted_blocks:
+                if x[1][1] > max_idletime and x[1][0] == 0:
+                    block_ids_to_kill.append(x[0])
+                    if len(block_ids_to_kill) == blocks:
+                        break
+
         logger.debug("Selected idle block ids to kill: {}".format(
             block_ids_to_kill))
         if len(block_ids_to_kill) < blocks:
diff --git a/parsl/jobs/job_status_poller.py b/parsl/jobs/job_status_poller.py
index 4a9132b8a9..8663d7b9be 100644
--- a/parsl/jobs/job_status_poller.py
+++ b/parsl/jobs/job_status_poller.py
@@ -72,11 +72,17 @@ def status(self) -> Dict[str, JobStatus]:
     def executor(self) -> BlockProviderExecutor:
         return self._executor
 
-    def scale_in(self, n, force=True, max_idletime=None):
-        if force and not max_idletime:
+    def scale_in(self, n, max_idletime=None):
+
+        if max_idletime is None:
             block_ids = self._executor.scale_in(n)
         else:
-            block_ids = self._executor.scale_in(n, force=force, max_idletime=max_idletime)
+            # This is a HighThroughputExecutor-specific interface violation.
+            # This code hopes, through pan-codebase reasoning, that this
+            # scale_in method really does come from HighThroughputExecutor,
+            # and so does have an extra max_idletime parameter not present
+            # in the executor interface.
+            block_ids = self._executor.scale_in(n, max_idletime=max_idletime)
         if block_ids is not None:
             new_status = {}
             for block_id in block_ids:
diff --git a/parsl/jobs/strategy.py b/parsl/jobs/strategy.py
index beb5e50249..b396d43e37 100644
--- a/parsl/jobs/strategy.py
+++ b/parsl/jobs/strategy.py
@@ -288,8 +288,8 @@ def _general_strategy(self, status_list, *, strategy_type):
                     excess_slots = math.ceil(active_slots - (active_tasks * parallelism))
                     excess_blocks = math.ceil(float(excess_slots) / (tasks_per_node * nodes_per_block))
                     excess_blocks = min(excess_blocks, active_blocks - min_blocks)
-                    logger.debug(f"Requesting scaling in by {excess_blocks} blocks")
-                    exec_status.scale_in(excess_blocks, force=False, max_idletime=self.max_idletime)
+                    logger.debug(f"Requesting scaling in by {excess_blocks} blocks with idle time {self.max_idletime}s")
+                    exec_status.scale_in(excess_blocks, max_idletime=self.max_idletime)
                 else:
                     logger.error("This strategy does not support scaling in except for HighThroughputExecutor - taking no action")
             else: