Skip to content

Commit

Permalink
apacheGH-41560: [C++] ChunkResolver: Implement ResolveMany and add un…
Browse files Browse the repository at this point in the history
…it tests (apache#41561)

### Rationale for this change

I want `ResolveMany` to support me in the implementation of `Take` that doesn't `Concatenate` all the chunks from a `ChunkedArray` `values` parameter.

### What changes are included in this PR?

 - Implementation of `ChunkResolver::ResolveMany()`
 - Addition of missing unit tests for `ChunkResolver`

### Are these changes tested?

Yes. By new unit tests.

### Are there any user-facing changes?

No. `ChunkResolver` is an internal API at the moment (see apache#34535 for future plans).
* GitHub Issue: apache#41560

Authored-by: Felipe Oliveira Carvalho <[email protected]>
Signed-off-by: Felipe Oliveira Carvalho <[email protected]>
  • Loading branch information
felipecrv authored and vibhatha committed May 25, 2024
1 parent 77b91ad commit 3107f2b
Show file tree
Hide file tree
Showing 4 changed files with 407 additions and 31 deletions.
80 changes: 76 additions & 4 deletions cpp/src/arrow/chunk_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

#include <algorithm>
#include <cstdint>
#include <limits>
#include <memory>
#include <vector>

#include "arrow/array.h"
#include "arrow/record_batch.h"

namespace arrow {
namespace internal {
namespace arrow::internal {

namespace {
template <typename T>
Expand Down Expand Up @@ -54,6 +54,51 @@ inline std::vector<int64_t> MakeChunksOffsets(const std::vector<T>& chunks) {
offsets[chunks.size()] = offset;
return offsets;
}

/// \pre all the pre-conditions of ChunkResolver::ResolveMany()
/// \pre num_offsets - 1 <= std::numeric_limits<IndexType>::max()
template <typename IndexType>
void ResolveManyInline(size_t num_offsets, const int64_t* signed_offsets,
int64_t n_indices, const IndexType* logical_index_vec,
IndexType* out_chunk_index_vec, IndexType chunk_hint,
IndexType* out_index_in_chunk_vec) {
auto* offsets = reinterpret_cast<const uint64_t*>(signed_offsets);
const auto num_chunks = static_cast<IndexType>(num_offsets - 1);
// chunk_hint in [0, num_offsets) per the precondition.
for (int64_t i = 0; i < n_indices; i++) {
const auto index = static_cast<uint64_t>(logical_index_vec[i]);
if (index >= offsets[chunk_hint] &&
(chunk_hint == num_chunks || index < offsets[chunk_hint + 1])) {
out_chunk_index_vec[i] = chunk_hint; // hint is correct!
continue;
}
// lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
auto chunk_index =
ChunkResolver::Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets);
chunk_hint = static_cast<IndexType>(chunk_index);
out_chunk_index_vec[i] = chunk_hint;
}
if (out_index_in_chunk_vec != NULLPTR) {
for (int64_t i = 0; i < n_indices; i++) {
auto logical_index = logical_index_vec[i];
auto chunk_index = out_chunk_index_vec[i];
// chunk_index is in [0, chunks.size()] no matter what the
// value of logical_index is, so it's always safe to dereference
// offset_ as it contains chunks.size()+1 values.
out_index_in_chunk_vec[i] =
logical_index - static_cast<IndexType>(offsets[chunk_index]);
#if defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)
// Make it more likely that Valgrind/ASAN can catch an invalid memory
// access by poisoning out_index_in_chunk_vec[i] when the logical
// index is out-of-bounds.
if (chunk_index == num_chunks) {
out_index_in_chunk_vec[i] = std::numeric_limits<IndexType>::max();
}
#endif
}
}
}

} // namespace

ChunkResolver::ChunkResolver(const ArrayVector& chunks) noexcept
Expand Down Expand Up @@ -84,5 +129,32 @@ ChunkResolver& ChunkResolver::operator=(const ChunkResolver& other) noexcept {
return *this;
}

} // namespace internal
} // namespace arrow
void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint8_t* logical_index_vec,
uint8_t* out_chunk_index_vec, uint8_t chunk_hint,
uint8_t* out_index_in_chunk_vec) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint32_t* logical_index_vec,
uint32_t* out_chunk_index_vec, uint32_t chunk_hint,
uint32_t* out_index_in_chunk_vec) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint16_t* logical_index_vec,
uint16_t* out_chunk_index_vec, uint16_t chunk_hint,
uint16_t* out_index_in_chunk_vec) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint64_t* logical_index_vec,
uint64_t* out_chunk_index_vec, uint64_t chunk_hint,
uint64_t* out_index_in_chunk_vec) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
}

} // namespace arrow::internal
128 changes: 119 additions & 9 deletions cpp/src/arrow/chunk_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
#include <atomic>
#include <cassert>
#include <cstdint>
#include <limits>
#include <type_traits>
#include <vector>

#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"

namespace arrow::internal {

struct ChunkResolver;

struct ChunkLocation {
/// \brief Index of the chunk in the array of chunks
///
Expand All @@ -36,8 +40,17 @@ struct ChunkLocation {

/// \brief Index of the value in the chunk
///
/// The value is undefined if chunk_index >= chunks.size()
/// The value is UNDEFINED if chunk_index >= chunks.size()
int64_t index_in_chunk = 0;

ChunkLocation() = default;

ChunkLocation(int64_t chunk_index, int64_t index_in_chunk)
: chunk_index(chunk_index), index_in_chunk(index_in_chunk) {}

bool operator==(ChunkLocation other) const {
return chunk_index == other.chunk_index && index_in_chunk == other.index_in_chunk;
}
};

/// \brief An utility that incrementally resolves logical indices into
Expand All @@ -60,12 +73,35 @@ struct ARROW_EXPORT ChunkResolver {
explicit ChunkResolver(const std::vector<const Array*>& chunks) noexcept;
explicit ChunkResolver(const RecordBatchVector& batches) noexcept;

/// \brief Construct a ChunkResolver from a vector of chunks.size() + 1 offsets.
///
/// The first offset must be 0 and the last offset must be the logical length of the
/// chunked array. Each offset before the last represents the starting logical index of
/// the corresponding chunk.
explicit ChunkResolver(std::vector<int64_t> offsets) noexcept
: offsets_(std::move(offsets)), cached_chunk_(0) {
#ifndef NDEBUG
assert(offsets_.size() >= 1);
assert(offsets_[0] == 0);
for (size_t i = 1; i < offsets_.size(); i++) {
assert(offsets_[i] >= offsets_[i - 1]);
}
#endif
}

ChunkResolver(ChunkResolver&& other) noexcept;
ChunkResolver& operator=(ChunkResolver&& other) noexcept;

ChunkResolver(const ChunkResolver& other) noexcept;
ChunkResolver& operator=(const ChunkResolver& other) noexcept;

int64_t logical_array_length() const { return offsets_.back(); }
int64_t num_chunks() const { return static_cast<int64_t>(offsets_.size()) - 1; }

int64_t chunk_length(int64_t chunk_index) const {
return offsets_[chunk_index + 1] - offsets_[chunk_index];
}

/// \brief Resolve a logical index to a ChunkLocation.
///
/// The returned ChunkLocation contains the chunk index and the within-chunk index
Expand All @@ -81,7 +117,7 @@ struct ARROW_EXPORT ChunkResolver {
const auto cached_chunk = cached_chunk_.load(std::memory_order_relaxed);
const auto chunk_index =
ResolveChunkIndex</*StoreCachedChunk=*/true>(index, cached_chunk);
return {chunk_index, index - offsets_[chunk_index]};
return ChunkLocation{chunk_index, index - offsets_[chunk_index]};
}

/// \brief Resolve a logical index to a ChunkLocation.
Expand All @@ -97,12 +133,70 @@ struct ARROW_EXPORT ChunkResolver {
/// \return ChunkLocation with a valid chunk_index if index is within
/// bounds, or with chunk_index == chunks.size() if logical index is
/// `>= chunked_array.length()`.
inline ChunkLocation ResolveWithChunkIndexHint(int64_t index,
ChunkLocation hint) const {
inline ChunkLocation ResolveWithHint(int64_t index, ChunkLocation hint) const {
assert(hint.chunk_index < static_cast<int64_t>(offsets_.size()));
const auto chunk_index =
ResolveChunkIndex</*StoreCachedChunk=*/false>(index, hint.chunk_index);
return {chunk_index, index - offsets_[chunk_index]};
return ChunkLocation{chunk_index, index - offsets_[chunk_index]};
}

/// \brief Resolve `n_indices` logical indices to chunk indices.
///
/// \pre 0 <= logical_index_vec[i] < logical_array_length()
/// (for well-defined and valid chunk index results)
/// \pre out_chunk_index_vec has space for `n_indices`
/// \pre chunk_hint in [0, chunks.size()]
/// \post out_chunk_index_vec[i] in [0, chunks.size()] for i in [0, n)
/// \post if logical_index_vec[i] >= chunked_array.length(), then
/// out_chunk_index_vec[i] == chunks.size()
/// and out_index_in_chunk_vec[i] is UNDEFINED (can be out-of-bounds)
/// \post if logical_index_vec[i] < 0, then both out_chunk_index_vec[i] and
/// out_index_in_chunk_vec[i] are UNDEFINED
///
/// \param n_indices The number of logical indices to resolve
/// \param logical_index_vec The logical indices to resolve
/// \param out_chunk_index_vec The output array where the chunk indices will be written
/// \param chunk_hint 0 or the last chunk_index produced by ResolveMany
/// \param out_index_in_chunk_vec If not NULLPTR, the output array where the
/// within-chunk indices will be written
/// \return false iff chunks.size() > std::numeric_limits<IndexType>::max()
template <typename IndexType>
[[nodiscard]] bool ResolveMany(int64_t n_indices, const IndexType* logical_index_vec,
IndexType* out_chunk_index_vec, IndexType chunk_hint = 0,
IndexType* out_index_in_chunk_vec = NULLPTR) const {
if constexpr (sizeof(IndexType) < sizeof(uint64_t)) {
// The max value returned by Bisect is `offsets.size() - 1` (= chunks.size()).
constexpr uint64_t kMaxIndexTypeValue = std::numeric_limits<IndexType>::max();
// A ChunkedArray with enough empty chunks can make the index of a chunk
// exceed the logical index and thus the maximum value of IndexType.
const bool chunk_index_fits_on_type =
static_cast<uint64_t>(offsets_.size() - 1) <= kMaxIndexTypeValue;
if (ARROW_PREDICT_FALSE(!chunk_index_fits_on_type)) {
return false;
}
// Since an index-in-chunk cannot possibly exceed the logical index being
// queried, we don't have to worry about these values not fitting on IndexType.
}
if constexpr (std::is_signed_v<IndexType>) {
// We interpret signed integers as unsigned and avoid having to generate double
// the amount of binary code to handle each integer width.
//
// Negative logical indices can become large values when cast to unsigned, and
// they are gracefully handled by ResolveManyImpl, but both the chunk index
// and the index in chunk values will be undefined in these cases. This
// happend because int8_t(-1) == uint8_t(255) and 255 could be a valid
// logical index in the chunked array.
using U = std::make_unsigned_t<IndexType>;
ResolveManyImpl(n_indices, reinterpret_cast<const U*>(logical_index_vec),
reinterpret_cast<U*>(out_chunk_index_vec),
static_cast<U>(chunk_hint),
reinterpret_cast<U*>(out_index_in_chunk_vec));
} else {
static_assert(std::is_unsigned_v<IndexType>);
ResolveManyImpl(n_indices, logical_index_vec, out_chunk_index_vec, chunk_hint,
out_index_in_chunk_vec);
}
return true;
}

private:
Expand Down Expand Up @@ -130,26 +224,42 @@ struct ARROW_EXPORT ChunkResolver {
return chunk_index;
}

/// \pre all the pre-conditions of ChunkResolver::ResolveMany()
/// \pre num_offsets - 1 <= std::numeric_limits<IndexType>::max()
void ResolveManyImpl(int64_t, const uint8_t*, uint8_t*, uint8_t, uint8_t*) const;
void ResolveManyImpl(int64_t, const uint16_t*, uint16_t*, uint16_t, uint16_t*) const;
void ResolveManyImpl(int64_t, const uint32_t*, uint32_t*, uint32_t, uint32_t*) const;
void ResolveManyImpl(int64_t, const uint64_t*, uint64_t*, uint64_t, uint64_t*) const;

public:
/// \brief Find the index of the chunk that contains the logical index.
///
/// Any non-negative index is accepted. When `hi=num_offsets`, the largest
/// possible return value is `num_offsets-1` which is equal to
/// `chunks.size()`. The is returned when the logical index is out-of-bounds.
/// `chunks.size()`. Which is returned when the logical index is greater or
/// equal the logical length of the chunked array.
///
/// \pre index >= 0
/// \pre index >= 0 (otherwise, when index is negative, hi-1 is returned)
/// \pre lo < hi
/// \pre lo >= 0 && hi <= offsets_.size()
static inline int64_t Bisect(int64_t index, const int64_t* offsets, int64_t lo,
int64_t hi) {
return Bisect(static_cast<uint64_t>(index),
reinterpret_cast<const uint64_t*>(offsets), static_cast<uint64_t>(lo),
static_cast<uint64_t>(hi));
}

static inline int64_t Bisect(uint64_t index, const uint64_t* offsets, uint64_t lo,
uint64_t hi) {
// Similar to std::upper_bound(), but slightly different as our offsets
// array always starts with 0.
auto n = hi - lo;
// First iteration does not need to check for n > 1
// (lo < hi is guaranteed by the precondition).
assert(n > 1 && "lo < hi is a precondition of Bisect");
do {
const int64_t m = n >> 1;
const int64_t mid = lo + m;
const uint64_t m = n >> 1;
const uint64_t mid = lo + m;
if (index >= offsets[mid]) {
lo = mid;
n -= m;
Expand Down
Loading

0 comments on commit 3107f2b

Please sign in to comment.