From a1b3b13d2ffa2538da7a19a05a895ae39bf4612b Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Thu, 9 Jun 2022 13:37:01 -0700 Subject: [PATCH] Add spilling for hash join --- cpp/src/arrow/CMakeLists.txt | 2 + cpp/src/arrow/buffer.h | 7 + .../arrow/compute/exec/accumulation_queue.cc | 121 +++++- .../arrow/compute/exec/accumulation_queue.h | 41 +- cpp/src/arrow/compute/exec/aggregate_node.cc | 20 +- cpp/src/arrow/compute/exec/asof_join_node.cc | 2 +- cpp/src/arrow/compute/exec/exec_plan.cc | 26 +- cpp/src/arrow/compute/exec/exec_plan.h | 28 +- cpp/src/arrow/compute/exec/filter_node.cc | 4 +- cpp/src/arrow/compute/exec/hash_join.cc | 4 +- cpp/src/arrow/compute/exec/hash_join.h | 2 - cpp/src/arrow/compute/exec/hash_join_node.cc | 401 ++++++++++++++---- cpp/src/arrow/compute/exec/hash_join_node.h | 4 + cpp/src/arrow/compute/exec/project_node.cc | 4 +- cpp/src/arrow/compute/exec/query_context.cc | 65 +++ cpp/src/arrow/compute/exec/query_context.h | 90 ++++ cpp/src/arrow/compute/exec/schema_util.h | 108 ++--- cpp/src/arrow/compute/exec/sink_node.cc | 8 +- cpp/src/arrow/compute/exec/source_node.cc | 4 +- cpp/src/arrow/compute/exec/spilling_util.cc | 278 ++++++++++++ cpp/src/arrow/compute/exec/spilling_util.h | 47 ++ cpp/src/arrow/compute/exec/swiss_join.cc | 4 +- cpp/src/arrow/compute/light_array.cc | 20 +- cpp/src/arrow/compute/light_array.h | 5 +- cpp/src/arrow/dataset/scanner.cc | 6 +- cpp/src/arrow/datum.cc | 1 + cpp/src/arrow/memory_pool.cc | 169 +++++--- cpp/src/arrow/memory_pool.h | 24 +- cpp/src/arrow/memory_pool_internal.h | 8 +- cpp/src/arrow/memory_pool_jemalloc.cc | 20 +- cpp/src/arrow/stl_allocator.h | 6 +- cpp/src/arrow/util/io_util.cc | 33 ++ cpp/src/arrow/util/io_util.h | 6 + 33 files changed, 1268 insertions(+), 300 deletions(-) create mode 100644 cpp/src/arrow/compute/exec/query_context.cc create mode 100644 cpp/src/arrow/compute/exec/query_context.h create mode 100644 cpp/src/arrow/compute/exec/spilling_util.cc create mode 100644 
cpp/src/arrow/compute/exec/spilling_util.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 93dd1297bd744..197bb16c4956a 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -402,8 +402,10 @@ if(ARROW_COMPUTE) compute/exec/partition_util.cc compute/exec/options.cc compute/exec/project_node.cc + compute/exec/query_context.cc compute/exec/sink_node.cc compute/exec/source_node.cc + compute/exec/spilling_util.cc compute/exec/swiss_join.cc compute/exec/task_util.cc compute/exec/tpch_node.cc diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h index 8be10d282b060..51367e6b8b412 100644 --- a/cpp/src/arrow/buffer.h +++ b/cpp/src/arrow/buffer.h @@ -460,6 +460,10 @@ class ARROW_EXPORT ResizableBuffer : public MutableBuffer { ARROW_EXPORT Result> AllocateBuffer(const int64_t size, MemoryPool* pool = NULLPTR); +ARROW_EXPORT +Result> AllocateBuffer(const int64_t size, + int64_t alignment, + MemoryPool* pool = NULLPTR); /// \brief Allocate a resizeable buffer from a memory pool, zero its padding. /// @@ -468,6 +472,9 @@ Result> AllocateBuffer(const int64_t size, ARROW_EXPORT Result> AllocateResizableBuffer( const int64_t size, MemoryPool* pool = NULLPTR); +ARROW_EXPORT +Result> AllocateResizableBuffer( + const int64_t size, const int64_t alignment, MemoryPool* pool = NULLPTR); /// \brief Allocate a bitmap buffer from a memory pool /// no guarantee on values is provided. diff --git a/cpp/src/arrow/compute/exec/accumulation_queue.cc b/cpp/src/arrow/compute/exec/accumulation_queue.cc index 192db52942820..c33013d559889 100644 --- a/cpp/src/arrow/compute/exec/accumulation_queue.cc +++ b/cpp/src/arrow/compute/exec/accumulation_queue.cc @@ -16,12 +16,13 @@ // under the License. 
#include "arrow/compute/exec/accumulation_queue.h" +#include "arrow/compute/exec/key_hash.h" #include namespace arrow { -namespace util { -using arrow::compute::ExecBatch; +namespace compute { + AccumulationQueue::AccumulationQueue(AccumulationQueue&& that) { this->batches_ = std::move(that.batches_); this->row_count_ = that.row_count_; @@ -54,5 +55,121 @@ void AccumulationQueue::Clear() { } ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; } + + Status SpillingAccumulationQueue::Init(QueryContext *ctx) + { + ctx_ = ctx; + partition_locks_.Init(ctx_->max_concurrency(), kNumPartitions); + return Status::OK(); + } + + Status SpillingAccumulationQueue::InsertBatch( + size_t thread_index, + ExecBatch batch, + const Datum &hashes64) + { + for(int64_t i = 0; i < batch.length; i += util::MiniBatch::kMiniBatchLength) + { + int64_t length = std::min(static_cast(batch.length - i), + static_cast(util::MiniBatch::kMiniBatchLength)); + + RETURN_NOT_OK(PartitionBatchIntoBuilders( + thread_index, + hashes64, + batch, + i, + length)); + } + return Status::OK(); + } + + Status SpillingAccumulationQueue::PartitionBatchIntoBuilders( + size_t thread_index, + Datum &hashes64, + ExecBatch &batch, + int64_t start, + int64_t length) + { + const uint64_t *hashes = hashes64.array()-> + uint16_t row_ids[util::MiniBatch::kMiniBatchLength]; + std::iota(row_ids, row_ids + util::MiniBatch::kMiniBatchLength, 0); + uint16_t part_starts[kNumPartitions + 1]; + PartitionSort::Eval( + util::MiniBatch::kMiniBatchLength, + kNumPartitions, + part_starts, + [&](int64_t i) + { + return hashes[i] & (kNumPartitions - 1); + }, + [&row_ids, start](int64_t i, int output_pos) + { + row_ids[i] = static_cast(start + output_pos); + }); + int num_unprocessed_partitions = 0; + int unprocessed_partition_ids[kNumPartitions]; + for(int i = 0; i < kNumPartitions; i++) + { + bool is_part_empty = (part_starts[i + 1] == part_starts[i]); + if(is_part_empty) + 
unprocessed_partition_ids[num_unprocessed_partitions++] = i; + } + while(num_unprocessed_partitions > 0) + { + int locked_part_id; + int locked_part_id_pos; + partition_locks_.AcquirePartitionLock( + thread_index, + num_unprocessed_partitions, + unprocessed_partition_ids, + /*limit_retries=*/false, + /*max_retries=*/-1, + &locked_part_id, + &locked_part_id_pos); + + uint64_t num_total_rows_to_append = + part_starts[locked_part_id + 1] - part_starts[locked_part_id]; + + while(num_total_rows_to_append > 0) + { + int num_rows_to_append = std::min( + static_cast(num_total_rows_to_append), + static_cast(ExecBatchBuilder::num_rows_max() - builders_[locked_part_id].num_rows())); + + RETURN_NOT_OK(builders_[locked_part_id].AppendSelected( + ctx_->memory_pool(), + batch, + num_rows_to_append, + row_ids + part_starts[locked_part_id], + batch.num_values())); + + RETURN_NOT_OK(hash_builders_[locked_part_id].AppendSelected( + ctx_->memory_pool(), + { { + + if(builders_[locked_part_id].is_full()) + { + ExecBatch batch = builders_[locked_part_id].Flush(); + if(locked_part_id < spilling_cursor_) + files_[locked_part_id].SpillBatch(ctx_, std::move(batch)); + else + queues_[locked_part_id].InsertBatch(std::move(batch)); + } + part_starts[locked_part_id] += num_rows_to_append; + num_total_rows_to_append -= num_rows_to_append; + } + + partition_locks_.ReleasePartitionLock(locked_part_id); + if (locked_part_id_pos < num_unprocessed_partitions - 1) + { + unprocessed_partition_ids[locked_part_id_pos] = + unprocessed_partition_ids[num_unprocessed_partitions - 1]; + } + num_unprocessed_partitions -= 1; + } + return Status::OK(); + } + + } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/accumulation_queue.h b/cpp/src/arrow/compute/exec/accumulation_queue.h index 4b23e5ffcac54..5b6c017fc6c82 100644 --- a/cpp/src/arrow/compute/exec/accumulation_queue.h +++ b/cpp/src/arrow/compute/exec/accumulation_queue.h @@ -21,10 +21,13 @@ #include #include 
"arrow/compute/exec.h" +#include "arrow/compute/light_array.h" +#include "arrow/compute/exec/partition_util.h" +#include "arrow/compute/exec/task_util.h" +#include "arrow/compute/exec/spilling_util.h" namespace arrow { -namespace util { -using arrow::compute::ExecBatch; +namespace compute { /// \brief A container that accumulates batches until they are ready to /// be processed. @@ -53,5 +56,37 @@ class AccumulationQueue { std::vector batches_; }; -} // namespace util +class SpillingAccumulationQueue +{ +public: + static constexpr int kNumPartitions = 64; + Status Init(QueryContext *ctx); + Status InsertBatch( + size_t thread_index, + ExecBatch batch, + Datum hashes64); + +private: + Status PartitionBatchIntoBuilders( + size_t thread_index, + const uint64_t *hashes, + ExecBatch &batch, + int64_t start, + int64_t length); + + int spilling_cursor_ = 0; // denotes the first in-memory partition + QueryContext* ctx_; + PartitionLocks partition_locks_; + + AccumulationQueue queues_[kNumPartitions]; + AccumulationQueue hash_queues_[kNumPartitions]; + + ExecBatchBuilder builders_[kNumPartitions]; + ExecBatchBuilder hash_builders_[kNumPartitions]; + + SpillFile files_[kNumPartitions]; + SpillFile hash_files_[kNumPartitions]; +}; + +} // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc index cca266ad691d3..cace30c7d5d90 100644 --- a/cpp/src/arrow/compute/exec/aggregate_node.cc +++ b/cpp/src/arrow/compute/exec/aggregate_node.cc @@ -83,7 +83,7 @@ class ScalarAggregateNode : public ExecNode { auto aggregates = aggregate_options.aggregates; const auto& input_schema = *inputs[0]->output_schema(); - auto exec_ctx = plan->exec_context(); + auto exec_ctx = plan->query_context()->exec_context(); std::vector kernels(aggregates.size()); std::vector>> states(kernels.size()); @@ -150,7 +150,7 @@ class ScalarAggregateNode : public ExecNode { {"function.options", aggs_[i].options ? 
aggs_[i].options->ToString() : ""}, {"function.kind", std::string(kind_name()) + "::Consume"}}); - KernelContext batch_ctx{plan()->exec_context()}; + KernelContext batch_ctx{plan()->query_context()->exec_context()}; batch_ctx.SetState(states_[i][thread_index].get()); ExecSpan single_column_batch{{batch.values[target_field_ids_[i]]}, batch.length}; @@ -245,7 +245,7 @@ class ScalarAggregateNode : public ExecNode { {"function.options", aggs_[i].options ? aggs_[i].options->ToString() : ""}, {"function.kind", std::string(kind_name()) + "::Finalize"}}); - KernelContext ctx{plan()->exec_context()}; + KernelContext ctx{plan()->query_context()->exec_context()}; ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll( kernels_[i], &ctx, std::move(states_[i]))); RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i])); @@ -267,13 +267,12 @@ class ScalarAggregateNode : public ExecNode { class GroupByNode : public ExecNode { public: - GroupByNode(ExecNode* input, std::shared_ptr output_schema, ExecContext* ctx, + GroupByNode(ExecNode* input, std::shared_ptr output_schema, std::vector key_field_ids, std::vector agg_src_field_ids, std::vector aggs, std::vector agg_kernels) : ExecNode(input->plan(), {input}, {"groupby"}, std::move(output_schema), /*num_outputs=*/1), - ctx_(ctx), key_field_ids_(std::move(key_field_ids)), agg_src_field_ids_(std::move(agg_src_field_ids)), aggs_(std::move(aggs)), @@ -326,7 +325,7 @@ class GroupByNode : public ExecNode { agg_src_types[i] = input_schema->field(agg_src_field_id)->type().get(); } - auto ctx = input->plan()->exec_context(); + auto ctx = plan->query_context()->exec_context(); // Construct aggregates ARROW_ASSIGN_OR_RAISE(auto agg_kernels, @@ -354,7 +353,7 @@ class GroupByNode : public ExecNode { } return input->plan()->EmplaceNode( - input, schema(std::move(output_fields)), ctx, std::move(key_field_ids), + input, schema(std::move(output_fields)), std::move(key_field_ids), std::move(agg_src_field_ids), std::move(aggs), 
std::move(agg_kernels)); } @@ -393,7 +392,8 @@ class GroupByNode : public ExecNode { {"function.options", aggs_[i].options ? aggs_[i].options->ToString() : ""}, {"function.kind", std::string(kind_name()) + "::Consume"}}); - KernelContext kernel_ctx{ctx_}; + auto ctx = plan_->query_context()->exec_context(); + KernelContext kernel_ctx{ctx}; kernel_ctx.SetState(state->agg_states[i].get()); ExecSpan agg_batch({batch[agg_src_field_ids_[i]], ExecValue(*id_batch.array())}, @@ -429,7 +429,9 @@ class GroupByNode : public ExecNode { {"function.options", aggs_[i].options ? aggs_[i].options->ToString() : ""}, {"function.kind", std::string(kind_name()) + "::Merge"}}); - KernelContext batch_ctx{ctx_}; + + auto ctx = plan_->query_context()->exec_context(); + KernelContext batch_ctx{ctx}; DCHECK(state0->agg_states[i]); batch_ctx.SetState(state0->agg_states[i].get()); diff --git a/cpp/src/arrow/compute/exec/asof_join_node.cc b/cpp/src/arrow/compute/exec/asof_join_node.cc index 3da612aa03e41..0ca90256f7af2 100644 --- a/cpp/src/arrow/compute/exec/asof_join_node.cc +++ b/cpp/src/arrow/compute/exec/asof_join_node.cc @@ -550,7 +550,7 @@ class AsofJoinNode : public ExecNode { if (dst.empty()) { return NULLPTR; } else { - return dst.Materialize(plan()->exec_context()->memory_pool(), output_schema(), + return dst.Materialize(plan()->query_context()->memory_pool(), output_schema(), state_); } } diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 15d95690076a1..776a536cdbc27 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -45,7 +45,7 @@ namespace compute { namespace { struct ExecPlanImpl : public ExecPlan { - explicit ExecPlanImpl(ExecContext* exec_context, + explicit ExecPlanImpl(ExecContext *exec_context, std::shared_ptr metadata = NULLPTR) : ExecPlan(exec_context), metadata_(std::move(metadata)) {} @@ -57,8 +57,8 @@ struct ExecPlanImpl : public ExecPlan { } } - size_t GetThreadIndex() { 
return thread_indexer_(); } - size_t max_concurrency() const { return thread_indexer_.Capacity(); } + size_t GetThreadIndex() { return query_context_.GetThreadIndex(); } + size_t max_concurrency() const { return query_context_.max_concurrency(); } ExecNode* AddNode(std::unique_ptr node) { if (node->label().empty()) { @@ -87,7 +87,7 @@ struct ExecPlanImpl : public ExecPlan { } Status ScheduleTask(std::function fn) { - auto executor = exec_context_->executor(); + auto executor = query_context()->executor(); if (!executor) return fn(); // Adds a task which submits fn to the executor and tracks its progress. If we're // already stopping then the task is ignored and fn is not executed. @@ -140,6 +140,8 @@ struct ExecPlanImpl : public ExecPlan { return Status::Invalid("restarted ExecPlan"); } + RETURN_NOT_OK(query_context()->Init(max_concurrency())); + std::vector> futures; for (auto& n : nodes_) { RETURN_NOT_OK(n->Init()); @@ -154,7 +156,7 @@ struct ExecPlanImpl : public ExecPlan { task_scheduler_->RegisterEnd(); int num_threads = 1; bool sync_execution = true; - if (auto executor = exec_context()->executor()) { + if (auto executor = query_context()->executor()) { num_threads = executor->GetCapacity(); sync_execution = false; } @@ -326,7 +328,6 @@ struct ExecPlanImpl : public ExecPlan { util::tracing::Span span_; std::shared_ptr metadata_; - ThreadIndexer thread_indexer_; std::atomic group_ended_{false}; util::AsyncTaskGroup task_group_; std::unique_ptr task_scheduler_ = TaskScheduler::Make(); @@ -351,8 +352,15 @@ util::optional GetNodeIndex(const std::vector& nodes, const uint32_t ExecPlan::kMaxBatchSize; Result> ExecPlan::Make( - ExecContext* ctx, std::shared_ptr metadata) { - return std::shared_ptr(new ExecPlanImpl{ctx, metadata}); + QueryOptions opts, + ExecContext *ctx, + std::shared_ptr metadata) { + return std::shared_ptr(new ExecPlanImpl{opts, ctx, std::move(metadata)}); +} + +Result> ExecPlan::Make( + ExecContext *ctx, std::shared_ptr metadata) { + return 
Make({}, ctx, std::move(metadata)); } ExecNode* ExecPlan::AddNode(std::unique_ptr node) { @@ -475,7 +483,7 @@ MapNode::MapNode(ExecPlan* plan, std::vector inputs, std::move(output_schema), /*num_outputs=*/1) { if (async_mode) { - executor_ = plan_->exec_context()->executor(); + executor_ = plan_->query_context()->executor(); } else { executor_ = nullptr; } diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 5e52f606a69d5..ee44df7fcdd18 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -24,6 +24,7 @@ #include "arrow/compute/exec.h" #include "arrow/compute/exec/util.h" +#include "arrow/compute/exec/query_context.h" #include "arrow/compute/type_fwd.h" #include "arrow/type_fwd.h" #include "arrow/util/async_util.h" @@ -46,11 +47,16 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this { virtual ~ExecPlan() = default; - ExecContext* exec_context() const { return exec_context_; } + QueryContext* query_context() { return &query_context_; } /// Make an empty exec plan static Result> Make( - ExecContext* = default_exec_context(), + QueryOptions options, + ExecContext *exec_context = default_exec_context(), + std::shared_ptr metadata = NULLPTR); + + static Result> Make( + ExecContext *exec_context = default_exec_context(), std::shared_ptr metadata = NULLPTR); ExecNode* AddNode(std::unique_ptr node); @@ -146,25 +152,11 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this { /// \brief Return the plan's attached metadata std::shared_ptr metadata() const; - /// \brief Should the plan use a legacy batching strategy - /// - /// This is currently in place only to support the Scanner::ToTable - /// method. This method relies on batch indices from the scanner - /// remaining consistent. This is impractical in the ExecPlan which - /// might slice batches as needed (e.g. 
for a join) - /// - /// However, it still works for simple plans and this is the only way - /// we have at the moment for maintaining implicit order. - bool UseLegacyBatching() const { return use_legacy_batching_; } - // For internal use only, see above comment - void SetUseLegacyBatching(bool value) { use_legacy_batching_ = value; } - std::string ToString() const; protected: - ExecContext* exec_context_; - bool use_legacy_batching_ = false; - explicit ExecPlan(ExecContext* exec_context) : exec_context_(exec_context) {} + QueryContext query_context_; + explicit ExecPlan(QueryOptions options, ExecContext *exec_ctx) : query_context_(options, *exec_ctx) {} }; class ARROW_EXPORT ExecNode { diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index b424da35f8529..6e5bea622b398 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -51,7 +51,7 @@ class FilterNode : public MapNode { auto filter_expression = filter_options.filter_expression; if (!filter_expression.IsBound()) { ARROW_ASSIGN_OR_RAISE(filter_expression, - filter_expression.Bind(*schema, plan->exec_context())); + filter_expression.Bind(*schema, plan->query_context()->exec_context())); } if (filter_expression.type()->id() != Type::BOOL) { @@ -77,7 +77,7 @@ class FilterNode : public MapNode { {"filter.length", target.length}}); ARROW_ASSIGN_OR_RAISE(Datum mask, ExecuteScalarExpression(simplified_filter, target, - plan()->exec_context())); + plan()->query_context()->exec_context())); if (mask.is_scalar()) { const auto& mask_scalar = mask.scalar_as(); diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 07a3083fb92ad..28447aa270928 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -123,7 +123,7 @@ class HashJoinBasicImpl : public HashJoinImpl { int num_cols = schema_[side]->num_cols(projection_handle); 
projected.values.resize(num_cols); - auto to_input = schema_[side]->map(projection_handle, HashJoinProjection::INPUT); + auto to_input = schema_[side]->map(projection_handle, HashJoinProjection::CANONICAL); for (int icol = 0; icol < num_cols; ++icol) { projected.values[icol] = batch.values[to_input.get(icol)]; } @@ -286,7 +286,7 @@ class HashJoinBasicImpl : public HashJoinImpl { }; SchemaProjectionMap left_to_key = - schema_[0]->map(HashJoinProjection::FILTER, HashJoinProjection::KEY); + schema_[0]->map(HashJoinProjection::FILTER, HashJoinProjection::CANONICAL); SchemaProjectionMap left_to_pay = schema_[0]->map(HashJoinProjection::FILTER, HashJoinProjection::PAYLOAD); SchemaProjectionMap right_to_key = diff --git a/cpp/src/arrow/compute/exec/hash_join.h b/cpp/src/arrow/compute/exec/hash_join.h index 0c5e43467e911..44b6be4c6bba7 100644 --- a/cpp/src/arrow/compute/exec/hash_join.h +++ b/cpp/src/arrow/compute/exec/hash_join.h @@ -34,8 +34,6 @@ namespace arrow { namespace compute { -using arrow::util::AccumulationQueue; - class HashJoinImpl { public: using OutputBatchCallback = std::function; diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index 44667b9f28382..32aee2bb55ed9 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -86,6 +86,56 @@ Result> HashJoinSchema::ComputePayload( return payload_refs; } +Result> HashJoinSchema::ComputeCanonicalColumnOrder( + const Schema &schema, + const SchemaProjectionMaps &maps) +{ + int num_keys = maps.num_cols(HashJoinProjection::KEY); + int num_pays = maps.num_cols(HashJoinProjection::PAYLOAD); + int num_filt = maps.num_cols(HashJoinProjection::FILTER); + + int num_key_filters = 0; + int num_pay_filters = 0; + auto filt_to_key = maps.map(HashJoinProjection::FILTER, HashJoinProjection::KEY); + auto filt_to_pay = maps.map(HashJoinProjection::FILTER, HashJoinProjection::PAYLOAD); + for(int i = 0; i < num_filt; i++) + { 
+ num_key_filters += (filt_to_key.get(i) != SchemaProjectionMap::kMissingField); + num_pay_filters += (filt_to_pay.get(i) != SchemaProjectionMap::kMissingField); + } + + // Inclusion-Exclusion principle. Note that a column cannot be both a key and + // a payload column, since a payload is a non-key output column + int num_cols = num_keys + num_pays + num_filt + - 0 // KEY ∩ PAYLOAD + - num_key_filters // KEY ∩ FILTER + - num_pay_filters // PAYLOAD ∩ FILTER + + 0; // KEY ∩ PAYLOAD ∩ FILTER + + std::vector cols(num_cols); + auto to_key = maps.map(HashJoinProjection::INPUT, HashJoinProjection::KEY); + auto to_pay = maps.map(HashJoinProjection::INPUT, HashJoinProjection::PAYLOAD); + auto to_fil = maps.map(HashJoinProjection::INPUT, HashJoinProjection::FILTER); + for(int i = 0; i < num_cols; i++) + { + int key_idx = to_key.get(i); + int pay_idx = to_pay.get(i); + int fil_idx = to_fil.get(i); +#ifndef NDEBUG + bool is_key = key_idx != SchemaProjectionMap::kMissingField; + bool is_pay = pay_idx != SchemaProjectionMap::kMissingField; + ARROW_DCHECK(!(is_key && is_pay)); +#endif + if(key_idx != SchemaProjectionMap::kMissingField) + cols[key_idx] = FieldRef(i); + else if(pay_idx != SchemaProjectionMap::kMissingField) + cols[pay_idx + num_keys] = FieldRef(i); + else if(fil_idx != SchemaProjectionMap::kMissingField) + cols[fil_idx + num_keys + num_pays] = FieldRef(i); + } + return cols; +} + Status HashJoinSchema::Init(JoinType join_type, const Schema& left_schema, const std::vector& left_keys, const Schema& right_schema, @@ -124,49 +174,33 @@ Status HashJoinSchema::Init( right_schema, right_keys, right_output, left_field_name_suffix, right_field_name_suffix)); - std::vector handles; - std::vector*> field_refs; - std::vector left_filter, right_filter; RETURN_NOT_OK( CollectFilterColumns(left_filter, right_filter, filter, left_schema, right_schema)); - - handles.push_back(HashJoinProjection::KEY); - field_refs.push_back(&left_keys); - ARROW_ASSIGN_OR_RAISE(auto left_payload, 
ComputePayload(left_schema, left_output, left_filter, left_keys)); - handles.push_back(HashJoinProjection::PAYLOAD); - field_refs.push_back(&left_payload); - handles.push_back(HashJoinProjection::FILTER); - field_refs.push_back(&left_filter); - - handles.push_back(HashJoinProjection::OUTPUT); - field_refs.push_back(&left_output); - - RETURN_NOT_OK( - proj_maps[0].Init(HashJoinProjection::INPUT, left_schema, handles, field_refs)); - - handles.clear(); - field_refs.clear(); - - handles.push_back(HashJoinProjection::KEY); - field_refs.push_back(&right_keys); + RETURN_NOT_OK(proj_maps[0].Init(HashJoinProjection::INPUT, left_schema)); + RETURN_NOT_OK(proj_maps[0].RegisterProjectedSchema(HashJoinProjection::KEY, left_keys, left_schema)); + RETURN_NOT_OK(proj_maps[0].RegisterProjectedSchema(HashJoinProjection::PAYLOAD, left_payload, left_schema)); + RETURN_NOT_OK(proj_maps[0].RegisterProjectedSchema(HashJoinProjection::FILTER, left_filter, left_schema)); + RETURN_NOT_OK(proj_maps[0].RegisterProjectedSchema(HashJoinProjection::OUTPUT, left_output, left_schema)); + + ARROW_ASSIGN_OR_RAISE(std::vector left_canon, + ComputeCanonicalColumnOrder(left_schema, proj_maps[0])); + RETURN_NOT_OK(proj_maps[0].RegisterProjectedSchema(HashJoinProjection::CANONICAL, left_canon, left_schema)); ARROW_ASSIGN_OR_RAISE(auto right_payload, ComputePayload(right_schema, right_output, right_filter, right_keys)); - handles.push_back(HashJoinProjection::PAYLOAD); - field_refs.push_back(&right_payload); - - handles.push_back(HashJoinProjection::FILTER); - field_refs.push_back(&right_filter); + RETURN_NOT_OK(proj_maps[1].Init(HashJoinProjection::INPUT, right_schema)); + RETURN_NOT_OK(proj_maps[1].RegisterProjectedSchema(HashJoinProjection::KEY, right_keys, right_schema)); + RETURN_NOT_OK(proj_maps[1].RegisterProjectedSchema(HashJoinProjection::PAYLOAD, right_payload, right_schema)); + RETURN_NOT_OK(proj_maps[1].RegisterProjectedSchema(HashJoinProjection::FILTER, right_filter, right_schema)); + 
RETURN_NOT_OK(proj_maps[1].RegisterProjectedSchema(HashJoinProjection::OUTPUT, right_output, right_schema)); - handles.push_back(HashJoinProjection::OUTPUT); - field_refs.push_back(&right_output); - - RETURN_NOT_OK( - proj_maps[1].Init(HashJoinProjection::INPUT, right_schema, handles, field_refs)); + ARROW_ASSIGN_OR_RAISE(std::vector right_canon, + ComputeCanonicalColumnOrder(left_schema, proj_maps[1])); + RETURN_NOT_OK(proj_maps[1].RegisterProjectedSchema(HashJoinProjection::CANONICAL, right_canon, right_schema)); return Status::OK(); } @@ -341,7 +375,7 @@ std::shared_ptr HashJoinSchema::MakeOutputSchema( Result HashJoinSchema::BindFilter(Expression filter, const Schema& left_schema, const Schema& right_schema, - ExecContext* exec_context) { + ExecContext *exec_context) { if (filter.IsBound() || filter == literal(true)) { return std::move(filter); } @@ -475,6 +509,7 @@ Status ValidateHashJoinNodeOptions(const HashJoinNodeOptions& join_options) { class HashJoinNode; + // This is a struct encapsulating things related to Bloom filters and pushing them around // between HashJoinNodes. The general strategy is to notify other joins at plan-creation // time for that join to expect a Bloom filter. 
Once the full build side has been @@ -550,7 +585,7 @@ struct BloomFilterPushdownContext { std::vector hashes(batch.length); std::vector bv(bit_vector_bytes); - ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, GetStack(thread_index)); + ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, ctx_->GetTempStack(thread_index)); // Start with full selection for the current batch memset(selected.data(), 0xff, bit_vector_bytes); @@ -586,7 +621,7 @@ struct BloomFilterPushdownContext { for (size_t i = 0; i < batch.values.size(); i++) { if (!batch.values[i].is_scalar()) { ARROW_ASSIGN_OR_RAISE(batch.values[i], - Filter(batch.values[i], selected_datum, options, ctx_)); + Filter(batch.values[i], selected_datum, options, ctx_->exec_context())); first_nonscalar = std::min(first_nonscalar, i); ARROW_DCHECK_EQ(batch.values[i].length(), batch.values[first_nonscalar].length()); } @@ -617,25 +652,10 @@ struct BloomFilterPushdownContext { // the disable_bloom_filter_ flag. std::pair> GetPushdownTarget(HashJoinNode* start); - Result GetStack(size_t thread_index) { - if (!tld_[thread_index].is_init) { - RETURN_NOT_OK(tld_[thread_index].stack.Init( - ctx_->memory_pool(), 4 * util::MiniBatch::kMiniBatchLength * sizeof(uint32_t))); - tld_[thread_index].is_init = true; - } - return &tld_[thread_index].stack; - } - StartTaskGroupCallback start_task_group_callback_; bool disable_bloom_filter_; HashJoinSchema* schema_mgr_; - ExecContext* ctx_; - - struct ThreadLocalData { - bool is_init = false; - util::TempVectorStack stack; - }; - std::vector tld_; + QueryContext* ctx_; struct { int task_id_; @@ -737,7 +757,7 @@ class HashJoinNode : public ExecNode { ARROW_ASSIGN_OR_RAISE(Expression filter, schema_mgr->BindFilter(join_options.filter, left_schema, - right_schema, plan->exec_context())); + right_schema, plan->query_context()->exec_context())); // Generate output schema std::shared_ptr output_schema = schema_mgr->MakeOutputSchema( @@ -770,18 +790,71 @@ class HashJoinNode : public ExecNode { 
const char* kind_name() const override { return "HashJoinNode"; } - Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) { - std::lock_guard guard(build_side_mutex_); - build_accumulator_.InsertBatch(std::move(batch)); - return Status::OK(); - } + Status CanonicalizeColumnOrder( + ExecBatch *batch, + size_t thread_index, + const SchemaProjectionMaps &maps) + { + auto can_to_in = maps.map(HashJoinProjection::CANONICAL, HashJoinProjection::INPUT); + std::vector result(can_to_in.num_cols + 1); + for(int i = 0; i < can_to_in.num_cols; i++) + { + int in_idx = can_to_in.get(i); + result[i] = std::move((*batch).values[in_idx]); + } + ARROW_ASSIGN_OR_RAISE(std::unique_ptr hash_buf, + AllocateBuffer(sizeof(uint64_t) * batch->length, + plan_->query_context()->memory_pool())); + uint64_t *hashes = reinterpret_cast(hash_buf->mutable_data()); + std::vector temp_column_arrays; + int num_keys = maps.num_cols(HashJoinProjection::KEY); + std::vector key_cols(num_keys); + for(int i = 0; i < num_keys; i++) + key_cols[i] = result[i]; + + ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, + plan_->query_context()->GetTempStack(thread_index)); + + ExecBatch key_batch(std::move(key_cols), batch->length); + RETURN_NOT_OK(Hashing64::HashBatch(std::move(key_batch), + hashes, + temp_column_arrays, + plan_->query_context()->cpu_info()->hardware_flags(), + stack, + 0, + batch->length)); + + ArrayData hash_data(uint64(), batch->length, { nullptr, std::move(hash_buf)}); + result.back() = std::move(hash_data); + batch->values = std::move(result); + return Status::OK(); + } + + Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) + { + { + std::lock_guard guard(build_side_mutex_); + if(!spilling_build_) + { + build_accumulator_.InsertBatch(std::move(batch)); + return Status::OK(); + } + } + RETURN_NOT_OK(spilling_join_.OnBuildSideBatch(thread_index, std::move(batch))); + return Status::OK(); + } Status OnBuildSideFinished(size_t thread_index) { - return 
pushdown_context_.BuildBloomFilter( - thread_index, std::move(build_accumulator_), - [this](size_t thread_index, AccumulationQueue batches) { - return OnBloomFilterFinished(thread_index, std::move(batches)); - }); + + if(!spilling_build_) + { + return pushdown_context_.BuildBloomFilter( + thread_index, std::move(build_accumulator_), + [this](size_t thread_index, AccumulationQueue batches) { + return OnBloomFilterFinished(thread_index, std::move(batches)); + }); + } + return spilling_join_.OnBuildSideFinished(thread_index); } Status OnBloomFilterFinished(size_t thread_index, AccumulationQueue batches) { @@ -805,6 +878,9 @@ class HashJoinNode : public ExecNode { } Status OnProbeSideBatch(size_t thread_index, ExecBatch batch) { + if(spilling_probe_) + return spilling_join_.OnProbeSideBatch(thread_index, std::move(batch)); + { std::lock_guard guard(probe_side_mutex_); if (!bloom_filters_ready_) { @@ -826,6 +902,9 @@ class HashJoinNode : public ExecNode { } Status OnProbeSideFinished(size_t thread_index) { + if(spilling_probe_) + return spilling_join_.OnProbeSideFinished(thread_index); + bool probing_finished; { std::lock_guard guard(probe_side_mutex_); @@ -898,6 +977,12 @@ class HashJoinNode : public ExecNode { START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"batch.length", batch.length}}); + if(ErrorIfNotOk(CanonicalizeColumnOrder(&batch, thread_index, schema_mgr_->proj_maps[side]))) + { + StopProducing(); + return; + } + Status status = side == 0 ?
OnProbeSideBatch(thread_index, std::move(batch)) : OnBuildSideBatch(thread_index, std::move(batch)); @@ -946,13 +1031,13 @@ class HashJoinNode : public ExecNode { } Status Init() override { - RETURN_NOT_OK(ExecNode::Init()); - if (plan_->UseLegacyBatching()) { + if (plan_->query_context()->options().use_legacy_batching) { return Status::Invalid( "The plan was configured to use legacy batching but contained a join node " "which is incompatible with legacy batching"); } - bool use_sync_execution = !(plan_->exec_context()->executor()); + + bool use_sync_execution = !(plan_->query_context()->executor()); // TODO(ARROW-15732) // Each side of join might have an IO thread being called from. Once this is fixed // we will change it back to just the CPU's thread pool capacity. @@ -971,7 +1056,7 @@ class HashJoinNode : public ExecNode { use_sync_execution); RETURN_NOT_OK(impl_->Init( - plan_->exec_context(), join_type_, num_threads, &(schema_mgr_->proj_maps[0]), + plan_->query_context()->exec_context(), join_type_, num_threads, &(schema_mgr_->proj_maps[0]), &(schema_mgr_->proj_maps[1]), key_cmp_, filter_, [this](std::function fn, std::function on_finished) { @@ -1057,9 +1142,9 @@ class HashJoinNode : public ExecNode { Expression filter_; std::unique_ptr schema_mgr_; std::unique_ptr impl_; - util::AccumulationQueue build_accumulator_; - util::AccumulationQueue probe_accumulator_; - util::AccumulationQueue queued_batches_to_probe_; + AccumulationQueue build_accumulator_; + AccumulationQueue probe_accumulator_; + AccumulationQueue queued_batches_to_probe_; std::mutex build_side_mutex_; std::mutex probe_side_mutex_; @@ -1072,8 +1157,13 @@ class HashJoinNode : public ExecNode { bool probe_side_finished_ = false; friend struct BloomFilterPushdownContext; + friend struct SpillingHashJoin; + bool disable_bloom_filter_; BloomFilterPushdownContext pushdown_context_; + SpillingHashJoin spilling_join_; + bool spilling_build_ = false; + bool spilling_probe_ = false; }; void 
BloomFilterPushdownContext::Init( @@ -1083,8 +1173,7 @@ void BloomFilterPushdownContext::Init( FiltersReceivedCallback on_bloom_filters_received, bool disable_bloom_filter, bool use_sync_execution) { schema_mgr_ = owner->schema_mgr_.get(); - ctx_ = owner->plan_->exec_context(); - tld_.resize(num_threads); + ctx_ = owner->plan_->query_context(); disable_bloom_filter_ = disable_bloom_filter; std::tie(push_.pushdown_target_, push_.column_map_) = GetPushdownTarget(owner); eval_.all_received_callback_ = std::move(on_bloom_filters_received); @@ -1131,7 +1220,7 @@ Status BloomFilterPushdownContext::BuildBloomFilter(size_t thread_index, return build_.on_finished_(thread_index, std::move(build_.batches_)); RETURN_NOT_OK(build_.builder_->Begin( - /*num_threads=*/tld_.size(), ctx_->cpu_info()->hardware_flags(), + /*num_threads=*/ctx_->max_concurrency(), ctx_->cpu_info()->hardware_flags(), ctx_->memory_pool(), build_.batches_.row_count(), build_.batches_.batch_count(), push_.bloom_filter_.get())); @@ -1150,7 +1239,7 @@ Status BloomFilterPushdownContext::BuildBloomFilter_exec_task(size_t thread_inde int64_t task_id) { const ExecBatch& input_batch = build_.batches_[task_id]; SchemaProjectionMap key_to_in = - schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, HashJoinProjection::INPUT); + schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, HashJoinProjection::CANONICAL); std::vector key_columns(key_to_in.num_cols); for (size_t i = 0; i < key_columns.size(); i++) { int input_idx = key_to_in.get(static_cast(i)); @@ -1163,7 +1252,7 @@ Status BloomFilterPushdownContext::BuildBloomFilter_exec_task(size_t thread_inde } ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(key_columns))); - ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, GetStack(thread_index)); + ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, ctx_->GetTempStack(thread_index)); util::TempVectorHolder hash_holder(stack, util::MiniBatch::kMiniBatchLength); uint32_t* hashes = 
hash_holder.mutable_data(); for (int64_t i = 0; i < key_batch.length; i += util::MiniBatch::kMiniBatchLength) { @@ -1283,6 +1372,203 @@ std::pair> BloomFilterPushdownContext::GetPushdo #endif // ARROW_LITTLE_ENDIAN } + +struct SpillingHashJoin +{ + // Remembers the owning node and registers the two task groups that push the + // batches already held in the owner's accumulators through this object. + Status Init(HashJoinNode *owner) + { + owner_ = owner; + task_group_partition_build_ = owner_->plan_->RegisterTaskGroup( + [this](size_t thread_index, int64_t task_id) -> Status + { + return OnBuildSideBatch(thread_index, std::move(owner_->build_accumulator_[task_id])); + }, + [this](size_t thread_index) + { + return OnBuildSideAccumPartitioned(thread_index); + }); + + task_group_partition_probe_ = owner_->plan_->RegisterTaskGroup( + [this](size_t thread_index, int64_t task_id) -> Status + { + return OnProbeSideBatch(thread_index, std::move(owner_->probe_accumulator_[task_id])); + }, + [this](size_t thread_index) + { + return OnProbeSideAccumPartitioned(thread_index); + }); + return Status::OK(); + } + + // Records the largest batch seen so far, then compares the pool's current allocation + // against the thresholds to decide on probe backpressure and build-side spilling. + Status CheckSpilling(size_t thread_index, ExecBatch &batch) + { + size_t size_of_batch = static_cast(batch.TotalBufferSize()); + size_t local_max_batch_size = max_batch_size_.load(); + while(size_of_batch > local_max_batch_size + && !max_batch_size_.compare_exchange_weak(local_max_batch_size, size_of_batch)) + {} + // A successful exchange leaves local_max_batch_size at the old value, so fold the + // current batch in explicitly. + if(size_of_batch > local_max_batch_size) + local_max_batch_size = size_of_batch; + + // QueryOptions documents max_memory_bytes == 0 as "unlimited": nothing to spill for. + if(owner_->plan_->query_context()->options().max_memory_bytes == 0) + return Status::OK(); + + // Spilling algorithm proven to not use more than + // (SpillThreshold + NumThreads * BatchSize) memory. + // Thus we want to spill when (SpillThreshold + NumThreads * BatchSize) = k * MaxMemory + // with some fuzz factor k (which is 0.8 here because that's what I decided). + // Thus SpillThreshold = k * MaxMemory - NumThreads * BatchSize.
+ constexpr float kFuzzFactor = 0.8f; + size_t max_memory = static_cast(kFuzzFactor * owner_->plan_->query_context()->options().max_memory_bytes); + size_t num_threads = owner_->plan_->max_concurrency(); + // max_memory already includes the fuzz factor, so it is not applied a second time. + // Clamp at zero: the unsigned subtraction would otherwise wrap and disable spilling. + size_t batch_reserve = num_threads * local_max_batch_size; + size_t spill_threshold = max_memory > batch_reserve ? max_memory - batch_reserve : 0; + size_t bytes_allocated = static_cast(owner_->plan_->query_context()->memory_pool()->bytes_allocated()); + + size_t backpressure_threshold = spill_threshold / 2; + if(bytes_allocated > backpressure_threshold) + RETURN_NOT_OK(ApplyProbeBackPressure(thread_index)); + if(bytes_allocated > spill_threshold) + RETURN_NOT_OK(StartSpillingBuildSide(thread_index)); + return Status::OK(); + } + + // Asks the probe-side input to pause, tagging the request with a fresh counter value. + // NOTE(review): inputs_ is reached through the plan with `.` and `this` is not the node; + // both look unfinished - confirm the intended receiver and argument. + Status ApplyProbeBackPressure(size_t thread_index) + { + owner_->plan_->inputs_[0].PauseProducing(this, backpressure_counter_.fetch_add(1)); + return Status::OK(); + } + + // Flips the owner into spilling mode and re-routes everything accumulated so far. + // NOTE(review): nothing here stops a second call from restarting the task groups; + // spilling_ / spilling_mutex_ below look intended for that - confirm. + Status StartSpillingBuildSide(size_t thread_index) + { + { + std::lock_guard build_guard(owner_->build_side_mutex_); + owner_->spilling_build_ = true; + } + RETURN_NOT_OK(owner_->plan_->StartTaskGroup( + task_group_partition_build_, + owner_->build_accumulator_.batch_count())); + { + std::lock_guard probe_guard(owner_->probe_side_mutex_); + owner_->spilling_probe_ = true; + } + RETURN_NOT_OK(owner_->plan_->StartTaskGroup( + task_group_partition_probe_, + owner_->probe_accumulator_.batch_count())); + return Status::OK(); + } + + // Detaches the trailing hash column (CanonicalizeColumnOrder appends it last) and + // hands the batch plus its hashes to the build-side accumulator. + Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) + { + Datum hashes = std::move(batch.values.back()); + batch.values.pop_back(); + + RETURN_NOT_OK(build_accumulator_.InsertBatch( + thread_index, + std::move(batch), + std::move(hashes))); + return Status::OK(); + } + + Status OnBuildSideFinished(size_t thread_index) + { + partition_idx_ = SpillingAccumulationQueue::kNumPartitions - 1; + return BeginNextCollocatedJoin(thread_index); + } + + // Probe-side counterpart of OnBuildSideBatch: split off the hash column, then insert. + Status OnProbeSideBatch(size_t thread_index, ExecBatch batch) + { + Datum hashes = std::move(batch.values.back()); + batch.values.pop_back(); + + RETURN_NOT_OK(probe_accumulator_.InsertBatch( + thread_index, +
std::move(batch), + std::move(hashes))); + return Status::OK(); + } + + // Registers an external task with the plan, then requests build partition + // partition_idx_ and builds its hash table once the partition is available. + Status BeginNextCollocatedJoin(size_t thread_index) + { + ARROW_ASSIGN_OR_RAISE( + Future<> plan_fut, owner_->plan_->BeginExternalTask()); + if(!plan_fut.is_valid()) + return Status::OK(); + + // The future returned by GetPartition will most likely be on the + // IO thread pool, so we want to transfer to the CPU thread pool before + // building hash table. + CallbackOptions opts; + auto executor = owner_->plan_->query_context()->executor(); + if(executor) + { + opts.executor = executor; + opts.should_schedule = ShouldSchedule::IfDifferentExecutor; + } + // NOTE(review): impl_ is not declared in this struct, and the future returned by + // Then() is discarded so a failed build is never reported - confirm the wiring. + build_accumulator_ + .GetPartition(partition_idx_) + .Then([this, partition_idx = partition_idx_, plan_fut](AccumulationQueue queue) + { + size_t thread_index = owner_->plan_->query_context()->GetThreadIndex(); + RETURN_NOT_OK( + impl_[partition_idx].BuildHashTable( + thread_index, + std::move(queue), + [this](size_t thread_index) + { + return OnHashTableFinished(thread_index); + })); + return Status::OK(); + }, {}, opts); + return Status::OK(); + } + + std::atomic backpressure_counter_{0}; + + int task_group_partition_build_; + int task_group_partition_probe_; + + SpillingAccumulationQueue build_accumulator_; + SpillingAccumulationQueue probe_accumulator_; + + int task_group_probe_[SpillingAccumulationQueue::kNumPartitions]; + + HashJoinNode *owner_; + std::atomic max_batch_size_{0}; + + using BoolPerPartition = std::bitset; + + bool spilling_ = false; + std::mutex spilling_mutex_; + int partition_idx_; + bool accumulation_finished_ = false; + BoolPerPartition hash_table_ready_; + BoolPerPartition queued_batches_filtered_; + BoolPerPartition queued_batches_probed_; +}; + namespace internal { void RegisterHashJoinNode(ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("hashjoin", HashJoinNode::Make)); diff --git a/cpp/src/arrow/compute/exec/hash_join_node.h b/cpp/src/arrow/compute/exec/hash_join_node.h index 8dc7ea0b8bf2f..0f21a195dc038 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.h +++ b/cpp/src/arrow/compute/exec/hash_join_node.h @@
-84,6 +84,10 @@ class ARROW_EXPORT HashJoinSchema { const SchemaProjectionMap& right_to_filter, const Expression& filter); + Result> ComputeCanonicalColumnOrder( + const Schema &schema, + const SchemaProjectionMaps &map); + bool PayloadIsEmpty(int side) { ARROW_DCHECK(side == 0 || side == 1); return proj_maps[side].num_cols(HashJoinProjection::PAYLOAD) == 0; diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 76925eb613992..f8bab0c15e399 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -65,7 +65,7 @@ class ProjectNode : public MapNode { for (auto& expr : exprs) { if (!expr.IsBound()) { ARROW_ASSIGN_OR_RAISE( - expr, expr.Bind(*inputs[0]->output_schema(), plan->exec_context())); + expr, expr.Bind(*inputs[0]->output_schema(), plan->query_context()->exec_context())); } fields[i] = field(std::move(names[i]), expr.type()->GetSharedPtr()); ++i; @@ -89,7 +89,7 @@ class ProjectNode : public MapNode { SimplifyWithGuarantee(exprs_[i], target.guarantee)); ARROW_ASSIGN_OR_RAISE(values[i], ExecuteScalarExpression(simplified_expr, target, - plan()->exec_context())); + plan()->query_context()->exec_context())); } return ExecBatch{std::move(values), target.length}; } diff --git a/cpp/src/arrow/compute/exec/query_context.cc b/cpp/src/arrow/compute/exec/query_context.cc new file mode 100644 index 0000000000000..fd03cb83cf30d --- /dev/null +++ b/cpp/src/arrow/compute/exec/query_context.cc @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/query_context.h" +#include "arrow/util/cpu_info.h" +#include "arrow/util/io_util.h" + +namespace arrow +{ + using internal::CpuInfo; + namespace compute + { + QueryOptions::QueryOptions() + : max_memory_bytes(static_cast(0.75f * internal::GetTotalMemoryBytes())), + use_legacy_batching(false) + {} + + QueryContext::QueryContext( + QueryOptions opts, + ExecContext exec_context) + : options_(opts), + exec_context_(exec_context), + io_context_(exec_context_.memory_pool()) + { + } + + const CpuInfo* QueryContext::cpu_info() const { return CpuInfo::GetInstance(); } + + Status QueryContext::Init(size_t max_num_threads) + { + tld_.resize(max_num_threads); + return Status::OK(); + } + + size_t QueryContext::GetThreadIndex() { return thread_indexer_(); } + + size_t QueryContext::max_concurrency() const { return thread_indexer_.Capacity(); } + + Result QueryContext::GetTempStack(size_t thread_index) + { + if(!tld_[thread_index].is_init) + { + RETURN_NOT_OK(tld_[thread_index].stack.Init( + memory_pool(), + 4 * util::MiniBatch::kMiniBatchLength * sizeof(uint64_t))); + tld_[thread_index].is_init = true; + } + return &tld_[thread_index].stack; + } + } +} diff --git a/cpp/src/arrow/compute/exec/query_context.h b/cpp/src/arrow/compute/exec/query_context.h new file mode 100644 index 0000000000000..dc4c9bfaee2d6 --- /dev/null +++ b/cpp/src/arrow/compute/exec/query_context.h @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/io/interfaces.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/exec/util.h" + +#pragma once + +namespace arrow +{ + namespace internal + { + class CpuInfo; + } + + using io::IOContext; + namespace compute + { + struct ARROW_EXPORT QueryOptions + { + QueryOptions(); + // 0 means unlimited + size_t max_memory_bytes; + + /// \brief Should the plan use a legacy batching strategy + /// + /// This is currently in place only to support the Scanner::ToTable + /// method. This method relies on batch indices from the scanner + /// remaining consistent. This is impractical in the ExecPlan which + /// might slice batches as needed (e.g. for a join) + /// + /// However, it still works for simple plans and this is the only way + /// we have at the moment for maintaining implicit order. 
+ bool use_legacy_batching; + }; + + class ARROW_EXPORT QueryContext + { + public: + QueryContext( + QueryOptions opts = {}, + ExecContext exec_context = *default_exec_context()); + + Status Init(size_t max_num_threads); + + const ::arrow::internal::CpuInfo *cpu_info() const; + const QueryOptions &options() const { return options_; } + MemoryPool *memory_pool() const { return exec_context_.memory_pool(); } + ::arrow::internal::Executor *executor() const { return exec_context_.executor(); } + ExecContext *exec_context() { return &exec_context_; } + IOContext *io_context() { return &io_context_; } + + size_t GetThreadIndex(); + size_t max_concurrency() const; + Result GetTempStack(size_t thread_index); + + std::atomic in_flight_bytes_to_disk_{0}; + + private: + QueryOptions options_; + // To be replaced with Acero-specific context once scheduler is done and + // we don't need ExecContext for kernels + ExecContext exec_context_; + IOContext io_context_; + + ThreadIndexer thread_indexer_; + struct ThreadLocalData + { + bool is_init = false; + util::TempVectorStack stack; + }; + std::vector tld_; + }; + } +} diff --git a/cpp/src/arrow/compute/exec/schema_util.h b/cpp/src/arrow/compute/exec/schema_util.h index f2b14aa545060..541cf9cc0b263 100644 --- a/cpp/src/arrow/compute/exec/schema_util.h +++ b/cpp/src/arrow/compute/exec/schema_util.h @@ -38,7 +38,9 @@ enum class HashJoinProjection : int { KEY = 1, PAYLOAD = 2, FILTER = 3, - OUTPUT = 4 + CANONICAL = 4, + OUTPUT = 5, + NUM_VALUES = 6, }; struct SchemaProjectionMap { @@ -63,22 +65,38 @@ class SchemaProjectionMaps { public: static constexpr int kMissingField = -1; - Status Init(ProjectionIdEnum full_schema_handle, const Schema& schema, - const std::vector& projection_handles, - const std::vector*>& projections) { - ARROW_DCHECK(projection_handles.size() == projections.size()); - ARROW_RETURN_NOT_OK(RegisterSchema(full_schema_handle, schema)); - for (size_t i = 0; i < projections.size(); ++i) { - ARROW_RETURN_NOT_OK( - 
RegisterProjectedSchema(projection_handles[i], *(projections[i]), schema)); + Status Init(ProjectionIdEnum full_schema_handle, + const Schema& schema) + { + return RegisterSchema(full_schema_handle, schema); } - RegisterEnd(); + + Status RegisterProjectedSchema(ProjectionIdEnum handle, + const std::vector& selected_fields, + const Schema& full_schema) { + FieldInfos out_fields; + const FieldVector& in_fields = full_schema.fields(); + out_fields.field_paths.resize(selected_fields.size()); + out_fields.field_names.resize(selected_fields.size()); + out_fields.data_types.resize(selected_fields.size()); + for (size_t i = 0; i < selected_fields.size(); ++i) { + // All fields must be found in schema without ambiguity + ARROW_ASSIGN_OR_RAISE(auto match, selected_fields[i].FindOne(full_schema)); + const std::string& name = in_fields[match[0]]->name(); + const std::shared_ptr& type = in_fields[match[0]]->type(); + out_fields.field_paths[i] = match[0]; + out_fields.field_names[i] = name; + out_fields.data_types[i] = type; + } + int id = schema_id(handle); + schemas_[id] = std::move(out_fields); + GenerateMapForProjection(id); return Status::OK(); } int num_cols(ProjectionIdEnum schema_handle) const { int id = schema_id(schema_handle); - return static_cast(schemas_[id].second.data_types.size()); + return static_cast(schemas_[id].data_types.size()); } bool is_empty(ProjectionIdEnum schema_handle) const { @@ -87,19 +105,19 @@ class SchemaProjectionMaps { const std::string& field_name(ProjectionIdEnum schema_handle, int field_id) const { int id = schema_id(schema_handle); - return schemas_[id].second.field_names[field_id]; + return schemas_[id].field_names[field_id]; } const std::shared_ptr& data_type(ProjectionIdEnum schema_handle, int field_id) const { int id = schema_id(schema_handle); - return schemas_[id].second.data_types[field_id]; + return schemas_[id].data_types[field_id]; } const std::vector>& data_types( ProjectionIdEnum schema_handle) const { int id = 
schema_id(schema_handle); - return schemas_[id].second.data_types; + return schemas_[id].data_types; } SchemaProjectionMap map(ProjectionIdEnum from, ProjectionIdEnum to) const { @@ -132,55 +150,21 @@ class SchemaProjectionMaps { out_fields.field_names[i] = name; out_fields.data_types[i] = type; } - schemas_.push_back(std::make_pair(handle, out_fields)); - return Status::OK(); - } - - Status RegisterProjectedSchema(ProjectionIdEnum handle, - const std::vector& selected_fields, - const Schema& full_schema) { - FieldInfos out_fields; - const FieldVector& in_fields = full_schema.fields(); - out_fields.field_paths.resize(selected_fields.size()); - out_fields.field_names.resize(selected_fields.size()); - out_fields.data_types.resize(selected_fields.size()); - for (size_t i = 0; i < selected_fields.size(); ++i) { - // All fields must be found in schema without ambiguity - ARROW_ASSIGN_OR_RAISE(auto match, selected_fields[i].FindOne(full_schema)); - const std::string& name = in_fields[match[0]]->name(); - const std::shared_ptr& type = in_fields[match[0]]->type(); - out_fields.field_paths[i] = match[0]; - out_fields.field_names[i] = name; - out_fields.data_types[i] = type; - } - schemas_.push_back(std::make_pair(handle, out_fields)); + schemas_[schema_id(handle)] = std::move(out_fields); return Status::OK(); } - void RegisterEnd() { - size_t size = schemas_.size(); - mappings_.resize(size); - inverse_mappings_.resize(size); - int id_base = 0; - for (size_t i = 0; i < size; ++i) { - GenerateMapForProjection(static_cast(i), id_base); - } - } - int schema_id(ProjectionIdEnum schema_handle) const { - for (size_t i = 0; i < schemas_.size(); ++i) { - if (schemas_[i].first == schema_handle) { - return static_cast(i); - } - } - // We should never get here - ARROW_DCHECK(false); - return -1; + int id = static_cast(schema_handle); + ARROW_DCHECK(id < static_cast(ProjectionIdEnum::NUM_VALUES)); + return id; } - void GenerateMapForProjection(int id_proj, int id_base) { - int 
num_cols_proj = static_cast(schemas_[id_proj].second.data_types.size()); - int num_cols_base = static_cast(schemas_[id_base].second.data_types.size()); + void GenerateMapForProjection(int id_proj) { + const int id_base = 0; + + int num_cols_proj = static_cast(schemas_[id_proj].data_types.size()); + int num_cols_base = static_cast(schemas_[id_base].data_types.size()); std::vector& mapping = mappings_[id_proj]; std::vector& inverse_mapping = inverse_mappings_[id_proj]; @@ -192,8 +176,8 @@ class SchemaProjectionMaps { mapping[i] = inverse_mapping[i] = i; } } else { - const FieldInfos& fields_proj = schemas_[id_proj].second; - const FieldInfos& fields_base = schemas_[id_base].second; + const FieldInfos& fields_proj = schemas_[id_proj]; + const FieldInfos& fields_base = schemas_[id_base]; for (int i = 0; i < num_cols_base; ++i) { inverse_mapping[i] = SchemaProjectionMap::kMissingField; } @@ -215,9 +199,9 @@ class SchemaProjectionMaps { } // vector used as a mapping from ProjectionIdEnum to fields - std::vector> schemas_; - std::vector> mappings_; - std::vector> inverse_mappings_; + std::array(ProjectionIdEnum::NUM_VALUES)> schemas_; + std::array, static_cast(ProjectionIdEnum::NUM_VALUES)> mappings_; + std::array, static_cast(ProjectionIdEnum::NUM_VALUES)> inverse_mappings_; }; using HashJoinProjectionMaps = SchemaProjectionMaps; diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index 9118d5a50e819..cc8b88df9607e 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -430,7 +430,7 @@ static Result MakeTableConsumingSinkNode( const compute::ExecNodeOptions& options) { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "TableConsumingSinkNode")); const auto& sink_options = checked_cast(options); - MemoryPool* pool = plan->exec_context()->memory_pool(); + MemoryPool* pool = plan->query_context()->memory_pool(); auto tb_consumer = std::make_shared(sink_options.output_table, pool); 
auto consuming_sink_node_options = ConsumingSinkNodeOptions{tb_consumer}; @@ -460,7 +460,7 @@ struct OrderBySinkNode final : public SinkNode { RETURN_NOT_OK(ValidateOrderByOptions(sink_options)); ARROW_ASSIGN_OR_RAISE( std::unique_ptr impl, - OrderByImpl::MakeSort(plan->exec_context(), inputs[0]->output_schema(), + OrderByImpl::MakeSort(plan->query_context()->exec_context(), inputs[0]->output_schema(), sink_options.sort_options)); return plan->EmplaceNode(plan, std::move(inputs), std::move(impl), sink_options.generator); @@ -492,7 +492,7 @@ struct OrderBySinkNode final : public SinkNode { RETURN_NOT_OK(ValidateSelectKOptions(sink_options)); ARROW_ASSIGN_OR_RAISE( std::unique_ptr impl, - OrderByImpl::MakeSelectK(plan->exec_context(), inputs[0]->output_schema(), + OrderByImpl::MakeSelectK(plan->query_context()->exec_context(), inputs[0]->output_schema(), sink_options.select_k_options)); return plan->EmplaceNode(plan, std::move(inputs), std::move(impl), sink_options.generator); @@ -515,7 +515,7 @@ struct OrderBySinkNode final : public SinkNode { DCHECK_EQ(input, inputs_[0]); auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(), - plan()->exec_context()->memory_pool()); + plan()->query_context()->memory_pool()); if (ErrorIfNotOk(maybe_batch.status())) { StopProducing(); if (input_counter_.Cancel()) { diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index a640cf737efa7..659374bd0b78e 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -89,7 +89,7 @@ struct SourceNode : ExecNode { } CallbackOptions options; - auto executor = plan()->exec_context()->executor(); + auto executor = plan()->query_context()->executor(); if (executor) { // These options will transfer execution to the desired Executor if necessary. 
// This can happen for in-memory scans where batches didn't require @@ -119,7 +119,7 @@ struct SourceNode : ExecNode { return Break(batch_count_); } lock.unlock(); - bool use_legacy_batching = plan_->UseLegacyBatching(); + bool use_legacy_batching = plan_->query_context()->options().use_legacy_batching; ExecBatch morsel = std::move(*maybe_morsel); int64_t morsel_length = static_cast(morsel.length); if (use_legacy_batching || morsel_length == 0) { diff --git a/cpp/src/arrow/compute/exec/spilling_util.cc b/cpp/src/arrow/compute/exec/spilling_util.cc new file mode 100644 index 0000000000000..d7bf9356dfdaa --- /dev/null +++ b/cpp/src/arrow/compute/exec/spilling_util.cc @@ -0,0 +1,298 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "spilling_util.h" +#include + +namespace arrow +{ +namespace compute +{ + struct ArrayInfo + { + int64_t num_children; + std::array, 3> bufs; + std::array sizes; + bool has_dict; + }; + + struct SpillFile::BatchInfo + { + int64_t start; + std::vector arrays; + }; + +#ifdef _WIN32 +#include "windows_compatibility.h" + +const FileHandle kInvalidHandle = INVALID_HANDLE_VALUE; + +static Result OpenTemporaryFile() +{ + constexpr DWORD kTempFileNameSize = MAX_PATH + 1; + wchar_t tmp_name_buf[kTempFileNameSize]; + wchar_t tmp_path_buf[kTempFileNameSize]; + + DWORD ret; + ret = GetTempPath2W(kTempFileNameSize, tmp_path_buf); + if(ret > kTempFileNameSize || ret == 0) + return Status::IOError("Failed to get temp directory"); + if(GetTempFileNameW(tmp_path_buf, L"ARROW_TMP", 0, tmp_name_buf) == 0) + return Status::IOError("Failed to generate temp file name"); + + // The name buffer is wide-character, so the wide-character API must be used. + HANDLE file_handle = CreateFileW( + tmp_name_buf, + GENERIC_READ | GENERIC_WRITE | FILE_APPEND_DATA, + 0, + NULL, + CREATE_ALWAYS, + FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED | FILE_FLAG_DELETE_ON_CLOSE, + NULL); + if(file_handle == INVALID_HANDLE_VALUE) + return Status::IOError("Failed to create temp file"); + return file_handle; +} + +static Status CloseTemporaryFile(FileHandle handle) +{ + if(!CloseHandle(handle)) + return Status::IOError("Failed to close temp file"); + return Status::OK(); +} + +static Status WriteBatch_PlatformSpecific(FileHandle handle, const SpillFile::BatchInfo &info) +{ + // Zero-initialise so that hEvent and the reserved fields are not stack garbage. + OVERLAPPED overlapped = {}; + int64_t offset = info.start; + for(const ArrayInfo &arr : info.arrays) + { + for(size_t i = 0; i < arr.bufs.size(); i++) + { + if(arr.bufs[i]) + { + overlapped.Offset = static_cast(offset); + overlapped.OffsetHigh = static_cast(offset >> 32); + // NOTE(review): the file is opened with FILE_FLAG_OVERLAPPED, so a FALSE return with + // ERROR_IO_PENDING would be a queued write rather than a failure - confirm handling. + if(!WriteFile( + handle, + arr.bufs[i]->data(), + arr.bufs[i]->size(), + NULL, + &overlapped)) + return Status::IOError("Failed to spill!"); + offset += arr.sizes[i]; + } + } + } + return Status::OK(); +} +#else +#include +#include +#include +#include +#include + +// Creates a temporary file in the directory named by the first of TMPDIR, TMP, TEMP, +// TEMPDIR that is set (else a fixed fallback), unlinks it and returns its descriptor. +Result OpenTemporaryFile() +{ +
static std::once_flag generate_tmp_file_name_flag; + + constexpr int kFileNameSize = 1024; + static char name[kFileNameSize]; + + char *name_ptr = name; + std::call_once(generate_tmp_file_name_flag, [name_ptr]() noexcept + { + const char *selectors[] = { "TMPDIR", "TMP", "TEMP", "TEMPDIR" }; + constexpr size_t kNumSelectors = sizeof(selectors) / sizeof(selectors[0]); +#ifdef __ANDROID__ + const char *backup = "/data/local/tmp/"; +#else + const char *backup = "/tmp/"; +#endif + const char *tmp_dir = backup; + for(size_t i = 0; i < kNumSelectors; i++) + { + const char *env = getenv(selectors[i]); + if(env) + { + tmp_dir = env; + break; + } + } + size_t tmp_dir_length = std::strlen(tmp_dir); + + const char *tmp_name_template = "/ARROW_TMP_XXXXXX"; + size_t tmp_name_length = std::strlen(tmp_name_template); + + if((tmp_dir_length + tmp_name_length) >= kFileNameSize) + { + tmp_dir = backup; + tmp_dir_length = std::strlen(backup); + } + + std::strncpy(name_ptr, tmp_dir, kFileNameSize); + std::strncpy(name_ptr + tmp_dir_length, tmp_name_template, kFileNameSize - tmp_dir_length); + }); + + // mkstemp/mkostemp overwrite the XXXXXX suffix in place, so every call works on its + // own copy; reusing the shared buffer would leave no placeholder for the next call. + char path[kFileNameSize]; + std::strncpy(path, name, kFileNameSize); + +#ifdef __APPLE__ + int fd = mkstemp(path); + if(fd == kInvalidHandle) + return Status::IOError(strerror(errno)); + if(fcntl(fd, F_NOCACHE, 1) == -1) + { + // Close the descriptor before bailing out so it is not leaked. + int err = errno; + close(fd); + return Status::IOError(strerror(err)); + } +#else + int fd = mkostemp(path, O_DIRECT); + if(fd == kInvalidHandle) + return Status::IOError(strerror(errno)); +#endif + + if(unlink(path) != 0) + { + int err = errno; + close(fd); + return Status::IOError(strerror(err)); + } + return fd; +} + +static Status CloseTemporaryFile(FileHandle handle) +{ + if(close(handle) == -1) + return Status::IOError(strerror(errno)); + return Status::OK(); +} + +static Status WriteBatch_PlatformSpecific(FileHandle handle, SpillFile::BatchInfo &info) +{ + std::vector ios; + for(const ArrayInfo &arr : info.arrays) + { + for(int i = 0; i < 3; i++) + { + if(arr.bufs[i]) + { + struct iovec io; + io.iov_base = static_cast(arr.bufs[i]->mutable_data()); + io.iov_len = static_cast(arr.bufs[i]->size()); +
ios.push_back(io); + } + } + } + + if(pwritev(handle, ios.data(), static_cast(ios.size()), info.start) == -1) + return Status::IOError("Failed to spill!"); + + // Release all references to the buffers, freeing them. + for(ArrayInfo &arr : info.arrays) + for(int i = 0; i < 3; i++) + if(arr.bufs[i]) + arr.bufs[i].reset(); + return Status::OK(); +} +#endif + +static Status CollectArrayInfo( + SpillFile::BatchInfo &batch_info, + int64_t &total_size, + ArrayData *array) +{ + if(array->offset != 0) + return Status::Invalid("We don't support spilling arrays with offsets"); + + batch_info.arrays.push_back({}); + ArrayInfo &array_info = batch_info.arrays.back(); + + ARROW_DCHECK(array->buffers.size() <= array_info.bufs.size()); + array_info.num_children = array->child_data.size(); + for(size_t i = 0; i < array->buffers.size(); i++) + { + if(array->buffers[i]) + { + array_info.sizes[i] = array->buffers[i]->size(); + total_size += array_info.sizes[i]; + uintptr_t addr = array->buffers[i]->address(); + if((addr % 512) != 0) + return Status::Invalid("Buffer not aligned to 512 bytes!"); + array_info.bufs[i] = std::move(array->buffers[i]); + } + } + + if(array->dictionary) + { + array_info.has_dict = true; + RETURN_NOT_OK(CollectArrayInfo(batch_info, total_size, array->dictionary.get())); + } + + for(std::shared_ptr &child : array->child_data) + RETURN_NOT_OK(CollectArrayInfo(batch_info, total_size, child.get())); + return Status::OK(); +} + +Result> SpillFile::SpillBatch(QueryContext *ctx, ExecBatch batch) +{ + if(handle_ == kInvalidHandle) + { + ARROW_ASSIGN_OR_RAISE(handle_, OpenTemporaryFile()); + } + int64_t total_size = 0; + BatchInfo *info = new BatchInfo; + batches_.push_back(info); + for(int i = 0; i < batch.num_values(); i++) + { + if (batch[i].is_scalar()) + return Status::Invalid("Cannot spill a Scalar"); + RETURN_NOT_OK(CollectArrayInfo(*info, total_size, batch[i].mutable_array())); + } + info->start = size_; + size_ += total_size; + + FileHandle handle = handle_; + 
ctx->in_flight_bytes_to_disk_ += static_cast(total_size); + return ctx->io_context()->executor()->Submit( + [handle, info, ctx, total_size]() + { + Status st = WriteBatch_PlatformSpecific(handle, *info); + ctx->in_flight_bytes_to_disk_ -= static_cast(total_size); + return st; + }); +} + + Status SpillFile::Cleanup() + { + if(handle_ != kInvalidHandle) + RETURN_NOT_OK(CloseTemporaryFile(handle_)); + return Status::OK(); + } + +} +} diff --git a/cpp/src/arrow/compute/exec/spilling_util.h b/cpp/src/arrow/compute/exec/spilling_util.h new file mode 100644 index 0000000000000..54432e0455854 --- /dev/null +++ b/cpp/src/arrow/compute/exec/spilling_util.h @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "arrow/compute/exec/exec_plan.h" + +namespace arrow +{ + namespace compute + { +#ifdef _WIN32 + using FileHandle = void *; + extern const FileHandle kInvalidHandle; +#else + using FileHandle = int; + constexpr FileHandle kInvalidHandle = -1; +#endif + + class SpillFile + { + public: + Result> SpillBatch(QueryContext *ctx, ExecBatch batch); + Status Cleanup(); + + struct BatchInfo; + private: + FileHandle handle_ = kInvalidHandle; + size_t size_ = 0; + std::vector batches_; + }; + } +} diff --git a/cpp/src/arrow/compute/exec/swiss_join.cc b/cpp/src/arrow/compute/exec/swiss_join.cc index 5b01edb1198c2..ea13978f69f17 100644 --- a/cpp/src/arrow/compute/exec/swiss_join.cc +++ b/cpp/src/arrow/compute/exec/swiss_join.cc @@ -2413,7 +2413,7 @@ class SwissJoin : public HashJoinImpl { projected.values.resize(num_key_cols + num_payload_cols); auto key_to_input = - schema_[side]->map(HashJoinProjection::KEY, HashJoinProjection::INPUT); + schema_[side]->map(HashJoinProjection::KEY, HashJoinProjection::CANONICAL); for (int icol = 0; icol < num_key_cols; ++icol) { const Datum& value_in = input->values[key_to_input.get(icol)]; if (value_in.is_scalar()) { @@ -2425,7 +2425,7 @@ class SwissJoin : public HashJoinImpl { } } auto payload_to_input = - schema_[side]->map(HashJoinProjection::PAYLOAD, HashJoinProjection::INPUT); + schema_[side]->map(HashJoinProjection::PAYLOAD, HashJoinProjection::CANONICAL); for (int icol = 0; icol < num_payload_cols; ++icol) { const Datum& value_in = input->values[payload_to_input.get(icol)]; if (value_in.is_scalar()) { diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc index 4bf3574d09fdb..51c8bca8452e2 100644 --- a/cpp/src/arrow/compute/light_array.cc +++ b/cpp/src/arrow/compute/light_array.cc @@ -187,7 +187,8 @@ Status ColumnArraysFromExecBatch(const ExecBatch& batch, } void ResizableArrayData::Init(const std::shared_ptr& data_type, - MemoryPool* pool, int log_num_rows_min) { + 
MemoryPool* pool, int log_num_rows_min, + int64_t alignment) { #ifndef NDEBUG if (num_rows_allocated_ > 0) { ARROW_DCHECK(data_type_ != NULLPTR); @@ -200,6 +201,7 @@ void ResizableArrayData::Init(const std::shared_ptr& data_type, #endif Clear(/*release_buffers=*/false); log_num_rows_min_ = log_num_rows_min; + alignment_ = alignment; data_type_ = data_type; pool_ = pool; } @@ -236,7 +238,7 @@ Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) { ARROW_ASSIGN_OR_RAISE( buffers_[kValidityBuffer], AllocateResizableBuffer( - bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, pool_)); + bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, alignment_, pool_)); memset(mutable_data(kValidityBuffer), 0, bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes); if (column_metadata.is_fixed_length) { @@ -245,6 +247,7 @@ Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) { buffers_[kFixedLengthBuffer], AllocateResizableBuffer( bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, + alignment_, pool_)); memset(mutable_data(kFixedLengthBuffer), 0, bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes); @@ -253,18 +256,19 @@ Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) { buffers_[kFixedLengthBuffer], AllocateResizableBuffer( num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes, + alignment_, pool_)); } } else { ARROW_ASSIGN_OR_RAISE( buffers_[kFixedLengthBuffer], AllocateResizableBuffer( - (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes, pool_)); + (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes, alignment_, pool_)); } ARROW_ASSIGN_OR_RAISE( buffers_[kVariableLengthBuffer], - AllocateResizableBuffer(sizeof(uint64_t) + kNumPaddingBytes, pool_)); + AllocateResizableBuffer(sizeof(uint64_t) + kNumPaddingBytes, alignment_, pool_)); var_len_buf_size_ = sizeof(uint64_t); } else { @@ -478,7 +482,7 @@ Status 
ExecBatchBuilder::AppendSelected(const std::shared_ptr& source ARROW_DCHECK(num_rows_before >= 0); int num_rows_after = num_rows_before + num_rows_to_append; if (target->num_rows() == 0) { - target->Init(source->type, pool, kLogNumRows); + target->Init(source->type, pool, kLogNumRows, kAlignment); } RETURN_NOT_OK(target->ResizeFixedLengthBuffers(num_rows_after)); @@ -625,7 +629,7 @@ Status ExecBatchBuilder::AppendNulls(const std::shared_ptr& type, int num_rows_before = target.num_rows(); int num_rows_after = num_rows_before + num_rows_to_append; if (target.num_rows() == 0) { - target.Init(type, pool, kLogNumRows); + target.Init(type, pool, kLogNumRows, kAlignment); } RETURN_NOT_OK(target.ResizeFixedLengthBuffers(num_rows_after)); @@ -686,7 +690,7 @@ Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch const Datum& data = batch.values[col_ids ? col_ids[i] : i]; ARROW_DCHECK(data.is_array()); const std::shared_ptr& array_data = data.array(); - values_[i].Init(array_data->type, pool, kLogNumRows); + values_[i].Init(array_data->type, pool, kLogNumRows, kAlignment); } } @@ -717,7 +721,7 @@ Status ExecBatchBuilder::AppendNulls(MemoryPool* pool, if (values_.empty()) { values_.resize(types.size()); for (size_t i = 0; i < types.size(); ++i) { - values_[i].Init(types[i], pool, kLogNumRows); + values_[i].Init(types[i], pool, kLogNumRows, kAlignment); } } diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h index f0e5c7068716a..ebe41215caa66 100644 --- a/cpp/src/arrow/compute/light_array.h +++ b/cpp/src/arrow/compute/light_array.h @@ -241,7 +241,7 @@ class ARROW_EXPORT ResizableArrayData { /// \param log_num_rows_min All resize operations will allocate at least enough /// space for (1 << log_num_rows_min) rows void Init(const std::shared_ptr& data_type, MemoryPool* pool, - int log_num_rows_min); + int log_num_rows_min, int64_t alignment = MemoryPool::kDefaultAlignment); /// \brief Resets the array back to an empty 
state /// \param release_buffers If true then allocated memory is released and the @@ -296,6 +296,7 @@ class ARROW_EXPORT ResizableArrayData { private: static constexpr int64_t kNumPaddingBytes = 64; int log_num_rows_min_; + int64_t alignment_; std::shared_ptr data_type_; MemoryPool* pool_; int num_rows_; @@ -348,11 +349,13 @@ class ARROW_EXPORT ExecBatchBuilder { ExecBatch Flush(); int num_rows() const { return values_.empty() ? 0 : values_[0].num_rows(); } + bool is_full() const { return num_rows() == num_rows_max(); } static int num_rows_max() { return 1 << kLogNumRows; } private: static constexpr int kLogNumRows = 15; + static constexpr int64_t kAlignment = 512; // Calculate how many rows to skip from the tail of the // sequence of selected rows, such that the total size of skipped rows is at diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 0ef1d4577cd30..333d03fbc529c 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -361,8 +361,10 @@ Result AsyncScanner::ScanBatchesUnorderedAsync( auto exec_context = std::make_shared(scan_options_->pool, cpu_executor); - ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get())); - plan->SetUseLegacyBatching(use_legacy_batching); + compute::QueryOptions query_options; + query_options.use_legacy_batching = true; + + ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(query_options, exec_context.get())); AsyncGenerator> sink_gen; auto exprs = scan_options_->projection.call()->arguments; diff --git a/cpp/src/arrow/datum.cc b/cpp/src/arrow/datum.cc index f06e97a20ec4b..86dada2185907 100644 --- a/cpp/src/arrow/datum.cc +++ b/cpp/src/arrow/datum.cc @@ -126,6 +126,7 @@ int64_t Datum::TotalBufferSize() const { case Datum::TABLE: return util::TotalBufferSize(*util::get>(this->value)); case Datum::SCALAR: + case Datum::NONE: return 0; default: DCHECK(false); diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index 
f8682ad313800..e5d1f6e2cda29 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -59,7 +59,7 @@ namespace memory_pool { namespace internal { -alignas(kAlignment) int64_t zero_size_area[1] = {kDebugXorSuffix}; +alignas(MemoryPool::kDefaultAlignment) int64_t zero_size_area[1] = {kDebugXorSuffix}; } // namespace internal @@ -224,12 +224,12 @@ bool IsDebugEnabled() { template class DebugAllocator { public: - static Status AllocateAligned(int64_t size, uint8_t** out) { + static Status AllocateAligned(int64_t size, int64_t alignment, uint8_t** out) { if (size == 0) { *out = memory_pool::internal::kZeroSizeArea; } else { ARROW_ASSIGN_OR_RAISE(int64_t raw_size, RawSize(size)); - RETURN_NOT_OK(WrappedAllocator::AllocateAligned(raw_size, out)); + RETURN_NOT_OK(WrappedAllocator::AllocateAligned(raw_size, alignment, out)); InitAllocatedArea(*out, size); } return Status::OK(); @@ -237,29 +237,29 @@ class DebugAllocator { static void ReleaseUnused() { WrappedAllocator::ReleaseUnused(); } - static Status ReallocateAligned(int64_t old_size, int64_t new_size, uint8_t** ptr) { + static Status ReallocateAligned(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) { CheckAllocatedArea(*ptr, old_size, "reallocation"); if (*ptr == memory_pool::internal::kZeroSizeArea) { - return AllocateAligned(new_size, ptr); + return AllocateAligned(new_size, alignment, ptr); } if (new_size == 0) { // Note that an overflow check isn't needed as `old_size` is supposed to have // been successfully passed to AllocateAligned() before. 
- WrappedAllocator::DeallocateAligned(*ptr, old_size + kOverhead); + WrappedAllocator::DeallocateAligned(*ptr, old_size + kOverhead, alignment); *ptr = memory_pool::internal::kZeroSizeArea; return Status::OK(); } ARROW_ASSIGN_OR_RAISE(int64_t raw_new_size, RawSize(new_size)); RETURN_NOT_OK( - WrappedAllocator::ReallocateAligned(old_size + kOverhead, raw_new_size, ptr)); + WrappedAllocator::ReallocateAligned(old_size + kOverhead, raw_new_size, alignment, ptr)); InitAllocatedArea(*ptr, new_size); return Status::OK(); } - static void DeallocateAligned(uint8_t* ptr, int64_t size) { + static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t alignment) { CheckAllocatedArea(ptr, size, "deallocation"); if (ptr != memory_pool::internal::kZeroSizeArea) { - WrappedAllocator::DeallocateAligned(ptr, size + kOverhead); + WrappedAllocator::DeallocateAligned(ptr, size + kOverhead, alignment); } } @@ -295,7 +295,7 @@ class SystemAllocator { public: // Allocate memory according to the alignment requirements for Arrow // (as of May 2016 64 bytes) - static Status AllocateAligned(int64_t size, uint8_t** out) { + static Status AllocateAligned(int64_t size, int64_t alignment, uint8_t** out) { if (size == 0) { *out = memory_pool::internal::kZeroSizeArea; return Status::OK(); @@ -303,19 +303,19 @@ class SystemAllocator { #ifdef _WIN32 // Special code path for Windows *out = reinterpret_cast( - _aligned_malloc(static_cast(size), memory_pool::internal::kAlignment)); + _aligned_malloc(static_cast(size), static_cast(alignment))); if (!*out) { return Status::OutOfMemory("malloc of size ", size, " failed"); } #elif defined(sun) || defined(__sun) *out = reinterpret_cast( - memalign(memory_pool::internal::kAlignment, static_cast(size))); + memalign(static_cast(alignment), static_cast(size))); if (!*out) { return Status::OutOfMemory("malloc of size ", size, " failed"); } #else const int result = - posix_memalign(reinterpret_cast(out), memory_pool::internal::kAlignment, + 
posix_memalign(reinterpret_cast(out), static_cast(alignment), static_cast(size)); if (result == ENOMEM) { return Status::OutOfMemory("malloc of size ", size, " failed"); @@ -323,20 +323,24 @@ class SystemAllocator { if (result == EINVAL) { return Status::Invalid("invalid alignment parameter: ", - memory_pool::internal::kAlignment); + static_cast(alignment)); } #endif return Status::OK(); } - static Status ReallocateAligned(int64_t old_size, int64_t new_size, uint8_t** ptr) { + static Status ReallocateAligned( + int64_t old_size, + int64_t new_size, + int64_t alignment, + uint8_t** ptr) { uint8_t* previous_ptr = *ptr; if (previous_ptr == memory_pool::internal::kZeroSizeArea) { DCHECK_EQ(old_size, 0); - return AllocateAligned(new_size, ptr); + return AllocateAligned(new_size, alignment, ptr); } if (new_size == 0) { - DeallocateAligned(previous_ptr, old_size); + DeallocateAligned(previous_ptr, old_size, alignment); *ptr = memory_pool::internal::kZeroSizeArea; return Status::OK(); } @@ -344,7 +348,7 @@ class SystemAllocator { // Allocate new chunk uint8_t* out = nullptr; - RETURN_NOT_OK(AllocateAligned(new_size, &out)); + RETURN_NOT_OK(AllocateAligned(new_size, alignment, &out)); DCHECK(out); // Copy contents and release old memory chunk memcpy(out, *ptr, static_cast(std::min(new_size, old_size))); @@ -357,7 +361,7 @@ class SystemAllocator { return Status::OK(); } - static void DeallocateAligned(uint8_t* ptr, int64_t size) { + static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t /*alignment*/) { if (ptr == memory_pool::internal::kZeroSizeArea) { DCHECK_EQ(size, 0); } else { @@ -383,13 +387,13 @@ class SystemAllocator { // Helper class directing allocations to the mimalloc allocator. 
class MimallocAllocator { public: - static Status AllocateAligned(int64_t size, uint8_t** out) { + static Status AllocateAligned(int64_t size, int64_t alignment, uint8_t** out) { if (size == 0) { *out = memory_pool::internal::kZeroSizeArea; return Status::OK(); } *out = reinterpret_cast( - mi_malloc_aligned(static_cast(size), memory_pool::internal::kAlignment)); + mi_malloc_aligned(static_cast(size), static_cast(alignment))); if (*out == NULL) { return Status::OutOfMemory("malloc of size ", size, " failed"); } @@ -398,19 +402,19 @@ class MimallocAllocator { static void ReleaseUnused() { mi_collect(true); } - static Status ReallocateAligned(int64_t old_size, int64_t new_size, uint8_t** ptr) { + static Status ReallocateAligned(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) { uint8_t* previous_ptr = *ptr; if (previous_ptr == memory_pool::internal::kZeroSizeArea) { DCHECK_EQ(old_size, 0); - return AllocateAligned(new_size, ptr); + return AllocateAligned(new_size, ptr, alignment); } if (new_size == 0) { - DeallocateAligned(previous_ptr, old_size); + DeallocateAligned(previous_ptr, old_size, ); *ptr = memory_pool::internal::kZeroSizeArea; return Status::OK(); } *ptr = reinterpret_cast(mi_realloc_aligned( - previous_ptr, static_cast(new_size), memory_pool::internal::kAlignment)); + previous_ptr, static_cast(new_size), alignment)); if (*ptr == NULL) { *ptr = previous_ptr; return Status::OutOfMemory("realloc of size ", new_size, " failed"); @@ -418,7 +422,7 @@ class MimallocAllocator { return Status::OK(); } - static void DeallocateAligned(uint8_t* ptr, int64_t size) { + static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t /*alignment*/) { if (ptr == memory_pool::internal::kZeroSizeArea) { DCHECK_EQ(size, 0); } else { @@ -448,14 +452,14 @@ class BaseMemoryPoolImpl : public MemoryPool { public: ~BaseMemoryPoolImpl() override {} - Status Allocate(int64_t size, uint8_t** out) override { + Status Allocate(int64_t size, int64_t alignment, 
uint8_t** out) override { if (size < 0) { return Status::Invalid("negative malloc size"); } if (static_cast(size) >= std::numeric_limits::max()) { return Status::OutOfMemory("malloc size overflows size_t"); } - RETURN_NOT_OK(Allocator::AllocateAligned(size, out)); + RETURN_NOT_OK(Allocator::AllocateAligned(size, alignment, out)); #ifndef NDEBUG // Poison data if (size > 0) { @@ -469,14 +473,14 @@ class BaseMemoryPoolImpl : public MemoryPool { return Status::OK(); } - Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override { + Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) override { if (new_size < 0) { return Status::Invalid("negative realloc size"); } if (static_cast(new_size) >= std::numeric_limits::max()) { return Status::OutOfMemory("realloc overflows size_t"); } - RETURN_NOT_OK(Allocator::ReallocateAligned(old_size, new_size, ptr)); + RETURN_NOT_OK(Allocator::ReallocateAligned(old_size, new_size, alignment, ptr)); #ifndef NDEBUG // Poison data if (new_size > old_size) { @@ -490,7 +494,7 @@ class BaseMemoryPoolImpl : public MemoryPool { return Status::OK(); } - void Free(uint8_t* buffer, int64_t size) override { + void Free(uint8_t* buffer, int64_t size, int64_t alignment) override { #ifndef NDEBUG // Poison data if (size > 0) { @@ -499,7 +503,7 @@ class BaseMemoryPoolImpl : public MemoryPool { buffer[size - 1] = kDeallocPoison; } #endif - Allocator::DeallocateAligned(buffer, size); + Allocator::DeallocateAligned(buffer, size, alignment); stats_.UpdateAllocatedBytes(-size); } @@ -672,22 +676,24 @@ Status jemalloc_set_decay_ms(int ms) { LoggingMemoryPool::LoggingMemoryPool(MemoryPool* pool) : pool_(pool) {} -Status LoggingMemoryPool::Allocate(int64_t size, uint8_t** out) { - Status s = pool_->Allocate(size, out); - std::cout << "Allocate: size = " << size << std::endl; +Status LoggingMemoryPool::Allocate(int64_t size, int64_t alignment, uint8_t** out) { + Status s = pool_->Allocate(size, alignment, 
out); + std::cout << "Allocate: size = " << size << ", alignment = " << alignment << std::endl; return s; } -Status LoggingMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) { +Status LoggingMemoryPool::Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) { Status s = pool_->Reallocate(old_size, new_size, ptr); - std::cout << "Reallocate: old_size = " << old_size << " - new_size = " << new_size + std::cout << "Reallocate: old_size = " << old_size + << ", new_size = " << new_size + << ", alignment = " << alignment << std::endl; return s; } -void LoggingMemoryPool::Free(uint8_t* buffer, int64_t size) { - pool_->Free(buffer, size); - std::cout << "Free: size = " << size << std::endl; +void LoggingMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) { + pool_->Free(buffer, size, alignment); + std::cout << "Free: size = " << size << ", alignment = " << alignment << std::endl; } int64_t LoggingMemoryPool::bytes_allocated() const { @@ -711,20 +717,20 @@ class ProxyMemoryPool::ProxyMemoryPoolImpl { public: explicit ProxyMemoryPoolImpl(MemoryPool* pool) : pool_(pool) {} - Status Allocate(int64_t size, uint8_t** out) { - RETURN_NOT_OK(pool_->Allocate(size, out)); + Status Allocate(int64_t size, int64_t alignment, uint8_t** out) { + RETURN_NOT_OK(pool_->Allocate(size, alignment, out)); stats_.UpdateAllocatedBytes(size); return Status::OK(); } - Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) { - RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, ptr)); + Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) { + RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, alignment, ptr)); stats_.UpdateAllocatedBytes(new_size - old_size); return Status::OK(); } - void Free(uint8_t* buffer, int64_t size) { - pool_->Free(buffer, size); + void Free(uint8_t* buffer, int64_t size, int64_t alignment) { + pool_->Free(buffer, size, alignment); 
stats_.UpdateAllocatedBytes(-size); } @@ -745,16 +751,16 @@ ProxyMemoryPool::ProxyMemoryPool(MemoryPool* pool) { ProxyMemoryPool::~ProxyMemoryPool() {} -Status ProxyMemoryPool::Allocate(int64_t size, uint8_t** out) { - return impl_->Allocate(size, out); +Status ProxyMemoryPool::Allocate(int64_t size, int64_t alignment, uint8_t** out) { + return impl_->Allocate(size, alignment, out); } -Status ProxyMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) { - return impl_->Reallocate(old_size, new_size, ptr); +Status ProxyMemoryPool::Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) { + return impl_->Reallocate(old_size, new_size, alignment, ptr); } -void ProxyMemoryPool::Free(uint8_t* buffer, int64_t size) { - return impl_->Free(buffer, size); +void ProxyMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) { + return impl_->Free(buffer, size, alignment); } int64_t ProxyMemoryPool::bytes_allocated() const { return impl_->bytes_allocated(); } @@ -777,8 +783,8 @@ std::vector SupportedMemoryBackendNames() { /// A Buffer whose lifetime is tied to a particular MemoryPool class PoolBuffer final : public ResizableBuffer { public: - explicit PoolBuffer(std::shared_ptr mm, MemoryPool* pool) - : ResizableBuffer(nullptr, 0, std::move(mm)), pool_(pool) {} + explicit PoolBuffer(std::shared_ptr mm, MemoryPool* pool, int64_t alignment) + : ResizableBuffer(nullptr, 0, std::move(mm)), pool_(pool), alignment_(alignment) {} ~PoolBuffer() override { // Avoid calling pool_->Free if the global pools are destroyed @@ -789,7 +795,7 @@ class PoolBuffer final : public ResizableBuffer { // no guarantee of destructor order between thread/memory pools) uint8_t* ptr = mutable_data(); if (ptr && !global_state.is_finalizing()) { - pool_->Free(ptr, capacity_); + pool_->Free(ptr, capacity_, alignment_); } } @@ -801,9 +807,9 @@ class PoolBuffer final : public ResizableBuffer { if (!ptr || capacity > capacity_) { int64_t new_capacity = 
bit_util::RoundUpToMultipleOf64(capacity); if (ptr) { - RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, &ptr)); + RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, alignment_, &ptr)); } else { - RETURN_NOT_OK(pool_->Allocate(new_capacity, &ptr)); + RETURN_NOT_OK(pool_->Allocate(new_capacity, alignment_, &ptr)); } data_ = ptr; capacity_ = new_capacity; @@ -822,7 +828,7 @@ class PoolBuffer final : public ResizableBuffer { int64_t new_capacity = bit_util::RoundUpToMultipleOf64(new_size); if (capacity_ != new_capacity) { // Buffer hasn't got yet the requested size. - RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, &ptr)); + RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, alignment_, &ptr)); data_ = ptr; capacity_ = new_capacity; } @@ -834,7 +840,7 @@ class PoolBuffer final : public ResizableBuffer { return Status::OK(); } - static std::shared_ptr MakeShared(MemoryPool* pool) { + static std::shared_ptr MakeShared(MemoryPool* pool, int64_t alignment) { std::shared_ptr mm; if (pool == nullptr) { pool = default_memory_pool(); @@ -842,10 +848,10 @@ class PoolBuffer final : public ResizableBuffer { } else { mm = CPUDevice::memory_manager(pool); } - return std::make_shared(std::move(mm), pool); + return std::make_shared(std::move(mm), pool, alignment); } - static std::unique_ptr MakeUnique(MemoryPool* pool) { + static std::unique_ptr MakeUnique(MemoryPool* pool, int64_t alignment) { std::shared_ptr mm; if (pool == nullptr) { pool = default_memory_pool(); @@ -853,11 +859,12 @@ class PoolBuffer final : public ResizableBuffer { } else { mm = CPUDevice::memory_manager(pool); } - return std::unique_ptr(new PoolBuffer(std::move(mm), pool)); + return std::unique_ptr(new PoolBuffer(std::move(mm), pool, alignment)); } private: MemoryPool* pool_; + int64_t alignment_; }; namespace { @@ -873,14 +880,40 @@ inline Result ResizePoolBuffer(PoolBufferPtr&& buffer, const int64_t } // namespace -Result> AllocateBuffer(const int64_t size, MemoryPool* pool) { - 
return ResizePoolBuffer>(PoolBuffer::MakeUnique(pool), size); + Result> AllocateBuffer(const int64_t size, MemoryPool *pool) + { + return AllocateBuffer(size, MemoryPool::kDefaultAlignment, pool); + } + +Result> AllocateBuffer( + const int64_t size, + const int64_t alignment, + MemoryPool* pool) { + return ResizePoolBuffer>( + PoolBuffer::MakeUnique( + pool, + alignment), + size); +} + +Result> AllocateResizableBuffer( + const int64_t size, + MemoryPool* pool) { + return AllocateResizableBuffer( + size, + MemoryPool::kDefaultAlignment, + pool); } -Result> AllocateResizableBuffer(const int64_t size, - MemoryPool* pool) { - return ResizePoolBuffer>(PoolBuffer::MakeUnique(pool), - size); +Result> AllocateResizableBuffer( + const int64_t size, + const int64_t alignment, + MemoryPool* pool) { + return ResizePoolBuffer>( + PoolBuffer::MakeUnique( + pool, + alignment), + size); } } // namespace arrow diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h index 58b375af3a9d9..a637cf6e02dd9 100644 --- a/cpp/src/arrow/memory_pool.h +++ b/cpp/src/arrow/memory_pool.h @@ -63,6 +63,8 @@ class MemoryPoolStats { /// take care of the required 64-byte alignment. class ARROW_EXPORT MemoryPool { public: + static constexpr int64_t kDefaultAlignment = 64; + virtual ~MemoryPool() = default; /// \brief EXPERIMENTAL. Create a new instance of the default MemoryPool @@ -71,13 +73,17 @@ class ARROW_EXPORT MemoryPool { /// Allocate a new memory region of at least size bytes. /// /// The allocated region shall be 64-byte aligned. - virtual Status Allocate(int64_t size, uint8_t** out) = 0; + Status Allocate(int64_t size, uint8_t** out) { return Allocate(size, kDefaultAlignment, out); } + + /// Allocate a new memory region of at least size bytes aligned to alignment. + virtual Status Allocate(int64_t size, int64_t alignment, uint8_t** out) = 0; /// Resize an already allocated memory section. 
/// /// As by default most default allocators on a platform don't support aligned /// reallocation, this function can involve a copy of the underlying data. - virtual Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) = 0; + virtual Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) = 0; + Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) { return Reallocate(old_size, new_size, kDefaultAlignment, ptr); } /// Free an allocated region. /// @@ -85,7 +91,7 @@ class ARROW_EXPORT MemoryPool { /// @param size Allocated size located at buffer. An allocator implementation /// may use this for tracking the amount of allocated bytes as well as for /// faster deallocation if supported by its backend. - virtual void Free(uint8_t* buffer, int64_t size) = 0; + virtual void Free(uint8_t* buffer, int64_t size, int64_t alignment = kDefaultAlignment) = 0; /// Return unused memory to the OS /// @@ -116,10 +122,10 @@ class ARROW_EXPORT LoggingMemoryPool : public MemoryPool { explicit LoggingMemoryPool(MemoryPool* pool); ~LoggingMemoryPool() override = default; - Status Allocate(int64_t size, uint8_t** out) override; - Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override; + Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override; + Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) override; - void Free(uint8_t* buffer, int64_t size) override; + void Free(uint8_t* buffer, int64_t size, int64_t alignment) override; int64_t bytes_allocated() const override; @@ -140,10 +146,10 @@ class ARROW_EXPORT ProxyMemoryPool : public MemoryPool { explicit ProxyMemoryPool(MemoryPool* pool); ~ProxyMemoryPool() override; - Status Allocate(int64_t size, uint8_t** out) override; - Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override; + Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override; + Status 
Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) override; - void Free(uint8_t* buffer, int64_t size) override; + void Free(uint8_t* buffer, int64_t size, int64_t alignment) override; int64_t bytes_allocated() const override; diff --git a/cpp/src/arrow/memory_pool_internal.h b/cpp/src/arrow/memory_pool_internal.h index df0ee646a47e8..6176a1da8fcf5 100644 --- a/cpp/src/arrow/memory_pool_internal.h +++ b/cpp/src/arrow/memory_pool_internal.h @@ -26,8 +26,6 @@ namespace memory_pool { namespace internal { -static constexpr size_t kAlignment = 64; - static constexpr int64_t kDebugXorSuffix = -0x181fe80e0b464188LL; // A static piece of memory for 0-size allocations, so as to return @@ -41,9 +39,9 @@ static uint8_t* const kZeroSizeArea = reinterpret_cast(&zero_size_area // Helper class directing allocations to the jemalloc allocator. class JemallocAllocator { public: - static Status AllocateAligned(int64_t size, uint8_t** out); - static Status ReallocateAligned(int64_t old_size, int64_t new_size, uint8_t** ptr); - static void DeallocateAligned(uint8_t* ptr, int64_t size); + static Status AllocateAligned(int64_t size, int64_t alignment, uint8_t** out); + static Status ReallocateAligned(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr); + static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t alignment); static void ReleaseUnused(); }; diff --git a/cpp/src/arrow/memory_pool_jemalloc.cc b/cpp/src/arrow/memory_pool_jemalloc.cc index 48a5bac137bd6..caea99f6e1a93 100644 --- a/cpp/src/arrow/memory_pool_jemalloc.cc +++ b/cpp/src/arrow/memory_pool_jemalloc.cc @@ -81,33 +81,35 @@ namespace memory_pool { namespace internal { -Status JemallocAllocator::AllocateAligned(int64_t size, uint8_t** out) { +Status JemallocAllocator::AllocateAligned(int64_t size, int64_t alignment, uint8_t** out) { if (size == 0) { *out = kZeroSizeArea; return Status::OK(); } *out = reinterpret_cast( - mallocx(static_cast(size), 
MALLOCX_ALIGN(kAlignment))); + mallocx(static_cast(size), MALLOCX_ALIGN(static_cast(alignment)))); if (*out == NULL) { return Status::OutOfMemory("malloc of size ", size, " failed"); } return Status::OK(); } -Status JemallocAllocator::ReallocateAligned(int64_t old_size, int64_t new_size, - uint8_t** ptr) { +Status JemallocAllocator::ReallocateAligned(int64_t old_size, + int64_t new_size, + int64_t alignment, + uint8_t** ptr) { uint8_t* previous_ptr = *ptr; if (previous_ptr == kZeroSizeArea) { DCHECK_EQ(old_size, 0); - return AllocateAligned(new_size, ptr); + return AllocateAligned(new_size, alignment, ptr); } if (new_size == 0) { - DeallocateAligned(previous_ptr, old_size); + DeallocateAligned(previous_ptr, old_size, alignment); *ptr = kZeroSizeArea; return Status::OK(); } *ptr = reinterpret_cast( - rallocx(*ptr, static_cast(new_size), MALLOCX_ALIGN(kAlignment))); + rallocx(*ptr, static_cast(new_size), MALLOCX_ALIGN(static_cast(alignment)))); if (*ptr == NULL) { *ptr = previous_ptr; return Status::OutOfMemory("realloc of size ", new_size, " failed"); @@ -115,11 +117,11 @@ Status JemallocAllocator::ReallocateAligned(int64_t old_size, int64_t new_size, return Status::OK(); } -void JemallocAllocator::DeallocateAligned(uint8_t* ptr, int64_t size) { +void JemallocAllocator::DeallocateAligned(uint8_t* ptr, int64_t size, int64_t alignment) { if (ptr == kZeroSizeArea) { DCHECK_EQ(size, 0); } else { - dallocx(ptr, MALLOCX_ALIGN(kAlignment)); + sdallocx(ptr, static_cast(size), MALLOCX_ALIGN(static_cast(alignment))); } } diff --git a/cpp/src/arrow/stl_allocator.h b/cpp/src/arrow/stl_allocator.h index b5ad2b53460f0..230dfea5473e5 100644 --- a/cpp/src/arrow/stl_allocator.h +++ b/cpp/src/arrow/stl_allocator.h @@ -100,7 +100,7 @@ class STLMemoryPool : public MemoryPool { /// \brief Construct a memory pool from the given allocator explicit STLMemoryPool(const Allocator& alloc) : alloc_(alloc) {} - Status Allocate(int64_t size, uint8_t** out) override { + Status Allocate(int64_t 
size, int64_t /*alignment*/, uint8_t** out) override { try { *out = alloc_.allocate(size); } catch (std::bad_alloc& e) { @@ -110,7 +110,7 @@ class STLMemoryPool : public MemoryPool { return Status::OK(); } - Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override { + Status Reallocate(int64_t old_size, int64_t new_size, int64_t /*alignment*/, uint8_t** ptr) override { uint8_t* old_ptr = *ptr; try { *ptr = alloc_.allocate(new_size); @@ -123,7 +123,7 @@ class STLMemoryPool : public MemoryPool { return Status::OK(); } - void Free(uint8_t* buffer, int64_t size) override { + void Free(uint8_t* buffer, int64_t size, int64_t /*alignment*/) override { alloc_.deallocate(buffer, size); stats_.UpdateAllocatedBytes(-size); } diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 11ae80d03e256..90e8b42d3611c 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -106,6 +106,7 @@ #elif __APPLE__ #include +#include #elif __linux__ #include @@ -2107,5 +2108,37 @@ int64_t GetCurrentRSS() { #endif } + int64_t GetTotalMemoryBytes() + { +#if defined(_WIN32) + int64_t result_kb; + if(!GetPhysicallyInstalledSystemMemory(&result)) + { + ARROW_LOG(WARNING) << "FAiled to resolve total RAM size: " << std::strerror(GetLastError()); + return -1; + } + return (result_kb * 1024); +#elif defined(__APPLE__) + int64_t result; + size_t size = sizeof(result); + if(sysctlbyname("hw.memsize", &result, &size, nullptr, 0) == -1) + { + ARROW_LOG(WARNING) << "Failed to resolve total RAM size"; + return -1; + } + return result; +#elif defined(__linux__) + struct sysinfo info; + if(sysinfo(&info) == -1) + { + ARROW_LOG(WARNING) << "Failed to resolve total RAM size: " << std::strerror(errno); + return -1; + } + return static_cast(info.totalram * info.mem_unit); +#else + return 0; +#endif + } + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h index 
df63de47e8386..43d85ec24e28b 100644 --- a/cpp/src/arrow/util/io_util.h +++ b/cpp/src/arrow/util/io_util.h @@ -410,5 +410,11 @@ uint64_t GetThreadId(); ARROW_EXPORT int64_t GetCurrentRSS(); +/// \brief Get the total memory available to the system in bytes +/// +/// This function supports Windows, Linux, and Mac, where it returns -1 if the OS query fails; on any other platform it returns 0 +ARROW_EXPORT +int64_t GetTotalMemoryBytes(); + } // namespace internal } // namespace arrow