From e74e765f9dd4f7f0238c37854aef045df160e477 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Wed, 21 Feb 2024 21:24:51 +0100 Subject: [PATCH] Move convert-string into component setup method (#871) The Dask config needs to be set before starting the `LocalCluster`. I first put this setting into `DaskDataIO`, since it is central to the working of Fondant that it is set correctly and I didn't want it to be overwritten. However, `DaskDataIO` is initialized too late in our flow. The other place we could put it is the executor, but we currently try to keep that Dask-agnostic, so the setting now lives in the component's `setup` method. --- src/fondant/component/component.py | 3 +++ src/fondant/component/data_io.py | 5 ----- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/fondant/component/component.py b/src/fondant/component/component.py index 9f23afbd..055beeb9 100644 --- a/src/fondant/component/component.py +++ b/src/fondant/component/component.py @@ -47,6 +47,9 @@ def __init__(self, **kwargs): super().__init__() def setup(self) -> t.Any: + # Don't assume every object is a string + # https://docs.dask.org/en/stable/changelog.html#v2023-7-1 + dask.config.set({"dataframe.convert-string": False}) # worker.daemon is set to false because creating a worker process in daemon # mode is not possible in our docker container setup. 
dask.config.set({"distributed.worker.daemon": False}) diff --git a/src/fondant/component/data_io.py b/src/fondant/component/data_io.py index 285416af..76d5e7a2 100644 --- a/src/fondant/component/data_io.py +++ b/src/fondant/component/data_io.py @@ -3,7 +3,6 @@ import typing as t from collections import defaultdict -import dask import dask.dataframe as dd from dask.diagnostics import ProgressBar from dask.distributed import Client @@ -31,10 +30,6 @@ def __init__( input_partition_rows: t.Optional[int] = None, ): super().__init__(manifest=manifest, operation_spec=operation_spec) - # Don't assume every object is a string - # https://docs.dask.org/en/stable/changelog.html#v2023-7-1 - dask.config.set({"dataframe.convert-string": False}) - self.input_partition_rows = input_partition_rows def partition_loaded_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame: