Add dask plugin #patch #1366

Merged (31 commits) on Jan 12, 2023
Changes from 12 commits
Commits
eddb152
Add dummy task type to test backend plugin
bstadlbauer Jun 26, 2022
407d5cb
Add docs page
bstadlbauer Dec 10, 2022
992d5e7
Add dask models
bstadlbauer Dec 10, 2022
d7bdf47
Add function to convert resources
bstadlbauer Dec 10, 2022
641d2ac
Add tests to `dask` task
bstadlbauer Dec 10, 2022
96162bf
Remove namespace
bstadlbauer Dec 11, 2022
6092b6f
Update setup.py
bstadlbauer Dec 12, 2022
1caceff
Add dask to `plugin/README.md`
bstadlbauer Dec 12, 2022
afa54df
Add README.md for `dask`
bstadlbauer Dec 12, 2022
49bdab3
Top level export of `JobPodSpec` and `DaskCluster`
bstadlbauer Dec 12, 2022
96270f3
Update docs for images
bstadlbauer Dec 12, 2022
dfc4131
Update README.md
bstadlbauer Dec 12, 2022
de2af6d
Update models after `flyteidl` change
bstadlbauer Dec 17, 2022
8633ecc
Update task after `flyteidl` change
bstadlbauer Dec 17, 2022
528a085
Merge remote-tracking branch 'origin/master' into add-dask-plugin
bstadlbauer Dec 17, 2022
019529d
Raise error when less than 1 worker
bstadlbauer Dec 17, 2022
b04fb3d
Update flyteidl to >= 1.3.2
bstadlbauer Jan 5, 2023
4d5e445
Update doc requirements
bstadlbauer Jan 5, 2023
a5311ae
Merge branch 'master' into add-dask-plugin
bstadlbauer Jan 5, 2023
f703c09
Update doc-requirements.txt
bstadlbauer Jan 5, 2023
0465291
Re-lock dependencies on linux
Jan 5, 2023
f02d9dc
Update dask API docs
bstadlbauer Jan 5, 2023
b50be3e
Merge branch 'master' into add-dask-plugin
bstadlbauer Jan 9, 2023
167d654
Fix documentation links
bstadlbauer Jan 9, 2023
dab1bc6
Default optional model constructor arguments to `None`
bstadlbauer Jan 9, 2023
3722b0a
Refactor `convert_resources_to_resource_model` to `core.resources`
bstadlbauer Jan 9, 2023
4f829b7
Use `convert_resources_to_resource_model` in `core.node`
bstadlbauer Jan 9, 2023
43147cb
Merge branch 'master' into add-dask-plugin
bstadlbauer Jan 10, 2023
1e6e3e1
Merge branch 'master' into add-dask-plugin
eapolinario Jan 11, 2023
aadf78c
Incorporate review feedback
eapolinario Jan 12, 2023
8c62437
Lint
eapolinario Jan 12, 2023
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
@@ -67,6 +67,7 @@ jobs:
- flytekit-aws-batch
- flytekit-aws-sagemaker
- flytekit-bigquery
- flytekit-dask
- flytekit-data-fsspec
- flytekit-dbt
- flytekit-deck-standard
10 changes: 10 additions & 0 deletions docs/source/plugins/dask.rst
@@ -0,0 +1,10 @@
.. _dask:

###################################################
Dask API reference
###################################################

.. automodule:: flytekitplugins.dask
    :no-members:
    :no-inherited-members:
    :no-special-members:
2 changes: 2 additions & 0 deletions docs/source/plugins/index.rst
@@ -9,6 +9,7 @@ Plugin API reference
* :ref:`AWS Sagemaker <awssagemaker>` - AWS Sagemaker plugin reference
* :ref:`Google Bigquery <bigquery>` - Google Bigquery plugin reference
* :ref:`FS Spec <fsspec>` - FS Spec API reference
* :ref:`Dask <dask>` - Dask standard API reference
* :ref:`Deck standard <deck>` - Deck standard API reference
* :ref:`Dolt standard <dolt>` - Dolt standard API reference
* :ref:`Great expectations <greatexpectations>` - Great expectations API reference
@@ -38,6 +39,7 @@ Plugin API reference
   AWS Sagemaker <awssagemaker>
   Google Bigquery <bigquery>
   FS Spec <fsspec>
   Dask <dask>
   Deck standard <deck>
   Dolt standard <dolt>
   Great expectations <greatexpectations>
1 change: 1 addition & 0 deletions plugins/README.md
@@ -7,6 +7,7 @@ All the Flytekit plugins maintained by the core team are added here. It is not n
| Plugin | Installation | Description | Version | Type |
|------------------------------|-----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| AWS Sagemaker Training | ```bash pip install flytekitplugins-awssagemaker ``` | Installs SDK to author Sagemaker built-in and custom training jobs in python | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-awssagemaker.svg)](https://pypi.python.org/pypi/flytekitplugins-awssagemaker/) | Backend |
| dask                         | ```bash pip install flytekitplugins-dask ```              | Installs SDK to author dask jobs that can be executed natively on Kubernetes using the Flyte backend plugin                | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-dask.svg)](https://pypi.python.org/pypi/flytekitplugins-dask/) | Backend |
| Hive Queries | ```bash pip install flytekitplugins-hive ``` | Installs SDK to author Hive Queries that can be executed on a configured hive backend using Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-hive.svg)](https://pypi.python.org/pypi/flytekitplugins-hive/) | Backend |
| K8s distributed PyTorch Jobs | ```bash pip install flytekitplugins-kfpytorch ``` | Installs SDK to author Distributed pyTorch Jobs in python using Kubeflow PyTorch Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-kfpytorch.svg)](https://pypi.python.org/pypi/flytekitplugins-kfpytorch/) | Backend |
| K8s native tensorflow Jobs | ```bash pip install flytekitplugins-kftensorflow ``` | Installs SDK to author Distributed tensorflow Jobs in python using Kubeflow Tensorflow Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-kftensorflow.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend |
21 changes: 21 additions & 0 deletions plugins/flytekit-dask/README.md
@@ -0,0 +1,21 @@
# Flytekit Dask Plugin

Flyte can execute `dask` jobs natively on a Kubernetes cluster, managing the virtual `dask` cluster's lifecycle
(spin-up and tear-down). It leverages the open-source Kubernetes Dask Operator and can be enabled without signing up
for any service. This is equivalent to running a transient (ephemeral) `dask` cluster: a cluster spun up for a
specific task and torn down after completion. It also helps ensure that the Python environment is the same on the
job runner (driver), the scheduler, and the workers.

To install the plugin, run the following command:

```bash
pip install flytekitplugins-dask
```

To configure the `dask` plugin in the Flyte deployment's backend, follow
[step 1](https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/flytekit_plugins/k8s_dask/index.html#step-1-deploy-the-dask-plugin-in-the-flyte-backend)
and
[step 2](https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/flytekit_plugins/k8s_dask/index.html#step-2-environment-setup)

An [example](https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/flytekit_plugins/k8s_dask/index.html)
can be found in the documentation.
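As a hedged usage sketch, the snippet below mirrors the plugin's documented config classes with local stand-ins (`my_dask_job` and the `@task` decoration are hypothetical; in real code `Dask` and `DaskCluster` come from `flytekitplugins.dask` rather than being redefined):

```python
from dataclasses import dataclass, field
from typing import Optional


# Stand-ins mirroring the plugin's documented config classes;
# in real code: from flytekitplugins.dask import Dask, DaskCluster
@dataclass
class DaskCluster:
    image: Optional[str] = None
    n_workers: Optional[int] = 1


@dataclass
class Dask:
    cluster: DaskCluster = field(default_factory=DaskCluster)


# A task would then be configured along these lines:
#
#   @task(task_config=Dask(cluster=DaskCluster(n_workers=10)))
#   def my_dask_job() -> None:
#       ...
config = Dask(cluster=DaskCluster(n_workers=10))
print(config.cluster.n_workers)  # 10
```

Note that during local execution flytekit runs the task function in the local Python process; no Kubernetes `dask` cluster is provisioned.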
Contributor review comment: Should we add one line about what happens in local execution?

1 change: 1 addition & 0 deletions plugins/flytekit-dask/flytekitplugins/dask/__init__.py
@@ -0,0 +1 @@
from flytekitplugins.dask.task import Dask, DaskCluster, JobPodSpec
124 changes: 124 additions & 0 deletions plugins/flytekit-dask/flytekitplugins/dask/models.py
@@ -0,0 +1,124 @@
from typing import Optional

from flyteidl.plugins import dask_pb2 as _dask_task

from flytekit.models import common as _common
from flytekit.models import task as _task


class JobPodSpec(_common.FlyteIdlEntity):
    """
    Configuration for the job runner pod

    :param image: Optional image to use.
    :param resources: Optional resources to use.
    """

    def __init__(self, image: Optional[str], resources: Optional[_task.Resources]):
        self._image = image
        self._resources = resources

    @property
    def image(self) -> Optional[str]:
        """
        :return: The optional image for the job runner pod
        """
        return self._image

    @property
    def resources(self) -> Optional[_task.Resources]:
        """
        :return: Optional resources for the job runner pod
        """
        return self._resources

    def to_flyte_idl(self) -> _dask_task.JobPodSpec:
        """
        :return: The job pod spec serialized to protobuf
        """
        return _dask_task.JobPodSpec(
            image=self.image,
            resources=self.resources.to_flyte_idl() if self.resources else None,
        )


class DaskCluster(_common.FlyteIdlEntity):
    """
    Configuration for the dask cluster the job runner connects to

    :param image: Optional image to use for the cluster pods (scheduler and workers)
    :param n_workers: Optional worker count to use for the dask cluster
    :param resources: Optional resources to use for the cluster pods (scheduler and workers)
    """

    def __init__(self, image: Optional[str], n_workers: Optional[int], resources: Optional[_task.Resources]):
        self._image = image
        self._n_workers = n_workers
        self._resources = resources

    @property
    def image(self) -> Optional[str]:
        """
        :return: The optional image to use for the cluster pods
        """
        return self._image

    @property
    def n_workers(self) -> Optional[int]:
        """
        :return: Optional number of workers for the cluster
        """
        return self._n_workers

    @property
    def resources(self) -> Optional[_task.Resources]:
        """
        :return: Optional resources for the pods of the cluster
        """
        return self._resources

    def to_flyte_idl(self) -> _dask_task.DaskCluster:
        """
        :return: The dask cluster serialized to protobuf
        """
        return _dask_task.DaskCluster(
            image=self.image,
            nWorkers=self.n_workers,
            resources=self.resources.to_flyte_idl() if self.resources else None,
        )


class DaskJob(_common.FlyteIdlEntity):
    """
    Configuration for the custom dask job to run

    :param job_pod_spec: Configuration for the job runner pod
    :param dask_cluster: Configuration for the dask cluster
    """

    def __init__(self, job_pod_spec: JobPodSpec, dask_cluster: DaskCluster):
        self._job_pod_spec = job_pod_spec
        self._dask_cluster = dask_cluster

    @property
    def job_pod_spec(self) -> JobPodSpec:
        """
        :return: Configuration for the job runner pod
        """
        return self._job_pod_spec

    @property
    def dask_cluster(self) -> DaskCluster:
        """
        :return: Configuration for the dask cluster
        """
        return self._dask_cluster

    def to_flyte_idl(self) -> _dask_task.DaskJob:
        """
        :return: The dask job serialized to protobuf
        """
        return _dask_task.DaskJob(
            jobPodSpec=self.job_pod_spec.to_flyte_idl(),
            cluster=self.dask_cluster.to_flyte_idl(),
        )
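The three model classes compose: `DaskJob.to_flyte_idl` nests the pod-spec and cluster messages. A dict-based mimic of that composition (the stand-ins below return plain dicts where the real classes emit `dask_pb2` protobuf messages; field names follow the models above):

```python
from typing import Optional


class JobPodSpec:
    def __init__(self, image: Optional[str] = None):
        self._image = image

    def to_idl(self) -> dict:  # stands in for to_flyte_idl
        return {"image": self._image}


class DaskCluster:
    def __init__(self, image: Optional[str] = None, n_workers: Optional[int] = None):
        self._image = image
        self._n_workers = n_workers

    def to_idl(self) -> dict:
        return {"image": self._image, "nWorkers": self._n_workers}


class DaskJob:
    def __init__(self, job_pod_spec: JobPodSpec, dask_cluster: DaskCluster):
        self._job_pod_spec = job_pod_spec
        self._dask_cluster = dask_cluster

    def to_idl(self) -> dict:
        # Nest the child messages, as DaskJob.to_flyte_idl does.
        return {
            "jobPodSpec": self._job_pod_spec.to_idl(),
            "cluster": self._dask_cluster.to_idl(),
        }


job = DaskJob(JobPodSpec(image="my/img"), DaskCluster(n_workers=4))
print(job.to_idl()["cluster"]["nWorkers"])  # 4
```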
42 changes: 42 additions & 0 deletions plugins/flytekit-dask/flytekitplugins/dask/resources.py
@@ -0,0 +1,42 @@
from typing import List, Optional

import flytekit.models.task as _task_models
from flytekit import Resources

_ResourceName = _task_models.Resources.ResourceName
_ResourceEntry = _task_models.Resources.ResourceEntry


def _convert_resources_to_resource_entries(resources: Resources) -> List[_ResourceEntry]:
    resource_entries = []
    if resources.cpu is not None:
        resource_entries.append(_ResourceEntry(name=_ResourceName.CPU, value=resources.cpu))
    if resources.mem is not None:
        resource_entries.append(_ResourceEntry(name=_ResourceName.MEMORY, value=resources.mem))
    if resources.gpu is not None:
        resource_entries.append(_ResourceEntry(name=_ResourceName.GPU, value=resources.gpu))
    if resources.storage is not None:
        resource_entries.append(_ResourceEntry(name=_ResourceName.STORAGE, value=resources.storage))
    if resources.ephemeral_storage is not None:
        resource_entries.append(_ResourceEntry(name=_ResourceName.EPHEMERAL_STORAGE, value=resources.ephemeral_storage))
    return resource_entries


def convert_resources_to_resource_model(
    requests: Optional[Resources] = None,
    limits: Optional[Resources] = None,
) -> _task_models.Resources:
    """
    Convert flytekit ``Resources`` objects to a Resources model

    :param requests: Resource requests. Optional, defaults to ``None``
    :param limits: Resource limits. Optional, defaults to ``None``
    :return: The given resources as requests and limits
    """
    request_entries = []
    limit_entries = []
    if requests is not None:
        request_entries = _convert_resources_to_resource_entries(requests)
    if limits is not None:
        limit_entries = _convert_resources_to_resource_entries(limits)
    return _task_models.Resources(requests=request_entries, limits=limit_entries)
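For illustration, here is a self-contained sketch of the same "only set fields become entries" conversion using plain stand-in dataclasses (the real helper targets `flytekit.models.task.Resources` and also covers gpu, storage, and ephemeral storage):

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Resources:  # stand-in for flytekit.Resources
    cpu: Optional[str] = None
    mem: Optional[str] = None


@dataclass
class ResourceEntry:  # stand-in for the model's ResourceEntry
    name: str
    value: str


def to_entries(resources: Resources) -> List[ResourceEntry]:
    # Mirror the plugin helper: fields left as None produce no entry.
    entries = []
    if resources.cpu is not None:
        entries.append(ResourceEntry("CPU", resources.cpu))
    if resources.mem is not None:
        entries.append(ResourceEntry("MEMORY", resources.mem))
    return entries


print(to_entries(Resources(cpu="1")))  # [ResourceEntry(name='CPU', value='1')]
```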
109 changes: 109 additions & 0 deletions plugins/flytekit-dask/flytekitplugins/dask/task.py
@@ -0,0 +1,109 @@
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional

from flytekitplugins.dask import models
from flytekitplugins.dask.resources import convert_resources_to_resource_model
from google.protobuf.json_format import MessageToDict

from flytekit import PythonFunctionTask, Resources
from flytekit.configuration import SerializationSettings
from flytekit.core.task import TaskPlugins


@dataclass
class JobPodSpec:
    """
    Configuration for the dask job runner pod

    :param image: Custom image to use. If ``None``, will use the same image the task was registered with. Optional,
        defaults to ``None``. The image must have ``dask[distributed]`` installed and should have the same Python
        environment as the cluster (scheduler and worker pods).
    :param requests: Resources to request for the job runner pod. If ``None``, the requests passed into the task
        will be used. Optional, defaults to ``None``
    :param limits: Resource limits for the job runner pod. If ``None``, the limits passed into the task will be
        used. Optional, defaults to ``None``
    """

    image: Optional[str] = None
    requests: Optional[Resources] = None
    limits: Optional[Resources] = None


@dataclass
class DaskCluster:
    """
    Configuration for the dask cluster pods

    :param image: Custom image to use. If ``None``, will use the same image the task was registered with. Optional,
        defaults to ``None``. The image must have ``dask[distributed]`` installed and should have the same Python
        environment as the job runner (driver).
    :param n_workers: Number of workers to use. Optional, defaults to 1
    :param requests: Resources to request for the scheduler pod as well as the worker pods. If ``None``, the
        requests passed into the task will be used. Optional, defaults to ``None``
    :param limits: Resource limits for the scheduler pod as well as the worker pods. If ``None``, the limits
        passed into the task will be used. Optional, defaults to ``None``
    """

    image: Optional[str] = None
    n_workers: Optional[int] = 1
    requests: Optional[Resources] = None
    limits: Optional[Resources] = None


@dataclass
class Dask:
    """
    Configuration for the dask task

    :param job_pod_spec: Configuration for the job runner pod. Optional, defaults to ``JobPodSpec()``
    :param cluster: Configuration for the dask cluster pods (scheduler and workers). Optional, defaults to
        ``DaskCluster()``
    """

    # default_factory keeps these defaults valid on Python 3.11+, where dataclass
    # instances (unhashable because they define __eq__) are rejected as class-level defaults
    job_pod_spec: JobPodSpec = field(default_factory=JobPodSpec)
    cluster: DaskCluster = field(default_factory=DaskCluster)


class DaskTask(PythonFunctionTask[Dask]):
    """
    Plugin that transforms local Python code for execution within a dask cluster
    """

    _DASK_TASK_TYPE = "dask"

    def __init__(self, task_config: Dask, task_function: Callable, **kwargs):
        super(DaskTask, self).__init__(
            task_config=task_config,
            task_type=self._DASK_TASK_TYPE,
            task_function=task_function,
            **kwargs,
        )

    def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any]]:
        """
        Serialize the ``dask`` task config into a dict.

        :param settings: Current serialization settings
        :return: Dictionary representation of the dask task config
        """
        job_pod_spec = models.JobPodSpec(
            image=self.task_config.job_pod_spec.image,
            resources=convert_resources_to_resource_model(
                requests=self.task_config.job_pod_spec.requests,
                limits=self.task_config.job_pod_spec.limits,
            ),
        )
        dask_cluster = models.DaskCluster(
            image=self.task_config.cluster.image,
            n_workers=self.task_config.cluster.n_workers,
            resources=convert_resources_to_resource_model(
                requests=self.task_config.cluster.requests,
                limits=self.task_config.cluster.limits,
            ),
        )
        job = models.DaskJob(job_pod_spec=job_pod_spec, dask_cluster=dask_cluster)
        return MessageToDict(job.to_flyte_idl())


# Inject the `dask` plugin into flytekit's dynamic plugin loading system
TaskPlugins.register_pythontask_plugin(Dask, DaskTask)
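The final registration line follows flytekit's pattern of dispatching on the task-config type: when a task is declared with a `Dask` config, flytekit looks up and instantiates `DaskTask`. A minimal sketch of that registry pattern (a hypothetical registry, not flytekit's actual `TaskPlugins` implementation):

```python
from typing import Dict, Type


class TaskPluginRegistry:
    """Maps a task-config type to the task class that handles it."""

    _plugins: Dict[Type, Type] = {}

    @classmethod
    def register(cls, config_type: Type, task_class: Type) -> None:
        cls._plugins[config_type] = task_class

    @classmethod
    def find(cls, config_type: Type) -> Type:
        # Raises KeyError if no plugin was registered for this config type.
        return cls._plugins[config_type]


class Dask:  # stand-in config class
    ...


class DaskTask:  # stand-in task class
    ...


TaskPluginRegistry.register(Dask, DaskTask)
print(TaskPluginRegistry.find(Dask) is DaskTask)  # True
```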
42 changes: 42 additions & 0 deletions plugins/flytekit-dask/setup.py
@@ -0,0 +1,42 @@
from setuptools import setup

PLUGIN_NAME = "dask"

microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

plugin_requires = [
    "flyteidl>=1.3.1",  # FIXME: Use whatever has the dask idl  # FIXME: Check this version
    "flytekit>=1.2.5,<2.0.0",
    "dask[distributed]>=2022.10.2",
]

__version__ = "0.0.0+develop"

setup(
    name=microlib_name,
    version=__version__,
    author="flyteorg",
    author_email="[email protected]",
    description="Dask plugin for flytekit",
    url="https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-dask",
    long_description=open("README.md").read(),
    long_description_content_type="text/markdown",
    namespace_packages=["flytekitplugins"],
    packages=[f"flytekitplugins.{PLUGIN_NAME}"],
    install_requires=plugin_requires,
    license="apache2",
    python_requires=">=3.8",  # dask requires >= 3.8
    classifiers=[
        "Intended Audience :: Science/Research",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: Apache Software License",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Topic :: Scientific/Engineering",
        "Topic :: Scientific/Engineering :: Artificial Intelligence",
        "Topic :: Software Development",
        "Topic :: Software Development :: Libraries",
        "Topic :: Software Development :: Libraries :: Python Modules",
    ],
)
Empty file.