Skip to content

Commit

Permalink
PARQUET-538: Improve ColumnReader Tests
Browse files Browse the repository at this point in the history
closes apache#43 and closes apache#50
This PR also implements
1) PARQUET-532: Null values detection needs to be fixed and tested
2) PARQUET-502: Scanner segfaults when its batch size is smaller than the number of rows
3) PARQUET-526: Add more complete unit test coverage for column Scanner implementations
4) PARQUET-531: Can't read past first page in a column

Author: Deepak Majeti <[email protected]>

Closes apache#62 from majetideepak/PARQUET-538 and squashes the following commits:

1e56f83 [Deepak Majeti] Trigger notification
6478a7c [Deepak Majeti] TYPED_TEST
1d14171 [Deepak Majeti] Added Boolean Test and Scanner:Next API
d1da031 [Deepak Majeti] lint issue
45f10aa [Deepak Majeti] Reproducer for PARQUET-502
88e27c6 [Deepak Majeti] formatting
8aac435 [Deepak Majeti] PARQUET-526
dca7e2d [Deepak Majeti] PARQUET-532 and PARQUET-502 Fix
a622021 [Deepak Majeti] Reverted PARQUET-524 and addressed comments
859c1df [Deepak Majeti] minor comment edits
d938a13 [Deepak Majeti] PARQUET-538
df1fbd7 [Deepak Majeti] Templated single page tests
8548e3c [Deepak Majeti] PARQUET-524
c265fea [Deepak Majeti] fixed PARQUET-499 bugs

Change-Id: I72ba1fb655e851fc6045d925ac5715f5b535ad7f
  • Loading branch information
Deepak Majeti authored and nongli committed Feb 24, 2016
1 parent fac463c commit f813cce
Show file tree
Hide file tree
Showing 11 changed files with 508 additions and 211 deletions.
1 change: 1 addition & 0 deletions cpp/src/parquet/column/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ install(FILES

ADD_PARQUET_TEST(column-reader-test)
ADD_PARQUET_TEST(levels-test)
ADD_PARQUET_TEST(scanner-test)
281 changes: 109 additions & 172 deletions cpp/src/parquet/column/column-reader-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,195 +44,132 @@ namespace test {

class TestPrimitiveReader : public ::testing::Test {
public:
void SetUp() {}

void TearDown() {}
void MakePages(const ColumnDescriptor *d, int num_pages, int levels_per_page) {
num_levels_ = levels_per_page * num_pages;
num_values_ = 0;
uint32_t seed = 0;
int16_t zero = 0;
vector<int> values_per_page(num_pages, levels_per_page);
// Create definition levels
if (max_def_level_ > 0) {
def_levels_.resize(num_levels_);
random_numbers(num_levels_, seed, zero, max_def_level_, def_levels_.data());
for (int p = 0; p < num_pages; p++) {
int num_values_per_page = 0;
for (int i = 0; i < levels_per_page; i++) {
if (def_levels_[i + p * levels_per_page] == max_def_level_) {
num_values_per_page++;
num_values_++;
}
}
values_per_page[p] = num_values_per_page;
}
} else {
num_values_ = num_levels_;
}
// Create repitition levels
if (max_rep_level_ > 0) {
rep_levels_.resize(num_levels_);
random_numbers(num_levels_, seed, zero, max_rep_level_, rep_levels_.data());
}
// Create values
values_.resize(num_values_);
random_numbers(num_values_, seed, std::numeric_limits<int32_t>::min(),
std::numeric_limits<int32_t>::max(), values_.data());
Paginate<Type::INT32, int32_t>(d, values_, def_levels_, max_def_level_,
rep_levels_, max_rep_level_, levels_per_page, values_per_page, pages_);
}

void InitReader(const ColumnDescriptor* descr) {
void InitReader(const ColumnDescriptor* d) {
std::unique_ptr<PageReader> pager_;
pager_.reset(new test::MockPageReader(pages_));
reader_ = ColumnReader::Make(descr, std::move(pager_));
reader_ = ColumnReader::Make(d, std::move(pager_));
}

void CheckResults() {
vector<int32_t> vresult(num_values_, -1);
vector<int16_t> dresult(num_levels_, -1);
vector<int16_t> rresult(num_levels_, -1);
size_t values_read = 0;
size_t total_values_read = 0;
size_t batch_actual = 0;

Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
int32_t batch_size = 8;
size_t batch = 0;
// This will cover both the cases
// 1) batch_size < page_size (multiple ReadBatch from a single page)
// 2) batch_size > page_size (BatchRead limits to a single page)
do {
batch = reader->ReadBatch(batch_size, &dresult[0] + batch_actual,
&rresult[0] + batch_actual, &vresult[0] + total_values_read, &values_read);
total_values_read += values_read;
batch_actual += batch;
batch_size = std::max(batch_size * 2, 4096);
} while (batch > 0);

ASSERT_EQ(num_levels_, batch_actual);
ASSERT_EQ(num_values_, total_values_read);
ASSERT_TRUE(vector_equal(values_, vresult));
if (max_def_level_ > 0) {
ASSERT_TRUE(vector_equal(def_levels_, dresult));
}
if (max_rep_level_ > 0) {
ASSERT_TRUE(vector_equal(rep_levels_, rresult));
}
// catch improper writes at EOS
batch_actual = reader->ReadBatch(5, nullptr, nullptr, nullptr, &values_read);
ASSERT_EQ(0, batch_actual);
ASSERT_EQ(0, values_read);
}

void execute(int num_pages, int levels_page, const ColumnDescriptor *d) {
MakePages(d, num_pages, levels_page);
InitReader(d);
CheckResults();
}

protected:
std::shared_ptr<ColumnReader> reader_;
std::unique_ptr<PageReader> pager_;
int num_levels_;
int num_values_;
int16_t max_def_level_;
int16_t max_rep_level_;
vector<shared_ptr<Page> > pages_;
std::shared_ptr<ColumnReader> reader_;
vector<int32_t> values_;
vector<int16_t> def_levels_;
vector<int16_t> rep_levels_;
};


TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
vector<int32_t> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, {}, 0,
{}, 0);
pages_.push_back(page);

int levels_per_page = 100;
int num_pages = 50;
max_def_level_ = 0;
max_rep_level_ = 0;
NodePtr type = schema::Int32("a", Repetition::REQUIRED);
ColumnDescriptor descr(type, 0, 0);
InitReader(&descr);

Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());

vector<int32_t> result(10, -1);

size_t values_read = 0;
size_t batch_actual = reader->ReadBatch(10, nullptr, nullptr,
&result[0], &values_read);
ASSERT_EQ(10, batch_actual);
ASSERT_EQ(10, values_read);

ASSERT_TRUE(vector_equal(result, values));
const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
execute(num_pages, levels_per_page, &descr);
}


TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
vector<int32_t> values = {1, 2, 3, 4, 5};
vector<int16_t> def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1};

std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, def_levels, 1,
{}, 0);

pages_.push_back(page);

NodePtr type = schema::Int32("a", Repetition::OPTIONAL);
ColumnDescriptor descr(type, 1, 0);
InitReader(&descr);

Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());

size_t values_read = 0;
size_t batch_actual = 0;

vector<int32_t> vresult(3, -1);
vector<int16_t> dresult(5, -1);

batch_actual = reader->ReadBatch(5, &dresult[0], nullptr,
&vresult[0], &values_read);
ASSERT_EQ(5, batch_actual);
ASSERT_EQ(3, values_read);

ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3)));
ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5)));

batch_actual = reader->ReadBatch(5, &dresult[0], nullptr,
&vresult[0], &values_read);
ASSERT_EQ(5, batch_actual);
ASSERT_EQ(2, values_read);

ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5)));
ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10)));

// EOS, pass all nullptrs to check for improper writes. Do not segfault /
// core dump
batch_actual = reader->ReadBatch(5, nullptr, nullptr,
nullptr, &values_read);
ASSERT_EQ(0, batch_actual);
ASSERT_EQ(0, values_read);
int levels_per_page = 100;
int num_pages = 50;
max_def_level_ = 4;
max_rep_level_ = 0;
NodePtr type = schema::Int32("b", Repetition::OPTIONAL);
const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
execute(num_pages, levels_per_page, &descr);
}

TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
vector<int32_t> values = {1, 2, 3, 4, 5};
vector<int16_t> def_levels = {2, 1, 1, 2, 2, 1, 1, 2, 2, 1};
vector<int16_t> rep_levels = {0, 1, 1, 0, 0, 1, 1, 0, 0, 1};

std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values,
def_levels, 2, rep_levels, 1);

pages_.push_back(page);

NodePtr type = schema::Int32("a", Repetition::REPEATED);
ColumnDescriptor descr(type, 2, 1);
InitReader(&descr);

Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());

size_t values_read = 0;
size_t batch_actual = 0;

vector<int32_t> vresult(3, -1);
vector<int16_t> dresult(5, -1);
vector<int16_t> rresult(5, -1);

batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
&vresult[0], &values_read);
ASSERT_EQ(5, batch_actual);
ASSERT_EQ(3, values_read);

ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3)));
ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5)));
ASSERT_TRUE(vector_equal(rresult, slice(rep_levels, 0, 5)));

batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
&vresult[0], &values_read);
ASSERT_EQ(5, batch_actual);
ASSERT_EQ(2, values_read);

ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5)));
ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10)));
ASSERT_TRUE(vector_equal(rresult, slice(rep_levels, 5, 10)));

// EOS, pass all nullptrs to check for improper writes. Do not segfault /
// core dump
batch_actual = reader->ReadBatch(5, nullptr, nullptr,
nullptr, &values_read);
ASSERT_EQ(0, batch_actual);
ASSERT_EQ(0, values_read);
int levels_per_page = 100;
int num_pages = 50;
max_def_level_ = 4;
max_rep_level_ = 2;
NodePtr type = schema::Int32("c", Repetition::REPEATED);
const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
execute(num_pages, levels_per_page, &descr);
}

TEST_F(TestPrimitiveReader, TestInt32FlatRepeatedMultiplePages) {
vector<int32_t> values[2] = {{1, 2, 3, 4, 5},
{6, 7, 8, 9, 10}};
vector<int16_t> def_levels[2] = {{2, 1, 1, 2, 2, 1, 1, 2, 2, 1},
{2, 2, 1, 2, 1, 1, 2, 1, 2, 1}};
vector<int16_t> rep_levels[2] = {{0, 1, 1, 0, 0, 1, 1, 0, 0, 1},
{0, 0, 1, 0, 1, 1, 0, 1, 0, 1}};

std::shared_ptr<DataPage> page;

for (int i = 0; i < 4; i++) {
page = MakeDataPage<Type::INT32>(values[i % 2],
def_levels[i % 2], 2, rep_levels[i % 2], 1);
pages_.push_back(page);
}

NodePtr type = schema::Int32("a", Repetition::REPEATED);
ColumnDescriptor descr(type, 2, 1);
InitReader(&descr);

Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());

size_t values_read = 0;
size_t batch_actual = 0;

vector<int32_t> vresult(3, -1);
vector<int16_t> dresult(5, -1);
vector<int16_t> rresult(5, -1);

for (int i = 0; i < 4; i++) {
batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
&vresult[0], &values_read);
ASSERT_EQ(5, batch_actual);
ASSERT_EQ(3, values_read);

ASSERT_TRUE(vector_equal(vresult, slice(values[i % 2], 0, 3)));
ASSERT_TRUE(vector_equal(dresult, slice(def_levels[i % 2], 0, 5)));
ASSERT_TRUE(vector_equal(rresult, slice(rep_levels[i % 2], 0, 5)));

batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
&vresult[0], &values_read);
ASSERT_EQ(5, batch_actual);
ASSERT_EQ(2, values_read);

ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values[i % 2], 3, 5)));
ASSERT_TRUE(vector_equal(dresult, slice(def_levels[i % 2], 5, 10)));
ASSERT_TRUE(vector_equal(rresult, slice(rep_levels[i % 2], 5, 10)));
}
// EOS, pass all nullptrs to check for improper writes. Do not segfault /
// core dump
batch_actual = reader->ReadBatch(5, nullptr, nullptr,
nullptr, &values_read);
ASSERT_EQ(0, batch_actual);
ASSERT_EQ(0, values_read);
}
} // namespace test
} // namespace parquet_cpp
2 changes: 1 addition & 1 deletion cpp/src/parquet/column/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class TypedColumnReader : public ColumnReader {
// This API is the same for both V1 and V2 of the DataPage
//
// @returns: actual number of levels read (see values_read for number of values read)
size_t ReadBatch(int batch_size, int16_t* def_levels, int16_t* rep_levels,
size_t ReadBatch(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
T* values, size_t* values_read);

private:
Expand Down
Loading

0 comments on commit f813cce

Please sign in to comment.