Skip to content

Commit

Permalink
Add spilling for hash join
Browse files Browse the repository at this point in the history
  • Loading branch information
save-buffer committed Jul 26, 2022
1 parent bbf249e commit a1b3b13
Show file tree
Hide file tree
Showing 33 changed files with 1,268 additions and 300 deletions.
2 changes: 2 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,10 @@ if(ARROW_COMPUTE)
compute/exec/partition_util.cc
compute/exec/options.cc
compute/exec/project_node.cc
compute/exec/query_context.cc
compute/exec/sink_node.cc
compute/exec/source_node.cc
compute/exec/spilling_util.cc
compute/exec/swiss_join.cc
compute/exec/task_util.cc
compute/exec/tpch_node.cc
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,10 @@ class ARROW_EXPORT ResizableBuffer : public MutableBuffer {
ARROW_EXPORT
Result<std::unique_ptr<Buffer>> AllocateBuffer(const int64_t size,
MemoryPool* pool = NULLPTR);
ARROW_EXPORT
Result<std::unique_ptr<Buffer>> AllocateBuffer(const int64_t size,
int64_t alignment,
MemoryPool* pool = NULLPTR);

/// \brief Allocate a resizeable buffer from a memory pool, zero its padding.
///
Expand All @@ -468,6 +472,9 @@ Result<std::unique_ptr<Buffer>> AllocateBuffer(const int64_t size,
ARROW_EXPORT
Result<std::unique_ptr<ResizableBuffer>> AllocateResizableBuffer(
const int64_t size, MemoryPool* pool = NULLPTR);
ARROW_EXPORT
Result<std::unique_ptr<ResizableBuffer>> AllocateResizableBuffer(
const int64_t size, const int64_t alignment, MemoryPool* pool = NULLPTR);

/// \brief Allocate a bitmap buffer from a memory pool
/// no guarantee on values is provided.
Expand Down
121 changes: 119 additions & 2 deletions cpp/src/arrow/compute/exec/accumulation_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
// under the License.

#include "arrow/compute/exec/accumulation_queue.h"
#include "arrow/compute/exec/key_hash.h"

#include <iterator>

namespace arrow {
namespace util {
using arrow::compute::ExecBatch;
namespace compute {

AccumulationQueue::AccumulationQueue(AccumulationQueue&& that) {
this->batches_ = std::move(that.batches_);
this->row_count_ = that.row_count_;
Expand Down Expand Up @@ -54,5 +55,121 @@ void AccumulationQueue::Clear() {
}

ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }

Status SpillingAccumulationQueue::Init(QueryContext *ctx)
{
ctx_ = ctx;
partition_locks_.Init(ctx_->max_concurrency(), kNumPartitions);
return Status::OK();
}

Status SpillingAccumulationQueue::InsertBatch(
size_t thread_index,
ExecBatch batch,
const Datum &hashes64)
{
for(int64_t i = 0; i < batch.length; i += util::MiniBatch::kMiniBatchLength)
{
int64_t length = std::min(static_cast<int64_t>(batch.length - i),
static_cast<int64_t>(util::MiniBatch::kMiniBatchLength));

RETURN_NOT_OK(PartitionBatchIntoBuilders(
thread_index,
hashes64,
batch,
i,
length));
}
return Status::OK();
}

Status SpillingAccumulationQueue::PartitionBatchIntoBuilders(
size_t thread_index,
Datum &hashes64,
ExecBatch &batch,
int64_t start,
int64_t length)
{
const uint64_t *hashes = hashes64.array()->
uint16_t row_ids[util::MiniBatch::kMiniBatchLength];
std::iota(row_ids, row_ids + util::MiniBatch::kMiniBatchLength, 0);
uint16_t part_starts[kNumPartitions + 1];
PartitionSort::Eval(
util::MiniBatch::kMiniBatchLength,
kNumPartitions,
part_starts,
[&](int64_t i)
{
return hashes[i] & (kNumPartitions - 1);
},
[&row_ids, start](int64_t i, int output_pos)
{
row_ids[i] = static_cast<uint16_t>(start + output_pos);
});
int num_unprocessed_partitions = 0;
int unprocessed_partition_ids[kNumPartitions];
for(int i = 0; i < kNumPartitions; i++)
{
bool is_part_empty = (part_starts[i + 1] == part_starts[i]);
if(is_part_empty)
unprocessed_partition_ids[num_unprocessed_partitions++] = i;
}
while(num_unprocessed_partitions > 0)
{
int locked_part_id;
int locked_part_id_pos;
partition_locks_.AcquirePartitionLock(
thread_index,
num_unprocessed_partitions,
unprocessed_partition_ids,
/*limit_retries=*/false,
/*max_retries=*/-1,
&locked_part_id,
&locked_part_id_pos);

uint64_t num_total_rows_to_append =
part_starts[locked_part_id + 1] - part_starts[locked_part_id];

while(num_total_rows_to_append > 0)
{
int num_rows_to_append = std::min(
static_cast<int>(num_total_rows_to_append),
static_cast<int>(ExecBatchBuilder::num_rows_max() - builders_[locked_part_id].num_rows()));

RETURN_NOT_OK(builders_[locked_part_id].AppendSelected(
ctx_->memory_pool(),
batch,
num_rows_to_append,
row_ids + part_starts[locked_part_id],
batch.num_values()));

RETURN_NOT_OK(hash_builders_[locked_part_id].AppendSelected(
ctx_->memory_pool(),
{ {

if(builders_[locked_part_id].is_full())
{
ExecBatch batch = builders_[locked_part_id].Flush();
if(locked_part_id < spilling_cursor_)
files_[locked_part_id].SpillBatch(ctx_, std::move(batch));
else
queues_[locked_part_id].InsertBatch(std::move(batch));
}
part_starts[locked_part_id] += num_rows_to_append;
num_total_rows_to_append -= num_rows_to_append;
}

partition_locks_.ReleasePartitionLock(locked_part_id);
if (locked_part_id_pos < num_unprocessed_partitions - 1)
{
unprocessed_partition_ids[locked_part_id_pos] =
unprocessed_partition_ids[num_unprocessed_partitions - 1];
}
num_unprocessed_partitions -= 1;
}
return Status::OK();
}


} // namespace util
} // namespace arrow
41 changes: 38 additions & 3 deletions cpp/src/arrow/compute/exec/accumulation_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
#include <vector>

#include "arrow/compute/exec.h"
#include "arrow/compute/light_array.h"
#include "arrow/compute/exec/partition_util.h"
#include "arrow/compute/exec/task_util.h"
#include "arrow/compute/exec/spilling_util.h"

namespace arrow {
namespace util {
using arrow::compute::ExecBatch;
namespace compute {

/// \brief A container that accumulates batches until they are ready to
/// be processed.
Expand Down Expand Up @@ -53,5 +56,37 @@ class AccumulationQueue {
std::vector<ExecBatch> batches_;
};

} // namespace util
class SpillingAccumulationQueue
{
public:
static constexpr int kNumPartitions = 64;
Status Init(QueryContext *ctx);
Status InsertBatch(
size_t thread_index,
ExecBatch batch,
Datum hashes64);

private:
Status PartitionBatchIntoBuilders(
size_t thread_index,
const uint64_t *hashes,
ExecBatch &batch,
int64_t start,
int64_t length);

int spilling_cursor_ = 0; // denotes the first in-memory partition
QueryContext* ctx_;
PartitionLocks partition_locks_;

AccumulationQueue queues_[kNumPartitions];
AccumulationQueue hash_queues_[kNumPartitions];

ExecBatchBuilder builders_[kNumPartitions];
ExecBatchBuilder hash_builders_[kNumPartitions];

SpillFile files_[kNumPartitions];
SpillFile hash_files_[kNumPartitions];
};

} // namespace compute
} // namespace arrow
20 changes: 11 additions & 9 deletions cpp/src/arrow/compute/exec/aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class ScalarAggregateNode : public ExecNode {
auto aggregates = aggregate_options.aggregates;

const auto& input_schema = *inputs[0]->output_schema();
auto exec_ctx = plan->exec_context();
auto exec_ctx = plan->query_context()->exec_context();

std::vector<const ScalarAggregateKernel*> kernels(aggregates.size());
std::vector<std::vector<std::unique_ptr<KernelState>>> states(kernels.size());
Expand Down Expand Up @@ -150,7 +150,7 @@ class ScalarAggregateNode : public ExecNode {
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Consume"}});
KernelContext batch_ctx{plan()->exec_context()};
KernelContext batch_ctx{plan()->query_context()->exec_context()};
batch_ctx.SetState(states_[i][thread_index].get());

ExecSpan single_column_batch{{batch.values[target_field_ids_[i]]}, batch.length};
Expand Down Expand Up @@ -245,7 +245,7 @@ class ScalarAggregateNode : public ExecNode {
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Finalize"}});
KernelContext ctx{plan()->exec_context()};
KernelContext ctx{plan()->query_context()->exec_context()};
ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll(
kernels_[i], &ctx, std::move(states_[i])));
RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i]));
Expand All @@ -267,13 +267,12 @@ class ScalarAggregateNode : public ExecNode {

class GroupByNode : public ExecNode {
public:
GroupByNode(ExecNode* input, std::shared_ptr<Schema> output_schema, ExecContext* ctx,
GroupByNode(ExecNode* input, std::shared_ptr<Schema> output_schema,
std::vector<int> key_field_ids, std::vector<int> agg_src_field_ids,
std::vector<Aggregate> aggs,
std::vector<const HashAggregateKernel*> agg_kernels)
: ExecNode(input->plan(), {input}, {"groupby"}, std::move(output_schema),
/*num_outputs=*/1),
ctx_(ctx),
key_field_ids_(std::move(key_field_ids)),
agg_src_field_ids_(std::move(agg_src_field_ids)),
aggs_(std::move(aggs)),
Expand Down Expand Up @@ -326,7 +325,7 @@ class GroupByNode : public ExecNode {
agg_src_types[i] = input_schema->field(agg_src_field_id)->type().get();
}

auto ctx = input->plan()->exec_context();
auto ctx = plan->query_context()->exec_context();

// Construct aggregates
ARROW_ASSIGN_OR_RAISE(auto agg_kernels,
Expand Down Expand Up @@ -354,7 +353,7 @@ class GroupByNode : public ExecNode {
}

return input->plan()->EmplaceNode<GroupByNode>(
input, schema(std::move(output_fields)), ctx, std::move(key_field_ids),
input, schema(std::move(output_fields)), std::move(key_field_ids),
std::move(agg_src_field_ids), std::move(aggs), std::move(agg_kernels));
}

Expand Down Expand Up @@ -393,7 +392,8 @@ class GroupByNode : public ExecNode {
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Consume"}});
KernelContext kernel_ctx{ctx_};
auto ctx = plan_->query_context()->exec_context();
KernelContext kernel_ctx{ctx};
kernel_ctx.SetState(state->agg_states[i].get());

ExecSpan agg_batch({batch[agg_src_field_ids_[i]], ExecValue(*id_batch.array())},
Expand Down Expand Up @@ -429,7 +429,9 @@ class GroupByNode : public ExecNode {
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Merge"}});
KernelContext batch_ctx{ctx_};

auto ctx = plan_->query_context()->exec_context();
KernelContext batch_ctx{ctx};
DCHECK(state0->agg_states[i]);
batch_ctx.SetState(state0->agg_states[i].get());

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ class AsofJoinNode : public ExecNode {
if (dst.empty()) {
return NULLPTR;
} else {
return dst.Materialize(plan()->exec_context()->memory_pool(), output_schema(),
return dst.Materialize(plan()->query_context()->memory_pool(), output_schema(),
state_);
}
}
Expand Down
26 changes: 17 additions & 9 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace compute {
namespace {

struct ExecPlanImpl : public ExecPlan {
explicit ExecPlanImpl(ExecContext* exec_context,
explicit ExecPlanImpl(ExecContext *exec_context,
std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR)
: ExecPlan(exec_context), metadata_(std::move(metadata)) {}

Expand All @@ -57,8 +57,8 @@ struct ExecPlanImpl : public ExecPlan {
}
}

size_t GetThreadIndex() { return thread_indexer_(); }
size_t max_concurrency() const { return thread_indexer_.Capacity(); }
size_t GetThreadIndex() { return query_context_.GetThreadIndex(); }
size_t max_concurrency() const { return query_context_.max_concurrency(); }

ExecNode* AddNode(std::unique_ptr<ExecNode> node) {
if (node->label().empty()) {
Expand Down Expand Up @@ -87,7 +87,7 @@ struct ExecPlanImpl : public ExecPlan {
}

Status ScheduleTask(std::function<Status()> fn) {
auto executor = exec_context_->executor();
auto executor = query_context()->executor();
if (!executor) return fn();
// Adds a task which submits fn to the executor and tracks its progress. If we're
// already stopping then the task is ignored and fn is not executed.
Expand Down Expand Up @@ -140,6 +140,8 @@ struct ExecPlanImpl : public ExecPlan {
return Status::Invalid("restarted ExecPlan");
}

RETURN_NOT_OK(query_context()->Init(max_concurrency()));

std::vector<Future<>> futures;
for (auto& n : nodes_) {
RETURN_NOT_OK(n->Init());
Expand All @@ -154,7 +156,7 @@ struct ExecPlanImpl : public ExecPlan {
task_scheduler_->RegisterEnd();
int num_threads = 1;
bool sync_execution = true;
if (auto executor = exec_context()->executor()) {
if (auto executor = query_context()->executor()) {
num_threads = executor->GetCapacity();
sync_execution = false;
}
Expand Down Expand Up @@ -326,7 +328,6 @@ struct ExecPlanImpl : public ExecPlan {
util::tracing::Span span_;
std::shared_ptr<const KeyValueMetadata> metadata_;

ThreadIndexer thread_indexer_;
std::atomic<bool> group_ended_{false};
util::AsyncTaskGroup task_group_;
std::unique_ptr<TaskScheduler> task_scheduler_ = TaskScheduler::Make();
Expand All @@ -351,8 +352,15 @@ util::optional<int> GetNodeIndex(const std::vector<ExecNode*>& nodes,
const uint32_t ExecPlan::kMaxBatchSize;

Result<std::shared_ptr<ExecPlan>> ExecPlan::Make(
ExecContext* ctx, std::shared_ptr<const KeyValueMetadata> metadata) {
return std::shared_ptr<ExecPlan>(new ExecPlanImpl{ctx, metadata});
QueryOptions opts,
ExecContext *ctx,
std::shared_ptr<const KeyValueMetadata> metadata) {
return std::shared_ptr<ExecPlan>(new ExecPlanImpl{opts, ctx, std::move(metadata)});
}

Result<std::shared_ptr<ExecPlan>> ExecPlan::Make(
ExecContext *ctx, std::shared_ptr<const KeyValueMetadata> metadata) {
return Make({}, ctx, std::move(metadata));
}

ExecNode* ExecPlan::AddNode(std::unique_ptr<ExecNode> node) {
Expand Down Expand Up @@ -475,7 +483,7 @@ MapNode::MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::move(output_schema),
/*num_outputs=*/1) {
if (async_mode) {
executor_ = plan_->exec_context()->executor();
executor_ = plan_->query_context()->executor();
} else {
executor_ = nullptr;
}
Expand Down
Loading

0 comments on commit a1b3b13

Please sign in to comment.