diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala
index ec67e8948..2d372c836 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala
@@ -92,6 +92,22 @@ case class ColumnarShuffledHashJoinExec(
     }
   }
 
+  val builder_type = {
+    if (condition.isDefined) 1
+    else {
+      joinType match {
+        case LeftSemi =>
+          3
+        case LeftAnti =>
+          3
+        case j: ExistenceJoin =>
+          3
+        case other =>
+          1
+      }
+    }
+  }
+
   def buildCheck(): Unit = {
     // build check for condition
     val conditionExpr: Expression = condition.orNull
@@ -180,7 +196,8 @@ case class ColumnarShuffledHashJoinExec(
       ColumnarCodegenContext(
         inputSchema,
         null,
-        ColumnarConditionedProbeJoin.prepareHashBuildFunction(buildKeyExprs, buildPlan.output, 1))
+        ColumnarConditionedProbeJoin
+          .prepareHashBuildFunction(buildKeyExprs, buildPlan.output, builder_type))
     }
 
   override def supportColumnarCodegen: Boolean = true
@@ -256,7 +273,7 @@ case class ColumnarShuffledHashJoinExec(
 
     val hashRelationBatchHolder: ListBuffer[ColumnarBatch] = ListBuffer()
     val hash_relation_function = ColumnarConditionedProbeJoin
-      .prepareHashBuildFunction(buildKeyExprs, buildPlan.output, 1)
+      .prepareHashBuildFunction(buildKeyExprs, buildPlan.output, builder_type)
     val hash_relation_schema = ConverterUtils.toArrowSchema(buildPlan.output)
     val hash_relation_expr =
       TreeBuilder.makeExpression(
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConditionedProbeJoin.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConditionedProbeJoin.scala
index 3b7c6c104..1eead6e8c 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConditionedProbeJoin.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConditionedProbeJoin.scala
@@ -55,7 +55,7 @@ object ColumnarConditionedProbeJoin extends Logging {
   def prepareHashBuildFunction(
       buildKeys: Seq[Expression],
       buildInputAttributes: Seq[Attribute],
-      builder_type: Int = 0,
+      builder_type: Int = 1,
       is_broadcast: Boolean = false): TreeNode = {
     val buildInputFieldList: List[Field] = buildInputAttributes.toList.map(attr => {
       Field
diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc
index b0411a4cd..00276e57e 100644
--- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc
+++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_relation_kernel.cc
@@ -75,6 +75,11 @@ class HashRelationKernel::Impl {
           std::dynamic_pointer_cast<gandiva::LiteralNode>(parameter_nodes[0])->holder());
       builder_type_ = std::stoi(builder_type_str);
     }
+    if (builder_type_ == 3) {
+      // builder_type 3: use unsafeHashMap with the skip-duplication strategy
+      semi_ = true;
+      builder_type_ = 1;
+    }
     if (builder_type_ == 0) {
       // builder_type_ == 0 will be abandoned in near future, won't support
       // decimal here.
@@ -227,11 +232,11 @@ class HashRelationKernel::Impl {
   PROCESS(arrow::Decimal128Type)
     if (project_outputs.size() == 1) {
       switch (project_outputs[0]->type_id()) {
-#define PROCESS(InType)                                                        \
-  case TypeTraits<InType>::type_id: {                                          \
-    using ArrayType = precompile::TypeTraits<InType>::ArrayType;               \
-    auto typed_key_arr = std::make_shared<ArrayType>(project_outputs[0]);      \
-    RETURN_NOT_OK(hash_relation_->AppendKeyColumn(key_array, typed_key_arr));  \
+#define PROCESS(InType)                                                               \
+  case TypeTraits<InType>::type_id: {                                                 \
+    using ArrayType = precompile::TypeTraits<InType>::ArrayType;                      \
+    auto typed_key_arr = std::make_shared<ArrayType>(project_outputs[0]);             \
+    RETURN_NOT_OK(hash_relation_->AppendKeyColumn(key_array, typed_key_arr, semi_));  \
   } break;
         PROCESS_SUPPORTED_TYPES(PROCESS)
 #undef PROCESS
@@ -252,7 +257,7 @@ class HashRelationKernel::Impl {
         RETURN_NOT_OK(MakeUnsafeArray(arr->type(), i++, arr, &payload));
         payloads.push_back(payload);
       }
-      RETURN_NOT_OK(hash_relation_->AppendKeyColumn(key_array, payloads));
+      RETURN_NOT_OK(hash_relation_->AppendKeyColumn(key_array, payloads, semi_));
     }
   }
 }
@@ -281,6 +286,7 @@ class HashRelationKernel::Impl {
   std::vector<std::shared_ptr<arrow::Array>> key_hash_cached_;
   uint64_t num_total_cached_ = 0;
   int builder_type_ = 0;
+  bool semi_ = false;
   int key_size_ = -1;  // If key_size_ != 0, key will be stored directly in key_map
 
   class HashRelationResultIterator : public ResultIterator<HashRelationHolder> {
diff --git a/native-sql-engine/cpp/src/codegen/common/hash_relation.h b/native-sql-engine/cpp/src/codegen/common/hash_relation.h
index b76b742c8..95b5b3ed1 100644
--- a/native-sql-engine/cpp/src/codegen/common/hash_relation.h
+++ b/native-sql-engine/cpp/src/codegen/common/hash_relation.h
@@ -167,9 +167,9 @@ class HashRelation {
     return arrow::Status::Invalid("Error minimizing hash table");
   }
 
-  arrow::Status AppendKeyColumn(
-      std::shared_ptr<arrow::Array> in,
-      const std::vector<std::shared_ptr<UnsafeArray>>& payloads) {
+  arrow::Status AppendKeyColumn(std::shared_ptr<arrow::Array> in,
+                                const std::vector<std::shared_ptr<UnsafeArray>>& payloads,
+                                bool semi = false) {
     if (hash_table_ == nullptr) {
       throw std::runtime_error("HashRelation Get failed, hash_table is null.");
     }
@@ -184,7 +184,11 @@ class HashRelation {
       // chendi: Since spark won't join rows contain null, we will skip null
       // row.
       if (payload->isNullExists()) continue;
-      RETURN_NOT_OK(Insert(typed_array->GetView(i), payload, num_arrays_, i));
+      if (!semi) {
+        RETURN_NOT_OK(Insert(typed_array->GetView(i), payload, num_arrays_, i));
+      } else {
+        RETURN_NOT_OK(InsertSkipDup(typed_array->GetView(i), payload, num_arrays_, i));
+      }
     }
 
     num_arrays_++;
@@ -196,7 +200,8 @@ class HashRelation {
             typename std::enable_if_t::value>* = nullptr>
   arrow::Status AppendKeyColumn(std::shared_ptr<arrow::Array> in,
-                                std::shared_ptr original_key) {
+                                std::shared_ptr original_key,
+                                bool semi = false) {
     if (hash_table_ == nullptr) {
       throw std::runtime_error("HashRelation Get failed, hash_table is null.");
     }
@@ -212,8 +217,13 @@ class HashRelation {
       if (original_key->IsNull(i)) {
         RETURN_NOT_OK(InsertNull(num_arrays_, i));
       } else {
-        RETURN_NOT_OK(
-            Insert(typed_array->GetView(i), original_key->GetView(i), num_arrays_, i));
+        if (!semi) {
+          RETURN_NOT_OK(Insert(typed_array->GetView(i), original_key->GetView(i),
+                               num_arrays_, i));
+        } else {
+          RETURN_NOT_OK(InsertSkipDup(typed_array->GetView(i), original_key->GetView(i),
+                                      num_arrays_, i));
+        }
       }
     }
   }
@@ -224,7 +234,8 @@ class HashRelation {
   }
 
   arrow::Status AppendKeyColumn(std::shared_ptr<arrow::Array> in,
-                                std::shared_ptr original_key) {
+                                std::shared_ptr original_key,
+                                bool semi = false) {
     if (hash_table_ == nullptr) {
       throw std::runtime_error("HashRelation Get failed, hash_table is null.");
     }
@@ -242,8 +253,13 @@ class HashRelation {
         RETURN_NOT_OK(InsertNull(num_arrays_, i));
       } else {
         auto str = original_key->GetString(i);
-        RETURN_NOT_OK(
-            Insert(typed_array->GetView(i), str.data(), str.size(), num_arrays_, i));
+        if (!semi) {
+          RETURN_NOT_OK(
+              Insert(typed_array->GetView(i), str.data(), str.size(), num_arrays_, i));
+        } else {
+          RETURN_NOT_OK(InsertSkipDup(typed_array->GetView(i), str.data(), str.size(),
+                                      num_arrays_, i));
+        }
       }
     }
   }
@@ -462,6 +478,38 @@ class HashRelation {
     return arrow::Status::OK();
   }
 
+  arrow::Status InsertSkipDup(int32_t v, std::shared_ptr<UnsafeRow> payload,
+                              uint32_t array_id, uint32_t id) {
+    assert(hash_table_ != nullptr);
+    auto index = ArrayItemIndex(array_id, id);
+    if (!appendNewKey(hash_table_, payload.get(), v, (char*)&index,
+                      sizeof(ArrayItemIndex))) {
+      return arrow::Status::CapacityError("Insert to HashMap failed.");
+    }
+    return arrow::Status::OK();
+  }
+
+  template <typename CType>
+  arrow::Status InsertSkipDup(int32_t v, CType payload, uint32_t array_id, uint32_t id) {
+    assert(hash_table_ != nullptr);
+    auto index = ArrayItemIndex(array_id, id);
+    if (!appendNewKey(hash_table_, payload, v, (char*)&index, sizeof(ArrayItemIndex))) {
+      return arrow::Status::CapacityError("Insert to HashMap failed.");
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Status InsertSkipDup(int32_t v, const char* payload, size_t payload_len,
+                              uint32_t array_id, uint32_t id) {
+    assert(hash_table_ != nullptr);
+    auto index = ArrayItemIndex(array_id, id);
+    if (!appendNewKey(hash_table_, payload, payload_len, v, (char*)&index,
+                      sizeof(ArrayItemIndex))) {
+      return arrow::Status::CapacityError("Insert to HashMap failed.");
+    }
+    return arrow::Status::OK();
+  }
+
   arrow::Status InsertNull(uint32_t array_id, uint32_t id) {
     // since vanilla spark doesn't support match null in join
     // we can directly retun to optimize
diff --git a/native-sql-engine/cpp/src/third_party/row_wise_memory/hashMap.h b/native-sql-engine/cpp/src/third_party/row_wise_memory/hashMap.h
index 0c7975884..f74436eeb 100644
--- a/native-sql-engine/cpp/src/third_party/row_wise_memory/hashMap.h
+++ b/native-sql-engine/cpp/src/third_party/row_wise_memory/hashMap.h
@@ -20,6 +20,8 @@
 #include 
 #include 
 
+#include 
+
 #include "codegen/arrow_compute/ext/array_item_index.h"
 #include "third_party/row_wise_memory/unsafe_row.h"
 
@@ -1089,3 +1091,300 @@ static inline bool append(unsafeHashMap* hashMap, const char* keyRow, size_t key
 
   return true;
 }
+
+/**
+ * appendNewKey is used when duplicate keys should be skipped (semi/anti joins).
+ * If the key does not exist, insert the key and append a new record for its value.
+ * If the key already exists, skip the insert and return immediately.
+ *
+ * The return value is a flag indicating whether the append succeeded.
+ **/
+static inline bool appendNewKey(unsafeHashMap* hashMap, UnsafeRow* keyRow, int hashVal,
+                                char* value, size_t value_size) {
+  assert(hashMap->keyArray != NULL);
+
+  const int cursor = hashMap->cursor;
+  const int mask = hashMap->arrayCapacity - 1;
+
+  int pos = hashVal & mask;
+  int step = 1;
+
+  const int keyLength = keyRow->sizeInBytes();
+  char* base = hashMap->bytesMap;
+  int klen = keyRow->sizeInBytes();
+  const int vlen = value_size;
+  const int recordLength = 4 + klen + vlen + 4;
+  char* record = nullptr;
+
+  int keySizeInBytes = 8;
+  char* keyArrayBase = hashMap->keyArray;
+
+  while (true) {
+    int KeyAddressOffset = *(int*)(keyArrayBase + pos * keySizeInBytes);
+    int keyHashCode = *(int*)(keyArrayBase + pos * keySizeInBytes + 4);
+
+    if (KeyAddressOffset < 0) {
+      // This is a new key.
+      int keyArrayPos = pos;
+      record = base + cursor;
+      // Update keyArray in hashMap
+      hashMap->numKeys++;
+      *(int*)(keyArrayBase + pos * keySizeInBytes) = cursor;
+      *(int*)(keyArrayBase + pos * keySizeInBytes + 4) = hashVal;
+      hashMap->cursor += recordLength;
+      break;
+    } else {
+      if ((int)keyHashCode == hashVal) {
+        // Full hash code matches.  Let's compare the keys for equality.
+        record = base + KeyAddressOffset;
+        if ((getKeyLength(record) == keyLength) &&
+            (memcmp(keyRow->data, getKeyFromBytesMap(record), keyLength) == 0)) {
+          return true;
+        }
+      }
+    }
+
+    pos = (pos + step) & mask;
+    step++;
+  }
+
+  // copy keyRow and valueRow into hashmap
+  assert((klen & 0xff00) == 0);
+  auto total_key_length = ((8 + klen + vlen) << 16) | klen;
+  *((int*)record) = total_key_length;
+  memcpy(record + 4, keyRow->data, klen);
+  memcpy(record + 4 + klen, value, vlen);
+  *((int*)(record + 4 + klen + vlen)) = 0;
+
+  // See if we need to grow keyArray
+  int growthThreshold = (int)(hashMap->arrayCapacity * loadFactor);
+  if ((hashMap->numKeys > growthThreshold) &&
+      (hashMap->arrayCapacity < MAX_HASH_MAP_CAPACITY)) {
+    if (!growAndRehashKeyArray(hashMap)) hashMap->needSpill = true;
+  }
+
+  return true;
+}
+
+/**
+ * appendNewKey is used when duplicate keys should be skipped (semi/anti joins).
+ * If the key does not exist, insert the key and append a new record for its value.
+ * If the key already exists, skip the insert and return immediately.
+ *
+ * The return value is a flag indicating whether the append succeeded.
+ **/
+template <typename CType,
+          typename std::enable_if_t<!std::is_same<CType, arrow::Decimal128>::value>* = nullptr>
+static inline bool appendNewKey(unsafeHashMap* hashMap, CType keyRow, int hashVal,
+                                char* value, size_t value_size) {
+  assert(hashMap->keyArray != NULL);
+
+  const int cursor = hashMap->cursor;
+  const int mask = hashMap->arrayCapacity - 1;
+
+  int pos = hashVal & mask;
+  int step = 1;
+
+  const int keyLength = sizeof(keyRow);
+  char* base = hashMap->bytesMap;
+  int klen = 0;
+  const int vlen = value_size;
+  const int recordLength = 4 + klen + vlen + 4;
+  char* record = nullptr;
+
+  int keySizeInBytes = hashMap->bytesInKeyArray;
+  char* keyArrayBase = hashMap->keyArray;
+
+  // chendi: An optimization here: the first bit of the offset indicates whether
+  // the entry is an ArrayItemIndex or a bytesMap offset. For the first key it
+  // is an ArrayItemIndex (first bit 0); for repeated keys it is a bytesMap
+  // offset (first bit 1).
+
+  while (true) {
+    int KeyAddressOffset = *(int*)(keyArrayBase + pos * keySizeInBytes);
+    int keyHashCode = *(int*)(keyArrayBase + pos * keySizeInBytes + 4);
+
+    if (KeyAddressOffset == -1) {
+      // This is a new key.
+      int keyArrayPos = pos;
+      // Update keyArray in hashMap
+      hashMap->numKeys++;
+      *(int*)(keyArrayBase + pos * keySizeInBytes) = *(int*)value;
+      *(int*)(keyArrayBase + pos * keySizeInBytes + 4) = hashVal;
+      *(CType*)(keyArrayBase + pos * keySizeInBytes + 8) = keyRow;
+      return true;
+    } else {
+      char* previous_value = nullptr;
+      if (((int)keyHashCode == hashVal) &&
+          (keyRow == *(CType*)(keyArrayBase + pos * keySizeInBytes + 8))) {
+        return true;
+      }
+    }
+
+    pos = (pos + step) & mask;
+    step++;
+  }
+
+  // copy keyRow and valueRow into hashmap
+  auto total_key_length = ((8 + klen + vlen) << 16) | klen;
+  *((int*)record) = total_key_length;
+  // memcpy(record + 4, &keyRow, klen);
+  memcpy(record + 4 + klen, value, vlen);
+  *((int*)(record + 4 + klen + vlen)) = 0;
+
+  // See if we need to grow keyArray
+  int growthThreshold = (int)(hashMap->arrayCapacity * loadFactor);
+  if ((hashMap->numKeys > growthThreshold) &&
+      (hashMap->arrayCapacity < MAX_HASH_MAP_CAPACITY)) {
+    if (!growAndRehashKeyArray(hashMap)) hashMap->needSpill = true;
+  }
+
+  return true;
+}
+
+/**
+ * appendNewKey is used when duplicate keys should be skipped (semi/anti joins).
+ * If the key does not exist, insert the key and append a new record for its value.
+ * If the key already exists, skip the insert and return immediately.
+ *
+ * The return value is a flag indicating whether the append succeeded.
+ **/
+template <typename CType,
+          typename std::enable_if_t<std::is_same<CType, arrow::Decimal128>::value>* = nullptr>
+static inline bool appendNewKey(unsafeHashMap* hashMap, CType keyRow, int hashVal,
+                                char* value, size_t value_size) {
+  assert(hashMap->keyArray != NULL);
+
+  const int cursor = hashMap->cursor;
+  const int mask = hashMap->arrayCapacity - 1;
+
+  int pos = hashVal & mask;
+  int step = 1;
+
+  const int keyLength = 16; /*sizeof Decimal128*/
+  char* base = hashMap->bytesMap;
+  int klen = 0;
+  const int vlen = value_size;
+  const int recordLength = 4 + klen + vlen + 4;
+  char* record = nullptr;
+
+  int keySizeInBytes = hashMap->bytesInKeyArray;
+  char* keyArrayBase = hashMap->keyArray;
+
+  // chendi: An optimization here: the first bit of the offset indicates whether
+  // the entry is an ArrayItemIndex or a bytesMap offset. For the first key it
+  // is an ArrayItemIndex (first bit 0); for repeated keys it is a bytesMap
+  // offset (first bit 1).
+
+  while (true) {
+    int KeyAddressOffset = *(int*)(keyArrayBase + pos * keySizeInBytes);
+    int keyHashCode = *(int*)(keyArrayBase + pos * keySizeInBytes + 4);
+
+    if (KeyAddressOffset == -1) {
+      // This is a new key.
+      int keyArrayPos = pos;
+      // Update keyArray in hashMap
+      hashMap->numKeys++;
+      *(int*)(keyArrayBase + pos * keySizeInBytes) = *(int*)value;
+      *(int*)(keyArrayBase + pos * keySizeInBytes + 4) = hashVal;
+      *(CType*)(keyArrayBase + pos * keySizeInBytes + 8) = keyRow;
+      return true;
+    } else {
+      char* previous_value = nullptr;
+      if (((int)keyHashCode == hashVal) &&
+          (keyRow == *(CType*)(keyArrayBase + pos * keySizeInBytes + 8))) {
+        return true;
+      }
+    }
+
+    pos = (pos + step) & mask;
+    step++;
+  }
+
+  // copy keyRow and valueRow into hashmap
+  auto total_key_length = ((8 + klen + vlen) << 16) | klen;
+  *((int*)record) = total_key_length;
+  // memcpy(record + 4, &keyRow, klen);
+  memcpy(record + 4 + klen, value, vlen);
+  *((int*)(record + 4 + klen + vlen)) = 0;
+
+  // See if we need to grow keyArray
+  int growthThreshold = (int)(hashMap->arrayCapacity * loadFactor);
+  if ((hashMap->numKeys > growthThreshold) &&
+      (hashMap->arrayCapacity < MAX_HASH_MAP_CAPACITY)) {
+    if (!growAndRehashKeyArray(hashMap)) hashMap->needSpill = true;
+  }
+
+  return true;
+}
+
+/**
+ * appendNewKey is used when duplicate keys should be skipped (semi/anti joins).
+ * If the key does not exist, insert the key and append a new record for its value.
+ * If the key already exists, skip the insert and return immediately.
+ *
+ * The return value is a flag indicating whether the append succeeded.
+ **/
+static inline bool appendNewKey(unsafeHashMap* hashMap, const char* keyRow,
+                                size_t keyLength, int hashVal, char* value,
+                                size_t value_size) {
+  assert(hashMap->keyArray != NULL);
+
+  const int cursor = hashMap->cursor;
+  const int mask = hashMap->arrayCapacity - 1;
+
+  int pos = hashVal & mask;
+  int step = 1;
+
+  char* base = hashMap->bytesMap;
+  int klen = keyLength;
+  const int vlen = value_size;
+  const int recordLength = 4 + klen + vlen + 4;
+  char* record = nullptr;
+
+  int keySizeInBytes = hashMap->bytesInKeyArray;
+  char* keyArrayBase = hashMap->keyArray;
+
+  while (true) {
+    int KeyAddressOffset = *(int*)(keyArrayBase + pos * keySizeInBytes);
+    int keyHashCode = *(int*)(keyArrayBase + pos * keySizeInBytes + 4);
+
+    if (KeyAddressOffset < 0) {
+      // This is a new key.
+      int keyArrayPos = pos;
+      record = base + cursor;
+      // Update keyArray in hashMap
+      hashMap->numKeys++;
+      *(int*)(keyArrayBase + pos * keySizeInBytes) = cursor;
+      *(int*)(keyArrayBase + pos * keySizeInBytes + 4) = hashVal;
+      hashMap->cursor += recordLength;
+      break;
+    } else {
+      record = base + KeyAddressOffset;
+      if (((int)keyHashCode == hashVal) &&
+          (memcmp(keyRow, getKeyFromBytesMap(record), keyLength) == 0)) {
+        return true;
+      }
+    }
+
+    pos = (pos + step) & mask;
+    step++;
+  }
+
+  // copy keyRow and valueRow into hashmap
+  assert((klen & 0xff00) == 0);
+  auto total_key_length = ((8 + klen + vlen) << 16) | klen;
+  *((int*)record) = total_key_length;
+  memcpy(record + 4, keyRow, klen);
+  memcpy(record + 4 + klen, value, vlen);
+  *((int*)(record + 4 + klen + vlen)) = 0;
+
+  // See if we need to grow keyArray
+  int growthThreshold = (int)(hashMap->arrayCapacity * loadFactor);
+  if ((hashMap->numKeys > growthThreshold) &&
+      (hashMap->arrayCapacity < MAX_HASH_MAP_CAPACITY)) {
+    if (!growAndRehashKeyArray(hashMap)) hashMap->needSpill = true;
+  }
+
+  return true;
+}
\ No newline at end of file
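
The standalone C++ sketch below is not part of the patch; it only illustrates the insert-if-absent behavior that the new appendNewKey overloads add for semi/anti/existence joins, using the same power-of-two masking and growing-step probing (pos = (pos + step) & mask; step++) as the unsafeHashMap code above. The names SkipDupTable and insert_if_absent are hypothetical and exist only for this illustration; growth, spilling, and the bytesMap record layout are deliberately left out.

// Minimal sketch (assumption: a fixed-capacity, power-of-two table is enough
// to show the probing and skip-duplicate logic).
#include <cassert>
#include <cstdint>
#include <iostream>
#include <vector>

struct Slot {
  bool used = false;
  int32_t key = 0;
  int32_t hash = 0;
  uint32_t row_id = 0;  // stand-in for the ArrayItemIndex payload
};

class SkipDupTable {
 public:
  explicit SkipDupTable(int capacity_pow2) : slots_(capacity_pow2) {
    // arrayCapacity in unsafeHashMap is likewise a power of two
    assert((capacity_pow2 & (capacity_pow2 - 1)) == 0);
  }

  // Returns true if the key was newly inserted, false if an equal key already
  // existed; duplicates are skipped, mirroring appendNewKey's early return.
  bool insert_if_absent(int32_t key, int32_t hash, uint32_t row_id) {
    const int mask = static_cast<int>(slots_.size()) - 1;
    int pos = hash & mask;
    int step = 1;
    while (true) {
      Slot& s = slots_[pos];
      if (!s.used) {  // empty slot: record the key once
        s.used = true;
        s.key = key;
        s.hash = hash;
        s.row_id = row_id;
        return true;
      }
      if (s.hash == hash && s.key == key) {
        return false;  // same key seen before: skip the duplicate
      }
      pos = (pos + step) & mask;  // same probing scheme as the patched code
      step++;
    }
  }

 private:
  std::vector<Slot> slots_;
};

int main() {
  // Build side of a left semi join: only the first occurrence of each key matters,
  // because the probe side only asks whether a match exists.
  SkipDupTable table(16);
  int32_t keys[] = {7, 3, 7, 7, 3, 9};
  for (uint32_t i = 0; i < 6; ++i) {
    bool inserted = table.insert_if_absent(keys[i], keys[i] /* toy hash */, i);
    std::cout << "key " << keys[i] << (inserted ? " inserted" : " skipped (dup)") << "\n";
  }
  return 0;
}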