Skip to content

Commit

Permalink
apacheGH-38542: [C++][Parquet] Faster scalar BYTE_STREAM_SPLIT (apach…
Browse files Browse the repository at this point in the history
…e#38529)

### Rationale for this change

BYTE_STREAM_SPLIT encoding and decoding benefit from SIMD accelerations on x86, but scalar implementations are used otherwise.

### What changes are included in this PR?

Improve the speed of scalar implementations of BYTE_STREAM_SPLIT by using a blocked algorithm.

Benchmark numbers on the author's machine (AMD Ryzen 9 3900X):

* before:
```
-------------------------------------------------------------------------------------------------------
Benchmark                                             Time             CPU   Iterations UserCounters...
-------------------------------------------------------------------------------------------------------
BM_ByteStreamSplitDecode_Float_Scalar/1024         1374 ns         1374 ns       510396 bytes_per_second=2.77574G/s
BM_ByteStreamSplitDecode_Float_Scalar/4096         5483 ns         5483 ns       127498 bytes_per_second=2.78303G/s
BM_ByteStreamSplitDecode_Float_Scalar/32768       44042 ns        44035 ns        15905 bytes_per_second=2.77212G/s
BM_ByteStreamSplitDecode_Float_Scalar/65536       87966 ns        87952 ns         7962 bytes_per_second=2.77583G/s

BM_ByteStreamSplitDecode_Double_Scalar/1024        2583 ns         2583 ns       271436 bytes_per_second=2.95408G/s
BM_ByteStreamSplitDecode_Double_Scalar/4096       10533 ns        10532 ns        65695 bytes_per_second=2.89761G/s
BM_ByteStreamSplitDecode_Double_Scalar/32768      84067 ns        84053 ns         8275 bytes_per_second=2.90459G/s
BM_ByteStreamSplitDecode_Double_Scalar/65536     168332 ns       168309 ns         4155 bytes_per_second=2.9011G/s

BM_ByteStreamSplitEncode_Float_Scalar/1024         1435 ns         1435 ns       484278 bytes_per_second=2.65802G/s
BM_ByteStreamSplitEncode_Float_Scalar/4096         5725 ns         5725 ns       121877 bytes_per_second=2.66545G/s
BM_ByteStreamSplitEncode_Float_Scalar/32768       46291 ns        46283 ns        15134 bytes_per_second=2.63745G/s
BM_ByteStreamSplitEncode_Float_Scalar/65536       91139 ns        91128 ns         7707 bytes_per_second=2.6791G/s

BM_ByteStreamSplitEncode_Double_Scalar/1024        3093 ns         3093 ns       226198 bytes_per_second=2.46663G/s
BM_ByteStreamSplitEncode_Double_Scalar/4096       12724 ns        12722 ns        54522 bytes_per_second=2.39873G/s
BM_ByteStreamSplitEncode_Double_Scalar/32768     100488 ns       100475 ns         6957 bytes_per_second=2.42987G/s
BM_ByteStreamSplitEncode_Double_Scalar/65536     200885 ns       200852 ns         3486 bytes_per_second=2.43105G/s
```
* after:
```
-------------------------------------------------------------------------------------------------------
Benchmark                                             Time             CPU   Iterations UserCounters...
-------------------------------------------------------------------------------------------------------
BM_ByteStreamSplitDecode_Float_Scalar/1024          932 ns          932 ns       753352 bytes_per_second=4.09273G/s
BM_ByteStreamSplitDecode_Float_Scalar/4096         3715 ns         3715 ns       188394 bytes_per_second=4.10783G/s
BM_ByteStreamSplitDecode_Float_Scalar/32768       30167 ns        30162 ns        23441 bytes_per_second=4.04716G/s
BM_ByteStreamSplitDecode_Float_Scalar/65536       59483 ns        59475 ns        11744 bytes_per_second=4.10496G/s

BM_ByteStreamSplitDecode_Double_Scalar/1024        1862 ns         1862 ns       374715 bytes_per_second=4.09823G/s
BM_ByteStreamSplitDecode_Double_Scalar/4096        7554 ns         7553 ns        91975 bytes_per_second=4.04038G/s
BM_ByteStreamSplitDecode_Double_Scalar/32768      60429 ns        60421 ns        11499 bytes_per_second=4.04067G/s
BM_ByteStreamSplitDecode_Double_Scalar/65536     120992 ns       120972 ns         5756 bytes_per_second=4.03631G/s

BM_ByteStreamSplitEncode_Float_Scalar/1024          737 ns          737 ns       947423 bytes_per_second=5.17843G/s
BM_ByteStreamSplitEncode_Float_Scalar/4096         2934 ns         2933 ns       239459 bytes_per_second=5.20175G/s
BM_ByteStreamSplitEncode_Float_Scalar/32768       23730 ns        23727 ns        29243 bytes_per_second=5.14485G/s
BM_ByteStreamSplitEncode_Float_Scalar/65536       47671 ns        47664 ns        14682 bytes_per_second=5.12209G/s

BM_ByteStreamSplitEncode_Double_Scalar/1024        1517 ns         1517 ns       458928 bytes_per_second=5.02827G/s
BM_ByteStreamSplitEncode_Double_Scalar/4096        6224 ns         6223 ns       111361 bytes_per_second=4.90407G/s
BM_ByteStreamSplitEncode_Double_Scalar/32768      49719 ns        49713 ns        14059 bytes_per_second=4.91099G/s
BM_ByteStreamSplitEncode_Double_Scalar/65536      99445 ns        99432 ns         7027 bytes_per_second=4.91072G/s
```

### Are these changes tested?

Yes, though the scalar implementations are unfortunately only exercised on non-x86 by default (see added comment in the PR).

### Are there any user-facing changes?

No.

* Closes: apache#38542

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
pitrou authored and dgreiss committed Feb 17, 2024
1 parent 7959e3b commit 95ec9de
Show file tree
Hide file tree
Showing 5 changed files with 298 additions and 31 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ add_arrow_test(utility-test
align_util_test.cc
atfork_test.cc
byte_size_test.cc
byte_stream_split_test.cc
cache_test.cc
checked_cast_test.cc
compression_test.cc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@

#pragma once

#include "arrow/util/endian.h"
#include "arrow/util/simd.h"
#include "arrow/util/ubsan.h"

#include <stdint.h>
#include <algorithm>
#include <cassert>
#include <cstdint>

#ifdef ARROW_HAVE_SSE4_2
// Enable the SIMD for ByteStreamSplit Encoder/Decoder
#define ARROW_HAVE_SIMD_SPLIT
#endif // ARROW_HAVE_SSE4_2

namespace arrow {
namespace util {
namespace internal {
namespace arrow::util::internal {

//
// SIMD implementations
//

#if defined(ARROW_HAVE_SSE4_2)
template <typename T>
Expand Down Expand Up @@ -565,48 +569,140 @@ void inline ByteStreamSplitDecodeSimd(const uint8_t* data, int64_t num_values,
}

template <typename T>
void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, const size_t num_values,
void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, const int64_t num_values,
uint8_t* output_buffer_raw) {
#if defined(ARROW_HAVE_AVX512)
return ByteStreamSplitEncodeAvx512<T>(raw_values, num_values, output_buffer_raw);
return ByteStreamSplitEncodeAvx512<T>(raw_values, static_cast<size_t>(num_values),
output_buffer_raw);
#elif defined(ARROW_HAVE_AVX2)
return ByteStreamSplitEncodeAvx2<T>(raw_values, num_values, output_buffer_raw);
return ByteStreamSplitEncodeAvx2<T>(raw_values, static_cast<size_t>(num_values),
output_buffer_raw);
#elif defined(ARROW_HAVE_SSE4_2)
return ByteStreamSplitEncodeSse2<T>(raw_values, num_values, output_buffer_raw);
return ByteStreamSplitEncodeSse2<T>(raw_values, static_cast<size_t>(num_values),
output_buffer_raw);
#else
#error "ByteStreamSplitEncodeSimd not implemented"
#endif
}
#endif

//
// Scalar implementations
//

inline void DoSplitStreams(const uint8_t* src, int width, int64_t nvalues,
uint8_t** dest_streams) {
// Value empirically chosen to provide the best performance on the author's machine
constexpr int kBlockSize = 32;

while (nvalues >= kBlockSize) {
for (int stream = 0; stream < width; ++stream) {
uint8_t* dest = dest_streams[stream];
for (int i = 0; i < kBlockSize; i += 8) {
uint64_t a = src[stream + i * width];
uint64_t b = src[stream + (i + 1) * width];
uint64_t c = src[stream + (i + 2) * width];
uint64_t d = src[stream + (i + 3) * width];
uint64_t e = src[stream + (i + 4) * width];
uint64_t f = src[stream + (i + 5) * width];
uint64_t g = src[stream + (i + 6) * width];
uint64_t h = src[stream + (i + 7) * width];
#if ARROW_LITTLE_ENDIAN
uint64_t r = a | (b << 8) | (c << 16) | (d << 24) | (e << 32) | (f << 40) |
(g << 48) | (h << 56);
#else
uint64_t r = (a << 56) | (b << 48) | (c << 40) | (d << 32) | (e << 24) |
(f << 16) | (g << 8) | h;
#endif
arrow::util::SafeStore(&dest[i], r);
}
dest_streams[stream] += kBlockSize;
}
src += width * kBlockSize;
nvalues -= kBlockSize;
}

// Epilog
for (int stream = 0; stream < width; ++stream) {
uint8_t* dest = dest_streams[stream];
for (int64_t i = 0; i < nvalues; ++i) {
dest[i] = src[stream + i * width];
}
}
}

inline void DoMergeStreams(const uint8_t** src_streams, int width, int64_t nvalues,
uint8_t* dest) {
// Value empirically chosen to provide the best performance on the author's machine
constexpr int kBlockSize = 128;

while (nvalues >= kBlockSize) {
for (int stream = 0; stream < width; ++stream) {
// Take kBlockSize bytes from the given stream and spread them
// to their logical places in destination.
const uint8_t* src = src_streams[stream];
for (int i = 0; i < kBlockSize; i += 8) {
uint64_t v = arrow::util::SafeLoadAs<uint64_t>(&src[i]);
#if ARROW_LITTLE_ENDIAN
dest[stream + i * width] = static_cast<uint8_t>(v);
dest[stream + (i + 1) * width] = static_cast<uint8_t>(v >> 8);
dest[stream + (i + 2) * width] = static_cast<uint8_t>(v >> 16);
dest[stream + (i + 3) * width] = static_cast<uint8_t>(v >> 24);
dest[stream + (i + 4) * width] = static_cast<uint8_t>(v >> 32);
dest[stream + (i + 5) * width] = static_cast<uint8_t>(v >> 40);
dest[stream + (i + 6) * width] = static_cast<uint8_t>(v >> 48);
dest[stream + (i + 7) * width] = static_cast<uint8_t>(v >> 56);
#else
dest[stream + i * width] = static_cast<uint8_t>(v >> 56);
dest[stream + (i + 1) * width] = static_cast<uint8_t>(v >> 48);
dest[stream + (i + 2) * width] = static_cast<uint8_t>(v >> 40);
dest[stream + (i + 3) * width] = static_cast<uint8_t>(v >> 32);
dest[stream + (i + 4) * width] = static_cast<uint8_t>(v >> 24);
dest[stream + (i + 5) * width] = static_cast<uint8_t>(v >> 16);
dest[stream + (i + 6) * width] = static_cast<uint8_t>(v >> 8);
dest[stream + (i + 7) * width] = static_cast<uint8_t>(v);
#endif
}
src_streams[stream] += kBlockSize;
}
dest += width * kBlockSize;
nvalues -= kBlockSize;
}

// Epilog
for (int stream = 0; stream < width; ++stream) {
const uint8_t* src = src_streams[stream];
for (int64_t i = 0; i < nvalues; ++i) {
dest[stream + i * width] = src[i];
}
}
}

template <typename T>
void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const size_t num_values,
void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const int64_t num_values,
uint8_t* output_buffer_raw) {
constexpr size_t kNumStreams = sizeof(T);
for (size_t i = 0U; i < num_values; ++i) {
for (size_t j = 0U; j < kNumStreams; ++j) {
const uint8_t byte_in_value = raw_values[i * kNumStreams + j];
output_buffer_raw[j * num_values + i] = byte_in_value;
}
constexpr int kNumStreams = static_cast<int>(sizeof(T));
std::array<uint8_t*, kNumStreams> dest_streams;
for (int stream = 0; stream < kNumStreams; ++stream) {
dest_streams[stream] = &output_buffer_raw[stream * num_values];
}
DoSplitStreams(raw_values, kNumStreams, num_values, dest_streams.data());
}

template <typename T>
void ByteStreamSplitDecodeScalar(const uint8_t* data, int64_t num_values, int64_t stride,
T* out) {
constexpr size_t kNumStreams = sizeof(T);
auto output_buffer_raw = reinterpret_cast<uint8_t*>(out);

for (int64_t i = 0; i < num_values; ++i) {
for (size_t b = 0; b < kNumStreams; ++b) {
const size_t byte_index = b * stride + i;
output_buffer_raw[i * kNumStreams + b] = data[byte_index];
}
constexpr int kNumStreams = static_cast<int>(sizeof(T));
std::array<const uint8_t*, kNumStreams> src_streams;
for (int stream = 0; stream < kNumStreams; ++stream) {
src_streams[stream] = &data[stream * stride];
}
DoMergeStreams(src_streams.data(), kNumStreams, num_values,
reinterpret_cast<uint8_t*>(out));
}

template <typename T>
void inline ByteStreamSplitEncode(const uint8_t* raw_values, const size_t num_values,
void inline ByteStreamSplitEncode(const uint8_t* raw_values, const int64_t num_values,
uint8_t* output_buffer_raw) {
#if defined(ARROW_HAVE_SIMD_SPLIT)
return ByteStreamSplitEncodeSimd<T>(raw_values, num_values, output_buffer_raw);
Expand All @@ -625,6 +721,4 @@ void inline ByteStreamSplitDecode(const uint8_t* data, int64_t num_values, int64
#endif
}

} // namespace internal
} // namespace util
} // namespace arrow
} // namespace arrow::util::internal
172 changes: 172 additions & 0 deletions cpp/src/arrow/util/byte_stream_split_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <functional>
#include <iostream>
#include <memory>
#include <random>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include <gtest/gtest.h>

#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"
#include "arrow/util/byte_stream_split_internal.h"

namespace arrow::util::internal {

using ByteStreamSplitTypes = ::testing::Types<float, double>;

template <typename Func>
struct NamedFunc {
std::string name;
Func func;

friend std::ostream& operator<<(std::ostream& os, const NamedFunc& func) {
os << func.name;
return os;
}
};

// A simplistic reference implementation for validation
void RefererenceByteStreamSplitEncode(const uint8_t* src, int width,
const int64_t num_values, uint8_t* dest) {
for (int64_t i = 0; i < num_values; ++i) {
for (int stream = 0; stream < width; ++stream) {
dest[stream * num_values + i] = *src++;
}
}
}

template <typename T>
class TestByteStreamSplitSpecialized : public ::testing::Test {
public:
using EncodeFunc = NamedFunc<std::function<decltype(ByteStreamSplitEncode<T>)>>;
using DecodeFunc = NamedFunc<std::function<decltype(ByteStreamSplitDecode<T>)>>;

static constexpr int kWidth = static_cast<int>(sizeof(T));

void SetUp() override {
encode_funcs_.push_back({"reference", &ReferenceEncode});
encode_funcs_.push_back({"scalar", &ByteStreamSplitEncodeScalar<T>});
decode_funcs_.push_back({"scalar", &ByteStreamSplitDecodeScalar<T>});
#if defined(ARROW_HAVE_SIMD_SPLIT)
encode_funcs_.push_back({"simd", &ByteStreamSplitEncodeSimd<T>});
decode_funcs_.push_back({"simd", &ByteStreamSplitDecodeSimd<T>});
#endif
}

void TestRoundtrip(int64_t num_values) {
// Test one-shot roundtrip among all encode/decode function combinations
ARROW_SCOPED_TRACE("num_values = ", num_values);
const auto input = MakeRandomInput(num_values);
std::vector<uint8_t> encoded(num_values * kWidth);
std::vector<T> decoded(num_values);

for (const auto& encode_func : encode_funcs_) {
ARROW_SCOPED_TRACE("encode_func = ", encode_func);
encoded.assign(encoded.size(), 0);
encode_func.func(reinterpret_cast<const uint8_t*>(input.data()), num_values,
encoded.data());
for (const auto& decode_func : decode_funcs_) {
ARROW_SCOPED_TRACE("decode_func = ", decode_func);
decoded.assign(decoded.size(), T{});
decode_func.func(encoded.data(), num_values, /*stride=*/num_values,
decoded.data());
ASSERT_EQ(decoded, input);
}
}
}

void TestPiecewiseDecode(int64_t num_values) {
// Test chunked decoding against the reference encode function
ARROW_SCOPED_TRACE("num_values = ", num_values);
const auto input = MakeRandomInput(num_values);
std::vector<uint8_t> encoded(num_values * kWidth);
ReferenceEncode(reinterpret_cast<const uint8_t*>(input.data()), num_values,
encoded.data());
std::vector<T> decoded(num_values);

std::default_random_engine gen(seed_++);
std::uniform_int_distribution<int64_t> chunk_size_dist(1, 123);

for (const auto& decode_func : decode_funcs_) {
ARROW_SCOPED_TRACE("decode_func = ", decode_func);
decoded.assign(decoded.size(), T{});

int64_t offset = 0;
while (offset < num_values) {
auto chunk_size = std::min<int64_t>(num_values - offset, chunk_size_dist(gen));
decode_func.func(encoded.data() + offset, chunk_size, /*stride=*/num_values,
decoded.data() + offset);
offset += chunk_size;
}
ASSERT_EQ(offset, num_values);
ASSERT_EQ(decoded, input);
}
}

protected:
static void ReferenceEncode(const uint8_t* raw_values, const int64_t num_values,
uint8_t* output_buffer_raw) {
RefererenceByteStreamSplitEncode(raw_values, kWidth, num_values, output_buffer_raw);
}

static std::vector<T> MakeRandomInput(int64_t num_values) {
std::vector<T> input(num_values);
random_bytes(kWidth * num_values, seed_++, reinterpret_cast<uint8_t*>(input.data()));
// Avoid NaNs to ease comparison
for (auto& value : input) {
if (std::isnan(value)) {
value = nan_replacement_++;
}
}
return input;
}

std::vector<EncodeFunc> encode_funcs_;
std::vector<DecodeFunc> decode_funcs_;

static inline uint32_t seed_ = 42;
static inline T nan_replacement_ = 0;
};

TYPED_TEST_SUITE(TestByteStreamSplitSpecialized, ByteStreamSplitTypes);

TYPED_TEST(TestByteStreamSplitSpecialized, RoundtripSmall) {
for (int64_t num_values : {1, 5, 7, 12, 19, 31, 32}) {
this->TestRoundtrip(num_values);
}
}

TYPED_TEST(TestByteStreamSplitSpecialized, RoundtripMidsized) {
for (int64_t num_values : {126, 127, 128, 129, 133, 200}) {
this->TestRoundtrip(num_values);
}
}

TYPED_TEST(TestByteStreamSplitSpecialized, PiecewiseDecode) {
this->TestPiecewiseDecode(/*num_values=*/500);
}

} // namespace arrow::util::internal
6 changes: 3 additions & 3 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/bitmap_writer.h"
#include "arrow/util/byte_stream_split.h"
#include "arrow/util/byte_stream_split_internal.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/hashing.h"
#include "arrow/util/int_util_overflow.h"
Expand Down Expand Up @@ -850,8 +850,8 @@ std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
uint8_t* output_buffer_raw = output_buffer->mutable_data();
const uint8_t* raw_values = sink_.data();
::arrow::util::internal::ByteStreamSplitEncode<T>(
raw_values, static_cast<size_t>(num_values_in_buffer_), output_buffer_raw);
::arrow::util::internal::ByteStreamSplitEncode<T>(raw_values, num_values_in_buffer_,
output_buffer_raw);
sink_.Reset();
num_values_in_buffer_ = 0;
return std::move(output_buffer);
Expand Down
Loading

0 comments on commit 95ec9de

Please sign in to comment.