Skip to content

Commit

Permalink
Replaced distributed.utils.key_split with dask.utils.key_split (#…
Browse files Browse the repository at this point in the history
…7005)


Co-authored-by: Luke Conibear <[email protected]>
  • Loading branch information
lukeconibear and luke-conibear authored Sep 6, 2022
1 parent 94d0c1d commit 79ca60b
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 72 deletions.
3 changes: 1 addition & 2 deletions distributed/diagnostics/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
from tlz import groupby, valmap

from dask.base import tokenize
from dask.utils import stringify
from dask.utils import key_split, stringify

from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.metrics import time
from distributed.utils import key_split

logger = logging.getLogger(__name__)

Expand Down
4 changes: 3 additions & 1 deletion distributed/diagnostics/progress_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

from tlz import merge, valmap

from dask.utils import key_split

from distributed.core import coerce_to_address, connect
from distributed.diagnostics.progress import AllProgress
from distributed.utils import color_of, key_split
from distributed.utils import color_of
from distributed.worker import dumps_function

logger = logging.getLogger(__name__)
Expand Down
3 changes: 2 additions & 1 deletion distributed/diagnostics/progressbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from tornado.ioloop import IOLoop

import dask
from dask.utils import key_split

from distributed.client import default_client, futures_of
from distributed.core import (
Expand All @@ -22,7 +23,7 @@
)
from distributed.diagnostics.progress import MultiProgress, Progress, format_time
from distributed.protocol.pickle import dumps
from distributed.utils import LoopRunner, is_kernel, key_split
from distributed.utils import LoopRunner, is_kernel

logger = logging.getLogger(__name__)

Expand Down
3 changes: 1 addition & 2 deletions distributed/diagnostics/task_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
from collections import deque

import dask
from dask.utils import format_time, parse_timedelta
from dask.utils import format_time, key_split, parse_timedelta

from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.diagnostics.progress_stream import color_of
from distributed.metrics import time
from distributed.utils import key_split

logger = logging.getLogger(__name__)

Expand Down
3 changes: 2 additions & 1 deletion distributed/diagnostics/websocket.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import annotations

from dask.utils import key_split

from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.diagnostics.task_stream import colors
from distributed.utils import key_split


class WebsocketPlugin(SchedulerPlugin):
Expand Down
10 changes: 8 additions & 2 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,14 @@

import dask
from dask.highlevelgraph import HighLevelGraph
from dask.utils import format_bytes, format_time, parse_bytes, parse_timedelta, tmpfile
from dask.utils import (
format_bytes,
format_time,
key_split,
parse_bytes,
parse_timedelta,
tmpfile,
)
from dask.widgets import get_template

from distributed import cluster_dump, preloading, profile
Expand Down Expand Up @@ -92,7 +99,6 @@
TimeoutError,
empty_context,
get_fileno_limit,
key_split,
key_split_group,
log_errors,
no_default,
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
from tlz import concat, sliding_window

import dask
from dask.utils import key_split

from distributed import Event, Lock, Nanny, Worker, profile, wait, worker_client
from distributed.compatibility import LINUX
from distributed.config import config
from distributed.core import Status
from distributed.metrics import time
from distributed.scheduler import key_split
from distributed.system import MEMORY_LIMIT
from distributed.utils_test import (
captured_logger,
Expand Down
61 changes: 2 additions & 59 deletions distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import dask
from dask import istask
from dask.utils import ensure_bytes as _ensure_bytes
from dask.utils import key_split
from dask.utils import parse_timedelta as _parse_timedelta
from dask.widgets import get_template

Expand Down Expand Up @@ -643,66 +644,8 @@ def is_kernel():
return getattr(get_ipython(), "kernel", None) is not None


hex_pattern = re.compile("[a-f]+")


@functools.lru_cache(100000)
def key_split(s):
"""
>>> key_split('x')
'x'
>>> key_split('x-1')
'x'
>>> key_split('x-1-2-3')
'x'
>>> key_split(('x-2', 1))
'x'
>>> key_split("('x-2', 1)")
'x'
>>> key_split("('x', 1)")
'x'
>>> key_split('hello-world-1')
'hello-world'
>>> key_split(b'hello-world-1')
'hello-world'
>>> key_split('ae05086432ca935f6eba409a8ecd4896')
'data'
>>> key_split('<module.submodule.myclass object at 0xdaf372')
'myclass'
>>> key_split(None)
'Other'
>>> key_split('x-abcdefab') # ignores hex
'x'
"""
if type(s) is bytes:
s = s.decode()
if type(s) is tuple:
s = s[0]
try:
words = s.split("-")
if not words[0][0].isalpha():
result = words[0].split(",")[0].strip("'(\"")
else:
result = words[0]
for word in words[1:]:
if word.isalpha() and not (
len(word) == 8 and hex_pattern.match(word) is not None
):
result += "-" + word
else:
break
if len(result) == 32 and re.match(r"[a-f0-9]{32}", result):
return "data"
else:
if result[0] == "<":
result = result.strip("<>").split()[0].split(".")[-1]
return result
except Exception:
return "Other"


def key_split_group(x: object) -> str:
"""A more fine-grained version of key_split
"""A more fine-grained version of key_split.
>>> key_split_group(('x-2', 1))
'x-2'
Expand Down
3 changes: 1 addition & 2 deletions distributed/widgets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@

import os.path

from dask.utils import key_split
from dask.widgets import FILTERS, TEMPLATE_PATHS

from distributed.utils import key_split

TEMPLATE_PATHS.append(
os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates")
)
Expand Down
2 changes: 1 addition & 1 deletion distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
apply,
format_bytes,
funcname,
key_split,
parse_bytes,
parse_timedelta,
stringify,
Expand Down Expand Up @@ -88,7 +89,6 @@
is_python_shutting_down,
iscoroutinefunction,
json_load_robust,
key_split,
log_errors,
offload,
parse_ports,
Expand Down

0 comments on commit 79ca60b

Please sign in to comment.