From e6a15ba94f8870930882669160533f1353ab91a2 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 17 Aug 2022 16:59:01 -0600 Subject: [PATCH 01/20] add config to coschedule tests --- distributed/tests/test_scheduler.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index f83e5f45bb7..07e1b5bc071 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -150,6 +150,7 @@ def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads): nthreads=nthreads, config={ "distributed.scheduler.work-stealing": False, + "distributed.scheduler.worker-saturation": float("inf"), }, ) async def test_decide_worker_coschedule_order_neighbors_(c, s, *workers): @@ -254,6 +255,9 @@ def random(**kwargs): @gen_cluster( client=True, nthreads=[("", 1), ("", 1)], + config={ + "distributed.scheduler.worker-saturation": float("inf"), + }, ) async def test_decide_worker_coschedule_order_binary_op(c, s, a, b, ngroups): roots = [[delayed(i, name=f"x-{n}-{i}") for i in range(8)] for n in range(ngroups)] From 0f9034a2b68d1bf12f37fcf153515d7b4038c0a8 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 17 Aug 2022 17:12:33 -0600 Subject: [PATCH 02/20] add `oversaturate_only` mark --- conftest.py | 26 ++++++++++++++++++++------ setup.cfg | 1 + 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/conftest.py b/conftest.py index c98d558023b..98b5f537b72 100644 --- a/conftest.py +++ b/conftest.py @@ -1,8 +1,11 @@ -# https://pytest.org/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option from __future__ import annotations +import math + import pytest +import dask + # Uncomment to enable more logging and checks # (https://docs.python.org/3/library/asyncio-dev.html) # Note this makes things slower and might consume much memory. @@ -27,13 +30,24 @@ def pytest_addoption(parser): def pytest_collection_modifyitems(config, items): - if config.getoption("--runslow"): + # https://pytest.org/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option + if skip_slow := not config.getoption("--runslow"): # --runslow given in cli: do not skip slow tests - return - skip_slow = pytest.mark.skip(reason="need --runslow option to run") + skip_slow_marker = pytest.mark.skip(reason="need --runslow option to run") + + if skip_oversaturate := math.isfinite( + dask.config.get("distributed.scheduler.worker_saturation") + ): + skip_oversaturate_marker = pytest.mark.skip( + reason="need `distributed.scheduler.worker_saturation = inf` to run" + ) + for item in items: - if "slow" in item.keywords: - item.add_marker(skip_slow) + if skip_slow and "slow" in item.keywords: + item.add_marker(skip_slow_marker) + + if skip_oversaturate and "oversaturate_only" in item.keywords: + item.add_marker(skip_oversaturate_marker) if "ws" in item.fixturenames: item.add_marker(pytest.mark.workerstate) diff --git a/setup.cfg b/setup.cfg index e0701689571..28c3a0e57ee 100644 --- a/setup.cfg +++ b/setup.cfg @@ -99,6 +99,7 @@ markers = gpu: marks tests we want to run on GPUs leaking: ignore leaked resources; see pytest_resourceleaks.py for usage workerstate: deterministic test for the worker state machine. Automatically applied to all tests that use the 'ws' fixture. + oversaturate_only: tests that only work/make sense to run if root task queuing is disabled # pytest-timeout settings # 'thread' kills off the whole test suite. 'signal' only kills the offending test. From 9acf6f0fae79ddb605cd2f574f266bf4b9fee20d Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 17 Aug 2022 17:12:47 -0600 Subject: [PATCH 03/20] `test_scheduler_reschedule` is oversaturate_only --- distributed/tests/test_reschedule.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/tests/test_reschedule.py b/distributed/tests/test_reschedule.py index c898b0a5d94..c3216832d7f 100644 --- a/distributed/tests/test_reschedule.py +++ b/distributed/tests/test_reschedule.py @@ -21,6 +21,7 @@ ) +@pytest.mark.oversaturate_only @gen_cluster( client=True, nthreads=[("", 1)] * 2, From c6eb5d1df6556c3b7bfd36d9ed63b707e187d143 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 17 Aug 2022 17:41:54 -0600 Subject: [PATCH 04/20] fix `test_steal_twice` --- distributed/tests/test_steal.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index f617b714fdb..5cfac472242 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -4,6 +4,7 @@ import contextlib import itertools import logging +import math import random import weakref from operator import mul @@ -800,9 +801,14 @@ async def test_steal_twice(c, s, a, b): while len(s.tasks) < 100: # tasks are all allocated await asyncio.sleep(0.01) - # Wait for b to start stealing tasks - while len(b.state.tasks) < 30: - await asyncio.sleep(0.01) + if math.isinf(s.WORKER_SATURATION): + # Wait for b to start stealing tasks + while len(b.state.tasks) < 30: + await asyncio.sleep(0.01) + else: + # Wait for b to complete some tasks + while len(b.data) < 8: + await asyncio.sleep(0.01) # Army of new workers arrives to help workers = await asyncio.gather(*(Worker(s.address) for _ in range(20))) @@ -816,6 +822,8 @@ async def test_steal_twice(c, s, a, b): ), f"Too many workers without keys ({len(empty_workers)} out of {len(s.workers)})" # This also tests that some tasks were stolen from b # (see `while len(b.state.tasks) < 30` above) + # If queuing is enabled, then there was nothing to steal from b, + # so this just tests the queue was balanced not-terribly. assert max(len(ws.has_what) for ws in s.workers.values()) < 30 assert a.state.in_flight_tasks_count == 0 From 5570df384e7e5ca1dd7d2770b5cc5f1f4bb517d1 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 17 Aug 2022 17:54:07 -0600 Subject: [PATCH 05/20] update `test_steal_reschedule_reset_in_flight_occupancy` we have to make the tasks to be stolen not look like root tasks. --- distributed/tests/test_steal.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 5cfac472242..85ce2aaac4c 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1090,12 +1090,13 @@ async def test_steal_reschedule_reset_in_flight_occupancy(c, s, *workers): # https://github.com/dask/distributed/issues/5370 steal = s.extensions["stealing"] w0 = workers[0] - futs1 = c.map( - slowinc, - range(10), - key=[f"f1-{ix}" for ix in range(10)], + roots = c.map( + inc, + range(6), + key=[f"r-{ix}" for ix in range(6)], ) - while not w0.state.tasks: + futs1 = [c.submit(slowinc, f, key=f"f1-{ix}") for f in roots for ix in range(4)] + while not w0.state.ready: await asyncio.sleep(0.01) # ready is a heap but we don't need last, just not the next From 2e6f9a29177eb62d92c835c640ff1cac79276295 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 18 Aug 2022 19:41:37 -0600 Subject: [PATCH 06/20] fix `test_ProcessingHistogram` --- distributed/dashboard/tests/test_scheduler_bokeh.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index f97fdd02f65..f08e29dfb49 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -281,13 +281,14 @@ async def test_ProcessingHistogram(c, s, a, b): ph = ProcessingHistogram(s) ph.update() assert (ph.source.data["top"] != 0).sum() == 1 + assert ph.source.data["right"][-1] < 2 futures = c.map(slowinc, range(10), delay=0.050) while not s.tasks: await asyncio.sleep(0.01) ph.update() - assert ph.source.data["right"][-1] > 2 + assert ph.source.data["right"][-1] >= 2 @gen_cluster(client=True) From 54c2a324cf3bac4330fe0ef304b2dc331da7eddd Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 29 Aug 2022 16:38:39 -0600 Subject: [PATCH 07/20] fix `test_close_async_task_handles_cancellation` --- distributed/tests/test_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 1fe036a371c..7fd905ad57c 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1570,7 +1570,7 @@ async def f(ev): assert "Failed to cancel asyncio task" in logger.getvalue() assert time() - start < 5 assert not task.cancelled() - assert s.tasks["f1"].state == "no-worker" + assert s.tasks["f1"].state in ("queued", "no-worker") task.cancel() await asyncio.wait({task}) From 40645d9ad75aaafc2c8c26fa577c4d47b7dd5d6d Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 29 Aug 2022 16:39:12 -0600 Subject: [PATCH 08/20] fix `test_close_while_executing` --- distributed/tests/test_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 7fd905ad57c..24204481f02 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1542,7 +1542,7 @@ async def f(ev): ) await a.close() assert task.cancelled() - assert s.tasks["f1"].state == "no-worker" + assert s.tasks["f1"].state in ("queued", "no-worker") @pytest.mark.slow From e31c7a423db2e9c4df433e49ee2ff266068e6ec2 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 29 Aug 2022 16:41:54 -0600 Subject: [PATCH 09/20] fix `test_TaskState__to_dict` --- distributed/tests/test_scheduler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 07e1b5bc071..9af04dbc704 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -3762,7 +3762,9 @@ async def test_Scheduler__to_dict(c, s, a): assert isinstance(d["workers"][a.address]["memory"]["process"], int) -@gen_cluster(client=True, nthreads=[]) +@gen_cluster( + client=True, nthreads=[], config={"distributed.scheduler.worker-saturation": 1.0} +) async def test_TaskState__to_dict(c, s): """tasks that are listed as dependencies of other tasks are dumped as a short repr and always appear in full under Scheduler.tasks @@ -3779,7 +3781,7 @@ async def test_TaskState__to_dict(c, s): assert isinstance(tasks["y"], dict) assert isinstance(tasks["z"], dict) assert tasks["x"]["dependents"] == [""] - assert tasks["y"]["dependencies"] == [""] + assert tasks["y"]["dependencies"] == [""] def _verify_cluster_state( From 175e55a73a8b9953924919a215cc0f297ef46767 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 29 Aug 2022 17:50:36 -0600 Subject: [PATCH 10/20] fix `test_pause_while_spilling` There are a lot more race conditions with queuing --- distributed/tests/test_worker_memory.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_worker_memory.py b/distributed/tests/test_worker_memory.py index 818267fff8b..dc41607d4e0 100644 --- a/distributed/tests/test_worker_memory.py +++ b/distributed/tests/test_worker_memory.py @@ -922,16 +922,21 @@ def __reduce__(self): return bool, (paused,) futs = c.map(SlowSpill, range(N_TOTAL)) - while len(a.data.slow) < N_PAUSE + 1: + while len(a.data.slow) < (N_PAUSE + 1 if a.state.ready else N_PAUSE): await asyncio.sleep(0.01) assert a.status == Status.paused # Worker should have become paused after the first `SlowSpill` was evicted, because # the spill to disk took longer than the memory monitor interval. assert len(a.data.fast) == 0 - assert len(a.data.slow) == N_PAUSE + 1 - n_spilled_while_paused = sum(paused is True for paused in a.data.slow.values()) - assert N_PAUSE <= n_spilled_while_paused <= N_PAUSE + 1 + # With queuing enabled, after the 3rd `SlowSpill` has been created, there's a race + # between the scheduler sending the worker a new task, and the memory monitor + # running and pausing the worker. If the worker gets paused before the 4th task + # lands, only 3 will be in memory. If after, the 4th will block on the semaphore + # until one of the others is spilled. + assert len(a.data.slow) in (N_PAUSE, N_PAUSE + 1) + n_spilled_while_not_paused = sum(paused is False for paused in a.data.slow.values()) + assert 0 <= n_spilled_while_not_paused <= 1 @pytest.mark.slow From 611a13ce55b1e0e40cd7810b85b0ca99efeacac8 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 29 Aug 2022 17:54:19 -0600 Subject: [PATCH 11/20] skip `test_target_duration` --- distributed/deploy/tests/test_adaptive.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index 04b518b063b..9471de0e507 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -297,6 +297,8 @@ def test_basic_no_loop(cleanup): loop.add_callback(loop.stop) +# adaptive target for queued tasks doesn't yet consider default or learned task durations +@pytest.mark.oversaturate_only @pytest.mark.flaky(condition=LINUX, reruns=10, reruns_delay=5) @pytest.mark.xfail(condition=MACOS or WINDOWS, reason="extremely flaky") @gen_test() From 1327d6aa15a37e094020dfd6dbcca71204a510de Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 1 Sep 2022 17:20:15 -0600 Subject: [PATCH 12/20] As GitHub action --- .github/workflows/tests.yaml | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 753681de0ed..971a8954957 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -26,6 +26,7 @@ jobs: python-version: ["3.8", "3.9", "3.10"] # Cherry-pick test modules to split the overall runtime roughly in half partition: [ci1, not ci1] + queuing: [no-queue] exclude: - os: macos-latest python-version: 3.9 @@ -34,6 +35,14 @@ jobs: partition-label: "ci1" - partition: "not ci1" partition-label: "notci1" + - queuing: queue + os: ubuntu-latest + python-version: 3.9 + partition: "ci1" + - queuing: queue + os: ubuntu-latest + python-version: 3.9 + partition: "not ci1" # Uncomment to stress-test the test suite for random failures. # Must also change env.TEST_ID below. @@ -44,8 +53,8 @@ jobs: # run: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] env: - TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }} - # TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }}-${{ matrix.run }} + TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }}-${{ matrix.queuing }} + # TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }}-${{ matrix.queuing }}-${{ matrix.run }} steps: - name: Checkout source @@ -107,6 +116,11 @@ jobs: python continuous_integration/scripts/host_info.py + - name: Enable queuing + if: ${{ matrix.queuing == 'queue' }} + shell: bash -l {0} + run: export DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1.0 + - name: Test id: run_tests shell: bash -l {0} From 31a00704dfe4c3d85f63460e5c9b8210aa3195a3 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 2 Sep 2022 00:38:08 -0600 Subject: [PATCH 13/20] skip flaky tests for now these should not be failing and we need to evaluate why they are --- distributed/tests/test_client.py | 1 + distributed/tests/test_scheduler.py | 1 + distributed/tests/test_steal.py | 1 + 3 files changed, 3 insertions(+) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 43da018b842..9d4914b77fc 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -5515,6 +5515,7 @@ async def close(): loop.run_sync(close) # TODO: client.close() does not unset global client +@pytest.mark.oversaturate_only # FIXME flaky on windows @pytest.mark.slow @gen_cluster(client=True, Worker=Nanny, timeout=60, nthreads=[("127.0.0.1", 3)] * 2) async def test_nested_compute(c, s, a, b): diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 9af04dbc704..e0fdd28cc9d 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1484,6 +1484,7 @@ async def test_balance_many_workers(c, s, *workers): assert {len(w.has_what) for w in s.workers.values()} == {0, 1} +@pytest.mark.oversaturate_only # FIXME flaky on macOS @nodebug @gen_cluster( client=True, diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 85ce2aaac4c..660f34067d7 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1080,6 +1080,7 @@ async def test_steal_concurrent_simple(c, s, *workers): assert not ws2.has_what +@pytest.mark.oversaturate_only # FIXME flaky on ubuntu @gen_cluster( client=True, config={ From ddb2bbf85ef5da3d2f3fce7d14b02129b8592ddc Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 2 Sep 2022 12:26:02 -0600 Subject: [PATCH 14/20] match name order of queue jobs to others --- .github/workflows/tests.yaml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 971a8954957..a098d387539 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -26,7 +26,7 @@ jobs: python-version: ["3.8", "3.9", "3.10"] # Cherry-pick test modules to split the overall runtime roughly in half partition: [ci1, not ci1] - queuing: [no-queue] + queuing: [no_queue] exclude: - os: macos-latest python-version: 3.9 @@ -35,14 +35,14 @@ jobs: partition-label: "ci1" - partition: "not ci1" partition-label: "notci1" - - queuing: queue - os: ubuntu-latest + - os: ubuntu-latest python-version: 3.9 partition: "ci1" - - queuing: queue - os: ubuntu-latest + queuing: queue + - os: ubuntu-latest python-version: 3.9 partition: "not ci1" + queuing: queue # Uncomment to stress-test the test suite for random failures. # Must also change env.TEST_ID below. From 8a880fdb524d939e10d3e61e1e19750ec7985032 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 2 Sep 2022 12:27:25 -0600 Subject: [PATCH 15/20] set worker-saturation env var in test step --- .github/workflows/tests.yaml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index a098d387539..93194bf300e 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -116,11 +116,6 @@ jobs: python continuous_integration/scripts/host_info.py - - name: Enable queuing - if: ${{ matrix.queuing == 'queue' }} - shell: bash -l {0} - run: export DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1.0 - - name: Test id: run_tests shell: bash -l {0} @@ -133,6 +128,11 @@ jobs: export DISABLE_IPV6=1 fi + if [[ "${{ matrix.queuing }}" = "queue" ]]; then + export DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1.0 + echo "worker-saturation: $DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION" + fi + source continuous_integration/scripts/set_ulimit.sh set -o pipefail mkdir reports From d166e1ac676dfb2faf731e237a07ebd1aa622a29 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 2 Sep 2022 12:29:42 -0600 Subject: [PATCH 16/20] driveby: fix old var name --- distributed/tests/test_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index e0fdd28cc9d..d2ab983e356 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -851,7 +851,7 @@ async def test_ready_remove_worker(s, a, b): elif math.isinf(s.WORKER_SATURATION): cmp = operator.gt else: - pytest.fail(f"{s.WORKER_OVERSATURATION=}, must be 1 or inf") + pytest.fail(f"{s.WORKER_SATURATION=}, must be 1 or inf") assert all(cmp(len(w.processing), w.nthreads) for w in s.workers.values()), ( list(s.workers.values()), From 7b52bede757611b09d7d9c8f579ccbaa5aabb325 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 2 Sep 2022 13:03:56 -0600 Subject: [PATCH 17/20] remove use of `oversaturate_only` mark --- distributed/deploy/tests/test_adaptive.py | 8 +++++--- distributed/tests/test_client.py | 8 +++++++- distributed/tests/test_reschedule.py | 7 +++++-- distributed/tests/test_scheduler.py | 9 +++++++-- distributed/tests/test_steal.py | 6 +++++- 5 files changed, 29 insertions(+), 9 deletions(-) diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index 9471de0e507..31dcd7b4459 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -297,14 +297,16 @@ def test_basic_no_loop(cleanup): loop.add_callback(loop.stop) -# adaptive target for queued tasks doesn't yet consider default or learned task durations -@pytest.mark.oversaturate_only @pytest.mark.flaky(condition=LINUX, reruns=10, reruns_delay=5) @pytest.mark.xfail(condition=MACOS or WINDOWS, reason="extremely flaky") @gen_test() async def test_target_duration(): with dask.config.set( - {"distributed.scheduler.default-task-durations": {"slowinc": 1}} + { + "distributed.scheduler.default-task-durations": {"slowinc": 1}, + # adaptive target for queued tasks doesn't yet consider default or learned task durations + "distributed.scheduler.worker-saturation": float("inf"), + } ): async with LocalCluster( n_workers=0, diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 9d4914b77fc..3121dcb6bdd 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -6,6 +6,7 @@ import gc import inspect import logging +import math import operator import os import pathlib @@ -5515,7 +5516,12 @@ async def close(): loop.run_sync(close) # TODO: client.close() does not unset global client -@pytest.mark.oversaturate_only # FIXME flaky on windows +# FIXME shouldn't consistently fail on windows, may be an actual bug +@pytest.mark.skipif( + WINDOWS + and math.isfinite(dask.config.get("distributed.scheduler.worker-saturation")), + reason="flaky on Windows with queuing active", +) @pytest.mark.slow @gen_cluster(client=True, Worker=Nanny, timeout=60, nthreads=[("127.0.0.1", 3)] * 2) async def test_nested_compute(c, s, a, b): diff --git a/distributed/tests/test_reschedule.py b/distributed/tests/test_reschedule.py index c3216832d7f..e5ae0a4f893 100644 --- a/distributed/tests/test_reschedule.py +++ b/distributed/tests/test_reschedule.py @@ -21,11 +21,14 @@ ) -@pytest.mark.oversaturate_only @gen_cluster( client=True, nthreads=[("", 1)] * 2, - config={"distributed.scheduler.work-stealing": False}, + config={ + "distributed.scheduler.work-stealing": False, + # Difficult to get many tasks in processing with scheduler-side queuing + "distributed.scheduler.worker-saturation": float("inf"), + }, ) async def test_scheduler_reschedule(c, s, a, b): xs = c.map(slowinc, range(100), key="x", delay=0.1) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index d2ab983e356..443db8e08d2 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -36,7 +36,7 @@ wait, ) from distributed.comm.addressing import parse_host_port -from distributed.compatibility import LINUX, WINDOWS +from distributed.compatibility import LINUX, MACOS, WINDOWS from distributed.core import ConnectionPool, Status, clean_exception, connect, rpc from distributed.metrics import time from distributed.protocol.pickle import dumps, loads @@ -1484,7 +1484,12 @@ async def test_balance_many_workers(c, s, *workers): assert {len(w.has_what) for w in s.workers.values()} == {0, 1} -@pytest.mark.oversaturate_only # FIXME flaky on macOS +# FIXME test is very timing-based; if some threads are consistently slower than others, +# they'll receive fewer tasks from the queue (a good thing). +@pytest.mark.skipif( + MACOS and math.isfinite(dask.config.get("distributed.scheduler.worker-saturation")), + reason="flaky on macOS with queuing active", +) @nodebug @gen_cluster( client=True, diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 660f34067d7..3128f902b8c 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1080,7 +1080,11 @@ async def test_steal_concurrent_simple(c, s, *workers): assert not ws2.has_what -@pytest.mark.oversaturate_only # FIXME flaky on ubuntu +# FIXME shouldn't consistently fail, may be an actual bug? +@pytest.mark.skipif( + math.isfinite(dask.config.get("distributed.scheduler.worker-saturation")), + reason="flaky with queuing active", +) @gen_cluster( client=True, config={ From 40a8a8e755b3f4f199a1261d0d40d74f0706eaea Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 2 Sep 2022 13:13:44 -0600 Subject: [PATCH 18/20] Revert "add `oversaturate_only` mark" This reverts commit 0f9034a2b68d1bf12f37fcf153515d7b4038c0a8. --- conftest.py | 26 ++++++-------------------- setup.cfg | 1 - 2 files changed, 6 insertions(+), 21 deletions(-) diff --git a/conftest.py b/conftest.py index 98b5f537b72..c98d558023b 100644 --- a/conftest.py +++ b/conftest.py @@ -1,11 +1,8 @@ +# https://pytest.org/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option from __future__ import annotations -import math - import pytest -import dask - # Uncomment to enable more logging and checks # (https://docs.python.org/3/library/asyncio-dev.html) # Note this makes things slower and might consume much memory. @@ -30,24 +27,13 @@ def pytest_addoption(parser): def pytest_collection_modifyitems(config, items): - # https://pytest.org/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option - if skip_slow := not config.getoption("--runslow"): + if config.getoption("--runslow"): # --runslow given in cli: do not skip slow tests - skip_slow_marker = pytest.mark.skip(reason="need --runslow option to run") - - if skip_oversaturate := math.isfinite( - dask.config.get("distributed.scheduler.worker_saturation") - ): - skip_oversaturate_marker = pytest.mark.skip( - reason="need `distributed.scheduler.worker_saturation = inf` to run" - ) - + return + skip_slow = pytest.mark.skip(reason="need --runslow option to run") for item in items: - if skip_slow and "slow" in item.keywords: - item.add_marker(skip_slow_marker) - - if skip_oversaturate and "oversaturate_only" in item.keywords: - item.add_marker(skip_oversaturate_marker) + if "slow" in item.keywords: + item.add_marker(skip_slow) if "ws" in item.fixturenames: item.add_marker(pytest.mark.workerstate) diff --git a/setup.cfg b/setup.cfg index 28c3a0e57ee..e0701689571 100644 --- a/setup.cfg +++ b/setup.cfg @@ -99,7 +99,6 @@ markers = gpu: marks tests we want to run on GPUs leaking: ignore leaked resources; see pytest_resourceleaks.py for usage workerstate: deterministic test for the worker state machine. Automatically applied to all tests that use the 'ws' fixture. - oversaturate_only: tests that only work/make sense to run if root task queuing is disabled # pytest-timeout settings # 'thread' kills off the whole test suite. 'signal' only kills the offending test. From d14f94723086ec4d31247322226bb218797463c8 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 2 Sep 2022 15:58:58 -0600 Subject: [PATCH 19/20] Fix test workflow GitHub didn't like setting `partition-label` in two different ways; it showed up in two different places in the job name and breaks the test report script. Instead, we can set the `TEST_ID` variable as a step in the job, which is cleaner anyway IMO. --- .github/workflows/tests.yaml | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index ef8e687be13..5df232ce2c4 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -24,28 +24,24 @@ jobs: matrix: os: [ubuntu-latest, windows-latest, macos-latest] python-version: ["3.8", "3.9", "3.10"] + queuing: [no_queue] # Cherry-pick test modules to split the overall runtime roughly in half partition: [ci1, not ci1] - queuing: [no_queue] exclude: - os: macos-latest python-version: 3.9 include: - - partition: "ci1" - partition-label: "ci1" - - partition: "not ci1" - partition-label: "notci1" - os: ubuntu-latest python-version: 3.9 - partition: "ci1" queuing: queue + partition: "ci1" - os: ubuntu-latest python-version: 3.9 - partition: "not ci1" queuing: queue + partition: "not ci1" # Uncomment to stress-test the test suite for random failures. - # Must also change env.TEST_ID below. + # Must also change `export TEST_ID` in first step below. # This will take a LONG time and delay all PRs across the whole github.com/dask! # To avoid hamstringing other people, change 'on: [push, pull_request]' above # to just 'on: [push]'; this way the stress test will run exclusively in your @@ -54,10 +50,18 @@ jobs: env: CONDA_FILE: continuous_integration/environment-${{ matrix.python-version }}.yaml - TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }}-${{ matrix.queuing }} - # TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }}-${{ matrix.queuing }}-${{ matrix.run }} steps: + - name: Set $TEST_ID + run: | + export PARTITION_LABEL=$( echo "${{ matrix.partition }}" | sed "s/ //" ) + export TEST_ID="${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.queuing }}-$PARTITION_LABEL" + # Switch to this version for stress-test: + # export TEST_ID="${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.queuing }}-$PARTITION_LABEL-${{ matrix.run }}" + echo "TEST_ID: $TEST_ID" + echo "TEST_ID=$TEST_ID" >> $GITHUB_ENV + shell: bash + - name: Checkout source uses: actions/checkout@v2 with: From 33e499572926a5610d1f4a66939c9fe97f4b7919 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 2 Sep 2022 15:59:36 -0600 Subject: [PATCH 20/20] Update test report script --- continuous_integration/scripts/test_report.py | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/continuous_integration/scripts/test_report.py b/continuous_integration/scripts/test_report.py index b192666b7e2..f8afeae2530 100644 --- a/continuous_integration/scripts/test_report.py +++ b/continuous_integration/scripts/test_report.py @@ -136,22 +136,23 @@ def get_jobs(run, session): cache[url] = jobs df_jobs = pandas.DataFrame.from_records(jobs) - extracted = df_jobs.name.str.extract( - r"\(([\w\-]+), (\d\.\d+),\s([\w|\s]+)\)" - ).dropna() - df_jobs["OS"] = extracted[0] - df_jobs["python_version"] = extracted[1] - df_jobs["partition"] = extracted[2] - # We later need to join on this. Somehow the job ID is not part of the workflow schema and we have no other way to join - df_jobs["suite_name"] = ( - df_jobs["OS"] - + "-" - + df_jobs["python_version"] - + "-" - + df_jobs["partition"].str.replace(" ", "") + name_components = ( + df_jobs.name.str.extract(r"test \((.+)\)", expand=False) + .dropna() + .str.split(", ", expand=True) + .set_axis(["OS", "python_version", "queuing", "partition"], axis="columns") + .assign( + # We later need to join on this. Somehow the job ID is not part of the workflow schema and we have no other way to join + suite_name=lambda df: df["OS"] + + "-" + + df["python_version"] + + "-" + + df["queuing"] + + "-" + + df["partition"].str.replace(" ", "") + ) ) - - return df_jobs + return pandas.concat([df_jobs, name_components], axis="columns") def get_workflow_run_listing( @@ -205,7 +206,7 @@ def suite_from_name(name: str) -> str: can have matrix partitions, pytest marks, etc. Basically, just lop off the front of the name to get the suite. """ - return "-".join(name.split("-")[:3]) + return "-".join(name.split("-")[:4]) def download_and_parse_artifact(