Skip to content

Commit

Permalink
Revert "Add dask plugin #patch (#1366)"
Browse files Browse the repository at this point in the history
This reverts commit 41a9c7a.

Signed-off-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
eapolinario committed Feb 22, 2023
1 parent ecea6cb commit abfcbb4
Show file tree
Hide file tree
Showing 19 changed files with 26 additions and 906 deletions.
1 change: 0 additions & 1 deletion .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ jobs:
- flytekit-aws-batch
- flytekit-aws-sagemaker
- flytekit-bigquery
- flytekit-dask
- flytekit-data-fsspec
- flytekit-dbt
- flytekit-deck-standard
Expand Down
1 change: 0 additions & 1 deletion doc-requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,5 @@ whylogs # whylogs
whylabs-client # whylogs
ray # ray
scikit-learn # scikit-learn
dask[distributed] # dask
vaex # vaex
mlflow # mlflow
12 changes: 0 additions & 12 deletions docs/source/plugins/dask.rst

This file was deleted.

2 changes: 0 additions & 2 deletions docs/source/plugins/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ Plugin API reference
* :ref:`AWS Sagemaker <awssagemaker>` - AWS Sagemaker plugin reference
* :ref:`Google Bigquery <bigquery>` - Google Bigquery plugin reference
* :ref:`FS Spec <fsspec>` - FS Spec API reference
* :ref:`Dask <dask>` - Dask standard API reference
* :ref:`Deck standard <deck>` - Deck standard API reference
* :ref:`Dolt standard <dolt>` - Dolt standard API reference
* :ref:`Great expectations <greatexpectations>` - Great expectations API reference
Expand Down Expand Up @@ -41,7 +40,6 @@ Plugin API reference
AWS Sagemaker <awssagemaker>
Google Bigquery <bigquery>
FS Spec <fsspec>
Dask <dask>
Deck standard <deck>
Dolt standard <dolt>
Great expectations <greatexpectations>
Expand Down
16 changes: 6 additions & 10 deletions flytekit/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import typing
from typing import Any, List

from flytekit.core.resources import Resources, convert_resources_to_resource_model
from flytekit.core.resources import Resources
from flytekit.core.utils import _dnsify
from flytekit.loggers import logger
from flytekit.models import literals as _literal_models
Expand Down Expand Up @@ -93,14 +93,9 @@ def with_overrides(self, *args, **kwargs):
for k, v in alias_dict.items():
self._aliases.append(_workflow_model.Alias(var=k, alias=v))
if "requests" in kwargs or "limits" in kwargs:
requests = kwargs.get("requests")
if requests and not isinstance(requests, Resources):
raise AssertionError("requests should be specified as flytekit.Resources")
limits = kwargs.get("limits")
if limits and not isinstance(limits, Resources):
raise AssertionError("limits should be specified as flytekit.Resources")

self._resources = convert_resources_to_resource_model(requests=requests, limits=limits)
requests = _convert_resource_overrides(kwargs.get("requests"), "requests")
limits = _convert_resource_overrides(kwargs.get("limits"), "limits")
self._resources = _resources_model(requests=requests, limits=limits)
if "timeout" in kwargs:
timeout = kwargs["timeout"]
if timeout is None:
Expand Down Expand Up @@ -134,7 +129,8 @@ def _convert_resource_overrides(
) -> [_resources_model.ResourceEntry]:
if resources is None:
return []

if not isinstance(resources, Resources):
raise AssertionError(f"{resource_name} should be specified as flytekit.Resources")
resource_entries = []
if resources.cpu is not None:
resource_entries.append(_resources_model.ResourceEntry(_resources_model.ResourceName.CPU, resources.cpu))
Expand Down
43 changes: 1 addition & 42 deletions flytekit/core/resources.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from dataclasses import dataclass
from typing import List, Optional

from flytekit.models import task as task_models
from typing import Optional


@dataclass
Expand Down Expand Up @@ -37,42 +35,3 @@ class Resources(object):
class ResourceSpec(object):
requests: Optional[Resources] = None
limits: Optional[Resources] = None


_ResouceName = task_models.Resources.ResourceName
_ResourceEntry = task_models.Resources.ResourceEntry


def _convert_resources_to_resource_entries(resources: Resources) -> List[_ResourceEntry]:
resource_entries = []
if resources.cpu is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.CPU, value=resources.cpu))
if resources.mem is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.MEMORY, value=resources.mem))
if resources.gpu is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.GPU, value=resources.gpu))
if resources.storage is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.STORAGE, value=resources.storage))
if resources.ephemeral_storage is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.EPHEMERAL_STORAGE, value=resources.ephemeral_storage))
return resource_entries


def convert_resources_to_resource_model(
requests: Optional[Resources] = None,
limits: Optional[Resources] = None,
) -> task_models.Resources:
"""
Convert flytekit ``Resources`` objects to a Resources model
:param requests: Resource requests. Optional, defaults to ``None``
:param limits: Resource limits. Optional, defaults to ``None``
:return: The given resources as requests and limits
"""
request_entries = []
limit_entries = []
if requests is not None:
request_entries = _convert_resources_to_resource_entries(requests)
if limits is not None:
limit_entries = _convert_resources_to_resource_entries(limits)
return task_models.Resources(requests=request_entries, limits=limit_entries)
37 changes: 19 additions & 18 deletions flytekit/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Dict, List, Optional

from flytekit.loggers import logger
from flytekit.models import task as task_models
from flytekit.models import task as _task_models


def _dnsify(value: str) -> str:
Expand Down Expand Up @@ -52,7 +52,7 @@ def _get_container_definition(
image: str,
command: List[str],
args: List[str],
data_loading_config: Optional[task_models.DataLoadingConfig] = None,
data_loading_config: Optional[_task_models.DataLoadingConfig] = None,
storage_request: Optional[str] = None,
ephemeral_storage_request: Optional[str] = None,
cpu_request: Optional[str] = None,
Expand All @@ -64,7 +64,7 @@ def _get_container_definition(
gpu_limit: Optional[str] = None,
memory_limit: Optional[str] = None,
environment: Optional[Dict[str, str]] = None,
) -> task_models.Container:
) -> _task_models.Container:
storage_limit = storage_limit
storage_request = storage_request
ephemeral_storage_limit = ephemeral_storage_limit
Expand All @@ -76,49 +76,50 @@ def _get_container_definition(
memory_limit = memory_limit
memory_request = memory_request

# TODO: Use convert_resources_to_resource_model instead of manually fixing the resources.
requests = []
if storage_request:
requests.append(
task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.STORAGE, storage_request)
_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.STORAGE, storage_request)
)
if ephemeral_storage_request:
requests.append(
task_models.Resources.ResourceEntry(
task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_request
_task_models.Resources.ResourceEntry(
_task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_request
)
)
if cpu_request:
requests.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.CPU, cpu_request))
requests.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.CPU, cpu_request))
if gpu_request:
requests.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.GPU, gpu_request))
requests.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.GPU, gpu_request))
if memory_request:
requests.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.MEMORY, memory_request))
requests.append(
_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.MEMORY, memory_request)
)

limits = []
if storage_limit:
limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.STORAGE, storage_limit))
limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.STORAGE, storage_limit))
if ephemeral_storage_limit:
limits.append(
task_models.Resources.ResourceEntry(
task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_limit
_task_models.Resources.ResourceEntry(
_task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_limit
)
)
if cpu_limit:
limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.CPU, cpu_limit))
limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.CPU, cpu_limit))
if gpu_limit:
limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.GPU, gpu_limit))
limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.GPU, gpu_limit))
if memory_limit:
limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.MEMORY, memory_limit))
limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.MEMORY, memory_limit))

if environment is None:
environment = {}

return task_models.Container(
return _task_models.Container(
image=image,
command=command,
args=args,
resources=task_models.Resources(limits=limits, requests=requests),
resources=_task_models.Resources(limits=limits, requests=requests),
env=environment,
config={},
data_loading_config=data_loading_config,
Expand Down
1 change: 0 additions & 1 deletion plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ All the Flytekit plugins maintained by the core team are added here. It is not n
| Plugin | Installation | Description | Version | Type |
|------------------------------|-----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| AWS Sagemaker Training | ```bash pip install flytekitplugins-awssagemaker ``` | Installs SDK to author Sagemaker built-in and custom training jobs in python | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-awssagemaker.svg)](https://pypi.python.org/pypi/flytekitplugins-awssagemaker/) | Backend |
| dask | ```bash pip install flytekitplugins-dask ``` | Installs SDK to author dask jobs that can be executed natively on Kubernetes using the Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-awssagemaker.svg)](https://pypi.python.org/pypi/flytekitplugins-dask/) | Backend |
| Hive Queries | ```bash pip install flytekitplugins-hive ``` | Installs SDK to author Hive Queries that can be executed on a configured hive backend using Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-hive.svg)](https://pypi.python.org/pypi/flytekitplugins-hive/) | Backend |
| K8s distributed PyTorch Jobs | ```bash pip install flytekitplugins-kfpytorch ``` | Installs SDK to author Distributed pyTorch Jobs in python using Kubeflow PyTorch Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-kfpytorch.svg)](https://pypi.python.org/pypi/flytekitplugins-kfpytorch/) | Backend |
| K8s native tensorflow Jobs | ```bash pip install flytekitplugins-kftensorflow ``` | Installs SDK to author Distributed tensorflow Jobs in python using Kubeflow Tensorflow Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-kftensorflow.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend |
Expand Down
21 changes: 0 additions & 21 deletions plugins/flytekit-dask/README.md

This file was deleted.

15 changes: 0 additions & 15 deletions plugins/flytekit-dask/flytekitplugins/dask/__init__.py

This file was deleted.

Loading

0 comments on commit abfcbb4

Please sign in to comment.