diff --git a/distributed/actor.py b/distributed/actor.py index 69172bf23ec..dc49571d1db 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -11,7 +11,7 @@ class Actor(WrappedKey): - """ Controls an object on a remote worker + """Controls an object on a remote worker An actor allows remote control of a stateful object living on a remote worker. Method calls on this object trigger operations on the remote @@ -195,7 +195,7 @@ async def func(**msg): class ActorFuture: - """ Future to an actor's method call + """Future to an actor's method call Whenever you call a method on an Actor you get an ActorFuture immediately while the computation happens in the background. You can call ``.result`` diff --git a/distributed/batched.py b/distributed/batched.py index 07eb8e41014..eab57c420ef 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -13,7 +13,7 @@ class BatchedSend: - """ Batch messages in batches on a stream + """Batch messages in batches on a stream This takes an IOStream and an interval (in ms) and ensures that we send no more than one message every interval milliseconds. We send lists of @@ -109,7 +109,7 @@ def _background_send(self): self.stopped.set() def send(self, msg): - """ Schedule a message for sending to the other side + """Schedule a message for sending to the other side This completes quickly and synchronously """ @@ -124,7 +124,7 @@ def send(self, msg): @gen.coroutine def close(self, timeout=None): - """ Flush existing messages and then close comm + """Flush existing messages and then close comm If set, raises `tornado.util.TimeoutError` after a timeout. """ diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index 543d4300ff3..470bfddf81d 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -320,7 +320,7 @@ def test_preload_remote_module(loop, tmp_path): str(tmp_path / "scheduler-file.json"), "--preload", "http://localhost:9382/scheduler_info.py", - ], + ] ) as proc: with Client( scheduler_file=tmp_path / "scheduler-file.json", loop=loop diff --git a/distributed/client.py b/distributed/client.py index d50f3dd1c0f..27cfced05d8 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -129,7 +129,7 @@ def _del_global_client(c): class Future(WrappedKey): - """ A remotely running computation + """A remotely running computation A Future is a local proxy to a result running on a remote worker. A user manages future objects in the local Python process to determine what @@ -210,7 +210,7 @@ def done(self): return self._state.done() def result(self, timeout=None): - """ Wait until computation completes, gather result to local process. + """Wait until computation completes, gather result to local process. If *timeout* seconds elapse before returning, a ``dask.distributed.TimeoutError`` is raised. @@ -255,7 +255,7 @@ async def _exception(self): return None def exception(self, timeout=None, **kwargs): - """ Return the exception of a failed task + """Return the exception of a failed task If *timeout* seconds elapse before returning, a ``dask.distributed.TimeoutError`` is raised. @@ -267,7 +267,7 @@ def exception(self, timeout=None, **kwargs): return self.client.sync(self._exception, callback_timeout=timeout, **kwargs) def add_done_callback(self, fn): - """ Call callback on future when callback has finished + """Call callback on future when the future has finished The callback ``fn`` should take the future as its only argument.
This will be called regardless of whether the future completes successfully, @@ -296,7 +296,7 @@ def execute_callback(fut): ) def cancel(self, **kwargs): - """ Cancel request to run this future + """Cancel request to run this future See Also -------- @@ -305,7 +305,7 @@ def cancel(self, **kwargs): return self.client.cancel([self], **kwargs) def retry(self, **kwargs): - """ Retry this future if it has failed + """Retry this future if it has failed See Also -------- @@ -325,7 +325,7 @@ async def _traceback(self): return None def traceback(self, timeout=None, **kwargs): - """ Return the traceback of a failed task + """Return the traceback of a failed task This returns a traceback object. You can inspect this object using the ``traceback`` module. Alternatively if you call ``future.result()`` @@ -495,12 +495,11 @@ def normalize_future(f): class AllExit(Exception): - """Custom exception class to exit All(...) early. - """ + """Custom exception class to exit All(...) early.""" class Client: - """ Connect to and submit computation to a Dask cluster + """Connect to and submit computation to a Dask cluster The Client connects users to a Dask cluster. It provides an asynchronous user interface around functions and futures. This class resembles @@ -702,7 +701,7 @@ def __init__( self._periodic_callbacks = dict() self._periodic_callbacks["scheduler-info"] = PeriodicCallback( - self._update_scheduler_info, scheduler_info_interval * 1000, + self._update_scheduler_info, scheduler_info_interval * 1000 ) self._periodic_callbacks["heartbeat"] = PeriodicCallback( self._heartbeat, heartbeat_interval * 1000 @@ -788,7 +787,7 @@ def current(cls, allow_global=True): @property def asynchronous(self): - """ Are we running in the event loop? + """Are we running in the event loop? This is true if the user signaled that we might be when creating the client as in the following:: @@ -1386,7 +1385,7 @@ async def _close(self, fast=False): _shutdown = _close def close(self, timeout=no_default): - """ Close this client + """Close this client Clients will also close automatically when your Python session ends @@ -1445,7 +1444,7 @@ async def _shutdown(self): await self.scheduler.terminate(close_workers=True) def shutdown(self): - """ Shut down the connected scheduler and workers + """Shut down the connected scheduler and workers Note, this may disrupt other clients that may be using the same scheduler and workers. @@ -1490,7 +1489,7 @@ def submit( **kwargs, ): - """ Submit a function application to the scheduler + """Submit a function application to the scheduler Parameters ---------- @@ -1610,7 +1609,7 @@ def map( batch_size=None, **kwargs, ): - """ Map a function on a sequence of arguments + """Map a function on a sequence of arguments Arguments can be normal objects or Futures @@ -1904,7 +1903,7 @@ async def wait(k): return result async def _gather_remote(self, direct, local_worker): - """ Perform gather with workers or scheduler + """Perform gather with workers or scheduler This method exists to limit and batch many concurrent gathers into a few. It controls access using a Tornado semaphore, and picks up keys @@ -1933,7 +1932,7 @@ async def _gather_remote(self, direct, local_worker): return response def gather(self, futures, errors="raise", direct=None, asynchronous=None): - """ Gather futures from distributed memory + """Gather futures from distributed memory Accepts a future, nested container of futures, iterator, or queue. The return type will match the input type.
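The Future and Client docstrings above describe the core submit/gather workflow. A minimal usage sketch (the scheduler address and the ``inc`` helper are illustrative, not part of this diff)::

    from dask.distributed import Client

    client = Client("tcp://127.0.0.1:8786")  # hypothetical scheduler address

    def inc(x):
        return x + 1

    future = client.submit(inc, 10)                      # returns a Future immediately
    future.add_done_callback(lambda f: print(f.status))
    assert future.result(timeout=60) == 11               # block until the task finishes
    print(client.gather([future]))                       # return type matches the input container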
@@ -2111,7 +2110,7 @@ def scatter( timeout=no_default, asynchronous=None, ): - """ Scatter data into distributed memory + """Scatter data into distributed memory This moves data from the local client process into the workers of the distributed scheduler. Note that it is often better to submit jobs to @@ -2395,7 +2394,7 @@ async def _run_on_scheduler(self, function, *args, wait=True, **kwargs): return response["result"] def run_on_scheduler(self, function, *args, **kwargs): - """ Run a function on the scheduler process + """Run a function on the scheduler process This is typically used for live debugging. The function should take a keyword argument ``dask_scheduler=``, which will be given the scheduler @@ -2658,7 +2657,7 @@ def get( actors=None, **kwargs, ): - """ Compute dask graph + """Compute dask graph Parameters ---------- @@ -2720,7 +2719,7 @@ def get( return packed def _optimize_insert_futures(self, dsk, keys): - """ Replace known keys in dask graph with Futures + """Replace known keys in dask graph with Futures When given a Dask graph that might have overlapping keys with our known results we replace the values of that graph with futures. This can be @@ -2789,7 +2788,7 @@ def compute( traverse=True, **kwargs, ): - """ Compute dask collections on cluster + """Compute dask collections on cluster Parameters ---------- @@ -2937,7 +2936,7 @@ def persist( actors=None, **kwargs, ): - """ Persist dask collections on cluster + """Persist dask collections on cluster Starts computation of the collection on the cluster in the background. Provides a new dask collection that is semantically identical to the @@ -3052,7 +3051,7 @@ async def _restart(self, timeout=no_default): return self def restart(self, **kwargs): - """ Restart the distributed network + """Restart the distributed network This kills all active work, deletes all data on the network, and restarts the worker processes. @@ -3102,7 +3101,7 @@ def dump_to_file(dask_worker=None): assert all(len(data) == v for v in response.values()) def upload_file(self, filename, **kwargs): - """ Upload local package to workers + """Upload local package to workers This sends a local file up to all worker nodes. This file is placed into a temporary directory on Python's system path so any .py, .egg @@ -3138,7 +3137,7 @@ async def _rebalance(self, futures=None, workers=None): assert result["status"] == "OK" def rebalance(self, futures=None, workers=None, **kwargs): - """ Rebalance data within network + """Rebalance data within network Move data between workers to roughly balance memory burden. This either affects a subset of the keys/workers or the entire network, @@ -3166,7 +3165,7 @@ async def _replicate(self, futures, n=None, workers=None, branching_factor=2): ) def replicate(self, futures, n=None, workers=None, branching_factor=2, **kwargs): - """ Set replication of futures within network + """Set replication of futures within network Copy data onto many workers. This helps to broadcast frequently accessed data and it helps to improve resilience. 
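The scatter and replicate docstrings above work together: ``scatter`` moves local data onto workers and returns futures, and ``replicate`` copies those results onto additional workers for resilience. A sketch, reusing the illustrative client from earlier::

    data = list(range(1000))
    [future] = client.scatter([data])   # move data to a worker, receive a Future
    client.replicate([future], n=3)     # keep three copies across the cluster
    print(client.who_has([future]))     # which workers now hold the data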
@@ -3211,7 +3210,7 @@ def replicate(self, futures, n=None, workers=None, branching_factor=2, **kwargs) ) def nthreads(self, workers=None, **kwargs): - """ The number of threads/cores available on each worker node + """The number of threads/cores available on each worker node Parameters ---------- @@ -3243,7 +3242,7 @@ def nthreads(self, workers=None, **kwargs): ncores = nthreads def who_has(self, futures=None, **kwargs): - """ The workers storing each future's data + """The workers storing each future's data Parameters ---------- @@ -3276,7 +3275,7 @@ def who_has(self, futures=None, **kwargs): return self.sync(self.scheduler.who_has, keys=keys, **kwargs) def has_what(self, workers=None, **kwargs): - """ Which keys are held by which workers + """Which keys are held by which workers This returns the keys of the data that are held in each worker's memory. @@ -3310,7 +3309,7 @@ def has_what(self, workers=None, **kwargs): return self.sync(self.scheduler.has_what, workers=workers, **kwargs) def processing(self, workers=None): - """ The tasks currently running on each worker + """The tasks currently running on each worker Parameters ---------- @@ -3340,7 +3339,7 @@ def processing(self, workers=None): return self.sync(self.scheduler.processing, workers=workers) def nbytes(self, keys=None, summary=True, **kwargs): - """ The bytes taken up by each key on the cluster + """The bytes taken up by each key on the cluster This is as measured by ``sys.getsizeof`` which may not accurately reflect the true cost. @@ -3370,7 +3369,7 @@ def nbytes(self, keys=None, summary=True, **kwargs): return self.sync(self.scheduler.nbytes, keys=keys, summary=summary, **kwargs) def call_stack(self, futures=None, keys=None): - """ The actively running call stack of all relevant keys + """The actively running call stack of all relevant keys You can specify data of interest either by providing futures or collections in the ``futures=`` keyword or a list of explicit keys in @@ -3409,7 +3408,7 @@ def profile( server=False, scheduler=False, ): - """ Collect statistical profiling information about recent work + """Collect statistical profiling information about recent work Parameters ---------- @@ -3499,7 +3498,7 @@ async def _profile( return state def scheduler_info(self, **kwargs): - """ Basic information about the workers in the cluster + """Basic information about the workers in the cluster Examples -------- @@ -3519,7 +3518,7 @@ def scheduler_info(self, **kwargs): return self._scheduler_identity def write_scheduler_file(self, scheduler_file): - """ Write the scheduler information to a json file. + """Write the scheduler information to a json file. This facilitates easy sharing of scheduler information using a file system. 
The scheduler file can be used to instantiate a second Client @@ -3546,7 +3545,7 @@ def write_scheduler_file(self, scheduler_file): json.dump(self.scheduler_info(), f, indent=2) def get_metadata(self, keys, default=no_default): - """ Get arbitrary metadata from scheduler + """Get arbitrary metadata from scheduler See set_metadata for the full docstring with examples @@ -3568,7 +3567,7 @@ def get_metadata(self, keys, default=no_default): return self.sync(self.scheduler.get_metadata, keys=keys, default=default) def get_scheduler_logs(self, n=None): - """ Get logs from scheduler + """Get logs from scheduler Parameters ---------- @@ -3583,7 +3582,7 @@ def get_scheduler_logs(self, n=None): return self.sync(self.scheduler.logs, n=n) def get_worker_logs(self, n=None, workers=None, nanny=False): - """ Get logs from workers + """Get logs from workers Parameters ---------- @@ -3605,7 +3604,7 @@ def get_worker_logs(self, n=None, workers=None, nanny=False): return self.sync(self.scheduler.worker_logs, n=n, workers=workers, nanny=nanny) def retire_workers(self, workers=None, close_workers=True, **kwargs): - """ Retire certain workers on the scheduler + """Retire certain workers on the scheduler See dask.distributed.Scheduler.retire_workers for the full docstring. @@ -3631,7 +3630,7 @@ def retire_workers(self, workers=None, close_workers=True, **kwargs): ) def set_metadata(self, key, value): - """ Set arbitrary metadata in the scheduler + """Set arbitrary metadata in the scheduler This allows you to store small amounts of data on the central scheduler process for administrative purposes. Data should be msgpack @@ -3678,7 +3677,7 @@ def set_metadata(self, key, value): return self.sync(self.scheduler.set_metadata, keys=key, value=value) def get_versions(self, check=False, packages=[]): - """ Return version info for the scheduler, all workers and myself + """Return version info for the scheduler, all workers and myself Parameters ---------- @@ -3737,7 +3736,7 @@ async def _start_ipython_workers(self, workers): def start_ipython_workers( self, workers=None, magic_names=False, qtconsole=False, qtconsole_args=None ): - """ Start IPython kernels on workers + """Start IPython kernels on workers Parameters ---------- @@ -3819,7 +3818,7 @@ def start_ipython_workers( def start_ipython_scheduler( self, magic_name="scheduler_if_ipython", qtconsole=False, qtconsole_args=None ): - """ Start IPython kernel on the scheduler + """Start IPython kernel on the scheduler Parameters ---------- @@ -3987,7 +3986,7 @@ def get_task_stream( filename="task-stream.html", bokeh_resources=None, ): - """ Get task stream data from scheduler + """Get task stream data from scheduler This collects the data present in the diagnostic "Task Stream" plot on the dashboard. It includes the start, stop, transfer, and @@ -4247,7 +4246,7 @@ async def _wait(fs, timeout=None, return_when=ALL_COMPLETED): def wait(fs, timeout=None, return_when=ALL_COMPLETED): - """ Wait until all/any futures are finished + """Wait until all/any futures are finished Parameters ---------- @@ -4283,7 +4282,7 @@ async def _as_completed(fs, queue): async def _first_completed(futures): - """ Return a single completed future + """Return a single completed future See Also: _as_completed @@ -4399,7 +4398,7 @@ async def _track_future(self, future): self.thread_condition.notify() def update(self, futures): - """ Add multiple futures to the collection. + """Add multiple futures to the collection. 
The added futures will emit from the iterator once they finish""" with self.lock: @@ -4410,7 +4409,7 @@ def update(self, futures): self.loop.add_callback(self._track_future, f) def add(self, future): - """ Add a future to the collection + """Add a future to the collection This future will emit from the iterator once it finishes """ @@ -4425,7 +4424,7 @@ def has_ready(self): return not self.queue.empty() def count(self): - """ Return the number of futures yet to be returned + """Return the number of futures yet to be returned This includes both the number of futures still computing, as well as those that are finished, but have not yet been returned from this @@ -4476,7 +4475,7 @@ async def __anext__(self): next = __next__ def next_batch(self, block=True): - """ Get the next batch of completed futures. + """Get the next batch of completed futures. Parameters ---------- @@ -4573,7 +4572,7 @@ def redict_collection(c, dsk): def futures_of(o, client=None): - """ Future objects in a collection + """Future objects in a collection Parameters ---------- @@ -4618,7 +4617,7 @@ def futures_of(o, client=None): def fire_and_forget(obj): - """ Run tasks at least once, even if we release the futures + """Run tasks at least once, even if we release the futures Under normal operation Dask will not run any tasks for which there is not an active future (this avoids unnecessary work in many situations). @@ -4733,7 +4732,7 @@ async def __aexit__(self, typ, value, traceback): class performance_report: - """ Gather performance report + """Gather performance report This creates a static HTML file that includes many of the same plots of the dashboard for later viewing. @@ -4785,7 +4784,7 @@ def __exit__(self, typ, value, traceback): @contextmanager def temp_default_client(c): - """ Set the default client for the duration of the context + """Set the default client for the duration of the context .. note:: This function should be used exclusively for unit testing the default client diff --git a/distributed/comm/addressing.py b/distributed/comm/addressing.py index 2b1c4717407..537ecd4ef23 100644 --- a/distributed/comm/addressing.py +++ b/distributed/comm/addressing.py @@ -216,7 +216,7 @@ def addresses_from_user_args( security=None, default_port=0, ) -> list: - """ Get a list of addresses if the inputs are lists + """Get a list of addresses if the inputs are lists This is like ``address_from_user_args`` except that it also accepts lists for some of the arguments. If these arguments are lists then it will map diff --git a/distributed/comm/tests/test_comms.py b/distributed/comm/tests/test_comms.py index 15559250028..0a8f5250485 100644 --- a/distributed/comm/tests/test_comms.py +++ b/distributed/comm/tests/test_comms.py @@ -326,7 +326,7 @@ async def sleep_for_60ms(): sleep_future = sleep_for_60ms() with pytest.raises(IOError): await connect( - "tls://localhost:28400", 0.052, ssl_context=get_client_ssl_context(), + "tls://localhost:28400", 0.052, ssl_context=get_client_ssl_context() ) max_thread_count = await sleep_future assert max_thread_count <= 2 + original_thread_count @@ -639,7 +639,7 @@ async def handle_comm(comm): with pytest.raises(EnvironmentError) as excinfo: comm = await connect( - listener.contact_address, timeout=0.5, ssl_context=bad_cli_ctx, + listener.contact_address, timeout=0.5, ssl_context=bad_cli_ctx ) await comm.write({"x": "foo"}) # TODO: why is this necessary in Tornado 6 ? 
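Earlier in this diff, the ``as_completed`` and ``fire_and_forget`` docstrings describe the two common completion patterns: iterate over futures as they finish, or keep a task alive without retaining its future. A brief sketch under the same illustrative-client assumption::

    from dask.distributed import as_completed, fire_and_forget

    futures = client.map(inc, range(10))
    for future in as_completed(futures):     # yields futures in completion order
        print(future.result())

    fire_and_forget(client.submit(inc, -1))  # runs at least once, even with no held future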
@@ -657,16 +657,14 @@ async def handle_comm(comm): raise # Sanity check - comm = await connect(listener.contact_address, timeout=2, ssl_context=cli_ctx,) + comm = await connect(listener.contact_address, timeout=2, ssl_context=cli_ctx) await comm.close() # Connector refuses a listener not signed by the CA listener = await listen("tls://", handle_comm, ssl_context=bad_serv_ctx) with pytest.raises(EnvironmentError) as excinfo: - await connect( - listener.contact_address, timeout=2, ssl_context=cli_ctx, - ) + await connect(listener.contact_address, timeout=2, ssl_context=cli_ctx) assert "certificate verify failed" in str(excinfo.value) diff --git a/distributed/comm/tests/test_ucx.py b/distributed/comm/tests/test_ucx.py index a7dbfd3e46a..aa5095e2f3c 100644 --- a/distributed/comm/tests/test_ucx.py +++ b/distributed/comm/tests/test_ucx.py @@ -228,9 +228,7 @@ async def test_ping_pong_cupy(shape): @pytest.mark.slow @pytest.mark.asyncio -@pytest.mark.parametrize( - "n", [int(1e9), int(2.5e9),], -) +@pytest.mark.parametrize("n", [int(1e9), int(2.5e9)]) async def test_large_cupy(n, cleanup): cupy = pytest.importorskip("cupy") com, serv_com = await get_comm_pair() diff --git a/distributed/comm/utils.py b/distributed/comm/utils.py index eda370eed6f..d1a1a97e63c 100644 --- a/distributed/comm/utils.py +++ b/distributed/comm/utils.py @@ -22,7 +22,7 @@ async def to_frames( - msg, serializers=None, on_error="message", context=None, allow_offload=True, + msg, serializers=None, on_error="message", context=None, allow_offload=True ): """ Serialize a message into a list of Distributed protocol frames. @@ -32,7 +32,7 @@ def _to_frames(): try: return list( protocol.dumps( - msg, serializers=serializers, on_error=on_error, context=context, + msg, serializers=serializers, on_error=on_error, context=context ) ) except Exception as e: diff --git a/distributed/core.py b/distributed/core.py index 7d244896dd9..5ede25d7a05 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -110,7 +110,7 @@ def _raise(*args, **kwargs): class Server: - """ Dask Distributed Server + """Dask Distributed Server Superclass for endpoints in a distributed cluster, such as Worker and Scheduler objects. @@ -319,7 +319,7 @@ async def __aexit__(self, typ, value, traceback): await self.close() def start_periodic_callbacks(self): - """ Start Periodic Callbacks consistently + """Start Periodic Callbacks consistently This starts all PeriodicCallbacks stored in self.periodic_callbacks if they are not yet running. It does this safely on the IOLoop. @@ -437,7 +437,7 @@ async def listen(self, port_or_addr=None, allow_offload=True, **kwargs): self.listeners.append(listener) async def handle_comm(self, comm, shutting_down=shutting_down): - """ Dispatch new communications to coroutine-handlers + """Dispatch new communications to coroutine-handlers Handlers is a dictionary mapping operation names to functions or coroutines. @@ -645,7 +645,7 @@ def pingpong(comm): async def send_recv(comm, reply=True, serializers=None, deserializers=None, **kwargs): - """ Send and recv with a Comm. + """Send and recv with a Comm. 
Keyword arguments turn into the message @@ -696,7 +696,7 @@ def addr_from_args(addr=None, ip=None, port=None): class rpc: - """ Conveniently interact with a remote server + """Conveniently interact with a remote server >>> remote = rpc(address) # doctest: +SKIP >>> response = yield remote.add(x=10, y=20) # doctest: +SKIP @@ -736,7 +736,7 @@ def __init__( rpc.active.add(self) async def live_comm(self): - """ Get an open communication + """Get an open communication Some comms to the ip/port target may be in current use by other coroutines. We track this with the `comms` dict @@ -855,7 +855,7 @@ def __repr__(self): class PooledRPCCall: - """ The result of ConnectionPool()('host:port') + """The result of ConnectionPool()('host:port') See Also: ConnectionPool @@ -904,7 +904,7 @@ def __repr__(self): class ConnectionPool: - """ A maximum sized pool of Comm objects. + """A maximum sized pool of Comm objects. This provides a connect method that mirrors the normal distributed.connect method, but provides connection sharing and tracks connection limits. @@ -1129,7 +1129,7 @@ def collect_causes(e): def error_message(e, status="error"): - """ Produce message to send back given an exception has occurred + """Produce message to send back given an exception has occurred This does the following: @@ -1167,7 +1167,7 @@ def error_message(e, status="error"): def clean_exception(exception, traceback, **kwargs): - """ Reraise exception and traceback. Deserialize if necessary + """Reraise exception and traceback. Deserialize if necessary See Also -------- diff --git a/distributed/dashboard/components/__init__.py b/distributed/dashboard/components/__init__.py index f6159e83bcf..78d60108c8e 100644 --- a/distributed/dashboard/components/__init__.py +++ b/distributed/dashboard/components/__init__.py @@ -42,7 +42,7 @@ class DashboardComponent: - """ Base class for Dask.distributed UI dashboard components. + """Base class for Dask.distributed UI dashboard components. This class must have two attributes, ``root`` and ``source``, and one method ``update``: @@ -62,7 +62,7 @@ def update(self, messages): def add_periodic_callback(doc, component, interval): - """ Add periodic callback to doc in a way that avoids reference cycles + """Add periodic callback to doc in a way that avoids reference cycles If we instead use ``doc.add_periodic_callback(component.update, 100)`` then the component stays in memory as a reference cycle because its method is diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index 34c6f5a8aa4..8c43d243673 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -1759,7 +1759,7 @@ def update(self): class WorkerTable(DashboardComponent): - """ Status of the current workers + """Status of the current workers This is two plots, a text-based table for each host and a thin horizontal plot laying out hosts by their current memory use. 
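The ``add_periodic_callback`` docstring above explains why registering ``component.update`` directly would leak: the Bokeh document would keep the component alive through a reference cycle. A sketch of the weakref pattern it describes (an illustration of the idea, not the module's exact code)::

    import weakref

    def add_periodic_callback(doc, component, interval):
        ref = weakref.ref(component)    # does not keep the component alive

        def callback():
            comp = ref()
            if comp is not None:        # component may have been collected
                comp.update()

        doc.add_periodic_callback(callback, interval)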
diff --git a/distributed/dashboard/components/shared.py b/distributed/dashboard/components/shared.py index 40865e10fee..9360a6b77ac 100644 --- a/distributed/dashboard/components/shared.py +++ b/distributed/dashboard/components/shared.py @@ -38,7 +38,7 @@ class Processing(DashboardComponent): - """ Processing and distribution per core + """Processing and distribution per core This shows how many tasks are actively running on each worker and how many tasks are enqueued for each worker and how many are in the common pool @@ -128,7 +128,7 @@ def processing_update(msg): class ProfilePlot(DashboardComponent): - """ Time plots of the current resource usage on the cluster + """Time plots of the current resource usage on the cluster This is two plots, one for CPU and Memory and another for Network I/O """ @@ -171,7 +171,7 @@ def update(self, state): class ProfileTimePlot(DashboardComponent): - """ Time plots of the current resource usage on the cluster + """Time plots of the current resource usage on the cluster This is two plots, one for CPU and Memory and another for Network I/O """ @@ -336,7 +336,7 @@ async def cb(): class ProfileServer(DashboardComponent): - """ Time plots of the current resource usage on the cluster + """Time plots of the current resource usage on the cluster This is two plots, one for CPU and Memory and another for Network I/O """ diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index 2d4f5a6bc16..e79f3494279 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -507,9 +507,7 @@ async def test_TaskGraph_clear(c, s, a, b): assert time() < start + 5 -@gen_cluster( - client=True, config={"distributed.dashboard.graph-max-items": 2,}, -) +@gen_cluster(client=True, config={"distributed.dashboard.graph-max-items": 2}) async def test_TaskGraph_limit(c, s, a, b): gp = TaskGraph(s) @@ -591,9 +589,7 @@ async def test_profile_server(c, s, a, b): assert time() < start + 2 -@gen_cluster( - client=True, scheduler_kwargs={"dashboard": True}, -) +@gen_cluster(client=True, scheduler_kwargs={"dashboard": True}) async def test_root_redirect(c, s, a, b): http_client = AsyncHTTPClient() response = await http_client.fetch("http://localhost:%d/" % s.http_server.port) diff --git a/distributed/dashboard/tests/test_worker_bokeh.py b/distributed/dashboard/tests/test_worker_bokeh.py index 47ac89c6b0a..6143e837529 100644 --- a/distributed/dashboard/tests/test_worker_bokeh.py +++ b/distributed/dashboard/tests/test_worker_bokeh.py @@ -24,9 +24,7 @@ @gen_cluster( - client=True, - worker_kwargs={"dashboard": True}, - scheduler_kwargs={"dashboard": True}, + client=True, worker_kwargs={"dashboard": True}, scheduler_kwargs={"dashboard": True} ) async def test_routes(c, s, a, b): port = a.http_server.port @@ -64,9 +62,7 @@ async def test_simple(c, s, a, b): assert "bokeh" in response.body.decode().lower() -@gen_cluster( - client=True, worker_kwargs={"dashboard": True}, -) +@gen_cluster(client=True, worker_kwargs={"dashboard": True}) async def test_services_kwargs(c, s, a, b): assert s.workers[a.address].services == {"dashboard": a.http_server.port} @@ -157,7 +153,7 @@ async def test_CommunicatingStream(c, s, a, b): @gen_cluster( - client=True, clean_kwargs={"threads": False}, worker_kwargs={"dashboard": True}, + client=True, clean_kwargs={"threads": False}, worker_kwargs={"dashboard": True} ) async def test_prometheus(c, s, a, b): pytest.importorskip("prometheus_client") 
diff --git a/distributed/dashboard/utils.py b/distributed/dashboard/utils.py index 0de536a6050..1c54e8b478e 100644 --- a/distributed/dashboard/utils.py +++ b/distributed/dashboard/utils.py @@ -44,7 +44,7 @@ def transpose(lod): @without_property_validation def update(source, data): - """ Update source with data + """Update source with data This checks a few things first diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 05d7880a5bc..5b9c598be9c 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -27,7 +27,7 @@ class Cluster: - """ Superclass for cluster objects + """Superclass for cluster objects This class contains common functionality for Dask Cluster manager classes. @@ -124,7 +124,7 @@ def _update_worker_status(self, op, msg): raise ValueError("Invalid op", op, msg) def adapt(self, Adaptive=Adaptive, **kwargs) -> Adaptive: - """ Turn on adaptivity + """Turn on adaptivity For keyword arguments see dask.distributed.Adaptive @@ -141,7 +141,7 @@ def adapt(self, Adaptive=Adaptive, **kwargs) -> Adaptive: return self._adaptive def scale(self, n: int) -> None: - """ Scale cluster to n workers + """Scale cluster to n workers Parameters ---------- @@ -209,7 +209,7 @@ async def _get_logs(self, cluster=True, scheduler=True, workers=True): return logs def get_logs(self, cluster=True, scheduler=True, workers=True): - """ Return logs for the cluster, scheduler and workers + """Return logs for the cluster, scheduler and workers Parameters ---------- diff --git a/distributed/deploy/local.py b/distributed/deploy/local.py index 54b8ae125e6..66cfe70ee6d 100644 --- a/distributed/deploy/local.py +++ b/distributed/deploy/local.py @@ -18,7 +18,7 @@ class LocalCluster(SpecCluster): - """ Create local Scheduler and Workers + """Create local Scheduler and Workers This creates a "cluster" of a scheduler and workers running on the local machine. diff --git a/distributed/deploy/old_ssh.py b/distributed/deploy/old_ssh.py index fe5703ea5e8..6d62e0cfd8b 100644 --- a/distributed/deploy/old_ssh.py +++ b/distributed/deploy/old_ssh.py @@ -209,7 +209,7 @@ def communicate(): def start_scheduler( - logdir, addr, port, ssh_username, ssh_port, ssh_private_key, remote_python=None, + logdir, addr, port, ssh_username, ssh_port, ssh_private_key, remote_python=None ): cmd = "{python} -m distributed.cli.dask_scheduler --port {port}".format( python=remote_python or sys.executable, port=port diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index 1d1c449f7f7..0ba24c43632 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -75,7 +75,7 @@ async def _(): return _().__await__() async def start(self): - """ Submit the process to the resource manager + """Submit the process to the resource manager For workers this doesn't have to wait until the process actually starts, but can return once the resource manager has the request, and will work @@ -87,7 +87,7 @@ async def start(self): self.status = Status.running async def close(self): - """ Close the process + """Close the process This will be called by the Cluster object when we scale down a node, but only after we ask the Scheduler to close the worker gracefully. @@ -126,7 +126,7 @@ async def f(): class SpecCluster(Cluster): - """ Cluster that requires a full specification of workers + """Cluster that requires a full specification of workers The SpecCluster class expects a full specification of the Scheduler and Workers to use. 
It removes any handling of user inputs (like threads vs @@ -483,7 +483,7 @@ def scale(self, n=0, memory=None, cores=None): return NoOpAwaitable() def _new_worker_name(self, worker_number): - """ Returns new worker name. + """Returns new worker name. This can be overridden in SpecCluster derived classes to customise the worker names. @@ -491,7 +491,7 @@ def _new_worker_name(self, worker_number): return worker_number def new_worker_spec(self): - """ Return name and spec for the next worker + """Return name and spec for the next worker Returns ------- @@ -567,7 +567,7 @@ def adapt( maximum_memory: str = None, **kwargs, ) -> Adaptive: - """ Turn on adaptivity + """Turn on adaptivity This scales Dask clusters automatically based on scheduler activity. diff --git a/distributed/deploy/ssh.py b/distributed/deploy/ssh.py index bf75648efca..a6020998fec 100644 --- a/distributed/deploy/ssh.py +++ b/distributed/deploy/ssh.py @@ -17,7 +17,7 @@ class Process(ProcessInterface): - """ A superclass for SSH Workers and Nannies + """A superclass for SSH Workers and Nannies See Also -------- @@ -47,7 +47,7 @@ def __repr__(self): class Worker(Process): - """ A Remote Dask Worker controled by SSH + """A Remote Dask Worker controlled by SSH Parameters ---------- @@ -138,7 +138,7 @@ async def start(self): class Scheduler(Process): - """ A Remote Dask Scheduler controlled by SSH + """A Remote Dask Scheduler controlled by SSH Parameters ---------- @@ -238,7 +238,7 @@ def SSHCluster( remote_python: str = None, **kwargs, ): - """ Deploy a Dask cluster using SSH + """Deploy a Dask cluster using SSH The SSHCluster function deploys a Dask Scheduler and Workers for you on a set of machine addresses that you provide. The first address will be used diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index bbdfd52c1f0..17f1b175a27 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -195,7 +195,7 @@ async def test_min_max(): @pytest.mark.asyncio async def test_avoid_churn(cleanup): - """ We want to avoid creating and deleting workers frequently + """We want to avoid creating and deleting workers frequently Instead we want to wait a few beats before removing a worker in case the user is taking a brief pause between work @@ -220,7 +220,7 @@ async def test_avoid_churn(cleanup): @pytest.mark.asyncio async def test_adapt_quickly(): - """ We want to avoid creating and deleting workers frequently + """We want to avoid creating and deleting workers frequently Instead we want to wait a few beats before removing a worker in case the user is taking a brief pause between work diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index e54ad048da8..b46fdd065de 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -1004,7 +1004,7 @@ async def test_threads_per_worker_set_to_0(cleanup): Warning, match="Setting `threads_per_worker` to 0 is discouraged."
): async with LocalCluster( - n_workers=2, processes=False, threads_per_worker=0, asynchronous=True, + n_workers=2, processes=False, threads_per_worker=0, asynchronous=True ) as cluster: assert len(cluster.workers) == 2 assert all(w.nthreads < CPU_COUNT for w in cluster.workers.values()) diff --git a/distributed/diagnostics/eventstream.py b/distributed/diagnostics/eventstream.py index c0fde24470b..f1f70f458af 100644 --- a/distributed/diagnostics/eventstream.py +++ b/distributed/diagnostics/eventstream.py @@ -34,7 +34,7 @@ def teardown(scheduler, es): async def eventstream(address, interval): - """ Open a TCP connection to scheduler, receive batched task messages + """Open a TCP connection to scheduler, receive batched task messages The messages coming back are lists of dicts. Each dict is of the following form:: diff --git a/distributed/diagnostics/graph_layout.py b/distributed/diagnostics/graph_layout.py index ab36bd6978f..4404aa6e111 100644 --- a/distributed/diagnostics/graph_layout.py +++ b/distributed/diagnostics/graph_layout.py @@ -2,7 +2,7 @@ class GraphLayout(SchedulerPlugin): - """ Dynamic graph layout during computation + """Dynamic graph layout during computation This assigns (x, y) locations to all tasks quickly and dynamically as new tasks are added. This scales to a few thousand nodes. @@ -116,7 +116,7 @@ def transition(self, key, start, finish, *args, **kwargs): del collection[key] def reset_index(self): - """ Reset the index and refill new and new_edges + """Reset the index and refill new and new_edges From time to time TaskGraph wants to remove invisible nodes and reset all of its indices. This helps. diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py index fb3b2afe203..2e9cb31ba14 100644 --- a/distributed/diagnostics/plugin.py +++ b/distributed/diagnostics/plugin.py @@ -4,7 +4,7 @@ class SchedulerPlugin: - """ Interface to extend the Scheduler + """Interface to extend the Scheduler The scheduler operates by triggering and responding to events like ``task_finished``, ``update_graph``, ``task_erred``, etc.. 
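The SchedulerPlugin interface sketched above is used by subclassing and overriding the hooks of interest. A minimal counting plugin, assuming a ``scheduler`` handle is in scope (for example inside a preload's ``dask_setup``)::

    from distributed.diagnostics.plugin import SchedulerPlugin

    class Counter(SchedulerPlugin):
        def __init__(self):
            self.n_completed = 0

        def transition(self, key, start, finish, *args, **kwargs):
            # count tasks that move from processing into memory
            if start == "processing" and finish == "memory":
                self.n_completed += 1

    scheduler.add_plugin(Counter())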
@@ -38,14 +38,14 @@ class SchedulerPlugin: """ async def start(self, scheduler): - """ Run when the scheduler starts up + """Run when the scheduler starts up This runs at the end of the Scheduler startup process """ pass async def close(self): - """ Run when the scheduler closes down + """Run when the scheduler closes down This runs at the beginning of the Scheduler shutdown process, but after workers have been asked to shut down gracefully @@ -59,7 +59,7 @@ def restart(self, scheduler, **kwargs): """ Run when the scheduler restarts itself """ def transition(self, key, start, finish, *args, **kwargs): - """ Run whenever a task changes state + """Run whenever a task changes state Parameters ---------- @@ -87,7 +87,7 @@ def remove_client(self, scheduler=None, client=None, **kwargs): class WorkerPlugin: - """ Interface to extend the Worker + """Interface to extend the Worker A worker plugin enables custom code to run at different stages of the Workers' lifecycle: at setup, during task state transitions, when a task or dependency diff --git a/distributed/diagnostics/progress.py b/distributed/diagnostics/progress.py index 2aeba986839..4cb1188a010 100644 --- a/distributed/diagnostics/progress.py +++ b/distributed/diagnostics/progress.py @@ -40,7 +40,7 @@ def dependent_keys(tasks, complete=False): class Progress(SchedulerPlugin): - """ Tracks progress of a set of keys or futures + """Tracks progress of a set of keys or futures On creation we provide a set of keys or futures that interest us as well as a scheduler. We traverse through the scheduler's dependencies to find all @@ -130,7 +130,7 @@ def stop(self, exception=None, key=None): class MultiProgress(Progress): - """ Progress variant that keeps track of different groups of keys + """Progress variant that keeps track of different groups of keys See Progress for most details. This only adds a function ``func=`` that splits keys. 
This defaults to ``key_split`` which aligns with naming diff --git a/distributed/diagnostics/progress_stream.py b/distributed/diagnostics/progress_stream.py index c5e74a30f34..f40da0495bc 100644 --- a/distributed/diagnostics/progress_stream.py +++ b/distributed/diagnostics/progress_stream.py @@ -24,7 +24,7 @@ def counts(scheduler, allprogress): async def progress_stream(address, interval): - """ Open a TCP connection to scheduler, receive progress messages + """Open a TCP connection to scheduler, receive progress messages The messages coming back are dicts containing counts of key groups:: diff --git a/distributed/diagnostics/progressbar.py b/distributed/diagnostics/progressbar.py index fb663524c5e..af1c0b7f26c 100644 --- a/distributed/diagnostics/progressbar.py +++ b/distributed/diagnostics/progressbar.py @@ -64,7 +64,7 @@ def function(scheduler, p): return result self.comm = await connect( - self.scheduler, **(self.client().connection_args if self.client else {}), + self.scheduler, **(self.client().connection_args if self.client else {}) ) logger.debug("Progressbar Connected to scheduler") @@ -113,7 +113,7 @@ def __init__( loop=None, complete=True, start=True, - **kwargs + **kwargs, ): super(TextProgressBar, self).__init__(keys, scheduler, interval, complete) self.width = width @@ -141,7 +141,7 @@ def _draw_stop(self, **kwargs): class ProgressWidget(ProgressBar): - """ ProgressBar that uses an IPython ProgressBar widget for the notebook + """ProgressBar that uses an IPython ProgressBar widget for the notebook See Also -------- @@ -156,7 +156,7 @@ def __init__( interval="100ms", complete=False, loop=None, - **kwargs + **kwargs, ): super(ProgressWidget, self).__init__(keys, scheduler, interval, complete) @@ -215,7 +215,7 @@ def __init__( func=key_split, interval="100ms", complete=False, - **kwargs + **kwargs, ): self.scheduler = get_scheduler(scheduler) @@ -292,7 +292,7 @@ def __del__(self): class MultiProgressWidget(MultiProgressBar): - """ Multiple progress bar Widget suitable for the notebook + """Multiple progress bar Widget suitable for the notebook Displays multiple progress bars for a computation, split on computation type. 
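These widget classes back the top-level ``progress`` helper, whose docstring appears in the next hunk. Typical usage with the illustrative client from earlier::

    from dask.distributed import progress

    futures = client.map(inc, range(1000))
    progress(futures)   # text bar in a console, widget in a notebook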
@@ -312,7 +312,7 @@ def __init__( interval=0.1, func=key_split, complete=False, - **kwargs + **kwargs, ): super(MultiProgressWidget, self).__init__( keys, scheduler, func, interval, complete @@ -399,7 +399,7 @@ def _draw_bar(self, remaining, all, status, **kwargs): def progress(*futures, notebook=None, multi=True, complete=True, **kwargs): - """ Track progress of futures + """Track progress of futures This operates differently in the notebook and the console diff --git a/distributed/diagnostics/tests/test_worker_plugin.py b/distributed/diagnostics/tests/test_worker_plugin.py index c9cdbed784c..9bcc0c58ca4 100644 --- a/distributed/diagnostics/tests/test_worker_plugin.py +++ b/distributed/diagnostics/tests/test_worker_plugin.py @@ -31,18 +31,14 @@ def teardown(self, worker): def transition(self, key, start, finish, **kwargs): self.observed_notifications.append( - {"key": key, "start": start, "finish": finish,} + {"key": key, "start": start, "finish": finish} ) def release_key(self, key, state, cause, reason, report): - self.observed_notifications.append( - {"key": key, "state": state,} - ) + self.observed_notifications.append({"key": key, "state": state}) def release_dep(self, dep, state, report): - self.observed_notifications.append( - {"dep": dep, "state": state,} - ) + self.observed_notifications.append({"dep": dep, "state": state}) @gen_cluster(client=True, nthreads=[]) @@ -100,7 +96,7 @@ def failing(x): @gen_cluster( - nthreads=[("127.0.0.1", 1)], client=True, worker_kwargs={"resources": {"X": 1}}, + nthreads=[("127.0.0.1", 1)], client=True, worker_kwargs={"resources": {"X": 1}} ) async def test_superseding_task_transitions_called(c, s, w): expected_notifications = [ @@ -119,10 +115,7 @@ async def test_superseding_task_transitions_called(c, s, w): @gen_cluster(nthreads=[("127.0.0.1", 1)], client=True) async def test_release_dep_called(c, s, w): - dsk = { - "dep": 1, - "task": (inc, "dep"), - } + dsk = {"dep": 1, "task": (inc, "dep")} expected_notifications = [ {"key": "dep", "start": "waiting", "finish": "ready"}, diff --git a/distributed/diagnostics/websocket.py b/distributed/diagnostics/websocket.py index 641730faf54..f8704487c14 100644 --- a/distributed/diagnostics/websocket.py +++ b/distributed/diagnostics/websocket.py @@ -33,7 +33,7 @@ def update_graph(self, scheduler, client=None, **kwargs): self.socket.send("update_graph", {"client": client}) def transition(self, key, start, finish, *args, **kwargs): - """ Run whenever a task changes state + """Run whenever a task changes state Parameters ---------- diff --git a/distributed/event.py b/distributed/event.py index 75661654b86..0136d35ef26 100644 --- a/distributed/event.py +++ b/distributed/event.py @@ -13,7 +13,7 @@ class EventExtension: - """ An extension for the scheduler to manage Events + """An extension for the scheduler to manage Events This adds the following routes to the scheduler @@ -60,7 +60,7 @@ def __init__(self, scheduler): self.scheduler.extensions["events"] = self async def event_wait(self, comm=None, name=None, timeout=None): - """ Wait until the event is set to true. + """Wait until the event is set to true. Returns false, when this did not happen in the given time and true otherwise. """ @@ -89,7 +89,7 @@ async def event_wait(self, comm=None, name=None, timeout=None): return True def event_set(self, comm=None, name=None): - """ Set the event with the given name to true. + """Set the event with the given name to true. All waiters on this event will be notified. 
""" @@ -150,7 +150,7 @@ def _delete_event(self, name): class Event: - """ Distributed Centralized Event equivalent to asyncio.Event + """Distributed Centralized Event equivalent to asyncio.Event An event stores a single flag, which is set to false on start. The flag can be set to true (using the set() call) or back to false @@ -186,7 +186,7 @@ def __init__(self, name=None, client=None): self.name = name or "event-" + uuid.uuid4().hex def __await__(self): - """ async constructor + """async constructor Make it possible to write @@ -201,7 +201,7 @@ async def _(): return _().__await__() def wait(self, timeout=None): - """ Wait until the event is set. + """Wait until the event is set. Parameters ---------- @@ -223,28 +223,28 @@ def wait(self, timeout=None): timeout = parse_timedelta(timeout) result = self.client.sync( - self.client.scheduler.event_wait, name=self.name, timeout=timeout, + self.client.scheduler.event_wait, name=self.name, timeout=timeout ) return result def clear(self): - """ Clear the event (set its flag to false). + """Clear the event (set its flag to false). All waiters will now block. """ return self.client.sync(self.client.scheduler.event_clear, name=self.name) def set(self): - """ Set the event (set its flag to false). + """Set the event (set its flag to false). All waiters will now be released. """ - result = self.client.sync(self.client.scheduler.event_set, name=self.name,) + result = self.client.sync(self.client.scheduler.event_set, name=self.name) return result def is_set(self): """ Check if the event is set """ - result = self.client.sync(self.client.scheduler.event_is_set, name=self.name,) + result = self.client.sync(self.client.scheduler.event_is_set, name=self.name) return result def __reduce__(self): diff --git a/distributed/http/health.py b/distributed/http/health.py index 2a45c4abf77..0fab7dea7c8 100644 --- a/distributed/http/health.py +++ b/distributed/http/health.py @@ -7,6 +7,4 @@ def get(self): self.set_header("Content-Type", "text/plain") -routes = [ - ("/health", HealthHandler, {}), -] +routes = [("/health", HealthHandler, {})] diff --git a/distributed/http/proxy.py b/distributed/http/proxy.py index c1f437d9b5f..6e39a999990 100644 --- a/distributed/http/proxy.py +++ b/distributed/http/proxy.py @@ -74,8 +74,7 @@ def proxy(self, port, proxied_path): ) class GlobalProxyHandler(web.RequestHandler): - """Minimal Proxy handler when jupyter-server-proxy is not installed - """ + """Minimal Proxy handler when jupyter-server-proxy is not installed""" def initialize(self, dask_server=None, extra=None): self.server = dask_server @@ -130,6 +129,4 @@ def check_worker_dashboard_exits(scheduler, worker): return False -routes = [ - (r"proxy/(\d+)/(.*?)/(.*)", GlobalProxyHandler, {}), -] +routes = [(r"proxy/(\d+)/(.*?)/(.*)", GlobalProxyHandler, {})] diff --git a/distributed/http/routing.py b/distributed/http/routing.py index ecd14da28e4..8a1d90d5490 100644 --- a/distributed/http/routing.py +++ b/distributed/http/routing.py @@ -43,7 +43,7 @@ class RoutingApplication(web.Application): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.applications = [] - self.add_handlers(".*$", [(r"/sitemap.json", DirectoryHandler),]) + self.add_handlers(".*$", [(r"/sitemap.json", DirectoryHandler)]) def find_handler(self, request: tornado.httputil.HTTPServerRequest, **kwargs): handler = super().find_handler(request, **kwargs) diff --git a/distributed/http/statics.py b/distributed/http/statics.py index 4a8a60298fe..e1c7a98e9a2 100644 --- a/distributed/http/statics.py 
+++ b/distributed/http/statics.py @@ -6,5 +6,5 @@ r"/statics/(.*)", web.StaticFileHandler, {"path": os.path.join(os.path.dirname(__file__), "static")}, - ), + ) ] diff --git a/distributed/http/tests/test_routing.py b/distributed/http/tests/test_routing.py index 481cfb3a209..ca7d071d256 100644 --- a/distributed/http/tests/test_routing.py +++ b/distributed/http/tests/test_routing.py @@ -17,8 +17,8 @@ def get(self): @pytest.mark.asyncio async def test_basic(): - application = RoutingApplication([(r"/one", OneHandler),]) - two = web.Application([(r"/two", TwoHandler),]) + application = RoutingApplication([(r"/one", OneHandler)]) + two = web.Application([(r"/two", TwoHandler)]) server = application.listen(1234) client = AsyncHTTPClient("http://localhost:1234") diff --git a/distributed/http/worker/prometheus.py b/distributed/http/worker/prometheus.py index a60de3a6b64..69e3439df9b 100644 --- a/distributed/http/worker/prometheus.py +++ b/distributed/http/worker/prometheus.py @@ -93,6 +93,4 @@ def get(self): self.set_header("Content-Type", "text/plain; version=0.0.4") -routes = [ - (r"metrics", PrometheusHandler, {}), -] +routes = [(r"metrics", PrometheusHandler, {})] diff --git a/distributed/lock.py b/distributed/lock.py index 67e926ac281..7d1c1a4af57 100644 --- a/distributed/lock.py +++ b/distributed/lock.py @@ -12,7 +12,7 @@ class LockExtension: - """ An extension for the scheduler to manage Locks + """An extension for the scheduler to manage Locks This adds the following routes to the scheduler @@ -73,7 +73,7 @@ def release(self, comm=None, name=None, id=None): class Lock: - """ Distributed Centralized Lock + """Distributed Centralized Lock Parameters ---------- @@ -104,7 +104,7 @@ def __init__(self, name=None, client=None): self._locked = False def acquire(self, blocking=True, timeout=None): - """ Acquire the lock + """Acquire the lock Parameters ---------- diff --git a/distributed/nanny.py b/distributed/nanny.py index db29431211b..fd7078982a7 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -41,7 +41,7 @@ class Nanny(ServerNode): - """ A process to manage worker processes + """A process to manage worker processes The nanny spins up Worker processes, watches them, and kills or restarts them as necessary. It is necessary if you want to use the @@ -304,7 +304,7 @@ async def start(self): return self async def kill(self, comm=None, timeout=2): - """ Kill the local worker process + """Kill the local worker process Blocks until both the process is down and the scheduler is properly informed @@ -317,7 +317,7 @@ async def kill(self, comm=None, timeout=2): await self.process.kill(timeout=0.8 * (deadline - self.loop.time())) async def instantiate(self, comm=None) -> Status: - """ Start a local worker process + """Start a local worker process Blocks until the process is up and the scheduler is properly informed """ diff --git a/distributed/preloading.py b/distributed/preloading.py index 112f537e2ca..f1e125b7b13 100644 --- a/distributed/preloading.py +++ b/distributed/preloading.py @@ -72,7 +72,7 @@ def is_webaddress(s: str) -> bool: def _import_module(name, file_dir=None) -> ModuleType: - """ Imports module and extract preload interface functions. + """Imports module and extracts preload interface functions. Import modules specified by name and extract 'dask_setup' and 'dask_teardown' if present.
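The preloading docstring above names the interface a preload module must expose: optional ``dask_setup`` and ``dask_teardown`` functions. A sketch of a hypothetical preload file::

    # mypreload.py -- passed as ``dask-worker scheduler:8786 --preload mypreload.py``
    def dask_setup(worker):
        # runs once as the worker starts
        worker.started_with_preload = True

    def dask_teardown(worker):
        # runs as the worker shuts down
        worker.started_with_preload = False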
diff --git a/distributed/process.py b/distributed/process.py index b070342b340..a0d462d4074 100644 --- a/distributed/process.py +++ b/distributed/process.py @@ -162,7 +162,7 @@ def monitor_parent(): @staticmethod def reset_logger_locks(): - """ Python 2's logger's locks don't survive a fork event + """Python 2's logger's locks don't survive a fork event https://github.com/dask/distributed/issues/1491 """ diff --git a/distributed/profile.py b/distributed/profile.py index 33eba502ef9..2b3363e6945 100644 --- a/distributed/profile.py +++ b/distributed/profile.py @@ -38,7 +38,7 @@ def identifier(frame): - """ A string identifier from a frame + """A string identifier from a frame Strings are cheaper to use as indexes into dicts than tuples or dicts @@ -74,7 +74,7 @@ def info_frame(frame): def process(frame, child, state, stop=None, omit=None): - """ Add counts from a frame stack onto existing state + """Add counts from a frame stack onto existing state This recursively adds counts to the existing state dictionary and creates new entries for new functions. @@ -157,7 +157,7 @@ def create(): def call_stack(frame): - """ Create a call text stack from a frame + """Create a call text stack from a frame Returns ------- @@ -171,7 +171,7 @@ def call_stack(frame): def plot_data(state, profile_interval=0.010): - """ Convert a profile state into data useful by Bokeh + """Convert a profile state into data usable by Bokeh See Also -------- @@ -277,7 +277,7 @@ def watch( omit=None, stop=lambda: False, ): - """ Gather profile information on a particular thread + """Gather profile information on a particular thread This starts a new thread to watch a particular thread and returns a deque that holds periodic profile information. @@ -324,7 +324,7 @@ def watch( def get_profile(history, recent=None, start=None, stop=None, key=None): - """ Collect profile information from a sequence of profile states + """Collect profile information from a sequence of profile states Parameters ---------- @@ -366,7 +366,7 @@ def get_profile(history, recent=None, start=None, stop=None, key=None): def plot_figure(data, **kwargs): - """ Plot profile data using Bokeh + """Plot profile data using Bokeh This takes the output from the function ``plot_data`` and produces a Bokeh figure @@ -444,7 +444,7 @@ def _remove_py_stack(frames): def llprocess(frames, child, state): - """ Add counts from low level profile information onto existing state + """Add counts from low level profile information onto existing state This uses the ``stacktrace`` module to collect low level stack trace information and place it onto the given state.
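The profile docstrings above (``process``, ``watch``, ``get_profile``) describe a statistical profiler built on periodic stack sampling. A stdlib-only sketch of the underlying idea, not the module's actual implementation::

    import sys
    import threading
    import time
    from collections import Counter

    def sample(thread_ident, seconds=1.0, interval=0.01):
        """Count (file, function) pairs seen at the top of one thread's stack."""
        counts = Counter()
        deadline = time.time() + seconds
        while time.time() < deadline:
            frame = sys._current_frames().get(thread_ident)
            if frame is not None:
                counts[(frame.f_code.co_filename, frame.f_code.co_name)] += 1
            time.sleep(interval)
        return counts

    # watch the main thread from a background thread, as profile.watch does
    main = threading.main_thread().ident
    threading.Thread(target=lambda: print(sample(main)), daemon=True).start()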
diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 3f46fc5d58b..82cfbeec3c2 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -143,7 +143,7 @@ def get_default_compression(): def byte_sample(b, size, n): - """ Sample a bytestring from many locations + """Sample a bytestring from many locations Parameters ---------- diff --git a/distributed/protocol/core.py b/distributed/protocol/core.py index 32071d9e3d0..ceee3982941 100644 --- a/distributed/protocol/core.py +++ b/distributed/protocol/core.py @@ -173,7 +173,7 @@ def put_in(keys, coll, val): def dumps_msgpack(msg, compression=None): - """ Dump msg into header and payload, both bytestrings + """Dump msg into header and payload, both bytestrings All of the message must be msgpack encodable @@ -196,7 +196,7 @@ def dumps_msgpack(msg, compression=None): def loads_msgpack(header, payload): - """ Read msgpack header and payload back to Python object + """Read msgpack header and payload back to Python object See Also: dumps_msgpack diff --git a/distributed/protocol/numba.py b/distributed/protocol/numba.py index 93d65c0ef6d..e1915251f6f 100644 --- a/distributed/protocol/numba.py +++ b/distributed/protocol/numba.py @@ -26,7 +26,7 @@ def cuda_serialize_numba_ndarray(x): header["lengths"] = [x.nbytes] frames = [ numba.cuda.cudadrv.devicearray.DeviceNDArray( - shape=(x.nbytes,), strides=(1,), dtype=np.dtype("u1"), gpu_data=x.gpu_data, + shape=(x.nbytes,), strides=(1,), dtype=np.dtype("u1"), gpu_data=x.gpu_data ) ] diff --git a/distributed/protocol/numpy.py b/distributed/protocol/numpy.py index 497bc7a045e..4ae9298f142 100644 --- a/distributed/protocol/numpy.py +++ b/distributed/protocol/numpy.py @@ -8,7 +8,7 @@ def itemsize(dt): - """ Itemsize of dtype + """Itemsize of dtype Try to return the itemsize of the base element, return 8 as a fallback """ diff --git a/distributed/protocol/pickle.py b/distributed/protocol/pickle.py index 0a9f86846a2..843f6a28d9a 100644 --- a/distributed/protocol/pickle.py +++ b/distributed/protocol/pickle.py @@ -34,7 +34,7 @@ def _always_use_pickle_for(x): def dumps(x, *, buffer_callback=None, protocol=HIGHEST_PROTOCOL): - """ Manage between cloudpickle and pickle + """Choose between cloudpickle and pickle 1. Try pickle 2. If it is short then check if it contains __main__ diff --git a/distributed/protocol/serialize.py b/distributed/protocol/serialize.py index abbbb9a5302..97b962a3f67 100644 --- a/distributed/protocol/serialize.py +++ b/distributed/protocol/serialize.py @@ -336,7 +336,7 @@ def deserialize(header, frames, deserializers=None): class Serialize: - """ Mark an object that should be serialized + """Mark an object that should be serialized Examples -------- @@ -408,7 +408,7 @@ def container_copy(c): def extract_serialize(x): - """ Pull out Serialize objects from message + """Pull out Serialize objects from message This also removes large bytestrings from the message into a second dictionary.
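The ``Serialize`` marker and ``extract_serialize`` above sit behind the protocol's public ``serialize``/``deserialize`` pair. A round-trip sketch (a minimal illustration; real messages also involve compression and frame splitting)::

    from distributed.protocol import serialize, deserialize

    header, frames = serialize({"x": list(range(5))})
    assert deserialize(header, frames) == {"x": [0, 1, 2, 3, 4]}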
@@ -547,7 +547,7 @@ def deserialize_bytes(b): def register_serialization(cls, serialize, deserialize): - """ Register a new class for dask-custom serialization + """Register a new class for dask-custom serialization Parameters ---------- @@ -752,7 +752,7 @@ def register_generic( serialize_func=dask_serialize, deserialize_func=dask_deserialize, ): - """ Register (de)serialize to traverse through __dict__ + """Register (de)serialize to traverse through __dict__ Normally when registering new classes for Dask's custom serialization you need to manage headers and frames, which can be tedious. If all you want diff --git a/distributed/protocol/tests/test_cupy.py b/distributed/protocol/tests/test_cupy.py index 95cb530c4db..520693fb5c1 100644 --- a/distributed/protocol/tests/test_cupy.py +++ b/distributed/protocol/tests/test_cupy.py @@ -65,11 +65,11 @@ def test_serialize_cupy_from_rmm(size): @pytest.mark.parametrize( - "sparse_name", ["coo_matrix", "csc_matrix", "csr_matrix", "dia_matrix",], + "sparse_name", ["coo_matrix", "csc_matrix", "csr_matrix", "dia_matrix"] ) @pytest.mark.parametrize( "dtype", - [numpy.dtype("f4"), numpy.dtype("f8"),], + [numpy.dtype("f4"), numpy.dtype("f8")], ) @pytest.mark.parametrize("serializer", ["cuda", "dask", "pickle"]) def test_serialize_cupy_sparse(sparse_name, dtype, serializer): diff --git a/distributed/protocol/tests/test_protocol_utils.py b/distributed/protocol/tests/test_protocol_utils.py index 2132f94008b..847dec1ac3d 100644 --- a/distributed/protocol/tests/test_protocol_utils.py +++ b/distributed/protocol/tests/test_protocol_utils.py @@ -21,10 +21,7 @@ ], ) def test_merge_frames(lengths, writeable, frames): - header = { - "lengths": lengths, - "writeable": writeable, - } + header = {"lengths": lengths, "writeable": writeable} result = merge_frames(header, frames) data = b"".join(frames) diff --git a/distributed/protocol/tests/test_scipy.py b/distributed/protocol/tests/test_scipy.py index 2cb5d7477e5..4e5eb8423cf 100644 --- a/distributed/protocol/tests/test_scipy.py +++ b/distributed/protocol/tests/test_scipy.py @@ -20,7 +20,7 @@ ) @pytest.mark.parametrize( "dtype", - [numpy.dtype("f4"), numpy.dtype("f8"),], + [numpy.dtype("f4"), numpy.dtype("f8")], ) def test_serialize_scipy_sparse(sparse_type, dtype): a = numpy.array([[0, 1, 0], [2, 0, 3], [0, 4, 0]], dtype=dtype) diff --git a/distributed/protocol/tests/test_serialize.py b/distributed/protocol/tests/test_serialize.py index 4ead70361b7..72bc9764a22 100644 --- a/distributed/protocol/tests/test_serialize.py +++ b/distributed/protocol/tests/test_serialize.py @@ -81,7 +81,7 @@ def test_serialize_bytestrings(): @pytest.mark.parametrize( - "typecode", ["b", "B", "h", "H", "i", "I", "l", "L", "q", "Q", "f", "d"], + "typecode", ["b", "B", "h", "H", "i", "I", "l", "L", "q", "Q", "f", "d"] ) def test_serialize_arrays(typecode): a = array(typecode) @@ -212,9 +212,7 @@ def test_empty_loads_deep(): assert isinstance(e2[0][0][0], Empty) -@pytest.mark.parametrize( - "kwargs", [{}, {"serializers": ["pickle"]},], -) +@pytest.mark.parametrize("kwargs", [{}, {"serializers": ["pickle"]}]) def test_serialize_bytes(kwargs): for x in [ 1, @@ -464,7 +462,7 @@ def test_serialize_lists(serializers): @pytest.mark.parametrize( - "data_in", [memoryview(b"hello"), memoryview(np.random.random((3, 4)))], + "data_in", [memoryview(b"hello"), memoryview(np.random.random((3, 4)))] ) def test_deser_memoryview(data_in): header, frames = serialize(data_in) diff --git a/distributed/protocol/utils.py b/distributed/protocol/utils.py index 
32fdfd7af1b..2b96fba4c61 100644 --- a/distributed/protocol/utils.py +++ b/distributed/protocol/utils.py @@ -35,7 +35,7 @@ def frame_split_size(frame, n=BIG_BYTES_SHARD_SIZE) -> list: def merge_frames(header, frames): - """ Merge frames into original lengths + """Merge frames into original lengths Examples -------- @@ -92,7 +92,7 @@ def pack_frames_prelude(frames): def pack_frames(frames): - """ Pack frames into a byte-like object + """Pack frames into a byte-like object This prepends length information to the front of the bytes-like object @@ -104,7 +104,7 @@ def pack_frames(frames): def unpack_frames(b): - """ Unpack bytes into a sequence of frames + """Unpack bytes into a sequence of frames This assumes that length information is at the front of the bytestring, as performed by pack_frames diff --git a/distributed/publish.py b/distributed/publish.py index 44f34b68a72..9dbab4ff3dd 100644 --- a/distributed/publish.py +++ b/distributed/publish.py @@ -4,7 +4,7 @@ class PublishExtension: - """ An extension for the scheduler to manage collections + """An extension for the scheduler to manage collections * publish_list * publish_put diff --git a/distributed/pubsub.py b/distributed/pubsub.py index 8234d1a8e19..3aeec084df1 100644 --- a/distributed/pubsub.py +++ b/distributed/pubsub.py @@ -199,7 +199,7 @@ def cleanup(self): class Pub: - """ Publish data with Publish-Subscribe pattern + """Publish data with Publish-Subscribe pattern This allows clients and workers to directly communicate data between each other with a typical Publish-Subscribe pattern. This involves two @@ -355,7 +355,7 @@ def __repr__(self): class Sub: - """ Subscribe to a Publish/Subscribe topic + """Subscribe to a Publish/Subscribe topic See Also -------- @@ -429,7 +429,7 @@ async def _(): __anext__ = _get def get(self, timeout=None): - """ Get a single message + """Get a single message Parameters ---------- diff --git a/distributed/queues.py b/distributed/queues.py index 9b5d1dbf1f3..6646676cce3 100644 --- a/distributed/queues.py +++ b/distributed/queues.py @@ -12,7 +12,7 @@ class QueueExtension: - """ An extension for the scheduler to manage queues + """An extension for the scheduler to manage queues This adds the following routes to the scheduler @@ -127,7 +127,7 @@ def qsize(self, comm=None, name=None, client=None): class Queue: - """ Distributed Queue + """Distributed Queue This allows multiple clients to share futures or small bits of data between each other with a multi-producer/multi-consumer queue. All metadata is @@ -209,7 +209,7 @@ async def _put(self, value, timeout=None): ) def put(self, value, timeout=None, **kwargs): - """ Put data into the queue + """Put data into the queue Parameters ---------- @@ -222,7 +222,7 @@ def put(self, value, timeout=None, **kwargs): return self.client.sync(self._put, value, timeout=timeout, **kwargs) def get(self, timeout=None, batch=False, **kwargs): - """ Get data from the queue + """Get data from the queue Parameters ---------- @@ -234,7 +234,7 @@ def get(self, timeout=None, batch=False, **kwargs): If True then return all elements currently waiting in the queue. 
If an integer then return that many elements from the queue If False (default) then return one item at a time - """ + """ timeout = parse_timedelta(timeout) return self.client.sync(self._get, timeout=timeout, batch=batch, **kwargs) diff --git a/distributed/recreate_exceptions.py b/distributed/recreate_exceptions.py index 4aaa851ee23..51a30e52e1c 100644 --- a/distributed/recreate_exceptions.py +++ b/distributed/recreate_exceptions.py @@ -8,7 +8,7 @@ class ReplayExceptionScheduler: - """ A plugin for the scheduler to recreate exceptions locally + """A plugin for the scheduler to recreate exceptions locally This adds the following routes to the scheduler diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5f7bb1d4eab..3f787a1f92e 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -371,235 +371,235 @@ def ncores(self): class TaskState: """ - A simple object holding information about a task. + A simple object holding information about a task. - .. attribute:: key: str + .. attribute:: key: str - The key is the unique identifier of a task, generally formed - from the name of the function, followed by a hash of the function - and arguments, like ``'inc-ab31c010444977004d656610d2d421ec'``. + The key is the unique identifier of a task, generally formed + from the name of the function, followed by a hash of the function + and arguments, like ``'inc-ab31c010444977004d656610d2d421ec'``. - .. attribute:: prefix: TaskPrefix + .. attribute:: prefix: TaskPrefix - The broad class of tasks to which this task belongs like "inc" or - "read_csv" + The broad class of tasks to which this task belongs like "inc" or + "read_csv" - .. attribute:: run_spec: object + .. attribute:: run_spec: object - A specification of how to run the task. The type and meaning of this - value is opaque to the scheduler, as it is only interpreted by the - worker to which the task is sent for executing. + A specification of how to run the task. The type and meaning of this + value is opaque to the scheduler, as it is only interpreted by the + worker to which the task is sent for executing. - As a special case, this attribute may also be ``None``, in which case - the task is "pure data" (such as, for example, a piece of data loaded - in the scheduler using :meth:`Client.scatter`). A "pure data" task - cannot be computed again if its value is lost. + As a special case, this attribute may also be ``None``, in which case + the task is "pure data" (such as, for example, a piece of data loaded + in the scheduler using :meth:`Client.scatter`). A "pure data" task + cannot be computed again if its value is lost. - .. attribute:: priority: tuple + .. attribute:: priority: tuple - The priority provides each task with a relative ranking which is used - to break ties when many tasks are being considered for execution. + The priority provides each task with a relative ranking which is used - to break ties when many tasks are being considered for execution. + This ranking is generally a 2-item tuple. The first (and dominant) - item corresponds to when it was submitted. Generally, earlier tasks - take precedence. The second item is determined by the client, and is - a way to prioritize tasks within a large graph that may be important, - such as if they are on the critical path, or good to run in order to - release many dependencies. This is explained further in - :doc:`Scheduling Policy <scheduling-policies>`. + This ranking is generally a 2-item tuple. The first (and dominant) + item corresponds to when it was submitted.
Generally, earlier tasks + take precedence. The second item is determined by the client, and is + a way to prioritize tasks within a large graph that may be important, + such as if they are on the critical path, or good to run in order to + release many dependencies. This is explained further in + :doc:`Scheduling Policy <scheduling-policies>`. - .. attribute:: state: str + .. attribute:: state: str - This task's current state. Valid states include ``released``, - ``waiting``, ``no-worker``, ``processing``, ``memory``, ``erred`` - and ``forgotten``. If it is ``forgotten``, the task isn't stored - in the ``tasks`` dictionary anymore and will probably disappear - soon from memory. + This task's current state. Valid states include ``released``, + ``waiting``, ``no-worker``, ``processing``, ``memory``, ``erred`` + and ``forgotten``. If it is ``forgotten``, the task isn't stored + in the ``tasks`` dictionary anymore and will probably disappear + soon from memory. - .. attribute:: dependencies: {TaskState} + .. attribute:: dependencies: {TaskState} - The set of tasks this task depends on for proper execution. Only - tasks still alive are listed in this set. If, for whatever reason, - this task also depends on a forgotten task, the - :attr:`has_lost_dependencies` flag is set. + The set of tasks this task depends on for proper execution. Only + tasks still alive are listed in this set. If, for whatever reason, + this task also depends on a forgotten task, the + :attr:`has_lost_dependencies` flag is set. - A task can only be executed once all its dependencies have already - been successfully executed and have their result stored on at least - one worker. This is tracked by progressively draining the - :attr:`waiting_on` set. + A task can only be executed once all its dependencies have already + been successfully executed and have their result stored on at least + one worker. This is tracked by progressively draining the + :attr:`waiting_on` set. - .. attribute:: dependents: {TaskState} + .. attribute:: dependents: {TaskState} - The set of tasks which depend on this task. Only tasks still alive - are listed in this set. + The set of tasks which depend on this task. Only tasks still alive + are listed in this set. - This is the reverse mapping of :attr:`dependencies`. + This is the reverse mapping of :attr:`dependencies`. - .. attribute:: has_lost_dependencies: bool + .. attribute:: has_lost_dependencies: bool - Whether any of the dependencies of this task has been forgotten. - For memory consumption reasons, forgotten tasks are not kept in - memory even though they may have dependent tasks. When a task is - forgotten, therefore, each of its dependents has their - :attr:`has_lost_dependencies` attribute set to ``True``. + Whether any of the dependencies of this task has been forgotten. + For memory consumption reasons, forgotten tasks are not kept in + memory even though they may have dependent tasks. When a task is + forgotten, therefore, each of its dependents has their + :attr:`has_lost_dependencies` attribute set to ``True``. - If :attr:`has_lost_dependencies` is true, this task cannot go - into the "processing" state anymore. + If :attr:`has_lost_dependencies` is true, this task cannot go + into the "processing" state anymore. - .. attribute:: waiting_on: {TaskState} + .. attribute:: waiting_on: {TaskState} - The set of tasks this task is waiting on *before* it can be executed. - This is always a subset of :attr:`dependencies`.
Each time one of the - dependencies has finished processing, it is removed from the - :attr:`waiting_on` set. + The set of tasks this task is waiting on *before* it can be executed. + This is always a subset of :attr:`dependencies`. Each time one of the + dependencies has finished processing, it is removed from the + :attr:`waiting_on` set. - Once :attr:`waiting_on` becomes empty, this task can move from the - "waiting" state to the "processing" state (unless one of the - dependencies errored out, in which case this task is instead - marked "erred"). + Once :attr:`waiting_on` becomes empty, this task can move from the + "waiting" state to the "processing" state (unless one of the + dependencies errored out, in which case this task is instead + marked "erred"). - .. attribute:: waiters: {TaskState} + .. attribute:: waiters: {TaskState} - The set of tasks which need this task to remain alive. This is always - a subset of :attr:`dependents`. Each time one of the dependents - has finished processing, it is removed from the :attr:`waiters` - set. + The set of tasks which need this task to remain alive. This is always + a subset of :attr:`dependents`. Each time one of the dependents + has finished processing, it is removed from the :attr:`waiters` + set. - Once both :attr:`waiters` and :attr:`who_wants` become empty, this - task can be released (if it has a non-empty :attr:`run_spec`) or - forgotten (otherwise) by the scheduler, and by any workers - in :attr:`who_has`. + Once both :attr:`waiters` and :attr:`who_wants` become empty, this + task can be released (if it has a non-empty :attr:`run_spec`) or + forgotten (otherwise) by the scheduler, and by any workers + in :attr:`who_has`. - .. note:: Counter-intuitively, :attr:`waiting_on` and - :attr:`waiters` are not reverse mappings of each other. + .. note:: Counter-intuitively, :attr:`waiting_on` and + :attr:`waiters` are not reverse mappings of each other. - .. attribute:: who_wants: {ClientState} + .. attribute:: who_wants: {ClientState} - The set of clients who want this task's result to remain alive. - This is the reverse mapping of :attr:`ClientState.wants_what`. + The set of clients who want this task's result to remain alive. + This is the reverse mapping of :attr:`ClientState.wants_what`. - When a client submits a graph to the scheduler it also specifies - which output tasks it desires, such that their results are not released - from memory. + When a client submits a graph to the scheduler it also specifies + which output tasks it desires, such that their results are not released + from memory. - Once a task has finished executing (i.e. moves into the "memory" - or "erred" state), the clients in :attr:`who_wants` are notified. + Once a task has finished executing (i.e. moves into the "memory" + or "erred" state), the clients in :attr:`who_wants` are notified. - Once both :attr:`waiters` and :attr:`who_wants` become empty, this - task can be released (if it has a non-empty :attr:`run_spec`) or - forgotten (otherwise) by the scheduler, and by any workers - in :attr:`who_has`. + Once both :attr:`waiters` and :attr:`who_wants` become empty, this + task can be released (if it has a non-empty :attr:`run_spec`) or + forgotten (otherwise) by the scheduler, and by any workers + in :attr:`who_has`. - .. attribute:: who_has: {WorkerState} + .. attribute:: who_has: {WorkerState} - The set of workers who have this task's result in memory. - It is non-empty iff the task is in the "memory" state. 
There can be - more than one worker in this set if, for example, :meth:`Client.scatter` - or :meth:`Client.replicate` was used. + The set of workers who have this task's result in memory. + It is non-empty iff the task is in the "memory" state. There can be + more than one worker in this set if, for example, :meth:`Client.scatter` + or :meth:`Client.replicate` was used. - This is the reverse mapping of :attr:`WorkerState.has_what`. + This is the reverse mapping of :attr:`WorkerState.has_what`. - .. attribute:: processing_on: WorkerState (or None) + .. attribute:: processing_on: WorkerState (or None) - If this task is in the "processing" state, which worker is currently - processing it. Otherwise this is ``None``. + If this task is in the "processing" state, which worker is currently + processing it. Otherwise this is ``None``. - This attribute is kept in sync with :attr:`WorkerState.processing`. + This attribute is kept in sync with :attr:`WorkerState.processing`. - .. attribute:: retries: int + .. attribute:: retries: int - The number of times this task can automatically be retried in case - of failure. If a task fails executing (the worker returns with - an error), its :attr:`retries` attribute is checked. If it is - equal to 0, the task is marked "erred". If it is greater than 0, - the :attr:`retries` attribute is decremented and execution is - attempted again. + The number of times this task can automatically be retried in case + of failure. If a task fails executing (the worker returns with + an error), its :attr:`retries` attribute is checked. If it is + equal to 0, the task is marked "erred". If it is greater than 0, + the :attr:`retries` attribute is decremented and execution is + attempted again. - .. attribute:: nbytes: int (or None) + .. attribute:: nbytes: int (or None) - The number of bytes, as determined by ``sizeof``, of the result - of a finished task. This number is used for diagnostics and to - help prioritize work. + The number of bytes, as determined by ``sizeof``, of the result + of a finished task. This number is used for diagnostics and to + help prioritize work. - .. attribute:: type: str + .. attribute:: type: str - The type of the object as a string. Only present for tasks that have - been computed. + The type of the object as a string. Only present for tasks that have + been computed. - .. attribute:: exception: object + .. attribute:: exception: object - If this task failed executing, the exception object is stored here. - Otherwise this is ``None``. + If this task failed executing, the exception object is stored here. + Otherwise this is ``None``. - .. attribute:: traceback: object + .. attribute:: traceback: object - If this task failed executing, the traceback object is stored here. - Otherwise this is ``None``. + If this task failed executing, the traceback object is stored here. + Otherwise this is ``None``. - .. attribute:: exception_blame: TaskState (or None) + .. attribute:: exception_blame: TaskState (or None) - If this task or one of its dependencies failed executing, the - failed task is stored here (possibly itself). Otherwise this - is ``None``. + If this task or one of its dependencies failed executing, the + failed task is stored here (possibly itself). Otherwise this + is ``None``. - .. attribute:: suspicious: int + .. attribute:: suspicious: int - The number of times this task has been involved in a worker death. + The number of times this task has been involved in a worker death. 
- Some tasks may cause workers to die (such as calling ``os._exit(0)``). - When a worker dies, all of the tasks on that worker are reassigned - to others. This combination of behaviors can cause a bad task to - catastrophically destroy all workers on the cluster, one after - another. Whenever a worker dies, we mark each task currently - processing on that worker (as recorded by - :attr:`WorkerState.processing`) as suspicious. + Some tasks may cause workers to die (such as calling ``os._exit(0)``). + When a worker dies, all of the tasks on that worker are reassigned + to others. This combination of behaviors can cause a bad task to + catastrophically destroy all workers on the cluster, one after + another. Whenever a worker dies, we mark each task currently + processing on that worker (as recorded by + :attr:`WorkerState.processing`) as suspicious. - If a task is involved in three deaths (or some other fixed constant) - then we mark the task as ``erred``. + If a task is involved in three deaths (or some other fixed constant) + then we mark the task as ``erred``. - .. attribute:: host_restrictions: {hostnames} + .. attribute:: host_restrictions: {hostnames} - A set of hostnames where this task can be run (or ``None`` if empty). - Usually this is empty unless the task has been specifically restricted - to only run on certain hosts. A hostname may correspond to one or - several connected workers. + A set of hostnames where this task can be run (or ``None`` if empty). + Usually this is empty unless the task has been specifically restricted + to only run on certain hosts. A hostname may correspond to one or + several connected workers. - .. attribute:: worker_restrictions: {worker addresses} + .. attribute:: worker_restrictions: {worker addresses} - A set of complete worker addresses where this can be run (or ``None`` - if empty). Usually this is empty unless the task has been specifically - restricted to only run on certain workers. + A set of complete worker addresses where this can be run (or ``None`` + if empty). Usually this is empty unless the task has been specifically + restricted to only run on certain workers. - Note this is tracking worker addresses, not worker states, since - the specific workers may not be connected at this time. + Note this is tracking worker addresses, not worker states, since + the specific workers may not be connected at this time. - .. attribute:: resource_restrictions: {resource: quantity} + .. attribute:: resource_restrictions: {resource: quantity} - Resources required by this task, such as ``{'gpu': 1}`` or - ``{'memory': 1e9}`` (or ``None`` if empty). These are user-defined - names and are matched against the contents of each - :attr:`WorkerState.resources` dictionary. + Resources required by this task, such as ``{'gpu': 1}`` or + ``{'memory': 1e9}`` (or ``None`` if empty). These are user-defined + names and are matched against the contents of each + :attr:`WorkerState.resources` dictionary. - .. attribute:: loose_restrictions: bool + .. attribute:: loose_restrictions: bool - If ``False``, each of :attr:`host_restrictions`, - :attr:`worker_restrictions` and :attr:`resource_restrictions` is - a hard constraint: if no worker is available satisfying those - restrictions, the task cannot go into the "processing" state and - will instead go into the "no-worker" state. 
+ If ``False``, each of :attr:`host_restrictions`, + :attr:`worker_restrictions` and :attr:`resource_restrictions` is + a hard constraint: if no worker is available satisfying those + restrictions, the task cannot go into the "processing" state and + will instead go into the "no-worker" state. - If ``True``, the above restrictions are mere preferences: if no worker - is available satisfying those restrictions, the task can still go - into the "processing" state and be sent for execution to another - connected worker. + If ``True``, the above restrictions are mere preferences: if no worker + is available satisfying those restrictions, the task can still go + into the "processing" state and be sent for execution to another + connected worker. - .. attribute: actor: bool + .. attribute:: actor: bool - Whether or not this task is an Actor. + Whether or not this task is an Actor. - .. attribute: group: TaskGroup + .. attribute:: group: TaskGroup -: The group of tasks to which this one belongs. + The group of tasks to which this one belongs. """ __slots__ = ( @@ -729,7 +729,7 @@ def validate(self): class TaskGroup: - """ Collection tracking all tasks within a group + """Collection tracking all tasks within a group Keys often have a structure like ``("x-123", 0)`` A group takes the first section, like ``"x-123"`` @@ -799,7 +799,7 @@ def __len__(self): class TaskPrefix: - """ Collection tracking all tasks within a group + """Collection tracking all tasks within a group Keys often have a structure like ``("x-123", 0)`` A group takes the first section, like ``"x"`` @@ -999,7 +999,7 @@ def _task_key_or_none(task): class Scheduler(ServerNode): - """ Dynamic distributed task scheduler + """Dynamic distributed task scheduler The scheduler tracks the current state of workers, data, and computations. The scheduler listens for events and responds by controlling workers @@ -1144,7 +1144,7 @@ def __init__( assert isinstance(self.security, Security) self.connection_args = self.security.get_connection_args("scheduler") self.connection_args["handshake_overrides"] = { # common denominator - "pickle-protocol": 4, + "pickle-protocol": 4 } self._start_address = addresses_from_user_args( @@ -1167,7 +1167,7 @@ def __init__( missing_bokeh = True http_server_modules.append("distributed.http.scheduler.missing_bokeh") routes = get_handlers( - server=self, modules=http_server_modules, prefix=http_prefix, + server=self, modules=http_server_modules, prefix=http_prefix ) self.start_http_server(routes, dashboard_address, default_port=8787) if show_dashboard and not missing_bokeh: @@ -1522,7 +1522,7 @@ def del_scheduler_file(): return self async def close(self, comm=None, fast=False, close_workers=False): - """ Send cleanup signal to all coroutines then wait until finished + """Send cleanup signal to all coroutines then wait until finished See Also -------- @@ -1586,7 +1586,7 @@ async def close(self, comm=None, fast=False, close_workers=False): disable_gc_diagnosis() async def close_worker(self, comm=None, worker=None, safe=None): - """ Remove a worker from the cluster + """Remove a worker from the cluster This both removes the worker from our local state and also sends a signal to the worker to shut down.
This works regardless of whether or @@ -2257,7 +2257,7 @@ async def remove_worker(self, comm=None, address=None, safe=False, close=True): if ts.suspicious > self.allowed_failures: del recommendations[k] e = pickle.dumps( - KilledWorker(task=k, last_worker=ws.clean()), protocol=4, + KilledWorker(task=k, last_worker=ws.clean()), protocol=4 ) r = self.transition(k, "erred", exception=e, cause=k) recommendations.update(r) @@ -2544,7 +2544,7 @@ def report(self, msg, ts=None, client=None): logger.critical("Tried writing to closed comm: %s", msg) async def add_client(self, comm, client=None, versions=None): - """ Add client to network + """Add client to network We listen to all future messages from this Comm. """ @@ -2723,7 +2723,7 @@ def release_worker_data(self, comm=None, keys=None, worker=None): self.transitions(recommendations) def handle_long_running(self, key=None, worker=None, compute_duration=None): - """ A task has seceded from the thread pool + """A task has seceded from the thread pool We stop the task from being stolen in the future, and change task duration accounting as if the task has stopped. @@ -2792,7 +2792,7 @@ def remove_plugin(self, plugin): self.plugins.remove(plugin) def worker_send(self, worker, msg): - """ Send message to worker + """Send message to worker This also handles connection failures by adding a callback to remove the worker on the next cycle. @@ -2815,7 +2815,7 @@ async def scatter( broadcast=False, timeout=2, ): - """ Send data out to workers + """Send data out to workers See also -------- @@ -3044,7 +3044,7 @@ async def proxy(self, comm=None, msg=None, worker=None, serializers=None): return d[worker] async def _delete_worker_data(self, worker_address, keys): - """ Delete data from a worker and update the corresponding worker/task states + """Delete data from a worker and update the corresponding worker/task states Parameters ---------- @@ -3066,7 +3066,7 @@ async def _delete_worker_data(self, worker_address, keys): self.log_event(ws.address, {"action": "remove-worker-data", "keys": keys}) async def rebalance(self, comm=None, keys=None, workers=None): - """ Rebalance keys so that each worker stores roughly equal bytes + """Rebalance keys so that each worker stores roughly equal bytes **Policy** @@ -3202,7 +3202,7 @@ async def replicate( delete=True, lock=True, ): - """ Replicate data throughout cluster + """Replicate data throughout cluster This performs a tree copy of the data throughout the network individually on each piece of data. 
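The ``scatter``, ``rebalance`` and ``replicate`` handlers in these hunks are the scheduler half of operations normally driven from the client. A rough sketch of the corresponding client-side calls (it assumes a default local cluster; the data is made up for illustration):

    from distributed import Client

    client = Client()  # assumption: starts a LocalCluster for the example

    # Round-robin data out to workers; returns one Future per element
    futures = client.scatter(list(range(10)))

    # Tree-copy the pieces so that two workers hold each element
    client.replicate(futures, n=2)

    # Then even out the per-worker memory footprint
    client.rebalance()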
@@ -3445,7 +3445,7 @@ async def retire_workers( lock=True, **kwargs, ): - """ Gracefully retire workers from cluster + """Gracefully retire workers from cluster Parameters ---------- @@ -3794,7 +3794,7 @@ def get_task_duration(self, ts, default=None): return duration def run_function(self, stream, function, args=(), kwargs={}, wait=True): - """ Run a function within this process + """Run a function within this process See Also -------- @@ -4679,7 +4679,7 @@ def transition_released_forgotten(self, key): raise def transition(self, key, finish, *args, **kwargs): - """ Transition a key from its current state to the finish state + """Transition a key from its current state to the finish state Examples -------- @@ -4771,7 +4771,7 @@ def transition(self, key, finish, *args, **kwargs): raise def transitions(self, recommendations): - """ Process transitions until none are left + """Process transitions until none are left This includes feedback from previous transitions and continues until we reach a steady state @@ -4798,7 +4798,7 @@ def story(self, *keys): transition_story = story def reschedule(self, key=None, worker=None): - """ Reschedule a task + """Reschedule a task Things may have shifted and this task may now be better suited to run elsewhere @@ -4822,7 +4822,7 @@ def reschedule(self, key=None, worker=None): ############################## def check_idle_saturated(self, ws, occ=None): - """ Update the status of the idle and saturated state + """Update the status of the idle and saturated state The scheduler keeps track of workers that are .. @@ -4857,7 +4857,7 @@ def check_idle_saturated(self, ws, occ=None): self.saturated.discard(ws) def valid_workers(self, ts): - """ Return set of currently valid workers for key + """Return set of currently valid workers for key If all workers are valid then this returns ``True``. This checks the following state: @@ -5237,7 +5237,7 @@ async def get_worker_logs(self, comm=None, n=None, workers=None, nanny=False): ########### def reevaluate_occupancy(self, worker_index=0): - """ Periodically reassess task duration time + """Periodically reassess task duration time The expected duration of a task can change over time. Unfortunately we don't have a good constant-time way to propagate the effects of these @@ -5335,7 +5335,7 @@ def check_idle(self): self.loop.add_callback(self.close) def adaptive_target(self, comm=None, target_duration=None): - """ Desired number of workers based on the current workload + """Desired number of workers based on the current workload This looks at the current running tasks and memory use, and returns a number of desired workers. This is often used by adaptive scheduling.
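``adaptive_target`` and ``retire_workers`` above are the hooks that adaptive deployments call in a loop: compute the desired worker count, then scale up or gracefully drain the excess. A sketch of exercising them by hand through ``Client.run_on_scheduler``, which injects the scheduler for a ``dask_scheduler`` parameter (retiring the first listed worker is purely illustrative):

    from distributed import Client

    client = Client()  # assumes a running cluster

    def desired(dask_scheduler=None):
        # how many workers does the current workload want?
        return dask_scheduler.adaptive_target()

    print("desired workers:", client.run_on_scheduler(desired))

    # Gracefully drain one worker; its data is moved elsewhere first
    first = list(client.scheduler_info()["workers"])[0]
    client.retire_workers(workers=[first])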
diff --git a/distributed/semaphore.py b/distributed/semaphore.py index b1c3c029177..5d86bd8ea59 100644 --- a/distributed/semaphore.py +++ b/distributed/semaphore.py @@ -35,7 +35,7 @@ def leftover(self): class SemaphoreExtension: - """ An extension for the scheduler to manage Semaphores + """An extension for the scheduler to manage Semaphores This adds the following routes to the scheduler @@ -267,7 +267,7 @@ def close(self, comm=None, name=None): class Semaphore: - """ Semaphore + """Semaphore This `semaphore <https://en.wikipedia.org/wiki/Semaphore_(programming)>`_ will track leases on the scheduler which can be acquired and diff --git a/distributed/sizeof.py b/distributed/sizeof.py index bc51b3603ae..699db3e8fb3 100644 --- a/distributed/sizeof.py +++ b/distributed/sizeof.py @@ -6,7 +6,7 @@ def safe_sizeof(obj, default_size=1e6): - """ Safe variant of sizeof that captures and logs exceptions + """Safe variant of sizeof that captures and logs exceptions This returns a default size of 1e6 if the sizeof function fails """ diff --git a/distributed/stealing.py b/distributed/stealing.py index 88c15809ea7..0ebffcba6d1 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -109,7 +109,7 @@ def remove_key_from_stealable(self, ts): pass def steal_time_ratio(self, ts): - """ The compute to communication time ratio of a key + """The compute to communication time ratio of a key Returns ------- diff --git a/distributed/tests/test_asyncprocess.py b/distributed/tests/test_asyncprocess.py index 3923d81cf2c..f5cfab17895 100644 --- a/distributed/tests/test_asyncprocess.py +++ b/distributed/tests/test_asyncprocess.py @@ -339,10 +339,10 @@ def _worker_process(worker_ready, child_pipe): def _parent_process(child_pipe): - """ Simulate starting an AsyncProcess and then dying. + """Simulate starting an AsyncProcess and then dying. The child_alive pipe is held open for as long as the child is alive, and can - be used to determine if it exited correctly. """ + be used to determine if it exited correctly.""" async def parent_process_coroutine(): worker_ready = mp_context.Event() @@ -369,7 +369,7 @@ async def parent_process_coroutine(): def test_asyncprocess_child_teardown_on_parent_exit(): - r""" Check that a child process started by AsyncProcess exits if its parent + r"""Check that a child process started by AsyncProcess exits if its parent exits.
The motivation is to ensure that if an AsyncProcess is created and the diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 21fc3b54cd3..39580bc6857 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -58,14 +58,7 @@ from distributed.metrics import time from distributed.scheduler import Scheduler, KilledWorker from distributed.sizeof import sizeof -from distributed.utils import ( - mp_context, - sync, - tmp_text, - tokey, - tmpfile, - is_valid_xml, -) +from distributed.utils import mp_context, sync, tmp_text, tokey, tmpfile, is_valid_xml from distributed.utils_test import ( cluster, slowinc, diff --git a/distributed/tests/test_preload.py b/distributed/tests/test_preload.py index 0000cd94206..2c35a59e06c 100644 --- a/distributed/tests/test_preload.py +++ b/distributed/tests/test_preload.py @@ -69,7 +69,7 @@ def dask_teardown(worker): worker.foo = 'teardown' """ with dask.config.set( - {"distributed.worker.preload": text, "distributed.nanny.preload": text,} + {"distributed.worker.preload": text, "distributed.nanny.preload": text} ): async with Scheduler(port=0) as s: async with Nanny(s.address) as w: diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index dc2ac7d8217..00d6631c9c5 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -531,7 +531,7 @@ async def test_broadcast(s, a, b): assert result == {a.address: b"pong", b.address: b"pong"} -@gen_cluster(security=tls_only_security(),) +@gen_cluster(security=tls_only_security()) async def test_broadcast_tls(s, a, b): result = await s.broadcast(msg={"op": "ping"}) assert result == {a.address: b"pong", b.address: b"pong"} @@ -1918,8 +1918,7 @@ async def test_task_group_non_tuple_key(c, s, a, b): @gen_cluster(client=True) async def test_task_unique_groups(c, s, a, b): - """ This test ensure that task groups remain unique when using submit - """ + """This test ensure that task groups remain unique when using submit""" x = c.submit(sum, [1, 2]) y = c.submit(len, [1, 2]) z = c.submit(sum, [3, 4]) @@ -2044,7 +2043,7 @@ def reducer(x, y): s.rpc = await FlakyConnectionPool(failing_connections=4) with dask.config.set( - {"distributed.comm.retry.delay_min": 0.5, "distributed.comm.retry.count": 3,} + {"distributed.comm.retry.delay_min": 0.5, "distributed.comm.retry.count": 3} ): with captured_logger( logging.getLogger("distributed.scheduler") diff --git a/distributed/tests/test_security.py b/distributed/tests/test_security.py index ab6646f0a12..dc1aee9e9b6 100644 --- a/distributed/tests/test_security.py +++ b/distributed/tests/test_security.py @@ -285,13 +285,11 @@ async def handle_comm(comm): # No SSL context for client with pytest.raises(TypeError): - await connect( - listener.contact_address, **sec.get_connection_args("client"), - ) + await connect(listener.contact_address, **sec.get_connection_args("client")) # Check forced cipher comm = await connect( - listener.contact_address, **forced_cipher_sec.get_connection_args("worker"), + listener.contact_address, **forced_cipher_sec.get_connection_args("worker") ) cipher, _, _ = comm.extra_info["cipher"] assert cipher in [FORCED_CIPHER] + TLS_13_CIPHERS @@ -325,7 +323,7 @@ async def handle_comm(comm): listen_addr, handle_comm, **sec.get_listen_args("scheduler") ) as listener: comm = await connect( - listener.contact_address, **sec2.get_connection_args("worker"), + listener.contact_address, **sec2.get_connection_args("worker") ) comm.abort() @@ -333,7 +331,7 @@ 
async def handle_comm(comm): listen_addr, handle_comm, **sec2.get_listen_args("scheduler") ) as listener: comm = await connect( - listener.contact_address, **sec2.get_connection_args("worker"), + listener.contact_address, **sec2.get_connection_args("worker") ) comm.abort() @@ -348,19 +346,17 @@ def check_encryption_error(): listen_addr, handle_comm, **sec.get_listen_args("scheduler") ) as listener: comm = await connect( - listener.contact_address, **sec.get_connection_args("worker"), + listener.contact_address, **sec.get_connection_args("worker") ) comm.abort() with pytest.raises(RuntimeError): await connect( - listener.contact_address, **sec2.get_connection_args("worker"), + listener.contact_address, **sec2.get_connection_args("worker") ) with pytest.raises(RuntimeError): - listen( - listen_addr, handle_comm, **sec2.get_listen_args("scheduler"), - ) + listen(listen_addr, handle_comm, **sec2.get_listen_args("scheduler")) def test_temporary_credentials(): diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index c13fed73ab5..796eca82e3f 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -30,12 +30,7 @@ from distributed.core import rpc, CommClosedError, Status from distributed.scheduler import Scheduler from distributed.metrics import time -from distributed.worker import ( - Worker, - error_message, - logger, - parse_memory_limit, -) +from distributed.worker import Worker, error_message, logger, parse_memory_limit from distributed.utils import tmpfile, TimeoutError from distributed.utils_test import ( # noqa: F401 cleanup, diff --git a/distributed/threadpoolexecutor.py b/distributed/threadpoolexecutor.py index 44770900028..c3112b3056b 100644 --- a/distributed/threadpoolexecutor.py +++ b/distributed/threadpoolexecutor.py @@ -106,7 +106,7 @@ def shutdown(self, wait=True, timeout=None): def secede(adjust=True): - """ Have this thread secede from the ThreadPoolExecutor + """Have this thread secede from the ThreadPoolExecutor See Also -------- @@ -120,7 +120,7 @@ def secede(adjust=True): def rejoin(): - """ Have this thread rejoin the ThreadPoolExecutor + """Have this thread rejoin the ThreadPoolExecutor This will block until a new slot opens up in the executor. The next thread to finish a task will leave the pool to allow this one to join. diff --git a/distributed/utils.py b/distributed/utils.py index fb4ad8f9015..237b19ea37b 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -195,7 +195,7 @@ def get_ip_interface(ifname): # FIXME: this breaks if changed to async def... @gen.coroutine def ignore_exceptions(coroutines, *exceptions): - """ Process list of coroutines, ignoring certain exceptions + """Process list of coroutines, ignoring certain exceptions >>> coroutines = [cor(...) for ...] # doctest: +SKIP >>> x = yield ignore_exceptions(coroutines, TypeError) # doctest: +SKIP @@ -210,7 +210,7 @@ def ignore_exceptions(coroutines, *exceptions): async def All(args, quiet_exceptions=()): - """ Wait on many tasks at the same time + """Wait on many tasks at the same time Err once any of the tasks err. @@ -231,7 +231,7 @@ async def All(args, quiet_exceptions=()): @gen.coroutine def quiet(): - """ Watch unfinished tasks + """Watch unfinished tasks Otherwise if they err they get logged in a way that is hard to control. 
They need some other task to watch them so that they @@ -250,7 +250,7 @@ def quiet(): async def Any(args, quiet_exceptions=()): - """ Wait on many tasks at the same time and return when any is finished + """Wait on many tasks at the same time and return when any is finished Err once any of the tasks err. @@ -269,7 +269,7 @@ async def Any(args, quiet_exceptions=()): @gen.coroutine def quiet(): - """ Watch unfinished tasks + """Watch unfinished tasks Otherwise if they err they get logged in a way that is hard to control. They need some other task to watch them so that they @@ -544,7 +544,7 @@ def clear_queue(q): def is_kernel(): - """ Determine if we're running within an IPython kernel + """Determine if we're running within an IPython kernel >>> is_kernel() False @@ -687,7 +687,7 @@ def silence_logging(level, root="distributed"): @toolz.memoize def ensure_ip(hostname): - """ Ensure that address is an IP address + """Ensure that address is an IP address Examples -------- @@ -741,7 +741,7 @@ def truncate_exception(e, n=10000): def tokey(o): - """ Convert an object to a string. + """Convert an object to a string. Examples -------- @@ -761,8 +761,7 @@ def tokey(o): def validate_key(k): - """Validate a key as received on a stream. - """ + """Validate a key as received on a stream.""" typ = type(k) if typ is not str and typ is not bytes: raise TypeError("Unexpected key type %s (value: %r)" % (typ, k)) @@ -799,7 +798,7 @@ def str_graph(dsk, extra_values=()): def seek_delimiter(file, delimiter, blocksize): - """ Seek current file to next byte after a delimiter bytestring + """Seek current file to next byte after a delimiter bytestring This seeks the file to the next byte following the delimiter. It does not return anything. Use ``file.tell()`` to see location afterwards. @@ -832,7 +831,7 @@ def seek_delimiter(file, delimiter, blocksize): def read_block(f, offset, length, delimiter=None): - """ Read a block of bytes from a file + """Read a block of bytes from a file Parameters ---------- @@ -1000,7 +999,7 @@ def shutting_down(l=_shutting_down): def open_port(host=""): - """ Return a probably-open port + """Return a probably-open port There is a chance that this port will be taken by the operating system soon after returning from this function. 
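``read_block`` (with its helper ``seek_delimiter``) is the utility above that is easiest to misread, so here is a small self-contained sketch of the delimiter behaviour, in the spirit of this module's own doctest examples:

    import io
    from distributed.utils import read_block

    f = io.BytesIO(b"Alice, 100\nBob, 200\nCharlie, 300")

    # A plain byte range returns exactly 13 bytes, cutting mid-record
    assert read_block(f, 0, 13) == b"Alice, 100\nBo"

    # With a delimiter, blocks are widened to whole lines, so adjacent
    # blocks tile the file without ever splitting a record
    assert read_block(f, 0, 13, delimiter=b"\n") == b"Alice, 100\nBob, 200\n"
    assert read_block(f, 10, 10, delimiter=b"\n") == b"Bob, 200\n"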
@@ -1173,7 +1172,7 @@ def clear_all_instances(cls): def reset_logger_locks(): - """ Python 2's logger's locks don't survive a fork event + """Python 2's logger's locks don't survive a fork event https://github.com/dask/distributed/issues/1491 """ @@ -1301,7 +1300,7 @@ def warn_on_duration(duration, msg): def typename(typ): - """ Return name of type + """Return name of type Examples -------- @@ -1327,7 +1326,7 @@ def format_dashboard_link(host, port): def parse_ports(port): - """ Parse input port information into list of ports + """Parse input port information into list of ports Parameters ---------- @@ -1412,7 +1411,7 @@ def _repr_html_(self): def cli_keywords(d: dict, cls=None, cmd=None): - """ Convert a kwargs dictionary into a list of CLI keywords + """Convert a kwargs dictionary into a list of CLI keywords Parameters ---------- @@ -1481,7 +1480,7 @@ def is_valid_xml(text): def import_term(name: str): - """ Return the fully qualified term + """Return the fully qualified term Examples -------- @@ -1503,7 +1502,7 @@ async def offload(fn, *args, **kwargs): def serialize_for_cli(data): - """ Serialize data into a string that can be passthrough cli + """Serialize data into a string that can be passed through the CLI Parameters ---------- @@ -1518,7 +1517,7 @@ def serialize_for_cli(data): def deserialize_for_cli(data): - """ De-serialize data into the original object + """De-serialize data into the original object Parameters ---------- @@ -1550,8 +1549,7 @@ async def __aexit__(self, *args): class LRU(UserDict): - """ Limited size mapping, evicting the least recently looked-up key when full - """ + """Limited size mapping, evicting the least recently looked-up key when full""" def __init__(self, maxsize): super().__init__() diff --git a/distributed/utils_comm.py b/distributed/utils_comm.py index b7e33656ab8..814ebb7bce8 100644 --- a/distributed/utils_comm.py +++ b/distributed/utils_comm.py @@ -17,7 +17,7 @@ async def gather_from_workers(who_has, rpc, close=True, serializers=None, who=None): - """ Gather data directly from peers + """Gather data directly from peers Parameters ---------- @@ -97,7 +97,7 @@ async def gather_from_workers(who_has, rpc, close=True, serializers=None, who=No class WrappedKey: - """ Interface for a key in a dask graph. + """Interface for a key in a dask graph. Subclasses must have .key attribute that refers to a key in a dask graph. @@ -118,7 +118,7 @@ def __repr__(self): async def scatter_to_workers(nthreads, data, rpc=rpc, report=True, serializers=None): - """ Scatter data directly to workers + """Scatter data directly to workers This distributes data in a round-robin fashion to a set of workers based on how many cores they have.
nthreads should be a dictionary mapping worker @@ -164,7 +164,7 @@ async def scatter_to_workers(nthreads, data, rpc=rpc, report=True, serializers=N def unpack_remotedata(o, byte_keys=False, myset=None): - """ Unpack WrappedKey objects from collection + """Unpack WrappedKey objects from collection Returns original collection and set of all found WrappedKey objects @@ -246,7 +246,7 @@ def unpack_remotedata(o, byte_keys=False, myset=None): def pack_data(o, d, key_types=object): - """ Merge known data into tuple or dict + """Merge known data into tuple or dict Parameters ---------- @@ -281,7 +281,7 @@ def pack_data(o, d, key_types=object): def subs_multiple(o, d): - """ Perform substitutions on a tasks + """Perform substitutions on a task Parameters ---------- diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 4daf84e804c..1a373aeac2c 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -741,7 +741,7 @@ async def disconnect_all(addresses, timeout=3, rpc_kwargs=None): def gen_test(timeout=10): - """ Coroutine test + """Coroutine test @gen_test(timeout=5) async def test_foo(): @@ -1137,9 +1137,7 @@ async def assert_can_connect_from_everywhere_4_6(port, protocol="tcp", **kwargs) await asyncio.gather(*futures) -async def assert_can_connect_from_everywhere_4( - port, protocol="tcp", **kwargs, -): +async def assert_can_connect_from_everywhere_4(port, protocol="tcp", **kwargs): """ Check that the local *port* is reachable from all IPv4 addresses. """ @@ -1205,8 +1203,7 @@ async def assert_can_connect_locally_6(port, **kwargs): @contextmanager def captured_logger(logger, level=logging.INFO, propagate=None): - """Capture output from the given Logger. - """ + """Capture output from the given Logger.""" if isinstance(logger, str): logger = logging.getLogger(logger) orig_level = logger.level @@ -1228,8 +1225,7 @@ def captured_logger(logger, level=logging.INFO, propagate=None): @contextmanager def captured_handler(handler): - """Capture output from the given logging.StreamHandler. - """ + """Capture output from the given logging.StreamHandler.""" assert isinstance(handler, logging.StreamHandler) orig_stream = handler.stream handler.stream = io.StringIO() diff --git a/distributed/variable.py b/distributed/variable.py index b20273031ab..26a56466588 100644 --- a/distributed/variable.py +++ b/distributed/variable.py @@ -14,7 +14,7 @@ class VariableExtension: - """ An extension for the scheduler to manage queues + """An extension for the scheduler to manage variables This adds the following routes to the scheduler @@ -123,7 +123,7 @@ async def delete(self, comm=None, name=None, client=None): class Variable: - """ Distributed Global Variable + """Distributed Global Variable This allows multiple clients to share futures and data between each other with a single mutable variable.
All metadata is sequentialized through the @@ -176,7 +176,7 @@ async def _set(self, value): await self.client.scheduler.variable_set(data=value, name=self.name) def set(self, value, **kwargs): - """ Set the value of this variable + """Set the value of this variable Parameters ---------- @@ -206,7 +206,7 @@ async def _get(self, timeout=None): return value def get(self, timeout=None, **kwargs): - """ Get the value of this variable + """Get the value of this variable Parameters ---------- @@ -219,7 +219,7 @@ def get(self, timeout=None, **kwargs): return self.client.sync(self._get, timeout=timeout, **kwargs) def delete(self): - """ Delete this variable + """Delete this variable Caution, this affects all clients currently pointing to this variable. """ diff --git a/distributed/versions.py b/distributed/versions.py index 4e4bff35067..a276eb418ca 100644 --- a/distributed/versions.py +++ b/distributed/versions.py @@ -33,7 +33,7 @@ # notes to be displayed for mismatch packages notes_mismatch_package = { - "msgpack": "Variation is ok, as long as everything is above 0.6", + "msgpack": "Variation is ok, as long as everything is above 0.6" } diff --git a/distributed/worker.py b/distributed/worker.py index 4c3be973478..848b00d564d 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -92,7 +92,7 @@ class Worker(ServerNode): - """ Worker node in a Dask distributed cluster + """Worker node in a Dask distributed cluster Workers perform two functions: @@ -645,7 +645,7 @@ def __init__( pc = PeriodicCallback(self.heartbeat, 1000) self.periodic_callbacks["heartbeat"] = pc pc = PeriodicCallback( - lambda: self.batched_stream.send({"op": "keep-alive"}), 60000, + lambda: self.batched_stream.send({"op": "keep-alive"}), 60000 ) self.periodic_callbacks["keep-alive"] = pc @@ -657,7 +657,7 @@ def __init__( if self.memory_limit: self._memory_monitoring = False pc = PeriodicCallback( - self.memory_monitor, self.memory_monitor_interval * 1000, + self.memory_monitor, self.memory_monitor_interval * 1000 ) self.periodic_callbacks["memory"] = pc @@ -706,20 +706,17 @@ def __init__( ################## def __repr__(self): - return ( - "<%s: %r, %s, %s, stored: %d, running: %d/%d, ready: %d, comm: %d, waiting: %d>" - % ( - self.__class__.__name__, - self.address, - self.name, - self.status, - len(self.data), - len(self.executing), - self.nthreads, - len(self.ready), - len(self.in_flight_tasks), - len(self.waiting_for_data), - ) + return "<%s: %r, %s, %s, stored: %d, running: %d/%d, ready: %d, comm: %d, waiting: %d>" % ( + self.__class__.__name__, + self.address, + self.name, + self.status, + len(self.data), + len(self.executing), + self.nthreads, + len(self.ready), + len(self.in_flight_tasks), + len(self.waiting_for_data), ) @property @@ -1180,7 +1177,7 @@ async def close( return "OK" async def close_gracefully(self): - """ Gracefully shut down a worker + """Gracefully shut down a worker This first informs the scheduler that we're shutting down, and asks it to move our data elsewhere. Afterwards, we close as normal @@ -2336,7 +2333,7 @@ def rescind_key(self, key): # xref: https://github.com/dask/distributed/issues/3938 @gen.coroutine def executor_submit(self, key, function, args=(), kwargs=None, executor=None): - """ Safely run function in thread pool executor + """Safely run function in thread pool executor We've run into issues running concurrent.future futures within tornado. 
Apparently it's advantageous to use timeouts and periodic @@ -2616,7 +2613,7 @@ async def execute(self, key, report=False): ################## async def memory_monitor(self): - """ Track this process's memory usage and act accordingly + """Track this process's memory usage and act accordingly If we rise above 70% memory use, start dumping data to disk. @@ -2988,7 +2985,7 @@ def client(self): return self._get_client() def _get_client(self, timeout=3): - """ Get local client attached to this worker + """Get local client attached to this worker If no such client exists, create one @@ -3029,7 +3026,7 @@ def _get_client(self, timeout=3): return self._client def get_current_task(self): - """ Get the key of the task we are currently running + """Get the key of the task we are currently running This only makes sense to run within a task @@ -3051,7 +3048,7 @@ def get_current_task(self): def get_worker(): - """ Get the worker currently running this task + """Get the worker currently running this task Examples -------- @@ -3172,7 +3169,7 @@ def secede(): class Reschedule(Exception): - """ Reschedule this task + """Reschedule this task Raising this exception will stop the current execution of the task and ask the scheduler to reschedule this task, possibly on a different machine. @@ -3215,7 +3212,7 @@ async def get_data_from_worker( serializers=None, deserializers=None, ): - """ Get keys from worker + """Get keys from worker The worker has a two step handshake to acknowledge when data has been fully delivered. This function implements that handshake. @@ -3294,7 +3291,7 @@ def _deserialize(function=None, args=None, kwargs=None, task=no_value): def execute_task(task): - """ Evaluate a nested task + """Evaluate a nested task >>> inc = lambda x: x + 1 >>> execute_task((inc, 1)) @@ -3332,7 +3329,7 @@ def dumps_function(func): def dumps_task(task): - """ Serialize a dask task + """Serialize a dask task Returns a dict of bytestrings that can each be loaded with ``loads`` @@ -3397,7 +3394,7 @@ def apply_function( active_threads_lock, time_delay, ): - """ Run a function, collect information + """Run a function, collect information Returns ------- @@ -3437,7 +3434,7 @@ def apply_function( def apply_function_actor( function, args, kwargs, execution_state, key, active_threads, active_threads_lock ): - """ Run a function, collect information + """Run a function, collect information Returns ------- @@ -3460,7 +3457,7 @@ def apply_function_actor( def get_msg_safe_str(msg): - """ Make a worker msg, which contains args and kwargs, safe to cast to str: + """Make a worker msg, which contains args and kwargs, safe to cast to str: allowing for some arguments to raise exceptions during conversion and ignoring them. """ @@ -3482,7 +3479,7 @@ def __repr__(self): def convert_args_to_str(args, max_len=None): - """ Convert args to a string, allowing for some arguments to raise + """Convert args to a string, allowing for some arguments to raise exceptions during conversion and ignoring them. """ length = 0 @@ -3501,7 +3498,7 @@ def convert_args_to_str(args, max_len=None): def convert_kwargs_to_str(kwargs, max_len=None): - """ Convert kwargs to a string, allowing for some arguments to raise + """Convert kwargs to a string, allowing for some arguments to raise exceptions during conversion and ignoring them. 
""" length = 0 diff --git a/distributed/worker_client.py b/distributed/worker_client.py index a45eb891f7d..b39c6236df9 100644 --- a/distributed/worker_client.py +++ b/distributed/worker_client.py @@ -7,7 +7,7 @@ @contextmanager def worker_client(timeout=3, separate_thread=True): - """ Get client for this thread + """Get client for this thread This context manager is intended to be called within functions that we run on workers. When run as a context manager it delivers a client