ARROW-15784: [C++][Python] Removing flag enable_parallel_column_conversion which is no longer used

Closes apache#12514 from westonpace/bugfix/ARROW-15784--single-file-parquet-read-regression

Authored-by: Weston Pace <[email protected]>
Signed-off-by: David Li <[email protected]>
westonpace authored and lidavidm committed Mar 1, 2022
1 parent d46417d commit 676b49f
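For context, a minimal pyarrow sketch of the surviving scan options after this change (an illustration, not part of the commit); the removed keyword would simply be rejected by the constructor:

import pyarrow.dataset as ds

# The three options that remain on ParquetFragmentScanOptions.
scan_opts = ds.ParquetFragmentScanOptions(
    use_buffered_stream=False,
    buffer_size=8192,
    pre_buffer=True,
)
fmt = ds.ParquetFileFormat(default_fragment_scan_options=scan_opts)

# After this commit the removed flag is no longer a valid keyword:
# ds.ParquetFragmentScanOptions(enable_parallel_column_conversion=True)  # TypeError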
Showing 5 changed files with 17 additions and 89 deletions.
49 changes: 9 additions & 40 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -319,49 +319,16 @@ Result<bool> ParquetFileFormat::IsSupported(const FileSource& source) const {

Result<std::shared_ptr<Schema>> ParquetFileFormat::Inspect(
const FileSource& source) const {
ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source));
auto scan_options = std::make_shared<ScanOptions>();
ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, scan_options));
std::shared_ptr<Schema> schema;
RETURN_NOT_OK(reader->GetSchema(&schema));
return schema;
}

Result<std::unique_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader(
const FileSource& source, ScanOptions* options) const {
ARROW_ASSIGN_OR_RAISE(auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(
kParquetTypeName, options, default_fragment_scan_options));
MemoryPool* pool = options ? options->pool : default_memory_pool();
auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), pool);

ARROW_ASSIGN_OR_RAISE(auto input, source.Open());

auto make_reader = [&]() -> Result<std::unique_ptr<parquet::ParquetFileReader>> {
BEGIN_PARQUET_CATCH_EXCEPTIONS
return parquet::ParquetFileReader::Open(std::move(input), std::move(properties));
END_PARQUET_CATCH_EXCEPTIONS
};

auto maybe_reader = std::move(make_reader)();
if (!maybe_reader.ok()) {
return WrapSourceError(maybe_reader.status(), source.path());
}
std::unique_ptr<parquet::ParquetFileReader> reader = *std::move(maybe_reader);
std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
auto arrow_properties = MakeArrowReaderProperties(*this, *metadata);

if (options) {
arrow_properties.set_batch_size(options->batch_size);
}

if (options && !options->use_threads) {
arrow_properties.set_use_threads(
parquet_scan_options->enable_parallel_column_conversion);
}

std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(
pool, std::move(reader), std::move(arrow_properties), &arrow_reader));
return std::move(arrow_reader);
Result<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader(
const FileSource& source, const std::shared_ptr<ScanOptions>& options) const {
return GetReaderAsync(source, options).result();
}

Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReaderAsync(
@@ -557,7 +524,8 @@ Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* r

if (reader == nullptr) {
lock.Unlock();
ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
auto scan_options = std::make_shared<ScanOptions>();
ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_, scan_options));
return EnsureCompleteMetadata(reader.get());
}

@@ -817,7 +785,8 @@ Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(
options.partition_base_dir = base_path;
}

ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
auto scan_options = std::make_shared<ScanOptions>();
ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source, scan_options));
std::shared_ptr<parquet::FileMetaData> metadata = reader->parquet_reader()->metadata();

if (metadata->num_columns() == 0) {
13 changes: 3 additions & 10 deletions cpp/src/arrow/dataset/file_parquet.h
@@ -116,8 +116,8 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
std::shared_ptr<Schema> physical_schema, std::vector<int> row_groups);

/// \brief Return a FileReader on the given source.
Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
const FileSource& source, ScanOptions* = NULLPTR) const;
Result<std::shared_ptr<parquet::arrow::FileReader>> GetReader(
const FileSource& source, const std::shared_ptr<ScanOptions>& options) const;

Future<std::shared_ptr<parquet::arrow::FileReader>> GetReaderAsync(
const FileSource& source, const std::shared_ptr<ScanOptions>& options) const;
@@ -212,16 +212,9 @@ class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
/// ScanOptions.
std::shared_ptr<parquet::ReaderProperties> reader_properties;
/// Arrow reader properties. Not all properties are respected: batch_size comes from
/// ScanOptions, and use_threads will be overridden based on
/// enable_parallel_column_conversion. Additionally, dictionary columns come from
/// ScanOptions. Additionally, dictionary columns come from
/// ParquetFileFormat::ReaderOptions::dict_columns.
std::shared_ptr<parquet::ArrowReaderProperties> arrow_reader_properties;
/// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
/// scan is already parallelized across input files to avoid thread contention. This
/// option will be removed after support is added for simultaneous parallelization
/// across files and columns. Only affects the threaded reader; the async reader
/// will parallelize across columns if use_threads is enabled.
bool enable_parallel_column_conversion = false;
};

class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions {
27 changes: 3 additions & 24 deletions python/pyarrow/_dataset_parquet.pyx
@@ -592,11 +592,6 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
If enabled, pre-buffer the raw Parquet data instead of issuing one
read per column chunk. This can improve performance on high-latency
filesystems.
enable_parallel_column_conversion : bool, default False
EXPERIMENTAL: Parallelize conversion across columns. This option is
ignored if a scan is already parallelized across input files to avoid
thread contention. This option will be removed after support is added
for simultaneous parallelization across files and columns.
"""

cdef:
@@ -607,15 +602,12 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):

def __init__(self, bint use_buffered_stream=False,
buffer_size=8192,
bint pre_buffer=False,
bint enable_parallel_column_conversion=False):
bint pre_buffer=False):
self.init(shared_ptr[CFragmentScanOptions](
new CParquetFragmentScanOptions()))
self.use_buffered_stream = use_buffered_stream
self.buffer_size = buffer_size
self.pre_buffer = pre_buffer
self.enable_parallel_column_conversion = \
enable_parallel_column_conversion

cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
FragmentScanOptions.init(self, sp)
@@ -656,29 +648,16 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
def pre_buffer(self, bint pre_buffer):
self.arrow_reader_properties().set_pre_buffer(pre_buffer)

@property
def enable_parallel_column_conversion(self):
return self.parquet_options.enable_parallel_column_conversion

@enable_parallel_column_conversion.setter
def enable_parallel_column_conversion(
self, bint enable_parallel_column_conversion):
self.parquet_options.enable_parallel_column_conversion = \
enable_parallel_column_conversion

def equals(self, ParquetFragmentScanOptions other):
return (
self.use_buffered_stream == other.use_buffered_stream and
self.buffer_size == other.buffer_size and
self.pre_buffer == other.pre_buffer and
self.enable_parallel_column_conversion ==
other.enable_parallel_column_conversion
self.pre_buffer == other.pre_buffer
)

def __reduce__(self):
return ParquetFragmentScanOptions, (
self.use_buffered_stream, self.buffer_size, self.pre_buffer,
self.enable_parallel_column_conversion
self.use_buffered_stream, self.buffer_size, self.pre_buffer
)


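As a sanity check on the trimmed __reduce__ and equals shown above, a small illustrative snippet (assuming a pyarrow build that includes this change) showing that the scan options still pickle and compare on the three remaining fields:

import pickle
import pyarrow.dataset as ds

opts = ds.ParquetFragmentScanOptions(
    use_buffered_stream=True, buffer_size=64 * 1024, pre_buffer=True)

# __reduce__ now round-trips only use_buffered_stream, buffer_size, pre_buffer.
restored = pickle.loads(pickle.dumps(opts))
assert opts.equals(restored)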
1 change: 0 additions & 1 deletion python/pyarrow/includes/libarrow_dataset_parquet.pxd
@@ -62,7 +62,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
"arrow::dataset::ParquetFragmentScanOptions"(CFragmentScanOptions):
shared_ptr[CReaderProperties] reader_properties
shared_ptr[ArrowReaderProperties] arrow_reader_properties
c_bool enable_parallel_column_conversion

cdef cppclass CParquetFactoryOptions \
"arrow::dataset::ParquetFactoryOptions":
16 changes: 2 additions & 14 deletions python/pyarrow/parquet.py
@@ -1755,11 +1755,9 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
else:
single_file = path_or_paths

if single_file is not None:
self._enable_parallel_column_conversion = True
read_options.update(enable_parallel_column_conversion=True)
parquet_format = ds.ParquetFileFormat(**read_options)

parquet_format = ds.ParquetFileFormat(**read_options)
if single_file is not None:
fragment = parquet_format.make_fragment(single_file, filesystem)

self._dataset = ds.FileSystemDataset(
@@ -1768,10 +1766,6 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
filesystem=fragment.filesystem
)
return
else:
self._enable_parallel_column_conversion = False

parquet_format = ds.ParquetFileFormat(**read_options)

# check partitioning to enable dictionary encoding
if partitioning == "hive":
@@ -1822,12 +1816,6 @@ def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
list(columns) + list(set(index_columns) - set(columns))
)

if self._enable_parallel_column_conversion:
if use_threads:
# Allow per-column parallelism; would otherwise cause
# contention in the presence of per-file parallelism.
use_threads = False

table = self._dataset.to_table(
columns=columns, filter=self._filter_expression,
use_threads=use_threads
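To illustrate the user-facing effect of the parquet.py change above: single-file datasets no longer force per-file threading off, so use_threads is forwarded to the dataset scan as given. A hedged sketch, assuming the non-legacy dataset path is selected explicitly and using a hypothetical example.parquet file for illustration:

import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical file used only for this illustration.
pq.write_table(pa.table({"x": [1, 2, 3]}), "example.parquet")

# Single-file path: use_threads is passed straight through to the dataset scan.
dataset = pq.ParquetDataset("example.parquet", use_legacy_dataset=False)
table = dataset.read(columns=["x"], use_threads=True)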
