diff --git a/components/load_from_parquet/Dockerfile b/components/load_from_parquet/Dockerfile
new file mode 100644
index 000000000..4642b9b8b
--- /dev/null
+++ b/components/load_from_parquet/Dockerfile
@@ -0,0 +1,23 @@
+FROM --platform=linux/amd64 python:3.8-slim
+
+# System dependencies
+RUN apt-get update && \
+    apt-get upgrade -y && \
+    apt-get install git -y
+
+# Install requirements
+COPY requirements.txt /
+RUN pip3 install --no-cache-dir -r requirements.txt
+
+# Install Fondant
+# This is split from other requirements to leverage caching
+ARG FONDANT_VERSION=main
+RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}
+
+# Set the working directory to the component folder
+WORKDIR /component/src
+
+# Copy over src-files
+COPY src/ .
+
+ENTRYPOINT ["fondant", "execute", "main"]
\ No newline at end of file
diff --git a/components/load_from_parquet/fondant_component.yaml b/components/load_from_parquet/fondant_component.yaml
new file mode 100644
index 000000000..73606b090
--- /dev/null
+++ b/components/load_from_parquet/fondant_component.yaml
@@ -0,0 +1,26 @@
+name: Load from parquet
+description: Component that loads a dataset from a parquet URI
+image: ghcr.io/ml6team/load_from_parquet:dev
+
+produces:
+  dummy_variable: #TODO: fill in here
+    fields:
+      data:
+        type: binary
+
+args:
+  dataset_uri:
+    description: The remote path to the parquet file/folder containing the dataset
+    type: str
+  column_name_mapping:
+    description: Mapping of the consumed dataset column names to fondant column names
+    type: dict
+    default: None
+  n_rows_to_load:
+    description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
+    type: int
+    default: None
+  index_column:
+    description: Column to set as index in the load component; if not specified, a default globally unique index will be set
+    type: str
+    default: None
\ No newline at end of file
diff --git a/components/load_from_parquet/src/main.py b/components/load_from_parquet/src/main.py
new file mode 100644
index 000000000..429924bac
--- /dev/null
+++ b/components/load_from_parquet/src/main.py
@@ -0,0 +1,97 @@
+"""This component loads a dataset from a parquet file or folder."""
+import logging
+import typing as t
+
+import dask
+import dask.dataframe as dd
+import pandas as pd
+from fondant.component import DaskLoadComponent
+from fondant.component_spec import ComponentSpec
+
+logger = logging.getLogger(__name__)
+
+dask.config.set({"dataframe.convert-string": False})
+
+
+class LoadFromParquet(DaskLoadComponent):
+
+    def __init__(self,
+                 spec: ComponentSpec,
+                 *_,
+                 dataset_uri: str,
+                 column_name_mapping: t.Optional[dict],
+                 n_rows_to_load: t.Optional[int],
+                 index_column: t.Optional[str],
+                 ) -> None:
+        """
+        Args:
+            spec: the component spec
+            dataset_uri: The remote path to the parquet file/folder containing the dataset
+            column_name_mapping: Mapping of the consumed dataset column names to fondant column names
+            n_rows_to_load: optional argument that defines the number of rows to load. Useful for
+                testing pipeline runs on a small scale.
+            index_column: Column to set as index in the load component; if not specified, a default
+                globally unique index will be set.
+ """ + self.dataset_uri = dataset_uri + self.column_name_mapping = column_name_mapping + self.n_rows_to_load = n_rows_to_load + self.index_column = index_column + self.spec = spec + + def load(self) -> dd.DataFrame: + # 1) Load data, read as Dask dataframe + logger.info("Loading dataset from the file...") + dask_df = dd.read_parquet(self.dataset_uri) + + # 2) Rename columns + if self.column_name_mapping is not None: + logger.info("Renaming columns...") + dask_df = dask_df.rename(columns=self.column_name_mapping) + + # 3) Optional: only return specific amount of rows + if self.n_rows_to_load is not None: + partitions_length = 0 + npartitions = 1 + for npartitions, partition in enumerate(dask_df.partitions, start=1): + if partitions_length >= self.n_rows_to_load: + logger.info(f"""Required number of partitions to load\n + {self.n_rows_to_load} is {npartitions}""") + break + partitions_length += len(partition) + dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions) + dask_df = dd.from_pandas(dask_df, npartitions=npartitions) + + # 4) Set the index + if self.index_column is None: + logger.info( + "Index column not specified, setting a globally unique index", + ) + + def _set_unique_index(dataframe: pd.DataFrame, partition_info=None): + """Function that sets a unique index based on the partition and row number.""" + dataframe["id"] = 1 + dataframe["id"] = ( + str(partition_info["number"]) + + "_" + + (dataframe.id.cumsum()).astype(str) + ) + dataframe.index = dataframe.pop("id") + return dataframe + + def _get_meta_df() -> pd.DataFrame: + meta_dict = {"id": pd.Series(dtype="object")} + for subset_name, subset in self.spec.produces.items(): + for field_name, field in subset.fields.items(): + meta_dict[f"{subset_name}_{field_name}"] = pd.Series( + dtype=pd.ArrowDtype(field.type.value), + ) + return pd.DataFrame(meta_dict).set_index("id") + + meta = _get_meta_df() + dask_df = dask_df.map_partitions(_set_unique_index, meta=meta) + else: + logger.info(f"Setting `{self.index_column}` as index") + dask_df = dask_df.set_index(self.index_column, drop=True) + + return dask_df diff --git a/examples/pipelines/datacomp/pipeline.py b/examples/pipelines/datacomp/pipeline.py index 010d7b65d..cc384ffd2 100644 --- a/examples/pipelines/datacomp/pipeline.py +++ b/examples/pipelines/datacomp/pipeline.py @@ -41,71 +41,70 @@ "dataset_name": "nielsr/datacomp-small-with-text-embeddings", "column_name_mapping": load_component_column_mapping, "index_column": "uid", - "n_rows_to_load": 1000, - }, - node_pool_label="node_pool", - node_pool_name="n2-standard-64-pool", - cache=False, -) -download_images_op = ComponentOp.from_registry( - name="download_images", - arguments={ - "retries": 2, - "min_image_size": 0, - "max_aspect_ratio": float("inf"), - }, - node_pool_label="node_pool", - node_pool_name="n2-standard-64-pool", - input_partition_rows=1000, - cache=False, -) -detect_text_op = ComponentOp( - component_dir="components/detect_text", - arguments={ - "batch_size": 2, - }, - node_pool_label="node_pool", - node_pool_name="model-inference-mega-pool", - number_of_gpus=1, - cache=False, -) -mask_images_op = ComponentOp( - component_dir="components/mask_images", - node_pool_label="node_pool", - node_pool_name="n2-standard-64-pool", - cache=False, -) -embed_images_op = ComponentOp.from_registry( - name="embed_images", - arguments={ - "batch_size": 2, - }, - node_pool_label="node_pool", - node_pool_name="model-inference-mega-pool", - number_of_gpus=1, - cache=False, -) -add_clip_score_op = ComponentOp( - 
component_dir="components/add_clip_score", - node_pool_label="node_pool", - node_pool_name="n2-standard-64-pool", - cache=False, -) -filter_clip_score_op = ComponentOp( - component_dir="components/filter_clip_score", - arguments={ - "pct_threshold": 0.5, + # "n_rows_to_load": 1000, }, node_pool_label="node_pool", node_pool_name="n2-standard-64-pool", ) +# download_images_op = ComponentOp.from_registry( +# name="download_images", +# arguments={ +# "retries": 2, +# "min_image_size": 0, +# "max_aspect_ratio": float("inf"), +# }, +# node_pool_label="node_pool", +# node_pool_name="n2-standard-64-pool", +# input_partition_rows=1000, +# cache=False, +# ) +# detect_text_op = ComponentOp( +# component_dir="components/detect_text", +# arguments={ +# "batch_size": 2, +# }, +# node_pool_label="node_pool", +# node_pool_name="model-inference-mega-pool", +# number_of_gpus=1, +# cache=False, +# ) +# mask_images_op = ComponentOp( +# component_dir="components/mask_images", +# node_pool_label="node_pool", +# node_pool_name="n2-standard-64-pool", +# cache=False, +# ) +# embed_images_op = ComponentOp.from_registry( +# name="embed_images", +# arguments={ +# "batch_size": 2, +# }, +# node_pool_label="node_pool", +# node_pool_name="model-inference-mega-pool", +# number_of_gpus=1, +# cache=False, +# ) +# add_clip_score_op = ComponentOp( +# component_dir="components/add_clip_score", +# node_pool_label="node_pool", +# node_pool_name="n2-standard-64-pool", +# cache=False, +# ) +# filter_clip_score_op = ComponentOp( +# component_dir="components/filter_clip_score", +# arguments={ +# "pct_threshold": 0.5, +# }, +# node_pool_label="node_pool", +# node_pool_name="n2-standard-64-pool", +# ) # add ops to pipeline pipeline.add_op(load_from_hub_op) -pipeline.add_op(download_images_op, dependencies=load_from_hub_op) -pipeline.add_op(detect_text_op, dependencies=download_images_op) -pipeline.add_op(mask_images_op, dependencies=detect_text_op) -pipeline.add_op(embed_images_op, dependencies=mask_images_op) -pipeline.add_op(add_clip_score_op, dependencies=embed_images_op) -pipeline.add_op(filter_clip_score_op, dependencies=add_clip_score_op) +# pipeline.add_op(download_images_op, dependencies=load_from_hub_op) +# pipeline.add_op(detect_text_op, dependencies=download_images_op) +# pipeline.add_op(mask_images_op, dependencies=detect_text_op) +# pipeline.add_op(embed_images_op, dependencies=mask_images_op) +# pipeline.add_op(add_clip_score_op, dependencies=embed_images_op) +# pipeline.add_op(filter_clip_score_op, dependencies=add_clip_score_op)