Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle init_blocks in scaling strategy, rather than special-casing it #3283

Merged
merged 4 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1141,14 +1141,7 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
self._create_remote_dirs_over_channel(executor.provider, executor.provider.channel)

self.executors[executor.label] = executor
block_ids = executor.start()
if self.monitoring and block_ids:
new_status = {}
for bid in block_ids:
new_status[bid] = JobStatus(JobState.PENDING)
msg = executor.create_monitoring_info(new_status)
logger.debug("Sending monitoring message {} to hub from DFK".format(msg))
self.monitoring.send(MessageType.BLOCK_INFO, msg)
executor.start()
block_executors = [e for e in executors if isinstance(e, BlockProviderExecutor)]
self.job_status_poller.add_executors(block_executors)

Expand Down
2 changes: 1 addition & 1 deletion parsl/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]:
return False

@abstractmethod
def start(self) -> Optional[List[str]]:
def start(self) -> None:
"""Start the executor.

Any spin-up operations (for example: starting thread pools) should be performed here.
Expand Down
13 changes: 1 addition & 12 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,16 +400,6 @@ def initialize_scaling(self):

logger.debug("Starting HighThroughputExecutor with provider:\n%s", self.provider)

# TODO: why is this a provider property?
block_ids = []
if hasattr(self.provider, 'init_blocks'):
try:
block_ids = self.scale_out(blocks=self.provider.init_blocks)
except Exception as e:
logger.error("Scaling out failed: {}".format(e))
raise e
return block_ids

def start(self):
"""Create the Interchange process and connect to it.
"""
Expand Down Expand Up @@ -439,8 +429,7 @@ def start(self):

logger.debug("Created management thread: {}".format(self._queue_management_thread))

block_ids = self.initialize_scaling()
return block_ids
self.initialize_scaling()

@wrap_with_logs
def _queue_management_worker(self):
Expand Down
7 changes: 0 additions & 7 deletions parsl/executors/taskvine/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,13 +586,6 @@ def initialize_scaling(self):
self._worker_command = self._construct_worker_command()
self._patch_providers()

if hasattr(self.provider, 'init_blocks'):
try:
self.scale_out(blocks=self.provider.init_blocks)
except Exception as e:
logger.error("Initial block scaling out failed: {}".format(e))
raise e

@property
def outstanding(self) -> int:
"""Count the number of outstanding tasks."""
Expand Down
7 changes: 0 additions & 7 deletions parsl/executors/workqueue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,13 +675,6 @@ def initialize_scaling(self):
self.worker_command = self._construct_worker_command()
self._patch_providers()

if hasattr(self.provider, 'init_blocks'):
try:
self.scale_out(blocks=self.provider.init_blocks)
except Exception as e:
logger.error("Initial block scaling out failed: {}".format(e))
raise e

@property
def outstanding(self) -> int:
"""Count the number of outstanding tasks. This is inefficiently
Expand Down
1 change: 1 addition & 0 deletions parsl/jobs/job_status_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflo
self._interval = executor.status_polling_interval
self._last_poll_time = 0.0
self._status = {} # type: Dict[str, JobStatus]
self.first = True

# Create a ZMQ channel to send poll status to monitoring
self.monitoring_enabled = False
Expand Down
23 changes: 18 additions & 5 deletions parsl/jobs/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ def __init__(self, *, strategy: Optional[str], max_idletime: float) -> None:
self.executors = {}
self.max_idletime = max_idletime

self.strategies = {None: self._strategy_noop,
'none': self._strategy_noop,
self.strategies = {None: self._strategy_init_only,
'none': self._strategy_init_only,
'simple': self._strategy_simple,
'htex_auto_scale': self._strategy_htex_auto_scale}

Expand All @@ -146,10 +146,17 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
for executor in executors:
self.executors[executor.label] = {'idle_since': None}

def _strategy_noop(self, status: List[jsp.PollItem]) -> None:
"""Do nothing.
def _strategy_init_only(self, status_list: List[jsp.PollItem]) -> None:
"""Scale up to init_blocks at the start, then nothing more.
"""
logger.debug("strategy_noop: doing nothing")
for exec_status in status_list:
if exec_status.first:
executor = exec_status.executor
logger.debug(f"strategy_init_only: scaling out {executor.provider.init_blocks} initial blocks for {executor.label}")
exec_status.scale_out(executor.provider.init_blocks)
khk-globus marked this conversation as resolved.
Show resolved Hide resolved
exec_status.first = False
else:
logger.debug("strategy_init_only: doing nothing")
khk-globus marked this conversation as resolved.
Show resolved Hide resolved

def _strategy_simple(self, status_list: List[jsp.PollItem]) -> None:
self._general_strategy(status_list, strategy_type='simple')
Expand Down Expand Up @@ -183,6 +190,12 @@ def _general_strategy(self, status_list, *, strategy_type):
continue
logger.debug(f"Strategizing for executor {label}")

if exec_status.first:
executor = exec_status.executor
logger.debug(f"Scaling out {executor.provider.init_blocks} initial blocks for {label}")
exec_status.scale_out(executor.provider.init_blocks)
exec_status.first = False

# Tasks that are either pending completion
active_tasks = executor.outstanding

Expand Down
1 change: 1 addition & 0 deletions parsl/tests/test_htex/test_drain.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def local_config():
)
],
strategy='none',
strategy_period=0.1
)


Expand Down
Loading