From cb346434bf5126ddd90a4dc0951504934088d1c2 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 27 Mar 2024 09:02:00 -0500 Subject: [PATCH] Handle init_blocks in scaling strategy, rather than special-casing it (#3283) This is part of issue #3278 tidying up job and block management. Now init_blocks scale out happens on the first strategy poll, not at executor start - that will often delay init_blocks scaling by one strategy poll period compared to before this PR. --- parsl/dataflow/dflow.py | 9 +------- parsl/executors/base.py | 2 +- parsl/executors/high_throughput/executor.py | 13 +----------- parsl/executors/taskvine/executor.py | 7 ------- parsl/executors/workqueue/executor.py | 7 ------- parsl/jobs/job_status_poller.py | 1 + parsl/jobs/strategy.py | 23 ++++++++++++++++----- parsl/tests/test_htex/test_drain.py | 1 + 8 files changed, 23 insertions(+), 40 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 91f11101c2..52d61fbe42 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -1141,14 +1141,7 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None: self._create_remote_dirs_over_channel(executor.provider, executor.provider.channel) self.executors[executor.label] = executor - block_ids = executor.start() - if self.monitoring and block_ids: - new_status = {} - for bid in block_ids: - new_status[bid] = JobStatus(JobState.PENDING) - msg = executor.create_monitoring_info(new_status) - logger.debug("Sending monitoring message {} to hub from DFK".format(msg)) - self.monitoring.send(MessageType.BLOCK_INFO, msg) + executor.start() block_executors = [e for e in executors if isinstance(e, BlockProviderExecutor)] self.job_status_poller.add_executors(block_executors) diff --git a/parsl/executors/base.py b/parsl/executors/base.py index 245d86312a..909e5efef6 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -53,7 +53,7 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]: return False @abstractmethod - def start(self) -> Optional[List[str]]: + def start(self) -> None: """Start the executor. Any spin-up operations (for example: starting thread pools) should be performed here. diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 795c229a42..cfbb096733 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -400,16 +400,6 @@ def initialize_scaling(self): logger.debug("Starting HighThroughputExecutor with provider:\n%s", self.provider) - # TODO: why is this a provider property? - block_ids = [] - if hasattr(self.provider, 'init_blocks'): - try: - block_ids = self.scale_out(blocks=self.provider.init_blocks) - except Exception as e: - logger.error("Scaling out failed: {}".format(e)) - raise e - return block_ids - def start(self): """Create the Interchange process and connect to it. """ @@ -439,8 +429,7 @@ def start(self): logger.debug("Created management thread: {}".format(self._queue_management_thread)) - block_ids = self.initialize_scaling() - return block_ids + self.initialize_scaling() @wrap_with_logs def _queue_management_worker(self): diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index cc41c885f2..cf07cdd763 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -586,13 +586,6 @@ def initialize_scaling(self): self._worker_command = self._construct_worker_command() self._patch_providers() - if hasattr(self.provider, 'init_blocks'): - try: - self.scale_out(blocks=self.provider.init_blocks) - except Exception as e: - logger.error("Initial block scaling out failed: {}".format(e)) - raise e - @property def outstanding(self) -> int: """Count the number of outstanding tasks.""" diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index ea3c5c87b7..e0df148dcb 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -675,13 +675,6 @@ def initialize_scaling(self): self.worker_command = self._construct_worker_command() self._patch_providers() - if hasattr(self.provider, 'init_blocks'): - try: - self.scale_out(blocks=self.provider.init_blocks) - except Exception as e: - logger.error("Initial block scaling out failed: {}".format(e)) - raise e - @property def outstanding(self) -> int: """Count the number of outstanding tasks. This is inefficiently diff --git a/parsl/jobs/job_status_poller.py b/parsl/jobs/job_status_poller.py index b9dc1f01fe..0709a17d30 100644 --- a/parsl/jobs/job_status_poller.py +++ b/parsl/jobs/job_status_poller.py @@ -23,6 +23,7 @@ def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflo self._interval = executor.status_polling_interval self._last_poll_time = 0.0 self._status = {} # type: Dict[str, JobStatus] + self.first = True # Create a ZMQ channel to send poll status to monitoring self.monitoring_enabled = False diff --git a/parsl/jobs/strategy.py b/parsl/jobs/strategy.py index b04b4eb6c6..02519bd456 100644 --- a/parsl/jobs/strategy.py +++ b/parsl/jobs/strategy.py @@ -129,8 +129,8 @@ def __init__(self, *, strategy: Optional[str], max_idletime: float) -> None: self.executors = {} self.max_idletime = max_idletime - self.strategies = {None: self._strategy_noop, - 'none': self._strategy_noop, + self.strategies = {None: self._strategy_init_only, + 'none': self._strategy_init_only, 'simple': self._strategy_simple, 'htex_auto_scale': self._strategy_htex_auto_scale} @@ -146,10 +146,17 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None: for executor in executors: self.executors[executor.label] = {'idle_since': None} - def _strategy_noop(self, status: List[jsp.PollItem]) -> None: - """Do nothing. + def _strategy_init_only(self, status_list: List[jsp.PollItem]) -> None: + """Scale up to init_blocks at the start, then nothing more. """ - logger.debug("strategy_noop: doing nothing") + for exec_status in status_list: + if exec_status.first: + executor = exec_status.executor + logger.debug(f"strategy_init_only: scaling out {executor.provider.init_blocks} initial blocks for {executor.label}") + exec_status.scale_out(executor.provider.init_blocks) + exec_status.first = False + else: + logger.debug("strategy_init_only: doing nothing") def _strategy_simple(self, status_list: List[jsp.PollItem]) -> None: self._general_strategy(status_list, strategy_type='simple') @@ -183,6 +190,12 @@ def _general_strategy(self, status_list, *, strategy_type): continue logger.debug(f"Strategizing for executor {label}") + if exec_status.first: + executor = exec_status.executor + logger.debug(f"Scaling out {executor.provider.init_blocks} initial blocks for {label}") + exec_status.scale_out(executor.provider.init_blocks) + exec_status.first = False + # Tasks that are either pending completion active_tasks = executor.outstanding diff --git a/parsl/tests/test_htex/test_drain.py b/parsl/tests/test_htex/test_drain.py index af528eeef5..4787d97e88 100644 --- a/parsl/tests/test_htex/test_drain.py +++ b/parsl/tests/test_htex/test_drain.py @@ -35,6 +35,7 @@ def local_config(): ) ], strategy='none', + strategy_period=0.1 )