Skip to content

Commit

Permalink
Merge branch 'main' into pause_while_spilling
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Apr 26, 2022
2 parents e036bb0 + 84cbb09 commit 4f2558d
Show file tree
Hide file tree
Showing 20 changed files with 685 additions and 286 deletions.
File renamed without changes.
10 changes: 7 additions & 3 deletions distributed/cli/tests/test_dask_spec.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,37 @@
import random
import sys

import pytest
import yaml

from distributed import Client
from distributed.utils_test import gen_cluster, gen_test, popen


@pytest.mark.flaky(reruns=2)
@gen_test(timeout=120)
async def test_text():
port = random.randint(10000, 50000)
with popen(
[
sys.executable,
"-m",
"distributed.cli.dask_spec",
"--spec",
'{"cls": "dask.distributed.Scheduler", "opts": {"port": 9373}}',
'{"cls": "dask.distributed.Scheduler", "opts": {"port": %d}}' % port,
]
):
with popen(
[
sys.executable,
"-m",
"distributed.cli.dask_spec",
"tcp://localhost:9373",
"tcp://localhost:%d" % port,
"--spec",
'{"cls": "dask.distributed.Worker", "opts": {"nanny": false, "nthreads": 3, "name": "foo"}}',
]
):
async with Client("tcp://localhost:9373", asynchronous=True) as client:
async with Client("tcp://localhost:%d" % port, asynchronous=True) as client:
await client.wait_for_workers(1)
info = await client.scheduler.identity()
[w] = info["workers"].values()
Expand Down
25 changes: 25 additions & 0 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4276,6 +4276,31 @@ def collections_to_dsk(collections, *args, **kwargs):
"""Convert many collections into a single dask graph, after optimization"""
return collections_to_dsk(collections, *args, **kwargs)

async def _story(self, keys=(), on_error="raise"):
assert on_error in ("raise", "ignore")

try:
flat_stories = await self.scheduler.get_story(keys=keys)
flat_stories = [("scheduler", *msg) for msg in flat_stories]
except Exception:
if on_error == "raise":
raise
elif on_error == "ignore":
flat_stories = []
else:
raise ValueError(f"on_error not in {'raise', 'ignore'}")

responses = await self.scheduler.broadcast(
msg={"op": "get_story", "keys": keys}, on_error=on_error
)
for worker, stories in responses.items():
flat_stories.extend((worker, *msg) for msg in stories)
return flat_stories

def story(self, *keys_or_stimulus_ids, on_error="raise"):
"""Returns a cluster-wide story for the given keys or simtulus_id's"""
return self.sync(self._story, keys=keys_or_stimulus_ids, on_error=on_error)

def get_task_stream(
self,
start=None,
Expand Down
4 changes: 2 additions & 2 deletions distributed/cluster_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import fsspec
import msgpack

from distributed._stories import scheduler_story as _scheduler_story
from distributed._stories import worker_story as _worker_story
from distributed.compatibility import to_thread
from distributed.stories import scheduler_story as _scheduler_story
from distributed.stories import worker_story as _worker_story

DEFAULT_CLUSTER_DUMP_FORMAT: Literal["msgpack" | "yaml"] = "msgpack"
DEFAULT_CLUSTER_DUMP_EXCLUDE: Collection[str] = ("run_spec",)
Expand Down
2 changes: 2 additions & 0 deletions distributed/comm/tests/test_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
xfail_ssl_issue5601,
)

pytestmark = pytest.mark.flaky(reruns=2)


def test_registered():
assert "ws" in backends
Expand Down
2 changes: 1 addition & 1 deletion distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ distributed:
events-log-length: 100000
work-stealing: True # workers should steal tasks from each other
work-stealing-interval: 100ms # Callback time for work stealing
worker-ttl: null # like '60s'. Time to live for workers. They must heartbeat faster than this
worker-ttl: "5 minutes" # like '60s'. Time to live for workers. They must heartbeat faster than this
pickle: True # Is the scheduler allowed to deserialize arbitrary bytestrings
preload: [] # Run custom modules with Scheduler
preload-argv: [] # See https://docs.dask.org/en/latest/how-to/customize-initialization.html
Expand Down
5 changes: 4 additions & 1 deletion distributed/http/templates/task.html
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,18 @@ <h3 class="title is-5"> Transition Log </h3>
<th> Key </th>
<th> Start </th>
<th> Finish </th>
<th> Stimulus ID </th>
<th> Recommended Key </th>
<th> Recommended Action </th>
</thead>

{% for key, start, finish, recommendations, transition_time in scheduler.story(Task) %}
{% for key, start, finish, recommendations, stimulus_id, transition_time in scheduler.story(Task) %}
<tr>
<td> {{ fromtimestamp(transition_time) }} </td>
<td> <a href="{{ url_escape(key) }}.html">{{key}}</a> </td>
<td> {{ start }} </td>
<td> {{ finish }} </td>
<td> {{ stimulus_id }} </td>
<td> </td>
<td> </td>
</tr>
Expand All @@ -137,6 +139,7 @@ <h3 class="title is-5"> Transition Log </h3>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> <a href="{{ url_escape(key2) }}.html">{{key2}}</a> </td>
<td> {{ rec }} </td>
</tr>
Expand Down
18 changes: 13 additions & 5 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,10 @@ async def _unregister(self, timeout=10):
allowed_errors = (TimeoutError, CommClosedError, EnvironmentError, RPCClosed)
with suppress(allowed_errors):
await asyncio.wait_for(
self.scheduler.unregister(address=self.worker_address), timeout
self.scheduler.unregister(
address=self.worker_address, stimulus_id=f"nanny-close-{time()}"
),
timeout,
)

@property
Expand Down Expand Up @@ -848,10 +851,15 @@ def watch_stop_q():
Wait for an incoming stop message and then stop the
worker cleanly.
"""
msg = child_stop_q.get()
child_stop_q.close()
assert msg.pop("op") == "stop"
loop.add_callback(do_stop, **msg)
try:
msg = child_stop_q.get()
except (TypeError, OSError):
logger.error("Worker process died unexpectedly")
msg = {"op": "stop"}
finally:
child_stop_q.close()
assert msg.pop("op") == "stop"
loop.add_callback(do_stop, **msg)

thread = threading.Thread(
target=watch_stop_q, name="Nanny stop queue watch"
Expand Down
Loading

0 comments on commit 4f2558d

Please sign in to comment.