Skip to content

Commit

Permalink
Introduce pod_spec_from_resources()ray helper function (#2943)
Browse files Browse the repository at this point in the history
* expose requests & limits instead of k8spod

Signed-off-by: Jan Fiedler <[email protected]>

* put construct_k8s_pod_spec_from_resources into core/resources.py

Signed-off-by: Jan Fiedler <[email protected]>

* adjust ray tests

Signed-off-by: Jan Fiedler <[email protected]>

* ruff check fix

Signed-off-by: Jan Fiedler <[email protected]>

* ruff format

Signed-off-by: Jan Fiedler <[email protected]>

* remove demo files from PR

Signed-off-by: Jan Fiedler <[email protected]>

* remove kubernetes from ray plugin dependencies

Signed-off-by: Jan Fiedler <[email protected]>

* Update structured_dataset.py

Signed-off-by: Jan Fiedler <[email protected]>

* add tests for construct_k8s_pod_spec_from_resources

Signed-off-by: Jan Fiedler <[email protected]>

* add kubernetes to pyproject.toml

Signed-off-by: Jan Fiedler <[email protected]>

* add underscore prefix to _construct_k8s_pods_resources

Signed-off-by: Jan Fiedler <[email protected]>

* remove parantheses from_flyte_idl

Signed-off-by: Jan Fiedler <[email protected]>

* expose k8s_gpu_resource_key

Signed-off-by: Jan Fiedler <[email protected]>

* remove parantheses

Signed-off-by: Jan Fiedler <[email protected]>

* rename & expose nvidia gpu key

Signed-off-by: Jan Fiedler <[email protected]>

* adjust resource tests

Signed-off-by: Jan Fiedler <[email protected]>

* back to exposing k8s pod

Signed-off-by: Jan Fiedler <[email protected]>

* adjusts tests

Signed-off-by: Jan Fiedler <[email protected]>

* ruff

Signed-off-by: Jan Fiedler <[email protected]>

* ruff

Signed-off-by: Jan Fiedler <[email protected]>

* fix ray tests

Signed-off-by: Jan Fiedler <[email protected]>

* adjust ray README

Signed-off-by: Jan Fiedler <[email protected]>

* end of file readme

Signed-off-by: Jan Fiedler <[email protected]>

* default Resources to None

Signed-off-by: Jan Fiedler <[email protected]>

* remove optional from k8s_gpu_resource_key

Signed-off-by: Jan Fiedler <[email protected]>

---------

Signed-off-by: Jan Fiedler <[email protected]>
  • Loading branch information
fiedlerNr9 authored Dec 13, 2024
1 parent f99d50e commit 2da64ef
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 14 deletions.
56 changes: 53 additions & 3 deletions flytekit/core/resources.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dataclasses import dataclass
from typing import List, Optional, Union
from dataclasses import dataclass, fields
from typing import Any, List, Optional, Union

from kubernetes.client import V1Container, V1PodSpec, V1ResourceRequirements
from mashumaro.mixins.json import DataClassJSONMixin

from flytekit.models import task as task_models
Expand Down Expand Up @@ -73,7 +74,10 @@ def _convert_resources_to_resource_entries(resources: Resources) -> List[_Resour
resource_entries.append(_ResourceEntry(name=_ResourceName.GPU, value=str(resources.gpu)))
if resources.ephemeral_storage is not None:
resource_entries.append(
_ResourceEntry(name=_ResourceName.EPHEMERAL_STORAGE, value=str(resources.ephemeral_storage))
_ResourceEntry(
name=_ResourceName.EPHEMERAL_STORAGE,
value=str(resources.ephemeral_storage),
)
)
return resource_entries

Expand All @@ -96,3 +100,49 @@ def convert_resources_to_resource_model(
if limits is not None:
limit_entries = _convert_resources_to_resource_entries(limits)
return task_models.Resources(requests=request_entries, limits=limit_entries)


def pod_spec_from_resources(
k8s_pod_name: str,
requests: Optional[Resources] = None,
limits: Optional[Resources] = None,
k8s_gpu_resource_key: str = "nvidia.com/gpu",
) -> dict[str, Any]:
def _construct_k8s_pods_resources(resources: Optional[Resources], k8s_gpu_resource_key: str):
if resources is None:
return None

resources_map = {
"cpu": "cpu",
"mem": "memory",
"gpu": k8s_gpu_resource_key,
"ephemeral_storage": "ephemeral-storage",
}

k8s_pod_resources = {}

for resource in fields(resources):
resource_value = getattr(resources, resource.name)
if resource_value is not None:
k8s_pod_resources[resources_map[resource.name]] = resource_value

return k8s_pod_resources

requests = _construct_k8s_pods_resources(resources=requests, k8s_gpu_resource_key=k8s_gpu_resource_key)
limits = _construct_k8s_pods_resources(resources=limits, k8s_gpu_resource_key=k8s_gpu_resource_key)
requests = requests or limits
limits = limits or requests

k8s_pod = V1PodSpec(
containers=[
V1Container(
name=k8s_pod_name,
resources=V1ResourceRequirements(
requests=requests,
limits=limits,
),
)
]
)

return k8s_pod.to_dict()
2 changes: 2 additions & 0 deletions plugins/flytekit-ray/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ To install the plugin, run the following command:
```bash
pip install flytekitplugins-ray
```

All [examples](https://docs.flyte.org/en/latest/flytesnacks/examples/ray_plugin/index.html) showcasing execution of Ray jobs using the plugin can be found in the documentation.
7 changes: 1 addition & 6 deletions plugins/flytekit-ray/flytekitplugins/ray/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,7 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any]
),
worker_group_spec=[
WorkerGroupSpec(
c.group_name,
c.replicas,
c.min_replicas,
c.max_replicas,
c.ray_start_params,
c.k8s_pod,
c.group_name, c.replicas, c.min_replicas, c.max_replicas, c.ray_start_params, c.k8s_pod
)
for c in cfg.worker_node_config
],
Expand Down
33 changes: 29 additions & 4 deletions plugins/flytekit-ray/tests/test_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,29 @@
import ray
import yaml
from flytekitplugins.ray import HeadNodeConfig
from flytekitplugins.ray.models import RayCluster, RayJob, WorkerGroupSpec, HeadGroupSpec
from flytekitplugins.ray.models import (
HeadGroupSpec,
RayCluster,
RayJob,
WorkerGroupSpec,
)
from flytekitplugins.ray.task import RayJobConfig, WorkerNodeConfig
from google.protobuf.json_format import MessageToDict
from flytekit.models.task import K8sPod

from flytekit import PythonFunctionTask, task
from flytekit.configuration import Image, ImageConfig, SerializationSettings
from flytekit.models.task import K8sPod

config = RayJobConfig(
worker_node_config=[WorkerNodeConfig(group_name="test_group", replicas=3, min_replicas=0, max_replicas=10, k8s_pod=K8sPod(pod_spec={"str": "worker", "int": 1}))],
worker_node_config=[
WorkerNodeConfig(
group_name="test_group",
replicas=3,
min_replicas=0,
max_replicas=10,
k8s_pod=K8sPod(pod_spec={"str": "worker", "int": 1}),
)
],
head_node_config=HeadNodeConfig(k8s_pod=K8sPod(pod_spec={"str": "head", "int": 2})),
runtime_env={"pip": ["numpy"]},
enable_autoscaling=True,
Expand Down Expand Up @@ -44,7 +57,19 @@ def t1(a: int) -> str:
)

ray_job_pb = RayJob(
ray_cluster=RayCluster(worker_group_spec=[WorkerGroupSpec(group_name="test_group", replicas=3, min_replicas=0, max_replicas=10, k8s_pod=K8sPod(pod_spec={"str": "worker", "int": 1}))], head_group_spec=HeadGroupSpec(k8s_pod=K8sPod(pod_spec={"str": "head", "int": 2})), enable_autoscaling=True),
ray_cluster=RayCluster(
worker_group_spec=[
WorkerGroupSpec(
group_name="test_group",
replicas=3,
min_replicas=0,
max_replicas=10,
k8s_pod=K8sPod(pod_spec={"str": "worker", "int": 1}),
)
],
head_group_spec=HeadGroupSpec(k8s_pod=K8sPod(pod_spec={"str": "head", "int": 2})),
enable_autoscaling=True,
),
runtime_env=base64.b64encode(json.dumps({"pip": ["numpy"]}).encode()).decode(),
runtime_env_yaml=yaml.dump({"pip": ["numpy"]}),
shutdown_after_job_finishes=True,
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies = [
"jsonlines",
"jsonpickle",
"keyring>=18.0.1",
"kubernetes>=12.0.1",
"markdown-it-py",
"marshmallow-enum",
"marshmallow-jsonschema>=0.12.0",
Expand Down
56 changes: 55 additions & 1 deletion tests/flytekit/unit/core/test_resources.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from typing import Dict

import pytest
from kubernetes.client import V1Container, V1PodSpec, V1ResourceRequirements

import flytekit.models.task as _task_models
from flytekit import Resources
from flytekit.core.resources import convert_resources_to_resource_model
from flytekit.core.resources import (
pod_spec_from_resources,
convert_resources_to_resource_model,
)

_ResourceName = _task_models.Resources.ResourceName

Expand Down Expand Up @@ -101,3 +105,53 @@ def test_resources_round_trip():
json_str = original.to_json()
result = Resources.from_json(json_str)
assert original == result


def test_pod_spec_from_resources_requests_limits_set():
requests = Resources(cpu="1", mem="1Gi", gpu="1", ephemeral_storage="1Gi")
limits = Resources(cpu="4", mem="2Gi", gpu="1", ephemeral_storage="1Gi")
k8s_pod_name = "foo"

expected_pod_spec = V1PodSpec(
containers=[
V1Container(
name=k8s_pod_name,
resources=V1ResourceRequirements(
requests={
"cpu": "1",
"memory": "1Gi",
"nvidia.com/gpu": "1",
"ephemeral-storage": "1Gi",
},
limits={
"cpu": "4",
"memory": "2Gi",
"nvidia.com/gpu": "1",
"ephemeral-storage": "1Gi",
},
),
)
]
)
pod_spec = pod_spec_from_resources(k8s_pod_name=k8s_pod_name, requests=requests, limits=limits)
assert expected_pod_spec == V1PodSpec(**pod_spec)


def test_pod_spec_from_resources_requests_set():
requests = Resources(cpu="1", mem="1Gi")
limits = None
k8s_pod_name = "foo"

expected_pod_spec = V1PodSpec(
containers=[
V1Container(
name=k8s_pod_name,
resources=V1ResourceRequirements(
requests={"cpu": "1", "memory": "1Gi"},
limits={"cpu": "1", "memory": "1Gi"},
),
)
]
)
pod_spec = pod_spec_from_resources(k8s_pod_name=k8s_pod_name, requests=requests, limits=limits)
assert expected_pod_spec == V1PodSpec(**pod_spec)

0 comments on commit 2da64ef

Please sign in to comment.