
Commit

Incorporate review feedback
Signed-off-by: Eduardo Apolinario <[email protected]>
eapolinario committed Jan 12, 2023
1 parent 1e6e3e1 commit aadf78c
Showing 4 changed files with 40 additions and 39 deletions.
10 changes: 5 additions & 5 deletions flytekit/core/resources.py
@@ -1,7 +1,7 @@
from dataclasses import dataclass
from typing import List, Optional

-from flytekit.models import task as _task_models
+from flytekit.models import task as task_models


@dataclass
@@ -39,8 +39,8 @@ class ResourceSpec(object):
limits: Optional[Resources] = None


-_ResouceName = _task_models.Resources.ResourceName
-_ResourceEntry = _task_models.Resources.ResourceEntry
+_ResouceName = task_models.Resources.ResourceName
+_ResourceEntry = task_models.Resources.ResourceEntry


def _convert_resources_to_resource_entries(resources: Resources) -> List[_ResourceEntry]:
@@ -61,7 +61,7 @@ def _convert_resources_to_resource_entries(resources: Resources) -> List[_Resour
def convert_resources_to_resource_model(
requests: Optional[Resources] = None,
limits: Optional[Resources] = None,
-) -> _task_models.Resources:
+) -> task_models.Resources:
"""
Convert flytekit ``Resources`` objects to a Resources model
@@ -75,4 +75,4 @@ def convert_resources_to_resource_model(
request_entries = _convert_resources_to_resource_entries(requests)
if limits is not None:
limit_entries = _convert_resources_to_resource_entries(limits)
-return _task_models.Resources(requests=request_entries, limits=limit_entries)
+return task_models.Resources(requests=request_entries, limits=limit_entries)
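For context, a minimal sketch of how the renamed converter could be called; the `Resources(cpu=..., mem=...)` field names come from flytekit's public dataclass and are assumed here rather than shown in this diff:

```python
# Minimal sketch (assumed usage): convert flytekit Resources dataclasses into
# the task_models.Resources IDL wrapper via the helper defined above.
from flytekit import Resources
from flytekit.core.resources import convert_resources_to_resource_model

resource_model = convert_resources_to_resource_model(
    requests=Resources(cpu="1", mem="400Mi"),
    limits=Resources(cpu="2", mem="600Mi"),
)
# resource_model is a flytekit.models.task.Resources holding ResourceEntry lists.
```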
35 changes: 18 additions & 17 deletions flytekit/core/utils.py
@@ -7,7 +7,7 @@
from typing import Dict, List, Optional

from flytekit.loggers import logger
-from flytekit.models import task as _task_models
+from flytekit.models import task as task_models


def _dnsify(value: str) -> str:
@@ -52,7 +52,7 @@ def _get_container_definition(
image: str,
command: List[str],
args: List[str],
-data_loading_config: Optional[_task_models.DataLoadingConfig] = None,
+data_loading_config: Optional[task_models.DataLoadingConfig] = None,
storage_request: Optional[str] = None,
ephemeral_storage_request: Optional[str] = None,
cpu_request: Optional[str] = None,
@@ -64,7 +64,7 @@
gpu_limit: Optional[str] = None,
memory_limit: Optional[str] = None,
environment: Optional[Dict[str, str]] = None,
-) -> _task_models.Container:
+) -> task_models.Container:
storage_limit = storage_limit
storage_request = storage_request
ephemeral_storage_limit = ephemeral_storage_limit
@@ -76,50 +76,51 @@
memory_limit = memory_limit
memory_request = memory_request

+# TODO: Use convert_resources_to_resource_model instead of manually fixing the resources.
requests = []
if storage_request:
requests.append(
-_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.STORAGE, storage_request)
+task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.STORAGE, storage_request)
)
if ephemeral_storage_request:
requests.append(
-_task_models.Resources.ResourceEntry(
-_task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_request
+task_models.Resources.ResourceEntry(
+task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_request
)
)
if cpu_request:
-requests.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.CPU, cpu_request))
+requests.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.CPU, cpu_request))
if gpu_request:
-requests.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.GPU, gpu_request))
+requests.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.GPU, gpu_request))
if memory_request:
requests.append(
-_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.MEMORY, memory_request)
+task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.MEMORY, memory_request)
)

limits = []
if storage_limit:
-limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.STORAGE, storage_limit))
+limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.STORAGE, storage_limit))
if ephemeral_storage_limit:
limits.append(
-_task_models.Resources.ResourceEntry(
-_task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_limit
+task_models.Resources.ResourceEntry(
+task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_limit
)
)
if cpu_limit:
-limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.CPU, cpu_limit))
+limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.CPU, cpu_limit))
if gpu_limit:
-limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.GPU, gpu_limit))
+limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.GPU, gpu_limit))
if memory_limit:
-limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.MEMORY, memory_limit))
+limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.MEMORY, memory_limit))

if environment is None:
environment = {}

-return _task_models.Container(
+return task_models.Container(
image=image,
command=command,
args=args,
-resources=_task_models.Resources(limits=limits, requests=requests),
+resources=task_models.Resources(limits=limits, requests=requests),
env=environment,
config={},
data_loading_config=data_loading_config,
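One possible shape of the refactor the new TODO points at is sketched below: build flytekit `Resources` objects from the individual request/limit strings and reuse the shared converter instead of assembling `ResourceEntry` lists by hand. This is a hypothetical sketch, not part of this commit, and `_build_resources_model` is an illustrative helper name.

```python
# Hypothetical sketch of the TODO in _get_container_definition: delegate the
# ResourceEntry construction to convert_resources_to_resource_model.
from typing import Optional

from flytekit import Resources
from flytekit.core.resources import convert_resources_to_resource_model
from flytekit.models import task as task_models


def _build_resources_model(
    cpu_request: Optional[str] = None,
    memory_request: Optional[str] = None,
    cpu_limit: Optional[str] = None,
    memory_limit: Optional[str] = None,
) -> task_models.Resources:
    # Only cpu/memory are shown; gpu, storage and ephemeral_storage would follow
    # the same pattern. Fields left as None are skipped by the converter.
    return convert_resources_to_resource_model(
        requests=Resources(cpu=cpu_request, mem=memory_request),
        limits=Resources(cpu=cpu_limit, mem=memory_limit),
    )
```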
2 changes: 1 addition & 1 deletion plugins/README.md
@@ -7,7 +7,7 @@ All the Flytekit plugins maintained by the core team are added here. It is not n
| Plugin | Installation | Description | Version | Type |
|------------------------------|-----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| AWS Sagemaker Training | ```bash pip install flytekitplugins-awssagemaker ``` | Installs SDK to author Sagemaker built-in and custom training jobs in python | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-awssagemaker.svg)](https://pypi.python.org/pypi/flytekitplugins-awssagemaker/) | Backend |
-| dask                         | ```bash pip install flytekitplugins-dask ```               | Installs SDK to author dask jobs that can be executed natively on Kubernetes using the Flyte backend plugin                 | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-awssagemaker.svg)](https://pypi.python.org/pypi/flytekitplugins-awssagemaker/) | Backend |
+| dask                         | ```bash pip install flytekitplugins-dask ```               | Installs SDK to author dask jobs that can be executed natively on Kubernetes using the Flyte backend plugin                 | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-awssagemaker.svg)](https://pypi.python.org/pypi/flytekitplugins-dask/) | Backend |
| Hive Queries | ```bash pip install flytekitplugins-hive ``` | Installs SDK to author Hive Queries that can be executed on a configured hive backend using Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-hive.svg)](https://pypi.python.org/pypi/flytekitplugins-hive/) | Backend |
| K8s distributed PyTorch Jobs | ```bash pip install flytekitplugins-kfpytorch ``` | Installs SDK to author Distributed pyTorch Jobs in python using Kubeflow PyTorch Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-kfpytorch.svg)](https://pypi.python.org/pypi/flytekitplugins-kfpytorch/) | Backend |
| K8s native tensorflow Jobs | ```bash pip install flytekitplugins-kftensorflow ``` | Installs SDK to author Distributed tensorflow Jobs in python using Kubeflow Tensorflow Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-kftensorflow.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend |
32 changes: 16 additions & 16 deletions plugins/flytekit-dask/flytekitplugins/dask/models.py
@@ -1,20 +1,20 @@
from typing import Optional

-from flyteidl.plugins import dask_pb2 as _dask_task
+from flyteidl.plugins import dask_pb2 as dask_task

-from flytekit.models import common as _common
-from flytekit.models import task as _task
+from flytekit.models import common as common
+from flytekit.models import task as task


-class Scheduler(_common.FlyteIdlEntity):
+class Scheduler(common.FlyteIdlEntity):
"""
Configuration for the scheduler pod
:param image: Optional image to use.
:param resources: Optional resources to use.
"""

-def __init__(self, image: Optional[str] = None, resources: Optional[_task.Resources] = None):
+def __init__(self, image: Optional[str] = None, resources: Optional[task.Resources] = None):
self._image = image
self._resources = resources

@@ -26,23 +26,23 @@ def image(self) -> Optional[str]:
return self._image

@property
-def resources(self) -> Optional[_task.Resources]:
+def resources(self) -> Optional[task.Resources]:
"""
:return: Optional resources for the scheduler pod
"""
return self._resources

-def to_flyte_idl(self) -> _dask_task.DaskScheduler:
+def to_flyte_idl(self) -> dask_task.DaskScheduler:
"""
:return: The scheduler spec serialized to protobuf
"""
-return _dask_task.DaskScheduler(
+return dask_task.DaskScheduler(
image=self.image,
resources=self.resources.to_flyte_idl() if self.resources else None,
)


-class WorkerGroup(_common.FlyteIdlEntity):
+class WorkerGroup(common.FlyteIdlEntity):
"""
Configuration for a dask worker group
@@ -55,7 +55,7 @@ def __init__(
self,
number_of_workers: int,
image: Optional[str] = None,
-resources: Optional[_task.Resources] = None,
+resources: Optional[task.Resources] = None,
):
if number_of_workers < 1:
raise ValueError(
@@ -81,24 +81,24 @@ def image(self) -> Optional[str]:
return self._image

@property
-def resources(self) -> Optional[_task.Resources]:
+def resources(self) -> Optional[task.Resources]:
"""
:return: Optional resources to use for the worker pods
"""
return self._resources

-def to_flyte_idl(self) -> _dask_task.DaskWorkerGroup:
+def to_flyte_idl(self) -> dask_task.DaskWorkerGroup:
"""
:return: The dask cluster serialized to protobuf
"""
-return _dask_task.DaskWorkerGroup(
+return dask_task.DaskWorkerGroup(
number_of_workers=self.number_of_workers,
image=self.image,
resources=self.resources.to_flyte_idl() if self.resources else None,
)


-class DaskJob(_common.FlyteIdlEntity):
+class DaskJob(common.FlyteIdlEntity):
"""
Configuration for the custom dask job to run
@@ -124,11 +124,11 @@ def workers(self) -> WorkerGroup:
"""
return self._workers

-def to_flyte_idl(self) -> _dask_task.DaskJob:
+def to_flyte_idl(self) -> dask_task.DaskJob:
"""
:return: The dask job serialized to protobuf
"""
-return _dask_task.DaskJob(
+return dask_task.DaskJob(
scheduler=self.scheduler.to_flyte_idl(),
workers=self.workers.to_flyte_idl(),
)
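As a usage illustration, a minimal sketch of composing these dask model classes and serializing them to protobuf; the `DaskJob(scheduler=..., workers=...)` keyword arguments are assumed from the properties shown above, and the image and resource values are illustrative:

```python
# Hypothetical sketch: build the dask plugin model objects and serialize them.
from flytekitplugins.dask.models import DaskJob, Scheduler, WorkerGroup
from flytekit.models import task as task_models

scheduler = Scheduler(image="ghcr.io/example/dask:latest")  # illustrative image
workers = WorkerGroup(
    number_of_workers=4,
    resources=task_models.Resources(
        requests=[
            task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.CPU, "1"),
        ],
        limits=[],
    ),
)

job = DaskJob(scheduler=scheduler, workers=workers)  # keyword names assumed
idl_job = job.to_flyte_idl()  # flyteidl.plugins.dask_pb2.DaskJob message
```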
