Add spilling for hash join
save-buffer committed Aug 11, 2022
1 parent 440f786 commit 8d527a3
Showing 35 changed files with 1,801 additions and 459 deletions.
3 changes: 3 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
@@ -401,8 +401,11 @@ if(ARROW_COMPUTE)
compute/exec/partition_util.cc
compute/exec/options.cc
compute/exec/project_node.cc
compute/exec/query_context.cc
compute/exec/sink_node.cc
compute/exec/source_node.cc
compute/exec/spilling_util.cc
compute/exec/spilling_join.cc
compute/exec/swiss_join.cc
compute/exec/task_util.cc
compute/exec/tpch_node.cc
7 changes: 7 additions & 0 deletions cpp/src/arrow/buffer.h
@@ -460,6 +460,10 @@ class ARROW_EXPORT ResizableBuffer : public MutableBuffer {
ARROW_EXPORT
Result<std::unique_ptr<Buffer>> AllocateBuffer(const int64_t size,
MemoryPool* pool = NULLPTR);
ARROW_EXPORT
Result<std::unique_ptr<Buffer>> AllocateBuffer(const int64_t size,
int64_t alignment,
MemoryPool* pool = NULLPTR);

/// \brief Allocate a resizeable buffer from a memory pool, zero its padding.
///
@@ -468,6 +472,9 @@ Result<std::unique_ptr<Buffer>> AllocateBuffer(const int64_t size,
ARROW_EXPORT
Result<std::unique_ptr<ResizableBuffer>> AllocateResizableBuffer(
const int64_t size, MemoryPool* pool = NULLPTR);
ARROW_EXPORT
Result<std::unique_ptr<ResizableBuffer>> AllocateResizableBuffer(
const int64_t size, const int64_t alignment, MemoryPool* pool = NULLPTR);
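For context, a minimal usage sketch of the new alignment-aware overload. The 64-byte alignment, the 1 MiB size, and the helper name are illustrative assumptions, not anything this header mandates:

#include <memory>

#include "arrow/buffer.h"
#include "arrow/result.h"
#include "arrow/status.h"

// Sketch: request a 1 MiB buffer aligned to a 64-byte boundary (e.g. for
// SIMD-friendly access or direct-I/O spill staging), using the default pool.
arrow::Status AllocateAlignedScratch(std::unique_ptr<arrow::Buffer>* out) {
  ARROW_ASSIGN_OR_RAISE(*out, arrow::AllocateBuffer(/*size=*/1 << 20,
                                                    /*alignment=*/64));
  return arrow::Status::OK();
}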

/// \brief Allocate a bitmap buffer from a memory pool
/// no guarantee on values is provided.
127 changes: 125 additions & 2 deletions cpp/src/arrow/compute/exec/accumulation_queue.cc
@@ -15,13 +15,15 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/util/atomic_util.h"
#include "arrow/compute/exec/accumulation_queue.h"
#include "arrow/compute/exec/key_hash.h"

#include <algorithm>
#include <iterator>
#include <numeric>

namespace arrow {
namespace util {
using arrow::compute::ExecBatch;
namespace compute {

AccumulationQueue::AccumulationQueue(AccumulationQueue&& that) {
this->batches_ = std::move(that.batches_);
this->row_count_ = that.row_count_;
@@ -48,11 +50,132 @@ void AccumulationQueue::InsertBatch(ExecBatch batch) {
batches_.emplace_back(std::move(batch));
}

void AccumulationQueue::InsertAt(ExecBatch batch, size_t idx)
{
ARROW_DCHECK(idx < batches_.size());
arrow::util::AtomicFetchSub(&row_count_, batches_[idx].length, std::memory_order_relaxed);
arrow::util::AtomicFetchAdd(&row_count_, batch.length, std::memory_order_relaxed);
batches_[idx] = std::move(batch);
}

void AccumulationQueue::Clear() {
row_count_ = 0;
batches_.clear();
}

ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }

Status SpillingAccumulationQueue::Init(QueryContext *ctx)
{
ctx_ = ctx;
partition_locks_.Init(ctx_->max_concurrency(), kNumPartitions);
return Status::OK();
}

Status SpillingAccumulationQueue::InsertBatch(
size_t thread_index,
ExecBatch batch)
{
for(int64_t i = 0; i < batch.length; i += util::MiniBatch::kMiniBatchLength)
{
int64_t length = std::min(static_cast<int64_t>(batch.length - i),
static_cast<int64_t>(util::MiniBatch::kMiniBatchLength));

RETURN_NOT_OK(PartitionBatchIntoBuilders(
thread_index,
batch,
i,
length));
}
return Status::OK();
}
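// Worked example for the mini-batch loop in InsertBatch above, assuming
// util::MiniBatch::kMiniBatchLength is 1024: a batch of 2500 rows is split
// into three calls to PartitionBatchIntoBuilders with (start, length) =
// (0, 1024), (1024, 1024), and (2048, 452), so no mini-batch ever exceeds the
// fixed-size row_ids scratch array used during partitioning.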

Status SpillingAccumulationQueue::GetPartition(
    int partition,
    std::function<Status(size_t, AccumulationQueue)> on_finished)
{
    // Partitions at or past the spilling cursor are still in memory and can be
    // handed to the caller immediately on the current thread.
    if(partition >= spilling_cursor_)
        return on_finished(ctx_->GetThreadIndex(), std::move(queues_[partition]));

    queues_[partition].Resize(files_[partition].num_batches());
    return files_[partition].ReadBackBatches(
        ctx_,
        [this, partition](size_t idx, ExecBatch batch)
        {
            queues_[partition].InsertAt(std::move(batch), idx);
            return Status::OK();
        },
        [this, partition, on_finished](size_t thread_index)
        {
            return on_finished(thread_index, std::move(queues_[partition]));
        });
}

Status SpillingAccumulationQueue::PartitionBatchIntoBuilders(
    size_t thread_index,
    ExecBatch &batch,
    int64_t start,
    int64_t length)
{
    // The 64-bit hashes are stored in the last column of the batch (see InsertBatch).
    const uint64_t *hashes = batch.values.back().array()->GetValues<uint64_t>(1);
    uint16_t row_ids[util::MiniBatch::kMiniBatchLength];
    std::iota(row_ids, row_ids + length, static_cast<uint16_t>(0));
    uint16_t part_starts[kNumPartitions + 1];
    PartitionSort::Eval(
        length,
        kNumPartitions,
        part_starts,
        [&](int64_t i)
        {
            return hashes[start + i] & (kNumPartitions - 1);
        },
        [&row_ids, start](int64_t i, int output_pos)
        {
            row_ids[output_pos] = static_cast<uint16_t>(start + i);
        });

    int unprocessed_partition_ids[kNumPartitions];  // scratch space for ForEachPartition
    RETURN_NOT_OK(partition_locks_.ForEachPartition(
        thread_index,
        unprocessed_partition_ids,
        [&](int part_id)
        {
            return part_starts[part_id + 1] == part_starts[part_id];
        },
        [&](int locked_part_id)
        {
            uint64_t num_total_rows_to_append =
                part_starts[locked_part_id + 1] - part_starts[locked_part_id];

            while(num_total_rows_to_append > 0)
            {
                int num_rows_to_append = std::min(
                    static_cast<int>(num_total_rows_to_append),
                    static_cast<int>(ExecBatchBuilder::num_rows_max() - builders_[locked_part_id].num_rows()));

                RETURN_NOT_OK(builders_[locked_part_id].AppendSelected(
                    ctx_->memory_pool(),
                    batch,
                    num_rows_to_append,
                    row_ids + part_starts[locked_part_id],
                    batch.num_values()));

                if(builders_[locked_part_id].is_full())
                {
                    // Full builders flush to disk for spilled partitions and to the
                    // in-memory queue otherwise.
                    ExecBatch full_batch = builders_[locked_part_id].Flush();
                    if(locked_part_id < spilling_cursor_)
                        RETURN_NOT_OK(files_[locked_part_id].SpillBatch(ctx_, std::move(full_batch)));
                    else
                        queues_[locked_part_id].InsertBatch(std::move(full_batch));
                }
                part_starts[locked_part_id] += num_rows_to_append;
                num_total_rows_to_append -= num_rows_to_append;
            }
            return Status::OK();
        }));

    return Status::OK();
}


} // namespace util
} // namespace arrow
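For readers unfamiliar with the partitioning scheme above: each row is assigned to one of kNumPartitions buckets by the low bits of its hash, and a counting sort groups the row ids of each bucket contiguously, which is what PartitionBatchIntoBuilders relies on. Below is a standalone sketch of the same idea, independent of Arrow's PartitionSort helper; the names and fixed sizes are illustrative only:

#include <cstdint>
#include <vector>

// Bucket row ids by the low bits of each row's hash, using a counting sort so
// that part_starts[p]..part_starts[p+1] delimits the row ids of partition p.
constexpr int kNumPartitions = 64;  // must be a power of two for the mask below

void PartitionByHash(const std::vector<uint64_t>& hashes,
                     std::vector<uint16_t>* row_ids,
                     uint16_t part_starts[kNumPartitions + 1]) {
  // Count rows per partition (assumes at most 65535 rows, i.e. mini-batch sized).
  int counts[kNumPartitions] = {};
  for (uint64_t h : hashes) counts[h & (kNumPartitions - 1)]++;

  // Exclusive prefix sum -> start offset of each partition.
  part_starts[0] = 0;
  for (int p = 0; p < kNumPartitions; ++p)
    part_starts[p + 1] = static_cast<uint16_t>(part_starts[p] + counts[p]);

  // Scatter row ids into their partition's range.
  uint16_t cursor[kNumPartitions];
  for (int p = 0; p < kNumPartitions; ++p) cursor[p] = part_starts[p];
  row_ids->resize(hashes.size());
  for (size_t i = 0; i < hashes.size(); ++i) {
    int p = static_cast<int>(hashes[i] & (kNumPartitions - 1));
    (*row_ids)[cursor[p]++] = static_cast<uint16_t>(i);
  }
}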
46 changes: 43 additions & 3 deletions cpp/src/arrow/compute/exec/accumulation_queue.h
@@ -21,10 +21,13 @@
#include <functional>
#include <vector>

#include "arrow/compute/exec.h"
#include "arrow/compute/light_array.h"
#include "arrow/compute/exec/partition_util.h"
#include "arrow/compute/exec/query_context.h"
#include "arrow/compute/exec/task_util.h"
#include "arrow/compute/exec/spilling_util.h"

namespace arrow {
namespace util {
using arrow::compute::ExecBatch;
namespace compute {

/// \brief A container that accumulates batches until they are ready to
/// be processed.
@@ -42,9 +45,11 @@ class AccumulationQueue {

void Concatenate(AccumulationQueue&& that);
void InsertBatch(ExecBatch batch);
void InsertAt(ExecBatch batch, size_t idx);
int64_t row_count() { return row_count_; }
size_t batch_count() { return batches_.size(); }
bool empty() const { return batches_.empty(); }
void Resize(size_t size) { batches_.resize(size); }
void Clear();
ExecBatch& operator[](size_t i);

@@ -53,5 +58,40 @@ class AccumulationQueue {
std::vector<ExecBatch> batches_;
};

} // namespace util
class SpillingAccumulationQueue
{
public:
static constexpr int kNumPartitions = 64;
Status Init(QueryContext *ctx);
// Assumes that the final column in batch contains 64-bit hashes of the columns.
Status InsertBatch(
size_t thread_index,
ExecBatch batch);
  // Retrieves the given partition. For in-memory partitions the callback runs
  // immediately; for spilled partitions it runs once the partition has been
  // read back from disk.
  Status GetPartition(
      int partition,
      std::function<Status(size_t, AccumulationQueue)> on_finished);

private:
  Status PartitionBatchIntoBuilders(
      size_t thread_index,
      ExecBatch &batch,
      int64_t start,
      int64_t length);

int spilling_cursor_ = 0; // denotes the first in-memory partition
QueryContext* ctx_;
PartitionLocks partition_locks_;

AccumulationQueue queues_[kNumPartitions];
AccumulationQueue hash_queues_[kNumPartitions];

ExecBatchBuilder builders_[kNumPartitions];
ExecBatchBuilder hash_builders_[kNumPartitions];

SpillFile files_[kNumPartitions];
SpillFile hash_files_[kNumPartitions];
};

} // namespace compute
} // namespace arrow
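A hypothetical sketch (not part of this commit) of how a hash-join build side might drive SpillingAccumulationQueue, assuming the incoming batch already carries the 64-bit hash column as its last value, as InsertBatch requires; the function names are placeholders:

#include "arrow/compute/exec/accumulation_queue.h"

namespace arrow {
namespace compute {

// Accumulate one morsel of build-side input on the calling thread.
Status AccumulateBuildSide(SpillingAccumulationQueue* queue, size_t thread_index,
                           ExecBatch batch_with_hashes) {
  // The final column of batch_with_hashes must hold the 64-bit row hashes.
  return queue->InsertBatch(thread_index, std::move(batch_with_hashes));
}

// Kick off the hash-table build for one partition.
Status StartBuildForPartition(SpillingAccumulationQueue* queue, int partition) {
  return queue->GetPartition(
      partition,
      [](size_t thread_index, AccumulationQueue batches) {
        // In-memory partitions arrive immediately; spilled partitions arrive
        // only after their file has been read back.
        // ... build the per-partition hash table from `batches` here ...
        return Status::OK();
      });
}

}  // namespace compute
}  // namespace arrow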
34 changes: 18 additions & 16 deletions cpp/src/arrow/compute/exec/aggregate_node.cc
@@ -83,7 +83,7 @@ class ScalarAggregateNode : public ExecNode {
auto aggregates = aggregate_options.aggregates;

const auto& input_schema = *inputs[0]->output_schema();
auto exec_ctx = plan->exec_context();
auto exec_ctx = plan->query_context()->exec_context();

std::vector<const ScalarAggregateKernel*> kernels(aggregates.size());
std::vector<std::vector<std::unique_ptr<KernelState>>> states(kernels.size());
@@ -113,7 +113,7 @@ class ScalarAggregateNode : public ExecNode {
}

KernelContext kernel_ctx{exec_ctx};
states[i].resize(plan->max_concurrency());
states[i].resize(plan->query_context()->max_concurrency());
RETURN_NOT_OK(Kernel::InitAll(&kernel_ctx,
KernelInitArgs{kernels[i],
{
@@ -150,7 +150,7 @@ class ScalarAggregateNode : public ExecNode {
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Consume"}});
KernelContext batch_ctx{plan()->exec_context()};
KernelContext batch_ctx{plan()->query_context()->exec_context()};
batch_ctx.SetState(states_[i][thread_index].get());

ExecSpan single_column_batch{{batch.values[target_field_ids_[i]]}, batch.length};
@@ -168,7 +168,7 @@ class ScalarAggregateNode : public ExecNode {
{"batch.length", batch.length}});
DCHECK_EQ(input, inputs_[0]);

auto thread_index = plan_->GetThreadIndex();
auto thread_index = plan_->query_context()->GetThreadIndex();

if (ErrorIfNotOk(DoConsume(ExecSpan(batch), thread_index))) return;

@@ -245,7 +245,7 @@ class ScalarAggregateNode : public ExecNode {
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Finalize"}});
KernelContext ctx{plan()->exec_context()};
KernelContext ctx{plan()->query_context()->exec_context()};
ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll(
kernels_[i], &ctx, std::move(states_[i])));
RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i]));
Expand All @@ -267,20 +267,19 @@ class ScalarAggregateNode : public ExecNode {

class GroupByNode : public ExecNode {
public:
GroupByNode(ExecNode* input, std::shared_ptr<Schema> output_schema, ExecContext* ctx,
GroupByNode(ExecNode* input, std::shared_ptr<Schema> output_schema,
std::vector<int> key_field_ids, std::vector<int> agg_src_field_ids,
std::vector<Aggregate> aggs,
std::vector<const HashAggregateKernel*> agg_kernels)
: ExecNode(input->plan(), {input}, {"groupby"}, std::move(output_schema),
/*num_outputs=*/1),
ctx_(ctx),
key_field_ids_(std::move(key_field_ids)),
agg_src_field_ids_(std::move(agg_src_field_ids)),
aggs_(std::move(aggs)),
agg_kernels_(std::move(agg_kernels)) {}

Status Init() override {
output_task_group_id_ = plan_->RegisterTaskGroup(
output_task_group_id_ = plan_->query_context()->RegisterTaskGroup(
[this](size_t, int64_t task_id) {
OutputNthBatch(task_id);
return Status::OK();
@@ -326,7 +325,7 @@ class GroupByNode : public ExecNode {
agg_src_types[i] = input_schema->field(agg_src_field_id)->type().get();
}

auto ctx = input->plan()->exec_context();
auto ctx = plan->query_context()->exec_context();

// Construct aggregates
ARROW_ASSIGN_OR_RAISE(auto agg_kernels,
@@ -354,7 +353,7 @@ class GroupByNode : public ExecNode {
}

return input->plan()->EmplaceNode<GroupByNode>(
input, schema(std::move(output_fields)), ctx, std::move(key_field_ids),
input, schema(std::move(output_fields)), std::move(key_field_ids),
std::move(agg_src_field_ids), std::move(aggs), std::move(agg_kernels));
}

@@ -366,7 +365,7 @@ class GroupByNode : public ExecNode {
{{"group_by", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
size_t thread_index = plan_->GetThreadIndex();
size_t thread_index = plan_->query_context()->GetThreadIndex();
if (thread_index >= local_states_.size()) {
return Status::IndexError("thread index ", thread_index, " is out of range [0, ",
local_states_.size(), ")");
@@ -393,7 +392,8 @@ class GroupByNode : public ExecNode {
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Consume"}});
KernelContext kernel_ctx{ctx_};
auto ctx = plan_->query_context()->exec_context();
KernelContext kernel_ctx{ctx};
kernel_ctx.SetState(state->agg_states[i].get());

ExecSpan agg_batch({batch[agg_src_field_ids_[i]], ExecValue(*id_batch.array())},
@@ -429,7 +429,9 @@ class GroupByNode : public ExecNode {
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Merge"}});
KernelContext batch_ctx{ctx_};

auto ctx = plan_->query_context()->exec_context();
KernelContext batch_ctx{ctx};
DCHECK(state0->agg_states[i]);
batch_ctx.SetState(state0->agg_states[i].get());

@@ -497,7 +499,7 @@ class GroupByNode : public ExecNode {

int64_t num_output_batches = bit_util::CeilDiv(out_data_.length, output_batch_size());
outputs_[0]->InputFinished(this, static_cast<int>(num_output_batches));
RETURN_NOT_OK(plan_->StartTaskGroup(output_task_group_id_, num_output_batches));
RETURN_NOT_OK(plan_->query_context()->StartTaskGroup(output_task_group_id_, num_output_batches));
return Status::OK();
}

Expand Down Expand Up @@ -548,7 +550,7 @@ class GroupByNode : public ExecNode {
{"node.detail", ToString()},
{"node.kind", kind_name()}});

local_states_.resize(plan_->max_concurrency());
local_states_.resize(plan_->query_context()->max_concurrency());
return Status::OK();
}

Expand Down Expand Up @@ -593,7 +595,7 @@ class GroupByNode : public ExecNode {
};

ThreadLocalState* GetLocalState() {
size_t thread_index = plan_->GetThreadIndex();
size_t thread_index = plan_->query_context()->GetThreadIndex();
return &local_states_[thread_index];
}

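Most of the aggregate-node churn above is one mechanical change: per-query state formerly reached through ExecPlan is now reached through plan->query_context(). The sketch below summarizes the QueryContext surface as inferred purely from the call sites visible in this diff; the actual query_context.h added by the commit may declare more or different members:

#include <cstdint>
#include <functional>

#include "arrow/status.h"

namespace arrow {
class MemoryPool;
namespace compute {
class ExecContext;

// Declaration-only sketch of the assumed interface.
class QueryContext {
 public:
  ExecContext* exec_context();   // kernel execution context the plan used to expose
  MemoryPool* memory_pool();     // allocations, e.g. ExecBatchBuilder::AppendSelected
  size_t max_concurrency();      // sizes per-thread state vectors
  size_t GetThreadIndex();       // stable index of the calling thread
  // The second callback (run when all tasks finish) is assumed; only the first
  // is visible in the GroupByNode::Init hunk above.
  int RegisterTaskGroup(std::function<Status(size_t, int64_t)> task,
                        std::function<Status(size_t)> on_finished);
  Status StartTaskGroup(int task_group_id, int64_t num_tasks);
};

}  // namespace compute
}  // namespace arrow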
[Diff truncated: the remaining 30 changed files are not shown.]
