From 676b49f4f263a9bdcb7a48e434fa0d52f1da5dcd Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Tue, 1 Mar 2022 09:12:07 -0500
Subject: [PATCH] ARROW-15784: [C++][Python] Removing flag enable_parallel_column_conversion which is no longer used

Closes #12514 from westonpace/bugfix/ARROW-15784--single-file-parquet-read-regression

Authored-by: Weston Pace
Signed-off-by: David Li
---
 cpp/src/arrow/dataset/file_parquet.cc             | 49 ++++---------------
 cpp/src/arrow/dataset/file_parquet.h              | 13 ++---
 python/pyarrow/_dataset_parquet.pyx               | 27 ++--------
 .../includes/libarrow_dataset_parquet.pxd         |  1 -
 python/pyarrow/parquet.py                         | 16 +-----
 5 files changed, 17 insertions(+), 89 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index cdee1f0068470..5fbd457eccdd9 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -319,49 +319,16 @@ Result<bool> ParquetFileFormat::IsSupported(const FileSource& source) const {
 
 Result<std::shared_ptr<Schema>> ParquetFileFormat::Inspect(
     const FileSource& source) const {
-  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source));
+  auto scan_options = std::make_shared<ScanOptions>();
+  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, scan_options));
   std::shared_ptr<Schema> schema;
   RETURN_NOT_OK(reader->GetSchema(&schema));
   return schema;
 }
 
-Result<std::unique_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader(
-    const FileSource& source, ScanOptions* options) const {
-  ARROW_ASSIGN_OR_RAISE(auto parquet_scan_options,
-                        GetFragmentScanOptions<ParquetFragmentScanOptions>(
-                            kParquetTypeName, options, default_fragment_scan_options));
-  MemoryPool* pool = options ? options->pool : default_memory_pool();
-  auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), pool);
-
-  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
-
-  auto make_reader = [&]() -> Result<std::unique_ptr<parquet::ParquetFileReader>> {
-    BEGIN_PARQUET_CATCH_EXCEPTIONS
-    return parquet::ParquetFileReader::Open(std::move(input), std::move(properties));
-    END_PARQUET_CATCH_EXCEPTIONS
-  };
-
-  auto maybe_reader = std::move(make_reader)();
-  if (!maybe_reader.ok()) {
-    return WrapSourceError(maybe_reader.status(), source.path());
-  }
-  std::unique_ptr<parquet::ParquetFileReader> reader = *std::move(maybe_reader);
-  std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
-  auto arrow_properties = MakeArrowReaderProperties(*this, *metadata);
-
-  if (options) {
-    arrow_properties.set_batch_size(options->batch_size);
-  }
-
-  if (options && !options->use_threads) {
-    arrow_properties.set_use_threads(
-        parquet_scan_options->enable_parallel_column_conversion);
-  }
-
-  std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
-  RETURN_NOT_OK(parquet::arrow::FileReader::Make(
-      pool, std::move(reader), std::move(arrow_properties), &arrow_reader));
-  return std::move(arrow_reader);
+Result<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader(
+    const FileSource& source, const std::shared_ptr<ScanOptions>& options) const {
+  return GetReaderAsync(source, options).result();
 }
 
 Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReaderAsync(
@@ -557,7 +524,8 @@ Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
 
   if (reader == nullptr) {
     lock.Unlock();
-    ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+    auto scan_options = std::make_shared<ScanOptions>();
+    ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_, scan_options));
     return EnsureCompleteMetadata(reader.get());
   }
 
@@ -817,7 +785,8 @@ Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
     options.partition_base_dir = base_path;
   }
 
-  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
+  auto scan_options = std::make_shared<ScanOptions>();
+  ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source, scan_options));
   std::shared_ptr<parquet::FileMetaData> metadata = reader->parquet_reader()->metadata();
 
   if (metadata->num_columns() == 0) {
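
With this change GetReader() always receives a ScanOptions instance; Inspect() and the other in-tree callers construct a default one, as shown above. A minimal Python sketch of the user-facing path that exercises Inspect() after the patch (the file path is hypothetical and is written only so the sketch is self-contained):

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Hypothetical file, created only to make the sketch runnable.
pq.write_table(pa.table({"x": [1, 2, 3]}), "/tmp/inspect_example.parquet")

# Schema inference during dataset discovery goes through
# ParquetFileFormat::Inspect(), which now builds a default ScanOptions
# before opening the Parquet reader.
dataset = ds.dataset("/tmp/inspect_example.parquet", format="parquet")
print(dataset.schema)
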
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 571ec8166678e..6f2f54206812e 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -116,8 +116,8 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
       std::shared_ptr<Schema> physical_schema, std::vector<int> row_groups);
 
   /// \brief Return a FileReader on the given source.
-  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
-      const FileSource& source, ScanOptions* = NULLPTR) const;
+  Result<std::shared_ptr<parquet::arrow::FileReader>> GetReader(
+      const FileSource& source, const std::shared_ptr<ScanOptions>& options) const;
 
   Future<std::shared_ptr<parquet::arrow::FileReader>> GetReaderAsync(
       const FileSource& source, const std::shared_ptr<ScanOptions>& options) const;
@@ -212,16 +212,9 @@ class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
   /// ScanOptions.
   std::shared_ptr<parquet::ReaderProperties> reader_properties;
   /// Arrow reader properties. Not all properties are respected: batch_size comes from
-  /// ScanOptions, and use_threads will be overridden based on
-  /// enable_parallel_column_conversion. Additionally, dictionary columns come from
+  /// ScanOptions. Additionally, dictionary columns come from
   /// ParquetFileFormat::ReaderOptions::dict_columns.
   std::shared_ptr<parquet::ArrowReaderProperties> arrow_reader_properties;
-  /// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
-  /// scan is already parallelized across input files to avoid thread contention. This
-  /// option will be removed after support is added for simultaneous parallelization
-  /// across files and columns. Only affects the threaded reader; the async reader
-  /// will parallelize across columns if use_threads is enabled.
-  bool enable_parallel_column_conversion = false;
 };
 
 class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions {
""" cdef: @@ -607,15 +602,12 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): def __init__(self, bint use_buffered_stream=False, buffer_size=8192, - bint pre_buffer=False, - bint enable_parallel_column_conversion=False): + bint pre_buffer=False): self.init(shared_ptr[CFragmentScanOptions]( new CParquetFragmentScanOptions())) self.use_buffered_stream = use_buffered_stream self.buffer_size = buffer_size self.pre_buffer = pre_buffer - self.enable_parallel_column_conversion = \ - enable_parallel_column_conversion cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): FragmentScanOptions.init(self, sp) @@ -656,29 +648,16 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): def pre_buffer(self, bint pre_buffer): self.arrow_reader_properties().set_pre_buffer(pre_buffer) - @property - def enable_parallel_column_conversion(self): - return self.parquet_options.enable_parallel_column_conversion - - @enable_parallel_column_conversion.setter - def enable_parallel_column_conversion( - self, bint enable_parallel_column_conversion): - self.parquet_options.enable_parallel_column_conversion = \ - enable_parallel_column_conversion - def equals(self, ParquetFragmentScanOptions other): return ( self.use_buffered_stream == other.use_buffered_stream and self.buffer_size == other.buffer_size and - self.pre_buffer == other.pre_buffer and - self.enable_parallel_column_conversion == - other.enable_parallel_column_conversion + self.pre_buffer == other.pre_buffer ) def __reduce__(self): return ParquetFragmentScanOptions, ( - self.use_buffered_stream, self.buffer_size, self.pre_buffer, - self.enable_parallel_column_conversion + self.use_buffered_stream, self.buffer_size, self.pre_buffer ) diff --git a/python/pyarrow/includes/libarrow_dataset_parquet.pxd b/python/pyarrow/includes/libarrow_dataset_parquet.pxd index fc40e545a6f5a..cc753d66a94f7 100644 --- a/python/pyarrow/includes/libarrow_dataset_parquet.pxd +++ b/python/pyarrow/includes/libarrow_dataset_parquet.pxd @@ -62,7 +62,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: "arrow::dataset::ParquetFragmentScanOptions"(CFragmentScanOptions): shared_ptr[CReaderProperties] reader_properties shared_ptr[ArrowReaderProperties] arrow_reader_properties - c_bool enable_parallel_column_conversion cdef cppclass CParquetFactoryOptions \ "arrow::dataset::ParquetFactoryOptions": diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index c31fa270b2f1b..2107f27471f94 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -1755,11 +1755,9 @@ def __init__(self, path_or_paths, filesystem=None, filters=None, else: single_file = path_or_paths - if single_file is not None: - self._enable_parallel_column_conversion = True - read_options.update(enable_parallel_column_conversion=True) + parquet_format = ds.ParquetFileFormat(**read_options) - parquet_format = ds.ParquetFileFormat(**read_options) + if single_file is not None: fragment = parquet_format.make_fragment(single_file, filesystem) self._dataset = ds.FileSystemDataset( @@ -1768,10 +1766,6 @@ def __init__(self, path_or_paths, filesystem=None, filters=None, filesystem=fragment.filesystem ) return - else: - self._enable_parallel_column_conversion = False - - parquet_format = ds.ParquetFileFormat(**read_options) # check partitioning to enable dictionary encoding if partitioning == "hive": @@ -1822,12 +1816,6 @@ def read(self, columns=None, use_threads=True, use_pandas_metadata=False): list(columns) + list(set(index_columns) - set(columns)) ) - 
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index c31fa270b2f1b..2107f27471f94 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1755,11 +1755,9 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
         else:
             single_file = path_or_paths

-        if single_file is not None:
-            self._enable_parallel_column_conversion = True
-            read_options.update(enable_parallel_column_conversion=True)
+        parquet_format = ds.ParquetFileFormat(**read_options)

-            parquet_format = ds.ParquetFileFormat(**read_options)
+        if single_file is not None:
             fragment = parquet_format.make_fragment(single_file, filesystem)

             self._dataset = ds.FileSystemDataset(
@@ -1768,10 +1766,6 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
                 filesystem=fragment.filesystem
             )
             return
-        else:
-            self._enable_parallel_column_conversion = False
-
-            parquet_format = ds.ParquetFileFormat(**read_options)

         # check partitioning to enable dictionary encoding
         if partitioning == "hive":
@@ -1822,12 +1816,6 @@ def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
                     list(columns) + list(set(index_columns) - set(columns))
                 )

-        if self._enable_parallel_column_conversion:
-            if use_threads:
-                # Allow per-column parallelism; would otherwise cause
-                # contention in the presence of per-file parallelism.
-                use_threads = False
-
         table = self._dataset.to_table(
             columns=columns, filter=self._filter_expression,
             use_threads=use_threads
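
This removes the single-file special case that forced use_threads to False in the read() method patched above; a single-file ParquetDataset built on the new datasets API can decode columns in parallel again. A minimal sketch of that read path (the file path is hypothetical):

import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical single-file dataset, written only to make the sketch runnable.
pq.write_table(pa.table({"x": list(range(10))}), "/tmp/single_file_example.parquet")

# use_legacy_dataset=False selects the datasets-based implementation whose
# read() was patched above; use_threads is no longer forced to False for a
# single file.
dataset = pq.ParquetDataset("/tmp/single_file_example.parquet",
                            use_legacy_dataset=False)
table = dataset.read(use_threads=True)
print(table.num_rows)
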