Commit 9e0e335

Merge branch 'main' of https://github.com/dask/distributed into dask-expr-conda

jrbourbeau committed Mar 15, 2024
2 parents 89b9499 + 8c93366
Showing 42 changed files with 721 additions and 530 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/conda.yml
@@ -30,7 +30,7 @@ jobs:
       with:
         fetch-depth: 0
     - name: Set up Python
-      uses: conda-incubator/setup-miniconda@v3.0.2
+      uses: conda-incubator/setup-miniconda@v3.0.3
       with:
         miniforge-variant: Mambaforge
         use-mamba: true
2 changes: 1 addition & 1 deletion .github/workflows/test-report.yaml
@@ -21,7 +21,7 @@ jobs:
     - uses: actions/checkout@v4.1.2

     - name: Setup Conda Environment
-      uses: conda-incubator/setup-miniconda@v3.0.2
+      uses: conda-incubator/setup-miniconda@v3.0.3
       with:
         miniforge-variant: Mambaforge
         miniforge-version: latest
23 changes: 19 additions & 4 deletions .github/workflows/tests.yaml
@@ -27,7 +27,7 @@ jobs:
       matrix:
         os: [ubuntu-latest, windows-latest, macos-latest]
         environment: [mindeps, "3.9", "3.10", "3.11", "3.12"]
-        label: [queue]
+        label: [default]
         extra_packages: [null]
         # Cherry-pick test modules to split the overall runtime roughly in half
         partition: [ci1, not ci1]
@@ -57,6 +57,16 @@ jobs:
           label: no_queue
           partition: "not ci1"

+        # Set dataframe.query-planning: false
+        - os: ubuntu-latest
+          environment: "3.9"
+          label: no_expr
+          partition: "ci1"
+        - os: ubuntu-latest
+          environment: "3.9"
+          label: no_expr
+          partition: "not ci1"
+
         # dask.array P2P shuffle
         - os: ubuntu-latest
           environment: mindeps
@@ -115,7 +125,7 @@ jobs:
          fetch-depth: 0

      - name: Setup Conda Environment
-       uses: conda-incubator/setup-miniconda@v3.0.2
+       uses: conda-incubator/setup-miniconda@v3.0.3
        with:
          miniforge-variant: Mambaforge
          miniforge-version: latest
@@ -154,7 +164,7 @@ jobs:
          # Increase this value to reset cache if
          # continuous_integration/environment-${{ matrix.environment }}.yaml has not
          # changed. See also same variable in .pre-commit-config.yaml
-         CACHE_NUMBER: 1
+         CACHE_NUMBER: 2
         id: cache

      - name: Update environment
@@ -207,11 +217,16 @@ jobs:
        if: ${{ matrix.os != 'windows-latest' }}
        run: echo "DISABLE_IPV6=1" >> $GITHUB_ENV

-     - name: Set up dask env for job queuing
+     - name: Set up dask env to disable job queuing
        shell: bash -l {0}
        if: ${{ matrix.label == 'no_queue' }}
        run: echo "DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=inf" >> $GITHUB_ENV

+     - name: Set up dask env to disable dask-expr
+       shell: bash -l {0}
+       if: ${{ matrix.label == 'no_expr' }}
+       run: echo "DASK_DATAFRAME__QUERY_PLANNING=False" >> $GITHUB_ENV
+
      - name: Print host info
        # host_info.py imports numpy, which isn't a direct dependency of distributed
        if: matrix.environment != 'mindeps'
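Note: the DASK_DATAFRAME__QUERY_PLANNING variable set above goes through dask's generic environment-variable config mechanism, not anything workflow-specific. A minimal sketch of that mapping, assuming only that dask is importable:

import os

import dask

# "DASK_"-prefixed variables are split on "__" into nested config keys,
# "_" and "-" are interchangeable in key names, and values are parsed as
# Python literals, so the string "False" becomes the boolean False.
os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"
dask.config.refresh()  # re-collect configuration from the environment

assert dask.config.get("dataframe.query-planning") is False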
3 changes: 3 additions & 0 deletions continuous_integration/environment-3.10.yaml
@@ -18,6 +18,8 @@ dependencies:
   - ipykernel
   - ipywidgets
   - jinja2
+  - jupyter-server-proxy
+  - jupyterlab
   - locket
   - msgpack-python
   - netcdf4
@@ -44,6 +46,7 @@ dependencies:
   - zstandard
   - pip:
     - git+https://github.com/dask/dask
+    - git+https://github.com/dask-contrib/dask-expr
     - git+https://github.com/dask/s3fs
     - git+https://github.com/dask/zict
     - git+https://github.com/fsspec/filesystem_spec
3 changes: 3 additions & 0 deletions continuous_integration/environment-3.11.yaml
@@ -18,6 +18,8 @@ dependencies:
   - ipykernel
   - ipywidgets
   - jinja2
+  - jupyter-server-proxy
+  - jupyterlab
   - locket
   - msgpack-python
   - netcdf4
@@ -44,6 +46,7 @@ dependencies:
   - zstandard
   - pip:
     - git+https://github.com/dask/dask
+    - git+https://github.com/dask-contrib/dask-expr
     - git+https://github.com/dask/s3fs
     - git+https://github.com/dask/zict
     - git+https://github.com/fsspec/filesystem_spec
3 changes: 3 additions & 0 deletions continuous_integration/environment-3.12.yaml
@@ -18,6 +18,8 @@ dependencies:
   - ipykernel
   - ipywidgets
   - jinja2
+  - jupyter-server-proxy
+  - jupyterlab
   - locket
   - msgpack-python
   - netcdf4
@@ -44,6 +46,7 @@ dependencies:
   - zstandard
   - pip:
     - git+https://github.com/dask/dask
+    - git+https://github.com/dask-contrib/dask-expr
     - git+https://github.com/dask/s3fs
     - git+https://github.com/dask/zict
     - git+https://github.com/fsspec/filesystem_spec
3 changes: 3 additions & 0 deletions continuous_integration/environment-3.9.yaml
@@ -20,6 +20,8 @@ dependencies:
   - ipykernel
   - ipywidgets
   - jinja2
+  - jupyter-server-proxy
+  - jupyterlab
   - locket
   - lz4  # Only tested here
   - msgpack-python
@@ -51,5 +53,6 @@ dependencies:
   - zstandard
   - pip:
     - git+https://github.com/dask/dask
+    - git+https://github.com/dask-contrib/dask-expr
     - git+https://github.com/dask/crick  # Only tested here
     - keras
51 changes: 22 additions & 29 deletions distributed/cli/tests/test_dask_worker.py
@@ -18,7 +18,7 @@
 from distributed.compatibility import LINUX, WINDOWS
 from distributed.deploy.utils import nprocesses_nthreads
 from distributed.metrics import time
-from distributed.utils import open_port
+from distributed.utils import get_ip, open_port
 from distributed.utils_test import (
     gen_cluster,
     inc,
@@ -485,39 +485,32 @@ def func(dask_worker):


 @pytest.mark.slow
-@gen_cluster(
-    client=True, nthreads=[], scheduler_kwargs={"dashboard_address": "localhost:8787"}
-)
-async def test_dashboard_non_standard_ports(c, s, requires_default_ports):
+def test_dashboard_non_standard_ports():
     pytest.importorskip("bokeh")
     requests = pytest.importorskip("requests")

     try:
         import jupyter_server_proxy  # noqa: F401

         proxy_exists = True
     except ImportError:
         proxy_exists = False

-    with popen(
-        [
-            "dask",
-            "worker",
-            s.address,
-            "--dashboard-address",
-            ":4833",
-            "--host",
-            "127.0.0.1",
-        ]
-    ):
-        await c.wait_for_workers(1)
-
-        response = requests.get("http://127.0.0.1:4833/status")
+    s_host = "127.0.0.1"
+    # use internal ip instead of localhost ip to verify GlobalProxyHandler will update
+    # to allow internal host ip of a worker.
+    w_host = get_ip()
+    s_port = "3233"
+    s_dashboard_port = "3232"
+    w_dashboard_port = "4833"
+    s_cmd = f"dask scheduler --host {s_host} --port {s_port} --dashboard-address :{s_dashboard_port}"
+    w_cmd = f"dask worker {s_host}:{s_port} --dashboard-address :{w_dashboard_port} --host {w_host}"
+
+    with popen(s_cmd.split()), popen(w_cmd.split()):
+        with Client(f"{s_host}:{s_port}") as c:
+            c.wait_for_workers(1)
+
+        response = requests.get(f"http://{s_host}:{w_dashboard_port}/status")
         response.raise_for_status()

         # TEST PROXYING WORKS
         if proxy_exists:
-            response = requests.get("http://127.0.0.1:8787/proxy/4833/127.0.0.1/status")
-            response.raise_for_status()
+            response = requests.get(
+                f"http://{s_host}:{s_dashboard_port}/proxy/{w_dashboard_port}/{w_host}/status"
+            )
+            response.raise_for_status()

     with pytest.raises(requests.ConnectionError):
         requests.get("http://localhost:4833/status/")
39 changes: 21 additions & 18 deletions distributed/client.py
@@ -4,6 +4,7 @@
 import atexit
 import copy
 import inspect
+import itertools
 import json
 import logging
 import os
@@ -31,7 +32,7 @@
 from tlz import first, groupby, merge, partition_all, valmap

 import dask
-from dask.base import collections_to_dsk, normalize_token, tokenize
+from dask.base import collections_to_dsk, tokenize
 from dask.core import flatten, validate_key
 from dask.highlevelgraph import HighLevelGraph
 from dask.optimization import SubgraphCallable
@@ -210,11 +211,15 @@ class Future(WrappedKey):

     _cb_executor = None
     _cb_executor_pid = None
+    _counter = itertools.count()
+    # Make sure this stays unique even across multiple processes or hosts
+    _uid = uuid.uuid4().hex

-    def __init__(self, key, client=None, inform=True, state=None):
+    def __init__(self, key, client=None, inform=True, state=None, _id=None):
         self.key = key
         self._cleared = False
         self._client = client
+        self._id = _id or (Future._uid, next(Future._counter))
         self._input_state = state
         self._inform = inform
         self._state = None
@@ -499,8 +504,16 @@ def release(self):
         except TypeError:  # pragma: no cover
             pass  # Shutting down, add_callback may be None

+    @staticmethod
+    def make_future(key, id):
+        # Can't use kwargs in pickle __reduce__ methods
+        return Future(key=key, _id=id)
+
     def __reduce__(self) -> str | tuple[Any, ...]:
-        return Future, (self.key,)
+        return Future.make_future, (self.key, self._id)
+
+    def __dask_tokenize__(self):
+        return (type(self).__name__, self.key, self._id)

     def __del__(self):
         try:
@@ -643,18 +656,6 @@ async def done_callback(future, callback):
     callback(future)


-@partial(normalize_token.register, Future)
-def normalize_future(f):
-    """Returns the key and the type as a list
-    Parameters
-    ----------
-    list
-        The key and the type
-    """
-    return [f.key, type(f)]
-
-
 class AllExit(Exception):
     """Custom exception class to exit All(...) early."""

@@ -3434,9 +3435,11 @@ def compute(

         if traverse:
             collections = tuple(
-                dask.delayed(a)
-                if isinstance(a, (list, set, tuple, dict, Iterator))
-                else a
+                (
+                    dask.delayed(a)
+                    if isinstance(a, (list, set, tuple, dict, Iterator))
+                    else a
+                )
                 for a in collections
             )
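Note: a hedged sketch of what the Future._id plumbing above buys, assuming a throwaway in-process client. __reduce__ cannot pass keyword arguments, hence the make_future helper, and carrying (key, _id) through pickle keeps tokens stable across copies:

import pickle

from dask.base import tokenize
from distributed import Client

with Client(processes=False) as client:  # in-process cluster for illustration
    fut = client.submit(sum, [1, 2, 3])
    copy = pickle.loads(pickle.dumps(fut))  # round-trips via Future.make_future

    assert copy.key == fut.key
    assert copy._id == fut._id              # identity carried by __reduce__
    assert tokenize(copy) == tokenize(fut)  # __dask_tokenize__ uses (key, _id)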
7 changes: 4 additions & 3 deletions distributed/dashboard/components/scheduler.py
@@ -3911,19 +3911,20 @@ def __init__(self, scheduler, **kwargs):
     @log_errors
     def update(self):
         s = self.scheduler
-        monitor_gil = s.monitor.monitor_gil_contention

         self.data["values"] = [
             s._tick_interval_observed,
             self.gil_contention_scheduler,
             sum(w.metrics["event_loop_interval"] for w in s.workers.values())
             / (len(s.workers) or 1),
             self.gil_contention_workers,
-        ][:: 1 if monitor_gil else 2]
+        ][:: 1 if s.monitor.monitor_gil_contention else 2]

         # Format event loop as time and GIL (if configured) as %
         self.data["text"] = [
-            f"{x * 100:.1f}%" if i % 2 and monitor_gil else format_time(x)
+            f"{x * 100:.1f}%"
+            if i % 2 and s.monitor.monitor_gil_contention
+            else format_time(x)
             for i, x in enumerate(self.data["values"])
         ]
         update(self.source, self.data)
2 changes: 1 addition & 1 deletion distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -1346,7 +1346,7 @@ async def test_shuffling(c, s, a, b):
         freq="10 s",
     )
     with dask.config.set({"dataframe.shuffle.method": "p2p"}):
-        df2 = dd.shuffle.shuffle(df, "x").persist()
+        df2 = df.shuffle("x").persist()
     start = time()
     while not ss.source.data["comm_written"]:
         ss.update()
2 changes: 2 additions & 0 deletions distributed/distributed.yaml
@@ -29,6 +29,8 @@ distributed:
     default-task-durations:  # How long we expect function names to run ("1h", "1s") (helps for long tasks)
       rechunk-split: 1us
       split-shuffle: 1us
+      split-taskshuffle: 1us
+      split-stage: 1us
     validate: False  # Check scheduler state at every step for debugging
   dashboard:
     status:
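Note: the same defaults can be supplied at runtime; a sketch of the equivalent dask.config call (assumed usage, not part of this diff), mirroring the YAML path distributed.scheduler.default-task-durations shown above:

import dask

# Hint the scheduler at expected runtimes for the new shuffle task prefixes
dask.config.set(
    {
        "distributed.scheduler.default-task-durations.split-taskshuffle": "1us",
        "distributed.scheduler.default-task-durations.split-stage": "1us",
    }
)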
14 changes: 14 additions & 0 deletions distributed/http/proxy.py
@@ -19,6 +19,15 @@ def initialize(self, dask_server=None, extra=None):
         self.scheduler = dask_server
         self.extra = extra or {}

+    # `get_current_user` and `prepare` method impls reference
+    # issue in tornado & jupyter server compat here
+    # https://github.com/jupyter-server/jupyter_server/issues/1012
+    def get_current_user(self):
+        return "dask"
+
+    async def prepare(self):
+        web.authenticated(lambda rq: None)(self)
+
     async def http_get(self, port, host, proxied_path):
         # route here first
         # incoming URI /proxy/{port}/{host}/{proxied_path}
@@ -29,6 +38,9 @@ async def http_get(self, port, host, proxied_path):
         uri = f"/proxy/{port}/{proxied_path}"
         self.request.uri = uri

+        if self.host not in self.host_allowlist:
+            self.host_allowlist.append(self.host)
+
         # slash is removed during regex in handler
         proxied_path = "/%s" % proxied_path

@@ -41,6 +53,8 @@
         return await self.proxy(port, proxied_path)

     async def open(self, port, host, proxied_path):
+        if host not in self.host_allowlist:
+            self.host_allowlist.append(host)
         # finally, proxy to other address/port
         return await self.proxy_open(host, port, proxied_path)
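Note: a sketch of the route these handlers serve, with hypothetical addresses. The scheduler dashboard proxies worker dashboards at /proxy/{port}/{host}/{path}, and the additions above append a previously unseen worker host to host_allowlist instead of rejecting it:

import requests

scheduler_dashboard = "http://127.0.0.1:8787"     # assumed scheduler dashboard
worker_host, worker_dash_port = "10.0.0.5", 4833  # hypothetical worker

# First request for this worker host now succeeds: GlobalProxyHandler
# adds worker_host to host_allowlist on demand.
r = requests.get(
    f"{scheduler_dashboard}/proxy/{worker_dash_port}/{worker_host}/status"
)
r.raise_for_status()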
3 changes: 2 additions & 1 deletion distributed/http/scheduler/prometheus/core.py
@@ -56,7 +56,8 @@ def collect(self) -> Iterator[GaugeMetricFamily | CounterMetricFamily]:
         yield CounterMetricFamily(
             self.build_name("gil_contention"),
             "GIL contention metric",
-            value=self.server.monitor._cumulative_gil_contention,
+            value=self.server.monitor.cumulative_gil_contention,
+            unit="seconds",
         )

         yield CounterMetricFamily(
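Note: the unit="seconds" argument is why the test below now expects dask_scheduler_gil_contention_seconds; prometheus_client appends the unit to the metric name. A standalone sketch of that behavior:

from prometheus_client.core import CounterMetricFamily

metric = CounterMetricFamily(
    "dask_scheduler_gil_contention",
    "GIL contention metric",
    value=1.5,
    unit="seconds",
)
print(metric.name)  # -> dask_scheduler_gil_contention_seconds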
2 changes: 1 addition & 1 deletion distributed/http/scheduler/tests/test_scheduler_http.py
@@ -132,7 +132,7 @@ async def test_prometheus(c, s, a, b):
     except ImportError:
         pass  # pragma: nocover
     else:
-        expected_metrics.add("dask_scheduler_gil_contention")
+        expected_metrics.add("dask_scheduler_gil_contention_seconds")

     assert set(active_metrics.keys()) == expected_metrics
     assert active_metrics["dask_scheduler_clients"].samples[0].value == 1.0