Rename ResolvedChunkIndex to CompressedChunkLocation
pitrou committed Nov 26, 2024
1 parent 88a92b3 commit c85bb4c
Showing 4 changed files with 63 additions and 57 deletions.
24 changes: 12 additions & 12 deletions cpp/src/arrow/compute/kernels/chunked_internal.cc
@@ -49,28 +49,28 @@ std::vector<int64_t> ChunkedIndexMapper::GetChunkLengths(
return chunk_lengths;
}

Result<std::pair<ResolvedChunkIndex*, ResolvedChunkIndex*>>
Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
ChunkedIndexMapper::LogicalToPhysical() {
// Check that indices would fall in bounds for ResolvedChunkIndex
// Check that indices would fall in bounds for CompressedChunkLocation
if (ARROW_PREDICT_FALSE(chunk_lengths_.size() >
ResolvedChunkIndex::kMaxChunkIndex + 1)) {
CompressedChunkLocation::kMaxChunkIndex + 1)) {
return Status::NotImplemented("Chunked array has more than ",
ResolvedChunkIndex::kMaxChunkIndex + 1, " chunks");
CompressedChunkLocation::kMaxChunkIndex + 1, " chunks");
}
for (int64_t chunk_length : chunk_lengths_) {
if (ARROW_PREDICT_FALSE(static_cast<uint64_t>(chunk_length) >
ResolvedChunkIndex::kMaxIndexInChunk + 1)) {
CompressedChunkLocation::kMaxIndexInChunk + 1)) {
return Status::NotImplemented("Individual chunk in chunked array has more than ",
ResolvedChunkIndex::kMaxIndexInChunk + 1,
CompressedChunkLocation::kMaxIndexInChunk + 1,
" elements");
}
}

const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
ResolvedChunkIndex* physical_begin =
reinterpret_cast<ResolvedChunkIndex*>(indices_begin_);
CompressedChunkLocation* physical_begin =
reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
DCHECK_EQ(physical_begin + num_indices,
reinterpret_cast<ResolvedChunkIndex*>(indices_end_));
reinterpret_cast<CompressedChunkLocation*>(indices_end_));

int64_t chunk_offset = 0;
for (int64_t chunk_index = 0; chunk_index < static_cast<int64_t>(chunk_lengths_.size());
@@ -80,7 +80,7 @@ ChunkedIndexMapper::LogicalToPhysical() {
DCHECK_GE(indices_begin_[chunk_offset + i], static_cast<uint64_t>(chunk_offset));
DCHECK_LT(indices_begin_[chunk_offset + i],
static_cast<uint64_t>(chunk_offset + chunk_length));
physical_begin[chunk_offset + i] = ResolvedChunkIndex{
physical_begin[chunk_offset + i] = CompressedChunkLocation{
static_cast<uint64_t>(chunk_index),
indices_begin_[chunk_offset + i] - static_cast<uint64_t>(chunk_offset)};
}
@@ -101,8 +101,8 @@ Status ChunkedIndexMapper::PhysicalToLogical() {
}

const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
ResolvedChunkIndex* physical_begin =
reinterpret_cast<ResolvedChunkIndex*>(indices_begin_);
CompressedChunkLocation* physical_begin =
reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
for (int64_t i = 0; i < num_indices; ++i) {
const auto loc = physical_begin[i];
DCHECK_LT(loc.chunk_index(), chunk_offsets.size());
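
To make the in-place rewrite performed by LogicalToPhysical() above concrete, here is a minimal standalone C++ sketch (not the Arrow implementation; the Pack helper is invented for illustration). Each 64-bit logical index is overwritten with a packed (chunk index, index in chunk) pair of the same width, which only works because the logical indices are assumed to be chunk-partitioned.

#include <cstdint>
#include <cstdio>
#include <vector>

// Hypothetical helper mirroring the 24/40-bit layout described in
// chunked_internal.h below; not an Arrow identifier.
constexpr int kChunkIndexBits = 24;
constexpr uint64_t Pack(uint64_t chunk_index, uint64_t index_in_chunk) {
  return (index_in_chunk << kChunkIndexBits) | chunk_index;
}

int main() {
  // Two chunks of lengths 3 and 2; the logical indices are chunk-partitioned:
  // positions 0..2 hold indices into [0, 3), positions 3..4 into [3, 5).
  std::vector<int64_t> chunk_lengths = {3, 2};
  std::vector<uint64_t> indices = {2, 0, 1, 4, 3};

  int64_t chunk_offset = 0;
  for (uint64_t chunk = 0; chunk < chunk_lengths.size(); ++chunk) {
    for (int64_t i = 0; i < chunk_lengths[chunk]; ++i) {
      const uint64_t logical = indices[chunk_offset + i];
      // Overwrite the logical index in place with its packed physical location.
      indices[chunk_offset + i] =
          Pack(chunk, logical - static_cast<uint64_t>(chunk_offset));
    }
    chunk_offset += chunk_lengths[chunk];
  }

  for (uint64_t packed : indices) {
    std::printf("chunk=%llu index_in_chunk=%llu\n",
                static_cast<unsigned long long>(packed & ((1ULL << kChunkIndexBits) - 1)),
                static_cast<unsigned long long>(packed >> kChunkIndexBits));
  }
  return 0;
}
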
12 changes: 7 additions & 5 deletions cpp/src/arrow/compute/kernels/chunked_internal.h
@@ -60,19 +60,20 @@ struct ResolvedChunk {
// The goal of compression is to make it fit in 64 bits, allowing in place
// replacement of logical uint64_t indices with physical indices.
// (see ChunkedIndexMapper)
struct ResolvedChunkIndex {
struct CompressedChunkLocation {
static constexpr int kChunkIndexBits = 24;
static constexpr int KIndexInChunkBits = 64 - kChunkIndexBits;

static constexpr uint64_t kMaxChunkIndex = (1ULL << kChunkIndexBits) - 1;
static constexpr uint64_t kMaxIndexInChunk = (1ULL << KIndexInChunkBits) - 1;

ResolvedChunkIndex() = default;
CompressedChunkLocation() = default;

constexpr uint64_t chunk_index() const { return data_ & kMaxChunkIndex; }
constexpr uint64_t index_in_chunk() const { return data_ >> kChunkIndexBits; }

explicit constexpr ResolvedChunkIndex(uint64_t chunk_index, uint64_t index_in_chunk)
explicit constexpr CompressedChunkLocation(uint64_t chunk_index,
uint64_t index_in_chunk)
: data_((index_in_chunk << kChunkIndexBits) | chunk_index) {}

template <typename IndexType>
@@ -85,7 +86,7 @@ struct ResolvedChunkIndex {
uint64_t data_;
};

static_assert(sizeof(uint64_t) == sizeof(ResolvedChunkIndex));
static_assert(sizeof(uint64_t) == sizeof(CompressedChunkLocation));

class ChunkedArrayResolver {
private:
@@ -144,7 +145,8 @@ class ChunkedIndexMapper {
// PhysicalToLogical() is called.
//
// This assumes that the logical indices are originally chunk-partitioned.
Result<std::pair<ResolvedChunkIndex*, ResolvedChunkIndex*>> LogicalToPhysical();
Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
LogicalToPhysical();

// Turn the physical indices back into logical, making the uint64_t indices
// usable again.
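
For readers checking the bit budget: 24 bits for the chunk index allow up to 16,777,216 chunks, and the remaining 40 bits allow roughly 1.1 trillion elements per chunk, which is what the NotImplemented checks in LogicalToPhysical() enforce. The following standalone sketch re-creates the packing with its own PackedLocation type (names chosen for the example, not the Arrow identifiers) and verifies the round trip at compile time.

#include <cstdint>

struct PackedLocation {
  static constexpr int kChunkIndexBits = 24;
  static constexpr int kIndexInChunkBits = 64 - kChunkIndexBits;  // 40
  static constexpr uint64_t kMaxChunkIndex = (1ULL << kChunkIndexBits) - 1;      // 16,777,215
  static constexpr uint64_t kMaxIndexInChunk = (1ULL << kIndexInChunkBits) - 1;  // ~1.1e12

  constexpr PackedLocation(uint64_t chunk_index, uint64_t index_in_chunk)
      : data_((index_in_chunk << kChunkIndexBits) | chunk_index) {}

  constexpr uint64_t chunk_index() const { return data_ & kMaxChunkIndex; }
  constexpr uint64_t index_in_chunk() const { return data_ >> kChunkIndexBits; }

  uint64_t data_;
};

// The point of the compression: one packed location occupies exactly the
// 8 bytes where a logical uint64_t index used to live.
static_assert(sizeof(PackedLocation) == sizeof(uint64_t));

// Packing then unpacking recovers both components.
constexpr PackedLocation loc(/*chunk_index=*/42, /*index_in_chunk=*/123456789);
static_assert(loc.chunk_index() == 42);
static_assert(loc.index_in_chunk() == 123456789);

int main() { return 0; }
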
69 changes: 36 additions & 33 deletions cpp/src/arrow/compute/kernels/vector_sort.cc
@@ -118,18 +118,18 @@ class ChunkedArraySorter : public TypeVisitor {
sorted[i], indices_begin_, chunked_indices_begin);
}

auto merge_nulls = [&](ResolvedChunkIndex* nulls_begin,
ResolvedChunkIndex* nulls_middle,
ResolvedChunkIndex* nulls_end,
ResolvedChunkIndex* temp_indices, int64_t null_count) {
auto merge_nulls = [&](CompressedChunkLocation* nulls_begin,
CompressedChunkLocation* nulls_middle,
CompressedChunkLocation* nulls_end,
CompressedChunkLocation* temp_indices, int64_t null_count) {
if (has_null_like_values<typename ArrayType::TypeClass>::value) {
PartitionNullsOnly<StablePartitioner>(nulls_begin, nulls_end, arrays,
null_count, null_placement_);
}
};
auto merge_non_nulls =
[&](ResolvedChunkIndex* range_begin, ResolvedChunkIndex* range_middle,
ResolvedChunkIndex* range_end, ResolvedChunkIndex* temp_indices) {
[&](CompressedChunkLocation* range_begin, CompressedChunkLocation* range_middle,
CompressedChunkLocation* range_end, CompressedChunkLocation* temp_indices) {
MergeNonNulls<ArrayType>(range_begin, range_middle, range_end, arrays,
temp_indices);
};
@@ -175,20 +175,21 @@ class ChunkedArraySorter : public TypeVisitor {
}

template <typename ArrayType>
void MergeNonNulls(ResolvedChunkIndex* range_begin, ResolvedChunkIndex* range_middle,
ResolvedChunkIndex* range_end, span<const Array* const> arrays,
ResolvedChunkIndex* temp_indices) {
void MergeNonNulls(CompressedChunkLocation* range_begin,
CompressedChunkLocation* range_middle,
CompressedChunkLocation* range_end, span<const Array* const> arrays,
CompressedChunkLocation* temp_indices) {
using ArrowType = typename ArrayType::TypeClass;

if (order_ == SortOrder::Ascending) {
std::merge(range_begin, range_middle, range_middle, range_end, temp_indices,
[&](ResolvedChunkIndex left, ResolvedChunkIndex right) {
[&](CompressedChunkLocation left, CompressedChunkLocation right) {
return ChunkValue<ArrowType>(arrays, left) <
ChunkValue<ArrowType>(arrays, right);
});
} else {
std::merge(range_begin, range_middle, range_middle, range_end, temp_indices,
[&](ResolvedChunkIndex left, ResolvedChunkIndex right) {
[&](CompressedChunkLocation left, CompressedChunkLocation right) {
// We don't use 'left > right' here to reduce required
// operator. If we use 'right < left' here, '<' is only
// required.
@@ -201,7 +202,7 @@
}

template <typename ArrowType>
auto ChunkValue(span<const Array* const> arrays, ResolvedChunkIndex loc) const {
auto ChunkValue(span<const Array* const> arrays, CompressedChunkLocation loc) const {
return ResolvedChunk(arrays[loc.chunk_index()],
static_cast<int64_t>(loc.index_in_chunk()))
.template Value<ArrowType>();
@@ -744,16 +745,16 @@ class TableSorter {
template <typename ArrowType>
Status MergeInternal(std::vector<ChunkedNullPartitionResult>* sorted,
int64_t null_count) {
auto merge_nulls = [&](ResolvedChunkIndex* nulls_begin,
ResolvedChunkIndex* nulls_middle,
ResolvedChunkIndex* nulls_end,
ResolvedChunkIndex* temp_indices, int64_t null_count) {
auto merge_nulls = [&](CompressedChunkLocation* nulls_begin,
CompressedChunkLocation* nulls_middle,
CompressedChunkLocation* nulls_end,
CompressedChunkLocation* temp_indices, int64_t null_count) {
MergeNulls<ArrowType>(nulls_begin, nulls_middle, nulls_end, temp_indices,
null_count);
};
auto merge_non_nulls =
[&](ResolvedChunkIndex* range_begin, ResolvedChunkIndex* range_middle,
ResolvedChunkIndex* range_end, ResolvedChunkIndex* temp_indices) {
[&](CompressedChunkLocation* range_begin, CompressedChunkLocation* range_middle,
CompressedChunkLocation* range_end, CompressedChunkLocation* temp_indices) {
MergeNonNulls<ArrowType>(range_begin, range_middle, range_end, temp_indices);
};

@@ -779,16 +780,17 @@
}

template <typename ArrowType>
void MergeNulls(ResolvedChunkIndex* nulls_begin, ResolvedChunkIndex* nulls_middle,
ResolvedChunkIndex* nulls_end, ResolvedChunkIndex* temp_indices,
int64_t null_count) {
void MergeNulls(CompressedChunkLocation* nulls_begin,
CompressedChunkLocation* nulls_middle,
CompressedChunkLocation* nulls_end,
CompressedChunkLocation* temp_indices, int64_t null_count) {
if constexpr (has_null_like_values<ArrowType>::value) {
// Merge rows with a null or a null-like in the first sort key
auto& comparator = comparator_;
const auto& first_sort_key = sort_keys_[0];

std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end, temp_indices,
[&](ResolvedChunkIndex left, ResolvedChunkIndex right) {
[&](CompressedChunkLocation left, CompressedChunkLocation right) {
// First column is either null or nan
const auto left_loc = ChunkLocation{left};
const auto right_loc = ChunkLocation{right};
@@ -811,14 +813,15 @@
}
}

void MergeNullsOnly(ResolvedChunkIndex* nulls_begin, ResolvedChunkIndex* nulls_middle,
ResolvedChunkIndex* nulls_end, ResolvedChunkIndex* temp_indices,
int64_t null_count) {
void MergeNullsOnly(CompressedChunkLocation* nulls_begin,
CompressedChunkLocation* nulls_middle,
CompressedChunkLocation* nulls_end,
CompressedChunkLocation* temp_indices, int64_t null_count) {
// Untyped implementation
auto& comparator = comparator_;

std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end, temp_indices,
[&](ResolvedChunkIndex left, ResolvedChunkIndex right) {
[&](CompressedChunkLocation left, CompressedChunkLocation right) {
// First column is always null
return comparator.Compare(ChunkLocation{left}, ChunkLocation{right}, 1);
});
@@ -831,13 +834,13 @@
//
template <typename ArrowType>
enable_if_t<!is_null_type<ArrowType>::value> MergeNonNulls(
ResolvedChunkIndex* range_begin, ResolvedChunkIndex* range_middle,
ResolvedChunkIndex* range_end, ResolvedChunkIndex* temp_indices) {
CompressedChunkLocation* range_begin, CompressedChunkLocation* range_middle,
CompressedChunkLocation* range_end, CompressedChunkLocation* temp_indices) {
auto& comparator = comparator_;
const auto& first_sort_key = sort_keys_[0];

std::merge(range_begin, range_middle, range_middle, range_end, temp_indices,
[&](ResolvedChunkIndex left, ResolvedChunkIndex right) {
[&](CompressedChunkLocation left, CompressedChunkLocation right) {
// Both values are never null nor NaN.
const auto left_loc = ChunkLocation{left};
const auto right_loc = ChunkLocation{right};
@@ -867,10 +870,10 @@ class TableSorter {
}

template <typename ArrowType>
enable_if_null<ArrowType> MergeNonNulls(ResolvedChunkIndex* range_begin,
ResolvedChunkIndex* range_middle,
ResolvedChunkIndex* range_end,
ResolvedChunkIndex* temp_indices) {
enable_if_null<ArrowType> MergeNonNulls(CompressedChunkLocation* range_begin,
CompressedChunkLocation* range_middle,
CompressedChunkLocation* range_end,
CompressedChunkLocation* temp_indices) {
const int64_t null_count = range_end - range_begin;
MergeNullsOnly(range_begin, range_middle, range_end, temp_indices, null_count);
}
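
The descending branch of MergeNonNulls() relies on the comparator trick noted in the comment above: comparing 'right < left' instead of 'left > right' means element types only need to provide operator<. A minimal standalone illustration with plain ints (not Arrow code) of merging two descending runs this way:

#include <algorithm>
#include <cstdio>
#include <vector>

int main() {
  // Two runs, each already sorted in descending order: [9, 7, 3] and [8, 6, 2].
  std::vector<int> runs = {9, 7, 3, 8, 6, 2};
  std::vector<int> out(runs.size());

  const auto middle = runs.begin() + 3;
  // The comparator swaps its operands, so only '<' is needed for a
  // descending merge.
  std::merge(runs.begin(), middle, middle, runs.end(), out.begin(),
             [](int left, int right) { return right < left; });

  for (int v : out) std::printf("%d ", v);  // prints: 9 8 7 6 3 2
  std::printf("\n");
  return 0;
}
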
15 changes: 8 additions & 7 deletions cpp/src/arrow/compute/kernels/vector_sort_internal.h
@@ -209,7 +209,7 @@ struct GenericNullPartitionResult {
};

using NullPartitionResult = GenericNullPartitionResult<uint64_t>;
using ChunkedNullPartitionResult = GenericNullPartitionResult<ResolvedChunkIndex>;
using ChunkedNullPartitionResult = GenericNullPartitionResult<CompressedChunkLocation>;

// Move nulls (not null-like values) to end of array.
//
@@ -289,7 +289,7 @@ NullPartitionResult PartitionNulls(uint64_t* indices_begin, uint64_t* indices_en
//
// Null partitioning on chunked arrays, in two flavors:
// 1) with uint64_t indices and ChunkedArrayResolver
// 2) with ResolvedChunkIndex and span of chunks
// 2) with CompressedChunkLocation and span of chunks
//

template <typename Partitioner>
Expand All @@ -316,8 +316,8 @@ NullPartitionResult PartitionNullsOnly(uint64_t* indices_begin, uint64_t* indice
}

template <typename Partitioner>
ChunkedNullPartitionResult PartitionNullsOnly(ResolvedChunkIndex* locations_begin,
ResolvedChunkIndex* locations_end,
ChunkedNullPartitionResult PartitionNullsOnly(CompressedChunkLocation* locations_begin,
CompressedChunkLocation* locations_end,
util::span<const Array* const> chunks,
int64_t null_count,
NullPlacement null_placement) {
@@ -328,15 +328,15 @@ ChunkedNullPartitionResult PartitionNullsOnly(ResolvedChunkIndex* locations_begi
Partitioner partitioner;
if (null_placement == NullPlacement::AtStart) {
auto nulls_end =
partitioner(locations_begin, locations_end, [&](ResolvedChunkIndex loc) {
partitioner(locations_begin, locations_end, [&](CompressedChunkLocation loc) {
return chunks[loc.chunk_index()]->IsNull(
static_cast<int64_t>(loc.index_in_chunk()));
});
return ChunkedNullPartitionResult::NullsAtStart(locations_begin, locations_end,
nulls_end);
} else {
auto nulls_begin =
partitioner(locations_begin, locations_end, [&](ResolvedChunkIndex loc) {
partitioner(locations_begin, locations_end, [&](CompressedChunkLocation loc) {
return !chunks[loc.chunk_index()]->IsNull(
static_cast<int64_t>(loc.index_in_chunk()));
});
@@ -501,7 +501,8 @@ struct GenericMergeImpl {
};

using MergeImpl = GenericMergeImpl<uint64_t, NullPartitionResult>;
using ChunkedMergeImpl = GenericMergeImpl<ResolvedChunkIndex, ChunkedNullPartitionResult>;
using ChunkedMergeImpl =
GenericMergeImpl<CompressedChunkLocation, ChunkedNullPartitionResult>;

// TODO make this usable if indices are non trivial on input
// (see ConcreteRecordBatchColumnSorter)
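
PartitionNullsOnly() above is templated on a Partitioner and operates on CompressedChunkLocation values; its essence is a stable partition of locations by whether the referenced slot is null. A rough standalone sketch of the NullPlacement::AtStart case, using std::stable_partition over a single chunk of std::optional values (invented names, not the Arrow API):

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <optional>
#include <vector>

int main() {
  // One "chunk" of nullable values; slots 1 and 3 are null.
  std::vector<std::optional<int>> chunk = {5, std::nullopt, 7, std::nullopt, 9};
  std::vector<std::size_t> locations = {0, 1, 2, 3, 4};

  // Nulls at start: move locations of null slots to the front, keeping the
  // relative order of everything else (stability matters for sort kernels).
  const auto nulls_end = std::stable_partition(
      locations.begin(), locations.end(),
      [&](std::size_t loc) { return !chunk[loc].has_value(); });

  std::printf("nulls end at offset %td\n", nulls_end - locations.begin());
  for (std::size_t loc : locations) std::printf("%zu ", loc);  // prints: 1 3 0 2 4
  std::printf("\n");
  return 0;
}
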
