From 984839402e45e1ba1617c7da5c094d9e6025e243 Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 20 Sep 2023 00:29:10 +0800 Subject: [PATCH] GH-37487: [C++][Parquet] Dataset: Implement sync `ParquetFileFormat::GetReader` (#37514) ### Rationale for this change As https://github.com/apache/arrow/issues/37487 says. When thread cnt == 1, the thread might blocking in `ParquetFileFormat::GetReaderAsync`, that's because: 1. `ParquetFileFormat::CountRows` would call `EnsureCompleteMetadata` in `io_executor` 2. `EnsureCompleteMetadata` call `ParquetFileFormat::GetReader`, which dispatch real request to async mode 3. `async` is executed in `io_executor`. 1/3 in same fix-sized executor, causing deadlock. ### What changes are included in this PR? Implement sync `ParquetFileFormat::GetReader`. ### Are these changes tested? Currently not ### Are there any user-facing changes? Bugfix * Closes: #37487 Authored-by: mwish Signed-off-by: Benjamin Kietzman --- cpp/src/arrow/dataset/file_parquet.cc | 61 +++++++++++++++++----- cpp/src/arrow/dataset/file_parquet_test.cc | 25 +++++++++ 2 files changed, 74 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index c30441d911e4e..9d0e8a6515878 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -88,6 +88,22 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties( return properties; } +parquet::ArrowReaderProperties MakeArrowReaderProperties( + const ParquetFileFormat& format, const parquet::FileMetaData& metadata, + const ScanOptions& options, const ParquetFragmentScanOptions& parquet_scan_options) { + auto arrow_properties = MakeArrowReaderProperties(format, metadata); + arrow_properties.set_batch_size(options.batch_size); + // Must be set here since the sync ScanTask handles pre-buffering itself + arrow_properties.set_pre_buffer( + parquet_scan_options.arrow_reader_properties->pre_buffer()); + arrow_properties.set_cache_options( + parquet_scan_options.arrow_reader_properties->cache_options()); + arrow_properties.set_io_context( + parquet_scan_options.arrow_reader_properties->io_context()); + arrow_properties.set_use_threads(options.use_threads); + return arrow_properties; +} + template Result> GetSchemaManifest( const M& metadata, const parquet::ArrowReaderProperties& properties) { @@ -410,13 +426,42 @@ Result> ParquetFileFormat::Inspect( Result> ParquetFileFormat::GetReader( const FileSource& source, const std::shared_ptr& options) const { - return GetReaderAsync(source, options, nullptr).result(); + return GetReader(source, options, /*metadata=*/nullptr); } Result> ParquetFileFormat::GetReader( const FileSource& source, const std::shared_ptr& options, const std::shared_ptr& metadata) const { - return GetReaderAsync(source, options, metadata).result(); + ARROW_ASSIGN_OR_RAISE( + auto parquet_scan_options, + GetFragmentScanOptions(kParquetTypeName, options.get(), + default_fragment_scan_options)); + auto properties = + MakeReaderProperties(*this, parquet_scan_options.get(), options->pool); + ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); + // `parquet::ParquetFileReader::Open` will not wrap the exception as status, + // so using `open_parquet_file` to wrap it. + auto open_parquet_file = [&]() -> Result> { + BEGIN_PARQUET_CATCH_EXCEPTIONS + auto reader = parquet::ParquetFileReader::Open(std::move(input), + std::move(properties), metadata); + return reader; + END_PARQUET_CATCH_EXCEPTIONS + }; + + auto reader_opt = open_parquet_file(); + if (!reader_opt.ok()) { + return WrapSourceError(reader_opt.status(), source.path()); + } + auto reader = std::move(reader_opt).ValueOrDie(); + + std::shared_ptr reader_metadata = reader->metadata(); + auto arrow_properties = + MakeArrowReaderProperties(*this, *reader_metadata, *options, *parquet_scan_options); + std::unique_ptr arrow_reader; + RETURN_NOT_OK(parquet::arrow::FileReader::Make( + options->pool, std::move(reader), std::move(arrow_properties), &arrow_reader)); + return arrow_reader; } Future> ParquetFileFormat::GetReaderAsync( @@ -445,16 +490,8 @@ Future> ParquetFileFormat::GetReader ARROW_ASSIGN_OR_RAISE(std::unique_ptr reader, reader_fut.MoveResult()); std::shared_ptr metadata = reader->metadata(); - auto arrow_properties = MakeArrowReaderProperties(*self, *metadata); - arrow_properties.set_batch_size(options->batch_size); - // Must be set here since the sync ScanTask handles pre-buffering itself - arrow_properties.set_pre_buffer( - parquet_scan_options->arrow_reader_properties->pre_buffer()); - arrow_properties.set_cache_options( - parquet_scan_options->arrow_reader_properties->cache_options()); - arrow_properties.set_io_context( - parquet_scan_options->arrow_reader_properties->io_context()); - arrow_properties.set_use_threads(options->use_threads); + auto arrow_properties = + MakeArrowReaderProperties(*this, *metadata, *options, *parquet_scan_options); std::unique_ptr arrow_reader; RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader), std::move(arrow_properties), diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 42f923f0e6a27..8527c3af64c83 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -18,12 +18,14 @@ #include "arrow/dataset/file_parquet.h" #include +#include #include #include #include "arrow/compute/api_scalar.h" #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/test_util_internal.h" +#include "arrow/io/interfaces.h" #include "arrow/io/memory.h" #include "arrow/io/test_common.h" #include "arrow/io/util_internal.h" @@ -367,6 +369,29 @@ TEST_F(TestParquetFileFormat, MultithreadedScan) { ASSERT_EQ(batches.size(), kNumRowGroups); } +TEST_F(TestParquetFileFormat, SingleThreadExecutor) { + // Reset capacity for io executor + struct PoolResetGuard { + int original_capacity = io::GetIOThreadPoolCapacity(); + ~PoolResetGuard() { DCHECK_OK(io::SetIOThreadPoolCapacity(original_capacity)); } + } guard; + ASSERT_OK(io::SetIOThreadPoolCapacity(1)); + + auto reader = GetRecordBatchReader(schema({field("utf8", utf8())})); + + ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get())); + auto buffer_reader = std::make_shared<::arrow::io::BufferReader>(buffer); + auto source = std::make_shared(std::move(buffer_reader), buffer->size()); + auto options = std::make_shared(); + + { + auto fragment = MakeFragment(*source); + auto count_rows = fragment->CountRows(literal(true), options); + ASSERT_OK_AND_ASSIGN(auto result, count_rows.MoveResult()); + ASSERT_EQ(expected_rows(), result); + } +} + class TestParquetFileSystemDataset : public WriteFileSystemDatasetMixin, public testing::Test { public: