Skip to content

Commit

Permalink
Merge branch 'main' into wence/fix/tornado-dep
Browse files Browse the repository at this point in the history
  • Loading branch information
wence- committed Nov 23, 2022
2 parents f613809 + 803c624 commit ff6f673
Show file tree
Hide file tree
Showing 91 changed files with 3,629 additions and 2,077 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/conda.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
with:
fetch-depth: 0
- name: Set up Python
uses: conda-incubator/setup-miniconda@v2
uses: conda-incubator/setup-miniconda@v2.2.0
with:
miniforge-variant: Mambaforge
use-mamba: true
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/test-report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- uses: actions/[email protected]

- name: Setup Conda Environment
uses: conda-incubator/setup-miniconda@v2
uses: conda-incubator/setup-miniconda@v2.2.0
with:
miniforge-variant: Mambaforge
miniforge-version: latest
Expand Down Expand Up @@ -50,8 +50,8 @@ jobs:

- name: Generate report
run: |
python continuous_integration/scripts/test_report.py --max-days 90 --max-runs 50 --nfails 1 -o test_report.html
python continuous_integration/scripts/test_report.py --max-days 7 --max-runs 50 --nfails 2 -o test_short_report.html --title "Test Short Report"
python continuous_integration/scripts/test_report.py --max-days 90 --max-runs 30 --nfails 1 -o test_report.html
python continuous_integration/scripts/test_report.py --max-days 7 --max-runs 30 --nfails 2 -o test_short_report.html --title "Test Short Report"
mkdir deploy
mv test_report.html test_short_report.html deploy/
Expand Down
14 changes: 8 additions & 6 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python-version: ["3.8", "3.9", "3.10"]
queuing: [no_queue]
queuing: [queue]
# Cherry-pick test modules to split the overall runtime roughly in half
partition: [ci1, not ci1]
exclude:
Expand All @@ -33,11 +33,11 @@ jobs:
include:
- os: ubuntu-latest
python-version: 3.9
queuing: queue
queuing: no_queue
partition: "ci1"
- os: ubuntu-latest
python-version: 3.9
queuing: queue
queuing: no_queue
partition: "not ci1"

# Uncomment to stress-test the test suite for random failures.
Expand Down Expand Up @@ -68,7 +68,7 @@ jobs:
fetch-depth: 0

- name: Setup Conda Environment
uses: conda-incubator/setup-miniconda@v2
uses: conda-incubator/setup-miniconda@v2.2.0
with:
miniforge-variant: Mambaforge
miniforge-version: latest
Expand Down Expand Up @@ -144,8 +144,8 @@ jobs:

- name: Set up dask env for job queuing
shell: bash -l {0}
if: ${{ matrix.queuing == 'queue' }}
run: echo "DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1.0" >> $GITHUB_ENV
if: ${{ matrix.queuing == 'no_queue' }}
run: echo "DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=inf" >> $GITHUB_ENV

- name: Print host info
shell: bash -l {0}
Expand Down Expand Up @@ -200,6 +200,8 @@ jobs:
uses: codecov/codecov-action@v3
with:
name: ${{ env.TEST_ID }}
# See https://community.codecov.com/t/upload-issues-unable-to-locate-build-via-github-actions-api/3954
token: ${{ secrets.CODECOV_TOKEN }}

- name: Upload test results
# ensure this runs even if pytest fails
Expand Down
5 changes: 3 additions & 2 deletions .github/workflows/update-gpuci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ jobs:
- uses: actions/[email protected]

- name: Parse current axis YAML
id: rapids_current
uses: the-coding-turtle/[email protected]
with:
file: continuous_integration/gpuci/axis.yaml
Expand All @@ -39,8 +40,8 @@ jobs:
FULL_RAPIDS_VER: ${{ steps.cudf_latest.outputs.version }}
FULL_UCX_PY_VER: ${{ steps.ucx_py_latest.outputs.version }}
run: |
echo RAPIDS_VER=$RAPIDS_VER_0 >> $GITHUB_ENV
echo UCX_PY_VER=$(curl -sL https://version.gpuci.io/rapids/$RAPIDS_VER_0) >> $GITHUB_ENV
echo RAPIDS_VER=${{ steps.rapids_current.outputs.RAPIDS_VER_0 }} >> $GITHUB_ENV
echo UCX_PY_VER=$(curl -sL https://version.gpuci.io/rapids/${{ steps.rapids_current.outputs.RAPIDS_VER_0 }}) >> $GITHUB_ENV
echo NEW_RAPIDS_VER=${FULL_RAPIDS_VER::-10} >> $GITHUB_ENV
echo NEW_UCX_PY_VER=${FULL_UCX_PY_VER::-10} >> $GITHUB_ENV
Expand Down
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ repos:
# NOTE: autoupdate does not pick up flake8-bugbear since it is a transitive
# dependency. Make sure to update flake8-bugbear manually on a regular basis.
- flake8-bugbear==22.9.23
- repo: https://github.com/codespell-project/codespell
rev: v2.1.0
hooks:
- id: codespell
types_or: [rst, markdown]
files: docs
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.982
hooks:
Expand Down
5 changes: 5 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
codecov:
require_ci_to_pass: yes

ignore:
# Files that exercise GPU-only functionality or are only tested by gpuCI
# but don't interact with codecov are ignored.
- "distributed/comm/ucx.py"

coverage:
precision: 2
round: down
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/gpuci/axis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ LINUX_VER:
- ubuntu18.04

RAPIDS_VER:
- "22.12"
- "23.02"

excludes:
6 changes: 3 additions & 3 deletions continuous_integration/recipes/distributed/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ requirements:
- python >=3.8
- click >=7.0
- cloudpickle >=1.5.0
- cytoolz >=0.8.2
- cytoolz >=0.10.0
- {{ pin_compatible('dask-core', max_pin='x.x.x.x') }}
- jinja2
- locket >=1.0.0
Expand All @@ -41,8 +41,8 @@ requirements:
- pyyaml
- sortedcontainers !=2.0.0,!=2.0.1
- tblib >=1.6.0
- toolz >=0.8.2
- tornado >=6.2
- toolz >=0.10.0
- tornado >=6.0.3
- urllib3
- zict >=0.1.3
run_constrained:
Expand Down
2 changes: 1 addition & 1 deletion distributed/cli/dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ def _apportion_ports(
Returns
=======
List of kwargs to pass to the Worker or Nanny construtors
List of kwargs to pass to the Worker or Nanny constructors
"""
seen = set()

Expand Down
10 changes: 5 additions & 5 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ def __del__(self):
try:
self.release()
except AttributeError:
# Ocassionally we see this error when shutting down the client
# Occasionally we see this error when shutting down the client
# https://github.com/dask/distributed/issues/4305
if not sys.is_finalizing():
raise
Expand Down Expand Up @@ -2851,7 +2851,7 @@ def run(
wait : boolean (optional)
If the function is asynchronous whether or not to wait until that
function finishes.
nanny : bool, defualt False
nanny : bool, default False
Whether to run ``function`` on the nanny. By default, the function
is run on the worker process. If specified, the addresses in
``workers`` should still be the worker addresses, not the nanny addresses.
Expand Down Expand Up @@ -2885,7 +2885,7 @@ def run(
>>> def get_status(dask_worker):
... return dask_worker.status
>>> c.run(get_hostname) # doctest: +SKIP
>>> c.run(get_status) # doctest: +SKIP
{'192.168.0.100:9000': 'running',
'192.168.0.101:9000': 'running}
Expand Down Expand Up @@ -4206,7 +4206,7 @@ def get_scheduler_logs(self, n=None):
Parameters
----------
n : int
Number of logs to retrive. Maxes out at 10000 by default,
Number of logs to retrieve. Maxes out at 10000 by default,
configurable via the ``distributed.admin.log-length``
configuration value.
Expand All @@ -4222,7 +4222,7 @@ def get_worker_logs(self, n=None, workers=None, nanny=False):
Parameters
----------
n : int
Number of logs to retrive. Maxes out at 10000 by default,
Number of logs to retrieve. Maxes out at 10000 by default,
configurable via the ``distributed.admin.log-length``
configuration value.
workers : iterable
Expand Down
4 changes: 2 additions & 2 deletions distributed/comm/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def time_left():

backoff_base = 0.01
attempt = 0

logger.debug("Establishing connection to %s", loc)
# Prefer multiple small attempts than one long attempt. This should protect
# primarily from DNS race conditions
# gh3104, gh4176, gh4167
Expand All @@ -299,7 +299,7 @@ def time_left():
except (asyncio.TimeoutError, OSError) as exc:
active_exception = exc

# As descibed above, the intermediate timeout is used to distributed
# As described above, the intermediate timeout is used to distributed
# initial, bulk connect attempts homogeneously. In particular with
# the jitter upon retries we should not be worred about overloading
# any more DNS servers
Expand Down
2 changes: 1 addition & 1 deletion distributed/comm/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ async def connect(self, address, deserialize=True, **connection_args):
ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs
)

# Under certain circumstances tornado will have a closed connnection with an
# Under certain circumstances tornado will have a closed connection with an
# error and not raise a StreamClosedError.
#
# This occurs with tornado 5.x and openssl 1.1+
Expand Down
49 changes: 38 additions & 11 deletions distributed/comm/tests/test_ucx.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import asyncio
import os
from unittest.mock import patch

import pytest

Expand All @@ -15,7 +17,12 @@
from distributed.comm.core import CommClosedError
from distributed.comm.registry import backends, get_backend
from distributed.deploy.local import LocalCluster
from distributed.diagnostics.nvml import has_cuda_context
from distributed.diagnostics.nvml import (
device_get_count,
get_device_index_and_uuid,
get_device_mig_mode,
has_cuda_context,
)
from distributed.protocol import to_serialize
from distributed.utils_test import gen_test, inc

Expand Down Expand Up @@ -320,20 +327,40 @@ async def test_simple(
assert await client.submit(lambda x: x + 1, 10) == 11


@pytest.mark.xfail(reason="If running on Docker, requires --pid=host")
@gen_test()
async def test_cuda_context(
ucx_loop,
):
with dask.config.set({"distributed.comm.ucx.create-cuda-context": True}):
async with LocalCluster(
protocol="ucx", n_workers=1, asynchronous=True
) as cluster:
async with Client(cluster, asynchronous=True) as client:
assert cluster.scheduler_address.startswith("ucx://")
assert has_cuda_context() == 0
worker_cuda_context = await client.run(has_cuda_context)
assert len(worker_cuda_context) == 1
assert list(worker_cuda_context.values())[0] == 0
try:
device_info = get_device_index_and_uuid(
next(
filter(
lambda i: get_device_mig_mode(i)[0] == 0, range(device_get_count())
)
)
)
except StopIteration:
pytest.skip("No CUDA device in non-MIG mode available")

with patch.dict(
os.environ, {"CUDA_VISIBLE_DEVICES": device_info.uuid.decode("utf-8")}
):
with dask.config.set({"distributed.comm.ucx.create-cuda-context": True}):
async with LocalCluster(
protocol="ucx", n_workers=1, asynchronous=True
) as cluster:
async with Client(cluster, asynchronous=True) as client:
assert cluster.scheduler_address.startswith("ucx://")
ctx = has_cuda_context()
assert ctx.has_context and ctx.device_info == device_info
worker_cuda_context = await client.run(has_cuda_context)
assert len(worker_cuda_context) == 1
worker_cuda_context = list(worker_cuda_context.values())
assert (
worker_cuda_context[0].has_context
and worker_cuda_context[0].device_info == device_info
)


@gen_test()
Expand Down
40 changes: 28 additions & 12 deletions distributed/comm/ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
host_array,
to_frames,
)
from distributed.diagnostics.nvml import has_cuda_context
from distributed.diagnostics.nvml import (
CudaDeviceInfo,
get_device_index_and_uuid,
has_cuda_context,
)
from distributed.utils import ensure_ip, get_ip, get_ipv6, log_errors, nbytes

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -57,17 +61,27 @@
)


def _warn_existing_cuda_context(ctx, pid):
def _get_device_and_uuid_str(device_info: CudaDeviceInfo) -> str:
return f"{device_info.device_index} ({str(device_info.uuid)})"


def _warn_existing_cuda_context(device_info: CudaDeviceInfo, pid: int) -> None:
device_uuid_str = _get_device_and_uuid_str(device_info)
logger.warning(
f"A CUDA context for device {ctx} already exists on process ID {pid}. {_warning_suffix}"
f"A CUDA context for device {device_uuid_str} already exists "
f"on process ID {pid}. {_warning_suffix}"
)


def _warn_cuda_context_wrong_device(ctx_expected, ctx_actual, pid):
def _warn_cuda_context_wrong_device(
device_info_expected: CudaDeviceInfo, device_info_actual: CudaDeviceInfo, pid: int
) -> None:
expected_device_uuid_str = _get_device_and_uuid_str(device_info_expected)
actual_device_uuid_str = _get_device_and_uuid_str(device_info_actual)
logger.warning(
f"Worker with process ID {pid} should have a CUDA context assigned to device "
f"{ctx_expected}, but instead the CUDA context is on device {ctx_actual}. "
f"{_warning_suffix}"
f"{expected_device_uuid_str}, but instead the CUDA context is on device "
f"{actual_device_uuid_str}. {_warning_suffix}"
)


Expand Down Expand Up @@ -116,22 +130,24 @@ def init_once():
"CUDA support with UCX requires Numba for context management"
)

cuda_visible_device = int(
cuda_visible_device = get_device_index_and_uuid(
os.environ.get("CUDA_VISIBLE_DEVICES", "0").split(",")[0]
)
pre_existing_cuda_context = has_cuda_context()
if pre_existing_cuda_context is not False:
_warn_existing_cuda_context(pre_existing_cuda_context, os.getpid())
if pre_existing_cuda_context.has_context:
_warn_existing_cuda_context(
pre_existing_cuda_context.device_info, os.getpid()
)

numba.cuda.current_context()

cuda_context_created = has_cuda_context()
if (
cuda_context_created is not False
and cuda_context_created != cuda_visible_device
cuda_context_created.has_context
and cuda_context_created.device_info.uuid != cuda_visible_device.uuid
):
_warn_cuda_context_wrong_device(
cuda_visible_device, cuda_context_created, os.getpid()
cuda_visible_device, cuda_context_created.device_info, os.getpid()
)

import ucp as _ucp
Expand Down
Loading

0 comments on commit ff6f673

Please sign in to comment.