diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py
index 0ac0af2842b..ac1b0b9f3e3 100644
--- a/python/dask_cudf/dask_cudf/io/parquet.py
+++ b/python/dask_cudf/dask_cudf/io/parquet.py
@@ -4,7 +4,7 @@ from io import BufferedWriter, BytesIO, IOBase
 
 import numpy as np
-from pyarrow import parquet as pq
+from pyarrow import dataset as pa_ds, parquet as pq
 
 from dask import dataframe as dd
 from dask.dataframe.io.parquet.arrow import ArrowDatasetEngine
@@ -54,9 +54,88 @@ def multi_support(cls):
         # and that multi-part reading is supported
         return cls == CudfEngine
 
-    @staticmethod
+    @classmethod
+    def _read_paths(
+        cls,
+        paths,
+        fs,
+        columns=None,
+        row_groups=None,
+        strings_to_categorical=None,
+        partitions=None,
+        partitioning=None,
+        partition_keys=None,
+        **kwargs,
+    ):
+
+        # Use cudf to read in data
+        df = cudf.read_parquet(
+            paths,
+            engine="cudf",
+            columns=columns,
+            row_groups=row_groups if row_groups else None,
+            strings_to_categorical=strings_to_categorical,
+            **kwargs,
+        )
+
+        if partitions and partition_keys is None:
+
+            # Use `HivePartitioning` by default
+            partitioning = partitioning or {"obj": pa_ds.HivePartitioning}
+            ds = pa_ds.dataset(
+                paths,
+                filesystem=fs,
+                format="parquet",
+                partitioning=partitioning["obj"].discover(
+                    *partitioning.get("args", []),
+                    **partitioning.get("kwargs", {}),
+                ),
+            )
+            frag = next(ds.get_fragments())
+            if frag:
+                # Extract hive-partition keys, and make sure they
+                # are ordered the same as they are in `partitions`
+                raw_keys = pa_ds._get_partition_keys(frag.partition_expression)
+                partition_keys = [
+                    (hive_part.name, raw_keys[hive_part.name])
+                    for hive_part in partitions
+                ]
+
+        if partition_keys:
+            if partitions is None:
+                raise ValueError("Must pass partition sets")
+
+            for i, (name, index2) in enumerate(partition_keys):
+
+                # Build the column from `codes` directly
+                # (since the category is often a larger dtype)
+                codes = (
+                    as_column(partitions[i].keys.index(index2))
+                    .as_frame()
+                    .repeat(len(df))
+                    ._data[None]
+                )
+                df[name] = build_categorical_column(
+                    categories=partitions[i].keys,
+                    codes=codes,
+                    size=codes.size,
+                    offset=codes.offset,
+                    ordered=False,
+                )
+
+        return df
+
+    @classmethod
     def read_partition(
-        fs, pieces, columns, index, categories=(), partitions=(), **kwargs
+        cls,
+        fs,
+        pieces,
+        columns,
+        index,
+        categories=(),
+        partitions=(),
+        partitioning=None,
+        **kwargs,
     ):
         if columns is not None:
             columns = [c for c in columns]
@@ -67,64 +146,88 @@ def read_partition(
             pieces = [pieces]
 
         strings_to_cats = kwargs.get("strings_to_categorical", False)
+
         if len(pieces) > 1:
+            # Multi-piece read
             paths = []
             rgs = []
-            partition_keys = []
-
-            for piece in pieces:
-                if isinstance(piece, str):
-                    paths.append(piece)
-                    rgs.append(None)
-                else:
-                    (path, row_group, partition_keys) = piece
-
-                    row_group = None if row_group == [None] else row_group
-
-                    paths.append(path)
-                    rgs.append(
-                        [row_group]
-                        if not isinstance(row_group, list)
-                        else row_group
+            last_partition_keys = None
+            dfs = []
+
+            for i, piece in enumerate(pieces):
+
+                (path, row_group, partition_keys) = piece
+                row_group = None if row_group == [None] else row_group
+
+                if i > 0 and partition_keys != last_partition_keys:
+                    dfs.append(
+                        cls._read_paths(
+                            paths,
+                            fs,
+                            columns=columns,
+                            row_groups=rgs if rgs else None,
+                            strings_to_categorical=strings_to_cats,
+                            partitions=partitions,
+                            partitioning=partitioning,
+                            partition_keys=last_partition_keys,
+                            **kwargs.get("read", {}),
+                        )
                     )
+                    paths = rgs = []
+                    last_partition_keys = None
+                paths.append(path)
+                rgs.append(
+                    [row_group]
+                    if not isinstance(row_group, list)
+                    else row_group
+                )
+                last_partition_keys = partition_keys
 
-            df = cudf.read_parquet(
-                paths,
-                engine="cudf",
-                columns=columns,
-                row_groups=rgs if rgs else None,
-                strings_to_categorical=strings_to_cats,
-                **kwargs.get("read", {}),
+            dfs.append(
+                cls._read_paths(
+                    paths,
+                    fs,
+                    columns=columns,
+                    row_groups=rgs if rgs else None,
+                    strings_to_categorical=strings_to_cats,
+                    partitions=partitions,
+                    partitioning=partitioning,
+                    partition_keys=last_partition_keys,
+                    **kwargs.get("read", {}),
+                )
             )
+            df = cudf.concat(dfs)
         else:
+            # Single-piece read
-            if isinstance(pieces[0], str):
-                path = pieces[0]
-                row_group = None
-                partition_keys = []
-            else:
-                (path, row_group, partition_keys) = pieces[0]
-                row_group = None if row_group == [None] else row_group
+            (path, row_group, partition_keys) = pieces[0]
+            row_group = None if row_group == [None] else row_group
 
             if cudf.utils.ioutils._is_local_filesystem(fs):
-                df = cudf.read_parquet(
+                df = cls._read_paths(
                     path,
-                    engine="cudf",
+                    fs,
                     columns=columns,
                     row_groups=row_group,
                     strings_to_categorical=strings_to_cats,
+                    partitions=partitions,
+                    partitioning=partitioning,
+                    partition_keys=partition_keys,
                     **kwargs.get("read", {}),
                 )
             else:
                 with fs.open(path, mode="rb") as f:
-                    df = cudf.read_parquet(
+                    df = cls._read_paths(
                         f,
-                        engine="cudf",
+                        fs,
                         columns=columns,
                         row_groups=row_group,
                         strings_to_categorical=strings_to_cats,
+                        partitions=partitions,
+                        partitioning=partitioning,
+                        partition_keys=partition_keys,
                         **kwargs.get("read", {}),
                     )
@@ -138,28 +241,6 @@ def read_partition(
             # names in `columns` are actually in `df.columns`
             df.reset_index(inplace=True)
 
-        if partition_keys:
-            if partitions is None:
-                raise ValueError("Must pass partition sets")
-
-            for i, (name, index2) in enumerate(partition_keys):
-
-                # Build the column from `codes` directly
-                # (since the category is often a larger dtype)
-                codes = (
-                    as_column(partitions[i].keys.index(index2))
-                    .as_frame()
-                    .repeat(len(df))
-                    ._data[None]
-                )
-                df[name] = build_categorical_column(
-                    categories=partitions[i].keys,
-                    codes=codes,
-                    size=codes.size,
-                    offset=codes.offset,
-                    ordered=False,
-                )
-
         return df
 
     @staticmethod
diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py
index a5492bc5fc0..06c888cfec9 100644
--- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py
+++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -288,6 +288,15 @@ def test_roundtrip_from_dask_partitioned(tmpdir, parts, daskcudf, metadata):
         if not fn.startswith("_"):
             assert "part" in fn
 
+    if parse_version(dask.__version__) > parse_version("2021.07.0"):
+        # This version of Dask supports `aggregate_files=True`.
+        # Check that we can aggregate by a partition name.
+        df_read = dd.read_parquet(
+            tmpdir, engine="pyarrow", aggregate_files="year"
+        )
+        gdf_read = dask_cudf.read_parquet(tmpdir, aggregate_files="year")
+        dd.assert_eq(df_read, gdf_read)
+
 
 @pytest.mark.parametrize("metadata", [True, False])
 @pytest.mark.parametrize("chunksize", [None, 1024, 4096, "1MiB"])
@@ -327,6 +336,7 @@ def test_chunksize(tmpdir, chunksize, metadata):
         split_row_groups=True,
         gather_statistics=True,
     )
+    ddf2.compute(scheduler="synchronous")
     dd.assert_eq(ddf1, ddf2, check_divisions=False)
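
For context, a minimal usage sketch of what the new test exercises (the dataset path and the "year" partition column are placeholders; a dask release newer than 2021.07.0 is assumed so that `aggregate_files` is available):

    import dask.dataframe as dd
    import dask_cudf

    # GPU read of a hive-partitioned Parquet dataset, aggregating files that
    # share the same "year" partition value into a single output partition.
    gdf = dask_cudf.read_parquet("/path/to/dataset", aggregate_files="year")

    # Equivalent CPU read through the pyarrow engine, for comparison.
    df = dd.read_parquet(
        "/path/to/dataset", engine="pyarrow", aggregate_files="year"
    )

    dd.assert_eq(df, gdf)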