Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PARQUET-169: Implement support for bulk reading and writing rep/def l…
Browse files Browse the repository at this point in the history
…evels

Added LevelDecoder and LevelEncoder classes to read and write batches of definition/repetition (def/rep) levels.
Added tests to verify the functionality.

Author: Deepak Majeti <deepak.majeti@hp.com>

Closes apache#30 from majetideepak/master and squashes the following commits:

18d7e51 [Deepak Majeti] fixed argument order of asserts inside test cases
5e0000b [Deepak Majeti] PARQUET-169: Implement support for reading repetition and definition levels

Change-Id: Ie3ed4ac5c5ceabe60b095d3b5eab45941bd71698
Deepak Majeti authored and nongli committed Feb 11, 2016

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 92cecf8 commit e9efa7a
Showing 10 changed files with 393 additions and 71 deletions.
2 changes: 2 additions & 0 deletions cpp/src/parquet/column/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -18,9 +18,11 @@
# Headers: top level
install(FILES
page.h
levels.h
reader.h
serialized-page.h
scanner.h
DESTINATION include/parquet/column)

ADD_PARQUET_TEST(column-reader-test)
ADD_PARQUET_TEST(levels-test)
59 changes: 56 additions & 3 deletions cpp/src/parquet/column/column-reader-test.cc
Original file line number Diff line number Diff line change
@@ -124,9 +124,6 @@ TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {

Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());

std::vector<int32_t> vexpected;
std::vector<int16_t> dexpected;

size_t values_read = 0;
size_t batch_actual = 0;

@@ -157,6 +154,62 @@ TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
ASSERT_EQ(0, values_read);
}

// Reads a REPEATED int32 column (max definition level 2, max repetition
// level 1) in two batches of five levels each and checks the levels and
// values returned by every batch.
TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
  vector<int32_t> values = {1, 2, 3, 4, 5};
  vector<int16_t> def_levels = {2, 1, 1, 2, 2, 1, 1, 2, 2, 1};
  vector<int16_t> rep_levels = {0, 1, 1, 0, 0, 1, 1, 0, 0, 1};

  vector<uint8_t> page1;
  test::DataPageBuilder<Type::INT32> page_builder(&page1);

  // Repetition levels, then definition levels, precede the values
  page_builder.AppendRepLevels(rep_levels, 1, parquet::Encoding::RLE);
  page_builder.AppendDefLevels(def_levels, 2, parquet::Encoding::RLE);
  page_builder.AppendValues(values, parquet::Encoding::PLAIN);

  pages_.push_back(page_builder.Finish());

  NodePtr type = schema::Int32("a", Repetition::REPEATED);
  ColumnDescriptor descr(type, 2, 1);
  InitReader(&descr);

  Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());

  size_t values_read = 0;
  size_t batch_actual = 0;

  // A batch of five levels yields at most three values here, so the value
  // buffer is smaller than the level buffers.
  vector<int32_t> vresult(3, -1);
  vector<int16_t> dresult(5, -1);
  vector<int16_t> rresult(5, -1);

  // First batch: three of the five definition levels equal the maximum (2)
  batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
      &vresult[0], &values_read);
  ASSERT_EQ(5, batch_actual);
  ASSERT_EQ(3, values_read);

  ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3)));
  ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5)));
  ASSERT_TRUE(vector_equal(rresult, slice(rep_levels, 0, 5)));

  // Second batch: the remaining five levels carry the last two values
  batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
      &vresult[0], &values_read);
  ASSERT_EQ(5, batch_actual);
  ASSERT_EQ(2, values_read);

  ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5)));
  ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10)));
  ASSERT_TRUE(vector_equal(rresult, slice(rep_levels, 5, 10)));

  // EOS, pass all nullptrs to check for improper writes. Do not segfault /
  // core dump
  batch_actual = reader->ReadBatch(5, nullptr, nullptr,
      nullptr, &values_read);
  ASSERT_EQ(0, batch_actual);
  ASSERT_EQ(0, values_read);
}
} // namespace test

} // namespace parquet_cpp
129 changes: 129 additions & 0 deletions cpp/src/parquet/column/levels-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstdlib>
#include <iostream>
#include <sstream>
#include <string>

#include <gtest/gtest.h>

#include "parquet/thrift/parquet_types.h"
#include "parquet/column/levels.h"

using std::string;

namespace parquet_cpp {

// Test fixture with helpers that generate level sequences and round-trip
// them through LevelEncoder / LevelDecoder.
class TestLevels : public ::testing::Test {
 public:
  // Fills input_levels with runs of level values and returns how many levels
  // were written.
  //
  // For every repeat exponent r in [min_repeat_factor, max_repeat_factor] the
  // values 0, 1, 3, 7, ... (each of the form 2^k - 1) up to max_level are
  // written, each one repeated 2^r times.
  //
  // input_levels must already be large enough to hold every generated level;
  // this function writes by index and does not grow the vector.
  int GenerateLevels(int min_repeat_factor, int max_repeat_factor,
      int max_level, std::vector<int16_t>& input_levels) {
    int total_count = 0;
    // for each repetition count up to max_repeat_factor
    for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) {
      // repeat count increases by a factor of 2 for every iteration
      int repeat_count = (1 << repeat);
      // generate levels for repetition count up to the maximum level
      int value = 0;
      int bwidth = 0;
      while (value <= max_level) {
        for (int i = 0; i < repeat_count; i++) {
          input_levels[total_count++] = value;
        }
        // next value is the largest one representable in bwidth + 1 bits
        value = (2 << bwidth) - 1;
        bwidth++;
      }
    }
    return total_count;
  }

  // Encodes input_levels with the given encoding, decodes the result again and
  // expects the decoded levels to equal the input.
  void VerifyLevelsEncoding(parquet::Encoding::type encoding, int max_level,
      std::vector<int16_t>& input_levels) {
    // RLE data is preceded by a uint32_t holding its length in bytes
    const size_t kLengthPrefix = sizeof(uint32_t);

    LevelEncoder encoder;
    LevelDecoder decoder;
    int levels_count = 0;
    std::vector<int16_t> output_levels;
    std::vector<uint8_t> bytes;
    int num_levels = input_levels.size();
    output_levels.resize(num_levels);
    // Two bytes per level for the encoded data, plus room for the RLE length
    // prefix so that the encoder's capacity never extends past the buffer.
    bytes.resize(kLengthPrefix + 2 * num_levels);
    ASSERT_EQ(num_levels, output_levels.size());
    ASSERT_EQ(kLengthPrefix + 2 * num_levels, bytes.size());
    // start encoding and decoding
    if (encoding == parquet::Encoding::RLE) {
      // leave space to write the rle length value; the encoder only gets the
      // bytes that actually follow the prefix
      encoder.Init(encoding, max_level, num_levels,
          bytes.data() + kLengthPrefix, bytes.size() - kLengthPrefix);

      levels_count = encoder.Encode(num_levels, input_levels.data());
      (reinterpret_cast<uint32_t*>(bytes.data()))[0] = encoder.len();

    } else {
      encoder.Init(encoding, max_level, num_levels,
          bytes.data(), bytes.size());
      levels_count = encoder.Encode(num_levels, input_levels.data());
    }

    ASSERT_EQ(num_levels, levels_count);

    decoder.Init(encoding, max_level, num_levels, bytes.data());
    levels_count = decoder.Decode(num_levels, output_levels.data());

    ASSERT_EQ(num_levels, levels_count);

    for (int i = 0; i < num_levels; i++) {
      EXPECT_EQ(input_levels[i], output_levels[i]);
    }
  }
};

// Round-trips generated levels through both level encodings for every maximum
// bit-width from 1 to 8; within one input the run length doubles from one
// group of values to the next.
TEST_F(TestLevels, TestEncodeDecodeLevels) {
  const int kMaxRepeatFactor = 7;  // longest run is 1 << 7 = 128
  const int kMaxBitWidth = 8;
  const parquet::Encoding::type kEncodings[2] = {parquet::Encoding::RLE,
      parquet::Encoding::BIT_PACKED};

  std::vector<int16_t> levels;

  for (parquet::Encoding::type encoding : kEncodings) {
    // BIT_PACKED requires a sequence of at least 8, i.e. runs of 1 << 3
    const int min_repeat_factor =
        (encoding == parquet::Encoding::BIT_PACKED) ? 3 : 0;
    // Sum of the run lengths 2^min + ... + 2^max for one level value
    const int levels_per_value =
        (2 << kMaxRepeatFactor) - (1 << min_repeat_factor);

    for (int bit_width = 1; bit_width <= kMaxBitWidth; ++bit_width) {
      // bit_width + 1 distinct values are generated for this width
      const int expected_count = (bit_width + 1) * levels_per_value;
      levels.resize(expected_count);
      ASSERT_EQ(expected_count, levels.size());

      // Largest level representable in bit_width bits
      const int max_level = (1 << bit_width) - 1;
      const int generated = GenerateLevels(min_repeat_factor, kMaxRepeatFactor,
          max_level, levels);
      ASSERT_EQ(expected_count, generated);
      VerifyLevelsEncoding(encoding, max_level, levels);
    }
  }
}

} // namespace parquet_cpp
156 changes: 156 additions & 0 deletions cpp/src/parquet/column/levels.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef PARQUET_COLUMN_LEVELS_H
#define PARQUET_COLUMN_LEVELS_H

#include <cassert>
#include <cstdint>
#include <cstring>
#include <memory>

#include "parquet/exception.h"
#include "parquet/thrift/parquet_types.h"
#include "parquet/encodings/encodings.h"
#include "parquet/util/rle-encoding.h"

namespace parquet_cpp {

// Encodes batches of repetition / definition levels into a caller-supplied
// buffer, using either the RLE or the BIT_PACKED encoding.
//
// Call Init() to bind the encoder to a buffer, then Encode(). For RLE, len()
// reports the encoded length recorded by the last Encode() call.
class LevelEncoder {
 public:
  LevelEncoder() {}

  // Initialize the LevelEncoder.
  //
  // encoding: RLE or BIT_PACKED; any other value throws ParquetException and
  //     leaves the encoder uninitialized.
  // max_level: largest level that will be encoded; it determines the bit
  //     width used per level.
  // num_buffered_values: number of levels that will be written; only used to
  //     size the BIT_PACKED writer.
  // data, data_size: destination buffer and its capacity in bytes.
  //     NOTE(review): data_size is only forwarded to the RLE encoder; for
  //     BIT_PACKED the caller must make sure the buffer can hold
  //     ceil(num_buffered_values * bit_width / 8) bytes.
  void Init(parquet::Encoding::type encoding, int16_t max_level,
      int num_buffered_values, uint8_t* data, int data_size) {
    bit_width_ = BitUtil::Log2(max_level + 1);
    encoding_ = encoding;
    rle_length_ = 0;
    // Drop any encoder left over from an earlier Init() so that Encode() can
    // never pick up an encoder bound to a previous buffer or encoding.
    rle_encoder_.reset();
    bit_packed_encoder_.reset();
    switch (encoding) {
      case parquet::Encoding::RLE: {
        rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_));
        break;
      }
      case parquet::Encoding::BIT_PACKED: {
        int num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8);
        bit_packed_encoder_.reset(new BitWriter(data, num_bytes));
        break;
      }
      default:
        throw ParquetException("Unknown encoding type for levels.");
    }
  }

  // Encodes a batch of levels from an array and returns the number of levels encoded.
  //
  // Encoding stops at the first level the underlying encoder rejects, so the
  // result may be smaller than batch_size. The encoder is flushed before
  // returning. Throws ParquetException if Init() has not succeeded.
  size_t Encode(size_t batch_size, const int16_t* levels) {
    if (!rle_encoder_ && !bit_packed_encoder_) {
      throw ParquetException("Level encoders are not initialized.");
    }

    size_t num_encoded = 0;
    if (encoding_ == parquet::Encoding::RLE) {
      for (size_t i = 0; i < batch_size; ++i) {
        if (!rle_encoder_->Put(levels[i])) {
          break;
        }
        ++num_encoded;
      }
      rle_encoder_->Flush();
      // Remember the encoded length so that len() can report it
      rle_length_ = rle_encoder_->len();
    } else {
      for (size_t i = 0; i < batch_size; ++i) {
        if (!bit_packed_encoder_->PutValue(levels[i], bit_width_)) {
          break;
        }
        ++num_encoded;
      }
      bit_packed_encoder_->Flush();
    }
    return num_encoded;
  }

  // Returns the RLE length recorded by the last Encode() call, or 0 if nothing
  // has been encoded since the last Init(). Only meaningful for RLE.
  int32_t len() const {
    assert(encoding_ == parquet::Encoding::RLE);
    return rle_length_;
  }

 private:
  // Members are value-initialized so that calling len() before Init() /
  // Encode() reads defined values.
  int bit_width_ = 0;
  int rle_length_ = 0;
  parquet::Encoding::type encoding_ = parquet::Encoding::RLE;
  std::unique_ptr<RleEncoder> rle_encoder_;
  std::unique_ptr<BitWriter> bit_packed_encoder_;
};


// Decodes batches of repetition / definition levels that were written with
// either the RLE or the BIT_PACKED encoding.
//
// Call Init() to bind the decoder to the encoded bytes, then Decode().
class LevelDecoder {
 public:
  LevelDecoder() {}

  // Initialize the LevelDecoder and return the number of bytes consumed.
  //
  // encoding: RLE or BIT_PACKED; any other value throws ParquetException and
  //     leaves the decoder uninitialized.
  // max_level: largest level that can occur; it determines the bit width used
  //     per level.
  // num_buffered_values: number of encoded levels; only used to compute the
  //     size of BIT_PACKED data.
  // data: start of the encoded levels. For RLE the data begins with a uint32_t
  //     holding the byte length of the RLE payload that follows it.
  size_t Init(parquet::Encoding::type encoding, int16_t max_level,
      int num_buffered_values, const uint8_t* data) {
    bit_width_ = BitUtil::Log2(max_level + 1);
    encoding_ = encoding;
    // Drop any decoder left over from an earlier Init() so that Decode() can
    // never pick up a decoder bound to previous data or a previous encoding.
    rle_decoder_.reset();
    bit_packed_decoder_.reset();
    switch (encoding) {
      case parquet::Encoding::RLE: {
        // Copy the length prefix out instead of dereferencing a casted
        // pointer: data is a byte pointer with no alignment guarantee.
        uint32_t num_bytes = 0;
        std::memcpy(&num_bytes, data, sizeof(num_bytes));
        const uint8_t* decoder_data = data + sizeof(uint32_t);
        rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_));
        return sizeof(uint32_t) + num_bytes;
      }
      case parquet::Encoding::BIT_PACKED: {
        uint32_t num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8);
        bit_packed_decoder_.reset(new BitReader(data, num_bytes));
        return num_bytes;
      }
      default:
        break;
    }
    throw ParquetException("Unknown encoding type for levels.");
  }

  // Decodes a batch of levels into an array and returns the number of levels decoded.
  //
  // Decoding stops at the first level the underlying decoder fails to produce,
  // so the result may be smaller than batch_size. Throws ParquetException if
  // Init() has not succeeded.
  size_t Decode(size_t batch_size, int16_t* levels) {
    if (!rle_decoder_ && !bit_packed_decoder_) {
      throw ParquetException("Level decoders are not initialized.");
    }

    size_t num_decoded = 0;
    if (encoding_ == parquet::Encoding::RLE) {
      for (size_t i = 0; i < batch_size; ++i) {
        if (!rle_decoder_->Get(levels + i)) {
          break;
        }
        ++num_decoded;
      }
    } else {
      for (size_t i = 0; i < batch_size; ++i) {
        if (!bit_packed_decoder_->GetValue(bit_width_, levels + i)) {
          break;
        }
        ++num_decoded;
      }
    }
    return num_decoded;
  }

 private:
  int bit_width_ = 0;
  parquet::Encoding::type encoding_ = parquet::Encoding::RLE;
  std::unique_ptr<RleDecoder> rle_decoder_;
  std::unique_ptr<BitReader> bit_packed_decoder_;
};

} // namespace parquet_cpp
#endif // PARQUET_COLUMN_LEVELS_H
8 changes: 8 additions & 0 deletions cpp/src/parquet/column/page.h
Original file line number Diff line number Diff line change
@@ -84,6 +84,14 @@ class DataPage : public Page {
return header_.encoding;
}

// Returns the repetition level encoding stored in this page's header.
parquet::Encoding::type repetition_level_encoding() const {
return header_.repetition_level_encoding;
}

// Returns the definition level encoding stored in this page's header.
parquet::Encoding::type definition_level_encoding() const {
return header_.definition_level_encoding;
}

private:
parquet::DataPageHeader header_;
};
67 changes: 24 additions & 43 deletions cpp/src/parquet/column/reader.cc
Original file line number Diff line number Diff line change
@@ -59,18 +59,6 @@ void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
current_decoder_ = decoders_[encoding].get();
}


static size_t InitializeLevelDecoder(const uint8_t* buffer,
int16_t max_level, std::unique_ptr<RleDecoder>& decoder) {
int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer);

decoder.reset(new RleDecoder(buffer + sizeof(uint32_t),
num_definition_bytes,
BitUtil::NumRequiredBits(max_level)));

return sizeof(uint32_t) + num_definition_bytes;
}

// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
// encoding.
static bool IsDictionaryIndexEncoding(const parquet::Encoding::type& e) {
@@ -109,24 +97,30 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
// the page size to determine the number of bytes in the encoded data.
size_t data_size = page->size();

max_definition_level_ = descr_->max_definition_level();

// Read definition levels.
if (max_definition_level_ > 0) {
// Temporary hack until schema resolution implemented

size_t def_levels_bytes = InitializeLevelDecoder(buffer,
max_definition_level_, definition_level_decoder_);

int16_t max_definition_level = descr_->max_definition_level();
int16_t max_repetition_level = descr_->max_repetition_level();
//Data page Layout: Repetition Levels - Definition Levels - encoded values.
//Levels are encoded as rle or bit-packed.
//Init repetition levels
if (max_repetition_level > 0) {
size_t rep_levels_bytes = repetition_level_decoder_.Init(
page->repetition_level_encoding(),
max_repetition_level, num_buffered_values_, buffer);
buffer += rep_levels_bytes;
data_size -= rep_levels_bytes;
}
//TODO figure a way to set max_definition_level_ to 0
//if the initial value is invalid

//Init definition levels
if (max_definition_level > 0) {
size_t def_levels_bytes = definition_level_decoder_.Init(
page->definition_level_encoding(),
max_definition_level, num_buffered_values_, buffer);
buffer += def_levels_bytes;
data_size -= def_levels_bytes;
} else {
// REQUIRED field
max_definition_level_ = 0;
}

// TODO: repetition levels

// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding.
parquet::Encoding::type encoding = page->encoding();
@@ -172,31 +166,18 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
// ----------------------------------------------------------------------
// Batch read APIs

// Reads up to batch_size levels from decoder into levels, stopping at the
// first Get() that fails, and returns how many levels were stored.
static size_t DecodeMany(RleDecoder* decoder, int16_t* levels, size_t batch_size) {
  // TODO(wesm): Push this decoding down into RleDecoder itself
  size_t decoded = 0;
  while (decoded < batch_size && decoder->Get(levels + decoded)) {
    ++decoded;
  }
  return decoded;
}

size_t ColumnReader::ReadDefinitionLevels(size_t batch_size, int16_t* levels) {
if (!definition_level_decoder_) {
if (descr_->max_definition_level() == 0) {
return 0;
}
return DecodeMany(definition_level_decoder_.get(), levels, batch_size);
return definition_level_decoder_.Decode(batch_size, levels);
}

size_t ColumnReader::ReadRepetitionLevels(size_t batch_size, int16_t* levels) {
if (!repetition_level_decoder_) {
if (descr_->max_repetition_level() == 0) {
return 0;
}
return DecodeMany(repetition_level_decoder_.get(), levels, batch_size);
return repetition_level_decoder_.Decode(batch_size, levels);
}

// ----------------------------------------------------------------------
17 changes: 7 additions & 10 deletions cpp/src/parquet/column/reader.h
Original file line number Diff line number Diff line change
@@ -33,9 +33,11 @@
#include "parquet/encodings/encodings.h"
#include "parquet/schema/descriptor.h"
#include "parquet/util/rle-encoding.h"
#include "parquet/column/levels.h"

namespace parquet_cpp {


class Codec;
class Scanner;

@@ -85,13 +87,10 @@ class ColumnReader {
std::shared_ptr<Page> current_page_;

// Not set if full schema for this field has no optional or repeated elements
std::unique_ptr<RleDecoder> definition_level_decoder_;
LevelDecoder definition_level_decoder_;

// Not set for flat schemas.
std::unique_ptr<RleDecoder> repetition_level_decoder_;

// Temporarily storing this to assist with batch reading
int16_t max_definition_level_;
LevelDecoder repetition_level_decoder_;

// The total number of values stored in the data page. This is the maximum of
// the number of encoded definition levels or encoded values. For
@@ -182,13 +181,12 @@ inline size_t TypedColumnReader<TYPE>::ReadBatch(int batch_size, int16_t* def_le
size_t values_to_read = 0;

// If the field is required and non-repeated, there are no definition levels
if (definition_level_decoder_) {
if (descr_->max_definition_level() > 0) {
num_def_levels = ReadDefinitionLevels(batch_size, def_levels);

// TODO(wesm): this tallying of values-to-decode can be performed with better
// cache-efficiency if fused with the level decoding.
for (size_t i = 0; i < num_def_levels; ++i) {
if (def_levels[i] == max_definition_level_) {
if (def_levels[i] == descr_->max_definition_level()) {
++values_to_read;
}
}
@@ -198,9 +196,8 @@ inline size_t TypedColumnReader<TYPE>::ReadBatch(int batch_size, int16_t* def_le
}

// Not present for non-repeated fields
if (repetition_level_decoder_) {
if (descr_->max_repetition_level() > 0) {
num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels);

if (num_def_levels != num_rep_levels) {
throw ParquetException("Number of decoded rep / def levels did not match");
}
19 changes: 8 additions & 11 deletions cpp/src/parquet/column/test-util.h
Original file line number Diff line number Diff line change
@@ -155,22 +155,19 @@ class DataPageBuilder {
// TODO: compute a more precise maximum size for the encoded levels
std::vector<uint8_t> encode_buffer(DEFAULT_DATA_PAGE_SIZE);

RleEncoder encoder(&encode_buffer[0], encode_buffer.size(),
BitUtil::NumRequiredBits(max_level));

// TODO(wesm): push down vector encoding
for (int16_t level : levels) {
if (!encoder.Put(level)) {
throw ParquetException("out of space");
}
}

uint32_t rle_bytes = encoder.Flush();
LevelEncoder encoder;
encoder.Init(encoding, max_level, levels.size(),
encode_buffer.data(), encode_buffer.size());

encoder.Encode(levels.size(), levels.data());

uint32_t rle_bytes = encoder.len();
size_t levels_footprint = sizeof(uint32_t) + rle_bytes;
Reserve(levels_footprint);

*reinterpret_cast<uint32_t*>(Head()) = rle_bytes;
memcpy(Head() + sizeof(uint32_t), encoder.buffer(), rle_bytes);
memcpy(Head() + sizeof(uint32_t), encode_buffer.data(), rle_bytes);
buffer_size_ += levels_footprint;
}
};
1 change: 0 additions & 1 deletion cpp/src/parquet/reader-test.cc
Original file line number Diff line number Diff line change
@@ -89,7 +89,6 @@ TEST_F(TestAllTypesPlain, TestFlatScannerInt32) {

// column 0, id
std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));

int32_t val;
bool is_null;
for (size_t i = 0; i < 8; ++i) {
6 changes: 3 additions & 3 deletions cpp/src/parquet/reader.cc
Original file line number Diff line number Diff line change
@@ -242,10 +242,10 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
<< ")" << std::endl;
}

for (int i = 0; i < num_row_groups(); ++i) {
stream << "--- Row Group " << i << " ---\n";
for (int r = 0; r < num_row_groups(); ++r) {
stream << "--- Row Group " << r << " ---\n";

RowGroupReader* group_reader = RowGroup(i);
RowGroupReader* group_reader = RowGroup(r);

// Print column metadata
size_t num_columns = group_reader->num_columns();

0 comments on commit e9efa7a

Please sign in to comment.