From c52cc1b23837a3ad49c85eda03c027d25649ab19 Mon Sep 17 00:00:00 2001 From: Yuan Date: Fri, 2 Apr 2021 11:42:44 +0800 Subject: [PATCH] allow to config batchsize in hashagg and wscg (#222) Signed-off-by: Yuan Zhou --- .../codegen/arrow_compute/ext/hash_aggregate_kernel.cc | 10 +++++----- .../arrow_compute/ext/whole_stage_codegen_kernel.cc | 5 +++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc index 8e19adfeb..0614ccfb6 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc @@ -491,8 +491,8 @@ class HashAggregateKernel::Impl { finish_ss << "if(do_hash_aggr_finish_" << level << ") {"; for (int i = 0; i < action_idx; i++) { finish_ss << "aggr_action_list_" << level << "[" << i - << "]->Finish(do_hash_aggr_finish_" << level - << "_offset, 10000, &do_hash_aggr_finish_" << level << "_out);" + << "]->Finish(do_hash_aggr_finish_" << level << "_offset," + << GetBatchSize() << ", &do_hash_aggr_finish_" << level << "_out);" << std::endl; } finish_ss << "if (do_hash_aggr_finish_" << level << "_out.size() > 0) {" << std::endl; @@ -755,7 +755,7 @@ class HashAggregateKernel::Impl { int gp_idx = 0; std::vector> outputs; for (auto action : action_impl_list_) { - action->Finish(offset_, 10000, &outputs); + action->Finish(offset_, GetBatchSize(), &outputs); } if (outputs.size() > 0) { out_length += outputs[0]->length(); @@ -911,7 +911,7 @@ class HashAggregateKernel::Impl { int gp_idx = 0; std::vector> outputs; for (auto action : action_impl_list_) { - action->Finish(offset_, 10000, &outputs); + action->Finish(offset_, GetBatchSize(), &outputs); } if (outputs.size() > 0) { out_length += outputs[0]->length(); @@ -1065,7 +1065,7 @@ class HashAggregateKernel::Impl { int gp_idx = 0; std::vector> outputs; for (auto action : action_impl_list_) { - action->Finish(offset_, 10000, &outputs); + action->Finish(offset_, GetBatchSize(), &outputs); } if (outputs.size() > 0) { out_length += outputs[0]->length(); diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc index e036405b5..d90b77c44 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc @@ -424,7 +424,8 @@ class TypedWholeStageCodeGenImpl : public CodeGenBase { for (int i = 0; i < length; i++) { )" << std::endl; } else { - codes_ss << "while (!should_stop_ && out_length < 10000) {" << std::endl; + codes_ss << "while (!should_stop_ && out_length < " << GetBatchSize() << ") {" + << std::endl; } // input preparation for (int i = 0; i < input_field_list.size(); i++) { @@ -664,4 +665,4 @@ std::string WholeStageCodeGenKernel::GetSignature() { return impl_->GetSignature } // namespace extra } // namespace arrowcompute } // namespace codegen -} // namespace sparkcolumnarplugin \ No newline at end of file +} // namespace sparkcolumnarplugin