From 1474c811debb03363fedb6f6731d40ae13b74a71 Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Wed, 7 Feb 2024 15:24:52 +0100 Subject: [PATCH 1/4] Adds cwd propagation to DaskExecutor --- cluster_tools/cluster_tools/executors/dask.py | 7 ++++- cluster_tools/tests/test_dask.py | 29 +++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) create mode 100644 cluster_tools/tests/test_dask.py diff --git a/cluster_tools/cluster_tools/executors/dask.py b/cluster_tools/cluster_tools/executors/dask.py index 35ea39bc4..c312388e4 100644 --- a/cluster_tools/cluster_tools/executors/dask.py +++ b/cluster_tools/cluster_tools/executors/dask.py @@ -41,6 +41,9 @@ def _run_in_nanny( for key, value in __env.items(): os.environ[key] = value + print(os.environ["PWD"]) + if "PWD" in os.environ: + os.chdir(os.environ["PWD"]) ret = __fn(*args, **kwargs) queue.put({"value": ret}) except Exception as exc: @@ -174,7 +177,9 @@ def submit( # type: ignore[override] ), ) - kwargs["__env"] = os.environ.copy() + __env = os.environ.copy() + __env["PWD"] = os.getcwd() + kwargs["__env"] = __env # We run the functions in dask as a separate process to not hold the # GIL for too long, because dask workers need to be able to communicate diff --git a/cluster_tools/tests/test_dask.py b/cluster_tools/tests/test_dask.py new file mode 100644 index 000000000..f7393cda7 --- /dev/null +++ b/cluster_tools/tests/test_dask.py @@ -0,0 +1,29 @@ +import os +from typing import TYPE_CHECKING, List, Optional + +if TYPE_CHECKING: + from distributed import LocalCluster + +import cluster_tools + +_dask_cluster: Optional["LocalCluster"] = None + + +def job(_arg: None) -> str: + return os.getcwd() + + +def test_pass_cwd() -> None: + global _dask_cluster + if not _dask_cluster: + from distributed import LocalCluster, Worker + + _dask_cluster = LocalCluster( + worker_class=Worker, resources={"mem": 20e9, "cpus": 4}, nthreads=6 + ) + with cluster_tools.get_executor( + "dask", job_resources={"address": _dask_cluster} + ) as exec: + tmp_path = os.path.realpath("/tmp") # macOS redirects `/tmp` to `/private/tmp` + os.chdir(tmp_path) + assert list(exec.map(job, [None])) == [tmp_path] From 561f105d4462836ca4701f651e37f849567002ba Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Wed, 7 Feb 2024 15:28:25 +0100 Subject: [PATCH 2/4] changelog --- cluster_tools/Changelog.md | 1 + 1 file changed, 1 insertion(+) diff --git a/cluster_tools/Changelog.md b/cluster_tools/Changelog.md index 9ae58b858..a9019840c 100644 --- a/cluster_tools/Changelog.md +++ b/cluster_tools/Changelog.md @@ -16,6 +16,7 @@ For upgrade instructions, please check the respective *Breaking Changes* section ### Changed ### Fixed +- Fixed working directory propagation in DaskExecutor. [#994](https://github.com/scalableminds/webknossos-libs/pull/994) ## [0.14.14](https://github.com/scalableminds/webknossos-libs/releases/tag/v0.14.14) - 2024-01-12 From 3123c5ac43224e8ca2b1236e6882e8c670c306cb Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Wed, 7 Feb 2024 15:41:16 +0100 Subject: [PATCH 3/4] add test_dask to CI --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 713f104fd..98d6e88d0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -134,7 +134,7 @@ jobs: if: ${{ matrix.executors == 'dask' && matrix.python-version != '3.8' }} run: | cd tests - PYTEST_EXECUTORS=dask poetry run python -m pytest -sv test_all.py + PYTEST_EXECUTORS=dask poetry run python -m pytest -sv test_all.py test_dask.py webknossos_linux: needs: changes From 8a0d149d61994c50e6bf12ac1265632e0b9e26b9 Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Wed, 7 Feb 2024 16:21:06 +0100 Subject: [PATCH 4/4] Update cluster_tools/cluster_tools/executors/dask.py Co-authored-by: Daniel --- cluster_tools/cluster_tools/executors/dask.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cluster_tools/cluster_tools/executors/dask.py b/cluster_tools/cluster_tools/executors/dask.py index c312388e4..076ff7212 100644 --- a/cluster_tools/cluster_tools/executors/dask.py +++ b/cluster_tools/cluster_tools/executors/dask.py @@ -41,7 +41,6 @@ def _run_in_nanny( for key, value in __env.items(): os.environ[key] = value - print(os.environ["PWD"]) if "PWD" in os.environ: os.chdir(os.environ["PWD"]) ret = __fn(*args, **kwargs)