From d5d1445eb12a6f83675b45efb05e89e94c6cf042 Mon Sep 17 00:00:00 2001 From: Jin Shang Date: Sat, 23 Dec 2023 23:50:39 +0800 Subject: [PATCH] GH-37055: [C++] Optimize hash kernels for Dictionary ChunkedArrays (#38394) ### Rationale for this change When merging dictionaries across chunks, the hash kernels unnecessarily unify the existing dictionary, dragging down the performance. ### What changes are included in this PR? Reuse the dictionary unifier across chunks. ### Are these changes tested? Yes, with a new benchmark for dictionary chunked arrays. ### Are there any user-facing changes? No. * Closes: #37055 Lead-authored-by: Jin Shang Co-authored-by: Felipe Oliveira Carvalho Signed-off-by: Felipe Oliveira Carvalho --- cpp/src/arrow/compute/kernels/vector_hash.cc | 55 +++++++++++++------ .../compute/kernels/vector_hash_benchmark.cc | 36 ++++++++++++ 2 files changed, 74 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_hash.cc b/cpp/src/arrow/compute/kernels/vector_hash.cc index 65e59d1a2eb14..800deba3a5ed2 100644 --- a/cpp/src/arrow/compute/kernels/vector_hash.cc +++ b/cpp/src/arrow/compute/kernels/vector_hash.cc @@ -26,17 +26,20 @@ #include "arrow/array/concatenate.h" #include "arrow/array/dict_internal.h" #include "arrow/array/util.h" +#include "arrow/buffer.h" #include "arrow/compute/api_vector.h" #include "arrow/compute/cast.h" #include "arrow/compute/kernels/common_internal.h" #include "arrow/result.h" #include "arrow/util/hashing.h" +#include "arrow/util/int_util.h" #include "arrow/util/unreachable.h" namespace arrow { using internal::DictionaryTraits; using internal::HashTraits; +using internal::TransposeInts; namespace compute { namespace internal { @@ -448,9 +451,9 @@ class DictionaryHashKernel : public HashKernel { Status Append(const ArraySpan& arr) override { auto arr_dict = arr.dictionary().ToArray(); - if (!dictionary_) { - dictionary_ = arr_dict; - } else if (!dictionary_->Equals(*arr_dict)) { + if (!first_dictionary_) { + first_dictionary_ = arr_dict; + } else if (!first_dictionary_->Equals(*arr_dict)) { // NOTE: This approach computes a new dictionary unification per chunk. // This is in effect O(n*k) where n is the total chunked array length and // k is the number of chunks (therefore O(n**2) if chunks have a fixed size). @@ -458,21 +461,23 @@ class DictionaryHashKernel : public HashKernel { // A better approach may be to run the kernel over each individual chunk, // and then hash-aggregate all results (for example sum-group-by for // the "value_counts" kernel). - auto out_dict_type = dictionary_->type(); + if (dictionary_unifier_ == nullptr) { + ARROW_ASSIGN_OR_RAISE(dictionary_unifier_, + DictionaryUnifier::Make(first_dictionary_->type())); + RETURN_NOT_OK(dictionary_unifier_->Unify(*first_dictionary_)); + } + auto out_dict_type = first_dictionary_->type(); std::shared_ptr transpose_map; - std::shared_ptr out_dict; - ARROW_ASSIGN_OR_RAISE(auto unifier, DictionaryUnifier::Make(out_dict_type)); - ARROW_CHECK_OK(unifier->Unify(*dictionary_)); - ARROW_CHECK_OK(unifier->Unify(*arr_dict, &transpose_map)); - ARROW_CHECK_OK(unifier->GetResult(&out_dict_type, &out_dict)); + RETURN_NOT_OK(dictionary_unifier_->Unify(*arr_dict, &transpose_map)); - dictionary_ = out_dict; auto transpose = reinterpret_cast(transpose_map->data()); - auto in_dict_array = arr.ToArray(); + auto in_array = arr.ToArray(); + const auto& in_dict_array = + arrow::internal::checked_cast(*in_array); ARROW_ASSIGN_OR_RAISE( - auto tmp, arrow::internal::checked_cast(*in_dict_array) - .Transpose(arr.type->GetSharedPtr(), out_dict, transpose)); + auto tmp, in_dict_array.Transpose(arr.type->GetSharedPtr(), + in_dict_array.dictionary(), transpose)); return indices_kernel_->Append(*tmp->data()); } @@ -495,12 +500,27 @@ class DictionaryHashKernel : public HashKernel { return dictionary_value_type_; } - std::shared_ptr dictionary() const { return dictionary_; } + /// This can't be called more than once because DictionaryUnifier::GetResult() + /// can't be called more than once and produce the same output. + Result> dictionary() const { + if (!first_dictionary_) { // Append was never called + return nullptr; + } + if (!dictionary_unifier_) { // Append was called only once + return first_dictionary_; + } + + auto out_dict_type = first_dictionary_->type(); + std::shared_ptr out_dict; + RETURN_NOT_OK(dictionary_unifier_->GetResult(&out_dict_type, &out_dict)); + return out_dict; + } private: std::unique_ptr indices_kernel_; - std::shared_ptr dictionary_; + std::shared_ptr first_dictionary_; std::shared_ptr dictionary_value_type_; + std::unique_ptr dictionary_unifier_; }; // ---------------------------------------------------------------------- @@ -630,8 +650,9 @@ Status ValueCountsFinalize(KernelContext* ctx, std::vector* out) { // hence have no dictionary. Result> EnsureHashDictionary(KernelContext* ctx, DictionaryHashKernel* hash) { - if (hash->dictionary()) { - return hash->dictionary()->data(); + ARROW_ASSIGN_OR_RAISE(auto dict, hash->dictionary()); + if (dict) { + return dict->data(); } ARROW_ASSIGN_OR_RAISE(auto null, MakeArrayOfNull(hash->dictionary_value_type(), /*length=*/0, ctx->memory_pool())); diff --git a/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc b/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc index e9548e133aa00..472f50db8cf92 100644 --- a/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc @@ -25,6 +25,7 @@ #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" #include "arrow/testing/util.h" +#include "arrow/util/logging.h" #include "arrow/compute/api.h" @@ -226,6 +227,33 @@ static void UniqueString100bytes(benchmark::State& state) { BenchUnique(state, HashParams{general_bench_cases[state.range(0)], 100}); } +template +void BenchValueCountsDictionaryChunks(benchmark::State& state, const ParamType& params) { + std::shared_ptr arr; + params.GenerateTestData(&arr); + // chunk arr to 100 slices + std::vector> chunks; + const int64_t chunk_size = arr->length() / 100; + for (int64_t i = 0; i < 100; ++i) { + auto slice = arr->Slice(i * chunk_size, chunk_size); + auto datum = DictionaryEncode(slice).ValueOrDie(); + ARROW_CHECK(datum.is_array()); + chunks.push_back(datum.make_array()); + } + auto chunked_array = std::make_shared(chunks); + + while (state.KeepRunning()) { + ABORT_NOT_OK(ValueCounts(chunked_array).status()); + } + params.SetMetadata(state); +} + +static void ValueCountsDictionaryChunks(benchmark::State& state) { + // Dictionary of byte strings with 10 bytes each + BenchValueCountsDictionaryChunks( + state, HashParams{general_bench_cases[state.range(0)], 10}); +} + void HashSetArgs(benchmark::internal::Benchmark* bench) { for (int i = 0; i < static_cast(general_bench_cases.size()); ++i) { bench->Arg(i); @@ -239,6 +267,14 @@ BENCHMARK(UniqueInt64)->Apply(HashSetArgs); BENCHMARK(UniqueString10bytes)->Apply(HashSetArgs); BENCHMARK(UniqueString100bytes)->Apply(HashSetArgs); +void DictionaryChunksHashSetArgs(benchmark::internal::Benchmark* bench) { + for (int i = 0; i < static_cast(general_bench_cases.size()); ++i) { + bench->Arg(i); + } +} + +BENCHMARK(ValueCountsDictionaryChunks)->Apply(DictionaryChunksHashSetArgs); + void UInt8SetArgs(benchmark::internal::Benchmark* bench) { for (int i = 0; i < static_cast(uint8_bench_cases.size()); ++i) { bench->Arg(i);