Skip to content

Commit

Permalink
Allow strategy polling period to be configured
Browse files Browse the repository at this point in the history
This is initially driven by a desire to run strategy polling faster in
tests: there's no fundamental reason why the previous hard-coded value
of 5 seconds needs to set the timescale for test execution. This was
demonstrated previously in
parsl/tests/test_scaling/test_scale_down_htex_auto_scale.py in PR #3097
performing modification on the internals of a live DFK-private
JobStatusPoller. Work I've done on tests elsewhere benefits from
strategy polling period reconfiguration too, so this PR makes that
facility a publicly exposed feature.

This change allows the interval to be set before the job status poller
starts running, which means a racy initial first 5s poll in the above
mentioned test_scale_down_htex_auto_scale.py is avoided: median runtime
of that test on my laptop goes from 11s before this PR to 6s after this
PR (dropping by exactly the 5s initial poll that is now avoided).

Its reasonable to expect some users to want to use this facility too:
perhaps a user doesn't want to wait 5 seconds before the scaling code
notices their workload; or perhaps they are more interested in running
the strategy code much less frequently (for example, if running
workloads on the scale of hours/days to reduce eg debug log load)
  • Loading branch information
benclifford committed Mar 13, 2024
1 parent 0bc0807 commit 0003a44
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 12 deletions.
4 changes: 4 additions & 0 deletions parsl/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class Config(RepresentationMixin):
or `None`.
If 'none' or `None`, dynamic scaling will be disabled. Default is 'simple'. The literal value `None` is
deprecated.
strategy_period : float or int, optional
How often the scaling strategy should be executed. Default is 5 seconds.
max_idletime : float, optional
The maximum idle time allowed for an executor before strategy could shut down unused blocks. Default is 120.0 seconds.
usage_tracking : bool, optional
Expand Down Expand Up @@ -88,6 +90,7 @@ def __init__(self,
retry_handler: Optional[Callable[[Exception, TaskRecord], float]] = None,
run_dir: str = 'runinfo',
strategy: Optional[str] = 'simple',
strategy_period: Union[float, int] = 5,
max_idletime: float = 120.0,
monitoring: Optional[MonitoringHub] = None,
usage_tracking: bool = False,
Expand Down Expand Up @@ -121,6 +124,7 @@ def __init__(self,
self.retry_handler = retry_handler
self.run_dir = run_dir
self.strategy = strategy
self.strategy_period = strategy_period
self.max_idletime = max_idletime
self.usage_tracking = usage_tracking
self.initialize_logging = initialize_logging
Expand Down
1 change: 1 addition & 0 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def __init__(self, config: Config) -> None:
# this must be set before executors are added since add_executors calls
# job_status_poller.add_executors.
self.job_status_poller = JobStatusPoller(strategy=self.config.strategy,
strategy_period=self.config.strategy_period,
max_idletime=self.config.max_idletime,
dfk=self)

Expand Down
5 changes: 3 additions & 2 deletions parsl/jobs/job_status_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import parsl
import time
import zmq
from typing import Dict, List, Sequence, Optional
from typing import Dict, List, Sequence, Optional, Union

from parsl.jobs.states import JobStatus, JobState
from parsl.jobs.strategy import Strategy
Expand Down Expand Up @@ -107,12 +107,13 @@ def __repr__(self) -> str:

class JobStatusPoller(Timer):
def __init__(self, *, strategy: Optional[str], max_idletime: float,
strategy_period: Union[float, int],
dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None) -> None:
self._poll_items = [] # type: List[PollItem]
self.dfk = dfk
self._strategy = Strategy(strategy=strategy,
max_idletime=max_idletime)
super().__init__(self.poll, interval=5, name="JobStatusPoller")
super().__init__(self.poll, interval=strategy_period, name="JobStatusPoller")

def poll(self) -> None:
self._update_state()
Expand Down
11 changes: 1 addition & 10 deletions parsl/tests/test_scaling/test_scale_down_htex_auto_scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def local_config():
],
max_idletime=0.5,
strategy='htex_auto_scale',
strategy_period=0.1
)


Expand All @@ -62,16 +63,6 @@ def waiting_app(ident: int, outputs=(), inputs=()):
def test_scale_out(tmpd_cwd, try_assert):
dfk = parsl.dfk()

# reconfigure scaling strategy to run faster than usual. This allows
# this test to complete faster - at time of writing 27s with default
# 5s strategy, vs XXXX with 0.5s strategy.

# check this attribute still exists, in the presence of ongoing
# development, so we have some belief that setting it will not be
# setting a now-ignored parameter.
assert hasattr(dfk.job_status_poller, 'interval')
dfk.job_status_poller.interval = 0.1

num_managers = len(dfk.executors['htex_local'].connected_managers())

assert num_managers == 0, "Expected 0 managers at start"
Expand Down

0 comments on commit 0003a44

Please sign in to comment.