Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow strategy polling period to be configured #3246

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions parsl/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class Config(RepresentationMixin):
or `None`.
If 'none' or `None`, dynamic scaling will be disabled. Default is 'simple'. The literal value `None` is
deprecated.
strategy_period : float or int, optional
How often the scaling strategy should be executed. Default is 5 seconds.
max_idletime : float, optional
The maximum idle time allowed for an executor before strategy could shut down unused blocks. Default is 120.0 seconds.
usage_tracking : bool, optional
Expand Down Expand Up @@ -88,6 +90,7 @@ def __init__(self,
retry_handler: Optional[Callable[[Exception, TaskRecord], float]] = None,
run_dir: str = 'runinfo',
strategy: Optional[str] = 'simple',
strategy_period: Union[float, int] = 5,
max_idletime: float = 120.0,
monitoring: Optional[MonitoringHub] = None,
usage_tracking: bool = False,
Expand Down Expand Up @@ -121,6 +124,7 @@ def __init__(self,
self.retry_handler = retry_handler
self.run_dir = run_dir
self.strategy = strategy
self.strategy_period = strategy_period
self.max_idletime = max_idletime
self.usage_tracking = usage_tracking
self.initialize_logging = initialize_logging
Expand Down
1 change: 1 addition & 0 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def __init__(self, config: Config) -> None:
# this must be set before executors are added since add_executors calls
# job_status_poller.add_executors.
self.job_status_poller = JobStatusPoller(strategy=self.config.strategy,
strategy_period=self.config.strategy_period,
max_idletime=self.config.max_idletime,
dfk=self)

Expand Down
5 changes: 3 additions & 2 deletions parsl/jobs/job_status_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import parsl
import time
import zmq
from typing import Dict, List, Sequence, Optional
from typing import Dict, List, Sequence, Optional, Union

from parsl.jobs.states import JobStatus, JobState
from parsl.jobs.strategy import Strategy
Expand Down Expand Up @@ -107,12 +107,13 @@ def __repr__(self) -> str:

class JobStatusPoller(Timer):
def __init__(self, *, strategy: Optional[str], max_idletime: float,
strategy_period: Union[float, int],
dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None) -> None:
self._poll_items = [] # type: List[PollItem]
self.dfk = dfk
self._strategy = Strategy(strategy=strategy,
max_idletime=max_idletime)
super().__init__(self.poll, interval=5, name="JobStatusPoller")
super().__init__(self.poll, interval=strategy_period, name="JobStatusPoller")
benclifford marked this conversation as resolved.
Show resolved Hide resolved

def poll(self) -> None:
self._update_state()
Expand Down
11 changes: 1 addition & 10 deletions parsl/tests/test_scaling/test_scale_down_htex_auto_scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def local_config():
],
max_idletime=0.5,
strategy='htex_auto_scale',
strategy_period=0.1
)


Expand All @@ -62,16 +63,6 @@ def waiting_app(ident: int, outputs=(), inputs=()):
def test_scale_out(tmpd_cwd, try_assert):
dfk = parsl.dfk()

# reconfigure scaling strategy to run faster than usual. This allows
# this test to complete faster - at time of writing 27s with default
# 5s strategy, vs XXXX with 0.5s strategy.

# check this attribute still exists, in the presence of ongoing
# development, so we have some belief that setting it will not be
# setting a now-ignored parameter.
assert hasattr(dfk.job_status_poller, 'interval')
dfk.job_status_poller.interval = 0.1

num_managers = len(dfk.executors['htex_local'].connected_managers())

assert num_managers == 0, "Expected 0 managers at start"
Expand Down
Loading