diff --git a/components/load_from_hf_hub/fondant_component.yaml b/components/load_from_hf_hub/fondant_component.yaml index 38ec90980..998a6659b 100644 --- a/components/load_from_hf_hub/fondant_component.yaml +++ b/components/load_from_hf_hub/fondant_component.yaml @@ -15,6 +15,7 @@ args: column_name_mapping: description: Mapping of the consumed hub dataset to fondant column names type: dict + default: {} image_column_names: description: Optional argument, a list containing the original image column names in case the dataset on the hub contains them. Used to format the image from HF hub format to a byte string. @@ -23,7 +24,7 @@ args: n_rows_to_load: description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale type: int - default: -1 + default: None index_column: description: Column to set index to in the load component, if not specified a default globally unique index will be set type: str diff --git a/components/load_from_hf_hub/src/main.py b/components/load_from_hf_hub/src/main.py index 366e22df1..55e562237 100644 --- a/components/load_from_hf_hub/src/main.py +++ b/components/load_from_hf_hub/src/main.py @@ -1,5 +1,6 @@ """This component loads a seed dataset from the hub.""" import logging +import typing as t import dask import dask.dataframe as dd @@ -18,10 +19,10 @@ def __init__( spec: ComponentSpec, *_, dataset_name: str, - column_name_mapping: dict, - image_column_names: list, - n_rows_to_load: int, - index_column: str, + column_name_mapping: t.Optional[dict], + image_column_names: t.Optional[list], + n_rows_to_load: t.Optional[int], + index_column: t.Optional[str], ) -> None: """ Args: @@ -42,55 +43,45 @@ def __init__( self.index_column = index_column self.spec = spec - def load(self) -> dd.DataFrame: - # 1) Load data, read as Dask dataframe - logger.info("Loading dataset from the hub...") - + def get_columns_to_keep(self) -> t.List[str]: # Only read required columns columns = [] - invert_column_name_mapping = {v: k for k, v in self.column_name_mapping.items()} + if self.column_name_mapping: + invert_column_name_mapping = { + v: k for k, v in self.column_name_mapping.items() + } + else: + invert_column_name_mapping = {} + for subset_name, subset in self.spec.produces.items(): for field_name, field in subset.fields.items(): - subset_field_name = f"{subset_name}_{field_name}" - column_name = invert_column_name_mapping.get( - subset_field_name, - subset_field_name, - ) - columns.append(column_name) - - logger.debug(f"Columns to keep: {columns}") - dask_df = dd.read_parquet(f"hf://datasets/{self.dataset_name}", columns=columns) - - # 2) Make sure images are bytes instead of dicts - if self.image_column_names is not None: + column_name = f"{subset_name}_{field_name}" + if ( + invert_column_name_mapping + and column_name in invert_column_name_mapping + ): + columns.append(invert_column_name_mapping[column_name]) + else: + columns.append(column_name) + + if self.index_column is not None: + columns.append(self.index_column) + + return columns + + def convert_images_to_bytes(self, dask_df) -> dd.DataFrame: + if self.image_column_names: for image_column_name in self.image_column_names: dask_df[image_column_name] = dask_df[image_column_name].map( lambda x: x["bytes"], meta=("bytes", bytes), ) - # 3) Rename columns - logger.info("Renaming columns...") - dask_df = dask_df.rename(columns=self.column_name_mapping) - - # 4) Optional: only return specific amount of rows - if self.n_rows_to_load > 0: - partitions_length = 0 - npartitions = 
1 - for npartitions, partition in enumerate(dask_df.partitions, start=1): - if partitions_length >= self.n_rows_to_load: - logger.info( - f"""Required number of partitions to load\n - {self.n_rows_to_load} is {npartitions}""", - ) - break - partitions_length += len(partition) - dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions) - dask_df = dd.from_pandas(dask_df, npartitions=npartitions) + return dask_df - # 5) Set the index - if self.index_column == "None": + def set_df_index(self, dask_df: dd.DataFrame) -> dd.DataFrame: + if self.index_column is None: logger.info( "Index column not specified, setting a globally unique index", ) @@ -122,3 +113,43 @@ def _get_meta_df() -> pd.DataFrame: dask_df = dask_df.set_index(self.index_column, drop=True) return dask_df + + def return_subset_of_df(self, dask_df: dd.DataFrame) -> dd.DataFrame: + if self.n_rows_to_load is not None: + partitions_length = 0 + npartitions = 1 + for npartitions, partition in enumerate(dask_df.partitions, start=1): + if partitions_length >= self.n_rows_to_load: + logger.info( + f"""Required number of partitions to load\n + {self.n_rows_to_load} is {npartitions}""", + ) + break + partitions_length += len(partition) + dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions) + dask_df = dd.from_pandas(dask_df, npartitions=npartitions) + return dask_df + + def load(self) -> dd.DataFrame: + # 1) Load data, read as Dask dataframe + logger.info("Loading dataset from the hub...") + + columns = self.get_columns_to_keep() + + logger.debug(f"Columns to keep: {columns}") + dask_df = dd.read_parquet(f"hf://datasets/{self.dataset_name}", columns=columns) + + # 2) Make sure images are bytes instead of dicts + dask_df = self.convert_images_to_bytes(dask_df) + + # 3) Rename columns + logger.info("Renaming columns...") + dask_df = dask_df.rename(columns=self.column_name_mapping) + + # 4) Optional: only return specific amount of rows + dask_df = self.return_subset_of_df(dask_df) + + # 5) Set the index + dask_df = self.set_df_index(dask_df) + + return dask_df diff --git a/components/load_from_parquet/fondant_component.yaml b/components/load_from_parquet/fondant_component.yaml index 1e97e960e..17d712a95 100644 --- a/components/load_from_parquet/fondant_component.yaml +++ b/components/load_from_parquet/fondant_component.yaml @@ -19,7 +19,6 @@ args: n_rows_to_load: description: Optional argument that defines the number of rows to load. 
Useful for testing pipeline runs on a small scale type: int - default: -1 index_column: description: Column to set index to in the load component, if not specified a default globally unique index will be set type: str diff --git a/components/load_from_parquet/src/main.py b/components/load_from_parquet/src/main.py index 8e4bf3913..cfc104740 100644 --- a/components/load_from_parquet/src/main.py +++ b/components/load_from_parquet/src/main.py @@ -19,8 +19,8 @@ def __init__( spec: ComponentSpec, *_, dataset_uri: str, - column_name_mapping: dict, - n_rows_to_load: int, + column_name_mapping: t.Optional[dict], + n_rows_to_load: t.Optional[int], index_column: t.Optional[str], ) -> None: """ @@ -39,49 +39,34 @@ def __init__( self.index_column = index_column self.spec = spec - def load(self) -> dd.DataFrame: - # 1) Load data, read as Dask dataframe - logger.info("Loading dataset from the file...") - + def get_columns_to_keep(self) -> t.List[str]: # Only read required columns columns = [] - if self.column_name_mapping is not None: + + if self.column_name_mapping: invert_column_name_mapping = { v: k for k, v in self.column_name_mapping.items() } - for subset_name, subset in self.spec.produces.items(): - for field_name, field in subset.fields.items(): - subset_field_name = f"{subset_name}_{field_name}" - column_name = invert_column_name_mapping.get( - subset_field_name, - subset_field_name, - ) - columns.append(column_name) + else: + invert_column_name_mapping = {} - logger.debug(f"Columns to keep: {columns}") - dask_df = dd.read_parquet(self.dataset_uri, columns=columns) + for subset_name, subset in self.spec.produces.items(): + for field_name, field in subset.fields.items(): + column_name = f"{subset_name}_{field_name}" + if ( + invert_column_name_mapping + and column_name in invert_column_name_mapping + ): + columns.append(invert_column_name_mapping[column_name]) + else: + columns.append(column_name) - # 2) Rename columns - if self.column_name_mapping: - logger.info("Renaming columns...") - dask_df = dask_df.rename(columns=self.column_name_mapping) + if self.index_column is not None: + columns.append(self.index_column) - # 3) Optional: only return specific amount of rows - if self.n_rows_to_load > 0: - partitions_length = 0 - npartitions = 1 - for npartitions, partition in enumerate(dask_df.partitions, start=1): - if partitions_length >= self.n_rows_to_load: - logger.info( - f"""Required number of partitions to load\n - {self.n_rows_to_load} is {npartitions}""", - ) - break - partitions_length += len(partition) - dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions) - dask_df = dd.from_pandas(dask_df, npartitions=npartitions) + return columns - # 4) Set the index + def set_df_index(self, dask_df: dd.DataFrame) -> dd.DataFrame: if self.index_column is None: logger.info( "Index column not specified, setting a globally unique index", @@ -114,3 +99,40 @@ def _get_meta_df() -> pd.DataFrame: dask_df = dask_df.set_index(self.index_column, drop=True) return dask_df + + def return_subset_of_df(self, dask_df: dd.DataFrame) -> dd.DataFrame: + if self.n_rows_to_load is not None: + partitions_length = 0 + npartitions = 1 + for npartitions, partition in enumerate(dask_df.partitions, start=1): + if partitions_length >= self.n_rows_to_load: + logger.info( + f"""Required number of partitions to load\n + {self.n_rows_to_load} is {npartitions}""", + ) + break + partitions_length += len(partition) + dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions) + dask_df = dd.from_pandas(dask_df, 
npartitions=npartitions) + return dask_df + + def load(self) -> dd.DataFrame: + # 1) Load data, read as Dask dataframe + logger.info("Loading dataset from the hub...") + + columns = self.get_columns_to_keep() + + logger.debug(f"Columns to keep: {columns}") + dask_df = dd.read_parquet(self.dataset_uri, columns=columns) + + # 2) Rename columns + if self.column_name_mapping: + logger.info("Renaming columns...") + dask_df = dask_df.rename(columns=self.column_name_mapping) + + # 4) Optional: only return specific amount of rows + dask_df = self.return_subset_of_df(dask_df) + + # 5) Set the index + dask_df = self.set_df_index(dask_df) + return dask_df diff --git a/components/write_to_hf_hub/src/main.py b/components/write_to_hf_hub/src/main.py index 32ae97b74..939d81c89 100644 --- a/components/write_to_hf_hub/src/main.py +++ b/components/write_to_hf_hub/src/main.py @@ -41,8 +41,8 @@ def __init__( hf_token: str, username: str, dataset_name: str, - image_column_names: list, - column_name_mapping: dict, + image_column_names: t.Optional[list], + column_name_mapping: t.Optional[dict], ): """ Args: @@ -91,7 +91,7 @@ def write( # Map image column to hf data format feature_encoder = datasets.Image(decode=True) - if self.image_column_names: + if self.image_column_names is not None: for image_column_name in self.image_column_names: dataframe[image_column_name] = dataframe[image_column_name].map( lambda x: convert_bytes_to_image(x, feature_encoder), diff --git a/examples/pipelines/commoncrawl/components/read_warc_paths/fondant_component.yaml b/examples/pipelines/commoncrawl/components/read_warc_paths/fondant_component.yaml index 10c1825d4..def2450a1 100644 --- a/examples/pipelines/commoncrawl/components/read_warc_paths/fondant_component.yaml +++ b/examples/pipelines/commoncrawl/components/read_warc_paths/fondant_component.yaml @@ -15,4 +15,4 @@ args: n_records_to_download: description: Number of records to download type: int - default: -1 \ No newline at end of file + default: None \ No newline at end of file diff --git a/examples/pipelines/commoncrawl/components/read_warc_paths/src/main.py b/examples/pipelines/commoncrawl/components/read_warc_paths/src/main.py index 094c9ce3e..7c642fba3 100644 --- a/examples/pipelines/commoncrawl/components/read_warc_paths/src/main.py +++ b/examples/pipelines/commoncrawl/components/read_warc_paths/src/main.py @@ -18,7 +18,7 @@ def __init__( self, *_, common_crawl_indices: t.List[str], - n_records_to_download: int, + n_records_to_download: t.Optional[int] = None, ): self.index_urls = [ self.build_index_url(index_name) for index_name in common_crawl_indices @@ -38,7 +38,7 @@ def load(self) -> dd.DataFrame: warc_urls.extend([line.decode() for line in extracted.split(b"\n")]) df = pd.DataFrame(warc_urls, columns=["warc_url"]) - if self.n_records_to_download > 0: + if self.n_records_to_download is not None: df = df.head(self.n_records_to_download) return dd.from_pandas(df, npartitions=len(df) // 100) diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/fondant_component.yaml b/examples/pipelines/controlnet-interior-design/components/generate_prompts/fondant_component.yaml index a1c4ede40..a5f4b6640 100644 --- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/fondant_component.yaml +++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/fondant_component.yaml @@ -12,4 +12,4 @@ args: n_rows_to_load: description: Optional argument that defines the number of rows to load. 
Useful for testing pipeline runs on a small scale type: int - default: -1 \ No newline at end of file + default: None \ No newline at end of file diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/src/main.py b/examples/pipelines/controlnet-interior-design/components/generate_prompts/src/main.py index fff2ef46c..9d58a287a 100644 --- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/src/main.py +++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/src/main.py @@ -114,7 +114,7 @@ def load(self) -> dd.DataFrame: pandas_df = pd.DataFrame(prompts, columns=["prompts_text"]) - if self.n_rows_to_load > 0: + if self.n_rows_to_load: pandas_df = pandas_df.head(self.n_rows_to_load) df = dd.from_pandas(pandas_df, npartitions=1) diff --git a/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml b/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml index 9698ff2be..47b752b0b 100644 --- a/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml +++ b/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml @@ -46,6 +46,7 @@ args: column_name_mapping: description: Mapping of the consumed hub dataset to fondant column names type: dict + default: {} image_column_names: description: Optional argument, a list containing the original image column names in case the dataset on the hub contains them. Used to format the image from HF hub format to a byte string. @@ -54,7 +55,7 @@ args: n_rows_to_load: description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale type: int - default: -1 + default: None index_column: description: Column to set index to in the load component, if not specified a default globally unique index will be set type: str diff --git a/examples/pipelines/filter-cc-25m/components/load_from_hf_hub/fondant_component.yaml b/examples/pipelines/filter-cc-25m/components/load_from_hf_hub/fondant_component.yaml index 6d865a6a0..fa2da3e62 100644 --- a/examples/pipelines/filter-cc-25m/components/load_from_hf_hub/fondant_component.yaml +++ b/examples/pipelines/filter-cc-25m/components/load_from_hf_hub/fondant_component.yaml @@ -27,6 +27,7 @@ args: column_name_mapping: description: Mapping of the consumed hub dataset to fondant column names type: dict + default: {} image_column_names: description: Optional argument, a list containing the original image column names in case the dataset on the hub contains them. Used to format the image from HF hub format to a byte string. @@ -35,7 +36,6 @@ args: n_rows_to_load: description: Optional argument that defines the number of rows to load. 
Useful for testing pipeline runs on a small scale type: int - default: -1 index_column: description: Column to set index to in the load component, if not specified a default globally unique index will be set type: str diff --git a/examples/pipelines/finetune_stable_diffusion/components/load_from_hf_hub/fondant_component.yaml b/examples/pipelines/finetune_stable_diffusion/components/load_from_hf_hub/fondant_component.yaml index 2f180b601..a9df2700d 100644 --- a/examples/pipelines/finetune_stable_diffusion/components/load_from_hf_hub/fondant_component.yaml +++ b/examples/pipelines/finetune_stable_diffusion/components/load_from_hf_hub/fondant_component.yaml @@ -20,6 +20,7 @@ args: column_name_mapping: description: Mapping of the consumed hub dataset to fondant column names type: dict + default: {} image_column_names: description: Optional argument, a list containing the original image column names in case the dataset on the hub contains them. Used to format the image from HF hub format to a byte string. @@ -28,7 +29,7 @@ args: n_rows_to_load: description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale type: int - default: -1 + default: None index_column: description: Column to set index to in the load component, if not specified a default globally unique index will be set type: str diff --git a/examples/pipelines/finetune_stable_diffusion/components/write_to_hf_hub/fondant_component.yaml b/examples/pipelines/finetune_stable_diffusion/components/write_to_hf_hub/fondant_component.yaml index 1c251ab61..d1e6a6660 100644 --- a/examples/pipelines/finetune_stable_diffusion/components/write_to_hf_hub/fondant_component.yaml +++ b/examples/pipelines/finetune_stable_diffusion/components/write_to_hf_hub/fondant_component.yaml @@ -26,8 +26,8 @@ args: image_column_names: description: A list containing the image column names. Used to format to image to HF hub format type: list - default: [] + default: None column_name_mapping: description: Mapping of the consumed fondant column names to the written hub column names type: dict - default: {} \ No newline at end of file + default: None \ No newline at end of file diff --git a/examples/pipelines/finetune_stable_diffusion/pipeline.py b/examples/pipelines/finetune_stable_diffusion/pipeline.py index d3654f8e8..c6e13a867 100644 --- a/examples/pipelines/finetune_stable_diffusion/pipeline.py +++ b/examples/pipelines/finetune_stable_diffusion/pipeline.py @@ -27,7 +27,7 @@ "dataset_name": "logo-wizard/modern-logo-dataset", "column_name_mapping": load_component_column_mapping, "image_column_names": ["image"], - "n_rows_to_load": None, + "n_rows_to_load": 100, }, ) diff --git a/examples/pipelines/starcoder/components/load_from_hub/fondant_component.yaml b/examples/pipelines/starcoder/components/load_from_hub/fondant_component.yaml index 2919227d7..031eea524 100644 --- a/examples/pipelines/starcoder/components/load_from_hub/fondant_component.yaml +++ b/examples/pipelines/starcoder/components/load_from_hub/fondant_component.yaml @@ -33,11 +33,11 @@ args: description: A list containing the original hub image column names. Used to format the image from HF hub format to a byte string type: list - default: [] + default: None n_rows_to_load: description: Optional argument that defines the number of rows to load. 
Useful for testing pipeline runs on a small scale type: int - default: -1 + default: None index_column: description: Column to set index to in the load component, if not specified a default globally unique index will be set type: str diff --git a/src/fondant/component_spec.py b/src/fondant/component_spec.py index 3698ae883..4ba646122 100644 --- a/src/fondant/component_spec.py +++ b/src/fondant/component_spec.py @@ -34,7 +34,7 @@ class Argument: name: str description: str type: str - default: t.Any = None + default: t.Optional[t.Any] = None optional: t.Optional[bool] = False @property @@ -47,7 +47,8 @@ def python_type(self) -> t.Any: "dict": json.loads, "list": json.loads, } - return lookup[self.type] + map_fn = lookup[self.type] + return lambda value: map_fn(value) if value != "None" else None # type: ignore @property def kubeflow_type(self) -> str: @@ -230,7 +231,7 @@ def default_arguments(self) -> t.Dict[str, Argument]: description="The number of rows to load per partition. \ Set to override the automatic partitioning", type="int", - default=-1, + optional=True, ), "cache": Argument( name="cache", @@ -286,7 +287,8 @@ def convert_arguments(fondant_component: ComponentSpec): for arg in fondant_component.args.values(): arg_type_dict = {} - if arg.optional or arg.default is not None: + # Enable isOptional attribute in spec if arg is Optional and defaults to None + if arg.optional and arg.default is None: arg_type_dict["isOptional"] = True if arg.default is not None: arg_type_dict["defaultValue"] = arg.default diff --git a/src/fondant/data_io.py b/src/fondant/data_io.py index f5ce0e5eb..bfb4abc7c 100644 --- a/src/fondant/data_io.py +++ b/src/fondant/data_io.py @@ -24,7 +24,7 @@ def __init__( *, manifest: Manifest, component_spec: ComponentSpec, - input_partition_rows: int = -1, + input_partition_rows: t.Optional[int] = None, ): super().__init__(manifest=manifest, component_spec=component_spec) self.input_partition_rows = input_partition_rows @@ -38,7 +38,20 @@ def partition_loaded_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame: """ n_workers: int = os.cpu_count() # type: ignore - if self.input_partition_rows > 1: + if self.input_partition_rows is None: + n_partitions = dataframe.npartitions + if n_partitions < n_workers: # type: ignore + logger.info( + f"The number of partitions of the input dataframe is {n_partitions}. The " + f"available number of workers is {n_workers}.", + ) + dataframe = dataframe.repartition(npartitions=n_workers) + logger.info( + f"Repartitioning the data to {n_workers} partitions before processing" + f" to maximize worker usage", + ) + + elif self.input_partition_rows >= 1: # Only load the index column to trigger a faster compute of the rows total_rows = len(dataframe.index) # +1 to handle any remainder rows @@ -56,23 +69,11 @@ def partition_loaded_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame: f" all available workers {n_partitions} out of {n_workers} are used.", ) - elif self.input_partition_rows == -1: - n_partitions = dataframe.npartitions - if n_partitions < n_workers: # type: ignore - logger.info( - f"The number of partitions of the input dataframe is {n_partitions}. The " - f"available number of workers is {n_workers}.", - ) - dataframe = dataframe.repartition(npartitions=n_workers) - logger.info( - f"Repartitioning the data to {n_workers} partitions before processing" - f" to maximize worker usage", - ) else: msg = ( f"{self.input_partition_rows} is not a valid value for the 'input_partition_rows' " f"parameter. 
It should be a number larger than 0 to indicate the number of " - f"expected rows per partition, or '-1' to let Fondant optimize the number of " + f"expected rows per partition, or None to let Fondant optimize the number of " f"partitions based on the number of available workers." ) raise ValueError( diff --git a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml index 1e148f5c3..1806460b1 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml @@ -21,7 +21,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -48,7 +47,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -75,7 +73,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 isOptional: true parameterType: NUMBER_INTEGER metadata: diff --git a/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml index bf34acc3f..dd789c373 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml @@ -20,7 +20,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -47,7 +46,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -74,7 +72,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 isOptional: true parameterType: NUMBER_INTEGER metadata: diff --git a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml index c6c9200e6..84f86af77 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml @@ -21,7 +21,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1.0 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -52,7 +51,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1.0 isOptional: true parameterType: NUMBER_INTEGER metadata: diff --git a/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml index d31e119fc..84f86af77 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml @@ -21,7 +21,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -52,7 +51,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1.0 isOptional: true parameterType: NUMBER_INTEGER metadata: diff --git a/tests/example_specs/component_specs/kubeflow_component.yaml b/tests/example_specs/component_specs/kubeflow_component.yaml index 4ecedde7d..41662ac03 
100644 --- a/tests/example_specs/component_specs/kubeflow_component.yaml +++ b/tests/example_specs/component_specs/kubeflow_component.yaml @@ -6,12 +6,10 @@ components: cache: defaultValue: true description: Set to False to disable caching, True by default. - isOptional: true parameterType: BOOLEAN cluster_type: defaultValue: default description: The cluster type to use for the execution - isOptional: true parameterType: STRING component_spec: description: The component specification as a dictionary @@ -21,7 +19,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 description: The number of rows to load per partition. Set to override the automatic partitioning isOptional: true
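
Illustrative sketch (not part of the patch): the change to Argument.python_type in src/fondant/component_spec.py above maps the literal string "None", which is how an unset optional argument arrives from the command line, back to a Python None instead of relying on a -1 sentinel. Only the "dict" and "list" lookup entries are visible in the diff; the scalar entries below are assumptions added for illustration.

import json
import typing as t


def python_type(arg_type: str) -> t.Callable[[str], t.Any]:
    """Sketch of the updated mapping; mirrors the lambda added in component_spec.py."""
    # Only "dict" and "list" appear in the diff; "str", "int", "float" are assumed here.
    lookup: t.Dict[str, t.Callable[[str], t.Any]] = {
        "str": str,
        "int": int,
        "float": float,
        "dict": json.loads,
        "list": json.loads,
    }
    map_fn = lookup[arg_type]
    # Argument values are passed as strings; "None" now round-trips to Python None.
    return lambda value: map_fn(value) if value != "None" else None


if __name__ == "__main__":
    assert python_type("int")("100") == 100
    assert python_type("int")("None") is None
    assert python_type("dict")('{"a": "b"}') == {"a": "b"}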