diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h
index 2f70c286503fb..285134a40dbd9 100644
--- a/cpp/src/arrow/util/bit_stream_utils.h
+++ b/cpp/src/arrow/util/bit_stream_utils.h
@@ -204,8 +204,10 @@ class BitReader {
 
 inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
   // TODO: revisit this limit if necessary (can be raised to 64 by fixing some edge cases)
-  DCHECK_LE(num_bits, 32);
-  DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits;
+  DCHECK_LE(num_bits, 64);
+  if (num_bits < 64) {
+    DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits;
+  }
 
   if (ARROW_PREDICT_FALSE(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8))
     return false;
@@ -220,7 +222,8 @@ inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
     buffered_values_ = 0;
     byte_offset_ += 8;
     bit_offset_ -= 64;
-    buffered_values_ = v >> (num_bits - bit_offset_);
+    buffered_values_ =
+        (num_bits - bit_offset_ == 64) ? 0 : (v >> (num_bits - bit_offset_));
   }
   DCHECK_LT(bit_offset_, 64);
   return true;
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 44f762d7113e4..510f61d7f8819 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -2060,6 +2060,275 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+/// Format
+///   [header] [block 1] [block 2] ... [block N]
+///
+/// Header
+///   [block size] [number of mini blocks per block] [total value count] [first value]
+///
+/// Block
+///   [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be
+/// written, and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in
+/// the block, use the last element in the previous block or, in the case of the first
+/// block, use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
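+///
+/// Worked example (illustrative): for the input sequence 7, 5, 3, 1, 2, 3, 4, 5 the
+/// header stores the first value 7, the deltas are -2, -2, -2, 1, 1, 1, 1, and the
+/// min delta is -2 (written as a zigzag ULEB128 int). After subtracting the min delta
+/// the packed values are 0, 0, 0, 3, 3, 3, 3, which fit in a bit width of 2.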
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(), static_cast<int>(bits_buffer_->size())),
+        bit_widths_(mini_blocks_per_block, 0) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<uint8_t> bit_widths_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by use of unsigned integers
+    // making subtraction operations well defined and correct even in case of overflow.
+    // Encoded integers will wrap back around on decoding.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  auto min_delta = static_cast<UT>(
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_));
+  bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta));
+
+  // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to write
+  // bit widths of miniblocks as they become known during the encoding.
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks =
+      static_cast<uint32_t>(std::ceil(static_cast<double>(values_current_block_) /
+                                      static_cast<double>(values_per_mini_block_)));
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    auto max_delta = static_cast<UT>(*std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block));
+
+    // The minimum number of bits required to write any of the values in the deltas_
+    // vector. See overflow comment above.
+    bit_widths_[i] = bit_util::NumRequiredBits(max_delta - min_delta);
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      // See overflow comment above.
+      const UT value = deltas_[j] - min_delta;
+      bit_writer_.PutValue(value, bit_widths_[i]);
+    }
+    // If there are not enough values to fill the last mini block, we pad the mini block
+    // with zeroes so that its length is the number of values in a full mini block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
+      bit_writer_.PutValue(0, bit_widths_[i]);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  // If, in the last block, less than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the unneeded
+  // miniblocks are still present, their value should be zero, but readers must accept
+  // arbitrary values as well.
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_widths_[i];
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true));
+
+  uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {};
+  bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_));
+  if (!header_writer.PutVlqInt(values_per_block_) ||
+      !header_writer.PutVlqInt(mini_blocks_per_block_) ||
+      !header_writer.PutVlqInt(total_value_count_) ||
+      !header_writer.PutZigZagVlqInt(static_cast<T>(first_value_))) {
+    throw ParquetException("header writing error");
+  }
+  header_writer.Flush();
+
+  // We reserved enough space at the beginning of the buffer for the largest possible
+  // header and the data was written immediately after. We now write the header data
+  // immediately before the end of the reserved space.
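+  // For example, with kMaxPageHeaderWriterSize = 32 and a 6-byte header, the header is
+  // copied to bytes [26, 32) so that it ends exactly where the block data begins, and
+  // the SliceBuffer call below drops the 26 unused leading bytes.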
+  const size_t offset_bytes = kMaxPageHeaderWriterSize - header_writer.bytes_written();
+  std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_,
+              header_writer.bytes_written());
+
+  // Excess bytes at the beginning are sliced off and ignored.
+  return SliceBuffer(buffer, offset_bytes);
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  const ::arrow::ArrayData& data = *values.data();
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int32_t>(1), static_cast<int>(data.length));
+  } else {
+    PutSpaced(data.GetValues<int32_t>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+template <>
+void DeltaBitPackEncoder<Int64Type>::Put(const ::arrow::Array& values) {
+  const ::arrow::ArrayData& data = *values.data();
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int64_t>(1), static_cast<int>(data.length));
+  } else {
+    PutSpaced(data.GetValues<int64_t>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const ::arrow::Array& values) {
+  ParquetException::NYI("direct put of " + values.type()->ToString());
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::PutSpaced(const T* src, int num_values,
+                                           const uint8_t* valid_bits,
+                                           int64_t valid_bits_offset) {
+  if (valid_bits != NULLPTR) {
+    PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                 this->memory_pool()));
+    T* data = reinterpret_cast<T*>(buffer->mutable_data());
+    int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+        src, num_values, valid_bits, valid_bits_offset, data);
+    Put(data, num_valid_values);
+  } else {
+    Put(src, num_values);
+  }
+}
+
 // ----------------------------------------------------------------------
 // DeltaBitPackDecoder
 
@@ -2067,6 +2336,7 @@ template <typename DType>
 class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
  public:
   typedef typename DType::c_type T;
+  using UT = std::make_unsigned_t<T>;
 
   explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
                                MemoryPool* pool = ::arrow::default_memory_pool())
@@ -2141,6 +2411,11 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
-        int64_t delta =
-            static_cast<int64_t>(min_delta_) + static_cast<int64_t>(buffer[i + j]);
-        buffer[i + j] = static_cast<T>(delta + static_cast<int64_t>(last_value_));
+        // Addition between min_delta, packed int and last_value should be treated as
+        // unsigned addition. Overflow is as expected.
+        buffer[i + j] = static_cast<UT>(min_delta_) + static_cast<UT>(buffer[i + j]) +
+                        static_cast<UT>(last_value_);
         last_value_ = buffer[i + j];
       }
       values_current_mini_block_ -= values_decode;
@@ -2253,6 +2527,102 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
 };
 
+// ----------------------------------------------------------------------
+// DeltaLengthByteArrayEncoder
+
+template <typename DType>
+class DeltaLengthByteArrayEncoder : public EncoderImpl,
+                                    virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY,
+                    pool = ::arrow::default_memory_pool()),
+        sink_(pool),
+        length_encoder_(nullptr, pool),
+        encoded_size_{0} {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return encoded_size_; }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+ protected:
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> length_encoder_;
+  uint32_t encoded_size_;
+};
+
+template <>
+void DeltaLengthByteArrayEncoder<ByteArrayType>::Put(const ::arrow::Array& values) {
+  auto src = values.data()->GetValues<ByteArray>(1);
+  Put(src, static_cast<int>(values.length()));
+}
+
+template <typename DType>
+void DeltaLengthByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  std::vector<int32_t> lengths(num_values);
+  for (int idx = 0; idx < num_values; idx++) {
+    auto len = src[idx].len;
+    lengths[idx] = len;
+    encoded_size_ += len;
+  }
+  length_encoder_.Put(lengths.data(), num_values);
+  PARQUET_THROW_NOT_OK(sink_.Reserve(encoded_size_));
+
+  for (int idx = 0; idx < num_values; idx++) {
+    sink_.UnsafeAppend(src[idx].ptr, lengths[idx]);
+  }
+}
+
+template <typename DType>
+void DeltaLengthByteArrayEncoder<DType>::PutSpaced(const T* src, int num_values,
+                                                   const uint8_t* valid_bits,
+                                                   int64_t valid_bits_offset) {
+  if (valid_bits != NULLPTR) {
+    PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                 this->memory_pool()));
+    T* data = reinterpret_cast<T*>(buffer->mutable_data());
+    int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+        src, num_values, valid_bits, valid_bits_offset, data);
+    Put(data, num_valid_values);
+  } else {
+    Put(src, num_values);
+  }
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaLengthByteArrayEncoder<DType>::FlushValues() {
+  std::shared_ptr<Buffer> encoded_lengths = length_encoder_.FlushValues();
+
+  std::shared_ptr<Buffer> data;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&data));
+  sink_.Reset();
+
+  PARQUET_THROW_NOT_OK(sink_.Append(encoded_lengths->data(), encoded_lengths->size()));
+  PARQUET_THROW_NOT_OK(sink_.Append(data->mutable_data(), data->size()));
+
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+  return buffer;
+}
+
+// ----------------------------------------------------------------------
+// DeltaLengthByteArrayDecoder
+
 class DeltaLengthByteArrayDecoder : public DecoderImpl,
                                     virtual public TypedDecoder<ByteArrayType> {
  public:
@@ -2416,6 +2786,111 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
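+  // DELTA_BYTE_ARRAY encodes, for each value, the length of the prefix shared with
+  // the previous value followed by the remaining suffix bytes. Prefix lengths are
+  // written with prefix_length_encoder_ (DELTA_BINARY_PACKED) and suffixes with
+  // suffix_encoder_ (DELTA_LENGTH_BYTE_ARRAY).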
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY,
+                    pool = ::arrow::default_memory_pool()),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_(""),
+        last_value_in_previous_page_("") {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override {
+    auto src = values.data()->GetValues<T>(1);
+    Put(src, static_cast<int>(values.length()));
+  }
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                   this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+  uint32_t total_value_count_;
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+  DeltaLengthByteArrayEncoder<DType> suffix_encoder_;
+  string_view last_value_;
+  std::string last_value_in_previous_page_;
+};
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+  int32_t offset = 0;
+  T* suffix;
+
+  if (ARROW_PREDICT_TRUE(total_value_count_ == 0)) {
+    last_value_ = string_view{reinterpret_cast<const char*>(src[0].ptr), src[0].len};
+    suffix_encoder_.Put(&src[0], 1);
+    prefix_length_encoder_.Put(std::vector<int32_t>(src[0].len));
+    offset = src[0].len;
+  }
+
+  for (uint32_t i = 1; i < src->len; i++) {
+    auto val = string_view{reinterpret_cast<const char*>(src[i].ptr), src[i].len};
+
+    for (uint32_t j = 0; j < std::min(last_value_.length(), val.length()); j++) {
+      const bool substring_equal = memcmp(&val + j, src + (offset + j), j);
+      if (!substring_equal) {
+        memcpy(&suffix, &val + j, val.length() - j);
+        last_value_ = val;
+        prefix_length_encoder_.Put(std::vector<int32_t>(j));
+        suffix_encoder_.Put(suffix, static_cast<int>(val.length() - j));
+        offset += val.length();
+        break;
+      }
+    }
+  }
+
+  total_value_count_ += num_values;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaByteArrayEncoder<DType>::FlushValues() {
+  PARQUET_THROW_NOT_OK(sink_.Resize(EstimatedDataEncodedSize(), false));
+
+  std::shared_ptr<Buffer> prefix_lengths = prefix_length_encoder_.FlushValues();
+  PARQUET_THROW_NOT_OK(
+      sink_.Append(prefix_lengths->mutable_data(), prefix_lengths->size()));
+
+  std::shared_ptr<Buffer> suffixes = suffix_encoder_.FlushValues();
+  PARQUET_THROW_NOT_OK(sink_.Append(suffixes->mutable_data(), suffixes->size()));
+
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+  return buffer;
+}
+
 class DeltaByteArrayDecoder : public DecoderImpl,
                               virtual public TypedDecoder<ByteArrayType> {
  public:
@@ -2760,6 +3235,39 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encoding,
       throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
       break;
     }
+  } else if (encoding == Encoding::DELTA_BINARY_PACKED) {
+    switch (type_num) {
+      case Type::INT32:
+        return std::unique_ptr<Encoder>(new DeltaBitPackEncoder<Int32Type>(descr, pool));
+      case Type::INT64:
+        return std::unique_ptr<Encoder>(new DeltaBitPackEncoder<Int64Type>(descr, pool));
+      default:
+        throw ParquetException("DELTA_BINARY_PACKED only supports INT32 and INT64");
INT64"); + break; + } + } else if (encoding == Encoding::DELTA_BYTE_ARRAY) { + switch (type_num) { + case Type::BYTE_ARRAY: + return std::unique_ptr( + new DeltaByteArrayEncoder(descr, pool)); + // TODO + // case Type::FIXED_LEN_BYTE_ARRAY: + // return std::unique_ptr(new + // DeltaByteArrayEncoder(descr, pool)); + default: + throw ParquetException( + "DELTA_BYTE_ARRAY only supports BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"); + break; + } + } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { + switch (type_num) { + case Type::BYTE_ARRAY: + return std::unique_ptr( + new DeltaLengthByteArrayEncoder(descr, pool)); + default: + throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); + break; + } } else { ParquetException::NYI("Selected encoding is not supported"); } diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 7d42e3e8ce315..e459d925abc6e 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -124,6 +124,12 @@ void GenerateData(int num_values, T* out, std::vector* heap) { std::numeric_limits::max(), out); } +template +void GenerateBoundData(int num_values, T* out, T min, T max, std::vector* heap) { + // seed the prng so failure is deterministic + random_numbers(num_values, 0, min, max, out); +} + template <> void GenerateData(int num_values, bool* out, std::vector* heap) { // seed the prng so failure is deterministic @@ -1276,5 +1282,228 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) { ASSERT_THROW(MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT), ParquetException); } +// ---------------------------------------------------------------------- +// DELTA_BINARY_PACKED encode/decode tests. + +template +class TestDeltaBitPackEncoding : public TestEncodingBase { + public: + using c_type = typename Type::c_type; + static constexpr int TYPE = Type::type_num; + + void InitBoundData(int nvalues, int repeats) { + num_values_ = nvalues * repeats; + input_bytes_.resize(num_values_ * sizeof(c_type)); + output_bytes_.resize(num_values_ * sizeof(c_type)); + draws_ = reinterpret_cast(input_bytes_.data()); + decode_buf_ = reinterpret_cast(output_bytes_.data()); + GenerateBoundData(nvalues, draws_, -10, 10, &data_buffer_); + + // add some repeated values + for (int j = 1; j < repeats; ++j) { + for (int i = 0; i < nvalues; ++i) { + draws_[nvalues * j + i] = draws_[i]; + } + } + } + + void ExecuteBound(int nvalues, int repeats) { + InitBoundData(nvalues, repeats); + CheckRoundtrip(); + } + + void ExecuteSpacedBound(int nvalues, int repeats, int64_t valid_bits_offset, + double null_probability) { + InitBoundData(nvalues, repeats); + + int64_t size = num_values_ + valid_bits_offset; + auto rand = ::arrow::random::RandomArrayGenerator(1923); + const auto array = rand.UInt8(size, 0, 100, null_probability); + const auto valid_bits = array->null_bitmap_data(); + if (valid_bits) { + CheckRoundtripSpaced(valid_bits, valid_bits_offset); + } + } + + void CheckRoundtrip() override { + auto encoder = + MakeTypedEncoder(Encoding::DELTA_BINARY_PACKED, false, descr_.get()); + auto decoder = MakeTypedDecoder(Encoding::DELTA_BINARY_PACKED, descr_.get()); + + encoder->Put(draws_, num_values_); + encode_buffer_ = encoder->FlushValues(); + + decoder->SetData(num_values_, encode_buffer_->data(), + static_cast(encode_buffer_->size())); + int values_decoded = decoder->Decode(decode_buf_, num_values_); + ASSERT_EQ(num_values_, values_decoded); + ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); + } + + void 
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced(decode_buf_, draws_, num_values_,
+                                                valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+  std::vector<uint8_t> input_bytes_;
+  std::vector<uint8_t> output_bytes_;
+};
+
+using TestDeltaBitPackEncodingTypes = ::testing::Types<Int32Type, Int64Type>;
+TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes);
+
+TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 2000));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 0.1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(25000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(2000, 2000));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpacedBound(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 0.1));
+}
+
+// ----------------------------------------------------------------------
+// DELTA_LENGTH_BYTE_ARRAY encode/decode tests.
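+//
+// A DELTA_LENGTH_BYTE_ARRAY page stores the DELTA_BINARY_PACKED encoding of all value
+// lengths followed by the concatenated value bytes, which is the order in which
+// DeltaLengthByteArrayEncoder::FlushValues() appends them in encoding.cc.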
+
+template <typename Type>
+class TestDeltaLengthByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, false, descr_.get());
+    auto decoder =
+        MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, false, descr_.get());
+    auto decoder =
+        MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced(decode_buf_, draws_, num_values_,
+                                                valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<ByteArrayType> TestDeltaLengthByteArrayEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaLengthByteArrayEncoding, TestDeltaLengthByteArrayEncodingTypes);
+
+TYPED_TEST(TestDeltaLengthByteArrayEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(250, 2));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 0.1));
+}
+
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
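+//
+// Illustrative example: for the values "axis", "axle" and "ball" a DELTA_BYTE_ARRAY
+// encoder would emit prefix lengths 0, 2, 0 and suffixes "axis", "le", "ball", since
+// "axle" shares the two-byte prefix "ax" with the previous value.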
+
+template <typename Type>
+class TestDeltaByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced(decode_buf_, draws_, num_values_,
+                                                valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<ByteArrayType> TestDeltaByteArrayEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaByteArrayEncoding, TestDeltaByteArrayEncodingTypes);
+
+TYPED_TEST(TestDeltaByteArrayEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(250, 2));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 0.1));
+}
+
 }  // namespace test
 }  // namespace parquet
diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst
index 7649e45357a17..b25f1dba956b3 100644
--- a/docs/source/cpp/parquet.rst
+++ b/docs/source/cpp/parquet.rst
@@ -95,11 +95,11 @@ Encodings
 +--------------------------+----------+----------+---------+
 | BYTE_STREAM_SPLIT        | ✓        | ✓        |         |
 +--------------------------+----------+----------+---------+
-| DELTA_BINARY_PACKED      | ✓        |          |         |
+| DELTA_BINARY_PACKED      | ✓        | ✓        |         |
 +--------------------------+----------+----------+---------+
 | DELTA_BYTE_ARRAY         | ✓        |          |         |
 +--------------------------+----------+----------+---------+
-| DELTA_LENGTH_BYTE_ARRAY  | ✓        |          |         |
+| DELTA_LENGTH_BYTE_ARRAY  | ✓        | ✓        |         |
 +--------------------------+----------+----------+---------+
 
 * \(1) Only supported for encoding definition and repetition levels,
diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py
index 62ea19d422dfe..58995143c83e3 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -384,13 +384,16 @@ def test_byte_stream_split(use_legacy_dataset):
 def test_column_encoding(use_legacy_dataset):
     arr_float = pa.array(list(map(float, range(100))))
     arr_int = pa.array(list(map(int, range(100))))
-    mixed_table = pa.Table.from_arrays([arr_float, arr_int],
-                                       names=['a', 'b'])
+    arr_bin = pa.array(list(map(lambda x: b"01234567", range(100))), pa.binary(8))
+    mixed_table = pa.Table.from_arrays([arr_float, arr_int, arr_bin],
+                                       names=['a', 'b', 'c'])
 
     # Check "BYTE_STREAM_SPLIT" for column 'a' and "PLAIN" column_encoding for
-    # column 'b'.
+    # columns 'b' and 'c'.
     _check_roundtrip(mixed_table, expected=mixed_table, use_dictionary=False,
-                     column_encoding={'a': "BYTE_STREAM_SPLIT", 'b': "PLAIN"},
+                     column_encoding={'a': "BYTE_STREAM_SPLIT",
+                                      'b': "PLAIN",
+                                      'c': "PLAIN"},
                      use_legacy_dataset=use_legacy_dataset)
 
     # Check "PLAIN" for all columns.
@@ -399,6 +402,22 @@ def test_column_encoding(use_legacy_dataset):
                      column_encoding="PLAIN",
                      use_legacy_dataset=use_legacy_dataset)
 
+    # Check "DELTA_BINARY_PACKED" for integer columns.
+    _check_roundtrip(mixed_table, expected=mixed_table,
+                     use_dictionary=False,
+                     column_encoding={'a': "PLAIN",
+                                      'b': "DELTA_BINARY_PACKED",
+                                      'c': "PLAIN"},
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check "DELTA_LENGTH_BYTE_ARRAY" for the binary column.
+    _check_roundtrip(mixed_table, expected=mixed_table,
+                     use_dictionary=False,
+                     column_encoding={'a': "PLAIN",
+                                      'b': "DELTA_BINARY_PACKED",
+                                      'c': "DELTA_LENGTH_BYTE_ARRAY"},
+                     use_legacy_dataset=use_legacy_dataset)
+
     # Try to pass "BYTE_STREAM_SPLIT" column encoding for integer column 'b'.
     # This should throw an error as it is only supports FLOAT and DOUBLE.
     with pytest.raises(IOError,
@@ -406,17 +425,17 @@ def test_column_encoding(use_legacy_dataset):
                        " DOUBLE"):
         _check_roundtrip(mixed_table, expected=mixed_table,
                          use_dictionary=False,
-                         column_encoding={'b': "BYTE_STREAM_SPLIT"},
+                         column_encoding={'b': "BYTE_STREAM_SPLIT",
+                                          'c': "PLAIN"},
                          use_legacy_dataset=use_legacy_dataset)
 
-    # Try to pass "DELTA_BINARY_PACKED".
-    # This should throw an error as it is only supported for reading.
-    with pytest.raises(IOError,
-                       match="Not yet implemented: Selected encoding is"
-                             " not supported."):
+    # Try to use "DELTA_BINARY_PACKED" encoding on the float column.
+    # This should throw an error as only integers are supported.
+    with pytest.raises(OSError):
         _check_roundtrip(mixed_table, expected=mixed_table,
                          use_dictionary=False,
-                         column_encoding={'b': "DELTA_BINARY_PACKED"},
+                         column_encoding={'a': "DELTA_BINARY_PACKED",
+                                          'c': "PLAIN"},
                          use_legacy_dataset=use_legacy_dataset)
 
     # Try to pass "RLE_DICTIONARY".
@@ -457,7 +476,8 @@ def test_column_encoding(use_legacy_dataset):
                          use_dictionary=False,
                          use_byte_stream_split=['a'],
                          column_encoding={'a': "RLE",
-                                          'b': "BYTE_STREAM_SPLIT"},
+                                          'b': "BYTE_STREAM_SPLIT",
+                                          'c': "PLAIN"},
                          use_legacy_dataset=use_legacy_dataset)
 
     # Try to pass column_encoding and use_byte_stream_split=True.
@@ -467,7 +487,8 @@ def test_column_encoding(use_legacy_dataset):
                          use_dictionary=False,
                          use_byte_stream_split=True,
                          column_encoding={'a': "RLE",
-                                          'b': "BYTE_STREAM_SPLIT"},
+                                          'b': "BYTE_STREAM_SPLIT",
+                                          'c': "PLAIN"},
                          use_legacy_dataset=use_legacy_dataset)
 
     # Try to pass column_encoding=True.