Skip to content

Commit

Permalink
Use a tempdir path by default instead of cwd for the worker scratch d…
Browse files Browse the repository at this point in the history
…ir (#6658)
  • Loading branch information
fjetter authored Jul 8, 2022
1 parent 8dd1872 commit d59500e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 35 deletions.
16 changes: 4 additions & 12 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
import shutil
import tempfile
import threading
import uuid
import warnings
Expand Down Expand Up @@ -121,7 +122,6 @@ def __init__( # type: ignore[no-untyped-def]
worker_port: int | str | Collection[int] | None = 0,
nthreads=None,
loop=None,
local_dir=None,
local_directory=None,
services=None,
name=None,
Expand Down Expand Up @@ -165,12 +165,10 @@ def __init__( # type: ignore[no-untyped-def]
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("worker")

if local_dir is not None:
warnings.warn("The local_dir keyword has moved to local_directory")
local_directory = local_dir

if local_directory is None:
local_directory = dask.config.get("temporary-directory") or os.getcwd()
local_directory = (
dask.config.get("temporary-directory") or tempfile.gettempdir()
)
self._original_local_dir = local_directory
local_directory = os.path.join(local_directory, "dask-worker-space")
else:
Expand Down Expand Up @@ -320,12 +318,6 @@ def worker_address(self):
def worker_dir(self):
return None if self.process is None else self.process.worker_dir

@property
def local_dir(self):
"""For API compatibility with Nanny"""
warnings.warn("The local_dir attribute has moved to local_directory")
return self.local_directory

async def start_unsafe(self):
"""Start nanny, start local process, start watching"""

Expand Down
31 changes: 17 additions & 14 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import os
import sys
import tempfile
import threading
import traceback
import warnings
Expand Down Expand Up @@ -934,25 +935,27 @@ async def test_heartbeats(c, s, a, b):


@pytest.mark.parametrize("worker", [Worker, Nanny])
def test_worker_dir(worker):
with tmpfile() as fn:
def test_worker_dir(worker, tmpdir):
@gen_cluster(client=True, worker_kwargs={"local_directory": str(tmpdir)})
async def test_worker_dir(c, s, a, b):
directories = [w.local_directory for w in s.workers.values()]
assert all(d.startswith(str(tmpdir)) for d in directories)
assert len(set(directories)) == 2 # distinct

@gen_cluster(client=True, worker_kwargs={"local_directory": fn})
async def test_worker_dir(c, s, a, b):
directories = [w.local_directory for w in s.workers.values()]
assert all(d.startswith(fn) for d in directories)
assert len(set(directories)) == 2 # distinct
test_worker_dir()

test_worker_dir()

@gen_cluster(client=True, nthreads=[], config={"temporary-directory": None})
async def test_default_worker_dir(c, s):
expect = os.path.join(tempfile.gettempdir(), "dask-worker-space")

@gen_cluster(nthreads=[], config={"temporary-directory": None})
async def test_false_worker_dir(s):
async with Worker(s.address, local_directory="") as w:
local_directory = w.local_directory
async with Worker(s.address) as w:
assert os.path.dirname(w.local_directory) == expect

cwd = os.getcwd()
assert os.path.dirname(local_directory) == os.path.join(cwd, "dask-worker-space")
async with Nanny(s.address) as n:
assert n.local_directory == expect
results = await c.run(lambda dask_worker: dask_worker.local_directory)
assert os.path.dirname(results[n.worker_address]) == expect


@gen_cluster(client=True)
Expand Down
12 changes: 4 additions & 8 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pathlib
import random
import sys
import tempfile
import threading
import warnings
import weakref
Expand Down Expand Up @@ -439,7 +440,6 @@ def __init__(
scheduler_file: str | None = None,
nthreads: int | None = None,
loop: IOLoop | None = None, # Deprecated
local_dir: None = None, # Deprecated, use local_directory instead
local_directory: str | None = None,
services: dict | None = None,
name: Any | None = None,
Expand Down Expand Up @@ -560,14 +560,10 @@ def __init__(

self._setup_logging(logger, wsm_logger)

if local_dir is not None:
warnings.warn( # type: ignore[unreachable]
"The local_dir keyword has moved to local_directory"
)
local_directory = local_dir

if not local_directory:
local_directory = dask.config.get("temporary-directory") or os.getcwd()
local_directory = (
dask.config.get("temporary-directory") or tempfile.gettempdir()
)

os.makedirs(local_directory, exist_ok=True)
local_directory = os.path.join(local_directory, "dask-worker-space")
Expand Down
3 changes: 2 additions & 1 deletion docs/source/worker-memory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ types like NumPy arrays and Pandas dataframes. The sum of the ``sizeof`` of all
tracked by Dask is called :ref:`managed memory <memtypes>`.

When the managed memory exceeds 60% of the memory limit (*target threshold*), the worker
will begin to dump the least recently used data to disk. You can control this location
will begin to dump the least recently used data to disk. By default, it writes to the
OS's temporary directory (``/tmp`` in Linux); you can control this location
with the ``--local-directory`` keyword::

$ dask-worker tcp://scheduler:port --memory-limit="4 GiB" --local-directory /scratch
Expand Down

0 comments on commit d59500e

Please sign in to comment.