From d5cda4afd25503c360efe08989c3c58e62369e2f Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Tue, 26 Nov 2024 13:53:06 +0100
Subject: [PATCH] GH-44084: [C++] Improve merge step in chunked sorting (#44217)

### Rationale for this change

When merge-sorting the chunks of a chunked array or table, we would resolve the chunk index of every value lookup individually. This requires `O(n * log k)` chunk resolutions, with `n` the chunked array or table length and `k` the number of chunks.

Instead, this PR translates the logical indices to physical indices all at once. Since the logical indices are chunk-partitioned at that point, the translation does not even require expensive chunk resolution.

This change yields significant speedups on chunked array and table sorting:

```
benchmark                                        baseline           contender          change %   counters
ChunkedArraySortIndicesInt64Narrow/1048576/100   345.419 MiB/sec    628.334 MiB/sec    81.905     {'family_index': 0, 'per_family_instance_index': 6, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/1048576/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 242, 'null_percent': 1.0}
TableSortIndicesInt64Narrow/1048576/0/1/32       25.997M items/sec  44.550M items/sec  71.366     {'family_index': 3, 'per_family_instance_index': 11, 'run_name': 'TableSortIndicesInt64Narrow/1048576/0/1/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 17, 'chunks': 32.0, 'columns': 1.0, 'null_percent': 0.0}
ChunkedArraySortIndicesInt64Wide/32768/10000     91.182 MiB/sec     153.756 MiB/sec    68.625     {'family_index': 1, 'per_family_instance_index': 0, 'run_name': 'ChunkedArraySortIndicesInt64Wide/32768/10000', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2067, 'null_percent': 0.01}
ChunkedArraySortIndicesInt64Wide/32768/10        96.536 MiB/sec     161.648 MiB/sec    67.449     {'family_index': 1, 'per_family_instance_index': 2, 'run_name': 'ChunkedArraySortIndicesInt64Wide/32768/10', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2238, 'null_percent': 10.0}
TableSortIndicesInt64Narrow/1048576/100/1/32     24.290M items/sec  40.513M items/sec  66.791     {'family_index': 3, 'per_family_instance_index': 9, 'run_name': 'TableSortIndicesInt64Narrow/1048576/100/1/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 16, 'chunks': 32.0, 'columns': 1.0, 'null_percent': 1.0}
ChunkedArraySortIndicesInt64Wide/32768/100       90.030 MiB/sec     149.633 MiB/sec    66.203     {'family_index': 1, 'per_family_instance_index': 1, 'run_name': 'ChunkedArraySortIndicesInt64Wide/32768/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2017, 'null_percent': 1.0}
ChunkedArraySortIndicesInt64Wide/32768/0         91.982 MiB/sec     152.840 MiB/sec    66.163     {'family_index': 1, 'per_family_instance_index': 5, 'run_name': 'ChunkedArraySortIndicesInt64Wide/32768/0', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2115, 'null_percent': 0.0}
ChunkedArraySortIndicesInt64Narrow/8388608/100   240.335 MiB/sec    387.423 MiB/sec    61.201     {'family_index': 0, 'per_family_instance_index': 7, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/8388608/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 21, 'null_percent': 1.0}
ChunkedArraySortIndicesInt64Wide/32768/2         172.376 MiB/sec    274.133 MiB/sec    59.032     {'family_index': 1, 'per_family_instance_index': 3, 'run_name': 'ChunkedArraySortIndicesInt64Wide/32768/2', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3770, 'null_percent': 50.0}
TableSortIndicesInt64Wide/1048576/4/1/32         7.407M items/sec   11.621M items/sec  56.904     {'family_index': 4, 'per_family_instance_index': 10, 'run_name': 'TableSortIndicesInt64Wide/1048576/4/1/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 5, 'chunks': 32.0, 'columns': 1.0, 'null_percent': 25.0}
TableSortIndicesInt64Wide/1048576/100/1/32       5.788M items/sec   9.062M items/sec   56.565     {'family_index': 4, 'per_family_instance_index': 9, 'run_name': 'TableSortIndicesInt64Wide/1048576/100/1/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 1.0, 'null_percent': 1.0}
TableSortIndicesInt64Wide/1048576/0/1/32         5.785M items/sec   9.049M items/sec   56.409     {'family_index': 4, 'per_family_instance_index': 11, 'run_name': 'TableSortIndicesInt64Wide/1048576/0/1/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 1.0, 'null_percent': 0.0}
ChunkedArraySortIndicesInt64Narrow/32768/2       194.743 MiB/sec    291.432 MiB/sec    49.649     {'family_index': 0, 'per_family_instance_index': 3, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/32768/2', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4340, 'null_percent': 50.0}
TableSortIndicesInt64Narrow/1048576/4/1/32       25.686M items/sec  38.087M items/sec  48.279     {'family_index': 3, 'per_family_instance_index': 10, 'run_name': 'TableSortIndicesInt64Narrow/1048576/4/1/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 17, 'chunks': 32.0, 'columns': 1.0, 'null_percent': 25.0}
TableSortIndicesInt64Wide/1048576/0/8/32         5.766M items/sec   8.374M items/sec   45.240     {'family_index': 4, 'per_family_instance_index': 5, 'run_name': 'TableSortIndicesInt64Wide/1048576/0/8/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 8.0, 'null_percent': 0.0}
TableSortIndicesInt64Wide/1048576/0/16/32        5.752M items/sec   8.352M items/sec   45.202     {'family_index': 4, 'per_family_instance_index': 2, 'run_name': 'TableSortIndicesInt64Wide/1048576/0/16/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 16.0, 'null_percent': 0.0}
ChunkedArraySortIndicesInt64Narrow/32768/10000   121.253 MiB/sec    175.286 MiB/sec    44.562     {'family_index': 0, 'per_family_instance_index': 0, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/32768/10000', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2673, 'null_percent': 0.01}
TableSortIndicesInt64Wide/1048576/100/2/32       5.549M items/sec   7.984M items/sec   43.876     {'family_index': 4, 'per_family_instance_index': 6, 'run_name': 'TableSortIndicesInt64Wide/1048576/100/2/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 2.0, 'null_percent': 1.0}
ChunkedArraySortIndicesInt64Wide/1048576/100     69.599 MiB/sec     99.666 MiB/sec     43.200     {'family_index': 1, 'per_family_instance_index': 6, 'run_name': 'ChunkedArraySortIndicesInt64Wide/1048576/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 49, 'null_percent': 1.0}
TableSortIndicesInt64Narrow/1048576/0/1/4        55.940M items/sec  79.984M items/sec  42.982     {'family_index': 3, 'per_family_instance_index': 23, 'run_name': 'TableSortIndicesInt64Narrow/1048576/0/1/4', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 37, 'chunks': 4.0, 'columns': 1.0, 'null_percent': 0.0}
TableSortIndicesInt64Wide/1048576/100/16/32      5.554M items/sec   7.909M items/sec   42.417     {'family_index': 4, 'per_family_instance_index': 0, 'run_name': 'TableSortIndicesInt64Wide/1048576/100/16/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 16.0, 'null_percent': 1.0}
ChunkedArraySortIndicesInt64Narrow/32768/10      127.758 MiB/sec    181.407 MiB/sec    41.992     {'family_index': 0, 'per_family_instance_index': 2, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/32768/10', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2856, 'null_percent': 10.0}
TableSortIndicesInt64Wide/1048576/100/8/32       5.572M items/sec   7.775M items/sec   39.548     {'family_index': 4, 'per_family_instance_index': 3, 'run_name': 'TableSortIndicesInt64Wide/1048576/100/8/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 8.0, 'null_percent': 1.0}
ChunkedArraySortIndicesInt64Narrow/32768/100     119.600 MiB/sec    166.454 MiB/sec    39.176     {'family_index': 0, 'per_family_instance_index': 1, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/32768/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2667, 'null_percent': 1.0}
TableSortIndicesInt64Wide/1048576/0/2/32         5.781M items/sec   8.016M items/sec   38.669     {'family_index': 4, 'per_family_instance_index': 8, 'run_name': 'TableSortIndicesInt64Wide/1048576/0/2/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 2.0, 'null_percent': 0.0}
TableSortIndicesInt64Narrow/1048576/100/1/4      52.252M items/sec  72.193M items/sec  38.162     {'family_index': 3, 'per_family_instance_index': 21, 'run_name': 'TableSortIndicesInt64Narrow/1048576/100/1/4', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 35, 'chunks': 4.0, 'columns': 1.0, 'null_percent': 1.0}
ChunkedArraySortIndicesInt64Narrow/32768/0       121.868 MiB/sec    168.364 MiB/sec    38.152     {'family_index': 0, 'per_family_instance_index': 5, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/32768/0', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2691, 'null_percent': 0.0}
TableSortIndicesInt64Wide/1048576/4/2/32         5.017M items/sec   6.720M items/sec   33.934     {'family_index': 4, 'per_family_instance_index': 7, 'run_name': 'TableSortIndicesInt64Wide/1048576/4/2/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3, 'chunks': 32.0, 'columns': 2.0, 'null_percent': 25.0}
ChunkedArraySortIndicesInt64Wide/8388608/100     54.785 MiB/sec     72.642 MiB/sec     32.593     {'family_index': 1, 'per_family_instance_index': 7, 'run_name': 'ChunkedArraySortIndicesInt64Wide/8388608/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 5, 'null_percent': 1.0}
TableSortIndicesInt64Wide/1048576/4/8/32         4.222M items/sec   5.483M items/sec   29.861     {'family_index': 4, 'per_family_instance_index': 4, 'run_name': 'TableSortIndicesInt64Wide/1048576/4/8/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3, 'chunks': 32.0, 'columns': 8.0, 'null_percent': 25.0}
ChunkedArraySortIndicesString/32768/10           146.866 MiB/sec    190.314 MiB/sec    29.583     {'family_index': 2, 'per_family_instance_index': 2, 'run_name': 'ChunkedArraySortIndicesString/32768/10', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3494, 'null_percent': 10.0}
TableSortIndicesInt64Wide/1048576/4/16/32        4.225M items/sec   5.433M items/sec   28.599     {'family_index': 4, 'per_family_instance_index': 1, 'run_name': 'TableSortIndicesInt64Wide/1048576/4/16/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3, 'chunks': 32.0, 'columns': 16.0, 'null_percent': 25.0}
TableSortIndicesInt64Narrow/1048576/100/16/32    2.193M items/sec   2.711M items/sec   23.652     {'family_index': 3, 'per_family_instance_index': 0, 'run_name': 'TableSortIndicesInt64Narrow/1048576/100/16/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2, 'chunks': 32.0, 'columns': 16.0, 'null_percent': 1.0}
ChunkedArraySortIndicesString/32768/100          156.401 MiB/sec    191.910 MiB/sec    22.704     {'family_index': 2, 'per_family_instance_index': 1, 'run_name': 'ChunkedArraySortIndicesString/32768/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3488, 'null_percent': 1.0}
TableSortIndicesInt64Narrow/1048576/4/1/4        47.342M items/sec  58.062M items/sec  22.644     {'family_index': 3, 'per_family_instance_index': 22, 'run_name': 'TableSortIndicesInt64Narrow/1048576/4/1/4', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 32, 'chunks': 4.0, 'columns': 1.0, 'null_percent': 25.0}
ChunkedArraySortIndicesString/32768/0            161.457 MiB/sec    195.782 MiB/sec    21.259     {'family_index': 2, 'per_family_instance_index': 5, 'run_name': 'ChunkedArraySortIndicesString/32768/0', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3644, 'null_percent': 0.0}
TableSortIndicesInt64Narrow/1048576/4/16/32      1.915M items/sec   2.309M items/sec   20.561     {'family_index': 3, 'per_family_instance_index': 1, 'run_name': 'TableSortIndicesInt64Narrow/1048576/4/16/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 1, 'chunks': 32.0, 'columns': 16.0, 'null_percent': 25.0}
TableSortIndicesInt64Narrow/1048576/0/16/32      2.561M items/sec   3.079M items/sec   20.208     {'family_index': 3, 'per_family_instance_index': 2, 'run_name': 'TableSortIndicesInt64Narrow/1048576/0/16/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2, 'chunks': 32.0, 'columns': 16.0, 'null_percent': 0.0}
ChunkedArraySortIndicesString/32768/10000        157.786 MiB/sec    189.412 MiB/sec    20.043     {'family_index': 2, 'per_family_instance_index': 0, 'run_name': 'ChunkedArraySortIndicesString/32768/10000', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3539, 'null_percent': 0.01}
ChunkedArraySortIndicesString/32768/2            139.241 MiB/sec    164.172 MiB/sec    17.904     {'family_index': 2, 'per_family_instance_index': 3, 'run_name': 'ChunkedArraySortIndicesString/32768/2', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3155, 'null_percent': 50.0}
TableSortIndicesInt64Narrow/1048576/0/8/32       2.595M items/sec   3.038M items/sec   17.081     {'family_index': 3, 'per_family_instance_index': 5, 'run_name': 'TableSortIndicesInt64Narrow/1048576/0/8/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2, 'chunks': 32.0, 'columns': 8.0, 'null_percent': 0.0}
TableSortIndicesInt64Narrow/1048576/4/8/32       1.999M items/sec   2.298M items/sec   14.936     {'family_index': 3, 'per_family_instance_index': 4, 'run_name': 'TableSortIndicesInt64Narrow/1048576/4/8/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 1, 'chunks': 32.0, 'columns': 8.0, 'null_percent': 25.0}
ChunkedArraySortIndicesString/8388608/100        81.026 MiB/sec     93.120 MiB/sec     14.926     {'family_index': 2, 'per_family_instance_index': 7, 'run_name': 'ChunkedArraySortIndicesString/8388608/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 7, 'null_percent': 1.0}
TableSortIndicesInt64Narrow/1048576/100/8/32     2.382M items/sec   2.719M items/sec   14.168     {'family_index': 3, 'per_family_instance_index': 3, 'run_name': 'TableSortIndicesInt64Narrow/1048576/100/8/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2, 'chunks': 32.0, 'columns': 8.0, 'null_percent': 1.0}
ChunkedArraySortIndicesString/1048576/100        107.722 MiB/sec    122.229 MiB/sec    13.467     {'family_index': 2, 'per_family_instance_index': 6, 'run_name': 'ChunkedArraySortIndicesString/1048576/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 77, 'null_percent': 1.0}
TableSortIndicesInt64Narrow/1048576/100/2/32     4.019M items/sec   4.477M items/sec   11.383     {'family_index': 3, 'per_family_instance_index': 6, 'run_name': 'TableSortIndicesInt64Narrow/1048576/100/2/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3, 'chunks': 32.0, 'columns': 2.0, 'null_percent': 1.0}
TableSortIndicesInt64Wide/1048576/4/1/4          11.595M items/sec  12.791M items/sec  10.314     {'family_index': 4, 'per_family_instance_index': 22, 'run_name': 'TableSortIndicesInt64Wide/1048576/4/1/4', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 8, 'chunks': 4.0, 'columns': 1.0, 'null_percent': 25.0}
TableSortIndicesInt64Wide/1048576/0/1/4          9.231M items/sec   10.181M items/sec  10.294     {'family_index': 4, 'per_family_instance_index': 23, 'run_name': 'TableSortIndicesInt64Wide/1048576/0/1/4', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 6, 'chunks': 4.0, 'columns': 1.0, 'null_percent': 0.0}
```

However, performance regresses when the input is all nulls (which is probably rare):

```
benchmark                                        baseline           contender          change %   counters
ChunkedArraySortIndicesString/32768/1            5.636 GiB/sec      4.336 GiB/sec      -23.068    {'family_index': 2, 'per_family_instance_index': 4, 'run_name': 'ChunkedArraySortIndicesString/32768/1', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 127778, 'null_percent': 100.0}
ChunkedArraySortIndicesInt64Narrow/32768/1       3.963 GiB/sec      2.852 GiB/sec      -28.025    {'family_index': 0, 'per_family_instance_index': 4, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/32768/1', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 91209, 'null_percent': 100.0}
ChunkedArraySortIndicesInt64Wide/32768/1         4.038 GiB/sec      2.869 GiB/sec      -28.954    {'family_index': 1, 'per_family_instance_index': 4, 'run_name': 'ChunkedArraySortIndicesInt64Wide/32768/1', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 94090, 'null_percent': 100.0}
```

### Are these changes tested?

Yes, by existing tests.

### Are there any user-facing changes?

No.
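To make the translation step concrete, here is a minimal, self-contained sketch of the idea (not the patch's actual API: `PackedLocation`, `LogicalToPhysical`, and all other names here are illustrative; the 24/40 bit split mirrors the `CompressedChunkLocation` introduced below). The key point is that each per-chunk sorted run only contains logical indices belonging to its own chunk, so the translation is a single linear pass with no binary search per element:

```cpp
#include <cstdint>
#include <vector>

// Pack (chunk_index, index_in_chunk) into 64 bits: 24 bits for the chunk,
// 40 bits for the offset within the chunk.
struct PackedLocation {
  uint64_t data;
  static PackedLocation Make(uint64_t chunk, uint64_t offset) {
    return {(offset << 24) | chunk};
  }
  uint64_t chunk() const { return data & ((uint64_t{1} << 24) - 1); }
  uint64_t offset() const { return data >> 24; }
};

// Because the indices are chunk-partitioned, index i of chunk c at logical
// base offset `base` simply becomes (c, i - base).
void LogicalToPhysical(std::vector<uint64_t>& indices,
                       const std::vector<int64_t>& chunk_lengths) {
  uint64_t base = 0;
  size_t pos = 0;
  for (uint64_t c = 0; c < chunk_lengths.size(); ++c) {
    for (int64_t i = 0; i < chunk_lengths[c]; ++i, ++pos) {
      // Reuse the same memory area: overwrite the logical index in place.
      indices[pos] = PackedLocation::Make(c, indices[pos] - base).data;
    }
    base += static_cast<uint64_t>(chunk_lengths[c]);
  }
}
```

Since a packed location is exactly 64 bits, the physical indices can replace the logical `uint64_t` indices in place, and the reverse pass (physical back to logical) is an equally cheap linear scan over precomputed chunk offsets.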
* GitHub Issue: #44084

Authored-by: Antoine Pitrou
Signed-off-by: Antoine Pitrou
---
 cpp/src/arrow/CMakeLists.txt                  |   1 +
 cpp/src/arrow/chunk_resolver.cc               |  10 +-
 cpp/src/arrow/chunk_resolver.h                |  10 +-
 .../arrow/compute/kernels/chunked_internal.cc | 122 ++++++++
 .../arrow/compute/kernels/chunked_internal.h  | 121 ++++++--
 cpp/src/arrow/compute/kernels/vector_rank.cc  |   4 +-
 cpp/src/arrow/compute/kernels/vector_sort.cc  | 283 ++++++++++--------
 .../compute/kernels/vector_sort_internal.h    | 141 ++++++---
 8 files changed, 496 insertions(+), 196 deletions(-)
 create mode 100644 cpp/src/arrow/compute/kernels/chunked_internal.cc

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 5f6b568460afe..4e40056839ce2 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -731,6 +731,7 @@ set(ARROW_COMPUTE_SRCS
        compute/light_array_internal.cc
        compute/ordering.cc
        compute/registry.cc
+       compute/kernels/chunked_internal.cc
        compute/kernels/codegen_internal.cc
        compute/kernels/ree_util_internal.cc
        compute/kernels/scalar_cast_boolean.cc
diff --git a/cpp/src/arrow/chunk_resolver.cc b/cpp/src/arrow/chunk_resolver.cc
index ca74ffa06c820..7fc259f38c245 100644
--- a/cpp/src/arrow/chunk_resolver.cc
+++ b/cpp/src/arrow/chunk_resolver.cc
@@ -28,6 +28,8 @@
 
 namespace arrow {
 
+using util::span;
+
 namespace {
 template <typename T>
 int64_t GetLength(const T& array) {
@@ -42,7 +44,7 @@ int64_t GetLength<std::shared_ptr<RecordBatch>>(
 }
 
 template <typename T>
-inline std::vector<int64_t> MakeChunksOffsets(const std::vector<T>& chunks) {
+inline std::vector<int64_t> MakeChunksOffsets(span<T> chunks) {
   std::vector<int64_t> offsets(chunks.size() + 1);
   int64_t offset = 0;
   std::transform(chunks.begin(), chunks.end(), offsets.begin(),
@@ -112,13 +114,13 @@ void ResolveManyInline(uint32_t num_offsets, const int64_t* signed_offsets,
 }  // namespace
 
 ChunkResolver::ChunkResolver(const ArrayVector& chunks) noexcept
-    : offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {}
+    : offsets_(MakeChunksOffsets(span(chunks))), cached_chunk_(0) {}
 
-ChunkResolver::ChunkResolver(const std::vector<const Array*>& chunks) noexcept
+ChunkResolver::ChunkResolver(span<const Array* const> chunks) noexcept
     : offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {}
 
 ChunkResolver::ChunkResolver(const RecordBatchVector& batches) noexcept
-    : offsets_(MakeChunksOffsets(batches)), cached_chunk_(0) {}
+    : offsets_(MakeChunksOffsets(span(batches))), cached_chunk_(0) {}
 
 ChunkResolver::ChunkResolver(ChunkResolver&& other) noexcept
     : offsets_(std::move(other.offsets_)),
diff --git a/cpp/src/arrow/chunk_resolver.h b/cpp/src/arrow/chunk_resolver.h
index ab0e753d0040e..3d6458167fac9 100644
--- a/cpp/src/arrow/chunk_resolver.h
+++ b/cpp/src/arrow/chunk_resolver.h
@@ -26,6 +26,7 @@
 
 #include "arrow/type_fwd.h"
 #include "arrow/util/macros.h"
+#include "arrow/util/span.h"
 
 namespace arrow {
 
@@ -76,11 +77,14 @@ class ARROW_EXPORT ChunkResolver {
  public:
   explicit ChunkResolver(const ArrayVector& chunks) noexcept;
-
-  explicit ChunkResolver(const std::vector<const Array*>& chunks) noexcept;
-
+  explicit ChunkResolver(util::span<const Array* const> chunks) noexcept;
   explicit ChunkResolver(const RecordBatchVector& batches) noexcept;
 
+  /// \brief Construct a ChunkResolver from a vector of chunks.size() + 1 offsets.
+  ///
+  /// The first offset must be 0 and the last offset must be the logical length of the
+  /// chunked array. Each offset before the last represents the starting logical index of
+  /// the corresponding chunk.
   explicit ChunkResolver(std::vector<int64_t> offsets) noexcept
       : offsets_(std::move(offsets)), cached_chunk_(0) {
 #ifndef NDEBUG
diff --git a/cpp/src/arrow/compute/kernels/chunked_internal.cc b/cpp/src/arrow/compute/kernels/chunked_internal.cc
new file mode 100644
index 0000000000000..e72b8e1f5bfb7
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/chunked_internal.cc
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/kernels/chunked_internal.h"
+
+#include <algorithm>
+#include <numeric>
+
+#include "arrow/record_batch.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::compute::internal {
+
+std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays) {
+  std::vector<const Array*> pointers(arrays.size());
+  std::transform(arrays.begin(), arrays.end(), pointers.begin(),
+                 [&](const std::shared_ptr<Array>& array) { return array.get(); });
+  return pointers;
+}
+
+std::vector<int64_t> ChunkedIndexMapper::GetChunkLengths(
+    util::span<const Array* const> chunks) {
+  std::vector<int64_t> chunk_lengths(chunks.size());
+  for (int64_t i = 0; i < static_cast<int64_t>(chunks.size()); ++i) {
+    chunk_lengths[i] = chunks[i]->length();
+  }
+  return chunk_lengths;
+}
+
+std::vector<int64_t> ChunkedIndexMapper::GetChunkLengths(
+    const RecordBatchVector& chunks) {
+  std::vector<int64_t> chunk_lengths(chunks.size());
+  for (int64_t i = 0; i < static_cast<int64_t>(chunks.size()); ++i) {
+    chunk_lengths[i] = chunks[i]->num_rows();
+  }
+  return chunk_lengths;
+}
+
+Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
+ChunkedIndexMapper::LogicalToPhysical() {
+  // Check that indices would fall in bounds for CompressedChunkLocation
+  if (ARROW_PREDICT_FALSE(chunk_lengths_.size() >
+                          CompressedChunkLocation::kMaxChunkIndex + 1)) {
+    return Status::NotImplemented("Chunked array has more than ",
+                                  CompressedChunkLocation::kMaxChunkIndex + 1,
+                                  " chunks");
+  }
+  for (int64_t chunk_length : chunk_lengths_) {
+    if (ARROW_PREDICT_FALSE(static_cast<uint64_t>(chunk_length) >
+                            CompressedChunkLocation::kMaxIndexInChunk + 1)) {
+      return Status::NotImplemented("Individual chunk in chunked array has more than ",
+                                    CompressedChunkLocation::kMaxIndexInChunk + 1,
+                                    " elements");
+    }
+  }
+
+  const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
+  DCHECK_EQ(num_indices, std::accumulate(chunk_lengths_.begin(), chunk_lengths_.end(),
+                                         static_cast<int64_t>(0)));
+  CompressedChunkLocation* physical_begin =
+      reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
+  DCHECK_EQ(physical_begin + num_indices,
+            reinterpret_cast<CompressedChunkLocation*>(indices_end_));
+
+  int64_t chunk_offset = 0;
+  for (int64_t chunk_index = 0; chunk_index < static_cast<int64_t>(chunk_lengths_.size());
+       ++chunk_index) {
+    const int64_t chunk_length = chunk_lengths_[chunk_index];
+    for (int64_t i = 0; i < chunk_length; ++i) {
+      // Logical indices are expected to be chunk-partitioned, which avoids costly
+      // chunked index resolution.
+      DCHECK_GE(indices_begin_[chunk_offset + i], static_cast<uint64_t>(chunk_offset));
+      DCHECK_LT(indices_begin_[chunk_offset + i],
+                static_cast<uint64_t>(chunk_offset + chunk_length));
+      physical_begin[chunk_offset + i] = CompressedChunkLocation{
+          static_cast<uint64_t>(chunk_index),
+          indices_begin_[chunk_offset + i] - static_cast<uint64_t>(chunk_offset)};
+    }
+    chunk_offset += chunk_length;
+  }
+
+  return std::pair{physical_begin, physical_begin + num_indices};
+}
+
+Status ChunkedIndexMapper::PhysicalToLogical() {
+  std::vector<int64_t> chunk_offsets(chunk_lengths_.size());
+  {
+    int64_t offset = 0;
+    for (int64_t i = 0; i < static_cast<int64_t>(chunk_lengths_.size()); ++i) {
+      chunk_offsets[i] = offset;
+      offset += chunk_lengths_[i];
+    }
+  }
+
+  const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
+  CompressedChunkLocation* physical_begin =
+      reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
+  for (int64_t i = 0; i < num_indices; ++i) {
+    const auto loc = physical_begin[i];
+    DCHECK_LT(loc.chunk_index(), chunk_offsets.size());
+    DCHECK_LT(loc.index_in_chunk(),
+              static_cast<uint64_t>(chunk_lengths_[loc.chunk_index()]));
+    indices_begin_[i] =
+        chunk_offsets[loc.chunk_index()] + static_cast<int64_t>(loc.index_in_chunk());
+  }
+
+  return Status::OK();
+}
+
+}  // namespace arrow::compute::internal
diff --git a/cpp/src/arrow/compute/kernels/chunked_internal.h b/cpp/src/arrow/compute/kernels/chunked_internal.h
index f7cb615f3ed81..5bc8233016f30 100644
--- a/cpp/src/arrow/compute/kernels/chunked_internal.h
+++ b/cpp/src/arrow/compute/kernels/chunked_internal.h
@@ -20,26 +20,32 @@
 #include <algorithm>
 #include <cstdint>
 #include <memory>
+#include <utility>
 #include <vector>
 
 #include "arrow/array.h"
 #include "arrow/chunk_resolver.h"
 #include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/util/span.h"
 
-namespace arrow {
-namespace compute {
-namespace internal {
+namespace arrow::compute::internal {
 
 // The target chunk in a chunked array.
 struct ResolvedChunk {
   // The target array in chunked array.
   const Array* array;
 
   // The index in the target array.
-  const int64_t index;
+  int64_t index;
 
   ResolvedChunk(const Array* array, int64_t index) : array(array), index(index) {}
 
- public:
+  friend bool operator==(const ResolvedChunk& left, const ResolvedChunk& right) {
+    return left.array == right.array && left.index == right.index;
+  }
+  friend bool operator!=(const ResolvedChunk& left, const ResolvedChunk& right) {
+    return left.array != right.array || left.index != right.index;
+  }
+
   bool IsNull() const { return array->IsNull(index); }
 
   template <typename ArrowType, typename ViewType = GetViewType<ArrowType>>
@@ -50,20 +56,63 @@ struct ResolvedChunk {
   }
 };
 
+// A compressed (chunk_index, index_in_chunk) pair.
+// The goal of compression is to make it fit in 64 bits, allowing in place
+// replacement of logical uint64_t indices with physical indices.
+// (see ChunkedIndexMapper)
+struct CompressedChunkLocation {
+  static constexpr int kChunkIndexBits = 24;
+  static constexpr int kIndexInChunkBits = 64 - kChunkIndexBits;
+
+  static constexpr uint64_t kMaxChunkIndex = (1ULL << kChunkIndexBits) - 1;
+  static constexpr uint64_t kMaxIndexInChunk = (1ULL << kIndexInChunkBits) - 1;
+
+  CompressedChunkLocation() = default;
+
+  constexpr uint64_t chunk_index() const { return data_ & kMaxChunkIndex; }
+  constexpr uint64_t index_in_chunk() const { return data_ >> kChunkIndexBits; }
+
+  explicit constexpr CompressedChunkLocation(uint64_t chunk_index,
+                                             uint64_t index_in_chunk)
+      : data_((index_in_chunk << kChunkIndexBits) | chunk_index) {}
+
+  template <typename IndexType>
+  explicit operator TypedChunkLocation<IndexType>() {
+    return {static_cast<IndexType>(chunk_index()),
+            static_cast<IndexType>(index_in_chunk())};
+  }
+
+ private:
+  uint64_t data_;
+};
+
+static_assert(sizeof(uint64_t) == sizeof(CompressedChunkLocation));
+
 class ChunkedArrayResolver {
  private:
   ChunkResolver resolver_;
-  std::vector<const Array*> chunks_;
+  util::span<const Array* const> chunks_;
+  std::vector<const Array*> owned_chunks_;
 
  public:
-  explicit ChunkedArrayResolver(const std::vector<const Array*>& chunks)
+  explicit ChunkedArrayResolver(std::vector<const Array*>&& chunks)
+      : resolver_(chunks), chunks_(chunks), owned_chunks_(std::move(chunks)) {}
+  explicit ChunkedArrayResolver(util::span<const Array* const> chunks)
       : resolver_(chunks), chunks_(chunks) {}
 
-  ChunkedArrayResolver(ChunkedArrayResolver&& other) = default;
-  ChunkedArrayResolver& operator=(ChunkedArrayResolver&& other) = default;
+  ARROW_DEFAULT_MOVE_AND_ASSIGN(ChunkedArrayResolver);
 
-  ChunkedArrayResolver(const ChunkedArrayResolver& other) = default;
-  ChunkedArrayResolver& operator=(const ChunkedArrayResolver& other) = default;
+  ChunkedArrayResolver(const ChunkedArrayResolver& other)
+      : resolver_(other.resolver_), owned_chunks_(other.owned_chunks_) {
+    // Rebind span to owned_chunks_ if necessary
+    chunks_ = owned_chunks_.empty() ? other.chunks_ : owned_chunks_;
+  }
+  ChunkedArrayResolver& operator=(const ChunkedArrayResolver& other) {
+    resolver_ = other.resolver_;
+    owned_chunks_ = other.owned_chunks_;
+    chunks_ = owned_chunks_.empty() ? other.chunks_ : owned_chunks_;
+    return *this;
+  }
 
   ResolvedChunk Resolve(int64_t index) const {
     const auto loc = resolver_.Resolve(index);
@@ -71,13 +120,45 @@ class ChunkedArrayResolver {
   }
 };
 
-inline std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays) {
-  std::vector<const Array*> pointers(arrays.size());
-  std::transform(arrays.begin(), arrays.end(), pointers.begin(),
-                 [&](const std::shared_ptr<Array>& array) { return array.get(); });
-  return pointers;
-}
+std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays);
+
+// A class that turns logical (linear) indices into physical (chunked) indices,
+// and vice-versa.
+class ChunkedIndexMapper {
+ public:
+  ChunkedIndexMapper(const std::vector<const Array*>& chunks, uint64_t* indices_begin,
+                     uint64_t* indices_end)
+      : ChunkedIndexMapper(util::span(chunks), indices_begin, indices_end) {}
+  ChunkedIndexMapper(util::span<const Array* const> chunks, uint64_t* indices_begin,
+                     uint64_t* indices_end)
+      : chunk_lengths_(GetChunkLengths(chunks)),
+        indices_begin_(indices_begin),
+        indices_end_(indices_end) {}
+  ChunkedIndexMapper(const RecordBatchVector& chunks, uint64_t* indices_begin,
+                     uint64_t* indices_end)
+      : chunk_lengths_(GetChunkLengths(chunks)),
+        indices_begin_(indices_begin),
+        indices_end_(indices_end) {}
+
+  // Turn the original uint64_t logical indices into physical. This reuses the
+  // same memory area, so the logical indices cannot be used anymore until
+  // PhysicalToLogical() is called.
+  //
+  // This assumes that the logical indices are originally chunk-partitioned.
+  Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
+  LogicalToPhysical();
+
+  // Turn the physical indices back into logical, making the uint64_t indices
+  // usable again.
+  Status PhysicalToLogical();
+
+ private:
+  static std::vector<int64_t> GetChunkLengths(util::span<const Array* const> chunks);
+  static std::vector<int64_t> GetChunkLengths(const RecordBatchVector& chunks);
+
+  std::vector<int64_t> chunk_lengths_;
+  uint64_t* indices_begin_;
+  uint64_t* indices_end_;
+};
 
-}  // namespace internal
-}  // namespace compute
-}  // namespace arrow
+}  // namespace arrow::compute::internal
diff --git a/cpp/src/arrow/compute/kernels/vector_rank.cc b/cpp/src/arrow/compute/kernels/vector_rank.cc
index c4e52701411fd..b374862fe6d2c 100644
--- a/cpp/src/arrow/compute/kernels/vector_rank.cc
+++ b/cpp/src/arrow/compute/kernels/vector_rank.cc
@@ -21,6 +21,8 @@
 
 namespace arrow::compute::internal {
 
+using ::arrow::util::span;
+
 namespace {
 
 // ----------------------------------------------------------------------
@@ -237,7 +239,7 @@ class Ranker : public RankerMixin(); };
   ARROW_ASSIGN_OR_RAISE(*output_, CreateRankings(ctx_, sorted, null_placement_,
diff --git a/cpp/src/arrow/compute/kernels/vector_sort.cc b/cpp/src/arrow/compute/kernels/vector_sort.cc
index 395ed86a06b4a..d81187837de7e 100644
--- a/cpp/src/arrow/compute/kernels/vector_sort.cc
+++ b/cpp/src/arrow/compute/kernels/vector_sort.cc
@@ -24,6 +24,7 @@
 namespace arrow {
 
 using internal::checked_cast;
+using util::span;
 
 namespace compute {
 namespace internal {
@@ -82,6 +83,7 @@ class ChunkedArraySorter : public TypeVisitor {
       *output_ = {indices_end_, indices_end_, indices_end_, indices_end_};
       return Status::OK();
     }
+    const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
     const auto arrays = GetArrayPointers(physical_chunks_);
 
     // Sort each chunk independently and merge to sorted indices.
@@ -101,45 +103,63 @@ class ChunkedArraySorter : public TypeVisitor {
                                            begin_offset, options, ctx_));
       begin_offset = end_offset;
     }
-    DCHECK_EQ(end_offset, indices_end_ - indices_begin_);
+    DCHECK_EQ(end_offset, num_indices);
 
     // Then merge them by pairs, recursively
     if (sorted.size() > 1) {
-      auto merge_nulls = [&](uint64_t* nulls_begin, uint64_t* nulls_middle,
-                             uint64_t* nulls_end, uint64_t* temp_indices,
-                             int64_t null_count) {
+      ChunkedIndexMapper chunked_mapper(arrays, indices_begin_, indices_end_);
+      ARROW_ASSIGN_OR_RAISE(auto chunked_indices_pair,
+                            chunked_mapper.LogicalToPhysical());
+      auto [chunked_indices_begin, chunked_indices_end] = chunked_indices_pair;
+
+      std::vector<ChunkedNullPartitionResult> chunk_sorted(num_chunks);
+      for (int i = 0; i < num_chunks; ++i) {
+        chunk_sorted[i] = sorted[i].TranslateTo(indices_begin_, chunked_indices_begin);
+      }
+
+      auto merge_nulls = [&](CompressedChunkLocation* nulls_begin,
+                             CompressedChunkLocation* nulls_middle,
+                             CompressedChunkLocation* nulls_end,
+                             CompressedChunkLocation* temp_indices, int64_t null_count) {
         if (has_null_like_values<ArrayType>::value) {
-          PartitionNullsOnly<ArrayType, StablePartitioner>(nulls_begin, nulls_end,
-                                                           ChunkedArrayResolver(arrays), null_count,
-                                                           null_placement_);
+          PartitionNullsOnly<StablePartitioner>(nulls_begin, nulls_end, arrays,
+                                                null_count, null_placement_);
         }
       };
-      auto merge_non_nulls = [&](uint64_t* range_begin, uint64_t* range_middle,
-                                 uint64_t* range_end, uint64_t* temp_indices) {
-        MergeNonNulls<ArrayType>(range_begin, range_middle, range_end, arrays,
-                                 temp_indices);
-      };
-
-      MergeImpl merge_impl{null_placement_, std::move(merge_nulls),
-                           std::move(merge_non_nulls)};
+      auto merge_non_nulls =
+          [&](CompressedChunkLocation* range_begin, CompressedChunkLocation* range_middle,
+              CompressedChunkLocation* range_end, CompressedChunkLocation* temp_indices) {
+            MergeNonNulls<ArrayType>(range_begin, range_middle, range_end, arrays,
+                                     temp_indices);
+          };
+
+      ChunkedMergeImpl merge_impl{null_placement_, std::move(merge_nulls),
+                                  std::move(merge_non_nulls)};
       // std::merge is only called on non-null values, so size temp indices accordingly
-      RETURN_NOT_OK(merge_impl.Init(ctx_, indices_end_ - indices_begin_ - null_count));
+      RETURN_NOT_OK(merge_impl.Init(ctx_, num_indices - null_count));
 
-      while (sorted.size() > 1) {
-        auto out_it = sorted.begin();
-        auto it = sorted.begin();
-        while (it < sorted.end() - 1) {
+      while (chunk_sorted.size() > 1) {
+        // Merge all pairs of chunks
+        auto out_it = chunk_sorted.begin();
+        auto it = chunk_sorted.begin();
+        while (it < chunk_sorted.end() - 1) {
          const auto& left = *it++;
          const auto& right = *it++;
          DCHECK_EQ(left.overall_end(), right.overall_begin());
          const auto merged = merge_impl.Merge(left, right, null_count);
          *out_it++ = merged;
        }
-        if (it < sorted.end()) {
+        if (it < chunk_sorted.end()) {
          *out_it++ = *it++;
        }
-        sorted.erase(out_it, sorted.end());
+        chunk_sorted.erase(out_it, chunk_sorted.end());
      }
+
+      // Translate the merged result back to logical indices
+      sorted.resize(1);
+      sorted[0] = chunk_sorted[0].TranslateTo(chunked_indices_begin, indices_begin_);
+
+      RETURN_NOT_OK(chunked_mapper.PhysicalToLogical());
     }
 
     DCHECK_EQ(sorted.size(), 1);
@@ -153,34 +173,39 @@ class ChunkedArraySorter : public TypeVisitor {
   }
 
   template <typename ArrayType>
-  void MergeNonNulls(uint64_t* range_begin, uint64_t* range_middle, uint64_t* range_end,
-                     const std::vector<const Array*>& arrays, uint64_t* temp_indices) {
+  void MergeNonNulls(CompressedChunkLocation* range_begin,
+                     CompressedChunkLocation* range_middle,
+                     CompressedChunkLocation* range_end, span<const Array* const> arrays,
+                     CompressedChunkLocation* temp_indices) {
     using ArrowType = typename ArrayType::TypeClass;
-    const ChunkedArrayResolver left_resolver(arrays);
-    const ChunkedArrayResolver right_resolver(arrays);
     if (order_ == SortOrder::Ascending) {
       std::merge(range_begin, range_middle, range_middle, range_end, temp_indices,
-                 [&](uint64_t left, uint64_t right) {
-                   const auto chunk_left = left_resolver.Resolve(left);
-                   const auto chunk_right = right_resolver.Resolve(right);
-                   return chunk_left.Value<ArrowType>() < chunk_right.Value<ArrowType>();
+                 [&](CompressedChunkLocation left, CompressedChunkLocation right) {
+                   return ChunkValue<ArrowType>(arrays, left) <
+                          ChunkValue<ArrowType>(arrays, right);
                  });
     } else {
       std::merge(range_begin, range_middle, range_middle, range_end, temp_indices,
-                 [&](uint64_t left, uint64_t right) {
-                   const auto chunk_left = left_resolver.Resolve(left);
-                   const auto chunk_right = right_resolver.Resolve(right);
+                 [&](CompressedChunkLocation left, CompressedChunkLocation right) {
                    // We don't use 'left > right' here to reduce required
                    // operator. If we use 'right < left' here, '<' is only
                    // required.
-                   return chunk_right.Value<ArrowType>() < chunk_left.Value<ArrowType>();
+                   return ChunkValue<ArrowType>(arrays, right) <
+                          ChunkValue<ArrowType>(arrays, left);
                  });
     }
     // Copy back temp area into main buffer
     std::copy(temp_indices, temp_indices + (range_end - range_begin), range_begin);
   }
 
+  template <typename ArrowType>
+  auto ChunkValue(span<const Array* const> arrays, CompressedChunkLocation loc) const {
+    return ResolvedChunk(arrays[loc.chunk_index()],
+                         static_cast<int64_t>(loc.index_in_chunk()))
+        .template Value<ArrowType>();
+  }
+
   uint64_t* indices_begin_;
   uint64_t* indices_end_;
   const std::shared_ptr<DataType>& physical_type_;
@@ -610,8 +635,6 @@ class TableSorter {
         batches_(MakeBatches(table, &status_)),
         options_(options),
         null_placement_(options.null_placement),
-        left_resolver_(batches_),
-        right_resolver_(batches_),
         sort_keys_(ResolveSortKeys(table, batches_, options.sort_keys, &status_)),
         indices_begin_(indices_begin),
         indices_end_(indices_end),
@@ -674,14 +697,24 @@ class TableSorter {
 
     // Then merge them by pairs, recursively
     if (sorted.size() > 1) {
+      ChunkedIndexMapper chunked_mapper(batches_, indices_begin_, indices_end_);
+      ARROW_ASSIGN_OR_RAISE(auto chunked_indices_pair,
+                            chunked_mapper.LogicalToPhysical());
+      auto [chunked_indices_begin, chunked_indices_end] = chunked_indices_pair;
+
+      std::vector<ChunkedNullPartitionResult> chunk_sorted(num_batches);
+      for (int64_t i = 0; i < num_batches; ++i) {
+        chunk_sorted[i] = sorted[i].TranslateTo(indices_begin_, chunked_indices_begin);
+      }
+
       struct Visitor {
         TableSorter* sorter;
-        std::vector<NullPartitionResult>* sorted;
+        std::vector<ChunkedNullPartitionResult>* chunk_sorted;
         int64_t null_count;
 
-#define VISIT(TYPE)                                                     \
-  Status Visit(const TYPE& type) {                                      \
-    return sorter->MergeInternal<TYPE>(std::move(*sorted), null_count); \
+#define VISIT(TYPE)                                               \
+  Status Visit(const TYPE& type) {                                \
+    return sorter->MergeInternal<TYPE>(chunk_sorted, null_count); \
   }
 
   VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
@@ -693,104 +726,101 @@ class TableSorter {
                                        type.ToString());
         }
       };
-      Visitor visitor{this, &sorted, null_count};
+      Visitor visitor{this, &chunk_sorted, null_count};
       RETURN_NOT_OK(VisitTypeInline(*sort_keys_[0].type, &visitor));
+
+      DCHECK_EQ(chunk_sorted.size(), 1);
+      DCHECK_EQ(chunk_sorted[0].overall_begin(), chunked_indices_begin);
+      DCHECK_EQ(chunk_sorted[0].overall_end(), chunked_indices_end);
+
+      RETURN_NOT_OK(chunked_mapper.PhysicalToLogical());
     }
     return Status::OK();
   }
 
   // Recursive merge routine, typed on the first sort key
-  template <typename Type>
-  Status MergeInternal(std::vector<NullPartitionResult> sorted, int64_t null_count) {
-    auto merge_nulls = [&](uint64_t* nulls_begin, uint64_t* nulls_middle,
-                           uint64_t* nulls_end, uint64_t* temp_indices,
-                           int64_t null_count) {
-      MergeNulls<Type>(nulls_begin, nulls_middle, nulls_end, temp_indices, null_count);
+  template <typename Type>
+  Status MergeInternal(std::vector<ChunkedNullPartitionResult>* sorted,
+                       int64_t null_count) {
+    auto merge_nulls = [&](CompressedChunkLocation* nulls_begin,
+                           CompressedChunkLocation* nulls_middle,
+                           CompressedChunkLocation* nulls_end,
+                           CompressedChunkLocation* temp_indices, int64_t null_count) {
+      MergeNulls<Type>(nulls_begin, nulls_middle, nulls_end, temp_indices,
+                       null_count);
     };
-    auto merge_non_nulls = [&](uint64_t* range_begin, uint64_t* range_middle,
-                               uint64_t* range_end, uint64_t* temp_indices) {
-      MergeNonNulls<Type>(range_begin, range_middle, range_end, temp_indices);
-    };
-
-    MergeImpl merge_impl(options_.null_placement, std::move(merge_nulls),
-                         std::move(merge_non_nulls));
+    auto merge_non_nulls =
+        [&](CompressedChunkLocation* range_begin, CompressedChunkLocation* range_middle,
+            CompressedChunkLocation* range_end, CompressedChunkLocation* temp_indices) {
+          MergeNonNulls<Type>(range_begin, range_middle, range_end, temp_indices);
+        };
+
+    ChunkedMergeImpl merge_impl(options_.null_placement, std::move(merge_nulls),
+                                std::move(merge_non_nulls));
     RETURN_NOT_OK(merge_impl.Init(ctx_, table_.num_rows()));
 
-    while (sorted.size() > 1) {
-      auto out_it = sorted.begin();
-      auto it = sorted.begin();
-      while (it < sorted.end() - 1) {
+    while (sorted->size() > 1) {
+      auto out_it = sorted->begin();
+      auto it = sorted->begin();
+      while (it < sorted->end() - 1) {
         const auto& left = *it++;
         const auto& right = *it++;
         DCHECK_EQ(left.overall_end(), right.overall_begin());
         *out_it++ = merge_impl.Merge(left, right, null_count);
       }
-      if (it < sorted.end()) {
+      if (it < sorted->end()) {
         *out_it++ = *it++;
       }
-      sorted.erase(out_it, sorted.end());
+      sorted->erase(out_it, sorted->end());
     }
-    DCHECK_EQ(sorted.size(), 1);
-    DCHECK_EQ(sorted[0].overall_begin(), indices_begin_);
-    DCHECK_EQ(sorted[0].overall_end(), indices_end_);
     return comparator_.status();
   }
 
-  // Merge rows with a null or a null-like in the first sort key
-  template <typename Type>
-  enable_if_t<has_null_like_values<Type>::value> MergeNulls(uint64_t* nulls_begin,
-                                                            uint64_t* nulls_middle,
-                                                            uint64_t* nulls_end,
-                                                            uint64_t* temp_indices,
-                                                            int64_t null_count) {
-    auto& comparator = comparator_;
-    const auto& first_sort_key = sort_keys_[0];
-
-    ChunkLocation left_loc;
-    ChunkLocation right_loc;
-    std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end, temp_indices,
-               [&](uint64_t left, uint64_t right) {
-                 // First column is either null or nan
-                 left_loc = left_resolver_.ResolveWithHint(left, /*hint=*/left_loc);
-                 right_loc = right_resolver_.ResolveWithHint(right, /*hint=*/right_loc);
-                 auto chunk_left = first_sort_key.GetChunk(left_loc);
-                 auto chunk_right = first_sort_key.GetChunk(right_loc);
-                 const auto left_is_null = chunk_left.IsNull();
-                 const auto right_is_null = chunk_right.IsNull();
-                 if (left_is_null == right_is_null) {
-                   return comparator.Compare(left_loc, right_loc, 1);
-                 } else if (options_.null_placement == NullPlacement::AtEnd) {
-                   return right_is_null;
-                 } else {
-                   return left_is_null;
-                 }
-               });
-    // Copy back temp area into main buffer
-    std::copy(temp_indices, temp_indices + (nulls_end - nulls_begin), nulls_begin);
-  }
-
-  template <typename Type>
-  enable_if_t<!has_null_like_values<Type>::value> MergeNulls(uint64_t* nulls_begin,
-                                                             uint64_t* nulls_middle,
-                                                             uint64_t* nulls_end,
-                                                             uint64_t* temp_indices,
-                                                             int64_t null_count) {
-    MergeNullsOnly(nulls_begin, nulls_middle, nulls_end, temp_indices, null_count);
+  template <typename Type>
+  void MergeNulls(CompressedChunkLocation* nulls_begin,
+                  CompressedChunkLocation* nulls_middle,
+                  CompressedChunkLocation* nulls_end,
+                  CompressedChunkLocation* temp_indices, int64_t null_count) {
+    if constexpr (has_null_like_values<Type>::value) {
+      // Merge rows with a null or a null-like in the first sort key
+      auto& comparator = comparator_;
+      const auto& first_sort_key = sort_keys_[0];
+
+      std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end, temp_indices,
+                 [&](CompressedChunkLocation left, CompressedChunkLocation right) {
+                   // First column is either null or nan
+                   const auto left_loc = ChunkLocation{left};
+                   const auto right_loc = ChunkLocation{right};
+                   const auto chunk_left = first_sort_key.GetChunk(left_loc);
+                   const auto chunk_right = first_sort_key.GetChunk(right_loc);
+                   const auto left_is_null = chunk_left.IsNull();
+                   const auto right_is_null = chunk_right.IsNull();
+                   if (left_is_null == right_is_null) {
+                     return comparator.Compare(left_loc, right_loc, 1);
+                   } else if (options_.null_placement == NullPlacement::AtEnd) {
+                     return right_is_null;
+                   } else {
+                     return left_is_null;
+                   }
+                 });
+      // Copy back temp area into main buffer
+      std::copy(temp_indices, temp_indices + (nulls_end - nulls_begin), nulls_begin);
+    } else {
+      MergeNullsOnly(nulls_begin, nulls_middle, nulls_end, temp_indices, null_count);
+    }
   }
 
-  void MergeNullsOnly(uint64_t* nulls_begin, uint64_t* nulls_middle, uint64_t* nulls_end,
-                      uint64_t* temp_indices, int64_t null_count) {
+  void MergeNullsOnly(CompressedChunkLocation* nulls_begin,
+                      CompressedChunkLocation* nulls_middle,
+                      CompressedChunkLocation* nulls_end,
+                      CompressedChunkLocation* temp_indices, int64_t null_count) {
     // Untyped implementation
     auto& comparator = comparator_;
-    ChunkLocation left_loc;
-    ChunkLocation right_loc;
     std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end, temp_indices,
-               [&](uint64_t left, uint64_t right) {
+               [&](CompressedChunkLocation left, CompressedChunkLocation right) {
                  // First column is always null
-                 left_loc = left_resolver_.ResolveWithHint(left, /*hint=*/left_loc);
-                 right_loc = right_resolver_.ResolveWithHint(right, /*hint=*/right_loc);
-                 return comparator.Compare(left_loc, right_loc, 1);
+                 return comparator.Compare(ChunkLocation{left}, ChunkLocation{right}, 1);
               });
     // Copy back temp area into main buffer
     std::copy(temp_indices, temp_indices + (nulls_end - nulls_begin), nulls_begin);
@@ -799,27 +829,24 @@ class TableSorter {
   //
   // Merge rows with a non-null in the first sort key
   //
-  template <typename Type>
-  enable_if_t<!is_null_type<Type>::value> MergeNonNulls(uint64_t* range_begin,
-                                                        uint64_t* range_middle,
-                                                        uint64_t* range_end,
-                                                        uint64_t* temp_indices) {
+  template <typename Type>
+  enable_if_t<!is_null_type<Type>::value> MergeNonNulls(
+      CompressedChunkLocation* range_begin, CompressedChunkLocation* range_middle,
+      CompressedChunkLocation* range_end, CompressedChunkLocation* temp_indices) {
     auto& comparator = comparator_;
     const auto& first_sort_key = sort_keys_[0];
 
-    ChunkLocation left_loc;
-    ChunkLocation right_loc;
     std::merge(range_begin, range_middle, range_middle, range_end, temp_indices,
-               [&](uint64_t left, uint64_t right) {
+               [&](CompressedChunkLocation left, CompressedChunkLocation right) {
                  // Both values are never null nor NaN.
-                 left_loc = left_resolver_.ResolveWithHint(left, /*hint=*/left_loc);
-                 right_loc = right_resolver_.ResolveWithHint(right, /*hint=*/right_loc);
+                 const auto left_loc = ChunkLocation{left};
+                 const auto right_loc = ChunkLocation{right};
                  auto chunk_left = first_sort_key.GetChunk(left_loc);
                  auto chunk_right = first_sort_key.GetChunk(right_loc);
                  DCHECK(!chunk_left.IsNull());
                  DCHECK(!chunk_right.IsNull());
-                 auto value_left = chunk_left.Value<Type>();
-                 auto value_right = chunk_right.Value<Type>();
+                 const auto value_left = chunk_left.Value<Type>();
+                 const auto value_right = chunk_right.Value<Type>();
                  if (value_left == value_right) {
                    // If the left value equals to the right value,
                    // we need to compare the second and following
@@ -834,13 +861,16 @@ class TableSorter {
                    }
                  }
               });
+
     // Copy back temp area into main buffer
     std::copy(temp_indices, temp_indices + (range_end - range_begin), range_begin);
   }
 
-  template <typename Type>
-  enable_if_null<Type> MergeNonNulls(uint64_t* range_begin, uint64_t* range_middle,
-                                     uint64_t* range_end, uint64_t* temp_indices) {
+  template <typename Type>
+  enable_if_null<Type> MergeNonNulls(CompressedChunkLocation* range_begin,
+                                     CompressedChunkLocation* range_middle,
+                                     CompressedChunkLocation* range_end,
+                                     CompressedChunkLocation* temp_indices) {
     const int64_t null_count = range_end - range_begin;
     MergeNullsOnly(range_begin, range_middle, range_end, temp_indices, null_count);
   }
@@ -851,7 +881,6 @@ class TableSorter {
   const RecordBatchVector batches_;
   const SortOptions& options_;
   const NullPlacement null_placement_;
-  const ::arrow::ChunkResolver left_resolver_, right_resolver_;
   const std::vector<ResolvedSortKey> sort_keys_;
   uint64_t* indices_begin_;
   uint64_t* indices_end_;
diff --git a/cpp/src/arrow/compute/kernels/vector_sort_internal.h b/cpp/src/arrow/compute/kernels/vector_sort_internal.h
index bee7f838a05da..97a2db1d11a8f 100644
--- a/cpp/src/arrow/compute/kernels/vector_sort_internal.h
+++ b/cpp/src/arrow/compute/kernels/vector_sort_internal.h
@@ -55,15 +55,17 @@ namespace internal {
 
 // NOTE: std::partition is usually faster than std::stable_partition.
 struct NonStablePartitioner {
-  template <typename Predicate>
-  uint64_t* operator()(uint64_t* indices_begin, uint64_t* indices_end, Predicate&& pred) {
+  template <typename IndexType, typename Predicate>
+  IndexType* operator()(IndexType* indices_begin, IndexType* indices_end,
+                        Predicate&& pred) {
     return std::partition(indices_begin, indices_end, std::forward<Predicate>(pred));
   }
 };
 
 struct StablePartitioner {
-  template <typename Predicate>
-  uint64_t* operator()(uint64_t* indices_begin, uint64_t* indices_end, Predicate&& pred) {
+  template <typename IndexType, typename Predicate>
+  IndexType* operator()(IndexType* indices_begin, IndexType* indices_end,
+                        Predicate&& pred) {
     return std::stable_partition(indices_begin, indices_end,
                                  std::forward<Predicate>(pred));
   }
@@ -142,22 +144,24 @@ int CompareTypeValues(const Value& left, const Value& right, SortOrder order,
   return ValueComparator::Compare(left, right, order, null_placement);
 }
 
-struct NullPartitionResult {
-  uint64_t* non_nulls_begin;
-  uint64_t* non_nulls_end;
-  uint64_t* nulls_begin;
-  uint64_t* nulls_end;
+template <typename IndexType>
+struct GenericNullPartitionResult {
+  IndexType* non_nulls_begin;
+  IndexType* non_nulls_end;
+  IndexType* nulls_begin;
+  IndexType* nulls_end;
 
-  uint64_t* overall_begin() const { return std::min(nulls_begin, non_nulls_begin); }
+  IndexType* overall_begin() const { return std::min(nulls_begin, non_nulls_begin); }
 
-  uint64_t* overall_end() const { return std::max(nulls_end, non_nulls_end); }
+  IndexType* overall_end() const { return std::max(nulls_end, non_nulls_end); }
 
   int64_t non_null_count() const { return non_nulls_end - non_nulls_begin; }
 
   int64_t null_count() const { return nulls_end - nulls_begin; }
 
-  static NullPartitionResult NoNulls(uint64_t* indices_begin, uint64_t* indices_end,
-                                     NullPlacement null_placement) {
+  static GenericNullPartitionResult NoNulls(IndexType* indices_begin,
+                                            IndexType* indices_end,
+                                            NullPlacement null_placement) {
     if (null_placement == NullPlacement::AtStart) {
       return {indices_begin, indices_end, indices_begin, indices_begin};
     } else {
@@ -165,8 +169,9 @@ struct NullPartitionResult {
     }
   }
 
-  static NullPartitionResult NullsOnly(uint64_t* indices_begin, uint64_t* indices_end,
-                                       NullPlacement null_placement) {
+  static GenericNullPartitionResult NullsOnly(IndexType* indices_begin,
+                                              IndexType* indices_end,
+                                              NullPlacement null_placement) {
     if (null_placement == NullPlacement::AtStart) {
       return {indices_end, indices_end, indices_begin, indices_end};
     } else {
@@ -174,21 +179,37 @@ struct NullPartitionResult {
     }
   }
 
-  static NullPartitionResult NullsAtEnd(uint64_t* indices_begin, uint64_t* indices_end,
-                                        uint64_t* midpoint) {
+  static GenericNullPartitionResult NullsAtEnd(IndexType* indices_begin,
+                                               IndexType* indices_end,
+                                               IndexType* midpoint) {
    DCHECK_GE(midpoint, indices_begin);
    DCHECK_LE(midpoint, indices_end);
    return {indices_begin, midpoint, midpoint, indices_end};
  }
 
-  static NullPartitionResult NullsAtStart(uint64_t* indices_begin, uint64_t* indices_end,
-                                          uint64_t* midpoint) {
+  static GenericNullPartitionResult NullsAtStart(IndexType* indices_begin,
+                                                 IndexType* indices_end,
+                                                 IndexType* midpoint) {
    DCHECK_GE(midpoint, indices_begin);
    DCHECK_LE(midpoint, indices_end);
    return {midpoint, indices_end, indices_begin, midpoint};
  }
+
+  template <typename TargetIndexType>
+  GenericNullPartitionResult<TargetIndexType> TranslateTo(
+      IndexType* indices_begin, TargetIndexType* target_indices_begin) const {
+    return {
+        (non_nulls_begin - indices_begin) + target_indices_begin,
+        (non_nulls_end - indices_begin) + target_indices_begin,
+        (nulls_begin - indices_begin) + target_indices_begin,
+        (nulls_end - indices_begin) + target_indices_begin,
+    };
+  }
 };
 
+using NullPartitionResult = GenericNullPartitionResult<uint64_t>;
+using ChunkedNullPartitionResult = GenericNullPartitionResult<CompressedChunkLocation>;
+
 // Move nulls (not null-like values) to end of array.
 //
 // `offset` is used when this is called on a chunk of a chunked array
@@ -265,7 +286,9 @@ NullPartitionResult PartitionNulls(uint64_t* indices_begin, uint64_t* indices_en
 }
 
 //
-// Null partitioning on chunked arrays
+// Null partitioning on chunked arrays, in two flavors:
+// 1) with uint64_t indices and ChunkedArrayResolver
+// 2) with CompressedChunkLocation and span of chunks
 //
 
 template <typename ArrayType, typename Partitioner>
 NullPartitionResult PartitionNullsOnly(uint64_t* indices_begin, uint64_t* indices_end,
@@ -291,6 +314,36 @@ NullPartitionResult PartitionNullsOnly(uint64_t* indices_begin, uint64_t* indice
   }
 }
 
+template <typename Partitioner>
+ChunkedNullPartitionResult PartitionNullsOnly(CompressedChunkLocation* locations_begin,
+                                              CompressedChunkLocation* locations_end,
+                                              util::span<const Array* const> chunks,
+                                              int64_t null_count,
+                                              NullPlacement null_placement) {
+  if (null_count == 0) {
+    return ChunkedNullPartitionResult::NoNulls(locations_begin, locations_end,
+                                               null_placement);
+  }
+  Partitioner partitioner;
+  if (null_placement == NullPlacement::AtStart) {
+    auto nulls_end =
+        partitioner(locations_begin, locations_end, [&](CompressedChunkLocation loc) {
+          return chunks[loc.chunk_index()]->IsNull(
+              static_cast<int64_t>(loc.index_in_chunk()));
+        });
+    return ChunkedNullPartitionResult::NullsAtStart(locations_begin, locations_end,
+                                                    nulls_end);
+  } else {
+    auto nulls_begin =
+        partitioner(locations_begin, locations_end, [&](CompressedChunkLocation loc) {
+          return !chunks[loc.chunk_index()]->IsNull(
+              static_cast<int64_t>(loc.index_in_chunk()));
+        });
+    return ChunkedNullPartitionResult::NullsAtEnd(locations_begin, locations_end,
+                                                  nulls_begin);
+  }
+}
+
 template <typename ArrayType, typename Partitioner>
 enable_if_t<!is_floating_type<typename ArrayType::TypeClass>::value, NullPartitionResult>
@@ -334,17 +387,18 @@ NullPartitionResult PartitionNulls(uint64_t* indices_begin, uint64_t* indices_en
           std::max(q.nulls_end, p.nulls_end)};
 }
 
-struct MergeImpl {
-  using MergeNullsFunc = std::function<void(uint64_t*, uint64_t*, uint64_t*, uint64_t*,
-                                            int64_t)>;
+template <typename IndexType, typename NullPartitionResultType>
+struct GenericMergeImpl {
+  using MergeNullsFunc = std::function<void(IndexType*, IndexType*, IndexType*,
+                                            IndexType*, int64_t)>;
   using MergeNonNullsFunc =
-      std::function<void(uint64_t*, uint64_t*, uint64_t*, uint64_t*)>;
+      std::function<void(IndexType*, IndexType*, IndexType*, IndexType*)>;
 
-  MergeImpl(NullPlacement null_placement, MergeNullsFunc&& merge_nulls,
-            MergeNonNullsFunc&& merge_non_nulls)
+  GenericMergeImpl(NullPlacement null_placement, MergeNullsFunc&& merge_nulls,
+                   MergeNonNullsFunc&& merge_non_nulls)
       : null_placement_(null_placement),
         merge_nulls_(std::move(merge_nulls)),
         merge_non_nulls_(std::move(merge_non_nulls)) {}
@@ -352,13 +406,14 @@ struct MergeImpl {
   Status Init(ExecContext* ctx, int64_t temp_indices_length) {
     ARROW_ASSIGN_OR_RAISE(
         temp_buffer_,
-        AllocateBuffer(sizeof(int64_t) * temp_indices_length, ctx->memory_pool()));
-    temp_indices_ = reinterpret_cast<uint64_t*>(temp_buffer_->mutable_data());
+        AllocateBuffer(sizeof(IndexType) * temp_indices_length, ctx->memory_pool()));
+    temp_indices_ = reinterpret_cast<IndexType*>(temp_buffer_->mutable_data());
     return Status::OK();
   }
 
-  NullPartitionResult Merge(const NullPartitionResult& left,
-                            const NullPartitionResult& right, int64_t null_count) const {
+  NullPartitionResultType Merge(const NullPartitionResultType& left,
+                                const NullPartitionResultType& right,
+                                int64_t null_count) const {
     if (null_placement_ == NullPlacement::AtStart) {
       return MergeNullsAtStart(left, right, null_count);
     } else {
@@ -366,9 +421,9 @@ struct MergeImpl {
     }
   }
 
-  NullPartitionResult MergeNullsAtStart(const NullPartitionResult& left,
-                                        const NullPartitionResult& right,
-                                        int64_t null_count) const {
+  NullPartitionResultType MergeNullsAtStart(const NullPartitionResultType& left,
+                                            const NullPartitionResultType& right,
+                                            int64_t null_count) const {
     // Input layout:
     // [left nulls .... left non-nulls .... right nulls .... right non-nulls]
     DCHECK_EQ(left.nulls_end, left.non_nulls_begin);
@@ -379,7 +434,7 @@ struct MergeImpl {
     // [left nulls .... right nulls .... left non-nulls .... right non-nulls]
     std::rotate(left.non_nulls_begin, right.nulls_begin, right.nulls_end);
 
-    const auto p = NullPartitionResult::NullsAtStart(
+    const auto p = NullPartitionResultType::NullsAtStart(
         left.nulls_begin, right.non_nulls_end,
         left.nulls_begin + left.null_count() + right.null_count());
@@ -401,9 +456,9 @@ struct MergeImpl {
     return p;
   }
 
-  NullPartitionResult MergeNullsAtEnd(const NullPartitionResult& left,
-                                      const NullPartitionResult& right,
-                                      int64_t null_count) const {
+  NullPartitionResultType MergeNullsAtEnd(const NullPartitionResultType& left,
+                                          const NullPartitionResultType& right,
+                                          int64_t null_count) const {
     // Input layout:
     // [left non-nulls .... left nulls .... right non-nulls .... right nulls]
     DCHECK_EQ(left.non_nulls_end, left.nulls_begin);
@@ -414,7 +469,7 @@ struct MergeImpl {
     // [left non-nulls .... right non-nulls .... left nulls .... right nulls]
     std::rotate(left.nulls_begin, right.non_nulls_begin, right.non_nulls_end);
 
-    const auto p = NullPartitionResult::NullsAtEnd(
+    const auto p = NullPartitionResultType::NullsAtEnd(
         left.non_nulls_begin, right.nulls_end,
         left.non_nulls_begin + left.non_null_count() + right.non_null_count());
@@ -441,9 +496,13 @@ struct MergeImpl {
   MergeNullsFunc merge_nulls_;
   MergeNonNullsFunc merge_non_nulls_;
   std::unique_ptr<Buffer> temp_buffer_;
-  uint64_t* temp_indices_ = nullptr;
+  IndexType* temp_indices_ = nullptr;
 };
 
+using MergeImpl = GenericMergeImpl<uint64_t, NullPartitionResult>;
+using ChunkedMergeImpl =
+    GenericMergeImpl<CompressedChunkLocation, ChunkedNullPartitionResult>;
+
 // TODO make this usable if indices are non trivial on input
 // (see ConcreteRecordBatchColumnSorter)
 // `offset` is used when this is called on a chunk of a chunked array
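
For reference, a compact sketch of the bottom-up merge schedule that `GenericMergeImpl` is driven by in the sorters above: adjacent sorted runs are merged pairwise until a single run spans the whole index buffer. This is simplified to plain `(begin, end)` runs over one buffer; `Run` and `MergeAdjacentRuns` are illustrative names, and the real code additionally tracks the null/non-null partition boundaries of each run and merges through the null-aware callbacks shown in the patch:

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

struct Run {
  uint64_t* begin;
  uint64_t* end;
};

// `runs` must cover adjacent memory ranges in order (left.end == right.begin,
// as the DCHECKs in the patch assert); `scratch` must be at least as large as
// the largest merged pair.
void MergeAdjacentRuns(std::vector<Run>& runs, std::vector<uint64_t>& scratch) {
  while (runs.size() > 1) {
    size_t out = 0;
    size_t i = 0;
    for (; i + 1 < runs.size(); i += 2) {
      Run left = runs[i];
      Run right = runs[i + 1];
      // Merge into scratch, then copy back over [left.begin, right.end)
      // (mirrors the temp-buffer dance in MergeNonNulls).
      std::merge(left.begin, left.end, right.begin, right.end, scratch.begin());
      std::copy(scratch.begin(), scratch.begin() + (right.end - left.begin),
                left.begin);
      runs[out++] = Run{left.begin, right.end};
    }
    if (i < runs.size()) runs[out++] = runs[i];  // odd run is carried over
    runs.resize(out);
  }
}
```

Each element participates in about `log2(k)` merges for `k` runs, which is why replacing a per-comparison chunk resolution with a pre-translated 64-bit physical index pays off so clearly in the benchmarks above.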