Skip to content

Commit

Permalink
Also improve Table sorting
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Sep 25, 2024
1 parent 562d1f9 commit df0f691
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 154 deletions.
62 changes: 37 additions & 25 deletions cpp/src/arrow/compute/kernels/chunked_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <algorithm>

#include "arrow/record_batch.h"
#include "arrow/util/logging.h"

namespace arrow::compute::internal {
Expand All @@ -32,25 +33,37 @@ std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays) {
return pointers;
}

ChunkedIndexMapper::ChunkedIndexMapper(util::span<const Array* const> chunks,
uint64_t* indices_begin, uint64_t* indices_end)
: resolver_(chunks),
chunks_(chunks),
indices_begin_(indices_begin),
indices_end_(indices_end) {}
// Returns the length of every array chunk, in the same order as `chunks`.
std::vector<int64_t> ChunkedIndexMapper::GetChunkLengths(
    util::span<const Array* const> chunks) {
  std::vector<int64_t> lengths;
  // One allocation up front; the result has exactly one entry per chunk.
  lengths.reserve(chunks.size());
  for (const Array* chunk : chunks) {
    lengths.push_back(chunk->length());
  }
  return lengths;
}

std::vector<int64_t> ChunkedIndexMapper::GetChunkLengths(
const RecordBatchVector& chunks) {
std::vector<int64_t> chunk_lengths(chunks.size());
for (int64_t i = 0; i < static_cast<int64_t>(chunks.size()); ++i) {
chunk_lengths[i] = chunks[i]->num_rows();
}
return chunk_lengths;
}

Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
Result<std::pair<ResolvedChunkIndex*, ResolvedChunkIndex*>>
ChunkedIndexMapper::LogicalToPhysical() {
// Check that indices would fall in bounds for CompressedChunkLocation
if (ARROW_PREDICT_FALSE(chunks_.size() > CompressedChunkLocation::kMaxChunkIndex + 1)) {
// Check that indices would fall in bounds for ResolvedChunkIndex
if (ARROW_PREDICT_FALSE(chunk_lengths_.size() >
ResolvedChunkIndex::kMaxChunkIndex + 1)) {
return Status::NotImplemented("Chunked array has more than ",
CompressedChunkLocation::kMaxChunkIndex + 1, " chunks");
ResolvedChunkIndex::kMaxChunkIndex + 1, " chunks");
}
for (const Array* chunk : chunks_) {
if (ARROW_PREDICT_FALSE(static_cast<uint64_t>(chunk->length()) >
CompressedChunkLocation::kMaxIndexInChunk + 1)) {
for (int64_t chunk_length : chunk_lengths_) {
if (ARROW_PREDICT_FALSE(static_cast<uint64_t>(chunk_length) >
ResolvedChunkIndex::kMaxIndexInChunk + 1)) {
return Status::NotImplemented("Individual chunk in chunked array has more than ",
CompressedChunkLocation::kMaxIndexInChunk + 1,
ResolvedChunkIndex::kMaxIndexInChunk + 1,
" elements");
}
}
Expand All @@ -59,10 +72,10 @@ ChunkedIndexMapper::LogicalToPhysical() {
std::array<TypedChunkLocation<uint64_t>, kMaxBatchSize> batch;

const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
CompressedChunkLocation* physical_begin =
reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
ResolvedChunkIndex* physical_begin =
reinterpret_cast<ResolvedChunkIndex*>(indices_begin_);
DCHECK_EQ(physical_begin + num_indices,
reinterpret_cast<CompressedChunkLocation*>(indices_end_));
reinterpret_cast<ResolvedChunkIndex*>(indices_end_));

for (int64_t i = 0; i < num_indices; i += kMaxBatchSize) {
const int64_t batch_size = std::min(kMaxBatchSize, num_indices - i);
Expand All @@ -71,32 +84,31 @@ ChunkedIndexMapper::LogicalToPhysical() {
DCHECK(ok) << "ResolveMany unexpectedly failed (invalid logical index?)";
for (int64_t j = 0; j < batch_size; ++j) {
const auto loc = batch[j];
physical_begin[i + j] =
CompressedChunkLocation{loc.chunk_index, loc.index_in_chunk};
physical_begin[i + j] = ResolvedChunkIndex{loc.chunk_index, loc.index_in_chunk};
}
}

return std::pair{physical_begin, physical_begin + num_indices};
}

Status ChunkedIndexMapper::PhysicalToLogical() {
std::vector<int64_t> chunk_offsets(chunks_.size());
std::vector<int64_t> chunk_offsets(chunk_lengths_.size());
{
int64_t offset = 0;
for (int64_t i = 0; i < static_cast<int64_t>(chunks_.size()); ++i) {
for (int64_t i = 0; i < static_cast<int64_t>(chunk_lengths_.size()); ++i) {
chunk_offsets[i] = offset;
offset += chunks_[i]->length();
offset += chunk_lengths_[i];
}
}

const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
CompressedChunkLocation* physical_begin =
reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
ResolvedChunkIndex* physical_begin =
reinterpret_cast<ResolvedChunkIndex*>(indices_begin_);
for (int64_t i = 0; i < num_indices; ++i) {
const auto loc = physical_begin[i];
DCHECK_LT(loc.chunk_index(), chunk_offsets.size());
DCHECK_LT(loc.index_in_chunk(),
static_cast<uint64_t>(chunks_[loc.chunk_index()]->length()));
static_cast<uint64_t>(chunk_lengths_[loc.chunk_index()]));
indices_begin_[i] =
chunk_offsets[loc.chunk_index()] + static_cast<int64_t>(loc.index_in_chunk());
}
Expand Down
50 changes: 35 additions & 15 deletions cpp/src/arrow/compute/kernels/chunked_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

namespace arrow::compute::internal {

using ::arrow::internal::ChunkResolver;
using ::arrow::internal::TypedChunkLocation;

// The target chunk in a chunked array.
struct ResolvedChunk {
// The target array in chunked array.
Expand All @@ -56,33 +59,38 @@ struct ResolvedChunk {
}
};

// TODO rename to something shorter? (e.g. ResolvedChunkIndex, CompressedChunkLoc...)
struct CompressedChunkLocation {
struct ResolvedChunkIndex {
static constexpr int kChunkIndexBits = 24;
static constexpr int KIndexInChunkBits = 64 - kChunkIndexBits;

static constexpr uint64_t kMaxChunkIndex = (1ULL << kChunkIndexBits) - 1;
static constexpr uint64_t kMaxIndexInChunk = (1ULL << KIndexInChunkBits) - 1;

CompressedChunkLocation() = default;
ResolvedChunkIndex() = default;

constexpr uint64_t chunk_index() const { return data_ & kMaxChunkIndex; }
constexpr uint64_t index_in_chunk() const { return data_ >> kChunkIndexBits; }
explicit constexpr CompressedChunkLocation(uint64_t chunk_index,
uint64_t index_in_chunk)

explicit constexpr ResolvedChunkIndex(uint64_t chunk_index, uint64_t index_in_chunk)
: data_((index_in_chunk << kChunkIndexBits) | chunk_index) {}

// Unpacks this value into a TypedChunkLocation<IndexType>.
//
// The chunk index and the index within the chunk are read through the
// const accessors and converted to IndexType with static_cast, so the
// caller must pick an IndexType wide enough for both values.
//
// Marked const because the conversion only reads the packed value; this
// also lets const ResolvedChunkIndex objects be converted.
template <typename IndexType>
explicit operator TypedChunkLocation<IndexType>() const {
  return {static_cast<IndexType>(chunk_index()),
          static_cast<IndexType>(index_in_chunk())};
}

private:
uint64_t data_;
};

// CompressedChunkLocation must be the same size of a logical index, to
// enable in-place resolution.
static_assert(sizeof(uint64_t) == sizeof(CompressedChunkLocation));
// ResolvedChunkIndex must be the same size as a logical index, to
// enable in-place resolution in ChunkedIndexMapper.
static_assert(sizeof(uint64_t) == sizeof(ResolvedChunkIndex));

class ChunkedArrayResolver {
private:
::arrow::internal::ChunkResolver resolver_;
ChunkResolver resolver_;
util::span<const Array* const> chunks_;
std::vector<const Array*> owned_chunks_;

Expand Down Expand Up @@ -116,20 +124,32 @@ std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays);

class ChunkedIndexMapper {
public:
ChunkedIndexMapper(util::span<const Array* const> chunks, uint64_t* indices_begin,
uint64_t* indices_end);
// Convenience overload taking a vector of array pointers: wraps `chunks`
// in a util::span and delegates to the span-based constructor.
ChunkedIndexMapper(const std::vector<const Array*>& chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: ChunkedIndexMapper(util::span(chunks), indices_begin, indices_end) {}
// Builds a mapper over array chunks.
//
// The chunk resolver is constructed from `chunks`, and the per-chunk lengths
// are copied into `chunk_lengths_` via GetChunkLengths(), so the span itself
// is not stored as a member. `indices_begin` / `indices_end` delimit the
// index buffer that LogicalToPhysical() and PhysicalToLogical() rewrite
// in place.
ChunkedIndexMapper(util::span<const Array* const> chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: resolver_(chunks),
chunk_lengths_(GetChunkLengths(chunks)),
indices_begin_(indices_begin),
indices_end_(indices_end) {}
// Builds a mapper over record batches.
//
// The chunk resolver is constructed from `chunks`, and each batch's row
// count is copied into `chunk_lengths_` via GetChunkLengths().
// `indices_begin` / `indices_end` delimit the index buffer that
// LogicalToPhysical() and PhysicalToLogical() rewrite in place.
ChunkedIndexMapper(const RecordBatchVector& chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: resolver_(chunks),
chunk_lengths_(GetChunkLengths(chunks)),
indices_begin_(indices_begin),
indices_end_(indices_end) {}

Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
LogicalToPhysical();
Result<std::pair<ResolvedChunkIndex*, ResolvedChunkIndex*>> LogicalToPhysical();

Status PhysicalToLogical();

private:
::arrow::internal::ChunkResolver resolver_;
util::span<const Array* const> chunks_;
static std::vector<int64_t> GetChunkLengths(util::span<const Array* const> chunks);
static std::vector<int64_t> GetChunkLengths(const RecordBatchVector& chunks);

ChunkResolver resolver_;
std::vector<int64_t> chunk_lengths_;
uint64_t* indices_begin_;
uint64_t* indices_end_;
};
Expand Down
Loading

0 comments on commit df0f691

Please sign in to comment.