Commit

Add spilling for hash join

save-buffer committed Jul 22, 2022
1 parent f01770d commit fad418d

Showing 30 changed files with 993 additions and 247 deletions.
2 changes: 2 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
@@ -402,8 +402,10 @@ if(ARROW_COMPUTE)
compute/exec/partition_util.cc
compute/exec/options.cc
compute/exec/project_node.cc
compute/exec/query_context.cc
compute/exec/sink_node.cc
compute/exec/source_node.cc
compute/exec/spilling_util.cc
compute/exec/swiss_join.cc
compute/exec/task_util.cc
compute/exec/tpch_node.cc
117 changes: 115 additions & 2 deletions cpp/src/arrow/compute/exec/accumulation_queue.cc
@@ -16,12 +16,13 @@
// under the License.

#include "arrow/compute/exec/accumulation_queue.h"
#include "arrow/compute/exec/key_hash.h"

#include <iterator>
#include <numeric>  // std::iota, used by the new partitioning code

namespace arrow {
namespace util {
using arrow::compute::ExecBatch;
namespace compute {

AccumulationQueue::AccumulationQueue(AccumulationQueue&& that) {
this->batches_ = std::move(that.batches_);
this->row_count_ = that.row_count_;
@@ -54,5 +55,117 @@ void AccumulationQueue::Clear() {
}

ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }

Status SpillingAccumulationQueue::Init(QueryContext* ctx) {
  ctx_ = ctx;
  partition_locks_.Init(ctx_->max_concurrency(), kNumPartitions);
  return Status::OK();
}

Status SpillingAccumulationQueue::InsertBatch(size_t thread_index, ExecBatch batch,
                                              const Datum& hashes64) {
  const uint64_t* hashes = hashes64.array()->GetValues<uint64_t>(1);
  for (int64_t i = 0; i < batch.length; i += util::MiniBatch::kMiniBatchLength) {
    int64_t length = std::min(static_cast<int64_t>(batch.length - i),
                              static_cast<int64_t>(util::MiniBatch::kMiniBatchLength));
    RETURN_NOT_OK(PartitionBatchIntoBuilders(thread_index, hashes, batch, i, length));
  }
  return Status::OK();
}

Status SpillingAccumulationQueue::PartitionBatchIntoBuilders(size_t thread_index,
                                                             const uint64_t* hashes,
                                                             ExecBatch& batch,
                                                             int64_t start,
                                                             int64_t length) {
  uint16_t row_ids[util::MiniBatch::kMiniBatchLength];
  std::iota(row_ids, row_ids + util::MiniBatch::kMiniBatchLength, 0);
  uint16_t part_starts[kNumPartitions + 1];
  PartitionSort::Eval(
      static_cast<int>(length), kNumPartitions, part_starts,
      /*prtn_id_impl=*/
      [&](int64_t i) { return hashes[start + i] & (kNumPartitions - 1); },
      /*output_pos_impl=*/
      [&row_ids, start](int64_t i, int output_pos) {
        row_ids[output_pos] = static_cast<uint16_t>(start + i);
      });
  int num_unprocessed_partitions = 0;
  int unprocessed_partition_ids[kNumPartitions];
  for (int i = 0; i < kNumPartitions; i++) {
    bool is_part_empty = (part_starts[i + 1] == part_starts[i]);
    if (!is_part_empty)  // only non-empty partitions need processing
      unprocessed_partition_ids[num_unprocessed_partitions++] = i;
  }
  while (num_unprocessed_partitions > 0) {
    int locked_part_id;
    int locked_part_id_pos;
    partition_locks_.AcquirePartitionLock(thread_index, num_unprocessed_partitions,
                                          unprocessed_partition_ids,
                                          /*limit_retries=*/false,
                                          /*max_retries=*/-1, &locked_part_id,
                                          &locked_part_id_pos);

    uint64_t num_total_rows_to_append =
        part_starts[locked_part_id + 1] - part_starts[locked_part_id];

    while (num_total_rows_to_append > 0) {
      int num_rows_to_append =
          std::min(static_cast<int>(num_total_rows_to_append),
                   static_cast<int>(ExecBatchBuilder::num_rows_max() -
                                    builders_[locked_part_id].num_rows()));

      RETURN_NOT_OK(builders_[locked_part_id].AppendSelected(
          ctx_->memory_pool(), batch, num_rows_to_append,
          row_ids + part_starts[locked_part_id], batch.num_values()));

      if (builders_[locked_part_id].is_full()) {
        ExecBatch flushed = builders_[locked_part_id].Flush();
        if (locked_part_id <= spilling_cursor_)
          RETURN_NOT_OK(
              files_[locked_part_id].SpillBatch(locked_part_id, std::move(flushed)));
        else
          queues_[locked_part_id].InsertBatch(std::move(flushed));
      }
      part_starts[locked_part_id] += num_rows_to_append;
      num_total_rows_to_append -= num_rows_to_append;
    }

    partition_locks_.ReleasePartitionLock(locked_part_id);
    // Swap-remove the finished partition from the unprocessed list.
    if (locked_part_id_pos < num_unprocessed_partitions - 1) {
      unprocessed_partition_ids[locked_part_id_pos] =
          unprocessed_partition_ids[num_unprocessed_partitions - 1];
    }
    num_unprocessed_partitions -= 1;
  }
  return Status::OK();
}


} // namespace compute
} // namespace arrow
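Aside: the PartitionBatchIntoBuilders pass above is a counting sort keyed on the low six bits of each row's hash (kNumPartitions = 64 is a power of two, so hash & (kNumPartitions - 1) is the partition id). A standalone sketch of the same idea, with illustrative names that are not part of this patch, assuming fewer than 64K rows as with Arrow's minibatches:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

constexpr int kNumPartitions = 64;  // power of two, so the low bits select the partition

// Groups row ids by partition: after the call, row_ids[part_starts[p]] up to
// row_ids[part_starts[p + 1]] are the rows belonging to partition p.
void PartitionRowIds(const std::vector<uint64_t>& hashes, uint16_t* row_ids,
                     uint16_t part_starts[kNumPartitions + 1]) {
  // Count rows per partition.
  uint16_t counts[kNumPartitions] = {};
  for (uint64_t h : hashes) counts[h & (kNumPartitions - 1)]++;
  // Exclusive prefix sum: part_starts[p] is partition p's first output slot.
  uint16_t sum = 0;
  for (int p = 0; p < kNumPartitions; p++) {
    part_starts[p] = sum;
    sum = static_cast<uint16_t>(sum + counts[p]);
  }
  part_starts[kNumPartitions] = sum;
  // Scatter each row id into its partition's range.
  uint16_t cursor[kNumPartitions];
  std::copy(part_starts, part_starts + kNumPartitions, cursor);
  for (std::size_t i = 0; i < hashes.size(); i++) {
    std::size_t p = hashes[i] & (kNumPartitions - 1);
    row_ids[cursor[p]++] = static_cast<uint16_t>(i);
  }
}

part_starts[p + 1] - part_starts[p] is then the row count of partition p, which is exactly what the builder-append loop above consumes.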
35 changes: 32 additions & 3 deletions cpp/src/arrow/compute/exec/accumulation_queue.h
@@ -21,10 +21,13 @@
#include <vector>

#include "arrow/compute/exec.h"
#include "arrow/compute/light_array.h"
#include "arrow/compute/exec/partition_util.h"
#include "arrow/compute/exec/task_util.h"
#include "arrow/compute/exec/spilling_util.h"

namespace arrow {
namespace util {
using arrow::compute::ExecBatch;
namespace compute {

/// \brief A container that accumulates batches until they are ready to
/// be processed.
@@ -53,5 +56,31 @@ class AccumulationQueue {
std::vector<ExecBatch> batches_;
};

} // namespace util
/// \brief An accumulation queue that partitions batches by hash and can
/// spill partitions to disk under memory pressure.
class SpillingAccumulationQueue {
 public:
  static constexpr int kNumPartitions = 64;
  Status Init(QueryContext* ctx);
  Status InsertBatch(size_t thread_index, ExecBatch batch, const Datum& hashes64);

 private:
  Status PartitionBatchIntoBuilders(size_t thread_index, const uint64_t* hashes,
                                    ExecBatch& batch, int64_t start, int64_t length);

  int spilling_cursor_ = -1;
  QueryContext* ctx_;
  PartitionLocks partition_locks_;
  AccumulationQueue queues_[kNumPartitions];
  ExecBatchBuilder builders_[kNumPartitions];
  SpillFile files_[kNumPartitions];
};

} // namespace compute
} // namespace arrow
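A rough sketch of how the new queue is driven — this call site is hypothetical, inferred from the interface above, with the QueryContext and per-batch 64-bit hashes (e.g. from the key_hash machinery the .cc file now includes) assumed to be produced upstream:

// Hypothetical build-side accumulation loop; not part of this commit.
Status AccumulateBuildSide(QueryContext* ctx, size_t thread_index,
                           std::vector<ExecBatch> batches,
                           const std::vector<Datum>& hashes64,
                           SpillingAccumulationQueue* queue) {
  RETURN_NOT_OK(queue->Init(ctx));  // once, before any inserts
  for (size_t i = 0; i < batches.size(); ++i) {
    RETURN_NOT_OK(queue->InsertBatch(thread_index, std::move(batches[i]), hashes64[i]));
  }
  return Status::OK();
}

The private members tell the spilling story: a partition whose id is at or below spilling_cursor_ flushes each full builder batch to its SpillFile on disk, while the remaining partitions accumulate in ordinary in-memory AccumulationQueues. Since the cursor starts at -1, everything stays in memory until some memory-pressure trigger (not part of these hunks) advances it.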
20 changes: 11 additions & 9 deletions cpp/src/arrow/compute/exec/aggregate_node.cc
@@ -83,7 +83,7 @@ class ScalarAggregateNode : public ExecNode {
auto aggregates = aggregate_options.aggregates;

const auto& input_schema = *inputs[0]->output_schema();
auto exec_ctx = plan->exec_context();
auto exec_ctx = plan->query_context()->exec_context();

std::vector<const ScalarAggregateKernel*> kernels(aggregates.size());
std::vector<std::vector<std::unique_ptr<KernelState>>> states(kernels.size());
@@ -150,7 +150,7 @@
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Consume"}});
KernelContext batch_ctx{plan()->exec_context()};
KernelContext batch_ctx{plan()->query_context()->exec_context()};
batch_ctx.SetState(states_[i][thread_index].get());

ExecSpan single_column_batch{{batch.values[target_field_ids_[i]]}, batch.length};
@@ -245,7 +245,7 @@
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Finalize"}});
KernelContext ctx{plan()->exec_context()};
KernelContext ctx{plan()->query_context()->exec_context()};
ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll(
kernels_[i], &ctx, std::move(states_[i])));
RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i]));
@@ -267,13 +267,12 @@

class GroupByNode : public ExecNode {
public:
GroupByNode(ExecNode* input, std::shared_ptr<Schema> output_schema, ExecContext* ctx,
GroupByNode(ExecNode* input, std::shared_ptr<Schema> output_schema,
std::vector<int> key_field_ids, std::vector<int> agg_src_field_ids,
std::vector<Aggregate> aggs,
std::vector<const HashAggregateKernel*> agg_kernels)
: ExecNode(input->plan(), {input}, {"groupby"}, std::move(output_schema),
/*num_outputs=*/1),
ctx_(ctx),
key_field_ids_(std::move(key_field_ids)),
agg_src_field_ids_(std::move(agg_src_field_ids)),
aggs_(std::move(aggs)),
@@ -326,7 +325,7 @@ class GroupByNode : public ExecNode {
agg_src_types[i] = input_schema->field(agg_src_field_id)->type().get();
}

auto ctx = input->plan()->exec_context();
auto ctx = plan->query_context()->exec_context();

// Construct aggregates
ARROW_ASSIGN_OR_RAISE(auto agg_kernels,
@@ -354,7 +353,7 @@ }
}

return input->plan()->EmplaceNode<GroupByNode>(
input, schema(std::move(output_fields)), ctx, std::move(key_field_ids),
input, schema(std::move(output_fields)), std::move(key_field_ids),
std::move(agg_src_field_ids), std::move(aggs), std::move(agg_kernels));
}

@@ -393,7 +392,8 @@
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Consume"}});
KernelContext kernel_ctx{ctx_};
auto ctx = plan_->query_context()->exec_context();
KernelContext kernel_ctx{ctx};
kernel_ctx.SetState(state->agg_states[i].get());

ExecSpan agg_batch({batch[agg_src_field_ids_[i]], ExecValue(*id_batch.array())},
@@ -429,7 +429,9 @@
{"function.options",
aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
{"function.kind", std::string(kind_name()) + "::Merge"}});
KernelContext batch_ctx{ctx_};

auto ctx = plan_->query_context()->exec_context();
KernelContext batch_ctx{ctx};
DCHECK(state0->agg_states[i]);
batch_ctx.SetState(state0->agg_states[i].get());

2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/asof_join_node.cc
@@ -550,7 +550,7 @@
if (dst.empty()) {
return NULLPTR;
} else {
return dst.Materialize(plan()->exec_context()->memory_pool(), output_schema(),
return dst.Materialize(plan()->query_context()->memory_pool(), output_schema(),
state_);
}
}
17 changes: 9 additions & 8 deletions cpp/src/arrow/compute/exec/exec_plan.cc
@@ -45,7 +45,7 @@ namespace compute {
namespace {

struct ExecPlanImpl : public ExecPlan {
explicit ExecPlanImpl(ExecContext* exec_context,
explicit ExecPlanImpl(ExecContext *exec_context,
std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR)
: ExecPlan(exec_context), metadata_(std::move(metadata)) {}

@@ -57,8 +57,8 @@ struct ExecPlanImpl : public ExecPlan {
}
}

size_t GetThreadIndex() { return thread_indexer_(); }
size_t max_concurrency() const { return thread_indexer_.Capacity(); }
size_t GetThreadIndex() { return query_context_.GetThreadIndex(); }
size_t max_concurrency() const { return query_context_.max_concurrency(); }

ExecNode* AddNode(std::unique_ptr<ExecNode> node) {
if (node->label().empty()) {
@@ -87,7 +87,7 @@
}

Status ScheduleTask(std::function<Status()> fn) {
auto executor = exec_context_->executor();
auto executor = query_context()->executor();
if (!executor) return fn();
// Adds a task which submits fn to the executor and tracks its progress. If we're
// already stopping then the task is ignored and fn is not executed.
@@ -140,6 +140,8 @@
return Status::Invalid("restarted ExecPlan");
}

RETURN_NOT_OK(query_context()->Init(max_concurrency()));

std::vector<Future<>> futures;
for (auto& n : nodes_) {
RETURN_NOT_OK(n->Init());
@@ -154,7 +156,7 @@
task_scheduler_->RegisterEnd();
int num_threads = 1;
bool sync_execution = true;
if (auto executor = exec_context()->executor()) {
if (auto executor = query_context()->executor()) {
num_threads = executor->GetCapacity();
sync_execution = false;
}
@@ -326,7 +328,6 @@
util::tracing::Span span_;
std::shared_ptr<const KeyValueMetadata> metadata_;

ThreadIndexer thread_indexer_;
std::atomic<bool> group_ended_{false};
util::AsyncTaskGroup task_group_;
std::unique_ptr<TaskScheduler> task_scheduler_ = TaskScheduler::Make();
@@ -349,7 +350,7 @@ util::optional<int> GetNodeIndex(const std::vector<ExecNode*>& nodes,
} // namespace

Result<std::shared_ptr<ExecPlan>> ExecPlan::Make(
ExecContext* ctx, std::shared_ptr<const KeyValueMetadata> metadata) {
ExecContext *ctx, std::shared_ptr<const KeyValueMetadata> metadata) {
return std::shared_ptr<ExecPlan>(new ExecPlanImpl{ctx, metadata});
}

@@ -473,7 +474,7 @@ MapNode::MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::move(output_schema),
/*num_outputs=*/1) {
if (async_mode) {
executor_ = plan_->exec_context()->executor();
executor_ = plan_->query_context()->executor();
} else {
executor_ = nullptr;
}
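The remaining hunks in aggregate_node.cc, asof_join_node.cc, and exec_plan.cc are one mechanical change: per-query services are now reached through the new QueryContext rather than directly through the plan's ExecContext. A minimal illustration of the new access pattern — member names are taken from the call sites in this diff, not from the actual declaration in query_context.h:

#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/kernel.h"

// Illustrative only; not part of this commit.
arrow::Status NodeBodySketch(arrow::compute::ExecPlan* plan) {
  arrow::compute::QueryContext* qctx = plan->query_context();
  arrow::compute::KernelContext kernel_ctx{qctx->exec_context()};  // was plan->exec_context()
  arrow::MemoryPool* pool = qctx->memory_pool();  // was exec_context()->memory_pool()
  size_t thread_index = qctx->GetThreadIndex();   // was a plan-level ThreadIndexer
  (void)kernel_ctx;
  (void)pool;
  (void)thread_index;
  return arrow::Status::OK();
}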
9 changes: 5 additions & 4 deletions cpp/src/arrow/compute/exec/exec_plan.h
@@ -24,6 +24,7 @@

#include "arrow/compute/exec.h"
#include "arrow/compute/exec/util.h"
#include "arrow/compute/exec/query_context.h"
#include "arrow/compute/type_fwd.h"
#include "arrow/type_fwd.h"
#include "arrow/util/async_util.h"
@@ -44,11 +45,11 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {

virtual ~ExecPlan() = default;

ExecContext* exec_context() const { return exec_context_; }
QueryContext* query_context() { return &query_context_; }

/// Make an empty exec plan
static Result<std::shared_ptr<ExecPlan>> Make(
ExecContext* = default_exec_context(),
ExecContext *exec_context = default_exec_context(),
std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR);

ExecNode* AddNode(std::unique_ptr<ExecNode> node);
@@ -147,8 +148,8 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
std::string ToString() const;

protected:
ExecContext* exec_context_;
explicit ExecPlan(ExecContext* exec_context) : exec_context_(exec_context) {}
QueryContext query_context_;
explicit ExecPlan(ExecContext *exec_ctx) : query_context_({}, *exec_ctx) {}
};

class ARROW_EXPORT ExecNode {
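Construction is unchanged for callers: ExecPlan::Make still takes an ExecContext, which the plan now wraps in an internally owned QueryContext (see the constructor above). A hypothetical caller:

#include "arrow/compute/exec/exec_plan.h"
#include "arrow/result.h"

arrow::Status MakePlanSketch() {
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::compute::ExecPlan> plan,
      arrow::compute::ExecPlan::Make(arrow::compute::default_exec_context()));
  // Per-query state (executor, memory pool, thread ids) is now reached via
  // plan->query_context() rather than plan->exec_context().
  return arrow::Status::OK();
}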