Skip to content

Commit

Permalink
See Also
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Oct 14, 2022
1 parent 45834d8 commit 33ebec3
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 23 deletions.
78 changes: 59 additions & 19 deletions distributed/diagnostics/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import uuid
import zipfile
from collections.abc import Awaitable
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, ClassVar

from dask.utils import funcname, tmpfile

Expand Down Expand Up @@ -242,7 +242,7 @@ def _get_plugin_name(plugin: SchedulerPlugin | WorkerPlugin | NannyPlugin) -> st


class PackageInstall(WorkerPlugin, abc.ABC):
"""A Worker Plugin to install a set of packages using conda or pip
"""Abstract parent class for a worker plugin to install a set of packages
This accepts a set of packages to install on all workers.
You can also optionally ask for the worker to restart itself after
Expand All @@ -259,19 +259,14 @@ class PackageInstall(WorkerPlugin, abc.ABC):
----------
packages
A list of packages (with optional versions) to install
options
Additional options to pass to the package installer
restart
Whether or not to restart the worker after installing the packages
Only functions if the worker has an attached nanny process
Examples
See Also
--------
>>> from dask.distributed import PackageInstall
>>> plugin = PackageInstall(
... packages=["scikit-learn"], installer="pip", options=["--upgrade"]
... )
>>> client.register_worker_plugin(plugin)
CondaInstall
PipInstall
"""

name: str
Expand All @@ -289,6 +284,7 @@ def __init__(

@abc.abstractproperty
def installer(self) -> str:
"""Return the name of the installer"""
raise NotImplementedError

async def setup(self, worker):
Expand Down Expand Up @@ -317,6 +313,7 @@ async def setup(self, worker):

@abc.abstractmethod
def _install(self) -> None:
"""Install the requested packages"""
raise NotImplementedError

async def _is_installed(self, worker):
Expand Down Expand Up @@ -354,7 +351,44 @@ def _compose_restarted_key(self, worker):


class CondaInstall(PackageInstall):
_INSTALLER: str = "conda"
"""A Worker Plugin to conda install a set of packages
This accepts a set of packages to install on all workers as well as
options to use when installing.
You can also optionally ask for the worker to restart itself after
performing this installation.
.. note::
This will increase the time it takes to start up
each worker. If possible, we recommend including the
libraries in the worker environment or image. This is
primarily intended for experimentation and debugging.
Parameters
----------
packages
A list of packages (with optional versions) to install using conda
conda_options
Additional options to pass to conda
restart
Whether or not to restart the worker after installing the packages
Only functions if the worker has an attached nanny process
Examples
--------
>>> from dask.distributed import PipInstall
>>> plugin = CondaInstall(packages=["scikit-learn"], conda_options=["--update-deps"])
>>> client.register_worker_plugin(plugin)
See Also
--------
PackageInstall
PipInstall
"""

_INSTALLER: ClassVar[str] = "conda"

conda_options: list[str]

Expand Down Expand Up @@ -394,7 +428,8 @@ def _install(self):
class PipInstall(PackageInstall):
"""A Worker Plugin to pip install a set of packages
This accepts a set of packages to install on all workers.
This accepts a set of packages to install on all workers as well as
options to use when installing.
You can also optionally ask for the worker to restart itself after
performing this installation.
Expand All @@ -407,12 +442,12 @@ class PipInstall(PackageInstall):
Parameters
----------
packages : List[str]
A list of strings to place after "pip install" command
pip_options : List[str]
Additional options to pass to pip.
restart : bool, default False
Whether or not to restart the worker after pip installing
packages
A list of packages (with optional versions) to install using pip
pip_options
Additional options to pass to pip
restart
Whether or not to restart the worker after installing the packages
Only functions if the worker has an attached nanny process
Examples
Expand All @@ -421,9 +456,14 @@ class PipInstall(PackageInstall):
>>> plugin = PipInstall(packages=["scikit-learn"], pip_options=["--upgrade"])
>>> client.register_worker_plugin(plugin)
See Also
--------
PackageInstall
CondaInstall
"""

_INSTALLER: str = "pip"
_INSTALLER: ClassVar[str] = "pip"

pip_options: list[str]

Expand Down
8 changes: 4 additions & 4 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1756,14 +1756,14 @@ async def test_conda_install_fails_on_returncode(c, s, a, b):


class StubInstall(PackageInstall):
INSTALLER = "stub"
_INSTALLER = "stub"

def __init__(self, packages: list[str], restart: bool = False):
super().__init__(packages=packages, restart=restart)

@property
def installer(self) -> str:
return self.INSTALLER
return self._INSTALLER

def _install(self) -> None:
pass
Expand Down Expand Up @@ -1801,14 +1801,14 @@ async def test_package_install_restarts_on_nanny(c, s, a):


class FailingInstall(PackageInstall):
INSTALLER = "fail"
_INSTALLER = "fail"

def __init__(self, packages: list[str], restart: bool = False):
super().__init__(packages=packages, restart=restart)

@property
def installer(self) -> str:
return self.INSTALLER
return self._INSTALLER

def _install(self) -> None:
raise RuntimeError()
Expand Down

0 comments on commit 33ebec3

Please sign in to comment.