forked from apache/arrow
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PARQUET-485: Decouple page deserialization from column reader to faci…
…litate unit testing Several things in this patch * Adds PageReader abstraction, and a SerializedPageReader implementation according to the Parquet file format * Adds a MockPageReader and a couple unit tests demonstrating end-to-end test without creating a Parquet file * Adds a DataPageBuilder test fixture tool, may become part of the main write path later * Adds PlainEncoder implementation for a few primitive types * Fixes a few ColumnReader bugs exposed by the unit tests Author: Wes McKinney <[email protected]> Closes apache#32 from wesm/PARQUET-485 and squashes the following commits: aa33078 [Wes McKinney] Fix function doc e897a81 [Wes McKinney] Restore NumRequiredBits function after rebase ee4d97a [Wes McKinney] Change PageReader::NextPage API to return shared_ptr<Page>(nullptr) on eos 0324021 [Wes McKinney] Clarify some comments ec871c4 [Wes McKinney] Add include guards e63bbdd [Wes McKinney] Move vector_equal to util/test-common.h 44a78a1 [Wes McKinney] Refactor to decouple page deserialization from column reader so that mock data pages cna be constructed in unit tests.
- Loading branch information
Showing
14 changed files
with
922 additions
and
139 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#include <cstdint> | ||
#include <cstdlib> | ||
#include <iostream> | ||
#include <sstream> | ||
#include <string> | ||
#include <vector> | ||
|
||
#include <gtest/gtest.h> | ||
|
||
#include "parquet/types.h" | ||
#include "parquet/column/page.h" | ||
#include "parquet/column/reader.h" | ||
#include "parquet/column/test-util.h" | ||
|
||
#include "parquet/util/test-common.h" | ||
|
||
using std::string; | ||
using std::vector; | ||
using std::shared_ptr; | ||
using parquet::FieldRepetitionType; | ||
using parquet::SchemaElement; | ||
using parquet::Encoding; | ||
using parquet::Type; | ||
|
||
namespace parquet_cpp { | ||
|
||
namespace test { | ||
|
||
class TestPrimitiveReader : public ::testing::Test { | ||
public: | ||
void SetUp() {} | ||
|
||
void TearDown() {} | ||
|
||
void InitReader(const SchemaElement* element) { | ||
pager_.reset(new test::MockPageReader(pages_)); | ||
reader_ = ColumnReader::Make(element, std::move(pager_)); | ||
} | ||
|
||
protected: | ||
std::shared_ptr<ColumnReader> reader_; | ||
std::unique_ptr<PageReader> pager_; | ||
vector<shared_ptr<Page> > pages_; | ||
}; | ||
|
||
template <typename T> | ||
static vector<T> slice(const vector<T>& values, size_t start, size_t end) { | ||
if (end < start) { | ||
return vector<T>(0); | ||
} | ||
|
||
vector<T> out(end - start); | ||
for (size_t i = start; i < end; ++i) { | ||
out[i - start] = values[i]; | ||
} | ||
return out; | ||
} | ||
|
||
|
||
TEST_F(TestPrimitiveReader, TestInt32FlatRequired) { | ||
vector<int32_t> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; | ||
size_t num_values = values.size(); | ||
Encoding::type value_encoding = Encoding::PLAIN; | ||
|
||
vector<uint8_t> page1; | ||
test::DataPageBuilder<Type::INT32> page_builder(&page1); | ||
page_builder.AppendValues(values, Encoding::PLAIN); | ||
pages_.push_back(page_builder.Finish()); | ||
|
||
// TODO: simplify this | ||
SchemaElement element; | ||
element.__set_type(Type::INT32); | ||
element.__set_repetition_type(FieldRepetitionType::REQUIRED); | ||
InitReader(&element); | ||
|
||
Int32Reader* reader = static_cast<Int32Reader*>(reader_.get()); | ||
|
||
vector<int32_t> result(10, -1); | ||
|
||
size_t values_read = 0; | ||
size_t batch_actual = reader->ReadBatch(10, nullptr, nullptr, | ||
&result[0], &values_read); | ||
ASSERT_EQ(10, batch_actual); | ||
ASSERT_EQ(10, values_read); | ||
|
||
ASSERT_TRUE(vector_equal(result, values)); | ||
} | ||
|
||
TEST_F(TestPrimitiveReader, TestInt32FlatOptional) { | ||
vector<int32_t> values = {1, 2, 3, 4, 5}; | ||
vector<int16_t> def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1}; | ||
|
||
size_t num_values = values.size(); | ||
Encoding::type value_encoding = Encoding::PLAIN; | ||
|
||
vector<uint8_t> page1; | ||
test::DataPageBuilder<Type::INT32> page_builder(&page1); | ||
|
||
// Definition levels precede the values | ||
page_builder.AppendDefLevels(def_levels, 1, Encoding::RLE); | ||
page_builder.AppendValues(values, Encoding::PLAIN); | ||
|
||
pages_.push_back(page_builder.Finish()); | ||
|
||
// TODO: simplify this | ||
SchemaElement element; | ||
element.__set_type(Type::INT32); | ||
element.__set_repetition_type(FieldRepetitionType::OPTIONAL); | ||
InitReader(&element); | ||
|
||
Int32Reader* reader = static_cast<Int32Reader*>(reader_.get()); | ||
|
||
std::vector<int32_t> vexpected; | ||
std::vector<int16_t> dexpected; | ||
|
||
size_t values_read = 0; | ||
size_t batch_actual = 0; | ||
|
||
vector<int32_t> vresult(3, -1); | ||
vector<int16_t> dresult(5, -1); | ||
|
||
batch_actual = reader->ReadBatch(5, &dresult[0], nullptr, | ||
&vresult[0], &values_read); | ||
ASSERT_EQ(5, batch_actual); | ||
ASSERT_EQ(3, values_read); | ||
|
||
ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3))); | ||
ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5))); | ||
|
||
batch_actual = reader->ReadBatch(5, &dresult[0], nullptr, | ||
&vresult[0], &values_read); | ||
ASSERT_EQ(5, batch_actual); | ||
ASSERT_EQ(2, values_read); | ||
|
||
ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5))); | ||
ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10))); | ||
|
||
// EOS, pass all nullptrs to check for improper writes. Do not segfault / | ||
// core dump | ||
batch_actual = reader->ReadBatch(5, nullptr, nullptr, | ||
nullptr, &values_read); | ||
ASSERT_EQ(0, batch_actual); | ||
ASSERT_EQ(0, values_read); | ||
} | ||
|
||
} // namespace test | ||
|
||
} // namespace parquet_cpp |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
// This module defines an abstract interface for iterating through pages in a | ||
// Parquet column chunk within a row group. It could be extended in the future | ||
// to iterate through all data pages in all chunks in a file. | ||
|
||
#ifndef PARQUET_COLUMN_PAGE_H | ||
#define PARQUET_COLUMN_PAGE_H | ||
|
||
#include "parquet/thrift/parquet_types.h" | ||
|
||
namespace parquet_cpp { | ||
|
||
// Note: Copying the specific page header Thrift metadata to the Page object | ||
// (instead of using a pointer) presently so that data pages can be | ||
// decompressed and processed in parallel. We can turn the header members of | ||
// these classes into pointers at some point, but the downside is that | ||
// applications materializing multiple data pages at once will have to have a | ||
// data container that manages the lifetime of the deserialized | ||
// parquet::PageHeader structs. | ||
// | ||
// TODO: Parallel processing is not yet safe because of memory-ownership | ||
// semantics (the PageReader may or may not own the memory referenced by a | ||
// page) | ||
class Page { | ||
// TODO(wesm): In the future Parquet implementations may store the crc code | ||
// in parquet::PageHeader. parquet-mr currently does not, so we also skip it | ||
// here, both on the read and write path | ||
public: | ||
Page(const uint8_t* buffer, size_t buffer_size, parquet::PageType::type type) : | ||
buffer_(buffer), | ||
buffer_size_(buffer_size), | ||
type_(type) {} | ||
|
||
parquet::PageType::type type() const { | ||
return type_; | ||
} | ||
|
||
// @returns: a pointer to the page's data | ||
const uint8_t* data() const { | ||
return buffer_; | ||
} | ||
|
||
// @returns: the total size in bytes of the page's data buffer | ||
size_t size() const { | ||
return buffer_size_; | ||
} | ||
|
||
private: | ||
const uint8_t* buffer_; | ||
size_t buffer_size_; | ||
|
||
parquet::PageType::type type_; | ||
}; | ||
|
||
|
||
class DataPage : public Page { | ||
public: | ||
DataPage(const uint8_t* buffer, size_t buffer_size, | ||
const parquet::DataPageHeader& header) : | ||
Page(buffer, buffer_size, parquet::PageType::DATA_PAGE), | ||
header_(header) {} | ||
|
||
size_t num_values() const { | ||
return header_.num_values; | ||
} | ||
|
||
parquet::Encoding::type encoding() const { | ||
return header_.encoding; | ||
} | ||
|
||
private: | ||
parquet::DataPageHeader header_; | ||
}; | ||
|
||
|
||
class DataPageV2 : public Page { | ||
public: | ||
DataPageV2(const uint8_t* buffer, size_t buffer_size, | ||
const parquet::DataPageHeaderV2& header) : | ||
Page(buffer, buffer_size, parquet::PageType::DATA_PAGE_V2), | ||
header_(header) {} | ||
|
||
private: | ||
parquet::DataPageHeaderV2 header_; | ||
}; | ||
|
||
|
||
class DictionaryPage : public Page { | ||
public: | ||
DictionaryPage(const uint8_t* buffer, size_t buffer_size, | ||
const parquet::DictionaryPageHeader& header) : | ||
Page(buffer, buffer_size, parquet::PageType::DICTIONARY_PAGE), | ||
header_(header) {} | ||
|
||
size_t num_values() const { | ||
return header_.num_values; | ||
} | ||
|
||
private: | ||
parquet::DictionaryPageHeader header_; | ||
}; | ||
|
||
// Abstract page iterator interface. This way, we can feed column pages to the | ||
// ColumnReader through whatever mechanism we choose | ||
class PageReader { | ||
public: | ||
virtual ~PageReader() {} | ||
|
||
// @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page> | ||
// containing new Page otherwise | ||
virtual std::shared_ptr<Page> NextPage() = 0; | ||
}; | ||
|
||
} // namespace parquet_cpp | ||
|
||
#endif // PARQUET_COLUMN_PAGE_H |
Oops, something went wrong.