diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 209354baaa..f7abeeaf5e 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -24,7 +24,7 @@ jobs:
       matrix:
         os: [ubuntu-latest, windows-latest, macos-latest]
         python-version: ["3.8", "3.9", "3.10"]
-        queuing: [no_queue]
+        queuing: [queue]
         # Cherry-pick test modules to split the overall runtime roughly in half
         partition: [ci1, not ci1]
         exclude:
@@ -33,11 +33,11 @@ jobs:
         include:
           - os: ubuntu-latest
             python-version: 3.9
-            queuing: queue
+            queuing: no_queue
            partition: "ci1"
           - os: ubuntu-latest
             python-version: 3.9
-            queuing: queue
+            queuing: no_queue
            partition: "not ci1"

         # Uncomment to stress-test the test suite for random failures.
@@ -144,8 +144,8 @@ jobs:

       - name: Set up dask env for job queuing
         shell: bash -l {0}
-        if: ${{ matrix.queuing == 'queue' }}
-        run: echo "DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1.0" >> $GITHUB_ENV
+        if: ${{ matrix.queuing == 'no_queue' }}
+        run: echo "DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=inf" >> $GITHUB_ENV

       - name: Print host info
         shell: bash -l {0}
diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index 105a45e900..455de4265c 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -22,7 +22,7 @@ distributed:
     events-log-length: 100000
     work-stealing: True  # workers should steal tasks from each other
     work-stealing-interval: 100ms  # Callback time for work stealing
-    worker-saturation: .inf  # Send this fraction of nthreads root tasks to workers
+    worker-saturation: 1.1  # Send this fraction of nthreads root tasks to workers
     worker-ttl: "5 minutes"  # like '60s'. Time to live for workers. They must heartbeat faster than this
     pickle: True  # Is the scheduler allowed to deserialize arbitrary bytestrings
     preload: []  # Run custom modules with Scheduler
@@ -152,7 +152,7 @@ distributed:
       # Fractions of worker process memory at which we take action to avoid memory
      # blowup. Set any of the values to False to turn off the behavior entirely.
      # All fractions are relative to each worker's memory_limit.
-      transfer: 0.10 # fractional size of incoming data transfers where we start
+      transfer: 0.10  # fractional size of incoming data transfers where we start
                      # throttling incoming data transfers
       target: 0.60  # fraction of managed memory where we start spilling to disk
       spill: 0.70  # fraction of process memory where we start spilling to disk
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index e5028f8746..6961369d6b 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -257,7 +257,7 @@ def random(**kwargs):


 @pytest.mark.skipif(
-    math.isfinite(dask.config.get("distributed.scheduler.worker-saturation")),
+    math.isfinite(float(dask.config.get("distributed.scheduler.worker-saturation"))),
     reason="Not relevant with queuing on; see https://github.com/dask/distributed/issues/7204",
 )
 @gen_cluster(
@@ -918,8 +918,10 @@ def f(x, y=2):
     assert set(d) == {"function", "args"}


+@pytest.mark.parametrize("worker_saturation", [1.0, float("inf")])
 @gen_cluster()
-async def test_ready_remove_worker(s, a, b):
+async def test_ready_remove_worker(s, a, b, worker_saturation):
+    s.WORKER_SATURATION = worker_saturation
     s.update_graph(
         tasks={"x-%d" % i: dumps_task((inc, i)) for i in range(20)},
         keys=["x-%d" % i for i in range(20)],
@@ -1555,7 +1557,10 @@ async def test_balance_many_workers(c, s, *workers):
 # FIXME test is very timing-based; if some threads are consistently slower than others,
 # they'll receive fewer tasks from the queue (a good thing).
 @pytest.mark.skipif(
-    MACOS and math.isfinite(dask.config.get("distributed.scheduler.worker-saturation")),
+    MACOS
+    and math.isfinite(
+        float(dask.config.get("distributed.scheduler.worker-saturation"))
+    ),
     reason="flaky on macOS with queuing active",
 )
 @nodebug
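For context, here is a minimal sketch (not part of the diff, assuming a standard dask/distributed install) of how the new 1.1 default and the CI override interact. The float() wrapping added to the skipif conditions appears to be needed because a value injected via DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=inf reaches dask.config as the string "inf", which math.isfinite would reject.

    # Illustrative only -- not part of the diff above.
    import math

    import dask
    import distributed  # noqa: F401  (importing registers distributed's config defaults)

    # With the new shipped default of 1.1, queuing is on and the skipif conditions trigger.
    saturation = float(dask.config.get("distributed.scheduler.worker-saturation"))
    assert math.isfinite(saturation)

    # The CI "no_queue" jobs restore the old behaviour. Setting the value to the
    # string "inf" (as the environment variable effectively does) shows why the
    # float() conversion is needed before math.isfinite.
    with dask.config.set({"distributed.scheduler.worker-saturation": "inf"}):
        raw = dask.config.get("distributed.scheduler.worker-saturation")
        assert not math.isfinite(float(raw))  # math.isfinite(raw) would raise TypeError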