Test htex_auto_scale partial scaling-in #3097

Merged (8 commits) on Mar 1, 2024
151 changes: 151 additions & 0 deletions parsl/tests/test_scaling/test_scale_down_htex_auto_scale.py
@@ -0,0 +1,151 @@
import time

import pytest

import parsl

from parsl import File, python_app
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.launchers import SingleNodeLauncher
from parsl.config import Config
from parsl.executors import HighThroughputExecutor

from threading import Event

_max_blocks = 5
_min_blocks = 0


def local_config():
return Config(
executors=[
HighThroughputExecutor(
heartbeat_period=1,
heartbeat_threshold=2,
poll_period=100,
label="htex_local",
address="127.0.0.1",
max_workers=1,
encrypted=True,
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=0,
max_blocks=_max_blocks,
min_blocks=_min_blocks,
launcher=SingleNodeLauncher(),
),
)
],
max_idletime=0.5,
strategy='htex_auto_scale',
)


@python_app
def waiting_app(ident: int, outputs=(), inputs=()):
import pathlib
import time

# Approximate an Event by writing to files; the test logic will poll this file
with open(outputs[0], "a") as f:
f.write(f"Ready: {ident}\n")

# Similarly, approximate an Event by polling for the existence of a file.
may_finish_file = pathlib.Path(inputs[0])
while not may_finish_file.exists():
time.sleep(0.01)


# see issue #1885 for details of failures of this test.
# at the time of issue #1885 this test was failing frequently
# in CI.
@pytest.mark.local
def test_scale_out(tmpd_cwd, try_assert):
dfk = parsl.dfk()

num_managers = len(dfk.executors['htex_local'].connected_managers())

assert num_managers == 0, "Expected 0 managers at start"
assert dfk.executors['htex_local'].outstanding == 0, "Expected 0 tasks at start"

ntasks = _max_blocks * 2
ready_path = tmpd_cwd / "workers_ready"
finish_path = tmpd_cwd / "stage1_workers_may_continue"
ready_path.touch()
inputs = [File(finish_path)]
outputs = [File(ready_path)]

futs = [waiting_app(i, outputs=outputs, inputs=inputs) for i in range(ntasks)]

while ready_path.read_text().count("\n") < _max_blocks:
time.sleep(0.5)

assert len(dfk.executors['htex_local'].connected_managers()) == _max_blocks
Collaborator:
Consider (for new code, anyway) use of try_assert:

def test_scale_out(try_assert, ...):
    ...
    try_assert(lambda: ready_path.read_text().count("\n") >= _max_blocks)
    ...

It doesn't save much SLOC-wise, but it might prevent a hung test at some point (e.g., file not getting written [for reason]) and removes the mental context of the loop.

Alternatively, I think what matters for this test is that the .connected_managers() count rises to _max_blocks? Perhaps just test strictly that:

def test_scale_out(try_assert, ...):
    dfk = parsl.dfk()
    htex = dfk.executors['htex_local']
    ...
    try_assert(lambda: len(htex.connected_managers()) == _max_blocks, "Verify test setup")

Collaborator (Author):

I switched this to try_assert.

On the immediately following connected-managers assert, I added a note for future readers who are trying to understand why the assert fails. What's really needed here is to check how many blocks have successfully run at least as far as the registration stage, but that information isn't readily available: with one worker per manager and one manager per block, the worker count and the manager count are two different proxies for it.
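
For reference, a minimal sketch of that try_assert wait, assuming the same dfk handle and try_assert fixture used elsewhere in this test (the fail_msg wording and timeout here are illustrative, not the merged code):

    htex = dfk.executors['htex_local']
    try_assert(
        lambda: len(htex.connected_managers()) == _max_blocks,
        fail_msg=f"Expected {_max_blocks} connected managers after scale-out",
        timeout_ms=15000,
    )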


finish_path.touch() # Approximation of Event, via files
[x.result() for x in futs]

assert dfk.executors['htex_local'].outstanding == 0

# Now launch one "long" task: the connected_managers count should
# eventually converge to 1 and stay there.

finish_path = tmpd_cwd / "stage2_workers_may_continue"

fut = waiting_app(0, outputs=outputs, inputs=[File(finish_path)])

def check_one_block():
return len(dfk.executors['htex_local'].connected_managers()) == 1

try_assert(
check_one_block,
fail_msg="Expected 1 manager during a single long task",
timeout_ms=15000,
)
Collaborator:
I've mentioned this in Slack, but I think we should do something about these timeouts. That is, clearly they're needed for the test to pass, but I'm wondering if we can engineer the test setup so that (a) we don't need to up the value to 15s and, more importantly, (b) the test spends ~no time unnecessarily waiting. We're clearly waiting on an amalgamation of internal loops; can we short-circuit those loops somehow and still ensure this test is valid and useful?

To put some numbers behind my displeasure: running this test locally took 27s.

(In my experience, the usual avenues for this kind of request are mocking, subclassing, and judicious test setups.)

Collaborator (Author):

That goes down to about 12 seconds if I set the strategy to run every 1s, 0.5s, or 0.1s by poking directly into the strategy timer.
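
For illustration, poking the strategy timer might look something like the sketch below; the interval attribute on the DFK's job status poller is an assumption about parsl internals here, not a documented API:

    # Hypothetical: shorten the strategy polling interval for this test only.
    # Assumes the job status poller (a Timer subclass) exposes `interval`.
    dfk = parsl.dfk()
    dfk.job_status_poller.interval = 0.5  # seconds between strategy iterations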

Collaborator (Author):

(I'm not poking at the strategy code timing any deeper than this: this PR is part of a sponsored project to fix a single bug in scaling in, not to refactor it for testability.)


# the task should not have finished by the time we end up with 1 manager
assert not fut.done()

# This section waits for the strategy to run again, with the above single
# task outstanding, and checks that the strategy has not scaled up or
# down further on those subsequent iterations.

# It does this by hooking the callback of the job status poller, and
# waiting until it has run.

old_cb = dfk.job_status_poller.callback

strategy_iterated = Event()

def hook_cb(*args, **kwargs):
r = old_cb(*args, **kwargs)
strategy_iterated.set()
return r

dfk.job_status_poller.callback = hook_cb

def check_strategy_runs():
return strategy_iterated.is_set()

try_assert(
check_strategy_runs,
fail_msg="Expected strategy to have run within this period",
timeout_ms=15000,
)

assert check_one_block()

finish_path.touch() # now we can end the single stage-2 task

fut.result()

# Now we should expect a scale-down to min_blocks.

def check_min_blocks():
return len(dfk.executors['htex_local'].connected_managers()) == _min_blocks

try_assert(
check_min_blocks,
fail_msg=f"Expected {_min_blocks} managers when no tasks (min_blocks)",
timeout_ms=15000,
)