Skip to content

Commit

Permalink
[DataComp] Add download images component (#348)
Browse files Browse the repository at this point in the history
This PR:

splits up the datacomp folder into 2 pipelines:

- a simple pipeline, just consisting of 3 components, serving as a
simple baseline which could serve as a first submission
- a more advanced pipeline, which involves downloading images (using the
reusable `download_images` component), and later on also text detection
and text recognition

and improves the `download_images` component to leverage Dask's
`map_partitions`.

---------

Co-authored-by: Robbe Sneyders <[email protected]>
  • Loading branch information
NielsRogge and RobbeSneyders authored Aug 14, 2023
1 parent c65ddd4 commit 6fb22a3
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 58 deletions.
74 changes: 50 additions & 24 deletions components/download_images/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,24 @@ def download_image_with_retry(
return None, None, None


def download_image_with_retry_partition(dataframe, timeout, retries, resizer):
# process a single partition
# TODO make column name more flexible
data = dataframe.images_url.apply(lambda x:
download_image_with_retry(
url=x, timeout=timeout, retries=retries, resizer=resizer,
),
)

# use assign to add values as extra columns
dataframe = dataframe.assign(data=[example[0] for example in data],
width=[example[1] for example in data],
height=[example[2] for example in data],
)

return dataframe


class DownloadImagesComponent(DaskTransformComponent):
"""Component that downloads images based on URLs."""

Expand Down Expand Up @@ -123,35 +141,43 @@ def __init__(self,
max_aspect_ratio=max_aspect_ratio,
)

def transform(
self,
dataframe: dd.DataFrame,
) -> dd.DataFrame:
logger.info("Instantiating resizer...")

# Remove duplicates from laion retrieval
dataframe = dataframe.drop_duplicates()

dataframe = dataframe.apply(
lambda example: download_image_with_retry(
url=example.images_url,
timeout=self.timeout,
retries=self.retries,
resizer=self.resizer,
),
axis=1,
result_type="expand",
meta={0: bytes, 1: int, 2: int},
def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame:

logger.info(f"Length of the dataframe: {len(dataframe)}")
logger.info("Downloading images...")

# drop width and height columns, as those are going to be
# added later on
dataframe = dataframe.drop(columns=['images_width', 'images_height'])

# create meta
# needs to be a dictionary with keys = column names, values = dtypes of columns
# for each column in the output
meta = dict(zip(dataframe.columns, dataframe.dtypes))
meta["data"] = bytes
meta["width"] = int
meta["height"] = int

dataframe = dataframe.map_partitions(
download_image_with_retry_partition,
timeout=self.timeout,
retries=self.retries,
resizer=self.resizer,
meta=meta,
)
dataframe.columns = [
"images_data",
"images_width",
"images_height",
]

# rename new columns to be conform the spec
logger.info("Renaming columns...")
dataframe = dataframe.rename(columns={"data": "images_data",
"width": "images_width",
"height":"images_height"})

# Remove images that could not be fetched
logger.info("Dropping invalid rows...")
dataframe = dataframe.dropna()

print("Columns of final dataframe:", dataframe.columns)

return dataframe


Expand Down
2 changes: 1 addition & 1 deletion components/filter_image_resolution/fondant_component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ description: Component that filters images based on minimum size and max aspect
image: ghcr.io/ml6team/filter_image_resolution:dev

consumes:
image:
images:
fields:
width:
type: int32
Expand Down
4 changes: 2 additions & 2 deletions components/filter_image_resolution/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ def __init__(self, *_, min_image_dim: int, max_aspect_ratio: float) -> None:
self.max_aspect_ratio = max_aspect_ratio

def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
width = dataframe["image"]["width"]
height = dataframe["image"]["height"]
width = dataframe["images"]["width"]
height = dataframe["images"]["height"]
min_image_dim = np.minimum(width, height)
max_image_dim = np.maximum(width, height)
aspect_ratio = max_image_dim / min_image_dim
Expand Down
8 changes: 8 additions & 0 deletions examples/pipelines/datacomp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# DataComp pipeline

[DataComp](https://www.datacomp.ai/) is a competition organized by the University of Washington and others to come up with the best possible image-text dataset to train a fixed CLIP model. Hence, it's an ideal use case for Fondant, as we can leverage reusable components to filter large, noisy image-text datasets.

Currently, 2 pipelines are implemented:

- a simple pipeline (`simple_pipeline.py`), which loads the DataComp dataset from the hub and applies 2 basic filtering steps (filtering on image resolution and caption complexity). This pipeline serves as a baseline and could serve as a first submission.
- a more complex pipeline (`pipeline.py`), which loads the DataComp dataset from the hub, loads the actual images based on the URLs, and applies text detection and text recognition models to filter the dataset.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
name: Download images
description: Component that downloads images based on URLs
image: ghcr.io/ml6team/download_images:dev

consumes:
images:
fields:
url:
type: string
width:
type: int32
height:
type: int32
face_bboxes:
type: array
items:
type: array
items:
type: float32
sha256:
type: utf8

produces:
images:
fields:
data:
type: binary
width:
type: int32
height:
type: int32

args:
timeout:
description: Maximum time (in seconds) to wait when trying to download an image
type: int
default: 10
retries:
description: Number of times to retry downloading an image if it fails.
type: int
default: 0
image_size:
description: Size of the images after resizing.
type: int
default: 256
resize_mode:
description: Resize mode to use. One of "no", "keep_ratio", "center_crop", "border".
type: str
default: 'border'
resize_only_if_bigger:
description: If True, resize only if image is bigger than image_size.
type: bool
default: 'False'
min_image_size:
description: Minimum size of the images.
type: int
default: 0
max_aspect_ratio:
description: Maximum aspect ratio of the images.
type: float
default: 'inf'
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,4 @@ args:
type: int
min_complexity:
description: Minimum complexity to filter text on.
type: int
min_num_actions:
description: Minimum number of actions a text should contain.
type: int
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,6 @@ def get_text_complexity(doc: spacy.tokens.doc.Doc):
return complexity


def get_num_actions(doc: spacy.tokens.doc.Doc):
verbs = set()
for possible_subject in doc:
if possible_subject.dep == nsubj and possible_subject.head.pos == VERB:
verbs.add(possible_subject.head)

return len(verbs)


class FilterTextComplexity(PandasTransformComponent):
"""Component that filters text based on:
Expand All @@ -48,25 +39,24 @@ def __init__(
spacy_pipeline,
batch_size: int,
min_complexity: int,
min_num_actions: int
) -> None:
self.nlp = spacy.load(spacy_pipeline, exclude=["ner"])
self.nlp = spacy.load(
spacy_pipeline, exclude=["tagger", "ner", "lemmatizer", "textcat"]
)
self.batch_size = batch_size
self.min_complexity = min_complexity
self.min_num_actions = min_num_actions

def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
texts = dataframe["text"]["data"]

logger.info("Creating SpaCy docs...")
docs = list(self.nlp.pipe(texts, batch_size=self.batch_size))
docs = pd.Series(docs)

logger.info("Calculating text complexity...")
caption_complexity = docs.apply(lambda doc: get_text_complexity(doc))
num_actions = docs.apply(lambda doc: get_num_actions(doc))

mask = (caption_complexity >= self.min_complexity) & (
num_actions >= self.min_num_actions
)
mask = caption_complexity >= self.min_complexity
mask = mask.to_numpy()

return dataframe[mask]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ description: Component that loads a dataset from the hub
image: ghcr.io/ml6team/load_from_hf_hub:dev

produces:
image:
images:
fields:
url:
type: string
Expand All @@ -27,6 +27,8 @@ produces:

image_text:
fields:
uid:
type: string
clip_b32_similarity_score:
type: float32
clip_l14_similarity_score:
Expand Down
35 changes: 24 additions & 11 deletions examples/pipelines/datacomp/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@

# define ops
load_component_column_mapping = {
"url": "image_url",
"original_width": "image_width",
"original_height": "image_height",
"face_bboxes": "image_face_bboxes",
"sha256": "image_sha256",
"url": "images_url",
"original_width": "images_width",
"original_height": "images_height",
"face_bboxes": "images_face_bboxes",
"sha256": "images_sha256",
"text": "text_data",
"uid": "image_text_uid",
"clip_b32_similarity_score": "image_text_clip_b32_similarity_score",
"clip_l14_similarity_score": "image_text_clip_l14_similarity_score",
}
Expand All @@ -36,26 +37,38 @@
arguments={
"dataset_name": "mlfoundations/datacomp_small",
"column_name_mapping": load_component_column_mapping,
"n_rows_to_load": 1000,
},
node_pool_label="node_pool",
node_pool_name="n2-standard-128-pool",
)
filter_image_resolution_op = ComponentOp.from_registry(
name="filter_image_resolution",
arguments={"min_image_dim": 200, "max_aspect_ratio": 3},
download_images_op = ComponentOp(
component_dir="components/download_images",
arguments={
"retries": 2,
"min_image_size": 0,
"max_aspect_ratio": float("inf"),
},
node_pool_label="node_pool",
node_pool_name="n2-standard-128-pool",
output_partition_size="disable",
)
filter_complexity_op = ComponentOp(
component_dir="components/filter_text_complexity",
arguments={
"spacy_pipeline": "en_core_web_sm",
"batch_size": 1000,
"min_complexity": 1,
"min_num_actions": 0,
},
node_pool_label="node_pool",
node_pool_name="n2-standard-128-pool",
)


# add ops to pipeline
pipeline.add_op(load_from_hub_op)
pipeline.add_op(filter_image_resolution_op, dependencies=load_from_hub_op)
pipeline.add_op(filter_complexity_op, dependencies=filter_image_resolution_op)
pipeline.add_op(filter_complexity_op, dependencies=download_images_op)
pipeline.add_op(download_images_op, dependencies=load_from_hub_op)
# TODO add more ops


Expand Down
70 changes: 70 additions & 0 deletions examples/pipelines/datacomp/simple_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Simplified pipeline used to filter the dataset of the Datacomp competition."""

import logging
import sys

sys.path.append("../")

from pipeline_configs import PipelineConfigs

from fondant.pipeline import ComponentOp, Pipeline, Client

logger = logging.getLogger(__name__)

# Initialize pipeline and client
pipeline = Pipeline(
pipeline_name="datacomp-filtering",
pipeline_description="A pipeline for filtering the Datacomp dataset",
base_path=PipelineConfigs.BASE_PATH,
)
client = Client(host=PipelineConfigs.HOST)

# define ops
load_component_column_mapping = {
"url": "images_url",
"original_width": "images_width",
"original_height": "images_height",
"face_bboxes": "images_face_bboxes",
"sha256": "images_sha256",
"text": "text_data",
"uid": "image_text_uid",
"clip_b32_similarity_score": "image_text_clip_b32_similarity_score",
"clip_l14_similarity_score": "image_text_clip_l14_similarity_score",
}

load_from_hub_op = ComponentOp(
component_dir="components/load_from_hf_hub",
arguments={
"dataset_name": "nielsr/datacomp-small-with-embeddings",
"column_name_mapping": load_component_column_mapping,
},
node_pool_label="node_pool",
node_pool_name="n2-standard-128-pool",
)
filter_image_resolution_op = ComponentOp.from_registry(
name="filter_image_resolution",
arguments={"min_image_dim": 200, "max_aspect_ratio": 3},
node_pool_label="node_pool",
node_pool_name="n2-standard-128-pool",
)
filter_complexity_op = ComponentOp(
component_dir="components/filter_text_complexity",
arguments={
"spacy_pipeline": "en_core_web_sm",
"batch_size": 1000,
"min_complexity": 1,
},
node_pool_label="node_pool",
node_pool_name="n2-standard-128-pool",
output_partition_size="disable",
)

# add ops to pipeline
pipeline.add_op(load_from_hub_op)
pipeline.add_op(filter_image_resolution_op, dependencies=load_from_hub_op)
pipeline.add_op(filter_complexity_op, dependencies=filter_image_resolution_op)
# TODO add more ops


if __name__ == "__main__":
client.compile_and_run(pipeline=pipeline)

0 comments on commit 6fb22a3

Please sign in to comment.