diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index f71adb380ba2f..f5935661cda5e 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -5763,8 +5763,8 @@ class ParquetBloomFilterRoundTripTest : public ::testing::Test, } template - void VerifyBloomFilter(const BloomFilter* bloom_filter, - const ::arrow::ChunkedArray& chunked_array) { + void VerifyBloomFilterContains(const BloomFilter* bloom_filter, + const ::arrow::ChunkedArray& chunked_array) { for (auto value : ::arrow::stl::Iterate(chunked_array)) { if (value == std::nullopt) { continue; @@ -5773,6 +5773,17 @@ class ParquetBloomFilterRoundTripTest : public ::testing::Test, } } + template + void VerifyBloomFilterNotContains(const BloomFilter* bloom_filter, + const ::arrow::ChunkedArray& chunked_array) { + for (auto value : ::arrow::stl::Iterate(chunked_array)) { + if (value == std::nullopt) { + continue; + } + EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(value.value()))); + } + } + protected: std::vector> bloom_filters_; }; @@ -5781,7 +5792,7 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTrip) { auto schema = ::arrow::schema( {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); BloomFilterOptions options; - options.ndv = 100; + options.ndv = 10; auto writer_properties = WriterProperties::Builder() .enable_bloom_filter_options(options, "c0") ->enable_bloom_filter_options(options, "c1") @@ -5804,16 +5815,26 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTrip) { int64_t bloom_filter_idx = 0; // current index in `bloom_filters_` for (int64_t row_group_id = 0; row_group_id < 2; ++row_group_id) { { + // The bloom filter for same column in another row-group. + int64_t bloom_filter_idx_another_rg = + row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2; ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]); auto col = table->column(0)->Slice(current_row, row_group_row_count[row_group_id]); - VerifyBloomFilter<::arrow::Int64Type>(bloom_filters_[bloom_filter_idx].get(), *col); + VerifyBloomFilterContains<::arrow::Int64Type>( + bloom_filters_[bloom_filter_idx].get(), *col); + VerifyBloomFilterNotContains<::arrow::Int64Type>( + bloom_filters_[bloom_filter_idx_another_rg].get(), *col); ++bloom_filter_idx; } { + int64_t bloom_filter_idx_another_rg = + row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2; ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]); auto col = table->column(1)->Slice(current_row, row_group_row_count[row_group_id]); - VerifyBloomFilter<::arrow::StringType>(bloom_filters_[bloom_filter_idx].get(), - *col); + VerifyBloomFilterContains<::arrow::StringType>( + bloom_filters_[bloom_filter_idx].get(), *col); + VerifyBloomFilterNotContains<::arrow::StringType>( + bloom_filters_[bloom_filter_idx_another_rg].get(), *col); ++bloom_filter_idx; } current_row += row_group_row_count[row_group_id]; @@ -5828,7 +5849,7 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripDictionary) { ::arrow::field("c1", ::arrow::dictionary(::arrow::int64(), ::arrow::utf8()))}); bloom_filters_.clear(); BloomFilterOptions options; - options.ndv = 100; + options.ndv = 10; auto writer_properties = WriterProperties::Builder() .enable_bloom_filter_options(options, "c0") ->enable_bloom_filter_options(options, "c1") @@ -5843,6 +5864,7 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripDictionary) { [6, "f"] ])"}; auto table = ::arrow::TableFromJSON(schema, contents); + // using non_dict_table to adapt some interface which doesn't support dictionary. auto non_dict_table = ::arrow::TableFromJSON(origin_schema, contents); WriteFile(writer_properties, table); @@ -5853,18 +5875,28 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripDictionary) { int64_t bloom_filter_idx = 0; // current index in `bloom_filters_` for (int64_t row_group_id = 0; row_group_id < 2; ++row_group_id) { { + // The bloom filter for same column in another row-group. + int64_t bloom_filter_idx_another_rg = + row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2; ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]); auto col = non_dict_table->column(0)->Slice(current_row, row_group_row_count[row_group_id]); - VerifyBloomFilter<::arrow::Int64Type>(bloom_filters_[bloom_filter_idx].get(), *col); + VerifyBloomFilterContains<::arrow::Int64Type>( + bloom_filters_[bloom_filter_idx].get(), *col); + VerifyBloomFilterNotContains<::arrow::Int64Type>( + bloom_filters_[bloom_filter_idx_another_rg].get(), *col); ++bloom_filter_idx; } { + int64_t bloom_filter_idx_another_rg = + row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2; ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]); auto col = non_dict_table->column(1)->Slice(current_row, row_group_row_count[row_group_id]); - VerifyBloomFilter<::arrow::StringType>(bloom_filters_[bloom_filter_idx].get(), - *col); + VerifyBloomFilterContains<::arrow::StringType>( + bloom_filters_[bloom_filter_idx].get(), *col); + VerifyBloomFilterNotContains<::arrow::StringType>( + bloom_filters_[bloom_filter_idx_another_rg].get(), *col); ++bloom_filter_idx; } current_row += row_group_row_count[row_group_id]; @@ -5875,7 +5907,7 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripWithOneFilter) { auto schema = ::arrow::schema( {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); BloomFilterOptions options; - options.ndv = 100; + options.ndv = 10; auto writer_properties = WriterProperties::Builder() .enable_bloom_filter_options(options, "c0") ->disable_bloom_filter("c1") @@ -5900,7 +5932,8 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripWithOneFilter) { { ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]); auto col = table->column(0)->Slice(current_row, row_group_row_count[row_group_id]); - VerifyBloomFilter<::arrow::Int64Type>(bloom_filters_[bloom_filter_idx].get(), *col); + VerifyBloomFilterContains<::arrow::Int64Type>( + bloom_filters_[bloom_filter_idx].get(), *col); ++bloom_filter_idx; } current_row += row_group_row_count[row_group_id]; @@ -5910,7 +5943,7 @@ TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripWithOneFilter) { TEST_F(ParquetBloomFilterRoundTripTest, ThrowForBoolean) { auto schema = ::arrow::schema({::arrow::field("boolean_col", ::arrow::boolean())}); BloomFilterOptions options; - options.ndv = 100; + options.ndv = 10; auto writer_properties = WriterProperties::Builder() .enable_bloom_filter_options(options, "boolean_col") ->max_row_group_length(4) diff --git a/cpp/src/parquet/bloom_filter_builder.cc b/cpp/src/parquet/bloom_filter_builder.cc index 997fd5b4c6caf..651ca89ba266b 100644 --- a/cpp/src/parquet/bloom_filter_builder.cc +++ b/cpp/src/parquet/bloom_filter_builder.cc @@ -41,6 +41,9 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder { explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema, const WriterProperties* properties) : schema_(schema), properties_(properties) {} + BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete; + BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default; + /// Append a new row group to host all incoming bloom filters. void AppendRowGroup() override; @@ -51,14 +54,11 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder { /// been flushed. void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) override; - BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete; - BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default; - private: /// Make sure column ordinal is not out of bound and the builder is in good state. void CheckState(int32_t column_ordinal) const { if (finished_) { - throw ParquetException("BloomFilterBuilder is already finished."); + throw ParquetException("Cannot call WriteTo() twice on BloomFilterBuilder."); } if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) { throw ParquetException("Invalid column ordinal: ", column_ordinal); @@ -85,7 +85,7 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder { void BloomFilterBuilderImpl::AppendRowGroup() { if (finished_) { throw ParquetException( - "Cannot call AppendRowGroup() to finished BloomFilterBuilder."); + "Cannot call AppendRowGroup() to BloomFilterBuilder::WriteTo is called"); } file_bloom_filters_.emplace_back(std::make_unique()); } @@ -93,12 +93,16 @@ void BloomFilterBuilderImpl::AppendRowGroup() { BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t column_ordinal) { CheckState(column_ordinal); const ColumnDescriptor* column_descr = schema_->Column(column_ordinal); + // Bloom filter does not support boolean type, and this should be checked in + // CheckState() already. DCHECK_NE(column_descr->physical_type(), Type::BOOLEAN); auto bloom_filter_options_opt = properties_->bloom_filter_options(column_descr->path()); if (bloom_filter_options_opt == std::nullopt) { return nullptr; } BloomFilterOptions bloom_filter_options = *bloom_filter_options_opt; + // CheckState() should have checked that file_bloom_filters_ is not empty. + DCHECK(!file_bloom_filters_.empty()); RowGroupBloomFilters& row_group_bloom_filter = *file_bloom_filters_.back(); auto iter = row_group_bloom_filter.find(column_ordinal); if (iter == row_group_bloom_filter.end()) { @@ -111,7 +115,10 @@ BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t column_ordin DCHECK(insert_result.second); iter = insert_result.first; } - ARROW_CHECK(iter->second != nullptr); + if (iter->second == nullptr) { + throw ParquetException("Bloom filter state is invalid for column ", + column_descr->path()); + } return iter->second.get(); } @@ -130,22 +137,20 @@ void BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink, if (row_group_bloom_filters.empty()) { continue; } - bool has_valid_bloom_filter = false; int num_columns = schema_->num_columns(); std::vector> locations(num_columns, std::nullopt); // serialize bloom filter in ascending order of column id for (auto& [column_id, filter] : row_group_bloom_filters) { - ARROW_CHECK(filter != nullptr); + if (ARROW_PREDICT_FALSE(filter == nullptr)) { + throw ParquetException("Bloom filter state is invalid for column ", column_id); + } PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell()); filter->WriteTo(sink); PARQUET_ASSIGN_OR_THROW(int64_t pos, sink->Tell()); - has_valid_bloom_filter = true; locations[column_id] = IndexLocation{offset, static_cast(pos - offset)}; } - if (has_valid_bloom_filter) { - location->bloom_filter_location.emplace(row_group_ordinal, std::move(locations)); - } + location->bloom_filter_location.emplace(row_group_ordinal, std::move(locations)); } } } // namespace diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 571c954dcd916..325073be44453 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -2446,6 +2446,8 @@ template <> void TypedColumnWriterImpl::UpdateBloomFilterSpaced(const bool*, int64_t, const uint8_t*, int64_t) { + // BooleanType does not have a bloom filter currently, + // so bloom_filter_ should always be nullptr. DCHECK(bloom_filter_ == nullptr); } @@ -2504,7 +2506,7 @@ void TypedColumnWriterImpl::UpdateBloomFilterArray( const ::arrow::Array& values) { if (bloom_filter_) { // TODO(mwish): GH-37832 currently we don't support write StringView/BinaryView to - // parquet file. We can support + // parquet file. if (!::arrow::is_base_binary_like(values.type_id())) { throw ParquetException("Only BaseBinaryArray and subclasses supported"); } @@ -2513,7 +2515,11 @@ void TypedColumnWriterImpl::UpdateBloomFilterArray( UpdateBinaryBloomFilter(bloom_filter_, checked_cast(values)); } else { - DCHECK(::arrow::is_large_binary_like(values.type_id())); + // TODO(mwish): GH-37832 currently we don't support write StringView/BinaryView to + // parquet file. + if (!::arrow::is_large_binary_like(values.type_id())) { + throw ParquetException("Only LargeBinaryArray and subclasses supported"); + } UpdateBinaryBloomFilter(bloom_filter_, checked_cast(values)); }