Skip to content

Commit

Permalink
[C++][Parquet] Minor: Using arrow::Buffer data_as api to replace the …
Browse files Browse the repository at this point in the history
…reinterpret_cast
  • Loading branch information
mapleFU committed Jan 2, 2024
1 parent 98f677a commit f769b1c
Showing 1 changed file with 33 additions and 41 deletions.
74 changes: 33 additions & 41 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
if (valid_bits != NULLPTR) {
PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
this->memory_pool()));
T* data = reinterpret_cast<T*>(buffer->mutable_data());
T* data = buffer->template mutable_data_as<T>();
int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
Expand Down Expand Up @@ -323,7 +323,7 @@ class PlainEncoder<BooleanType> : public EncoderImpl, virtual public BooleanEnco
if (valid_bits != NULLPTR) {
PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
this->memory_pool()));
T* data = reinterpret_cast<T*>(buffer->mutable_data());
T* data = buffer->mutable_data_as<T>();
int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
Expand Down Expand Up @@ -882,7 +882,7 @@ void ByteStreamSplitEncoder<DType>::PutSpaced(const T* src, int num_values,
if (valid_bits != NULLPTR) {
PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
this->memory_pool()));
T* data = reinterpret_cast<T*>(buffer->mutable_data());
T* data = buffer->template mutable_data_as<T>();
int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
Expand Down Expand Up @@ -1080,7 +1080,7 @@ inline int DecodePlain<FixedLenByteArray>(const uint8_t* data, int64_t data_size
ParquetException::EofException();
}
for (int i = 0; i < num_values; ++i) {
out[i].ptr = data + i * type_length;
out[i].ptr = data + i * static_cast<int64_t>(type_length);
}
return static_cast<int>(bytes_to_decode);
}
Expand Down Expand Up @@ -1537,9 +1537,8 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {

int Decode(T* buffer, int num_values) override {
num_values = std::min(num_values, num_values_);
int decoded_values =
idx_decoder_.GetBatchWithDict(reinterpret_cast<const T*>(dictionary_->data()),
dictionary_length_, buffer, num_values);
int decoded_values = idx_decoder_.GetBatchWithDict(
dictionary_->data_as<T>(), dictionary_length_, buffer, num_values);
if (decoded_values != num_values) {
ParquetException::EofException();
}
Expand All @@ -1551,9 +1550,8 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
int64_t valid_bits_offset) override {
num_values = std::min(num_values, num_values_);
if (num_values != idx_decoder_.GetBatchWithDictSpaced(
reinterpret_cast<const T*>(dictionary_->data()),
dictionary_length_, buffer, num_values, null_count, valid_bits,
valid_bits_offset)) {
dictionary_->data_as<T>(), dictionary_length_, buffer,
num_values, null_count, valid_bits, valid_bits_offset)) {
ParquetException::EofException();
}
num_values_ -= num_values;
Expand All @@ -1580,8 +1578,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
num_values, /*shrink_to_fit=*/false));
}

auto indices_buffer =
reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data());
auto indices_buffer = indices_scratch_space_->mutable_data_as<int32_t>();

if (num_values != idx_decoder_.GetBatchSpaced(num_values, null_count, valid_bits,
valid_bits_offset, indices_buffer)) {
Expand Down Expand Up @@ -1611,8 +1608,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int32_t>(
num_values, /*shrink_to_fit=*/false));
}
auto indices_buffer =
reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data());
auto indices_buffer = indices_scratch_space_->mutable_data_as<int32_t>();
if (num_values != idx_decoder_.GetBatch(indices_buffer, num_values)) {
ParquetException::EofException();
}
Expand All @@ -1632,7 +1628,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {

void GetDictionary(const T** dictionary, int32_t* dictionary_length) override {
*dictionary_length = dictionary_length_;
*dictionary = reinterpret_cast<T*>(dictionary_->mutable_data());
*dictionary = dictionary_->mutable_data_as<T>();
}

protected:
Expand All @@ -1647,8 +1643,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
dictionary_length_ = static_cast<int32_t>(dictionary->values_left());
PARQUET_THROW_NOT_OK(dictionary_->Resize(dictionary_length_ * sizeof(T),
/*shrink_to_fit=*/false));
dictionary->Decode(reinterpret_cast<T*>(dictionary_->mutable_data()),
dictionary_length_);
dictionary->Decode(dictionary_->mutable_data_as<T>(), dictionary_length_);
}

// Only one is set.
Expand Down Expand Up @@ -1688,7 +1683,7 @@ template <>
void DictDecoderImpl<ByteArrayType>::SetDict(TypedDecoder<ByteArrayType>* dictionary) {
DecodeDict(dictionary);

auto dict_values = reinterpret_cast<ByteArray*>(dictionary_->mutable_data());
ByteArray* dict_values = dictionary_->mutable_data_as<ByteArray>();

int total_size = 0;
for (int i = 0; i < dictionary_length_; ++i) {
Expand All @@ -1702,8 +1697,7 @@ void DictDecoderImpl<ByteArrayType>::SetDict(TypedDecoder<ByteArrayType>* dictio

int32_t offset = 0;
uint8_t* bytes_data = byte_array_data_->mutable_data();
int32_t* bytes_offsets =
reinterpret_cast<int32_t*>(byte_array_offsets_->mutable_data());
int32_t* bytes_offsets = byte_array_offsets_->mutable_data_as<int32_t>();
for (int i = 0; i < dictionary_length_; ++i) {
memcpy(bytes_data + offset, dict_values[i].ptr, dict_values[i].len);
bytes_offsets[i] = offset;
Expand All @@ -1717,7 +1711,7 @@ template <>
inline void DictDecoderImpl<FLBAType>::SetDict(TypedDecoder<FLBAType>* dictionary) {
DecodeDict(dictionary);

auto dict_values = reinterpret_cast<FLBA*>(dictionary_->mutable_data());
auto dict_values = dictionary_->mutable_data_as<FLBA>();

int fixed_len = descr_->type_length();
int total_size = dictionary_length_ * fixed_len;
Expand Down Expand Up @@ -1765,7 +1759,7 @@ int DictDecoderImpl<DType>::DecodeArrow(
typename EncodingTraits<DType>::DictAccumulator* builder) {
PARQUET_THROW_NOT_OK(builder->Reserve(num_values));

auto dict_values = reinterpret_cast<const typename DType::c_type*>(dictionary_->data());
auto dict_values = dictionary_->data_as<typename DType::c_type>();

VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
Expand Down Expand Up @@ -1801,7 +1795,7 @@ inline int DictDecoderImpl<FLBAType>::DecodeArrow(

PARQUET_THROW_NOT_OK(builder->Reserve(num_values));

auto dict_values = reinterpret_cast<const FLBA*>(dictionary_->data());
const FLBA* dict_values = dictionary_->data_as<FLBA>();

VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
Expand Down Expand Up @@ -1834,7 +1828,7 @@ int DictDecoderImpl<FLBAType>::DecodeArrow(

PARQUET_THROW_NOT_OK(builder->Reserve(num_values));

auto dict_values = reinterpret_cast<const FLBA*>(dictionary_->data());
const FLBA* dict_values = dictionary_->data_as<FLBA>();

VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
Expand All @@ -1858,7 +1852,7 @@ int DictDecoderImpl<Type>::DecodeArrow(
PARQUET_THROW_NOT_OK(builder->Reserve(num_values));

using value_type = typename Type::c_type;
auto dict_values = reinterpret_cast<const value_type*>(dictionary_->data());
const auto* dict_values = dictionary_->data_as<value_type>();

VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
Expand Down Expand Up @@ -1936,7 +1930,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
// space for binary data.
RETURN_NOT_OK(helper.Prepare());

auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
const ByteArray* dict_values = dictionary_->data_as<ByteArray>();
int values_decoded = 0;
int num_indices = 0;
int pos_indices = 0;
Expand Down Expand Up @@ -2007,7 +2001,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
// space for binary data.
RETURN_NOT_OK(helper.Prepare());

auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
const auto* dict_values = dictionary_->data_as<ByteArray>();

while (values_decoded < num_values) {
const int32_t batch_size =
Expand Down Expand Up @@ -2037,7 +2031,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
RETURN_NOT_OK(builder->Reserve(num_values));
::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);

auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
const auto* dict_values = dictionary_->data_as<ByteArray>();

int values_decoded = 0;
int num_appended = 0;
Expand Down Expand Up @@ -2090,7 +2084,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,

RETURN_NOT_OK(builder->Reserve(num_values));

auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
const auto* dict_values = dictionary_->data_as<ByteArray>();

int values_decoded = 0;
while (values_decoded < num_values) {
Expand Down Expand Up @@ -2388,7 +2382,7 @@ void DeltaBitPackEncoder<DType>::PutSpaced(const T* src, int num_values,
if (valid_bits != NULLPTR) {
PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
this->memory_pool()));
T* data = reinterpret_cast<T*>(buffer->mutable_data());
T* data = buffer->template mutable_data_as<T>();
int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
Expand Down Expand Up @@ -2734,7 +2728,7 @@ void DeltaLengthByteArrayEncoder<DType>::PutSpaced(const T* src, int num_values,
if (valid_bits != NULLPTR) {
PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
this->memory_pool()));
T* data = reinterpret_cast<T*>(buffer->mutable_data());
T* data = buffer->template mutable_data_as<T>();
int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
Expand Down Expand Up @@ -2789,8 +2783,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
}

int32_t data_size = 0;
const int32_t* length_ptr =
reinterpret_cast<const int32_t*>(buffered_length_->data()) + length_idx_;
const int32_t* length_ptr = buffered_length_->data_as<int32_t>() + length_idx_;
int bytes_offset = len_ - decoder_->bytes_left();
for (int i = 0; i < max_values; ++i) {
int32_t len = length_ptr[i];
Expand Down Expand Up @@ -2844,8 +2837,8 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,

// call len_decoder_.Decode to decode all the lengths.
// all the lengths are buffered in buffered_length_.
int ret = len_decoder_.Decode(
reinterpret_cast<int32_t*>(buffered_length_->mutable_data()), num_length);
int ret =
len_decoder_.Decode(buffered_length_->mutable_data_as<int32_t>(), num_length);
DCHECK_EQ(ret, num_length);
length_idx_ = 0;
num_valid_values_ = num_length;
Expand Down Expand Up @@ -2938,7 +2931,7 @@ class RleBooleanEncoder final : public EncoderImpl, virtual public BooleanEncode
if (valid_bits != NULLPTR) {
PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
this->memory_pool()));
T* data = reinterpret_cast<T*>(buffer->mutable_data());
T* data = buffer->mutable_data_as<T>();
int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
Expand Down Expand Up @@ -3136,7 +3129,7 @@ class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DT
} else {
PARQUET_THROW_NOT_OK(buffer_->Resize(num_values * sizeof(T), false));
}
T* data = reinterpret_cast<T*>(buffer_->mutable_data());
T* data = buffer_->mutable_data_as<T>();
int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
src, num_values, valid_bits, valid_bits_offset, data);
Put(data, num_valid_values);
Expand Down Expand Up @@ -3338,7 +3331,7 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode
// all the prefix lengths are buffered in buffered_prefix_length_.
PARQUET_THROW_NOT_OK(buffered_prefix_length_->Resize(num_prefix * sizeof(int32_t)));
int ret = prefix_len_decoder_.Decode(
reinterpret_cast<int32_t*>(buffered_prefix_length_->mutable_data()), num_prefix);
buffered_prefix_length_->mutable_data_as<int32_t>(), num_prefix);
DCHECK_EQ(ret, num_prefix);
prefix_len_offset_ = 0;
num_valid_values_ = num_prefix;
Expand Down Expand Up @@ -3425,8 +3418,7 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode

int64_t data_size = 0;
const int32_t* prefix_len_ptr =
reinterpret_cast<const int32_t*>(buffered_prefix_length_->data()) +
prefix_len_offset_;
buffered_prefix_length_->data_as<int32_t>() + prefix_len_offset_;
for (int i = 0; i < max_values; ++i) {
if (prefix_len_ptr[i] == 0) {
// We don't need to copy the suffix if the prefix length is 0.
Expand Down Expand Up @@ -3578,7 +3570,7 @@ class ByteStreamSplitDecoder : public DecoderImpl, virtual public TypedDecoder<D
if (!decode_buffer_ || decode_buffer_->size() < size) {
PARQUET_ASSIGN_OR_THROW(decode_buffer_, ::arrow::AllocateBuffer(size));
}
return reinterpret_cast<T*>(decode_buffer_->mutable_data());
return decode_buffer_->mutable_data_as<T>();
}

private:
Expand Down

0 comments on commit f769b1c

Please sign in to comment.