diff --git a/components/load_from_hf_hub/fondant_component.yaml b/components/load_from_hf_hub/fondant_component.yaml index 38ec90980..998a6659b 100644 --- a/components/load_from_hf_hub/fondant_component.yaml +++ b/components/load_from_hf_hub/fondant_component.yaml @@ -15,6 +15,7 @@ args: column_name_mapping: description: Mapping of the consumed hub dataset to fondant column names type: dict + default: {} image_column_names: description: Optional argument, a list containing the original image column names in case the dataset on the hub contains them. Used to format the image from HF hub format to a byte string. @@ -23,7 +24,7 @@ args: n_rows_to_load: description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale type: int - default: -1 + default: None index_column: description: Column to set index to in the load component, if not specified a default globally unique index will be set type: str diff --git a/components/load_from_hf_hub/src/main.py b/components/load_from_hf_hub/src/main.py index 366e22df1..55e562237 100644 --- a/components/load_from_hf_hub/src/main.py +++ b/components/load_from_hf_hub/src/main.py @@ -1,5 +1,6 @@ """This component loads a seed dataset from the hub.""" import logging +import typing as t import dask import dask.dataframe as dd @@ -18,10 +19,10 @@ def __init__( spec: ComponentSpec, *_, dataset_name: str, - column_name_mapping: dict, - image_column_names: list, - n_rows_to_load: int, - index_column: str, + column_name_mapping: t.Optional[dict], + image_column_names: t.Optional[list], + n_rows_to_load: t.Optional[int], + index_column: t.Optional[str], ) -> None: """ Args: @@ -42,55 +43,45 @@ def __init__( self.index_column = index_column self.spec = spec - def load(self) -> dd.DataFrame: - # 1) Load data, read as Dask dataframe - logger.info("Loading dataset from the hub...") - + def get_columns_to_keep(self) -> t.List[str]: # Only read required columns columns = [] - invert_column_name_mapping = {v: k for k, v in self.column_name_mapping.items()} + if self.column_name_mapping: + invert_column_name_mapping = { + v: k for k, v in self.column_name_mapping.items() + } + else: + invert_column_name_mapping = {} + for subset_name, subset in self.spec.produces.items(): for field_name, field in subset.fields.items(): - subset_field_name = f"{subset_name}_{field_name}" - column_name = invert_column_name_mapping.get( - subset_field_name, - subset_field_name, - ) - columns.append(column_name) - - logger.debug(f"Columns to keep: {columns}") - dask_df = dd.read_parquet(f"hf://datasets/{self.dataset_name}", columns=columns) - - # 2) Make sure images are bytes instead of dicts - if self.image_column_names is not None: + column_name = f"{subset_name}_{field_name}" + if ( + invert_column_name_mapping + and column_name in invert_column_name_mapping + ): + columns.append(invert_column_name_mapping[column_name]) + else: + columns.append(column_name) + + if self.index_column is not None: + columns.append(self.index_column) + + return columns + + def convert_images_to_bytes(self, dask_df) -> dd.DataFrame: + if self.image_column_names: for image_column_name in self.image_column_names: dask_df[image_column_name] = dask_df[image_column_name].map( lambda x: x["bytes"], meta=("bytes", bytes), ) - # 3) Rename columns - logger.info("Renaming columns...") - dask_df = dask_df.rename(columns=self.column_name_mapping) - - # 4) Optional: only return specific amount of rows - if self.n_rows_to_load > 0: - partitions_length = 0 - npartitions = 1 - for npartitions, partition in enumerate(dask_df.partitions, start=1): - if partitions_length >= self.n_rows_to_load: - logger.info( - f"""Required number of partitions to load\n - {self.n_rows_to_load} is {npartitions}""", - ) - break - partitions_length += len(partition) - dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions) - dask_df = dd.from_pandas(dask_df, npartitions=npartitions) + return dask_df - # 5) Set the index - if self.index_column == "None": + def set_df_index(self, dask_df: dd.DataFrame) -> dd.DataFrame: + if self.index_column is None: logger.info( "Index column not specified, setting a globally unique index", ) @@ -122,3 +113,43 @@ def _get_meta_df() -> pd.DataFrame: dask_df = dask_df.set_index(self.index_column, drop=True) return dask_df + + def return_subset_of_df(self, dask_df: dd.DataFrame) -> dd.DataFrame: + if self.n_rows_to_load is not None: + partitions_length = 0 + npartitions = 1 + for npartitions, partition in enumerate(dask_df.partitions, start=1): + if partitions_length >= self.n_rows_to_load: + logger.info( + f"""Required number of partitions to load\n + {self.n_rows_to_load} is {npartitions}""", + ) + break + partitions_length += len(partition) + dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions) + dask_df = dd.from_pandas(dask_df, npartitions=npartitions) + return dask_df + + def load(self) -> dd.DataFrame: + # 1) Load data, read as Dask dataframe + logger.info("Loading dataset from the hub...") + + columns = self.get_columns_to_keep() + + logger.debug(f"Columns to keep: {columns}") + dask_df = dd.read_parquet(f"hf://datasets/{self.dataset_name}", columns=columns) + + # 2) Make sure images are bytes instead of dicts + dask_df = self.convert_images_to_bytes(dask_df) + + # 3) Rename columns + logger.info("Renaming columns...") + dask_df = dask_df.rename(columns=self.column_name_mapping) + + # 4) Optional: only return specific amount of rows + dask_df = self.return_subset_of_df(dask_df) + + # 5) Set the index + dask_df = self.set_df_index(dask_df) + + return dask_df diff --git a/components/load_from_parquet/fondant_component.yaml b/components/load_from_parquet/fondant_component.yaml index 1e97e960e..17d712a95 100644 --- a/components/load_from_parquet/fondant_component.yaml +++ b/components/load_from_parquet/fondant_component.yaml @@ -19,7 +19,6 @@ args: n_rows_to_load: description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale type: int - default: -1 index_column: description: Column to set index to in the load component, if not specified a default globally unique index will be set type: str diff --git a/components/load_from_parquet/src/main.py b/components/load_from_parquet/src/main.py index 8e4bf3913..cfc104740 100644 --- a/components/load_from_parquet/src/main.py +++ b/components/load_from_parquet/src/main.py @@ -19,8 +19,8 @@ def __init__( spec: ComponentSpec, *_, dataset_uri: str, - column_name_mapping: dict, - n_rows_to_load: int, + column_name_mapping: t.Optional[dict], + n_rows_to_load: t.Optional[int], index_column: t.Optional[str], ) -> None: """ @@ -39,49 +39,34 @@ def __init__( self.index_column = index_column self.spec = spec - def load(self) -> dd.DataFrame: - # 1) Load data, read as Dask dataframe - logger.info("Loading dataset from the file...") - + def get_columns_to_keep(self) -> t.List[str]: # Only read required columns columns = [] - if self.column_name_mapping is not None: + + if self.column_name_mapping: invert_column_name_mapping = { v: k for k, v in self.column_name_mapping.items() } - for subset_name, subset in self.spec.produces.items(): - for field_name, field in subset.fields.items(): - subset_field_name = f"{subset_name}_{field_name}" - column_name = invert_column_name_mapping.get( - subset_field_name, - subset_field_name, - ) - columns.append(column_name) + else: + invert_column_name_mapping = {} - logger.debug(f"Columns to keep: {columns}") - dask_df = dd.read_parquet(self.dataset_uri, columns=columns) + for subset_name, subset in self.spec.produces.items(): + for field_name, field in subset.fields.items(): + column_name = f"{subset_name}_{field_name}" + if ( + invert_column_name_mapping + and column_name in invert_column_name_mapping + ): + columns.append(invert_column_name_mapping[column_name]) + else: + columns.append(column_name) - # 2) Rename columns - if self.column_name_mapping: - logger.info("Renaming columns...") - dask_df = dask_df.rename(columns=self.column_name_mapping) + if self.index_column is not None: + columns.append(self.index_column) - # 3) Optional: only return specific amount of rows - if self.n_rows_to_load > 0: - partitions_length = 0 - npartitions = 1 - for npartitions, partition in enumerate(dask_df.partitions, start=1): - if partitions_length >= self.n_rows_to_load: - logger.info( - f"""Required number of partitions to load\n - {self.n_rows_to_load} is {npartitions}""", - ) - break - partitions_length += len(partition) - dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions) - dask_df = dd.from_pandas(dask_df, npartitions=npartitions) + return columns - # 4) Set the index + def set_df_index(self, dask_df: dd.DataFrame) -> dd.DataFrame: if self.index_column is None: logger.info( "Index column not specified, setting a globally unique index", @@ -114,3 +99,40 @@ def _get_meta_df() -> pd.DataFrame: dask_df = dask_df.set_index(self.index_column, drop=True) return dask_df + + def return_subset_of_df(self, dask_df: dd.DataFrame) -> dd.DataFrame: + if self.n_rows_to_load is not None: + partitions_length = 0 + npartitions = 1 + for npartitions, partition in enumerate(dask_df.partitions, start=1): + if partitions_length >= self.n_rows_to_load: + logger.info( + f"""Required number of partitions to load\n + {self.n_rows_to_load} is {npartitions}""", + ) + break + partitions_length += len(partition) + dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions) + dask_df = dd.from_pandas(dask_df, npartitions=npartitions) + return dask_df + + def load(self) -> dd.DataFrame: + # 1) Load data, read as Dask dataframe + logger.info("Loading dataset from the hub...") + + columns = self.get_columns_to_keep() + + logger.debug(f"Columns to keep: {columns}") + dask_df = dd.read_parquet(self.dataset_uri, columns=columns) + + # 2) Rename columns + if self.column_name_mapping: + logger.info("Renaming columns...") + dask_df = dask_df.rename(columns=self.column_name_mapping) + + # 4) Optional: only return specific amount of rows + dask_df = self.return_subset_of_df(dask_df) + + # 5) Set the index + dask_df = self.set_df_index(dask_df) + return dask_df diff --git a/components/write_to_hf_hub/src/main.py b/components/write_to_hf_hub/src/main.py index 32ae97b74..939d81c89 100644 --- a/components/write_to_hf_hub/src/main.py +++ b/components/write_to_hf_hub/src/main.py @@ -41,8 +41,8 @@ def __init__( hf_token: str, username: str, dataset_name: str, - image_column_names: list, - column_name_mapping: dict, + image_column_names: t.Optional[list], + column_name_mapping: t.Optional[dict], ): """ Args: @@ -91,7 +91,7 @@ def write( # Map image column to hf data format feature_encoder = datasets.Image(decode=True) - if self.image_column_names: + if self.image_column_names is not None: for image_column_name in self.image_column_names: dataframe[image_column_name] = dataframe[image_column_name].map( lambda x: convert_bytes_to_image(x, feature_encoder), diff --git a/examples/pipelines/commoncrawl/components/read_warc_paths/fondant_component.yaml b/examples/pipelines/commoncrawl/components/read_warc_paths/fondant_component.yaml index 10c1825d4..def2450a1 100644 --- a/examples/pipelines/commoncrawl/components/read_warc_paths/fondant_component.yaml +++ b/examples/pipelines/commoncrawl/components/read_warc_paths/fondant_component.yaml @@ -15,4 +15,4 @@ args: n_records_to_download: description: Number of records to download type: int - default: -1 \ No newline at end of file + default: None \ No newline at end of file diff --git a/examples/pipelines/commoncrawl/components/read_warc_paths/src/main.py b/examples/pipelines/commoncrawl/components/read_warc_paths/src/main.py index 094c9ce3e..7c642fba3 100644 --- a/examples/pipelines/commoncrawl/components/read_warc_paths/src/main.py +++ b/examples/pipelines/commoncrawl/components/read_warc_paths/src/main.py @@ -18,7 +18,7 @@ def __init__( self, *_, common_crawl_indices: t.List[str], - n_records_to_download: int, + n_records_to_download: t.Optional[int] = None, ): self.index_urls = [ self.build_index_url(index_name) for index_name in common_crawl_indices @@ -38,7 +38,7 @@ def load(self) -> dd.DataFrame: warc_urls.extend([line.decode() for line in extracted.split(b"\n")]) df = pd.DataFrame(warc_urls, columns=["warc_url"]) - if self.n_records_to_download > 0: + if self.n_records_to_download is not None: df = df.head(self.n_records_to_download) return dd.from_pandas(df, npartitions=len(df) // 100) diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/fondant_component.yaml b/examples/pipelines/controlnet-interior-design/components/generate_prompts/fondant_component.yaml index a1c4ede40..a5f4b6640 100644 --- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/fondant_component.yaml +++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/fondant_component.yaml @@ -12,4 +12,4 @@ args: n_rows_to_load: description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale type: int - default: -1 \ No newline at end of file + default: None \ No newline at end of file diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/src/main.py b/examples/pipelines/controlnet-interior-design/components/generate_prompts/src/main.py index fff2ef46c..9d58a287a 100644 --- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/src/main.py +++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/src/main.py @@ -114,7 +114,7 @@ def load(self) -> dd.DataFrame: pandas_df = pd.DataFrame(prompts, columns=["prompts_text"]) - if self.n_rows_to_load > 0: + if self.n_rows_to_load: pandas_df = pandas_df.head(self.n_rows_to_load) df = dd.from_pandas(pandas_df, npartitions=1) diff --git a/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml b/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml index 9698ff2be..47b752b0b 100644 --- a/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml +++ b/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml @@ -46,6 +46,7 @@ args: column_name_mapping: description: Mapping of the consumed hub dataset to fondant column names type: dict + default: {} image_column_names: description: Optional argument, a list containing the original image column names in case the dataset on the hub contains them. Used to format the image from HF hub format to a byte string. @@ -54,7 +55,7 @@ args: n_rows_to_load: description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale type: int - default: -1 + default: None index_column: description: Column to set index to in the load component, if not specified a default globally unique index will be set type: str diff --git a/examples/pipelines/filter-cc-25m/components/load_from_hf_hub/fondant_component.yaml b/examples/pipelines/filter-cc-25m/components/load_from_hf_hub/fondant_component.yaml index 6d865a6a0..fa2da3e62 100644 --- a/examples/pipelines/filter-cc-25m/components/load_from_hf_hub/fondant_component.yaml +++ b/examples/pipelines/filter-cc-25m/components/load_from_hf_hub/fondant_component.yaml @@ -27,6 +27,7 @@ args: column_name_mapping: description: Mapping of the consumed hub dataset to fondant column names type: dict + default: {} image_column_names: description: Optional argument, a list containing the original image column names in case the dataset on the hub contains them. Used to format the image from HF hub format to a byte string. @@ -35,7 +36,6 @@ args: n_rows_to_load: description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale type: int - default: -1 index_column: description: Column to set index to in the load component, if not specified a default globally unique index will be set type: str diff --git a/examples/pipelines/finetune_stable_diffusion/components/load_from_hf_hub/fondant_component.yaml b/examples/pipelines/finetune_stable_diffusion/components/load_from_hf_hub/fondant_component.yaml index 2f180b601..a9df2700d 100644 --- a/examples/pipelines/finetune_stable_diffusion/components/load_from_hf_hub/fondant_component.yaml +++ b/examples/pipelines/finetune_stable_diffusion/components/load_from_hf_hub/fondant_component.yaml @@ -20,6 +20,7 @@ args: column_name_mapping: description: Mapping of the consumed hub dataset to fondant column names type: dict + default: {} image_column_names: description: Optional argument, a list containing the original image column names in case the dataset on the hub contains them. Used to format the image from HF hub format to a byte string. @@ -28,7 +29,7 @@ args: n_rows_to_load: description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale type: int - default: -1 + default: None index_column: description: Column to set index to in the load component, if not specified a default globally unique index will be set type: str diff --git a/examples/pipelines/finetune_stable_diffusion/components/write_to_hf_hub/fondant_component.yaml b/examples/pipelines/finetune_stable_diffusion/components/write_to_hf_hub/fondant_component.yaml index 1c251ab61..d1e6a6660 100644 --- a/examples/pipelines/finetune_stable_diffusion/components/write_to_hf_hub/fondant_component.yaml +++ b/examples/pipelines/finetune_stable_diffusion/components/write_to_hf_hub/fondant_component.yaml @@ -26,8 +26,8 @@ args: image_column_names: description: A list containing the image column names. Used to format to image to HF hub format type: list - default: [] + default: None column_name_mapping: description: Mapping of the consumed fondant column names to the written hub column names type: dict - default: {} \ No newline at end of file + default: None \ No newline at end of file diff --git a/examples/pipelines/finetune_stable_diffusion/pipeline.py b/examples/pipelines/finetune_stable_diffusion/pipeline.py index d3654f8e8..c6e13a867 100644 --- a/examples/pipelines/finetune_stable_diffusion/pipeline.py +++ b/examples/pipelines/finetune_stable_diffusion/pipeline.py @@ -27,7 +27,7 @@ "dataset_name": "logo-wizard/modern-logo-dataset", "column_name_mapping": load_component_column_mapping, "image_column_names": ["image"], - "n_rows_to_load": None, + "n_rows_to_load": 100, }, ) diff --git a/examples/pipelines/starcoder/components/load_from_hub/fondant_component.yaml b/examples/pipelines/starcoder/components/load_from_hub/fondant_component.yaml index 2919227d7..031eea524 100644 --- a/examples/pipelines/starcoder/components/load_from_hub/fondant_component.yaml +++ b/examples/pipelines/starcoder/components/load_from_hub/fondant_component.yaml @@ -33,11 +33,11 @@ args: description: A list containing the original hub image column names. Used to format the image from HF hub format to a byte string type: list - default: [] + default: None n_rows_to_load: description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale type: int - default: -1 + default: None index_column: description: Column to set index to in the load component, if not specified a default globally unique index will be set type: str diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index 79c3ddf17..301a60a29 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -14,6 +14,7 @@ valid_accelerator_types, valid_vertex_accelerator_types, ) +from fondant.schema import KubeflowCommandArguments # noqa: TCH001 logger = logging.getLogger(__name__) @@ -293,6 +294,30 @@ def compile( pipeline.validate(run_id=run_id) logger.info(f"Compiling {pipeline.name} to {output_path}") + def set_component_exec_args( + *, + component_op, + component_args: t.List[str], + input_manifest_path: bool, + ): + """Dump Fondant specification arguments to kfp command executor arguments.""" + dumped_args: KubeflowCommandArguments = [] + + component_args.extend(["output_manifest_path", "metadata"]) + if input_manifest_path: + component_args.append("input_manifest_path") + + for arg in component_args: + arg_name = arg.strip().replace(" ", "_") + arg_name_cmd = f"--{arg_name}" + + dumped_args.append(arg_name_cmd) + dumped_args.append("{{$.inputs.parameters['" + f"{arg_name}" + "']}}") + + component_op.component_spec.implementation.container.args = dumped_args + + return component_op + @self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description) def kfp_pipeline(): previous_component_task = None @@ -335,6 +360,11 @@ def kfp_pipeline(): f"{pipeline.base_path}/{pipeline.name}/" f"{run_id}/{dependency}/manifest.json" ) + kubeflow_component_op = set_component_exec_args( + component_op=kubeflow_component_op, + component_args=list(component_args.keys()), + input_manifest_path=True, + ) component_task = kubeflow_component_op( input_manifest_path=input_manifest_path, output_manifest_path=output_manifest_path, @@ -344,13 +374,18 @@ def kfp_pipeline(): component_task.after(previous_component_task) else: + kubeflow_component_op = set_component_exec_args( + component_op=kubeflow_component_op, + component_args=list(component_args.keys()), + input_manifest_path=False, + ) component_task = kubeflow_component_op( metadata=metadata.to_json(), output_manifest_path=output_manifest_path, **component_args, ) - # Set optional arguments + # Set optional configuration component_task = self._set_configuration( component_task, component_op, @@ -421,7 +456,8 @@ def resolve_imports(self): msg, ) - def _set_configuration(self, task, fondant_component_operation): + @staticmethod + def _set_configuration(task, fondant_component_operation): # Unpack optional specifications number_of_accelerators = fondant_component_operation.number_of_accelerators accelerator_name = fondant_component_operation.accelerator_name diff --git a/src/fondant/component_spec.py b/src/fondant/component_spec.py index 3698ae883..ed54ac03f 100644 --- a/src/fondant/component_spec.py +++ b/src/fondant/component_spec.py @@ -15,7 +15,7 @@ from referencing.jsonschema import DRAFT4 from fondant.exceptions import InvalidComponentSpec -from fondant.schema import Field, KubeflowCommandArguments, Type +from fondant.schema import Field, Type @dataclass @@ -37,6 +37,9 @@ class Argument: default: t.Any = None optional: t.Optional[bool] = False + def __post_init__(self): + self.default = None if self.default == "None" else self.default + @property def python_type(self) -> t.Any: lookup = { @@ -47,7 +50,8 @@ def python_type(self) -> t.Any: "dict": json.loads, "list": json.loads, } - return lookup[self.type] + map_fn = lookup[self.type] + return lambda value: map_fn(value) if value != "None" else None # type: ignore @property def kubeflow_type(self) -> str: @@ -230,7 +234,7 @@ def default_arguments(self) -> t.Dict[str, Argument]: description="The number of rows to load per partition. \ Set to override the automatic partitioning", type="int", - default=-1, + optional=True, ), "cache": Argument( name="cache", @@ -286,7 +290,8 @@ def convert_arguments(fondant_component: ComponentSpec): for arg in fondant_component.args.values(): arg_type_dict = {} - if arg.optional or arg.default is not None: + # Enable isOptional attribute in spec if arg is Optional and defaults to None + if arg.optional and arg.default is None: arg_type_dict["isOptional"] = True if arg.default is not None: arg_type_dict["defaultValue"] = arg.default @@ -336,7 +341,6 @@ def from_fondant_component_spec(cls, fondant_component: ComponentSpec): "exec-" + cleaned_component_name: { "container": { - "args": cls._dump_args(fondant_component.args.values()), "command": ["fondant", "execute", "main"], "image": fondant_component.image, }, @@ -367,19 +371,6 @@ def from_fondant_component_spec(cls, fondant_component: ComponentSpec): } return cls(specification) - @staticmethod - def _dump_args(args: t.Iterable[Argument]) -> KubeflowCommandArguments: - """Dump Fondant specification arguments to kfp command arguments.""" - dumped_args: KubeflowCommandArguments = [] - for arg in args: - arg_name = arg.name.strip().replace(" ", "_") - arg_name_cmd = f"--{arg_name}" - - dumped_args.append(arg_name_cmd) - dumped_args.append("{{$.inputs.parameters['" + f"{arg_name}" + "']}}") - - return dumped_args - def to_file(self, path: t.Union[str, Path]) -> None: """Dump the component specification to the file specified by the provided path.""" with open(path, "w", encoding="utf-8") as file_: diff --git a/src/fondant/data_io.py b/src/fondant/data_io.py index f5ce0e5eb..bfb4abc7c 100644 --- a/src/fondant/data_io.py +++ b/src/fondant/data_io.py @@ -24,7 +24,7 @@ def __init__( *, manifest: Manifest, component_spec: ComponentSpec, - input_partition_rows: int = -1, + input_partition_rows: t.Optional[int] = None, ): super().__init__(manifest=manifest, component_spec=component_spec) self.input_partition_rows = input_partition_rows @@ -38,7 +38,20 @@ def partition_loaded_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame: """ n_workers: int = os.cpu_count() # type: ignore - if self.input_partition_rows > 1: + if self.input_partition_rows is None: + n_partitions = dataframe.npartitions + if n_partitions < n_workers: # type: ignore + logger.info( + f"The number of partitions of the input dataframe is {n_partitions}. The " + f"available number of workers is {n_workers}.", + ) + dataframe = dataframe.repartition(npartitions=n_workers) + logger.info( + f"Repartitioning the data to {n_workers} partitions before processing" + f" to maximize worker usage", + ) + + elif self.input_partition_rows >= 1: # Only load the index column to trigger a faster compute of the rows total_rows = len(dataframe.index) # +1 to handle any remainder rows @@ -56,23 +69,11 @@ def partition_loaded_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame: f" all available workers {n_partitions} out of {n_workers} are used.", ) - elif self.input_partition_rows == -1: - n_partitions = dataframe.npartitions - if n_partitions < n_workers: # type: ignore - logger.info( - f"The number of partitions of the input dataframe is {n_partitions}. The " - f"available number of workers is {n_workers}.", - ) - dataframe = dataframe.repartition(npartitions=n_workers) - logger.info( - f"Repartitioning the data to {n_workers} partitions before processing" - f" to maximize worker usage", - ) else: msg = ( f"{self.input_partition_rows} is not a valid value for the 'input_partition_rows' " f"parameter. It should be a number larger than 0 to indicate the number of " - f"expected rows per partition, or '-1' to let Fondant optimize the number of " + f"expected rows per partition, or None to let Fondant optimize the number of " f"partitions based on the number of available workers." ) raise ValueError( diff --git a/src/fondant/executor.py b/src/fondant/executor.py index 16941663b..41141c1d1 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -190,7 +190,7 @@ def _add_and_parse_args(cls, spec: ComponentSpec): elif arg.default is not None and arg.optional is False: input_required = False default = arg.default - elif arg.default is not None and arg.optional is True: + elif arg.default is None and arg.optional is True: input_required = False default = None else: diff --git a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml index 1e148f5c3..b39df7c9e 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml @@ -21,7 +21,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -48,7 +47,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -75,7 +73,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -89,22 +86,20 @@ deploymentSpec: exec-first-component: container: args: - - "--input_manifest_path" - - "{{$.inputs.parameters['input_manifest_path']}}" - - "--component_spec" - - "{{$.inputs.parameters['component_spec']}}" - - "--input_partition_rows" - - "{{$.inputs.parameters['input_partition_rows']}}" - - "--cache" - - "{{$.inputs.parameters['cache']}}" - - "--cluster_type" - - "{{$.inputs.parameters['cluster_type']}}" - - "--metadata" - - "{{$.inputs.parameters['metadata']}}" - - "--output_manifest_path" - - "{{$.inputs.parameters['output_manifest_path']}}" - - "--storage_args" - - "{{$.inputs.parameters['storage_args']}}" + - --storage_args + - '{{$.inputs.parameters[''storage_args'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --cluster_type + - '{{$.inputs.parameters[''cluster_type'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' command: - fondant - execute @@ -116,22 +111,22 @@ deploymentSpec: exec-second-component: container: args: - - "--input_manifest_path" - - "{{$.inputs.parameters['input_manifest_path']}}" - - "--component_spec" - - "{{$.inputs.parameters['component_spec']}}" - - "--input_partition_rows" - - "{{$.inputs.parameters['input_partition_rows']}}" - - "--cache" - - "{{$.inputs.parameters['cache']}}" - - "--cluster_type" - - "{{$.inputs.parameters['cluster_type']}}" - - "--metadata" - - "{{$.inputs.parameters['metadata']}}" - - "--output_manifest_path" - - "{{$.inputs.parameters['output_manifest_path']}}" - - "--storage_args" - - "{{$.inputs.parameters['storage_args']}}" + - --storage_args + - '{{$.inputs.parameters[''storage_args'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --cluster_type + - '{{$.inputs.parameters[''cluster_type'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' command: - fondant - execute @@ -140,22 +135,20 @@ deploymentSpec: exec-third-component: container: args: - - "--input_manifest_path" - - "{{$.inputs.parameters['input_manifest_path']}}" - - "--component_spec" - - "{{$.inputs.parameters['component_spec']}}" - - "--input_partition_rows" - - "{{$.inputs.parameters['input_partition_rows']}}" - - "--cache" - - "{{$.inputs.parameters['cache']}}" - - "--cluster_type" - - "{{$.inputs.parameters['cluster_type']}}" - - "--metadata" - - "{{$.inputs.parameters['metadata']}}" - - "--output_manifest_path" - - "{{$.inputs.parameters['output_manifest_path']}}" - - "--storage_args" - - "{{$.inputs.parameters['storage_args']}}" + - --storage_args + - '{{$.inputs.parameters[''storage_args'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --cluster_type + - '{{$.inputs.parameters[''cluster_type'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' command: - fondant - execute @@ -200,7 +193,7 @@ root: type: binary input_partition_rows: runtimeValue: - constant: 10 + constant: 10.0 metadata: runtimeValue: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", @@ -208,7 +201,7 @@ root: "cache_key": "1"}' output_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg @@ -252,10 +245,10 @@ root: type: array input_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json input_partition_rows: runtimeValue: - constant: 10 + constant: 10.0 metadata: runtimeValue: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", @@ -263,7 +256,7 @@ root: "cache_key": "2"}' output_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg @@ -316,7 +309,7 @@ root: type: binary input_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json metadata: runtimeValue: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", @@ -324,7 +317,7 @@ root: "cache_key": "3"}' output_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/third_component/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/third_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg diff --git a/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml index bf34acc3f..a524d8e35 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml @@ -20,7 +20,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -47,7 +46,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -74,7 +72,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -88,22 +85,20 @@ deploymentSpec: exec-first-component: container: args: - - "--input_manifest_path" - - "{{$.inputs.parameters['input_manifest_path']}}" - - "--component_spec" - - "{{$.inputs.parameters['component_spec']}}" - - "--input_partition_rows" - - "{{$.inputs.parameters['input_partition_rows']}}" - - "--cache" - - "{{$.inputs.parameters['cache']}}" - - "--cluster_type" - - "{{$.inputs.parameters['cluster_type']}}" - - "--metadata" - - "{{$.inputs.parameters['metadata']}}" - - "--output_manifest_path" - - "{{$.inputs.parameters['output_manifest_path']}}" - - "--storage_args" - - "{{$.inputs.parameters['storage_args']}}" + - --storage_args + - '{{$.inputs.parameters[''storage_args'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --cluster_type + - '{{$.inputs.parameters[''cluster_type'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' command: - fondant - execute @@ -112,22 +107,22 @@ deploymentSpec: exec-second-component: container: args: - - "--input_manifest_path" - - "{{$.inputs.parameters['input_manifest_path']}}" - - "--component_spec" - - "{{$.inputs.parameters['component_spec']}}" - - "--input_partition_rows" - - "{{$.inputs.parameters['input_partition_rows']}}" - - "--cache" - - "{{$.inputs.parameters['cache']}}" - - "--cluster_type" - - "{{$.inputs.parameters['cluster_type']}}" - - "--metadata" - - "{{$.inputs.parameters['metadata']}}" - - "--output_manifest_path" - - "{{$.inputs.parameters['output_manifest_path']}}" - - "--storage_args" - - "{{$.inputs.parameters['storage_args']}}" + - --storage_args + - '{{$.inputs.parameters[''storage_args'']}}' + - --input_partition_rows + - '{{$.inputs.parameters[''input_partition_rows'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --cluster_type + - '{{$.inputs.parameters[''cluster_type'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' command: - fondant - execute @@ -136,22 +131,20 @@ deploymentSpec: exec-third-component: container: args: - - "--input_manifest_path" - - "{{$.inputs.parameters['input_manifest_path']}}" - - "--component_spec" - - "{{$.inputs.parameters['component_spec']}}" - - "--input_partition_rows" - - "{{$.inputs.parameters['input_partition_rows']}}" - - "--cache" - - "{{$.inputs.parameters['cache']}}" - - "--cluster_type" - - "{{$.inputs.parameters['cluster_type']}}" - - "--metadata" - - "{{$.inputs.parameters['metadata']}}" - - "--output_manifest_path" - - "{{$.inputs.parameters['output_manifest_path']}}" - - "--storage_args" - - "{{$.inputs.parameters['storage_args']}}" + - --storage_args + - '{{$.inputs.parameters[''storage_args'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --cluster_type + - '{{$.inputs.parameters[''cluster_type'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' command: - fondant - execute @@ -204,7 +197,7 @@ root: "cache_key": "1"}' output_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg @@ -248,7 +241,7 @@ root: type: array input_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json input_partition_rows: runtimeValue: constant: 10.0 @@ -259,7 +252,7 @@ root: "cache_key": "2"}' output_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg @@ -312,7 +305,7 @@ root: type: binary input_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json metadata: runtimeValue: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", @@ -320,7 +313,7 @@ root: "cache_key": "3"}' output_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/third_component/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/third_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg diff --git a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml index c6c9200e6..99f57051a 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml @@ -21,7 +21,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1.0 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -52,7 +51,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1.0 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -68,22 +66,18 @@ deploymentSpec: exec-first-component: container: args: - - "--input_manifest_path" - - "{{$.inputs.parameters['input_manifest_path']}}" - - "--component_spec" - - "{{$.inputs.parameters['component_spec']}}" - - "--input_partition_rows" - - "{{$.inputs.parameters['input_partition_rows']}}" - - "--cache" - - "{{$.inputs.parameters['cache']}}" - - "--cluster_type" - - "{{$.inputs.parameters['cluster_type']}}" - - "--metadata" - - "{{$.inputs.parameters['metadata']}}" - - "--output_manifest_path" - - "{{$.inputs.parameters['output_manifest_path']}}" - - "--storage_args" - - "{{$.inputs.parameters['storage_args']}}" + - --storage_args + - '{{$.inputs.parameters[''storage_args'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --cluster_type + - '{{$.inputs.parameters[''cluster_type'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' command: - fondant - execute @@ -92,24 +86,22 @@ deploymentSpec: exec-image-cropping: container: args: - - "--input_manifest_path" - - "{{$.inputs.parameters['input_manifest_path']}}" - - "--component_spec" - - "{{$.inputs.parameters['component_spec']}}" - - "--input_partition_rows" - - "{{$.inputs.parameters['input_partition_rows']}}" - - "--cache" - - "{{$.inputs.parameters['cache']}}" - - "--cluster_type" - - "{{$.inputs.parameters['cluster_type']}}" - - "--metadata" - - "{{$.inputs.parameters['metadata']}}" - - "--output_manifest_path" - - "{{$.inputs.parameters['output_manifest_path']}}" - - "--cropping_threshold" - - "{{$.inputs.parameters['cropping_threshold']}}" - - "--padding" - - "{{$.inputs.parameters['padding']}}" + - --cropping_threshold + - '{{$.inputs.parameters[''cropping_threshold'']}}' + - --padding + - '{{$.inputs.parameters[''padding'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --cluster_type + - '{{$.inputs.parameters[''cluster_type'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' command: - fondant - execute @@ -159,7 +151,7 @@ root: "cache_key": "1"}' output_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg @@ -199,21 +191,22 @@ root: fields: data: type: binary - description: "This component crops out image borders. This is typically - useful when working with graphical \nimages that have single-color - borders (e.g. logos, icons, etc.).\n\nThe component takes an image - and calculates which color is most present in the border. It then - \ncrops the image in order to minimize this single-color border. - The `padding` argument will add \nextra border to the image before - cropping it, in order to avoid cutting off parts of the image.\nThe - resulting crop will always be square. If a crop is not possible, - the component will return \nthe original image.\n\n#### Examples\nExamples - of image cropping by removing the single-color border. Left side - is original image, \nright side is border-cropped image.\n\n![Example - of image cropping by removing the single-color border. Left side - is original, right side is cropped image](../../docs/art/components/image_cropping/component_border_crop_1.png)\n![Example - of image cropping by removing the single-color border. Left side - is original, right side is cropped image](../../docs/art/components/image_cropping/component_border_crop_0.png)\n" + description: "This component crops out image borders. This is typically\ + \ useful when working with graphical \nimages that have single-color\ + \ borders (e.g. logos, icons, etc.).\n\nThe component takes an\ + \ image and calculates which color is most present in the border.\ + \ It then \ncrops the image in order to minimize this single-color\ + \ border. The `padding` argument will add \nextra border to the\ + \ image before cropping it, in order to avoid cutting off parts\ + \ of the image.\nThe resulting crop will always be square. If\ + \ a crop is not possible, the component will return \nthe original\ + \ image.\n\n#### Examples\nExamples of image cropping by removing\ + \ the single-color border. Left side is original image, \nright\ + \ side is border-cropped image.\n\n![Example of image cropping\ + \ by removing the single-color border. Left side is original,\ + \ right side is cropped image](../../docs/art/components/image_cropping/component_border_crop_1.png)\n\ + ![Example of image cropping by removing the single-color border.\ + \ Left side is original, right side is cropped image](../../docs/art/components/image_cropping/component_border_crop_0.png)\n" image: fndnt/image_cropping:dev name: Image cropping produces: @@ -230,7 +223,7 @@ root: constant: 0.0 input_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json metadata: runtimeValue: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", @@ -238,7 +231,7 @@ root: "cache_key": "2"}' output_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/image_cropping/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/image_cropping/manifest.json padding: runtimeValue: constant: 0.0 diff --git a/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml index d31e119fc..99f57051a 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml @@ -21,7 +21,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -52,7 +51,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1.0 isOptional: true parameterType: NUMBER_INTEGER metadata: @@ -68,22 +66,18 @@ deploymentSpec: exec-first-component: container: args: - - "--input_manifest_path" - - "{{$.inputs.parameters['input_manifest_path']}}" - - "--component_spec" - - "{{$.inputs.parameters['component_spec']}}" - - "--input_partition_rows" - - "{{$.inputs.parameters['input_partition_rows']}}" - - "--cache" - - "{{$.inputs.parameters['cache']}}" - - "--cluster_type" - - "{{$.inputs.parameters['cluster_type']}}" - - "--metadata" - - "{{$.inputs.parameters['metadata']}}" - - "--output_manifest_path" - - "{{$.inputs.parameters['output_manifest_path']}}" - - "--storage_args" - - "{{$.inputs.parameters['storage_args']}}" + - --storage_args + - '{{$.inputs.parameters[''storage_args'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --cluster_type + - '{{$.inputs.parameters[''cluster_type'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' command: - fondant - execute @@ -92,24 +86,22 @@ deploymentSpec: exec-image-cropping: container: args: - - "--input_manifest_path" - - "{{$.inputs.parameters['input_manifest_path']}}" - - "--component_spec" - - "{{$.inputs.parameters['component_spec']}}" - - "--input_partition_rows" - - "{{$.inputs.parameters['input_partition_rows']}}" - - "--cache" - - "{{$.inputs.parameters['cache']}}" - - "--cluster_type" - - "{{$.inputs.parameters['cluster_type']}}" - - "--metadata" - - "{{$.inputs.parameters['metadata']}}" - - "--output_manifest_path" - - "{{$.inputs.parameters['output_manifest_path']}}" - - "--cropping_threshold" - - "{{$.inputs.parameters['cropping_threshold']}}" - - "--padding" - - "{{$.inputs.parameters['padding']}}" + - --cropping_threshold + - '{{$.inputs.parameters[''cropping_threshold'']}}' + - --padding + - '{{$.inputs.parameters[''padding'']}}' + - --cache + - '{{$.inputs.parameters[''cache'']}}' + - --cluster_type + - '{{$.inputs.parameters[''cluster_type'']}}' + - --component_spec + - '{{$.inputs.parameters[''component_spec'']}}' + - --output_manifest_path + - '{{$.inputs.parameters[''output_manifest_path'']}}' + - --metadata + - '{{$.inputs.parameters[''metadata'']}}' + - --input_manifest_path + - '{{$.inputs.parameters[''input_manifest_path'']}}' command: - fondant - execute @@ -159,7 +151,7 @@ root: "cache_key": "1"}' output_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json storage_args: runtimeValue: constant: a dummy string arg @@ -199,21 +191,22 @@ root: fields: data: type: binary - description: "This component crops out image borders. This is typically - useful when working with graphical \nimages that have single-color - borders (e.g. logos, icons, etc.).\n\nThe component takes an image - and calculates which color is most present in the border. It then - \ncrops the image in order to minimize this single-color border. - The `padding` argument will add \nextra border to the image before - cropping it, in order to avoid cutting off parts of the image.\nThe - resulting crop will always be square. If a crop is not possible, - the component will return \nthe original image.\n\n#### Examples\nExamples - of image cropping by removing the single-color border. Left side - is original image, \nright side is border-cropped image.\n\n![Example - of image cropping by removing the single-color border. Left side - is original, right side is cropped image](../../docs/art/components/image_cropping/component_border_crop_1.png)\n![Example - of image cropping by removing the single-color border. Left side - is original, right side is cropped image](../../docs/art/components/image_cropping/component_border_crop_0.png)\n" + description: "This component crops out image borders. This is typically\ + \ useful when working with graphical \nimages that have single-color\ + \ borders (e.g. logos, icons, etc.).\n\nThe component takes an\ + \ image and calculates which color is most present in the border.\ + \ It then \ncrops the image in order to minimize this single-color\ + \ border. The `padding` argument will add \nextra border to the\ + \ image before cropping it, in order to avoid cutting off parts\ + \ of the image.\nThe resulting crop will always be square. If\ + \ a crop is not possible, the component will return \nthe original\ + \ image.\n\n#### Examples\nExamples of image cropping by removing\ + \ the single-color border. Left side is original image, \nright\ + \ side is border-cropped image.\n\n![Example of image cropping\ + \ by removing the single-color border. Left side is original,\ + \ right side is cropped image](../../docs/art/components/image_cropping/component_border_crop_1.png)\n\ + ![Example of image cropping by removing the single-color border.\ + \ Left side is original, right side is cropped image](../../docs/art/components/image_cropping/component_border_crop_0.png)\n" image: fndnt/image_cropping:dev name: Image cropping produces: @@ -230,7 +223,7 @@ root: constant: 0.0 input_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json metadata: runtimeValue: constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", @@ -238,7 +231,7 @@ root: "cache_key": "2"}' output_manifest_path: runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/image_cropping/manifest.json" + constant: /foo/bar/testpipeline/testpipeline-20230101000000/image_cropping/manifest.json padding: runtimeValue: constant: 0.0 diff --git a/tests/example_specs/component_specs/kubeflow_component.yaml b/tests/example_specs/component_specs/kubeflow_component.yaml index 4ecedde7d..d8864f0f4 100644 --- a/tests/example_specs/component_specs/kubeflow_component.yaml +++ b/tests/example_specs/component_specs/kubeflow_component.yaml @@ -6,12 +6,10 @@ components: cache: defaultValue: true description: Set to False to disable caching, True by default. - isOptional: true parameterType: BOOLEAN cluster_type: defaultValue: default description: The cluster type to use for the execution - isOptional: true parameterType: STRING component_spec: description: The component specification as a dictionary @@ -21,7 +19,6 @@ components: isOptional: true parameterType: STRING input_partition_rows: - defaultValue: -1 description: The number of rows to load per partition. Set to override the automatic partitioning isOptional: true @@ -39,23 +36,6 @@ deploymentSpec: executors: exec-example-component: container: - args: - - --input_manifest_path - - '{{$.inputs.parameters[''input_manifest_path'']}}' - - --component_spec - - '{{$.inputs.parameters[''component_spec'']}}' - - --input_partition_rows - - '{{$.inputs.parameters[''input_partition_rows'']}}' - - --cache - - '{{$.inputs.parameters[''cache'']}}' - - --cluster_type - - '{{$.inputs.parameters[''cluster_type'']}}' - - --metadata - - '{{$.inputs.parameters[''metadata'']}}' - - --output_manifest_path - - '{{$.inputs.parameters[''output_manifest_path'']}}' - - --storage_args - - '{{$.inputs.parameters[''storage_args'']}}' command: - fondant - execute