Introduce repartitioning (#309)
PR that introduces the partitioning strategy discussed in #288 

1) The automatic behavior is as follows for all component types (Dask,
pandas)
* The written dataframe is repartitioned to 250 MB partitions
* The loaded dataframe is repartitioned depending on the current number
of partitions and workers

2) The behavior above can be overridden by the end user if they want to
implement their own custom logic; this is done on the ComponentOp level
through additional flag parameters that can be passed.
See the docs added with this PR for more details

I will handle adding the diagnostic tools and optimizing the downloader
component in a separate PR.
PhilippeMoussalli authored Jul 25, 2023
1 parent b8bfaef commit 8c023ca
Showing 15 changed files with 455 additions and 36 deletions.
67 changes: 66 additions & 1 deletion docs/pipeline.md
@@ -8,7 +8,7 @@ To build a pipeline, you need to define a set of component operations called `Co

The component specifications include the location of the Docker image in a registry.

The runtime configuration consists of the component's arguments and the definition of node pools and resources. For example, if a component requires GPU for model inference, you can specify the necessary GPU resources in the runtime configuration.
The runtime configuration consists of the component's arguments and the definition of node pools, resources and custom partitioning specification. For example, if a component requires GPU for model inference, you can specify the necessary GPU resources in the runtime configuration.

Here is an example of how to build a pipeline using Fondant:
```python
@@ -46,7 +46,72 @@ Next, we define two operations: `load_from_hub_op`, which is based on a reusable
!!! note "IMPORTANT"
Currently Fondant supports linear DAGs with single dependencies. Support for non-linear DAGs will be available in future releases.

## Setting custom partitioning parameters

Each Fondant component operates on datasets, and Dask is used internally
to handle datasets larger than the available memory. To achieve this, the data is divided
into smaller chunks called "partitions" that can be processed in parallel. A sufficient number of partitions
enables parallel processing, with multiple workers handling different partitions simultaneously,
while smaller partitions ensure that each one fits into memory.
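
As a rough illustration of the concept (plain Dask, not Fondant-specific code; the toy dataframe and sizes are made up for the example):

```python
import dask.dataframe as dd
import pandas as pd

# A toy dataframe split into 4 partitions that Dask can process in parallel.
pdf = pd.DataFrame({"id": range(1_000), "caption": ["example"] * 1_000})
ddf = dd.from_pandas(pdf, npartitions=4)

print(ddf.npartitions)  # 4

# Repartitioning changes how the data is chunked, e.g. by target partition size.
ddf = ddf.repartition(partition_size="250MB")
```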

### How Fondant handles partitions

**1) Repartitioning the Loaded DataFrame:** This step is optional and comes into play if the number
of partitions is smaller than the number of available workers on the data processing instance.
By repartitioning, all available workers can be utilized, leading to faster
parallel processing.

**2) Repartitioning the Written DataFrame:** The written dataframe is also repartitioned into
smaller partitions (250MB by default) so that the next component can load each partition into memory.
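
Conceptually, the automatic behavior boils down to the two repartition calls sketched below (a simplified illustration of the logic added to `src/fondant/data_io.py` in this PR; `os.cpu_count()` stands in for the number of available workers):

```python
import os

import dask.dataframe as dd


def repartition_loaded(dataframe: dd.DataFrame) -> dd.DataFrame:
    # Step 1: ensure there is at least one partition per available worker.
    n_workers = os.cpu_count() or 1
    if dataframe.npartitions < n_workers:
        dataframe = dataframe.repartition(npartitions=n_workers)
    return dataframe


def repartition_written(dataframe: dd.DataFrame) -> dd.DataFrame:
    # Step 2: cap the size of each written partition (250MB by default).
    return dataframe.repartition(partition_size="250MB")
```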


### Customizing partitioning

By default, Fondant automatically handles the partitioning, but you can disable this and create your
own custom partitioning logic if you have specific requirements.

Here's an example of disabling the automatic partitioning:

```python

caption_images_op = ComponentOp(
component_dir="components/captioning_component",
arguments={
"model_id": "Salesforce/blip-image-captioning-base",
"batch_size": 2,
"max_new_tokens": 50,
},
input_partition_rows='disable',
output_partition_size='disable',
)
```

The code snippet above disables automatic partitioning for both the loaded and written dataframes,
allowing you to define your own partitioning logic inside the components.

Moreover, you have the flexibility to set your own custom partitioning parameters to override the default settings:

```python

caption_images_op = ComponentOp(
component_dir="components/captioning_component",
arguments={
"model_id": "Salesforce/blip-image-captioning-base",
"batch_size": 2,
"max_new_tokens": 50,
},
input_partition_rows=100,
output_partition_size="10MB",
)
```

In the example above, each partition of the loaded dataframe will contain approximately 100 rows,
and each output partition will be around 10MB in size. This capability is useful in scenarios
where processing a single row significantly increases the number of rows in the dataset
(resulting in dataset explosion) or substantially increases the row size (e.g., fetching images from URLs).

By setting a lower value for input partition rows, you can mitigate issues where the processed data
grows larger than the available memory before being written to disk.
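
The row-based setting maps to a partition count roughly as follows (a sketch based on the loader logic added in this PR; the exact rounding is an implementation detail):

```python
import dask.dataframe as dd


def repartition_by_rows(dataframe: dd.DataFrame, input_partition_rows: int) -> dd.DataFrame:
    # Computing the length only needs the index, which is cheaper than a full load.
    total_rows = len(dataframe.index)
    # +1 so that any remainder rows still end up in a partition.
    n_partitions = (total_rows // input_partition_rows) + 1
    return dataframe.repartition(npartitions=n_partitions)
```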

## Compiling a pipeline

1 change: 1 addition & 0 deletions examples/pipelines/controlnet-interior-design/pipeline.py
@@ -17,6 +17,7 @@
generate_prompts_op = ComponentOp(
component_dir="components/generate_prompts",
arguments={"n_rows_to_load": None},
output_partition_size="disable",
)
laion_retrieval_op = ComponentOp.from_registry(
name="prompt_based_laion_retrieval",
14 changes: 7 additions & 7 deletions src/fondant/component.py
@@ -41,6 +41,13 @@ def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame:
raise NotImplementedError


class DaskWriteComponent(BaseComponent):
"""Component that accepts a Dask DataFrame and writes its contents."""

def write(self, dataframe: dd.DataFrame) -> None:
raise NotImplementedError


class PandasTransformComponent(BaseComponent):
"""Component that transforms the incoming dataset partition per partition as a pandas
DataFrame.
@@ -57,12 +64,5 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
raise NotImplementedError


class DaskWriteComponent(BaseComponent):
"""Component that accepts a Dask DataFrame and writes its contents."""

def write(self, dataframe: dd.DataFrame) -> None:
raise NotImplementedError


Component = t.TypeVar("Component", bound=BaseComponent)
"""Component type which can represent any of the subclasses of BaseComponent"""
18 changes: 18 additions & 0 deletions src/fondant/component_spec.py
@@ -256,6 +256,20 @@ def from_fondant_component_spec(
"type": "JsonObject",
"default": "None",
},
{
"name": "input_partition_rows",
"description": "The number of rows to load per partition. Set to override the"
" automatic partitioning",
"type": "String",
"default": "None",
},
{
"name": "output_partition_size",
"description": "The size of the output partitions, defaults"
" to 250MB. Set to `disable` to disable the automatic partitioning",
"type": "String",
"default": "None",
},
*(
{
"name": arg.name,
@@ -285,6 +299,10 @@
{"inputValue": "metadata"},
"--component_spec",
{"inputValue": "component_spec"},
"--input_partition_rows",
{"inputValue": "input_partition_rows"},
"--output_partition_size",
{"inputValue": "output_partition_size"},
*cls._dump_args(fondant_component.args.values()),
"--output_manifest_path",
{"outputPath": "output_manifest_path"},
105 changes: 105 additions & 0 deletions src/fondant/data_io.py
@@ -1,4 +1,5 @@
import logging
import os
import typing as t

import dask.dataframe as dd
@@ -17,6 +18,62 @@ def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec) -> None


class DaskDataLoader(DataIO):
def __init__(
self,
*,
manifest: Manifest,
component_spec: ComponentSpec,
input_partition_rows: t.Optional[t.Union[int, str]] = None,
):
super().__init__(manifest=manifest, component_spec=component_spec)
self.input_partition_rows = input_partition_rows

def partition_loaded_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame:
"""
Function that repartitions the loaded dataframe depending on its current number of
partitions and the available workers.
Returns:
The partitioned dataframe.
"""
if self.input_partition_rows != "disable":
if isinstance(self.input_partition_rows, int):
# Only load the index column to trigger a faster compute of the rows
total_rows = len(dataframe.index)
# +1 to handle any remainder rows
n_partitions = (total_rows // self.input_partition_rows) + 1
logger.info(
f"Total number of rows is {total_rows}.\n"
f"Repartitioning the data from {dataframe.npartitions} partitions to"
f" {n_partitions} partitions such that each partition holds approximately"
f" {self.input_partition_rows} rows",
)
dataframe = dataframe.repartition(npartitions=n_partitions)

elif self.input_partition_rows is None:
n_partitions = dataframe.npartitions
n_workers = os.cpu_count()
if n_partitions < n_workers: # type: ignore
logger.info(
f"The number of partitions of the input dataframe is {n_partitions}. The "
f"available number of workers is {n_workers}.",
)
dataframe = dataframe.repartition(npartitions=n_workers)
logger.info(
f"Repartitioning the data to {n_workers} partitions before processing"
f" to maximize worker usage",
)
else:
msg = (
f"{self.input_partition_rows} is not a valid argument. Choose either "
f"the number of rows per partition or set to 'disable' to disable automated "
f"partitioning."
)
raise ValueError(
msg,
)

return dataframe

def _load_subset(self, subset_name: str, fields: t.List[str]) -> dd.DataFrame:
"""
Function that loads a subset from the manifest as a Dask dataframe.
@@ -80,15 +137,63 @@ def load_dataframe(self) -> dd.DataFrame:
how="left",
)

dataframe = self.partition_loaded_dataframe(dataframe)

logging.info(f"Columns of dataframe: {list(dataframe.columns)}")

return dataframe


class DaskDataWriter(DataIO):
def __init__(
self,
*,
manifest: Manifest,
component_spec: ComponentSpec,
output_partition_size: t.Optional[str] = None,
):
super().__init__(manifest=manifest, component_spec=component_spec)

self.output_partition_size = output_partition_size

def partition_written_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame:
"""
Function that partitions the written dataframe to smaller partitions based on a given
partition size.
"""
if self.output_partition_size != "disable":
if isinstance(self.output_partition_size, str):
dataframe = dataframe.repartition(
partition_size=self.output_partition_size,
)
logger.info(
f"Repartitioning the written data such that the size per partition is approx."
f" {self.output_partition_size}",
)

elif self.output_partition_size is None:
dataframe = dataframe.repartition(partition_size="250MB")
logger.info(
"Repartitioning the written data such that the size per partition is approx."
" 250MB (automatic repartitioning)",
)
else:
msg = (
f"{self.output_partition_size} is not a valid argument. Choose either the"
f" size of the partition (e.g. '250MB') or set to 'disable' to"
f" disable automated partitioning."
)
raise ValueError(
msg,
)

return dataframe

def write_dataframe(self, dataframe: dd.DataFrame) -> None:
write_tasks = []

dataframe = self.partition_written_dataframe(dataframe)

dataframe.index = dataframe.index.rename("id").astype("string")

# Turn index into an empty dataframe so we can write it