diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 90ab1e6ac27f3..f7767548698d3 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -406,6 +406,8 @@ if(ARROW_COMPUTE) compute/exec/query_context.cc compute/exec/sink_node.cc compute/exec/source_node.cc + compute/exec/spilling_util.cc + compute/exec/spilling_join.cc compute/exec/swiss_join.cc compute/exec/task_util.cc compute/exec/tpch_node.cc diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt index 4ce73359d0f1f..4322782f47055 100644 --- a/cpp/src/arrow/compute/exec/CMakeLists.txt +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -37,6 +37,7 @@ add_arrow_compute_test(asof_join_node_test "arrow-compute" SOURCES asof_join_node_test.cc) +add_arrow_compute_test(spilling_test PREFIX "arrow-compute") add_arrow_compute_test(tpch_node_test PREFIX "arrow-compute") add_arrow_compute_test(union_node_test PREFIX "arrow-compute") add_arrow_compute_test(util_test @@ -47,6 +48,7 @@ add_arrow_compute_test(util_test task_util_test.cc) add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute") +add_arrow_benchmark(spilling_benchmark PREFIX "arrow-compute") add_arrow_benchmark(filter_benchmark PREFIX diff --git a/cpp/src/arrow/compute/exec/accumulation_queue.cc b/cpp/src/arrow/compute/exec/accumulation_queue.cc index 192db52942820..250770e4f49d9 100644 --- a/cpp/src/arrow/compute/exec/accumulation_queue.cc +++ b/cpp/src/arrow/compute/exec/accumulation_queue.cc @@ -16,21 +16,19 @@ // under the License. #include "arrow/compute/exec/accumulation_queue.h" - -#include +#include "arrow/compute/exec/key_hash.h" +#include "arrow/util/atomic_util.h" namespace arrow { -namespace util { -using arrow::compute::ExecBatch; +namespace compute { + AccumulationQueue::AccumulationQueue(AccumulationQueue&& that) { this->batches_ = std::move(that.batches_); - this->row_count_ = that.row_count_; that.Clear(); } AccumulationQueue& AccumulationQueue::operator=(AccumulationQueue&& that) { this->batches_ = std::move(that.batches_); - this->row_count_ = that.row_count_; that.Clear(); return *this; } @@ -39,20 +37,181 @@ void AccumulationQueue::Concatenate(AccumulationQueue&& that) { this->batches_.reserve(this->batches_.size() + that.batches_.size()); std::move(that.batches_.begin(), that.batches_.end(), std::back_inserter(this->batches_)); - this->row_count_ += that.row_count_; that.Clear(); } void AccumulationQueue::InsertBatch(ExecBatch batch) { - row_count_ += batch.length; batches_.emplace_back(std::move(batch)); } -void AccumulationQueue::Clear() { - row_count_ = 0; - batches_.clear(); +void AccumulationQueue::SetBatch(size_t idx, ExecBatch batch) { + ARROW_DCHECK(idx < batches_.size()); + batches_[idx] = std::move(batch); +} + +size_t AccumulationQueue::CalculateRowCount() const { + size_t count = 0; + for (const ExecBatch& b : batches_) count += static_cast(b.length); + return count; +} + +void AccumulationQueue::Clear() { batches_.clear(); } + +Status SpillingAccumulationQueue::Init(QueryContext* ctx) { + ctx_ = ctx; + partition_locks_.Init(ctx_->max_concurrency(), kNumPartitions); + for (size_t ipart = 0; ipart < kNumPartitions; ipart++) { + Partition& part = partitions_[ipart]; + part.task_group_read = ctx_->RegisterTaskGroup( + [this, ipart](size_t thread_index, int64_t batch_index) { + return partitions_[ipart].read_back_fn( + thread_index, static_cast(batch_index), + std::move(partitions_[ipart].queue[batch_index])); + }, + [this, ipart](size_t thread_index) { + 
return partitions_[ipart].on_finished(thread_index); + }); + } + return Status::OK(); +} + +Status SpillingAccumulationQueue::InsertBatch(size_t thread_index, ExecBatch batch) { + Datum& hash_datum = batch.values.back(); + const uint64_t* hashes = + reinterpret_cast(hash_datum.array()->buffers[1]->data()); + // `permutation` stores the indices of rows in the input batch sorted by partition. + std::vector permutation(batch.length); + uint16_t part_starts[kNumPartitions + 1]; + PartitionSort::Eval( + batch.length, kNumPartitions, part_starts, + /*partition_id=*/[&](int64_t i) { return partition_id(hashes[i]); }, + /*output_fn=*/ + [&permutation](int64_t input_pos, int64_t output_pos) { + permutation[output_pos] = static_cast(input_pos); + }); + + int unprocessed_partition_ids[kNumPartitions]; + RETURN_NOT_OK(partition_locks_.ForEachPartition( + thread_index, unprocessed_partition_ids, + /*is_prtn_empty_fn=*/ + [&](int part_id) { return part_starts[part_id + 1] == part_starts[part_id]; }, + /*process_prtn_fn=*/ + [&](int locked_part_id_int) { + size_t locked_part_id = static_cast(locked_part_id_int); + uint64_t num_total_rows_to_append = + part_starts[locked_part_id + 1] - part_starts[locked_part_id]; + + Partition& locked_part = partitions_[locked_part_id]; + + size_t offset = static_cast(part_starts[locked_part_id]); + while (num_total_rows_to_append > 0) { + int num_rows_to_append = + std::min(static_cast(num_total_rows_to_append), + static_cast(ExecBatchBuilder::num_rows_max() - + locked_part.builder.num_rows())); + + RETURN_NOT_OK(locked_part.builder.AppendSelected( + ctx_->memory_pool(), batch, num_rows_to_append, permutation.data() + offset, + batch.num_values())); + + if (locked_part.builder.is_full()) { + ExecBatch batch = locked_part.builder.Flush(); + Datum hash = std::move(batch.values.back()); + batch.values.pop_back(); + ExecBatch hash_batch({std::move(hash)}, batch.length); + if (locked_part_id < spilling_cursor_) + RETURN_NOT_OK(locked_part.file.SpillBatch(ctx_, std::move(batch))); + else + locked_part.queue.InsertBatch(std::move(batch)); + + if (locked_part_id >= hash_cursor_) + locked_part.hash_queue.InsertBatch(std::move(hash_batch)); + } + offset += num_rows_to_append; + num_total_rows_to_append -= num_rows_to_append; + } + return Status::OK(); + })); + return Status::OK(); +} + +const uint64_t* SpillingAccumulationQueue::GetHashes(size_t partition_idx, + size_t batch_idx) { + ARROW_DCHECK(partition_idx >= hash_cursor_.load()); + Partition& partition = partitions_[partition_idx]; + if (batch_idx > partition.hash_queue.batch_count()) { + const Datum& datum = partition.hash_queue[batch_idx].values[0]; + return reinterpret_cast(datum.array()->buffers[1]->data()); + } else { + size_t hash_idx = partition.builder.num_cols(); + KeyColumnArray kca = partition.builder.column(hash_idx - 1); + return reinterpret_cast(kca.data(1)); + } +} + +Status SpillingAccumulationQueue::GetPartition( + size_t thread_index, size_t partition_idx, + std::function on_batch, + std::function on_finished) { + bool is_in_memory = partition_idx >= spilling_cursor_.load(); + Partition& partition = partitions_[partition_idx]; + if (partition.builder.num_rows() > 0) { + ExecBatch batch = partition.builder.Flush(); + batch.values.pop_back(); + RETURN_NOT_OK(on_batch(thread_index, + /*batch_index=*/partition.queue.batch_count(), + std::move(batch))); + } + + if (is_in_memory) { + ARROW_DCHECK(partition_idx >= hash_cursor_.load()); + partition.read_back_fn = std::move(on_batch); + partition.on_finished = 
std::move(on_finished); + return ctx_->StartTaskGroup(partition.task_group_read, partition.queue.batch_count()); + } + + return partition.file.ReadBackBatches( + ctx_, on_batch, + [this, partition_idx, finished = std::move(on_finished)](size_t thread_index) { + RETURN_NOT_OK(partitions_[partition_idx].file.Cleanup()); + return finished(thread_index); + }); +} + +size_t SpillingAccumulationQueue::CalculatePartitionRowCount(size_t partition) const { + return partitions_[partition].builder.num_rows() + + partitions_[partition].queue.CalculateRowCount(); } -ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; } -} // namespace util +Result SpillingAccumulationQueue::AdvanceSpillCursor() { + size_t to_spill = spilling_cursor_.fetch_add(1); + if (to_spill >= kNumPartitions) { + ARROW_DCHECK(to_spill < 1000 * 1000 * 1000) + << "You've tried to advance the spill cursor over a billion times, you might " + "have a problem"; + return false; + } + + auto lock = partition_locks_.AcquirePartitionLock(static_cast(to_spill)); + Partition& partition = partitions_[to_spill]; + size_t num_batches = partition.queue.batch_count(); + for (size_t i = 0; i < num_batches; i++) + RETURN_NOT_OK(partition.file.SpillBatch(ctx_, std::move(partition.queue[i]))); + return true; +} + +Result SpillingAccumulationQueue::AdvanceHashCursor() { + size_t to_spill = hash_cursor_.fetch_add(1); + if (to_spill >= kNumPartitions) { + ARROW_DCHECK(to_spill < 1000 * 1000 * 1000) + << "You've tried to advance the spill cursor over a billion times, you might " + "have a problem"; + return false; + } + + auto lock = partition_locks_.AcquirePartitionLock(static_cast(to_spill)); + partitions_[to_spill].hash_queue.Clear(); + return true; +} +} // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/accumulation_queue.h b/cpp/src/arrow/compute/exec/accumulation_queue.h index 4b23e5ffcac54..584333b4300d2 100644 --- a/cpp/src/arrow/compute/exec/accumulation_queue.h +++ b/cpp/src/arrow/compute/exec/accumulation_queue.h @@ -21,16 +21,19 @@ #include #include "arrow/compute/exec.h" +#include "arrow/compute/exec/partition_util.h" +#include "arrow/compute/exec/spilling_util.h" +#include "arrow/compute/exec/task_util.h" +#include "arrow/compute/light_array.h" namespace arrow { -namespace util { -using arrow::compute::ExecBatch; +namespace compute { /// \brief A container that accumulates batches until they are ready to /// be processed. -class AccumulationQueue { +class ARROW_EXPORT AccumulationQueue { public: - AccumulationQueue() : row_count_(0) {} + AccumulationQueue() = default; ~AccumulationQueue() = default; // We should never be copying ExecBatch around @@ -42,16 +45,128 @@ class AccumulationQueue { void Concatenate(AccumulationQueue&& that); void InsertBatch(ExecBatch batch); - int64_t row_count() { return row_count_; } - size_t batch_count() { return batches_.size(); } + void SetBatch(size_t idx, ExecBatch batch); + size_t batch_count() const { return batches_.size(); } bool empty() const { return batches_.empty(); } + size_t CalculateRowCount() const; + + // Resizes the accumulation queue to contain size batches. The + // new batches will be empty and have length 0, but they will be + // usable (useful for concurrent modification of the AccumulationQueue + // of separate elements). 
+ void Resize(size_t size) { batches_.resize(size); } void Clear(); - ExecBatch& operator[](size_t i); + ExecBatch& operator[](size_t i) { return batches_[i]; } + const ExecBatch& operator[](size_t i) const { return batches_[i]; } private: - int64_t row_count_; std::vector batches_; }; -} // namespace util +/// Accumulates batches in a queue that can be spilled to disk if needed +/// +/// Each batch is partitioned by the lower bits of the hash column (which must be present) +/// and rows are initially accumulated in batch builders (one per partition). As a batch +/// builder fills up the completed batch is put into an in-memory accumulation queue (per +/// partition). +/// +/// When memory pressure is encountered the spilling queue's "spill cursor" can be +/// advanced. This will cause a partition to be spilled to disk. Any future data +/// arriving for that partition will go immediately to disk (after accumulating a full +/// batch in the batch builder). Note that hashes are spilled separately from batches and +/// have their own cursor. We assume that the Batch cursor is advanced faster than the +/// spill cursor. Hashes are spilled separately to enable building a Bloom filter for +/// spilled partitions. +/// +/// Later, data is retrieved one partition at a time. Partitions that are in-memory will +/// be delivered immediately in new thread tasks. Partitions that are on disk will be +/// read from disk and delivered as they arrive. +/// +/// This class assumes that data is fully accumulated before it is read-back. As such, do +/// not call InsertBatch after calling GetPartition. +class ARROW_EXPORT SpillingAccumulationQueue { + public: + // Number of partitions must be a power of two, since we assign partitions by + // looking at bottom few bits. + static constexpr int kLogNumPartitions = 6; + static constexpr int kNumPartitions = 1 << kLogNumPartitions; + Status Init(QueryContext* ctx); + // Assumes that the final column in batch contains 64-bit hashes of the columns. + Status InsertBatch(size_t thread_index, ExecBatch batch); + // Runs `on_batch` on each batch in the SpillingAccumulationQueue for the given + // partition. Each batch will have its own task. Once all batches have had their + // on_batch function run, `on_finished` will be called. + Status GetPartition(size_t thread_index, size_t partition_idx, + std::function + on_batch, // thread_index, batch_index, batch + std::function on_finished); + + // Returns hashes of the given partition and batch index. + // partition MUST be at least hash_cursor, as if partition < hash_cursor, + // these hashes will have been deleted. + const uint64_t* GetHashes(size_t partition_idx, size_t batch_idx); + inline size_t batch_count(size_t partition_idx) const { + const Partition& partition = partitions_[partition_idx]; + size_t num_full_batches = partition_idx >= spilling_cursor_ + ? partition.queue.batch_count() + : partition.file.num_batches(); + + return num_full_batches + (partition.builder.num_rows() > 0); + } + + inline size_t row_count(size_t partition_idx, size_t batch_idx) const { + const Partition& partition = partitions_[partition_idx]; + if (batch_idx < partition.hash_queue.batch_count()) + return partition.hash_queue[batch_idx].length; + else + return partition.builder.num_rows(); + } + + static inline constexpr size_t partition_id(uint64_t hash) { + // Hash Table uses the top bits of the hash, so it is important + // to use the bottom bits of the hash for spilling to avoid + // a huge number of hash collisions per partition. 
+ return static_cast(hash & (kNumPartitions - 1)); + } + + // Returns the row count for the partition if it is still in-memory. + // Returns 0 if the partition has already been spilled. + size_t CalculatePartitionRowCount(size_t partition) const; + + // Spills the next partition of batches to disk and returns true, + // or returns false if too many partitions have been spilled. + // The QueryContext's bytes_in_flight will be increased by the + // number of bytes spilled (unless the disk IO was very fast and + // the bytes_in_flight got reduced again). + // + // We expect that we always advance the SpillCursor faster than the + // HashCursor, and only advance the HashCursor when we've exhausted + // partitions for the SpillCursor. + Result AdvanceSpillCursor(); + // Same as AdvanceSpillCursor but spills the hashes for the partition. + Result AdvanceHashCursor(); + inline size_t spill_cursor() const { return spilling_cursor_.load(); } + inline size_t hash_cursor() const { return hash_cursor_.load(); } + + private: + std::atomic spilling_cursor_{0}; // denotes the first in-memory partition + std::atomic hash_cursor_{0}; + + QueryContext* ctx_; + PartitionLocks partition_locks_; + + struct Partition { + AccumulationQueue queue; + AccumulationQueue hash_queue; + ExecBatchBuilder builder; + SpillFile file; + int task_group_read; + std::function read_back_fn; + std::function on_finished; + }; + + Partition partitions_[kNumPartitions]; +}; + +} // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index b8886619d7dae..1561f18d4e051 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -620,10 +620,16 @@ Result>> DeclarationToBatches( Future DeclarationToExecBatchesAsync(Declaration declaration, ExecContext exec_context) { + return DeclarationToExecBatchesAsync(std::move(declaration), exec_context, + QueryOptions{}); +} + +Future DeclarationToExecBatchesAsync( + Declaration declaration, ExecContext exec_context, QueryOptions query_options) { std::shared_ptr out_schema; AsyncGenerator> sink_gen; ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, - ExecPlan::Make(exec_context)); + ExecPlan::Make(query_options, exec_context)); Declaration with_sink = Declaration::Sequence( {declaration, {"sink", SinkNodeOptions(&sink_gen, &out_schema)}}); ARROW_RETURN_NOT_OK(with_sink.AddToPlan(exec_plan.get())); diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 09fab00727824..721ba1d147e7f 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -477,6 +477,13 @@ ARROW_EXPORT Future DeclarationToExecBatchesAsync( ARROW_EXPORT Future DeclarationToExecBatchesAsync( Declaration declaration, ExecContext custom_exec_context); +/// \brief Overload of \see DeclarationToExecBatchesAsync accepting a custom exec context +/// and QueryOptions +/// +/// \see DeclarationToTableAsync for details on threading & execution +ARROW_EXPORT Future DeclarationToExecBatchesAsync( + Declaration declaration, ExecContext custom_exec_context, QueryOptions query_options); + /// \brief Utility method to run a declaration and collect the results into a vector /// /// \see DeclarationToTable for details on threading & execution diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index ffd93591e653d..fb6f415cd579f 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ 
b/cpp/src/arrow/compute/exec/hash_join.cc @@ -42,13 +42,10 @@ class HashJoinBasicImpl : public HashJoinImpl { Status Init(QueryContext* ctx, JoinType join_type, size_t num_threads, const HashJoinProjectionMaps* proj_map_left, const HashJoinProjectionMaps* proj_map_right, - std::vector key_cmp, Expression filter, - RegisterTaskGroupCallback register_task_group_callback, - StartTaskGroupCallback start_task_group_callback, - OutputBatchCallback output_batch_callback, - FinishedCallback finished_callback) override { + std::vector* key_cmp, Expression* filter, + CallbackRecord callback_record) override { START_COMPUTE_SPAN(span_, "HashJoinBasicImpl", - {{"detail", filter.ToString()}, + {{"detail", filter->ToString()}, {"join.kind", arrow::compute::ToString(join_type)}, {"join.threads", static_cast(num_threads)}}); @@ -57,12 +54,9 @@ class HashJoinBasicImpl : public HashJoinImpl { join_type_ = join_type; schema_[0] = proj_map_left; schema_[1] = proj_map_right; - key_cmp_ = std::move(key_cmp); - filter_ = std::move(filter); - register_task_group_callback_ = std::move(register_task_group_callback); - start_task_group_callback_ = std::move(start_task_group_callback); - output_batch_callback_ = std::move(output_batch_callback); - finished_callback_ = std::move(finished_callback); + key_cmp_ = key_cmp; + filter_ = filter; + callback_record_ = std::move(callback_record); local_states_.resize(num_threads_); for (size_t i = 0; i < local_states_.size(); ++i) { @@ -155,7 +149,7 @@ class HashJoinBasicImpl : public HashJoinImpl { bool is_null = non_null_bit_vectors[icol] && !bit_util::GetBit(non_null_bit_vectors[icol], non_null_bit_vector_offsets[icol] + irow); - if (key_cmp_[icol] == JoinKeyCmp::EQ && is_null) { + if ((*key_cmp_)[icol] == JoinKeyCmp::EQ && is_null) { no_match = true; break; } @@ -232,7 +226,7 @@ class HashJoinBasicImpl : public HashJoinImpl { : opt_right_payload->values[from_payload.get(icol)]; } - output_batch_callback_(0, std::move(result)); + callback_record_.output_batch(0, std::move(result)); // Update the counter of produced batches // @@ -244,7 +238,7 @@ class HashJoinBasicImpl : public HashJoinImpl { std::vector& no_match, std::vector& match_left, std::vector& match_right) { - if (filter_ == literal(true)) { + if (*filter_ == literal(true)) { return Status::OK(); } ARROW_DCHECK_EQ(match_left.size(), match_right.size()); @@ -296,8 +290,8 @@ class HashJoinBasicImpl : public HashJoinImpl { AppendFields(left_to_key, left_to_pay, left_key, left_payload); AppendFields(right_to_key, right_to_pay, right_key, right_payload); - ARROW_ASSIGN_OR_RAISE( - Datum mask, ExecuteScalarExpression(filter_, concatenated, ctx_->exec_context())); + ARROW_ASSIGN_OR_RAISE(Datum mask, ExecuteScalarExpression(*filter_, concatenated, + ctx_->exec_context())); size_t num_probed_rows = match.size() + no_match.size(); if (mask.is_scalar()) { @@ -550,7 +544,7 @@ class HashJoinBasicImpl : public HashJoinImpl { } void RegisterBuildHashTable() { - task_group_build_ = register_task_group_callback_( + task_group_build_ = callback_record_.register_task_group( [this](size_t thread_index, int64_t task_id) -> Status { return BuildHashTable_exec_task(thread_index, task_id); }, @@ -610,12 +604,12 @@ class HashJoinBasicImpl : public HashJoinImpl { BuildFinishedCallback on_finished) override { build_finished_callback_ = std::move(on_finished); build_batches_ = std::move(batches); - return start_task_group_callback_(task_group_build_, - /*num_tasks=*/1); + return callback_record_.start_task_group(task_group_build_, + 
/*num_tasks=*/1); } void RegisterScanHashTable() { - task_group_scan_ = register_task_group_callback_( + task_group_scan_ = callback_record_.register_task_group( [this](size_t thread_index, int64_t task_id) -> Status { return ScanHashTable_exec_task(thread_index, task_id); }, @@ -684,13 +678,12 @@ class HashJoinBasicImpl : public HashJoinImpl { return Status::Cancelled("Hash join cancelled"); } END_SPAN(span_); - finished_callback_(num_batches_produced_.load()); - return Status::OK(); + return callback_record_.finished(num_batches_produced_.load()); } Status ScanHashTable(size_t thread_index) { MergeHasMatch(); - return start_task_group_callback_(task_group_scan_, ScanHashTable_num_tasks()); + return callback_record_.start_task_group(task_group_scan_, ScanHashTable_num_tasks()); } Status ProbingFinished(size_t thread_index) override { @@ -739,18 +732,15 @@ class HashJoinBasicImpl : public HashJoinImpl { JoinType join_type_; size_t num_threads_; const HashJoinProjectionMaps* schema_[2]; - std::vector key_cmp_; - Expression filter_; + std::vector* key_cmp_; + Expression* filter_; int task_group_build_; int task_group_scan_; // Callbacks // - RegisterTaskGroupCallback register_task_group_callback_; - StartTaskGroupCallback start_task_group_callback_; - OutputBatchCallback output_batch_callback_; + CallbackRecord callback_record_; BuildFinishedCallback build_finished_callback_; - FinishedCallback finished_callback_; // Thread local runtime state // diff --git a/cpp/src/arrow/compute/exec/hash_join.h b/cpp/src/arrow/compute/exec/hash_join.h index bc053b2f1b631..a07a153a7c893 100644 --- a/cpp/src/arrow/compute/exec/hash_join.h +++ b/cpp/src/arrow/compute/exec/hash_join.h @@ -35,33 +35,35 @@ namespace arrow { namespace compute { -using arrow::util::AccumulationQueue; - class HashJoinImpl { public: using OutputBatchCallback = std::function; using BuildFinishedCallback = std::function; - using FinishedCallback = std::function; + using FinishedCallback = std::function; using RegisterTaskGroupCallback = std::function, std::function)>; using StartTaskGroupCallback = std::function; using AbortContinuationImpl = std::function; + struct CallbackRecord { + RegisterTaskGroupCallback register_task_group; + StartTaskGroupCallback start_task_group; + OutputBatchCallback output_batch; + FinishedCallback finished; + }; + virtual ~HashJoinImpl() = default; virtual Status Init(QueryContext* ctx, JoinType join_type, size_t num_threads, const HashJoinProjectionMaps* proj_map_left, const HashJoinProjectionMaps* proj_map_right, - std::vector key_cmp, Expression filter, - RegisterTaskGroupCallback register_task_group_callback, - StartTaskGroupCallback start_task_group_callback, - OutputBatchCallback output_batch_callback, - FinishedCallback finished_callback) = 0; + std::vector* key_cmp, Expression* filter, + CallbackRecord callback_record) = 0; virtual Status BuildHashTable(size_t thread_index, AccumulationQueue batches, BuildFinishedCallback on_finished) = 0; virtual Status ProbeSingleBatch(size_t thread_index, ExecBatch batch) = 0; virtual Status ProbingFinished(size_t thread_index) = 0; - virtual void Abort(TaskScheduler::AbortContinuationImpl pos_abort_callback) = 0; + virtual void Abort(AbortContinuationImpl pos_abort_callback) = 0; virtual std::string ToString() const = 0; static Result> MakeBasic(); diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc index cc85251f8c1ac..88d228e66e311 100644 --- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc 
+++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -126,10 +126,10 @@ class JoinBenchmark { stats_.num_probe_rows = settings.num_probe_batches * settings.batch_size; schema_mgr_ = std::make_unique(); - Expression filter = literal(true); + filter_ = literal(true); DCHECK_OK(schema_mgr_->Init(settings.join_type, *l_batches_with_schema.schema, left_keys, *r_batches_with_schema.schema, right_keys, - filter, "l_", "r_")); + filter_, "l_", "r_")); if (settings.use_basic_implementation) { join_ = *HashJoinImpl::MakeBasic(); @@ -147,20 +147,21 @@ class JoinBenchmark { scheduler_ = TaskScheduler::Make(); DCHECK_OK(ctx_.Init(settings.num_threads, nullptr)); - auto register_task_group_callback = [&](std::function task, - std::function cont) { + HashJoinImpl::CallbackRecord callbacks; + callbacks.register_task_group = [&](std::function task, + std::function cont) { return scheduler_->RegisterTaskGroup(std::move(task), std::move(cont)); }; - auto start_task_group_callback = [&](int task_group_id, int64_t num_tasks) { + callbacks.start_task_group = [&](int task_group_id, int64_t num_tasks) { return scheduler_->StartTaskGroup(omp_get_thread_num(), task_group_id, num_tasks); }; + callbacks.output_batch = [](int64_t, ExecBatch) {}; + callbacks.finished = [](int64_t) { return Status::OK(); }; - DCHECK_OK(join_->Init( - &ctx_, settings.join_type, settings.num_threads, &(schema_mgr_->proj_maps[0]), - &(schema_mgr_->proj_maps[1]), std::move(key_cmp), std::move(filter), - std::move(register_task_group_callback), std::move(start_task_group_callback), - [](int64_t, ExecBatch) {}, [](int64_t x) {})); + DCHECK_OK(join_->Init(&ctx_, settings.join_type, settings.num_threads, + &(schema_mgr_->proj_maps[0]), &(schema_mgr_->proj_maps[1]), + &key_cmp_, &filter_, std::move(callbacks))); task_group_probe_ = scheduler_->RegisterTaskGroup( [this](size_t thread_index, int64_t task_id) -> Status { @@ -197,6 +198,8 @@ class JoinBenchmark { std::unique_ptr schema_mgr_; std::unique_ptr join_; QueryContext ctx_; + std::vector key_cmp_; + Expression filter_; int task_group_probe_; struct { diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index 37bdb82517a04..c65df920e7f6d 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -27,6 +27,7 @@ #include "arrow/compute/exec/key_hash.h" #include "arrow/compute/exec/options.h" #include "arrow/compute/exec/schema_util.h" +#include "arrow/compute/exec/spilling_join.h" #include "arrow/compute/exec/util.h" #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" @@ -127,49 +128,33 @@ Status HashJoinSchema::Init( right_schema, right_keys, right_output, left_field_name_suffix, right_field_name_suffix)); - std::vector handles; - std::vector*> field_refs; - std::vector left_filter, right_filter; RETURN_NOT_OK( CollectFilterColumns(left_filter, right_filter, filter, left_schema, right_schema)); - - handles.push_back(HashJoinProjection::KEY); - field_refs.push_back(&left_keys); - ARROW_ASSIGN_OR_RAISE(auto left_payload, ComputePayload(left_schema, left_output, left_filter, left_keys)); - handles.push_back(HashJoinProjection::PAYLOAD); - field_refs.push_back(&left_payload); - - handles.push_back(HashJoinProjection::FILTER); - field_refs.push_back(&left_filter); - - handles.push_back(HashJoinProjection::OUTPUT); - field_refs.push_back(&left_output); - - RETURN_NOT_OK( - proj_maps[0].Init(HashJoinProjection::INPUT, left_schema, handles, field_refs)); - handles.clear(); - 
field_refs.clear(); - - handles.push_back(HashJoinProjection::KEY); - field_refs.push_back(&right_keys); + RETURN_NOT_OK(proj_maps[0].Init(HashJoinProjection::INPUT, left_schema)); + RETURN_NOT_OK(proj_maps[0].RegisterProjectedSchema(HashJoinProjection::KEY, left_keys, + left_schema)); + RETURN_NOT_OK(proj_maps[0].RegisterProjectedSchema(HashJoinProjection::PAYLOAD, + left_payload, left_schema)); + RETURN_NOT_OK(proj_maps[0].RegisterProjectedSchema(HashJoinProjection::FILTER, + left_filter, left_schema)); + RETURN_NOT_OK(proj_maps[0].RegisterProjectedSchema(HashJoinProjection::OUTPUT, + left_output, left_schema)); ARROW_ASSIGN_OR_RAISE(auto right_payload, ComputePayload(right_schema, right_output, right_filter, right_keys)); - handles.push_back(HashJoinProjection::PAYLOAD); - field_refs.push_back(&right_payload); - - handles.push_back(HashJoinProjection::FILTER); - field_refs.push_back(&right_filter); - - handles.push_back(HashJoinProjection::OUTPUT); - field_refs.push_back(&right_output); - - RETURN_NOT_OK( - proj_maps[1].Init(HashJoinProjection::INPUT, right_schema, handles, field_refs)); + RETURN_NOT_OK(proj_maps[1].Init(HashJoinProjection::INPUT, right_schema)); + RETURN_NOT_OK(proj_maps[1].RegisterProjectedSchema(HashJoinProjection::KEY, right_keys, + right_schema)); + RETURN_NOT_OK(proj_maps[1].RegisterProjectedSchema(HashJoinProjection::PAYLOAD, + right_payload, right_schema)); + RETURN_NOT_OK(proj_maps[1].RegisterProjectedSchema(HashJoinProjection::FILTER, + right_filter, right_schema)); + RETURN_NOT_OK(proj_maps[1].RegisterProjectedSchema(HashJoinProjection::OUTPUT, + right_output, right_schema)); return Status::OK(); } @@ -492,12 +477,17 @@ struct BloomFilterPushdownContext { using BuildFinishedCallback = std::function; using FiltersReceivedCallback = std::function; using FilterFinishedCallback = std::function; + void Init(HashJoinNode* owner, size_t num_threads, RegisterTaskGroupCallback register_task_group_callback, StartTaskGroupCallback start_task_group_callback, FiltersReceivedCallback on_bloom_filters_received, bool disable_bloom_filter, bool use_sync_execution); + PartitionedBloomFilter* bloom_filter() { + return disable_bloom_filter_ ? nullptr : &push_.bloom_filter_; + } + Status StartProducing(size_t thread_index); void ExpectBloomFilter() { eval_.num_expected_bloom_filters_ += 1; } @@ -511,8 +501,7 @@ struct BloomFilterPushdownContext { Status PushBloomFilter(size_t thread_index); // Receives a Bloom filter and its associated column map. - Status ReceiveBloomFilter(size_t thread_index, - std::unique_ptr filter, + Status ReceiveBloomFilter(size_t thread_index, PartitionedBloomFilter filter, std::vector column_map) { bool proceed; { @@ -544,21 +533,17 @@ struct BloomFilterPushdownContext { /*num_tasks=*/eval_.batches_.batch_count()); } - // Applies all Bloom filters on the input batch. 
- Status FilterSingleBatch(size_t thread_index, ExecBatch* batch_ptr) { - ExecBatch& batch = *batch_ptr; - if (eval_.num_expected_bloom_filters_ == 0 || batch.length == 0) return Status::OK(); - - int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length); - std::vector selected(bit_vector_bytes); - std::vector hashes(batch.length); - std::vector bv(bit_vector_bytes); + Status HashAndLookupInFilter(size_t thread_index, ExecBatch& batch, + std::vector& selected) { + std::vector bv(selected.size()); + std::vector hashes(batch.length); ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, ctx_->GetTempStack(thread_index)); // Start with full selection for the current batch - memset(selected.data(), 0xff, bit_vector_bytes); + memset(selected.data(), 0xff, bv.size()); + std::vector temp_column_arrays; for (size_t ifilter = 0; ifilter < eval_.num_expected_bloom_filters_; ifilter++) { std::vector keys(eval_.received_maps_[ifilter].size()); for (size_t i = 0; i < keys.size(); i++) { @@ -571,38 +556,21 @@ struct BloomFilterPushdownContext { } } ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(keys))); - std::vector temp_column_arrays; - RETURN_NOT_OK(Hashing32::HashBatch(key_batch, hashes.data(), temp_column_arrays, + RETURN_NOT_OK(Hashing64::HashBatch(key_batch, hashes.data(), temp_column_arrays, ctx_->cpu_info()->hardware_flags(), stack, 0, key_batch.length)); - eval_.received_filters_[ifilter]->Find(ctx_->cpu_info()->hardware_flags(), - key_batch.length, hashes.data(), bv.data()); + eval_.received_filters_[ifilter].Find(ctx_->cpu_info()->hardware_flags(), + key_batch.length, hashes.data(), bv.data()); arrow::internal::BitmapAnd(bv.data(), 0, selected.data(), 0, key_batch.length, 0, selected.data()); } - auto selected_buffer = std::make_unique(selected.data(), bit_vector_bytes); - ArrayData selected_arraydata(boolean(), batch.length, - {nullptr, std::move(selected_buffer)}); - Datum selected_datum(selected_arraydata); - FilterOptions options; - size_t first_nonscalar = batch.values.size(); - for (size_t i = 0; i < batch.values.size(); i++) { - if (!batch.values[i].is_scalar()) { - ARROW_ASSIGN_OR_RAISE(batch.values[i], Filter(batch.values[i], selected_datum, - options, ctx_->exec_context())); - first_nonscalar = std::min(first_nonscalar, i); - ARROW_DCHECK_EQ(batch.values[i].length(), batch.values[first_nonscalar].length()); - } - } - // If they're all Scalar, then the length of the batch is the number of set bits - if (first_nonscalar == batch.values.size()) - batch.length = arrow::internal::CountSetBits(selected.data(), 0, batch.length); - else - batch.length = batch.values[first_nonscalar].length(); return Status::OK(); } + // Applies all Bloom filters on the input batch. 
+ Status FilterSingleBatch(size_t thread_index, ExecBatch* batch_ptr); + private: Status BuildBloomFilter_exec_task(size_t thread_index, int64_t task_id); @@ -634,7 +602,7 @@ struct BloomFilterPushdownContext { } build_; struct { - std::unique_ptr bloom_filter_; + PartitionedBloomFilter bloom_filter_; HashJoinNode* pushdown_target_; std::vector column_map_; } push_; @@ -643,13 +611,14 @@ struct BloomFilterPushdownContext { int task_id_; size_t num_expected_bloom_filters_ = 0; std::mutex receive_mutex_; - std::vector> received_filters_; + std::vector received_filters_; std::vector> received_maps_; AccumulationQueue batches_; FiltersReceivedCallback all_received_callback_; FilterFinishedCallback on_finished_; } eval_; }; + bool HashJoinSchema::HasDictionaries() const { for (int side = 0; side <= 1; ++side) { for (int icol = 0; icol < proj_maps[side].num_cols(HashJoinProjection::INPUT); @@ -683,7 +652,7 @@ class HashJoinNode : public ExecNode { HashJoinNode(ExecPlan* plan, NodeVector inputs, const HashJoinNodeOptions& join_options, std::shared_ptr output_schema, std::unique_ptr schema_mgr, Expression filter, - std::unique_ptr impl) + std::unique_ptr impl, bool is_swiss) : ExecNode(plan, inputs, {"left", "right"}, /*output_schema=*/std::move(output_schema), /*num_outputs=*/1), @@ -692,6 +661,7 @@ class HashJoinNode : public ExecNode { filter_(std::move(filter)), schema_mgr_(std::move(schema_mgr)), impl_(std::move(impl)), + is_swiss_(is_swiss), disable_bloom_filter_(join_options.disable_bloom_filter) { complete_.store(false); } @@ -732,21 +702,9 @@ class HashJoinNode : public ExecNode { std::shared_ptr output_schema = schema_mgr->MakeOutputSchema( join_options.output_suffix_for_left, join_options.output_suffix_for_right); - // Create hash join implementation object - // SwissJoin does not support: - // a) 64-bit string offsets - // b) residual predicates - // c) dictionaries - // - bool use_swiss_join; -#if ARROW_LITTLE_ENDIAN - use_swiss_join = (filter == literal(true)) && !schema_mgr->HasDictionaries() && - !schema_mgr->HasLargeBinary(); -#else - use_swiss_join = false; -#endif + bool use_swiss = use_swiss_join(filter, schema_mgr); std::unique_ptr impl; - if (use_swiss_join) { + if (use_swiss) { ARROW_ASSIGN_OR_RAISE(impl, HashJoinImpl::MakeSwiss()); } else { ARROW_ASSIGN_OR_RAISE(impl, HashJoinImpl::MakeBasic()); @@ -754,23 +712,121 @@ class HashJoinNode : public ExecNode { return plan->EmplaceNode( plan, inputs, join_options, std::move(output_schema), std::move(schema_mgr), - std::move(filter), std::move(impl)); + std::move(filter), std::move(impl), use_swiss); } const char* kind_name() const override { return "HashJoinNode"; } + // Create hash join implementation object + // SwissJoin does not support: + // a) 64-bit string offsets + // b) residual predicates + // c) dictionaries + // + static bool use_swiss_join(const Expression& filter, + const std::unique_ptr& schema) { +#if ARROW_LITTLE_ENDIAN + return (filter == literal(true) && !schema->HasDictionaries() && + !schema->HasLargeBinary()); +#else + return false; +#endif + } + + Status AddHashColumn(size_t thread_index, ExecBatch* batch, + const SchemaProjectionMaps& map) { + for (int i = 0; i < batch->num_values(); i++) { + if (batch->values[i].is_scalar()) { + ARROW_ASSIGN_OR_RAISE( + batch->values[i], + MakeArrayFromScalar(*batch->values[i].scalar(), batch->length, + plan_->query_context()->memory_pool())); + } + } + + ARROW_ASSIGN_OR_RAISE(std::unique_ptr hash_buf, + AllocateBuffer(sizeof(uint64_t) * batch->length, + 
plan_->query_context()->memory_pool())); + uint64_t* hashes = reinterpret_cast(hash_buf->mutable_data()); + std::vector temp_column_arrays; + auto key_to_in = map.map(HashJoinProjection::KEY, HashJoinProjection::INPUT); + int num_keys = key_to_in.num_cols; + std::vector key_cols(num_keys); + for (int i = 0; i < num_keys; i++) key_cols[i] = (*batch).values[key_to_in.get(i)]; + + ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, + plan_->query_context()->GetTempStack(thread_index)); + + ExecBatch key_batch(std::move(key_cols), batch->length); + RETURN_NOT_OK(Hashing64::HashBatch( + std::move(key_batch), hashes, temp_column_arrays, + plan_->query_context()->cpu_info()->hardware_flags(), stack, 0, batch->length)); + + ArrayData hash_data(uint64(), batch->length, {nullptr, std::move(hash_buf)}); + batch->values.emplace_back(std::move(hash_data)); + return Status::OK(); + } + + Status OnSpillingStarted(size_t) { + { + std::lock_guard build_guard(build_side_mutex_); + spilling_build_ = true; + } + RETURN_NOT_OK(plan_->query_context()->StartTaskGroup( + task_group_spill_build_, build_accumulator_.batch_count())); + + { + std::lock_guard probe_guard(probe_side_mutex_); + spilling_probe_ = true; + } + RETURN_NOT_OK(plan_->query_context()->StartTaskGroup( + task_group_spill_probe_, probe_accumulator_.batch_count())); + + return Status::OK(); + } + + Status OnBuildSideAccumSpilled(size_t thread_index) { + // If the exchange returned true, it means that it was already + // true before us, so the other event that we are synchronizing + // with already happened. + if (build_accum_spilled_.exchange(true)) + return spilling_join_.OnBuildSideFinished(thread_index); + return Status::OK(); + } + + Status OnProbeSideAccumSpilled(size_t thread_index) { + // If the exchange returned true, it means that it was already + // true before us, so the other event that we are synchronizing + // with already happened. 
+ if (probe_accum_spilled_.exchange(true)) + return spilling_join_.OnProbeSideFinished(thread_index); + return Status::OK(); + } + Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) { - std::lock_guard guard(build_side_mutex_); - build_accumulator_.InsertBatch(std::move(batch)); + { + std::lock_guard guard(build_side_mutex_); + if (!spilling_build_) { + build_accumulator_.InsertBatch(std::move(batch)); + return Status::OK(); + } + } + RETURN_NOT_OK(spilling_join_.OnBuildSideBatch(thread_index, std::move(batch))); return Status::OK(); } Status OnBuildSideFinished(size_t thread_index) { - return pushdown_context_.BuildBloomFilter( - thread_index, std::move(build_accumulator_), - [this](size_t thread_index, AccumulationQueue batches) { - return OnBloomFilterFinished(thread_index, std::move(batches)); - }); + if (!spilling_build_) { + return pushdown_context_.BuildBloomFilter( + thread_index, std::move(build_accumulator_), + [this](size_t thread_index, AccumulationQueue batches) { + return OnBloomFilterFinished(thread_index, std::move(batches)); + }); + } + + if (build_accum_spilled_.exchange(true)) + return spilling_join_.OnBuildSideFinished(thread_index); + return Status::OK(); } Status OnBloomFilterFinished(size_t thread_index, AccumulationQueue batches) { @@ -795,7 +851,12 @@ class HashJoinNode : public ExecNode { Status OnProbeSideBatch(size_t thread_index, ExecBatch batch) { { - std::lock_guard guard(probe_side_mutex_); + std::unique_lock guard(probe_side_mutex_); + if (spilling_probe_) { + guard.unlock(); + return spilling_join_.OnProbeSideBatch(thread_index, std::move(batch)); + } + if (!bloom_filters_ready_) { probe_accumulator_.InsertBatch(std::move(batch)); return Status::OK(); @@ -817,7 +878,14 @@ class HashJoinNode : public ExecNode { Status OnProbeSideFinished(size_t thread_index) { bool probing_finished; { - std::lock_guard guard(probe_side_mutex_); + std::unique_lock guard(probe_side_mutex_); + if (spilling_probe_) { + guard.unlock(); + if (probe_accum_spilled_.exchange(true)) + return spilling_join_.OnProbeSideFinished(thread_index); + return Status::OK(); + } + probing_finished = queued_batches_probed_ && !probe_side_finished_; probe_side_finished_ = true; } @@ -826,7 +894,11 @@ class HashJoinNode : public ExecNode { } Status OnFiltersReceived(size_t thread_index) { + RETURN_NOT_OK(spilling_join_.OnBloomFiltersReceived(thread_index)); + std::unique_lock guard(probe_side_mutex_); + if (spilling_probe_) return Status::OK(); + bloom_filters_ready_ = true; AccumulationQueue batches = std::move(probe_accumulator_); guard.unlock(); @@ -886,6 +958,13 @@ class HashJoinNode : public ExecNode { START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"batch.length", batch.length}}); + if (ErrorIfNotOk(AddHashColumn(thread_index, &batch, schema_mgr_->proj_maps[side]))) { + StopProducing(); + return; + } + + if (ErrorIfNotOk(spilling_join_.CheckSpilling(thread_index, batch))) return; + Status status = side == 0 ? OnProbeSideBatch(thread_index, std::move(batch)) : OnBuildSideBatch(thread_index, std::move(batch)); @@ -947,32 +1026,85 @@ class HashJoinNode : public ExecNode { // we will change it back to just the CPU's thread pool capacity. 
size_t num_threads = (GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1); + auto register_task_group = [ctx](std::function fn, + std::function on_finished) { + return ctx->RegisterTaskGroup(std::move(fn), std::move(on_finished)); + }; + + auto start_task_group = [ctx](int task_group_id, int64_t num_tasks) { + return ctx->StartTaskGroup(task_group_id, num_tasks); + }; + + auto output_batch = [this](int64_t, ExecBatch batch) { + this->OutputBatchCallback(batch); + }; + auto finished = [this](int64_t total_num_batches) { + return this->FinishedCallback(total_num_batches); + }; + pushdown_context_.Init( - this, num_threads, - [ctx](std::function fn, - std::function on_finished) { - return ctx->RegisterTaskGroup(std::move(fn), std::move(on_finished)); - }, - [ctx](int task_group_id, int64_t num_tasks) { - return ctx->StartTaskGroup(task_group_id, num_tasks); - }, + this, num_threads, register_task_group, start_task_group, [this](size_t thread_index) { return OnFiltersReceived(thread_index); }, disable_bloom_filter_, use_sync_execution); - RETURN_NOT_OK(impl_->Init( + HashJoinImpl::CallbackRecord join_callbacks; + join_callbacks.register_task_group = register_task_group; + join_callbacks.start_task_group = start_task_group; + join_callbacks.output_batch = output_batch; + join_callbacks.finished = finished; + + RETURN_NOT_OK(impl_->Init(ctx, join_type_, num_threads, &(schema_mgr_->proj_maps[0]), + &(schema_mgr_->proj_maps[1]), &key_cmp_, &filter_, + std::move(join_callbacks))); + + SpillingHashJoin::CallbackRecord spilling_callbacks; + spilling_callbacks.register_task_group = register_task_group; + spilling_callbacks.start_task_group = start_task_group; + spilling_callbacks.add_probe_side_hashes = [this](size_t thread_index, + ExecBatch* batch) { + return AddHashColumn(thread_index, batch, schema_mgr_->proj_maps[0]); + }; + spilling_callbacks.bloom_filter_finished = [this](size_t thread_index) { + return pushdown_context_.PushBloomFilter(thread_index); + }; + spilling_callbacks.apply_bloom_filter = [this](size_t thread_index, + ExecBatch* batch) { + return pushdown_context_.FilterSingleBatch(thread_index, batch); + }; + spilling_callbacks.output_batch = output_batch; + spilling_callbacks.finished = finished; + spilling_callbacks.start_spilling = [this](size_t thread_index) { + return OnSpillingStarted(thread_index); + }; + spilling_callbacks.pause_probe_side = [this](int counter) { + inputs_[0]->PauseProducing(this, counter); + }; + spilling_callbacks.resume_probe_side = [this](int counter) { + inputs_[0]->ResumeProducing(this, counter); + }; + + RETURN_NOT_OK(spilling_join_.Init( ctx, join_type_, num_threads, &(schema_mgr_->proj_maps[0]), - &(schema_mgr_->proj_maps[1]), key_cmp_, filter_, - [ctx](std::function fn, - std::function on_finished) { - return ctx->RegisterTaskGroup(std::move(fn), std::move(on_finished)); + &(schema_mgr_->proj_maps[1]), &key_cmp_, &filter_, + pushdown_context_.bloom_filter(), std::move(spilling_callbacks), is_swiss_)); + + task_group_spill_build_ = ctx->RegisterTaskGroup( + [this](size_t thread_index, int64_t task_id) -> Status { + return spilling_join_.OnBuildSideBatch(thread_index, + std::move(build_accumulator_[task_id])); }, - [ctx](int task_group_id, int64_t num_tasks) { - return ctx->StartTaskGroup(task_group_id, num_tasks); + [this](size_t thread_index) -> Status { + return OnBuildSideAccumSpilled(thread_index); + }); + + task_group_spill_probe_ = ctx->RegisterTaskGroup( + [this](size_t thread_index, int64_t task_id) -> Status { + return 
spilling_join_.OnProbeSideBatch(thread_index, + std::move(probe_accumulator_[task_id])); }, - [this](int64_t, ExecBatch batch) { this->OutputBatchCallback(batch); }, - [this](int64_t total_num_batches) { - this->FinishedCallback(total_num_batches); - })); + [this](size_t thread_index) -> Status { + return OnProbeSideAccumSpilled(thread_index); + }); task_group_probe_ = ctx->RegisterTaskGroup( [this](size_t thread_index, int64_t task_id) -> Status { @@ -1030,12 +1162,13 @@ class HashJoinNode : public ExecNode { outputs_[0]->InputReceived(this, std::move(batch)); } - void FinishedCallback(int64_t total_num_batches) { + Status FinishedCallback(int64_t total_num_batches) { bool expected = false; if (complete_.compare_exchange_strong(expected, true)) { outputs_[0]->InputFinished(this, static_cast(total_num_batches)); finished_.MarkFinished(); } + return Status::OK(); } private: @@ -1046,14 +1179,19 @@ class HashJoinNode : public ExecNode { Expression filter_; std::unique_ptr schema_mgr_; std::unique_ptr impl_; - util::AccumulationQueue build_accumulator_; - util::AccumulationQueue probe_accumulator_; - util::AccumulationQueue queued_batches_to_probe_; + bool is_swiss_; + + AccumulationQueue build_accumulator_; + AccumulationQueue probe_accumulator_; + AccumulationQueue queued_batches_to_probe_; std::mutex build_side_mutex_; std::mutex probe_side_mutex_; + int task_group_spill_build_; + int task_group_spill_probe_; int task_group_probe_; + bool bloom_filters_ready_ = false; bool hash_table_ready_ = false; bool queued_batches_filtered_ = false; @@ -1061,8 +1199,15 @@ class HashJoinNode : public ExecNode { bool probe_side_finished_ = false; friend struct BloomFilterPushdownContext; + bool disable_bloom_filter_; BloomFilterPushdownContext pushdown_context_; + SpillingHashJoin spilling_join_; + bool spilling_build_ = false; + bool spilling_probe_ = false; + + std::atomic build_accum_spilled_{false}; + std::atomic probe_accum_spilled_{false}; }; void BloomFilterPushdownContext::Init( @@ -1078,7 +1223,6 @@ void BloomFilterPushdownContext::Init( eval_.all_received_callback_ = std::move(on_bloom_filters_received); if (!disable_bloom_filter_) { ARROW_CHECK(push_.pushdown_target_); - push_.bloom_filter_ = std::make_unique(); push_.pushdown_target_->pushdown_context_.ExpectBloomFilter(); build_.builder_ = BloomFilterBuilder::Make( @@ -1119,10 +1263,11 @@ Status BloomFilterPushdownContext::BuildBloomFilter(size_t thread_index, if (disable_bloom_filter_) return build_.on_finished_(thread_index, std::move(build_.batches_)); + push_.bloom_filter_.in_memory = std::make_unique(); RETURN_NOT_OK(build_.builder_->Begin( /*num_threads=*/ctx_->max_concurrency(), ctx_->cpu_info()->hardware_flags(), - ctx_->memory_pool(), build_.batches_.row_count(), build_.batches_.batch_count(), - push_.bloom_filter_.get())); + ctx_->memory_pool(), static_cast(build_.batches_.CalculateRowCount()), + build_.batches_.batch_count(), push_.bloom_filter_.in_memory.get())); return start_task_group_callback_(build_.task_id_, /*num_tasks=*/build_.batches_.batch_count()); @@ -1135,39 +1280,59 @@ Status BloomFilterPushdownContext::PushBloomFilter(size_t thread_index) { return Status::OK(); } -Status BloomFilterPushdownContext::BuildBloomFilter_exec_task(size_t thread_index, - int64_t task_id) { - const ExecBatch& input_batch = build_.batches_[task_id]; - SchemaProjectionMap key_to_in = - schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, HashJoinProjection::INPUT); - std::vector key_columns(key_to_in.num_cols); - for (size_t i = 0; i < 
key_columns.size(); i++) { - int input_idx = key_to_in.get(static_cast(i)); - key_columns[i] = input_batch[input_idx]; - if (key_columns[i].is_scalar()) { - ARROW_ASSIGN_OR_RAISE(key_columns[i], - MakeArrayFromScalar(*key_columns[i].scalar(), - input_batch.length, ctx_->memory_pool())); - } +// Applies all Bloom filters on the input batch. +Status BloomFilterPushdownContext::FilterSingleBatch(size_t thread_index, + ExecBatch* batch_ptr) { + ExecBatch& batch = *batch_ptr; + if (eval_.num_expected_bloom_filters_ == 0 || batch.length == 0) return Status::OK(); + + int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length); + std::vector selected(bit_vector_bytes); + + // In the common case of a join pushing a Bloom filter to itself, and that + // being the only Bloom filter, we can skip computing the hashes + if (push_.pushdown_target_ && this == &push_.pushdown_target_->pushdown_context_ && + eval_.num_expected_bloom_filters_ == 1) { + const uint64_t* hashes = reinterpret_cast( + batch.values.back().array()->buffers[1]->data()); + eval_.received_filters_[0].Find(ctx_->cpu_info()->hardware_flags(), batch.length, + hashes, selected.data()); + } else { + RETURN_NOT_OK(HashAndLookupInFilter(thread_index, batch, selected)); } - ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(key_columns))); - - ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, ctx_->GetTempStack(thread_index)); - util::TempVectorHolder hash_holder(stack, util::MiniBatch::kMiniBatchLength); - uint32_t* hashes = hash_holder.mutable_data(); - for (int64_t i = 0; i < key_batch.length; i += util::MiniBatch::kMiniBatchLength) { - int64_t length = std::min(static_cast(key_batch.length - i), - static_cast(util::MiniBatch::kMiniBatchLength)); - std::vector temp_column_arrays; - RETURN_NOT_OK(Hashing32::HashBatch(key_batch, hashes, temp_column_arrays, - ctx_->cpu_info()->hardware_flags(), stack, i, - length)); - RETURN_NOT_OK(build_.builder_->PushNextBatch(thread_index, length, hashes)); + auto selected_buffer = std::make_unique(selected.data(), bit_vector_bytes); + ArrayData selected_arraydata(boolean(), batch.length, + {nullptr, std::move(selected_buffer)}); + Datum selected_datum(selected_arraydata); + FilterOptions options; + size_t first_nonscalar = batch.values.size(); + for (size_t i = 0; i < batch.values.size(); i++) { + if (!batch.values[i].is_scalar()) { + ARROW_ASSIGN_OR_RAISE(batch.values[i], Filter(batch.values[i], selected_datum, + options, ctx_->exec_context())); + first_nonscalar = std::min(first_nonscalar, i); + ARROW_DCHECK_EQ(batch.values[i].length(), batch.values[first_nonscalar].length()); + } } + // If they're all Scalar, then the length of the batch is the number of set bits + if (first_nonscalar == batch.values.size()) + batch.length = arrow::internal::CountSetBits(selected.data(), 0, batch.length); + else + batch.length = batch.values[first_nonscalar].length(); return Status::OK(); } +Status BloomFilterPushdownContext::BuildBloomFilter_exec_task(size_t thread_index, + int64_t task_id) { + const ExecBatch& input_batch = build_.batches_[task_id]; + if (input_batch.length == 0) return Status::OK(); + + const uint64_t* hashes = reinterpret_cast( + input_batch.values.back().array()->buffers[1]->data()); + return build_.builder_->PushNextBatch(thread_index, input_batch.length, hashes); +} + std::pair> BloomFilterPushdownContext::GetPushdownTarget( HashJoinNode* start) { #if !ARROW_LITTLE_ENDIAN diff --git a/cpp/src/arrow/compute/exec/partition_util.cc 
b/cpp/src/arrow/compute/exec/partition_util.cc index e99007c45a335..4e9d7ee93832d 100644 --- a/cpp/src/arrow/compute/exec/partition_util.cc +++ b/cpp/src/arrow/compute/exec/partition_util.cc @@ -17,6 +17,7 @@ #include "arrow/compute/exec/partition_util.h" #include +#include namespace arrow { namespace compute { @@ -79,6 +80,16 @@ bool PartitionLocks::AcquirePartitionLock(size_t thread_id, int num_prtns_to_try return false; } +PartitionLocks::AutoReleaseLock PartitionLocks::AcquirePartitionLock(int prtn_id) { + std::atomic* lock = lock_ptr(prtn_id); + bool expected = false; + for (;;) { + if (lock->compare_exchange_strong(expected, true, std::memory_order_acquire)) + return {this, prtn_id}; + while (lock->load()) std::this_thread::yield(); + } +} + void PartitionLocks::ReleasePartitionLock(int prtn_id) { ARROW_DCHECK(prtn_id >= 0 && prtn_id < num_prtns_); std::atomic* lock = lock_ptr(prtn_id); diff --git a/cpp/src/arrow/compute/exec/partition_util.h b/cpp/src/arrow/compute/exec/partition_util.h index b3f302511a75d..362d3e0ee97d7 100644 --- a/cpp/src/arrow/compute/exec/partition_util.h +++ b/cpp/src/arrow/compute/exec/partition_util.h @@ -37,7 +37,7 @@ class PartitionSort { /// This corresponds to ranges in the sorted array containing all row ids for /// each of the partitions. /// - /// prtn_ranges must be initailized and have at least prtn_ranges + 1 elements + /// prtn_ranges must be initailized and have at least num_prtns + 1 elements /// when this method returns prtn_ranges[i] will contains the total number of /// elements in partitions 0 through i. prtn_ranges[0] will be 0. /// @@ -88,7 +88,7 @@ class PartitionSort { }; /// \brief A control for synchronizing threads on a partitionable workload -class PartitionLocks { +class ARROW_EXPORT PartitionLocks { public: PartitionLocks(); ~PartitionLocks(); @@ -115,6 +115,17 @@ class PartitionLocks { bool AcquirePartitionLock(size_t thread_id, int num_prtns, const int* prtns_to_try, bool limit_retries, int max_retries, int* locked_prtn_id, int* locked_prtn_id_pos); + + class [[nodiscard]] AutoReleaseLock { + public: + AutoReleaseLock(PartitionLocks* locks, int prtn_id) + : locks(locks), prtn_id(prtn_id) {} + ~AutoReleaseLock() { locks->ReleasePartitionLock(prtn_id); } + PartitionLocks* locks; + int prtn_id; + }; + + AutoReleaseLock AcquirePartitionLock(int prtn_id); /// \brief Release a partition so that other threads can work on it void ReleasePartitionLock(int prtn_id); @@ -147,14 +158,7 @@ class PartitionLocks { /*limit_retries=*/false, /*max_retries=*/-1, &locked_prtn_id, &locked_prtn_id_pos); { - class AutoReleaseLock { - public: - AutoReleaseLock(PartitionLocks* locks, int prtn_id) - : locks(locks), prtn_id(prtn_id) {} - ~AutoReleaseLock() { locks->ReleasePartitionLock(prtn_id); } - PartitionLocks* locks; - int prtn_id; - } auto_release_lock(this, locked_prtn_id); + AutoReleaseLock auto_release_lock(this, locked_prtn_id); ARROW_RETURN_NOT_OK(process_prtn_fn(locked_prtn_id)); } if (locked_prtn_id_pos < num_unprocessed_partitions - 1) { diff --git a/cpp/src/arrow/compute/exec/query_context.cc b/cpp/src/arrow/compute/exec/query_context.cc index a155c750a2a9e..926f61ef215ae 100644 --- a/cpp/src/arrow/compute/exec/query_context.cc +++ b/cpp/src/arrow/compute/exec/query_context.cc @@ -22,7 +22,9 @@ namespace arrow { using internal::CpuInfo; namespace compute { -QueryOptions::QueryOptions() : use_legacy_batching(false) {} +QueryOptions::QueryOptions() + : max_memory_bytes(::arrow::internal::GetTotalMemoryBytes() / 2), + use_legacy_batching(false) 
{} QueryContext::QueryContext(QueryOptions opts, ExecContext exec_context) : options_(opts), diff --git a/cpp/src/arrow/compute/exec/query_context.h b/cpp/src/arrow/compute/exec/query_context.h index 12ddbc56fad16..fdf059b103a41 100644 --- a/cpp/src/arrow/compute/exec/query_context.h +++ b/cpp/src/arrow/compute/exec/query_context.h @@ -30,6 +30,8 @@ namespace compute { struct ARROW_EXPORT QueryOptions { QueryOptions(); + int64_t max_memory_bytes; + /// \brief Should the plan use a legacy batching strategy /// /// This is currently in place only to support the Scanner::ToTable @@ -45,7 +47,7 @@ struct ARROW_EXPORT QueryOptions { class ARROW_EXPORT QueryContext { public: QueryContext(QueryOptions opts = {}, - ExecContext exec_context = *default_exec_context()); + ExecContext exec_context = *threaded_exec_context()); Status Init(size_t max_num_threads, util::AsyncTaskScheduler* scheduler); diff --git a/cpp/src/arrow/compute/exec/schema_util.h b/cpp/src/arrow/compute/exec/schema_util.h index f2b14aa545060..bfb224c9e5dad 100644 --- a/cpp/src/arrow/compute/exec/schema_util.h +++ b/cpp/src/arrow/compute/exec/schema_util.h @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -38,7 +39,8 @@ enum class HashJoinProjection : int { KEY = 1, PAYLOAD = 2, FILTER = 3, - OUTPUT = 4 + OUTPUT = 4, + NUM_VALUES = 5, }; struct SchemaProjectionMap { @@ -63,22 +65,44 @@ class SchemaProjectionMaps { public: static constexpr int kMissingField = -1; - Status Init(ProjectionIdEnum full_schema_handle, const Schema& schema, - const std::vector& projection_handles, - const std::vector*>& projections) { - ARROW_DCHECK(projection_handles.size() == projections.size()); + Status Init(ProjectionIdEnum full_schema_handle, const Schema& schema) { ARROW_RETURN_NOT_OK(RegisterSchema(full_schema_handle, schema)); - for (size_t i = 0; i < projections.size(); ++i) { - ARROW_RETURN_NOT_OK( - RegisterProjectedSchema(projection_handles[i], *(projections[i]), schema)); + const int id_base = 0; + std::vector& mapping = mappings_[id_base]; + std::vector& inverse = inverse_mappings_[id_base]; + mapping.resize(schema.num_fields()); + inverse.resize(schema.num_fields()); + std::iota(mapping.begin(), mapping.end(), 0); + std::iota(inverse.begin(), inverse.end(), 0); + return Status::OK(); + } + + Status RegisterProjectedSchema(ProjectionIdEnum handle, + const std::vector& selected_fields, + const Schema& full_schema) { + FieldInfos out_fields; + const FieldVector& in_fields = full_schema.fields(); + out_fields.field_paths.resize(selected_fields.size()); + out_fields.field_names.resize(selected_fields.size()); + out_fields.data_types.resize(selected_fields.size()); + for (size_t i = 0; i < selected_fields.size(); ++i) { + // All fields must be found in schema without ambiguity + ARROW_ASSIGN_OR_RAISE(auto match, selected_fields[i].FindOne(full_schema)); + const std::string& name = in_fields[match[0]]->name(); + const std::shared_ptr& type = in_fields[match[0]]->type(); + out_fields.field_paths[i] = match[0]; + out_fields.field_names[i] = name; + out_fields.data_types[i] = type; } - RegisterEnd(); + int id = schema_id(handle); + schemas_[id] = std::move(out_fields); + GenerateMapForProjection(id); return Status::OK(); } int num_cols(ProjectionIdEnum schema_handle) const { int id = schema_id(schema_handle); - return static_cast(schemas_[id].second.data_types.size()); + return static_cast(schemas_[id].data_types.size()); } bool is_empty(ProjectionIdEnum schema_handle) const { @@ -87,19 +111,19 @@ class SchemaProjectionMaps { const 
std::string& field_name(ProjectionIdEnum schema_handle, int field_id) const { int id = schema_id(schema_handle); - return schemas_[id].second.field_names[field_id]; + return schemas_[id].field_names[field_id]; } const std::shared_ptr& data_type(ProjectionIdEnum schema_handle, int field_id) const { int id = schema_id(schema_handle); - return schemas_[id].second.data_types[field_id]; + return schemas_[id].data_types[field_id]; } const std::vector>& data_types( ProjectionIdEnum schema_handle) const { int id = schema_id(schema_handle); - return schemas_[id].second.data_types; + return schemas_[id].data_types; } SchemaProjectionMap map(ProjectionIdEnum from, ProjectionIdEnum to) const { @@ -132,55 +156,21 @@ class SchemaProjectionMaps { out_fields.field_names[i] = name; out_fields.data_types[i] = type; } - schemas_.push_back(std::make_pair(handle, out_fields)); + schemas_[schema_id(handle)] = std::move(out_fields); return Status::OK(); } - Status RegisterProjectedSchema(ProjectionIdEnum handle, - const std::vector& selected_fields, - const Schema& full_schema) { - FieldInfos out_fields; - const FieldVector& in_fields = full_schema.fields(); - out_fields.field_paths.resize(selected_fields.size()); - out_fields.field_names.resize(selected_fields.size()); - out_fields.data_types.resize(selected_fields.size()); - for (size_t i = 0; i < selected_fields.size(); ++i) { - // All fields must be found in schema without ambiguity - ARROW_ASSIGN_OR_RAISE(auto match, selected_fields[i].FindOne(full_schema)); - const std::string& name = in_fields[match[0]]->name(); - const std::shared_ptr& type = in_fields[match[0]]->type(); - out_fields.field_paths[i] = match[0]; - out_fields.field_names[i] = name; - out_fields.data_types[i] = type; - } - schemas_.push_back(std::make_pair(handle, out_fields)); - return Status::OK(); - } - - void RegisterEnd() { - size_t size = schemas_.size(); - mappings_.resize(size); - inverse_mappings_.resize(size); - int id_base = 0; - for (size_t i = 0; i < size; ++i) { - GenerateMapForProjection(static_cast(i), id_base); - } - } - int schema_id(ProjectionIdEnum schema_handle) const { - for (size_t i = 0; i < schemas_.size(); ++i) { - if (schemas_[i].first == schema_handle) { - return static_cast(i); - } - } - // We should never get here - ARROW_DCHECK(false); - return -1; + int id = static_cast(schema_handle); + ARROW_DCHECK(id < static_cast(ProjectionIdEnum::NUM_VALUES)); + return id; } - void GenerateMapForProjection(int id_proj, int id_base) { - int num_cols_proj = static_cast(schemas_[id_proj].second.data_types.size()); - int num_cols_base = static_cast(schemas_[id_base].second.data_types.size()); + void GenerateMapForProjection(int id_proj) { + const int id_base = 0; + + int num_cols_proj = static_cast(schemas_[id_proj].data_types.size()); + int num_cols_base = static_cast(schemas_[id_base].data_types.size()); std::vector& mapping = mappings_[id_proj]; std::vector& inverse_mapping = inverse_mappings_[id_proj]; @@ -192,8 +182,8 @@ class SchemaProjectionMaps { mapping[i] = inverse_mapping[i] = i; } } else { - const FieldInfos& fields_proj = schemas_[id_proj].second; - const FieldInfos& fields_base = schemas_[id_base].second; + const FieldInfos& fields_proj = schemas_[id_proj]; + const FieldInfos& fields_base = schemas_[id_base]; for (int i = 0; i < num_cols_base; ++i) { inverse_mapping[i] = SchemaProjectionMap::kMissingField; } @@ -215,9 +205,11 @@ class SchemaProjectionMaps { } // vector used as a mapping from ProjectionIdEnum to fields - std::vector> schemas_; - std::vector> 
mappings_; - std::vector> inverse_mappings_; + std::array(ProjectionIdEnum::NUM_VALUES)> schemas_; + std::array, static_cast(ProjectionIdEnum::NUM_VALUES)> + mappings_; + std::array, static_cast(ProjectionIdEnum::NUM_VALUES)> + inverse_mappings_; }; using HashJoinProjectionMaps = SchemaProjectionMaps; diff --git a/cpp/src/arrow/compute/exec/spilling_benchmark.cc b/cpp/src/arrow/compute/exec/spilling_benchmark.cc new file mode 100644 index 0000000000000..b8fb5780c1968 --- /dev/null +++ b/cpp/src/arrow/compute/exec/spilling_benchmark.cc @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include "arrow/compute/exec/accumulation_queue.h" +#include "arrow/compute/exec/spilling_util.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/util/checked_cast.h" +#include "benchmark/benchmark.h" + +namespace arrow { +namespace compute { +struct SpillingBenchmarkSettings { + int64_t num_files = 4; + // number of I/O threads. If -1 then the default I/O capacity will be used. + int64_t num_threads = -1; +}; + +static void SpillingWrite_Impl(benchmark::State& st, + SpillingBenchmarkSettings& settings) { + constexpr int num_batches = 1024; + constexpr int rows_per_batch = 32000; + int64_t num_files = settings.num_files; + std::shared_ptr bm_schema = + schema({field("f1", int32()), field("f2", int32())}); + Random64Bit rng(42); + for (auto _ : st) { + st.PauseTiming(); + { + QueryContext ctx; + std::vector file(num_files); + Future<> fut = util::AsyncTaskScheduler::Make([&](util::AsyncTaskScheduler* sched) { + RETURN_NOT_OK(ctx.Init(ctx.max_concurrency(), sched)); + if (settings.num_threads != -1) { + RETURN_NOT_OK(arrow::internal::checked_cast( + ctx.io_context()->executor()) + ->SetCapacity(static_cast(settings.num_threads))); + } + BatchesWithSchema batches = + MakeRandomBatches(bm_schema, num_batches, rows_per_batch, + SpillFile::kAlignment, ctx.memory_pool()); + st.ResumeTiming(); + + for (ExecBatch& b : batches.batches) { + int64_t idx = rng.from_range(static_cast(0), num_files - 1); + RETURN_NOT_OK(file[idx].SpillBatch(&ctx, std::move(b))); + } + return Status::OK(); + }); + fut.Wait(); + st.PauseTiming(); + for (SpillFile& f : file) ASSERT_OK(f.Cleanup()); + } + st.ResumeTiming(); + } + st.counters["BytesProcessed"] = benchmark::Counter( + num_batches * rows_per_batch * sizeof(int32_t) * 2, + benchmark::Counter::kIsIterationInvariantRate, benchmark::Counter::OneK::kIs1024); +} + +static void BM_SpillingWrite(benchmark::State& st) { + SpillingBenchmarkSettings settings; + settings.num_files = st.range(0); + SpillingWrite_Impl(st, settings); +} + +static void BM_SpillingRead(benchmark::State& st) { + constexpr int num_batches = 1024; + constexpr int rows_per_batch = 32000; + std::shared_ptr bm_schema = + 
schema({field("f1", int32()), field("f2", int32())}); + for (auto _ : st) { + st.PauseTiming(); + { + SpillFile file; + QueryContext ctx; + Future<> fut = util::AsyncTaskScheduler::Make([&](util::AsyncTaskScheduler* sched) { + RETURN_NOT_OK(ctx.Init(std::thread::hardware_concurrency(), sched)); + BatchesWithSchema batches = + MakeRandomBatches(bm_schema, num_batches, rows_per_batch, + SpillFile::kAlignment, ctx.memory_pool()); + + std::vector accum(num_batches); + for (ExecBatch& b : batches.batches) + RETURN_NOT_OK(file.SpillBatch(&ctx, std::move(b))); + + while (file.batches_written() < num_batches) std::this_thread::yield(); + + RETURN_NOT_OK(file.PreallocateBatches(ctx.memory_pool())); + st.ResumeTiming(); + + RETURN_NOT_OK(file.ReadBackBatches( + &ctx, + [&](size_t, size_t idx, ExecBatch batch) { + accum[idx] = std::move(batch); + return Status::OK(); + }, + [&](size_t) { return Status::OK(); })); + return Status::OK(); + }); + fut.Wait(); + st.PauseTiming(); + ASSERT_OK(file.Cleanup()); + } + st.ResumeTiming(); + } + st.counters["BytesProcessed"] = benchmark::Counter( + num_batches * rows_per_batch * sizeof(int32_t) * 2, + benchmark::Counter::kIsIterationInvariantRate, benchmark::Counter::OneK::kIs1024); +} + +static void BM_SpillingNumThreads(benchmark::State& st) { + SpillingBenchmarkSettings settings; + settings.num_threads = st.range(0); + SpillingWrite_Impl(st, settings); +} + +BENCHMARK(BM_SpillingWrite) + ->UseRealTime() + ->ArgNames({"NumFiles"}) + ->RangeMultiplier(4) + ->Range(1, SpillingAccumulationQueue::kNumPartitions); +BENCHMARK(BM_SpillingRead)->UseRealTime(); +BENCHMARK(BM_SpillingNumThreads) + ->UseRealTime() + ->ArgNames({"NumThreads"}) + ->RangeMultiplier(2) + ->Range(1, 2 * std::thread::hardware_concurrency()); +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/spilling_join.cc b/cpp/src/arrow/compute/exec/spilling_join.cc new file mode 100644 index 0000000000000..1bb0a04365c12 --- /dev/null +++ b/cpp/src/arrow/compute/exec/spilling_join.cc @@ -0,0 +1,262 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "arrow/compute/exec/spilling_join.h" +#include "arrow/util/atomic_util.h" + +namespace arrow { +namespace compute { +void PartitionedBloomFilter::Find(int64_t hardware_flags, int64_t num_rows, + const uint64_t* hashes, uint8_t* bv) { + if (in_memory) return in_memory->Find(hardware_flags, num_rows, hashes, bv); + + for (int64_t i = 0; i < num_rows; i++) { + uint64_t hash = hashes[i]; + size_t partition = SpillingAccumulationQueue::partition_id(hashes[i]); + bool found = partitions[partition] ? 
partitions[partition]->Find(hash) : true; + bit_util::SetBitTo(bv, i, found); + } +} + +Status SpillingHashJoin::Init(QueryContext* ctx, JoinType join_type, size_t num_threads, + SchemaProjectionMaps* proj_map_left, + SchemaProjectionMaps* proj_map_right, + std::vector* key_cmp, Expression* filter, + PartitionedBloomFilter* bloom_filter, + CallbackRecord callback_record, bool is_swiss) { + ctx_ = ctx; + num_threads_ = num_threads; + callbacks_ = std::move(callback_record); + bloom_filter_ = bloom_filter; + is_swiss_ = is_swiss; + + HashJoinImpl::CallbackRecord join_callbacks; + join_callbacks.register_task_group = callbacks_.register_task_group; + join_callbacks.start_task_group = callbacks_.start_task_group; + join_callbacks.output_batch = callbacks_.output_batch; + join_callbacks.finished = [this](int64_t num_total_batches) { + return this->OnCollocatedJoinFinished(num_total_batches); + }; + + builder_ = BloomFilterBuilder::Make(num_threads_ == 1 + ? BloomFilterBuildStrategy::SINGLE_THREADED + : BloomFilterBuildStrategy::PARALLEL); + RETURN_NOT_OK(build_accumulator_.Init(ctx)); + RETURN_NOT_OK(probe_accumulator_.Init(ctx)); + + for (size_t i = 0; i < SpillingAccumulationQueue::kNumPartitions; i++) { + ARROW_ASSIGN_OR_RAISE( + impls_[i], is_swiss_ ? HashJoinImpl::MakeSwiss() : HashJoinImpl::MakeBasic()); + RETURN_NOT_OK(impls_[i]->Init(ctx_, join_type, num_threads, proj_map_left, + proj_map_right, key_cmp, filter, join_callbacks)); + + task_group_bloom_[i] = callbacks_.register_task_group( + [this](size_t thread_index, int64_t task_id) { + return PushBloomFilterBatch(thread_index, task_id); + }, + [this](size_t thread_index) { return OnBloomFilterFinished(thread_index); }); + } + return Status::OK(); +} + +Status SpillingHashJoin::CheckSpilling(size_t thread_index, ExecBatch& batch) { + size_t size_of_batch = static_cast(batch.TotalBufferSize()); + size_t max_batch_size = arrow::util::AtomicMax(max_batch_size_, size_of_batch); + + // Spilling algorithm proven to not use more than + // (SpillThreshold + NumThreads * BatchSize) memory. + // Thus we want to spill when (SpillThreshold + NumThreads * BatchSize) = k * MaxMemory + // with some fuzz factor k (which is 0.8 here because that's what I decided). + // Thus SpillThreshold = k * MaxMemory - NumThreads * BatchSize. 
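+ // Worked example: with max_memory_bytes = 1 GiB, k = 0.8, 8 threads, and a
+ // 1 MiB maximum batch size, SpillThreshold comes out to roughly
+ // 0.8 * 1024 MiB - 8 * 1 MiB = 811 MiB of allocations before spilling begins.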
+ constexpr float kFuzzFactor = 0.8f; + size_t max_memory = static_cast(kFuzzFactor * ctx_->options().max_memory_bytes); + size_t spill_threshold = static_cast(std::max( + static_cast(kFuzzFactor * max_memory - num_threads_ * max_batch_size), + static_cast(0))); + size_t bytes_allocated = static_cast(ctx_->memory_pool()->bytes_allocated()); + size_t bytes_inflight = ctx_->GetCurrentTempFileIO(); + + size_t backpressure_threshold = spill_threshold / 2; + if (bytes_allocated > backpressure_threshold) { + if (int32_t expected = 0; backpressure_counter_.compare_exchange_strong(expected, 1)) + callbacks_.pause_probe_side(1); + } + if ((bytes_allocated - bytes_inflight) > spill_threshold) { + RETURN_NOT_OK(AdvanceSpillCursor(thread_index)); + } + return Status::OK(); +} + +Status SpillingHashJoin::AdvanceSpillCursor(size_t thread_index) { + if (bool expected = false; + !spilling_.load() && spilling_.compare_exchange_strong(expected, true)) + return callbacks_.start_spilling(thread_index); + + ARROW_ASSIGN_OR_RAISE(bool probe_advanced, probe_accumulator_.AdvanceSpillCursor()); + if (probe_advanced) return Status::OK(); + + ARROW_ASSIGN_OR_RAISE(bool build_advanced, build_accumulator_.AdvanceSpillCursor()); + if (build_advanced) return Status::OK(); + + ARROW_ASSIGN_OR_RAISE(bool probe_hash_advanced, probe_accumulator_.AdvanceHashCursor()); + if (probe_hash_advanced) return Status::OK(); + + ARROW_ASSIGN_OR_RAISE(bool build_hash_advanced, build_accumulator_.AdvanceHashCursor()); + if (build_hash_advanced) return Status::OK(); + + // Pray we don't run out of memory + ARROW_LOG(WARNING) + << "Memory limits for query exceeded but all data from hash join spilled to disk"; + return Status::OK(); +} + +Status SpillingHashJoin::OnBuildSideBatch(size_t thread_index, ExecBatch batch) { + return build_accumulator_.InsertBatch(thread_index, std::move(batch)); +} + +Status SpillingHashJoin::OnBuildSideFinished(size_t thread_index) { + return BuildPartitionedBloomFilter(thread_index); +} + +// Note about Bloom filter implementation: +// Currently, we disable a partition for a Bloom filter based on the size of +// the hashes for that partition. Instead, we should be disabling based on +// the size of the bloom filter itself, since a Bloom filter would use about +// 8-16 bits per value instead of 64 bits per value. +Status SpillingHashJoin::BuildPartitionedBloomFilter(size_t thread_index) { + // Disable Bloom filter if bloom_filter_ = nullptr by advancing to past + // the final Bloom filter + partition_idx_ = (bloom_filter_ == nullptr) ? 
SpillingAccumulationQueue::kNumPartitions + : build_accumulator_.hash_cursor(); + return BuildNextBloomFilter(thread_index); +} + +Status SpillingHashJoin::PushBloomFilterBatch(size_t thread_index, int64_t batch_id) { + const uint64_t* hashes = + build_accumulator_.GetHashes(partition_idx_, static_cast(batch_id)); + size_t num_rows = + build_accumulator_.row_count(partition_idx_, static_cast(batch_id)); + return builder_->PushNextBatch(thread_index, static_cast(num_rows), hashes); +} + +Status SpillingHashJoin::BuildNextBloomFilter(size_t thread_index) { + size_t num_rows = build_accumulator_.CalculatePartitionRowCount(partition_idx_); + size_t num_batches = build_accumulator_.batch_count(partition_idx_); + + // partition_idx_ is incremented in the callback for the taskgroup + bloom_filter_->partitions[partition_idx_] = std::make_unique(); + + RETURN_NOT_OK(builder_->Begin(num_threads_, ctx_->cpu_info()->hardware_flags(), + ctx_->memory_pool(), num_rows, num_batches, + bloom_filter_->partitions[partition_idx_].get())); + + return callbacks_.start_task_group(task_group_bloom_[partition_idx_], + build_accumulator_.batch_count(partition_idx_)); +} + +Status SpillingHashJoin::OnBloomFilterFinished(size_t thread_index) { + if (++partition_idx_ >= SpillingAccumulationQueue::kNumPartitions) + return OnPartitionedBloomFilterFinished(thread_index); + return BuildNextBloomFilter(thread_index); +} + +Status SpillingHashJoin::OnPartitionedBloomFilterFinished(size_t thread_index) { + RETURN_NOT_OK(callbacks_.bloom_filter_finished(thread_index)); + backpressure_counter_.store(2); + callbacks_.resume_probe_side(/*backpressure_counter=*/2); + if (bloom_or_probe_finished_.exchange(true)) return StartCollocatedJoins(thread_index); + return Status::OK(); +} + +Status SpillingHashJoin::OnBloomFiltersReceived(size_t thread_index) { + bloom_ready_.store(true, std::memory_order_release); + return Status::OK(); +} + +Status SpillingHashJoin::OnProbeSideBatch(size_t thread_index, ExecBatch batch) { + if (bloom_ready_.load()) { + RETURN_NOT_OK(callbacks_.apply_bloom_filter(thread_index, &batch)); + } + return probe_accumulator_.InsertBatch(thread_index, std::move(batch)); +} + +Status SpillingHashJoin::OnProbeSideFinished(size_t thread_index) { + if (bloom_or_probe_finished_.exchange(true)) return StartCollocatedJoins(thread_index); + return Status::OK(); +} + +Status SpillingHashJoin::StartCollocatedJoins(size_t thread_index) { + // We start reading from the back to take advantage of any caches with the SSD + // that may be in place (i.e. read back the most-recently-written stuff). + partition_idx_ = SpillingAccumulationQueue::kNumPartitions; + return BeginNextCollocatedJoin(thread_index); +} + +Status SpillingHashJoin::BeginNextCollocatedJoin(size_t thread_index) { + partition_idx_ -= 1; + build_queue_.Resize(build_accumulator_.batch_count(partition_idx_)); + return build_accumulator_.GetPartition( + thread_index, partition_idx_, + /*on_batch*/ + [this](size_t thread_index, size_t batch_idx, ExecBatch batch) { + build_queue_.SetBatch(batch_idx, std::move(batch)); + return Status::OK(); + }, + /*on_finished=*/ + [this](size_t thread_index) { return BuildHashTable(thread_index); }); +} + +// A possible optimization here is to swap the build and probe side if the probe side is +// smaller (we want the smaller side to be the hash table side). We know how much we wrote +// to disk for each side, so it could be a big win. 
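+// (Doing the swap would also mean mirroring the join type, e.g. LEFT_OUTER
+// becomes RIGHT_OUTER, and the output projections, since the build and probe
+// roles of the two inputs would be reversed.)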
+Status SpillingHashJoin::BuildHashTable(size_t thread_index) { + RETURN_NOT_OK(impls_[partition_idx_]->BuildHashTable( + thread_index, std::move(build_queue_), + [this](size_t thread_index) { return OnHashTableFinished(thread_index); })); + return Status::OK(); +} + +Status SpillingHashJoin::OnHashTableFinished(size_t thread_index) { + return probe_accumulator_.GetPartition( + thread_index, partition_idx_, + [this](size_t thread_index, size_t batch_idx, ExecBatch batch) { + return OnProbeSideBatchReadBack(thread_index, batch_idx, std::move(batch)); + }, + [this](size_t thread_index) { return OnProbingFinished(thread_index); }); +} + +Status SpillingHashJoin::OnProbeSideBatchReadBack(size_t thread_index, size_t batch_idx, + ExecBatch batch) { + ARROW_DCHECK(bloom_ready_.load()); + RETURN_NOT_OK(callbacks_.add_probe_side_hashes(thread_index, &batch)); + RETURN_NOT_OK(callbacks_.apply_bloom_filter(thread_index, &batch)); + return impls_[partition_idx_]->ProbeSingleBatch(thread_index, std::move(batch)); +} + +Status SpillingHashJoin::OnProbingFinished(size_t thread_index) { + return impls_[partition_idx_]->ProbingFinished(thread_index); +} + +Status SpillingHashJoin::OnCollocatedJoinFinished(int64_t num_batches) { + total_batches_outputted_ += num_batches; + if (partition_idx_ > 0) return BeginNextCollocatedJoin(ctx_->GetThreadIndex()); + return callbacks_.finished(total_batches_outputted_); +} +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/spilling_join.h b/cpp/src/arrow/compute/exec/spilling_join.h new file mode 100644 index 0000000000000..3542368e8b63d --- /dev/null +++ b/cpp/src/arrow/compute/exec/spilling_join.h @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/compute/exec/accumulation_queue.h" +#include "arrow/compute/exec/hash_join.h" +#include "arrow/compute/exec/query_context.h" + +namespace arrow { +namespace compute { + +// Holds Bloom filters used by the join. In the case of spilling, +// Bloom filters are rebuilt on partitions that still have their hashes +// in memory (since hashes get spilled later). +struct PartitionedBloomFilter { + std::unique_ptr in_memory; + std::unique_ptr + partitions[SpillingAccumulationQueue::kNumPartitions]; + + void Find(int64_t hardware_flags, int64_t num_rows, const uint64_t* hashes, + uint8_t* bv); +}; + +// A separate implementation of the Hash Join that partitions the input data into 64 +// partitions and writes the partitions to disk. Once the partitions have been written to +// disk, joins are performed per-partition and results are outputted. +// +// Probe-side batches are spilled first, then build-side batches, then probe-side hashes, +// then build-side hashes. 
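+// Hashes are spilled after the batch values because they are comparatively small
+// (8 bytes per row) and because Bloom filters can only be built for partitions
+// whose hashes are still in memory.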
+// +// As soon as spilling starts, the probe-side is paused to enable full accumulation of the +// build side first, to minimize the number of batches buffered by the probe side. It is +// resumed once the build side is finished. +class SpillingHashJoin { + public: + using RegisterTaskGroupCallback = std::function, + std::function)>; // Register a TaskGroup in the ExecPlan + using StartTaskGroupCallback = + std::function; // Start the TaskGroup with the givne ID + using AddProbeSideHashColumnCallback = + std::function; // Hashes the key columns of the batch + // and appends the hashes as a column + using BloomFilterFinishedCallback = + std::function; // Called when the Bloom filter is built + using ApplyBloomFilterCallback = + std::function; // Applies the Bloom filter to the batch + using OutputBatchCallback = std::function; // Output a batch + using FinishedCallback = + std::function; // The Join has finished outputting + using StartSpillingCallback = std::function; // Called when we've run out of memory and spilling is starting + using PauseProbeSideCallback = std::function; // Pauses probe side + using ResumeProbeSideCallback = std::function; // Resumes probe side + + struct CallbackRecord { + RegisterTaskGroupCallback register_task_group; + StartTaskGroupCallback start_task_group; + AddProbeSideHashColumnCallback add_probe_side_hashes; + BloomFilterFinishedCallback bloom_filter_finished; + ApplyBloomFilterCallback apply_bloom_filter; + OutputBatchCallback output_batch; + FinishedCallback finished; + StartSpillingCallback start_spilling; + PauseProbeSideCallback pause_probe_side; + ResumeProbeSideCallback resume_probe_side; + }; + + Status Init(QueryContext* ctx, JoinType join_type, size_t num_threads, + SchemaProjectionMaps* proj_map_left, + SchemaProjectionMaps* proj_map_right, + std::vector* key_cmp, Expression* filter, + PartitionedBloomFilter* bloom_filter, CallbackRecord callback_record, + bool is_swiss); + + // Checks available memory and initiates spilling if there is not enough. + Status CheckSpilling(size_t thread_index, ExecBatch& batch); + + Status OnBuildSideBatch(size_t thread_index, ExecBatch batch); + Status OnBuildSideFinished(size_t thread_index); + + Status OnProbeSideBatch(size_t thread_index, ExecBatch batch); + Status OnProbeSideFinished(size_t thread_index); + + Status OnBloomFiltersReceived(size_t thread_index); + + private: + Status AdvanceSpillCursor(size_t thread_index); + + // Builds the entire bloom filter for all 64 partitions. + Status BuildPartitionedBloomFilter(size_t thread_index); + Status PushBloomFilterBatch(size_t thread_index, int64_t batch_id); + // Builds a bloom filter for a single partition. + Status BuildNextBloomFilter(size_t thread_index); + Status OnBloomFilterFinished(size_t thread_index); + Status OnPartitionedBloomFilterFinished(size_t thread_index); + + Status StartCollocatedJoins(size_t thread_index); + Status BeginNextCollocatedJoin(size_t thread_index); + Status BuildHashTable(size_t thread_index); + Status OnHashTableFinished(size_t thread_index); + Status OnProbeSideBatchReadBack(size_t thread_index, size_t batch_idx, ExecBatch batch); + Status OnProbingFinished(size_t thread_index); + Status OnCollocatedJoinFinished(int64_t num_batches); + + QueryContext* ctx_; + size_t num_threads_; + CallbackRecord callbacks_; + bool is_swiss_; + PartitionedBloomFilter* bloom_filter_; + std::unique_ptr builder_; + + // Backpressure toggling happens at most twice during execution. 
A value of 0 means + // we haven't toggled backpressure at all, value of 1 means we've paused, and value + // 2 means we've resumed. + std::atomic backpressure_counter_{0}; + + SpillingAccumulationQueue build_accumulator_; + SpillingAccumulationQueue probe_accumulator_; + + AccumulationQueue build_queue_; + + std::unique_ptr impls_[SpillingAccumulationQueue::kNumPartitions]; + int task_group_bloom_[SpillingAccumulationQueue::kNumPartitions]; + + std::atomic max_batch_size_{0}; + + int64_t total_batches_outputted_ = 0; + size_t partition_idx_ = SpillingAccumulationQueue::kNumPartitions; + std::atomic spilling_{false}; + std::atomic bloom_or_probe_finished_{false}; + std::atomic bloom_ready_{false}; +}; +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/spilling_test.cc b/cpp/src/arrow/compute/exec/spilling_test.cc new file mode 100644 index 0000000000000..667e1e6fcb2e3 --- /dev/null +++ b/cpp/src/arrow/compute/exec/spilling_test.cc @@ -0,0 +1,265 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include + +#include +#include + +#include "arrow/api.h" +#include "arrow/compute/exec/accumulation_queue.h" +#include "arrow/compute/exec/exec_plan.h" +#include "arrow/compute/exec/spilling_util.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/compute/light_array.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/random.h" + +namespace arrow { +namespace compute { +namespace internal { + +enum class SpillingTestParam { + None, + Values, + ValuesAndHashes, +}; + +void TestSpillingAccumulationQueue(SpillingTestParam param) { + QueryContext ctx; + SpillingAccumulationQueue queue; + + Future<> fut = util::AsyncTaskScheduler::Make([&](util::AsyncTaskScheduler* sched) { + RETURN_NOT_OK(ctx.Init(ctx.max_concurrency(), sched)); + RETURN_NOT_OK(queue.Init(&ctx)); + ctx.scheduler()->RegisterEnd(); + RETURN_NOT_OK(ctx.scheduler()->StartScheduling( + /*thread_index=*/0, + [&ctx](std::function fn) { + return ctx.ScheduleTask(std::move(fn)); + }, + /*concurrent_tasks=*/static_cast(ctx.max_concurrency()), false)); + + size_t num_batches = 4 * SpillingAccumulationQueue::kNumPartitions; + size_t rows_per_batch = ExecBatchBuilder::num_rows_max(); + std::vector batches; + + size_t spill_every_n_batches = 0; + switch (param) { + case SpillingTestParam::None: + spill_every_n_batches = num_batches; + break; + case SpillingTestParam::Values: + spill_every_n_batches = 32; + break; + case SpillingTestParam::ValuesAndHashes: + spill_every_n_batches = 3; + break; + default: + DCHECK(false); + } + + int num_vals_spilled = 0; + int num_hashes_spilled = 0; + for (size_t i = 0; i < num_batches; i++) { + if (i % spill_every_n_batches == 0) { + ARROW_ASSIGN_OR_RAISE(bool advanced, queue.AdvanceSpillCursor()); + if (num_vals_spilled < SpillingAccumulationQueue::kNumPartitions) { + ARROW_CHECK(advanced); + } + num_vals_spilled++; + + if (!advanced) { + ARROW_ASSIGN_OR_RAISE(bool advanced_hash, queue.AdvanceHashCursor()); + if (num_hashes_spilled < SpillingAccumulationQueue::kNumPartitions) { + ARROW_CHECK(advanced_hash); + } + num_hashes_spilled++; + } + } + + ARROW_ASSIGN_OR_RAISE(std::unique_ptr vals_buf, + AllocateBuffer(sizeof(uint64_t) * rows_per_batch)); + ARROW_ASSIGN_OR_RAISE(std::unique_ptr hashes_buf, + AllocateBuffer(sizeof(uint64_t) * rows_per_batch)); + + uint64_t* vals = reinterpret_cast(vals_buf->mutable_data()); + uint64_t* hashes = reinterpret_cast(hashes_buf->mutable_data()); + for (size_t j = 0; j < rows_per_batch; j++) { + vals[j] = j; + hashes[j] = (j % SpillingAccumulationQueue::kNumPartitions); + } + + ArrayData vals_data(uint64(), rows_per_batch, {nullptr, std::move(vals_buf)}); + ArrayData hashes_data(uint64(), rows_per_batch, {nullptr, std::move(hashes_buf)}); + ExecBatch batch({std::move(vals_data), std::move(hashes_data)}, rows_per_batch); + ARROW_CHECK_OK(queue.InsertBatch(/*thread_index=*/0, std::move(batch))); + } + + for (size_t ipart = 0; ipart < SpillingAccumulationQueue::kNumPartitions; ipart++) { + Future<> fut = Future<>::Make(); + AccumulationQueue ac; + ac.Resize(queue.batch_count(ipart)); + ARROW_CHECK_OK(queue.GetPartition( + /*thread_index=*/0, + /*partition=*/ipart, + [&](size_t, size_t batch_idx, ExecBatch batch) { + ac[batch_idx] = std::move(batch); + return Status::OK(); + }, + [&](size_t) { + fut.MarkFinished(); + return Status::OK(); + })); + ARROW_CHECK_OK(fut.status()); + ARROW_CHECK_EQ(ac.batch_count(), + num_batches / SpillingAccumulationQueue::kNumPartitions); + for (size_t ibatch = 0; ibatch < ac.batch_count(); ibatch++) { + 
ARROW_CHECK_EQ(ac[ibatch].num_values(), 1); + ARROW_CHECK_EQ(ac[ibatch].length, ExecBatchBuilder::num_rows_max()); + const uint64_t* vals = + reinterpret_cast(ac[ibatch][0].array()->buffers[1]->data()); + for (int64_t irow = 0; irow < ac[ibatch].length; irow++) + ARROW_CHECK_EQ(vals[irow] % SpillingAccumulationQueue::kNumPartitions, ipart); + } + } + return Status::OK(); + }); + ASSERT_FINISHES_OK(fut); +} + +TEST(Spilling, SpillingAccumulationQueue_NoSpill) { + TestSpillingAccumulationQueue(SpillingTestParam::None); +} + +TEST(Spilling, SpillingAccumulationQueue_SpillValues) { + TestSpillingAccumulationQueue(SpillingTestParam::Values); +} + +TEST(Spilling, SpillingAccumulationQueue_SpillValuesAndHashes) { + TestSpillingAccumulationQueue(SpillingTestParam::ValuesAndHashes); +} + +TEST(Spilling, ReadWriteBasicBatches) { + QueryContext ctx; + SpillFile file; + BatchesWithSchema batches = MakeBasicBatches(); + std::vector read_batches(batches.batches.size()); + + Future<> fut = util::AsyncTaskScheduler::Make([&](util::AsyncTaskScheduler* sched) { + ARROW_CHECK_OK(ctx.Init(ctx.max_concurrency(), sched)); + for (ExecBatch& b : batches.batches) { + ExecBatchBuilder builder; + std::vector row_ids(b.length); + std::iota(row_ids.begin(), row_ids.end(), 0); + ARROW_CHECK_OK(builder.AppendSelected(ctx.memory_pool(), b, + static_cast(b.length), row_ids.data(), + b.num_values())); + ARROW_CHECK_OK(file.SpillBatch(&ctx, builder.Flush())); + } + + ARROW_CHECK_OK(file.ReadBackBatches( + &ctx, + [&read_batches](size_t, size_t batch_idx, ExecBatch batch) { + read_batches[batch_idx] = std::move(batch); + return Status::OK(); + }, + [&](size_t) { + AssertExecBatchesEqualIgnoringOrder(batches.schema, batches.batches, + read_batches); + return Status::OK(); + })); + return Status::OK(); + }); + ASSERT_FINISHES_OK(fut); +} + +TEST(Spilling, HashJoin) { + constexpr int kNumTests = 1; + Random64Bit rng(42); + + // 50% chance to get a string column, 50% chance to get an integer + std::vector> possible_types = { + int8(), int16(), int32(), int64(), utf8(), utf8(), utf8(), utf8(), + }; + + std::unordered_map key_metadata; + key_metadata["min"] = "0"; + key_metadata["max"] = "1000"; + + for (int itest = 0; itest < kNumTests; itest++) { + int left_cols = rng.from_range(1, 4); + std::vector> left_fields = { + field("l0", int32(), key_value_metadata(key_metadata))}; + for (int i = 1; i < left_cols; i++) { + std::string name = std::string("l") + std::to_string(i); + size_t type = rng.from_range(static_cast(0), possible_types.size() - 1); + left_fields.push_back(field(std::move(name), possible_types[type])); + } + + int right_cols = rng.from_range(1, 4); + std::vector> right_fields = { + field("r0", int32(), key_value_metadata(key_metadata))}; + for (int i = 1; i < right_cols; i++) { + std::string name = std::string("r") + std::to_string(i); + size_t type = rng.from_range(static_cast(0), possible_types.size() - 1); + right_fields.push_back(field(std::move(name), possible_types[type])); + } + + std::vector key_cmp = {JoinKeyCmp::EQ}; + std::vector left_keys = {FieldRef{0}}; + std::vector right_keys = {FieldRef{0}}; + + std::shared_ptr l_schema = schema(std::move(left_fields)); + std::shared_ptr r_schema = schema(std::move(right_fields)); + + BatchesWithSchema l_batches = MakeRandomBatches( + l_schema, 10, 1024, kDefaultBufferAlignment, default_memory_pool()); + BatchesWithSchema r_batches = MakeRandomBatches( + r_schema, 10, 1024, kDefaultBufferAlignment, default_memory_pool()); + + std::vector reference; + for (bool 
spilling : {false, true}) { + QueryOptions options; + if (spilling) options.max_memory_bytes = 1024; + ExecContext ctx(default_memory_pool(), ::arrow::internal::GetCpuThreadPool()); + Declaration l_source{ + "source", SourceNodeOptions{l_batches.schema, + l_batches.gen(/*parallel=*/true, /*slow=*/false)}}; + Declaration r_source{ + "source", SourceNodeOptions{r_batches.schema, + r_batches.gen(/*parallel=*/true, /*slow=*/false)}}; + + HashJoinNodeOptions join_options; + join_options.left_keys = left_keys; + join_options.right_keys = right_keys; + join_options.output_all = true; + join_options.key_cmp = key_cmp; + Declaration join{"hashjoin", {l_source, r_source}, join_options}; + + ASSERT_FINISHES_OK_AND_ASSIGN(auto result, + DeclarationToExecBatchesAsync(join, ctx, options)); + if (!spilling) + reference = std::move(result.batches); + else + AssertExecBatchesEqualIgnoringOrder(result.schema, reference, result.batches); + } + } +} +} // namespace internal +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/spilling_util.cc b/cpp/src/arrow/compute/exec/spilling_util.cc new file mode 100644 index 0000000000000..1d539ad8fb04d --- /dev/null +++ b/cpp/src/arrow/compute/exec/spilling_util.cc @@ -0,0 +1,438 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/spilling_util.h" +#include + +#ifdef _WIN32 +// "windows_compatibility.h" includes , which must go BEFORE +// because defines some architecture stuff that +// needs. 
+// clang-format off +#include "arrow/util/windows_compatibility.h" +#include "arrow/util/io_util.h" +#include +// clang-format on +#endif + +namespace arrow { +namespace compute { +struct ArrayInfo { + int64_t num_children; + int64_t length; + int64_t null_count; + std::shared_ptr type; + std::array, 3> bufs; + std::array sizes; + std::shared_ptr dictionary; +}; + +#ifdef _WIN32 + +struct SpillFile::BatchInfo { + int64_t start; // Offset of batch in file + std::vector arrays; +}; + +ARROW_EXPORT const FileHandle kInvalidHandle = INVALID_HANDLE_VALUE; + +static Result OpenTemporaryFile() { + constexpr DWORD kTempFileNameSize = MAX_PATH + 1; + wchar_t tmp_name_buf[kTempFileNameSize]; + wchar_t tmp_path_buf[kTempFileNameSize]; + + DWORD ret; + ret = GetTempPathW(kTempFileNameSize, tmp_path_buf); + if (ret > kTempFileNameSize || ret == 0) + return arrow::internal::IOErrorFromWinError(GetLastError(), + "Failed to get temporary file path"); + if (GetTempFileNameW(tmp_path_buf, L"ARROW_TMP", 0, tmp_name_buf) == 0) + return arrow::internal::IOErrorFromWinError(GetLastError(), + "Failed to get temporary file name"); + + HANDLE file_handle = CreateFileW( + tmp_name_buf, GENERIC_READ | GENERIC_WRITE | FILE_APPEND_DATA, 0, NULL, + CREATE_ALWAYS, + FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED | FILE_FLAG_DELETE_ON_CLOSE, NULL); + if (file_handle == INVALID_HANDLE_VALUE) + return arrow::internal::IOErrorFromWinError(GetLastError(), + "Failed to create temp file"); + return file_handle; +} + +static Status CloseTemporaryFile(FileHandle* handle) { + if (!CloseHandle(*handle)) return Status::IOError("Failed to close temp file"); + *handle = kInvalidHandle; + return Status::OK(); +} + +static Status WriteBatch_PlatformSpecific(FileHandle handle, SpillFile::BatchInfo& info) { + OVERLAPPED overlapped; + int64_t offset = info.start; + for (ArrayInfo& arr : info.arrays) { + for (size_t i = 0; i < arr.bufs.size(); i++) { + if (arr.bufs[i] != 0) { + overlapped.Offset = static_cast(offset & ~static_cast(0)); + overlapped.OffsetHigh = + static_cast((offset >> 32) & ~static_cast(0)); + if (!WriteFile(handle, arr.bufs[i]->data(), static_cast(arr.sizes[i]), + NULL, &overlapped)) + return arrow::internal::IOErrorFromWinError( + GetLastError(), "Failed to write to temporary file"); + + offset += arr.sizes[i]; + arr.bufs[i].reset(); + } + } + } + return Status::OK(); +} + +static Result> ReconstructArray(const FileHandle handle, + size_t& idx, + std::vector& arrs, + size_t& current_offset) { + ArrayInfo& arr = arrs[idx++]; + std::shared_ptr data = std::make_shared(); + data->type = std::move(arr.type); + data->length = arr.length; + data->null_count = arr.null_count; + data->dictionary = std::move(arr.dictionary); + + data->buffers.resize(3); + for (int i = 0; i < 3; i++) { + if (arr.sizes[i]) { + data->buffers[i] = std::move(arr.bufs[i]); + + OVERLAPPED overlapped; + overlapped.Offset = static_cast(current_offset & static_cast(~0)); +#ifdef _WIN64 + overlapped.OffsetHigh = + static_cast((current_offset >> 32) & static_cast(~0)); +#else + overlapped.OffsetHigh = static_cast(0); +#endif + if (!ReadFile(handle, static_cast(data->buffers[i]->mutable_data()), + static_cast(arr.sizes[i]), NULL, &overlapped)) + return Status::IOError("Failed to read back spilled data!"); + current_offset += arr.sizes[i]; + } + } + data->child_data.resize(arr.num_children); + for (int i = 0; i < arr.num_children; i++) { + ARROW_ASSIGN_OR_RAISE(data->child_data[i], + ReconstructArray(handle, idx, arrs, current_offset)); + } + + return data; +} 
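Both the write path above and the read path below position their I/O by splitting the 64-bit file offset across the two 32-bit OVERLAPPED fields. A minimal sketch of that recurring pattern (an illustrative helper, not part of the patch):

    #include <cstdint>
    #include <windows.h>

    // Fill an OVERLAPPED so that ReadFile/WriteFile operate at `offset` on a
    // handle opened with FILE_FLAG_OVERLAPPED, instead of using a file pointer.
    inline OVERLAPPED PositionedAt(int64_t offset) {
      OVERLAPPED ov = {};
      ov.Offset = static_cast<DWORD>(offset & 0xFFFFFFFFu);              // low 32 bits
      ov.OffsetHigh = static_cast<DWORD>((offset >> 32) & 0xFFFFFFFFu);  // high 32 bits
      return ov;
    }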
+ +static Result ReadBatch_PlatformSpecific(FileHandle handle, + SpillFile::BatchInfo& info) { + std::vector batch; + size_t offset = info.start; + // ReconstructArray increments i + for (size_t i = 0; i < info.arrays.size();) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr ad, + ReconstructArray(handle, i, info.arrays, offset)); + batch.emplace_back(std::move(ad)); + } + return ExecBatch::Make(std::move(batch)); +} + +#else +#include +#include +#include +#include +#include + +struct SpillFile::BatchInfo { + int64_t start; + std::vector arrays; + std::vector ios; +}; + +Result OpenTemporaryFile() { + static std::once_flag generate_tmp_file_name_flag; + + constexpr int kFileNameSize = 1024; + static char name_template[kFileNameSize]; + char name[kFileNameSize]; + + char* name_template_ptr = name_template; + std::call_once(generate_tmp_file_name_flag, [name_template_ptr]() noexcept { + const char* selectors[] = {"TMPDIR", "TMP", "TEMP", "TEMPDIR"}; + constexpr size_t kNumSelectors = sizeof(selectors) / sizeof(selectors[0]); +#ifdef __ANDROID__ + const char* backup = "/data/local/tmp/"; +#else + const char *backup = "/var/tmp/"; +#endif + const char* tmp_dir = backup; + for (size_t i = 0; i < kNumSelectors; i++) { + const char* env = getenv(selectors[i]); + if (env) { + tmp_dir = env; + break; + } + } + size_t tmp_dir_length = std::strlen(tmp_dir); + + const char* tmp_name_template = "/ARROW_TMP_XXXXXX"; + size_t tmp_name_length = std::strlen(tmp_name_template); + + if ((tmp_dir_length + tmp_name_length) >= kFileNameSize) { + tmp_dir = backup; + tmp_dir_length = std::strlen(backup); + } + + std::strncpy(name_template_ptr, tmp_dir, kFileNameSize); + std::strncpy(name_template_ptr + tmp_dir_length, tmp_name_template, + kFileNameSize - tmp_dir_length); + }); + + std::strncpy(name, name_template, kFileNameSize); + +#ifdef __APPLE__ + int fd = mkstemp(name); + if (fd == kInvalidHandle) return Status::IOError(strerror(errno)); + if (fcntl(fd, F_NOCACHE, 1) == -1) return Status::IOError(strerror(errno)); +#else + // If we failed, it's possible the temp directory didn't like O_DIRECT, + // so we try again without O_DIRECT, and if it still doesn't work then + // give up. + int fd = mkostemp(name, O_DIRECT); + if (fd == kInvalidHandle) { + std::strncpy(name, name_template, kFileNameSize); + fd = mkstemp(name); + if (fd == kInvalidHandle) return Status::IOError(strerror(errno)); + } +#endif + + if (unlink(name) != 0) return Status::IOError(strerror(errno)); + return fd; +} + +static Status CloseTemporaryFile(FileHandle* handle) { + if (close(*handle) == -1) return Status::IOError(strerror(errno)); + *handle = kInvalidHandle; + return Status::OK(); +} + +static Status WriteBatch_PlatformSpecific(FileHandle handle, SpillFile::BatchInfo& info) { + if (pwritev(handle, info.ios.data(), static_cast(info.ios.size()), info.start) == + -1) + return Status::IOError("Failed to spill!"); + + // Release all references to the buffers, freeing them. 
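+ // (The batch's data now lives in the temporary file, so dropping these
+ // shared_ptrs is what actually returns the spilled memory to the pool.)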
+ for (ArrayInfo& arr : info.arrays) + for (int i = 0; i < 3; i++) + if (arr.bufs[i]) arr.bufs[i].reset(); + return Status::OK(); +} + +static Result> ReconstructArray(size_t& idx, + SpillFile::BatchInfo& info) { + ArrayInfo& arr = info.arrays[idx++]; + std::shared_ptr data = std::make_shared(); + data->type = std::move(arr.type); + data->length = arr.length; + data->null_count = arr.null_count; + data->dictionary = std::move(arr.dictionary); + data->buffers.resize(3); + for (int i = 0; i < 3; i++) + if (arr.sizes[i]) data->buffers[i] = std::move(arr.bufs[i]); + + data->child_data.resize(arr.num_children); + for (int i = 0; i < arr.num_children; i++) { + ARROW_ASSIGN_OR_RAISE(data->child_data[i], ReconstructArray(idx, info)); + } + return data; +} + +static Result ReadBatch_PlatformSpecific(FileHandle handle, + SpillFile::BatchInfo& info) { + std::vector batch; + // ReconstructArray increments i + for (size_t i = 0; i < info.arrays.size();) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr ad, ReconstructArray(i, info)); + batch.emplace_back(std::move(ad)); + } + + if (preadv(handle, info.ios.data(), static_cast(info.ios.size()), info.start) == + -1) + return Status::IOError(std::string("Failed to read back spilled data: ") + + std::strerror(errno)); + + return ExecBatch::Make(std::move(batch)); +} +#endif + +SpillFile::~SpillFile() { + Status st = Cleanup(); + if (!st.ok()) st.Warn(); +} + +static Status CollectArrayInfo(SpillFile::BatchInfo& batch_info, int64_t& total_size, + ArrayData* array) { + if (array->offset != 0) + return Status::Invalid("We don't support spilling arrays with offsets"); + + batch_info.arrays.push_back({}); + ArrayInfo& array_info = batch_info.arrays.back(); + array_info.type = std::move(array->type); + array_info.length = array->length; + array_info.null_count = array->null_count.load(std::memory_order_relaxed); + + ARROW_DCHECK(array->buffers.size() <= array_info.bufs.size()); + array_info.num_children = array->child_data.size(); + for (size_t i = 0; i < array->buffers.size(); i++) { + if (array->buffers[i]) { + array_info.sizes[i] = array->buffers[i]->size(); + total_size += array_info.sizes[i]; + uintptr_t addr = array->buffers[i]->address(); + if ((addr % SpillFile::kAlignment) != 0) + return Status::Invalid("Buffer not aligned to 512 bytes!"); + array_info.bufs[i] = std::move(array->buffers[i]); +#ifndef _WIN32 + struct iovec io; + io.iov_base = static_cast(array_info.bufs[i]->mutable_data()); + io.iov_len = static_cast(array_info.sizes[i]); + batch_info.ios.push_back(io); +#endif + } else { + array_info.sizes[i] = 0; + } + } + + array_info.dictionary = std::move(array->dictionary); + for (std::shared_ptr& child : array->child_data) + RETURN_NOT_OK(CollectArrayInfo(batch_info, total_size, child.get())); + + // Cleanup the ArrayData + array->type.reset(); + array->length = 0; + return Status::OK(); +} + +static Status AllocateBuffersForBatch(SpillFile::BatchInfo& batch_info, + MemoryPool* pool) { +#ifndef _WIN32 + size_t iiovec = 0; +#endif + for (ArrayInfo& arr : batch_info.arrays) { + for (size_t ibuf = 0; ibuf < 3; ibuf++) { + if (arr.sizes[ibuf]) { + ARROW_ASSIGN_OR_RAISE( + arr.bufs[ibuf], AllocateBuffer(arr.sizes[ibuf], SpillFile::kAlignment, pool)); +#ifndef _WIN32 + batch_info.ios[iiovec].iov_base = + static_cast(arr.bufs[ibuf]->mutable_data()); + batch_info.ios[iiovec].iov_len = static_cast(arr.sizes[ibuf]); + iiovec++; +#endif + } + } + } + return Status::OK(); +} + +Status SpillFile::SpillBatch(QueryContext* ctx, ExecBatch batch) { + if (handle_ == 
kInvalidHandle) { + ARROW_ASSIGN_OR_RAISE(handle_, OpenTemporaryFile()); + } + int64_t total_size = 0; + batches_.emplace_back(new BatchInfo); + BatchInfo* info = batches_.back(); + for (int i = 0; i < batch.num_values(); i++) { + if (batch[i].is_scalar()) return Status::Invalid("Cannot spill a Scalar"); + RETURN_NOT_OK(CollectArrayInfo(*info, total_size, batch[i].mutable_array())); + } + info->start = size_; + size_ += total_size; + + FileHandle handle = handle_; + RETURN_NOT_OK(ctx->ScheduleIOTask([this, handle, info, ctx, total_size]() { + auto mark = ctx->ReportTempFileIO(total_size); + RETURN_NOT_OK(WriteBatch_PlatformSpecific(handle, *info)); + if (++batches_written_ == batches_.size() && read_requested_.load()) { + bool expected = false; + if (read_started_.compare_exchange_strong(expected, true)) + return ctx->ScheduleTask([this, ctx]() { return ScheduleReadbackTasks(ctx); }); + } + return Status::OK(); + })); + return Status::OK(); +} + +Status SpillFile::ReadBackBatches(QueryContext* ctx, + std::function fn, + std::function on_finished) { + readback_fn_ = std::move(fn); + on_readback_finished_ = std::move(on_finished); + + read_requested_.store(true); + if (batches_written_ == batches_.size()) { + bool expected = false; + if (read_started_.compare_exchange_strong(expected, true)) + return ScheduleReadbackTasks(ctx); + } + return Status::OK(); +} + +Status SpillFile::Cleanup() { + if (handle_ != kInvalidHandle) RETURN_NOT_OK(CloseTemporaryFile(&handle_)); + for (BatchInfo* b : batches_) delete b; + + batches_.clear(); + return Status::OK(); +} + +Status SpillFile::PreallocateBatches(MemoryPool* memory_pool) { + preallocated_ = true; + for (size_t i = 0; i < batches_.size(); i++) { + RETURN_NOT_OK(AllocateBuffersForBatch(*batches_[i], memory_pool)); + } + return Status::OK(); +} + +Status SpillFile::OnBatchRead(size_t thread_index, size_t batch_index, ExecBatch batch) { + RETURN_NOT_OK(readback_fn_(thread_index, batch_index, std::move(batch))); + if (++batches_read_ == batches_.size()) return on_readback_finished_(thread_index); + return Status::OK(); +} + +Status SpillFile::ScheduleReadbackTasks(QueryContext* ctx) { + if (batches_.empty()) return on_readback_finished_(ctx->GetThreadIndex()); + + for (size_t i = 0; i < batches_.size(); i++) { + BatchInfo* info = batches_[i]; + if (!preallocated_) RETURN_NOT_OK(AllocateBuffersForBatch(*info, ctx->memory_pool())); + RETURN_NOT_OK(ctx->ScheduleIOTask([this, i, info, ctx]() { + ARROW_ASSIGN_OR_RAISE(ExecBatch batch, ReadBatch_PlatformSpecific(handle_, *info)); + RETURN_NOT_OK(ctx->ScheduleTask( + [this, i, batch = std::move(batch)](size_t thread_index) mutable { + return OnBatchRead(thread_index, i, std::move(batch)); + })); + return Status::OK(); + })); + } + return Status::OK(); +} +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/spilling_util.h b/cpp/src/arrow/compute/exec/spilling_util.h new file mode 100644 index 0000000000000..6ba8f93d8d302 --- /dev/null +++ b/cpp/src/arrow/compute/exec/spilling_util.h @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include "arrow/compute/exec/query_context.h" + +namespace arrow { +namespace compute { +#ifdef _WIN32 +using FileHandle = void*; +ARROW_EXPORT extern const FileHandle kInvalidHandle; +#else +using FileHandle = int; +constexpr FileHandle kInvalidHandle = -1; +#endif + +// A temporary file meant for spilling data to disk. It can spill a batch to +// disk and read it back into memory. This class is designed to fully utilize +// disk bandwidth and for removing batches from memory as quickly as possible. +// Note that dictionaries are not spilled! They are expected to be very small, +// and so retaining them in memory is considered to be fine. +// One other note: Access to this class is expected to be exclusive from the +// perspective of the CPU thread pool. There may be concurrent accesses from +// the IO thread pool by tasks scheduled by this class itself (in other words, +// this class is not thread-safe from the user's point of view). +class ARROW_EXPORT SpillFile { + public: + static constexpr size_t kAlignment = 512; + + ~SpillFile(); + // To spill a batch the following must be true: + // - Row offset for each column must be 0. + // - Column buffers must be aligned to 512 bytes + // - No column can be a scalar + // These assumptions aren't as inconvenient as it seems because + // typically batches will be partitioned before being spilled, + // meaning the batches will come from ExecBatchBuilder, which + // ensures these assumptions hold. + // It is a bug to spill a batch after ReadBackBatches. + Status SpillBatch(QueryContext* ctx, ExecBatch batch); + + // Reads back all of the batches from the disk, invoking `fn` + // on each batch, and invoking `on_finished` when `fn` has finished + // on all batches. Both will be run on the CPU thread pool. + // Do NOT insert any batches after invoking this function. + Status ReadBackBatches(QueryContext* ctx, + std::function fn, + std::function on_finished); + Status Cleanup(); + size_t num_batches() const { return batches_.size(); } + size_t batches_written() const { return batches_written_.load(); } + + // Used for benchmarking only! 
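+ // Allocates the destination buffers for every spilled batch up front so that a
+ // subsequent ReadBackBatches call measures disk I/O only, not allocation.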
+ Status PreallocateBatches(MemoryPool* memory_pool); + + struct BatchInfo; + + private: + Status ScheduleReadbackTasks(QueryContext* ctx); + Status OnBatchRead(size_t thread_index, size_t batch_index, ExecBatch batch); + + bool preallocated_ = false; + + FileHandle handle_ = kInvalidHandle; + size_t size_ = 0; + + std::vector batches_; + + std::atomic batches_written_{0}; + std::atomic read_requested_{false}; + std::atomic read_started_{false}; + std::atomic batches_read_{0}; + + std::function + readback_fn_; // thread_index, batch_index, batch + std::function on_readback_finished_; +}; +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/swiss_join.cc b/cpp/src/arrow/compute/exec/swiss_join.cc index fee3c5f79db5b..260c074964eaa 100644 --- a/cpp/src/arrow/compute/exec/swiss_join.cc +++ b/cpp/src/arrow/compute/exec/swiss_join.cc @@ -2025,13 +2025,10 @@ class SwissJoin : public HashJoinImpl { Status Init(QueryContext* ctx, JoinType join_type, size_t num_threads, const HashJoinProjectionMaps* proj_map_left, const HashJoinProjectionMaps* proj_map_right, - std::vector key_cmp, Expression filter, - RegisterTaskGroupCallback register_task_group_callback, - StartTaskGroupCallback start_task_group_callback, - OutputBatchCallback output_batch_callback, - FinishedCallback finished_callback) override { + std::vector* key_cmp, Expression* filter, + CallbackRecord callback_record) override { START_COMPUTE_SPAN(span_, "SwissJoinImpl", - {{"detail", filter.ToString()}, + {{"detail", filter->ToString()}, {"join.kind", arrow::compute::ToString(join_type)}, {"join.threads", static_cast(num_threads)}}); @@ -2041,18 +2038,12 @@ class SwissJoin : public HashJoinImpl { pool_ = ctx->memory_pool(); join_type_ = join_type; - key_cmp_.resize(key_cmp.size()); - for (size_t i = 0; i < key_cmp.size(); ++i) { - key_cmp_[i] = key_cmp[i]; - } + key_cmp_ = key_cmp; schema_[0] = proj_map_left; schema_[1] = proj_map_right; - register_task_group_callback_ = std::move(register_task_group_callback); - start_task_group_callback_ = std::move(start_task_group_callback); - output_batch_callback_ = std::move(output_batch_callback); - finished_callback_ = std::move(finished_callback); + callback_record_ = std::move(callback_record); hash_table_ready_.store(false); cancelled_.store(false); @@ -2077,7 +2068,8 @@ class SwissJoin : public HashJoinImpl { } probe_processor_.Init(proj_map_left->num_cols(HashJoinProjection::KEY), join_type_, - &hash_table_, materialize, &key_cmp_, output_batch_callback_); + &hash_table_, materialize, key_cmp_, + callback_record_.output_batch); InitTaskGroups(); @@ -2085,17 +2077,17 @@ class SwissJoin : public HashJoinImpl { } void InitTaskGroups() { - task_group_build_ = register_task_group_callback_( + task_group_build_ = callback_record_.register_task_group( [this](size_t thread_index, int64_t task_id) -> Status { return BuildTask(thread_index, task_id); }, [this](size_t thread_index) -> Status { return BuildFinished(thread_index); }); - task_group_merge_ = register_task_group_callback_( + task_group_merge_ = callback_record_.register_task_group( [this](size_t thread_index, int64_t task_id) -> Status { return MergeTask(thread_index, task_id); }, [this](size_t thread_index) -> Status { return MergeFinished(thread_index); }); - task_group_scan_ = register_task_group_callback_( + task_group_scan_ = callback_record_.register_task_group( [this](size_t thread_index, int64_t task_id) -> Status { return ScanTask(thread_index, task_id); }, @@ -2176,14 +2168,15 @@ class SwissJoin : 
public HashJoinImpl { payload_types.push_back(metadata); } RETURN_NOT_OK(CancelIfNotOK(hash_table_build_.Init( - &hash_table_, num_threads_, build_side_batches_.row_count(), + &hash_table_, num_threads_, + static_cast(build_side_batches_.CalculateRowCount()), reject_duplicate_keys, no_payload, key_types, payload_types, pool_, hardware_flags_))); // Process all input batches // - return CancelIfNotOK( - start_task_group_callback_(task_group_build_, build_side_batches_.batch_count())); + return CancelIfNotOK(callback_record_.start_task_group( + task_group_build_, build_side_batches_.batch_count())); } Status BuildTask(size_t thread_id, int64_t batch_id) { @@ -2247,8 +2240,8 @@ class SwissJoin : public HashJoinImpl { // table. // RETURN_NOT_OK(CancelIfNotOK(hash_table_build_.PreparePrtnMerge())); - return CancelIfNotOK( - start_task_group_callback_(task_group_merge_, hash_table_build_.num_prtns())); + return CancelIfNotOK(callback_record_.start_task_group( + task_group_merge_, hash_table_build_.num_prtns())); } Status MergeTask(size_t /*thread_id*/, int64_t prtn_id) { @@ -2295,7 +2288,8 @@ class SwissJoin : public HashJoinImpl { hash_table_.MergeHasMatch(); int64_t num_tasks = bit_util::CeilDiv(hash_table_.num_rows(), kNumRowsPerScanTask); - return CancelIfNotOK(start_task_group_callback_(task_group_scan_, num_tasks)); + return CancelIfNotOK( + callback_record_.start_task_group(task_group_scan_, num_tasks)); } else { return CancelIfNotOK(OnScanHashTableFinished()); } @@ -2368,7 +2362,8 @@ class SwissJoin : public HashJoinImpl { Status status = local_states_[thread_id].materialize.AppendBuildOnly( num_output_rows, key_ids_buf.mutable_data(), payload_ids_buf.mutable_data(), [&](ExecBatch batch) { - output_batch_callback_(static_cast(thread_id), std::move(batch)); + callback_record_.output_batch(static_cast(thread_id), + std::move(batch)); }); RETURN_NOT_OK(CancelIfNotOK(status)); if (!status.ok()) { @@ -2406,9 +2401,7 @@ class SwissJoin : public HashJoinImpl { num_produced_batches += materialize.num_produced_batches(); } - finished_callback_(num_produced_batches); - - return Status::OK(); + return callback_record_.finished(num_produced_batches); } Result KeyPayloadFromInput(int side, ExecBatch* input) { @@ -2477,7 +2470,7 @@ class SwissJoin : public HashJoinImpl { MemoryPool* pool_; int num_threads_; JoinType join_type_; - std::vector key_cmp_; + std::vector* key_cmp_; const HashJoinProjectionMaps* schema_[2]; // Task scheduling @@ -2486,11 +2479,8 @@ class SwissJoin : public HashJoinImpl { int task_group_scan_; // Callbacks - RegisterTaskGroupCallback register_task_group_callback_; - StartTaskGroupCallback start_task_group_callback_; - OutputBatchCallback output_batch_callback_; + CallbackRecord callback_record_; BuildFinishedCallback build_finished_callback_; - FinishedCallback finished_callback_; struct ThreadLocalState { JoinResultMaterialize materialize; diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc index 37d4421fd79f7..e75204e32a162 100644 --- a/cpp/src/arrow/compute/light_array.cc +++ b/cpp/src/arrow/compute/light_array.cc @@ -200,7 +200,7 @@ Status ColumnArraysFromExecBatch(const ExecBatch& batch, } void ResizableArrayData::Init(const std::shared_ptr& data_type, - MemoryPool* pool, int log_num_rows_min) { + MemoryPool* pool, int log_num_rows_min, int64_t alignment) { #ifndef NDEBUG if (num_rows_allocated_ > 0) { ARROW_DCHECK(data_type_ != NULLPTR); @@ -213,6 +213,7 @@ void ResizableArrayData::Init(const std::shared_ptr& data_type, #endif 
   Clear(/*release_buffers=*/false);
   log_num_rows_min_ = log_num_rows_min;
+  alignment_ = alignment;
   data_type_ = data_type;
   pool_ = pool;
 }
@@ -249,7 +250,8 @@ Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) {
     ARROW_ASSIGN_OR_RAISE(
         buffers_[kValidityBuffer],
         AllocateResizableBuffer(
-            bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, pool_));
+            bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, alignment_,
+            pool_));
     memset(mutable_data(kValidityBuffer), 0,
            bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes);
     if (column_metadata.is_fixed_length) {
@@ -258,7 +260,7 @@ Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) {
             buffers_[kFixedLengthBuffer],
             AllocateResizableBuffer(
                 bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes,
-                pool_));
+                alignment_, pool_));
         memset(mutable_data(kFixedLengthBuffer), 0,
                bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes);
       } else {
@@ -266,18 +268,19 @@ Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) {
         ARROW_ASSIGN_OR_RAISE(
             buffers_[kFixedLengthBuffer],
             AllocateResizableBuffer(
                 num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes,
-                pool_));
+                alignment_, pool_));
       }
     } else {
       ARROW_ASSIGN_OR_RAISE(
          buffers_[kFixedLengthBuffer],
          AllocateResizableBuffer(
-             (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes, pool_));
+             (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes,
+             alignment_, pool_));
 
       ARROW_ASSIGN_OR_RAISE(
          buffers_[kVariableLengthBuffer],
-         AllocateResizableBuffer(sizeof(uint64_t) + kNumPaddingBytes, pool_));
+         AllocateResizableBuffer(sizeof(uint64_t) + kNumPaddingBytes, alignment_, pool_));
 
       var_len_buf_size_ = sizeof(uint64_t);
     } else {
@@ -491,7 +494,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source
   ARROW_DCHECK(num_rows_before >= 0);
   int num_rows_after = num_rows_before + num_rows_to_append;
   if (target->num_rows() == 0) {
-    target->Init(source->type, pool, kLogNumRows);
+    target->Init(source->type, pool, kLogNumRows, kAlignment);
   }
   RETURN_NOT_OK(target->ResizeFixedLengthBuffers(num_rows_after));
 
@@ -638,7 +641,7 @@ Status ExecBatchBuilder::AppendNulls(const std::shared_ptr<DataType>& type,
   int num_rows_before = target.num_rows();
   int num_rows_after = num_rows_before + num_rows_to_append;
   if (target.num_rows() == 0) {
-    target.Init(type, pool, kLogNumRows);
+    target.Init(type, pool, kLogNumRows, kAlignment);
   }
   RETURN_NOT_OK(target.ResizeFixedLengthBuffers(num_rows_after));
 
@@ -699,7 +702,7 @@ Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch
       const Datum& data = batch.values[col_ids ? col_ids[i] : i];
       ARROW_DCHECK(data.is_array());
       const std::shared_ptr<ArrayData>& array_data = data.array();
-      values_[i].Init(array_data->type, pool, kLogNumRows);
+      values_[i].Init(array_data->type, pool, kLogNumRows, kAlignment);
     }
   }
 
@@ -730,7 +733,7 @@ Status ExecBatchBuilder::AppendNulls(MemoryPool* pool,
   if (values_.empty()) {
     values_.resize(types.size());
     for (size_t i = 0; i < types.size(); ++i) {
-      values_[i].Init(types[i], pool, kLogNumRows);
+      values_[i].Init(types[i], pool, kLogNumRows, kAlignment);
     }
   }
 
diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h
index 389b63cca4143..cfd5182b15f0c 100644
--- a/cpp/src/arrow/compute/light_array.h
+++ b/cpp/src/arrow/compute/light_array.h
@@ -270,7 +270,7 @@ class ARROW_EXPORT ResizableArrayData {
   /// \param log_num_rows_min All resize operations will allocate at least enough
   /// space for (1 << log_num_rows_min) rows
   void Init(const std::shared_ptr<DataType>& data_type, MemoryPool* pool,
-            int log_num_rows_min);
+            int log_num_rows_min, int64_t alignment = kDefaultBufferAlignment);
 
   /// \brief Resets the array back to an empty state
   /// \param release_buffers If true then allocated memory is released and the
@@ -325,6 +325,7 @@ class ARROW_EXPORT ResizableArrayData {
  private:
   static constexpr int64_t kNumPaddingBytes = 64;
   int log_num_rows_min_;
+  int64_t alignment_;
   std::shared_ptr<DataType> data_type_;
   MemoryPool* pool_;
   int num_rows_;
@@ -337,6 +338,7 @@ class ARROW_EXPORT ResizableArrayData {
 /// \brief A builder to concatenate batches of data into a larger batch
 ///
 /// Will only store num_rows_max() rows
+/// All buffers allocated by ExecBatchBuilder will have 512-byte alignment.
 class ARROW_EXPORT ExecBatchBuilder {
  public:
   /// \brief Add rows from `source` into `target` column
@@ -355,6 +357,11 @@ class ARROW_EXPORT ExecBatchBuilder {
                                ResizableArrayData& target, int num_rows_to_append,
                                MemoryPool* pool);
 
+  /// \brief Returns a non-owning view into the `col`th column.
+  KeyColumnArray column(size_t col) const { return values_[col].column_array(); }
+
+  size_t num_cols() const { return values_.size(); }
+
   /// \brief Add selected rows from `batch`
   ///
   /// If `col_ids` is null then `num_cols` should less than batch.num_values() and
@@ -377,12 +384,17 @@ class ARROW_EXPORT ExecBatchBuilder {
   ExecBatch Flush();
 
   int num_rows() const { return values_.empty() ? 0 : values_[0].num_rows(); }
+  bool is_full() const { return num_rows() == num_rows_max(); }
 
   static int num_rows_max() { return 1 << kLogNumRows; }
 
  private:
   static constexpr int kLogNumRows = 15;
 
+  // Align all buffers to 512 bytes so that we can spill them with
+  // DirectIO.
+  static constexpr int64_t kAlignment = 512;
+
   // Calculate how many rows to skip from the tail of the
   // sequence of selected rows, such that the total size of skipped rows is at
   // least equal to the size specified by the caller.
diff --git a/cpp/src/arrow/datum.cc b/cpp/src/arrow/datum.cc
index d0b5cf62c61be..6d06facfa1e22 100644
--- a/cpp/src/arrow/datum.cc
+++ b/cpp/src/arrow/datum.cc
@@ -125,6 +125,7 @@ int64_t Datum::TotalBufferSize() const {
     case Datum::TABLE:
      return util::TotalBufferSize(*std::get<std::shared_ptr<Table>>(this->value));
    case Datum::SCALAR:
+    case Datum::NONE:
      return 0;
    default:
      DCHECK(false);
diff --git a/cpp/src/arrow/util/atomic_util.h b/cpp/src/arrow/util/atomic_util.h
new file mode 100644
index 0000000000000..0aa7efb35a47e
--- /dev/null
+++ b/cpp/src/arrow/util/atomic_util.h
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <type_traits>
+
+namespace arrow {
+namespace util {
+
+// Updates `to_max` to contain the maximum of `to_max` and `val`
+// and returns the new maximum. It is expected that `to_max` be treated
+// as a shared maximum.
+template <typename T>
+inline T AtomicMax(std::atomic<T>& to_max, T val) {
+  static_assert(std::is_arithmetic<T>::value,
+                "Maximum only makes sense on numeric types!");
+  T local_to_max = to_max.load(std::memory_order_relaxed);
+  while (val > local_to_max &&
+         !to_max.compare_exchange_weak(local_to_max, val, std::memory_order_release,
+                                       std::memory_order_relaxed)) {
+  }
+  return to_max.load(std::memory_order_relaxed);
+}
+
+}  // namespace util
+}  // namespace arrow
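
Below is a minimal usage sketch for the `arrow::util::AtomicMax` helper added in the new `atomic_util.h` above. Only the header path and the helper itself come from the patch; the worker threads and the `max_batch_bytes` counter are hypothetical stand-ins for the kind of shared-maximum bookkeeping (for example, tracking the largest batch seen so far across spilling threads) that motivates the helper.

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

#include "arrow/util/atomic_util.h"

int main() {
  // Shared maximum, updated concurrently by several worker threads.
  std::atomic<int64_t> max_batch_bytes{0};

  std::vector<std::thread> workers;
  for (int t = 0; t < 4; ++t) {
    workers.emplace_back([&max_batch_bytes, t] {
      // Each worker folds the batch sizes it observes into the shared maximum.
      for (int64_t size : {1024 * (t + 1), 512, 4096 - t}) {
        arrow::util::AtomicMax(max_batch_bytes, size);
      }
    });
  }
  for (auto& w : workers) w.join();

  // All writers have been joined, so a plain load sees the final maximum (4096).
  std::cout << "max batch size: " << max_batch_bytes.load() << std::endl;
  return 0;
}
```

The release/relaxed ordering pair inside `AtomicMax` is enough for this pattern because the final value is only read after the writers have been joined.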
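And a sketch of the allocation pattern behind `ExecBatchBuilder::kAlignment`: request 512-byte-aligned buffers so they can later be handed to O_DIRECT-style spill writes. It relies only on the three-argument `AllocateResizableBuffer(size, alignment, pool)` overload that the `light_array.cc` hunks switch to; the constant name `kSpillAlignment` and the scratch-buffer scenario are illustrative, not part of the patch.

```cpp
#include <cstdint>
#include <iostream>
#include <memory>

#include "arrow/buffer.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"

// Illustrative constant mirroring ExecBatchBuilder::kAlignment (512 bytes).
constexpr int64_t kSpillAlignment = 512;

arrow::Status AllocateSpillScratch() {
  // Same three-argument overload used by ResizableArrayData above:
  // size, alignment, memory pool.
  ARROW_ASSIGN_OR_RAISE(std::unique_ptr<arrow::ResizableBuffer> buf,
                        arrow::AllocateResizableBuffer(
                            /*size=*/64 * 1024, kSpillAlignment,
                            arrow::default_memory_pool()));

  // Direct I/O typically requires sector-aligned addresses (and sizes).
  const bool aligned =
      reinterpret_cast<std::uintptr_t>(buf->data()) % kSpillAlignment == 0;
  std::cout << "512-byte aligned: " << std::boolalpha << aligned << std::endl;
  return arrow::Status::OK();
}

int main() { return AllocateSpillScratch().ok() ? 0 : 1; }
```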