From 3da114debec66a137fda75bbe636d72f08d8f460 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Tue, 17 Oct 2023 22:50:02 +0200 Subject: [PATCH 1/2] Add resource limits for Vertex --- src/fondant/compiler.py | 12 ++++++++++++ src/fondant/pipeline.py | 30 ++++++++++++++++++++++++++---- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index 301a60a29..15b66e273 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -407,10 +407,16 @@ def _set_configuration(self, task, fondant_component_operation): accelerator_name = fondant_component_operation.accelerator_name node_pool_label = fondant_component_operation.node_pool_label node_pool_name = fondant_component_operation.node_pool_name + cpu_request = fondant_component_operation.cpu_request + cpu_limit = fondant_component_operation.cpu_limit memory_request = fondant_component_operation.memory_request memory_limit = fondant_component_operation.memory_limit # Assign optional specification + if cpu_request is not None: + task.set_cpu_request(cpu_request) + if cpu_limit is not None: + task.set_cpu_limit(cpu_limit) if memory_request is not None: task.set_memory_request(memory_request) if memory_limit is not None: @@ -459,10 +465,16 @@ def resolve_imports(self): @staticmethod def _set_configuration(task, fondant_component_operation): # Unpack optional specifications + cpu_limit = fondant_component_operation.cpu_limit + memory_limit = fondant_component_operation.memory_limit number_of_accelerators = fondant_component_operation.number_of_accelerators accelerator_name = fondant_component_operation.accelerator_name # Assign optional specification + if cpu_limit is not None: + task.set_cpu_limit(cpu_limit) + if memory_limit is not None: + task.set_memory_limit(memory_limit) if number_of_accelerators is not None: task.set_accelerator_limit(number_of_accelerators) if accelerator_name not in valid_vertex_accelerator_types: diff --git 
a/src/fondant/pipeline.py b/src/fondant/pipeline.py index 420fb7ccf..4885c83a8 100644 --- a/src/fondant/pipeline.py +++ b/src/fondant/pipeline.py @@ -61,6 +61,12 @@ class ComponentOp: node_pool_label: The label of the node pool to which the operation will be assigned. node_pool_name: The name of the node pool to which the operation will be assigned. cache: Set to False to disable caching, True by default. + cpu_request: the amount of CPU requested by the component. The value + should be a string which can be a number or a number followed by “m”, which means + 1/1000. + cpu_limit: the maximum amount of CPU that can be used by the component. The value + should be a string which can be a number or a number followed by “m”, which means + 1/1000. memory_request: the memory requested by the component. The value can be a number or a number followed by one of “E”, “P”, “T”, “G”, “M”, “K”. memory_limit: the maximum memory that can be used by the component. The value can be a @@ -98,8 +104,10 @@ def __init__( preemptible: t.Optional[bool] = False, cluster_type: t.Optional[str] = "default", client_kwargs: t.Optional[dict] = None, - memory_request: t.Optional[t.Union[str, int]] = None, - memory_limit: t.Optional[t.Union[str, int]] = None, + cpu_request: t.Optional[str] = None, + cpu_limit: t.Optional[str] = None, + memory_request: t.Optional[str] = None, + memory_limit: t.Optional[str] = None, ) -> None: self.component_dir = Path(component_dir) self.input_partition_rows = input_partition_rows @@ -119,6 +127,8 @@ def __init__( self.arguments.setdefault("component_spec", self.component_spec.specification) + self.cpu_request = cpu_request + self.cpu_limit = cpu_limit self.memory_request = memory_request self.memory_limit = memory_limit self.node_pool_label, self.node_pool_name = self._validate_node_pool_spec( @@ -231,8 +241,10 @@ def from_registry( preemptible: t.Optional[bool] = False, cluster_type: t.Optional[str] = "default", client_kwargs: t.Optional[dict] = None, - memory_request: 
t.Optional[t.Union[str, int]] = None, - memory_limit: t.Optional[t.Union[str, int]] = None, + cpu_request: t.Optional[str] = None, + cpu_limit: t.Optional[str] = None, + memory_request: t.Optional[str] = None, + memory_limit: t.Optional[str] = None, ) -> "ComponentOp": """Load a reusable component by its name. @@ -254,6 +266,14 @@ def from_registry( Requires the setup and assignment of a preemptible node pool. Note that preemptibles only work when KFP is setup on GCP. More info here: https://v1-6-branch.kubeflow.org/docs/distributions/gke/pipelines/preemptible/ + cluster_type: The type of cluster to use for distributed execution (default is "local"). + client_kwargs: Keyword arguments used to initialise the dask client. + cpu_request: the amount of CPU requested by the component. The value + should be a string which can be a number or a number followed by “m”, which means + 1/1000. + cpu_limit: the maximum amount of CPU that can be used by the component. The value + should be a string which can be a number or a number followed by “m”, which means + 1/1000. memory_request: the memory requested by the component. The value can be a number or a number followed by one of “E”, “P”, “T”, “G”, “M”, “K”. memory_limit: the maximum memory that can be used by the component. 
The value can be a @@ -279,6 +299,8 @@ def from_registry( preemptible=preemptible, cluster_type=cluster_type, client_kwargs=client_kwargs, + cpu_request=cpu_request, + cpu_limit=cpu_limit, memory_request=memory_request, memory_limit=memory_limit, ) From 63b852d092fb4752875efccb2790e53d1457823b Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Tue, 17 Oct 2023 23:15:44 +0200 Subject: [PATCH 2/2] Fix test --- .../example_1/vertex_pipeline.yml | 104 +++++++++--------- 1 file changed, 53 insertions(+), 51 deletions(-) diff --git a/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml index 4e32a92d6..f81e56f1c 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml @@ -94,44 +94,46 @@ deploymentSpec: exec-first-component: container: args: - - --storage_args - - '{{$.inputs.parameters[''storage_args'']}}' - - --input_partition_rows - - '{{$.inputs.parameters[''input_partition_rows'']}}' - - --cache - - '{{$.inputs.parameters[''cache'']}}' - - --cluster_type - - '{{$.inputs.parameters[''cluster_type'']}}' - - --component_spec - - '{{$.inputs.parameters[''component_spec'']}}' - - --output_manifest_path - - '{{$.inputs.parameters[''output_manifest_path'']}}' - - --metadata - - '{{$.inputs.parameters[''metadata'']}}' + - "--storage_args" + - "{{$.inputs.parameters['storage_args']}}" + - "--input_partition_rows" + - "{{$.inputs.parameters['input_partition_rows']}}" + - "--cache" + - "{{$.inputs.parameters['cache']}}" + - "--cluster_type" + - "{{$.inputs.parameters['cluster_type']}}" + - "--component_spec" + - "{{$.inputs.parameters['component_spec']}}" + - "--output_manifest_path" + - "{{$.inputs.parameters['output_manifest_path']}}" + - "--metadata" + - "{{$.inputs.parameters['metadata']}}" command: - fondant - execute - main image: example_component:latest + resources: + 
memoryLimit: 0.512 exec-second-component: container: args: - - --storage_args - - '{{$.inputs.parameters[''storage_args'']}}' - - --input_partition_rows - - '{{$.inputs.parameters[''input_partition_rows'']}}' - - --cache - - '{{$.inputs.parameters[''cache'']}}' - - --cluster_type - - '{{$.inputs.parameters[''cluster_type'']}}' - - --component_spec - - '{{$.inputs.parameters[''component_spec'']}}' - - --output_manifest_path - - '{{$.inputs.parameters[''output_manifest_path'']}}' - - --metadata - - '{{$.inputs.parameters[''metadata'']}}' - - --input_manifest_path - - '{{$.inputs.parameters[''input_manifest_path'']}}' + - "--storage_args" + - "{{$.inputs.parameters['storage_args']}}" + - "--input_partition_rows" + - "{{$.inputs.parameters['input_partition_rows']}}" + - "--cache" + - "{{$.inputs.parameters['cache']}}" + - "--cluster_type" + - "{{$.inputs.parameters['cluster_type']}}" + - "--component_spec" + - "{{$.inputs.parameters['component_spec']}}" + - "--output_manifest_path" + - "{{$.inputs.parameters['output_manifest_path']}}" + - "--metadata" + - "{{$.inputs.parameters['metadata']}}" + - "--input_manifest_path" + - "{{$.inputs.parameters['input_manifest_path']}}" command: - fondant - execute @@ -140,20 +142,20 @@ deploymentSpec: exec-third-component: container: args: - - --storage_args - - '{{$.inputs.parameters[''storage_args'']}}' - - --cache - - '{{$.inputs.parameters[''cache'']}}' - - --cluster_type - - '{{$.inputs.parameters[''cluster_type'']}}' - - --component_spec - - '{{$.inputs.parameters[''component_spec'']}}' - - --output_manifest_path - - '{{$.inputs.parameters[''output_manifest_path'']}}' - - --metadata - - '{{$.inputs.parameters[''metadata'']}}' - - --input_manifest_path - - '{{$.inputs.parameters[''input_manifest_path'']}}' + - "--storage_args" + - "{{$.inputs.parameters['storage_args']}}" + - "--cache" + - "{{$.inputs.parameters['cache']}}" + - "--cluster_type" + - "{{$.inputs.parameters['cluster_type']}}" + - "--component_spec" + - 
"{{$.inputs.parameters['component_spec']}}" + - "--output_manifest_path" + - "{{$.inputs.parameters['output_manifest_path']}}" + - "--metadata" + - "{{$.inputs.parameters['metadata']}}" + - "--input_manifest_path" + - "{{$.inputs.parameters['input_manifest_path']}}" command: - fondant - execute @@ -198,7 +200,7 @@ root: type: binary input_partition_rows: runtimeValue: - constant: 10.0 + constant: 10 metadata: runtimeValue: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", @@ -206,7 +208,7 @@ root: "cache_key": "1"}' output_manifest_path: runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json + constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json" storage_args: runtimeValue: constant: a dummy string arg @@ -250,10 +252,10 @@ root: type: array input_manifest_path: runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json + constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json" input_partition_rows: runtimeValue: - constant: 10.0 + constant: 10 metadata: runtimeValue: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", @@ -261,7 +263,7 @@ root: "cache_key": "2"}' output_manifest_path: runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json + constant: "/foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json" storage_args: runtimeValue: constant: a dummy string arg @@ -314,7 +316,7 @@ root: type: binary input_manifest_path: runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json + constant: "/foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json" metadata: runtimeValue: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", @@ -322,7 +324,7 @@ root: "cache_key": "3"}' output_manifest_path: 
runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/third_component/manifest.json + constant: "/foo/bar/testpipeline/testpipeline-20230101000000/third_component/manifest.json" storage_args: runtimeValue: constant: a dummy string arg