Add spilling for hash join
save-buffer committed Aug 18, 2022
1 parent 8474ee5 commit 8f07030
Showing 40 changed files with 2,187 additions and 589 deletions.
3 changes: 3 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
@@ -401,8 +401,11 @@ if(ARROW_COMPUTE)
compute/exec/partition_util.cc
compute/exec/options.cc
compute/exec/project_node.cc
compute/exec/query_context.cc
compute/exec/sink_node.cc
compute/exec/source_node.cc
compute/exec/spilling_util.cc
compute/exec/spilling_join.cc
compute/exec/swiss_join.cc
compute/exec/task_util.cc
compute/exec/tpch_node.cc
7 changes: 7 additions & 0 deletions cpp/src/arrow/buffer.h
@@ -460,6 +460,10 @@ class ARROW_EXPORT ResizableBuffer : public MutableBuffer {
ARROW_EXPORT
Result<std::unique_ptr<Buffer>> AllocateBuffer(const int64_t size,
MemoryPool* pool = NULLPTR);
ARROW_EXPORT
Result<std::unique_ptr<Buffer>> AllocateBuffer(const int64_t size,
int64_t alignment,
MemoryPool* pool = NULLPTR);

/// \brief Allocate a resizeable buffer from a memory pool, zero its padding.
///
@@ -468,6 +472,9 @@ Result<std::unique_ptr<Buffer>> AllocateBuffer(const int64_t size,
ARROW_EXPORT
Result<std::unique_ptr<ResizableBuffer>> AllocateResizableBuffer(
const int64_t size, MemoryPool* pool = NULLPTR);
ARROW_EXPORT
Result<std::unique_ptr<ResizableBuffer>> AllocateResizableBuffer(
const int64_t size, const int64_t alignment, MemoryPool* pool = NULLPTR);

/// \brief Allocate a bitmap buffer from a memory pool
/// no guarantee on values is provided.
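A minimal usage sketch for the new alignment-aware overloads above. The call site and the 64-byte alignment value are hypothetical examples, not part of this commit; error propagation uses the ARROW_ASSIGN_OR_RAISE/RETURN_NOT_OK macros used elsewhere in the codebase.

// Sketch only: allocate a 64-byte-aligned fixed buffer and a resizable
// buffer from the default memory pool via the overloads declared above.
Status AllocateAlignedScratch() {
  constexpr int64_t kSize = 4096;
  constexpr int64_t kAlignment = 64;  // e.g. cache-line alignment for SIMD kernels
  ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> fixed,
                        AllocateBuffer(kSize, kAlignment));
  ARROW_ASSIGN_OR_RAISE(std::unique_ptr<ResizableBuffer> growable,
                        AllocateResizableBuffer(kSize, kAlignment));
  RETURN_NOT_OK(growable->Resize(2 * kSize));
  return Status::OK();
}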
122 changes: 120 additions & 2 deletions cpp/src/arrow/compute/exec/accumulation_queue.cc
@@ -15,13 +15,15 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/util/atomic_util.h"
#include "arrow/compute/exec/accumulation_queue.h"
#include "arrow/compute/exec/key_hash.h"

#include <iterator>

namespace arrow {
- namespace util {
- using arrow::compute::ExecBatch;
+ namespace compute {

AccumulationQueue::AccumulationQueue(AccumulationQueue&& that) {
this->batches_ = std::move(that.batches_);
this->row_count_ = that.row_count_;
@@ -48,11 +50,127 @@ void AccumulationQueue::InsertBatch(ExecBatch batch) {
batches_.emplace_back(std::move(batch));
}

void AccumulationQueue::SetBatch(size_t idx, ExecBatch batch)
{
ARROW_DCHECK(idx < batches_.size());
// Keep row_count_ consistent when batches are replaced concurrently:
// subtract the rows of the batch being overwritten, then add the new batch's rows.
arrow::util::AtomicFetchSub(&row_count_, batches_[idx].length, std::memory_order_relaxed);
arrow::util::AtomicFetchAdd(&row_count_, batch.length, std::memory_order_relaxed);
batches_[idx] = std::move(batch);
}

void AccumulationQueue::Clear() {
row_count_ = 0;
batches_.clear();
}

ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }

Status SpillingAccumulationQueue::Init(QueryContext *ctx)
{
ctx_ = ctx;
partition_locks_.Init(ctx_->max_concurrency(), kNumPartitions);
return Status::OK();
}

Status SpillingAccumulationQueue::InsertBatch(
size_t thread_index,
ExecBatch batch)
{
Datum hash_datum = std::move(batch.values.back());
const uint64_t *hashes = reinterpret_cast<const uint64_t *>(hash_datum.array()->buffers[1]->data());
std::vector<uint16_t> row_ids(batch.length);
std::iota(row_ids.begin(), row_ids.end(), 0);
uint16_t part_starts[kNumPartitions + 1];
PartitionSort::Eval(
util::MiniBatch::kMiniBatchLength,
kNumPartitions,
part_starts,
[&](int64_t i)
{
return hashes[i] & (kNumPartitions - 1);
},
[&row_ids](int64_t i, int output_pos)
{
row_ids[i] = static_cast<uint16_t>(output_pos);
});

int unprocessed_partition_ids[kNumPartitions];
RETURN_NOT_OK(partition_locks_.ForEachPartition(
thread_index,
unprocessed_partition_ids,
[&](int part_id)
{
return part_starts[part_id + 1] == part_starts[part_id];
},
[&](int locked_part_id)
{
uint64_t num_total_rows_to_append =
part_starts[locked_part_id + 1] - part_starts[locked_part_id];

while(num_total_rows_to_append > 0)
{
int num_rows_to_append = std::min(
static_cast<int>(num_total_rows_to_append),
static_cast<int>(ExecBatchBuilder::num_rows_max() - builders_[locked_part_id].num_rows()));

RETURN_NOT_OK(builders_[locked_part_id].AppendSelected(
ctx_->memory_pool(),
batch,
num_rows_to_append,
row_ids.data() + part_starts[locked_part_id],
batch.num_values()));

if(builders_[locked_part_id].is_full())
{
ExecBatch batch = builders_[locked_part_id].Flush();
if(locked_part_id < spilling_cursor_)
RETURN_NOT_OK(files_[locked_part_id].SpillBatch(ctx_, std::move(batch)));
else
queues_[locked_part_id].InsertBatch(std::move(batch));
}
part_starts[locked_part_id] += num_rows_to_append;
num_total_rows_to_append -= num_rows_to_append;
}
return Status::OK();
}));
return Status::OK();
}

Status SpillingAccumulationQueue::GetPartition(
size_t thread_index,
int partition,
std::function<Status(size_t, AccumulationQueue)> on_finished)
{
if(partition >= spilling_cursor_)
return on_finished(thread_index, std::move(queues_[partition]));

queues_[partition].Resize(files_[partition].num_batches());
return files_[partition].ReadBackBatches(
ctx_,
[this, partition](size_t idx, ExecBatch batch)
{
queues_[partition].SetBatch(idx, std::move(batch));
return Status::OK();
},
[this, partition, on_finished](size_t thread_index)
{
return on_finished(thread_index, std::move(queues_[partition]));
});
}

Result<bool> SpillingAccumulationQueue::AdvanceSpillCursor()
{
int to_spill = spilling_cursor_.fetch_add(1);
if(to_spill >= kNumPartitions)
return false;

auto lock = partition_locks_.AcquirePartitionLock(to_spill);

size_t num_batches = queues_[to_spill].batch_count();
for(size_t i = 0; i < num_batches; i++)
RETURN_NOT_OK(files_[to_spill].SpillBatch(ctx_, std::move(queues_[to_spill][i])));
return true;
}

- } // namespace util
+ } // namespace compute
} // namespace arrow
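InsertBatch above routes each row with `hashes[i] & (kNumPartitions - 1)`, which only yields a valid partition id because kNumPartitions is a power of two. A standalone illustration of that masking (hypothetical helper, not part of the commit):

#include <cstdint>

constexpr int kNumPartitions = 64;
static_assert((kNumPartitions & (kNumPartitions - 1)) == 0,
              "mask-based routing requires a power-of-two partition count");

// hash & (kNumPartitions - 1) keeps the low 6 bits, i.e. hash % 64,
// without paying for an integer division on the hot path.
inline int PartitionIdFromHash(uint64_t hash) {
  return static_cast<int>(hash & (kNumPartitions - 1));
}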
41 changes: 38 additions & 3 deletions cpp/src/arrow/compute/exec/accumulation_queue.h
@@ -21,10 +21,13 @@
#include <vector>

#include "arrow/compute/exec.h"
#include "arrow/compute/light_array.h"
#include "arrow/compute/exec/partition_util.h"
#include "arrow/compute/exec/task_util.h"
#include "arrow/compute/exec/spilling_util.h"

namespace arrow {
- namespace util {
- using arrow::compute::ExecBatch;
+ namespace compute {

/// \brief A container that accumulates batches until they are ready to
/// be processed.
@@ -42,9 +45,11 @@ class AccumulationQueue {

void Concatenate(AccumulationQueue&& that);
void InsertBatch(ExecBatch batch);
void SetBatch(size_t idx, ExecBatch batch);
int64_t row_count() { return row_count_; }
size_t batch_count() { return batches_.size(); }
bool empty() const { return batches_.empty(); }
void Resize(size_t size) { batches_.resize(size); }
void Clear();
ExecBatch& operator[](size_t i);

@@ -53,5 +58,35 @@
std::vector<ExecBatch> batches_;
};

- } // namespace util
class SpillingAccumulationQueue
{
public:
static constexpr int kNumPartitions = 64;
Status Init(QueryContext *ctx);
// Assumes that the final column in batch contains 64-bit hashes of the columns.
Status InsertBatch(
size_t thread_index,
ExecBatch batch);
Status GetPartition(
size_t thread_index,
int partition,
std::function<Status(size_t, AccumulationQueue)> on_finished);
Result<bool> AdvanceSpillCursor();

private:
std::atomic<int> spilling_cursor_{0}; // denotes the first in-memory partition
QueryContext* ctx_;
PartitionLocks partition_locks_;

AccumulationQueue queues_[kNumPartitions];
AccumulationQueue hash_queues_[kNumPartitions];

ExecBatchBuilder builders_[kNumPartitions];
ExecBatchBuilder hash_builders_[kNumPartitions];

SpillFile files_[kNumPartitions];
SpillFile hash_files_[kNumPartitions];
};

} // namespace compute
} // namespace arrow
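A hedged sketch of how a consumer such as the spilling hash join might drive this interface, based only on the declarations above; the callback body and the partition choice are illustrative.

// Illustrative only: accumulate hashed batches, spill one partition under
// memory pressure, then asynchronously fetch a partition for processing.
Status AccumulateAndFetch(SpillingAccumulationQueue* queue,
                          size_t thread_index, ExecBatch batch) {
  // Contract from InsertBatch: the final column of `batch` holds 64-bit hashes.
  RETURN_NOT_OK(queue->InsertBatch(thread_index, std::move(batch)));

  // On memory pressure: spills the partition at the cursor and advances it;
  // yields false once every partition has already been spilled.
  ARROW_ASSIGN_OR_RAISE(bool advanced, queue->AdvanceSpillCursor());
  (void)advanced;

  // Spilled partitions are read back from disk; in-memory ones are handed
  // over directly. Either way, on_finished receives the assembled rows.
  return queue->GetPartition(
      thread_index, /*partition=*/0,
      [](size_t thread_idx, AccumulationQueue partition) {
        // Process `partition` here (e.g. build a hash table from it).
        return Status::OK();
      });
}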
34 changes: 18 additions & 16 deletions cpp/src/arrow/compute/exec/aggregate_node.cc
@@ -83,7 +83,7 @@ class ScalarAggregateNode : public ExecNode {
auto aggregates = aggregate_options.aggregates;

const auto& input_schema = *inputs[0]->output_schema();
- auto exec_ctx = plan->exec_context();
+ auto exec_ctx = plan->query_context()->exec_context();

std::vector<const ScalarAggregateKernel*> kernels(aggregates.size());
std::vector<std::vector<std::unique_ptr<KernelState>>> states(kernels.size());
@@ -113,7 +113,7 @@
}

KernelContext kernel_ctx{exec_ctx};
- states[i].resize(plan->max_concurrency());
+ states[i].resize(plan->query_context()->max_concurrency());
RETURN_NOT_OK(Kernel::InitAll(&kernel_ctx,
KernelInitArgs{kernels[i],
{
@@ -150,7 +150,7 @@
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Consume"}});
- KernelContext batch_ctx{plan()->exec_context()};
+ KernelContext batch_ctx{plan()->query_context()->exec_context()};
batch_ctx.SetState(states_[i][thread_index].get());

ExecSpan single_column_batch{{batch.values[target_field_ids_[i]]}, batch.length};
@@ -168,7 +168,7 @@
{"batch.length", batch.length}});
DCHECK_EQ(input, inputs_[0]);

- auto thread_index = plan_->GetThreadIndex();
+ auto thread_index = plan_->query_context()->GetThreadIndex();

if (ErrorIfNotOk(DoConsume(ExecSpan(batch), thread_index))) return;

@@ -245,7 +245,7 @@
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Finalize"}});
- KernelContext ctx{plan()->exec_context()};
+ KernelContext ctx{plan()->query_context()->exec_context()};
ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll(
kernels_[i], &ctx, std::move(states_[i])));
RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i]));
@@ -267,20 +267,19 @@

class GroupByNode : public ExecNode {
public:
- GroupByNode(ExecNode* input, std::shared_ptr<Schema> output_schema, ExecContext* ctx,
+ GroupByNode(ExecNode* input, std::shared_ptr<Schema> output_schema,
std::vector<int> key_field_ids, std::vector<int> agg_src_field_ids,
std::vector<Aggregate> aggs,
std::vector<const HashAggregateKernel*> agg_kernels)
: ExecNode(input->plan(), {input}, {"groupby"}, std::move(output_schema),
/*num_outputs=*/1),
- ctx_(ctx),
key_field_ids_(std::move(key_field_ids)),
agg_src_field_ids_(std::move(agg_src_field_ids)),
aggs_(std::move(aggs)),
agg_kernels_(std::move(agg_kernels)) {}

Status Init() override {
- output_task_group_id_ = plan_->RegisterTaskGroup(
+ output_task_group_id_ = plan_->query_context()->RegisterTaskGroup(
[this](size_t, int64_t task_id) {
OutputNthBatch(task_id);
return Status::OK();
@@ -326,7 +325,7 @@ class GroupByNode : public ExecNode {
agg_src_types[i] = input_schema->field(agg_src_field_id)->type().get();
}

- auto ctx = input->plan()->exec_context();
+ auto ctx = plan->query_context()->exec_context();

// Construct aggregates
ARROW_ASSIGN_OR_RAISE(auto agg_kernels,
@@ -354,7 +353,7 @@
}

return input->plan()->EmplaceNode<GroupByNode>(
- input, schema(std::move(output_fields)), ctx, std::move(key_field_ids),
+ input, schema(std::move(output_fields)), std::move(key_field_ids),
std::move(agg_src_field_ids), std::move(aggs), std::move(agg_kernels));
}

@@ -366,7 +365,7 @@
{{"group_by", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
- size_t thread_index = plan_->GetThreadIndex();
+ size_t thread_index = plan_->query_context()->GetThreadIndex();
if (thread_index >= local_states_.size()) {
return Status::IndexError("thread index ", thread_index, " is out of range [0, ",
local_states_.size(), ")");
@@ -393,7 +392,8 @@
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Consume"}});
- KernelContext kernel_ctx{ctx_};
+ auto ctx = plan_->query_context()->exec_context();
+ KernelContext kernel_ctx{ctx};
kernel_ctx.SetState(state->agg_states[i].get());

ExecSpan agg_batch({batch[agg_src_field_ids_[i]], ExecValue(*id_batch.array())},
@@ -429,7 +429,9 @@
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Merge"}});
- KernelContext batch_ctx{ctx_};
+ auto ctx = plan_->query_context()->exec_context();
+ KernelContext batch_ctx{ctx};
DCHECK(state0->agg_states[i]);
batch_ctx.SetState(state0->agg_states[i].get());

@@ -497,7 +499,7 @@

int64_t num_output_batches = bit_util::CeilDiv(out_data_.length, output_batch_size());
outputs_[0]->InputFinished(this, static_cast<int>(num_output_batches));
- RETURN_NOT_OK(plan_->StartTaskGroup(output_task_group_id_, num_output_batches));
+ RETURN_NOT_OK(plan_->query_context()->StartTaskGroup(output_task_group_id_, num_output_batches));
return Status::OK();
}

Expand Down Expand Up @@ -548,7 +550,7 @@ class GroupByNode : public ExecNode {
{"node.detail", ToString()},
{"node.kind", kind_name()}});

- local_states_.resize(plan_->max_concurrency());
+ local_states_.resize(plan_->query_context()->max_concurrency());
return Status::OK();
}

Expand Down Expand Up @@ -593,7 +595,7 @@ class GroupByNode : public ExecNode {
};

ThreadLocalState* GetLocalState() {
- size_t thread_index = plan_->GetThreadIndex();
+ size_t thread_index = plan_->query_context()->GetThreadIndex();
return &local_states_[thread_index];
}

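The recurring edit in this file routes per-query state through the new QueryContext added by this commit (compute/exec/query_context.cc in the CMake list above). The shape below is inferred solely from the call sites in this diff, so treat the signatures as approximate:

// Approximate interface, reconstructed from usage in this diff; the real
// declaration lives in the new compute/exec/query_context.h.
class QueryContext {
 public:
  ExecContext* exec_context();   // plan_->query_context()->exec_context()
  MemoryPool* memory_pool();     // used by asof_join_node.cc below
  size_t max_concurrency();      // sizes per-thread state vectors
  size_t GetThreadIndex();       // stable index of the calling thread
  int RegisterTaskGroup(std::function<Status(size_t, int64_t)> task,
                        std::function<Status(size_t)> on_finished);
  Status StartTaskGroup(int task_group_id, int64_t num_tasks);
};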
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/asof_join_node.cc
@@ -550,7 +550,7 @@ class AsofJoinNode : public ExecNode {
if (dst.empty()) {
return NULLPTR;
} else {
return dst.Materialize(plan()->exec_context()->memory_pool(), output_schema(),
return dst.Materialize(plan()->query_context()->memory_pool(), output_schema(),
state_);
}
}