Skip to content

Commit

Permalink
ARROW-13104: [C++] Fix unsafe cast in ByteStreamSplit implementation
Browse files Browse the repository at this point in the history
Closes #10596 from pitrou/ARROW-13104-unsafe-cast

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
pitrou committed Jun 30, 2021
1 parent cc4e69d commit 58b3109
Showing 1 changed file with 28 additions and 21 deletions.
49 changes: 28 additions & 21 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -861,55 +861,62 @@ class ByteStreamSplitEncoder : public EncoderImpl, virtual public TypedEncoder<D
int64_t valid_bits_offset) override;

protected:
::arrow::TypedBufferBuilder<T> values_;
// Feeds the contents of an Arrow array into this encoder.
//
// ArrowType is the Arrow data type expected for `values`. If the array's type
// id differs from ArrowType::type_id, a ParquetException is thrown. Otherwise
// buffer 1 (read as ArrowType::c_type), the array length, buffer 0 (read as
// uint8_t with absolute offset 0) and the array offset are handed to
// PutSpaced().
template <typename ArrowType>
void PutImpl(const ::arrow::Array& values) {
  using CType = typename ArrowType::c_type;

  const bool type_matches = (values.type_id() == ArrowType::type_id);
  if (!type_matches) {
    throw ParquetException(std::string() + "direct put to " + ArrowType::type_name() +
                           " from " + values.type()->ToString() + " not supported");
  }

  const auto& array_data = *values.data();
  const CType* raw_values = array_data.GetValues<CType>(1);
  const uint8_t* validity_bits = array_data.GetValues<uint8_t>(0, 0);
  // PutSpaced takes an int count, so the 64-bit array length is narrowed here.
  const int value_count = static_cast<int>(array_data.length);
  PutSpaced(raw_values, value_count, validity_bits, array_data.offset);
}

private:
void PutArrowArray(const ::arrow::Array& values);
// Raw byte storage for values that have been Put() but not yet flushed; Put()
// appends num_values * sizeof(T) bytes to it.
::arrow::BufferBuilder sink_;
// Number of values currently held in sink_; incremented by Put() and set back
// to 0 by FlushValues().
int64_t num_values_in_buffer_;
};

// Constructs the encoder: forwards the column descriptor, the
// BYTE_STREAM_SPLIT encoding tag and the memory pool to EncoderImpl, and
// initializes the internal value storage with the same pool.
// NOTE(review): this is a rendered diff, so two initializer lists are shown.
// The first (values_{pool}) appears to be the pre-change version and the
// second (sink_{pool}, num_values_in_buffer_{0}) its replacement.
template <typename DType>
ByteStreamSplitEncoder<DType>::ByteStreamSplitEncoder(const ColumnDescriptor* descr,
::arrow::MemoryPool* pool)
: EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool), values_{pool} {}
: EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool),
sink_{pool},
num_values_in_buffer_{0} {}

// Returns the size in bytes of the currently buffered values; FlushValues()
// uses this figure to size its output buffer.
// NOTE(review): this is a rendered diff, so two return statements are shown.
// The first (values_.length() * sizeof(T)) appears to be the pre-change line
// and the second (sink_.length()) its replacement. Put() appends
// num_values * sizeof(T) bytes to sink_, so no multiplication is applied to
// its length.
template <typename DType>
int64_t ByteStreamSplitEncoder<DType>::EstimatedDataEncodedSize() {
return values_.length() * sizeof(T);
return sink_.length();
}

// Encodes all buffered values with ByteStreamSplitEncode<T> into a newly
// allocated buffer, resets the internal storage, and returns that buffer.
// NOTE(review): this is a rendered diff. Lines that reference values_ appear
// to be the pre-change version; the sink_ / num_values_in_buffer_ lines that
// follow them appear to be their replacements.
template <typename DType>
std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
// The output buffer is sized by EstimatedDataEncodedSize().
std::shared_ptr<ResizableBuffer> output_buffer =
AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
uint8_t* output_buffer_raw = output_buffer->mutable_data();
const size_t num_values = values_.length();
const uint8_t* raw_values = reinterpret_cast<const uint8_t*>(values_.data());
::arrow::util::internal::ByteStreamSplitEncode<T>(raw_values, num_values,
// sink_.data() is assigned to a const uint8_t* directly, without a cast.
const uint8_t* raw_values = sink_.data();
::arrow::util::internal::ByteStreamSplitEncode<T>(raw_values, num_values_in_buffer_,
output_buffer_raw);
values_.Reset();
// Clear both the byte storage and the value counter so the next batch starts
// from empty.
sink_.Reset();
num_values_in_buffer_ = 0;
// The local is a shared_ptr<ResizableBuffer>; it is moved into the
// shared_ptr<Buffer> return value.
return std::move(output_buffer);
}

// Buffers `num_values` values read from `buffer` for a later FlushValues();
// nothing is appended when num_values is not positive. A failed append is
// reported through PARQUET_THROW_NOT_OK.
// NOTE(review): this is a rendered diff with three pieces interleaved:
//   - the single-line body appending to values_ appears to be the pre-change
//     body of Put(const T*, int);
//   - the generic Put(const ::arrow::Array&) that forwards to PutArrowArray()
//     appears to be removed by this change;
//   - the braced body appending num_values * sizeof(T) bytes to sink_ and
//     bumping num_values_in_buffer_ appears to be the new body of
//     Put(const T*, int).
template <typename DType>
void ByteStreamSplitEncoder<DType>::Put(const T* buffer, int num_values) {
if (num_values > 0) PARQUET_THROW_NOT_OK(values_.Append(buffer, num_values));
}

template <typename DType>
void ByteStreamSplitEncoder<DType>::Put(const ::arrow::Array& values) {
PutArrowArray(values);
if (num_values > 0) {
// sink_ is a byte-oriented builder, so the length is given in bytes.
PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
num_values_in_buffer_ += num_values;
}
}

// FloatType specialization for accepting an Arrow array.
// NOTE(review): this is a rendered diff. The PutArrowArray signature and the
// DirectPutImpl call, which reinterpret_casts &values_ to a
// ::arrow::BufferBuilder*, appear to be the pre-change version. The
// Put(const ::arrow::Array&) signature that forwards to
// PutImpl<::arrow::FloatType> appears to be the replacement and contains no
// cast.
template <>
void ByteStreamSplitEncoder<FloatType>::PutArrowArray(const ::arrow::Array& values) {
DirectPutImpl<::arrow::FloatArray>(values,
reinterpret_cast<::arrow::BufferBuilder*>(&values_));
void ByteStreamSplitEncoder<FloatType>::Put(const ::arrow::Array& values) {
PutImpl<::arrow::FloatType>(values);
}

// DoubleType specialization for accepting an Arrow array.
// NOTE(review): this is a rendered diff. The PutArrowArray signature and the
// DirectPutImpl call, which reinterpret_casts &values_ to a
// ::arrow::BufferBuilder*, appear to be the pre-change version. The
// Put(const ::arrow::Array&) signature that forwards to
// PutImpl<::arrow::DoubleType> appears to be the replacement and contains no
// cast.
template <>
void ByteStreamSplitEncoder<DoubleType>::PutArrowArray(const ::arrow::Array& values) {
DirectPutImpl<::arrow::DoubleArray>(
values, reinterpret_cast<::arrow::BufferBuilder*>(&values_));
void ByteStreamSplitEncoder<DoubleType>::Put(const ::arrow::Array& values) {
PutImpl<::arrow::DoubleType>(values);
}

template <typename DType>
Expand Down

0 comments on commit 58b3109

Please sign in to comment.