From 0914ab615c81954b1a969955bb5e5341ff417352 Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Thu, 21 Sep 2023 14:50:49 +0200 Subject: [PATCH 1/2] Remove artifacts and unify default arguments --- src/fondant/compiler.py | 79 +++---- src/fondant/component_spec.py | 203 ++++++------------ src/fondant/executor.py | 3 +- .../example_1/kubeflow_pipeline.yml | 127 ++++++----- .../example_1/vertex_pipeline.yml | 127 ++++++----- .../example_2/kubeflow_pipeline.yml | 83 ++++--- .../example_2/vertex_pipeline.yml | 83 ++++--- .../component_specs/kubeflow_component.yaml | 186 ++++++++-------- tests/test_component_specs.py | 8 +- 9 files changed, 449 insertions(+), 450 deletions(-) diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index 09417bb34..352513dac 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -277,21 +277,10 @@ def compile( @self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description) def kfp_pipeline(): previous_component_task = None - manifest_path = "" - for component_name, component in pipeline._graph.items(): - component_op = component["fondant_component_op"] - - metadata = Metadata( - pipeline_name=pipeline.name, - run_id=run_id, - base_path=pipeline.base_path, - component_id=component_name, - cache_key=component_op.get_component_cache_key(), - ) - logger.info(f"Compiling service for {component_name}") + component_op = component["fondant_component_op"] # convert ComponentOp to Kubeflow component kubeflow_component_op = self.kfp.components.load_component_from_text( text=component_op.component_spec.kubeflow_specification.to_string(), @@ -302,33 +291,43 @@ def kfp_pipeline(): k: v for k, v in component_op.arguments.items() if v is not None } - # # Set image pull policy to always - # Execute the Kubeflow component and pass in the output manifest path from - # the previous component. + metadata = Metadata( + pipeline_name=pipeline.name, + run_id=run_id, + base_path=pipeline.base_path, + component_id=component_name, + cache_key=component_op.get_component_cache_key(), + ) - if previous_component_task is not None: - component_task = kubeflow_component_op( - input_manifest_path=manifest_path, - metadata=metadata.to_json(), - **component_args, - ) - component_task.after(previous_component_task) + output_manifest_path = f"{pipeline.base_path}/{pipeline.name}/" + f"{run_id}/{component_name}/manifest.json" + # Set the execution order of the component task to be after the previous + # component task. + if component["dependencies"]: + for dependency in component["dependencies"]: + input_manifest_path = f"{pipeline.base_path}/{pipeline.name}/" + f"{run_id}/{dependency}/manifest.json" + component_task = kubeflow_component_op( + input_manifest_path=input_manifest_path, + output_manifest_path=output_manifest_path, + metadata=metadata.to_json(), + **component_args, + ) + component_task.after(previous_component_task) else: component_task = kubeflow_component_op( metadata=metadata.to_json(), + output_manifest_path=output_manifest_path, **component_args, ) - # Set optional configurations + # Set optional arguments component_task = self._set_configuration( component_task, component_op, ) - # Update the manifest path to be the output path of the current component task. - manifest_path = component_task.outputs["output_manifest_path"] - previous_component_task = component_task logger.info(f"Compiling {pipeline.name} to {output_path}") @@ -404,7 +403,6 @@ def compile( @self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description) def kfp_pipeline(): previous_component_task = None - manifest_path = None for component_name, component in pipeline._graph.items(): logger.info(f"Compiling service for {component_name}") @@ -426,21 +424,27 @@ def kfp_pipeline(): component_id=component_name, cache_key=component_op.get_component_cache_key(), ) + + output_manifest_path = f"{pipeline.base_path}/{pipeline.name}/" + f"{run_id}/{component_name}/manifest.json" # Set the execution order of the component task to be after the previous # component task. - if previous_component_task is not None: - # Execute the Kubeflow component and pass in the output manifest path from - # the previous component. - component_task = kubeflow_component_op( - input_manifest_path=manifest_path, - metadata=metadata.to_json(), - **component_args, - ) - component_task.after(previous_component_task) + if component["dependencies"]: + for dependency in component["dependencies"]: + input_manifest_path = f"{pipeline.base_path}/{pipeline.name}/" + f"{run_id}/{dependency}/manifest.json" + component_task = kubeflow_component_op( + input_manifest_path=input_manifest_path, + output_manifest_path=output_manifest_path, + metadata=metadata.to_json(), + **component_args, + ) + component_task.after(previous_component_task) else: component_task = kubeflow_component_op( metadata=metadata.to_json(), + output_manifest_path=output_manifest_path, **component_args, ) @@ -450,9 +454,6 @@ def kfp_pipeline(): component_op, ) - # Update the manifest path to be the output path of the current component task. - manifest_path = component_task.outputs["output_manifest_path"] - previous_component_task = component_task self.kfp.compiler.Compiler().compile(kfp_pipeline, output_path) # type: ignore diff --git a/src/fondant/component_spec.py b/src/fondant/component_spec.py index 43b1bd97f..bb76f9e03 100644 --- a/src/fondant/component_spec.py +++ b/src/fondant/component_spec.py @@ -190,84 +190,76 @@ def outputs_additional_subsets(self) -> bool: return self._specification.get("produces", {}).get("additionalSubsets", True) @property - def args(self) -> t.Dict[str, Argument]: + def args(self) -> t.Mapping[str, Argument]: def _is_optional(arg_information): if "default" in arg_information: return arg_information["default"] == "None" return False - return { - name: Argument( - name=name, - description=arg_info["description"], - type=arg_info["type"], - default=arg_info["default"] if "default" in arg_info else None, - optional=_is_optional(arg_info), - ) - for name, arg_info in self._specification.get("args", {}).items() - } - - @property - def specification(self) -> t.Dict[str, t.Any]: - return copy.deepcopy(self._specification) + args = self.default_arguments - @property - def input_arguments(self) -> t.Mapping[str, Argument]: - """The input arguments (default + custom) of the component as an immutable mapping.""" - args = self.args - - # Add default arguments args.update( { - "input_manifest_path": Argument( - name="input_manifest_path", - description="Path to the input manifest", - type="str", - default=None, - ), - "component_spec": Argument( - name="component_spec", - description="The component specification as a dictionary", - type="dict", - default={}, - ), - "input_partition_rows": Argument( - name="input_partition_rows", - description="The number of rows to load per partition. \ - Set to override the automatic partitioning", - type="str", - default=None, - optional=True, - ), - "cache": Argument( - name="cache", - description="Set to False to disable caching, True by default.", - type="bool", - default=True, - ), - "metadata": Argument( - name="metadata", - description="Metadata arguments containing the run id and base path", - type="str", - default=None, - ), + name: Argument( + name=name, + description=arg_info["description"], + type=arg_info["type"], + default=arg_info["default"] if "default" in arg_info else None, + optional=_is_optional(arg_info), + ) + for name, arg_info in self._specification.get("args", {}).items() }, ) return types.MappingProxyType(args) @property - def output_arguments(self) -> t.Mapping[str, Argument]: - """The output arguments of the component as an immutable mapping.""" - return types.MappingProxyType( - { - "output_manifest_path": Argument( - name="output_manifest_path", - description="Path to the output manifest", - type="str", - default=None, - ), - }, - ) + def specification(self) -> t.Dict[str, t.Any]: + return copy.deepcopy(self._specification) + + @property + def default_arguments(self) -> t.Dict[str, Argument]: + """Add the default arguments of a fondant component.""" + return { + "input_manifest_path": Argument( + name="input_manifest_path", + description="Path to the input manifest", + type="str", + default=None, + optional=True, + ), + "component_spec": Argument( + name="component_spec", + description="The component specification as a dictionary", + type="dict", + default={}, + ), + "input_partition_rows": Argument( + name="input_partition_rows", + description="The number of rows to load per partition. \ + Set to override the automatic partitioning", + type="int", + default=None, + optional=True, + ), + "cache": Argument( + name="cache", + description="Set to False to disable caching, True by default.", + type="bool", + default=True, + ), + "metadata": Argument( + name="metadata", + description="Metadata arguments containing the run id and base path", + type="str", + default=None, + ), + "output_manifest_path": Argument( + name="output_manifest_path", + description="Path to the output manifest", + type="str", + default=None, + ), + } @property def kubeflow_specification(self) -> "KubeflowComponentSpec": @@ -299,9 +291,9 @@ def convert_arguments(fondant_component: ComponentSpec): for arg in fondant_component.args.values(): arg_type_dict = {} - if arg.optional and arg.default == "None": + if arg.optional and arg.default is None: arg_type_dict["isOptional"] = True - if arg.default is not None and arg.default != "None": + if arg.default is not None and arg.default is not None: arg_type_dict["defaultValue"] = arg.default args[arg.name] = { @@ -329,64 +321,19 @@ def sanitize_component_name(name: str) -> str: def from_fondant_component_spec(cls, fondant_component: ComponentSpec): """Generate a Kubeflow component spec from a ComponentOp.""" input_definitions = { - "artifacts": { - "input_manifest_path": { - "description": "Path to the input manifest", - "artifactType": { - "schemaTitle": "system.Artifact", - "schemaVersion": "0.0.1", - }, - "isOptional": True, - }, - }, "parameters": { - "component_spec": { - "description": "The component specification as a dictionary", - "defaultValue": {}, - "isOptional": True, - "parameterType": "STRUCT", - }, - "input_partition_rows": { - "description": "The number of rows to load per partition." - + " Set to override the automatic partitioning", - "isOptional": True, - "parameterType": "NUMBER_INTEGER", - }, - "cache": { - "parameterType": "BOOLEAN", - "description": "Set to False to disable caching, True by default.", - "defaultValue": True, - "isOptional": True, - }, - "metadata": { - "description": "Metadata arguments containing the run id and base path", - "parameterType": "STRING", - }, **cls.convert_arguments(fondant_component), }, } cleaned_component_name = cls.sanitize_component_name(fondant_component.name) - output_definitions = { - "artifacts": { - "output_manifest_path": { - "artifactType": { - "schemaTitle": "system.Artifact", - "schemaVersion": "0.0.1", - }, - "description": "Path to the output manifest", - }, - }, - } - specification = { "components": { "comp-" + cleaned_component_name: { "executorLabel": "exec-" + cleaned_component_name, "inputDefinitions": input_definitions, - "outputDefinitions": output_definitions, }, }, "deploymentSpec": { @@ -396,7 +343,7 @@ def from_fondant_component_spec(cls, fondant_component: ComponentSpec): "container": { "args": [ "--input_manifest_path", - "{{$.inputs.artifacts['input_manifest_path'].uri}}", + "{{$.inputs.parameters['input_manifest_path']}}", "--metadata", "{{$.inputs.parameters['metadata']}}", "--component_spec", @@ -407,7 +354,7 @@ def from_fondant_component_spec(cls, fondant_component: ComponentSpec): "{{$.inputs.parameters['cache']}}", *cls._dump_args(fondant_component.args.values()), "--output_manifest_path", - "{{$.outputs.artifacts['output_manifest_path'].uri}}", + "{{$.inputs.parameters['output_manifest_path']}}", ], "command": ["fondant", "execute", "main"], "image": fondant_component.image, @@ -418,37 +365,14 @@ def from_fondant_component_spec(cls, fondant_component: ComponentSpec): "pipelineInfo": {"name": cleaned_component_name}, "root": { "dag": { - "outputs": { - "artifacts": { - "output_manifest_path": { - "artifactSelectors": [ - { - "outputArtifactKey": "output_manifest_path", - "producerSubtask": cleaned_component_name, - }, - ], - }, - }, - }, "tasks": { cleaned_component_name: { "cachingOptions": {"enableCache": True}, "componentRef": {"name": "comp-" + cleaned_component_name}, "inputs": { - "artifacts": { - "input_manifest_path": { - "componentInputArtifact": "input_manifest_path", - }, - }, "parameters": { - "component_spec": { - "componentInputParameter": "component_spec", - }, - "input_partition_rows": { - "componentInputParameter": "input_partition_rows", - }, - "metadata": {"componentInputParameter": "metadata"}, - "cache": {"componentInputParameter": "cache"}, + param: {"componentInputParameter": param} + for param in input_definitions["parameters"] }, }, "taskInfo": {"name": cleaned_component_name}, @@ -456,7 +380,6 @@ def from_fondant_component_spec(cls, fondant_component: ComponentSpec): }, }, "inputDefinitions": input_definitions, - "outputDefinitions": output_definitions, }, "schemaVersion": "2.1.0", "sdkVersion": "kfp-2.0.1", diff --git a/src/fondant/executor.py b/src/fondant/executor.py index 5103b9f5e..1ecae514b 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -154,8 +154,7 @@ def _get_component_arguments(spec: ComponentSpec) -> t.Dict[str, Argument]: Input and output arguments of the component. """ component_arguments: t.Dict[str, Argument] = {} - component_arguments.update(spec.input_arguments) - component_arguments.update(spec.output_arguments) + component_arguments.update(spec.args) return component_arguments @abstractmethod diff --git a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml index a855ab698..4f44daea7 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml @@ -1,13 +1,10 @@ +# PIPELINE DEFINITION +# Name: testpipeline +# Description: description of the test pipeline components: comp-first-component: executorLabel: exec-first-component inputDefinitions: - artifacts: - input_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 - isOptional: true parameters: cache: defaultValue: true @@ -17,28 +14,21 @@ components: defaultValue: {} isOptional: true parameterType: STRUCT + input_manifest_path: + isOptional: true + parameterType: STRING input_partition_rows: isOptional: true parameterType: NUMBER_INTEGER metadata: parameterType: STRING + output_manifest_path: + parameterType: STRING storage_args: parameterType: STRING - outputDefinitions: - artifacts: - output_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 comp-second-component: executorLabel: exec-second-component inputDefinitions: - artifacts: - input_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 - isOptional: true parameters: cache: defaultValue: true @@ -48,28 +38,21 @@ components: defaultValue: {} isOptional: true parameterType: STRUCT + input_manifest_path: + isOptional: true + parameterType: STRING input_partition_rows: isOptional: true parameterType: NUMBER_INTEGER metadata: parameterType: STRING + output_manifest_path: + parameterType: STRING storage_args: parameterType: STRING - outputDefinitions: - artifacts: - output_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 comp-third-component: executorLabel: exec-third-component inputDefinitions: - artifacts: - input_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 - isOptional: true parameters: cache: defaultValue: true @@ -79,26 +62,25 @@ components: defaultValue: {} isOptional: true parameterType: STRUCT + input_manifest_path: + isOptional: true + parameterType: STRING input_partition_rows: isOptional: true parameterType: NUMBER_INTEGER metadata: parameterType: STRING + output_manifest_path: + parameterType: STRING storage_args: parameterType: STRING - outputDefinitions: - artifacts: - output_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 deploymentSpec: executors: exec-first-component: container: args: - --input_manifest_path - - '{{$.inputs.artifacts[''input_manifest_path''].uri}}' + - '{{$.inputs.parameters[''input_manifest_path'']}}' - --metadata - '{{$.inputs.parameters[''metadata'']}}' - --component_spec @@ -107,10 +89,22 @@ deploymentSpec: - '{{$.inputs.parameters[''input_partition_rows'']}}' - --cache - '{{$.inputs.parameters[''cache'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' - --storage_args - '{{$.inputs.parameters[''storage_args'']}}' - --output_manifest_path - - '{{$.outputs.artifacts[''output_manifest_path''].uri}}' + - '{{$.inputs.parameters[''output_manifest_path'']}}' command: - fondant - execute @@ -120,7 +114,7 @@ deploymentSpec: container: args: - --input_manifest_path - - '{{$.inputs.artifacts[''input_manifest_path''].uri}}' + - '{{$.inputs.parameters[''input_manifest_path'']}}' - --metadata - '{{$.inputs.parameters[''metadata'']}}' - --component_spec @@ -129,10 +123,22 @@ deploymentSpec: - '{{$.inputs.parameters[''input_partition_rows'']}}' - --cache - '{{$.inputs.parameters[''cache'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' - --storage_args - '{{$.inputs.parameters[''storage_args'']}}' - --output_manifest_path - - '{{$.outputs.artifacts[''output_manifest_path''].uri}}' + - '{{$.inputs.parameters[''output_manifest_path'']}}' command: - fondant - execute @@ -142,7 +148,7 @@ deploymentSpec: container: args: - --input_manifest_path - - '{{$.inputs.artifacts[''input_manifest_path''].uri}}' + - '{{$.inputs.parameters[''input_manifest_path'']}}' - --metadata - '{{$.inputs.parameters[''metadata'']}}' - --component_spec @@ -151,10 +157,22 @@ deploymentSpec: - '{{$.inputs.parameters[''input_partition_rows'']}}' - --cache - '{{$.inputs.parameters[''cache'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' - --storage_args - '{{$.inputs.parameters[''storage_args'']}}' - --output_manifest_path - - '{{$.outputs.artifacts[''output_manifest_path''].uri}}' + - '{{$.inputs.parameters[''output_manifest_path'']}}' command: - fondant - execute @@ -203,6 +221,9 @@ root: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", "run_id": "testpipeline-20230101000000", "component_id": "first_component", "cache_key": "1"}' + output_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg @@ -216,11 +237,6 @@ root: dependentTasks: - first-component inputs: - artifacts: - input_manifest_path: - taskOutputArtifact: - outputArtifactKey: output_manifest_path - producerTask: first-component parameters: cache: runtimeValue: @@ -247,6 +263,9 @@ root: items: type: float32 type: array + input_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json input_partition_rows: runtimeValue: constant: 10.0 @@ -255,6 +274,9 @@ root: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", "run_id": "testpipeline-20230101000000", "component_id": "second_component", "cache_key": "2"}' + output_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg @@ -268,11 +290,6 @@ root: dependentTasks: - second-component inputs: - artifacts: - input_manifest_path: - taskOutputArtifact: - outputArtifactKey: output_manifest_path - producerTask: second-component parameters: cache: runtimeValue: @@ -308,11 +325,17 @@ root: fields: data: type: binary + input_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json metadata: runtimeValue: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", "run_id": "testpipeline-20230101000000", "component_id": "third_component", "cache_key": "3"}' + output_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/third_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg diff --git a/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml index a855ab698..4f44daea7 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml @@ -1,13 +1,10 @@ +# PIPELINE DEFINITION +# Name: testpipeline +# Description: description of the test pipeline components: comp-first-component: executorLabel: exec-first-component inputDefinitions: - artifacts: - input_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 - isOptional: true parameters: cache: defaultValue: true @@ -17,28 +14,21 @@ components: defaultValue: {} isOptional: true parameterType: STRUCT + input_manifest_path: + isOptional: true + parameterType: STRING input_partition_rows: isOptional: true parameterType: NUMBER_INTEGER metadata: parameterType: STRING + output_manifest_path: + parameterType: STRING storage_args: parameterType: STRING - outputDefinitions: - artifacts: - output_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 comp-second-component: executorLabel: exec-second-component inputDefinitions: - artifacts: - input_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 - isOptional: true parameters: cache: defaultValue: true @@ -48,28 +38,21 @@ components: defaultValue: {} isOptional: true parameterType: STRUCT + input_manifest_path: + isOptional: true + parameterType: STRING input_partition_rows: isOptional: true parameterType: NUMBER_INTEGER metadata: parameterType: STRING + output_manifest_path: + parameterType: STRING storage_args: parameterType: STRING - outputDefinitions: - artifacts: - output_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 comp-third-component: executorLabel: exec-third-component inputDefinitions: - artifacts: - input_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 - isOptional: true parameters: cache: defaultValue: true @@ -79,26 +62,25 @@ components: defaultValue: {} isOptional: true parameterType: STRUCT + input_manifest_path: + isOptional: true + parameterType: STRING input_partition_rows: isOptional: true parameterType: NUMBER_INTEGER metadata: parameterType: STRING + output_manifest_path: + parameterType: STRING storage_args: parameterType: STRING - outputDefinitions: - artifacts: - output_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 deploymentSpec: executors: exec-first-component: container: args: - --input_manifest_path - - '{{$.inputs.artifacts[''input_manifest_path''].uri}}' + - '{{$.inputs.parameters[''input_manifest_path'']}}' - --metadata - '{{$.inputs.parameters[''metadata'']}}' - --component_spec @@ -107,10 +89,22 @@ deploymentSpec: - '{{$.inputs.parameters[''input_partition_rows'']}}' - --cache - '{{$.inputs.parameters[''cache'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' - --storage_args - '{{$.inputs.parameters[''storage_args'']}}' - --output_manifest_path - - '{{$.outputs.artifacts[''output_manifest_path''].uri}}' + - '{{$.inputs.parameters[''output_manifest_path'']}}' command: - fondant - execute @@ -120,7 +114,7 @@ deploymentSpec: container: args: - --input_manifest_path - - '{{$.inputs.artifacts[''input_manifest_path''].uri}}' + - '{{$.inputs.parameters[''input_manifest_path'']}}' - --metadata - '{{$.inputs.parameters[''metadata'']}}' - --component_spec @@ -129,10 +123,22 @@ deploymentSpec: - '{{$.inputs.parameters[''input_partition_rows'']}}' - --cache - '{{$.inputs.parameters[''cache'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' - --storage_args - '{{$.inputs.parameters[''storage_args'']}}' - --output_manifest_path - - '{{$.outputs.artifacts[''output_manifest_path''].uri}}' + - '{{$.inputs.parameters[''output_manifest_path'']}}' command: - fondant - execute @@ -142,7 +148,7 @@ deploymentSpec: container: args: - --input_manifest_path - - '{{$.inputs.artifacts[''input_manifest_path''].uri}}' + - '{{$.inputs.parameters[''input_manifest_path'']}}' - --metadata - '{{$.inputs.parameters[''metadata'']}}' - --component_spec @@ -151,10 +157,22 @@ deploymentSpec: - '{{$.inputs.parameters[''input_partition_rows'']}}' - --cache - '{{$.inputs.parameters[''cache'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' - --storage_args - '{{$.inputs.parameters[''storage_args'']}}' - --output_manifest_path - - '{{$.outputs.artifacts[''output_manifest_path''].uri}}' + - '{{$.inputs.parameters[''output_manifest_path'']}}' command: - fondant - execute @@ -203,6 +221,9 @@ root: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", "run_id": "testpipeline-20230101000000", "component_id": "first_component", "cache_key": "1"}' + output_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg @@ -216,11 +237,6 @@ root: dependentTasks: - first-component inputs: - artifacts: - input_manifest_path: - taskOutputArtifact: - outputArtifactKey: output_manifest_path - producerTask: first-component parameters: cache: runtimeValue: @@ -247,6 +263,9 @@ root: items: type: float32 type: array + input_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json input_partition_rows: runtimeValue: constant: 10.0 @@ -255,6 +274,9 @@ root: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", "run_id": "testpipeline-20230101000000", "component_id": "second_component", "cache_key": "2"}' + output_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg @@ -268,11 +290,6 @@ root: dependentTasks: - second-component inputs: - artifacts: - input_manifest_path: - taskOutputArtifact: - outputArtifactKey: output_manifest_path - producerTask: second-component parameters: cache: runtimeValue: @@ -308,11 +325,17 @@ root: fields: data: type: binary + input_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json metadata: runtimeValue: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", "run_id": "testpipeline-20230101000000", "component_id": "third_component", "cache_key": "3"}' + output_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/third_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg diff --git a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml index c9ba66940..a8837327c 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml @@ -1,13 +1,10 @@ +# PIPELINE DEFINITION +# Name: testpipeline +# Description: description of the test pipeline components: comp-first-component: executorLabel: exec-first-component inputDefinitions: - artifacts: - input_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 - isOptional: true parameters: cache: defaultValue: true @@ -17,28 +14,21 @@ components: defaultValue: {} isOptional: true parameterType: STRUCT + input_manifest_path: + isOptional: true + parameterType: STRING input_partition_rows: isOptional: true parameterType: NUMBER_INTEGER metadata: parameterType: STRING + output_manifest_path: + parameterType: STRING storage_args: parameterType: STRING - outputDefinitions: - artifacts: - output_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 comp-image-cropping: executorLabel: exec-image-cropping inputDefinitions: - artifacts: - input_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 - isOptional: true parameters: cache: defaultValue: true @@ -52,28 +42,27 @@ components: defaultValue: -30.0 isOptional: true parameterType: NUMBER_INTEGER + input_manifest_path: + isOptional: true + parameterType: STRING input_partition_rows: isOptional: true parameterType: NUMBER_INTEGER metadata: parameterType: STRING + output_manifest_path: + parameterType: STRING padding: defaultValue: 10.0 isOptional: true parameterType: NUMBER_INTEGER - outputDefinitions: - artifacts: - output_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 deploymentSpec: executors: exec-first-component: container: args: - --input_manifest_path - - '{{$.inputs.artifacts[''input_manifest_path''].uri}}' + - '{{$.inputs.parameters[''input_manifest_path'']}}' - --metadata - '{{$.inputs.parameters[''metadata'']}}' - --component_spec @@ -82,10 +71,22 @@ deploymentSpec: - '{{$.inputs.parameters[''input_partition_rows'']}}' - --cache - '{{$.inputs.parameters[''cache'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' - --storage_args - '{{$.inputs.parameters[''storage_args'']}}' - --output_manifest_path - - '{{$.outputs.artifacts[''output_manifest_path''].uri}}' + - '{{$.inputs.parameters[''output_manifest_path'']}}' command: - fondant - execute @@ -95,7 +96,7 @@ deploymentSpec: container: args: - --input_manifest_path - - '{{$.inputs.artifacts[''input_manifest_path''].uri}}' + - '{{$.inputs.parameters[''input_manifest_path'']}}' - --metadata - '{{$.inputs.parameters[''metadata'']}}' - --component_spec @@ -104,12 +105,24 @@ deploymentSpec: - '{{$.inputs.parameters[''input_partition_rows'']}}' - --cache - '{{$.inputs.parameters[''cache'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' - --cropping_threshold - '{{$.inputs.parameters[''cropping_threshold'']}}' - --padding - '{{$.inputs.parameters[''padding'']}}' - --output_manifest_path - - '{{$.outputs.artifacts[''output_manifest_path''].uri}}' + - '{{$.inputs.parameters[''output_manifest_path'']}}' command: - fondant - execute @@ -155,6 +168,9 @@ root: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", "run_id": "testpipeline-20230101000000", "component_id": "first_component", "cache_key": "1"}' + output_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg @@ -168,11 +184,6 @@ root: dependentTasks: - first-component inputs: - artifacts: - input_manifest_path: - taskOutputArtifact: - outputArtifactKey: output_manifest_path - producerTask: first-component parameters: cache: runtimeValue: @@ -213,11 +224,17 @@ root: cropping_threshold: runtimeValue: constant: 0.0 + input_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json metadata: runtimeValue: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", "run_id": "testpipeline-20230101000000", "component_id": "image_cropping", "cache_key": "2"}' + output_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/image_cropping/manifest.json padding: runtimeValue: constant: 0.0 diff --git a/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml index c9ba66940..a8837327c 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml @@ -1,13 +1,10 @@ +# PIPELINE DEFINITION +# Name: testpipeline +# Description: description of the test pipeline components: comp-first-component: executorLabel: exec-first-component inputDefinitions: - artifacts: - input_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 - isOptional: true parameters: cache: defaultValue: true @@ -17,28 +14,21 @@ components: defaultValue: {} isOptional: true parameterType: STRUCT + input_manifest_path: + isOptional: true + parameterType: STRING input_partition_rows: isOptional: true parameterType: NUMBER_INTEGER metadata: parameterType: STRING + output_manifest_path: + parameterType: STRING storage_args: parameterType: STRING - outputDefinitions: - artifacts: - output_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 comp-image-cropping: executorLabel: exec-image-cropping inputDefinitions: - artifacts: - input_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 - isOptional: true parameters: cache: defaultValue: true @@ -52,28 +42,27 @@ components: defaultValue: -30.0 isOptional: true parameterType: NUMBER_INTEGER + input_manifest_path: + isOptional: true + parameterType: STRING input_partition_rows: isOptional: true parameterType: NUMBER_INTEGER metadata: parameterType: STRING + output_manifest_path: + parameterType: STRING padding: defaultValue: 10.0 isOptional: true parameterType: NUMBER_INTEGER - outputDefinitions: - artifacts: - output_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 deploymentSpec: executors: exec-first-component: container: args: - --input_manifest_path - - '{{$.inputs.artifacts[''input_manifest_path''].uri}}' + - '{{$.inputs.parameters[''input_manifest_path'']}}' - --metadata - '{{$.inputs.parameters[''metadata'']}}' - --component_spec @@ -82,10 +71,22 @@ deploymentSpec: - '{{$.inputs.parameters[''input_partition_rows'']}}' - --cache - '{{$.inputs.parameters[''cache'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' - --storage_args - '{{$.inputs.parameters[''storage_args'']}}' - --output_manifest_path - - '{{$.outputs.artifacts[''output_manifest_path''].uri}}' + - '{{$.inputs.parameters[''output_manifest_path'']}}' command: - fondant - execute @@ -95,7 +96,7 @@ deploymentSpec: container: args: - --input_manifest_path - - '{{$.inputs.artifacts[''input_manifest_path''].uri}}' + - '{{$.inputs.parameters[''input_manifest_path'']}}' - --metadata - '{{$.inputs.parameters[''metadata'']}}' - --component_spec @@ -104,12 +105,24 @@ deploymentSpec: - '{{$.inputs.parameters[''input_partition_rows'']}}' - --cache - '{{$.inputs.parameters[''cache'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' - --cropping_threshold - '{{$.inputs.parameters[''cropping_threshold'']}}' - --padding - '{{$.inputs.parameters[''padding'']}}' - --output_manifest_path - - '{{$.outputs.artifacts[''output_manifest_path''].uri}}' + - '{{$.inputs.parameters[''output_manifest_path'']}}' command: - fondant - execute @@ -155,6 +168,9 @@ root: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", "run_id": "testpipeline-20230101000000", "component_id": "first_component", "cache_key": "1"}' + output_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg @@ -168,11 +184,6 @@ root: dependentTasks: - first-component inputs: - artifacts: - input_manifest_path: - taskOutputArtifact: - outputArtifactKey: output_manifest_path - producerTask: first-component parameters: cache: runtimeValue: @@ -213,11 +224,17 @@ root: cropping_threshold: runtimeValue: constant: 0.0 + input_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json metadata: runtimeValue: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", "run_id": "testpipeline-20230101000000", "component_id": "image_cropping", "cache_key": "2"}' + output_manifest_path: + runtimeValue: + constant: /foo/bar/testpipeline/testpipeline-20230101000000/image_cropping/manifest.json padding: runtimeValue: constant: 0.0 diff --git a/tests/example_specs/component_specs/kubeflow_component.yaml b/tests/example_specs/component_specs/kubeflow_component.yaml index d175ea854..2401cc282 100644 --- a/tests/example_specs/component_specs/kubeflow_component.yaml +++ b/tests/example_specs/component_specs/kubeflow_component.yaml @@ -1,99 +1,99 @@ components: - comp-example-component: - executorLabel: exec-example-component - inputDefinitions: &id001 - artifacts: - input_manifest_path: - description: Path to the input manifest - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 - isOptional: true - parameters: - component_spec: - description: The component specification as a dictionary - defaultValue: {} - isOptional: true - parameterType: STRUCT - input_partition_rows: - description: The number of rows to load per partition. Set to override the - automatic partitioning - isOptional: true - parameterType: NUMBER_INTEGER - cache: - parameterType: BOOLEAN - description: Set to False to disable caching, True by default. - defaultValue: true - isOptional: true - metadata: - description: Metadata arguments containing the run id and base path - parameterType: STRING - storage_args: - parameterType: STRING - description: Storage arguments - outputDefinitions: &id002 - artifacts: - output_manifest_path: - artifactType: - schemaTitle: system.Artifact - schemaVersion: 0.0.1 - description: Path to the output manifest + comp-example-component: + executorLabel: exec-example-component + inputDefinitions: &id001 + parameters: + input_manifest_path: + parameterType: STRING + description: Path to the input manifest + isOptional: true + component_spec: + parameterType: STRUCT + description: The component specification as a dictionary + defaultValue: {} + input_partition_rows: + parameterType: NUMBER_INTEGER + description: The number of rows to load per partition. Set + to override the automatic partitioning + isOptional: true + cache: + parameterType: BOOLEAN + description: Set to False to disable caching, True by default. + defaultValue: true + metadata: + parameterType: STRING + description: Metadata arguments containing the run id and base + path + output_manifest_path: + parameterType: STRING + description: Path to the output manifest + storage_args: + parameterType: STRING + description: Storage arguments deploymentSpec: - executors: - exec-example-component: - container: - args: - - --input_manifest_path - - '{{$.inputs.artifacts[''input_manifest_path''].uri}}' - - --metadata - - '{{$.inputs.parameters[''metadata'']}}' - - --component_spec - - '{{$.inputs.parameters[''component_spec'']}}' - - --input_partition_rows - - '{{$.inputs.parameters[''input_partition_rows'']}}' - - --cache - - '{{$.inputs.parameters[''cache'']}}' - - --storage_args - - '{{$.inputs.parameters[''storage_args'']}}' - - --output_manifest_path - - '{{$.outputs.artifacts[''output_manifest_path''].uri}}' - command: - - fondant - - execute - - main - image: example_component:latest + executors: + exec-example-component: + container: + args: + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' + - --storage_args + - '{{$.inputs.parameters[''storage_args'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' + command: + - fondant + - execute + - main + image: example_component:latest pipelineInfo: - name: example-component + name: example-component root: - dag: - outputs: - artifacts: - output_manifest_path: - artifactSelectors: - - outputArtifactKey: output_manifest_path - producerSubtask: example-component - tasks: - example-component: - cachingOptions: - enableCache: true - componentRef: - name: comp-example-component - inputs: - artifacts: - input_manifest_path: - componentInputArtifact: input_manifest_path - parameters: - component_spec: - componentInputParameter: component_spec - input_partition_rows: - componentInputParameter: input_partition_rows - metadata: - componentInputParameter: metadata - cache: - componentInputParameter: cache - taskInfo: - name: example-component - inputDefinitions: *id001 - outputDefinitions: *id002 + dag: + tasks: + example-component: + cachingOptions: + enableCache: true + componentRef: + name: comp-example-component + inputs: + parameters: + input_manifest_path: + componentInputParameter: input_manifest_path + component_spec: + componentInputParameter: component_spec + input_partition_rows: + componentInputParameter: input_partition_rows + cache: + componentInputParameter: cache + metadata: + componentInputParameter: metadata + output_manifest_path: + componentInputParameter: output_manifest_path + storage_args: + componentInputParameter: storage_args + taskInfo: + name: example-component + inputDefinitions: *id001 schemaVersion: 2.1.0 sdkVersion: kfp-2.0.1 diff --git a/tests/test_component_specs.py b/tests/test_component_specs.py index 12baf3805..0b0909a77 100644 --- a/tests/test_component_specs.py +++ b/tests/test_component_specs.py @@ -6,11 +6,7 @@ import pytest import yaml -from fondant.component_spec import ( - ComponentSpec, - ComponentSubset, - KubeflowComponentSpec, -) +from fondant.component_spec import ComponentSpec, ComponentSubset, KubeflowComponentSpec from fondant.exceptions import InvalidComponentSpec from fondant.schema import Type @@ -84,7 +80,7 @@ def test_component_spec_no_args(valid_fondant_schema_no_args): assert fondant_component.name == "Example component" assert fondant_component.description == "This is an example component" - assert fondant_component.args == {} + assert fondant_component.args == fondant_component.default_arguments def test_component_spec_to_file(valid_fondant_schema): From cefbc71161f353938796ec2e233258683fbcb0c3 Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Thu, 21 Sep 2023 14:56:53 +0200 Subject: [PATCH 2/2] Remove artifacts and unify default arguments --- src/fondant/compiler.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index 352513dac..dc71cca1e 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -299,14 +299,18 @@ def kfp_pipeline(): cache_key=component_op.get_component_cache_key(), ) - output_manifest_path = f"{pipeline.base_path}/{pipeline.name}/" - f"{run_id}/{component_name}/manifest.json" + output_manifest_path = ( + f"{pipeline.base_path}/{pipeline.name}/" + f"{run_id}/{component_name}/manifest.json" + ) # Set the execution order of the component task to be after the previous # component task. if component["dependencies"]: for dependency in component["dependencies"]: - input_manifest_path = f"{pipeline.base_path}/{pipeline.name}/" - f"{run_id}/{dependency}/manifest.json" + input_manifest_path = ( + f"{pipeline.base_path}/{pipeline.name}/" + f"{run_id}/{dependency}/manifest.json" + ) component_task = kubeflow_component_op( input_manifest_path=input_manifest_path, output_manifest_path=output_manifest_path, @@ -425,14 +429,18 @@ def kfp_pipeline(): cache_key=component_op.get_component_cache_key(), ) - output_manifest_path = f"{pipeline.base_path}/{pipeline.name}/" - f"{run_id}/{component_name}/manifest.json" + output_manifest_path = ( + f"{pipeline.base_path}/{pipeline.name}/" + f"{run_id}/{component_name}/manifest.json" + ) # Set the execution order of the component task to be after the previous # component task. if component["dependencies"]: for dependency in component["dependencies"]: - input_manifest_path = f"{pipeline.base_path}/{pipeline.name}/" - f"{run_id}/{dependency}/manifest.json" + input_manifest_path = ( + f"{pipeline.base_path}/{pipeline.name}/" + f"{run_id}/{dependency}/manifest.json" + ) component_task = kubeflow_component_op( input_manifest_path=input_manifest_path, output_manifest_path=output_manifest_path,