From ef95b1da2881f3f98c161670e8218469ed9b1404 Mon Sep 17 00:00:00 2001 From: Yuan Date: Wed, 10 Aug 2022 18:41:57 +0800 Subject: [PATCH] [NSE-928] allow to sort with big partitions (#1038) * remove sort limit Signed-off-by: Yuan Zhou * s/uint16/uint32 Signed-off-by: Yuan Zhou * Revert "s/uint16/uint32" This reverts commit 35c09098882ed1cb24c8bfe37f73fbd60a6d938a. * Revert "remove sort limit" This reverts commit 6b5738acc4574fb62af1216c5239f52373b81f51. * change sort limit Signed-off-by: Yuan Zhou * Revert "[NSE-928] Add ARROW_CHECK for batch_size check (#973)" This reverts commit c4d4a65030f01c89372a2af76b453c32902884fb. * fix window sort Signed-off-by: Yuan Zhou --- .../arrow_compute/ext/array_item_index.h | 6 +-- .../codegen/arrow_compute/ext/kernels_ext.h | 8 ++-- .../codegen/arrow_compute/ext/sort_kernel.cc | 11 ----- .../arrow_compute/ext/window_kernel.cc | 40 +++++++++---------- .../arrow_compute/ext/window_sort_kernel.h | 18 ++++----- native-sql-engine/cpp/src/shuffle/splitter.cc | 3 +- 6 files changed, 37 insertions(+), 49 deletions(-) diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_item_index.h b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_item_index.h index 8d5897a2d..4bf946293 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_item_index.h +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/array_item_index.h @@ -29,10 +29,10 @@ struct ArrayItemIndex { ArrayItemIndex(uint16_t array_id, uint16_t id) : array_id(array_id), id(id) {} }; struct ArrayItemIndexS { - uint16_t id = 0; - uint16_t array_id = 0; + uint32_t id = 0; + uint32_t array_id = 0; ArrayItemIndexS() : array_id(0), id(0) {} - ArrayItemIndexS(uint16_t array_id, uint16_t id) : array_id(array_id), id(id) {} + ArrayItemIndexS(uint32_t array_id, uint32_t id) : array_id(array_id), id(id) {} }; } // namespace extra diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h index 7ca6716de..ad9aca21b 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h @@ -325,13 +325,13 @@ class WindowRankKernel : public KernalBase { arrow::Status SortToIndicesPrepare(std::vector values); arrow::Status SortToIndicesFinish( - std::vector> elements_to_sort, - std::vector>* offsets); + std::vector> elements_to_sort, + std::vector>* offsets); template arrow::Status AreTheSameValue(const std::vector& values, int column, - std::shared_ptr i, - std::shared_ptr j, bool* out); + std::shared_ptr i, + std::shared_ptr j, bool* out); private: std::shared_ptr sorter_; diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc index 46981a761..cebaa965d 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc @@ -1399,9 +1399,7 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl { if (nulls_total_ == 0) { // if all batches have no null value, // we do not need to check whether the value is null - ARROW_CHECK_LE(num_batches_, 64 * 1024); for (int array_id = 0; array_id < num_batches_; array_id++) { - ARROW_CHECK_LE(length_list_[array_id], 64 * 1024); for (int64_t i = 0; i < length_list_[array_id]; i++) { (indices_begin + indices_i)->array_id = array_id; (indices_begin + indices_i)->id = i; @@ -1410,9 +1408,7 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl { } } else { // we should support nulls first and nulls last here - ARROW_CHECK_LE(num_batches_, 64 * 1024); for (int array_id = 0; array_id < num_batches_; array_id++) { - ARROW_CHECK_LE(length_list_[array_id], 64 * 1024); if (cached_key_[array_id]->null_count() == 0) { // if this array has no null value, // we do need to check if the value is null @@ -1460,9 +1456,7 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl { int64_t indices_i = 0; int64_t indices_nan = 0; - ARROW_CHECK_LE(num_batches_, 64 * 1024); for (int array_id = 0; array_id < num_batches_; array_id++) { - ARROW_CHECK_LE(length_list_[array_id], 64 * 1024); for (int64_t i = 0; i < length_list_[array_id]; i++) { if (cached_key_[array_id]->IsNull(i)) { continue; @@ -1994,7 +1988,6 @@ class SortArraysCodegenKernel : public SortArraysToIndicesKernel::Impl { return BaseCodes() + R"( -#include "arrow/util/logging.h" #include "precompile/wscgapi.hpp" @@ -2030,9 +2023,7 @@ class TypedSorterImpl : public CodeGenBase { ArrayItemIndexS* indices_end = indices_begin + items_total_; int64_t indices_i = 0; - ARROW_CHECK_LE(num_batches_, 64 * 1024); for (int array_id = 0; array_id < num_batches_; array_id++) { - ARROW_CHECK_LE(length_list_[array_id], 64*1024); for (int64_t i = 0; i < length_list_[array_id]; i++) { (indices_begin + indices_i)->array_id = array_id; (indices_begin + indices_i)->id = i; @@ -2590,9 +2581,7 @@ class SortMultiplekeyKernel : public SortArraysToIndicesKernel::Impl { void Partition(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end) { int64_t indices_i = 0; int64_t indices_null = 0; - ARROW_CHECK_LE(num_batches_, 64 * 1024); for (int array_id = 0; array_id < num_batches_; array_id++) { - ARROW_CHECK_LE(length_list_[array_id], 64 * 1024); for (int64_t i = 0; i < length_list_[array_id]; i++) { (indices_begin + indices_i)->array_id = array_id; (indices_begin + indices_i)->id = i; diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc index 028778a72..1a6b9d322 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc @@ -359,7 +359,7 @@ arrow::Status WindowRankKernel::Finish(ArrayList* out) { #endif // initialize partitions to be sorted - std::vector>> partitions_to_sort; + std::vector>> partitions_to_sort; for (int i = 0; i <= max_group_id; i++) { partitions_to_sort.emplace_back(); } @@ -376,18 +376,18 @@ arrow::Status WindowRankKernel::Finish(ArrayList* out) { } uint64_t partition_id = slice->GetView(j); partitions_to_sort.at(partition_id) - .push_back(std::make_shared(i, j)); + .push_back(std::make_shared(i, j)); } } #ifdef DEBUG std::cout << "[window kernel] Finished. " << std::endl; #endif - std::vector>> sorted_partitions; + std::vector>> sorted_partitions; RETURN_NOT_OK(SortToIndicesPrepare(values)); for (int i = 0; i <= max_group_id; i++) { - std::vector> partition = partitions_to_sort.at(i); - std::vector> sorted_partition; + std::vector> partition = partitions_to_sort.at(i); + std::vector> sorted_partition; #ifdef DEBUG std::cout << "[window kernel] Sorting a single partition... " << std::endl; #endif @@ -406,17 +406,17 @@ arrow::Status WindowRankKernel::Finish(ArrayList* out) { std::cout << "[window kernel] Generating rank result on a single partition... " << std::endl; #endif - std::vector> sorted_partition = + std::vector> sorted_partition = sorted_partitions.at(i); int assumed_rank = 0; for (int j = 0; j < sorted_partition.size(); j++) { ++assumed_rank; // rank value starts from 1 - std::shared_ptr index = sorted_partition.at(j); + std::shared_ptr index = sorted_partition.at(j); if (j == 0) { rank_array[index->array_id][index->id] = 1; // rank value starts from 1 continue; } - std::shared_ptr last_index = sorted_partition.at(j - 1); + std::shared_ptr last_index = sorted_partition.at(j - 1); bool same = true; for (int column_id = 0; column_id < type_list_.size(); column_id++) { bool s = false; @@ -478,11 +478,11 @@ arrow::Status WindowRankKernel::Finish(ArrayList* out) { return arrow::Status::OK(); } -static arrow::Status EncodeIndices(std::vector> in, +static arrow::Status EncodeIndices(std::vector> in, std::shared_ptr* out) { arrow::UInt64Builder builder; for (const auto& each : in) { - uint64_t encoded = ((uint64_t)(each->array_id) << 16U) ^ ((uint64_t)(each->id)); + uint64_t encoded = ((uint64_t)(each->array_id) << 32U) ^ ((uint64_t)(each->id)); RETURN_NOT_OK(builder.Append(encoded)); } RETURN_NOT_OK(builder.Finish(out)); @@ -490,15 +490,15 @@ static arrow::Status EncodeIndices(std::vector> } static arrow::Status DecodeIndices(std::shared_ptr in, - std::vector>* out) { - std::vector> v; + std::vector>* out) { + std::vector> v; std::shared_ptr selected = std::dynamic_pointer_cast(in); for (int i = 0; i < selected->length(); i++) { uint64_t encoded = selected->GetView(i); - uint16_t array_id = (encoded & 0xFFFF0000U) >> 16U; - uint16_t id = encoded & 0xFFFFU; - v.push_back(std::make_shared(array_id, id)); + uint32_t array_id = (encoded & 0xFFFFFFFF00000000U) >> 32U; + uint32_t id = encoded & 0xFFFFFFFFU; + v.push_back(std::make_shared(array_id, id)); } *out = v; return arrow::Status::OK(); @@ -513,13 +513,13 @@ arrow::Status WindowRankKernel::SortToIndicesPrepare(std::vector valu } arrow::Status WindowRankKernel::SortToIndicesFinish( - std::vector> elements_to_sort, - std::vector>* offsets) { + std::vector> elements_to_sort, + std::vector>* offsets) { std::shared_ptr in; std::shared_ptr out; RETURN_NOT_OK(EncodeIndices(elements_to_sort, &in)); RETURN_NOT_OK(sorter_->Finish(in, &out)); - std::vector> decoded_out; + std::vector> decoded_out; RETURN_NOT_OK(DecodeIndices(out, &decoded_out)); *offsets = decoded_out; return arrow::Status::OK(); @@ -529,8 +529,8 @@ arrow::Status WindowRankKernel::SortToIndicesFinish( template arrow::Status WindowRankKernel::AreTheSameValue(const std::vector& values, int column, - std::shared_ptr i, - std::shared_ptr j, + std::shared_ptr i, + std::shared_ptr j, bool* out) { if (is_row_number_) { *out = false; diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h index 63b635525..c56e03908 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h @@ -253,8 +253,8 @@ class TypedSorterImpl : public CodeGenBase { for (int i = 0; i < selected->length(); i++) { uint64_t encoded = selected->GetView(i); - uint16_t array_id = (encoded & 0xFFFF0000U) >> 16U; - uint16_t id = encoded & 0xFFFFU; + uint32_t array_id = (encoded & 0xFFFFFFFF00000000U) >> 32U; + uint32_t id = encoded & 0xFFFFFFFFU; (indices_begin + indices_i)->array_id = array_id; (indices_begin + indices_i)->id = id; indices_i++; @@ -262,7 +262,7 @@ class TypedSorterImpl : public CodeGenBase { )" + sort_func_str + R"( std::shared_ptr out_type; - RETURN_NOT_OK(MakeFixedSizeBinaryType(sizeof(ArrayItemIndexS) / sizeof(int32_t), &out_type)); + RETURN_NOT_OK(MakeFixedSizeBinaryType(sizeof(ArrayItemIndexS) / sizeof(int64_t), &out_type)); RETURN_NOT_OK(MakeFixedSizeBinaryArray(out_type, items_total, indices_buf, out)); return arrow::Status::OK(); } @@ -273,7 +273,7 @@ class TypedSorterImpl : public CodeGenBase { arrow::UInt64Builder builder; auto *index = (ArrayItemIndexS *) indices_out->value_data(); for (int i = 0; i < indices_out->length(); i++) { - uint64_t encoded = ((uint64_t) (index->array_id) << 16U) ^ ((uint64_t) (index->id)); + uint64_t encoded = ((uint64_t) (index->array_id) << 32U) ^ ((uint64_t) (index->id)); RETURN_NOT_OK(builder.Append(encoded)); index++; } @@ -538,8 +538,8 @@ class WindowSortOnekeyKernel : public WindowSortKernel::Impl { std::dynamic_pointer_cast(in); for (int i = 0; i < selected->length(); i++) { uint64_t encoded = selected->GetView(i); - uint16_t array_id = (encoded & 0xFFFF0000U) >> 16U; - uint16_t id = encoded & 0xFFFFU; + uint32_t array_id = (encoded & 0xFFFFFFFF00000000U) >> 32U; + uint32_t id = encoded & 0xFFFFFFFFU; auto key_clip = cached_key_.at(array_id); if (key_clip->IsNull(id)) { nulls_total++; @@ -561,8 +561,8 @@ class WindowSortOnekeyKernel : public WindowSortKernel::Impl { // we should also support desc and asc here for (int i = 0; i < selected->length(); i++) { uint64_t encoded = selected->GetView(i); - uint16_t array_id = (encoded & 0xFFFF0000U) >> 16U; - uint16_t id = encoded & 0xFFFFU; + uint32_t array_id = (encoded & 0xFFFFFFFF00000000U) >> 32U; + uint32_t id = encoded & 0xFFFFFFFFU; auto key_clip = cached_key_.at(array_id); if (nulls_first_) { if (!key_clip->IsNull(id)) { @@ -621,7 +621,7 @@ class WindowSortOnekeyKernel : public WindowSortKernel::Impl { arrow::UInt64Builder builder; auto* index = (ArrayItemIndexS*)indices_out->value_data(); for (int i = 0; i < indices_out->length(); i++) { - uint64_t encoded = ((uint64_t)(index->array_id) << 16U) ^ ((uint64_t)(index->id)); + uint64_t encoded = ((uint64_t)(index->array_id) << 32U) ^ ((uint64_t)(index->id)); RETURN_NOT_OK(builder.Append(encoded)); index++; } diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc index 4fe7dd7b6..c7d4d64ac 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.cc +++ b/native-sql-engine/cpp/src/shuffle/splitter.cc @@ -854,8 +854,7 @@ Splitter::row_offset_type Splitter::CalculateSplitBatchSize( arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) { // buffer is allocated less than 64K - // Will uncomment ARROW_CHECK_LE here, after fix the max batch_size issue - // ARROW_CHECK_LE(rb.num_rows(), 64 * 1024); + // ARROW_CHECK_LE(rb.num_rows(),64*1024); reducer_offsets_.resize(rb.num_rows());