CI job running tests with queuing on #6989

Merged: 21 commits, Sep 5, 2022

Changes from 13 commits
18 changes: 16 additions & 2 deletions .github/workflows/tests.yaml
@@ -26,6 +26,7 @@ jobs:
python-version: ["3.8", "3.9", "3.10"]
# Cherry-pick test modules to split the overall runtime roughly in half
partition: [ci1, not ci1]
queuing: [no-queue]
exclude:
- os: macos-latest
python-version: 3.9
@@ -34,6 +35,14 @@ jobs:
partition-label: "ci1"
- partition: "not ci1"
partition-label: "notci1"
- queuing: queue
os: ubuntu-latest
python-version: 3.9
partition: "ci1"
- queuing: queue
os: ubuntu-latest
python-version: 3.9
partition: "not ci1"

# Uncomment to stress-test the test suite for random failures.
# Must also change env.TEST_ID below.
@@ -44,8 +53,8 @@ jobs:
# run: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

env:
TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }}
# TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }}-${{ matrix.run }}
TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }}-${{ matrix.queuing }}
Member commented:
I think this renaming will break some logic in our test report generation.

df_jobs["suite_name"] = (
df_jobs["OS"]
+ "-"
+ df_jobs["python_version"]
+ "-"
+ df_jobs["partition"].str.replace(" ", "")
)

html_url = jobs_df[jobs_df["suite_name"] == a["name"]].html_url.unique()
assert (
len(html_url) == 1
), f"Artifact suit name {a['name']} did not match any jobs dataframe {jobs_df['suite_name'].unique()}"
html_url = html_url[0]

I introduced this in #6837

Worst case, we remove the more specific job URL again (we knew from the start that this is very brittle).
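
For illustration, a minimal sketch of how the `suite_name` construction could be kept in sync with the new `TEST_ID` format; the `queuing` column and the exact dataframe layout are assumptions, not the real report script:

import pandas as pd


def build_suite_name(df_jobs: pd.DataFrame) -> pd.Series:
    # Mirror the new TEST_ID format: {os}-{python-version}-{partition-label}-{queuing}
    return (
        df_jobs["OS"]
        + "-"
        + df_jobs["python_version"]
        + "-"
        + df_jobs["partition"].str.replace(" ", "")
        + "-"
        + df_jobs["queuing"]  # assumed column holding "queue" / "no-queue"
    )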

Member commented:

I don't think this should block this PR.

# TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }}-${{ matrix.queuing }}-${{ matrix.run }}

steps:
- name: Checkout source
@@ -107,6 +116,11 @@ jobs:

python continuous_integration/scripts/host_info.py

- name: Enable queuing
if: ${{ matrix.queuing == 'queue' }}
shell: bash -l {0}
run: export DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1.0

gjoseph92 marked this conversation as resolved.
- name: Test
id: run_tests
shell: bash -l {0}
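
A note on mechanics: an `export` inside its own `run:` step only affects that step's shell, so persisting the variable across steps would normally go through `$GITHUB_ENV`. On the Python side, dask.config maps `DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION` onto the `distributed.scheduler.worker-saturation` key that the conftest hook below inspects; a rough sketch (illustrative, not part of this PR):

import math
import os

import dask

# Simulate what the CI step exports; double underscores in DASK_* variables
# become nesting levels in the dask config namespace.
os.environ["DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION"] = "1.0"
dask.config.refresh()  # re-read configuration, including environment variables

saturation = dask.config.get("distributed.scheduler.worker_saturation")
print(saturation)                 # 1.0 -> queuing enabled
print(math.isfinite(saturation))  # True -> `oversaturate_only` tests are skipped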
26 changes: 20 additions & 6 deletions conftest.py
@@ -1,8 +1,11 @@
# https://pytest.org/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option
from __future__ import annotations

import math

import pytest

import dask

# Uncomment to enable more logging and checks
# (https://docs.python.org/3/library/asyncio-dev.html)
# Note this makes things slower and might consume much memory.
@@ -27,13 +30,24 @@ def pytest_addoption(parser):


def pytest_collection_modifyitems(config, items):
if config.getoption("--runslow"):
# https://pytest.org/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option
if skip_slow := not config.getoption("--runslow"):
# --runslow given in cli: do not skip slow tests
return
skip_slow = pytest.mark.skip(reason="need --runslow option to run")
skip_slow_marker = pytest.mark.skip(reason="need --runslow option to run")

if skip_oversaturate := math.isfinite(
dask.config.get("distributed.scheduler.worker_saturation")
):
skip_oversaturate_marker = pytest.mark.skip(
reason="need `distributed.scheduler.worker_saturation = inf` to run"
)

for item in items:
if "slow" in item.keywords:
item.add_marker(skip_slow)
if skip_slow and "slow" in item.keywords:
item.add_marker(skip_slow_marker)

if skip_oversaturate and "oversaturate_only" in item.keywords:
item.add_marker(skip_oversaturate_marker)

if "ws" in item.fixturenames:
item.add_marker(pytest.mark.workerstate)
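
A usage sketch (hypothetical test, not taken from this PR): with the hook above, a test marked `oversaturate_only` is collected normally but skipped whenever `distributed.scheduler.worker_saturation` is finite, i.e. whenever queuing is enabled:

import pytest


@pytest.mark.oversaturate_only
def test_relies_on_root_task_overproduction():
    # Hypothetical test: it assumes workers get oversaturated with root tasks,
    # so the conftest hook skips it when worker_saturation is finite (queuing on)
    # and runs it only when worker_saturation is inf (queuing off).
    ...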
3 changes: 2 additions & 1 deletion distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -281,13 +281,14 @@ async def test_ProcessingHistogram(c, s, a, b):
ph = ProcessingHistogram(s)
ph.update()
assert (ph.source.data["top"] != 0).sum() == 1
assert ph.source.data["right"][-1] < 2

futures = c.map(slowinc, range(10), delay=0.050)
while not s.tasks:
await asyncio.sleep(0.01)

ph.update()
assert ph.source.data["right"][-1] > 2
assert ph.source.data["right"][-1] >= 2


@gen_cluster(client=True)
2 changes: 2 additions & 0 deletions distributed/deploy/tests/test_adaptive.py
@@ -297,6 +297,8 @@ def test_basic_no_loop(cleanup):
loop.add_callback(loop.stop)


# adaptive target for queued tasks doesn't yet consider default or learned task durations
@pytest.mark.oversaturate_only
@pytest.mark.flaky(condition=LINUX, reruns=10, reruns_delay=5)
@pytest.mark.xfail(condition=MACOS or WINDOWS, reason="extremely flaky")
@gen_test()
1 change: 1 addition & 0 deletions distributed/tests/test_client.py
@@ -5515,6 +5515,7 @@ async def close():
loop.run_sync(close) # TODO: client.close() does not unset global client


@pytest.mark.oversaturate_only # FIXME flaky on windows
@pytest.mark.slow
@gen_cluster(client=True, Worker=Nanny, timeout=60, nthreads=[("127.0.0.1", 3)] * 2)
async def test_nested_compute(c, s, a, b):
1 change: 1 addition & 0 deletions distributed/tests/test_reschedule.py
@@ -21,6 +21,7 @@
)


@pytest.mark.oversaturate_only
@gen_cluster(
client=True,
nthreads=[("", 1)] * 2,
11 changes: 9 additions & 2 deletions distributed/tests/test_scheduler.py
@@ -150,6 +150,7 @@ def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads):
nthreads=nthreads,
config={
"distributed.scheduler.work-stealing": False,
"distributed.scheduler.worker-saturation": float("inf"),
},
)
async def test_decide_worker_coschedule_order_neighbors_(c, s, *workers):
@@ -254,6 +255,9 @@ def random(**kwargs):
@gen_cluster(
client=True,
nthreads=[("", 1), ("", 1)],
config={
"distributed.scheduler.worker-saturation": float("inf"),
},
)
async def test_decide_worker_coschedule_order_binary_op(c, s, a, b, ngroups):
roots = [[delayed(i, name=f"x-{n}-{i}") for i in range(8)] for n in range(ngroups)]
@@ -1480,6 +1484,7 @@ async def test_balance_many_workers(c, s, *workers):
assert {len(w.has_what) for w in s.workers.values()} == {0, 1}


@pytest.mark.oversaturate_only # FIXME flaky on macOS
@nodebug
@gen_cluster(
client=True,
@@ -3758,7 +3763,9 @@ async def test_Scheduler__to_dict(c, s, a):
assert isinstance(d["workers"][a.address]["memory"]["process"], int)


@gen_cluster(client=True, nthreads=[])
@gen_cluster(
client=True, nthreads=[], config={"distributed.scheduler.worker-saturation": 1.0}
)
async def test_TaskState__to_dict(c, s):
"""tasks that are listed as dependencies of other tasks are dumped as a short repr
and always appear in full under Scheduler.tasks
@@ -3775,7 +3782,7 @@ async def test_TaskState__to_dict(c, s):
assert isinstance(tasks["y"], dict)
assert isinstance(tasks["z"], dict)
assert tasks["x"]["dependents"] == ["<TaskState 'y' waiting>"]
assert tasks["y"]["dependencies"] == ["<TaskState 'x' no-worker>"]
assert tasks["y"]["dependencies"] == ["<TaskState 'x' queued>"]


def _verify_cluster_state(
26 changes: 18 additions & 8 deletions distributed/tests/test_steal.py
@@ -4,6 +4,7 @@
import contextlib
import itertools
import logging
import math
import random
import weakref
from operator import mul
@@ -800,9 +801,14 @@ async def test_steal_twice(c, s, a, b):

while len(s.tasks) < 100: # tasks are all allocated
await asyncio.sleep(0.01)
# Wait for b to start stealing tasks
while len(b.state.tasks) < 30:
await asyncio.sleep(0.01)
if math.isinf(s.WORKER_SATURATION):
# Wait for b to start stealing tasks
while len(b.state.tasks) < 30:
await asyncio.sleep(0.01)
else:
# Wait for b to complete some tasks
while len(b.data) < 8:
await asyncio.sleep(0.01)

# Army of new workers arrives to help
workers = await asyncio.gather(*(Worker(s.address) for _ in range(20)))
@@ -816,6 +822,8 @@
), f"Too many workers without keys ({len(empty_workers)} out of {len(s.workers)})"
# This also tests that some tasks were stolen from b
# (see `while len(b.state.tasks) < 30` above)
# If queuing is enabled, then there was nothing to steal from b,
# so this just tests the queue was balanced not-terribly.
assert max(len(ws.has_what) for ws in s.workers.values()) < 30

assert a.state.in_flight_tasks_count == 0
@@ -1072,6 +1080,7 @@ async def test_steal_concurrent_simple(c, s, *workers):
assert not ws2.has_what


@pytest.mark.oversaturate_only # FIXME flaky on ubuntu
@gen_cluster(
client=True,
config={
@@ -1082,12 +1091,13 @@ async def test_steal_reschedule_reset_in_flight_occupancy(c, s, *workers):
# https://github.com/dask/distributed/issues/5370
steal = s.extensions["stealing"]
w0 = workers[0]
futs1 = c.map(
slowinc,
range(10),
key=[f"f1-{ix}" for ix in range(10)],
roots = c.map(
inc,
range(6),
key=[f"r-{ix}" for ix in range(6)],
)
while not w0.state.tasks:
futs1 = [c.submit(slowinc, f, key=f"f1-{ix}") for f in roots for ix in range(4)]
while not w0.state.ready:
await asyncio.sleep(0.01)

# ready is a heap but we don't need last, just not the next
4 changes: 2 additions & 2 deletions distributed/tests/test_worker.py
@@ -1542,7 +1542,7 @@ async def f(ev):
)
await a.close()
assert task.cancelled()
assert s.tasks["f1"].state == "no-worker"
assert s.tasks["f1"].state in ("queued", "no-worker")


@pytest.mark.slow
@@ -1570,7 +1570,7 @@ async def f(ev):
assert "Failed to cancel asyncio task" in logger.getvalue()
assert time() - start < 5
assert not task.cancelled()
assert s.tasks["f1"].state == "no-worker"
assert s.tasks["f1"].state in ("queued", "no-worker")
task.cancel()
await asyncio.wait({task})

13 changes: 9 additions & 4 deletions distributed/tests/test_worker_memory.py
@@ -922,16 +922,21 @@ def __reduce__(self):
return bool, (paused,)

futs = c.map(SlowSpill, range(N_TOTAL))
while len(a.data.slow) < N_PAUSE + 1:
while len(a.data.slow) < (N_PAUSE + 1 if a.state.ready else N_PAUSE):
await asyncio.sleep(0.01)

assert a.status == Status.paused
# Worker should have become paused after the first `SlowSpill` was evicted, because
# the spill to disk took longer than the memory monitor interval.
assert len(a.data.fast) == 0
assert len(a.data.slow) == N_PAUSE + 1
n_spilled_while_paused = sum(paused is True for paused in a.data.slow.values())
assert N_PAUSE <= n_spilled_while_paused <= N_PAUSE + 1
# With queuing enabled, after the 3rd `SlowSpill` has been created, there's a race
# between the scheduler sending the worker a new task, and the memory monitor
# running and pausing the worker. If the worker gets paused before the 4th task
# lands, only 3 will be in memory. If after, the 4th will block on the semaphore
# until one of the others is spilled.
assert len(a.data.slow) in (N_PAUSE, N_PAUSE + 1)
n_spilled_while_not_paused = sum(paused is False for paused in a.data.slow.values())
assert 0 <= n_spilled_while_not_paused <= 1


@pytest.mark.slow
1 change: 1 addition & 0 deletions setup.cfg
@@ -99,6 +99,7 @@ markers =
gpu: marks tests we want to run on GPUs
leaking: ignore leaked resources; see pytest_resourceleaks.py for usage
workerstate: deterministic test for the worker state machine. Automatically applied to all tests that use the 'ws' fixture.
oversaturate_only: tests that only work/make sense to run if root task queuing is disabled

# pytest-timeout settings
# 'thread' kills off the whole test suite. 'signal' only kills the offending test.