From d412d2faf175a1891bc69059444c5174f48b1ff8 Mon Sep 17 00:00:00 2001 From: Austin Dickey Date: Tue, 24 Oct 2023 16:06:29 -0500 Subject: [PATCH] Fix dataset-serialize benchmark by setting pre_buffer=False (#152) * Fix dataset-serialize benchmark by setting pre_buffer=False * comments * fix build --- benchmarks/dataset_serialize_benchmark.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/benchmarks/dataset_serialize_benchmark.py b/benchmarks/dataset_serialize_benchmark.py index 1ab96c7..a118ca6 100644 --- a/benchmarks/dataset_serialize_benchmark.py +++ b/benchmarks/dataset_serialize_benchmark.py @@ -199,7 +199,7 @@ def run(self, source, case=None, **kwargs): yield self.benchmark( f=self._get_benchmark_function( - case, source.name, source_ds, dirpath + case, source.name, source_ds, source.format_str, dirpath ), extra_tags=tags, options=kwargs, @@ -220,7 +220,12 @@ def run(self, source, case=None, **kwargs): self._report_dirsize_and_wipe(OUTPUT_DIR_PREFIX) def _get_benchmark_function( - self, case, source_name: str, source_ds: ds.Dataset, dirpath: str + self, + case, + source_name: str, + source_ds: ds.Dataset, + source_fmt: str, + dirpath: str, ): (selectivity, serialization_format) = case @@ -234,6 +239,13 @@ def _get_benchmark_function( except KeyError: pass + # Need this or arrow#38438 will cause a segfault. TODO: remove once fixed. + data_read_kwargs = {} + if source_fmt == "parquet": + data_read_kwargs["fragment_scan_options"] = ds.ParquetFragmentScanOptions( + pre_buffer=False + ) + if n_rows_only: # Pragmatic method for reading only a subset of the data set. A # different method for sub selection of rows could use a @@ -242,10 +254,10 @@ def _get_benchmark_function( # Note that `head()` returns a `Table` object, i.e. loads data # into memory. log.info("read %s rows of dataset %s into memory", n_rows_only, source_name) - data = source_ds.head(n_rows_only) + data = source_ds.head(n_rows_only, **data_read_kwargs) else: log.info("read complete dataset %s into memory", source_name) - data = source_ds.to_table() + data = source_ds.to_table(**data_read_kwargs) log.info("read source dataset into memory in %.4f s", time.monotonic() - t0)