diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 26b92ede4e9c..854430f38c42 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -3262,7 +3262,8 @@ def simple_pipeline(): predict_op() predict_op().set_cpu_limit('5') predict_op().set_memory_limit('50G') - predict_op().set_cpu_limit('5').set_memory_limit('50G') + predict_op().set_cpu_request('2').set_cpu_limit( + '5').set_memory_request('4G').set_memory_limit('50G') dict_format = json_format.MessageToDict(simple_pipeline.pipeline_spec) @@ -3284,9 +3285,15 @@ def simple_pipeline(): 'cpuLimit', dict_format['deploymentSpec']['executors'] ['exec-predict-op-3']['container']['resources']) + self.assertEqual( + 2, dict_format['deploymentSpec']['executors']['exec-predict-op-4'] + ['container']['resources']['cpuRequest']) self.assertEqual( 5, dict_format['deploymentSpec']['executors']['exec-predict-op-4'] ['container']['resources']['cpuLimit']) + self.assertEqual( + 4, dict_format['deploymentSpec']['executors']['exec-predict-op-4'] + ['container']['resources']['memoryRequest']) self.assertEqual( 50, dict_format['deploymentSpec']['executors']['exec-predict-op-4'] ['container']['resources']['memoryLimit']) diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index 4b3dfd28b3c0..de92ef35735e 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -507,9 +507,15 @@ def build_container_spec_for_task( ])) if task.container_spec.resources is not None: + if task.container_spec.resources.cpu_request is not None: + container_spec.resources.cpu_request = ( + task.container_spec.resources.cpu_request) if task.container_spec.resources.cpu_limit is not None: container_spec.resources.cpu_limit = ( task.container_spec.resources.cpu_limit) + if task.container_spec.resources.memory_request is not None: + container_spec.resources.memory_request = ( + task.container_spec.resources.memory_request) if task.container_spec.resources.memory_limit is not None: container_spec.resources.memory_limit = ( task.container_spec.resources.memory_limit) diff --git a/sdk/python/kfp/components/pipeline_task.py b/sdk/python/kfp/components/pipeline_task.py index be5c79b057b6..67f2083b3767 100644 --- a/sdk/python/kfp/components/pipeline_task.py +++ b/sdk/python/kfp/components/pipeline_task.py @@ -14,6 +14,7 @@ """Pipeline task class and operations.""" import copy +import inspect import itertools import re from typing import Any, Dict, List, Mapping, Optional, Union @@ -246,24 +247,77 @@ def set_caching_options(self, enable_caching: bool) -> 'PipelineTask': self._task_spec.enable_caching = enable_caching return self - def set_cpu_limit(self, cpu: str) -> 'PipelineTask': - """Sets CPU limit (maximum) for the task. + def _ensure_container_spec_exists(self) -> None: + """Ensures that the task has a container spec.""" + caller_method_name = inspect.stack()[1][3] + + if self.container_spec is None: + raise ValueError( + f'{caller_method_name} can only be used on single-step components, not pipelines used as components, or special components like importers.' + ) + + def _validate_cpu_request_limit(self, cpu: str) -> float: + """Validates cpu request/limit string and converts to its numeric + value. Args: - cpu: Maximum CPU requests allowed. This string should be a number or a number followed by an "m" to indicate millicores (1/1000). For more information, see `Specify a CPU Request and a CPU Limit `_. + cpu: CPU requests or limits. This string should be a number or a + number followed by an "m" to indicate millicores (1/1000). For + more information, see `Specify a CPU Request and a CPU Limit + `_. + + Raises: + ValueError if the cpu request/limit string value is invalid. Returns: - Self return to allow chained setting calls. + The numeric value (float) of the cpu request/limit. """ if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None: raise ValueError( 'Invalid cpu string. Should be float or integer, or integer' ' followed by "m".') - cpu = float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu) - if self.container_spec is None: - raise ValueError( - 'There is no container specified in implementation') + return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu) + + def set_cpu_request(self, cpu: str) -> 'PipelineTask': + """Sets CPU request (minimum) for the task. + + Args: + cpu: Minimum CPU requests required. This string should be a number + or a number followed by an "m" to indicate millicores (1/1000). + For more information, see `Specify a CPU Request and a CPU Limit + `_. + + Returns: + Self return to allow chained setting calls. + """ + self._ensure_container_spec_exists() + + cpu = self._validate_cpu_request_limit(cpu) + + if self.container_spec.resources is not None: + self.container_spec.resources.cpu_request = cpu + else: + self.container_spec.resources = structures.ResourceSpec( + cpu_request=cpu) + + return self + + def set_cpu_limit(self, cpu: str) -> 'PipelineTask': + """Sets CPU limit (maximum) for the task. + + Args: + cpu: Maximum CPU requests allowed. This string should be a number + or a number followed by an "m" to indicate millicores (1/1000). + For more information, see `Specify a CPU Request and a CPU Limit + `_. + + Returns: + Self return to allow chained setting calls. + """ + self._ensure_container_spec_exists() + + cpu = self._validate_cpu_request_limit(cpu) if self.container_spec.resources is not None: self.container_spec.resources.cpu_limit = cpu @@ -283,15 +337,13 @@ def set_accelerator_limit(self, limit: int) -> 'PipelineTask': Returns: Self return to allow chained setting calls. """ + self._ensure_container_spec_exists() + if isinstance(limit, str): if re.match(r'[1-9]\d*$', limit) is None: raise ValueError(f'{"limit"!r} must be positive integer.') limit = int(limit) - if self.container_spec is None: - raise ValueError( - 'There is no container specified in implementation') - if self.container_spec.resources is not None: self.container_spec.resources.accelerator_count = limit else: @@ -317,14 +369,20 @@ def set_gpu_limit(self, gpu: str) -> 'PipelineTask': category=DeprecationWarning) return self.set_accelerator_limit(gpu) - def set_memory_limit(self, memory: str) -> 'PipelineTask': - """Sets memory limit (maximum) for the task. + def _validate_memory_request_limit(self, memory: str) -> float: + """Validates memory request/limit string and converts to its numeric + value. Args: - memory: The maximum memory requests allowed. This string should be a number or a number followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", "Gi", "M", "Mi", "K", or "Ki". + memory: Memory requests or limits. This string should be a number or + a number followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", + "Gi", "M", "Mi", "K", or "Ki". + + Raises: + ValueError if the memory request/limit string value is invalid. Returns: - Self return to allow chained setting calls. + The numeric value (float) of the memory request/limit. """ if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$', memory) is None: @@ -361,9 +419,45 @@ def set_memory_limit(self, memory: str) -> 'PipelineTask': # By default interpret as a plain integer, in the unit of Bytes. memory = float(memory) / constants._G - if self.container_spec is None: - raise ValueError( - 'There is no container specified in implementation') + return memory + + def set_memory_request(self, memory: str) -> 'PipelineTask': + """Sets memory request (minimum) for the task. + + Args: + memory: The minimum memory requests required. This string should be + a number or a number followed by one of "E", "Ei", "P", "Pi", + "T", "Ti", "G", "Gi", "M", "Mi", "K", or "Ki". + + Returns: + Self return to allow chained setting calls. + """ + self._ensure_container_spec_exists() + + memory = self._validate_memory_request_limit(memory) + + if self.container_spec.resources is not None: + self.container_spec.resources.memory_request = memory + else: + self.container_spec.resources = structures.ResourceSpec( + memory_request=memory) + + return self + + def set_memory_limit(self, memory: str) -> 'PipelineTask': + """Sets memory limit (maximum) for the task. + + Args: + memory: The maximum memory requests allowed. This string should be + a number or a number followed by one of "E", "Ei", "P", "Pi", + "T", "Ti", "G", "Gi", "M", "Mi", "K", or "Ki". + + Returns: + Self return to allow chained setting calls. + """ + self._ensure_container_spec_exists() + + memory = self._validate_memory_request_limit(memory) if self.container_spec.resources is not None: self.container_spec.resources.memory_limit = memory @@ -420,9 +514,7 @@ def set_accelerator_type(self, accelerator: str) -> 'PipelineTask': Returns: Self return to allow chained setting calls. """ - if self.container_spec is None: - raise ValueError( - 'There is no container specified in implementation') + self._ensure_container_spec_exists() if self.container_spec.resources is not None: self.container_spec.resources.accelerator_type = accelerator @@ -456,6 +548,8 @@ def set_env_variable(self, name: str, value: str) -> 'PipelineTask': Returns: Self return to allow chained setting calls. """ + self._ensure_container_spec_exists() + if self.container_spec.env is not None: self.container_spec.env[name] = value else: diff --git a/sdk/python/kfp/components/pipeline_task_test.py b/sdk/python/kfp/components/pipeline_task_test.py index b56452ed788f..128a83a34998 100644 --- a/sdk/python/kfp/components/pipeline_task_test.py +++ b/sdk/python/kfp/components/pipeline_task_test.py @@ -144,30 +144,33 @@ def test_set_caching_options(self): @parameterized.parameters( { - 'cpu_limit': '123', + 'cpu': '123', 'expected_cpu_number': 123, }, { - 'cpu_limit': '123m', + 'cpu': '123m', 'expected_cpu_number': 0.123, }, { - 'cpu_limit': '123.0', + 'cpu': '123.0', 'expected_cpu_number': 123, }, { - 'cpu_limit': '123.0m', + 'cpu': '123.0m', 'expected_cpu_number': 0.123, }, ) - def test_set_valid_cpu_limit(self, cpu_limit: str, - expected_cpu_number: float): + def test_set_valid_cpu_request_limit(self, cpu: str, + expected_cpu_number: float): task = pipeline_task.PipelineTask( component_spec=structures.ComponentSpec.from_yaml_documents( V2_YAML), args={'input1': 'value'}, ) - task.set_cpu_limit(cpu_limit) + task.set_cpu_request(cpu) + self.assertEqual(expected_cpu_number, + task.container_spec.resources.cpu_request) + task.set_cpu_limit(cpu) self.assertEqual(expected_cpu_number, task.container_spec.resources.cpu_limit) @@ -286,6 +289,9 @@ def test_set_memory_limit(self, memory: str, expected_memory_number: int): V2_YAML), args={'input1': 'value'}, ) + task.set_memory_request(memory) + self.assertEqual(expected_memory_number, + task.container_spec.resources.memory_request) task.set_memory_limit(memory) self.assertEqual(expected_memory_number, task.container_spec.resources.memory_limit) @@ -332,6 +338,25 @@ def test_set_display_name(self): task.set_display_name('test_name') self.assertEqual('test_name', task._task_spec.display_name) + def test_set_cpu_limit_on_pipeline_should_raise(self): + + @dsl.component + def comp(): + print('hello') + + @dsl.pipeline + def graph(): + comp() + comp() + + with self.assertRaisesRegex( + ValueError, + r'set_cpu_limit can only be used on single-step components'): + + @dsl.pipeline + def my_pipeline(): + graph().set_cpu_limit('1') + class TestPlatformSpecificFunctionality(unittest.TestCase): diff --git a/sdk/python/kfp/components/structures.py b/sdk/python/kfp/components/structures.py index 1e3218ceef8f..4b18bb9d4582 100644 --- a/sdk/python/kfp/components/structures.py +++ b/sdk/python/kfp/components/structures.py @@ -228,13 +228,17 @@ class ResourceSpec: """The resource requirements of a container execution. Attributes: + cpu_request (optional): the requirement of the number of vCPU cores. cpu_limit (optional): the limit of the number of vCPU cores. + memory_request (optional): the memory requirement in GB. memory_limit (optional): the memory limit in GB. accelerator_type (optional): the type of accelerators attached to the container. accelerator_count (optional): the number of accelerators attached. """ + cpu_request: Optional[float] = None cpu_limit: Optional[float] = None + memory_request: Optional[float] = None memory_limit: Optional[float] = None accelerator_type: Optional[str] = None accelerator_count: Optional[int] = None diff --git a/sdk/python/requirements.in b/sdk/python/requirements.in index c1bf3df8e510..186f6b4bdb24 100644 --- a/sdk/python/requirements.in +++ b/sdk/python/requirements.in @@ -11,7 +11,7 @@ google-auth>=1.6.1,<3 # https://github.com/googleapis/python-storage/blob/main/CHANGELOG.md#221-2022-03-15 google-cloud-storage>=2.2.1,<3 # pin kfp-pipeline-spec to an exact version, since this is the contract between a given KFP SDK version and the BE. we don't want old version of the SDK to write new fields and to have the BE reject the new unsupported field (even if the new field backward compatible from a proto perspective) -kfp-pipeline-spec==0.2.1 +kfp-pipeline-spec==0.2.2 # Update the upper version whenever a new major version of the # kfp-server-api package is released. # Update the lower version when kfp sdk depends on new apis/fields in diff --git a/sdk/python/requirements.txt b/sdk/python/requirements.txt index 9690c000e220..a636813607c0 100644 --- a/sdk/python/requirements.txt +++ b/sdk/python/requirements.txt @@ -41,7 +41,7 @@ googleapis-common-protos==1.56.4 # via google-api-core idna==3.3 # via requests -kfp-pipeline-spec==0.2.0 +kfp-pipeline-spec==0.2.2 # via -r requirements.in kfp-server-api==2.0.0a6 # via -r requirements.in diff --git a/sdk/python/test_data/pipelines/pipeline_with_resource_spec.py b/sdk/python/test_data/pipelines/pipeline_with_resource_spec.py index e10384493bc5..c4a42119c62a 100644 --- a/sdk/python/test_data/pipelines/pipeline_with_resource_spec.py +++ b/sdk/python/test_data/pipelines/pipeline_with_resource_spec.py @@ -39,8 +39,9 @@ def my_pipeline(input_location: str = 'gs://test-bucket/pipeline_root', training_op( examples=ingestor.outputs['examples'], optimizer=optimizer, - n_epochs=n_epochs).set_cpu_limit('4').set_memory_limit( - '14Gi').set_accelerator_type('tpu-v3').set_accelerator_limit(1)) + n_epochs=n_epochs).set_cpu_request('2').set_cpu_limit( + '4').set_memory_request('4Gi').set_memory_limit('14Gi') + .set_accelerator_type('tpu-v3').set_accelerator_limit(1)) if __name__ == '__main__': diff --git a/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml b/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml index 89d72d6d48d0..f62a3047c59a 100644 --- a/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml +++ b/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml @@ -65,7 +65,9 @@ deploymentSpec: count: '1' type: tpu-v3 cpuLimit: 4.0 + cpuRequest: 2.0 memoryLimit: 15.032385536 + memoryRequest: 4.294967296 pipelineInfo: name: two-step-pipeline-with-resource-spec root: @@ -117,4 +119,4 @@ root: isOptional: true parameterType: STRING schemaVersion: 2.1.0 -sdkVersion: kfp-2.0.0-beta.8 +sdkVersion: kfp-2.0.0-beta.13