Skip to content

Commit

Permalink
Handle init_blocks in scaling strategy, rather than special-casing it (
Browse files Browse the repository at this point in the history
…#3283)

This is part of issue #3278 tidying up job and block management.

Now init_blocks scale out happens on the first strategy poll, not at executor start - that will often delay init_blocks scaling by one strategy poll period compared to before this PR.
  • Loading branch information
benclifford authored Mar 27, 2024
1 parent 96938e9 commit cb34643
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 40 deletions.
9 changes: 1 addition & 8 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1141,14 +1141,7 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
self._create_remote_dirs_over_channel(executor.provider, executor.provider.channel)

self.executors[executor.label] = executor
block_ids = executor.start()
if self.monitoring and block_ids:
new_status = {}
for bid in block_ids:
new_status[bid] = JobStatus(JobState.PENDING)
msg = executor.create_monitoring_info(new_status)
logger.debug("Sending monitoring message {} to hub from DFK".format(msg))
self.monitoring.send(MessageType.BLOCK_INFO, msg)
executor.start()
block_executors = [e for e in executors if isinstance(e, BlockProviderExecutor)]
self.job_status_poller.add_executors(block_executors)

Expand Down
2 changes: 1 addition & 1 deletion parsl/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]:
return False

@abstractmethod
def start(self) -> Optional[List[str]]:
def start(self) -> None:
"""Start the executor.
Any spin-up operations (for example: starting thread pools) should be performed here.
Expand Down
13 changes: 1 addition & 12 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,16 +400,6 @@ def initialize_scaling(self):

logger.debug("Starting HighThroughputExecutor with provider:\n%s", self.provider)

# TODO: why is this a provider property?
block_ids = []
if hasattr(self.provider, 'init_blocks'):
try:
block_ids = self.scale_out(blocks=self.provider.init_blocks)
except Exception as e:
logger.error("Scaling out failed: {}".format(e))
raise e
return block_ids

def start(self):
"""Create the Interchange process and connect to it.
"""
Expand Down Expand Up @@ -439,8 +429,7 @@ def start(self):

logger.debug("Created management thread: {}".format(self._queue_management_thread))

block_ids = self.initialize_scaling()
return block_ids
self.initialize_scaling()

@wrap_with_logs
def _queue_management_worker(self):
Expand Down
7 changes: 0 additions & 7 deletions parsl/executors/taskvine/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,13 +586,6 @@ def initialize_scaling(self):
self._worker_command = self._construct_worker_command()
self._patch_providers()

if hasattr(self.provider, 'init_blocks'):
try:
self.scale_out(blocks=self.provider.init_blocks)
except Exception as e:
logger.error("Initial block scaling out failed: {}".format(e))
raise e

@property
def outstanding(self) -> int:
"""Count the number of outstanding tasks."""
Expand Down
7 changes: 0 additions & 7 deletions parsl/executors/workqueue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,13 +675,6 @@ def initialize_scaling(self):
self.worker_command = self._construct_worker_command()
self._patch_providers()

if hasattr(self.provider, 'init_blocks'):
try:
self.scale_out(blocks=self.provider.init_blocks)
except Exception as e:
logger.error("Initial block scaling out failed: {}".format(e))
raise e

@property
def outstanding(self) -> int:
"""Count the number of outstanding tasks. This is inefficiently
Expand Down
1 change: 1 addition & 0 deletions parsl/jobs/job_status_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflo
self._interval = executor.status_polling_interval
self._last_poll_time = 0.0
self._status = {} # type: Dict[str, JobStatus]
self.first = True

# Create a ZMQ channel to send poll status to monitoring
self.monitoring_enabled = False
Expand Down
23 changes: 18 additions & 5 deletions parsl/jobs/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ def __init__(self, *, strategy: Optional[str], max_idletime: float) -> None:
self.executors = {}
self.max_idletime = max_idletime

self.strategies = {None: self._strategy_noop,
'none': self._strategy_noop,
self.strategies = {None: self._strategy_init_only,
'none': self._strategy_init_only,
'simple': self._strategy_simple,
'htex_auto_scale': self._strategy_htex_auto_scale}

Expand All @@ -146,10 +146,17 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
for executor in executors:
self.executors[executor.label] = {'idle_since': None}

def _strategy_noop(self, status: List[jsp.PollItem]) -> None:
"""Do nothing.
def _strategy_init_only(self, status_list: List[jsp.PollItem]) -> None:
"""Scale up to init_blocks at the start, then nothing more.
"""
logger.debug("strategy_noop: doing nothing")
for exec_status in status_list:
if exec_status.first:
executor = exec_status.executor
logger.debug(f"strategy_init_only: scaling out {executor.provider.init_blocks} initial blocks for {executor.label}")
exec_status.scale_out(executor.provider.init_blocks)
exec_status.first = False
else:
logger.debug("strategy_init_only: doing nothing")

def _strategy_simple(self, status_list: List[jsp.PollItem]) -> None:
self._general_strategy(status_list, strategy_type='simple')
Expand Down Expand Up @@ -183,6 +190,12 @@ def _general_strategy(self, status_list, *, strategy_type):
continue
logger.debug(f"Strategizing for executor {label}")

if exec_status.first:
executor = exec_status.executor
logger.debug(f"Scaling out {executor.provider.init_blocks} initial blocks for {label}")
exec_status.scale_out(executor.provider.init_blocks)
exec_status.first = False

# Tasks that are either pending completion
active_tasks = executor.outstanding

Expand Down
1 change: 1 addition & 0 deletions parsl/tests/test_htex/test_drain.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def local_config():
)
],
strategy='none',
strategy_period=0.1
)


Expand Down

0 comments on commit cb34643

Please sign in to comment.