
Allow worker to refuse data requests with busy signal #2092

Merged: 5 commits from worker-outgoing-saturation into dask:master on Jul 8, 2018

Conversation

@mrocklin (Member) commented Jul 3, 2018

This allows workers to say "I'm too busy right now" when presented with
a request for data from another worker. That worker then waits a bit,
queries the scheduler to see if anyone else has that data, and then
tries again. The wait time is an exponential backoff.

Pragmatically this means that when single pieces of data are in high
demand, the cluster will informally do a tree scattering. Some workers
will get the data directly while others wait on the busy signal. Then other
workers will get it from them, and so on. We used to ask users to do this
explicitly with the following:

    client.replicate(future)

or

    client.scatter(data, broadcast=True)

Now the replicate/broadcast step is no longer strictly necessary (though
some scattering of local data still is).

Machines on the same host are given some preference, and so should be able to
sneak in more easily.

Currently this has two issues:

  1. We need to unify the configuration with the total_connections parameter
     (which does the same thing, but in the opposite direction).
  2. We don't test the same-host behavior (this is hard because we're
     currently getting host information from the socket).
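
To make the retry loop described above concrete, here is a minimal sketch of the requesting side, written against hypothetical who_has and get_data coroutines rather than the actual Worker internals:

    import asyncio
    import random

    async def gather_with_backoff(who_has, get_data, key, delay=0.05, max_delay=2.0):
        # Sketch only, not the real Worker code: keep asking peers that hold
        # `key`; if a peer answers "busy", back off exponentially, re-query
        # the scheduler for the current holders, and try again.
        while True:
            holders = await who_has(key)                # scheduler may report new holders
            response = await get_data(random.choice(holders), key)
            if response != "busy":
                return response                         # a peer served the request
            await asyncio.sleep(delay)                  # refused: wait before retrying
            delay = min(delay * 2, max_delay)           # exponential backoff, capped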

@mrocklin (Member, Author) commented Jul 3, 2018

cc @ogrisel @seibert

@@ -1381,28 +1398,32 @@ def transition_dep_waiting_flight(self, dep, worker=None):
                pdb.set_trace()
            raise

-    def transition_dep_flight_waiting(self, dep, worker=None):
+    def transition_dep_flight_waiting(self, dep, worker=None, busy=False):
@mrocklin (Member, Author) commented on this diff:

This keyword should be inverted and changed to something like remove, which probably makes more sense locally

@ogrisel (Contributor) commented Jul 3, 2018

"Machines on the same host"

you mean workers on the same host?

yield wait(futures)

assert len(workers[0].outgoing_transfer_log) < 18
assert sum(not not w.outgoing_transfer_log for w in workers) >= 3
@ogrisel (Contributor):

not not is weird. Wouldn't bool(w.outgoing_transfer_log) work?

@ogrisel (Contributor):

Or sum(1 for w in workers if len(w.outgoing_transfer_log) > 0), which might be even more explicit.

@mrocklin (Member, Author):

Fixed to be more explicit. I'm using len(... for ... if ...)
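
Presumably the more explicit form looks something like the following (a sketch of the pattern only, not the exact test code):

    # Count the workers whose outgoing transfer log is non-empty, i.e. those
    # that actually served the data to another worker.
    assert len([w for w in workers if w.outgoing_transfer_log]) >= 3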


yield wait(futures)

assert len(workers[0].outgoing_transfer_log) < 18
@ogrisel (Contributor):

Where do the 18 and the 3 come from? Were they measured from empirical runs?

@ogrisel (Contributor):

Would it be possible to increase the number of secondary worker-to-worker transfers by increasing the size of x while making x cheaper to allocate initially? For instance:

x = c.submit(bytes, int(1e8), workers=[workers[0].address])

@mrocklin (Member, Author):

Two issues:

  1. We may start to run out of RAM on travis with 1e8 * 20 bytes
  2. Compression will make the transfers too fast

I'm not very concerned about the cost of creating the random array the first time, and I don't think that this will affect the number of secondary worker-to-worker transfers. However, I may not fully understand your meaning.
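
The compression point is easy to check: zero-filled bytes (as produced by bytes(int(1e8))) compress almost to nothing, while random data does not. A small stand-alone illustration, not part of the PR:

    import zlib
    import numpy as np

    zeros = bytes(int(1e7))            # cheap to allocate, but all zero bytes
    rand = np.random.bytes(int(1e7))   # random payload, effectively incompressible

    print(len(zlib.compress(zeros)) / len(zeros))  # tiny ratio: would fly over the wire
    print(len(zlib.compress(rand)) / len(rand))    # close to 1.0: keeps its full size in transfer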

@ogrisel (Contributor) left a review comment:


LGTM, but I am really not familiar with the code so I trust you and the existing test suite. Did you run benchmarks to ensure that this does not cause any significant performance regression?

Feel free to upgrade the joblib connector as part of this PR to remove the explicit broadcasting in the auto-scatter thingy.


@@ -19,6 +19,7 @@ distributed:
   worker:
     multiprocessing-method: forkserver
     use-file-locking: True
+    max-connections: 10      # maximum simultaneous outgoing connections
@ogrisel (Contributor):

Did you have the opportunity to run some I/O-intensive benchmark/stress test on a "real" cluster (e.g. on GCP) to measure the impact of that setting on the overall completion time of a data-bottlenecked set of tasks?

If you do, I would be curious to see the empirical arity of the tree structure of the resulting broadcast for different values of max-connections.

@mrocklin (Member, Author):

No, I haven't yet done any benchmarking. I'll play a bit on my local machine. I may get to trying this out on a larger cluster, but that's uncertain. Instead, I suspect that we will end up changing the default value over time.
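
For reference, the new default should be overridable like any other setting, either in the YAML file above or programmatically. A sketch assuming the standard dask.config mechanism and the distributed.worker.max-connections key shown in the diff:

    import dask

    # Raise the cap on simultaneous outgoing connections; the dotted key path
    # mirrors the YAML structure above (assumption, not verified against the PR).
    dask.config.set({'distributed.worker.max-connections': 20})

    print(dask.config.get('distributed.worker.max-connections'))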

@mrocklin force-pushed the worker-outgoing-saturation branch from 592df43 to c018823 on July 4, 2018 at 11:57
@mrocklin (Member, Author) commented Jul 4, 2018

from dask_jobqueue import PBSCluster
cluster = PBSCluster(processes=18)
cluster.scale(20)  # results in 18 * 20 processes on 20 physical machines

from dask.distributed import Client
client = Client(cluster)
client

import numpy as np
# one large array (~800 MB), created on a single worker
x = client.submit(np.random.random, 100000000, pure=False)

workers = list(client.scheduler_info()['workers'])

# every worker then requests x at roughly the same time
futures = [client.submit(len, x, pure=False, workers=[w])
           for w in workers]

One max connection (double for same node): around 25s of communication time (note that the communication starts after zero).
[dashboard screenshot]

Ten max connections (double for same node): around 18s.
[dashboard screenshot]

100 max connections: around 35s (note that the dashboard starts before zero for some reason).
[dashboard screenshot]

1000 max connections: 50-60s.
[dashboard screenshot]

@mrocklin (Member, Author) commented Jul 4, 2018

The behavior here is as I would expect. I'm comfortable with this from a performance perspective, though I also think that we'll end up wanting to tune this default in the future by a factor of 2-3.

There is still some administrative cleanup to do, I think.

@ogrisel (Contributor) commented Jul 4, 2018

Thanks for the benchmarks, that seems to work fine :)

@mrocklin force-pushed the worker-outgoing-saturation branch from 6eb9a96 to 40409d1 on July 5, 2018 at 16:39
@mrocklin merged commit 7a9fa83 into dask:master on Jul 8, 2018
@mrocklin deleted the worker-outgoing-saturation branch on July 8, 2018 at 16:19
@ogrisel added a commit to ogrisel/distributed that referenced this pull request on Jul 12, 2018