GH-31769: [C++][Acero] Add spilling for hash join #13669

Closed
wants to merge 8 commits into from
2 changes: 2 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
@@ -406,6 +406,8 @@ if(ARROW_COMPUTE)
compute/exec/query_context.cc
compute/exec/sink_node.cc
compute/exec/source_node.cc
compute/exec/spilling_util.cc
compute/exec/spilling_join.cc
compute/exec/swiss_join.cc
compute/exec/task_util.cc
compute/exec/tpch_node.cc
2 changes: 2 additions & 0 deletions cpp/src/arrow/compute/exec/CMakeLists.txt
@@ -37,6 +37,7 @@ add_arrow_compute_test(asof_join_node_test
"arrow-compute"
SOURCES
asof_join_node_test.cc)
add_arrow_compute_test(spilling_test PREFIX "arrow-compute")
add_arrow_compute_test(tpch_node_test PREFIX "arrow-compute")
add_arrow_compute_test(union_node_test PREFIX "arrow-compute")
add_arrow_compute_test(util_test
@@ -47,6 +48,7 @@ add_arrow_compute_test(util_test
task_util_test.cc)

add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute")
add_arrow_benchmark(spilling_benchmark PREFIX "arrow-compute")

add_arrow_benchmark(filter_benchmark
PREFIX
185 changes: 172 additions & 13 deletions cpp/src/arrow/compute/exec/accumulation_queue.cc
@@ -16,21 +16,19 @@
// under the License.

#include "arrow/compute/exec/accumulation_queue.h"

#include <iterator>
#include "arrow/compute/exec/key_hash.h"
#include "arrow/util/atomic_util.h"

namespace arrow {
namespace util {
using arrow::compute::ExecBatch;
namespace compute {

AccumulationQueue::AccumulationQueue(AccumulationQueue&& that) {
this->batches_ = std::move(that.batches_);
this->row_count_ = that.row_count_;
that.Clear();
}

AccumulationQueue& AccumulationQueue::operator=(AccumulationQueue&& that) {
this->batches_ = std::move(that.batches_);
this->row_count_ = that.row_count_;
that.Clear();
return *this;
}
@@ -39,20 +37,181 @@ void AccumulationQueue::Concatenate(AccumulationQueue&& that) {
this->batches_.reserve(this->batches_.size() + that.batches_.size());
std::move(that.batches_.begin(), that.batches_.end(),
std::back_inserter(this->batches_));
this->row_count_ += that.row_count_;
that.Clear();
}

void AccumulationQueue::InsertBatch(ExecBatch batch) {
row_count_ += batch.length;
batches_.emplace_back(std::move(batch));
}

void AccumulationQueue::Clear() {
row_count_ = 0;
batches_.clear();
void AccumulationQueue::SetBatch(size_t idx, ExecBatch batch) {
ARROW_DCHECK(idx < batches_.size());
batches_[idx] = std::move(batch);
}

size_t AccumulationQueue::CalculateRowCount() const {
size_t count = 0;
for (const ExecBatch& b : batches_) count += static_cast<size_t>(b.length);
return count;
}

void AccumulationQueue::Clear() { batches_.clear(); }

Status SpillingAccumulationQueue::Init(QueryContext* ctx) {
ctx_ = ctx;
partition_locks_.Init(ctx_->max_concurrency(), kNumPartitions);
for (size_t ipart = 0; ipart < kNumPartitions; ipart++) {
Partition& part = partitions_[ipart];
part.task_group_read = ctx_->RegisterTaskGroup(
[this, ipart](size_t thread_index, int64_t batch_index) {
return partitions_[ipart].read_back_fn(
thread_index, static_cast<size_t>(batch_index),
std::move(partitions_[ipart].queue[batch_index]));
},
[this, ipart](size_t thread_index) {
return partitions_[ipart].on_finished(thread_index);
});
}
return Status::OK();
}

Status SpillingAccumulationQueue::InsertBatch(size_t thread_index, ExecBatch batch) {
Datum& hash_datum = batch.values.back();
const uint64_t* hashes =
reinterpret_cast<const uint64_t*>(hash_datum.array()->buffers[1]->data());
// `permutation` stores the indices of rows in the input batch sorted by partition.
std::vector<uint16_t> permutation(batch.length);
uint16_t part_starts[kNumPartitions + 1];
PartitionSort::Eval(
batch.length, kNumPartitions, part_starts,
/*partition_id=*/[&](int64_t i) { return partition_id(hashes[i]); },
/*output_fn=*/
[&permutation](int64_t input_pos, int64_t output_pos) {
permutation[output_pos] = static_cast<uint16_t>(input_pos);
});

int unprocessed_partition_ids[kNumPartitions];
RETURN_NOT_OK(partition_locks_.ForEachPartition(
thread_index, unprocessed_partition_ids,
/*is_prtn_empty_fn=*/
[&](int part_id) { return part_starts[part_id + 1] == part_starts[part_id]; },
/*process_prtn_fn=*/
[&](int locked_part_id_int) {
size_t locked_part_id = static_cast<size_t>(locked_part_id_int);
uint64_t num_total_rows_to_append =
part_starts[locked_part_id + 1] - part_starts[locked_part_id];

Partition& locked_part = partitions_[locked_part_id];

size_t offset = static_cast<size_t>(part_starts[locked_part_id]);
while (num_total_rows_to_append > 0) {
int num_rows_to_append =
std::min(static_cast<int>(num_total_rows_to_append),
static_cast<int>(ExecBatchBuilder::num_rows_max() -
locked_part.builder.num_rows()));

RETURN_NOT_OK(locked_part.builder.AppendSelected(
ctx_->memory_pool(), batch, num_rows_to_append, permutation.data() + offset,
batch.num_values()));

if (locked_part.builder.is_full()) {
ExecBatch batch = locked_part.builder.Flush();
Datum hash = std::move(batch.values.back());
batch.values.pop_back();
ExecBatch hash_batch({std::move(hash)}, batch.length);
if (locked_part_id < spilling_cursor_)
RETURN_NOT_OK(locked_part.file.SpillBatch(ctx_, std::move(batch)));
else
locked_part.queue.InsertBatch(std::move(batch));

if (locked_part_id >= hash_cursor_)
locked_part.hash_queue.InsertBatch(std::move(hash_batch));
}
offset += num_rows_to_append;
num_total_rows_to_append -= num_rows_to_append;
}
return Status::OK();
}));
return Status::OK();
}

const uint64_t* SpillingAccumulationQueue::GetHashes(size_t partition_idx,
size_t batch_idx) {
ARROW_DCHECK(partition_idx >= hash_cursor_.load());
Partition& partition = partitions_[partition_idx];
// Hashes for fully flushed batches live in the hash queue; hashes for the
// final, partially filled batch still live in the builder's last column.
if (batch_idx < partition.hash_queue.batch_count()) {
const Datum& datum = partition.hash_queue[batch_idx].values[0];
return reinterpret_cast<const uint64_t*>(datum.array()->buffers[1]->data());
} else {
size_t hash_idx = partition.builder.num_cols();
KeyColumnArray kca = partition.builder.column(hash_idx - 1);
return reinterpret_cast<const uint64_t*>(kca.data(1));
}
}

Status SpillingAccumulationQueue::GetPartition(
size_t thread_index, size_t partition_idx,
std::function<Status(size_t, size_t, ExecBatch)> on_batch,
std::function<Status(size_t)> on_finished) {
bool is_in_memory = partition_idx >= spilling_cursor_.load();
Partition& partition = partitions_[partition_idx];
if (partition.builder.num_rows() > 0) {
ExecBatch batch = partition.builder.Flush();
batch.values.pop_back();
RETURN_NOT_OK(on_batch(thread_index,
/*batch_index=*/partition.queue.batch_count(),
std::move(batch)));
}

if (is_in_memory) {
ARROW_DCHECK(partition_idx >= hash_cursor_.load());
partition.read_back_fn = std::move(on_batch);
partition.on_finished = std::move(on_finished);
return ctx_->StartTaskGroup(partition.task_group_read, partition.queue.batch_count());
}

return partition.file.ReadBackBatches(
ctx_, on_batch,
[this, partition_idx, finished = std::move(on_finished)](size_t thread_index) {
RETURN_NOT_OK(partitions_[partition_idx].file.Cleanup());
return finished(thread_index);
});
}

size_t SpillingAccumulationQueue::CalculatePartitionRowCount(size_t partition) const {
return partitions_[partition].builder.num_rows() +
partitions_[partition].queue.CalculateRowCount();
}

ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }
} // namespace util
Result<bool> SpillingAccumulationQueue::AdvanceSpillCursor() {
size_t to_spill = spilling_cursor_.fetch_add(1);
if (to_spill >= kNumPartitions) {
ARROW_DCHECK(to_spill < 1000 * 1000 * 1000)
<< "You've tried to advance the spill cursor over a billion times, you might "
"have a problem";
return false;
}

auto lock = partition_locks_.AcquirePartitionLock(static_cast<int>(to_spill));
Partition& partition = partitions_[to_spill];
size_t num_batches = partition.queue.batch_count();
for (size_t i = 0; i < num_batches; i++)
RETURN_NOT_OK(partition.file.SpillBatch(ctx_, std::move(partition.queue[i])));
return true;
}

Result<bool> SpillingAccumulationQueue::AdvanceHashCursor() {
size_t to_spill = hash_cursor_.fetch_add(1);
if (to_spill >= kNumPartitions) {
ARROW_DCHECK(to_spill < 1000 * 1000 * 1000)
<< "You've tried to advance the spill cursor over a billion times, you might "
"have a problem";
return false;
}

auto lock = partition_locks_.AcquirePartitionLock(static_cast<int>(to_spill));
partitions_[to_spill].hash_queue.Clear();
return true;
}
} // namespace compute
} // namespace arrow
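
For reference, InsertBatch above first groups row indices by partition: it computes each row's partition from the bottom bits of its hash and builds a permutation plus per-partition start offsets, then appends each partition's contiguous slice of rows to that partition's batch builder. The snippet below is a minimal, self-contained sketch of that counting-sort step; it is not the actual PartitionSort::Eval API, and the helper names (PartitionId, BuildPermutation) are illustrative only.

#include <cstddef>
#include <cstdint>
#include <vector>

constexpr int kLogNumPartitions = 6;
constexpr int kNumPartitions = 1 << kLogNumPartitions;

// Same idea as SpillingAccumulationQueue::partition_id: use the bottom bits of
// the hash, leaving the top bits for the hash table itself.
constexpr size_t PartitionId(uint64_t hash) {
  return static_cast<size_t>(hash & (kNumPartitions - 1));
}

// Builds `part_starts` (offset of each partition's first row in the sorted order)
// and `permutation` (row indices reordered so rows of the same partition are
// contiguous), which is the layout that AppendSelected consumes per partition.
void BuildPermutation(const std::vector<uint64_t>& hashes,
                      std::vector<uint32_t>* part_starts,
                      std::vector<uint32_t>* permutation) {
  part_starts->assign(kNumPartitions + 1, 0);
  // Count rows per partition.
  for (uint64_t h : hashes) ++(*part_starts)[PartitionId(h) + 1];
  // Prefix-sum the counts into start offsets.
  for (int p = 0; p < kNumPartitions; ++p) {
    (*part_starts)[p + 1] += (*part_starts)[p];
  }
  // Scatter each row index into its partition's slice of the permutation.
  permutation->resize(hashes.size());
  std::vector<uint32_t> cursor(part_starts->begin(), part_starts->end() - 1);
  for (size_t row = 0; row < hashes.size(); ++row) {
    (*permutation)[cursor[PartitionId(hashes[row])]++] = static_cast<uint32_t>(row);
  }
}

With this layout, the rows of partition p occupy permutation[part_starts[p]] through permutation[part_starts[p + 1] - 1], which is exactly the contiguous slice that InsertBatch hands to AppendSelected for each locked partition.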
133 changes: 124 additions & 9 deletions cpp/src/arrow/compute/exec/accumulation_queue.h
@@ -21,16 +21,19 @@
#include <vector>

#include "arrow/compute/exec.h"
#include "arrow/compute/exec/partition_util.h"
#include "arrow/compute/exec/spilling_util.h"
#include "arrow/compute/exec/task_util.h"
#include "arrow/compute/light_array.h"

namespace arrow {
namespace util {
using arrow::compute::ExecBatch;
namespace compute {

/// \brief A container that accumulates batches until they are ready to
/// be processed.
class AccumulationQueue {
class ARROW_EXPORT AccumulationQueue {
public:
AccumulationQueue() : row_count_(0) {}
AccumulationQueue() = default;
~AccumulationQueue() = default;

// We should never be copying ExecBatch around
@@ -42,16 +45,128 @@ class AccumulationQueue {

void Concatenate(AccumulationQueue&& that);
void InsertBatch(ExecBatch batch);
int64_t row_count() { return row_count_; }
size_t batch_count() { return batches_.size(); }
void SetBatch(size_t idx, ExecBatch batch);
size_t batch_count() const { return batches_.size(); }
bool empty() const { return batches_.empty(); }
size_t CalculateRowCount() const;

// Resizes the accumulation queue to contain `size` batches. The
// new batches will be empty and have length 0, but they will be
// usable (useful for concurrent modification of separate elements
// of the AccumulationQueue).
void Resize(size_t size) { batches_.resize(size); }
void Clear();
ExecBatch& operator[](size_t i);
ExecBatch& operator[](size_t i) { return batches_[i]; }
const ExecBatch& operator[](size_t i) const { return batches_[i]; }

private:
int64_t row_count_;
std::vector<ExecBatch> batches_;
};

} // namespace util
/// Accumulates batches in a queue that can be spilled to disk if needed
///
/// Each batch is partitioned by the lower bits of the hash column (which must be present)
/// and rows are initially accumulated in batch builders (one per partition). As a batch
/// builder fills up, the completed batch is put into an in-memory accumulation queue
/// (per partition).
///
/// When memory pressure is encountered, the spilling queue's "spill cursor" can be
/// advanced. This causes a partition to be spilled to disk. Any future data
/// arriving for that partition will go immediately to disk (after accumulating a full
/// batch in the batch builder). Note that hashes are spilled separately from batches
/// and have their own cursor; they are kept separate so that a Bloom filter can still
/// be built for spilled partitions. We assume the batch (spill) cursor is always
/// advanced faster than the hash cursor.
///
/// Later, data is retrieved one partition at a time. Partitions that are in-memory will
/// be delivered immediately in new thread tasks. Partitions that are on disk will be
/// read from disk and delivered as they arrive.
///
/// This class assumes that data is fully accumulated before it is read back. As such,
/// do not call InsertBatch after calling GetPartition.
class ARROW_EXPORT SpillingAccumulationQueue {
public:
// Number of partitions must be a power of two, since we assign partitions by
// looking at the bottom few bits of the hash.
static constexpr int kLogNumPartitions = 6;
static constexpr int kNumPartitions = 1 << kLogNumPartitions;
Status Init(QueryContext* ctx);
// Assumes that the final column in the batch contains 64-bit hashes of the key columns.
Status InsertBatch(size_t thread_index, ExecBatch batch);
// Runs `on_batch` on each batch in the SpillingAccumulationQueue for the given
// partition. Each batch will have its own task. Once all batches have had their
// on_batch function run, `on_finished` will be called.
Status GetPartition(size_t thread_index, size_t partition_idx,
std::function<Status(size_t, size_t, ExecBatch)>
on_batch, // thread_index, batch_index, batch
std::function<Status(size_t)> on_finished);

// Returns the hashes of the given partition and batch index.
// partition_idx MUST be at least hash_cursor; if partition_idx < hash_cursor,
// these hashes will already have been deleted.
const uint64_t* GetHashes(size_t partition_idx, size_t batch_idx);
inline size_t batch_count(size_t partition_idx) const {
const Partition& partition = partitions_[partition_idx];
size_t num_full_batches = partition_idx >= spilling_cursor_
? partition.queue.batch_count()
: partition.file.num_batches();

return num_full_batches + (partition.builder.num_rows() > 0);
}

inline size_t row_count(size_t partition_idx, size_t batch_idx) const {
const Partition& partition = partitions_[partition_idx];
if (batch_idx < partition.hash_queue.batch_count())
return partition.hash_queue[batch_idx].length;
else
return partition.builder.num_rows();
}

static inline constexpr size_t partition_id(uint64_t hash) {
// The hash table uses the top bits of the hash, so it is important
// to use the bottom bits of the hash for spilling partitions; otherwise
// each partition would see a huge number of hash collisions.
return static_cast<size_t>(hash & (kNumPartitions - 1));
}

// Returns the row count for the partition if it is still in-memory.
// Returns 0 if the partition has already been spilled.
size_t CalculatePartitionRowCount(size_t partition) const;

// Spills the next partition of batches to disk and returns true,
// or returns false if every partition has already been spilled.
// The QueryContext's bytes_in_flight will be increased by the
// number of bytes spilled (unless the disk IO was very fast and
// the bytes_in_flight got reduced again).
//
// We expect that we always advance the SpillCursor faster than the
// HashCursor, and only advance the HashCursor when we've exhausted
// partitions for the SpillCursor.
Result<bool> AdvanceSpillCursor();
// Same as AdvanceSpillCursor but for the partition's hashes.
Result<bool> AdvanceHashCursor();
inline size_t spill_cursor() const { return spilling_cursor_.load(); }
inline size_t hash_cursor() const { return hash_cursor_.load(); }

private:
std::atomic<size_t> spilling_cursor_{0}; // denotes the first in-memory partition
std::atomic<size_t> hash_cursor_{0};

QueryContext* ctx_;
PartitionLocks partition_locks_;

struct Partition {
AccumulationQueue queue;
AccumulationQueue hash_queue;
ExecBatchBuilder builder;
SpillFile file;
int task_group_read;
std::function<Status(size_t, size_t, ExecBatch)> read_back_fn;
std::function<Status(size_t)> on_finished;
};

Partition partitions_[kNumPartitions];
};

} // namespace compute
} // namespace arrow
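
As a concrete illustration of the cursor semantics documented above, the sketch below models the routing decision InsertBatch makes after the builder flushes a full batch: partitions below the spill cursor write the batch straight to disk, partitions at or above it keep the batch in memory, and hashes are retained only while the partition is at or above the hash cursor. ToyPartition and RouteFlushedBatch are illustrative names, not part of this PR.

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <vector>

// A toy stand-in for one partition's storage: spilled batches, in-memory
// batches, and in-memory hash batches.
struct ToyPartition {
  std::vector<int> on_disk;    // batch ids written to the spill file
  std::vector<int> in_memory;  // batch ids kept in the accumulation queue
  std::vector<int> hashes;     // batch ids whose hashes are kept in memory
};

// Routing rule applied once a builder flushes a full batch:
//   * partitions below the spill cursor go straight to disk,
//   * partitions at or above it stay in memory,
//   * hashes are retained only while the partition is at or above the hash cursor.
void RouteFlushedBatch(size_t partition_id, int batch_id,
                       const std::atomic<size_t>& spill_cursor,
                       const std::atomic<size_t>& hash_cursor, ToyPartition* part) {
  if (partition_id < spill_cursor.load()) {
    part->on_disk.push_back(batch_id);
  } else {
    part->in_memory.push_back(batch_id);
  }
  if (partition_id >= hash_cursor.load()) {
    part->hashes.push_back(batch_id);
  }
}

Under this model, AdvanceSpillCursor corresponds to moving one more partition's in-memory batches to disk, and AdvanceHashCursor to releasing that partition's in-memory hashes.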