Commit 5039cf9

Rewrite Binary/FLBA specializations to correctly account for Accumulator APIs
pitrou committed Aug 21, 2023
1 parent acc40ed commit 5039cf9
Showing 3 changed files with 202 additions and 150 deletions.
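In short, the chunk-capacity checks and reservations that each decoder previously open-coded against the helper's exposed builder/chunks members now live behind Prepare() and PrepareNextInput(). A condensed sketch of the call-pattern change at a plain-encoded BYTE_ARRAY decode site, distilled from the hunks below (identifiers such as out, num_values, len_, value_len and data_ are the ones used in PlainByteArrayDecoder; this is not a verbatim excerpt):

// Before this commit: the decode loop managed reservation and chunk rollover itself.
ArrowBinaryHelper<ByteArrayType> helper(out);
RETURN_NOT_OK(helper.builder->Reserve(num_values));
RETURN_NOT_OK(helper.builder->ReserveData(
    std::min<int64_t>(len_, helper.chunk_space_remaining)));
// per value:
if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) {
  RETURN_NOT_OK(helper.PushChunk());
}
helper.UnsafeAppend(data_ + 4, value_len);

// After this commit: reservation and chunk rollover live inside the helper.
ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
RETURN_NOT_OK(helper.Prepare(len_));
// per value:
RETURN_NOT_OK(helper.PrepareNextInput(value_len, len_));
helper.UnsafeAppend(data_ + 4, value_len);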
183 changes: 122 additions & 61 deletions cpp/src/parquet/encoding.cc
@@ -1180,55 +1180,126 @@ int PlainBooleanDecoder::Decode(bool* buffer, int max_values) {
return max_values;
}

// A helper class to abstract away differences between EncodingTraits<DType>::Accumulator
// for ByteArrayType and FLBAType.
template <typename DType>
- struct ArrowBinaryHelper {
- explicit ArrowBinaryHelper(typename EncodingTraits<DType>::Accumulator* acc) {
- if constexpr (std::is_same_v<DType, ByteArrayType>) {
- builder = acc->builder.get();
- chunks = &acc->chunks;
- } else {
- builder = acc;
struct ArrowBinaryHelper;

template <>
struct ArrowBinaryHelper<ByteArrayType> {
using Accumulator = typename EncodingTraits<ByteArrayType>::Accumulator;

ArrowBinaryHelper(Accumulator* acc, int64_t length)
: acc_(acc),
entries_remaining_(length),
chunk_space_remaining_(::arrow::kBinaryMemoryLimit -
acc_->builder->value_data_length()) {}

Status Prepare(std::optional<int64_t> estimated_data_length = {}) {
RETURN_NOT_OK(acc_->builder->Reserve(entries_remaining_));
if (estimated_data_length.has_value()) {
RETURN_NOT_OK(acc_->builder->ReserveData(
std::min<int64_t>(*estimated_data_length, ::arrow::kBinaryMemoryLimit)));
}
- if (ARROW_PREDICT_FALSE(SubtractWithOverflow(::arrow::kBinaryMemoryLimit,
- builder->value_data_length(),
- &chunk_space_remaining))) {
- throw ParquetException("excess expansion in ArrowBinaryHelper<DType>");
return Status::OK();
}

Status PrepareNextInput(int64_t next_value_length,
std::optional<int64_t> estimated_remaining_data_length = {}) {
if (ARROW_PREDICT_FALSE(!CanFit(next_value_length))) {
// This element would exceed the capacity of a chunk
RETURN_NOT_OK(PushChunk());
RETURN_NOT_OK(acc_->builder->Reserve(entries_remaining_));
if (estimated_remaining_data_length.has_value()) {
RETURN_NOT_OK(acc_->builder->ReserveData(
std::min<int64_t>(*estimated_remaining_data_length, chunk_space_remaining_)));
}
}
return Status::OK();
}

void UnsafeAppend(const uint8_t* data, int32_t length) {
DCHECK(CanFit(length));
DCHECK_GT(entries_remaining_, 0);
chunk_space_remaining_ -= length;
--entries_remaining_;
acc_->builder->UnsafeAppend(data, length);
}

Status Append(const uint8_t* data, int32_t length) {
DCHECK(CanFit(length));
DCHECK_GT(entries_remaining_, 0);
chunk_space_remaining_ -= length;
--entries_remaining_;
return acc_->builder->Append(data, length);
}

void UnsafeAppendNull() {
--entries_remaining_;
acc_->builder->UnsafeAppendNull();
}

Status AppendNull() {
--entries_remaining_;
return acc_->builder->AppendNull();
}

private:
Status PushChunk() {
- std::shared_ptr<::arrow::Array> result;
- RETURN_NOT_OK(builder->Finish(&result));
- chunks->push_back(result);
- chunk_space_remaining = ::arrow::kBinaryMemoryLimit;
ARROW_ASSIGN_OR_RAISE(auto chunk, acc_->builder->Finish());
acc_->chunks.push_back(std::move(chunk));
chunk_space_remaining_ = ::arrow::kBinaryMemoryLimit;
return Status::OK();
}

- bool CanFit(int64_t length) const { return length <= chunk_space_remaining; }
bool CanFit(int64_t length) const { return length <= chunk_space_remaining_; }

- void UnsafeAppend(const uint8_t* data, int32_t length) {
- DCHECK(CanFit(length));
- chunk_space_remaining -= length;
- builder->UnsafeAppend(data, length);
Accumulator* acc_;
int64_t entries_remaining_;
int64_t chunk_space_remaining_;
};

template <>
struct ArrowBinaryHelper<FLBAType> {
using Accumulator = typename EncodingTraits<FLBAType>::Accumulator;

ArrowBinaryHelper(Accumulator* acc, int64_t length)
: acc_(acc), entries_remaining_(length) {}

Status Prepare(std::optional<int64_t> estimated_data_length = {}) {
return acc_->Reserve(entries_remaining_);
}

- void UnsafeAppendNull() { builder->UnsafeAppendNull(); }
Status PrepareNextInput(int64_t next_value_length,
std::optional<int64_t> estimated_remaining_data_length = {}) {
return Status::OK();
}

void UnsafeAppend(const uint8_t* data, int32_t length) {
DCHECK_GT(entries_remaining_, 0);
--entries_remaining_;
acc_->UnsafeAppend(data);
}

Status Append(const uint8_t* data, int32_t length) {
- DCHECK(CanFit(length));
- chunk_space_remaining -= length;
- if constexpr (std::is_same_v<DType, FLBAType>) {
- return builder->Append(data);
- } else {
- return builder->Append(data, length);
- }
DCHECK_GT(entries_remaining_, 0);
--entries_remaining_;
return acc_->Append(data);
}

- Status AppendNull() { return builder->AppendNull(); }
void UnsafeAppendNull() {
--entries_remaining_;
acc_->UnsafeAppendNull();
}

Status AppendNull() {
--entries_remaining_;
return acc_->AppendNull();
}

- typename EncodingTraits<DType>::BuilderType* builder;
- std::vector<std::shared_ptr<::arrow::Array>>* chunks;
- int64_t chunk_space_remaining;
private:
Accumulator* acc_;
int64_t entries_remaining_;
};
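The ByteArray accumulator exists because a single arrow::BinaryArray can hold at most ::arrow::kBinaryMemoryLimit (just under 2 GiB) of value data; when the next value would overflow that limit, the helper finishes the current builder into acc_->chunks and starts a fresh chunk. A minimal, hypothetical consumer of the accumulator, assuming only the layout declared in encoding.h (this function is illustrative and not part of the commit):

#include "arrow/array/builder_binary.h"
#include "arrow/chunked_array.h"
#include "arrow/result.h"
#include "parquet/encoding.h"

// Finish the last open chunk and expose all decoded values as one ChunkedArray.
::arrow::Result<std::shared_ptr<::arrow::ChunkedArray>> DrainAccumulator(
    parquet::EncodingTraits<parquet::ByteArrayType>::Accumulator* acc) {
  std::shared_ptr<::arrow::Array> last;
  ARROW_RETURN_NOT_OK(acc->builder->Finish(&last));
  acc->chunks.push_back(std::move(last));
  return std::make_shared<::arrow::ChunkedArray>(std::move(acc->chunks));
}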

template <>
@@ -1329,12 +1400,10 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::Accumulator* out,
int* out_values_decoded) {
- ArrowBinaryHelper<ByteArrayType> helper(out);
ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
int values_decoded = 0;

- RETURN_NOT_OK(helper.builder->Reserve(num_values));
- RETURN_NOT_OK(helper.builder->ReserveData(
- std::min<int64_t>(len_, helper.chunk_space_remaining)));
RETURN_NOT_OK(helper.Prepare(len_));

int i = 0;
RETURN_NOT_OK(VisitNullBitmapInline(
@@ -1351,13 +1420,7 @@
if (ARROW_PREDICT_FALSE(len_ < increment)) {
ParquetException::EofException();
}
- if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) {
- // This element would exceed the capacity of a chunk
- RETURN_NOT_OK(helper.PushChunk());
- RETURN_NOT_OK(helper.builder->Reserve(num_values - i));
- RETURN_NOT_OK(helper.builder->ReserveData(
- std::min<int64_t>(len_, helper.chunk_space_remaining)));
- }
RETURN_NOT_OK(helper.PrepareNextInput(value_len, len_));
helper.UnsafeAppend(data_ + 4, value_len);
data_ += increment;
len_ -= increment;
@@ -1850,7 +1913,8 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
constexpr int32_t kBufferSize = 1024;
int32_t indices[kBufferSize];

- ArrowBinaryHelper<ByteArrayType> helper(out);
ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
RETURN_NOT_OK(helper.Prepare());

auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
int values_decoded = 0;
@@ -1871,9 +1935,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
const auto index = indices[pos_indices++];
RETURN_NOT_OK(IndexInBounds(index));
const auto& val = dict_values[index];
- if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
- RETURN_NOT_OK(helper.PushChunk());
- }
RETURN_NOT_OK(helper.PrepareNextInput(val.len));
RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
++values_decoded;
return Status::OK();
@@ -1919,20 +1981,21 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
int32_t indices[kBufferSize];
int values_decoded = 0;

- ArrowBinaryHelper<ByteArrayType> helper(out);
ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
RETURN_NOT_OK(helper.Prepare(len_));

auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());

while (values_decoded < num_values) {
- int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded);
- int num_indices = idx_decoder_.GetBatch(indices, batch_size);
const int32_t batch_size =
std::min<int32_t>(kBufferSize, num_values - values_decoded);
const int num_indices = idx_decoder_.GetBatch(indices, batch_size);
if (num_indices == 0) ParquetException::EofException();
for (int i = 0; i < num_indices; ++i) {
auto idx = indices[i];
RETURN_NOT_OK(IndexInBounds(idx));
const auto& val = dict_values[idx];
- if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
- RETURN_NOT_OK(helper.PushChunk());
- }
RETURN_NOT_OK(helper.PrepareNextInput(val.len));
RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
}
values_decoded += num_indices;
@@ -2762,7 +2825,8 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::Accumulator* out,
int* out_num_values) {
- ArrowBinaryHelper<ByteArrayType> helper(out);
ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
RETURN_NOT_OK(helper.Prepare());

std::vector<ByteArray> values(num_values - null_count);
const int num_valid_values = Decode(values.data(), num_values - null_count);
Expand All @@ -2778,9 +2842,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
const auto& val = values_ptr[value_idx];
- if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
- RETURN_NOT_OK(helper.PushChunk());
- }
RETURN_NOT_OK(helper.PrepareNextInput(val.len));
RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
++value_idx;
return Status::OK();
@@ -3334,7 +3396,8 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode
int64_t valid_bits_offset,
typename EncodingTraits<DType>::Accumulator* out,
int* out_num_values) {
- ArrowBinaryHelper<DType> helper(out);
ArrowBinaryHelper<DType> helper(out, num_values);
RETURN_NOT_OK(helper.Prepare());

std::vector<ByteArray> values(num_values);
const int num_valid_values = GetInternal(values.data(), num_values - null_count);
Expand All @@ -3347,9 +3410,7 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
const auto& val = values_ptr[value_idx];
- if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
- RETURN_NOT_OK(helper.PushChunk());
- }
RETURN_NOT_OK(helper.PrepareNextInput(val.len));
RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
++value_idx;
return Status::OK();
6 changes: 2 additions & 4 deletions cpp/src/parquet/encoding.h
@@ -140,15 +140,14 @@ template <>
struct EncodingTraits<ByteArrayType> {
using Encoder = ByteArrayEncoder;
using Decoder = ByteArrayDecoder;
- using BuilderType = ::arrow::BinaryBuilder;

using ArrowType = ::arrow::BinaryType;
/// \brief Internal helper class for decoding BYTE_ARRAY data where we can
/// overflow the capacity of a single arrow::BinaryArray
struct Accumulator {
std::unique_ptr<::arrow::BinaryBuilder> builder;
std::vector<std::shared_ptr<::arrow::Array>> chunks;
};
- using ArrowType = ::arrow::BinaryType;
using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::BinaryType>;
};

@@ -158,8 +157,7 @@ struct EncodingTraits<FLBAType> {
using Decoder = FLBADecoder;

using ArrowType = ::arrow::FixedSizeBinaryType;
- using BuilderType = ::arrow::FixedSizeBinaryBuilder;
- using Accumulator = BuilderType;
using Accumulator = ::arrow::FixedSizeBinaryBuilder;
using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::FixedSizeBinaryType>;
};
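For FLBA, by contrast, the accumulator declared above is a single FixedSizeBinaryBuilder: fixed-width values have no 32-bit offsets that could overflow, so the helper never has to roll over chunks and its PrepareNextInput() is a no-op. A tiny hedged sketch of what that amounts to (the 16-byte width, the one-value Reserve, and the value argument are placeholders, not anything defined by this commit):

#include "arrow/array/builder_binary.h"
#include "parquet/encoding.h"
#include "parquet/types.h"

::arrow::Status AppendOneFlba(const parquet::FLBA& value) {
  // The real byte width comes from the Parquet schema; 16 is only an example.
  parquet::EncodingTraits<parquet::FLBAType>::Accumulator builder(
      ::arrow::fixed_size_binary(16), ::arrow::default_memory_pool());
  ARROW_RETURN_NOT_OK(builder.Reserve(1));         // what Prepare() boils down to
  ARROW_RETURN_NOT_OK(builder.Append(value.ptr));  // fixed width, no length argument
  return ::arrow::Status::OK();
}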

