Skip to content

Commit

Permalink
Incorporate review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
xhochy committed May 10, 2016
1 parent 8d2db22 commit 5fa1026
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 38 deletions.
8 changes: 2 additions & 6 deletions cpp/src/arrow/parquet/parquet-reader-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,8 @@
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"

#include "parquet/api/schema.h"
#include "parquet/column/writer.h"
#include "parquet/file/reader.h"
#include "parquet/file/writer.h"
#include "parquet/util/input.h"
#include "parquet/util/output.h"
#include "parquet/api/reader.h"
#include "parquet/api/writer.h"

using ParquetBuffer = parquet::Buffer;
using parquet::BufferReader;
Expand Down
87 changes: 62 additions & 25 deletions cpp/src/arrow/parquet/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <queue>

#include "arrow/parquet/schema.h"
#include "arrow/parquet/utils.h"
#include "arrow/schema.h"
#include "arrow/types/primitive.h"
#include "arrow/util/status.h"
Expand All @@ -36,6 +37,7 @@ class FileReader::Impl {
virtual ~Impl() {}

Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);

private:
MemoryPool* pool_;
Expand All @@ -45,35 +47,46 @@ class FileReader::Impl {
class FlatColumnReader::Impl {
public:
Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr,
std::queue<std::shared_ptr<ColumnReader>>&& column_readers);
::parquet::ParquetFileReader* reader, int column_index);
virtual ~Impl() {}

Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
template <typename ArrowType, typename ParquetType, typename CType>
Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);

private:
void NextRowGroup();

MemoryPool* pool_;
const ::parquet::ColumnDescriptor* descr_;
std::queue<std::shared_ptr<ColumnReader>> column_readers_;
::parquet::ParquetFileReader* reader_;
int column_index_;
int next_row_group_;
std::shared_ptr<ColumnReader> column_reader_;
std::shared_ptr<Field> field_;

PoolBuffer values_buffer_;
PoolBuffer def_levels_buffer_;
PoolBuffer rep_levels_buffer_;
};

FileReader::Impl::Impl(
MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
: pool_(pool), reader_(std::move(reader)) {}

Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
std::queue<std::shared_ptr<ColumnReader>> column_readers;
for (int rg = 0; rg < reader_->num_row_groups(); rg++) {
column_readers.push(reader_->RowGroup(rg)->Column(i));
}
std::unique_ptr<FlatColumnReader::Impl> impl(new FlatColumnReader::Impl(
pool_, reader_->descr()->Column(i), std::move(column_readers)));
std::unique_ptr<FlatColumnReader::Impl> impl(
new FlatColumnReader::Impl(pool_, reader_->descr()->Column(i), reader_.get(), i));
*out = std::unique_ptr<FlatColumnReader>(new FlatColumnReader(std::move(impl)));
return Status::OK();
}

// Reads column `i` into a single Array.
//
// Creates a FlatColumnReader for the column and requests one batch sized to the
// file's total row count. Any error from creating the column reader is returned
// unchanged; otherwise the result of NextBatch is returned.
Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
  std::unique_ptr<FlatColumnReader> column;
  const Status created = GetFlatColumn(i, &column);
  if (!created.ok()) { return created; }
  // NOTE(review): NextBatch takes an `int` batch size; confirm that num_rows()
  // cannot exceed the range of int for the files this is used with.
  return column->NextBatch(reader_->num_rows(), out);
}

// Builds the private implementation, handing it the pool and ownership of `reader`.
FileReader::FileReader(
    MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
    : impl_(new Impl(pool, std::move(reader))) {}
Expand All @@ -85,35 +98,50 @@ Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out)
}

// Reads column `i` into `*out` by delegating to the private implementation.
//
// The previous stub returned Status::OK() unconditionally before the
// delegation, leaving `*out` untouched and the real read unreachable.
Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
  return impl_->ReadFlatColumn(i, out);
}

FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr,
std::queue<std::shared_ptr<ColumnReader>>&& column_readers)
: pool_(pool), descr_(descr), column_readers_(column_readers) {
::parquet::ParquetFileReader* reader, int column_index)
: pool_(pool),
descr_(descr),
reader_(reader),
column_index_(column_index),
next_row_group_(0),
values_buffer_(pool),
def_levels_buffer_(pool),
rep_levels_buffer_(pool) {
NodeToField(descr_->schema_node(), &field_);
NextRowGroup();
}

template <typename ArrowType, typename ParquetType, typename CType>
Status FlatColumnReader::Impl::TypedReadBatch(
int batch_size, std::shared_ptr<Array>* out) {
int values_to_read = batch_size;
NumericBuilder<ArrowType> builder(pool_, field_->type);
while ((values_to_read > 0) && (column_readers_.size() > 0)) {
// TODO: This causes a lot of malloc thrashing and does not use the memory pool.
std::vector<CType> values(values_to_read);
std::vector<int16_t> def_levels(values_to_read);
auto reader =
dynamic_cast<TypedColumnReader<ParquetType>*>(column_readers_.front().get());
while ((values_to_read > 0) && column_reader_) {
values_buffer_.Resize(values_to_read * sizeof(CType));
if (descr_->max_definition_level() > 0) {
def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
}
if (descr_->max_repetition_level() > 0) {
rep_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
}
auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
int64_t values_read;
values_to_read -= reader->ReadBatch(
values_to_read, def_levels.data(), nullptr, values.data(), &values_read);
CType* values = reinterpret_cast<CType*>(values_buffer_.mutable_data());
PARQUET_CATCH_NOT_OK(
values_to_read -= reader->ReadBatch(values_to_read,
reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()),
reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data()), values,
&values_read));
if (descr_->max_definition_level() == 0) {
builder.Append(values.data(), values_read);
RETURN_NOT_OK(builder.Append(values, values_read));
} else {
return Status::NotImplemented("no support for definition levels yet");
}
if (!column_readers_.front()->HasNext()) { column_readers_.pop(); }
if (!column_reader_->HasNext()) { NextRowGroup(); }
}
*out = builder.Finish();
return Status::OK();
Expand All @@ -125,16 +153,16 @@ Status FlatColumnReader::Impl::TypedReadBatch(
break;

Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
if (column_readers_.size() == 0) {
// Exhausted all readers.
*out = std::shared_ptr<Array>(nullptr);
if (!column_reader_) {
// Exhausted all row groups.
*out = nullptr;
return Status::OK();
}

if (descr_->max_repetition_level() > 0) {
return Status::NotImplemented("no support for repetition yet");
}

*out = std::shared_ptr<Array>(nullptr);
switch (field_->type->type) {
TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type, int32_t)
TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type, int64_t)
Expand All @@ -145,6 +173,15 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>*
}
}

// Moves on to the next row group's reader for this column.
//
// When every row group has been consumed, column_reader_ is reset to null,
// which callers use as the "exhausted" signal.
void FlatColumnReader::Impl::NextRowGroup() {
  if (next_row_group_ >= reader_->num_row_groups()) {
    column_reader_ = nullptr;
    return;
  }
  // NOTE(review): RowGroup()/Column() presumably can throw ParquetException and
  // this call is not wrapped in PARQUET_CATCH_NOT_OK — confirm callers are
  // prepared for that.
  column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_);
  ++next_row_group_;
}

FlatColumnReader::FlatColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}

// Defined out of line in this translation unit, after Impl's definition.
FlatColumnReader::~FlatColumnReader() = default;
Expand Down
8 changes: 1 addition & 7 deletions cpp/src/arrow/parquet/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@
#include <vector>

#include "parquet/api/schema.h"
#include "parquet/exception.h"

#include "arrow/parquet/utils.h"
#include "arrow/types/decimal.h"
#include "arrow/types/string.h"
#include "arrow/util/status.h"

using parquet::ParquetException;
using parquet::Repetition;
using parquet::schema::Node;
using parquet::schema::NodePtr;
Expand All @@ -41,11 +40,6 @@ namespace arrow {

namespace parquet {

#define PARQUET_CATCH_NOT_OK(s) \
try { \
(s); \
} catch (const ParquetException& e) { return Status::Invalid(e.what()); }

const auto BOOL = std::make_shared<BooleanType>();
const auto UINT8 = std::make_shared<UInt8Type>();
const auto INT32 = std::make_shared<Int32Type>();
Expand Down
38 changes: 38 additions & 0 deletions cpp/src/arrow/parquet/utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef ARROW_PARQUET_UTILS_H
#define ARROW_PARQUET_UTILS_H

#include "arrow/util/status.h"

#include "parquet/exception.h"

namespace arrow {

namespace parquet {

// Evaluates the expression `s` inside a try block. If it throws a
// ::parquet::ParquetException, the enclosing function returns
// Status::Invalid carrying the exception's what() message.
//
// The expansion contains a `return`, so it can only be used inside a function
// whose return type accepts a Status. `Status` is unqualified here and is
// resolved in the scope where the macro is expanded. Exceptions of other
// types are not caught.
#define PARQUET_CATCH_NOT_OK(s) \
try { \
(s); \
} catch (const ::parquet::ParquetException& e) { return Status::Invalid(e.what()); }

} // namespace parquet

} // namespace arrow

#endif // ARROW_PARQUET_UTILS_H

0 comments on commit 5fa1026

Please sign in to comment.