Skip to content

Commit

Permalink
GH-44084: [C++] Improve merge step in chunked sorting
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Nov 26, 2024
1 parent c4d17fd commit 88a92b3
Show file tree
Hide file tree
Showing 8 changed files with 490 additions and 196 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,7 @@ set(ARROW_COMPUTE_SRCS
compute/light_array_internal.cc
compute/ordering.cc
compute/registry.cc
compute/kernels/chunked_internal.cc
compute/kernels/codegen_internal.cc
compute/kernels/ree_util_internal.cc
compute/kernels/scalar_cast_boolean.cc
Expand Down
10 changes: 6 additions & 4 deletions cpp/src/arrow/chunk_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

namespace arrow {

using util::span;

namespace {
template <typename T>
int64_t GetLength(const T& array) {
Expand All @@ -42,7 +44,7 @@ int64_t GetLength<std::shared_ptr<RecordBatch>>(
}

template <typename T>
inline std::vector<int64_t> MakeChunksOffsets(const std::vector<T>& chunks) {
inline std::vector<int64_t> MakeChunksOffsets(span<T> chunks) {
std::vector<int64_t> offsets(chunks.size() + 1);
int64_t offset = 0;
std::transform(chunks.begin(), chunks.end(), offsets.begin(),
Expand Down Expand Up @@ -112,13 +114,13 @@ void ResolveManyInline(uint32_t num_offsets, const int64_t* signed_offsets,
} // namespace

ChunkResolver::ChunkResolver(const ArrayVector& chunks) noexcept
: offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {}
: offsets_(MakeChunksOffsets(span(chunks))), cached_chunk_(0) {}

ChunkResolver::ChunkResolver(const std::vector<const Array*>& chunks) noexcept
ChunkResolver::ChunkResolver(span<const Array* const> chunks) noexcept
: offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {}

ChunkResolver::ChunkResolver(const RecordBatchVector& batches) noexcept
: offsets_(MakeChunksOffsets(batches)), cached_chunk_(0) {}
: offsets_(MakeChunksOffsets(span(batches))), cached_chunk_(0) {}

ChunkResolver::ChunkResolver(ChunkResolver&& other) noexcept
: offsets_(std::move(other.offsets_)),
Expand Down
10 changes: 7 additions & 3 deletions cpp/src/arrow/chunk_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"
#include "arrow/util/span.h"

namespace arrow {

Expand Down Expand Up @@ -76,11 +77,14 @@ class ARROW_EXPORT ChunkResolver {

public:
explicit ChunkResolver(const ArrayVector& chunks) noexcept;

explicit ChunkResolver(const std::vector<const Array*>& chunks) noexcept;

explicit ChunkResolver(util::span<const Array* const> chunks) noexcept;
explicit ChunkResolver(const RecordBatchVector& batches) noexcept;

/// \brief Construct a ChunkResolver from a vector of chunks.size() + 1 offsets.
///
/// The first offset must be 0 and the last offset must be the logical length of the
/// chunked array. Each offset before the last represents the starting logical index of
/// the corresponding chunk.
explicit ChunkResolver(std::vector<int64_t> offsets) noexcept
: offsets_(std::move(offsets)), cached_chunk_(0) {
#ifndef NDEBUG
Expand Down
118 changes: 118 additions & 0 deletions cpp/src/arrow/compute/kernels/chunked_internal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/kernels/chunked_internal.h"

#include <algorithm>

#include "arrow/record_batch.h"
#include "arrow/util/logging.h"

namespace arrow::compute::internal {

std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays) {
std::vector<const Array*> pointers(arrays.size());
std::transform(arrays.begin(), arrays.end(), pointers.begin(),
[&](const std::shared_ptr<Array>& array) { return array.get(); });
return pointers;
}

std::vector<int64_t> ChunkedIndexMapper::GetChunkLengths(
util::span<const Array* const> chunks) {
std::vector<int64_t> chunk_lengths(chunks.size());
for (int64_t i = 0; i < static_cast<int64_t>(chunks.size()); ++i) {
chunk_lengths[i] = chunks[i]->length();
}
return chunk_lengths;
}

std::vector<int64_t> ChunkedIndexMapper::GetChunkLengths(
const RecordBatchVector& chunks) {
std::vector<int64_t> chunk_lengths(chunks.size());
for (int64_t i = 0; i < static_cast<int64_t>(chunks.size()); ++i) {
chunk_lengths[i] = chunks[i]->num_rows();
}
return chunk_lengths;
}

Result<std::pair<ResolvedChunkIndex*, ResolvedChunkIndex*>>
ChunkedIndexMapper::LogicalToPhysical() {
// Check that indices would fall in bounds for ResolvedChunkIndex
if (ARROW_PREDICT_FALSE(chunk_lengths_.size() >
ResolvedChunkIndex::kMaxChunkIndex + 1)) {
return Status::NotImplemented("Chunked array has more than ",
ResolvedChunkIndex::kMaxChunkIndex + 1, " chunks");
}
for (int64_t chunk_length : chunk_lengths_) {
if (ARROW_PREDICT_FALSE(static_cast<uint64_t>(chunk_length) >
ResolvedChunkIndex::kMaxIndexInChunk + 1)) {
return Status::NotImplemented("Individual chunk in chunked array has more than ",
ResolvedChunkIndex::kMaxIndexInChunk + 1,
" elements");
}
}

const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
ResolvedChunkIndex* physical_begin =
reinterpret_cast<ResolvedChunkIndex*>(indices_begin_);
DCHECK_EQ(physical_begin + num_indices,
reinterpret_cast<ResolvedChunkIndex*>(indices_end_));

int64_t chunk_offset = 0;
for (int64_t chunk_index = 0; chunk_index < static_cast<int64_t>(chunk_lengths_.size());
++chunk_index) {
const int64_t chunk_length = chunk_lengths_[chunk_index];
for (int64_t i = 0; i < chunk_length; ++i) {
DCHECK_GE(indices_begin_[chunk_offset + i], static_cast<uint64_t>(chunk_offset));
DCHECK_LT(indices_begin_[chunk_offset + i],
static_cast<uint64_t>(chunk_offset + chunk_length));
physical_begin[chunk_offset + i] = ResolvedChunkIndex{
static_cast<uint64_t>(chunk_index),
indices_begin_[chunk_offset + i] - static_cast<uint64_t>(chunk_offset)};
}
chunk_offset += chunk_length;
}

return std::pair{physical_begin, physical_begin + num_indices};
}

Status ChunkedIndexMapper::PhysicalToLogical() {
std::vector<int64_t> chunk_offsets(chunk_lengths_.size());
{
int64_t offset = 0;
for (int64_t i = 0; i < static_cast<int64_t>(chunk_lengths_.size()); ++i) {
chunk_offsets[i] = offset;
offset += chunk_lengths_[i];
}
}

const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
ResolvedChunkIndex* physical_begin =
reinterpret_cast<ResolvedChunkIndex*>(indices_begin_);
for (int64_t i = 0; i < num_indices; ++i) {
const auto loc = physical_begin[i];
DCHECK_LT(loc.chunk_index(), chunk_offsets.size());
DCHECK_LT(loc.index_in_chunk(),
static_cast<uint64_t>(chunk_lengths_[loc.chunk_index()]));
indices_begin_[i] =
chunk_offsets[loc.chunk_index()] + static_cast<int64_t>(loc.index_in_chunk());
}

return Status::OK();
}

} // namespace arrow::compute::internal
119 changes: 99 additions & 20 deletions cpp/src/arrow/compute/kernels/chunked_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,32 @@
#include <algorithm>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/chunk_resolver.h"
#include "arrow/compute/kernels/codegen_internal.h"
#include "arrow/util/span.h"

namespace arrow {
namespace compute {
namespace internal {
namespace arrow::compute::internal {

// The target chunk in a chunked array.
struct ResolvedChunk {
// The target array in chunked array.
const Array* array;
// The index in the target array.
const int64_t index;
int64_t index;

ResolvedChunk(const Array* array, int64_t index) : array(array), index(index) {}

public:
friend bool operator==(const ResolvedChunk& left, const ResolvedChunk& right) {
return left.array == right.array && left.index == right.index;
}
friend bool operator!=(const ResolvedChunk& left, const ResolvedChunk& right) {
return left.array != right.array || left.index != right.index;
}

bool IsNull() const { return array->IsNull(index); }

template <typename ArrowType, typename ViewType = GetViewType<ArrowType>>
Expand All @@ -50,34 +56,107 @@ struct ResolvedChunk {
}
};

// A compressed (chunk_index, index_in_chunk) pair.
// The goal of compression is to make it fit in 64 bits, allowing in place
// replacement of logical uint64_t indices with physical indices.
// (see ChunkedIndexMapper)
struct ResolvedChunkIndex {
static constexpr int kChunkIndexBits = 24;
static constexpr int KIndexInChunkBits = 64 - kChunkIndexBits;

static constexpr uint64_t kMaxChunkIndex = (1ULL << kChunkIndexBits) - 1;
static constexpr uint64_t kMaxIndexInChunk = (1ULL << KIndexInChunkBits) - 1;

ResolvedChunkIndex() = default;

constexpr uint64_t chunk_index() const { return data_ & kMaxChunkIndex; }
constexpr uint64_t index_in_chunk() const { return data_ >> kChunkIndexBits; }

explicit constexpr ResolvedChunkIndex(uint64_t chunk_index, uint64_t index_in_chunk)
: data_((index_in_chunk << kChunkIndexBits) | chunk_index) {}

template <typename IndexType>
explicit operator TypedChunkLocation<IndexType>() {
return {static_cast<IndexType>(chunk_index()),
static_cast<IndexType>(index_in_chunk())};
}

private:
uint64_t data_;
};

static_assert(sizeof(uint64_t) == sizeof(ResolvedChunkIndex));

class ChunkedArrayResolver {
private:
ChunkResolver resolver_;
std::vector<const Array*> chunks_;
util::span<const Array* const> chunks_;
std::vector<const Array*> owned_chunks_;

public:
explicit ChunkedArrayResolver(const std::vector<const Array*>& chunks)
explicit ChunkedArrayResolver(std::vector<const Array*>&& chunks)
: resolver_(chunks), chunks_(chunks), owned_chunks_(std::move(chunks)) {}
explicit ChunkedArrayResolver(util::span<const Array* const> chunks)
: resolver_(chunks), chunks_(chunks) {}

ChunkedArrayResolver(ChunkedArrayResolver&& other) = default;
ChunkedArrayResolver& operator=(ChunkedArrayResolver&& other) = default;
ARROW_DEFAULT_MOVE_AND_ASSIGN(ChunkedArrayResolver);

ChunkedArrayResolver(const ChunkedArrayResolver& other) = default;
ChunkedArrayResolver& operator=(const ChunkedArrayResolver& other) = default;
ChunkedArrayResolver(const ChunkedArrayResolver& other)
: resolver_(other.resolver_), owned_chunks_(other.owned_chunks_) {
// Rebind span to owned_chunks_ if necessary
chunks_ = owned_chunks_.empty() ? other.chunks_ : owned_chunks_;
}
ChunkedArrayResolver& operator=(const ChunkedArrayResolver& other) {
resolver_ = other.resolver_;
owned_chunks_ = other.owned_chunks_;
chunks_ = owned_chunks_.empty() ? other.chunks_ : owned_chunks_;
return *this;
}

ResolvedChunk Resolve(int64_t index) const {
const auto loc = resolver_.Resolve(index);
return {chunks_[loc.chunk_index], loc.index_in_chunk};
}
};

inline std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays) {
std::vector<const Array*> pointers(arrays.size());
std::transform(arrays.begin(), arrays.end(), pointers.begin(),
[&](const std::shared_ptr<Array>& array) { return array.get(); });
return pointers;
}
std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays);

// A class that turns logical (linear) indices into physical (chunked) indices,
// and vice-versa.
class ChunkedIndexMapper {
public:
ChunkedIndexMapper(const std::vector<const Array*>& chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: ChunkedIndexMapper(util::span(chunks), indices_begin, indices_end) {}
ChunkedIndexMapper(util::span<const Array* const> chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: chunk_lengths_(GetChunkLengths(chunks)),
indices_begin_(indices_begin),
indices_end_(indices_end) {}
ChunkedIndexMapper(const RecordBatchVector& chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: chunk_lengths_(GetChunkLengths(chunks)),
indices_begin_(indices_begin),
indices_end_(indices_end) {}

// Turn the original uint64_t logical indices into physical. This reuses the
// same memory area, so the logical indices cannot be used anymore until
// PhysicalToLogical() is called.
//
// This assumes that the logical indices are originally chunk-partitioned.
Result<std::pair<ResolvedChunkIndex*, ResolvedChunkIndex*>> LogicalToPhysical();

// Turn the physical indices back into logical, making the uint64_t indices
// usable again.
Status PhysicalToLogical();

private:
static std::vector<int64_t> GetChunkLengths(util::span<const Array* const> chunks);
static std::vector<int64_t> GetChunkLengths(const RecordBatchVector& chunks);

std::vector<int64_t> chunk_lengths_;
uint64_t* indices_begin_;
uint64_t* indices_end_;
};

} // namespace internal
} // namespace compute
} // namespace arrow
} // namespace arrow::compute::internal
4 changes: 3 additions & 1 deletion cpp/src/arrow/compute/kernels/vector_rank.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

namespace arrow::compute::internal {

using ::arrow::util::span;

namespace {

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -237,7 +239,7 @@ class Ranker<ChunkedArray> : public RankerMixin<ChunkedArray, Ranker<ChunkedArra
physical_chunks_, order_, null_placement_));

const auto arrays = GetArrayPointers(physical_chunks_);
auto value_selector = [resolver = ChunkedArrayResolver(arrays)](int64_t index) {
auto value_selector = [resolver = ChunkedArrayResolver(span(arrays))](int64_t index) {
return resolver.Resolve(index).Value<InType>();
};
ARROW_ASSIGN_OR_RAISE(*output_, CreateRankings(ctx_, sorted, null_placement_,
Expand Down
Loading

0 comments on commit 88a92b3

Please sign in to comment.