diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 2fa12d8bb6220..a3cef4b4ce856 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -1180,55 +1180,126 @@ int PlainBooleanDecoder::Decode(bool* buffer, int max_values) { return max_values; } +// A helper class to abstract away differences between EncodingTraits::Accumulator +// for ByteArrayType and FLBAType. template -struct ArrowBinaryHelper { - explicit ArrowBinaryHelper(typename EncodingTraits::Accumulator* acc) { - if constexpr (std::is_same_v) { - builder = acc->builder.get(); - chunks = &acc->chunks; - } else { - builder = acc; +struct ArrowBinaryHelper; + +template <> +struct ArrowBinaryHelper { + using Accumulator = typename EncodingTraits::Accumulator; + + ArrowBinaryHelper(Accumulator* acc, int64_t length) + : acc_(acc), + entries_remaining_(length), + chunk_space_remaining_(::arrow::kBinaryMemoryLimit - + acc_->builder->value_data_length()) {} + + Status Prepare(std::optional estimated_data_length = {}) { + RETURN_NOT_OK(acc_->builder->Reserve(entries_remaining_)); + if (estimated_data_length.has_value()) { + RETURN_NOT_OK(acc_->builder->ReserveData( + std::min(*estimated_data_length, ::arrow::kBinaryMemoryLimit))); } - if (ARROW_PREDICT_FALSE(SubtractWithOverflow(::arrow::kBinaryMemoryLimit, - builder->value_data_length(), - &chunk_space_remaining))) { - throw ParquetException("excess expansion in ArrowBinaryHelper"); + return Status::OK(); + } + + Status PrepareNextInput(int64_t next_value_length, + std::optional estimated_remaining_data_length = {}) { + if (ARROW_PREDICT_FALSE(!CanFit(next_value_length))) { + // This element would exceed the capacity of a chunk + RETURN_NOT_OK(PushChunk()); + RETURN_NOT_OK(acc_->builder->Reserve(entries_remaining_)); + if (estimated_remaining_data_length.has_value()) { + RETURN_NOT_OK(acc_->builder->ReserveData( + std::min(*estimated_remaining_data_length, chunk_space_remaining_))); + } } + return Status::OK(); } + void UnsafeAppend(const uint8_t* data, int32_t length) { + DCHECK(CanFit(length)); + DCHECK_GT(entries_remaining_, 0); + chunk_space_remaining_ -= length; + --entries_remaining_; + acc_->builder->UnsafeAppend(data, length); + } + + Status Append(const uint8_t* data, int32_t length) { + DCHECK(CanFit(length)); + DCHECK_GT(entries_remaining_, 0); + chunk_space_remaining_ -= length; + --entries_remaining_; + return acc_->builder->Append(data, length); + } + + void UnsafeAppendNull() { + --entries_remaining_; + acc_->builder->UnsafeAppendNull(); + } + + Status AppendNull() { + --entries_remaining_; + return acc_->builder->AppendNull(); + } + + private: Status PushChunk() { - std::shared_ptr<::arrow::Array> result; - RETURN_NOT_OK(builder->Finish(&result)); - chunks->push_back(result); - chunk_space_remaining = ::arrow::kBinaryMemoryLimit; + ARROW_ASSIGN_OR_RAISE(auto chunk, acc_->builder->Finish()); + acc_->chunks.push_back(std::move(chunk)); + chunk_space_remaining_ = ::arrow::kBinaryMemoryLimit; return Status::OK(); } - bool CanFit(int64_t length) const { return length <= chunk_space_remaining; } + bool CanFit(int64_t length) const { return length <= chunk_space_remaining_; } - void UnsafeAppend(const uint8_t* data, int32_t length) { - DCHECK(CanFit(length)); - chunk_space_remaining -= length; - builder->UnsafeAppend(data, length); + Accumulator* acc_; + int64_t entries_remaining_; + int64_t chunk_space_remaining_; +}; + +template <> +struct ArrowBinaryHelper { + using Accumulator = typename EncodingTraits::Accumulator; + + ArrowBinaryHelper(Accumulator* acc, int64_t length) + : acc_(acc), entries_remaining_(length) {} + + Status Prepare(std::optional estimated_data_length = {}) { + return acc_->Reserve(entries_remaining_); } - void UnsafeAppendNull() { builder->UnsafeAppendNull(); } + Status PrepareNextInput(int64_t next_value_length, + std::optional estimated_remaining_data_length = {}) { + return Status::OK(); + } + + void UnsafeAppend(const uint8_t* data, int32_t length) { + DCHECK_GT(entries_remaining_, 0); + --entries_remaining_; + acc_->UnsafeAppend(data); + } Status Append(const uint8_t* data, int32_t length) { - DCHECK(CanFit(length)); - chunk_space_remaining -= length; - if constexpr (std::is_same_v) { - return builder->Append(data); - } else { - return builder->Append(data, length); - } + DCHECK_GT(entries_remaining_, 0); + --entries_remaining_; + return acc_->Append(data); } - Status AppendNull() { return builder->AppendNull(); } + void UnsafeAppendNull() { + --entries_remaining_; + acc_->UnsafeAppendNull(); + } + + Status AppendNull() { + --entries_remaining_; + return acc_->AppendNull(); + } - typename EncodingTraits::BuilderType* builder; - std::vector>* chunks; - int64_t chunk_space_remaining; + private: + Accumulator* acc_; + int64_t entries_remaining_; }; template <> @@ -1329,12 +1400,10 @@ class PlainByteArrayDecoder : public PlainDecoder, int64_t valid_bits_offset, typename EncodingTraits::Accumulator* out, int* out_values_decoded) { - ArrowBinaryHelper helper(out); + ArrowBinaryHelper helper(out, num_values); int values_decoded = 0; - RETURN_NOT_OK(helper.builder->Reserve(num_values)); - RETURN_NOT_OK(helper.builder->ReserveData( - std::min(len_, helper.chunk_space_remaining))); + RETURN_NOT_OK(helper.Prepare(len_)); int i = 0; RETURN_NOT_OK(VisitNullBitmapInline( @@ -1351,13 +1420,7 @@ class PlainByteArrayDecoder : public PlainDecoder, if (ARROW_PREDICT_FALSE(len_ < increment)) { ParquetException::EofException(); } - if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) { - // This element would exceed the capacity of a chunk - RETURN_NOT_OK(helper.PushChunk()); - RETURN_NOT_OK(helper.builder->Reserve(num_values - i)); - RETURN_NOT_OK(helper.builder->ReserveData( - std::min(len_, helper.chunk_space_remaining))); - } + RETURN_NOT_OK(helper.PrepareNextInput(value_len, len_)); helper.UnsafeAppend(data_ + 4, value_len); data_ += increment; len_ -= increment; @@ -1850,7 +1913,8 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, constexpr int32_t kBufferSize = 1024; int32_t indices[kBufferSize]; - ArrowBinaryHelper helper(out); + ArrowBinaryHelper helper(out, num_values); + RETURN_NOT_OK(helper.Prepare()); auto dict_values = reinterpret_cast(dictionary_->data()); int values_decoded = 0; @@ -1871,9 +1935,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, const auto index = indices[pos_indices++]; RETURN_NOT_OK(IndexInBounds(index)); const auto& val = dict_values[index]; - if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { - RETURN_NOT_OK(helper.PushChunk()); - } + RETURN_NOT_OK(helper.PrepareNextInput(val.len)); RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); ++values_decoded; return Status::OK(); @@ -1919,20 +1981,21 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, int32_t indices[kBufferSize]; int values_decoded = 0; - ArrowBinaryHelper helper(out); + ArrowBinaryHelper helper(out, num_values); + RETURN_NOT_OK(helper.Prepare(len_)); + auto dict_values = reinterpret_cast(dictionary_->data()); while (values_decoded < num_values) { - int32_t batch_size = std::min(kBufferSize, num_values - values_decoded); - int num_indices = idx_decoder_.GetBatch(indices, batch_size); + const int32_t batch_size = + std::min(kBufferSize, num_values - values_decoded); + const int num_indices = idx_decoder_.GetBatch(indices, batch_size); if (num_indices == 0) ParquetException::EofException(); for (int i = 0; i < num_indices; ++i) { auto idx = indices[i]; RETURN_NOT_OK(IndexInBounds(idx)); const auto& val = dict_values[idx]; - if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { - RETURN_NOT_OK(helper.PushChunk()); - } + RETURN_NOT_OK(helper.PrepareNextInput(val.len)); RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); } values_decoded += num_indices; @@ -2762,7 +2825,8 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, int64_t valid_bits_offset, typename EncodingTraits::Accumulator* out, int* out_num_values) { - ArrowBinaryHelper helper(out); + ArrowBinaryHelper helper(out, num_values); + RETURN_NOT_OK(helper.Prepare()); std::vector values(num_values - null_count); const int num_valid_values = Decode(values.data(), num_values - null_count); @@ -2778,9 +2842,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, valid_bits, valid_bits_offset, num_values, null_count, [&]() { const auto& val = values_ptr[value_idx]; - if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { - RETURN_NOT_OK(helper.PushChunk()); - } + RETURN_NOT_OK(helper.PrepareNextInput(val.len)); RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); ++value_idx; return Status::OK(); @@ -3334,7 +3396,8 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode int64_t valid_bits_offset, typename EncodingTraits::Accumulator* out, int* out_num_values) { - ArrowBinaryHelper helper(out); + ArrowBinaryHelper helper(out, num_values); + RETURN_NOT_OK(helper.Prepare()); std::vector values(num_values); const int num_valid_values = GetInternal(values.data(), num_values - null_count); @@ -3347,9 +3410,7 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode valid_bits, valid_bits_offset, num_values, null_count, [&]() { const auto& val = values_ptr[value_idx]; - if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { - RETURN_NOT_OK(helper.PushChunk()); - } + RETURN_NOT_OK(helper.PrepareNextInput(val.len)); RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); ++value_idx; return Status::OK(); diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index 300352bfe85e1..6cdfe37920200 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -140,15 +140,14 @@ template <> struct EncodingTraits { using Encoder = ByteArrayEncoder; using Decoder = ByteArrayDecoder; - using BuilderType = ::arrow::BinaryBuilder; + using ArrowType = ::arrow::BinaryType; /// \brief Internal helper class for decoding BYTE_ARRAY data where we can /// overflow the capacity of a single arrow::BinaryArray struct Accumulator { std::unique_ptr<::arrow::BinaryBuilder> builder; std::vector> chunks; }; - using ArrowType = ::arrow::BinaryType; using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::BinaryType>; }; @@ -158,8 +157,7 @@ struct EncodingTraits { using Decoder = FLBADecoder; using ArrowType = ::arrow::FixedSizeBinaryType; - using BuilderType = ::arrow::FixedSizeBinaryBuilder; - using Accumulator = BuilderType; + using Accumulator = ::arrow::FixedSizeBinaryBuilder; using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::FixedSizeBinaryType>; }; diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index f1b1933193f8e..71dc40d33ac47 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -2042,9 +2042,10 @@ TYPED_TEST(TestDeltaByteArrayEncoding, BasicRoundTrip) { } template -class DeltaByteArrayEncodingDirectPut : public TestEncodingBase { +class TestDeltaByteArrayEncodingDirectPut : public TestEncodingBase { using ArrowType = typename EncodingTraits::ArrowType; - using IsFixedSizeBinary = ::arrow::is_fixed_size_binary_type; + using Accumulator = typename EncodingTraits::Accumulator; + using BuilderType = typename ::arrow::TypeTraits::BuilderType; public: std::unique_ptr> encoder = @@ -2052,109 +2053,101 @@ class DeltaByteArrayEncodingDirectPut : public TestEncodingBase { std::unique_ptr> decoder = MakeTypedDecoder(Encoding::DELTA_BYTE_ARRAY); - void CheckDirectPutByteArray(std::shared_ptr<::arrow::Array> array) { - ASSERT_NO_THROW(encoder->Put(*array)); - auto buf = encoder->FlushValues(); + void CheckDirectPut(std::shared_ptr<::arrow::Array> array); - int num_values = static_cast(array->length() - array->null_count()); - decoder->SetData(num_values, buf->data(), static_cast(buf->size())); + void CheckRoundtrip() override; - typename EncodingTraits::Accumulator acc; - using BuilderType = typename EncodingTraits::BuilderType; - acc.builder = std::make_unique(array->type(), default_memory_pool()); + protected: + USING_BASE_MEMBERS(); +}; - ASSERT_EQ(num_values, - decoder->DecodeArrow(static_cast(array->length()), - static_cast(array->null_count()), - array->null_bitmap_data(), array->offset(), &acc)); +template <> +void TestDeltaByteArrayEncodingDirectPut::CheckDirectPut( + std::shared_ptr<::arrow::Array> array) { + ASSERT_NO_THROW(encoder->Put(*array)); + auto buf = encoder->FlushValues(); - std::shared_ptr<::arrow::Array> result; - ASSERT_OK(acc.builder->Finish(&result)); - ASSERT_EQ(array->length(), result->length()); - ASSERT_OK(result->ValidateFull()); + int num_values = static_cast(array->length() - array->null_count()); + decoder->SetData(num_values, buf->data(), static_cast(buf->size())); - ::arrow::AssertArraysEqual(*array, *result); - } + Accumulator acc; + acc.builder = std::make_unique(array->type(), default_memory_pool()); - void CheckDirectPutFLBA(std::shared_ptr<::arrow::Array> array) { - ASSERT_NO_THROW(encoder->Put(*array)); - auto buf = encoder->FlushValues(); + ASSERT_EQ(num_values, + decoder->DecodeArrow(static_cast(array->length()), + static_cast(array->null_count()), + array->null_bitmap_data(), array->offset(), &acc)); - int num_values = static_cast(array->length() - array->null_count()); - decoder->SetData(num_values, buf->data(), static_cast(buf->size())); + ASSERT_EQ(acc.chunks.size(), 0) << "Accumulator shouldn't have overflowed chunks"; + ASSERT_OK_AND_ASSIGN(auto result, acc.builder->Finish()); + ASSERT_EQ(array->length(), result->length()); + ASSERT_OK(result->ValidateFull()); - auto acc = - typename EncodingTraits::Accumulator(array->type(), default_memory_pool()); - ASSERT_EQ(num_values, - decoder->DecodeArrow(static_cast(array->length()), - static_cast(array->null_count()), - array->null_bitmap_data(), array->offset(), &acc)); + ::arrow::AssertArraysEqual(*array, *result); +} - std::shared_ptr<::arrow::Array> result; - ASSERT_OK(acc.Finish(&result)); - ASSERT_EQ(array->length(), result->length()); - ASSERT_OK(result->ValidateFull()); +template <> +void TestDeltaByteArrayEncodingDirectPut::CheckDirectPut( + std::shared_ptr<::arrow::Array> array) { + ASSERT_NO_THROW(encoder->Put(*array)); + auto buf = encoder->FlushValues(); - ::arrow::AssertArraysEqual(*array, *result); - } + int num_values = static_cast(array->length() - array->null_count()); + decoder->SetData(num_values, buf->data(), static_cast(buf->size())); - void CheckDirectPut(std::shared_ptr<::arrow::Array> array) { - if constexpr (IsFixedSizeBinary::value) { - CheckDirectPutFLBA(array); - } else { - CheckDirectPutByteArray(array); - } - } + Accumulator acc(array->type(), default_memory_pool()); - void CheckRoundtripFLBA() { - constexpr int64_t kSize = 50; - constexpr int kSeed = 42; - constexpr int kByteWidth = 4; - ::arrow::random::RandomArrayGenerator rag{kSeed}; - std::shared_ptr<::arrow::Array> values = - rag.FixedSizeBinary(/*size=*/0, /*byte_width=*/kByteWidth); - CheckDirectPut(values); + ASSERT_EQ(num_values, + decoder->DecodeArrow(static_cast(array->length()), + static_cast(array->null_count()), + array->null_bitmap_data(), array->offset(), &acc)); - for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { - values = rag.FixedSizeBinary(kSize + seed, kByteWidth); - CheckDirectPut(values); - } - } + ASSERT_OK_AND_ASSIGN(auto result, acc.Finish()); + ASSERT_EQ(array->length(), result->length()); + ASSERT_OK(result->ValidateFull()); - void CheckRoundtripByteArray() { - constexpr int64_t kSize = 500; - constexpr int32_t kMinLength = 0; - constexpr int32_t kMaxLength = 10; - constexpr int32_t kNumUnique = 10; - constexpr double kNullProbability = 0.25; - constexpr int kSeed = 42; - ::arrow::random::RandomArrayGenerator rag{kSeed}; - std::shared_ptr<::arrow::Array> values = rag.BinaryWithRepeats( - /*size=*/1, /*unique=*/1, kMinLength, kMaxLength, kNullProbability); - CheckDirectPut(values); + ::arrow::AssertArraysEqual(*array, *result); +} - for (int i = 0; i < 10; ++i) { - values = rag.BinaryWithRepeats(kSize, kNumUnique, kMinLength, kMaxLength, - kNullProbability); - CheckDirectPut(values); - } +template <> +void TestDeltaByteArrayEncodingDirectPut::CheckRoundtrip() { + constexpr int64_t kSize = 500; + constexpr int32_t kMinLength = 0; + constexpr int32_t kMaxLength = 10; + constexpr int32_t kNumUnique = 10; + constexpr double kNullProbability = 0.25; + constexpr int kSeed = 42; + ::arrow::random::RandomArrayGenerator rag{kSeed}; + std::shared_ptr<::arrow::Array> values = rag.BinaryWithRepeats( + /*size=*/1, /*unique=*/1, kMinLength, kMaxLength, kNullProbability); + CheckDirectPut(values); + + for (int i = 0; i < 10; ++i) { + values = rag.BinaryWithRepeats(kSize, kNumUnique, kMinLength, kMaxLength, + kNullProbability); + CheckDirectPut(values); } +} - void CheckRoundtrip() override { - if constexpr (IsFixedSizeBinary::value) { - CheckRoundtripFLBA(); - } else { - CheckRoundtripByteArray(); - } - } +template <> +void TestDeltaByteArrayEncodingDirectPut::CheckRoundtrip() { + constexpr int64_t kSize = 50; + constexpr int kSeed = 42; + constexpr int kByteWidth = 4; + ::arrow::random::RandomArrayGenerator rag{kSeed}; + std::shared_ptr<::arrow::Array> values = + rag.FixedSizeBinary(/*size=*/0, /*byte_width=*/kByteWidth); + CheckDirectPut(values); - protected: - USING_BASE_MEMBERS(); -}; + for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { + values = rag.FixedSizeBinary(kSize + seed, kByteWidth); + CheckDirectPut(values); + } +} -TYPED_TEST_SUITE(DeltaByteArrayEncodingDirectPut, TestDeltaByteArrayEncodingTypes); +TYPED_TEST_SUITE(TestDeltaByteArrayEncodingDirectPut, TestDeltaByteArrayEncodingTypes); -TYPED_TEST(DeltaByteArrayEncodingDirectPut, DirectPut) { +TYPED_TEST(TestDeltaByteArrayEncodingDirectPut, DirectPut) { ASSERT_NO_FATAL_FAILURE(this->CheckRoundtrip()); }