From fd7f00e1ed43d821dc672c4d91a7e2531620fe03 Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Thu, 22 Sep 2022 18:54:55 -0700 Subject: [PATCH] Implement spilling for Hash Join --- cpp/src/arrow/CMakeLists.txt | 2 + cpp/src/arrow/compute/exec/CMakeLists.txt | 2 + .../arrow/compute/exec/accumulation_queue.cc | 223 +++++- .../arrow/compute/exec/accumulation_queue.h | 98 ++- cpp/src/arrow/compute/exec/hash_join.cc | 47 +- cpp/src/arrow/compute/exec/hash_join.h | 34 +- .../arrow/compute/exec/hash_join_benchmark.cc | 23 +- cpp/src/arrow/compute/exec/hash_join_node.cc | 634 ++++++++++++------ cpp/src/arrow/compute/exec/partition_util.cc | 14 + cpp/src/arrow/compute/exec/partition_util.h | 23 +- cpp/src/arrow/compute/exec/schema_util.h | 116 ++-- cpp/src/arrow/compute/exec/swiss_join.cc | 48 +- cpp/src/arrow/compute/light_array.cc | 20 +- cpp/src/arrow/compute/light_array.h | 13 +- cpp/src/arrow/datum.cc | 1 + cpp/src/arrow/util/io_util.cc | 34 + cpp/src/arrow/util/io_util.h | 6 + cpp/src/arrow/util/io_util_test.cc | 13 + 18 files changed, 968 insertions(+), 383 deletions(-) diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index eb64ecd0bb5f4..e10fc675c98af 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -405,6 +405,8 @@ if(ARROW_COMPUTE) compute/exec/query_context.cc compute/exec/sink_node.cc compute/exec/source_node.cc + compute/exec/spilling_util.cc + compute/exec/spilling_join.cc compute/exec/swiss_join.cc compute/exec/task_util.cc compute/exec/tpch_node.cc diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt index 4ce73359d0f1f..4322782f47055 100644 --- a/cpp/src/arrow/compute/exec/CMakeLists.txt +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -37,6 +37,7 @@ add_arrow_compute_test(asof_join_node_test "arrow-compute" SOURCES asof_join_node_test.cc) +add_arrow_compute_test(spilling_test PREFIX "arrow-compute") add_arrow_compute_test(tpch_node_test PREFIX "arrow-compute") add_arrow_compute_test(union_node_test PREFIX "arrow-compute") add_arrow_compute_test(util_test @@ -47,6 +48,7 @@ add_arrow_compute_test(util_test task_util_test.cc) add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute") +add_arrow_benchmark(spilling_benchmark PREFIX "arrow-compute") add_arrow_benchmark(filter_benchmark PREFIX diff --git a/cpp/src/arrow/compute/exec/accumulation_queue.cc b/cpp/src/arrow/compute/exec/accumulation_queue.cc index 192db52942820..a7a65ab5ad707 100644 --- a/cpp/src/arrow/compute/exec/accumulation_queue.cc +++ b/cpp/src/arrow/compute/exec/accumulation_queue.cc @@ -15,22 +15,20 @@ // specific language governing permissions and limitations // under the License. 
+#include "arrow/util/atomic_util.h" #include "arrow/compute/exec/accumulation_queue.h" - -#include +#include "arrow/compute/exec/key_hash.h" namespace arrow { -namespace util { -using arrow::compute::ExecBatch; +namespace compute { + AccumulationQueue::AccumulationQueue(AccumulationQueue&& that) { this->batches_ = std::move(that.batches_); - this->row_count_ = that.row_count_; that.Clear(); } AccumulationQueue& AccumulationQueue::operator=(AccumulationQueue&& that) { this->batches_ = std::move(that.batches_); - this->row_count_ = that.row_count_; that.Clear(); return *this; } @@ -39,20 +37,225 @@ void AccumulationQueue::Concatenate(AccumulationQueue&& that) { this->batches_.reserve(this->batches_.size() + that.batches_.size()); std::move(that.batches_.begin(), that.batches_.end(), std::back_inserter(this->batches_)); - this->row_count_ += that.row_count_; that.Clear(); } void AccumulationQueue::InsertBatch(ExecBatch batch) { - row_count_ += batch.length; batches_.emplace_back(std::move(batch)); } +void AccumulationQueue::SetBatch(size_t idx, ExecBatch batch) +{ + ARROW_DCHECK(idx < batches_.size()); + batches_[idx] = std::move(batch); +} + +size_t AccumulationQueue::CalculateRowCount() const +{ + size_t count = 0; + for(const ExecBatch &b : batches_) + count += static_cast(b.length); + return count; +} + void AccumulationQueue::Clear() { - row_count_ = 0; batches_.clear(); } -ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; } + Status SpillingAccumulationQueue::Init(QueryContext *ctx) + { + ctx_ = ctx; + partition_locks_.Init(ctx_->max_concurrency(), kNumPartitions); + for(size_t ipart = 0; ipart < kNumPartitions; ipart++) + { + task_group_read_[ipart] = ctx_->RegisterTaskGroup( + [this, ipart](size_t thread_index, int64_t batch_index) + { + return read_back_fn_[ipart]( + thread_index, + static_cast(batch_index), + std::move(queues_[ipart][batch_index])); + }, + [this, ipart](size_t thread_index) + { + return on_finished_[ipart](thread_index); + }); + } + return Status::OK(); + } + + Status SpillingAccumulationQueue::InsertBatch( + size_t thread_index, + ExecBatch batch) + { + Datum &hash_datum = batch.values.back(); + const uint64_t *hashes = reinterpret_cast(hash_datum.array()->buffers[1]->data()); + // `permutation` stores the indices of rows in the input batch sorted by partition. 
+ std::vector permutation(batch.length); + uint16_t part_starts[kNumPartitions + 1]; + PartitionSort::Eval( + batch.length, + kNumPartitions, + part_starts, + /*partition_id=*/[&](int64_t i) + { + return partition_id(hashes[i]); + }, + /*output_fn=*/[&permutation](int64_t input_pos, int64_t output_pos) + { + permutation[output_pos] = static_cast(input_pos); + }); + + int unprocessed_partition_ids[kNumPartitions]; + RETURN_NOT_OK(partition_locks_.ForEachPartition( + thread_index, + unprocessed_partition_ids, + /*is_prtn_empty=*/[&](int part_id) + { + return part_starts[part_id + 1] == part_starts[part_id]; + }, + /*partition=*/[&](int locked_part_id_int) + { + size_t locked_part_id = static_cast(locked_part_id_int); + uint64_t num_total_rows_to_append = + part_starts[locked_part_id + 1] - part_starts[locked_part_id]; + + size_t offset = static_cast(part_starts[locked_part_id]); + while(num_total_rows_to_append > 0) + { + int num_rows_to_append = std::min( + static_cast(num_total_rows_to_append), + static_cast(ExecBatchBuilder::num_rows_max() - builders_[locked_part_id].num_rows())); + + RETURN_NOT_OK(builders_[locked_part_id].AppendSelected( + ctx_->memory_pool(), + batch, + num_rows_to_append, + permutation.data() + offset, + batch.num_values())); + + if(builders_[locked_part_id].is_full()) + { + ExecBatch batch = builders_[locked_part_id].Flush(); + Datum hash = std::move(batch.values.back()); + batch.values.pop_back(); + ExecBatch hash_batch({ std::move(hash) }, batch.length); + if(locked_part_id < spilling_cursor_) + RETURN_NOT_OK(files_[locked_part_id].SpillBatch( + ctx_, + std::move(batch))); + else + queues_[locked_part_id].InsertBatch(std::move(batch)); + + if(locked_part_id >= hash_cursor_) + hash_queues_[locked_part_id].InsertBatch(std::move(hash_batch)); + + } + offset += num_rows_to_append; + num_total_rows_to_append -= num_rows_to_append; + } + return Status::OK(); + })); + return Status::OK(); + } + + const uint64_t *SpillingAccumulationQueue::GetHashes(size_t partition, size_t batch_idx) + { + ARROW_DCHECK(partition >= hash_cursor_.load()); + if(batch_idx > hash_queues_[partition].batch_count()) + { + const Datum &datum = hash_queues_[partition][batch_idx].values[0]; + return reinterpret_cast( + datum.array()->buffers[1]->data()); + } + else + { + size_t hash_idx = builders_[partition].num_cols(); + KeyColumnArray kca = builders_[partition].column(hash_idx - 1); + return reinterpret_cast(kca.data(1)); + } + } + + Status SpillingAccumulationQueue::GetPartition( + size_t thread_index, + size_t partition, + std::function on_batch, + std::function on_finished) + { + bool is_in_memory = partition >= spilling_cursor_.load(); + if(builders_[partition].num_rows() > 0) + { + ExecBatch batch = builders_[partition].Flush(); + Datum hash = std::move(batch.values.back()); + batch.values.pop_back(); + if(is_in_memory) + { + ExecBatch hash_batch({ std::move(hash) }, batch.length); + hash_queues_[partition].InsertBatch(std::move(hash_batch)); + queues_[partition].InsertBatch(std::move(batch)); + } + else + { + RETURN_NOT_OK(on_batch( + thread_index, + /*batch_index=*/queues_[partition].batch_count(), + std::move(batch))); + } + } + + if(is_in_memory) + { + ARROW_DCHECK(partition >= hash_cursor_.load()); + read_back_fn_[partition] = std::move(on_batch); + on_finished_[partition] = std::move(on_finished); + return ctx_->StartTaskGroup(task_group_read_[partition], queues_[partition].batch_count()); + } + + return files_[partition].ReadBackBatches( + ctx_, + on_batch, + [this, partition, 
finished = std::move(on_finished)](size_t thread_index) + { + RETURN_NOT_OK(files_[partition].Cleanup()); + return finished(thread_index); + }); + } + + size_t SpillingAccumulationQueue::CalculatePartitionRowCount(size_t partition) const + { + return builders_[partition].num_rows() + queues_[partition].CalculateRowCount(); + } + + Result SpillingAccumulationQueue::AdvanceSpillCursor() + { + size_t to_spill = spilling_cursor_.fetch_add(1); + if(to_spill >= kNumPartitions) + { + ARROW_DCHECK(to_spill < 1000 * 1000 * 1000) << + "You've tried to advance the spill cursor over a billion times, you might have a problem"; + return false; + } + + auto lock = partition_locks_.AcquirePartitionLock(static_cast(to_spill)); + size_t num_batches = queues_[to_spill].batch_count(); + for(size_t i = 0; i < num_batches; i++) + RETURN_NOT_OK(files_[to_spill].SpillBatch(ctx_, std::move(queues_[to_spill][i]))); + return true; + } + + Result SpillingAccumulationQueue::AdvanceHashCursor() + { + size_t to_spill = hash_cursor_.fetch_add(1); + if(to_spill >= kNumPartitions) + { + ARROW_DCHECK(to_spill < 1000 * 1000 * 1000) << + "You've tried to advance the spill cursor over a billion times, you might have a problem"; + return false; + } + + auto lock = partition_locks_.AcquirePartitionLock(static_cast(to_spill)); + hash_queues_[to_spill].Clear(); + return true; + } } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/accumulation_queue.h b/cpp/src/arrow/compute/exec/accumulation_queue.h index 4b23e5ffcac54..678dfb62af792 100644 --- a/cpp/src/arrow/compute/exec/accumulation_queue.h +++ b/cpp/src/arrow/compute/exec/accumulation_queue.h @@ -21,16 +21,19 @@ #include #include "arrow/compute/exec.h" +#include "arrow/compute/light_array.h" +#include "arrow/compute/exec/partition_util.h" +#include "arrow/compute/exec/task_util.h" +#include "arrow/compute/exec/spilling_util.h" namespace arrow { -namespace util { -using arrow::compute::ExecBatch; +namespace compute { /// \brief A container that accumulates batches until they are ready to /// be processed. class AccumulationQueue { public: - AccumulationQueue() : row_count_(0) {} + AccumulationQueue() = default; ~AccumulationQueue() = default; // We should never be copying ExecBatch around @@ -42,16 +45,95 @@ class AccumulationQueue { void Concatenate(AccumulationQueue&& that); void InsertBatch(ExecBatch batch); - int64_t row_count() { return row_count_; } - size_t batch_count() { return batches_.size(); } + void SetBatch(size_t idx, ExecBatch batch); + size_t batch_count() const { return batches_.size(); } bool empty() const { return batches_.empty(); } + size_t CalculateRowCount() const; + + // Resizes the accumulation queue to contain size batches. The + // new batches will be empty and have length 0, but they will be + // usable (useful for concurrent modification of the AccumulationQueue + // of separate elements). + void Resize(size_t size) { batches_.resize(size); } void Clear(); - ExecBatch& operator[](size_t i); + ExecBatch& operator[](size_t i) { return batches_[i]; }; + const ExecBatch &operator[] (size_t i) const { return batches_[i]; }; private: - int64_t row_count_; std::vector batches_; }; -} // namespace util +class SpillingAccumulationQueue +{ +public: + // Number of partitions must be a power of two, since we assign partitions by + // looking at bottom few bits. 
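+  // With kLogNumPartitions = 6 there are 64 partitions, and partition_id(hash)
+  // keeps only the low 6 bits of the hash (e.g. a hash ending in 0b101101 maps
+  // to partition 45). The hash table consumes the top bits of the hash, so
+  // partitioning on the bottom bits avoids correlating the spill partition
+  // with the hash-table bucket.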
+ static constexpr int kLogNumPartitions = 6; + static constexpr int kNumPartitions = 1 << kLogNumPartitions; + Status Init(QueryContext *ctx); + // Assumes that the final column in batch contains 64-bit hashes of the columns. + Status InsertBatch( + size_t thread_index, + ExecBatch batch); + Status GetPartition( + size_t thread_index, + size_t partition, + std::function on_batch, // thread_index, batch_index, batch + std::function on_finished); + + // Returns hashes of the given partition and batch index. + // partition MUST be at least hash_cursor, as if partition < hash_cursor, + // these hashes will have been deleted. + const uint64_t *GetHashes(size_t partition, size_t batch_idx); + inline size_t batch_count(size_t partition) const + { + size_t num_full_batches = partition >= spilling_cursor_ + ? queues_[partition].batch_count() + : files_[partition].num_batches(); + + return num_full_batches + (builders_[partition].num_rows() > 0); + } + inline size_t row_count(size_t partition, size_t batch_idx) const + { + if(batch_idx < hash_queues_[partition].batch_count()) + return hash_queues_[partition][batch_idx].length; + else + return builders_[partition].num_rows(); + } + + static inline constexpr size_t partition_id(uint64_t hash) + { + // Hash Table uses the top bits of the hash, so we really really + // need to use the bottom bits of the hash for spilling to avoid + // a huge number of hash collisions per partition. + return static_cast(hash & (kNumPartitions - 1)); + } + + size_t CalculatePartitionRowCount(size_t partition) const; + + Result AdvanceSpillCursor(); + Result AdvanceHashCursor(); + inline size_t spill_cursor() const { return spilling_cursor_.load(); }; + inline size_t hash_cursor() const { return hash_cursor_.load(); }; + +private: + std::atomic spilling_cursor_{0}; // denotes the first in-memory partition + std::atomic hash_cursor_{0}; + + QueryContext* ctx_; + PartitionLocks partition_locks_; + + AccumulationQueue queues_[kNumPartitions]; + AccumulationQueue hash_queues_[kNumPartitions]; + + ExecBatchBuilder builders_[kNumPartitions]; + + SpillFile files_[kNumPartitions]; + + int task_group_read_[kNumPartitions]; + std::function read_back_fn_[kNumPartitions]; + std::function on_finished_[kNumPartitions]; +}; + +} // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 7eba27c89e936..fd481952e1d2a 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -42,13 +42,11 @@ class HashJoinBasicImpl : public HashJoinImpl { Status Init(QueryContext* ctx, JoinType join_type, size_t num_threads, const HashJoinProjectionMaps* proj_map_left, const HashJoinProjectionMaps* proj_map_right, - std::vector key_cmp, Expression filter, - RegisterTaskGroupCallback register_task_group_callback, - StartTaskGroupCallback start_task_group_callback, - OutputBatchCallback output_batch_callback, - FinishedCallback finished_callback) override { + std::vector *key_cmp, + Expression *filter, + CallbackRecord callback_record) override { START_COMPUTE_SPAN(span_, "HashJoinBasicImpl", - {{"detail", filter.ToString()}, + {{"detail", filter->ToString()}, {"join.kind", arrow::compute::ToString(join_type)}, {"join.threads", static_cast(num_threads)}}); @@ -57,12 +55,9 @@ class HashJoinBasicImpl : public HashJoinImpl { join_type_ = join_type; schema_[0] = proj_map_left; schema_[1] = proj_map_right; - key_cmp_ = std::move(key_cmp); - filter_ = std::move(filter); - 
register_task_group_callback_ = std::move(register_task_group_callback); - start_task_group_callback_ = std::move(start_task_group_callback); - output_batch_callback_ = std::move(output_batch_callback); - finished_callback_ = std::move(finished_callback); + key_cmp_ = key_cmp; + filter_ = filter; + callback_record_ = std::move(callback_record); local_states_.resize(num_threads_); for (size_t i = 0; i < local_states_.size(); ++i) { @@ -155,7 +150,7 @@ class HashJoinBasicImpl : public HashJoinImpl { bool is_null = non_null_bit_vectors[icol] && !bit_util::GetBit(non_null_bit_vectors[icol], non_null_bit_vector_offsets[icol] + irow); - if (key_cmp_[icol] == JoinKeyCmp::EQ && is_null) { + if ((*key_cmp_)[icol] == JoinKeyCmp::EQ && is_null) { no_match = true; break; } @@ -232,7 +227,7 @@ class HashJoinBasicImpl : public HashJoinImpl { : opt_right_payload->values[from_payload.get(icol)]; } - output_batch_callback_(0, std::move(result)); + callback_record_.output_batch(0, std::move(result)); // Update the counter of produced batches // @@ -244,7 +239,7 @@ class HashJoinBasicImpl : public HashJoinImpl { std::vector& no_match, std::vector& match_left, std::vector& match_right) { - if (filter_ == literal(true)) { + if (*filter_ == literal(true)) { return Status::OK(); } ARROW_DCHECK_EQ(match_left.size(), match_right.size()); @@ -549,7 +544,7 @@ class HashJoinBasicImpl : public HashJoinImpl { } void RegisterBuildHashTable() { - task_group_build_ = register_task_group_callback_( + task_group_build_ = callback_record_.register_task_group( [this](size_t thread_index, int64_t task_id) -> Status { return BuildHashTable_exec_task(thread_index, task_id); }, @@ -609,12 +604,12 @@ class HashJoinBasicImpl : public HashJoinImpl { BuildFinishedCallback on_finished) override { build_finished_callback_ = std::move(on_finished); build_batches_ = std::move(batches); - return start_task_group_callback_(task_group_build_, - /*num_tasks=*/1); + return callback_record_.start_task_group(task_group_build_, + /*num_tasks=*/1); } void RegisterScanHashTable() { - task_group_scan_ = register_task_group_callback_( + task_group_scan_ = callback_record_.register_task_group( [this](size_t thread_index, int64_t task_id) -> Status { return ScanHashTable_exec_task(thread_index, task_id); }, @@ -683,13 +678,12 @@ class HashJoinBasicImpl : public HashJoinImpl { return Status::Cancelled("Hash join cancelled"); } END_SPAN(span_); - finished_callback_(num_batches_produced_.load()); - return Status::OK(); + return callback_record_.finished(num_batches_produced_.load()); } Status ScanHashTable(size_t thread_index) { MergeHasMatch(); - return start_task_group_callback_(task_group_scan_, ScanHashTable_num_tasks()); + return callback_record_.start_task_group(task_group_scan_, ScanHashTable_num_tasks()); } Status ProbingFinished(size_t thread_index) override { @@ -738,18 +732,15 @@ class HashJoinBasicImpl : public HashJoinImpl { JoinType join_type_; size_t num_threads_; const HashJoinProjectionMaps* schema_[2]; - std::vector key_cmp_; - Expression filter_; + std::vector *key_cmp_; + Expression *filter_; int task_group_build_; int task_group_scan_; // Callbacks // - RegisterTaskGroupCallback register_task_group_callback_; - StartTaskGroupCallback start_task_group_callback_; - OutputBatchCallback output_batch_callback_; + CallbackRecord callback_record_; BuildFinishedCallback build_finished_callback_; - FinishedCallback finished_callback_; // Thread local runtime state // diff --git a/cpp/src/arrow/compute/exec/hash_join.h 
b/cpp/src/arrow/compute/exec/hash_join.h index afe04fa998578..9217d48143479 100644 --- a/cpp/src/arrow/compute/exec/hash_join.h +++ b/cpp/src/arrow/compute/exec/hash_join.h @@ -34,33 +34,37 @@ namespace arrow { namespace compute { -using arrow::util::AccumulationQueue; - class HashJoinImpl { public: - using OutputBatchCallback = std::function; - using BuildFinishedCallback = std::function; - using FinishedCallback = std::function; - using RegisterTaskGroupCallback = std::function, std::function)>; - using StartTaskGroupCallback = std::function; - using AbortContinuationImpl = std::function; + using OutputBatchCallback = std::function; + using BuildFinishedCallback = std::function; + using FinishedCallback = std::function; + using RegisterTaskGroupCallback = std::function, std::function)>; + using StartTaskGroupCallback = std::function; + using AbortContinuationImpl = std::function; + + struct CallbackRecord + { + RegisterTaskGroupCallback register_task_group; + StartTaskGroupCallback start_task_group; + OutputBatchCallback output_batch; + FinishedCallback finished; + }; virtual ~HashJoinImpl() = default; virtual Status Init(QueryContext* ctx, JoinType join_type, size_t num_threads, const HashJoinProjectionMaps* proj_map_left, const HashJoinProjectionMaps* proj_map_right, - std::vector key_cmp, Expression filter, - RegisterTaskGroupCallback register_task_group_callback, - StartTaskGroupCallback start_task_group_callback, - OutputBatchCallback output_batch_callback, - FinishedCallback finished_callback) = 0; + std::vector *key_cmp, + Expression *filter, + CallbackRecord callback_record) = 0; virtual Status BuildHashTable(size_t thread_index, AccumulationQueue batches, BuildFinishedCallback on_finished) = 0; virtual Status ProbeSingleBatch(size_t thread_index, ExecBatch batch) = 0; virtual Status ProbingFinished(size_t thread_index) = 0; - virtual void Abort(TaskScheduler::AbortContinuationImpl pos_abort_callback) = 0; + virtual void Abort(AbortContinuationImpl pos_abort_callback) = 0; virtual std::string ToString() const = 0; static Result> MakeBasic(); diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc index 948245675a8d9..28576c6510bc5 100644 --- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc +++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -126,10 +126,10 @@ class JoinBenchmark { stats_.num_probe_rows = settings.num_probe_batches * settings.batch_size; schema_mgr_ = std::make_unique(); - Expression filter = literal(true); + filter_ = literal(true); DCHECK_OK(schema_mgr_->Init(settings.join_type, *l_batches_with_schema.schema, left_keys, *r_batches_with_schema.schema, right_keys, - filter, "l_", "r_")); + filter_, "l_", "r_")); if (settings.use_basic_implementation) { join_ = *HashJoinImpl::MakeBasic(); @@ -147,21 +147,24 @@ class JoinBenchmark { scheduler_ = TaskScheduler::Make(); DCHECK_OK(ctx_.Init(settings.num_threads)); - auto register_task_group_callback = [&](std::function task, + HashJoinImpl::CallbackRecord callbacks; + callbacks.register_task_group = [&](std::function task, std::function cont) { return scheduler_->RegisterTaskGroup(std::move(task), std::move(cont)); }; - auto start_task_group_callback = [&](int task_group_id, int64_t num_tasks) { + callbacks.start_task_group = [&](int task_group_id, int64_t num_tasks) { return scheduler_->StartTaskGroup(omp_get_thread_num(), task_group_id, num_tasks); }; + callbacks.output_batch = [](int64_t, ExecBatch) {}; + callbacks.finished = [](int64_t){ return 
Status::OK(); }; DCHECK_OK(join_->Init( - &(schema_mgr_->proj_maps[0]), &(schema_mgr_->proj_maps[1]), std::move(key_cmp), - std::move(filter), std::move(register_task_group_callback), - std::move(start_task_group_callback), [](int64_t, ExecBatch) {}, - [](int64_t x) {})); &ctx_, settings.join_type, settings.num_threads, + &(schema_mgr_->proj_maps[0]), &(schema_mgr_->proj_maps[1]), + &key_cmp_, + &filter_, + std::move(callbacks))); task_group_probe_ = scheduler_->RegisterTaskGroup( [this](size_t thread_index, int64_t task_id) -> Status { @@ -198,6 +201,8 @@ class JoinBenchmark { std::unique_ptr schema_mgr_; std::unique_ptr join_; QueryContext ctx_; + std::vector key_cmp_; + Expression filter_; int task_group_probe_; struct { @@ -209,13 +214,13 @@ static void HashJoinBasicBenchmarkImpl(benchmark::State& st, BenchmarkSettings& settings) { uint64_t total_rows = 0; for (auto _ : st) { - st.PauseTiming(); { JoinBenchmark bm(settings); st.ResumeTiming(); bm.RunJoin(); st.PauseTiming(); total_rows += bm.stats_.num_probe_rows; + st.PauseTiming(); } st.ResumeTiming(); } diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index 4e15470902871..6d0acb87c2826 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -24,6 +24,7 @@ #include "arrow/compute/exec/hash_join.h" #include "arrow/compute/exec/hash_join_dict.h" #include "arrow/compute/exec/hash_join_node.h" +#include "arrow/compute/exec/spilling_join.h" #include "arrow/compute/exec/key_hash.h" #include "arrow/compute/exec/options.h" #include "arrow/compute/exec/schema_util.h" @@ -127,49 +128,25 @@ Status HashJoinSchema::Init( right_schema, right_keys, right_output, left_field_name_suffix, right_field_name_suffix)); - std::vector handles; - std::vector*> field_refs; - std::vector left_filter, right_filter; RETURN_NOT_OK( CollectFilterColumns(left_filter, right_filter, filter, left_schema, right_schema)); - - handles.push_back(HashJoinProjection::KEY); - field_refs.push_back(&left_keys); - ARROW_ASSIGN_OR_RAISE(auto left_payload, ComputePayload(left_schema, left_output, left_filter, left_keys)); - handles.push_back(HashJoinProjection::PAYLOAD); - field_refs.push_back(&left_payload); - handles.push_back(HashJoinProjection::FILTER); - field_refs.push_back(&left_filter); - - handles.push_back(HashJoinProjection::OUTPUT); - field_refs.push_back(&left_output); - - RETURN_NOT_OK( - proj_maps[0].Init(HashJoinProjection::INPUT, left_schema, handles, field_refs)); - - handles.clear(); - field_refs.clear(); - - handles.push_back(HashJoinProjection::KEY); - field_refs.push_back(&right_keys); + RETURN_NOT_OK(proj_maps[0].Init(HashJoinProjection::INPUT, left_schema)); + RETURN_NOT_OK(proj_maps[0].RegisterProjectedSchema(HashJoinProjection::KEY, left_keys, left_schema)); + RETURN_NOT_OK(proj_maps[0].RegisterProjectedSchema(HashJoinProjection::PAYLOAD, left_payload, left_schema)); + RETURN_NOT_OK(proj_maps[0].RegisterProjectedSchema(HashJoinProjection::FILTER, left_filter, left_schema)); + RETURN_NOT_OK(proj_maps[0].RegisterProjectedSchema(HashJoinProjection::OUTPUT, left_output, left_schema)); ARROW_ASSIGN_OR_RAISE(auto right_payload, ComputePayload(right_schema, right_output, right_filter, right_keys)); - handles.push_back(HashJoinProjection::PAYLOAD); - field_refs.push_back(&right_payload); - - handles.push_back(HashJoinProjection::FILTER); - field_refs.push_back(&right_filter); - - handles.push_back(HashJoinProjection::OUTPUT); - 
field_refs.push_back(&right_output); - - RETURN_NOT_OK( - proj_maps[1].Init(HashJoinProjection::INPUT, right_schema, handles, field_refs)); + RETURN_NOT_OK(proj_maps[1].Init(HashJoinProjection::INPUT, right_schema)); + RETURN_NOT_OK(proj_maps[1].RegisterProjectedSchema(HashJoinProjection::KEY, right_keys, right_schema)); + RETURN_NOT_OK(proj_maps[1].RegisterProjectedSchema(HashJoinProjection::PAYLOAD, right_payload, right_schema)); + RETURN_NOT_OK(proj_maps[1].RegisterProjectedSchema(HashJoinProjection::FILTER, right_filter, right_schema)); + RETURN_NOT_OK(proj_maps[1].RegisterProjectedSchema(HashJoinProjection::OUTPUT, right_output, right_schema)); return Status::OK(); } @@ -478,6 +455,7 @@ Status ValidateHashJoinNodeOptions(const HashJoinNodeOptions& join_options) { class HashJoinNode; + // This is a struct encapsulating things related to Bloom filters and pushing them around // between HashJoinNodes. The general strategy is to notify other joins at plan-creation // time for that join to expect a Bloom filter. Once the full build side has been @@ -490,15 +468,18 @@ struct BloomFilterPushdownContext { std::function, std::function)>; using StartTaskGroupCallback = std::function; using BuildFinishedCallback = std::function; - using FiltersReceivedCallback = std::function; + using FiltersReceivedCallback = std::function; using FilterFinishedCallback = std::function; + void Init(HashJoinNode* owner, size_t num_threads, RegisterTaskGroupCallback register_task_group_callback, StartTaskGroupCallback start_task_group_callback, FiltersReceivedCallback on_bloom_filters_received, bool disable_bloom_filter, bool use_sync_execution); - Status StartProducing(); + PartitionedBloomFilter *bloom_filter() { return disable_bloom_filter_ ? nullptr : &push_.bloom_filter_; } + + Status StartProducing(size_t thread_index); void ExpectBloomFilter() { eval_.num_expected_bloom_filters_ += 1; } @@ -508,11 +489,13 @@ struct BloomFilterPushdownContext { BuildFinishedCallback on_finished); // Sends the Bloom filter to the pushdown target. - Status PushBloomFilter(); + Status PushBloomFilter(size_t thread_index); // Receives a Bloom filter and its associated column map. - Status ReceiveBloomFilter(std::unique_ptr filter, - std::vector column_map) { + Status ReceiveBloomFilter( + size_t thread_index, + PartitionedBloomFilter filter, + std::vector column_map) { bool proceed; { std::lock_guard guard(eval_.receive_mutex_); @@ -524,7 +507,7 @@ struct BloomFilterPushdownContext { ARROW_DCHECK_LE(eval_.received_filters_.size(), eval_.num_expected_bloom_filters_); } if (proceed) { - return eval_.all_received_callback_(); + return eval_.all_received_callback_(thread_index); } return Status::OK(); } @@ -543,71 +526,57 @@ struct BloomFilterPushdownContext { /*num_tasks=*/eval_.batches_.batch_count()); } - // Applies all Bloom filters on the input batch. 
- Status FilterSingleBatch(size_t thread_index, ExecBatch* batch_ptr) { - ExecBatch& batch = *batch_ptr; - if (eval_.num_expected_bloom_filters_ == 0 || batch.length == 0) return Status::OK(); - - int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length); - std::vector selected(bit_vector_bytes); - std::vector hashes(batch.length); - std::vector bv(bit_vector_bytes); - - ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, GetStack(thread_index)); - - // Start with full selection for the current batch - memset(selected.data(), 0xff, bit_vector_bytes); - for (size_t ifilter = 0; ifilter < eval_.num_expected_bloom_filters_; ifilter++) { - std::vector keys(eval_.received_maps_[ifilter].size()); - for (size_t i = 0; i < keys.size(); i++) { - int input_idx = eval_.received_maps_[ifilter][i]; - keys[i] = batch[input_idx]; - if (keys[i].is_scalar()) { - ARROW_ASSIGN_OR_RAISE( - keys[i], - MakeArrayFromScalar(*keys[i].scalar(), batch.length, ctx_->memory_pool())); + Status HashAndLookupInFilter( + size_t thread_index, + ExecBatch &batch, + std::vector &selected) + { + std::vector bv(selected.size()); + std::vector hashes(batch.length); + + ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, ctx_->GetTempStack(thread_index)); + + // Start with full selection for the current batch + memset(selected.data(), 0xff, bv.size()); + std::vector temp_column_arrays; + for (size_t ifilter = 0; ifilter < eval_.num_expected_bloom_filters_; ifilter++) + { + std::vector keys(eval_.received_maps_[ifilter].size()); + for (size_t i = 0; i < keys.size(); i++) { + int input_idx = eval_.received_maps_[ifilter][i]; + keys[i] = batch[input_idx]; + if (keys[i].is_scalar()) { + ARROW_ASSIGN_OR_RAISE( + keys[i], + MakeArrayFromScalar(*keys[i].scalar(), batch.length, ctx_->memory_pool())); + } + } + ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(keys))); + RETURN_NOT_OK(Hashing64::HashBatch(key_batch, hashes.data(), temp_column_arrays, + ctx_->cpu_info()->hardware_flags(), stack, 0, + key_batch.length)); + + eval_.received_filters_[ifilter].Find( + ctx_->cpu_info()->hardware_flags(), + key_batch.length, + hashes.data(), + bv.data()); + arrow::internal::BitmapAnd(bv.data(), 0, selected.data(), 0, key_batch.length, 0, + selected.data()); } - } - ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(keys))); - std::vector temp_column_arrays; - RETURN_NOT_OK(Hashing32::HashBatch(key_batch, hashes.data(), temp_column_arrays, - ctx_->cpu_info()->hardware_flags(), stack, 0, - key_batch.length)); - - eval_.received_filters_[ifilter]->Find(ctx_->cpu_info()->hardware_flags(), - key_batch.length, hashes.data(), bv.data()); - arrow::internal::BitmapAnd(bv.data(), 0, selected.data(), 0, key_batch.length, 0, - selected.data()); - } - auto selected_buffer = - std::make_unique(selected.data(), bit_vector_bytes); - ArrayData selected_arraydata(boolean(), batch.length, - {nullptr, std::move(selected_buffer)}); - Datum selected_datum(selected_arraydata); - FilterOptions options; - size_t first_nonscalar = batch.values.size(); - for (size_t i = 0; i < batch.values.size(); i++) { - if (!batch.values[i].is_scalar()) { - ARROW_ASSIGN_OR_RAISE(batch.values[i], - Filter(batch.values[i], selected_datum, options, ctx_)); - first_nonscalar = std::min(first_nonscalar, i); - ARROW_DCHECK_EQ(batch.values[i].length(), batch.values[first_nonscalar].length()); - } + return Status::OK(); } - // If they're all Scalar, then the length of the batch is the number of set bits - if (first_nonscalar == batch.values.size()) 
- batch.length = arrow::internal::CountSetBits(selected.data(), 0, batch.length); - else - batch.length = batch.values[first_nonscalar].length(); - return Status::OK(); - } + + // Applies all Bloom filters on the input batch. + Status FilterSingleBatch(size_t thread_index, ExecBatch* batch_ptr); private: Status BuildBloomFilter_exec_task(size_t thread_index, int64_t task_id); - Status BuildBloomFilter_on_finished(size_t thread_index) { - return build_.on_finished_(thread_index, std::move(build_.batches_)); - } + Status BuildBloomFilter_on_finished(size_t thread_index) + { + return build_.on_finished_(thread_index, std::move(build_.batches_)); + } // The Bloom filter is built on the build side of some upstream join. For a join to // evaluate the Bloom filter on its input columns, it has to rearrange its input columns @@ -623,38 +592,36 @@ struct BloomFilterPushdownContext { StartTaskGroupCallback start_task_group_callback_; bool disable_bloom_filter_; HashJoinSchema* schema_mgr_; - ExecContext* ctx_; + QueryContext* ctx_; - struct ThreadLocalData { - bool is_init = false; - util::TempVectorStack stack; - }; - std::vector tld_; - - struct { + struct + { int task_id_; std::unique_ptr builder_; AccumulationQueue batches_; BuildFinishedCallback on_finished_; } build_; - struct { - std::unique_ptr bloom_filter_; + struct + { + PartitionedBloomFilter bloom_filter_; HashJoinNode* pushdown_target_; std::vector column_map_; } push_; - struct { + struct + { int task_id_; size_t num_expected_bloom_filters_ = 0; std::mutex receive_mutex_; - std::vector> received_filters_; + std::vector received_filters_; std::vector> received_maps_; AccumulationQueue batches_; FiltersReceivedCallback all_received_callback_; FilterFinishedCallback on_finished_; } eval_; }; + bool HashJoinSchema::HasDictionaries() const { for (int side = 0; side <= 1; ++side) { for (int icol = 0; icol < proj_maps[side].num_cols(HashJoinProjection::INPUT); @@ -688,7 +655,8 @@ class HashJoinNode : public ExecNode { HashJoinNode(ExecPlan* plan, NodeVector inputs, const HashJoinNodeOptions& join_options, std::shared_ptr output_schema, std::unique_ptr schema_mgr, Expression filter, - std::unique_ptr impl) + std::unique_ptr impl, + bool is_swiss) : ExecNode(plan, inputs, {"left", "right"}, /*output_schema=*/std::move(output_schema), /*num_outputs=*/1), @@ -697,6 +665,7 @@ class HashJoinNode : public ExecNode { filter_(std::move(filter)), schema_mgr_(std::move(schema_mgr)), impl_(std::move(impl)), + is_swiss_(is_swiss), disable_bloom_filter_(join_options.disable_bloom_filter) { complete_.store(false); } @@ -737,49 +706,162 @@ class HashJoinNode : public ExecNode { std::shared_ptr output_schema = schema_mgr->MakeOutputSchema( join_options.output_suffix_for_left, join_options.output_suffix_for_right); + bool use_swiss = use_swiss_join(filter, schema_mgr); + std::unique_ptr impl; + if (use_swiss) + { + ARROW_ASSIGN_OR_RAISE(impl, HashJoinImpl::MakeSwiss()); + } + else + { + ARROW_ASSIGN_OR_RAISE(impl, HashJoinImpl::MakeBasic()); + } + + return plan->EmplaceNode( + plan, inputs, join_options, std::move(output_schema), std::move(schema_mgr), + std::move(filter), std::move(impl), use_swiss); + } + + const char* kind_name() const override { return "HashJoinNode"; } + // Create hash join implementation object // SwissJoin does not support: // a) 64-bit string offsets // b) residual predicates // c) dictionaries // - bool use_swiss_join; + static bool use_swiss_join( + const Expression &filter, + const std::unique_ptr &schema) + { #if 
ARROW_LITTLE_ENDIAN - use_swiss_join = (filter == literal(true)) && !schema_mgr->HasDictionaries() && - !schema_mgr->HasLargeBinary(); + return (filter == literal(true) + && !schema->HasDictionaries() + && !schema->HasLargeBinary()); #else - use_swiss_join = false; + return false; #endif - std::unique_ptr impl; - if (use_swiss_join) { - ARROW_ASSIGN_OR_RAISE(impl, HashJoinImpl::MakeSwiss()); - } else { - ARROW_ASSIGN_OR_RAISE(impl, HashJoinImpl::MakeBasic()); } - return plan->EmplaceNode( - plan, inputs, join_options, std::move(output_schema), std::move(schema_mgr), - std::move(filter), std::move(impl)); - } + Status AddHashColumn( + size_t thread_index, + ExecBatch *batch, + const SchemaProjectionMaps &map) + { + for(int i = 0; i < batch->num_values(); i++) + { + if(batch->values[i].is_scalar()) + { + ARROW_ASSIGN_OR_RAISE( + batch->values[i], + MakeArrayFromScalar( + *batch->values[i].scalar(), + batch->length, + plan_->query_context()->memory_pool())); + } + } - const char* kind_name() const override { return "HashJoinNode"; } + ARROW_ASSIGN_OR_RAISE(std::unique_ptr hash_buf, + AllocateBuffer(sizeof(uint64_t) * batch->length, + plan_->query_context()->memory_pool())); + uint64_t *hashes = reinterpret_cast(hash_buf->mutable_data()); + std::vector temp_column_arrays; + auto key_to_in = map.map(HashJoinProjection::KEY, HashJoinProjection::INPUT); + int num_keys = key_to_in.num_cols; + std::vector key_cols(num_keys); + for(int i = 0; i < num_keys; i++) + key_cols[i] = (*batch).values[key_to_in.get(i)]; + + ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, + plan_->query_context()->GetTempStack(thread_index)); + + ExecBatch key_batch(std::move(key_cols), batch->length); + RETURN_NOT_OK(Hashing64::HashBatch(std::move(key_batch), + hashes, + temp_column_arrays, + plan_->query_context()->cpu_info()->hardware_flags(), + stack, + 0, + batch->length)); + + ArrayData hash_data(uint64(), batch->length, { nullptr, std::move(hash_buf)}); + batch->values.emplace_back(std::move(hash_data)); + return Status::OK(); + } - Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) { - std::lock_guard guard(build_side_mutex_); - build_accumulator_.InsertBatch(std::move(batch)); - return Status::OK(); - } + Status OnSpillingStarted(size_t) + { + { + std::lock_guard build_guard(build_side_mutex_); + spilling_build_ = true; + } + RETURN_NOT_OK(plan_->query_context()->StartTaskGroup( + task_group_spill_build_, + build_accumulator_.batch_count())); + + { + std::lock_guard probe_guard(probe_side_mutex_); + spilling_probe_ = true; + } + RETURN_NOT_OK(plan_->query_context()->StartTaskGroup( + task_group_spill_probe_, + probe_accumulator_.batch_count())); + + return Status::OK(); + } + + Status OnBuildSideAccumSpilled(size_t thread_index) + { + // If the exchange returned true, it means that it was already + // true before us, so the other event that we are synchronizing + // with already happened. + if(build_accum_spilled_.exchange(true)) + return spilling_join_.OnBuildSideFinished(thread_index); + return Status::OK(); + } + + Status OnProbeSideAccumSpilled(size_t thread_index) + { + // If the exchange returned true, it means that it was already + // true before us, so the other event that we are synchronizing + // with already happened. 
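+      // (The two events being synchronized are the spill task group for the
+      // probe-side accumulator finishing and this node's OnProbeSideFinished
+      // running; whichever happens second observes `true` and forwards the
+      // signal, so spilling_join_.OnProbeSideFinished runs exactly once.)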
+ if(probe_accum_spilled_.exchange(true)) + return spilling_join_.OnProbeSideFinished(thread_index); + return Status::OK(); + } + + Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) + { + { + std::lock_guard guard(build_side_mutex_); + if(!spilling_build_) + { + build_accumulator_.InsertBatch(std::move(batch)); + return Status::OK(); + } + } + RETURN_NOT_OK(spilling_join_.OnBuildSideBatch(thread_index, std::move(batch))); + return Status::OK(); + } Status OnBuildSideFinished(size_t thread_index) { - return pushdown_context_.BuildBloomFilter( - thread_index, std::move(build_accumulator_), - [this](size_t thread_index, AccumulationQueue batches) { - return OnBloomFilterFinished(thread_index, std::move(batches)); - }); + + if(!spilling_build_) + { + return pushdown_context_.BuildBloomFilter( + thread_index, std::move(build_accumulator_), + [this](size_t thread_index, AccumulationQueue batches) { + return OnBloomFilterFinished(thread_index, std::move(batches)); + }); + } + + if(build_accum_spilled_.exchange(true)) + return spilling_join_.OnBuildSideFinished(thread_index); + return Status::OK(); } Status OnBloomFilterFinished(size_t thread_index, AccumulationQueue batches) { - RETURN_NOT_OK(pushdown_context_.PushBloomFilter()); + RETURN_NOT_OK(pushdown_context_.PushBloomFilter(thread_index)); return impl_->BuildHashTable( thread_index, std::move(batches), [this](size_t thread_index) { return OnHashTableFinished(thread_index); }); @@ -800,7 +882,13 @@ class HashJoinNode : public ExecNode { Status OnProbeSideBatch(size_t thread_index, ExecBatch batch) { { - std::lock_guard guard(probe_side_mutex_); + std::unique_lock guard(probe_side_mutex_); + if(spilling_probe_) + { + guard.unlock(); + return spilling_join_.OnProbeSideBatch(thread_index, std::move(batch)); + } + if (!bloom_filters_ready_) { probe_accumulator_.InsertBatch(std::move(batch)); return Status::OK(); @@ -822,7 +910,15 @@ class HashJoinNode : public ExecNode { Status OnProbeSideFinished(size_t thread_index) { bool probing_finished; { - std::lock_guard guard(probe_side_mutex_); + std::unique_lock guard(probe_side_mutex_); + if(spilling_probe_) + { + guard.unlock(); + if(probe_accum_spilled_.exchange(true)) + return spilling_join_.OnProbeSideFinished(thread_index); + return Status::OK(); + } + probing_finished = queued_batches_probed_ && !probe_side_finished_; probe_side_finished_ = true; } @@ -830,10 +926,14 @@ class HashJoinNode : public ExecNode { return Status::OK(); } - Status OnFiltersReceived() { + Status OnFiltersReceived(size_t thread_index) { + RETURN_NOT_OK(spilling_join_.OnBloomFiltersReceived(thread_index)); + std::unique_lock guard(probe_side_mutex_); + if(spilling_probe_) + return Status::OK(); + bloom_filters_ready_ = true; - size_t thread_index = plan_->GetThreadIndex(); AccumulationQueue batches = std::move(probe_accumulator_); guard.unlock(); return pushdown_context_.FilterBatches( @@ -884,7 +984,7 @@ class HashJoinNode : public ExecNode { return; } - size_t thread_index = plan_->GetThreadIndex(); + size_t thread_index = plan_->query_context()->GetThreadIndex(); int side = (input == inputs_[0]) ? 
0 : 1; EVENT(span_, "InputReceived", {{"batch.length", batch.length}, {"side", side}}); @@ -892,6 +992,15 @@ class HashJoinNode : public ExecNode { START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"batch.length", batch.length}}); + if(ErrorIfNotOk(AddHashColumn(thread_index, &batch, schema_mgr_->proj_maps[side]))) + { + StopProducing(); + return; + } + + if(ErrorIfNotOk(spilling_join_.CheckSpilling(thread_index, batch))) + return; + Status status = side == 0 ? OnProbeSideBatch(thread_index, std::move(batch)) : OnBuildSideBatch(thread_index, std::move(batch)); @@ -922,7 +1031,7 @@ class HashJoinNode : public ExecNode { void InputFinished(ExecNode* input, int total_batches) override { ARROW_DCHECK(std::find(inputs_.begin(), inputs_.end(), input) != inputs_.end()); - size_t thread_index = plan_->GetThreadIndex(); + size_t thread_index = plan_->query_context()->GetThreadIndex(); int side = (input == inputs_[0]) ? 0 : 1; EVENT(span_, "InputFinished", {{"side", side}, {"batches.length", total_batches}}); @@ -953,32 +1062,102 @@ class HashJoinNode : public ExecNode { // we will change it back to just the CPU's thread pool capacity. size_t num_threads = (GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1); + + auto register_task_group = [ctx](std::function fn, + std::function on_finished) + { + return ctx->RegisterTaskGroup(std::move(fn), std::move(on_finished)); + }; + + auto start_task_group = [ctx](int task_group_id, int64_t num_tasks) + { + return ctx->StartTaskGroup(task_group_id, num_tasks); + }; + + auto output_batch = [this](int64_t, ExecBatch batch) { this->OutputBatchCallback(batch); }; + auto finished = [this](int64_t total_num_batches) { return this->FinishedCallback(total_num_batches); }; + pushdown_context_.Init( - this, num_threads, - [this](std::function fn, - std::function on_finished) { - return plan_->RegisterTaskGroup(std::move(fn), std::move(on_finished)); - }, - [this](int task_group_id, int64_t num_tasks) { - return plan_->StartTaskGroup(task_group_id, num_tasks); - }, - [this]() { return OnFiltersReceived(); }, disable_bloom_filter_, + this, + num_threads, + register_task_group, + start_task_group, + [this](size_t thread_index) { return OnFiltersReceived(thread_index); }, + disable_bloom_filter_, use_sync_execution); + HashJoinImpl::CallbackRecord join_callbacks; + join_callbacks.register_task_group = register_task_group; + join_callbacks.start_task_group = start_task_group; + join_callbacks.output_batch = output_batch; + join_callbacks.finished = finished; + RETURN_NOT_OK(impl_->Init( - plan_->exec_context(), join_type_, num_threads, &(schema_mgr_->proj_maps[0]), - &(schema_mgr_->proj_maps[1]), key_cmp_, filter_, - [this](std::function fn, - std::function on_finished) { - return plan_->RegisterTaskGroup(std::move(fn), std::move(on_finished)); + ctx, join_type_, num_threads, &(schema_mgr_->proj_maps[0]), + &(schema_mgr_->proj_maps[1]), &key_cmp_, &filter_, + std::move(join_callbacks))); + + SpillingHashJoin::CallbackRecord spilling_callbacks; + spilling_callbacks.register_task_group = register_task_group; + spilling_callbacks.start_task_group = start_task_group; + spilling_callbacks.add_probe_side_hashes = [this](size_t thread_index, ExecBatch *batch) + { + return AddHashColumn(thread_index, batch, schema_mgr_->proj_maps[0]); + }; + spilling_callbacks.bloom_filter_finished = [this](size_t thread_index) + { + return pushdown_context_.PushBloomFilter(thread_index); + }; + spilling_callbacks.apply_bloom_filter = [this](size_t thread_index, ExecBatch 
*batch) + { + return pushdown_context_.FilterSingleBatch(thread_index, batch); + }; + spilling_callbacks.output_batch = output_batch; + spilling_callbacks.finished = finished; + spilling_callbacks.start_spilling = [this](size_t thread_index) + { + return OnSpillingStarted(thread_index); + }; + spilling_callbacks.pause_probe_side = [this](int counter) + { + inputs_[0]->PauseProducing(this, counter); + }; + spilling_callbacks.resume_probe_side = [this](int counter) + { + inputs_[0]->ResumeProducing(this, counter); + }; + + RETURN_NOT_OK(spilling_join_.Init( + ctx, + join_type_, + num_threads, + &(schema_mgr_->proj_maps[0]), + &(schema_mgr_->proj_maps[1]), + &key_cmp_, + &filter_, + pushdown_context_.bloom_filter(), + std::move(spilling_callbacks), + is_swiss_)); + + task_group_spill_build_ = ctx->RegisterTaskGroup( + [this](size_t thread_index, int64_t task_id) -> Status + { + return spilling_join_.OnBuildSideBatch(thread_index, std::move(build_accumulator_[task_id])); }, - [this](int task_group_id, int64_t num_tasks) { - return plan_->StartTaskGroup(task_group_id, num_tasks); + [this](size_t thread_index) -> Status + { + return OnBuildSideAccumSpilled(thread_index); + }); + + task_group_spill_probe_ = ctx->RegisterTaskGroup( + [this](size_t thread_index, int64_t task_id) -> Status + { + return spilling_join_.OnProbeSideBatch(thread_index, std::move(probe_accumulator_[task_id])); }, - [this](int64_t, ExecBatch batch) { this->OutputBatchCallback(batch); }, - [this](int64_t total_num_batches) { - this->FinishedCallback(total_num_batches); - })); + [this](size_t thread_index) -> Status + { + return OnProbeSideAccumSpilled(thread_index); + }); task_group_probe_ = ctx->RegisterTaskGroup( [this](size_t thread_index, int64_t task_id) -> Status { @@ -1036,12 +1215,13 @@ class HashJoinNode : public ExecNode { outputs_[0]->InputReceived(this, std::move(batch)); } - void FinishedCallback(int64_t total_num_batches) { + Status FinishedCallback(int64_t total_num_batches) { bool expected = false; if (complete_.compare_exchange_strong(expected, true)) { outputs_[0]->InputFinished(this, static_cast(total_num_batches)); finished_.MarkFinished(); } + return Status::OK(); } private: @@ -1052,14 +1232,19 @@ class HashJoinNode : public ExecNode { Expression filter_; std::unique_ptr schema_mgr_; std::unique_ptr impl_; - util::AccumulationQueue build_accumulator_; - util::AccumulationQueue probe_accumulator_; - util::AccumulationQueue queued_batches_to_probe_; + bool is_swiss_; + + AccumulationQueue build_accumulator_; + AccumulationQueue probe_accumulator_; + AccumulationQueue queued_batches_to_probe_; std::mutex build_side_mutex_; std::mutex probe_side_mutex_; + int task_group_spill_build_; + int task_group_spill_probe_; int task_group_probe_; + bool bloom_filters_ready_ = false; bool hash_table_ready_ = false; bool queued_batches_filtered_ = false; @@ -1067,8 +1252,15 @@ class HashJoinNode : public ExecNode { bool probe_side_finished_ = false; friend struct BloomFilterPushdownContext; + bool disable_bloom_filter_; BloomFilterPushdownContext pushdown_context_; + SpillingHashJoin spilling_join_; + bool spilling_build_ = false; + bool spilling_probe_ = false; + + std::atomic build_accum_spilled_{false}; + std::atomic probe_accum_spilled_{false}; }; void BloomFilterPushdownContext::Init( @@ -1084,7 +1276,6 @@ void BloomFilterPushdownContext::Init( eval_.all_received_callback_ = std::move(on_bloom_filters_received); if (!disable_bloom_filter_) { ARROW_CHECK(push_.pushdown_target_); - push_.bloom_filter_ = 
std::make_unique(); push_.pushdown_target_->pushdown_context_.ExpectBloomFilter(); build_.builder_ = BloomFilterBuilder::Make( @@ -1110,8 +1301,8 @@ void BloomFilterPushdownContext::Init( start_task_group_callback_ = std::move(start_task_group_callback); } -Status BloomFilterPushdownContext::StartProducing() { - if (eval_.num_expected_bloom_filters_ == 0) return eval_.all_received_callback_(); +Status BloomFilterPushdownContext::StartProducing(size_t thread_index) { + if (eval_.num_expected_bloom_filters_ == 0) return eval_.all_received_callback_(thread_index); return Status::OK(); } @@ -1124,52 +1315,90 @@ Status BloomFilterPushdownContext::BuildBloomFilter(size_t thread_index, if (disable_bloom_filter_) return build_.on_finished_(thread_index, std::move(build_.batches_)); + push_.bloom_filter_.in_memory = std::make_unique(); RETURN_NOT_OK(build_.builder_->Begin( - /*num_threads=*/tld_.size(), ctx_->cpu_info()->hardware_flags(), - ctx_->memory_pool(), build_.batches_.row_count(), build_.batches_.batch_count(), - push_.bloom_filter_.get())); + /*num_threads=*/ctx_->max_concurrency(), ctx_->cpu_info()->hardware_flags(), + ctx_->memory_pool(), static_cast(build_.batches_.CalculateRowCount()), build_.batches_.batch_count(), + push_.bloom_filter_.in_memory.get())); return start_task_group_callback_(build_.task_id_, /*num_tasks=*/build_.batches_.batch_count()); } -Status BloomFilterPushdownContext::PushBloomFilter() { +Status BloomFilterPushdownContext::PushBloomFilter(size_t thread_index) { if (!disable_bloom_filter_) return push_.pushdown_target_->pushdown_context_.ReceiveBloomFilter( + thread_index, std::move(push_.bloom_filter_), std::move(push_.column_map_)); return Status::OK(); } + // Applies all Bloom filters on the input batch. + Status BloomFilterPushdownContext::FilterSingleBatch(size_t thread_index, ExecBatch* batch_ptr) { + ExecBatch& batch = *batch_ptr; + if (eval_.num_expected_bloom_filters_ == 0 || batch.length == 0) return Status::OK(); + + int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length); + std::vector selected(bit_vector_bytes); + + // In the common case of a join pushing a Bloom filter to itself, and that + // being the only Bloom filter, we can skip computing the hashes + if(push_.pushdown_target_ + && this == &push_.pushdown_target_->pushdown_context_ + && eval_.num_expected_bloom_filters_ == 1) + { + const uint64_t *hashes = + reinterpret_cast( + batch.values.back().array()->buffers[1]->data()); + eval_.received_filters_[0].Find( + ctx_->cpu_info()->hardware_flags(), + batch.length, + hashes, + selected.data()); + } + else + { + RETURN_NOT_OK(HashAndLookupInFilter( + thread_index, + batch, + selected)); + } + + auto selected_buffer = + std::make_unique(selected.data(), bit_vector_bytes); + ArrayData selected_arraydata(boolean(), batch.length, + {nullptr, std::move(selected_buffer)}); + Datum selected_datum(selected_arraydata); + FilterOptions options; + size_t first_nonscalar = batch.values.size(); + for (size_t i = 0; i < batch.values.size(); i++) + { + if (!batch.values[i].is_scalar()) + { + ARROW_ASSIGN_OR_RAISE(batch.values[i], + Filter(batch.values[i], selected_datum, options, ctx_->exec_context())); + first_nonscalar = std::min(first_nonscalar, i); + ARROW_DCHECK_EQ(batch.values[i].length(), batch.values[first_nonscalar].length()); + } + } + // If they're all Scalar, then the length of the batch is the number of set bits + if (first_nonscalar == batch.values.size()) + batch.length = arrow::internal::CountSetBits(selected.data(), 0, batch.length); + 
else + batch.length = batch.values[first_nonscalar].length(); + return Status::OK(); + } + Status BloomFilterPushdownContext::BuildBloomFilter_exec_task(size_t thread_index, - int64_t task_id) { + int64_t task_id) +{ const ExecBatch& input_batch = build_.batches_[task_id]; - SchemaProjectionMap key_to_in = - schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, HashJoinProjection::INPUT); - std::vector key_columns(key_to_in.num_cols); - for (size_t i = 0; i < key_columns.size(); i++) { - int input_idx = key_to_in.get(static_cast(i)); - key_columns[i] = input_batch[input_idx]; - if (key_columns[i].is_scalar()) { - ARROW_ASSIGN_OR_RAISE(key_columns[i], - MakeArrayFromScalar(*key_columns[i].scalar(), - input_batch.length, ctx_->memory_pool())); - } - } - ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(key_columns))); - - ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, GetStack(thread_index)); - util::TempVectorHolder hash_holder(stack, util::MiniBatch::kMiniBatchLength); - uint32_t* hashes = hash_holder.mutable_data(); - for (int64_t i = 0; i < key_batch.length; i += util::MiniBatch::kMiniBatchLength) { - int64_t length = std::min(static_cast(key_batch.length - i), - static_cast(util::MiniBatch::kMiniBatchLength)); - - std::vector temp_column_arrays; - RETURN_NOT_OK(Hashing32::HashBatch(key_batch, hashes, temp_column_arrays, - ctx_->cpu_info()->hardware_flags(), stack, i, - length)); - RETURN_NOT_OK(build_.builder_->PushNextBatch(thread_index, length, hashes)); - } + if(input_batch.length == 0) + return Status::OK(); + + const uint64_t *hashes = + reinterpret_cast(input_batch.values.back().array()->buffers[1]->data()); + RETURN_NOT_OK(build_.builder_->PushNextBatch(thread_index, input_batch.length, hashes)); return Status::OK(); } @@ -1277,6 +1506,7 @@ std::pair> BloomFilterPushdownContext::GetPushdo #endif // ARROW_LITTLE_ENDIAN } + namespace internal { void RegisterHashJoinNode(ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("hashjoin", HashJoinNode::Make)); diff --git a/cpp/src/arrow/compute/exec/partition_util.cc b/cpp/src/arrow/compute/exec/partition_util.cc index e99007c45a335..90ff48ffa5b84 100644 --- a/cpp/src/arrow/compute/exec/partition_util.cc +++ b/cpp/src/arrow/compute/exec/partition_util.cc @@ -17,6 +17,7 @@ #include "arrow/compute/exec/partition_util.h" #include +#include namespace arrow { namespace compute { @@ -79,6 +80,19 @@ bool PartitionLocks::AcquirePartitionLock(size_t thread_id, int num_prtns_to_try return false; } +PartitionLocks::AutoReleaseLock PartitionLocks::AcquirePartitionLock(int prtn_id) +{ + std::atomic *lock = lock_ptr(prtn_id); + bool expected = false; + for(;;) + { + if(lock->compare_exchange_strong(expected, true, std::memory_order_acquire)) + return { this, prtn_id }; + while(lock->load()) + std::this_thread::yield(); + } +} + void PartitionLocks::ReleasePartitionLock(int prtn_id) { ARROW_DCHECK(prtn_id >= 0 && prtn_id < num_prtns_); std::atomic* lock = lock_ptr(prtn_id); diff --git a/cpp/src/arrow/compute/exec/partition_util.h b/cpp/src/arrow/compute/exec/partition_util.h index b3f302511a75d..f7e46c5ca9653 100644 --- a/cpp/src/arrow/compute/exec/partition_util.h +++ b/cpp/src/arrow/compute/exec/partition_util.h @@ -37,7 +37,7 @@ class PartitionSort { /// This corresponds to ranges in the sorted array containing all row ids for /// each of the partitions. 
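  /// For example, with 4 partitions and input rows whose partition ids are
  /// {1, 0, 1, 3}, prtn_ranges ends up as {0, 1, 3, 3, 4} and the permutation
  /// lists the row ids of partition 0 first, then partition 1, and so on.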
/// - /// prtn_ranges must be initailized and have at least prtn_ranges + 1 elements + /// prtn_ranges must be initailized and have at least num_prtns + 1 elements /// when this method returns prtn_ranges[i] will contains the total number of /// elements in partitions 0 through i. prtn_ranges[0] will be 0. /// @@ -115,6 +115,18 @@ class PartitionLocks { bool AcquirePartitionLock(size_t thread_id, int num_prtns, const int* prtns_to_try, bool limit_retries, int max_retries, int* locked_prtn_id, int* locked_prtn_id_pos); + + class [[nodiscard]] AutoReleaseLock + { + public: + AutoReleaseLock(PartitionLocks* locks, int prtn_id) + : locks(locks), prtn_id(prtn_id) {} + ~AutoReleaseLock() { locks->ReleasePartitionLock(prtn_id); } + PartitionLocks* locks; + int prtn_id; + }; + + AutoReleaseLock AcquirePartitionLock(int prtn_id); /// \brief Release a partition so that other threads can work on it void ReleasePartitionLock(int prtn_id); @@ -147,14 +159,7 @@ class PartitionLocks { /*limit_retries=*/false, /*max_retries=*/-1, &locked_prtn_id, &locked_prtn_id_pos); { - class AutoReleaseLock { - public: - AutoReleaseLock(PartitionLocks* locks, int prtn_id) - : locks(locks), prtn_id(prtn_id) {} - ~AutoReleaseLock() { locks->ReleasePartitionLock(prtn_id); } - PartitionLocks* locks; - int prtn_id; - } auto_release_lock(this, locked_prtn_id); + AutoReleaseLock auto_release_lock(this, locked_prtn_id); ARROW_RETURN_NOT_OK(process_prtn_fn(locked_prtn_id)); } if (locked_prtn_id_pos < num_unprocessed_partitions - 1) { diff --git a/cpp/src/arrow/compute/exec/schema_util.h b/cpp/src/arrow/compute/exec/schema_util.h index f2b14aa545060..a80238cc1571d 100644 --- a/cpp/src/arrow/compute/exec/schema_util.h +++ b/cpp/src/arrow/compute/exec/schema_util.h @@ -21,6 +21,7 @@ #include #include #include +#include #include "arrow/compute/light_array.h" // for KeyColumnMetadata #include "arrow/type.h" // for DataType, FieldRef, Field and Schema @@ -38,7 +39,8 @@ enum class HashJoinProjection : int { KEY = 1, PAYLOAD = 2, FILTER = 3, - OUTPUT = 4 + OUTPUT = 4, + NUM_VALUES = 5, }; struct SchemaProjectionMap { @@ -63,22 +65,46 @@ class SchemaProjectionMaps { public: static constexpr int kMissingField = -1; - Status Init(ProjectionIdEnum full_schema_handle, const Schema& schema, - const std::vector& projection_handles, - const std::vector*>& projections) { - ARROW_DCHECK(projection_handles.size() == projections.size()); - ARROW_RETURN_NOT_OK(RegisterSchema(full_schema_handle, schema)); - for (size_t i = 0; i < projections.size(); ++i) { - ARROW_RETURN_NOT_OK( - RegisterProjectedSchema(projection_handles[i], *(projections[i]), schema)); + Status Init(ProjectionIdEnum full_schema_handle, + const Schema& schema) + { + RETURN_NOT_OK(RegisterSchema(full_schema_handle, schema)); + const int id_base = 0; + std::vector &mapping = mappings_[id_base]; + std::vector &inverse = inverse_mappings_[id_base]; + mapping.resize(schema.num_fields()); + inverse.resize(schema.num_fields()); + std::iota(mapping.begin(), mapping.end(), 0); + std::iota(inverse.begin(), inverse.end(), 0); + return Status::OK(); } - RegisterEnd(); + + Status RegisterProjectedSchema(ProjectionIdEnum handle, + const std::vector& selected_fields, + const Schema& full_schema) { + FieldInfos out_fields; + const FieldVector& in_fields = full_schema.fields(); + out_fields.field_paths.resize(selected_fields.size()); + out_fields.field_names.resize(selected_fields.size()); + out_fields.data_types.resize(selected_fields.size()); + for (size_t i = 0; i < selected_fields.size(); ++i) 
{ + // All fields must be found in schema without ambiguity + ARROW_ASSIGN_OR_RAISE(auto match, selected_fields[i].FindOne(full_schema)); + const std::string& name = in_fields[match[0]]->name(); + const std::shared_ptr& type = in_fields[match[0]]->type(); + out_fields.field_paths[i] = match[0]; + out_fields.field_names[i] = name; + out_fields.data_types[i] = type; + } + int id = schema_id(handle); + schemas_[id] = std::move(out_fields); + GenerateMapForProjection(id); return Status::OK(); } int num_cols(ProjectionIdEnum schema_handle) const { int id = schema_id(schema_handle); - return static_cast(schemas_[id].second.data_types.size()); + return static_cast(schemas_[id].data_types.size()); } bool is_empty(ProjectionIdEnum schema_handle) const { @@ -87,19 +113,19 @@ class SchemaProjectionMaps { const std::string& field_name(ProjectionIdEnum schema_handle, int field_id) const { int id = schema_id(schema_handle); - return schemas_[id].second.field_names[field_id]; + return schemas_[id].field_names[field_id]; } const std::shared_ptr& data_type(ProjectionIdEnum schema_handle, int field_id) const { int id = schema_id(schema_handle); - return schemas_[id].second.data_types[field_id]; + return schemas_[id].data_types[field_id]; } const std::vector>& data_types( ProjectionIdEnum schema_handle) const { int id = schema_id(schema_handle); - return schemas_[id].second.data_types; + return schemas_[id].data_types; } SchemaProjectionMap map(ProjectionIdEnum from, ProjectionIdEnum to) const { @@ -132,55 +158,21 @@ class SchemaProjectionMaps { out_fields.field_names[i] = name; out_fields.data_types[i] = type; } - schemas_.push_back(std::make_pair(handle, out_fields)); - return Status::OK(); - } - - Status RegisterProjectedSchema(ProjectionIdEnum handle, - const std::vector& selected_fields, - const Schema& full_schema) { - FieldInfos out_fields; - const FieldVector& in_fields = full_schema.fields(); - out_fields.field_paths.resize(selected_fields.size()); - out_fields.field_names.resize(selected_fields.size()); - out_fields.data_types.resize(selected_fields.size()); - for (size_t i = 0; i < selected_fields.size(); ++i) { - // All fields must be found in schema without ambiguity - ARROW_ASSIGN_OR_RAISE(auto match, selected_fields[i].FindOne(full_schema)); - const std::string& name = in_fields[match[0]]->name(); - const std::shared_ptr& type = in_fields[match[0]]->type(); - out_fields.field_paths[i] = match[0]; - out_fields.field_names[i] = name; - out_fields.data_types[i] = type; - } - schemas_.push_back(std::make_pair(handle, out_fields)); + schemas_[schema_id(handle)] = std::move(out_fields); return Status::OK(); } - void RegisterEnd() { - size_t size = schemas_.size(); - mappings_.resize(size); - inverse_mappings_.resize(size); - int id_base = 0; - for (size_t i = 0; i < size; ++i) { - GenerateMapForProjection(static_cast(i), id_base); - } - } - int schema_id(ProjectionIdEnum schema_handle) const { - for (size_t i = 0; i < schemas_.size(); ++i) { - if (schemas_[i].first == schema_handle) { - return static_cast(i); - } - } - // We should never get here - ARROW_DCHECK(false); - return -1; + int id = static_cast(schema_handle); + ARROW_DCHECK(id < static_cast(ProjectionIdEnum::NUM_VALUES)); + return id; } - void GenerateMapForProjection(int id_proj, int id_base) { - int num_cols_proj = static_cast(schemas_[id_proj].second.data_types.size()); - int num_cols_base = static_cast(schemas_[id_base].second.data_types.size()); + void GenerateMapForProjection(int id_proj) { + const int id_base = 0; + + int 
num_cols_proj = static_cast(schemas_[id_proj].data_types.size()); + int num_cols_base = static_cast(schemas_[id_base].data_types.size()); std::vector& mapping = mappings_[id_proj]; std::vector& inverse_mapping = inverse_mappings_[id_proj]; @@ -192,8 +184,8 @@ class SchemaProjectionMaps { mapping[i] = inverse_mapping[i] = i; } } else { - const FieldInfos& fields_proj = schemas_[id_proj].second; - const FieldInfos& fields_base = schemas_[id_base].second; + const FieldInfos& fields_proj = schemas_[id_proj]; + const FieldInfos& fields_base = schemas_[id_base]; for (int i = 0; i < num_cols_base; ++i) { inverse_mapping[i] = SchemaProjectionMap::kMissingField; } @@ -215,9 +207,9 @@ class SchemaProjectionMaps { } // vector used as a mapping from ProjectionIdEnum to fields - std::vector> schemas_; - std::vector> mappings_; - std::vector> inverse_mappings_; + std::array(ProjectionIdEnum::NUM_VALUES)> schemas_; + std::array, static_cast(ProjectionIdEnum::NUM_VALUES)> mappings_; + std::array, static_cast(ProjectionIdEnum::NUM_VALUES)> inverse_mappings_; }; using HashJoinProjectionMaps = SchemaProjectionMaps; diff --git a/cpp/src/arrow/compute/exec/swiss_join.cc b/cpp/src/arrow/compute/exec/swiss_join.cc index 86d5d8436576a..3129418be8cea 100644 --- a/cpp/src/arrow/compute/exec/swiss_join.cc +++ b/cpp/src/arrow/compute/exec/swiss_join.cc @@ -2025,13 +2025,10 @@ class SwissJoin : public HashJoinImpl { Status Init(QueryContext* ctx, JoinType join_type, size_t num_threads, const HashJoinProjectionMaps* proj_map_left, const HashJoinProjectionMaps* proj_map_right, - std::vector key_cmp, Expression filter, - RegisterTaskGroupCallback register_task_group_callback, - StartTaskGroupCallback start_task_group_callback, - OutputBatchCallback output_batch_callback, - FinishedCallback finished_callback) override { + std::vector *key_cmp, Expression *filter, + CallbackRecord callback_record) override { START_COMPUTE_SPAN(span_, "SwissJoinImpl", - {{"detail", filter.ToString()}, + {{"detail", filter->ToString()}, {"join.kind", arrow::compute::ToString(join_type)}, {"join.threads", static_cast(num_threads)}}); @@ -2041,18 +2038,12 @@ class SwissJoin : public HashJoinImpl { pool_ = ctx->memory_pool(); join_type_ = join_type; - key_cmp_.resize(key_cmp.size()); - for (size_t i = 0; i < key_cmp.size(); ++i) { - key_cmp_[i] = key_cmp[i]; - } + key_cmp_ = key_cmp; schema_[0] = proj_map_left; schema_[1] = proj_map_right; - register_task_group_callback_ = std::move(register_task_group_callback); - start_task_group_callback_ = std::move(start_task_group_callback); - output_batch_callback_ = std::move(output_batch_callback); - finished_callback_ = std::move(finished_callback); + callback_record_ = std::move(callback_record); hash_table_ready_.store(false); cancelled_.store(false); @@ -2077,7 +2068,7 @@ class SwissJoin : public HashJoinImpl { } probe_processor_.Init(proj_map_left->num_cols(HashJoinProjection::KEY), join_type_, - &hash_table_, materialize, &key_cmp_, output_batch_callback_); + &hash_table_, materialize, key_cmp_, callback_record_.output_batch); InitTaskGroups(); @@ -2085,17 +2076,17 @@ class SwissJoin : public HashJoinImpl { } void InitTaskGroups() { - task_group_build_ = register_task_group_callback_( + task_group_build_ = callback_record_.register_task_group( [this](size_t thread_index, int64_t task_id) -> Status { return BuildTask(thread_index, task_id); }, [this](size_t thread_index) -> Status { return BuildFinished(thread_index); }); - task_group_merge_ = register_task_group_callback_( + task_group_merge_ = 
callback_record_.register_task_group( [this](size_t thread_index, int64_t task_id) -> Status { return MergeTask(thread_index, task_id); }, [this](size_t thread_index) -> Status { return MergeFinished(thread_index); }); - task_group_scan_ = register_task_group_callback_( + task_group_scan_ = callback_record_.register_task_group( [this](size_t thread_index, int64_t task_id) -> Status { return ScanTask(thread_index, task_id); }, @@ -2175,14 +2166,14 @@ class SwissJoin : public HashJoinImpl { payload_types.push_back(metadata); } RETURN_NOT_OK(CancelIfNotOK(hash_table_build_.Init( - &hash_table_, num_threads_, build_side_batches_.row_count(), + &hash_table_, num_threads_, static_cast(build_side_batches_.CalculateRowCount()), reject_duplicate_keys, no_payload, key_types, payload_types, pool_, hardware_flags_))); // Process all input batches // return CancelIfNotOK( - start_task_group_callback_(task_group_build_, build_side_batches_.batch_count())); + callback_record_.start_task_group(task_group_build_, build_side_batches_.batch_count())); } Status BuildTask(size_t thread_id, int64_t batch_id) { @@ -2246,7 +2237,7 @@ class SwissJoin : public HashJoinImpl { // RETURN_NOT_OK(CancelIfNotOK(hash_table_build_.PreparePrtnMerge())); return CancelIfNotOK( - start_task_group_callback_(task_group_merge_, hash_table_build_.num_prtns())); + callback_record_.start_task_group(task_group_merge_, hash_table_build_.num_prtns())); } Status MergeTask(size_t /*thread_id*/, int64_t prtn_id) { @@ -2292,7 +2283,7 @@ class SwissJoin : public HashJoinImpl { hash_table_.MergeHasMatch(); int64_t num_tasks = bit_util::CeilDiv(hash_table_.num_rows(), kNumRowsPerScanTask); - return CancelIfNotOK(start_task_group_callback_(task_group_scan_, num_tasks)); + return CancelIfNotOK(callback_record_.start_task_group(task_group_scan_, num_tasks)); } else { return CancelIfNotOK(OnScanHashTableFinished()); } @@ -2364,7 +2355,7 @@ class SwissJoin : public HashJoinImpl { Status status = local_states_[thread_id].materialize.AppendBuildOnly( num_output_rows, key_ids_buf.mutable_data(), payload_ids_buf.mutable_data(), [&](ExecBatch batch) { - output_batch_callback_(static_cast(thread_id), std::move(batch)); + callback_record_.output_batch(static_cast(thread_id), std::move(batch)); }); RETURN_NOT_OK(CancelIfNotOK(status)); if (!status.ok()) { @@ -2402,9 +2393,7 @@ class SwissJoin : public HashJoinImpl { num_produced_batches += materialize.num_produced_batches(); } - finished_callback_(num_produced_batches); - - return Status::OK(); + return callback_record_.finished(num_produced_batches); } Result KeyPayloadFromInput(int side, ExecBatch* input) { @@ -2473,7 +2462,7 @@ class SwissJoin : public HashJoinImpl { MemoryPool* pool_; int num_threads_; JoinType join_type_; - std::vector key_cmp_; + std::vector *key_cmp_; const HashJoinProjectionMaps* schema_[2]; // Task scheduling @@ -2482,11 +2471,8 @@ class SwissJoin : public HashJoinImpl { int task_group_scan_; // Callbacks - RegisterTaskGroupCallback register_task_group_callback_; - StartTaskGroupCallback start_task_group_callback_; - OutputBatchCallback output_batch_callback_; + CallbackRecord callback_record_; BuildFinishedCallback build_finished_callback_; - FinishedCallback finished_callback_; struct ThreadLocalState { JoinResultMaterialize materialize; diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc index caa392319b73f..4476f2e8e67ab 100644 --- a/cpp/src/arrow/compute/light_array.cc +++ b/cpp/src/arrow/compute/light_array.cc @@ -199,7 +199,8 @@ Status 
ColumnArraysFromExecBatch(const ExecBatch& batch, } void ResizableArrayData::Init(const std::shared_ptr& data_type, - MemoryPool* pool, int log_num_rows_min) { + MemoryPool* pool, int log_num_rows_min, + int64_t alignment) { #ifndef NDEBUG if (num_rows_allocated_ > 0) { ARROW_DCHECK(data_type_ != NULLPTR); @@ -212,6 +213,7 @@ void ResizableArrayData::Init(const std::shared_ptr& data_type, #endif Clear(/*release_buffers=*/false); log_num_rows_min_ = log_num_rows_min; + alignment_ = alignment; data_type_ = data_type; pool_ = pool; } @@ -248,7 +250,7 @@ Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) { ARROW_ASSIGN_OR_RAISE( buffers_[kValidityBuffer], AllocateResizableBuffer( - bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, pool_)); + bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, alignment_, pool_)); memset(mutable_data(kValidityBuffer), 0, bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes); if (column_metadata.is_fixed_length) { @@ -257,6 +259,7 @@ Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) { buffers_[kFixedLengthBuffer], AllocateResizableBuffer( bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, + alignment_, pool_)); memset(mutable_data(kFixedLengthBuffer), 0, bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes); @@ -265,18 +268,19 @@ Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) { buffers_[kFixedLengthBuffer], AllocateResizableBuffer( num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes, + alignment_, pool_)); } } else { ARROW_ASSIGN_OR_RAISE( buffers_[kFixedLengthBuffer], AllocateResizableBuffer( - (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes, pool_)); + (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes, alignment_, pool_)); } ARROW_ASSIGN_OR_RAISE( buffers_[kVariableLengthBuffer], - AllocateResizableBuffer(sizeof(uint64_t) + kNumPaddingBytes, pool_)); + AllocateResizableBuffer(sizeof(uint64_t) + kNumPaddingBytes, alignment_, pool_)); var_len_buf_size_ = sizeof(uint64_t); } else { @@ -490,7 +494,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr& source ARROW_DCHECK(num_rows_before >= 0); int num_rows_after = num_rows_before + num_rows_to_append; if (target->num_rows() == 0) { - target->Init(source->type, pool, kLogNumRows); + target->Init(source->type, pool, kLogNumRows, kAlignment); } RETURN_NOT_OK(target->ResizeFixedLengthBuffers(num_rows_after)); @@ -637,7 +641,7 @@ Status ExecBatchBuilder::AppendNulls(const std::shared_ptr& type, int num_rows_before = target.num_rows(); int num_rows_after = num_rows_before + num_rows_to_append; if (target.num_rows() == 0) { - target.Init(type, pool, kLogNumRows); + target.Init(type, pool, kLogNumRows, kAlignment); } RETURN_NOT_OK(target.ResizeFixedLengthBuffers(num_rows_after)); @@ -698,7 +702,7 @@ Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch const Datum& data = batch.values[col_ids ? 
col_ids[i] : i]; ARROW_DCHECK(data.is_array()); const std::shared_ptr& array_data = data.array(); - values_[i].Init(array_data->type, pool, kLogNumRows); + values_[i].Init(array_data->type, pool, kLogNumRows, kAlignment); } } @@ -729,7 +733,7 @@ Status ExecBatchBuilder::AppendNulls(MemoryPool* pool, if (values_.empty()) { values_.resize(types.size()); for (size_t i = 0; i < types.size(); ++i) { - values_[i].Init(types[i], pool, kLogNumRows); + values_[i].Init(types[i], pool, kLogNumRows, kAlignment); } } diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h index 389b63cca4143..4e848681b230c 100644 --- a/cpp/src/arrow/compute/light_array.h +++ b/cpp/src/arrow/compute/light_array.h @@ -270,7 +270,7 @@ class ARROW_EXPORT ResizableArrayData { /// \param log_num_rows_min All resize operations will allocate at least enough /// space for (1 << log_num_rows_min) rows void Init(const std::shared_ptr& data_type, MemoryPool* pool, - int log_num_rows_min); + int log_num_rows_min, int64_t alignment = MemoryPool::kDefaultAlignment); /// \brief Resets the array back to an empty state /// \param release_buffers If true then allocated memory is released and the @@ -325,6 +325,7 @@ class ARROW_EXPORT ResizableArrayData { private: static constexpr int64_t kNumPaddingBytes = 64; int log_num_rows_min_; + int64_t alignment_; std::shared_ptr data_type_; MemoryPool* pool_; int num_rows_; @@ -355,6 +356,11 @@ class ARROW_EXPORT ExecBatchBuilder { ResizableArrayData& target, int num_rows_to_append, MemoryPool* pool); + /// \brief Returns a non-owning view into the `col`th column. + KeyColumnArray column(size_t col) const { return values_[col].column_array(); } + + size_t num_cols() const { return values_.size(); } + /// \brief Add selected rows from `batch` /// /// If `col_ids` is null then `num_cols` should less than batch.num_values() and @@ -377,12 +383,17 @@ class ARROW_EXPORT ExecBatchBuilder { ExecBatch Flush(); int num_rows() const { return values_.empty() ? 0 : values_[0].num_rows(); } + bool is_full() const { return num_rows() == num_rows_max(); } static int num_rows_max() { return 1 << kLogNumRows; } private: static constexpr int kLogNumRows = 15; + // Align all buffers to 512 bytes so that we can spill them with + // DirectIO. + static constexpr int64_t kAlignment = 512; + // Calculate how many rows to skip from the tail of the // sequence of selected rows, such that the total size of skipped rows is at // least equal to the size specified by the caller. 
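[Editor's note] The kAlignment = 512 constant above exists so that every buffer ExecBatchBuilder produces can later be handed to Direct I/O when a batch is spilled: O_DIRECT (on Linux) requires the user buffer address, the transfer length, and the file offset to all be multiples of the device's logical block size. The following is a minimal, Linux-oriented sketch of that constraint being satisfied outside Arrow; the file path, buffer size, and the kDirectIOAlignment name are illustrative only and are not part of this patch's spilling code (spilling_util.cc).

    // Illustrative sketch, not Arrow code: write a 512-byte-aligned buffer with O_DIRECT.
    // Assumes Linux/glibc, where g++ defines _GNU_SOURCE and exposes O_DIRECT.
    #include <fcntl.h>
    #include <unistd.h>
    #include <cstdio>
    #include <cstdlib>
    #include <cstring>

    int main() {
      constexpr size_t kDirectIOAlignment = 512;  // assumed logical block size
      constexpr size_t kBufferSize = 4096;        // must be a multiple of the alignment

      // O_DIRECT requires the user buffer itself to be aligned, which is what the
      // patch guarantees by allocating ResizableArrayData buffers with alignment 512.
      void* buf = std::aligned_alloc(kDirectIOAlignment, kBufferSize);
      if (buf == nullptr) return 1;
      std::memset(buf, 0xAB, kBufferSize);

      // Hypothetical spill file path, for illustration only.
      int fd = open("/tmp/spill_demo.bin", O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT, 0644);
      if (fd < 0) { std::free(buf); return 1; }

      // The transfer length (and file offset, here 0) must also be block-size multiples.
      ssize_t written = write(fd, buf, kBufferSize);
      std::printf("wrote %zd bytes\n", written);

      close(fd);
      std::free(buf);
      return 0;
    }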
diff --git a/cpp/src/arrow/datum.cc b/cpp/src/arrow/datum.cc index d0b5cf62c61be..6d06facfa1e22 100644 --- a/cpp/src/arrow/datum.cc +++ b/cpp/src/arrow/datum.cc @@ -125,6 +125,7 @@ int64_t Datum::TotalBufferSize() const { case Datum::TABLE: return util::TotalBufferSize(*std::get>(this->value)); case Datum::SCALAR: + case Datum::NONE: return 0; default: DCHECK(false); diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index a62040f3a70f1..a88a9cfc878ca 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -106,9 +106,11 @@ #elif __APPLE__ #include +#include #elif __linux__ #include +#include #endif namespace arrow { @@ -2109,5 +2111,37 @@ int64_t GetCurrentRSS() { #endif } + int64_t GetTotalMemoryBytes() + { +#if defined(_WIN32) + ULONGLONG result_kb; + if(!GetPhysicallyInstalledSystemMemory(&result_kb)) + { + ARROW_LOG(WARNING) << "Failed to resolve total RAM size: " << std::strerror(GetLastError()); + return -1; + } + return static_cast(result_kb * 1024); +#elif defined(__APPLE__) + int64_t result; + size_t size = sizeof(result); + if(sysctlbyname("hw.memsize", &result, &size, nullptr, 0) == -1) + { + ARROW_LOG(WARNING) << "Failed to resolve total RAM size"; + return -1; + } + return result; +#elif defined(__linux__) + struct sysinfo info; + if(sysinfo(&info) == -1) + { + ARROW_LOG(WARNING) << "Failed to resolve total RAM size: " << std::strerror(errno); + return -1; + } + return static_cast(info.totalram * info.mem_unit); +#else + return 0; +#endif + } + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h index df63de47e8386..43d85ec24e28b 100644 --- a/cpp/src/arrow/util/io_util.h +++ b/cpp/src/arrow/util/io_util.h @@ -410,5 +410,11 @@ uint64_t GetThreadId(); ARROW_EXPORT int64_t GetCurrentRSS(); +/// \brief Get the total memory available to the system in bytes +/// +/// This function supports Windows, Linux, and Mac and will return 0 otherwise +ARROW_EXPORT +int64_t GetTotalMemoryBytes(); + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/io_util_test.cc b/cpp/src/arrow/util/io_util_test.cc index f4fcc26d07201..6f0c855cef849 100644 --- a/cpp/src/arrow/util/io_util_test.cc +++ b/cpp/src/arrow/util/io_util_test.cc @@ -1049,5 +1049,18 @@ TEST(CpuInfo, Basic) { ASSERT_EQ(ci->hardware_flags(), original_hardware_flags); } +TEST(Memory, TotalMemory) +{ +#if defined(_WIN32) + ASSERT_GT(GetTotalMemoryBytes(), 0); +#elif defined(__APPLE__) + ASSERT_GT(GetTotalMemoryBytes(), 0); +#elif defined(__linux__) + ASSERT_GT(GetTotalMemoryBytes(), 0); +#else + ASSERT_EQ(GetTotalMemoryBytes(), 0); +#endif +} + } // namespace internal } // namespace arrow
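[Editor's note] The new arrow::internal::GetTotalMemoryBytes() gives the exec layer a portable way to size a spilling budget against physical RAM, returning -1 on lookup failure and 0 on unsupported platforms, as exercised by the test above. Below is a hedged sketch of one possible caller; the 80% fraction and the 4 GiB fallback are assumptions for illustration, not values taken from this patch.

    // Illustrative sketch: derive a spill threshold from total system memory.
    // The 0.8 fraction and 4 GiB fallback are assumptions, not patch behavior.
    #include <cstdint>
    #include <iostream>

    #include "arrow/util/io_util.h"

    int64_t ChooseSpillThresholdBytes() {
      const int64_t total = arrow::internal::GetTotalMemoryBytes();
      if (total <= 0) {
        // 0 means unsupported platform, -1 means the lookup failed;
        // fall back to a fixed budget in either case.
        return int64_t{4} * 1024 * 1024 * 1024;
      }
      return static_cast<int64_t>(total * 0.8);
    }

    int main() {
      std::cout << "spill threshold: " << ChooseSpillThresholdBytes() << " bytes\n";
      return 0;
    }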