From 74d41d388c08fb4226b82a4e0dc34930fcbc784e Mon Sep 17 00:00:00 2001
From: Benjamin Kietzman
Date: Tue, 14 Jan 2020 13:09:38 -0500
Subject: [PATCH] ARROW-7547: [C++][Dataset][Python] Add ParquetFileFormat options

---
 cpp/src/arrow/dataset/file_parquet.cc      | 226 ++++++++++++---------
 cpp/src/arrow/dataset/file_parquet.h       |  25 ++-
 cpp/src/arrow/dataset/file_parquet_test.cc |  55 ++++-
 cpp/src/arrow/type.h                       |   5 +-
 cpp/src/parquet/arrow/reader.h             |   5 +-
 cpp/src/parquet/file_reader.h              |   5 +-
 cpp/src/parquet/metadata.h                 |   5 +-
 cpp/src/parquet/properties.h               |   5 +-
 8 files changed, 203 insertions(+), 128 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index ad9ad213ea4aa..2cf63323a9a26 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -76,14 +76,117 @@ class ParquetScanTask : public ScanTask {
   std::shared_ptr<parquet::arrow::FileReader> reader_;
 };
 
+static Result<std::unique_ptr<parquet::ParquetFileReader>> OpenReader(
+    const FileSource& source, parquet::ReaderProperties properties) {
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+  try {
+    return parquet::ParquetFileReader::Open(std::move(input), std::move(properties));
+  } catch (const ::parquet::ParquetException& e) {
+    return Status::IOError("Could not open parquet input source '", source.path(),
+                           "': ", e.what());
+  }
+
+  return Status::UnknownError("unknown exception caught");
+}
+
+static parquet::ReaderProperties MakeReaderProperties(
+    const ParquetFileFormat& format, MemoryPool* pool = default_memory_pool()) {
+  parquet::ReaderProperties properties(pool);
+  if (format.use_buffered_stream) {
+    properties.enable_buffered_stream();
+  } else {
+    properties.disable_buffered_stream();
+  }
+  properties.set_buffer_size(format.buffer_size);
+  properties.file_decryption_properties(format.file_decryption_properties);
+  return properties;
+}
+
+static parquet::ArrowReaderProperties MakeArrowReaderProperties(
+    const ParquetFileFormat& format) {
+  parquet::ArrowReaderProperties properties(/* use_threads= */ false);
+  for (int column_index : format.read_dict_indices) {
+    properties.set_read_dictionary(column_index, true);
+  }
+  properties.set_batch_size(format.batch_size);
+  return properties;
+}
+
+template <typename M>
+static Result<SchemaManifest> GetSchemaManifest(
+    const M& metadata, const parquet::ArrowReaderProperties& properties) {
+  SchemaManifest manifest;
+  const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = nullptr;
+  RETURN_NOT_OK(
+      SchemaManifest::Make(metadata.schema(), key_value_metadata, properties, &manifest));
+  return manifest;
+}
+
+static std::shared_ptr<Expression> ColumnChunkStatisticsAsExpression(
+    const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) {
+  // For the remainder of this function, failure to extract or parse statistics
+  // is ignored by returning the `true` scalar. The goal is twofold: first,
+  // ensure that a failed optimization does not break the computation; second,
+  // allow subsequent columns to still succeed in extracting column statistics.
+
+  // For now, only leaf (primitive) types are supported.
+  if (!schema_field.is_leaf()) {
+    return scalar(true);
+  }
+
+  auto column_metadata = metadata.ColumnChunk(schema_field.column_index);
+  auto field = schema_field.field;
+  auto field_expr = field_ref(field->name());
+
+  // In case of missing statistics, return the `true` scalar.
+  if (!column_metadata->is_stats_set()) {
+    return scalar(true);
+  }
+
+  auto statistics = column_metadata->statistics();
+  if (statistics == nullptr) {
+    return scalar(true);
+  }
+
+  // Optimize for the corner case where all values are nulls.
+  if (statistics->num_values() == statistics->null_count()) {
+    return equal(field_expr, scalar(MakeNullScalar(field->type())));
+  }
+
+  std::shared_ptr<Scalar> min, max;
+  if (!StatisticsAsScalars(*statistics, &min, &max).ok()) {
+    return scalar(true);
+  }
+
+  return and_(greater_equal(field_expr, scalar(min)),
+              less_equal(field_expr, scalar(max)));
+}
+
+static Result<std::shared_ptr<Expression>> RowGroupStatisticsAsExpression(
+    const parquet::RowGroupMetaData& metadata,
+    const parquet::ArrowReaderProperties& properties) {
+  ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, properties));
+
+  ExpressionVector expressions;
+  for (const auto& schema_field : manifest.schema_fields) {
+    expressions.emplace_back(ColumnChunkStatisticsAsExpression(schema_field, metadata));
+  }
+
+  return expressions.empty() ? scalar(true) : and_(expressions);
+}
+
 // Skip RowGroups with a filter and metadata
 class RowGroupSkipper {
  public:
   static constexpr int kIterationDone = -1;
 
   RowGroupSkipper(std::shared_ptr<parquet::FileMetaData> metadata,
+                  parquet::ArrowReaderProperties arrow_properties,
                   std::shared_ptr<Expression> filter)
-      : metadata_(std::move(metadata)), filter_(std::move(filter)), row_group_idx_(0) {
+      : metadata_(std::move(metadata)),
+        arrow_properties_(std::move(arrow_properties)),
+        filter_(std::move(filter)),
+        row_group_idx_(0) {
     num_row_groups_ = metadata_->num_row_groups();
   }
 
@@ -106,7 +209,7 @@ class RowGroupSkipper {
 
  private:
   bool CanSkip(const parquet::RowGroupMetaData& metadata) const {
-    auto maybe_stats_expr = RowGroupStatisticsAsExpression(metadata);
+    auto maybe_stats_expr = RowGroupStatisticsAsExpression(metadata, arrow_properties_);
    // Errors with statistics are ignored and post-filtering will apply.
    if (!maybe_stats_expr.ok()) {
      return false;
    }
@@ -118,36 +221,33 @@ class RowGroupSkipper {
   }
 
   std::shared_ptr<parquet::FileMetaData> metadata_;
+  parquet::ArrowReaderProperties arrow_properties_;
   std::shared_ptr<Expression> filter_;
   int row_group_idx_;
   int num_row_groups_;
   int64_t rows_skipped_;
 };
 
-template <typename M>
-static Result<SchemaManifest> GetSchemaManifest(const M& metadata) {
-  SchemaManifest manifest;
-  RETURN_NOT_OK(SchemaManifest::Make(
-      metadata.schema(), nullptr, parquet::default_arrow_reader_properties(), &manifest));
-  return manifest;
-}
-
 class ParquetScanTaskIterator {
  public:
-  static Result<ScanTaskIterator> Make(
-      std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context,
-      std::unique_ptr<parquet::ParquetFileReader> reader) {
+  static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,
+                                       std::shared_ptr<ScanContext> context,
+                                       std::unique_ptr<parquet::ParquetFileReader> reader,
+                                       parquet::ArrowReaderProperties arrow_properties) {
     auto metadata = reader->metadata();
-    auto column_projection = InferColumnProjection(*metadata, options);
+    auto column_projection = InferColumnProjection(*metadata, arrow_properties, options);
     std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
     RETURN_NOT_OK(parquet::arrow::FileReader::Make(context->pool, std::move(reader),
-                                                   &arrow_reader));
+                                                   arrow_properties, &arrow_reader));
+
+    RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
+                            options->filter);
 
     return ScanTaskIterator(ParquetScanTaskIterator(
         std::move(options), std::move(context), std::move(column_projection),
-        std::move(metadata), std::move(arrow_reader)));
+        std::move(skipper), std::move(arrow_reader)));
   }
 
   Result<std::shared_ptr<ScanTask>> Next() {
@@ -166,8 +266,9 @@ class ParquetScanTaskIterator {
   // Compute the column projection out of an optional arrow::Schema
   static std::vector<int> InferColumnProjection(
       const parquet::FileMetaData& metadata,
+      const parquet::ArrowReaderProperties& arrow_properties,
       const std::shared_ptr<ScanOptions>& options) {
-    auto maybe_manifest = GetSchemaManifest(metadata);
+    auto maybe_manifest = GetSchemaManifest(metadata, arrow_properties);
     if (!maybe_manifest.ok()) {
       return internal::Iota(metadata.num_columns());
     }
@@ -210,13 +311,12 @@ class ParquetScanTaskIterator {
 
   ParquetScanTaskIterator(std::shared_ptr<ScanOptions> options,
                           std::shared_ptr<ScanContext> context,
-                          std::vector<int> column_projection,
-                          std::shared_ptr<parquet::FileMetaData> metadata,
+                          std::vector<int> column_projection, RowGroupSkipper skipper,
                           std::unique_ptr<parquet::arrow::FileReader> reader)
       : options_(std::move(options)),
        context_(std::move(context)),
        column_projection_(std::move(column_projection)),
-        skipper_(std::move(metadata), options_->filter),
+        skipper_(std::move(skipper)),
        reader_(std::move(reader)) {}
 
   std::shared_ptr<ScanOptions> options_;
@@ -229,7 +329,9 @@ Result<bool> ParquetFileFormat::IsSupported(const FileSource& source) const {
   try {
     ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
-    auto reader = parquet::ParquetFileReader::Open(input);
+    auto properties = MakeReaderProperties(*this);
+    auto reader =
+        parquet::ParquetFileReader::Open(std::move(input), std::move(properties));
     auto metadata = reader->metadata();
     return metadata != nullptr && metadata->can_decompress();
   } catch (const ::parquet::ParquetInvalidOrCorruptedFileException& e) {
@@ -245,11 +347,14 @@ Result<bool> ParquetFileFormat::IsSupported(const FileSource& source) const {
 
 Result<std::shared_ptr<Schema>> ParquetFileFormat::Inspect(
     const FileSource& source) const {
-  auto pool = default_memory_pool();
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, pool));
+  auto properties = MakeReaderProperties(*this);
+  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
 
+  auto arrow_properties = MakeArrowReaderProperties(*this);
   std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
-  RETURN_NOT_OK(parquet::arrow::FileReader::Make(pool, std::move(reader), &arrow_reader));
+  RETURN_NOT_OK(parquet::arrow::FileReader::Make(default_memory_pool(), std::move(reader),
+                                                 std::move(arrow_properties),
+                                                 &arrow_reader));
 
   std::shared_ptr<Schema> schema;
   RETURN_NOT_OK(arrow_reader->GetSchema(&schema));
@@ -259,8 +364,12 @@ Result<std::shared_ptr<Schema>> ParquetFileFormat::Inspect(
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
     std::shared_ptr<ScanContext> context) const {
-  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, context->pool));
-  return ParquetScanTaskIterator::Make(options, context, std::move(reader));
+  auto properties = MakeReaderProperties(*this, context->pool);
+  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
+
+  auto arrow_properties = MakeArrowReaderProperties(*this);
+  return ParquetScanTaskIterator::Make(options, context, std::move(reader),
+                                       std::move(arrow_properties));
 }
 
 Result<std::shared_ptr<Fragment>> ParquetFileFormat::MakeFragment(
@@ -268,70 +377,5 @@ Result<std::shared_ptr<Fragment>> ParquetFileFormat::MakeFragment(
   return std::make_shared<ParquetFragment>(source, options);
 }
 
-Result<std::unique_ptr<parquet::ParquetFileReader>> ParquetFileFormat::OpenReader(
-    const FileSource& source, MemoryPool* pool) const {
-  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
-  try {
-    return parquet::ParquetFileReader::Open(input);
-  } catch (const ::parquet::ParquetException& e) {
-    return Status::IOError("Could not open parquet input source '", source.path(),
-                           "': ", e.what());
-  }
-
-  return Status::UnknownError("unknown exception caught");
-}
-
-static std::shared_ptr<Expression> ColumnChunkStatisticsAsExpression(
-    const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) {
-  // For the remaining of this function, failure to extract/parse statistics
-  // are ignored by returning the `true` scalar. The goal is two fold. First
-  // avoid that an optimization break the computation. Second, allow the
-  // following columns to maybe succeed in extracting column statistics.
-
-  // For now, only leaf (primitive) types are supported.
-  if (!schema_field.is_leaf()) {
-    return scalar(true);
-  }
-
-  auto column_metadata = metadata.ColumnChunk(schema_field.column_index);
-  auto field = schema_field.field;
-  auto field_expr = field_ref(field->name());
-
-  // In case of missing statistics, return nothing.
-  if (!column_metadata->is_stats_set()) {
-    return scalar(true);
-  }
-
-  auto statistics = column_metadata->statistics();
-  if (statistics == nullptr) {
-    return scalar(true);
-  }
-
-  // Optimize for corner case where all values are nulls
-  if (statistics->num_values() == statistics->null_count()) {
-    return equal(field_expr, scalar(MakeNullScalar(field->type())));
-  }
-
-  std::shared_ptr<Scalar> min, max;
-  if (!StatisticsAsScalars(*statistics, &min, &max).ok()) {
-    return scalar(true);
-  }
-
-  return and_(greater_equal(field_expr, scalar(min)),
-              less_equal(field_expr, scalar(max)));
-}
-
-Result<std::shared_ptr<Expression>> RowGroupStatisticsAsExpression(
-    const parquet::RowGroupMetaData& metadata) {
-  ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata));
-
-  ExpressionVector expressions;
-  for (const auto& schema_field : manifest.schema_fields) {
-    expressions.emplace_back(ColumnChunkStatisticsAsExpression(schema_field, metadata));
-  }
-
-  return expressions.empty() ? scalar(true) : and_(expressions);
-}
-
 }  // namespace dataset
 }  // namespace arrow
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 58a51a3368675..151c7be541da7 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -23,6 +23,7 @@
 #include "arrow/dataset/file_base.h"
 #include "arrow/dataset/type_fwd.h"
 #include "arrow/dataset/visibility.h"
+#include "parquet/properties.h"
 
 namespace parquet {
 class ParquetFileReader;
@@ -36,6 +37,23 @@ namespace dataset {
 /// \brief A FileFormat implementation that reads from Parquet files
 class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
  public:
+  /// \defgroup parquet-file-format-reader-properties properties which correspond to
+  /// members of parquet::ReaderProperties.
+  ///
+  /// @{
+  bool use_buffered_stream = parquet::DEFAULT_USE_BUFFERED_STREAM;
+  int64_t buffer_size = parquet::DEFAULT_BUFFER_SIZE;
+  std::shared_ptr<parquet::FileDecryptionProperties> file_decryption_properties;
+  /// @}
+
+  /// \defgroup parquet-file-format-arrow-reader-properties properties which correspond
+  /// to members of parquet::ArrowReaderProperties.
+  ///
+  /// @{
+  std::unordered_set<int> read_dict_indices;
+  int64_t batch_size = parquet::kArrowDefaultBatchSize;
+  /// @}
+
   std::string type_name() const override { return "parquet"; }
 
   Result<bool> IsSupported(const FileSource& source) const override;
@@ -50,10 +68,6 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
 
   Result<std::shared_ptr<Fragment>> MakeFragment(
       const FileSource& source, std::shared_ptr<ScanOptions> options) override;
-
- private:
-  Result<std::unique_ptr<parquet::ParquetFileReader>> OpenReader(
-      const FileSource& source, MemoryPool* pool) const;
 };
 
 class ARROW_DS_EXPORT ParquetFragment : public FileFragment {
@@ -64,8 +78,5 @@ class ARROW_DS_EXPORT ParquetFragment : public FileFragment {
 
   bool splittable() const override { return true; }
 };
 
-Result<std::shared_ptr<Expression>> RowGroupStatisticsAsExpression(
-    const parquet::RowGroupMetaData& metadata);
-
 }  // namespace dataset
 }  // namespace arrow
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index c3ebd86ed1182..daa35713422b9 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -159,6 +159,7 @@ class ParquetBufferFixtureMixin : public ArrowParquetWriterMixin {
 
 class TestParquetFileFormat : public ParquetBufferFixtureMixin {
  protected:
+  ParquetFileFormat format_;
   std::shared_ptr<ScanOptions> opts_;
   std::shared_ptr<ScanContext> ctx_ = std::make_shared<ScanContext>();
 };
@@ -185,18 +186,43 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReader) {
   ASSERT_EQ(row_count, kNumRows);
 }
 
-TEST_F(TestParquetFileFormat, OpenFailureWithRelevantError) {
-  auto format = ParquetFileFormat();
+TEST_F(TestParquetFileFormat, ScanRecordBatchReaderDictEncoded) {
+  auto reader = GetRecordBatchReader();
+  auto source = GetFileSource(reader.get());
+
+  opts_ = ScanOptions::Make(reader->schema());
+
+  format_.read_dict_indices.insert(0);
+  ASSERT_OK_AND_ASSIGN(auto fragment, format_.MakeFragment(*source, opts_));
+  ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_));
+  int64_t row_count = 0;
+
+  Schema expected_schema({field("f64", dictionary(int32(), float64()))});
+
+  for (auto maybe_task : scan_task_it) {
+    ASSERT_OK_AND_ASSIGN(auto task, std::move(maybe_task));
+    ASSERT_OK_AND_ASSIGN(auto rb_it, task->Execute());
+    for (auto maybe_batch : rb_it) {
+      ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch));
+      row_count += batch->num_rows();
+      ASSERT_EQ(*batch->schema(), expected_schema);
+    }
+  }
+
+  ASSERT_EQ(row_count, kNumRows);
+}
+
+TEST_F(TestParquetFileFormat, OpenFailureWithRelevantError) {
   std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
-  auto result = format.Inspect(FileSource(buf));
+  auto result = format_.Inspect(FileSource(buf));
   EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, testing::HasSubstr("<Buffer>"), result.status());
 
   constexpr auto file_name = "herp/derp";
   ASSERT_OK_AND_ASSIGN(
       auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)}));
-  result = format.Inspect({file_name, fs.get()});
+  result = format_.Inspect({file_name, fs.get()});
   EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, testing::HasSubstr(file_name),
                                   result.status());
 }
@@ -277,28 +303,37 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjectedMissingCols) {
 TEST_F(TestParquetFileFormat, Inspect) {
   auto reader = GetRecordBatchReader();
   auto source = GetFileSource(reader.get());
-  auto format = ParquetFileFormat();
 
-  ASSERT_OK_AND_ASSIGN(auto actual, format.Inspect(*source.get()));
+  ASSERT_OK_AND_ASSIGN(auto actual, format_.Inspect(*source.get()));
   AssertSchemaEqual(*actual, *schema_, /*check_metadata=*/false);
 }
 
+TEST_F(TestParquetFileFormat, InspectDictEncoded) {
+  auto reader = GetRecordBatchReader();
+  auto source = GetFileSource(reader.get());
+
+  format_.read_dict_indices.insert(0);
+  ASSERT_OK_AND_ASSIGN(auto actual, format_.Inspect(*source.get()));
+
+  Schema expected_schema({field("f64", dictionary(int32(), float64()))});
+  EXPECT_EQ(*actual, expected_schema);
+}
+
 TEST_F(TestParquetFileFormat, IsSupported) {
   auto reader = GetRecordBatchReader();
   auto source = GetFileSource(reader.get());
-  auto format = ParquetFileFormat();
 
   bool supported = false;
 
   std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
-  ASSERT_OK_AND_ASSIGN(supported, format.IsSupported(FileSource(buf)));
+  ASSERT_OK_AND_ASSIGN(supported, format_.IsSupported(FileSource(buf)));
   ASSERT_EQ(supported, false);
 
   buf = std::make_shared<Buffer>(util::string_view("corrupted"));
-  ASSERT_OK_AND_ASSIGN(supported, format.IsSupported(FileSource(buf)));
+  ASSERT_OK_AND_ASSIGN(supported, format_.IsSupported(FileSource(buf)));
   ASSERT_EQ(supported, false);
 
-  ASSERT_OK_AND_ASSIGN(supported, format.IsSupported(*source));
+  ASSERT_OK_AND_ASSIGN(supported, format_.IsSupported(*source));
   EXPECT_EQ(supported, true);
 }
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 40a954bb85fc4..cee62a6a90432 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_TYPE_H
-#define ARROW_TYPE_H
+#pragma once
 
 #include
 #include
@@ -1790,5 +1789,3 @@ Result<std::shared_ptr<Schema>> UnifySchemas(
     Field::MergeOptions field_merge_options = Field::MergeOptions::Defaults());
 
 }  // namespace arrow
-
-#endif  // ARROW_TYPE_H
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index 5d97048a36804..90d485e65aeaa 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
-#ifndef PARQUET_ARROW_READER_H
-#define PARQUET_ARROW_READER_H
+#pragma once
 
 #include
 #include
@@ -324,5 +323,3 @@ ::arrow::Status FuzzReader(const uint8_t* data, int64_t size);
 }  // namespace internal
 }  // namespace arrow
 }  // namespace parquet
-
-#endif  // PARQUET_ARROW_READER_H
diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h
index 65504bf568be4..fec6219e79e71 100644
--- a/cpp/src/parquet/file_reader.h
+++ b/cpp/src/parquet/file_reader.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef PARQUET_FILE_READER_H
-#define PARQUET_FILE_READER_H
+#pragma once
 
 #include
 #include
@@ -137,5 +136,3 @@ int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size,
                          ParquetFileReader* reader);
 
 }  // namespace parquet
-
-#endif  // PARQUET_FILE_READER_H
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index c4ef07c60e6e5..1abfd895d1ab8 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef PARQUET_FILE_METADATA_H
-#define PARQUET_FILE_METADATA_H
+#pragma once
 
 #include
 #include
@@ -396,5 +395,3 @@ class PARQUET_EXPORT FileMetaDataBuilder {
 PARQUET_EXPORT std::string ParquetVersionToString(ParquetVersion::type ver);
 
 }  // namespace parquet
-
-#endif  // PARQUET_FILE_METADATA_H
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 2c4c86fc3bd11..a9302bc7abf8f 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef PARQUET_COLUMN_PROPERTIES_H
-#define PARQUET_COLUMN_PROPERTIES_H
+#pragma once
 
 #include
 #include
@@ -721,5 +720,3 @@ PARQUET_EXPORT
 std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties();
 
 }  // namespace parquet
-
-#endif  // PARQUET_COLUMN_PROPERTIES_H
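-- 
Usage sketch: the new ParquetFileFormat members are plain public fields, set
before the format is used to inspect or scan a source. This is a minimal
illustration based only on the members declared in file_parquet.h above; the
`source` FileSource and the concrete option values are assumed for the example
and are not part of this patch.

    #include "arrow/dataset/file_parquet.h"

    arrow::dataset::ParquetFileFormat format;

    // Fields forwarded into parquet::ReaderProperties by MakeReaderProperties():
    format.use_buffered_stream = true;       // read through a buffered stream
    format.buffer_size = 1 << 20;            // 1 MiB buffer (hypothetical value)
    // format.file_decryption_properties = ...;  // only for encrypted files

    // Fields forwarded into parquet::ArrowReaderProperties by
    // MakeArrowReaderProperties():
    format.read_dict_indices.insert(0);      // decode column 0 as dictionary
    format.batch_size = 64 * 1024;           // rows per batch (hypothetical value)

    // IsSupported(), Inspect(), and ScanFile() all honor these fields, e.g.:
    // ARROW_ASSIGN_OR_RAISE(auto schema, format.Inspect(source));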