From 81cd69fd5c54aa55c0803ce810653856ec621dc4 Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Thu, 24 Aug 2023 10:05:03 +0200 Subject: [PATCH] Add disable caching argument (#320) Related to #313 Update [22/08] This PR has been updated to add an argument at the `ComponentOp` level to disable caching. The previous implementation relied on passing an `execute_component` argument, since we were estimating whether a component is cached or not at compile time. A component's execution will only be skipped if caching is enabled (`cache` is not set to `False`) and a previous matching execution is found (the check for matching executions will be implemented in a later PR) --- data_explorer/requirements.txt | 2 +- src/fondant/compiler.py | 1 + src/fondant/component_spec.py | 8 + src/fondant/executor.py | 47 +++++- src/fondant/pipeline.py | 17 +- .../example_1/docker-compose.yml | 6 + .../example_1/kubeflow_pipeline.yml | 153 ++++++++++-------- .../example_2/docker-compose.yml | 4 + .../example_2/kubeflow_pipeline.yml | 94 ++++++----- .../compiled_pipeline/kubeflow_pipeline.yml | 48 +++--- .../component_specs/kubeflow_component.yaml | 6 + tests/test_component.py | 11 ++ 12 files changed, 253 insertions(+), 144 deletions(-) diff --git a/data_explorer/requirements.txt b/data_explorer/requirements.txt index f688d5e9f..11ffd89de 100644 --- a/data_explorer/requirements.txt +++ b/data_explorer/requirements.txt @@ -1,4 +1,4 @@ -git+https://github.com/ml6team/fondant@main +git+https://github.com/ml6team/fondant@71214bb9de30cd4302d09af97acf117d4b0e9231 streamlit==1.23.1 streamlit-aggrid==0.3.4 matplotlib==3.7.1 diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index 9e04d1b99..49c1f9b6e 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -259,6 +259,7 @@ def kfp_pipeline(): logger.info(f"Compiling service for {component_name}") component_op = component["fondant_component_op"] + # convert ComponentOp to Kubeflow component kubeflow_component_op = self.kfp.components.load_component( 
text=component_op.component_spec.kubeflow_specification.to_string(), diff --git a/src/fondant/component_spec.py b/src/fondant/component_spec.py index 861a1ef15..b94323201 100644 --- a/src/fondant/component_spec.py +++ b/src/fondant/component_spec.py @@ -263,6 +263,12 @@ def from_fondant_component_spec( "type": "String", "default": "None", }, + { + "name": "cache", + "description": "Set to False to disable caching, True by default.", + "type": "Boolean", + "default": "True", + }, *( { "name": arg.name, @@ -295,6 +301,8 @@ def from_fondant_component_spec( {"inputValue": "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, + "--cache", + {"inputValue": "cache"}, *cls._dump_args(fondant_component.args.values()), "--output_manifest_path", {"outputPath": "output_manifest_path"}, diff --git a/src/fondant/executor.py b/src/fondant/executor.py index 1725cb9ba..adb9c6774 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -6,6 +6,7 @@ """ import argparse +import ast import json import logging import typing as t @@ -37,6 +38,7 @@ def __init__( self, spec: ComponentSpec, *, + cache: bool, input_manifest_path: t.Union[str, Path], output_manifest_path: t.Union[str, Path], metadata: t.Dict[str, t.Any], @@ -44,6 +46,7 @@ def __init__( input_partition_rows: t.Optional[t.Union[str, int]] = None, ) -> None: self.spec = spec + self.cache = cache self.input_manifest_path = input_manifest_path self.output_manifest_path = output_manifest_path self.metadata = Metadata.from_dict(metadata) @@ -55,6 +58,7 @@ def from_args(cls) -> "Executor": """Create an executor from a passed argument containing the specification as a dict.""" parser = argparse.ArgumentParser() parser.add_argument("--component_spec", type=json.loads) + parser.add_argument("--cache", type=ast.literal_eval) parser.add_argument("--input_partition_rows", type=validate_partition_number) args, _ = parser.parse_known_args() @@ -64,16 +68,20 @@ def from_args(cls) -> "Executor": 
component_spec = ComponentSpec(args.component_spec) input_partition_rows = args.input_partition_rows + cache = args.cache return cls.from_spec( component_spec, - input_partition_rows, + cache=cache, + input_partition_rows=input_partition_rows, ) @classmethod def from_spec( cls, component_spec: ComponentSpec, + *, + cache: bool, input_partition_rows: t.Optional[t.Union[str, int]], ) -> "Executor": """Create an executor from a component spec.""" @@ -85,6 +93,9 @@ def from_spec( if "input_partition_rows" in args_dict: args_dict.pop("input_partition_rows") + if "cache" in args_dict: + args_dict.pop("cache") + input_manifest_path = args_dict.pop("input_manifest_path") output_manifest_path = args_dict.pop("output_manifest_path") metadata = args_dict.pop("metadata") @@ -94,6 +105,7 @@ def from_spec( component_spec, input_manifest_path=input_manifest_path, output_manifest_path=output_manifest_path, + cache=cache, metadata=metadata, user_arguments=args_dict, input_partition_rows=input_partition_rows, @@ -177,20 +189,43 @@ def _write_data(self, dataframe: dd.DataFrame, *, manifest: Manifest): data_writer.write_dataframe(dataframe) + def _load_cached_output_manifest(self) -> "Manifest": + """Function that returns the cached output manifest.""" + raise NotImplementedError + + def _has_matching_execution(self) -> bool: + """Function that checks if there is an existing previous matching execution.""" + # TODO: implement + return False + def execute(self, component_cls: t.Type[Component]) -> None: """Execute a component. 
Args: component_cls: The class of the component to execute """ - input_manifest = self._load_or_create_manifest() + matching_execution_exists = self._has_matching_execution() - component = component_cls(self.spec, **self.user_arguments) - output_df = self._execute_component(component, manifest=input_manifest) + if matching_execution_exists: + logger.info("Previous matching execution found") + else: + logger.info("No previous matching execution found") - output_manifest = input_manifest.evolve(component_spec=self.spec) + if self.cache: + logger.info("Caching for the component is disabled") + else: + logger.info("Caching for the component is enabled") - self._write_data(dataframe=output_df, manifest=output_manifest) + if self.cache is False and matching_execution_exists: + logging.info("Cached component run. Skipping component execution") + output_manifest = self._load_cached_output_manifest() + else: + logging.info("Executing component") + input_manifest = self._load_or_create_manifest() + component = component_cls(self.spec, **self.user_arguments) + output_df = self._execute_component(component, manifest=input_manifest) + output_manifest = input_manifest.evolve(component_spec=self.spec) + self._write_data(dataframe=output_df, manifest=output_manifest) self.upload_manifest(output_manifest, save_path=self.output_manifest_path) diff --git a/src/fondant/pipeline.py b/src/fondant/pipeline.py index a82bb09ea..b304905b7 100644 --- a/src/fondant/pipeline.py +++ b/src/fondant/pipeline.py @@ -34,6 +34,7 @@ class ComponentOp: number_of_gpus: The number of gpus to assign to the operation node_pool_label: The label of the node pool to which the operation will be assigned. node_pool_name: The name of the node pool to which the operation will be assigned. + cache: Set to False to disable caching, True by default. 
Note: - A Fondant Component operation is created by defining a Fondant Component and its input @@ -56,14 +57,17 @@ def __init__( number_of_gpus: t.Optional[int] = None, node_pool_label: t.Optional[str] = None, node_pool_name: t.Optional[str] = None, + cache: t.Optional[bool] = True, ) -> None: self.component_dir = Path(component_dir) + self.input_partition_rows = input_partition_rows + self.cache = cache + self.arguments = self._set_arguments(arguments) + self.component_spec = ComponentSpec.from_file( self.component_dir / self.COMPONENT_SPEC_NAME, ) self.name = self.component_spec.name.replace(" ", "_").lower() - self.input_partition_rows = input_partition_rows - self.arguments = self._set_arguments(arguments) self.arguments.setdefault("component_spec", self.component_spec.specification) @@ -85,6 +89,7 @@ def _set_arguments( input_partition_rows = validate_partition_number(self.input_partition_rows) arguments["input_partition_rows"] = str(input_partition_rows) + arguments["cache"] = str(self.cache) return arguments @@ -116,6 +121,7 @@ def from_registry( number_of_gpus: t.Optional[int] = None, node_pool_label: t.Optional[str] = None, node_pool_name: t.Optional[str] = None, + cache: t.Optional[bool] = True, ) -> "ComponentOp": """Load a reusable component by its name. @@ -127,6 +133,7 @@ def from_registry( number_of_gpus: The number of gpus to assign to the operation node_pool_label: The label of the node pool to which the operation will be assigned. node_pool_name: The name of the node pool to which the operation will be assigned. + cache: Set to False to disable caching, True by default. 
""" components_dir: Path = t.cast(Path, files("fondant") / f"components/{name}") @@ -141,6 +148,7 @@ def from_registry( number_of_gpus=number_of_gpus, node_pool_label=node_pool_label, node_pool_name=node_pool_name, + cache=cache, ) def get_component_cache_key(self) -> str: @@ -324,6 +332,7 @@ def _validate_pipeline_definition(self, run_id: str): for operation_specs in self._graph.values(): fondant_component_op = operation_specs["fondant_component_op"] component_spec = fondant_component_op.component_spec + if not load_component: # Check subset exists for ( @@ -375,3 +384,7 @@ def _validate_pipeline_definition(self, run_id: str): load_component = False logger.info("All pipeline component specifications match.") + + def __repr__(self) -> str: + """Return a string representation of the FondantPipeline object.""" + return f"{self.__class__.__name__}({self._graph!r}" diff --git a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml index a6fa1d8c1..ddfdc7505 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml @@ -14,6 +14,8 @@ services: - a dummy string arg - --input_partition_rows - disable + - --cache + - 'True' - --component_spec - '{"name": "First component", "description": "This is an example component", "image": "example_component:latest", "produces": {"images": {"fields": {"data": @@ -43,6 +45,8 @@ services: - a dummy string arg - --input_partition_rows - '10' + - --cache + - 'True' - --component_spec - '{"name": "Second component", "description": "This is an example component", "image": "example_component:latest", "consumes": {"images": {"fields": {"data": @@ -69,6 +73,8 @@ services: - a dummy string arg - --input_partition_rows - None + - --cache + - 'True' - --component_spec - '{"name": "Third component", "description": "This is an example component", "image": 
"example_component:latest", "consumes": {"images": {"fields": {"data": diff --git a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml index 77ef99ef9..872be2d0f 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml @@ -33,6 +33,8 @@ spec: "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}' - --input_partition_rows - disable + - --cache + - 'True' - --storage_args - a dummy string arg - --output_manifest_path @@ -49,33 +51,35 @@ spec: data: '' metadata: annotations: - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, - \"description\": \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": - {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": - {\"type\": \"binary\"}}}}}", "input_partition_rows": "disable", "metadata": - "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", \"run_id\": - \"test_pipeline-20230101000000\", \"component_id\": \"first_component\"}", - "storage_args": "a dummy string arg"}' - pipelines.kubeflow.org/component_ref: '{"digest": "c53791e5eba77643348ea14b01bc20f273c32d827f5f1b1b896ef6965fd12d82"}' + pipelines.kubeflow.org/arguments.parameters: '{"cache": "True", "component_spec": + "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", + \"type\": \"str\"}}, \"description\": \"This is an example component\", + \"image\": \"example_component:latest\", \"name\": \"First component\", + \"produces\": {\"captions\": {\"fields\": {\"data\": {\"type\": \"string\"}}}, + \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": + "disable", 
"metadata": "{\"base_path\": \"/foo/bar\", \"pipeline_name\": + \"test_pipeline\", \"run_id\": \"test_pipeline-20230101000000\", \"component_id\": + \"first_component\"}", "storage_args": "a dummy string arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "99e50abb5261d2381b8d7ab61eadb9feff6c3d90f9a7b3ed89e69cda31c39d9b"}' pipelines.kubeflow.org/component_spec: '{"description": "This is an example component", "implementation": {"container": {"command": ["fondant", "execute", "main", "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, - "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", - {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": - "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": - [{"description": "Path to the input manifest", "name": "input_manifest_path", - "type": "String"}, {"description": "Metadata arguments containing the run - id and base path", "name": "metadata", "type": "String"}, {"default": "None", - "description": "The component specification as a dictionary", "name": "component_spec", - "type": "JsonObject"}, {"default": "None", "description": "The number of - rows to load per partition. 
Set to override the automatic partitioning", - "name": "input_partition_rows", "type": "String"}, {"description": "Storage - arguments", "name": "storage_args", "type": "String"}], "name": "First component", - "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}' + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--cache", + {"inputValue": "cache"}, "--storage_args", {"inputValue": "storage_args"}, + "--output_manifest_path", {"outputPath": "output_manifest_path"}], "image": + "example_component:latest"}}, "inputs": [{"description": "Path to the input + manifest", "name": "input_manifest_path", "type": "String"}, {"description": + "Metadata arguments containing the run id and base path", "name": "metadata", + "type": "String"}, {"default": "None", "description": "The component specification + as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": + "None", "description": "The number of rows to load per partition. 
Set to + override the automatic partitioning", "name": "input_partition_rows", "type": + "String"}, {"default": "True", "description": "Set to False to disable caching, + True by default.", "name": "cache", "type": "Boolean"}, {"description": + "Storage arguments", "name": "storage_args", "type": "String"}], "name": + "First component", "outputs": [{"description": "Path to the output manifest", + "name": "output_manifest_path", "type": "String"}]}' labels: pipelines.kubeflow.org/enable_caching: 'true' pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 @@ -104,6 +108,8 @@ spec: {"type": "float32"}, "type": "array"}}}}}' - --input_partition_rows - '10' + - --cache + - 'True' - --storage_args - a dummy string arg - --output_manifest_path @@ -115,34 +121,36 @@ spec: path: /tmp/inputs/input_manifest_path/data metadata: annotations: - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, - \"consumes\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, - \"description\": \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"Second component\", \"produces\": {\"embeddings\": {\"fields\": - {\"data\": {\"items\": {\"type\": \"float32\"}, \"type\": \"array\"}}}}}", - "input_partition_rows": "10", "metadata": "{\"base_path\": \"/foo/bar\", - \"pipeline_name\": \"test_pipeline\", \"run_id\": \"test_pipeline-20230101000000\", - \"component_id\": \"second_component\"}", "storage_args": "a dummy string - arg"}' - pipelines.kubeflow.org/component_ref: '{"digest": "455aeccd323115d9caae33621d3ecf5ad4de86da321f97c3761f77bc962f7fc2"}' + pipelines.kubeflow.org/arguments.parameters: '{"cache": "True", "component_spec": + "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", + \"type\": \"str\"}}, \"consumes\": {\"images\": {\"fields\": {\"data\": + {\"type\": \"binary\"}}}}, \"description\": \"This is an example 
component\", + \"image\": \"example_component:latest\", \"name\": \"Second component\", + \"produces\": {\"embeddings\": {\"fields\": {\"data\": {\"items\": {\"type\": + \"float32\"}, \"type\": \"array\"}}}}}", "input_partition_rows": "10", "metadata": + "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", \"run_id\": + \"test_pipeline-20230101000000\", \"component_id\": \"second_component\"}", + "storage_args": "a dummy string arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "e157b93359593b46563237b985194771d9a8f106a3577a7c5f4746b170fe5b23"}' pipelines.kubeflow.org/component_spec: '{"description": "This is an example component", "implementation": {"container": {"command": ["fondant", "execute", "main", "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, - "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", - {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": - "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": - [{"description": "Path to the input manifest", "name": "input_manifest_path", - "type": "String"}, {"description": "Metadata arguments containing the run - id and base path", "name": "metadata", "type": "String"}, {"default": "None", - "description": "The component specification as a dictionary", "name": "component_spec", - "type": "JsonObject"}, {"default": "None", "description": "The number of - rows to load per partition. 
Set to override the automatic partitioning", - "name": "input_partition_rows", "type": "String"}, {"description": "Storage - arguments", "name": "storage_args", "type": "String"}], "name": "Second - component", "outputs": [{"description": "Path to the output manifest", "name": - "output_manifest_path", "type": "String"}]}' + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--cache", + {"inputValue": "cache"}, "--storage_args", {"inputValue": "storage_args"}, + "--output_manifest_path", {"outputPath": "output_manifest_path"}], "image": + "example_component:latest"}}, "inputs": [{"description": "Path to the input + manifest", "name": "input_manifest_path", "type": "String"}, {"description": + "Metadata arguments containing the run id and base path", "name": "metadata", + "type": "String"}, {"default": "None", "description": "The component specification + as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": + "None", "description": "The number of rows to load per partition. 
Set to + override the automatic partitioning", "name": "input_partition_rows", "type": + "String"}, {"default": "True", "description": "Set to False to disable caching, + True by default.", "name": "cache", "type": "Boolean"}, {"description": + "Storage arguments", "name": "storage_args", "type": "String"}], "name": + "Second component", "outputs": [{"description": "Path to the output manifest", + "name": "output_manifest_path", "type": "String"}]}' labels: pipelines.kubeflow.org/enable_caching: 'true' pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 @@ -194,6 +202,8 @@ spec: "binary"}}}}}' - --input_partition_rows - None + - --cache + - 'True' - --storage_args - a dummy string arg - --output_manifest_path @@ -205,35 +215,38 @@ spec: path: /tmp/inputs/input_manifest_path/data metadata: annotations: - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, - \"consumes\": {\"captions\": {\"fields\": {\"data\": {\"type\": \"string\"}}}, - \"embeddings\": {\"fields\": {\"data\": {\"items\": {\"type\": \"float32\"}, - \"type\": \"array\"}}}, \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, - \"description\": \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"Third component\", \"produces\": {\"additionalSubsets\": false, - \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": - "None", "metadata": "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", + pipelines.kubeflow.org/arguments.parameters: '{"cache": "True", "component_spec": + "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", + \"type\": \"str\"}}, \"consumes\": {\"captions\": {\"fields\": {\"data\": + {\"type\": \"string\"}}}, \"embeddings\": {\"fields\": {\"data\": {\"items\": + {\"type\": \"float32\"}, \"type\": \"array\"}}}, \"images\": {\"fields\": + {\"data\": {\"type\": 
\"binary\"}}}}, \"description\": \"This is an example + component\", \"image\": \"example_component:latest\", \"name\": \"Third + component\", \"produces\": {\"additionalSubsets\": false, \"images\": {\"fields\": + {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", + "metadata": "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", \"run_id\": \"test_pipeline-20230101000000\", \"component_id\": \"third_component\"}", "storage_args": "a dummy string arg"}' - pipelines.kubeflow.org/component_ref: '{"digest": "4e728e3a6242c68816de163eb5ec0398940c5fb6746adf57223ca595103e6c2a"}' + pipelines.kubeflow.org/component_ref: '{"digest": "a8c0d8c46f876326331c3fb551bbf90530abab1e1d070a2cd7725635e7664f06"}' pipelines.kubeflow.org/component_spec: '{"description": "This is an example component", "implementation": {"container": {"command": ["fondant", "execute", "main", "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, - "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", - {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": - "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": - [{"description": "Path to the input manifest", "name": "input_manifest_path", - "type": "String"}, {"description": "Metadata arguments containing the run - id and base path", "name": "metadata", "type": "String"}, {"default": "None", - "description": "The component specification as a dictionary", "name": "component_spec", - "type": "JsonObject"}, {"default": "None", "description": "The number of - rows to load per partition. 
Set to override the automatic partitioning", - "name": "input_partition_rows", "type": "String"}, {"description": "Storage - arguments", "name": "storage_args", "type": "String"}], "name": "Third component", - "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}' + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--cache", + {"inputValue": "cache"}, "--storage_args", {"inputValue": "storage_args"}, + "--output_manifest_path", {"outputPath": "output_manifest_path"}], "image": + "example_component:latest"}}, "inputs": [{"description": "Path to the input + manifest", "name": "input_manifest_path", "type": "String"}, {"description": + "Metadata arguments containing the run id and base path", "name": "metadata", + "type": "String"}, {"default": "None", "description": "The component specification + as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": + "None", "description": "The number of rows to load per partition. 
Set to + override the automatic partitioning", "name": "input_partition_rows", "type": + "String"}, {"default": "True", "description": "Set to False to disable caching, + True by default.", "name": "cache", "type": "Boolean"}, {"description": + "Storage arguments", "name": "storage_args", "type": "String"}], "name": + "Third component", "outputs": [{"description": "Path to the output manifest", + "name": "output_manifest_path", "type": "String"}]}' labels: pipelines.kubeflow.org/enable_caching: 'true' pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 diff --git a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml index f3178ebe2..4dc1256db 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml @@ -14,6 +14,8 @@ services: - a dummy string arg - --input_partition_rows - None + - --cache + - 'True' - --component_spec - '{"name": "First component", "description": "This is an example component", "image": "example_component:latest", "produces": {"images": {"fields": {"data": @@ -34,6 +36,8 @@ services: - '0' - --input_partition_rows - None + - --cache + - 'True' - --component_spec - '{"name": "Image cropping", "description": "Component that removes single-colored borders around images and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev", diff --git a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml index 9ecf9d6c0..a122bbd34 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml @@ -33,6 +33,8 @@ spec: "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}' - --input_partition_rows - None + - --cache + - 'True' - --storage_args - a 
dummy string arg - --output_manifest_path @@ -46,33 +48,35 @@ spec: data: '' metadata: annotations: - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, - \"description\": \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": - {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": - {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", "metadata": - "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", \"run_id\": - \"test_pipeline-20230101000000\", \"component_id\": \"first_component\"}", + pipelines.kubeflow.org/arguments.parameters: '{"cache": "True", "component_spec": + "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", + \"type\": \"str\"}}, \"description\": \"This is an example component\", + \"image\": \"example_component:latest\", \"name\": \"First component\", + \"produces\": {\"captions\": {\"fields\": {\"data\": {\"type\": \"string\"}}}, + \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": + "None", "metadata": "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", + \"run_id\": \"test_pipeline-20230101000000\", \"component_id\": \"first_component\"}", "storage_args": "a dummy string arg"}' - pipelines.kubeflow.org/component_ref: '{"digest": "c53791e5eba77643348ea14b01bc20f273c32d827f5f1b1b896ef6965fd12d82"}' + pipelines.kubeflow.org/component_ref: '{"digest": "99e50abb5261d2381b8d7ab61eadb9feff6c3d90f9a7b3ed89e69cda31c39d9b"}' pipelines.kubeflow.org/component_spec: '{"description": "This is an example component", "implementation": {"container": {"command": ["fondant", "execute", "main", "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": 
"component_spec"}, - "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", - {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": - "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": - [{"description": "Path to the input manifest", "name": "input_manifest_path", - "type": "String"}, {"description": "Metadata arguments containing the run - id and base path", "name": "metadata", "type": "String"}, {"default": "None", - "description": "The component specification as a dictionary", "name": "component_spec", - "type": "JsonObject"}, {"default": "None", "description": "The number of - rows to load per partition. Set to override the automatic partitioning", - "name": "input_partition_rows", "type": "String"}, {"description": "Storage - arguments", "name": "storage_args", "type": "String"}], "name": "First component", - "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}' + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--cache", + {"inputValue": "cache"}, "--storage_args", {"inputValue": "storage_args"}, + "--output_manifest_path", {"outputPath": "output_manifest_path"}], "image": + "example_component:latest"}}, "inputs": [{"description": "Path to the input + manifest", "name": "input_manifest_path", "type": "String"}, {"description": + "Metadata arguments containing the run id and base path", "name": "metadata", + "type": "String"}, {"default": "None", "description": "The component specification + as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": + "None", "description": "The number of rows to load per partition. 
Set to + override the automatic partitioning", "name": "input_partition_rows", "type": + "String"}, {"default": "True", "description": "Set to False to disable caching, + True by default.", "name": "cache", "type": "Boolean"}, {"description": + "Storage arguments", "name": "storage_args", "type": "String"}], "name": + "First component", "outputs": [{"description": "Path to the output manifest", + "name": "output_manifest_path", "type": "String"}]}' labels: pipelines.kubeflow.org/enable_caching: 'true' pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 @@ -106,6 +110,8 @@ spec: "binary"}, "height": {"type": "int32"}, "width": {"type": "int32"}}}}}' - --input_partition_rows - None + - --cache + - 'True' - --cropping_threshold - '0' - --padding @@ -119,11 +125,11 @@ spec: path: /tmp/inputs/input_manifest_path/data metadata: annotations: - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"cropping_threshold\": {\"default\": -30, \"description\": \"Threshold - parameter used for detecting borders. A lower (negative) parameter results - in a more performant border detection, but can cause overcropping. Default - is -30\", \"type\": \"int\"}, \"padding\": {\"default\": 10, \"description\": + pipelines.kubeflow.org/arguments.parameters: '{"cache": "True", "component_spec": + "{\"args\": {\"cropping_threshold\": {\"default\": -30, \"description\": + \"Threshold parameter used for detecting borders. A lower (negative) parameter + results in a more performant border detection, but can cause overcropping. + Default is -30\", \"type\": \"int\"}, \"padding\": {\"default\": 10, \"description\": \"Padding for the image cropping. 
The padding is added to all borders of the image.\", \"type\": \"int\"}}, \"consumes\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, \"description\": \"Component that @@ -134,29 +140,31 @@ spec: "cropping_threshold": "0", "input_partition_rows": "None", "metadata": "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", \"run_id\": \"test_pipeline-20230101000000\", \"component_id\": \"image_cropping\"}", "padding": "0"}' - pipelines.kubeflow.org/component_ref: '{"digest": "8a3c2b5736cf8297ad5848ec043987aed42c6fb12e6e26db25b922467b4d2d7f"}' + pipelines.kubeflow.org/component_ref: '{"digest": "8c3ca8c42706df81bfe28a8f6a8447b5245fe817a4fb5ae4d0872041f1ca7f65"}' pipelines.kubeflow.org/component_spec: '{"description": "Component that removes single-colored borders around images and crops them appropriately", "implementation": {"container": {"command": ["fondant", "execute", "main", "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, "--input_partition_rows", - {"inputValue": "input_partition_rows"}, "--cropping_threshold", {"inputValue": - "cropping_threshold"}, "--padding", {"inputValue": "padding"}, "--output_manifest_path", - {"outputPath": "output_manifest_path"}], "image": "ghcr.io/ml6team/image_cropping:dev"}}, - "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", - "type": "String"}, {"description": "Metadata arguments containing the run - id and base path", "name": "metadata", "type": "String"}, {"default": "None", - "description": "The component specification as a dictionary", "name": "component_spec", + {"inputValue": "input_partition_rows"}, "--cache", {"inputValue": "cache"}, + "--cropping_threshold", {"inputValue": "cropping_threshold"}, "--padding", + {"inputValue": "padding"}, "--output_manifest_path", {"outputPath": "output_manifest_path"}], + "image": 
"ghcr.io/ml6team/image_cropping:dev"}}, "inputs": [{"description": + "Path to the input manifest", "name": "input_manifest_path", "type": "String"}, + {"description": "Metadata arguments containing the run id and base path", + "name": "metadata", "type": "String"}, {"default": "None", "description": + "The component specification as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": "None", "description": "The number of rows to load per partition. Set to override the automatic partitioning", - "name": "input_partition_rows", "type": "String"}, {"default": -30, "description": - "Threshold parameter used for detecting borders. A lower (negative) parameter - results in a more performant border detection, but can cause overcropping. - Default is -30", "name": "cropping_threshold", "type": "Integer"}, {"default": - 10, "description": "Padding for the image cropping. The padding is added - to all borders of the image.", "name": "padding", "type": "Integer"}], "name": - "Image cropping", "outputs": [{"description": "Path to the output manifest", - "name": "output_manifest_path", "type": "String"}]}' + "name": "input_partition_rows", "type": "String"}, {"default": "True", "description": + "Set to False to disable caching, True by default.", "name": "cache", "type": + "Boolean"}, {"default": -30, "description": "Threshold parameter used for + detecting borders. A lower (negative) parameter results in a more performant + border detection, but can cause overcropping. Default is -30", "name": "cropping_threshold", + "type": "Integer"}, {"default": 10, "description": "Padding for the image + cropping. 
The padding is added to all borders of the image.", "name": "padding", + "type": "Integer"}], "name": "Image cropping", "outputs": [{"description": + "Path to the output manifest", "name": "output_manifest_path", "type": "String"}]}' labels: pipelines.kubeflow.org/enable_caching: 'true' pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 diff --git a/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml index 994a34f02..aecb91f23 100644 --- a/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml @@ -33,6 +33,8 @@ spec: "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}' - --input_partition_rows - None + - --cache + - 'True' - --storage_args - a dummy string arg - --output_manifest_path @@ -49,33 +51,35 @@ spec: data: '' metadata: annotations: - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, - \"description\": \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": - {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": - {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", "metadata": - "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", \"run_id\": - \"test_pipeline-20230101000000\", \"component_id\": \"first_component\"}", + pipelines.kubeflow.org/arguments.parameters: '{"cache": "True", "component_spec": + "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", + \"type\": \"str\"}}, \"description\": \"This is an example component\", + \"image\": \"example_component:latest\", \"name\": \"First component\", + \"produces\": {\"captions\": {\"fields\": {\"data\": {\"type\": \"string\"}}}, + \"images\": {\"fields\": {\"data\": {\"type\": 
\"binary\"}}}}}", "input_partition_rows": + "None", "metadata": "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", + \"run_id\": \"test_pipeline-20230101000000\", \"component_id\": \"first_component\"}", "storage_args": "a dummy string arg"}' - pipelines.kubeflow.org/component_ref: '{"digest": "c53791e5eba77643348ea14b01bc20f273c32d827f5f1b1b896ef6965fd12d82"}' + pipelines.kubeflow.org/component_ref: '{"digest": "99e50abb5261d2381b8d7ab61eadb9feff6c3d90f9a7b3ed89e69cda31c39d9b"}' pipelines.kubeflow.org/component_spec: '{"description": "This is an example component", "implementation": {"container": {"command": ["fondant", "execute", "main", "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, - "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", - {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": - "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": - [{"description": "Path to the input manifest", "name": "input_manifest_path", - "type": "String"}, {"description": "Metadata arguments containing the run - id and base path", "name": "metadata", "type": "String"}, {"default": "None", - "description": "The component specification as a dictionary", "name": "component_spec", - "type": "JsonObject"}, {"default": "None", "description": "The number of - rows to load per partition. 
Set to override the automatic partitioning", - "name": "input_partition_rows", "type": "String"}, {"description": "Storage - arguments", "name": "storage_args", "type": "String"}], "name": "First component", - "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}' + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--cache", + {"inputValue": "cache"}, "--storage_args", {"inputValue": "storage_args"}, + "--output_manifest_path", {"outputPath": "output_manifest_path"}], "image": + "example_component:latest"}}, "inputs": [{"description": "Path to the input + manifest", "name": "input_manifest_path", "type": "String"}, {"description": + "Metadata arguments containing the run id and base path", "name": "metadata", + "type": "String"}, {"default": "None", "description": "The component specification + as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": + "None", "description": "The number of rows to load per partition. 
Set to + override the automatic partitioning", "name": "input_partition_rows", "type": + "String"}, {"default": "True", "description": "Set to False to disable caching, + True by default.", "name": "cache", "type": "Boolean"}, {"description": + "Storage arguments", "name": "storage_args", "type": "String"}], "name": + "First component", "outputs": [{"description": "Path to the output manifest", + "name": "output_manifest_path", "type": "String"}]}' labels: pipelines.kubeflow.org/enable_caching: 'true' pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 diff --git a/tests/example_specs/component_specs/kubeflow_component.yaml b/tests/example_specs/component_specs/kubeflow_component.yaml index fcf1fa866..f7286a82e 100644 --- a/tests/example_specs/component_specs/kubeflow_component.yaml +++ b/tests/example_specs/component_specs/kubeflow_component.yaml @@ -16,6 +16,10 @@ inputs: partitioning type: String default: None +- name: cache + description: Set to False to disable caching, True by default. 
+ type: Boolean + default: 'True' - name: storage_args description: Storage arguments type: String @@ -38,6 +42,8 @@ implementation: - inputValue: component_spec - --input_partition_rows - inputValue: input_partition_rows + - --cache + - inputValue: cache - --storage_args - inputValue: storage_args - --output_manifest_path diff --git a/tests/test_component.py b/tests/test_component.py index 851983c71..3f5528344 100644 --- a/tests/test_component.py +++ b/tests/test_component.py @@ -94,6 +94,8 @@ def test_component_arguments(metadata): str(components_path / "arguments/output_manifest.json"), "--component_spec", yaml_file_to_json_string(components_path / "arguments/component.yaml"), + "--cache", + "True", "--input_partition_rows", "100", "--override_default_arg", @@ -116,6 +118,7 @@ def _process_dataset(self, manifest: Manifest) -> t.Union[None, dd.DataFrame]: executor = MyExecutor.from_args() expected_partition_row_arg = 100 assert executor.input_partition_rows == expected_partition_row_arg + assert executor.cache is True assert executor.user_arguments == { "string_default_arg": "foo", "integer_default_arg": 0, @@ -152,6 +155,8 @@ def test_load_component(metadata): str(components_path / "output_manifest.json"), "--component_spec", yaml_file_to_json_string(components_path / "component.yaml"), + "--cache", + "False", ] class MyLoadComponent(DaskLoadComponent): @@ -197,6 +202,8 @@ def test_dask_transform_component(metadata): str(components_path / "output_manifest.json"), "--component_spec", yaml_file_to_json_string(components_path / "component.yaml"), + "--cache", + "False", ] class MyDaskComponent(DaskTransformComponent): @@ -240,6 +247,8 @@ def test_pandas_transform_component(metadata): str(components_path / "output_manifest.json"), "--component_spec", yaml_file_to_json_string(components_path / "component.yaml"), + "--cache", + "False", ] class MyPandasComponent(PandasTransformComponent): @@ -354,6 +363,8 @@ def test_write_component(metadata): "1", "--component_spec", 
yaml_file_to_json_string(components_path / "component.yaml"), + "--cache", + "False", ] class MyWriteComponent(DaskWriteComponent):