diff --git a/src/fondant/component/data_io.py b/src/fondant/component/data_io.py index 5d7f442e..bbec6529 100644 --- a/src/fondant/component/data_io.py +++ b/src/fondant/component/data_io.py @@ -5,6 +5,7 @@ import dask.dataframe as dd import dask.distributed +import fsspec import pyarrow as pa from dask.diagnostics import ProgressBar from dask.distributed import as_completed @@ -205,6 +206,12 @@ def _write_dataframe(self, dataframe: dd.DataFrame) -> dd.core.Scalar: f"{self.manifest.run_id}/{self.operation_spec.component_name}" ) + # Create directory the dataframe will be written to, since this is not handled by Pandas + # `to_parquet` method. + protocol = fsspec.utils.get_protocol(location) + fs = fsspec.get_filesystem_class(protocol) + fs().makedirs(location) + schema = { field.name: field.type.value for field in self.operation_spec.produces_to_dataset.values()