From c569c4994ad0ab6d6d9be7931ea491ae2d7fbcbe Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Mon, 18 Mar 2024 13:36:33 +0100 Subject: [PATCH 1/3] Skip transformation of partition if partition is empty --- src/fondant/component/executor.py | 5 +++++ tests/component/test_component.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/src/fondant/component/executor.py b/src/fondant/component/executor.py index 7a79c6fd..96b514a7 100644 --- a/src/fondant/component/executor.py +++ b/src/fondant/component/executor.py @@ -479,6 +479,7 @@ def wrap_transform( operation_spec: OperationSpec, ) -> t.Callable: """Factory that creates a function to wrap the component transform function. The wrapper: + - Skips the transformation if the received partition is empty - Removes extra columns from the returned dataframe which are not defined in the component spec `produces` section - Sorts the columns from the returned dataframe according to the order in the component @@ -492,6 +493,10 @@ def wrap_transform( def wrapped_transform(dataframe: pd.DataFrame) -> pd.DataFrame: # Call transform method + if dataframe.empty: + logger.info("Received empty partition, skipping transformation.") + return dataframe + dataframe = transform(dataframe) # Drop columns not in specification diff --git a/tests/component/test_component.py b/tests/component/test_component.py index 4511e1f4..426b92c9 100644 --- a/tests/component/test_component.py +++ b/tests/component/test_component.py @@ -591,3 +591,31 @@ def write(self, dataframe): with mock.patch.object(MyWriteComponent, "write", write): executor.execute(MyWriteComponent) write.mock.assert_called_once() + + +def test_skipping_empty_partition(): + # Create an empty dataframe to simulate empty partitions + input_df = pd.DataFrame.from_dict( + { + "image_height": [], + "image_width": [], + "caption_text": [], + }, + ) + + def transform(dataframe: pd.DataFrame) -> pd.DataFrame: + msg = "This should not be called" + raise ValueError(msg) + + wrapped_transform = PandasTransformExecutor.wrap_transform( + transform, + operation_spec=OperationSpec( + ComponentSpec( + name="dummy-spec", + image="dummy-image", + description="dummy-description", + ), + ), + ) + output_df = wrapped_transform(input_df) + assert input_df.equals(output_df) From 442501cbbddb9d3aa6acdedc7227446de3b089ff Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Mon, 18 Mar 2024 13:51:05 +0100 Subject: [PATCH 2/3] Skip transformation of partition if partition is empty --- src/fondant/component/executor.py | 13 +++++++------ tests/component/test_component.py | 3 ++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/fondant/component/executor.py b/src/fondant/component/executor.py index 96b514a7..770a8bfd 100644 --- a/src/fondant/component/executor.py +++ b/src/fondant/component/executor.py @@ -492,18 +492,19 @@ def wrap_transform( """ def wrapped_transform(dataframe: pd.DataFrame) -> pd.DataFrame: - # Call transform method + # Columns of operation specification + columns = [ + name for name, field in operation_spec.operation_produces.items() + ] + if dataframe.empty: logger.info("Received empty partition, skipping transformation.") - return dataframe + return pd.DataFrame(columns=columns) + # Call transform method dataframe = transform(dataframe) # Drop columns not in specification - columns = [ - name for name, field in operation_spec.operation_produces.items() - ] - return dataframe[columns] return wrapped_transform diff --git a/tests/component/test_component.py b/tests/component/test_component.py index 426b92c9..01b9e16e 100644 --- a/tests/component/test_component.py +++ b/tests/component/test_component.py @@ -617,5 +617,6 @@ def transform(dataframe: pd.DataFrame) -> pd.DataFrame: ), ), ) + output_df = wrapped_transform(input_df) - assert input_df.equals(output_df) + assert output_df.equals(pd.DataFrame()) From 5af2143f53114d99ecb1826c95b7204bda04a058 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 28 Mar 2024 14:11:17 +0100 Subject: [PATCH 3/3] Update src/fondant/component/executor.py Co-authored-by: Robbe Sneyders --- src/fondant/component/executor.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/fondant/component/executor.py b/src/fondant/component/executor.py index 770a8bfd..176a871a 100644 --- a/src/fondant/component/executor.py +++ b/src/fondant/component/executor.py @@ -497,12 +497,10 @@ def wrapped_transform(dataframe: pd.DataFrame) -> pd.DataFrame: name for name, field in operation_spec.operation_produces.items() ] - if dataframe.empty: + if not dataframe.empty: + dataframe = transform(dataframe) + else: logger.info("Received empty partition, skipping transformation.") - return pd.DataFrame(columns=columns) - - # Call transform method - dataframe = transform(dataframe) # Drop columns not in specification return dataframe[columns]