Skip to content

Commit

Permalink
[NSE-265] Reserve enough memory before UnsafeAppend in builder (oap-p…
Browse files Browse the repository at this point in the history
…roject#266)

* change the UnsafeAppend to Append

* fix buffer builder in shuffle

shuffle builder use UnsafeAppend API for better performance. it
tries to reserve enough space based on results of last recordbatch,
this maybe not buggy if there's a dense recordbatch after a sparse one.

this patch adds below fixes:
- adds Reset() after Finish() in builder
- reserve length for offset_builder in binary builder

A further clean up on the reservation logic should be needed.

Signed-off-by: Yuan Zhou <[email protected]>

Co-authored-by: Yuan Zhou <[email protected]>
  • Loading branch information
JkSelf and zhouyuan committed Apr 23, 2021
1 parent 6c2ea2f commit 214c38b
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -757,8 +757,7 @@ class HashAggregateKernel::Impl {
int gp_idx = 0;
std::vector<std::shared_ptr<arrow::Array>> outputs;
for (auto action : action_impl_list_) {
// FIXME(): to work around NSE-241
action->Finish(offset_, 20000, &outputs);
action->Finish(offset_, batch_size_, &outputs);
}
if (outputs.size() > 0) {
out_length += outputs[0]->length();
Expand Down Expand Up @@ -917,8 +916,7 @@ class HashAggregateKernel::Impl {
int gp_idx = 0;
std::vector<std::shared_ptr<arrow::Array>> outputs;
for (auto action : action_impl_list_) {
// FIXME(): to work around NSE-241
action->Finish(offset_, 20000, &outputs);
action->Finish(offset_, batch_size_, &outputs);
}
if (outputs.size() > 0) {
out_length += outputs[0]->length();
Expand Down Expand Up @@ -1074,8 +1072,7 @@ class HashAggregateKernel::Impl {
int gp_idx = 0;
std::vector<std::shared_ptr<arrow::Array>> outputs;
for (auto action : action_impl_list_) {
// FIXME(): to work around NSE-241
action->Finish(offset_, 20000, &outputs);
action->Finish(offset_, batch_size_, &outputs);
}
if (outputs.size() > 0) {
out_length += outputs[0]->length();
Expand Down
11 changes: 10 additions & 1 deletion native-sql-engine/cpp/src/shuffle/splitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -427,9 +427,11 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffer
auto& builder = partition_binary_builders_[binary_idx][partition_id];
if (reset_buffers) {
RETURN_NOT_OK(builder->Finish(&arrays[i]));
builder->Reset();
} else {
auto data_size = builder->value_data_length();
RETURN_NOT_OK(builder->Finish(&arrays[i]));
builder->Reset();
RETURN_NOT_OK(builder->Reserve(num_rows));
RETURN_NOT_OK(builder->ReserveData(data_size));
}
Expand All @@ -441,9 +443,11 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffer
partition_large_binary_builders_[large_binary_idx][partition_id];
if (reset_buffers) {
RETURN_NOT_OK(builder->Finish(&arrays[i]));
builder->Reset();
} else {
auto data_size = builder->value_data_length();
RETURN_NOT_OK(builder->Finish(&arrays[i]));
builder->Reset();
RETURN_NOT_OK(builder->Reserve(num_rows));
RETURN_NOT_OK(builder->ReserveData(data_size));
}
Expand Down Expand Up @@ -699,6 +703,9 @@ arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) {
RETURN_NOT_OK(AllocatePartitionBuffers(pid, new_size));
} else { // not first allocate, spill
if (partition_id_cnt_[pid] > partition_buffer_size_[pid]) { // need reallocate?
// TODO(): CacheRecordBatch will try to reset builder buffer
// AllocatePartitionBuffers will then Reserve memory for builder based on last
// recordbatch, the logic on reservation size should be cleaned up
RETURN_NOT_OK(CacheRecordBatch(pid, true));
RETURN_NOT_OK(SpillPartition(pid));
RETURN_NOT_OK(AllocatePartitionBuffers(pid, new_size));
Expand Down Expand Up @@ -1047,6 +1054,7 @@ arrow::Status Splitter::AppendBinary(
offset_type length;
auto value = src_arr->GetValue(row, &length);
const auto& builder = dst_builders[partition_id_[row]];
RETURN_NOT_OK(builder->Reserve(1));
RETURN_NOT_OK(builder->ReserveData(length));
builder->UnsafeAppend(value, length);
}
Expand All @@ -1056,10 +1064,11 @@ arrow::Status Splitter::AppendBinary(
offset_type length;
auto value = src_arr->GetValue(row, &length);
const auto& builder = dst_builders[partition_id_[row]];
RETURN_NOT_OK(builder->Reserve(1));
RETURN_NOT_OK(builder->ReserveData(length));
builder->UnsafeAppend(value, length);
} else {
dst_builders[partition_id_[row]]->UnsafeAppendNull();
dst_builders[partition_id_[row]]->AppendNull();
}
}
}
Expand Down

0 comments on commit 214c38b

Please sign in to comment.