From 24231ee68ff3a30ccd0e2ecebd62d2572df94f9c Mon Sep 17 00:00:00 2001
From: Philippe Moussalli
Date: Thu, 28 Sep 2023 13:16:36 +0200
Subject: [PATCH 1/4] add load from parquet component

---
 components/load_from_parquet/Dockerfile      |  23 +++
 .../load_from_parquet/fondant_component.yaml |  26 +++
 components/load_from_parquet/src/main.py     |  96 ++++++++++
 examples/pipelines/datacomp/pipeline.py      | 164 ++++++++++--------
 4 files changed, 233 insertions(+), 76 deletions(-)
 create mode 100644 components/load_from_parquet/Dockerfile
 create mode 100644 components/load_from_parquet/fondant_component.yaml
 create mode 100644 components/load_from_parquet/src/main.py

diff --git a/components/load_from_parquet/Dockerfile b/components/load_from_parquet/Dockerfile
new file mode 100644
index 000000000..4642b9b8b
--- /dev/null
+++ b/components/load_from_parquet/Dockerfile
@@ -0,0 +1,23 @@
+FROM --platform=linux/amd64 python:3.8-slim
+
+# System dependencies
+RUN apt-get update && \
+    apt-get upgrade -y && \
+    apt-get install git -y
+
+# Install requirements
+COPY requirements.txt /
+RUN pip3 install --no-cache-dir -r requirements.txt
+
+# Install Fondant
+# This is split from other requirements to leverage caching
+ARG FONDANT_VERSION=main
+RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}
+
+# Set the working directory to the component folder
+WORKDIR /component/src
+
+# Copy over src-files
+COPY src/ .
+
+ENTRYPOINT ["fondant", "execute", "main"]
\ No newline at end of file
diff --git a/components/load_from_parquet/fondant_component.yaml b/components/load_from_parquet/fondant_component.yaml
new file mode 100644
index 000000000..2ade941db
--- /dev/null
+++ b/components/load_from_parquet/fondant_component.yaml
@@ -0,0 +1,26 @@
+name: Load from hub
+description: Component that loads a dataset from the hub
+image: ghcr.io/ml6team/load_from_parquet:132347e040169ac971102f865425df80ca2d6517
+
+produces:
+  dummy_variable: #TODO: fill in here
+    fields:
+      data:
+        type: binary
+
+args:
+  dataset_uri:
+    description: The remote path to the parquet file/folder containing the dataset
+    type: str
+  column_name_mapping:
+    description: Mapping of the consumed dataset
+    type: dict
+    default: None
+  n_rows_to_load:
+    description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
+    type: int
+    default: None
+  index_column:
+    description: Column to set index to in the load component, if not specified a default globally unique index will be set
+    type: str
+    default: None
\ No newline at end of file
diff --git a/components/load_from_parquet/src/main.py b/components/load_from_parquet/src/main.py
new file mode 100644
index 000000000..f548db1cc
--- /dev/null
+++ b/components/load_from_parquet/src/main.py
@@ -0,0 +1,96 @@
+"""This component loads a seed dataset from the hub."""
+import logging
+import typing as t
+
+import dask
+import dask.dataframe as dd
+import pandas as pd
+from fondant.component import DaskLoadComponent
+from fondant.component_spec import ComponentSpec
+
+logger = logging.getLogger(__name__)
+
+dask.config.set({"dataframe.convert-string": False})
+
+
+class LoadFromHubComponent(DaskLoadComponent):
+
+    def __init__(self,
+                 spec: ComponentSpec,
+                 *_,
+                 dataset_uri: str,
+                 column_name_mapping: t.Optional[dict],
+                 n_rows_to_load: t.Optional[int],
+                 index_column: t.Optional[str],
+                 ) -> None:
+        """
+        Args:
+            spec: the component spec
+            dataset_uri: The remote path to the parquet file/folder containing the dataset
+            column_name_mapping: Mapping of the consumed dataset to fondant column names
+            n_rows_to_load: optional argument that defines the number of rows to load.
+                Useful for testing pipeline runs on a small scale.
+            index_column: Column to set index to in the load component, if not specified a
+                default globally unique index will be set.
+        """
+        self.dataset_uri = dataset_uri
+        self.column_name_mapping = column_name_mapping
+        self.n_rows_to_load = n_rows_to_load
+        self.index_column = index_column
+        self.spec = spec
+
+    def load(self) -> dd.DataFrame:
+        # 1) Load data, read as Dask dataframe
+        logger.info("Loading dataset from the hub...")
+        dask_df = dd.read_parquet(self.dataset_uri)
+
+        # 2) Rename columns
+        logger.info("Renaming columns...")
+        dask_df = dask_df.rename(columns=self.column_name_mapping)
+
+        # 3) Optional: only return specific amount of rows
+        if self.n_rows_to_load is not None:
+            partitions_length = 0
+            npartitions = 1
+            for npartitions, partition in enumerate(dask_df.partitions, start=1):
+                if partitions_length >= self.n_rows_to_load:
+                    logger.info(f"""Required number of partitions to load\n
+                    {self.n_rows_to_load} is {npartitions}""")
+                    break
+                partitions_length += len(partition)
+            dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
+            dask_df = dd.from_pandas(dask_df, npartitions=npartitions)
+
+        # 4) Set the index
+        if self.index_column is None:
+            logger.info(
+                "Index column not specified, setting a globally unique index",
+            )
+
+            def _set_unique_index(dataframe: pd.DataFrame, partition_info=None):
+                """Function that sets a unique index based on the partition and row number."""
+                dataframe["id"] = 1
+                dataframe["id"] = (
+                    str(partition_info["number"])
+                    + "_"
+                    + (dataframe.id.cumsum()).astype(str)
+                )
+                dataframe.index = dataframe.pop("id")
+                return dataframe
+
+            def _get_meta_df() -> pd.DataFrame:
+                meta_dict = {"id": pd.Series(dtype="object")}
+                for subset_name, subset in self.spec.produces.items():
+                    for field_name, field in subset.fields.items():
+                        meta_dict[f"{subset_name}_{field_name}"] = pd.Series(
+                            dtype=pd.ArrowDtype(field.type.value),
+                        )
+                return pd.DataFrame(meta_dict).set_index("id")
+
+            meta = _get_meta_df()
+            dask_df = dask_df.map_partitions(_set_unique_index, meta=meta)
+        else:
+            logger.info(f"Setting `{self.index_column}` as index")
+            dask_df = dask_df.set_index(self.index_column, drop=True)
+
+        return dask_df
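The two non-obvious pieces of load() above are step 3, which walks partitions only until enough rows have been seen before calling head(), and step 4, which builds partition-scoped ids via map_partitions. The following is a minimal standalone sketch of both on a toy dataframe; the column name, row counts and partition counts are invented for illustration and are not taken from the component.

import dask.dataframe as dd
import pandas as pd

# Toy input: 10 rows spread over 4 partitions.
toy = dd.from_pandas(
    pd.DataFrame({"images_url": [f"http://example.com/{i}" for i in range(10)]}),
    npartitions=4,
)

# Step 3 analogue: count partition lengths until n_rows_to_load is covered,
# then head() across only that many partitions.
n_rows_to_load = 5
partitions_length = 0
npartitions = 1
for npartitions, partition in enumerate(toy.partitions, start=1):
    if partitions_length >= n_rows_to_load:
        break
    partitions_length += len(partition)
head_df = toy.head(n_rows_to_load, npartitions=npartitions)  # pandas DataFrame
limited = dd.from_pandas(head_df, npartitions=npartitions)

# Step 4 analogue: ids of the form "<partition number>_<row cumsum>",
# so they are unique across partitions without any global shuffle.
def set_unique_index(df: pd.DataFrame, partition_info=None) -> pd.DataFrame:
    df["id"] = 1
    df["id"] = str(partition_info["number"]) + "_" + df.id.cumsum().astype(str)
    df.index = df.pop("id")
    return df

# Explicit meta, mirroring _get_meta_df: object-dtype "id" index plus the columns.
meta = pd.DataFrame(
    {"id": pd.Series(dtype="object"), "images_url": pd.Series(dtype="object")},
).set_index("id")
indexed = limited.map_partitions(set_unique_index, meta=meta)
print(indexed.compute().index.tolist())  # partition-scoped ids like '0_1', '0_2', '1_1', ...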
diff --git a/examples/pipelines/datacomp/pipeline.py b/examples/pipelines/datacomp/pipeline.py
index 010d7b65d..6d81bd6dd 100644
--- a/examples/pipelines/datacomp/pipeline.py
+++ b/examples/pipelines/datacomp/pipeline.py
@@ -22,90 +22,102 @@
     # base_path="/Users/nielsrogge/Documents/fondant_artifacts_datacomp",
 )
 
-# define ops
-load_component_column_mapping = {
-    "url": "images_url",
-    "original_width": "images_width",
-    "original_height": "images_height",
-    "face_bboxes": "images_face_bboxes",
-    "sha256": "images_sha256",
-    "text": "text_data",
-    "clip_b32_similarity_score": "imagetext_clipb32score",
-    "clip_l14_similarity_score": "imagetext_clipl14score",
-    "clip_l14_text_embedding": "textembedding_data",
-}
+# # define ops
+# load_component_column_mapping = {
+#     "url": "images_url",
+#     "original_width": "images_width",
+#     "original_height": "images_height",
+#     "face_bboxes": "images_face_bboxes",
+#     "sha256": "images_sha256",
+#     "text": "text_data",
+#     "clip_b32_similarity_score": "imagetext_clipb32score",
+#     "clip_l14_similarity_score": "imagetext_clipl14score",
+#     "clip_l14_text_embedding": "textembedding_data",
+# }
+#
+# load_from_hub_op = ComponentOp(
+#     component_dir="components/load_from_hf_hub",
+#     arguments={
+#         "dataset_name": "nielsr/datacomp-small-with-text-embeddings",
+#         "column_name_mapping": load_component_column_mapping,
+#         "index_column": "uid",
+#         "n_rows_to_load": 1000,
+#     },
+#     node_pool_label="node_pool",
+#     node_pool_name="n2-standard-64-pool",
+#     cache=False,
+# )
 
-load_from_hub_op = ComponentOp(
-    component_dir="components/load_from_hf_hub",
+load_from_parquet = ComponentOp.from_registry(
+    name="load_from_parquet",
     arguments={
-        "dataset_name": "nielsr/datacomp-small-with-text-embeddings",
-        "column_name_mapping": load_component_column_mapping,
+        "dataset_uri": "nielsr/datacomp-small-with-text-embeddings",
         "index_column": "uid",
         "n_rows_to_load": 1000,
     },
     node_pool_label="node_pool",
    node_pool_name="n2-standard-64-pool",
-    cache=False,
-)
-download_images_op = ComponentOp.from_registry(
-    name="download_images",
-    arguments={
-        "retries": 2,
-        "min_image_size": 0,
-        "max_aspect_ratio": float("inf"),
-    },
-    node_pool_label="node_pool",
-    node_pool_name="n2-standard-64-pool",
-    input_partition_rows=1000,
-    cache=False,
-)
-detect_text_op = ComponentOp(
-    component_dir="components/detect_text",
-    arguments={
-        "batch_size": 2,
-    },
-    node_pool_label="node_pool",
-    node_pool_name="model-inference-mega-pool",
-    number_of_gpus=1,
-    cache=False,
-)
-mask_images_op = ComponentOp(
-    component_dir="components/mask_images",
-    node_pool_label="node_pool",
-    node_pool_name="n2-standard-64-pool",
-    cache=False,
-)
-embed_images_op = ComponentOp.from_registry(
-    name="embed_images",
-    arguments={
-        "batch_size": 2,
-    },
-    node_pool_label="node_pool",
-    node_pool_name="model-inference-mega-pool",
-    number_of_gpus=1,
-    cache=False,
-)
-add_clip_score_op = ComponentOp(
-    component_dir="components/add_clip_score",
-    node_pool_label="node_pool",
-    node_pool_name="n2-standard-64-pool",
-    cache=False,
-)
-filter_clip_score_op = ComponentOp(
-    component_dir="components/filter_clip_score",
-    arguments={
-        "pct_threshold": 0.5,
-    },
-    node_pool_label="node_pool",
-    node_pool_name="n2-standard-64-pool",
 )
+#
+# download_images_op = ComponentOp.from_registry(
+#     name="download_images",
+#     arguments={
+#         "retries": 2,
+#         "min_image_size": 0,
+#         "max_aspect_ratio": float("inf"),
+#     },
+#     node_pool_label="node_pool",
+#     node_pool_name="n2-standard-64-pool",
+#     input_partition_rows=1000,
+#     cache=False,
+# )
+# detect_text_op = ComponentOp(
+#     component_dir="components/detect_text",
+#     arguments={
+#         "batch_size": 2,
+#     },
+#     node_pool_label="node_pool",
+#     node_pool_name="model-inference-mega-pool",
+#     number_of_gpus=1,
+#     cache=False,
+# )
+# mask_images_op = ComponentOp(
+#     component_dir="components/mask_images",
+#     node_pool_label="node_pool",
+#     node_pool_name="n2-standard-64-pool",
+#     cache=False,
+# )
+# embed_images_op = ComponentOp.from_registry(
+#     name="embed_images",
+#     arguments={
+#         "batch_size": 2,
+#     },
+#     node_pool_label="node_pool",
+#     node_pool_name="model-inference-mega-pool",
+#     number_of_gpus=1,
+#     cache=False,
+# )
+# add_clip_score_op = ComponentOp(
+#     component_dir="components/add_clip_score",
+#     node_pool_label="node_pool",
+#     node_pool_name="n2-standard-64-pool",
+#     cache=False,
+# )
+# filter_clip_score_op = ComponentOp(
+#     component_dir="components/filter_clip_score",
+#     arguments={
+#         "pct_threshold": 0.5,
+#     },
+#     node_pool_label="node_pool",
+#     node_pool_name="n2-standard-64-pool",
+# )
 
 # add ops to pipeline
-pipeline.add_op(load_from_hub_op)
-pipeline.add_op(download_images_op, dependencies=load_from_hub_op)
-pipeline.add_op(detect_text_op, dependencies=download_images_op)
-pipeline.add_op(mask_images_op, dependencies=detect_text_op)
-pipeline.add_op(embed_images_op, dependencies=mask_images_op)
-pipeline.add_op(add_clip_score_op, dependencies=embed_images_op)
-pipeline.add_op(filter_clip_score_op, dependencies=add_clip_score_op)
+pipeline.add_op(load_from_parquet)
+# pipeline.add_op(download_images_op, dependencies=load_from_hub_op)
+# pipeline.add_op(detect_text_op, dependencies=download_images_op)
+# pipeline.add_op(mask_images_op, dependencies=detect_text_op)
+# pipeline.add_op(embed_images_op, dependencies=mask_images_op)
+# pipeline.add_op(add_clip_score_op, dependencies=embed_images_op)
+# pipeline.add_op(filter_clip_score_op, dependencies=add_clip_score_op)
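The load step above also switches construction style: reusable components are resolved from the Fondant registry by name, while project-local ones keep using a component directory. Schematically, with argument values taken from the diff and an import path that is an assumption for illustration:

from fondant.pipeline import ComponentOp  # import path assumed, not shown in the diff

# Registry component, resolved by name:
load_op = ComponentOp.from_registry(
    name="load_from_parquet",
    arguments={"dataset_uri": "nielsr/datacomp-small-with-text-embeddings"},
)

# Local custom component, built from a directory holding
# fondant_component.yaml, Dockerfile and src/:
detect_text_op = ComponentOp(
    component_dir="components/detect_text",
    arguments={"batch_size": 2},
)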
From 823553fa5825cdad2f8c62e1b7caaeee0c870648 Mon Sep 17 00:00:00 2001
From: Philippe Moussalli
Date: Thu, 28 Sep 2023 13:30:09 +0200
Subject: [PATCH 2/4] modify

---
 components/load_from_parquet/src/main.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/components/load_from_parquet/src/main.py b/components/load_from_parquet/src/main.py
index f548db1cc..92205eb2f 100644
--- a/components/load_from_parquet/src/main.py
+++ b/components/load_from_parquet/src/main.py
@@ -45,8 +45,9 @@ def load(self) -> dd.DataFrame:
         dask_df = dd.read_parquet(self.dataset_uri)
 
         # 2) Rename columns
-        logger.info("Renaming columns...")
-        dask_df = dask_df.rename(columns=self.column_name_mapping)
+        if self.column_name_mapping is not None:
+            logger.info("Renaming columns...")
+            dask_df = dask_df.rename(columns=self.column_name_mapping)
 
         # 3) Optional: only return specific amount of rows
         if self.n_rows_to_load is not None:
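The guard added in this patch matters because column_name_mapping defaults to None, and pandas treats rename(columns=None) as a rename without a mapper, raising a TypeError ("must pass an index to rename") rather than acting as a no-op. A minimal sketch of the guarded pattern, with invented column names:

import pandas as pd

def maybe_rename(df: pd.DataFrame, mapping=None) -> pd.DataFrame:
    # Only rename when a mapping was actually provided;
    # df.rename(columns=None) raises in pandas instead of being a no-op.
    if mapping is not None:
        df = df.rename(columns=mapping)
    return df

df = pd.DataFrame({"url": ["a"], "text": ["b"]})
maybe_rename(df)                         # returned unchanged
maybe_rename(df, {"url": "images_url"})  # column renamed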
From c67ca63a81987e0146282c2808ed4b31470e1354 Mon Sep 17 00:00:00 2001
From: Philippe Moussalli
Date: Thu, 28 Sep 2023 14:31:13 +0200
Subject: [PATCH 3/4] cleanup

---
 .../load_from_parquet/fondant_component.yaml |   6 +-
 examples/pipelines/datacomp/pipeline.py      | 164 ++++++++----------
 2 files changed, 79 insertions(+), 91 deletions(-)

diff --git a/components/load_from_parquet/fondant_component.yaml b/components/load_from_parquet/fondant_component.yaml
index 2ade941db..73606b090 100644
--- a/components/load_from_parquet/fondant_component.yaml
+++ b/components/load_from_parquet/fondant_component.yaml
@@ -1,6 +1,6 @@
-name: Load from hub
-description: Component that loads a dataset from the hub
-image: ghcr.io/ml6team/load_from_parquet:132347e040169ac971102f865425df80ca2d6517
+name: Load from parquet
+description: Component that loads a dataset from a parquet uri
+image: ghcr.io/ml6team/load_from_parquet:dev
 
 produces:
   dummy_variable: #TODO: fill in here
diff --git a/examples/pipelines/datacomp/pipeline.py b/examples/pipelines/datacomp/pipeline.py
index 6d81bd6dd..010d7b65d 100644
--- a/examples/pipelines/datacomp/pipeline.py
+++ b/examples/pipelines/datacomp/pipeline.py
@@ -22,102 +22,90 @@
     # base_path="/Users/nielsrogge/Documents/fondant_artifacts_datacomp",
 )
 
-# # define ops
-# load_component_column_mapping = {
-#     "url": "images_url",
-#     "original_width": "images_width",
-#     "original_height": "images_height",
-#     "face_bboxes": "images_face_bboxes",
-#     "sha256": "images_sha256",
-#     "text": "text_data",
-#     "clip_b32_similarity_score": "imagetext_clipb32score",
-#     "clip_l14_similarity_score": "imagetext_clipl14score",
-#     "clip_l14_text_embedding": "textembedding_data",
-# }
-#
-# load_from_hub_op = ComponentOp(
-#     component_dir="components/load_from_hf_hub",
-#     arguments={
-#         "dataset_name": "nielsr/datacomp-small-with-text-embeddings",
-#         "column_name_mapping": load_component_column_mapping,
-#         "index_column": "uid",
-#         "n_rows_to_load": 1000,
-#     },
-#     node_pool_label="node_pool",
-#     node_pool_name="n2-standard-64-pool",
-#     cache=False,
-# )
+# define ops
+load_component_column_mapping = {
+    "url": "images_url",
+    "original_width": "images_width",
+    "original_height": "images_height",
+    "face_bboxes": "images_face_bboxes",
+    "sha256": "images_sha256",
+    "text": "text_data",
+    "clip_b32_similarity_score": "imagetext_clipb32score",
+    "clip_l14_similarity_score": "imagetext_clipl14score",
+    "clip_l14_text_embedding": "textembedding_data",
+}
 
-load_from_parquet = ComponentOp.from_registry(
-    name="load_from_parquet",
+load_from_hub_op = ComponentOp(
+    component_dir="components/load_from_hf_hub",
     arguments={
-        "dataset_uri": "nielsr/datacomp-small-with-text-embeddings",
+        "dataset_name": "nielsr/datacomp-small-with-text-embeddings",
+        "column_name_mapping": load_component_column_mapping,
         "index_column": "uid",
         "n_rows_to_load": 1000,
     },
     node_pool_label="node_pool",
     node_pool_name="n2-standard-64-pool",
+    cache=False,
+)
+download_images_op = ComponentOp.from_registry(
+    name="download_images",
+    arguments={
+        "retries": 2,
+        "min_image_size": 0,
+        "max_aspect_ratio": float("inf"),
+    },
+    node_pool_label="node_pool",
+    node_pool_name="n2-standard-64-pool",
+    input_partition_rows=1000,
+    cache=False,
+)
+detect_text_op = ComponentOp(
+    component_dir="components/detect_text",
+    arguments={
+        "batch_size": 2,
+    },
+    node_pool_label="node_pool",
+    node_pool_name="model-inference-mega-pool",
+    number_of_gpus=1,
+    cache=False,
+)
+mask_images_op = ComponentOp(
+    component_dir="components/mask_images",
+    node_pool_label="node_pool",
+    node_pool_name="n2-standard-64-pool",
+    cache=False,
+)
+embed_images_op = ComponentOp.from_registry(
+    name="embed_images",
+    arguments={
+        "batch_size": 2,
+    },
+    node_pool_label="node_pool",
+    node_pool_name="model-inference-mega-pool",
+    number_of_gpus=1,
+    cache=False,
+)
+add_clip_score_op = ComponentOp(
+    component_dir="components/add_clip_score",
+    node_pool_label="node_pool",
+    node_pool_name="n2-standard-64-pool",
+    cache=False,
+)
+filter_clip_score_op = ComponentOp(
+    component_dir="components/filter_clip_score",
+    arguments={
+        "pct_threshold": 0.5,
+    },
+    node_pool_label="node_pool",
+    node_pool_name="n2-standard-64-pool",
 )
-#
-# download_images_op = ComponentOp.from_registry(
-#     name="download_images",
-#     arguments={
-#         "retries": 2,
-#         "min_image_size": 0,
-#         "max_aspect_ratio": float("inf"),
-#     },
-#     node_pool_label="node_pool",
-#     node_pool_name="n2-standard-64-pool",
-#     input_partition_rows=1000,
-#     cache=False,
-# )
-# detect_text_op = ComponentOp(
-#     component_dir="components/detect_text",
-#     arguments={
-#         "batch_size": 2,
-#     },
-#     node_pool_label="node_pool",
-#     node_pool_name="model-inference-mega-pool",
-#     number_of_gpus=1,
-#     cache=False,
-# )
-# mask_images_op = ComponentOp(
-#     component_dir="components/mask_images",
-#     node_pool_label="node_pool",
-#     node_pool_name="n2-standard-64-pool",
-#     cache=False,
-# )
-# embed_images_op = ComponentOp.from_registry(
-#     name="embed_images",
-#     arguments={
-#         "batch_size": 2,
-#     },
-#     node_pool_label="node_pool",
-#     node_pool_name="model-inference-mega-pool",
-#     number_of_gpus=1,
-#     cache=False,
-# )
-# add_clip_score_op = ComponentOp(
-#     component_dir="components/add_clip_score",
-#     node_pool_label="node_pool",
-#     node_pool_name="n2-standard-64-pool",
-#     cache=False,
-# )
-# filter_clip_score_op = ComponentOp(
-#     component_dir="components/filter_clip_score",
-#     arguments={
-#         "pct_threshold": 0.5,
-#     },
-#     node_pool_label="node_pool",
-#     node_pool_name="n2-standard-64-pool",
-# )
 
 # add ops to pipeline
-pipeline.add_op(load_from_parquet)
-# pipeline.add_op(download_images_op, dependencies=load_from_hub_op)
-# pipeline.add_op(detect_text_op, dependencies=download_images_op)
-# pipeline.add_op(mask_images_op, dependencies=detect_text_op)
-# pipeline.add_op(embed_images_op, dependencies=mask_images_op)
-# pipeline.add_op(add_clip_score_op, dependencies=embed_images_op)
-# pipeline.add_op(filter_clip_score_op, dependencies=add_clip_score_op)
+pipeline.add_op(load_from_hub_op)
+pipeline.add_op(download_images_op, dependencies=load_from_hub_op)
+pipeline.add_op(detect_text_op, dependencies=download_images_op)
+pipeline.add_op(mask_images_op, dependencies=detect_text_op)
+pipeline.add_op(embed_images_op, dependencies=mask_images_op)
+pipeline.add_op(add_clip_score_op, dependencies=embed_images_op)
+pipeline.add_op(filter_clip_score_op, dependencies=add_clip_score_op)
From bfbea102290f03999624ae22e586d554367d84aa Mon Sep 17 00:00:00 2001
From: Philippe Moussalli
Date: Thu, 28 Sep 2023 15:25:12 +0200
Subject: [PATCH 4/4] address PR feedback

---
 components/load_from_parquet/src/main.py |   4 +-
 examples/pipelines/datacomp/pipeline.py  | 119 +++++++++++------------
 2 files changed, 61 insertions(+), 62 deletions(-)

diff --git a/components/load_from_parquet/src/main.py b/components/load_from_parquet/src/main.py
index 92205eb2f..429924bac 100644
--- a/components/load_from_parquet/src/main.py
+++ b/components/load_from_parquet/src/main.py
@@ -13,7 +13,7 @@
 dask.config.set({"dataframe.convert-string": False})
 
 
-class LoadFromHubComponent(DaskLoadComponent):
+class LoadFromParquet(DaskLoadComponent):
 
     def __init__(self,
                  spec: ComponentSpec,
@@ -41,7 +41,7 @@ def __init__(self,
 
     def load(self) -> dd.DataFrame:
         # 1) Load data, read as Dask dataframe
-        logger.info("Loading dataset from the hub...")
+        logger.info("Loading dataset from the file...")
         dask_df = dd.read_parquet(self.dataset_uri)
 
         # 2) Rename columns
diff --git a/examples/pipelines/datacomp/pipeline.py b/examples/pipelines/datacomp/pipeline.py
index 010d7b65d..cc384ffd2 100644
--- a/examples/pipelines/datacomp/pipeline.py
+++ b/examples/pipelines/datacomp/pipeline.py
@@ -41,71 +41,70 @@
         "dataset_name": "nielsr/datacomp-small-with-text-embeddings",
         "column_name_mapping": load_component_column_mapping,
         "index_column": "uid",
-        "n_rows_to_load": 1000,
-    },
-    node_pool_label="node_pool",
-    node_pool_name="n2-standard-64-pool",
-    cache=False,
-)
-download_images_op = ComponentOp.from_registry(
-    name="download_images",
-    arguments={
-        "retries": 2,
-        "min_image_size": 0,
-        "max_aspect_ratio": float("inf"),
-    },
-    node_pool_label="node_pool",
-    node_pool_name="n2-standard-64-pool",
-    input_partition_rows=1000,
-    cache=False,
-)
-detect_text_op = ComponentOp(
-    component_dir="components/detect_text",
-    arguments={
-        "batch_size": 2,
-    },
-    node_pool_label="node_pool",
-    node_pool_name="model-inference-mega-pool",
-    number_of_gpus=1,
-    cache=False,
-)
-mask_images_op = ComponentOp(
-    component_dir="components/mask_images",
-    node_pool_label="node_pool",
-    node_pool_name="n2-standard-64-pool",
-    cache=False,
-)
-embed_images_op = ComponentOp.from_registry(
-    name="embed_images",
-    arguments={
-        "batch_size": 2,
-    },
-    node_pool_label="node_pool",
-    node_pool_name="model-inference-mega-pool",
-    number_of_gpus=1,
-    cache=False,
-)
-add_clip_score_op = ComponentOp(
-    component_dir="components/add_clip_score",
-    node_pool_label="node_pool",
-    node_pool_name="n2-standard-64-pool",
-    cache=False,
-)
-filter_clip_score_op = ComponentOp(
-    component_dir="components/filter_clip_score",
-    arguments={
-        "pct_threshold": 0.5,
+        # "n_rows_to_load": 1000,
     },
     node_pool_label="node_pool",
     node_pool_name="n2-standard-64-pool",
 )
+# download_images_op = ComponentOp.from_registry(
+#     name="download_images",
+#     arguments={
+#         "retries": 2,
+#         "min_image_size": 0,
+#         "max_aspect_ratio": float("inf"),
+#     },
+#     node_pool_label="node_pool",
+#     node_pool_name="n2-standard-64-pool",
+#     input_partition_rows=1000,
+#     cache=False,
+# )
+# detect_text_op = ComponentOp(
+#     component_dir="components/detect_text",
+#     arguments={
+#         "batch_size": 2,
+#     },
+#     node_pool_label="node_pool",
+#     node_pool_name="model-inference-mega-pool",
+#     number_of_gpus=1,
+#     cache=False,
+# )
+# mask_images_op = ComponentOp(
+#     component_dir="components/mask_images",
+#     node_pool_label="node_pool",
+#     node_pool_name="n2-standard-64-pool",
+#     cache=False,
+# )
+# embed_images_op = ComponentOp.from_registry(
+#     name="embed_images",
+#     arguments={
+#         "batch_size": 2,
+#     },
+#     node_pool_label="node_pool",
+#     node_pool_name="model-inference-mega-pool",
+#     number_of_gpus=1,
+#     cache=False,
+# )
+# add_clip_score_op = ComponentOp(
+#     component_dir="components/add_clip_score",
+#     node_pool_label="node_pool",
+#     node_pool_name="n2-standard-64-pool",
+#     cache=False,
+# )
+# filter_clip_score_op = ComponentOp(
+#     component_dir="components/filter_clip_score",
+#     arguments={
+#         "pct_threshold": 0.5,
+#     },
+#     node_pool_label="node_pool",
+#     node_pool_name="n2-standard-64-pool",
+# )
 
 # add ops to pipeline
 pipeline.add_op(load_from_hub_op)
-pipeline.add_op(download_images_op, dependencies=load_from_hub_op)
-pipeline.add_op(detect_text_op, dependencies=download_images_op)
-pipeline.add_op(mask_images_op, dependencies=detect_text_op)
-pipeline.add_op(embed_images_op, dependencies=mask_images_op)
-pipeline.add_op(add_clip_score_op, dependencies=embed_images_op)
-pipeline.add_op(filter_clip_score_op, dependencies=add_clip_score_op)
+# pipeline.add_op(download_images_op, dependencies=load_from_hub_op)
+# pipeline.add_op(detect_text_op, dependencies=download_images_op)
+# pipeline.add_op(mask_images_op, dependencies=detect_text_op)
+# pipeline.add_op(embed_images_op, dependencies=mask_images_op)
+# pipeline.add_op(add_clip_score_op, dependencies=embed_images_op)
+# pipeline.add_op(filter_clip_score_op, dependencies=add_clip_score_op)
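Taken together, the series leaves the datacomp example exercising only the new load step. For reference, a minimal standalone pipeline around the finished LoadFromParquet component could look like the sketch below; the import path, pipeline name, base path and gs:// URIs are illustrative assumptions — only the component name and the argument keys come from the patches.

from fondant.pipeline import ComponentOp, Pipeline  # import path assumed

pipeline = Pipeline(
    pipeline_name="parquet-demo",          # illustrative name
    base_path="gs://my-bucket/artifacts",  # illustrative artifact location
)

load_from_parquet = ComponentOp.from_registry(
    name="load_from_parquet",
    arguments={
        # Remote parquet file/folder readable by dd.read_parquet (illustrative URI):
        "dataset_uri": "gs://my-bucket/datasets/datacomp-small.parquet",
        # Optional: rename source columns to fondant subset_field names:
        "column_name_mapping": {"url": "images_url"},
        # Optional: cap the number of rows for a small test run:
        "n_rows_to_load": 1000,
        # Optional: use an existing column as index instead of the
        # generated "<partition>_<row>" ids:
        "index_column": "uid",
    },
)

pipeline.add_op(load_from_parquet)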