From 0e9f64162ea7fc6946067fec4c3f66b0eaf949b1 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Wed, 23 Feb 2022 13:28:26 +0800 Subject: [PATCH 1/6] refactor hash join Signed-off-by: Yuan Zhou --- .../arrow_compute/ext/hash_relation_kernel.cc | 55 +++++++++---------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc index 8b66ddb0d..b8a936429 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc @@ -76,6 +76,11 @@ class HashRelationKernel::Impl { std::dynamic_pointer_cast(parameter_nodes[0])->holder()); builder_type_ = std::stoi(builder_type_str); } + // Notes: + // 0 -> unsed, should be removed + // 1 -> SHJ + // 2 -> BHJ + // 3 -> Semi opts SHJ, can be applied to anti join also if (builder_type_ == 3) { // This is for using unsafeHashMap while with skipDuplication strategy semi_ = true; @@ -117,6 +122,7 @@ class HashRelationKernel::Impl { hash_relation_ = std::make_shared(ctx_, hash_relation_list, key_size_); } else { + //TODO: better to estimate key_size_ for multiple keys join hash_relation_ = std::make_shared(ctx_, hash_relation_list); } } else { @@ -133,36 +139,24 @@ class HashRelationKernel::Impl { } if (builder_type_ == 2) return arrow::Status::OK(); std::shared_ptr key_array; - if (builder_type_ == 0) { - if (key_projector_) { - arrow::ArrayVector outputs; - auto length = in.size() > 0 ? in[0]->length() : 0; - auto in_batch = - arrow::RecordBatch::Make(arrow::schema(input_field_list_), length, in); - RETURN_NOT_OK(key_projector_->Evaluate(*in_batch, ctx_->memory_pool(), &outputs)); - key_array = outputs[0]; - } else { - key_array = in[key_indices_[0]]; - } - key_hash_cached_.push_back(key_array); - } else { - /* Process original key projection */ - arrow::ArrayVector project_outputs; - auto length = in.size() > 0 ? in[0]->length() : 0; - auto in_batch = - arrow::RecordBatch::Make(arrow::schema(input_field_list_), length, in); - RETURN_NOT_OK(key_prepare_projector_->Evaluate(*in_batch, ctx_->memory_pool(), - &project_outputs)); - keys_cached_.push_back(project_outputs); - /* Process key Hash projection */ - arrow::ArrayVector hash_outputs; - auto hash_in_batch = - arrow::RecordBatch::Make(hash_input_schema_, length, project_outputs); - RETURN_NOT_OK( - key_projector_->Evaluate(*hash_in_batch, ctx_->memory_pool(), &hash_outputs)); - key_array = hash_outputs[0]; - key_hash_cached_.push_back(key_array); - } + + /* Process original key projection */ + arrow::ArrayVector project_outputs; + auto length = in.size() > 0 ? in[0]->length() : 0; + auto in_batch = + arrow::RecordBatch::Make(arrow::schema(input_field_list_), length, in); + RETURN_NOT_OK(key_prepare_projector_->Evaluate(*in_batch, ctx_->memory_pool(), + &project_outputs)); + keys_cached_.push_back(project_outputs); + /* Process key Hash projection */ + arrow::ArrayVector hash_outputs; + auto hash_in_batch = + arrow::RecordBatch::Make(hash_input_schema_, length, project_outputs); + RETURN_NOT_OK( + key_projector_->Evaluate(*hash_in_batch, ctx_->memory_pool(), &hash_outputs)); + key_array = hash_outputs[0]; + key_hash_cached_.push_back(key_array); + return arrow::Status::OK(); } @@ -172,6 +166,7 @@ class HashRelationKernel::Impl { if (builder_type_ == 1) { int init_key_capacity = 128; int init_bytes_map_capacity = init_key_capacity * 256; + //TODO: should try to estimate the disticnt keys if (num_total_cached_ > 32) { init_key_capacity = pow(2, ceil(log2(num_total_cached_)) + 1); } From a21735b595e587a82059916b04b13d1db012d170 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Wed, 23 Feb 2022 13:29:20 +0800 Subject: [PATCH 2/6] enable force hash join to test Signed-off-by: Yuan Zhou --- .../core/src/main/scala/com/intel/oap/GazellePluginConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala index bc7c93f38..25ecc866d 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala @@ -84,7 +84,7 @@ class GazellePluginConfig(conf: SQLConf) extends Logging { conf.getConfString("spark.oap.sql.columnar.rowtocolumnar", "true").toBoolean && enableCpu val forceShuffledHashJoin: Boolean = - conf.getConfString("spark.oap.sql.columnar.forceshuffledhashjoin", "false").toBoolean && + conf.getConfString("spark.oap.sql.columnar.forceshuffledhashjoin", "true").toBoolean && enableCpu val resizeShuffledHashJoinInputPartitions: Boolean = From 3718d1ba8dae92630703d1bf51185be554eec135 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Mon, 28 Feb 2022 15:17:39 +0800 Subject: [PATCH 3/6] seperate hashjoin with single/multi keys code path Signed-off-by: Yuan Zhou --- .../ext/conditioned_probe_kernel.cc | 256 ++++++++++++------ .../arrow_compute/ext/hash_relation_kernel.cc | 2 +- 2 files changed, 172 insertions(+), 86 deletions(-) diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_probe_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_probe_kernel.cc index 80211fb75..e30451578 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_probe_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_probe_kernel.cc @@ -551,7 +551,6 @@ class ConditionedProbeKernel::Impl { : hash_relation_(hash_relation), appender_list_(appender_list) {} uint64_t Evaluate(std::shared_ptr key_array, const arrow::ArrayVector& key_payloads) override { - struct timespec start, end; auto typed_key_array = std::dynamic_pointer_cast(key_array); std::vector> payloads; int i = 0; @@ -617,30 +616,46 @@ class ConditionedProbeKernel::Impl { } uint64_t out_length = 0; auto unsafe_key_row = std::make_shared(payloads.size()); - for (int i = 0; i < key_array->length(); i++) { - int index; - if (!do_unsafe_row) { - index = fast_probe(i); - } else { + + if (do_unsafe_row) { + for (int i = 0; i < key_array->length(); i++) { unsafe_key_row->reset(); for (auto payload_arr : payloads) { payload_arr->Append(i, &unsafe_key_row); } - index = hash_relation_->Get(typed_key_array->GetView(i), unsafe_key_row); - } - if (index == -1) { - continue; + int index = hash_relation_->Get(typed_key_array->GetView(i), unsafe_key_row); + if (index == -1) { + continue; + } + auto index_list = hash_relation_->GetItemListByIndex(index); + // TODO(): move this out of the loop + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->Append(index_list)); + } else { + THROW_NOT_OK(appender->Append(0, i, index_list.size())); + } + } + out_length += index_list.size(); } - auto index_list = hash_relation_->GetItemListByIndex(index); - for (auto appender : appender_list_) { - if (appender->GetType() == AppenderBase::left) { - THROW_NOT_OK(appender->Append(index_list)); - } else { - THROW_NOT_OK(appender->Append(0, i, index_list.size())); + } else { + for (int i = 0; i < key_array->length(); i++) { + int index = fast_probe(i); + if (index == -1) { + continue; + } + auto index_list = hash_relation_->GetItemListByIndex(index); + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->Append(index_list)); + } else { + THROW_NOT_OK(appender->Append(0, i, index_list.size())); + } } + out_length += index_list.size(); } - out_length += index_list.size(); } + return out_length; } @@ -741,38 +756,61 @@ class ConditionedProbeKernel::Impl { } uint64_t out_length = 0; auto unsafe_key_row = std::make_shared(payloads.size()); - for (int i = 0; i < key_array->length(); i++) { - int index; - if (!do_unsafe_row) { - index = fast_probe(i); - } else { + if (do_unsafe_row) { + for (int i = 0; i < key_array->length(); i++) { unsafe_key_row->reset(); for (auto payload_arr : payloads) { payload_arr->Append(i, &unsafe_key_row); } - index = hash_relation_->Get(typed_key_array->GetView(i), unsafe_key_row); - } - if (index == -1) { + int index = hash_relation_->Get(typed_key_array->GetView(i), unsafe_key_row); + if (index == -1) { + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->AppendNull()); + } else { + THROW_NOT_OK(appender->Append(0, i)); + } + } + out_length += 1; + continue; + } + auto index_list = hash_relation_->GetItemListByIndex(index); for (auto appender : appender_list_) { if (appender->GetType() == AppenderBase::left) { - THROW_NOT_OK(appender->AppendNull()); + THROW_NOT_OK(appender->Append(index_list)); } else { - THROW_NOT_OK(appender->Append(0, i)); + THROW_NOT_OK(appender->Append(0, i, index_list.size())); } } - out_length += 1; - continue; + out_length += index_list.size(); } - auto index_list = hash_relation_->GetItemListByIndex(index); - for (auto appender : appender_list_) { - if (appender->GetType() == AppenderBase::left) { - THROW_NOT_OK(appender->Append(index_list)); - } else { - THROW_NOT_OK(appender->Append(0, i, index_list.size())); + } else { + for (int i = 0; i < key_array->length(); i++) { + int index = fast_probe(i); + + if (index == -1) { + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->AppendNull()); + } else { + THROW_NOT_OK(appender->Append(0, i)); + } + } + out_length += 1; + continue; } + auto index_list = hash_relation_->GetItemListByIndex(index); + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->Append(index_list)); + } else { + THROW_NOT_OK(appender->Append(0, i, index_list.size())); + } + } + out_length += index_list.size(); } - out_length += index_list.size(); } + return out_length; } @@ -926,11 +964,10 @@ class ConditionedProbeKernel::Impl { } uint64_t out_length = 0; auto unsafe_key_row = std::make_shared(payloads.size()); - for (int i = 0; i < key_array->length(); i++) { - int index; - if (!do_unsafe_row) { - index = getSingleKeyIndex(fast_probe, i); - } else { + if (do_unsafe_row) { + for (int i = 0; i < key_array->length(); i++) { + int index; + for (int colIdx = 0; colIdx < payloads.size(); colIdx++) { if (has_null_list[colIdx] && key_payloads[colIdx]->IsNull(i)) { // If the keys in stream side contains null, will join this row. @@ -944,18 +981,35 @@ class ConditionedProbeKernel::Impl { hash_relation_->IfExists(typed_key_array->GetView(i), unsafe_key_row); } } + + if (index == -1) { + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->AppendNull()); + } else { + THROW_NOT_OK(appender->Append(0, i)); + } + } + out_length += 1; + } } - if (index == -1) { - for (auto appender : appender_list_) { - if (appender->GetType() == AppenderBase::left) { - THROW_NOT_OK(appender->AppendNull()); - } else { - THROW_NOT_OK(appender->Append(0, i)); + } else { + for (int i = 0; i < key_array->length(); i++) { + int index = getSingleKeyIndex(fast_probe, i); + + if (index == -1) { + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->AppendNull()); + } else { + THROW_NOT_OK(appender->Append(0, i)); + } } + out_length += 1; } - out_length += 1; } } + return out_length; } @@ -1074,33 +1128,46 @@ class ConditionedProbeKernel::Impl { uint64_t out_length = 0; auto unsafe_key_row = std::make_shared(payloads.size()); - for (int i = 0; i < key_array->length(); i++) { - int index; - if (!do_unsafe_row) { - index = fast_probe(i); - } else { - if (unsafe_key_row) { - unsafe_key_row->reset(); - } + if (do_unsafe_row) { + for (int i = 0; i < key_array->length(); i++) { + unsafe_key_row->reset(); for (auto payload_arr : payloads) { payload_arr->Append(i, &unsafe_key_row); } - auto make_unsafe_row_end = std::chrono::steady_clock::now(); - index = hash_relation_->IfExists(typed_key_array->GetView(i), unsafe_key_row); - } - if (index == -1) { - continue; + int index = + hash_relation_->IfExists(typed_key_array->GetView(i), unsafe_key_row); + + if (index == -1) { + continue; + } + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->AppendNull()); + } else { + THROW_NOT_OK(appender->Append(0, i)); + } + } + out_length += 1; } - for (auto appender : appender_list_) { - if (appender->GetType() == AppenderBase::left) { - THROW_NOT_OK(appender->AppendNull()); - } else { - THROW_NOT_OK(appender->Append(0, i)); + } else { + for (int i = 0; i < key_array->length(); i++) { + int index = fast_probe(i); + + if (index == -1) { + continue; + } + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->AppendNull()); + } else { + THROW_NOT_OK(appender->Append(0, i)); + } } + out_length += 1; } - out_length += 1; } + return out_length; } @@ -1197,32 +1264,51 @@ class ConditionedProbeKernel::Impl { } uint64_t out_length = 0; auto unsafe_key_row = std::make_shared(payloads.size()); - for (int i = 0; i < key_array->length(); i++) { - int index; - if (!do_unsafe_row) { - index = fast_probe(i); - } else { + if (do_unsafe_row) { + for (int i = 0; i < key_array->length(); i++) { unsafe_key_row->reset(); for (auto payload_arr : payloads) { payload_arr->Append(i, &unsafe_key_row); } - index = hash_relation_->IfExists(typed_key_array->GetView(i), unsafe_key_row); - } - bool exists = true; - if (index == -1) { - exists = false; + int index = + hash_relation_->IfExists(typed_key_array->GetView(i), unsafe_key_row); + + bool exists = true; + if (index == -1) { + exists = false; + } + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::exist) { + THROW_NOT_OK(appender->AppendExistence(exists)); + } else if (appender->GetType() == AppenderBase::right) { + THROW_NOT_OK(appender->Append(0, i)); + } else { + THROW_NOT_OK(appender->AppendNull()); + } + } + out_length += 1; } - for (auto appender : appender_list_) { - if (appender->GetType() == AppenderBase::exist) { - THROW_NOT_OK(appender->AppendExistence(exists)); - } else if (appender->GetType() == AppenderBase::right) { - THROW_NOT_OK(appender->Append(0, i)); - } else { - THROW_NOT_OK(appender->AppendNull()); + } else { + for (int i = 0; i < key_array->length(); i++) { + int index = fast_probe(i); + + bool exists = true; + if (index == -1) { + exists = false; } + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::exist) { + THROW_NOT_OK(appender->AppendExistence(exists)); + } else if (appender->GetType() == AppenderBase::right) { + THROW_NOT_OK(appender->Append(0, i)); + } else { + THROW_NOT_OK(appender->AppendNull()); + } + } + out_length += 1; } - out_length += 1; } + return out_length; } diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc index b8a936429..2ec9e3456 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc @@ -172,7 +172,7 @@ class HashRelationKernel::Impl { } long tmp_capacity = init_key_capacity; if (key_size_ != -1) { - tmp_capacity *= 6; + tmp_capacity *= 12; } else { tmp_capacity *= 128; } From f03c19aced710f4359bee79e585ef1ded53845d1 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Mon, 28 Feb 2022 15:21:20 +0800 Subject: [PATCH 4/6] fix format Signed-off-by: Yuan Zhou --- .../cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc index 2ec9e3456..e46ee123b 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc @@ -122,7 +122,7 @@ class HashRelationKernel::Impl { hash_relation_ = std::make_shared(ctx_, hash_relation_list, key_size_); } else { - //TODO: better to estimate key_size_ for multiple keys join + // TODO: better to estimate key_size_ for multiple keys join hash_relation_ = std::make_shared(ctx_, hash_relation_list); } } else { @@ -166,7 +166,7 @@ class HashRelationKernel::Impl { if (builder_type_ == 1) { int init_key_capacity = 128; int init_bytes_map_capacity = init_key_capacity * 256; - //TODO: should try to estimate the disticnt keys + // TODO: should try to estimate the disticnt keys if (num_total_cached_ > 32) { init_key_capacity = pow(2, ceil(log2(num_total_cached_)) + 1); } From 44cdb35abbaaff37b0afbcc79efa47767784c5f2 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Tue, 1 Mar 2022 10:33:43 +0800 Subject: [PATCH 5/6] Revert "enable force hash join to test" This reverts commit a21735b595e587a82059916b04b13d1db012d170. --- .../core/src/main/scala/com/intel/oap/GazellePluginConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala index 25ecc866d..bc7c93f38 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala @@ -84,7 +84,7 @@ class GazellePluginConfig(conf: SQLConf) extends Logging { conf.getConfString("spark.oap.sql.columnar.rowtocolumnar", "true").toBoolean && enableCpu val forceShuffledHashJoin: Boolean = - conf.getConfString("spark.oap.sql.columnar.forceshuffledhashjoin", "true").toBoolean && + conf.getConfString("spark.oap.sql.columnar.forceshuffledhashjoin", "false").toBoolean && enableCpu val resizeShuffledHashJoinInputPartitions: Boolean = From 00baaa7e84d1d01ce83bf5facbd4b734bac8dccd Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Tue, 1 Mar 2022 11:02:21 +0800 Subject: [PATCH 6/6] temp disable get_json_object Signed-off-by: Yuan Zhou --- .../com/intel/oap/expression/ColumnarBinaryExpression.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala index 1ad6b4bed..fe1eb2c3f 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala @@ -130,8 +130,9 @@ object ColumnarBinaryExpression { new ColumnarFromUnixTime(left, right) case d: DateSub => new ColumnarDateSub(left, right) - case g: GetJsonObject => - new ColumnarGetJsonObject(left, right, g) + //TODO(): the current impl has poor perf + // case g: GetJsonObject => + // new ColumnarGetJsonObject(left, right, g) case instr: StringInstr => new ColumnarStringInstr(left, right, instr) case other =>