diff --git a/distributed/diagnostics/progress.py b/distributed/diagnostics/progress.py index c620c25b8e9..3273f795fa2 100644 --- a/distributed/diagnostics/progress.py +++ b/distributed/diagnostics/progress.py @@ -8,11 +8,10 @@ from tlz import groupby, valmap from dask.base import tokenize -from dask.utils import stringify +from dask.utils import key_split, stringify from distributed.diagnostics.plugin import SchedulerPlugin from distributed.metrics import time -from distributed.utils import key_split logger = logging.getLogger(__name__) diff --git a/distributed/diagnostics/progress_stream.py b/distributed/diagnostics/progress_stream.py index 764aef790ea..aa4de6ef97e 100644 --- a/distributed/diagnostics/progress_stream.py +++ b/distributed/diagnostics/progress_stream.py @@ -4,9 +4,11 @@ from tlz import merge, valmap +from dask.utils import key_split + from distributed.core import coerce_to_address, connect from distributed.diagnostics.progress import AllProgress -from distributed.utils import color_of, key_split +from distributed.utils import color_of from distributed.worker import dumps_function logger = logging.getLogger(__name__) diff --git a/distributed/diagnostics/progressbar.py b/distributed/diagnostics/progressbar.py index 2e878176acf..828b2e3f4d0 100644 --- a/distributed/diagnostics/progressbar.py +++ b/distributed/diagnostics/progressbar.py @@ -12,6 +12,7 @@ from tornado.ioloop import IOLoop import dask +from dask.utils import key_split from distributed.client import default_client, futures_of from distributed.core import ( @@ -22,7 +23,7 @@ ) from distributed.diagnostics.progress import MultiProgress, Progress, format_time from distributed.protocol.pickle import dumps -from distributed.utils import LoopRunner, is_kernel, key_split +from distributed.utils import LoopRunner, is_kernel logger = logging.getLogger(__name__) diff --git a/distributed/diagnostics/task_stream.py b/distributed/diagnostics/task_stream.py index 274b55442fe..c632549f389 100644 --- a/distributed/diagnostics/task_stream.py +++ b/distributed/diagnostics/task_stream.py @@ -4,12 +4,11 @@ from collections import deque import dask -from dask.utils import format_time, parse_timedelta +from dask.utils import format_time, key_split, parse_timedelta from distributed.diagnostics.plugin import SchedulerPlugin from distributed.diagnostics.progress_stream import color_of from distributed.metrics import time -from distributed.utils import key_split logger = logging.getLogger(__name__) diff --git a/distributed/diagnostics/websocket.py b/distributed/diagnostics/websocket.py index df6515f775a..4279216f369 100644 --- a/distributed/diagnostics/websocket.py +++ b/distributed/diagnostics/websocket.py @@ -1,8 +1,9 @@ from __future__ import annotations +from dask.utils import key_split + from distributed.diagnostics.plugin import SchedulerPlugin from distributed.diagnostics.task_stream import colors -from distributed.utils import key_split class WebsocketPlugin(SchedulerPlugin): diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e03983bde49..c7aeba991e5 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -50,7 +50,14 @@ import dask from dask.highlevelgraph import HighLevelGraph -from dask.utils import format_bytes, format_time, parse_bytes, parse_timedelta, tmpfile +from dask.utils import ( + format_bytes, + format_time, + key_split, + parse_bytes, + parse_timedelta, + tmpfile, +) from dask.widgets import get_template from distributed import cluster_dump, preloading, profile @@ -92,7 +99,6 @@ TimeoutError, empty_context, get_fileno_limit, - key_split, key_split_group, log_errors, no_default, diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 3128f902b8c..5b39c1db6de 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -14,13 +14,13 @@ from tlz import concat, sliding_window import dask +from dask.utils import key_split from distributed import Event, Lock, Nanny, Worker, profile, wait, worker_client from distributed.compatibility import LINUX from distributed.config import config from distributed.core import Status from distributed.metrics import time -from distributed.scheduler import key_split from distributed.system import MEMORY_LIMIT from distributed.utils_test import ( captured_logger, diff --git a/distributed/utils.py b/distributed/utils.py index b5b18ba3890..44a666b2953 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -50,6 +50,7 @@ import dask from dask import istask from dask.utils import ensure_bytes as _ensure_bytes +from dask.utils import key_split from dask.utils import parse_timedelta as _parse_timedelta from dask.widgets import get_template @@ -643,66 +644,8 @@ def is_kernel(): return getattr(get_ipython(), "kernel", None) is not None -hex_pattern = re.compile("[a-f]+") - - -@functools.lru_cache(100000) -def key_split(s): - """ - >>> key_split('x') - 'x' - >>> key_split('x-1') - 'x' - >>> key_split('x-1-2-3') - 'x' - >>> key_split(('x-2', 1)) - 'x' - >>> key_split("('x-2', 1)") - 'x' - >>> key_split("('x', 1)") - 'x' - >>> key_split('hello-world-1') - 'hello-world' - >>> key_split(b'hello-world-1') - 'hello-world' - >>> key_split('ae05086432ca935f6eba409a8ecd4896') - 'data' - >>> key_split('>> key_split(None) - 'Other' - >>> key_split('x-abcdefab') # ignores hex - 'x' - """ - if type(s) is bytes: - s = s.decode() - if type(s) is tuple: - s = s[0] - try: - words = s.split("-") - if not words[0][0].isalpha(): - result = words[0].split(",")[0].strip("'(\"") - else: - result = words[0] - for word in words[1:]: - if word.isalpha() and not ( - len(word) == 8 and hex_pattern.match(word) is not None - ): - result += "-" + word - else: - break - if len(result) == 32 and re.match(r"[a-f0-9]{32}", result): - return "data" - else: - if result[0] == "<": - result = result.strip("<>").split()[0].split(".")[-1] - return result - except Exception: - return "Other" - - def key_split_group(x: object) -> str: - """A more fine-grained version of key_split + """A more fine-grained version of key_split. >>> key_split_group(('x-2', 1)) 'x-2' diff --git a/distributed/widgets/__init__.py b/distributed/widgets/__init__.py index 0fe618a39c6..af187c5476d 100644 --- a/distributed/widgets/__init__.py +++ b/distributed/widgets/__init__.py @@ -2,10 +2,9 @@ import os.path +from dask.utils import key_split from dask.widgets import FILTERS, TEMPLATE_PATHS -from distributed.utils import key_split - TEMPLATE_PATHS.append( os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates") ) diff --git a/distributed/worker.py b/distributed/worker.py index 9764de061bf..8663705941b 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -39,6 +39,7 @@ apply, format_bytes, funcname, + key_split, parse_bytes, parse_timedelta, stringify, @@ -88,7 +89,6 @@ is_python_shutting_down, iscoroutinefunction, json_load_robust, - key_split, log_errors, offload, parse_ports,