Skip to content

Commit

Permalink
Add dask plugin #patch (#1366)
Browse files Browse the repository at this point in the history
* Add dummy task type to test backend plugin

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add docs page

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add dask models

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add function to convert resources

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add tests to `dask` task

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Remove namespace

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Update setup.py

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add dask to `plugin/README.md`

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add README.md for `dask`

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Top level export of `JobPodSpec` and `DaskCluster`

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Update docs for images

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Update README.md

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Update models after `flyteidl` change

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Update task after `flyteidl` change

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Raise error when less than 1 worker

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Update flyteidl to >= 1.3.2

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Update doc requirements

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Update doc-requirements.txt

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Re-lock dependencies on linux

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Update dask API docs

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Fix documentation links

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Default optional model constructor arguments to `None`

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Refactor `convert_resources_to_resource_model` to `core.resources`

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Use `convert_resources_to_resource_model` in `core.node`

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Incorporate review feedback

Signed-off-by: Eduardo Apolinario <[email protected]>

* Lint

Signed-off-by: Eduardo Apolinario <[email protected]>

Signed-off-by: Bernhard Stadlbauer <[email protected]>
Signed-off-by: Bernhard Stadlbauer <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
3 people committed Jun 30, 2023
1 parent a7e7b46 commit 689c4ae
Show file tree
Hide file tree
Showing 19 changed files with 892 additions and 15 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ jobs:
- flytekit-aws-batch
- flytekit-aws-sagemaker
- flytekit-bigquery
- flytekit-dask
- flytekit-data-fsspec
- flytekit-dbt
- flytekit-deck-standard
Expand Down
1 change: 1 addition & 0 deletions doc-requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ whylogs # whylogs
whylabs-client # whylogs
ray # ray
scikit-learn # scikit-learn
dask[distributed] # dask
vaex # vaex
mlflow # mlflow
12 changes: 12 additions & 0 deletions docs/source/plugins/dask.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
.. _dask:

###################################################
Dask API reference
###################################################

.. tags:: Integration, DistributedComputing, KubernetesOperator

.. automodule:: flytekitplugins.dask
:no-members:
:no-inherited-members:
:no-special-members:
2 changes: 2 additions & 0 deletions docs/source/plugins/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Plugin API reference
* :ref:`AWS Sagemaker <awssagemaker>` - AWS Sagemaker plugin reference
* :ref:`Google Bigquery <bigquery>` - Google Bigquery plugin reference
* :ref:`FS Spec <fsspec>` - FS Spec API reference
* :ref:`Dask <dask>` - Dask API reference
* :ref:`Deck standard <deck>` - Deck standard API reference
* :ref:`Dolt standard <dolt>` - Dolt standard API reference
* :ref:`Great expectations <greatexpectations>` - Great expectations API reference
Expand Down Expand Up @@ -41,6 +42,7 @@ Plugin API reference
AWS Sagemaker <awssagemaker>
Google Bigquery <bigquery>
FS Spec <fsspec>
Dask <dask>
Deck standard <deck>
Dolt standard <dolt>
Great expectations <greatexpectations>
Expand Down
16 changes: 10 additions & 6 deletions flytekit/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import typing
from typing import Any, List

from flytekit.core.resources import Resources
from flytekit.core.resources import Resources, convert_resources_to_resource_model
from flytekit.core.utils import _dnsify
from flytekit.loggers import logger
from flytekit.models import literals as _literal_models
Expand Down Expand Up @@ -95,9 +95,14 @@ def with_overrides(self, *args, **kwargs):
for k, v in alias_dict.items():
self._aliases.append(_workflow_model.Alias(var=k, alias=v))
if "requests" in kwargs or "limits" in kwargs:
requests = _convert_resource_overrides(kwargs.get("requests"), "requests")
limits = _convert_resource_overrides(kwargs.get("limits"), "limits")
self._resources = _resources_model(requests=requests, limits=limits)
requests = kwargs.get("requests")
if requests and not isinstance(requests, Resources):
raise AssertionError("requests should be specified as flytekit.Resources")
limits = kwargs.get("limits")
if limits and not isinstance(limits, Resources):
raise AssertionError("limits should be specified as flytekit.Resources")

self._resources = convert_resources_to_resource_model(requests=requests, limits=limits)
if "timeout" in kwargs:
timeout = kwargs["timeout"]
if timeout is None:
Expand Down Expand Up @@ -131,8 +136,7 @@ def _convert_resource_overrides(
) -> [_resources_model.ResourceEntry]:
if resources is None:
return []
if not isinstance(resources, Resources):
raise AssertionError(f"{resource_name} should be specified as flytekit.Resources")

resource_entries = []
if resources.cpu is not None:
resource_entries.append(_resources_model.ResourceEntry(_resources_model.ResourceName.CPU, resources.cpu))
Expand Down
43 changes: 42 additions & 1 deletion flytekit/core/resources.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from dataclasses import dataclass
from typing import Optional
from typing import List, Optional

from flytekit.models import task as task_models


@dataclass
Expand Down Expand Up @@ -35,3 +37,42 @@ class Resources(object):
class ResourceSpec(object):
requests: Optional[Resources] = None
limits: Optional[Resources] = None


_ResouceName = task_models.Resources.ResourceName
_ResourceEntry = task_models.Resources.ResourceEntry


def _convert_resources_to_resource_entries(resources: Resources) -> List[_ResourceEntry]:
    """
    Translate a flytekit ``Resources`` object into a list of resource entries.

    Only attributes that are not ``None`` produce an entry. Entries are emitted in the fixed order
    CPU, memory, GPU, storage, ephemeral storage.

    :param resources: The resources to convert
    :return: One resource entry per attribute of ``resources`` that is set
    """
    # (resource name, configured value) pairs, listed in the order the entries must appear.
    name_value_pairs = (
        (_ResouceName.CPU, resources.cpu),
        (_ResouceName.MEMORY, resources.mem),
        (_ResouceName.GPU, resources.gpu),
        (_ResouceName.STORAGE, resources.storage),
        (_ResouceName.EPHEMERAL_STORAGE, resources.ephemeral_storage),
    )
    return [_ResourceEntry(name=name, value=value) for name, value in name_value_pairs if value is not None]


def convert_resources_to_resource_model(
    requests: Optional[Resources] = None,
    limits: Optional[Resources] = None,
) -> task_models.Resources:
    """
    Build a ``task_models.Resources`` model from flytekit ``Resources`` objects.

    :param requests: Resource requests. Optional, defaults to ``None``
    :param limits: Resource limits. Optional, defaults to ``None``
    :return: The given resources as requests and limits; a side passed as ``None`` becomes an empty list
    """
    return task_models.Resources(
        requests=[] if requests is None else _convert_resources_to_resource_entries(requests),
        limits=[] if limits is None else _convert_resources_to_resource_entries(limits),
    )
12 changes: 4 additions & 8 deletions flytekit/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@

from flytekit.core.pod_template import PodTemplate
from flytekit.loggers import logger

if TYPE_CHECKING:
from flytekit.models import task as task_models
from flytekit.models import task as task_models


def _dnsify(value: str) -> str:
Expand Down Expand Up @@ -58,8 +56,8 @@ def _dnsify(value: str) -> str:
def _get_container_definition(
image: str,
command: List[str],
args: Optional[List[str]] = None,
data_loading_config: Optional["task_models.DataLoadingConfig"] = None,
args: List[str],
data_loading_config: Optional[task_models.DataLoadingConfig] = None,
storage_request: Optional[str] = None,
ephemeral_storage_request: Optional[str] = None,
cpu_request: Optional[str] = None,
Expand All @@ -71,7 +69,7 @@ def _get_container_definition(
gpu_limit: Optional[str] = None,
memory_limit: Optional[str] = None,
environment: Optional[Dict[str, str]] = None,
) -> "task_models.Container":
) -> task_models.Container:
storage_limit = storage_limit
storage_request = storage_request
ephemeral_storage_limit = ephemeral_storage_limit
Expand All @@ -83,8 +81,6 @@ def _get_container_definition(
memory_limit = memory_limit
memory_request = memory_request

from flytekit.models import task as task_models

# TODO: Use convert_resources_to_resource_model instead of manually fixing the resources.
requests = []
if storage_request:
Expand Down
1 change: 1 addition & 0 deletions plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ All the Flytekit plugins maintained by the core team are added here. It is not n
| Plugin | Installation | Description | Version | Type |
|------------------------------|-----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| AWS Sagemaker Training | ```bash pip install flytekitplugins-awssagemaker ``` | Installs SDK to author Sagemaker built-in and custom training jobs in python | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-awssagemaker.svg)](https://pypi.python.org/pypi/flytekitplugins-awssagemaker/) | Backend |
| dask | ```bash pip install flytekitplugins-dask ``` | Installs SDK to author dask jobs that can be executed natively on Kubernetes using the Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-dask.svg)](https://pypi.python.org/pypi/flytekitplugins-dask/) | Backend |
| Hive Queries | ```bash pip install flytekitplugins-hive ``` | Installs SDK to author Hive Queries that can be executed on a configured hive backend using Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-hive.svg)](https://pypi.python.org/pypi/flytekitplugins-hive/) | Backend |
| K8s distributed PyTorch Jobs | ```bash pip install flytekitplugins-kfpytorch ``` | Installs SDK to author Distributed pyTorch Jobs in python using Kubeflow PyTorch Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-kfpytorch.svg)](https://pypi.python.org/pypi/flytekitplugins-kfpytorch/) | Backend |
| K8s native tensorflow Jobs | ```bash pip install flytekitplugins-kftensorflow ``` | Installs SDK to author Distributed tensorflow Jobs in python using Kubeflow Tensorflow Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-kftensorflow.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend |
Expand Down
21 changes: 21 additions & 0 deletions plugins/flytekit-dask/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Flytekit Dask Plugin

Flyte can execute `dask` jobs natively on a Kubernetes Cluster, which manages the virtual `dask` cluster's lifecycle
(spin-up and tear down). It leverages the open-source Kubernetes Dask Operator and can be enabled without signing up
for any service. This is like running a transient (ephemeral) `dask` cluster - a type of cluster spun up for a specific
task and torn down after completion. This helps in making sure that the Python environment is the same on the job-runner
(driver), scheduler and the workers.

To install the plugin, run the following command:

```bash
pip install flytekitplugins-dask
```

To configure Dask in the Flyte deployment's backend, follow
[step 1](https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/k8s_dask/index.html#step-1-deploy-the-dask-plugin-in-the-flyte-backend)
and
[step 2](https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/k8s_dask/index.html#step-2-environment-setup).

An [example](https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/k8s_dask/index.html)
can be found in the documentation.
15 changes: 15 additions & 0 deletions plugins/flytekit-dask/flytekitplugins/dask/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""
.. currentmodule:: flytekitplugins.dask
This package contains the Python related side of the Dask Plugin
.. autosummary::
:template: custom.rst
:toctree: generated/
Dask
Scheduler
WorkerGroup
"""

from flytekitplugins.dask.task import Dask, Scheduler, WorkerGroup
134 changes: 134 additions & 0 deletions plugins/flytekit-dask/flytekitplugins/dask/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from typing import Optional

from flyteidl.plugins import dask_pb2 as dask_task

from flytekit.models import common as common
from flytekit.models import task as task


class Scheduler(common.FlyteIdlEntity):
    """
    Model describing the pod that runs the dask scheduler.

    :param image: Optional image to use.
    :param resources: Optional resources to use.
    """

    def __init__(self, image: Optional[str] = None, resources: Optional[task.Resources] = None):
        self._resources = resources
        self._image = image

    @property
    def image(self) -> Optional[str]:
        """
        :return: The image for the scheduler pod, or ``None`` if none was given
        """
        return self._image

    @property
    def resources(self) -> Optional[task.Resources]:
        """
        :return: The resources for the scheduler pod, or ``None`` if none were given
        """
        return self._resources

    def to_flyte_idl(self) -> dask_task.DaskScheduler:
        """
        :return: The scheduler spec serialized to protobuf
        """
        resources = self.resources
        # Falsy resources are forwarded as ``None`` rather than being serialized.
        if resources:
            idl_resources = resources.to_flyte_idl()
        else:
            idl_resources = None
        return dask_task.DaskScheduler(image=self.image, resources=idl_resources)


class WorkerGroup(common.FlyteIdlEntity):
    """
    Configuration for a dask worker group

    :param number_of_workers: Number of workers in the group. Must be at least 1.
    :param image: Optional image to use for the pods of the worker group
    :param resources: Optional resources to use for the pods of the worker group
    :raises ValueError: If ``number_of_workers`` is less than 1
    """

    def __init__(
        self,
        number_of_workers: int,
        image: Optional[str] = None,
        resources: Optional[task.Resources] = None,
    ):
        # Reject empty worker groups at construction time, before anything is serialized.
        if number_of_workers < 1:
            raise ValueError(
                f"Each worker group needs to have at least one worker, but {number_of_workers} have been specified."
            )

        self._number_of_workers = number_of_workers
        self._image = image
        self._resources = resources

    @property
    def number_of_workers(self) -> int:
        """
        :return: Number of workers for the worker group (always set; validated to be at least 1)
        """
        return self._number_of_workers

    @property
    def image(self) -> Optional[str]:
        """
        :return: The optional image to use for the worker pods
        """
        return self._image

    @property
    def resources(self) -> Optional[task.Resources]:
        """
        :return: Optional resources to use for the worker pods
        """
        return self._resources

    def to_flyte_idl(self) -> dask_task.DaskWorkerGroup:
        """
        :return: The dask worker group serialized to protobuf
        """
        return dask_task.DaskWorkerGroup(
            number_of_workers=self.number_of_workers,
            image=self.image,
            # Falsy resources are forwarded as ``None`` rather than being serialized.
            resources=self.resources.to_flyte_idl() if self.resources else None,
        )


class DaskJob(common.FlyteIdlEntity):
    """
    Model of the custom dask job to run: a scheduler plus a default worker group.

    :param scheduler: Configuration for the scheduler
    :param workers: Configuration of the default worker group
    """

    def __init__(self, scheduler: Scheduler, workers: WorkerGroup):
        self._workers = workers
        self._scheduler = scheduler

    @property
    def scheduler(self) -> Scheduler:
        """
        :return: Configuration for the scheduler pod
        """
        return self._scheduler

    @property
    def workers(self) -> WorkerGroup:
        """
        :return: Configuration of the default worker group
        """
        return self._workers

    def to_flyte_idl(self) -> dask_task.DaskJob:
        """
        :return: The dask job serialized to protobuf
        """
        # Serialize the scheduler first, then the workers, before assembling the job message.
        scheduler_idl = self.scheduler.to_flyte_idl()
        workers_idl = self.workers.to_flyte_idl()
        return dask_task.DaskJob(scheduler=scheduler_idl, workers=workers_idl)
Loading

0 comments on commit 689c4ae

Please sign in to comment.