diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala index 1ad6b4bed..fe1eb2c3f 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala @@ -130,8 +130,9 @@ object ColumnarBinaryExpression { new ColumnarFromUnixTime(left, right) case d: DateSub => new ColumnarDateSub(left, right) - case g: GetJsonObject => - new ColumnarGetJsonObject(left, right, g) + //TODO(): the current impl has poor perf + // case g: GetJsonObject => + // new ColumnarGetJsonObject(left, right, g) case instr: StringInstr => new ColumnarStringInstr(left, right, instr) case other => diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_probe_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_probe_kernel.cc index 80211fb75..e30451578 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_probe_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/conditioned_probe_kernel.cc @@ -551,7 +551,6 @@ class ConditionedProbeKernel::Impl { : hash_relation_(hash_relation), appender_list_(appender_list) {} uint64_t Evaluate(std::shared_ptr key_array, const arrow::ArrayVector& key_payloads) override { - struct timespec start, end; auto typed_key_array = std::dynamic_pointer_cast(key_array); std::vector> payloads; int i = 0; @@ -617,30 +616,46 @@ class ConditionedProbeKernel::Impl { } uint64_t out_length = 0; auto unsafe_key_row = std::make_shared(payloads.size()); - for (int i = 0; i < key_array->length(); i++) { - int index; - if (!do_unsafe_row) { - index = fast_probe(i); - } else { + + if (do_unsafe_row) { + for (int i = 0; i < key_array->length(); i++) { unsafe_key_row->reset(); for (auto payload_arr : payloads) { payload_arr->Append(i, &unsafe_key_row); } - index = hash_relation_->Get(typed_key_array->GetView(i), unsafe_key_row); - } - if (index == -1) { - continue; + int index = hash_relation_->Get(typed_key_array->GetView(i), unsafe_key_row); + if (index == -1) { + continue; + } + auto index_list = hash_relation_->GetItemListByIndex(index); + // TODO(): move this out of the loop + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->Append(index_list)); + } else { + THROW_NOT_OK(appender->Append(0, i, index_list.size())); + } + } + out_length += index_list.size(); } - auto index_list = hash_relation_->GetItemListByIndex(index); - for (auto appender : appender_list_) { - if (appender->GetType() == AppenderBase::left) { - THROW_NOT_OK(appender->Append(index_list)); - } else { - THROW_NOT_OK(appender->Append(0, i, index_list.size())); + } else { + for (int i = 0; i < key_array->length(); i++) { + int index = fast_probe(i); + if (index == -1) { + continue; + } + auto index_list = hash_relation_->GetItemListByIndex(index); + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->Append(index_list)); + } else { + THROW_NOT_OK(appender->Append(0, i, index_list.size())); + } } + out_length += index_list.size(); } - out_length += index_list.size(); } + return out_length; } @@ -741,38 +756,61 @@ class ConditionedProbeKernel::Impl { } uint64_t out_length = 0; auto unsafe_key_row = std::make_shared(payloads.size()); - for (int i = 0; i < key_array->length(); i++) { - int index; - if (!do_unsafe_row) { - index = fast_probe(i); - } else { + if (do_unsafe_row) { + for (int i = 0; i < key_array->length(); i++) { unsafe_key_row->reset(); for (auto payload_arr : payloads) { payload_arr->Append(i, &unsafe_key_row); } - index = hash_relation_->Get(typed_key_array->GetView(i), unsafe_key_row); - } - if (index == -1) { + int index = hash_relation_->Get(typed_key_array->GetView(i), unsafe_key_row); + if (index == -1) { + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->AppendNull()); + } else { + THROW_NOT_OK(appender->Append(0, i)); + } + } + out_length += 1; + continue; + } + auto index_list = hash_relation_->GetItemListByIndex(index); for (auto appender : appender_list_) { if (appender->GetType() == AppenderBase::left) { - THROW_NOT_OK(appender->AppendNull()); + THROW_NOT_OK(appender->Append(index_list)); } else { - THROW_NOT_OK(appender->Append(0, i)); + THROW_NOT_OK(appender->Append(0, i, index_list.size())); } } - out_length += 1; - continue; + out_length += index_list.size(); } - auto index_list = hash_relation_->GetItemListByIndex(index); - for (auto appender : appender_list_) { - if (appender->GetType() == AppenderBase::left) { - THROW_NOT_OK(appender->Append(index_list)); - } else { - THROW_NOT_OK(appender->Append(0, i, index_list.size())); + } else { + for (int i = 0; i < key_array->length(); i++) { + int index = fast_probe(i); + + if (index == -1) { + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->AppendNull()); + } else { + THROW_NOT_OK(appender->Append(0, i)); + } + } + out_length += 1; + continue; } + auto index_list = hash_relation_->GetItemListByIndex(index); + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->Append(index_list)); + } else { + THROW_NOT_OK(appender->Append(0, i, index_list.size())); + } + } + out_length += index_list.size(); } - out_length += index_list.size(); } + return out_length; } @@ -926,11 +964,10 @@ class ConditionedProbeKernel::Impl { } uint64_t out_length = 0; auto unsafe_key_row = std::make_shared(payloads.size()); - for (int i = 0; i < key_array->length(); i++) { - int index; - if (!do_unsafe_row) { - index = getSingleKeyIndex(fast_probe, i); - } else { + if (do_unsafe_row) { + for (int i = 0; i < key_array->length(); i++) { + int index; + for (int colIdx = 0; colIdx < payloads.size(); colIdx++) { if (has_null_list[colIdx] && key_payloads[colIdx]->IsNull(i)) { // If the keys in stream side contains null, will join this row. @@ -944,18 +981,35 @@ class ConditionedProbeKernel::Impl { hash_relation_->IfExists(typed_key_array->GetView(i), unsafe_key_row); } } + + if (index == -1) { + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->AppendNull()); + } else { + THROW_NOT_OK(appender->Append(0, i)); + } + } + out_length += 1; + } } - if (index == -1) { - for (auto appender : appender_list_) { - if (appender->GetType() == AppenderBase::left) { - THROW_NOT_OK(appender->AppendNull()); - } else { - THROW_NOT_OK(appender->Append(0, i)); + } else { + for (int i = 0; i < key_array->length(); i++) { + int index = getSingleKeyIndex(fast_probe, i); + + if (index == -1) { + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->AppendNull()); + } else { + THROW_NOT_OK(appender->Append(0, i)); + } } + out_length += 1; } - out_length += 1; } } + return out_length; } @@ -1074,33 +1128,46 @@ class ConditionedProbeKernel::Impl { uint64_t out_length = 0; auto unsafe_key_row = std::make_shared(payloads.size()); - for (int i = 0; i < key_array->length(); i++) { - int index; - if (!do_unsafe_row) { - index = fast_probe(i); - } else { - if (unsafe_key_row) { - unsafe_key_row->reset(); - } + if (do_unsafe_row) { + for (int i = 0; i < key_array->length(); i++) { + unsafe_key_row->reset(); for (auto payload_arr : payloads) { payload_arr->Append(i, &unsafe_key_row); } - auto make_unsafe_row_end = std::chrono::steady_clock::now(); - index = hash_relation_->IfExists(typed_key_array->GetView(i), unsafe_key_row); - } - if (index == -1) { - continue; + int index = + hash_relation_->IfExists(typed_key_array->GetView(i), unsafe_key_row); + + if (index == -1) { + continue; + } + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->AppendNull()); + } else { + THROW_NOT_OK(appender->Append(0, i)); + } + } + out_length += 1; } - for (auto appender : appender_list_) { - if (appender->GetType() == AppenderBase::left) { - THROW_NOT_OK(appender->AppendNull()); - } else { - THROW_NOT_OK(appender->Append(0, i)); + } else { + for (int i = 0; i < key_array->length(); i++) { + int index = fast_probe(i); + + if (index == -1) { + continue; + } + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::left) { + THROW_NOT_OK(appender->AppendNull()); + } else { + THROW_NOT_OK(appender->Append(0, i)); + } } + out_length += 1; } - out_length += 1; } + return out_length; } @@ -1197,32 +1264,51 @@ class ConditionedProbeKernel::Impl { } uint64_t out_length = 0; auto unsafe_key_row = std::make_shared(payloads.size()); - for (int i = 0; i < key_array->length(); i++) { - int index; - if (!do_unsafe_row) { - index = fast_probe(i); - } else { + if (do_unsafe_row) { + for (int i = 0; i < key_array->length(); i++) { unsafe_key_row->reset(); for (auto payload_arr : payloads) { payload_arr->Append(i, &unsafe_key_row); } - index = hash_relation_->IfExists(typed_key_array->GetView(i), unsafe_key_row); - } - bool exists = true; - if (index == -1) { - exists = false; + int index = + hash_relation_->IfExists(typed_key_array->GetView(i), unsafe_key_row); + + bool exists = true; + if (index == -1) { + exists = false; + } + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::exist) { + THROW_NOT_OK(appender->AppendExistence(exists)); + } else if (appender->GetType() == AppenderBase::right) { + THROW_NOT_OK(appender->Append(0, i)); + } else { + THROW_NOT_OK(appender->AppendNull()); + } + } + out_length += 1; } - for (auto appender : appender_list_) { - if (appender->GetType() == AppenderBase::exist) { - THROW_NOT_OK(appender->AppendExistence(exists)); - } else if (appender->GetType() == AppenderBase::right) { - THROW_NOT_OK(appender->Append(0, i)); - } else { - THROW_NOT_OK(appender->AppendNull()); + } else { + for (int i = 0; i < key_array->length(); i++) { + int index = fast_probe(i); + + bool exists = true; + if (index == -1) { + exists = false; } + for (auto appender : appender_list_) { + if (appender->GetType() == AppenderBase::exist) { + THROW_NOT_OK(appender->AppendExistence(exists)); + } else if (appender->GetType() == AppenderBase::right) { + THROW_NOT_OK(appender->Append(0, i)); + } else { + THROW_NOT_OK(appender->AppendNull()); + } + } + out_length += 1; } - out_length += 1; } + return out_length; } diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc index 8b66ddb0d..e46ee123b 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc @@ -76,6 +76,11 @@ class HashRelationKernel::Impl { std::dynamic_pointer_cast(parameter_nodes[0])->holder()); builder_type_ = std::stoi(builder_type_str); } + // Notes: + // 0 -> unsed, should be removed + // 1 -> SHJ + // 2 -> BHJ + // 3 -> Semi opts SHJ, can be applied to anti join also if (builder_type_ == 3) { // This is for using unsafeHashMap while with skipDuplication strategy semi_ = true; @@ -117,6 +122,7 @@ class HashRelationKernel::Impl { hash_relation_ = std::make_shared(ctx_, hash_relation_list, key_size_); } else { + // TODO: better to estimate key_size_ for multiple keys join hash_relation_ = std::make_shared(ctx_, hash_relation_list); } } else { @@ -133,36 +139,24 @@ class HashRelationKernel::Impl { } if (builder_type_ == 2) return arrow::Status::OK(); std::shared_ptr key_array; - if (builder_type_ == 0) { - if (key_projector_) { - arrow::ArrayVector outputs; - auto length = in.size() > 0 ? in[0]->length() : 0; - auto in_batch = - arrow::RecordBatch::Make(arrow::schema(input_field_list_), length, in); - RETURN_NOT_OK(key_projector_->Evaluate(*in_batch, ctx_->memory_pool(), &outputs)); - key_array = outputs[0]; - } else { - key_array = in[key_indices_[0]]; - } - key_hash_cached_.push_back(key_array); - } else { - /* Process original key projection */ - arrow::ArrayVector project_outputs; - auto length = in.size() > 0 ? in[0]->length() : 0; - auto in_batch = - arrow::RecordBatch::Make(arrow::schema(input_field_list_), length, in); - RETURN_NOT_OK(key_prepare_projector_->Evaluate(*in_batch, ctx_->memory_pool(), - &project_outputs)); - keys_cached_.push_back(project_outputs); - /* Process key Hash projection */ - arrow::ArrayVector hash_outputs; - auto hash_in_batch = - arrow::RecordBatch::Make(hash_input_schema_, length, project_outputs); - RETURN_NOT_OK( - key_projector_->Evaluate(*hash_in_batch, ctx_->memory_pool(), &hash_outputs)); - key_array = hash_outputs[0]; - key_hash_cached_.push_back(key_array); - } + + /* Process original key projection */ + arrow::ArrayVector project_outputs; + auto length = in.size() > 0 ? in[0]->length() : 0; + auto in_batch = + arrow::RecordBatch::Make(arrow::schema(input_field_list_), length, in); + RETURN_NOT_OK(key_prepare_projector_->Evaluate(*in_batch, ctx_->memory_pool(), + &project_outputs)); + keys_cached_.push_back(project_outputs); + /* Process key Hash projection */ + arrow::ArrayVector hash_outputs; + auto hash_in_batch = + arrow::RecordBatch::Make(hash_input_schema_, length, project_outputs); + RETURN_NOT_OK( + key_projector_->Evaluate(*hash_in_batch, ctx_->memory_pool(), &hash_outputs)); + key_array = hash_outputs[0]; + key_hash_cached_.push_back(key_array); + return arrow::Status::OK(); } @@ -172,12 +166,13 @@ class HashRelationKernel::Impl { if (builder_type_ == 1) { int init_key_capacity = 128; int init_bytes_map_capacity = init_key_capacity * 256; + // TODO: should try to estimate the disticnt keys if (num_total_cached_ > 32) { init_key_capacity = pow(2, ceil(log2(num_total_cached_)) + 1); } long tmp_capacity = init_key_capacity; if (key_size_ != -1) { - tmp_capacity *= 6; + tmp_capacity *= 12; } else { tmp_capacity *= 128; }