Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement PackageInstall plugin for pip and conda #7126

Merged
merged 17 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions distributed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
from distributed.core import Status, connect, rpc
from distributed.deploy import Adaptive, LocalCluster, SpecCluster, SSHCluster
from distributed.diagnostics.plugin import (
CondaInstall,
Environ,
NannyPlugin,
PackageInstall,
PipInstall,
SchedulerPlugin,
UploadDirectory,
Expand Down Expand Up @@ -109,6 +111,7 @@ def _():
"CancelledError",
"Client",
"CompatibleExecutor",
"CondaInstall",
"Environ",
"Event",
"Future",
Expand All @@ -118,6 +121,7 @@ def _():
"MultiLock",
"Nanny",
"NannyPlugin",
"PackageInstall",
"PipInstall",
"Pub",
"Queue",
Expand Down
208 changes: 174 additions & 34 deletions distributed/diagnostics/plugin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import abc
import logging
import os
import socket
Expand All @@ -8,7 +9,7 @@
import uuid
import zipfile
from collections.abc import Awaitable
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, ClassVar

from dask.utils import funcname, tmpfile

Expand Down Expand Up @@ -240,8 +241,8 @@ def _get_plugin_name(plugin: SchedulerPlugin | WorkerPlugin | NannyPlugin) -> st
return funcname(type(plugin)) + "-" + str(uuid.uuid4())


class PipInstall(WorkerPlugin):
"""A Worker Plugin to pip install a set of packages
class PackageInstall(WorkerPlugin, abc.ABC):
"""Abstract parent class for a worker plugin to install a set of packages

This accepts a set of packages to install on all workers.
You can also optionally ask for the worker to restart itself after
Expand All @@ -256,29 +257,32 @@ class PipInstall(WorkerPlugin):

Parameters
----------
packages : List[str]
A list of strings to place after "pip install" command
pip_options : List[str]
Additional options to pass to pip.
restart : bool, default False
Whether or not to restart the worker after pip installing
packages
A list of packages (with optional versions) to install
restart
Whether or not to restart the worker after installing the packages
Only functions if the worker has an attached nanny process

Examples
See Also
--------
>>> from dask.distributed import PipInstall
>>> plugin = PipInstall(packages=["scikit-learn"], pip_options=["--upgrade"])

>>> client.register_worker_plugin(plugin)
CondaInstall
PipInstall
"""

name = "pip"
INSTALLER: ClassVar[str]

def __init__(self, packages, pip_options=None, restart=False):
name: str
packages: list[str]
restart: bool

def __init__(
self,
packages: list[str],
restart: bool,
):
self.packages = packages
self.restart = restart
self.pip_options = pip_options or []
self.id = f"pip-install-{uuid.uuid4()}"
self.name = f"{self.INSTALLER}-install-{uuid.uuid4()}"

async def setup(self, worker):
from distributed.semaphore import Semaphore
Expand All @@ -287,9 +291,13 @@ async def setup(self, worker):
await Semaphore(max_leases=1, name=socket.gethostname(), register=True)
):
if not await self._is_installed(worker):
logger.info("Pip installing the following packages: %s", self.packages)
logger.info(
"%s installing the following packages: %s",
self.INSTALLER,
self.packages,
)
await self._set_installed(worker)
self._install()
self.install()
else:
logger.info(
"The following packages have already been installed: %s",
Expand All @@ -301,18 +309,9 @@ async def setup(self, worker):
await self._set_restarted(worker)
worker.loop.add_callback(worker.close_gracefully, restart=True)

def _install(self):
proc = subprocess.Popen(
[sys.executable, "-m", "pip", "install"] + self.pip_options + self.packages,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
_, stderr = proc.communicate()
returncode = proc.wait()
if returncode != 0:
msg = f"Pip install failed with '{stderr.decode().strip()}'"
logger.error(msg)
raise RuntimeError(msg)
@abc.abstractmethod
def install(self) -> None:
"""Install the requested packages"""

async def _is_installed(self, worker):
return await worker.client.get_metadata(
Expand All @@ -327,7 +326,7 @@ async def _set_installed(self, worker):

def _compose_installed_key(self):
return [
self.id,
self.name,
"installed",
socket.gethostname(),
]
Expand All @@ -345,7 +344,148 @@ async def _set_restarted(self, worker):
)

def _compose_restarted_key(self, worker):
return [self.id, "restarted", worker.nanny]
return [self.name, "restarted", worker.nanny]


class CondaInstall(PackageInstall):
    """A Worker Plugin to conda install a set of packages

    This accepts a set of packages to install on all workers as well as
    options to use when installing.
    You can also optionally ask for the worker to restart itself after
    performing this installation.

    .. note::

       This will increase the time it takes to start up
       each worker. If possible, we recommend including the
       libraries in the worker environment or image. This is
       primarily intended for experimentation and debugging.

    Parameters
    ----------
    packages
        A list of packages (with optional versions) to install using conda
    conda_options
        Additional options to pass to conda
    restart
        Whether or not to restart the worker after installing the packages
        Only functions if the worker has an attached nanny process

    Examples
    --------
    >>> from dask.distributed import CondaInstall
    >>> plugin = CondaInstall(packages=["scikit-learn"], conda_options=["--update-deps"])

    >>> client.register_worker_plugin(plugin)

    See Also
    --------
    PackageInstall
    PipInstall
    """

    INSTALLER = "conda"

    conda_options: list[str]

    def __init__(
        self,
        packages: list[str],
        conda_options: list[str] | None = None,
        restart: bool = False,
    ):
        super().__init__(packages, restart=restart)
        # Normalise ``None`` to an empty list so it can be concatenated
        # with ``packages`` when building the conda argument list.
        self.conda_options = conda_options or []

    def install(self) -> None:
        """Install the requested packages through conda's Python API.

        Raises
        ------
        RuntimeError
            If conda cannot be imported, if ``run_command`` raises, or if it
            reports a non-zero return code.
        """
        # Imported lazily so that conda is only required when this plugin
        # actually runs.
        try:
            from conda.cli.python_api import Commands, run_command
        except ModuleNotFoundError as e:  # pragma: nocover
            msg = (
                "conda install failed because conda could not be found. "
                "Please make sure that conda is installed."
            )
            logger.error(msg)
            raise RuntimeError(msg) from e
        try:
            _, stderr, returncode = run_command(
                Commands.INSTALL, self.conda_options + self.packages
            )
        except Exception as e:
            msg = "conda install failed"
            logger.error(msg)
            raise RuntimeError(msg) from e

        if returncode != 0:
            # ``stderr`` is not guaranteed to be bytes: normalise bytes, str
            # and None so that building the message cannot itself fail and
            # hide the real conda error.
            if stderr is None:
                details = ""
            elif isinstance(stderr, bytes):
                details = stderr.decode(errors="replace")
            else:
                details = str(stderr)
            msg = f"conda install failed with '{details.strip()}'"
            logger.error(msg)
            raise RuntimeError(msg)


class PipInstall(PackageInstall):
    """A Worker Plugin to pip install a set of packages

    This accepts a set of packages to install on all workers as well as
    options to use when installing.
    You can also optionally ask for the worker to restart itself after
    performing this installation.

    .. note::

       This will increase the time it takes to start up
       each worker. If possible, we recommend including the
       libraries in the worker environment or image. This is
       primarily intended for experimentation and debugging.

    Parameters
    ----------
    packages
        A list of packages (with optional versions) to install using pip
    pip_options
        Additional options to pass to pip
    restart
        Whether or not to restart the worker after installing the packages
        Only functions if the worker has an attached nanny process

    Examples
    --------
    >>> from dask.distributed import PipInstall
    >>> plugin = PipInstall(packages=["scikit-learn"], pip_options=["--upgrade"])

    >>> client.register_worker_plugin(plugin)

    See Also
    --------
    PackageInstall
    CondaInstall
    """

    INSTALLER = "pip"

    pip_options: list[str]

    def __init__(
        self,
        packages: list[str],
        pip_options: list[str] | None = None,
        restart: bool = False,
    ):
        super().__init__(packages, restart=restart)
        # Normalise ``None`` to an empty list so it can be concatenated
        # with ``packages`` when building the pip command line.
        self.pip_options = pip_options or []

    def install(self) -> None:
        """Install the requested packages with ``python -m pip install``.

        pip is invoked through ``sys.executable`` in a subprocess.

        Raises
        ------
        RuntimeError
            If the pip process exits with a non-zero return code; the message
            carries pip's stderr output.
        """
        proc = subprocess.Popen(
            [sys.executable, "-m", "pip", "install"] + self.pip_options + self.packages,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        _, stderr = proc.communicate()
        returncode = proc.wait()
        if returncode != 0:
            # Decode leniently: pip's output is not guaranteed to be valid
            # UTF-8, and a UnicodeDecodeError here would mask the real
            # installation failure.
            details = stderr.decode(errors="replace").strip()
            msg = f"pip install failed with '{details}'"
            logger.error(msg)
            raise RuntimeError(msg)


# Adapted from https://github.com/dask/distributed/issues/3560#issuecomment-596138522
Expand Down
Loading