forked from apache/arrow
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PARQUET-169: Implement support for bulk reading and writing rep/def levels
Added a LevelDecoder and LevelEncoder class to read and write batches of def/rep levels. Added tests to verify the functionality. Author: Deepak Majeti <[email protected]> Closes apache#30 from majetideepak/master and squashes the following commits: 18d7e51 [Deepak Majeti] fixed argument order of asserts inside test cases 5e0000b [Deepak Majeti] PARQUET-169: Implement support for reading repetition and definition levels
- Loading branch information
Showing
10 changed files
with
393 additions
and
71 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#include <cstdlib> | ||
#include <iostream> | ||
#include <sstream> | ||
#include <string> | ||
|
||
#include <gtest/gtest.h> | ||
|
||
#include "parquet/thrift/parquet_types.h" | ||
#include "parquet/column/levels.h" | ||
|
||
using std::string; | ||
|
||
namespace parquet_cpp { | ||
|
||
class TestLevels : public ::testing::Test { | ||
public: | ||
int GenerateLevels(int min_repeat_factor, int max_repeat_factor, | ||
int max_level, std::vector<int16_t>& input_levels) { | ||
int total_count = 0; | ||
// for each repetition count upto max_repeat_factor | ||
for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) { | ||
// repeat count increase by a factor of 2 for every iteration | ||
int repeat_count = (1 << repeat); | ||
// generate levels for repetition count upto the maximum level | ||
int value = 0; | ||
int bwidth = 0; | ||
while (value <= max_level) { | ||
for (int i = 0; i < repeat_count; i++) { | ||
input_levels[total_count++] = value; | ||
} | ||
value = (2 << bwidth) - 1; | ||
bwidth++; | ||
} | ||
} | ||
return total_count; | ||
} | ||
|
||
void VerifyLevelsEncoding(parquet::Encoding::type encoding, int max_level, | ||
std::vector<int16_t>& input_levels) { | ||
LevelEncoder encoder; | ||
LevelDecoder decoder; | ||
int levels_count = 0; | ||
std::vector<int16_t> output_levels; | ||
std::vector<uint8_t> bytes; | ||
int num_levels = input_levels.size(); | ||
output_levels.resize(num_levels); | ||
bytes.resize(2 * num_levels); | ||
ASSERT_EQ(num_levels, output_levels.size()); | ||
ASSERT_EQ(2 * num_levels, bytes.size()); | ||
// start encoding and decoding | ||
if (encoding == parquet::Encoding::RLE) { | ||
// leave space to write the rle length value | ||
encoder.Init(encoding, max_level, num_levels, | ||
bytes.data() + sizeof(uint32_t), bytes.size()); | ||
|
||
levels_count = encoder.Encode(num_levels, input_levels.data()); | ||
(reinterpret_cast<uint32_t*>(bytes.data()))[0] = encoder.len(); | ||
|
||
} else { | ||
encoder.Init(encoding, max_level, num_levels, | ||
bytes.data(), bytes.size()); | ||
levels_count = encoder.Encode(num_levels, input_levels.data()); | ||
} | ||
|
||
ASSERT_EQ(num_levels, levels_count); | ||
|
||
decoder.Init(encoding, max_level, num_levels, bytes.data()); | ||
levels_count = decoder.Decode(num_levels, output_levels.data()); | ||
|
||
ASSERT_EQ(num_levels, levels_count); | ||
|
||
for (int i = 0; i < num_levels; i++) { | ||
EXPECT_EQ(input_levels[i], output_levels[i]); | ||
} | ||
} | ||
}; | ||
|
||
// Round-trips generated levels through both level encodings for every
// maximum bit-width from 1 to 8. The run length of each level value doubles
// from one repetition step to the next.
TEST_F(TestLevels, TestEncodeDecodeLevels) {
  const int max_repeat_factor = 7;  // 128
  const int max_bit_width = 8;
  const parquet::Encoding::type encodings[2] = {parquet::Encoding::RLE,
      parquet::Encoding::BIT_PACKED};
  std::vector<int16_t> input_levels;

  for (const parquet::Encoding::type encoding : encodings) {
    // BIT_PACKED requires a sequence of at least 8, i.e. a repeat factor of 3
    const int min_repeat_factor =
        (encoding == parquet::Encoding::BIT_PACKED) ? 3 : 0;
    // number of levels generated for a single level value
    const int num_levels_per_width =
        (2 << max_repeat_factor) - (1 << min_repeat_factor);

    for (int bit_width = 1; bit_width <= max_bit_width; ++bit_width) {
      // bit_width + 1 distinct values are generated for this width
      const int num_levels = (bit_width + 1) * num_levels_per_width;
      input_levels.resize(num_levels);
      ASSERT_EQ(num_levels, input_levels.size());

      // largest level representable with the current bit_width
      const int max_level = (1 << bit_width) - 1;
      const int total_count = GenerateLevels(
          min_repeat_factor, max_repeat_factor, max_level, input_levels);
      ASSERT_EQ(num_levels, total_count);
      VerifyLevelsEncoding(encoding, max_level, input_levels);
    }
  }
}
|
||
} // namespace parquet_cpp |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#ifndef PARQUET_COLUMN_LEVELS_H | ||
#define PARQUET_COLUMN_LEVELS_H | ||
|
||
#include <cassert>
#include <cstdint>
#include <cstring>
#include <memory>

#include "parquet/exception.h"
#include "parquet/thrift/parquet_types.h"
#include "parquet/encodings/encodings.h"
#include "parquet/util/rle-encoding.h"
|
||
namespace parquet_cpp { | ||
|
||
class LevelEncoder { | ||
public: | ||
LevelEncoder() {} | ||
|
||
// Initialize the LevelEncoder. | ||
void Init(parquet::Encoding::type encoding, int16_t max_level, | ||
int num_buffered_values, uint8_t* data, int data_size) { | ||
bit_width_ = BitUtil::Log2(max_level + 1); | ||
encoding_ = encoding; | ||
switch (encoding) { | ||
case parquet::Encoding::RLE: { | ||
rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_)); | ||
break; | ||
} | ||
case parquet::Encoding::BIT_PACKED: { | ||
int num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8); | ||
bit_packed_encoder_.reset(new BitWriter(data, num_bytes)); | ||
break; | ||
} | ||
default: | ||
throw ParquetException("Unknown encoding type for levels."); | ||
} | ||
} | ||
|
||
// Encodes a batch of levels from an array and returns the number of levels encoded | ||
size_t Encode(size_t batch_size, const int16_t* levels) { | ||
size_t num_encoded = 0; | ||
if (!rle_encoder_ && !bit_packed_encoder_) { | ||
throw ParquetException("Level encoders are not initialized."); | ||
} | ||
|
||
if (encoding_ == parquet::Encoding::RLE) { | ||
for (size_t i = 0; i < batch_size; ++i) { | ||
if (!rle_encoder_->Put(*(levels + i))) { | ||
break; | ||
} | ||
++num_encoded; | ||
} | ||
rle_encoder_->Flush(); | ||
rle_length_ = rle_encoder_->len(); | ||
} else { | ||
for (size_t i = 0; i < batch_size; ++i) { | ||
if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) { | ||
break; | ||
} | ||
++num_encoded; | ||
} | ||
bit_packed_encoder_->Flush(); | ||
} | ||
return num_encoded; | ||
} | ||
|
||
int32_t len() { | ||
assert(encoding_ == parquet::Encoding::RLE); | ||
return rle_length_; | ||
} | ||
|
||
private: | ||
int bit_width_; | ||
int rle_length_; | ||
parquet::Encoding::type encoding_; | ||
std::unique_ptr<RleEncoder> rle_encoder_; | ||
std::unique_ptr<BitWriter> bit_packed_encoder_; | ||
}; | ||
|
||
|
||
class LevelDecoder { | ||
public: | ||
LevelDecoder() {} | ||
|
||
// Initialize the LevelDecoder and return the number of bytes consumed | ||
size_t Init(parquet::Encoding::type encoding, int16_t max_level, | ||
int num_buffered_values, const uint8_t* data) { | ||
uint32_t num_bytes = 0; | ||
uint32_t total_bytes = 0; | ||
bit_width_ = BitUtil::Log2(max_level + 1); | ||
encoding_ = encoding; | ||
switch (encoding) { | ||
case parquet::Encoding::RLE: { | ||
num_bytes = *reinterpret_cast<const uint32_t*>(data); | ||
const uint8_t* decoder_data = data + sizeof(uint32_t); | ||
rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_)); | ||
return sizeof(uint32_t) + num_bytes; | ||
} | ||
case parquet::Encoding::BIT_PACKED: { | ||
num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8); | ||
bit_packed_decoder_.reset(new BitReader(data, num_bytes)); | ||
return num_bytes; | ||
} | ||
default: | ||
throw ParquetException("Unknown encoding type for levels."); | ||
} | ||
return -1; | ||
} | ||
|
||
// Decodes a batch of levels into an array and returns the number of levels decoded | ||
size_t Decode(size_t batch_size, int16_t* levels) { | ||
size_t num_decoded = 0; | ||
if (!rle_decoder_ && !bit_packed_decoder_) { | ||
throw ParquetException("Level decoders are not initialized."); | ||
} | ||
|
||
if (encoding_ == parquet::Encoding::RLE) { | ||
for (size_t i = 0; i < batch_size; ++i) { | ||
if (!rle_decoder_->Get(levels + i)) { | ||
break; | ||
} | ||
++num_decoded; | ||
} | ||
} else { | ||
for (size_t i = 0; i < batch_size; ++i) { | ||
if (!bit_packed_decoder_->GetValue(bit_width_, levels + i)) { | ||
break; | ||
} | ||
++num_decoded; | ||
} | ||
} | ||
return num_decoded; | ||
} | ||
|
||
private: | ||
int bit_width_; | ||
parquet::Encoding::type encoding_; | ||
std::unique_ptr<RleDecoder> rle_decoder_; | ||
std::unique_ptr<BitReader> bit_packed_decoder_; | ||
}; | ||
|
||
} // namespace parquet_cpp | ||
#endif // PARQUET_COLUMN_LEVELS_H |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.