diff --git a/components/filter_comments/Dockerfile b/components/filter_comments/Dockerfile index 208e16c15..74cbc9ac0 100644 --- a/components/filter_comments/Dockerfile +++ b/components/filter_comments/Dockerfile @@ -9,7 +9,7 @@ RUN apt-get update && \ COPY requirements.txt ./ RUN pip3 install --no-cache-dir -r requirements.txt -# Set the working directory to the compoent folder +# Set the working directory to the component folder WORKDIR /component/src # Copy over src-files diff --git a/components/filter_line_length/Dockerfile b/components/filter_line_length/Dockerfile index 208e16c15..74cbc9ac0 100644 --- a/components/filter_line_length/Dockerfile +++ b/components/filter_line_length/Dockerfile @@ -9,7 +9,7 @@ RUN apt-get update && \ COPY requirements.txt ./ RUN pip3 install --no-cache-dir -r requirements.txt -# Set the working directory to the compoent folder +# Set the working directory to the component folder WORKDIR /component/src # Copy over src-files diff --git a/components/image_cropping/Dockerfile b/components/image_cropping/Dockerfile index 90d7ec0ea..605adc7e9 100644 --- a/components/image_cropping/Dockerfile +++ b/components/image_cropping/Dockerfile @@ -9,7 +9,7 @@ RUN apt-get update && \ COPY requirements.txt / RUN pip3 install --no-cache-dir -r requirements.txt -# Set the working directory to the compoent folder +# Set the working directory to the component folder WORKDIR /component/src # Copy over src-files diff --git a/examples/pipelines/starcoder/components/load_from_hub_stack/Dockerfile b/components/image_resolution_extraction/Dockerfile similarity index 100% rename from examples/pipelines/starcoder/components/load_from_hub_stack/Dockerfile rename to components/image_resolution_extraction/Dockerfile diff --git a/components/image_resolution_extraction/fondant_component.yaml b/components/image_resolution_extraction/fondant_component.yaml new file mode 100644 index 000000000..a0d9609e5 --- /dev/null +++ b/components/image_resolution_extraction/fondant_component.yaml @@ -0,0 +1,19 @@ +name: Image resolution extraction +description: Component that extracts image resolution data from the images +image: ghcr.io/ml6team/image_resolution_extraction:latest + +consumes: + images: + fields: + data: + type: binary + +produces: + images: + fields: + width: + type: int16 + height: + type: int16 + data: + type: binary \ No newline at end of file diff --git a/components/image_resolution_extraction/requirements.txt b/components/image_resolution_extraction/requirements.txt new file mode 100644 index 000000000..df0524e2c --- /dev/null +++ b/components/image_resolution_extraction/requirements.txt @@ -0,0 +1,4 @@ +fondant +pyarrow>=7.0 +gcsfs==2023.4.0 +imagesize==1.4.1 \ No newline at end of file diff --git a/components/image_resolution_extraction/src/main.py b/components/image_resolution_extraction/src/main.py new file mode 100644 index 000000000..97d29306a --- /dev/null +++ b/components/image_resolution_extraction/src/main.py @@ -0,0 +1,52 @@ +"""This component filters images of the dataset based on image size (minimum height and width).""" +import io +import logging +import typing as t + +import imagesize +import numpy as np +import pandas as pd + +from fondant.component import PandasTransformComponent +from fondant.logger import configure_logging + +configure_logging() +logger = logging.getLogger(__name__) + + +def extract_dimensions(images: bytes) -> t.Tuple[np.int16, np.int16]: + """Extract the width and height of an image. 
+ + Args: + images: input dataframe with images_data column + + Returns: + np.int16: width of the image + np.int16: height of the image + """ + width, height = imagesize.get(io.BytesIO(images)) + + return np.int16(width), np.int16(height) + + +class ImageResolutionExtractionComponent(PandasTransformComponent): + """Component that extracts image dimensions.""" + + def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: + """ + Args: + dataframe: Dask dataframe + Returns: + dataset. + """ + logger.info("Filtering dataset...") + + dataframe[[("images", "width"), ("images", "height")]] = \ + dataframe[[("images", "data")]].map(extract_dimensions) + + return dataframe + + +if __name__ == "__main__": + component = ImageResolutionExtractionComponent.from_args() + component.run() diff --git a/components/image_resolution_filtering/Dockerfile b/components/image_resolution_filtering/Dockerfile index 90d7ec0ea..605adc7e9 100644 --- a/components/image_resolution_filtering/Dockerfile +++ b/components/image_resolution_filtering/Dockerfile @@ -9,7 +9,7 @@ RUN apt-get update && \ COPY requirements.txt / RUN pip3 install --no-cache-dir -r requirements.txt -# Set the working directory to the compoent folder +# Set the working directory to the component folder WORKDIR /component/src # Copy over src-files diff --git a/components/load_from_hf_hub/fondant_component.yaml b/components/load_from_hf_hub/fondant_component.yaml index 7fdef5812..b8d4195d9 100644 --- a/components/load_from_hf_hub/fondant_component.yaml +++ b/components/load_from_hf_hub/fondant_component.yaml @@ -2,21 +2,25 @@ name: Load from hub description: Component that loads a dataset from the hub image: ghcr.io/ml6team/load_from_hf_hub:latest -produces: - images: +consumes: + dummy_variable: #TODO: fill in here fields: data: type: binary - width: - type: int16 - height: - type: int16 - captions: - fields: - data: - type: string args: dataset_name: description: Name of dataset on the hub type: str + column_name_mapping: + description: Mapping of the consumed hub dataset to fondant column names + type: dict + image_column_names: + description: Optional argument, a list containing the original image column names in case the + dataset on the hub contains them. Used to format the image from HF hub format to a byte string. + type: list + default: None + n_rows_to_load: + description: Optional argument that defines the number of rows to load. 
Useful for testing pipeline runs on a small scale + type: int + default: None \ No newline at end of file diff --git a/components/load_from_hf_hub/src/main.py b/components/load_from_hf_hub/src/main.py index 12f5e36c8..fd21aae22 100644 --- a/components/load_from_hf_hub/src/main.py +++ b/components/load_from_hf_hub/src/main.py @@ -1,10 +1,8 @@ """This component loads a seed dataset from the hub.""" -import io import logging +import typing as t import dask.dataframe as dd -import numpy as np -from PIL import Image from fondant.component import LoadComponent from fondant.logger import configure_logging @@ -13,52 +11,43 @@ logger = logging.getLogger(__name__) -def extract_width(image_bytes): - # Decode image bytes to PIL Image object - pil_image = Image.open(io.BytesIO(image_bytes)) - width = pil_image.size[0] - - return np.int16(width) - - -def extract_height(image_bytes): - # Decode image bytes to PIL Image object - pil_image = Image.open(io.BytesIO(image_bytes)) - height = pil_image.size[1] - - return np.int16(height) - - class LoadFromHubComponent(LoadComponent): - def load(self, *, dataset_name: str) -> dd.DataFrame: + def load(self, + *, + dataset_name: str, + column_name_mapping: dict, + image_column_names: t.Optional[list], + n_rows_to_load: t.Optional[int]) -> dd.DataFrame: """ Args: dataset_name: name of the dataset to load. - + column_name_mapping: Mapping of the consumed hub dataset to fondant column names + image_column_names: A list containing the original hub image column names. Used to + format the image from HF hub format to a byte string + n_rows_to_load: optional argument that defines the number of rows to load. Useful for + testing pipeline runs on a small scale Returns: - Dataset: HF dataset + Dataset: HF dataset. """ # 1) Load data, read as Dask dataframe logger.info("Loading dataset from the hub...") dask_df = dd.read_parquet(f"hf://datasets/{dataset_name}") - # 2) Rename columns - dask_df = dask_df.rename( - columns={"image": "images_data", "text": "captions_data"} - ) + # 2) Make sure images are bytes instead of dicts + if image_column_names is not None: + for image_column_name in image_column_names: + dask_df[image_column_name] = dask_df[image_column_name].map( + lambda x: x["bytes"], meta=("bytes", bytes) + ) + + # 3) Rename columns + dask_df = dask_df.rename(columns=column_name_mapping) - # 3) Make sure images are bytes instead of dicts - dask_df["images_data"] = dask_df["images_data"].map( - lambda x: x["bytes"], meta=("bytes", bytes) - ) + # 4) Optional: only return specific amount of rows - # 4) Add width and height columns - dask_df["images_width"] = dask_df["images_data"].map( - extract_width, meta=("images_width", int) - ) - dask_df["images_height"] = dask_df["images_data"].map( - extract_height, meta=("images_height", int) - ) + if n_rows_to_load: + dask_df = dask_df.head(n_rows_to_load) + dask_df = dd.from_pandas(dask_df, npartitions=1) return dask_df diff --git a/components/write_to_hf_hub/Dockerfile b/components/write_to_hf_hub/Dockerfile new file mode 100644 index 000000000..e69de29bb diff --git a/components/write_to_hf_hub/fondant_component.yaml b/components/write_to_hf_hub/fondant_component.yaml new file mode 100644 index 000000000..b686b16f1 --- /dev/null +++ b/components/write_to_hf_hub/fondant_component.yaml @@ -0,0 +1,28 @@ +name: Write to hub +description: Component that writes a dataset to the hub +image: ghcr.io/ml6team/write_to_hf_hub:latest + +consumes: + dummy_variable: #TODO: fill in here + fields: + data: + type: binary + +args: + hf_token: + 
description: The hugging face token used to write to the hub + type: str + username: + description: The username under which to upload the dataset + type: str + dataset_name: + description: The name of the dataset to upload + type: str + image_column_names: + description: A list containing the image column names. Used to format to image to HF hub format + type: list + default: None + column_name_mapping: + description: Mapping of the consumed fondant column names to the written hub column names + type: dict + default: None \ No newline at end of file diff --git a/examples/pipelines/controlnet-interior-design/components/write_to_hub_controlnet/requirements.txt b/components/write_to_hf_hub/requirements.txt similarity index 81% rename from examples/pipelines/controlnet-interior-design/components/write_to_hub_controlnet/requirements.txt rename to components/write_to_hf_hub/requirements.txt index a13c8ca2d..657e60f65 100644 --- a/examples/pipelines/controlnet-interior-design/components/write_to_hub_controlnet/requirements.txt +++ b/components/write_to_hf_hub/requirements.txt @@ -1,4 +1,5 @@ huggingface_hub==0.14.1 +datasets==2.10.1 fondant pyarrow>=7.0 Pillow==9.4.0 diff --git a/components/write_to_hf_hub/src/main.py b/components/write_to_hf_hub/src/main.py new file mode 100644 index 000000000..cd983a7af --- /dev/null +++ b/components/write_to_hf_hub/src/main.py @@ -0,0 +1,101 @@ +"""This component writes an image dataset to the hub.""" +import logging +import typing as t +from io import BytesIO + +import dask.dataframe as dd +import datasets + +# Define the schema for the struct using PyArrow +import huggingface_hub +from PIL import Image + +from fondant.component import WriteComponent +from fondant.logger import configure_logging + +configure_logging() +logger = logging.getLogger(__name__) + + +def convert_bytes_to_image(image_bytes: bytes, feature_encoder: datasets.Image) -> \ + t.Dict[str, t.Any]: + """ + Function that converts image bytes to hf image format + Args: + image_bytes: the images as a bytestring + feature_encoder: hf image feature encoder + Returns: + HF image representation. + """ + image = Image.open(BytesIO(image_bytes)) + image = feature_encoder.encode_example(image) + return image + + +class WriteToHubComponent(WriteComponent): + def write( + self, + dataframe: dd.DataFrame, + *, + hf_token: str, + username: str, + dataset_name: str, + image_column_names: t.Optional[list], + column_name_mapping: t.Optional[dict] + ): + """ + Args: + dataframe: Dask dataframe + hf_token: The hugging face token used to write to the hub + username: The username under which to upload the dataset + dataset_name: The name of the dataset to upload + image_column_names: A list containing the subset image column names. Used to format the + image fields to HF hub format + column_name_mapping: Mapping of the consumed fondant column names to the written hub + column names. 
+ """ + # login + huggingface_hub.login(token=hf_token) + + # Create HF dataset repository + repo_id = f"{username}/{dataset_name}" + repo_path = f"hf://datasets/{repo_id}" + logger.info(f"Creating HF dataset repository under ID: '{repo_id}'") + huggingface_hub.create_repo(repo_id=repo_id, repo_type="dataset", exist_ok=True) + + # Get columns to write and schema + write_columns = [] + schema_dict = {} + for subset_name, subset in self.spec.consumes.items(): + for field in subset.fields.values(): + column_name = f"{subset_name}_{field.name}" + write_columns.append(column_name) + if image_column_names and column_name in image_column_names: + schema_dict[column_name] = datasets.Image() + else: + schema_dict[column_name] = datasets.Value(str(field.type.value)) + + schema = datasets.Features(schema_dict).arrow_schema + dataframe = dataframe[write_columns] + + # Map image column to hf data format + feature_encoder = datasets.Image(decode=True) + + if image_column_names is not None: + for image_column_name in image_column_names: + dataframe[image_column_name] = dataframe[image_column_name].map( + lambda x: convert_bytes_to_image(x, feature_encoder), + meta=(image_column_name, "object") + ) + + # Map column names to hf data format + if column_name_mapping: + dataframe = dataframe.rename(columns=column_name_mapping) + + # Write dataset to the hub + dd.to_parquet(dataframe, path=f"{repo_path}/data", schema=schema) + + +if __name__ == "__main__": + component = WriteToHubComponent.from_args() + component.run() diff --git a/docs/component_spec.md b/docs/component_spec.md index 00022103c..d59e8c3ad 100644 --- a/docs/component_spec.md +++ b/docs/component_spec.md @@ -138,7 +138,7 @@ args: ``` These arguments are passed in when the component is instantiated. -If an argument is not explicitly provided, the default value will be used instead if available.``` +If an argument is not explicitly provided, the default value will be used instead if available. ```python from fondant.pipeline import ComponentOp diff --git a/docs/custom_component.md b/docs/custom_component.md index 0c641b2fa..a415036be 100644 --- a/docs/custom_component.md +++ b/docs/custom_component.md @@ -98,7 +98,7 @@ RUN apt-get update && \ COPY requirements.txt ./ RUN pip3 install --no-cache-dir -r requirements.txt -# Set the working directory to the compoent folder +# Set the working directory to the component folder WORKDIR /component/src # Copy over src-files and spec of the component diff --git a/docs/generic_component.md b/docs/generic_component.md new file mode 100644 index 000000000..c8a15f724 --- /dev/null +++ b/docs/generic_component.md @@ -0,0 +1,140 @@ +# Generic components + +Fondant provides a set of reusable generic components that facilitate loading and writing +datasets to/from different platforms. + +We currently have components that interface with the following platforms: +* Hugging Face Hub ([Read](https://github.com/ml6team/fondant/tree/main/components/load_from_hf_hub)/[Write](https://github.com/ml6team/fondant/tree/main/components/write_to_hf_hub)). + + +To integrate a generic Read/Write component into your pipeline, you only need to modify the +component specification and define the custom required/optional arguments. + +## Using Generic components + +Each Fondant component is defined by a specification which describes its interface. This +specification is represented by a single `fondant_component.yaml` file. 
See the [component
+specification page](component_spec) for info on how to write the specification for your component.
+
+
+### Load component
+To use a Load component, you need to modify the subset of data **produced** by the component.
+These subsets define the fields that will be read from the source dataset.
+
+For example, let's consider the [`load_from_hf_hub`](https://github.com/ml6team/fondant/tree/main/components/load_from_hf_hub/fondant_component.yaml) component.
+Suppose we are interested in reading two columns, width and height, from a given input dataset:
+
+| width
(int32) | height
(int32) | +|-------------------|--------------------| +| Value | Value | + +The component specification can be modified as follows + +```yaml +name: Load from hub +description: Component that loads a dataset from the hub +image: ghcr.io/ml6team/load_from_hf_hub:latest + +consumes: + images: + fields: + width: + type: int32 + height: + type: int32 + +args: + dataset_name: + description: Name of dataset on the hub + type: str + column_name_mapping: + description: Mapping of the consumed hub dataset to fondant column names + type: dict + image_column_names: + description: Optional argument, a list containing the original image column names in case the + dataset on the hub contains them. Used to format the image from HF hub format to a byte string. + type: list + default: None + n_rows_to_load: + description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale + type: int + default: None + +``` + +Here are a few things to note: +* The original fields are mapped to a valid +[subset](https://github.com/ml6team/fondant/blob/main/docs/component_spec.md#:~:text=additionalSubsets%3A%20true-,Subsets,-A%20component%20consumes) using the `column_name_mapping` dictionary: + +```python +column_name_mapping = { + "width":"images_width", + "height":"images_height" +} +``` +This mapping changes the names of the original dataset fields to match the component specification, +enabling their use in subsequent pipeline steps. + +* The specification includes pre-defined arguments, some of which are required (e.g., `dataset_name`), +while others are optional but necessary in certain scenarios (e.g., `image_column_names`). + + +### Write component +To use a Write component, you need to modify the subset of data **consumed** by the component. +These subsets define the fields that will be written in the final dataset. + +For example, let's consider the dataset that was loaded by the previous component, which currently has the following schema: + +| images_width
(int32) | images_height
(int32) | +|--------------------------|---------------------------| +| Value | Value | + +If we want to write this dataset to a Hugging Face Hub location, we can use the [`write_to_hf_hub`]((https://github.com/ml6team/fondant/tree/main/components/write_to_hf_hub/fondant_component.yaml)) + +```yaml +name: Write to hub +description: Component that writes a dataset to the hub +image: ghcr.io/ml6team/write_to_hf_hub:latest + +consumes: + images: + fields: + width: + type: int32 + height: + type: int32 +args: + hf_token: + description: The hugging face token used to write to the hub + type: str + username: + description: The username under which to upload the dataset + type: str + dataset_name: + description: The name of the dataset to upload + type: str + image_column_names: + description: A list containing the image column names. Used to format to image to HF hub format + type: list + default: None + column_name_mapping: + description: Mapping of the consumed fondant column names to the written hub column names + type: dict + default: None +``` + +Note that the `column_name_mapping` is optional and can be used to change the name of the columns +before writing them to their final destination. For example, if you want to have the same column names as +the original dataset, you could set the `column_name_mapping` argument as follows + +```python +column_name_mapping = { + "images_width":"width", + "images_height":"height" +} +``` + +For a practical example of using and adapting load/write components, refer to the +[stable_diffusion_finetuning](https://github.com/ml6team/fondant/blob/main/examples/pipelines/finetune_stable_diffusion/pipeline.py) example. + +Feel free to explore the Fondant documentation for more information on these components and their usage. \ No newline at end of file diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/fondant_component.yaml b/examples/pipelines/controlnet-interior-design/components/generate_prompts/fondant_component.yaml index daab81b93..b278fc50d 100644 --- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/fondant_component.yaml +++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/fondant_component.yaml @@ -6,4 +6,10 @@ produces: prompts: fields: text: - type: string \ No newline at end of file + type: string + +args: + n_rows_to_load: + description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale + type: int + default: None \ No newline at end of file diff --git a/examples/pipelines/controlnet-interior-design/components/generate_prompts/src/main.py b/examples/pipelines/controlnet-interior-design/components/generate_prompts/src/main.py index f2fd50ced..e10c749e6 100644 --- a/examples/pipelines/controlnet-interior-design/components/generate_prompts/src/main.py +++ b/examples/pipelines/controlnet-interior-design/components/generate_prompts/src/main.py @@ -3,6 +3,7 @@ """ import itertools import logging +import typing as t import dask.dataframe as dd import pandas as pd @@ -97,10 +98,13 @@ def make_interior_prompt(room: str, prefix: str, style: str) -> str: class GeneratePromptsComponent(LoadComponent): - def load(self) -> dd.DataFrame: + def load(self, n_rows_to_load: t.Optional[int]) -> dd.DataFrame: """ - Generate a set of initial prompts that will be used to retrieve images from the LAION-5B dataset. - + Generate a set of initial prompts that will be used to retrieve images from the LAION-5B + dataset. 
+ Args: + n_rows_to_load: Optional argument that defines the number of rows to load. Useful for + testing pipeline runs on a small scale Returns: Dask dataframe """ @@ -109,6 +113,9 @@ def load(self) -> dd.DataFrame: pandas_df = pd.DataFrame(prompts, columns=["prompts_text"]) + if n_rows_to_load: + pandas_df = pandas_df.head(n_rows_to_load) + df = dd.from_pandas(pandas_df, npartitions=1) return df diff --git a/examples/pipelines/controlnet-interior-design/components/write_to_hub_controlnet/Dockerfile b/examples/pipelines/controlnet-interior-design/components/write_to_hub_controlnet/Dockerfile deleted file mode 100644 index 532ce5bb7..000000000 --- a/examples/pipelines/controlnet-interior-design/components/write_to_hub_controlnet/Dockerfile +++ /dev/null @@ -1,20 +0,0 @@ -FROM --platform=linux/amd64 python:3.8-slim - -## System dependencies -RUN apt-get update && \ - apt-get upgrade -y && \ - apt-get install git -y - -# install requirements -COPY requirements.txt / -RUN pip3 install --no-cache-dir -r requirements.txt - -# Set the working directory to the component folder -WORKDIR /component/src - -# Copy over src-files and spec of the component -COPY src/ . -COPY fondant_component.yaml ../ - - -ENTRYPOINT ["python", "main.py"] \ No newline at end of file diff --git a/examples/pipelines/controlnet-interior-design/components/write_to_hub_controlnet/fondant_component.yaml b/examples/pipelines/controlnet-interior-design/components/write_to_hub_controlnet/fondant_component.yaml index 4024a99f7..efb253159 100644 --- a/examples/pipelines/controlnet-interior-design/components/write_to_hub_controlnet/fondant_component.yaml +++ b/examples/pipelines/controlnet-interior-design/components/write_to_hub_controlnet/fondant_component.yaml @@ -1,6 +1,6 @@ name: Write to hub description: Component that writes a dataset to the hub -image: ghcr.io/ml6team/write_to_hub_controlnet:latest +image: ghcr.io/ml6team/write_to_hf_hub:latest consumes: images: @@ -29,4 +29,12 @@ args: type: str dataset_name: description: The name of the dataset to upload - type: str \ No newline at end of file + type: str + image_column_names: + description: A list containing the image column names. Used to format to image to HF hub format + type: list + default: None + column_name_mapping: + description: Mapping of the consumed fondant column names to the written hub column names + type: dict + default: None \ No newline at end of file diff --git a/examples/pipelines/controlnet-interior-design/components/write_to_hub_controlnet/src/main.py b/examples/pipelines/controlnet-interior-design/components/write_to_hub_controlnet/src/main.py deleted file mode 100644 index 49fabeee2..000000000 --- a/examples/pipelines/controlnet-interior-design/components/write_to_hub_controlnet/src/main.py +++ /dev/null @@ -1,66 +0,0 @@ -""" -This component writes an image dataset to the hub. 
-""" -import logging - -import huggingface_hub -import dask.dataframe as dd - -from fondant.component import TransformComponent -from fondant.logger import configure_logging - -configure_logging() -logger = logging.getLogger(__name__) - - -class WriteToHubComponent(TransformComponent): - def transform( - self, - dataframe: dd.DataFrame, - *, - hf_token: str, - username: str, - dataset_name: str, - ) -> dd.DataFrame: - """ - Args: - dataframe: Dask dataframe - hf_token: The hugging face token used to write to the hub - username: The username under which to upload the dataset - dataset_name: The name of the dataset to upload - - Returns: - dataset - """ - # login - huggingface_hub.login(token=hf_token) - - # Create HF dataset repository - repo_id = f"{username}/{dataset_name}" - repo_path = f"hf://datasets/{repo_id}" - logger.info(f"Creating HF dataset repository under ID: '{repo_id}'") - huggingface_hub.create_repo(repo_id=repo_id, repo_type="dataset", exist_ok=True) - - # Get columns to write and schema - write_columns = [] - schema = {} - for subset_name, subset in self.spec.consumes.items(): - write_columns.extend([f"{subset_name}_{field}" for field in subset.fields]) - # Get schema - subset_schema = { - f"{subset_name}_{field.name}": field.type.name - for field in subset.fields.values() - } - - schema.update(subset_schema) - - dataframe_hub = dataframe[write_columns] - - dd.to_parquet(dataframe_hub, path=f"{repo_path}/data", schema=schema) - - return dataframe - - -if __name__ == "__main__": - component = WriteToHubComponent.from_args() - component.run() diff --git a/examples/pipelines/controlnet-interior-design/pipeline.py b/examples/pipelines/controlnet-interior-design/pipeline.py index 36244eec5..355a1ffc6 100644 --- a/examples/pipelines/controlnet-interior-design/pipeline.py +++ b/examples/pipelines/controlnet-interior-design/pipeline.py @@ -1,5 +1,6 @@ """Pipeline used to create a stable diffusion dataset from a set of initial prompts.""" # pylint: disable=import-error +import argparse import logging import sys @@ -24,7 +25,8 @@ # Define component ops generate_prompts_op = ComponentOp( - component_spec_path="components/generate_prompts/fondant_component.yaml" + component_spec_path="components/generate_prompts/fondant_component.yaml", + arguments={"n_rows_to_load": None}, ) laion_retrieval_op = ComponentOp.from_registry( name="prompt_based_laion_retrieval", @@ -62,12 +64,14 @@ node_pool_name="model-inference-pool", ) -write_to_hub_controlnet = ComponentOp( +write_to_hub_controlnet = ComponentOp.from_registry( + name="write_to_hf_hub", component_spec_path="components/write_to_hub_controlnet/fondant_component.yaml", arguments={ "username": "test-user", "dataset_name": "segmentation_kfp", "hf_token": "hf_token", + "image_column_names": ["images_data"], }, number_of_gpus=1, node_pool_name="model-inference-pool", diff --git a/examples/pipelines/finetune_stable_diffusion/components/load_from_hf_hub/fondant_component.yaml b/examples/pipelines/finetune_stable_diffusion/components/load_from_hf_hub/fondant_component.yaml new file mode 100644 index 000000000..9c9e02adf --- /dev/null +++ b/examples/pipelines/finetune_stable_diffusion/components/load_from_hf_hub/fondant_component.yaml @@ -0,0 +1,31 @@ +name: Load from hub +description: Component that loads a dataset from the hub +image: ghcr.io/ml6team/load_from_hf_hub:latest + +produces: + images: + fields: + data: + type: binary + + captions: + fields: + data: + type: string + +args: + dataset_name: + description: Name of dataset on the hub + 
type: str + column_name_mapping: + description: Mapping of the consumed hub dataset to fondant column names + type: dict + image_column_names: + description: Optional argument, a list containing the original image column names in case the + dataset on the hub contains them. Used to format the image from HF hub format to a byte string. + type: list + default: None + n_rows_to_load: + description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale + type: int + default: None diff --git a/examples/pipelines/finetune_stable_diffusion/components/write_to_hf_hub/fondant_component.yaml b/examples/pipelines/finetune_stable_diffusion/components/write_to_hf_hub/fondant_component.yaml new file mode 100644 index 000000000..9d94e6aa4 --- /dev/null +++ b/examples/pipelines/finetune_stable_diffusion/components/write_to_hf_hub/fondant_component.yaml @@ -0,0 +1,33 @@ +name: Write to hub +description: Component that writes a dataset to the hub +image: ghcr.io/ml6team/write_to_hf_hub:latest + +consumes: + images: + fields: + data: + type: binary + + captions: + fields: + text: + type: string + +args: + hf_token: + description: The hugging face token used to write to the hub + type: str + username: + description: The username under which to upload the dataset + type: str + dataset_name: + description: The name of the dataset to upload + type: str + image_column_names: + description: A list containing the image column names. Used to format to image to HF hub format + type: list + default: None + column_name_mapping: + description: Mapping of the consumed fondant column names to the written hub column names + type: dict + default: None \ No newline at end of file diff --git a/examples/pipelines/finetune_stable_diffusion/pipeline.py b/examples/pipelines/finetune_stable_diffusion/pipeline.py index bc16ba212..5751583de 100644 --- a/examples/pipelines/finetune_stable_diffusion/pipeline.py +++ b/examples/pipelines/finetune_stable_diffusion/pipeline.py @@ -1,4 +1,5 @@ """Pipeline used to create a stable diffusion dataset from a set of given images.""" +import argparse import logging import sys @@ -17,10 +18,26 @@ client = Client(host=PipelineConfigs.HOST) +load_component_column_mapping = {"image": "images_data", "text": "captions_data"} + +write_component_column_mapping = { + value: key for key, value in load_component_column_mapping.items() +} # Define component ops load_from_hub_op = ComponentOp.from_registry( name="load_from_hf_hub", - arguments={"dataset_name": "logo-wizard/modern-logo-dataset"}, + component_spec_path="components/load_from_hf_hub/fondant_component.yaml", + arguments={ + "dataset_name": "logo-wizard/modern-logo-dataset", + "column_name_mapping": load_component_column_mapping, + "image_column_names": ["image"], + "nb_rows_to_load": None, + }, +) + +# Define component ops +image_resolution_extraction_op = ComponentOp.from_registry( + name="image_resolution_extraction" ) image_embedding_op = ComponentOp.from_registry( @@ -60,13 +77,27 @@ node_pool_name="model-inference-pool", ) +write_to_hub = ComponentOp.from_registry( + name="write_to_hf_hub", + component_spec_path="components/write_to_hf_hub/fondant_component.yaml", + arguments={ + "username": "test-user", + "dataset_name": "stable_diffusion_processed", + "hf_token": "hf_token", + "image_column_names": ["images_data"], + }, + number_of_gpus=1, + node_pool_name="model-inference-pool", +) pipeline = Pipeline(pipeline_name=pipeline_name, base_path=PipelineConfigs.BASE_PATH) 
pipeline.add_op(load_from_hub_op) -pipeline.add_op(image_embedding_op, dependencies=load_from_hub_op) +pipeline.add_op(image_resolution_extraction_op, dependencies=load_from_hub_op) +pipeline.add_op(image_embedding_op, dependencies=image_resolution_extraction_op) pipeline.add_op(laion_retrieval_op, dependencies=image_embedding_op) pipeline.add_op(download_images_op, dependencies=laion_retrieval_op) pipeline.add_op(caption_images_op, dependencies=download_images_op) +pipeline.add_op(write_to_hub, dependencies=caption_images_op) client.compile_and_run(pipeline=pipeline) diff --git a/examples/pipelines/starcoder/components/load_from_hub/fondant_component.yaml b/examples/pipelines/starcoder/components/load_from_hub/fondant_component.yaml new file mode 100644 index 000000000..56d2d3d41 --- /dev/null +++ b/examples/pipelines/starcoder/components/load_from_hub/fondant_component.yaml @@ -0,0 +1,40 @@ +name: Load code dataset from hub +description: Component that loads the stack dataset from the hub +image: ghcr.io/ml6team/load_from_hf_hub:latest + +produces: + code: + fields: + content: + type: string + lang: + type: string + size: + type: int32 + path: + type: string + repository_name: + type: string + avg_line_length: + type: float64 + max_line_length: + type: int32 + alphanum_fraction: + type: float64 + +args: + dataset_name: + description: Name of dataset on the hub + type: str + column_name_mapping: + description: Mapping of the consumed hub dataset to fondant column names + type: dict + image_column_names: + description: A list containing the original hub image column names. Used to format the image + from HF hub format to a byte string + type: list + default: None + n_rows_to_load: + description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale + type: int + default: None diff --git a/examples/pipelines/starcoder/components/load_from_hub_stack/README.md b/examples/pipelines/starcoder/components/load_from_hub_stack/README.md deleted file mode 100644 index 755d725e3..000000000 --- a/examples/pipelines/starcoder/components/load_from_hub_stack/README.md +++ /dev/null @@ -1,3 +0,0 @@ -## Load code dataset from hub - -This component loads a code dataset from a remote location on the [Hugging Face hub](https://huggingface.co/). 
\ No newline at end of file diff --git a/examples/pipelines/starcoder/components/load_from_hub_stack/fondant_component.yaml b/examples/pipelines/starcoder/components/load_from_hub_stack/fondant_component.yaml deleted file mode 100644 index 70136f293..000000000 --- a/examples/pipelines/starcoder/components/load_from_hub_stack/fondant_component.yaml +++ /dev/null @@ -1,28 +0,0 @@ -name: Load code dataset from hub -description: Component that loads the stack dataset from the hub -image: ghcr.io/ml6team/load_from_hub_stack:latest - -produces: - code: - fields: - content: - type: string - lang: - type: string - size: - type: int32 - path: - type: string - repository_name: - type: string - avg_line_length: - type: float64 - max_line_length: - type: int32 - alphanum_fraction: - type: float64 - -args: - dataset_name: - description: Name of dataset on the hub - type: str \ No newline at end of file diff --git a/examples/pipelines/starcoder/components/load_from_hub_stack/requirements.txt b/examples/pipelines/starcoder/components/load_from_hub_stack/requirements.txt deleted file mode 100644 index 6d4973e32..000000000 --- a/examples/pipelines/starcoder/components/load_from_hub_stack/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -huggingface_hub==0.14.1 -fondant -pyarrow>=7.0 -gcsfs==2023.4.0 \ No newline at end of file diff --git a/examples/pipelines/starcoder/components/load_from_hub_stack/src/main.py b/examples/pipelines/starcoder/components/load_from_hub_stack/src/main.py deleted file mode 100644 index c1447a1c3..000000000 --- a/examples/pipelines/starcoder/components/load_from_hub_stack/src/main.py +++ /dev/null @@ -1,39 +0,0 @@ -""" -This component loads a code dataset from a remote location on the Hugging Face hub. -""" - -import logging - -import dask.dataframe as dd - -from fondant.component import LoadComponent -from fondant.logger import configure_logging - -configure_logging() -logger = logging.getLogger(__name__) - - -class LoadFromHubComponent(LoadComponent): - def load(self, *, dataset_name: str) -> dd.DataFrame: - """ - Args: - dataset_name: name of the dataset to load - - Returns: - Dataset: HF dataset - """ - - # 1) Load data, read as Dask dataframe - logger.info("Loading dataset from the hub...") - dask_df = dd.read_parquet(f"hf://datasets/{dataset_name}") - - # 2) Add prefix to column - column_dict = {column: f"code_{column}" for column in dask_df.columns} - dask_df = dask_df.rename(columns=column_dict) - - return dask_df - - -if __name__ == "__main__": - component = LoadFromHubComponent.from_args() - component.run() diff --git a/examples/pipelines/starcoder/pipeline.py b/examples/pipelines/starcoder/pipeline.py index 8afa9b3fa..5409b4b71 100644 --- a/examples/pipelines/starcoder/pipeline.py +++ b/examples/pipelines/starcoder/pipeline.py @@ -1,6 +1,5 @@ """Pipeline used to create the dataset to train the StarCoder model.""" -import argparse import logging import sys @@ -15,17 +14,41 @@ configure_logging() logger = logging.getLogger(__name__) + +client = Client(host=PipelineConfigs.HOST) + +dataset_column_name = [ + "content", + "lang", + "size", + "path", + "repository_name", + "avg_line_length", + "max_line_length", + "alphanum_fraction", +] + +load_component_column_mapping = { + column: f"code_{column}" for column in dataset_column_name +} + # Initialize pipeline and client pipeline = Pipeline( pipeline_name="Stack filtering pipeline", pipeline_description="A pipeline for filtering the stack dataset", base_path=PipelineConfigs.BASE_PATH, ) +client = 
Client(host=PipelineConfigs.HOST) # define ops -load_from_hub_op = ComponentOp( - component_spec_path="components/load_from_hub_stack/fondant_component.yaml", - arguments={"dataset_name": "ml6team/the-stack-smol-python"}, +load_from_hub_op = ComponentOp.from_registry( + name="load_from_hub", + component_spec_path="components/load_from_hub/fondant_component.yaml", + arguments={ + "dataset_name": "ml6team/the-stack-smol-python", + "column_name_mapping": load_component_column_mapping, + "n_rows_to_load": None, + }, ) filter_line_length_op = ComponentOp.from_registry( @@ -36,6 +59,7 @@ "alphanum_fraction_threshold": 0.25, }, ) + filter_comments_op = ComponentOp.from_registry( name="filter_comments", arguments={"min_comments_ratio": 0.1, "max_comments_ratio": 0.9}, diff --git a/fondant/component_spec.py b/fondant/component_spec.py index f22bd17e6..4de75f0fd 100644 --- a/fondant/component_spec.py +++ b/fondant/component_spec.py @@ -205,6 +205,11 @@ def kubeflow_specification(self) -> "KubeflowComponentSpec": def __repr__(self) -> str: return f"{self.__class__.__name__}({self._specification!r})" + def __eq__(self, other): + if not isinstance(other, ComponentSpec): + return False + return self._specification == other._specification + class KubeflowComponentSpec: """ diff --git a/fondant/pipeline.py b/fondant/pipeline.py index 5fda7b030..c1b5a8899 100644 --- a/fondant/pipeline.py +++ b/fondant/pipeline.py @@ -85,6 +85,7 @@ def from_registry( cls, name: str, *, + component_spec_path: t.Optional[t.Union[str, Path]] = None, arguments: t.Optional[t.Dict[str, t.Any]] = None, number_of_gpus: t.Optional[int] = None, node_pool_name: t.Optional[str] = None, @@ -95,6 +96,9 @@ def from_registry( Args: name: Name of the component to load + component_spec_path: The path to the specification file defining the component, defaults + to path defined within the component but can be specified to define custom + specification file arguments: A dictionary containing the argument name and value for the operation. number_of_gpus: The number of gpus to assign to the operation node_pool_name: The name of the node pool to which the operation will be assigned. @@ -104,13 +108,14 @@ def from_registry( Defined by string which can be a number or a number followed by one of “E”, “P”, “T”, “G”, “M”, “K”. (e.g. 
2T for 2 Terabytes) """ - component_spec_path = ( - files("fondant") / f"components/{name}/fondant_component.yaml" - ) - component_spec_path = t.cast(Path, component_spec_path) + if not component_spec_path: + component_spec_path = ( + files("fondant") / f"components/{name}/fondant_component.yaml" # type: ignore + ) + component_spec_path = t.cast(Path, component_spec_path) - if not (component_spec_path.exists() and component_spec_path.is_file()): - raise ValueError(f"No reusable component with name {name} found.") + if not (component_spec_path.exists() and component_spec_path.is_file()): + raise ValueError(f"No reusable component with name {name} found.") return ComponentOp( component_spec_path, diff --git a/tests/example_pipelines/load_from_hub_custom_spec.yaml b/tests/example_pipelines/load_from_hub_custom_spec.yaml new file mode 100644 index 000000000..6ac779cc4 --- /dev/null +++ b/tests/example_pipelines/load_from_hub_custom_spec.yaml @@ -0,0 +1,27 @@ +name: First component +description: This is an example component +image: example_component:latest + +produces: + images: + fields: + data: + type: binary + + captions: + fields: + data: + type: string + +args: + dataset_name: + description: Name of dataset on the hub + type: str + column_name_mapping: + description: Mapping of the consumed hub dataset to fondant column names + type: dict + image_column_names: + description: A list containing the original hub image column names. Used to format the image + from HF hub format to a byte string + type: list + default: None \ No newline at end of file diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index c37d7ae90..47df7645e 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -2,12 +2,22 @@ from pathlib import Path import pytest +import yaml from fondant.exceptions import InvalidPipelineDefinition -from fondant.pipeline import ComponentOp, Pipeline +from fondant.pipeline import ComponentOp, ComponentSpec, Pipeline valid_pipeline_path = Path(__file__).parent / "example_pipelines/valid_pipeline" invalid_pipeline_path = Path(__file__).parent / "example_pipelines/invalid_pipeline" +custom_spec_path = ( + Path(__file__).parent / "example_pipelines/load_from_hub_custom_spec.yaml" +) + + +def yaml_file_to_dict(file_path): + with open(file_path, "r") as file: + data = yaml.safe_load(file) + return data @pytest.fixture @@ -177,3 +187,35 @@ def test_reusable_component_op(): ComponentOp.from_registry( name="this_component_does_not_exist", ) + + +def test_defining_reusable_component_op_with_custom_spec(): + load_from_hub_op_default_op = ComponentOp.from_registry( + name="load_from_hf_hub", + arguments={ + "dataset_name": "test_dataset", + "column_name_mapping": {"foo": "bar"}, + "image_column_names": None, + }, + ) + + load_from_hub_op_default_spec = ComponentSpec( + yaml_file_to_dict(load_from_hub_op_default_op.component_spec_path) + ) + + load_from_hub_op_custom_op = ComponentOp.from_registry( + name="load_from_hf_hub", + component_spec_path=custom_spec_path, + arguments={ + "dataset_name": "test_dataset", + "column_name_mapping": {"foo": "bar"}, + "image_column_names": None, + }, + ) + + load_from_hub_op_custom_spec = ComponentSpec(yaml_file_to_dict(custom_spec_path)) + + assert load_from_hub_op_custom_op.component_spec == load_from_hub_op_custom_spec + assert load_from_hub_op_default_op.component_spec == load_from_hub_op_default_spec + assert load_from_hub_op_default_op.component_spec != load_from_hub_op_custom_spec + assert load_from_hub_op_custom_op.component_spec != 
load_from_hub_op_default_spec
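
Usage note (not part of the patch above): a minimal sketch of how the generic hub components introduced here could be wired into a pipeline. The component names, argument names, and the new `component_spec_path` override on `ComponentOp.from_registry` are taken from the changes above; the dataset name, username, token, base path, and row count are placeholder values.

```python
from fondant.pipeline import ComponentOp, Pipeline

# Map the hub dataset's column names onto the "<subset>_<field>" names declared in the
# pipeline-specific component specs, and invert the mapping for the write step.
load_column_mapping = {"image": "images_data", "text": "captions_data"}
write_column_mapping = {value: key for key, value in load_column_mapping.items()}

load_op = ComponentOp.from_registry(
    name="load_from_hf_hub",
    # Override the registry spec with a spec that declares the subsets this pipeline produces.
    component_spec_path="components/load_from_hf_hub/fondant_component.yaml",
    arguments={
        "dataset_name": "user/some-dataset",       # placeholder hub dataset
        "column_name_mapping": load_column_mapping,
        "image_column_names": ["image"],           # hub images arrive as dicts; converted to bytes
        "n_rows_to_load": 100,                     # placeholder: small sample for a test run
    },
)

write_op = ComponentOp.from_registry(
    name="write_to_hf_hub",
    component_spec_path="components/write_to_hf_hub/fondant_component.yaml",
    arguments={
        "username": "test-user",                   # placeholder
        "dataset_name": "processed_dataset",       # placeholder
        "hf_token": "hf_token",                    # placeholder
        "image_column_names": ["images_data"],
        "column_name_mapping": write_column_mapping,
    },
)

pipeline = Pipeline(pipeline_name="example_pipeline", base_path="gs://some-bucket/artifacts")
pipeline.add_op(load_op)
pipeline.add_op(write_op, dependencies=load_op)
```

Keeping a single generic read/write image and swapping in a per-pipeline `fondant_component.yaml` (rather than maintaining one image per pipeline, as the deleted `write_to_hub_controlnet` and `load_from_hub_stack` components did) is the design choice this change makes; only the spec and the arguments vary between pipelines.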