Skip to content

Commit

Permalink
feat(sdk): Support setting cpu/memory requests. (kubeflow#9121)
Browse files Browse the repository at this point in the history
* Support setting cpu/memory requests.

* address review comments

* address review comments
  • Loading branch information
chensun authored and rd-pong committed Apr 26, 2023
1 parent b77dde8 commit 737d079
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 35 deletions.
9 changes: 8 additions & 1 deletion sdk/python/kfp/compiler/compiler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3262,7 +3262,8 @@ def simple_pipeline():
predict_op()
predict_op().set_cpu_limit('5')
predict_op().set_memory_limit('50G')
predict_op().set_cpu_limit('5').set_memory_limit('50G')
predict_op().set_cpu_request('2').set_cpu_limit(
'5').set_memory_request('4G').set_memory_limit('50G')

dict_format = json_format.MessageToDict(simple_pipeline.pipeline_spec)

Expand All @@ -3284,9 +3285,15 @@ def simple_pipeline():
'cpuLimit', dict_format['deploymentSpec']['executors']
['exec-predict-op-3']['container']['resources'])

self.assertEqual(
2, dict_format['deploymentSpec']['executors']['exec-predict-op-4']
['container']['resources']['cpuRequest'])
self.assertEqual(
5, dict_format['deploymentSpec']['executors']['exec-predict-op-4']
['container']['resources']['cpuLimit'])
self.assertEqual(
4, dict_format['deploymentSpec']['executors']['exec-predict-op-4']
['container']['resources']['memoryRequest'])
self.assertEqual(
50, dict_format['deploymentSpec']['executors']['exec-predict-op-4']
['container']['resources']['memoryLimit'])
Expand Down
6 changes: 6 additions & 0 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,15 @@ def build_container_spec_for_task(
]))

if task.container_spec.resources is not None:
if task.container_spec.resources.cpu_request is not None:
container_spec.resources.cpu_request = (
task.container_spec.resources.cpu_request)
if task.container_spec.resources.cpu_limit is not None:
container_spec.resources.cpu_limit = (
task.container_spec.resources.cpu_limit)
if task.container_spec.resources.memory_request is not None:
container_spec.resources.memory_request = (
task.container_spec.resources.memory_request)
if task.container_spec.resources.memory_limit is not None:
container_spec.resources.memory_limit = (
task.container_spec.resources.memory_limit)
Expand Down
138 changes: 116 additions & 22 deletions sdk/python/kfp/components/pipeline_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""Pipeline task class and operations."""

import copy
import inspect
import itertools
import re
from typing import Any, Dict, List, Mapping, Optional, Union
Expand Down Expand Up @@ -246,24 +247,77 @@ def set_caching_options(self, enable_caching: bool) -> 'PipelineTask':
self._task_spec.enable_caching = enable_caching
return self

def set_cpu_limit(self, cpu: str) -> 'PipelineTask':
"""Sets CPU limit (maximum) for the task.
def _ensure_container_spec_exists(self) -> None:
    """Ensures that the task has a container spec.

    The name of the method that called this check is embedded in the error
    message, so it must be invoked directly from the public setter whose
    name should be reported.

    Raises:
        ValueError: If ``self.container_spec`` is ``None``.
    """
    if self.container_spec is not None:
        return

    # Resolve the caller's name only on the failure path, and read it from
    # the frame object directly: inspect.stack() builds a FrameInfo record
    # for every frame on the stack, which is needlessly expensive for a
    # check that runs on every setter call.
    frame = inspect.currentframe()
    caller = frame.f_back if frame is not None else None
    # inspect.currentframe() may return None on Python implementations
    # without stack frame support; fall back to a generic subject then.
    caller_method_name = (
        caller.f_code.co_name if caller is not None else 'This method')
    # Drop the frame references explicitly to avoid reference cycles.
    del frame, caller

    raise ValueError(
        f'{caller_method_name} can only be used on single-step components, not pipelines used as components, or special components like importers.'
    )

def _validate_cpu_request_limit(self, cpu: str) -> float:
"""Validates cpu request/limit string and converts to its numeric
value.
Args:
cpu: Maximum CPU requests allowed. This string should be a number or a number followed by an "m" to indicate millicores (1/1000). For more information, see `Specify a CPU Request and a CPU Limit <https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#specify-a-cpu-request-and-a-cpu-limit>`_.
cpu: CPU requests or limits. This string should be a number or a
number followed by an "m" to indicate millicores (1/1000). For
more information, see `Specify a CPU Request and a CPU Limit
<https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#specify-a-cpu-request-and-a-cpu-limit>`_.
Raises:
ValueError if the cpu request/limit string value is invalid.
Returns:
Self return to allow chained setting calls.
The numeric value (float) of the cpu request/limit.
"""
if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None:
raise ValueError(
'Invalid cpu string. Should be float or integer, or integer'
' followed by "m".')

cpu = float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu)
if self.container_spec is None:
raise ValueError(
'There is no container specified in implementation')
return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu)

def set_cpu_request(self, cpu: str) -> 'PipelineTask':
    """Sets the CPU request (minimum) for the task.

    Args:
        cpu: Minimum CPU required. This string should be a number or a
            number followed by an "m" to indicate millicores (1/1000).
            For more information, see `Specify a CPU Request and a CPU Limit
            <https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#specify-a-cpu-request-and-a-cpu-limit>`_.

    Returns:
        Self return to allow chained setting calls.
    """
    # Called directly from this method on purpose: the check reports the
    # name of its immediate caller in its error message.
    self._ensure_container_spec_exists()

    cpu_value = self._validate_cpu_request_limit(cpu)

    if self.container_spec.resources is None:
        # No resource spec yet: create one carrying only the CPU request.
        self.container_spec.resources = structures.ResourceSpec(
            cpu_request=cpu_value)
        return self

    self.container_spec.resources.cpu_request = cpu_value
    return self

def set_cpu_limit(self, cpu: str) -> 'PipelineTask':
"""Sets CPU limit (maximum) for the task.
Args:
cpu: Maximum CPU requests allowed. This string should be a number
or a number followed by an "m" to indicate millicores (1/1000).
For more information, see `Specify a CPU Request and a CPU Limit
<https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#specify-a-cpu-request-and-a-cpu-limit>`_.
Returns:
Self return to allow chained setting calls.
"""
self._ensure_container_spec_exists()

cpu = self._validate_cpu_request_limit(cpu)

if self.container_spec.resources is not None:
self.container_spec.resources.cpu_limit = cpu
Expand All @@ -283,15 +337,13 @@ def set_accelerator_limit(self, limit: int) -> 'PipelineTask':
Returns:
Self return to allow chained setting calls.
"""
self._ensure_container_spec_exists()

if isinstance(limit, str):
if re.match(r'[1-9]\d*$', limit) is None:
raise ValueError(f'{"limit"!r} must be positive integer.')
limit = int(limit)

if self.container_spec is None:
raise ValueError(
'There is no container specified in implementation')

if self.container_spec.resources is not None:
self.container_spec.resources.accelerator_count = limit
else:
Expand All @@ -317,14 +369,20 @@ def set_gpu_limit(self, gpu: str) -> 'PipelineTask':
category=DeprecationWarning)
return self.set_accelerator_limit(gpu)

def set_memory_limit(self, memory: str) -> 'PipelineTask':
"""Sets memory limit (maximum) for the task.
def _validate_memory_request_limit(self, memory: str) -> float:
"""Validates memory request/limit string and converts to its numeric
value.
Args:
memory: The maximum memory requests allowed. This string should be a number or a number followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", "Gi", "M", "Mi", "K", or "Ki".
memory: Memory requests or limits. This string should be a number or
a number followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G",
"Gi", "M", "Mi", "K", or "Ki".
Raises:
ValueError if the memory request/limit string value is invalid.
Returns:
Self return to allow chained setting calls.
The numeric value (float) of the memory request/limit.
"""
if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$',
memory) is None:
Expand Down Expand Up @@ -361,9 +419,45 @@ def set_memory_limit(self, memory: str) -> 'PipelineTask':
# By default interpret as a plain integer, in the unit of Bytes.
memory = float(memory) / constants._G

if self.container_spec is None:
raise ValueError(
'There is no container specified in implementation')
return memory

def set_memory_request(self, memory: str) -> 'PipelineTask':
    """Sets the memory request (minimum) for the task.

    Args:
        memory: The minimum memory required. This string should be a
            number or a number followed by one of "E", "Ei", "P", "Pi",
            "T", "Ti", "G", "Gi", "M", "Mi", "K", or "Ki".

    Returns:
        Self return to allow chained setting calls.
    """
    # Called directly from this method on purpose: the check reports the
    # name of its immediate caller in its error message.
    self._ensure_container_spec_exists()

    memory_value = self._validate_memory_request_limit(memory)

    if self.container_spec.resources is None:
        # No resource spec yet: create one carrying only the memory request.
        self.container_spec.resources = structures.ResourceSpec(
            memory_request=memory_value)
        return self

    self.container_spec.resources.memory_request = memory_value
    return self

def set_memory_limit(self, memory: str) -> 'PipelineTask':
"""Sets memory limit (maximum) for the task.
Args:
memory: The maximum memory requests allowed. This string should be
a number or a number followed by one of "E", "Ei", "P", "Pi",
"T", "Ti", "G", "Gi", "M", "Mi", "K", or "Ki".
Returns:
Self return to allow chained setting calls.
"""
self._ensure_container_spec_exists()

memory = self._validate_memory_request_limit(memory)

if self.container_spec.resources is not None:
self.container_spec.resources.memory_limit = memory
Expand Down Expand Up @@ -420,9 +514,7 @@ def set_accelerator_type(self, accelerator: str) -> 'PipelineTask':
Returns:
Self return to allow chained setting calls.
"""
if self.container_spec is None:
raise ValueError(
'There is no container specified in implementation')
self._ensure_container_spec_exists()

if self.container_spec.resources is not None:
self.container_spec.resources.accelerator_type = accelerator
Expand Down Expand Up @@ -456,6 +548,8 @@ def set_env_variable(self, name: str, value: str) -> 'PipelineTask':
Returns:
Self return to allow chained setting calls.
"""
self._ensure_container_spec_exists()

if self.container_spec.env is not None:
self.container_spec.env[name] = value
else:
Expand Down
39 changes: 32 additions & 7 deletions sdk/python/kfp/components/pipeline_task_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,30 +144,33 @@ def test_set_caching_options(self):

@parameterized.parameters(
{
'cpu_limit': '123',
'cpu': '123',
'expected_cpu_number': 123,
},
{
'cpu_limit': '123m',
'cpu': '123m',
'expected_cpu_number': 0.123,
},
{
'cpu_limit': '123.0',
'cpu': '123.0',
'expected_cpu_number': 123,
},
{
'cpu_limit': '123.0m',
'cpu': '123.0m',
'expected_cpu_number': 0.123,
},
)
def test_set_valid_cpu_limit(self, cpu_limit: str,
expected_cpu_number: float):
def test_set_valid_cpu_request_limit(self, cpu: str,
expected_cpu_number: float):
task = pipeline_task.PipelineTask(
component_spec=structures.ComponentSpec.from_yaml_documents(
V2_YAML),
args={'input1': 'value'},
)
task.set_cpu_limit(cpu_limit)
task.set_cpu_request(cpu)
self.assertEqual(expected_cpu_number,
task.container_spec.resources.cpu_request)
task.set_cpu_limit(cpu)
self.assertEqual(expected_cpu_number,
task.container_spec.resources.cpu_limit)

Expand Down Expand Up @@ -286,6 +289,9 @@ def test_set_memory_limit(self, memory: str, expected_memory_number: int):
V2_YAML),
args={'input1': 'value'},
)
task.set_memory_request(memory)
self.assertEqual(expected_memory_number,
task.container_spec.resources.memory_request)
task.set_memory_limit(memory)
self.assertEqual(expected_memory_number,
task.container_spec.resources.memory_limit)
Expand Down Expand Up @@ -332,6 +338,25 @@ def test_set_display_name(self):
task.set_display_name('test_name')
self.assertEqual('test_name', task._task_spec.display_name)

# Checks that set_cpu_limit is rejected with a ValueError when it is applied
# to a task created from a pipeline rather than from a single component.
def test_set_cpu_limit_on_pipeline_should_raise(self):

# A plain component used as the building block of the inner pipeline.
@dsl.component
def comp():
print('hello')

# A pipeline that invokes the component twice; it is used below as if it
# were a component.
@dsl.pipeline
def graph():
comp()
comp()

# The with-block contains only the pipeline definition and no explicit call,
# so the error is presumably raised while @dsl.pipeline processes
# my_pipeline -- TODO confirm.
with self.assertRaisesRegex(
ValueError,
r'set_cpu_limit can only be used on single-step components'):

@dsl.pipeline
def my_pipeline():
# graph() yields a task backed by a pipeline; setting a CPU limit on it is
# the operation expected to fail.
graph().set_cpu_limit('1')


class TestPlatformSpecificFunctionality(unittest.TestCase):

Expand Down
4 changes: 4 additions & 0 deletions sdk/python/kfp/components/structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,17 @@ class ResourceSpec:
"""The resource requirements of a container execution.
Attributes:
cpu_request (optional): the requirement of the number of vCPU cores.
cpu_limit (optional): the limit of the number of vCPU cores.
memory_request (optional): the memory requirement in GB.
memory_limit (optional): the memory limit in GB.
accelerator_type (optional): the type of accelerators attached to the
container.
accelerator_count (optional): the number of accelerators attached.
"""
cpu_request: Optional[float] = None
cpu_limit: Optional[float] = None
memory_request: Optional[float] = None
memory_limit: Optional[float] = None
accelerator_type: Optional[str] = None
accelerator_count: Optional[int] = None
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ google-auth>=1.6.1,<3
# https://github.com/googleapis/python-storage/blob/main/CHANGELOG.md#221-2022-03-15
google-cloud-storage>=2.2.1,<3
# Pin kfp-pipeline-spec to an exact version, since this is the contract between a given KFP SDK version and the backend (BE). We don't want an old version of the SDK to write new fields and have the BE reject the new, unsupported field (even if the new field is backward compatible from a proto perspective).
kfp-pipeline-spec==0.2.1
kfp-pipeline-spec==0.2.2
# Update the upper version whenever a new major version of the
# kfp-server-api package is released.
# Update the lower version when kfp sdk depends on new apis/fields in
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ googleapis-common-protos==1.56.4
# via google-api-core
idna==3.3
# via requests
kfp-pipeline-spec==0.2.0
kfp-pipeline-spec==0.2.2
# via -r requirements.in
kfp-server-api==2.0.0a6
# via -r requirements.in
Expand Down
5 changes: 3 additions & 2 deletions sdk/python/test_data/pipelines/pipeline_with_resource_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ def my_pipeline(input_location: str = 'gs://test-bucket/pipeline_root',
training_op(
examples=ingestor.outputs['examples'],
optimizer=optimizer,
n_epochs=n_epochs).set_cpu_limit('4').set_memory_limit(
'14Gi').set_accelerator_type('tpu-v3').set_accelerator_limit(1))
n_epochs=n_epochs).set_cpu_request('2').set_cpu_limit(
'4').set_memory_request('4Gi').set_memory_limit('14Gi')
.set_accelerator_type('tpu-v3').set_accelerator_limit(1))


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ deploymentSpec:
count: '1'
type: tpu-v3
cpuLimit: 4.0
cpuRequest: 2.0
memoryLimit: 15.032385536
memoryRequest: 4.294967296
pipelineInfo:
name: two-step-pipeline-with-resource-spec
root:
Expand Down Expand Up @@ -117,4 +119,4 @@ root:
isOptional: true
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-beta.8
sdkVersion: kfp-2.0.0-beta.13

0 comments on commit 737d079

Please sign in to comment.