Skip to content

Commit

Permalink
Add DataPage base class for DataPageV1 and DataPageV2
Browse files Browse the repository at this point in the history
  • Loading branch information
wesm committed Mar 6, 2019
1 parent 8df8328 commit c5cb0f3
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 70 deletions.
47 changes: 21 additions & 26 deletions cpp/src/parquet/arrow/record_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace BitUtil = ::arrow::BitUtil;

// PLAIN_DICTIONARY is deprecated, but it was formerly used as a dictionary
// index encoding.
static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
static bool IsDictionaryIndexEncoding(Encoding::type e) {
return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
}

Expand Down Expand Up @@ -86,7 +86,7 @@ class RecordReader::RecordReaderImpl {

virtual ~RecordReaderImpl() = default;

virtual int64_t ReadRecordData(const int64_t num_records) = 0;
virtual int64_t ReadRecordData(int64_t num_records) = 0;

// Returns true if there are still values in this column.
bool HasNext() {
Expand Down Expand Up @@ -494,7 +494,7 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
}

// Return number of logical records read
int64_t ReadRecordData(const int64_t num_records) override {
int64_t ReadRecordData(int64_t num_records) override {
// Conservative upper bound
const int64_t possible_num_values =
std::max(num_records, levels_written_ - levels_position_);
Expand Down Expand Up @@ -581,14 +581,11 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
DecoderType* current_decoder_;

// Initialize repetition and definition level decoders on the next data page.
template <typename PageType>
int64_t InitializeLevelDecoders(const std::shared_ptr<PageType> page,
const Encoding::type repetition_level_encoding,
const Encoding::type definition_level_encoding);
int64_t InitializeLevelDecoders(const DataPage& page,
Encoding::type repetition_level_encoding,
Encoding::type definition_level_encoding);

template <typename PageType>
void InitializeDataDecoder(const std::shared_ptr<PageType> page,
const int64_t levels_bytes);
void InitializeDataDecoder(const DataPage& page, int64_t levels_bytes);

// Advance to the next data page
bool ReadNewPage() override;
Expand Down Expand Up @@ -731,17 +728,16 @@ inline void TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage*
// initialize the level decoders and return the number of encoded level bytes.
// The return value helps determine the number of bytes in the encoded data.
template <typename DType>
template <typename PageType>
int64_t TypedRecordReader<DType>::InitializeLevelDecoders(
const std::shared_ptr<PageType> page, const Encoding::type repetition_level_encoding,
const Encoding::type definition_level_encoding) {
const DataPage& page, Encoding::type repetition_level_encoding,
Encoding::type definition_level_encoding) {
// Read a data page.
num_buffered_values_ = page->num_values();
num_buffered_values_ = page.num_values();

// Have not decoded any values from the data page yet
num_decoded_values_ = 0;

const uint8_t* buffer = page->data();
const uint8_t* buffer = page.data();
int64_t levels_byte_size = 0;

// Data page Layout: Repetition Levels - Definition Levels - encoded values.
Expand Down Expand Up @@ -771,13 +767,12 @@ int64_t TypedRecordReader<DType>::InitializeLevelDecoders(
// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding.
template <typename DType>
template <typename PageType>
void TypedRecordReader<DType>::InitializeDataDecoder(const std::shared_ptr<PageType> page,
const int64_t levels_byte_size) {
const uint8_t* buffer = page->data() + levels_byte_size;
const int64_t data_size = page->size() - levels_byte_size;
void TypedRecordReader<DType>::InitializeDataDecoder(const DataPage& page,
int64_t levels_byte_size) {
const uint8_t* buffer = page.data() + levels_byte_size;
const int64_t data_size = page.size() - levels_byte_size;

Encoding::type encoding = page->encoding();
Encoding::type encoding = page.encoding();

if (IsDictionaryIndexEncoding(encoding)) {
encoding = Encoding::RLE_DICTIONARY;
Expand Down Expand Up @@ -828,18 +823,18 @@ bool TypedRecordReader<DType>::ReadNewPage() {
ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
continue;
} else if (current_page_->type() == PageType::DATA_PAGE) {
const auto page = std::static_pointer_cast<DataPage>(current_page_);
const auto page = std::static_pointer_cast<DataPageV1>(current_page_);
const int64_t levels_byte_size = InitializeLevelDecoders(
page, page->repetition_level_encoding(), page->definition_level_encoding());
InitializeDataDecoder(page, levels_byte_size);
*page, page->repetition_level_encoding(), page->definition_level_encoding());
InitializeDataDecoder(*page, levels_byte_size);
return true;
} else if (current_page_->type() == PageType::DATA_PAGE_V2) {
const auto page = std::static_pointer_cast<DataPageV2>(current_page_);
// Repetition and definition levels are always encoded using RLE encoding
// in the DataPageV2 format.
const int64_t levels_byte_size =
InitializeLevelDecoders(page, Encoding::RLE, Encoding::RLE);
InitializeDataDecoder(page, levels_byte_size);
InitializeLevelDecoders(*page, Encoding::RLE, Encoding::RLE);
InitializeDataDecoder(*page, levels_byte_size);
return true;
} else {
// We don't know what this page type is. We're allowed to skip non-data
Expand Down
53 changes: 27 additions & 26 deletions cpp/src/parquet/column_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,45 +59,54 @@ class Page {
PageType::type type_;
};

/// \brief Base type for DataPageV1 and DataPageV2 including common attributes
class DataPage : public Page {
public:
DataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding,
int32_t num_values() const { return num_values_; }
Encoding::type encoding() const { return encoding_; }
const EncodedStatistics& statistics() const { return statistics_; }

protected:
DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding,
const EncodedStatistics& statistics = EncodedStatistics())
: Page(buffer, PageType::DATA_PAGE),
: Page(buffer, type),
num_values_(num_values),
encoding_(encoding),
definition_level_encoding_(definition_level_encoding),
repetition_level_encoding_(repetition_level_encoding),
statistics_(statistics) {}

int32_t num_values() const { return num_values_; }
int32_t num_values_;
Encoding::type encoding_;
EncodedStatistics statistics_;
};

Encoding::type encoding() const { return encoding_; }
class DataPageV1 : public DataPage {
public:
DataPageV1(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding,
const EncodedStatistics& statistics = EncodedStatistics())
: DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, statistics),
definition_level_encoding_(definition_level_encoding),
repetition_level_encoding_(repetition_level_encoding) {}

Encoding::type repetition_level_encoding() const { return repetition_level_encoding_; }

Encoding::type definition_level_encoding() const { return definition_level_encoding_; }

const EncodedStatistics& statistics() const { return statistics_; }

private:
int32_t num_values_;
Encoding::type encoding_;
Encoding::type definition_level_encoding_;
Encoding::type repetition_level_encoding_;
EncodedStatistics statistics_;
};

class CompressedDataPage : public DataPage {
class CompressedDataPage : public DataPageV1 {
public:
CompressedDataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding, int64_t uncompressed_size,
const EncodedStatistics& statistics = EncodedStatistics())
: DataPage(buffer, num_values, encoding, definition_level_encoding,
repetition_level_encoding, statistics),
: DataPageV1(buffer, num_values, encoding, definition_level_encoding,
repetition_level_encoding, statistics),
uncompressed_size_(uncompressed_size) {}

int64_t uncompressed_size() const { return uncompressed_size_; }
Expand All @@ -106,40 +115,32 @@ class CompressedDataPage : public DataPage {
int64_t uncompressed_size_;
};

class DataPageV2 : public Page {
class DataPageV2 : public DataPage {
public:
DataPageV2(const std::shared_ptr<Buffer>& buffer, int32_t num_values, int32_t num_nulls,
int32_t num_rows, Encoding::type encoding,
int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length,
bool is_compressed = false)
: Page(buffer, PageType::DATA_PAGE_V2),
num_values_(num_values),
: DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding),
num_nulls_(num_nulls),
num_rows_(num_rows),
encoding_(encoding),
definition_levels_byte_length_(definition_levels_byte_length),
repetition_levels_byte_length_(repetition_levels_byte_length),
is_compressed_(is_compressed) {}

int32_t num_values() const { return num_values_; }

int32_t num_nulls() const { return num_nulls_; }

int32_t num_rows() const { return num_rows_; }

Encoding::type encoding() const { return encoding_; }

int32_t definition_levels_byte_length() const { return definition_levels_byte_length_; }

int32_t repetition_levels_byte_length() const { return repetition_levels_byte_length_; }

bool is_compressed() const { return is_compressed_; }

private:
int32_t num_values_;
int32_t num_nulls_;
int32_t num_rows_;
Encoding::type encoding_;
int32_t definition_levels_byte_length_;
int32_t repetition_levels_byte_length_;
bool is_compressed_;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_reader-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {

shared_ptr<DictionaryPage> dict_page =
std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(
shared_ptr<DataPageV1> data_page = MakeDataPage<Int32Type>(
&descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
pages_.push_back(dict_page);
pages_.push_back(data_page);
Expand Down
16 changes: 8 additions & 8 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {

seen_num_rows_ += header.num_values;

return std::make_shared<DataPage>(
return std::make_shared<DataPageV1>(
page_buffer, header.num_values, FromThrift(header.encoding),
FromThrift(header.definition_level_encoding),
FromThrift(header.repetition_level_encoding), page_statistics);
Expand Down Expand Up @@ -613,27 +613,27 @@ bool TypedColumnReaderImpl<DType>::ReadNewPage() {
ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
continue;
} else if (current_page_->type() == PageType::DATA_PAGE) {
const DataPage* page = static_cast<const DataPage*>(current_page_.get());
const DataPageV1& page = static_cast<const DataPageV1&>(*current_page_);

// Read a data page.
num_buffered_values_ = page->num_values();
num_buffered_values_ = page.num_values();

// Have not decoded any values from the data page yet
num_decoded_values_ = 0;

buffer = page->data();
buffer = page.data();

// If the data page includes repetition and definition levels, we
// initialize the level decoder and subtract the encoded level bytes from
// the page size to determine the number of bytes in the encoded data.
int64_t data_size = page->size();
int64_t data_size = page.size();

// Data page Layout: Repetition Levels - Definition Levels - encoded values.
// Levels are encoded as RLE or bit-packed.
// Init repetition levels
if (descr_->max_repetition_level() > 0) {
int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
page->repetition_level_encoding(), descr_->max_repetition_level(),
page.repetition_level_encoding(), descr_->max_repetition_level(),
static_cast<int>(num_buffered_values_), buffer);
buffer += rep_levels_bytes;
data_size -= rep_levels_bytes;
Expand All @@ -644,15 +644,15 @@ bool TypedColumnReaderImpl<DType>::ReadNewPage() {
// Init definition levels
if (descr_->max_definition_level() > 0) {
int64_t def_levels_bytes = definition_level_decoder_.SetData(
page->definition_level_encoding(), descr_->max_definition_level(),
page.definition_level_encoding(), descr_->max_definition_level(),
static_cast<int>(num_buffered_values_), buffer);
buffer += def_levels_bytes;
data_size -= def_levels_bytes;
}

// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding.
Encoding::type encoding = page->encoding();
Encoding::type encoding = page.encoding();

if (IsDictionaryIndexEncoding(encoding)) {
encoding = Encoding::RLE_DICTIONARY;
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/parquet/file-deserialize-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class TestPageSerde : public ::testing::Test {
void CheckDataPageHeader(const format::DataPageHeader expected, const Page* page) {
ASSERT_EQ(PageType::DATA_PAGE, page->type());

const DataPage* data_page = static_cast<const DataPage*>(page);
const DataPageV1* data_page = static_cast<const DataPageV1*>(page);
ASSERT_EQ(expected.num_values, data_page->num_values());
ASSERT_EQ(expected.encoding, data_page->encoding());
ASSERT_EQ(expected.definition_level_encoding, data_page->definition_level_encoding());
Expand Down Expand Up @@ -154,7 +154,7 @@ void CheckDataPageHeader(const format::DataPageHeaderV2 expected, const Page* pa
// TODO: Tests for DataPageHeaderV2 statistics.
}

TEST_F(TestPageSerde, DataPage) {
TEST_F(TestPageSerde, DataPageV1) {
format::PageHeader out_page_header;

int stats_size = 512;
Expand Down Expand Up @@ -264,11 +264,11 @@ TEST_F(TestPageSerde, Compression) {
InitSerializedPageReader(num_rows * num_pages, codec_type);

std::shared_ptr<Page> page;
const DataPage* data_page;
const DataPageV1* data_page;
for (int i = 0; i < num_pages; ++i) {
int data_size = static_cast<int>(faux_data[i].size());
page = page_reader_->NextPage();
data_page = static_cast<const DataPage*>(page.get());
data_page = static_cast<const DataPageV1*>(page.get());
ASSERT_EQ(data_size, data_page->size());
ASSERT_EQ(0, memcmp(faux_data[i].data(), data_page->data(), data_size));
}
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/parquet/test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ void DataPageBuilder<BooleanType>::AppendValues(const ColumnDescriptor* d,
}

template <typename Type>
static shared_ptr<DataPage> MakeDataPage(
static shared_ptr<DataPageV1> MakeDataPage(
const ColumnDescriptor* d, const vector<typename Type::c_type>& values, int num_vals,
Encoding::type encoding, const uint8_t* indices, int indices_size,
const vector<int16_t>& def_levels, int16_t max_def_level,
Expand All @@ -243,9 +243,9 @@ static shared_ptr<DataPage> MakeDataPage(

auto buffer = page_stream.GetBuffer();

return std::make_shared<DataPage>(buffer, num_values, encoding,
page_builder.def_level_encoding(),
page_builder.rep_level_encoding());
return std::make_shared<DataPageV1>(buffer, num_values, encoding,
page_builder.def_level_encoding(),
page_builder.rep_level_encoding());
}

template <typename TYPE>
Expand Down Expand Up @@ -357,7 +357,7 @@ static void PaginateDict(const ColumnDescriptor* d,
rep_level_start = i * num_levels_per_page;
rep_level_end = (i + 1) * num_levels_per_page;
}
shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(
shared_ptr<DataPageV1> data_page = MakeDataPage<Int32Type>(
d, {}, values_per_page[i], encoding, rle_indices[i]->data(),
static_cast<int>(rle_indices[i]->size()),
slice(def_levels, def_level_start, def_level_end), max_def_level,
Expand Down

0 comments on commit c5cb0f3

Please sign in to comment.