From 55847c77656074631eb1b39e0bb61e5664c20c83 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Tue, 17 Oct 2023 09:32:33 +0200 Subject: [PATCH] Propagate `client_kwargs` argument and lower extract_images python version (#525) This PR addresses 2 issues: - The `client_kwargs` argument was not propagated properly and couldn't be used - Python 3.11 leads to very long dependency resolution times with KfP and aiplatform dependencies --------- Co-authored-by: Philippe Moussalli --- .../extract_images_from_warc/Dockerfile | 2 +- src/fondant/component_spec.py | 6 ++++++ src/fondant/executor.py | 11 +++++------ .../example_1/kubeflow_pipeline.yml | 16 ++++++++++++---- .../example_1/vertex_pipeline.yml | 15 ++++++++++++--- .../example_2/kubeflow_pipeline.yml | 12 ++++++++---- .../example_2/vertex_pipeline.yml | 12 ++++++++---- .../component_specs/kubeflow_component.yaml | 6 ++++++ 8 files changed, 58 insertions(+), 22 deletions(-) diff --git a/examples/pipelines/commoncrawl/components/extract_images_from_warc/Dockerfile b/examples/pipelines/commoncrawl/components/extract_images_from_warc/Dockerfile index cec274d1e..a757491a7 100644 --- a/examples/pipelines/commoncrawl/components/extract_images_from_warc/Dockerfile +++ b/examples/pipelines/commoncrawl/components/extract_images_from_warc/Dockerfile @@ -1,4 +1,4 @@ -FROM --platform=linux/amd64 python:3.11-slim as base +FROM --platform=linux/amd64 python:3.10-slim as base # System dependencies RUN apt-get update && \ diff --git a/src/fondant/component_spec.py b/src/fondant/component_spec.py index ed54ac03f..f6dcc967a 100644 --- a/src/fondant/component_spec.py +++ b/src/fondant/component_spec.py @@ -248,6 +248,12 @@ def default_arguments(self) -> t.Dict[str, Argument]: type="str", default="default", ), + "client_kwargs": Argument( + name="client_kwargs", + description="Keyword arguments to pass to the Dask client", + type="dict", + default={}, + ), "metadata": Argument( name="metadata", description="Metadata arguments containing the run id and base path", diff --git a/src/fondant/executor.py b/src/fondant/executor.py index 41141c1d1..efea5809f 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -77,12 +77,11 @@ def __init__( self.input_partition_rows = input_partition_rows if cluster_type == "local": - if client_kwargs is None: - client_kwargs = { - "processes": True, - "n_workers": os.cpu_count(), - "threads_per_worker": 1, - } + client_kwargs = client_kwargs or { + "processes": True, + "n_workers": os.cpu_count(), + "threads_per_worker": 1, + } logger.info(f"Initialize local dask cluster with arguments {client_kwargs}") diff --git a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml index b39df7c9e..aa8cf7463 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml @@ -1,7 +1,3 @@ -# PIPELINE DEFINITION -# Name: testpipeline -# Description: description of the test pipeline ---- components: comp-first-component: executorLabel: exec-first-component @@ -11,6 +7,10 @@ components: defaultValue: true isOptional: true parameterType: BOOLEAN + client_kwargs: + defaultValue: {} + isOptional: true + parameterType: STRUCT cluster_type: defaultValue: default isOptional: true @@ -37,6 +37,10 @@ components: defaultValue: true isOptional: true parameterType: BOOLEAN + client_kwargs: + defaultValue: {} + isOptional: true + parameterType: STRUCT cluster_type: defaultValue: default isOptional: true @@ -63,6 +67,10 @@ components: defaultValue: true isOptional: true parameterType: BOOLEAN + client_kwargs: + defaultValue: {} + isOptional: true + parameterType: STRUCT cluster_type: defaultValue: default isOptional: true diff --git a/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml index a524d8e35..4e32a92d6 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml @@ -1,6 +1,3 @@ -# PIPELINE DEFINITION -# Name: testpipeline -# Description: description of the test pipeline components: comp-first-component: executorLabel: exec-first-component @@ -10,6 +7,10 @@ components: defaultValue: true isOptional: true parameterType: BOOLEAN + client_kwargs: + defaultValue: {} + isOptional: true + parameterType: STRUCT cluster_type: defaultValue: default isOptional: true @@ -36,6 +37,10 @@ components: defaultValue: true isOptional: true parameterType: BOOLEAN + client_kwargs: + defaultValue: {} + isOptional: true + parameterType: STRUCT cluster_type: defaultValue: default isOptional: true @@ -62,6 +67,10 @@ components: defaultValue: true isOptional: true parameterType: BOOLEAN + client_kwargs: + defaultValue: {} + isOptional: true + parameterType: STRUCT cluster_type: defaultValue: default isOptional: true diff --git a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml index 99f57051a..989003a61 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml @@ -1,7 +1,3 @@ -# PIPELINE DEFINITION -# Name: testpipeline -# Description: description of the test pipeline ---- components: comp-first-component: executorLabel: exec-first-component @@ -11,6 +7,10 @@ components: defaultValue: true isOptional: true parameterType: BOOLEAN + client_kwargs: + defaultValue: {} + isOptional: true + parameterType: STRUCT cluster_type: defaultValue: default isOptional: true @@ -37,6 +37,10 @@ components: defaultValue: true isOptional: true parameterType: BOOLEAN + client_kwargs: + defaultValue: {} + isOptional: true + parameterType: STRUCT cluster_type: defaultValue: default isOptional: true diff --git a/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml index 99f57051a..989003a61 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml @@ -1,7 +1,3 @@ -# PIPELINE DEFINITION -# Name: testpipeline -# Description: description of the test pipeline ---- components: comp-first-component: executorLabel: exec-first-component @@ -11,6 +7,10 @@ components: defaultValue: true isOptional: true parameterType: BOOLEAN + client_kwargs: + defaultValue: {} + isOptional: true + parameterType: STRUCT cluster_type: defaultValue: default isOptional: true @@ -37,6 +37,10 @@ components: defaultValue: true isOptional: true parameterType: BOOLEAN + client_kwargs: + defaultValue: {} + isOptional: true + parameterType: STRUCT cluster_type: defaultValue: default isOptional: true diff --git a/tests/example_specs/component_specs/kubeflow_component.yaml b/tests/example_specs/component_specs/kubeflow_component.yaml index d8864f0f4..10dfd860f 100644 --- a/tests/example_specs/component_specs/kubeflow_component.yaml +++ b/tests/example_specs/component_specs/kubeflow_component.yaml @@ -7,6 +7,10 @@ components: defaultValue: true description: Set to False to disable caching, True by default. parameterType: BOOLEAN + client_kwargs: + defaultValue: {} + description: Keyword arguments to pass to the Dask client + parameterType: STRUCT cluster_type: defaultValue: default description: The cluster type to use for the execution @@ -55,6 +59,8 @@ root: parameters: cache: componentInputParameter: cache + client_kwargs: + componentInputParameter: client_kwargs cluster_type: componentInputParameter: cluster_type component_spec: