From 5f551d95823505011ad90871b63e32a70c9abde8 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Tue, 19 Nov 2024 21:42:49 -0800 Subject: [PATCH 01/25] expose requests & limits instead of k8spod Signed-off-by: Jan Fiedler --- .../flytekitplugins/ray/models.py | 119 +++++++++++++++--- .../flytekit-ray/flytekitplugins/ray/task.py | 26 +++- plugins/flytekit-ray/setup.py | 7 +- 3 files changed, 131 insertions(+), 21 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 1f3a830f16..3c009dee5a 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -1,9 +1,58 @@ import typing - +from dataclasses import fields from flyteidl.plugins import ray_pb2 as _ray_pb2 from flytekit.models import common as _common -from flytekit.models.task import K8sPod +from flytekit.models.task import K8sPod, K8sObjectMetadata +from flytekit.core.resources import Resources +from kubernetes.client import V1PodSpec, V1Container, V1ResourceRequirements + + +def construct_k8s_pod_spec( + k8s_pod_name: str, + requests: typing.Optional[Resources], + limits: typing.Optional[Resources], +) -> dict[str, typing.Any]: + + def construct_k8s_pods_resources(resources: typing.Optional[Resources]): + if resources is None: + return None + + resources_map = { + "cpu": "cpu", + "mem": "memory", + "gpu": "nvidia.com/gpu", + "ephemeral_storage": "ephemeral-storage", + } + + k8s_pod_resources = {} + + for resource in fields(resources): + resource_value = getattr(resources, resource.name) + if resource_value is not None: + k8s_pod_resources[resources_map[resource.name]] = resource_value + + print(k8s_pod_resources) + return k8s_pod_resources + + requests = construct_k8s_pods_resources(resources=requests) + limits = construct_k8s_pods_resources(resources=limits) + requests = requests or limits + limits = limits or requests + + k8s_pod = V1PodSpec( + containers=[ + V1Container( + name=k8s_pod_name, + resources=V1ResourceRequirements( + requests=requests, + limits=limits, + ), + ) + ] + ) + + return k8s_pod.to_dict() class WorkerGroupSpec(_common.FlyteIdlEntity): @@ -14,14 +63,27 @@ def __init__( min_replicas: typing.Optional[int] = None, max_replicas: typing.Optional[int] = None, ray_start_params: typing.Optional[typing.Dict[str, str]] = None, - k8s_pod: typing.Optional[K8sPod] = None, + requests: typing.Optional[Resources] = None, + limits: typing.Optional[Resources] = None, ): self._group_name = group_name self._replicas = replicas - self._max_replicas = max(replicas, max_replicas) if max_replicas is not None else replicas - self._min_replicas = min(replicas, min_replicas) if min_replicas is not None else replicas + self._max_replicas = ( + max(replicas, max_replicas) if max_replicas is not None else replicas + ) + self._min_replicas = ( + min(replicas, min_replicas) if min_replicas is not None else replicas + ) self._ray_start_params = ray_start_params - self._k8s_pod = k8s_pod + self._requests = requests + self._limits = limits + self._k8s_pod = K8sPod( + metadata=K8sObjectMetadata(), + pod_spec=construct_k8s_pod_spec( + k8s_pod_name="ray-worker", requests=self._requests, limits=self._limits + ), + ) + # exit(0) @property def group_name(self): @@ -96,7 +158,11 @@ def from_flyte_idl(cls, proto): min_replicas=proto.min_replicas, max_replicas=proto.max_replicas, ray_start_params=proto.ray_start_params, - k8s_pod=K8sPod.from_flyte_idl(proto.k8s_pod) if proto.HasField("k8s_pod") else None, + k8s_pod=( + K8sPod.from_flyte_idl(proto.k8s_pod) + if proto.HasField("k8s_pod") + else None + ), ) @@ -104,10 +170,19 @@ class HeadGroupSpec(_common.FlyteIdlEntity): def __init__( self, ray_start_params: typing.Optional[typing.Dict[str, str]] = None, - k8s_pod: typing.Optional[K8sPod] = None, + requests: typing.Optional[Resources] = None, + limits: typing.Optional[Resources] = None, ): self._ray_start_params = ray_start_params - self._k8s_pod = k8s_pod + self._requests = requests + self._limits = limits + + self._k8s_pod = K8sPod( + metadata=K8sObjectMetadata(), + pod_spec=construct_k8s_pod_spec( + k8s_pod_name="ray-head", requests=self._requests, limits=self._limits + ), + ) @property def ray_start_params(self): @@ -142,7 +217,11 @@ def from_flyte_idl(cls, proto): """ return cls( ray_start_params=proto.ray_start_params, - k8s_pod=K8sPod.from_flyte_idl(proto.k8s_pod) if proto.HasField("k8s_pod") else None, + k8s_pod=( + K8sPod.from_flyte_idl(proto.k8s_pod) + if proto.HasField("k8s_pod") + else None + ), ) @@ -190,7 +269,9 @@ def to_flyte_idl(self) -> _ray_pb2.RayCluster: :rtype: flyteidl.plugins._ray_pb2.RayCluster """ return _ray_pb2.RayCluster( - head_group_spec=self.head_group_spec.to_flyte_idl() if self.head_group_spec else None, + head_group_spec=( + self.head_group_spec.to_flyte_idl() if self.head_group_spec else None + ), worker_group_spec=[wg.to_flyte_idl() for wg in self.worker_group_spec], enable_autoscaling=self.enable_autoscaling, ) @@ -202,8 +283,14 @@ def from_flyte_idl(cls, proto): :rtype: RayCluster """ return cls( - head_group_spec=HeadGroupSpec.from_flyte_idl(proto.head_group_spec) if proto.head_group_spec else None, - worker_group_spec=[WorkerGroupSpec.from_flyte_idl(wg) for wg in proto.worker_group_spec], + head_group_spec=( + HeadGroupSpec.from_flyte_idl(proto.head_group_spec) + if proto.head_group_spec + else None + ), + worker_group_spec=[ + WorkerGroupSpec.from_flyte_idl(wg) for wg in proto.worker_group_spec + ], enable_autoscaling=proto.enable_autoscaling, ) @@ -261,7 +348,11 @@ def to_flyte_idl(self) -> _ray_pb2.RayJob: @classmethod def from_flyte_idl(cls, proto: _ray_pb2.RayJob): return cls( - ray_cluster=RayCluster.from_flyte_idl(proto.ray_cluster) if proto.ray_cluster else None, + ray_cluster=( + RayCluster.from_flyte_idl(proto.ray_cluster) + if proto.ray_cluster + else None + ), runtime_env=proto.runtime_env, runtime_env_yaml=proto.runtime_env_yaml, ttl_seconds_after_finished=proto.ttl_seconds_after_finished, diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index 3620a0494c..18fc15a817 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -20,6 +20,7 @@ from flytekit.core.python_function_task import PythonFunctionTask from flytekit.extend import TaskPlugins from flytekit.models.task import K8sPod +from flytekit.core.resources import Resources ray = lazy_module("ray") @@ -27,7 +28,8 @@ @dataclass class HeadNodeConfig: ray_start_params: typing.Optional[typing.Dict[str, str]] = None - k8s_pod: typing.Optional[K8sPod] = None + requests: typing.Optional[Resources] = None + limits: typing.Optional[Resources] = None @dataclass @@ -37,7 +39,8 @@ class WorkerNodeConfig: min_replicas: typing.Optional[int] = None max_replicas: typing.Optional[int] = None ray_start_params: typing.Optional[typing.Dict[str, str]] = None - k8s_pod: typing.Optional[K8sPod] = None + requests: typing.Optional[Resources] = None + limits: typing.Optional[Resources] = None @dataclass @@ -85,14 +88,22 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] cfg = self._task_config # Deprecated: runtime_env is removed KubeRay >= 1.1.0. It is replaced by runtime_env_yaml - runtime_env = base64.b64encode(json.dumps(cfg.runtime_env).encode()).decode() if cfg.runtime_env else None + runtime_env = ( + base64.b64encode(json.dumps(cfg.runtime_env).encode()).decode() + if cfg.runtime_env + else None + ) runtime_env_yaml = yaml.dump(cfg.runtime_env) if cfg.runtime_env else None ray_job = RayJob( ray_cluster=RayCluster( head_group_spec=( - HeadGroupSpec(cfg.head_node_config.ray_start_params, cfg.head_node_config.k8s_pod) + HeadGroupSpec( + cfg.head_node_config.ray_start_params, + cfg.head_node_config.requests, + cfg.head_node_config.limits, + ) if cfg.head_node_config else None ), @@ -103,11 +114,14 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] c.min_replicas, c.max_replicas, c.ray_start_params, - c.k8s_pod, + c.requests, + c.limits, ) for c in cfg.worker_node_config ], - enable_autoscaling=(cfg.enable_autoscaling if cfg.enable_autoscaling else False), + enable_autoscaling=( + cfg.enable_autoscaling if cfg.enable_autoscaling else False + ), ), runtime_env=runtime_env, runtime_env_yaml=runtime_env_yaml, diff --git a/plugins/flytekit-ray/setup.py b/plugins/flytekit-ray/setup.py index 18b95498ee..c70a3629d2 100644 --- a/plugins/flytekit-ray/setup.py +++ b/plugins/flytekit-ray/setup.py @@ -4,7 +4,12 @@ microlib_name = f"flytekitplugins-{PLUGIN_NAME}" -plugin_requires = ["ray[default]", "flytekit>=1.3.0b2,<2.0.0", "flyteidl>=1.13.6"] +plugin_requires = [ + "ray[default]", + "flytekit>=1.3.0b2,<2.0.0", + "flyteidl>=1.13.6", + "kubernetes", +] __version__ = "0.0.0+develop" From efd58f0e208f76b1b4e68dafebe35691fd863382 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Wed, 20 Nov 2024 09:32:55 -0800 Subject: [PATCH 02/25] put construct_k8s_pod_spec_from_resources into core/resources.py Signed-off-by: Jan Fiedler --- flytekit/core/resources.py | 68 +++++++++++++++++-- .../flytekitplugins/ray/models.py | 54 +-------------- 2 files changed, 65 insertions(+), 57 deletions(-) diff --git a/flytekit/core/resources.py b/flytekit/core/resources.py index 8a99dbf2ea..e1b241b64d 100644 --- a/flytekit/core/resources.py +++ b/flytekit/core/resources.py @@ -1,9 +1,10 @@ -from dataclasses import dataclass -from typing import List, Optional, Union +from dataclasses import dataclass, fields +from typing import List, Optional, Union, Any from mashumaro.mixins.json import DataClassJSONMixin from flytekit.models import task as task_models +from kubernetes.client import V1PodSpec, V1Container, V1ResourceRequirements @dataclass @@ -66,14 +67,23 @@ class ResourceSpec(DataClassJSONMixin): def _convert_resources_to_resource_entries(resources: Resources) -> List[_ResourceEntry]: # type: ignore resource_entries = [] if resources.cpu is not None: - resource_entries.append(_ResourceEntry(name=_ResourceName.CPU, value=str(resources.cpu))) + resource_entries.append( + _ResourceEntry(name=_ResourceName.CPU, value=str(resources.cpu)) + ) if resources.mem is not None: - resource_entries.append(_ResourceEntry(name=_ResourceName.MEMORY, value=str(resources.mem))) + resource_entries.append( + _ResourceEntry(name=_ResourceName.MEMORY, value=str(resources.mem)) + ) if resources.gpu is not None: - resource_entries.append(_ResourceEntry(name=_ResourceName.GPU, value=str(resources.gpu))) + resource_entries.append( + _ResourceEntry(name=_ResourceName.GPU, value=str(resources.gpu)) + ) if resources.ephemeral_storage is not None: resource_entries.append( - _ResourceEntry(name=_ResourceName.EPHEMERAL_STORAGE, value=str(resources.ephemeral_storage)) + _ResourceEntry( + name=_ResourceName.EPHEMERAL_STORAGE, + value=str(resources.ephemeral_storage), + ) ) return resource_entries @@ -96,3 +106,49 @@ def convert_resources_to_resource_model( if limits is not None: limit_entries = _convert_resources_to_resource_entries(limits) return task_models.Resources(requests=request_entries, limits=limit_entries) + + +def construct_k8s_pod_spec_from_resources( + k8s_pod_name: str, + requests: Optional[Resources], + limits: Optional[Resources], +) -> dict[str, Any]: + + def construct_k8s_pods_resources(resources: Optional[Resources]): + if resources is None: + return None + + resources_map = { + "cpu": "cpu", + "mem": "memory", + "gpu": "nvidia.com/gpu", + "ephemeral_storage": "ephemeral-storage", + } + + k8s_pod_resources = {} + + for resource in fields(resources): + resource_value = getattr(resources, resource.name) + if resource_value is not None: + k8s_pod_resources[resources_map[resource.name]] = resource_value + + return k8s_pod_resources + + requests = construct_k8s_pods_resources(resources=requests) + limits = construct_k8s_pods_resources(resources=limits) + requests = requests or limits + limits = limits or requests + + k8s_pod = V1PodSpec( + containers=[ + V1Container( + name=k8s_pod_name, + resources=V1ResourceRequirements( + requests=requests, + limits=limits, + ), + ) + ] + ) + + return k8s_pod.to_dict() diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 3c009dee5a..1d4d790b4e 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -4,57 +4,10 @@ from flytekit.models import common as _common from flytekit.models.task import K8sPod, K8sObjectMetadata -from flytekit.core.resources import Resources +from flytekit.core.resources import Resources, construct_k8s_pod_spec_from_resources from kubernetes.client import V1PodSpec, V1Container, V1ResourceRequirements -def construct_k8s_pod_spec( - k8s_pod_name: str, - requests: typing.Optional[Resources], - limits: typing.Optional[Resources], -) -> dict[str, typing.Any]: - - def construct_k8s_pods_resources(resources: typing.Optional[Resources]): - if resources is None: - return None - - resources_map = { - "cpu": "cpu", - "mem": "memory", - "gpu": "nvidia.com/gpu", - "ephemeral_storage": "ephemeral-storage", - } - - k8s_pod_resources = {} - - for resource in fields(resources): - resource_value = getattr(resources, resource.name) - if resource_value is not None: - k8s_pod_resources[resources_map[resource.name]] = resource_value - - print(k8s_pod_resources) - return k8s_pod_resources - - requests = construct_k8s_pods_resources(resources=requests) - limits = construct_k8s_pods_resources(resources=limits) - requests = requests or limits - limits = limits or requests - - k8s_pod = V1PodSpec( - containers=[ - V1Container( - name=k8s_pod_name, - resources=V1ResourceRequirements( - requests=requests, - limits=limits, - ), - ) - ] - ) - - return k8s_pod.to_dict() - - class WorkerGroupSpec(_common.FlyteIdlEntity): def __init__( self, @@ -79,11 +32,10 @@ def __init__( self._limits = limits self._k8s_pod = K8sPod( metadata=K8sObjectMetadata(), - pod_spec=construct_k8s_pod_spec( + pod_spec=construct_k8s_pod_spec_from_resources( k8s_pod_name="ray-worker", requests=self._requests, limits=self._limits ), ) - # exit(0) @property def group_name(self): @@ -179,7 +131,7 @@ def __init__( self._k8s_pod = K8sPod( metadata=K8sObjectMetadata(), - pod_spec=construct_k8s_pod_spec( + pod_spec=construct_k8s_pod_spec_from_resources( k8s_pod_name="ray-head", requests=self._requests, limits=self._limits ), ) From 7c74bb788df3f6fc98617a95a8f87c56f62fef36 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Wed, 20 Nov 2024 10:24:33 -0800 Subject: [PATCH 03/25] adjust ray tests Signed-off-by: Jan Fiedler --- plugins/flytekit-ray/tests/test_ray.py | 37 ++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/plugins/flytekit-ray/tests/test_ray.py b/plugins/flytekit-ray/tests/test_ray.py index 737cdf6f4a..206c8ea04a 100644 --- a/plugins/flytekit-ray/tests/test_ray.py +++ b/plugins/flytekit-ray/tests/test_ray.py @@ -4,17 +4,31 @@ import ray import yaml from flytekitplugins.ray import HeadNodeConfig -from flytekitplugins.ray.models import RayCluster, RayJob, WorkerGroupSpec, HeadGroupSpec +from flytekitplugins.ray.models import ( + RayCluster, + RayJob, + WorkerGroupSpec, + HeadGroupSpec, +) from flytekitplugins.ray.task import RayJobConfig, WorkerNodeConfig from google.protobuf.json_format import MessageToDict -from flytekit.models.task import K8sPod +from flytekit.core.resources import Resources from flytekit import PythonFunctionTask, task from flytekit.configuration import Image, ImageConfig, SerializationSettings config = RayJobConfig( - worker_node_config=[WorkerNodeConfig(group_name="test_group", replicas=3, min_replicas=0, max_replicas=10, k8s_pod=K8sPod(pod_spec={"str": "worker", "int": 1}))], - head_node_config=HeadNodeConfig(k8s_pod=K8sPod(pod_spec={"str": "head", "int": 2})), + worker_node_config=[ + WorkerNodeConfig( + group_name="test_group", + replicas=3, + min_replicas=0, + max_replicas=10, + requests=Resources(cpu=2, mem="2Gi"), + limits=Resources(cpu=2, mem="4Gi"), + ) + ], + head_node_config=HeadNodeConfig(requests=Resources(cpu=2)), runtime_env={"pip": ["numpy"]}, enable_autoscaling=True, shutdown_after_job_finishes=True, @@ -44,7 +58,20 @@ def t1(a: int) -> str: ) ray_job_pb = RayJob( - ray_cluster=RayCluster(worker_group_spec=[WorkerGroupSpec(group_name="test_group", replicas=3, min_replicas=0, max_replicas=10, k8s_pod=K8sPod(pod_spec={"str": "worker", "int": 1}))], head_group_spec=HeadGroupSpec(k8s_pod=K8sPod(pod_spec={"str": "head", "int": 2})), enable_autoscaling=True), + ray_cluster=RayCluster( + worker_group_spec=[ + WorkerGroupSpec( + group_name="test_group", + replicas=3, + min_replicas=0, + max_replicas=10, + requests=Resources(cpu=2, mem="2Gi"), + limits=Resources(cpu=2, mem="4Gi"), + ) + ], + head_group_spec=HeadGroupSpec(requests=Resources(cpu=2)), + enable_autoscaling=True, + ), runtime_env=base64.b64encode(json.dumps({"pip": ["numpy"]}).encode()).decode(), runtime_env_yaml=yaml.dump({"pip": ["numpy"]}), shutdown_after_job_finishes=True, From 3d84516e94b28f740a982d5bef39d7256ef3ba1d Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Wed, 20 Nov 2024 10:42:16 -0800 Subject: [PATCH 04/25] ruff check fix Signed-off-by: Jan Fiedler --- flytekit/core/resources.py | 4 +- .../types/structured/structured_dataset.py | 2 +- .../flytekitplugins/ray/models.py | 7 +-- .../flytekit-ray/flytekitplugins/ray/task.py | 3 +- ray_demo.py | 59 +++++++++++++++++++ ray_helper.py | 39 ++++++++++++ 6 files changed, 105 insertions(+), 9 deletions(-) create mode 100644 ray_demo.py create mode 100644 ray_helper.py diff --git a/flytekit/core/resources.py b/flytekit/core/resources.py index e1b241b64d..8d819daa53 100644 --- a/flytekit/core/resources.py +++ b/flytekit/core/resources.py @@ -1,10 +1,10 @@ from dataclasses import dataclass, fields -from typing import List, Optional, Union, Any +from typing import Any, List, Optional, Union +from kubernetes.client import V1Container, V1PodSpec, V1ResourceRequirements from mashumaro.mixins.json import DataClassJSONMixin from flytekit.models import task as task_models -from kubernetes.client import V1PodSpec, V1Container, V1ResourceRequirements @dataclass diff --git a/flytekit/types/structured/structured_dataset.py b/flytekit/types/structured/structured_dataset.py index f4a2194749..040f40643a 100644 --- a/flytekit/types/structured/structured_dataset.py +++ b/flytekit/types/structured/structured_dataset.py @@ -1,6 +1,5 @@ from __future__ import annotations -import _datetime import collections import json import types @@ -9,6 +8,7 @@ from dataclasses import dataclass, field, is_dataclass from typing import Dict, Generator, Generic, List, Optional, Type, Union +import _datetime import msgpack from dataclasses_json import config from fsspec.utils import get_protocol diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 1d4d790b4e..6fca5b6310 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -1,11 +1,10 @@ import typing -from dataclasses import fields + from flyteidl.plugins import ray_pb2 as _ray_pb2 -from flytekit.models import common as _common -from flytekit.models.task import K8sPod, K8sObjectMetadata from flytekit.core.resources import Resources, construct_k8s_pod_spec_from_resources -from kubernetes.client import V1PodSpec, V1Container, V1ResourceRequirements +from flytekit.models import common as _common +from flytekit.models.task import K8sObjectMetadata, K8sPod class WorkerGroupSpec(_common.FlyteIdlEntity): diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index 18fc15a817..d7d3916091 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -18,9 +18,8 @@ from flytekit.configuration import SerializationSettings from flytekit.core.context_manager import ExecutionParameters, FlyteContextManager from flytekit.core.python_function_task import PythonFunctionTask -from flytekit.extend import TaskPlugins -from flytekit.models.task import K8sPod from flytekit.core.resources import Resources +from flytekit.extend import TaskPlugins ray = lazy_module("ray") diff --git a/ray_demo.py b/ray_demo.py new file mode 100644 index 0000000000..5567470584 --- /dev/null +++ b/ray_demo.py @@ -0,0 +1,59 @@ +import typing + +import ray +from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig + +from flytekit import ImageSpec, Resources, task, workflow + +flytekit_hash = "2778db206bbea478908c4e529dcb63cd438b6065" +flytekitplugins_ray = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}#subdirectory=plugins/flytekit-ray" +new_flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}" + +container_image = ImageSpec( + name="ray-union-demo", + python_version="3.11.9", + apt_packages=["wget", "gdb", "git"], + packages=[ + new_flytekit, + flytekitplugins_ray, + "kubernetes", + ], + registry="ghcr.io/fiedlerNr9", +) +ray_config = RayJobConfig( + head_node_config=HeadNodeConfig( + ray_start_params={"num-cpus": "0", "log-color": "true"}, + requests=Resources(cpu="1", mem="3Gi"), + ), + worker_node_config=[ + WorkerNodeConfig( + group_name="ray-group", + replicas=0, + min_replicas=0, + max_replicas=2, + ) + ], + shutdown_after_job_finishes=True, + ttl_seconds_after_finished=120, + enable_autoscaling=True, +) + + +@ray.remote +def f(x): + return x * x + + +@task( + task_config=ray_config, + requests=Resources(mem="2Gi", cpu="3000m"), + container_image=container_image, +) +def ray_task(n: int) -> typing.List[int]: + futures = [f.remote(i) for i in range(n)] + return ray.get(futures) + + +@workflow +def wf(n: int = 50): + ray_task(n=n) diff --git a/ray_helper.py b/ray_helper.py new file mode 100644 index 0000000000..ba62424d62 --- /dev/null +++ b/ray_helper.py @@ -0,0 +1,39 @@ +import math +import random + +import ray + + +@ray.remote +class ProgressActor: + def __init__(self, total_num_samples: int): + self.total_num_samples = total_num_samples + self.num_samples_completed_per_task = {} + + def report_progress(self, task_id: int, num_samples_completed: int) -> None: + self.num_samples_completed_per_task[task_id] = num_samples_completed + + def get_progress(self) -> float: + return ( + sum(self.num_samples_completed_per_task.values()) / self.total_num_samples + ) + + +@ray.remote +def sampling_task( + num_samples: int, task_id: int, progress_actor: ray.actor.ActorHandle +) -> int: + num_inside = 0 + for i in range(num_samples): + x, y = random.uniform(-1, 1), random.uniform(-1, 1) + if math.hypot(x, y) <= 1: + num_inside += 1 + + # Report progress every 1 million samples. + if (i + 1) % 1_000_000 == 0: + # This is async. + progress_actor.report_progress.remote(task_id, i + 1) + + # Report the final progress. + progress_actor.report_progress.remote(task_id, num_samples) + return num_inside From d192625d8a1ef652d12f67682b6e77cbd14c7765 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Wed, 20 Nov 2024 10:53:05 -0800 Subject: [PATCH 05/25] ruff format Signed-off-by: Jan Fiedler --- flytekit/core/resources.py | 13 ++---- .../flytekitplugins/ray/models.py | 40 ++++--------------- .../flytekit-ray/flytekitplugins/ray/task.py | 10 +---- 3 files changed, 13 insertions(+), 50 deletions(-) diff --git a/flytekit/core/resources.py b/flytekit/core/resources.py index 8d819daa53..eaa5298ef7 100644 --- a/flytekit/core/resources.py +++ b/flytekit/core/resources.py @@ -67,17 +67,11 @@ class ResourceSpec(DataClassJSONMixin): def _convert_resources_to_resource_entries(resources: Resources) -> List[_ResourceEntry]: # type: ignore resource_entries = [] if resources.cpu is not None: - resource_entries.append( - _ResourceEntry(name=_ResourceName.CPU, value=str(resources.cpu)) - ) + resource_entries.append(_ResourceEntry(name=_ResourceName.CPU, value=str(resources.cpu))) if resources.mem is not None: - resource_entries.append( - _ResourceEntry(name=_ResourceName.MEMORY, value=str(resources.mem)) - ) + resource_entries.append(_ResourceEntry(name=_ResourceName.MEMORY, value=str(resources.mem))) if resources.gpu is not None: - resource_entries.append( - _ResourceEntry(name=_ResourceName.GPU, value=str(resources.gpu)) - ) + resource_entries.append(_ResourceEntry(name=_ResourceName.GPU, value=str(resources.gpu))) if resources.ephemeral_storage is not None: resource_entries.append( _ResourceEntry( @@ -113,7 +107,6 @@ def construct_k8s_pod_spec_from_resources( requests: Optional[Resources], limits: Optional[Resources], ) -> dict[str, Any]: - def construct_k8s_pods_resources(resources: Optional[Resources]): if resources is None: return None diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 6fca5b6310..2378455928 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -20,12 +20,8 @@ def __init__( ): self._group_name = group_name self._replicas = replicas - self._max_replicas = ( - max(replicas, max_replicas) if max_replicas is not None else replicas - ) - self._min_replicas = ( - min(replicas, min_replicas) if min_replicas is not None else replicas - ) + self._max_replicas = max(replicas, max_replicas) if max_replicas is not None else replicas + self._min_replicas = min(replicas, min_replicas) if min_replicas is not None else replicas self._ray_start_params = ray_start_params self._requests = requests self._limits = limits @@ -109,11 +105,7 @@ def from_flyte_idl(cls, proto): min_replicas=proto.min_replicas, max_replicas=proto.max_replicas, ray_start_params=proto.ray_start_params, - k8s_pod=( - K8sPod.from_flyte_idl(proto.k8s_pod) - if proto.HasField("k8s_pod") - else None - ), + k8s_pod=(K8sPod.from_flyte_idl(proto.k8s_pod) if proto.HasField("k8s_pod") else None), ) @@ -168,11 +160,7 @@ def from_flyte_idl(cls, proto): """ return cls( ray_start_params=proto.ray_start_params, - k8s_pod=( - K8sPod.from_flyte_idl(proto.k8s_pod) - if proto.HasField("k8s_pod") - else None - ), + k8s_pod=(K8sPod.from_flyte_idl(proto.k8s_pod) if proto.HasField("k8s_pod") else None), ) @@ -220,9 +208,7 @@ def to_flyte_idl(self) -> _ray_pb2.RayCluster: :rtype: flyteidl.plugins._ray_pb2.RayCluster """ return _ray_pb2.RayCluster( - head_group_spec=( - self.head_group_spec.to_flyte_idl() if self.head_group_spec else None - ), + head_group_spec=(self.head_group_spec.to_flyte_idl() if self.head_group_spec else None), worker_group_spec=[wg.to_flyte_idl() for wg in self.worker_group_spec], enable_autoscaling=self.enable_autoscaling, ) @@ -234,14 +220,8 @@ def from_flyte_idl(cls, proto): :rtype: RayCluster """ return cls( - head_group_spec=( - HeadGroupSpec.from_flyte_idl(proto.head_group_spec) - if proto.head_group_spec - else None - ), - worker_group_spec=[ - WorkerGroupSpec.from_flyte_idl(wg) for wg in proto.worker_group_spec - ], + head_group_spec=(HeadGroupSpec.from_flyte_idl(proto.head_group_spec) if proto.head_group_spec else None), + worker_group_spec=[WorkerGroupSpec.from_flyte_idl(wg) for wg in proto.worker_group_spec], enable_autoscaling=proto.enable_autoscaling, ) @@ -299,11 +279,7 @@ def to_flyte_idl(self) -> _ray_pb2.RayJob: @classmethod def from_flyte_idl(cls, proto: _ray_pb2.RayJob): return cls( - ray_cluster=( - RayCluster.from_flyte_idl(proto.ray_cluster) - if proto.ray_cluster - else None - ), + ray_cluster=(RayCluster.from_flyte_idl(proto.ray_cluster) if proto.ray_cluster else None), runtime_env=proto.runtime_env, runtime_env_yaml=proto.runtime_env_yaml, ttl_seconds_after_finished=proto.ttl_seconds_after_finished, diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index d7d3916091..57be2a0b4b 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -87,11 +87,7 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] cfg = self._task_config # Deprecated: runtime_env is removed KubeRay >= 1.1.0. It is replaced by runtime_env_yaml - runtime_env = ( - base64.b64encode(json.dumps(cfg.runtime_env).encode()).decode() - if cfg.runtime_env - else None - ) + runtime_env = base64.b64encode(json.dumps(cfg.runtime_env).encode()).decode() if cfg.runtime_env else None runtime_env_yaml = yaml.dump(cfg.runtime_env) if cfg.runtime_env else None @@ -118,9 +114,7 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] ) for c in cfg.worker_node_config ], - enable_autoscaling=( - cfg.enable_autoscaling if cfg.enable_autoscaling else False - ), + enable_autoscaling=(cfg.enable_autoscaling if cfg.enable_autoscaling else False), ), runtime_env=runtime_env, runtime_env_yaml=runtime_env_yaml, From 94b77972edeeb7e950278ac34c3054d8f4a13511 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Wed, 20 Nov 2024 10:53:35 -0800 Subject: [PATCH 06/25] remove demo files from PR Signed-off-by: Jan Fiedler --- ray_demo.py | 59 --------------------------------------------------- ray_helper.py | 39 ---------------------------------- 2 files changed, 98 deletions(-) delete mode 100644 ray_demo.py delete mode 100644 ray_helper.py diff --git a/ray_demo.py b/ray_demo.py deleted file mode 100644 index 5567470584..0000000000 --- a/ray_demo.py +++ /dev/null @@ -1,59 +0,0 @@ -import typing - -import ray -from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig - -from flytekit import ImageSpec, Resources, task, workflow - -flytekit_hash = "2778db206bbea478908c4e529dcb63cd438b6065" -flytekitplugins_ray = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}#subdirectory=plugins/flytekit-ray" -new_flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}" - -container_image = ImageSpec( - name="ray-union-demo", - python_version="3.11.9", - apt_packages=["wget", "gdb", "git"], - packages=[ - new_flytekit, - flytekitplugins_ray, - "kubernetes", - ], - registry="ghcr.io/fiedlerNr9", -) -ray_config = RayJobConfig( - head_node_config=HeadNodeConfig( - ray_start_params={"num-cpus": "0", "log-color": "true"}, - requests=Resources(cpu="1", mem="3Gi"), - ), - worker_node_config=[ - WorkerNodeConfig( - group_name="ray-group", - replicas=0, - min_replicas=0, - max_replicas=2, - ) - ], - shutdown_after_job_finishes=True, - ttl_seconds_after_finished=120, - enable_autoscaling=True, -) - - -@ray.remote -def f(x): - return x * x - - -@task( - task_config=ray_config, - requests=Resources(mem="2Gi", cpu="3000m"), - container_image=container_image, -) -def ray_task(n: int) -> typing.List[int]: - futures = [f.remote(i) for i in range(n)] - return ray.get(futures) - - -@workflow -def wf(n: int = 50): - ray_task(n=n) diff --git a/ray_helper.py b/ray_helper.py deleted file mode 100644 index ba62424d62..0000000000 --- a/ray_helper.py +++ /dev/null @@ -1,39 +0,0 @@ -import math -import random - -import ray - - -@ray.remote -class ProgressActor: - def __init__(self, total_num_samples: int): - self.total_num_samples = total_num_samples - self.num_samples_completed_per_task = {} - - def report_progress(self, task_id: int, num_samples_completed: int) -> None: - self.num_samples_completed_per_task[task_id] = num_samples_completed - - def get_progress(self) -> float: - return ( - sum(self.num_samples_completed_per_task.values()) / self.total_num_samples - ) - - -@ray.remote -def sampling_task( - num_samples: int, task_id: int, progress_actor: ray.actor.ActorHandle -) -> int: - num_inside = 0 - for i in range(num_samples): - x, y = random.uniform(-1, 1), random.uniform(-1, 1) - if math.hypot(x, y) <= 1: - num_inside += 1 - - # Report progress every 1 million samples. - if (i + 1) % 1_000_000 == 0: - # This is async. - progress_actor.report_progress.remote(task_id, i + 1) - - # Report the final progress. - progress_actor.report_progress.remote(task_id, num_samples) - return num_inside From 0b727074c3100e83e8fbae83cbe6c1c09d389273 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Wed, 20 Nov 2024 10:56:17 -0800 Subject: [PATCH 07/25] remove kubernetes from ray plugin dependencies Signed-off-by: Jan Fiedler --- plugins/flytekit-ray/setup.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/plugins/flytekit-ray/setup.py b/plugins/flytekit-ray/setup.py index c70a3629d2..18b95498ee 100644 --- a/plugins/flytekit-ray/setup.py +++ b/plugins/flytekit-ray/setup.py @@ -4,12 +4,7 @@ microlib_name = f"flytekitplugins-{PLUGIN_NAME}" -plugin_requires = [ - "ray[default]", - "flytekit>=1.3.0b2,<2.0.0", - "flyteidl>=1.13.6", - "kubernetes", -] +plugin_requires = ["ray[default]", "flytekit>=1.3.0b2,<2.0.0", "flyteidl>=1.13.6"] __version__ = "0.0.0+develop" From da7d6ae0665c11b4efca9284ca75565b9e2ff171 Mon Sep 17 00:00:00 2001 From: Jan Fiedler <89976021+fiedlerNr9@users.noreply.github.com> Date: Wed, 20 Nov 2024 20:03:17 +0100 Subject: [PATCH 08/25] Update structured_dataset.py Signed-off-by: Jan Fiedler --- flytekit/types/structured/structured_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/types/structured/structured_dataset.py b/flytekit/types/structured/structured_dataset.py index 040f40643a..f4a2194749 100644 --- a/flytekit/types/structured/structured_dataset.py +++ b/flytekit/types/structured/structured_dataset.py @@ -1,5 +1,6 @@ from __future__ import annotations +import _datetime import collections import json import types @@ -8,7 +9,6 @@ from dataclasses import dataclass, field, is_dataclass from typing import Dict, Generator, Generic, List, Optional, Type, Union -import _datetime import msgpack from dataclasses_json import config from fsspec.utils import get_protocol From 17d9d6606ee2460806e403fdba6f8755a3bfaefa Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Wed, 20 Nov 2024 13:36:55 -0800 Subject: [PATCH 09/25] add tests for construct_k8s_pod_spec_from_resources Signed-off-by: Jan Fiedler --- tests/flytekit/unit/core/test_resources.py | 56 +++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/tests/flytekit/unit/core/test_resources.py b/tests/flytekit/unit/core/test_resources.py index 5dd9926039..57885527ed 100644 --- a/tests/flytekit/unit/core/test_resources.py +++ b/tests/flytekit/unit/core/test_resources.py @@ -1,10 +1,14 @@ from typing import Dict import pytest +from kubernetes.client import V1Container, V1PodSpec, V1ResourceRequirements import flytekit.models.task as _task_models from flytekit import Resources -from flytekit.core.resources import convert_resources_to_resource_model +from flytekit.core.resources import ( + construct_k8s_pod_spec_from_resources, + convert_resources_to_resource_model, +) _ResourceName = _task_models.Resources.ResourceName @@ -101,3 +105,53 @@ def test_resources_round_trip(): json_str = original.to_json() result = Resources.from_json(json_str) assert original == result + + +def test_construct_k8s_pod_spec_from_resources_requests_limits_set(): + requests = Resources(cpu="1", mem="1Gi", gpu="1", ephemeral_storage="1Gi") + limits = Resources(cpu="4", mem="2Gi", gpu="1", ephemeral_storage="1Gi") + k8s_pod_name = "foo" + + expected_pod_spec = V1PodSpec( + containers=[ + V1Container( + name=k8s_pod_name, + resources=V1ResourceRequirements( + requests={ + "cpu": "1", + "memory": "1Gi", + "nvidia.com/gpu": "1", + "ephemeral-storage": "1Gi", + }, + limits={ + "cpu": "4", + "memory": "2Gi", + "nvidia.com/gpu": "1", + "ephemeral-storage": "1Gi", + }, + ), + ) + ] + ) + pod_spec = construct_k8s_pod_spec_from_resources(k8s_pod_name=k8s_pod_name, requests=requests, limits=limits) + assert expected_pod_spec == V1PodSpec(**pod_spec) + + +def test_construct_k8s_pod_spec_from_resources_requests_set(): + requests = Resources(cpu="1", mem="1Gi") + limits = None + k8s_pod_name = "foo" + + expected_pod_spec = V1PodSpec( + containers=[ + V1Container( + name=k8s_pod_name, + resources=V1ResourceRequirements( + requests={"cpu": "1", "memory": "1Gi"}, + limits={"cpu": "1", "memory": "1Gi"}, + ), + ) + ] + ) + pod_spec = construct_k8s_pod_spec_from_resources(k8s_pod_name=k8s_pod_name, requests=requests, limits=limits) + assert expected_pod_spec == V1PodSpec(**pod_spec) From 72a3339d9bf9d1cbc763a0f7fe7dd1fbb99b6730 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Wed, 20 Nov 2024 14:00:47 -0800 Subject: [PATCH 10/25] add kubernetes to pyproject.toml Signed-off-by: Jan Fiedler --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 4cba669364..862b516204 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ "jsonlines", "jsonpickle", "keyring>=18.0.1", + "kubernetes>=12.0.1", "markdown-it-py", "marshmallow-enum", "marshmallow-jsonschema>=0.12.0", From f3bd304261d3d8a47673a33a3bcd5831d1f9f268 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Thu, 21 Nov 2024 16:32:33 -0800 Subject: [PATCH 11/25] add underscore prefix to _construct_k8s_pods_resources Signed-off-by: Jan Fiedler --- flytekit/core/resources.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flytekit/core/resources.py b/flytekit/core/resources.py index eaa5298ef7..3f8f1de4d0 100644 --- a/flytekit/core/resources.py +++ b/flytekit/core/resources.py @@ -107,7 +107,7 @@ def construct_k8s_pod_spec_from_resources( requests: Optional[Resources], limits: Optional[Resources], ) -> dict[str, Any]: - def construct_k8s_pods_resources(resources: Optional[Resources]): + def _construct_k8s_pods_resources(resources: Optional[Resources]): if resources is None: return None @@ -127,8 +127,8 @@ def construct_k8s_pods_resources(resources: Optional[Resources]): return k8s_pod_resources - requests = construct_k8s_pods_resources(resources=requests) - limits = construct_k8s_pods_resources(resources=limits) + requests = _construct_k8s_pods_resources(resources=requests) + limits = _construct_k8s_pods_resources(resources=limits) requests = requests or limits limits = limits or requests From 76926418f4b2f4e8ee63f92249656ace9455c40a Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Thu, 21 Nov 2024 16:35:20 -0800 Subject: [PATCH 12/25] remove parantheses from_flyte_idl Signed-off-by: Jan Fiedler --- plugins/flytekit-ray/flytekitplugins/ray/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 2378455928..2af237b1c3 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -279,7 +279,7 @@ def to_flyte_idl(self) -> _ray_pb2.RayJob: @classmethod def from_flyte_idl(cls, proto: _ray_pb2.RayJob): return cls( - ray_cluster=(RayCluster.from_flyte_idl(proto.ray_cluster) if proto.ray_cluster else None), + ray_cluster=RayCluster.from_flyte_idl(proto.ray_cluster) if proto.ray_cluster else None, runtime_env=proto.runtime_env, runtime_env_yaml=proto.runtime_env_yaml, ttl_seconds_after_finished=proto.ttl_seconds_after_finished, From 0edfb6846aa2a1b9734b28bdeceaf2a675a55559 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Thu, 21 Nov 2024 16:39:22 -0800 Subject: [PATCH 13/25] expose k8s_gpu_resource_key Signed-off-by: Jan Fiedler --- flytekit/core/resources.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytekit/core/resources.py b/flytekit/core/resources.py index 3f8f1de4d0..9575995ce4 100644 --- a/flytekit/core/resources.py +++ b/flytekit/core/resources.py @@ -107,14 +107,14 @@ def construct_k8s_pod_spec_from_resources( requests: Optional[Resources], limits: Optional[Resources], ) -> dict[str, Any]: - def _construct_k8s_pods_resources(resources: Optional[Resources]): + def _construct_k8s_pods_resources(resources: Optional[Resources], k8s_gpu_resource_key: str = "nvidia.com/gpu"): if resources is None: return None resources_map = { "cpu": "cpu", "mem": "memory", - "gpu": "nvidia.com/gpu", + "gpu": k8s_gpu_resource_key, "ephemeral_storage": "ephemeral-storage", } From 7676af6bc3df89f1b577a5feadcdbb3dc34f4127 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Thu, 21 Nov 2024 16:43:26 -0800 Subject: [PATCH 14/25] remove parantheses Signed-off-by: Jan Fiedler --- plugins/flytekit-ray/flytekitplugins/ray/models.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 2af237b1c3..295b119efb 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -105,7 +105,7 @@ def from_flyte_idl(cls, proto): min_replicas=proto.min_replicas, max_replicas=proto.max_replicas, ray_start_params=proto.ray_start_params, - k8s_pod=(K8sPod.from_flyte_idl(proto.k8s_pod) if proto.HasField("k8s_pod") else None), + k8s_pod=K8sPod.from_flyte_idl(proto.k8s_pod) if proto.HasField("k8s_pod") else None, ) @@ -160,7 +160,7 @@ def from_flyte_idl(cls, proto): """ return cls( ray_start_params=proto.ray_start_params, - k8s_pod=(K8sPod.from_flyte_idl(proto.k8s_pod) if proto.HasField("k8s_pod") else None), + k8s_pod=K8sPod.from_flyte_idl(proto.k8s_pod) if proto.HasField("k8s_pod") else None, ) @@ -208,7 +208,7 @@ def to_flyte_idl(self) -> _ray_pb2.RayCluster: :rtype: flyteidl.plugins._ray_pb2.RayCluster """ return _ray_pb2.RayCluster( - head_group_spec=(self.head_group_spec.to_flyte_idl() if self.head_group_spec else None), + head_group_spec=self.head_group_spec.to_flyte_idl() if self.head_group_spec else None, worker_group_spec=[wg.to_flyte_idl() for wg in self.worker_group_spec], enable_autoscaling=self.enable_autoscaling, ) @@ -220,7 +220,7 @@ def from_flyte_idl(cls, proto): :rtype: RayCluster """ return cls( - head_group_spec=(HeadGroupSpec.from_flyte_idl(proto.head_group_spec) if proto.head_group_spec else None), + head_group_spec=HeadGroupSpec.from_flyte_idl(proto.head_group_spec) if proto.head_group_spec else None, worker_group_spec=[WorkerGroupSpec.from_flyte_idl(wg) for wg in proto.worker_group_spec], enable_autoscaling=proto.enable_autoscaling, ) From ee738ae9058db5ae8ec1b800c4c0c9c57c4867b3 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 6 Dec 2024 10:53:48 -0800 Subject: [PATCH 15/25] rename & expose nvidia gpu key Signed-off-by: Jan Fiedler --- flytekit/core/resources.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/flytekit/core/resources.py b/flytekit/core/resources.py index 9575995ce4..0e446a16a5 100644 --- a/flytekit/core/resources.py +++ b/flytekit/core/resources.py @@ -102,12 +102,13 @@ def convert_resources_to_resource_model( return task_models.Resources(requests=request_entries, limits=limit_entries) -def construct_k8s_pod_spec_from_resources( +def pod_spec_from_resources( k8s_pod_name: str, requests: Optional[Resources], limits: Optional[Resources], + k8s_gpu_resource_key: str = "nvidia.com/gpu" ) -> dict[str, Any]: - def _construct_k8s_pods_resources(resources: Optional[Resources], k8s_gpu_resource_key: str = "nvidia.com/gpu"): + def _construct_k8s_pods_resources(resources: Optional[Resources], k8s_gpu_resource_key: str): if resources is None: return None @@ -127,8 +128,8 @@ def _construct_k8s_pods_resources(resources: Optional[Resources], k8s_gpu_resour return k8s_pod_resources - requests = _construct_k8s_pods_resources(resources=requests) - limits = _construct_k8s_pods_resources(resources=limits) + requests = _construct_k8s_pods_resources(resources=requests, k8s_gpu_resource_key=k8s_gpu_resource_key) + limits = _construct_k8s_pods_resources(resources=limits, k8s_gpu_resource_key=k8s_gpu_resource_key) requests = requests or limits limits = limits or requests From 8896764c279a9b4dc96e10cb0778f6e774772ceb Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 6 Dec 2024 10:54:09 -0800 Subject: [PATCH 16/25] adjust resource tests Signed-off-by: Jan Fiedler --- tests/flytekit/unit/core/test_resources.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/flytekit/unit/core/test_resources.py b/tests/flytekit/unit/core/test_resources.py index 57885527ed..1c09a111e3 100644 --- a/tests/flytekit/unit/core/test_resources.py +++ b/tests/flytekit/unit/core/test_resources.py @@ -6,7 +6,7 @@ import flytekit.models.task as _task_models from flytekit import Resources from flytekit.core.resources import ( - construct_k8s_pod_spec_from_resources, + pod_spec_from_resources, convert_resources_to_resource_model, ) @@ -107,7 +107,7 @@ def test_resources_round_trip(): assert original == result -def test_construct_k8s_pod_spec_from_resources_requests_limits_set(): +def test_pod_spec_from_resources_requests_limits_set(): requests = Resources(cpu="1", mem="1Gi", gpu="1", ephemeral_storage="1Gi") limits = Resources(cpu="4", mem="2Gi", gpu="1", ephemeral_storage="1Gi") k8s_pod_name = "foo" @@ -133,11 +133,11 @@ def test_construct_k8s_pod_spec_from_resources_requests_limits_set(): ) ] ) - pod_spec = construct_k8s_pod_spec_from_resources(k8s_pod_name=k8s_pod_name, requests=requests, limits=limits) + pod_spec = pod_spec_from_resources(k8s_pod_name=k8s_pod_name, requests=requests, limits=limits) assert expected_pod_spec == V1PodSpec(**pod_spec) -def test_construct_k8s_pod_spec_from_resources_requests_set(): +def test_pod_spec_from_resources_requests_set(): requests = Resources(cpu="1", mem="1Gi") limits = None k8s_pod_name = "foo" @@ -153,5 +153,5 @@ def test_construct_k8s_pod_spec_from_resources_requests_set(): ) ] ) - pod_spec = construct_k8s_pod_spec_from_resources(k8s_pod_name=k8s_pod_name, requests=requests, limits=limits) + pod_spec = pod_spec_from_resources(k8s_pod_name=k8s_pod_name, requests=requests, limits=limits) assert expected_pod_spec == V1PodSpec(**pod_spec) From a9efb966afca633de10108035370b8002f884445 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 6 Dec 2024 15:28:01 -0800 Subject: [PATCH 17/25] back to exposing k8s pod Signed-off-by: Jan Fiedler --- .../flytekitplugins/ray/models.py | 28 ++++--------------- .../flytekit-ray/flytekitplugins/ray/task.py | 13 ++++----- 2 files changed, 10 insertions(+), 31 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 295b119efb..1f3a830f16 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -2,9 +2,8 @@ from flyteidl.plugins import ray_pb2 as _ray_pb2 -from flytekit.core.resources import Resources, construct_k8s_pod_spec_from_resources from flytekit.models import common as _common -from flytekit.models.task import K8sObjectMetadata, K8sPod +from flytekit.models.task import K8sPod class WorkerGroupSpec(_common.FlyteIdlEntity): @@ -15,22 +14,14 @@ def __init__( min_replicas: typing.Optional[int] = None, max_replicas: typing.Optional[int] = None, ray_start_params: typing.Optional[typing.Dict[str, str]] = None, - requests: typing.Optional[Resources] = None, - limits: typing.Optional[Resources] = None, + k8s_pod: typing.Optional[K8sPod] = None, ): self._group_name = group_name self._replicas = replicas self._max_replicas = max(replicas, max_replicas) if max_replicas is not None else replicas self._min_replicas = min(replicas, min_replicas) if min_replicas is not None else replicas self._ray_start_params = ray_start_params - self._requests = requests - self._limits = limits - self._k8s_pod = K8sPod( - metadata=K8sObjectMetadata(), - pod_spec=construct_k8s_pod_spec_from_resources( - k8s_pod_name="ray-worker", requests=self._requests, limits=self._limits - ), - ) + self._k8s_pod = k8s_pod @property def group_name(self): @@ -113,19 +104,10 @@ class HeadGroupSpec(_common.FlyteIdlEntity): def __init__( self, ray_start_params: typing.Optional[typing.Dict[str, str]] = None, - requests: typing.Optional[Resources] = None, - limits: typing.Optional[Resources] = None, + k8s_pod: typing.Optional[K8sPod] = None, ): self._ray_start_params = ray_start_params - self._requests = requests - self._limits = limits - - self._k8s_pod = K8sPod( - metadata=K8sObjectMetadata(), - pod_spec=construct_k8s_pod_spec_from_resources( - k8s_pod_name="ray-head", requests=self._requests, limits=self._limits - ), - ) + self._k8s_pod = k8s_pod @property def ray_start_params(self): diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index 57be2a0b4b..8e925a3d0c 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -20,6 +20,7 @@ from flytekit.core.python_function_task import PythonFunctionTask from flytekit.core.resources import Resources from flytekit.extend import TaskPlugins +from flytekit.models.task import K8sPod ray = lazy_module("ray") @@ -27,8 +28,7 @@ @dataclass class HeadNodeConfig: ray_start_params: typing.Optional[typing.Dict[str, str]] = None - requests: typing.Optional[Resources] = None - limits: typing.Optional[Resources] = None + k8s_pod: typing.Optional[K8sPod] = None @dataclass @@ -38,8 +38,7 @@ class WorkerNodeConfig: min_replicas: typing.Optional[int] = None max_replicas: typing.Optional[int] = None ray_start_params: typing.Optional[typing.Dict[str, str]] = None - requests: typing.Optional[Resources] = None - limits: typing.Optional[Resources] = None + k8s_pod: typing.Optional[K8sPod] = None @dataclass @@ -96,8 +95,7 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] head_group_spec=( HeadGroupSpec( cfg.head_node_config.ray_start_params, - cfg.head_node_config.requests, - cfg.head_node_config.limits, + cfg.head_node_config.k8s_pod ) if cfg.head_node_config else None @@ -109,8 +107,7 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] c.min_replicas, c.max_replicas, c.ray_start_params, - c.requests, - c.limits, + c.k8s_pod ) for c in cfg.worker_node_config ], From 63fc026dc6dac8d52aa07faa65dec69f7f3506e1 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 6 Dec 2024 15:32:07 -0800 Subject: [PATCH 18/25] adjusts tests Signed-off-by: Jan Fiedler --- plugins/flytekit-ray/tests/test_ray.py | 27 ++++---------------------- 1 file changed, 4 insertions(+), 23 deletions(-) diff --git a/plugins/flytekit-ray/tests/test_ray.py b/plugins/flytekit-ray/tests/test_ray.py index 206c8ea04a..7155675363 100644 --- a/plugins/flytekit-ray/tests/test_ray.py +++ b/plugins/flytekit-ray/tests/test_ray.py @@ -12,23 +12,14 @@ ) from flytekitplugins.ray.task import RayJobConfig, WorkerNodeConfig from google.protobuf.json_format import MessageToDict -from flytekit.core.resources import Resources +from flytekit.models.task import K8sPod from flytekit import PythonFunctionTask, task from flytekit.configuration import Image, ImageConfig, SerializationSettings config = RayJobConfig( - worker_node_config=[ - WorkerNodeConfig( - group_name="test_group", - replicas=3, - min_replicas=0, - max_replicas=10, - requests=Resources(cpu=2, mem="2Gi"), - limits=Resources(cpu=2, mem="4Gi"), - ) - ], - head_node_config=HeadNodeConfig(requests=Resources(cpu=2)), + worker_node_config=[WorkerNodeConfig(group_name="test_group", replicas=3, min_replicas=0, max_replicas=10, k8s_pod=K8sPod(pod_spec={"str": "worker", "int": 1}))], + head_node_config=HeadNodeConfig(k8s_pod=K8sPod(pod_spec={"str": "head", "int": 2})), runtime_env={"pip": ["numpy"]}, enable_autoscaling=True, shutdown_after_job_finishes=True, @@ -59,17 +50,7 @@ def t1(a: int) -> str: ray_job_pb = RayJob( ray_cluster=RayCluster( - worker_group_spec=[ - WorkerGroupSpec( - group_name="test_group", - replicas=3, - min_replicas=0, - max_replicas=10, - requests=Resources(cpu=2, mem="2Gi"), - limits=Resources(cpu=2, mem="4Gi"), - ) - ], - head_group_spec=HeadGroupSpec(requests=Resources(cpu=2)), + ray_cluster=RayCluster(worker_group_spec=[WorkerGroupSpec(group_name="test_group", replicas=3, min_replicas=0, max_replicas=10, k8s_pod=K8sPod(pod_spec={"str": "worker", "int": 1}))], head_group_spec=HeadGroupSpec(k8s_pod=K8sPod(pod_spec={"str": "head", "int": 2})), enable_autoscaling=True), enable_autoscaling=True, ), runtime_env=base64.b64encode(json.dumps({"pip": ["numpy"]}).encode()).decode(), From 60b20a58838bfc2e15d647d1d461475274447287 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 6 Dec 2024 15:34:42 -0800 Subject: [PATCH 19/25] ruff Signed-off-by: Jan Fiedler --- .../flytekit-ray/flytekitplugins/ray/task.py | 13 ++------- plugins/flytekit-ray/tests/test_ray.py | 28 ++++++++++++++++--- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index 8e925a3d0c..98a653a990 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -18,7 +18,6 @@ from flytekit.configuration import SerializationSettings from flytekit.core.context_manager import ExecutionParameters, FlyteContextManager from flytekit.core.python_function_task import PythonFunctionTask -from flytekit.core.resources import Resources from flytekit.extend import TaskPlugins from flytekit.models.task import K8sPod @@ -93,21 +92,13 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] ray_job = RayJob( ray_cluster=RayCluster( head_group_spec=( - HeadGroupSpec( - cfg.head_node_config.ray_start_params, - cfg.head_node_config.k8s_pod - ) + HeadGroupSpec(cfg.head_node_config.ray_start_params, cfg.head_node_config.k8s_pod) if cfg.head_node_config else None ), worker_group_spec=[ WorkerGroupSpec( - c.group_name, - c.replicas, - c.min_replicas, - c.max_replicas, - c.ray_start_params, - c.k8s_pod + c.group_name, c.replicas, c.min_replicas, c.max_replicas, c.ray_start_params, c.k8s_pod ) for c in cfg.worker_node_config ], diff --git a/plugins/flytekit-ray/tests/test_ray.py b/plugins/flytekit-ray/tests/test_ray.py index 7155675363..d248108ed7 100644 --- a/plugins/flytekit-ray/tests/test_ray.py +++ b/plugins/flytekit-ray/tests/test_ray.py @@ -5,20 +5,28 @@ import yaml from flytekitplugins.ray import HeadNodeConfig from flytekitplugins.ray.models import ( + HeadGroupSpec, RayCluster, RayJob, WorkerGroupSpec, - HeadGroupSpec, ) from flytekitplugins.ray.task import RayJobConfig, WorkerNodeConfig from google.protobuf.json_format import MessageToDict -from flytekit.models.task import K8sPod from flytekit import PythonFunctionTask, task from flytekit.configuration import Image, ImageConfig, SerializationSettings +from flytekit.models.task import K8sPod config = RayJobConfig( - worker_node_config=[WorkerNodeConfig(group_name="test_group", replicas=3, min_replicas=0, max_replicas=10, k8s_pod=K8sPod(pod_spec={"str": "worker", "int": 1}))], + worker_node_config=[ + WorkerNodeConfig( + group_name="test_group", + replicas=3, + min_replicas=0, + max_replicas=10, + k8s_pod=K8sPod(pod_spec={"str": "worker", "int": 1}), + ) + ], head_node_config=HeadNodeConfig(k8s_pod=K8sPod(pod_spec={"str": "head", "int": 2})), runtime_env={"pip": ["numpy"]}, enable_autoscaling=True, @@ -50,7 +58,19 @@ def t1(a: int) -> str: ray_job_pb = RayJob( ray_cluster=RayCluster( - ray_cluster=RayCluster(worker_group_spec=[WorkerGroupSpec(group_name="test_group", replicas=3, min_replicas=0, max_replicas=10, k8s_pod=K8sPod(pod_spec={"str": "worker", "int": 1}))], head_group_spec=HeadGroupSpec(k8s_pod=K8sPod(pod_spec={"str": "head", "int": 2})), enable_autoscaling=True), + ray_cluster=RayCluster( + worker_group_spec=[ + WorkerGroupSpec( + group_name="test_group", + replicas=3, + min_replicas=0, + max_replicas=10, + k8s_pod=K8sPod(pod_spec={"str": "worker", "int": 1}), + ) + ], + head_group_spec=HeadGroupSpec(k8s_pod=K8sPod(pod_spec={"str": "head", "int": 2})), + enable_autoscaling=True, + ), enable_autoscaling=True, ), runtime_env=base64.b64encode(json.dumps({"pip": ["numpy"]}).encode()).decode(), From 3bcec9c12a362c57838db797ff10f464df28b249 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 6 Dec 2024 15:40:00 -0800 Subject: [PATCH 20/25] ruff Signed-off-by: Jan Fiedler --- flytekit/core/resources.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/core/resources.py b/flytekit/core/resources.py index 0e446a16a5..5d17b9b4aa 100644 --- a/flytekit/core/resources.py +++ b/flytekit/core/resources.py @@ -106,7 +106,7 @@ def pod_spec_from_resources( k8s_pod_name: str, requests: Optional[Resources], limits: Optional[Resources], - k8s_gpu_resource_key: str = "nvidia.com/gpu" + k8s_gpu_resource_key: str = "nvidia.com/gpu", ) -> dict[str, Any]: def _construct_k8s_pods_resources(resources: Optional[Resources], k8s_gpu_resource_key: str): if resources is None: From f85c86a9910d22a5b709e03865510902efc8f872 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 6 Dec 2024 16:58:48 -0800 Subject: [PATCH 21/25] fix ray tests Signed-off-by: Jan Fiedler --- plugins/flytekit-ray/tests/test_ray.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/plugins/flytekit-ray/tests/test_ray.py b/plugins/flytekit-ray/tests/test_ray.py index d248108ed7..c943067013 100644 --- a/plugins/flytekit-ray/tests/test_ray.py +++ b/plugins/flytekit-ray/tests/test_ray.py @@ -58,19 +58,16 @@ def t1(a: int) -> str: ray_job_pb = RayJob( ray_cluster=RayCluster( - ray_cluster=RayCluster( - worker_group_spec=[ - WorkerGroupSpec( - group_name="test_group", - replicas=3, - min_replicas=0, - max_replicas=10, - k8s_pod=K8sPod(pod_spec={"str": "worker", "int": 1}), - ) - ], - head_group_spec=HeadGroupSpec(k8s_pod=K8sPod(pod_spec={"str": "head", "int": 2})), - enable_autoscaling=True, - ), + worker_group_spec=[ + WorkerGroupSpec( + group_name="test_group", + replicas=3, + min_replicas=0, + max_replicas=10, + k8s_pod=K8sPod(pod_spec={"str": "worker", "int": 1}), + ) + ], + head_group_spec=HeadGroupSpec(k8s_pod=K8sPod(pod_spec={"str": "head", "int": 2})), enable_autoscaling=True, ), runtime_env=base64.b64encode(json.dumps({"pip": ["numpy"]}).encode()).decode(), From 4a7e888ba49b2fc88443bb78ae7ccb1063b29ed2 Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 6 Dec 2024 17:06:01 -0800 Subject: [PATCH 22/25] adjust ray README Signed-off-by: Jan Fiedler --- plugins/flytekit-ray/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugins/flytekit-ray/README.md b/plugins/flytekit-ray/README.md index f7db403a6c..0ef9c4b818 100644 --- a/plugins/flytekit-ray/README.md +++ b/plugins/flytekit-ray/README.md @@ -7,3 +7,5 @@ To install the plugin, run the following command: ```bash pip install flytekitplugins-ray ``` + +All [examples](https://docs.flyte.org/en/latest/flytesnacks/examples/ray_plugin/index.html) showcasing execution of Ray jobs using the plugin can be found in the documentation. \ No newline at end of file From e686aaae92dfbf3497999be8d45bf96aba9e62af Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 6 Dec 2024 17:08:39 -0800 Subject: [PATCH 23/25] end of file readme Signed-off-by: Jan Fiedler --- plugins/flytekit-ray/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flytekit-ray/README.md b/plugins/flytekit-ray/README.md index 0ef9c4b818..250321fd05 100644 --- a/plugins/flytekit-ray/README.md +++ b/plugins/flytekit-ray/README.md @@ -8,4 +8,4 @@ To install the plugin, run the following command: pip install flytekitplugins-ray ``` -All [examples](https://docs.flyte.org/en/latest/flytesnacks/examples/ray_plugin/index.html) showcasing execution of Ray jobs using the plugin can be found in the documentation. \ No newline at end of file +All [examples](https://docs.flyte.org/en/latest/flytesnacks/examples/ray_plugin/index.html) showcasing execution of Ray jobs using the plugin can be found in the documentation. From 248cda186dd95253f432507b42e906f6de48201b Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 6 Dec 2024 17:44:10 -0800 Subject: [PATCH 24/25] default Resources to None Signed-off-by: Jan Fiedler --- flytekit/core/resources.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flytekit/core/resources.py b/flytekit/core/resources.py index 5d17b9b4aa..6fa322e30e 100644 --- a/flytekit/core/resources.py +++ b/flytekit/core/resources.py @@ -104,9 +104,9 @@ def convert_resources_to_resource_model( def pod_spec_from_resources( k8s_pod_name: str, - requests: Optional[Resources], - limits: Optional[Resources], - k8s_gpu_resource_key: str = "nvidia.com/gpu", + requests: Optional[Resources] = None, + limits: Optional[Resources] = None, + k8s_gpu_resource_key: Optional[str] = "nvidia.com/gpu", ) -> dict[str, Any]: def _construct_k8s_pods_resources(resources: Optional[Resources], k8s_gpu_resource_key: str): if resources is None: From cad3d33923b099ada182352f401562215bedaeea Mon Sep 17 00:00:00 2001 From: Jan Fiedler Date: Fri, 6 Dec 2024 18:16:43 -0800 Subject: [PATCH 25/25] remove optional from k8s_gpu_resource_key Signed-off-by: Jan Fiedler --- flytekit/core/resources.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/core/resources.py b/flytekit/core/resources.py index 6fa322e30e..9a334f98f6 100644 --- a/flytekit/core/resources.py +++ b/flytekit/core/resources.py @@ -106,7 +106,7 @@ def pod_spec_from_resources( k8s_pod_name: str, requests: Optional[Resources] = None, limits: Optional[Resources] = None, - k8s_gpu_resource_key: Optional[str] = "nvidia.com/gpu", + k8s_gpu_resource_key: str = "nvidia.com/gpu", ) -> dict[str, Any]: def _construct_k8s_pods_resources(resources: Optional[Resources], k8s_gpu_resource_key: str): if resources is None: