apacheGH-44084: [C++] Improve merge step in chunked sorting (apache#44217)

### Rationale for this change

When merge-sorting the chunks of a chunked array or table, we currently resolve the chunk index anew for each individual value lookup. This requires `O(n * log k)` chunk resolutions, where `n` is the chunked array or table length and `k` is the number of chunks.

Instead, this PR translates the logical indices to physical indices all at once. Since the logical indices are initially chunk-partitioned, this does not even require expensive chunk resolution.
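
For illustration, here is a minimal, self-contained sketch of the idea (not the actual Arrow implementation, which packs each location into a 64-bit `CompressedChunkLocation` and rewrites the index buffer in place):

```
#include <cstdint>
#include <iostream>
#include <vector>

// A (chunk index, index in chunk) pair. The PR packs this into 64 bits
// (CompressedChunkLocation); a plain struct is enough to show the idea.
struct PhysicalIndex {
  int64_t chunk_index;
  int64_t index_in_chunk;
};

// Translate logical indices to physical ones in a single pass.
// Precondition (as in the PR): the indices are chunk-partitioned, i.e. the
// slice of `logical` corresponding to chunk `c` only holds indices that fall
// into chunk `c`, so no per-value binary search over chunk offsets is needed.
std::vector<PhysicalIndex> LogicalToPhysical(const std::vector<int64_t>& logical,
                                             const std::vector<int64_t>& chunk_lengths) {
  std::vector<PhysicalIndex> physical(logical.size());
  int64_t chunk_offset = 0;
  int64_t pos = 0;
  for (int64_t chunk = 0; chunk < static_cast<int64_t>(chunk_lengths.size()); ++chunk) {
    for (int64_t i = 0; i < chunk_lengths[chunk]; ++i, ++pos) {
      physical[pos] = {chunk, logical[pos] - chunk_offset};
    }
    chunk_offset += chunk_lengths[chunk];
  }
  return physical;
}

int main() {
  // Two chunks of lengths 3 and 2; sort indices are already chunk-partitioned.
  const std::vector<int64_t> logical = {2, 0, 1, 4, 3};
  for (const auto& p : LogicalToPhysical(logical, {3, 2})) {
    std::cout << p.chunk_index << ":" << p.index_in_chunk << "\n";
  }
  return 0;
}
```

Resolving a value during the merge then becomes a constant-time field access instead of a binary search over the chunk offsets.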

This change yields significant speedups on chunked array and table sorting:
```
                                           benchmark          baseline         contender  change %                                                                                                                                                                                                                                       counters
      ChunkedArraySortIndicesInt64Narrow/1048576/100   345.419 MiB/sec   628.334 MiB/sec    81.905                               {'family_index': 0, 'per_family_instance_index': 6, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/1048576/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 242, 'null_percent': 1.0}
          TableSortIndicesInt64Narrow/1048576/0/1/32 25.997M items/sec 44.550M items/sec    71.366   {'family_index': 3, 'per_family_instance_index': 11, 'run_name': 'TableSortIndicesInt64Narrow/1048576/0/1/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 17, 'chunks': 32.0, 'columns': 1.0, 'null_percent': 0.0}
        ChunkedArraySortIndicesInt64Wide/32768/10000    91.182 MiB/sec   153.756 MiB/sec    68.625                               {'family_index': 1, 'per_family_instance_index': 0, 'run_name': 'ChunkedArraySortIndicesInt64Wide/32768/10000', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2067, 'null_percent': 0.01}
           ChunkedArraySortIndicesInt64Wide/32768/10    96.536 MiB/sec   161.648 MiB/sec    67.449                                  {'family_index': 1, 'per_family_instance_index': 2, 'run_name': 'ChunkedArraySortIndicesInt64Wide/32768/10', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2238, 'null_percent': 10.0}
        TableSortIndicesInt64Narrow/1048576/100/1/32 24.290M items/sec 40.513M items/sec    66.791  {'family_index': 3, 'per_family_instance_index': 9, 'run_name': 'TableSortIndicesInt64Narrow/1048576/100/1/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 16, 'chunks': 32.0, 'columns': 1.0, 'null_percent': 1.0}
          ChunkedArraySortIndicesInt64Wide/32768/100    90.030 MiB/sec   149.633 MiB/sec    66.203                                  {'family_index': 1, 'per_family_instance_index': 1, 'run_name': 'ChunkedArraySortIndicesInt64Wide/32768/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2017, 'null_percent': 1.0}
            ChunkedArraySortIndicesInt64Wide/32768/0    91.982 MiB/sec   152.840 MiB/sec    66.163                                    {'family_index': 1, 'per_family_instance_index': 5, 'run_name': 'ChunkedArraySortIndicesInt64Wide/32768/0', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2115, 'null_percent': 0.0}
      ChunkedArraySortIndicesInt64Narrow/8388608/100   240.335 MiB/sec   387.423 MiB/sec    61.201                                {'family_index': 0, 'per_family_instance_index': 7, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/8388608/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 21, 'null_percent': 1.0}
            ChunkedArraySortIndicesInt64Wide/32768/2   172.376 MiB/sec   274.133 MiB/sec    59.032                                   {'family_index': 1, 'per_family_instance_index': 3, 'run_name': 'ChunkedArraySortIndicesInt64Wide/32768/2', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3770, 'null_percent': 50.0}
            TableSortIndicesInt64Wide/1048576/4/1/32  7.407M items/sec 11.621M items/sec    56.904     {'family_index': 4, 'per_family_instance_index': 10, 'run_name': 'TableSortIndicesInt64Wide/1048576/4/1/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 5, 'chunks': 32.0, 'columns': 1.0, 'null_percent': 25.0}
          TableSortIndicesInt64Wide/1048576/100/1/32  5.788M items/sec  9.062M items/sec    56.565     {'family_index': 4, 'per_family_instance_index': 9, 'run_name': 'TableSortIndicesInt64Wide/1048576/100/1/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 1.0, 'null_percent': 1.0}
            TableSortIndicesInt64Wide/1048576/0/1/32  5.785M items/sec  9.049M items/sec    56.409      {'family_index': 4, 'per_family_instance_index': 11, 'run_name': 'TableSortIndicesInt64Wide/1048576/0/1/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 1.0, 'null_percent': 0.0}
          ChunkedArraySortIndicesInt64Narrow/32768/2   194.743 MiB/sec   291.432 MiB/sec    49.649                                 {'family_index': 0, 'per_family_instance_index': 3, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/32768/2', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4340, 'null_percent': 50.0}
          TableSortIndicesInt64Narrow/1048576/4/1/32 25.686M items/sec 38.087M items/sec    48.279  {'family_index': 3, 'per_family_instance_index': 10, 'run_name': 'TableSortIndicesInt64Narrow/1048576/4/1/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 17, 'chunks': 32.0, 'columns': 1.0, 'null_percent': 25.0}
            TableSortIndicesInt64Wide/1048576/0/8/32  5.766M items/sec  8.374M items/sec    45.240       {'family_index': 4, 'per_family_instance_index': 5, 'run_name': 'TableSortIndicesInt64Wide/1048576/0/8/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 8.0, 'null_percent': 0.0}
           TableSortIndicesInt64Wide/1048576/0/16/32  5.752M items/sec  8.352M items/sec    45.202     {'family_index': 4, 'per_family_instance_index': 2, 'run_name': 'TableSortIndicesInt64Wide/1048576/0/16/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 16.0, 'null_percent': 0.0}
      ChunkedArraySortIndicesInt64Narrow/32768/10000   121.253 MiB/sec   175.286 MiB/sec    44.562                             {'family_index': 0, 'per_family_instance_index': 0, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/32768/10000', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2673, 'null_percent': 0.01}
          TableSortIndicesInt64Wide/1048576/100/2/32  5.549M items/sec  7.984M items/sec    43.876     {'family_index': 4, 'per_family_instance_index': 6, 'run_name': 'TableSortIndicesInt64Wide/1048576/100/2/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 2.0, 'null_percent': 1.0}
        ChunkedArraySortIndicesInt64Wide/1048576/100    69.599 MiB/sec    99.666 MiB/sec    43.200                                  {'family_index': 1, 'per_family_instance_index': 6, 'run_name': 'ChunkedArraySortIndicesInt64Wide/1048576/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 49, 'null_percent': 1.0}
           TableSortIndicesInt64Narrow/1048576/0/1/4 55.940M items/sec 79.984M items/sec    42.982     {'family_index': 3, 'per_family_instance_index': 23, 'run_name': 'TableSortIndicesInt64Narrow/1048576/0/1/4', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 37, 'chunks': 4.0, 'columns': 1.0, 'null_percent': 0.0}
         TableSortIndicesInt64Wide/1048576/100/16/32  5.554M items/sec  7.909M items/sec    42.417   {'family_index': 4, 'per_family_instance_index': 0, 'run_name': 'TableSortIndicesInt64Wide/1048576/100/16/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 16.0, 'null_percent': 1.0}
         ChunkedArraySortIndicesInt64Narrow/32768/10   127.758 MiB/sec   181.407 MiB/sec    41.992                                {'family_index': 0, 'per_family_instance_index': 2, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/32768/10', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2856, 'null_percent': 10.0}
          TableSortIndicesInt64Wide/1048576/100/8/32  5.572M items/sec  7.775M items/sec    39.548     {'family_index': 4, 'per_family_instance_index': 3, 'run_name': 'TableSortIndicesInt64Wide/1048576/100/8/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 8.0, 'null_percent': 1.0}
        ChunkedArraySortIndicesInt64Narrow/32768/100   119.600 MiB/sec   166.454 MiB/sec    39.176                                {'family_index': 0, 'per_family_instance_index': 1, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/32768/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2667, 'null_percent': 1.0}
            TableSortIndicesInt64Wide/1048576/0/2/32  5.781M items/sec  8.016M items/sec    38.669       {'family_index': 4, 'per_family_instance_index': 8, 'run_name': 'TableSortIndicesInt64Wide/1048576/0/2/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0, 'columns': 2.0, 'null_percent': 0.0}
         TableSortIndicesInt64Narrow/1048576/100/1/4 52.252M items/sec 72.193M items/sec    38.162   {'family_index': 3, 'per_family_instance_index': 21, 'run_name': 'TableSortIndicesInt64Narrow/1048576/100/1/4', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 35, 'chunks': 4.0, 'columns': 1.0, 'null_percent': 1.0}
          ChunkedArraySortIndicesInt64Narrow/32768/0   121.868 MiB/sec   168.364 MiB/sec    38.152                                  {'family_index': 0, 'per_family_instance_index': 5, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/32768/0', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2691, 'null_percent': 0.0}
            TableSortIndicesInt64Wide/1048576/4/2/32  5.017M items/sec  6.720M items/sec    33.934      {'family_index': 4, 'per_family_instance_index': 7, 'run_name': 'TableSortIndicesInt64Wide/1048576/4/2/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3, 'chunks': 32.0, 'columns': 2.0, 'null_percent': 25.0}
        ChunkedArraySortIndicesInt64Wide/8388608/100    54.785 MiB/sec    72.642 MiB/sec    32.593                                   {'family_index': 1, 'per_family_instance_index': 7, 'run_name': 'ChunkedArraySortIndicesInt64Wide/8388608/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 5, 'null_percent': 1.0}
            TableSortIndicesInt64Wide/1048576/4/8/32  4.222M items/sec  5.483M items/sec    29.861      {'family_index': 4, 'per_family_instance_index': 4, 'run_name': 'TableSortIndicesInt64Wide/1048576/4/8/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3, 'chunks': 32.0, 'columns': 8.0, 'null_percent': 25.0}
              ChunkedArraySortIndicesString/32768/10   146.866 MiB/sec   190.314 MiB/sec    29.583                                     {'family_index': 2, 'per_family_instance_index': 2, 'run_name': 'ChunkedArraySortIndicesString/32768/10', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3494, 'null_percent': 10.0}
           TableSortIndicesInt64Wide/1048576/4/16/32  4.225M items/sec  5.433M items/sec    28.599    {'family_index': 4, 'per_family_instance_index': 1, 'run_name': 'TableSortIndicesInt64Wide/1048576/4/16/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3, 'chunks': 32.0, 'columns': 16.0, 'null_percent': 25.0}
       TableSortIndicesInt64Narrow/1048576/100/16/32  2.193M items/sec  2.711M items/sec    23.652 {'family_index': 3, 'per_family_instance_index': 0, 'run_name': 'TableSortIndicesInt64Narrow/1048576/100/16/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2, 'chunks': 32.0, 'columns': 16.0, 'null_percent': 1.0}
             ChunkedArraySortIndicesString/32768/100   156.401 MiB/sec   191.910 MiB/sec    22.704                                     {'family_index': 2, 'per_family_instance_index': 1, 'run_name': 'ChunkedArraySortIndicesString/32768/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3488, 'null_percent': 1.0}
           TableSortIndicesInt64Narrow/1048576/4/1/4 47.342M items/sec 58.062M items/sec    22.644    {'family_index': 3, 'per_family_instance_index': 22, 'run_name': 'TableSortIndicesInt64Narrow/1048576/4/1/4', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 32, 'chunks': 4.0, 'columns': 1.0, 'null_percent': 25.0}
               ChunkedArraySortIndicesString/32768/0   161.457 MiB/sec   195.782 MiB/sec    21.259                                       {'family_index': 2, 'per_family_instance_index': 5, 'run_name': 'ChunkedArraySortIndicesString/32768/0', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3644, 'null_percent': 0.0}
         TableSortIndicesInt64Narrow/1048576/4/16/32  1.915M items/sec  2.309M items/sec    20.561  {'family_index': 3, 'per_family_instance_index': 1, 'run_name': 'TableSortIndicesInt64Narrow/1048576/4/16/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 1, 'chunks': 32.0, 'columns': 16.0, 'null_percent': 25.0}
         TableSortIndicesInt64Narrow/1048576/0/16/32  2.561M items/sec  3.079M items/sec    20.208   {'family_index': 3, 'per_family_instance_index': 2, 'run_name': 'TableSortIndicesInt64Narrow/1048576/0/16/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2, 'chunks': 32.0, 'columns': 16.0, 'null_percent': 0.0}
           ChunkedArraySortIndicesString/32768/10000   157.786 MiB/sec   189.412 MiB/sec    20.043                                  {'family_index': 2, 'per_family_instance_index': 0, 'run_name': 'ChunkedArraySortIndicesString/32768/10000', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3539, 'null_percent': 0.01}
               ChunkedArraySortIndicesString/32768/2   139.241 MiB/sec   164.172 MiB/sec    17.904                                      {'family_index': 2, 'per_family_instance_index': 3, 'run_name': 'ChunkedArraySortIndicesString/32768/2', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3155, 'null_percent': 50.0}
          TableSortIndicesInt64Narrow/1048576/0/8/32  2.595M items/sec  3.038M items/sec    17.081     {'family_index': 3, 'per_family_instance_index': 5, 'run_name': 'TableSortIndicesInt64Narrow/1048576/0/8/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2, 'chunks': 32.0, 'columns': 8.0, 'null_percent': 0.0}
          TableSortIndicesInt64Narrow/1048576/4/8/32  1.999M items/sec  2.298M items/sec    14.936    {'family_index': 3, 'per_family_instance_index': 4, 'run_name': 'TableSortIndicesInt64Narrow/1048576/4/8/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 1, 'chunks': 32.0, 'columns': 8.0, 'null_percent': 25.0}
           ChunkedArraySortIndicesString/8388608/100    81.026 MiB/sec    93.120 MiB/sec    14.926                                      {'family_index': 2, 'per_family_instance_index': 7, 'run_name': 'ChunkedArraySortIndicesString/8388608/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 7, 'null_percent': 1.0}
        TableSortIndicesInt64Narrow/1048576/100/8/32  2.382M items/sec  2.719M items/sec    14.168   {'family_index': 3, 'per_family_instance_index': 3, 'run_name': 'TableSortIndicesInt64Narrow/1048576/100/8/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 2, 'chunks': 32.0, 'columns': 8.0, 'null_percent': 1.0}
           ChunkedArraySortIndicesString/1048576/100   107.722 MiB/sec   122.229 MiB/sec    13.467                                     {'family_index': 2, 'per_family_instance_index': 6, 'run_name': 'ChunkedArraySortIndicesString/1048576/100', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 77, 'null_percent': 1.0}
        TableSortIndicesInt64Narrow/1048576/100/2/32  4.019M items/sec  4.477M items/sec    11.383   {'family_index': 3, 'per_family_instance_index': 6, 'run_name': 'TableSortIndicesInt64Narrow/1048576/100/2/32', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3, 'chunks': 32.0, 'columns': 2.0, 'null_percent': 1.0}
             TableSortIndicesInt64Wide/1048576/4/1/4 11.595M items/sec 12.791M items/sec    10.314       {'family_index': 4, 'per_family_instance_index': 22, 'run_name': 'TableSortIndicesInt64Wide/1048576/4/1/4', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 8, 'chunks': 4.0, 'columns': 1.0, 'null_percent': 25.0}
             TableSortIndicesInt64Wide/1048576/0/1/4  9.231M items/sec 10.181M items/sec    10.294        {'family_index': 4, 'per_family_instance_index': 23, 'run_name': 'TableSortIndicesInt64Wide/1048576/0/1/4', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 6, 'chunks': 4.0, 'columns': 1.0, 'null_percent': 0.0}
```

However, performance also regresses when the input is all-nulls (which is probably rare):
```
                                       benchmark           baseline          contender  change %                                                                                                                                                                                                                                      counters
           ChunkedArraySortIndicesString/32768/1      5.636 GiB/sec      4.336 GiB/sec   -23.068                                  {'family_index': 2, 'per_family_instance_index': 4, 'run_name': 'ChunkedArraySortIndicesString/32768/1', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 127778, 'null_percent': 100.0}
      ChunkedArraySortIndicesInt64Narrow/32768/1      3.963 GiB/sec      2.852 GiB/sec   -28.025                              {'family_index': 0, 'per_family_instance_index': 4, 'run_name': 'ChunkedArraySortIndicesInt64Narrow/32768/1', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 91209, 'null_percent': 100.0}
        ChunkedArraySortIndicesInt64Wide/32768/1      4.038 GiB/sec      2.869 GiB/sec   -28.954                                {'family_index': 1, 'per_family_instance_index': 4, 'run_name': 'ChunkedArraySortIndicesInt64Wide/32768/1', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 94090, 'null_percent': 100.0}
```

### Are these changes tested?

Yes, by existing tests.

### Are there any user-facing changes?

No.
* GitHub Issue: apache#44084

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
pitrou authored Nov 26, 2024
1 parent c4d17fd commit d5cda4a
Showing 8 changed files with 496 additions and 196 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
@@ -731,6 +731,7 @@ set(ARROW_COMPUTE_SRCS
compute/light_array_internal.cc
compute/ordering.cc
compute/registry.cc
compute/kernels/chunked_internal.cc
compute/kernels/codegen_internal.cc
compute/kernels/ree_util_internal.cc
compute/kernels/scalar_cast_boolean.cc
10 changes: 6 additions & 4 deletions cpp/src/arrow/chunk_resolver.cc
@@ -28,6 +28,8 @@

namespace arrow {

using util::span;

namespace {
template <typename T>
int64_t GetLength(const T& array) {
@@ -42,7 +44,7 @@ int64_t GetLength<std::shared_ptr<RecordBatch>>(
}

template <typename T>
inline std::vector<int64_t> MakeChunksOffsets(const std::vector<T>& chunks) {
inline std::vector<int64_t> MakeChunksOffsets(span<T> chunks) {
std::vector<int64_t> offsets(chunks.size() + 1);
int64_t offset = 0;
std::transform(chunks.begin(), chunks.end(), offsets.begin(),
@@ -112,13 +114,13 @@ void ResolveManyInline(uint32_t num_offsets, const int64_t* signed_offsets,
} // namespace

ChunkResolver::ChunkResolver(const ArrayVector& chunks) noexcept
: offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {}
: offsets_(MakeChunksOffsets(span(chunks))), cached_chunk_(0) {}

ChunkResolver::ChunkResolver(const std::vector<const Array*>& chunks) noexcept
ChunkResolver::ChunkResolver(span<const Array* const> chunks) noexcept
: offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {}

ChunkResolver::ChunkResolver(const RecordBatchVector& batches) noexcept
: offsets_(MakeChunksOffsets(batches)), cached_chunk_(0) {}
: offsets_(MakeChunksOffsets(span(batches))), cached_chunk_(0) {}

ChunkResolver::ChunkResolver(ChunkResolver&& other) noexcept
: offsets_(std::move(other.offsets_)),
10 changes: 7 additions & 3 deletions cpp/src/arrow/chunk_resolver.h
@@ -26,6 +26,7 @@

#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"
#include "arrow/util/span.h"

namespace arrow {

@@ -76,11 +77,14 @@ class ARROW_EXPORT ChunkResolver {

public:
explicit ChunkResolver(const ArrayVector& chunks) noexcept;

explicit ChunkResolver(const std::vector<const Array*>& chunks) noexcept;

explicit ChunkResolver(util::span<const Array* const> chunks) noexcept;
explicit ChunkResolver(const RecordBatchVector& batches) noexcept;

/// \brief Construct a ChunkResolver from a vector of chunks.size() + 1 offsets.
///
/// The first offset must be 0 and the last offset must be the logical length of the
/// chunked array. Each offset before the last represents the starting logical index of
/// the corresponding chunk.
explicit ChunkResolver(std::vector<int64_t> offsets) noexcept
: offsets_(std::move(offsets)), cached_chunk_(0) {
#ifndef NDEBUG
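
As a small usage sketch of the new offsets-based constructor documented above, together with the `Resolve()` accessor used elsewhere in this diff (my own illustration, not code from the PR; `ChunkResolver` is an internal API and details may differ):

```
#include <cstdint>
#include <vector>

#include "arrow/chunk_resolver.h"

// Three chunks of lengths 3, 2 and 4, described by chunks.size() + 1 offsets.
void ResolveExample() {
  arrow::ChunkResolver resolver(std::vector<int64_t>{0, 3, 5, 9});
  const auto loc = resolver.Resolve(/*index=*/4);
  // Logical index 4 falls into the second chunk (loc.chunk_index == 1)
  // at position 1 within that chunk (loc.index_in_chunk == 1).
  (void)loc;
}
```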
122 changes: 122 additions & 0 deletions cpp/src/arrow/compute/kernels/chunked_internal.cc
@@ -0,0 +1,122 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/kernels/chunked_internal.h"

#include <algorithm>

#include "arrow/record_batch.h"
#include "arrow/util/logging.h"

namespace arrow::compute::internal {

std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays) {
std::vector<const Array*> pointers(arrays.size());
std::transform(arrays.begin(), arrays.end(), pointers.begin(),
[&](const std::shared_ptr<Array>& array) { return array.get(); });
return pointers;
}

std::vector<int64_t> ChunkedIndexMapper::GetChunkLengths(
util::span<const Array* const> chunks) {
std::vector<int64_t> chunk_lengths(chunks.size());
for (int64_t i = 0; i < static_cast<int64_t>(chunks.size()); ++i) {
chunk_lengths[i] = chunks[i]->length();
}
return chunk_lengths;
}

std::vector<int64_t> ChunkedIndexMapper::GetChunkLengths(
const RecordBatchVector& chunks) {
std::vector<int64_t> chunk_lengths(chunks.size());
for (int64_t i = 0; i < static_cast<int64_t>(chunks.size()); ++i) {
chunk_lengths[i] = chunks[i]->num_rows();
}
return chunk_lengths;
}

Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
ChunkedIndexMapper::LogicalToPhysical() {
// Check that indices would fall in bounds for CompressedChunkLocation
if (ARROW_PREDICT_FALSE(chunk_lengths_.size() >
CompressedChunkLocation::kMaxChunkIndex + 1)) {
return Status::NotImplemented("Chunked array has more than ",
CompressedChunkLocation::kMaxChunkIndex + 1, " chunks");
}
for (int64_t chunk_length : chunk_lengths_) {
if (ARROW_PREDICT_FALSE(static_cast<uint64_t>(chunk_length) >
CompressedChunkLocation::kMaxIndexInChunk + 1)) {
return Status::NotImplemented("Individual chunk in chunked array has more than ",
CompressedChunkLocation::kMaxIndexInChunk + 1,
" elements");
}
}

const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
DCHECK_EQ(num_indices, std::accumulate(chunk_lengths_.begin(), chunk_lengths_.end(),
static_cast<int64_t>(0)));
CompressedChunkLocation* physical_begin =
reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
DCHECK_EQ(physical_begin + num_indices,
reinterpret_cast<CompressedChunkLocation*>(indices_end_));

int64_t chunk_offset = 0;
for (int64_t chunk_index = 0; chunk_index < static_cast<int64_t>(chunk_lengths_.size());
++chunk_index) {
const int64_t chunk_length = chunk_lengths_[chunk_index];
for (int64_t i = 0; i < chunk_length; ++i) {
// Logical indices are expected to be chunk-partitioned, which avoids costly
// chunked index resolution.
DCHECK_GE(indices_begin_[chunk_offset + i], static_cast<uint64_t>(chunk_offset));
DCHECK_LT(indices_begin_[chunk_offset + i],
static_cast<uint64_t>(chunk_offset + chunk_length));
physical_begin[chunk_offset + i] = CompressedChunkLocation{
static_cast<uint64_t>(chunk_index),
indices_begin_[chunk_offset + i] - static_cast<uint64_t>(chunk_offset)};
}
chunk_offset += chunk_length;
}

return std::pair{physical_begin, physical_begin + num_indices};
}

Status ChunkedIndexMapper::PhysicalToLogical() {
std::vector<int64_t> chunk_offsets(chunk_lengths_.size());
{
int64_t offset = 0;
for (int64_t i = 0; i < static_cast<int64_t>(chunk_lengths_.size()); ++i) {
chunk_offsets[i] = offset;
offset += chunk_lengths_[i];
}
}

const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
CompressedChunkLocation* physical_begin =
reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
for (int64_t i = 0; i < num_indices; ++i) {
const auto loc = physical_begin[i];
DCHECK_LT(loc.chunk_index(), chunk_offsets.size());
DCHECK_LT(loc.index_in_chunk(),
static_cast<uint64_t>(chunk_lengths_[loc.chunk_index()]));
indices_begin_[i] =
chunk_offsets[loc.chunk_index()] + static_cast<int64_t>(loc.index_in_chunk());
}

return Status::OK();
}

} // namespace arrow::compute::internal
121 changes: 101 additions & 20 deletions cpp/src/arrow/compute/kernels/chunked_internal.h
@@ -20,26 +20,32 @@
#include <algorithm>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/chunk_resolver.h"
#include "arrow/compute/kernels/codegen_internal.h"
#include "arrow/util/span.h"

namespace arrow {
namespace compute {
namespace internal {
namespace arrow::compute::internal {

// The target chunk in a chunked array.
struct ResolvedChunk {
// The target array in chunked array.
const Array* array;
// The index in the target array.
const int64_t index;
int64_t index;

ResolvedChunk(const Array* array, int64_t index) : array(array), index(index) {}

public:
friend bool operator==(const ResolvedChunk& left, const ResolvedChunk& right) {
return left.array == right.array && left.index == right.index;
}
friend bool operator!=(const ResolvedChunk& left, const ResolvedChunk& right) {
return left.array != right.array || left.index != right.index;
}

bool IsNull() const { return array->IsNull(index); }

template <typename ArrowType, typename ViewType = GetViewType<ArrowType>>
@@ -50,34 +56,109 @@ struct ResolvedChunk {
}
};

// A compressed (chunk_index, index_in_chunk) pair.
// The goal of compression is to make it fit in 64 bits, allowing in place
// replacement of logical uint64_t indices with physical indices.
// (see ChunkedIndexMapper)
struct CompressedChunkLocation {
static constexpr int kChunkIndexBits = 24;
static constexpr int KIndexInChunkBits = 64 - kChunkIndexBits;

static constexpr uint64_t kMaxChunkIndex = (1ULL << kChunkIndexBits) - 1;
static constexpr uint64_t kMaxIndexInChunk = (1ULL << KIndexInChunkBits) - 1;

CompressedChunkLocation() = default;

constexpr uint64_t chunk_index() const { return data_ & kMaxChunkIndex; }
constexpr uint64_t index_in_chunk() const { return data_ >> kChunkIndexBits; }

explicit constexpr CompressedChunkLocation(uint64_t chunk_index,
uint64_t index_in_chunk)
: data_((index_in_chunk << kChunkIndexBits) | chunk_index) {}

template <typename IndexType>
explicit operator TypedChunkLocation<IndexType>() {
return {static_cast<IndexType>(chunk_index()),
static_cast<IndexType>(index_in_chunk())};
}

private:
uint64_t data_;
};

static_assert(sizeof(uint64_t) == sizeof(CompressedChunkLocation));

class ChunkedArrayResolver {
private:
ChunkResolver resolver_;
std::vector<const Array*> chunks_;
util::span<const Array* const> chunks_;
std::vector<const Array*> owned_chunks_;

public:
explicit ChunkedArrayResolver(const std::vector<const Array*>& chunks)
explicit ChunkedArrayResolver(std::vector<const Array*>&& chunks)
: resolver_(chunks), chunks_(chunks), owned_chunks_(std::move(chunks)) {}
explicit ChunkedArrayResolver(util::span<const Array* const> chunks)
: resolver_(chunks), chunks_(chunks) {}

ChunkedArrayResolver(ChunkedArrayResolver&& other) = default;
ChunkedArrayResolver& operator=(ChunkedArrayResolver&& other) = default;
ARROW_DEFAULT_MOVE_AND_ASSIGN(ChunkedArrayResolver);

ChunkedArrayResolver(const ChunkedArrayResolver& other) = default;
ChunkedArrayResolver& operator=(const ChunkedArrayResolver& other) = default;
ChunkedArrayResolver(const ChunkedArrayResolver& other)
: resolver_(other.resolver_), owned_chunks_(other.owned_chunks_) {
// Rebind span to owned_chunks_ if necessary
chunks_ = owned_chunks_.empty() ? other.chunks_ : owned_chunks_;
}
ChunkedArrayResolver& operator=(const ChunkedArrayResolver& other) {
resolver_ = other.resolver_;
owned_chunks_ = other.owned_chunks_;
chunks_ = owned_chunks_.empty() ? other.chunks_ : owned_chunks_;
return *this;
}

ResolvedChunk Resolve(int64_t index) const {
const auto loc = resolver_.Resolve(index);
return {chunks_[loc.chunk_index], loc.index_in_chunk};
}
};

inline std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays) {
std::vector<const Array*> pointers(arrays.size());
std::transform(arrays.begin(), arrays.end(), pointers.begin(),
[&](const std::shared_ptr<Array>& array) { return array.get(); });
return pointers;
}
std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays);

// A class that turns logical (linear) indices into physical (chunked) indices,
// and vice-versa.
class ChunkedIndexMapper {
public:
ChunkedIndexMapper(const std::vector<const Array*>& chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: ChunkedIndexMapper(util::span(chunks), indices_begin, indices_end) {}
ChunkedIndexMapper(util::span<const Array* const> chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: chunk_lengths_(GetChunkLengths(chunks)),
indices_begin_(indices_begin),
indices_end_(indices_end) {}
ChunkedIndexMapper(const RecordBatchVector& chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: chunk_lengths_(GetChunkLengths(chunks)),
indices_begin_(indices_begin),
indices_end_(indices_end) {}

// Turn the original uint64_t logical indices into physical. This reuses the
// same memory area, so the logical indices cannot be used anymore until
// PhysicalToLogical() is called.
//
// This assumes that the logical indices are originally chunk-partitioned.
Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
LogicalToPhysical();

// Turn the physical indices back into logical, making the uint64_t indices
// usable again.
Status PhysicalToLogical();

private:
static std::vector<int64_t> GetChunkLengths(util::span<const Array* const> chunks);
static std::vector<int64_t> GetChunkLengths(const RecordBatchVector& chunks);

std::vector<int64_t> chunk_lengths_;
uint64_t* indices_begin_;
uint64_t* indices_end_;
};

} // namespace internal
} // namespace compute
} // namespace arrow
} // namespace arrow::compute::internal
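
To make the 24/40-bit packing above concrete, here is a small standalone sketch mirroring the arithmetic of `CompressedChunkLocation` (an illustration only, not additional Arrow API):

```
#include <cstdint>

// Mirrors the packing used by CompressedChunkLocation above:
// low 24 bits = chunk index, high 40 bits = index within the chunk.
constexpr int kChunkIndexBits = 24;

constexpr uint64_t Pack(uint64_t chunk_index, uint64_t index_in_chunk) {
  return (index_in_chunk << kChunkIndexBits) | chunk_index;
}

constexpr uint64_t ChunkIndex(uint64_t packed) {
  return packed & ((uint64_t{1} << kChunkIndexBits) - 1);
}

constexpr uint64_t IndexInChunk(uint64_t packed) { return packed >> kChunkIndexBits; }

int main() {
  // Chunk 5, element 1'000'000 within that chunk.
  constexpr uint64_t packed = Pack(5, 1'000'000);
  static_assert(ChunkIndex(packed) == 5);
  static_assert(IndexInChunk(packed) == 1'000'000);
  // Because the packed value is exactly 64 bits wide, a buffer of uint64_t
  // logical indices can be rewritten in place with these locations, which is
  // what ChunkedIndexMapper::LogicalToPhysical() does.
  return 0;
}
```

The 24-bit chunk index and 40-bit in-chunk index are also what limit `LogicalToPhysical()` to at most 2^24 chunks and 2^40 elements per chunk, hence the `NotImplemented` statuses returned beyond those bounds.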
4 changes: 3 additions & 1 deletion cpp/src/arrow/compute/kernels/vector_rank.cc
@@ -21,6 +21,8 @@

namespace arrow::compute::internal {

using ::arrow::util::span;

namespace {

// ----------------------------------------------------------------------
@@ -237,7 +239,7 @@ class Ranker<ChunkedArray> : public RankerMixin<ChunkedArray, Ranker<ChunkedArra
physical_chunks_, order_, null_placement_));

const auto arrays = GetArrayPointers(physical_chunks_);
auto value_selector = [resolver = ChunkedArrayResolver(arrays)](int64_t index) {
auto value_selector = [resolver = ChunkedArrayResolver(span(arrays))](int64_t index) {
return resolver.Resolve(index).Value<InType>();
};
ARROW_ASSIGN_OR_RAISE(*output_, CreateRankings(ctx_, sorted, null_placement_,