diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index 3cecab3a633cc..3dc8eac1abf64 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -43,6 +43,7 @@ add_arrow_test(utility-test align_util_test.cc atfork_test.cc byte_size_test.cc + byte_stream_split_test.cc cache_test.cc checked_cast_test.cc compression_test.cc diff --git a/cpp/src/arrow/util/byte_stream_split.h b/cpp/src/arrow/util/byte_stream_split_internal.h similarity index 84% rename from cpp/src/arrow/util/byte_stream_split.h rename to cpp/src/arrow/util/byte_stream_split_internal.h index d428df0659b28..ae85e2cfa81a3 100644 --- a/cpp/src/arrow/util/byte_stream_split.h +++ b/cpp/src/arrow/util/byte_stream_split_internal.h @@ -17,20 +17,24 @@ #pragma once +#include "arrow/util/endian.h" #include "arrow/util/simd.h" #include "arrow/util/ubsan.h" -#include #include +#include +#include #ifdef ARROW_HAVE_SSE4_2 // Enable the SIMD for ByteStreamSplit Encoder/Decoder #define ARROW_HAVE_SIMD_SPLIT #endif // ARROW_HAVE_SSE4_2 -namespace arrow { -namespace util { -namespace internal { +namespace arrow::util::internal { + +// +// SIMD implementations +// #if defined(ARROW_HAVE_SSE4_2) template @@ -565,48 +569,140 @@ void inline ByteStreamSplitDecodeSimd(const uint8_t* data, int64_t num_values, } template -void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, const size_t num_values, +void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, const int64_t num_values, uint8_t* output_buffer_raw) { #if defined(ARROW_HAVE_AVX512) - return ByteStreamSplitEncodeAvx512(raw_values, num_values, output_buffer_raw); + return ByteStreamSplitEncodeAvx512(raw_values, static_cast(num_values), + output_buffer_raw); #elif defined(ARROW_HAVE_AVX2) - return ByteStreamSplitEncodeAvx2(raw_values, num_values, output_buffer_raw); + return ByteStreamSplitEncodeAvx2(raw_values, static_cast(num_values), + output_buffer_raw); #elif defined(ARROW_HAVE_SSE4_2) - return ByteStreamSplitEncodeSse2(raw_values, num_values, output_buffer_raw); + return ByteStreamSplitEncodeSse2(raw_values, static_cast(num_values), + output_buffer_raw); #else #error "ByteStreamSplitEncodeSimd not implemented" #endif } #endif +// +// Scalar implementations +// + +inline void DoSplitStreams(const uint8_t* src, int width, int64_t nvalues, + uint8_t** dest_streams) { + // Value empirically chosen to provide the best performance on the author's machine + constexpr int kBlockSize = 32; + + while (nvalues >= kBlockSize) { + for (int stream = 0; stream < width; ++stream) { + uint8_t* dest = dest_streams[stream]; + for (int i = 0; i < kBlockSize; i += 8) { + uint64_t a = src[stream + i * width]; + uint64_t b = src[stream + (i + 1) * width]; + uint64_t c = src[stream + (i + 2) * width]; + uint64_t d = src[stream + (i + 3) * width]; + uint64_t e = src[stream + (i + 4) * width]; + uint64_t f = src[stream + (i + 5) * width]; + uint64_t g = src[stream + (i + 6) * width]; + uint64_t h = src[stream + (i + 7) * width]; +#if ARROW_LITTLE_ENDIAN + uint64_t r = a | (b << 8) | (c << 16) | (d << 24) | (e << 32) | (f << 40) | + (g << 48) | (h << 56); +#else + uint64_t r = (a << 56) | (b << 48) | (c << 40) | (d << 32) | (e << 24) | + (f << 16) | (g << 8) | h; +#endif + arrow::util::SafeStore(&dest[i], r); + } + dest_streams[stream] += kBlockSize; + } + src += width * kBlockSize; + nvalues -= kBlockSize; + } + + // Epilog + for (int stream = 0; stream < width; ++stream) { + uint8_t* dest = dest_streams[stream]; + for (int64_t i = 0; i < nvalues; ++i) { + dest[i] = src[stream + i * width]; + } + } +} + +inline void DoMergeStreams(const uint8_t** src_streams, int width, int64_t nvalues, + uint8_t* dest) { + // Value empirically chosen to provide the best performance on the author's machine + constexpr int kBlockSize = 128; + + while (nvalues >= kBlockSize) { + for (int stream = 0; stream < width; ++stream) { + // Take kBlockSize bytes from the given stream and spread them + // to their logical places in destination. + const uint8_t* src = src_streams[stream]; + for (int i = 0; i < kBlockSize; i += 8) { + uint64_t v = arrow::util::SafeLoadAs(&src[i]); +#if ARROW_LITTLE_ENDIAN + dest[stream + i * width] = static_cast(v); + dest[stream + (i + 1) * width] = static_cast(v >> 8); + dest[stream + (i + 2) * width] = static_cast(v >> 16); + dest[stream + (i + 3) * width] = static_cast(v >> 24); + dest[stream + (i + 4) * width] = static_cast(v >> 32); + dest[stream + (i + 5) * width] = static_cast(v >> 40); + dest[stream + (i + 6) * width] = static_cast(v >> 48); + dest[stream + (i + 7) * width] = static_cast(v >> 56); +#else + dest[stream + i * width] = static_cast(v >> 56); + dest[stream + (i + 1) * width] = static_cast(v >> 48); + dest[stream + (i + 2) * width] = static_cast(v >> 40); + dest[stream + (i + 3) * width] = static_cast(v >> 32); + dest[stream + (i + 4) * width] = static_cast(v >> 24); + dest[stream + (i + 5) * width] = static_cast(v >> 16); + dest[stream + (i + 6) * width] = static_cast(v >> 8); + dest[stream + (i + 7) * width] = static_cast(v); +#endif + } + src_streams[stream] += kBlockSize; + } + dest += width * kBlockSize; + nvalues -= kBlockSize; + } + + // Epilog + for (int stream = 0; stream < width; ++stream) { + const uint8_t* src = src_streams[stream]; + for (int64_t i = 0; i < nvalues; ++i) { + dest[stream + i * width] = src[i]; + } + } +} + template -void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const size_t num_values, +void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const int64_t num_values, uint8_t* output_buffer_raw) { - constexpr size_t kNumStreams = sizeof(T); - for (size_t i = 0U; i < num_values; ++i) { - for (size_t j = 0U; j < kNumStreams; ++j) { - const uint8_t byte_in_value = raw_values[i * kNumStreams + j]; - output_buffer_raw[j * num_values + i] = byte_in_value; - } + constexpr int kNumStreams = static_cast(sizeof(T)); + std::array dest_streams; + for (int stream = 0; stream < kNumStreams; ++stream) { + dest_streams[stream] = &output_buffer_raw[stream * num_values]; } + DoSplitStreams(raw_values, kNumStreams, num_values, dest_streams.data()); } template void ByteStreamSplitDecodeScalar(const uint8_t* data, int64_t num_values, int64_t stride, T* out) { - constexpr size_t kNumStreams = sizeof(T); - auto output_buffer_raw = reinterpret_cast(out); - - for (int64_t i = 0; i < num_values; ++i) { - for (size_t b = 0; b < kNumStreams; ++b) { - const size_t byte_index = b * stride + i; - output_buffer_raw[i * kNumStreams + b] = data[byte_index]; - } + constexpr int kNumStreams = static_cast(sizeof(T)); + std::array src_streams; + for (int stream = 0; stream < kNumStreams; ++stream) { + src_streams[stream] = &data[stream * stride]; } + DoMergeStreams(src_streams.data(), kNumStreams, num_values, + reinterpret_cast(out)); } template -void inline ByteStreamSplitEncode(const uint8_t* raw_values, const size_t num_values, +void inline ByteStreamSplitEncode(const uint8_t* raw_values, const int64_t num_values, uint8_t* output_buffer_raw) { #if defined(ARROW_HAVE_SIMD_SPLIT) return ByteStreamSplitEncodeSimd(raw_values, num_values, output_buffer_raw); @@ -625,6 +721,4 @@ void inline ByteStreamSplitDecode(const uint8_t* data, int64_t num_values, int64 #endif } -} // namespace internal -} // namespace util -} // namespace arrow +} // namespace arrow::util::internal diff --git a/cpp/src/arrow/util/byte_stream_split_test.cc b/cpp/src/arrow/util/byte_stream_split_test.cc new file mode 100644 index 0000000000000..3ea27f57da881 --- /dev/null +++ b/cpp/src/arrow/util/byte_stream_split_test.cc @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/util.h" +#include "arrow/util/byte_stream_split_internal.h" + +namespace arrow::util::internal { + +using ByteStreamSplitTypes = ::testing::Types; + +template +struct NamedFunc { + std::string name; + Func func; + + friend std::ostream& operator<<(std::ostream& os, const NamedFunc& func) { + os << func.name; + return os; + } +}; + +// A simplistic reference implementation for validation +void RefererenceByteStreamSplitEncode(const uint8_t* src, int width, + const int64_t num_values, uint8_t* dest) { + for (int64_t i = 0; i < num_values; ++i) { + for (int stream = 0; stream < width; ++stream) { + dest[stream * num_values + i] = *src++; + } + } +} + +template +class TestByteStreamSplitSpecialized : public ::testing::Test { + public: + using EncodeFunc = NamedFunc)>>; + using DecodeFunc = NamedFunc)>>; + + static constexpr int kWidth = static_cast(sizeof(T)); + + void SetUp() override { + encode_funcs_.push_back({"reference", &ReferenceEncode}); + encode_funcs_.push_back({"scalar", &ByteStreamSplitEncodeScalar}); + decode_funcs_.push_back({"scalar", &ByteStreamSplitDecodeScalar}); +#if defined(ARROW_HAVE_SIMD_SPLIT) + encode_funcs_.push_back({"simd", &ByteStreamSplitEncodeSimd}); + decode_funcs_.push_back({"simd", &ByteStreamSplitDecodeSimd}); +#endif + } + + void TestRoundtrip(int64_t num_values) { + // Test one-shot roundtrip among all encode/decode function combinations + ARROW_SCOPED_TRACE("num_values = ", num_values); + const auto input = MakeRandomInput(num_values); + std::vector encoded(num_values * kWidth); + std::vector decoded(num_values); + + for (const auto& encode_func : encode_funcs_) { + ARROW_SCOPED_TRACE("encode_func = ", encode_func); + encoded.assign(encoded.size(), 0); + encode_func.func(reinterpret_cast(input.data()), num_values, + encoded.data()); + for (const auto& decode_func : decode_funcs_) { + ARROW_SCOPED_TRACE("decode_func = ", decode_func); + decoded.assign(decoded.size(), T{}); + decode_func.func(encoded.data(), num_values, /*stride=*/num_values, + decoded.data()); + ASSERT_EQ(decoded, input); + } + } + } + + void TestPiecewiseDecode(int64_t num_values) { + // Test chunked decoding against the reference encode function + ARROW_SCOPED_TRACE("num_values = ", num_values); + const auto input = MakeRandomInput(num_values); + std::vector encoded(num_values * kWidth); + ReferenceEncode(reinterpret_cast(input.data()), num_values, + encoded.data()); + std::vector decoded(num_values); + + std::default_random_engine gen(seed_++); + std::uniform_int_distribution chunk_size_dist(1, 123); + + for (const auto& decode_func : decode_funcs_) { + ARROW_SCOPED_TRACE("decode_func = ", decode_func); + decoded.assign(decoded.size(), T{}); + + int64_t offset = 0; + while (offset < num_values) { + auto chunk_size = std::min(num_values - offset, chunk_size_dist(gen)); + decode_func.func(encoded.data() + offset, chunk_size, /*stride=*/num_values, + decoded.data() + offset); + offset += chunk_size; + } + ASSERT_EQ(offset, num_values); + ASSERT_EQ(decoded, input); + } + } + + protected: + static void ReferenceEncode(const uint8_t* raw_values, const int64_t num_values, + uint8_t* output_buffer_raw) { + RefererenceByteStreamSplitEncode(raw_values, kWidth, num_values, output_buffer_raw); + } + + static std::vector MakeRandomInput(int64_t num_values) { + std::vector input(num_values); + random_bytes(kWidth * num_values, seed_++, reinterpret_cast(input.data())); + // Avoid NaNs to ease comparison + for (auto& value : input) { + if (std::isnan(value)) { + value = nan_replacement_++; + } + } + return input; + } + + std::vector encode_funcs_; + std::vector decode_funcs_; + + static inline uint32_t seed_ = 42; + static inline T nan_replacement_ = 0; +}; + +TYPED_TEST_SUITE(TestByteStreamSplitSpecialized, ByteStreamSplitTypes); + +TYPED_TEST(TestByteStreamSplitSpecialized, RoundtripSmall) { + for (int64_t num_values : {1, 5, 7, 12, 19, 31, 32}) { + this->TestRoundtrip(num_values); + } +} + +TYPED_TEST(TestByteStreamSplitSpecialized, RoundtripMidsized) { + for (int64_t num_values : {126, 127, 128, 129, 133, 200}) { + this->TestRoundtrip(num_values); + } +} + +TYPED_TEST(TestByteStreamSplitSpecialized, PiecewiseDecode) { + this->TestPiecewiseDecode(/*num_values=*/500); +} + +} // namespace arrow::util::internal diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 5221f2588c0d3..1bb487c20d3e2 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -37,7 +37,7 @@ #include "arrow/util/bit_util.h" #include "arrow/util/bitmap_ops.h" #include "arrow/util/bitmap_writer.h" -#include "arrow/util/byte_stream_split.h" +#include "arrow/util/byte_stream_split_internal.h" #include "arrow/util/checked_cast.h" #include "arrow/util/hashing.h" #include "arrow/util/int_util_overflow.h" @@ -850,8 +850,8 @@ std::shared_ptr ByteStreamSplitEncoder::FlushValues() { AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize()); uint8_t* output_buffer_raw = output_buffer->mutable_data(); const uint8_t* raw_values = sink_.data(); - ::arrow::util::internal::ByteStreamSplitEncode( - raw_values, static_cast(num_values_in_buffer_), output_buffer_raw); + ::arrow::util::internal::ByteStreamSplitEncode(raw_values, num_values_in_buffer_, + output_buffer_raw); sink_.Reset(); num_values_in_buffer_ = 0; return std::move(output_buffer); diff --git a/cpp/src/parquet/encoding_benchmark.cc b/cpp/src/parquet/encoding_benchmark.cc index 717c716330563..b5b6cc8d93e03 100644 --- a/cpp/src/parquet/encoding_benchmark.cc +++ b/cpp/src/parquet/encoding_benchmark.cc @@ -24,7 +24,7 @@ #include "arrow/testing/random.h" #include "arrow/testing/util.h" #include "arrow/type.h" -#include "arrow/util/byte_stream_split.h" +#include "arrow/util/byte_stream_split_internal.h" #include "arrow/visit_data_inline.h" #include "parquet/encoding.h"