Skip to content

Commit

Permalink
PARQUET-485: Decouple page deserialization from column reader to faci…
Browse files Browse the repository at this point in the history
…litate unit testing

Several things in this patch

* Adds PageReader abstraction, and a SerializedPageReader implementation
  according to the Parquet file format
* Adds a MockPageReader and a couple unit tests demonstrating end-to-end test
  without creating a Parquet file
* Adds a DataPageBuilder test fixture tool, may become part of the main write
  path later
* Adds PlainEncoder implementation for a few primitive types
* Fixes a few ColumnReader bugs exposed by the unit tests

Author: Wes McKinney <[email protected]>

Closes apache#32 from wesm/PARQUET-485 and squashes the following commits:

aa33078 [Wes McKinney] Fix function doc
e897a81 [Wes McKinney] Restore NumRequiredBits function after rebase
ee4d97a [Wes McKinney] Change PageReader::NextPage API to return shared_ptr<Page>(nullptr) on eos
0324021 [Wes McKinney] Clarify some comments
ec871c4 [Wes McKinney] Add include guards
e63bbdd [Wes McKinney] Move vector_equal to util/test-common.h
44a78a1 [Wes McKinney] Refactor to decouple page deserialization from column reader so that mock data pages cna be constructed in unit tests.

Change-Id: I7c4564cc7978d0cde430a7a2c6a2944ba01d80c8
  • Loading branch information
wesm authored and nongli committed Feb 2, 2016
1 parent 6f9af51 commit 76f4dfd
Show file tree
Hide file tree
Showing 14 changed files with 922 additions and 139 deletions.
4 changes: 4 additions & 0 deletions cpp/src/parquet/column/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

# Headers: top level
install(FILES
page.h
reader.h
serialized-page.h
scanner.h
DESTINATION include/parquet/column)

ADD_PARQUET_TEST(column-reader-test)
165 changes: 165 additions & 0 deletions cpp/src/parquet/column/column-reader-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

#include <gtest/gtest.h>

#include "parquet/types.h"
#include "parquet/column/page.h"
#include "parquet/column/reader.h"
#include "parquet/column/test-util.h"

#include "parquet/util/test-common.h"

using std::string;
using std::vector;
using std::shared_ptr;
using parquet::FieldRepetitionType;
using parquet::SchemaElement;
using parquet::Encoding;
using parquet::Type;

namespace parquet_cpp {

namespace test {

class TestPrimitiveReader : public ::testing::Test {
public:
void SetUp() {}

void TearDown() {}

void InitReader(const SchemaElement* element) {
pager_.reset(new test::MockPageReader(pages_));
reader_ = ColumnReader::Make(element, std::move(pager_));
}

protected:
std::shared_ptr<ColumnReader> reader_;
std::unique_ptr<PageReader> pager_;
vector<shared_ptr<Page> > pages_;
};

template <typename T>
static vector<T> slice(const vector<T>& values, size_t start, size_t end) {
if (end < start) {
return vector<T>(0);
}

vector<T> out(end - start);
for (size_t i = start; i < end; ++i) {
out[i - start] = values[i];
}
return out;
}


TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
vector<int32_t> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
size_t num_values = values.size();
Encoding::type value_encoding = Encoding::PLAIN;

vector<uint8_t> page1;
test::DataPageBuilder<Type::INT32> page_builder(&page1);
page_builder.AppendValues(values, Encoding::PLAIN);
pages_.push_back(page_builder.Finish());

// TODO: simplify this
SchemaElement element;
element.__set_type(Type::INT32);
element.__set_repetition_type(FieldRepetitionType::REQUIRED);
InitReader(&element);

Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());

vector<int32_t> result(10, -1);

size_t values_read = 0;
size_t batch_actual = reader->ReadBatch(10, nullptr, nullptr,
&result[0], &values_read);
ASSERT_EQ(10, batch_actual);
ASSERT_EQ(10, values_read);

ASSERT_TRUE(vector_equal(result, values));
}

TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
vector<int32_t> values = {1, 2, 3, 4, 5};
vector<int16_t> def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1};

size_t num_values = values.size();
Encoding::type value_encoding = Encoding::PLAIN;

vector<uint8_t> page1;
test::DataPageBuilder<Type::INT32> page_builder(&page1);

// Definition levels precede the values
page_builder.AppendDefLevels(def_levels, 1, Encoding::RLE);
page_builder.AppendValues(values, Encoding::PLAIN);

pages_.push_back(page_builder.Finish());

// TODO: simplify this
SchemaElement element;
element.__set_type(Type::INT32);
element.__set_repetition_type(FieldRepetitionType::OPTIONAL);
InitReader(&element);

Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());

std::vector<int32_t> vexpected;
std::vector<int16_t> dexpected;

size_t values_read = 0;
size_t batch_actual = 0;

vector<int32_t> vresult(3, -1);
vector<int16_t> dresult(5, -1);

batch_actual = reader->ReadBatch(5, &dresult[0], nullptr,
&vresult[0], &values_read);
ASSERT_EQ(5, batch_actual);
ASSERT_EQ(3, values_read);

ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3)));
ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5)));

batch_actual = reader->ReadBatch(5, &dresult[0], nullptr,
&vresult[0], &values_read);
ASSERT_EQ(5, batch_actual);
ASSERT_EQ(2, values_read);

ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5)));
ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10)));

// EOS, pass all nullptrs to check for improper writes. Do not segfault /
// core dump
batch_actual = reader->ReadBatch(5, nullptr, nullptr,
nullptr, &values_read);
ASSERT_EQ(0, batch_actual);
ASSERT_EQ(0, values_read);
}

} // namespace test

} // namespace parquet_cpp
132 changes: 132 additions & 0 deletions cpp/src/parquet/column/page.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This module defines an abstract interface for iterating through pages in a
// Parquet column chunk within a row group. It could be extended in the future
// to iterate through all data pages in all chunks in a file.

#ifndef PARQUET_COLUMN_PAGE_H
#define PARQUET_COLUMN_PAGE_H

#include "parquet/thrift/parquet_types.h"

namespace parquet_cpp {

// Note: Copying the specific page header Thrift metadata to the Page object
// (instead of using a pointer) presently so that data pages can be
// decompressed and processed in parallel. We can turn the header members of
// these classes into pointers at some point, but the downside is that
// applications materializing multiple data pages at once will have to have a
// data container that manages the lifetime of the deserialized
// parquet::PageHeader structs.
//
// TODO: Parallel processing is not yet safe because of memory-ownership
// semantics (the PageReader may or may not own the memory referenced by a
// page)
class Page {
// TODO(wesm): In the future Parquet implementations may store the crc code
// in parquet::PageHeader. parquet-mr currently does not, so we also skip it
// here, both on the read and write path
public:
Page(const uint8_t* buffer, size_t buffer_size, parquet::PageType::type type) :
buffer_(buffer),
buffer_size_(buffer_size),
type_(type) {}

parquet::PageType::type type() const {
return type_;
}

// @returns: a pointer to the page's data
const uint8_t* data() const {
return buffer_;
}

// @returns: the total size in bytes of the page's data buffer
size_t size() const {
return buffer_size_;
}

private:
const uint8_t* buffer_;
size_t buffer_size_;

parquet::PageType::type type_;
};


class DataPage : public Page {
public:
DataPage(const uint8_t* buffer, size_t buffer_size,
const parquet::DataPageHeader& header) :
Page(buffer, buffer_size, parquet::PageType::DATA_PAGE),
header_(header) {}

size_t num_values() const {
return header_.num_values;
}

parquet::Encoding::type encoding() const {
return header_.encoding;
}

private:
parquet::DataPageHeader header_;
};


class DataPageV2 : public Page {
public:
DataPageV2(const uint8_t* buffer, size_t buffer_size,
const parquet::DataPageHeaderV2& header) :
Page(buffer, buffer_size, parquet::PageType::DATA_PAGE_V2),
header_(header) {}

private:
parquet::DataPageHeaderV2 header_;
};


class DictionaryPage : public Page {
public:
DictionaryPage(const uint8_t* buffer, size_t buffer_size,
const parquet::DictionaryPageHeader& header) :
Page(buffer, buffer_size, parquet::PageType::DICTIONARY_PAGE),
header_(header) {}

size_t num_values() const {
return header_.num_values;
}

private:
parquet::DictionaryPageHeader header_;
};

// Abstract page iterator interface. This way, we can feed column pages to the
// ColumnReader through whatever mechanism we choose
class PageReader {
public:
virtual ~PageReader() {}

// @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
// containing new Page otherwise
virtual std::shared_ptr<Page> NextPage() = 0;
};

} // namespace parquet_cpp

#endif // PARQUET_COLUMN_PAGE_H
Loading

0 comments on commit 76f4dfd

Please sign in to comment.