From 8d2d2aee28372688129bdd20e7aca46937d915bf Mon Sep 17 00:00:00 2001
From: Rui Mo
Date: Wed, 7 Jul 2021 15:38:25 +0800
Subject: [PATCH] fix segfault in SMJ

---
 .github/workflows/unittests.yml | 2 +-
 .../ColumnarWholeStageCodegenExec.scala | 13 ++++---
 .../sql/DataFrameWindowFunctionsSuite.scala | 2 +-
 .../apache/spark/sql/SQLQueryTestSuite.scala | 16 ++------
 .../spark/sql/test/SharedSparkSession.scala | 6 +--
 .../arrow_compute/ext/codegen_common.cc | 38 +++++++++++++++++--
 .../arrow_compute/ext/codegen_common.h | 3 ++
 .../codegen/arrow_compute/ext/kernels_ext.cc | 4 +-
 .../codegen/arrow_compute/ext/sort_kernel.cc | 7 ++--
 .../arrow_compute/ext/window_sort_kernel.h | 5 ++-
 native-sql-engine/tools/run_ut.sh | 4 +-
 11 files changed, 63 insertions(+), 37 deletions(-)

diff --git a/.github/workflows/unittests.yml b/.github/workflows/unittests.yml
index ece996709..a2120f03b 100644
--- a/.github/workflows/unittests.yml
+++ b/.github/workflows/unittests.yml
@@ -147,7 +147,7 @@ jobs:
       cd arrow-data-source
       mvn clean install -DskipTests -Dbuild_arrow=OFF
       cd ..
-      mvn clean package -P full-scala-compiler -Phadoop3.2 -am -pl native-sql-engine/core -DskipTests -Dbuild_arrow=OFF
+      mvn clean package -P full-scala-compiler -Phadoop-3.2 -am -pl native-sql-engine/core -DskipTests -Dbuild_arrow=OFF
       mvn test -P full-scala-compiler -DmembersOnlySuites=org.apache.spark.sql.nativesql -am -DfailIfNoTests=false -Dexec.skip=true -DargLine="-Dspark.test.home=/tmp/spark-3.1.1-bin-hadoop3.2" &> log-file.log
       echo '#!/bin/bash' > grep.sh
       echo "module_tested=0; module_should_test=8; tests_total=0; while read -r line; do num=\$(echo \"\$line\" | grep -o -E '[0-9]+'); tests_total=\$((tests_total+num)); done <<<\"\$(grep \"Total number of tests run:\" log-file.log)\"; succeed_total=0; while read -r line; do [[ \$line =~ [^0-9]*([0-9]+)\, ]]; num=\${BASH_REMATCH[1]}; succeed_total=\$((succeed_total+num)); let module_tested++; done <<<\"\$(grep \"succeeded\" log-file.log)\"; if test \$tests_total -eq \$succeed_total -a \$module_tested -eq \$module_should_test; then echo \"All unit tests succeed\"; else echo \"Unit tests failed\"; exit 1; fi" >> grep.sh
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala
index f8d3d6932..1ecdf50c2 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala
@@ -191,18 +191,20 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
       outputAttributes: Seq[Attribute]): TreeNode = {
     val outputFieldList: List[Field] = outputAttributes.toList.map(attr => {
       Field
-        .nullable(s"${attr.name}#${attr.exprId.id}", CodeGeneration.getResultType(attr.dataType))
+        .nullable(s"${attr.name.toUpperCase()}#${attr.exprId.id}",
+          CodeGeneration.getResultType(attr.dataType))
     })
     val keyFieldList: List[Field] = keyAttributes.toList.map(attr => {
       val field = Field
-        .nullable(s"${attr.name}#${attr.exprId.id}", CodeGeneration.getResultType(attr.dataType))
+        .nullable(s"${attr.name.toUpperCase()}#${attr.exprId.id}",
+          CodeGeneration.getResultType(attr.dataType))
       if (outputFieldList.indexOf(field) == -1) {
-        throw new UnsupportedOperationException(
-          s"CashedRelation not found ${attr.name}#${attr.exprId.id} in ${outputAttributes}")
+        throw new UnsupportedOperationException(s"CachedRelation not found " +
not found" + + s"${attr.name.toUpperCase()}#${attr.exprId.id} in ${outputAttributes}") } field - }); + }) val key_args_node = TreeBuilder.makeFunction( "key_field", @@ -424,7 +426,6 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I iter } } - idx += 1 } diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index 76edcbec9..f683cc546 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -553,7 +553,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest Row("b", 2, 4, 8))) } - ignore("null inputs") { + test("null inputs") { val df = Seq(("a", 1), ("a", 1), ("a", 2), ("a", 2), ("b", 4), ("b", 3), ("b", 2)) .toDF("key", "value") val window = Window.orderBy() diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 564fd8330..b05c60962 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -169,23 +169,15 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper "postgreSQL/window_part3.sql", // WindowSortKernel::Impl::GetCompFunction_ "subquery/in-subquery/in-joins.sql", // NullPointerException "udf/postgreSQL/udf-aggregates_part1.sql", // IllegalStateException: Value at index is null - "subquery/exists-subquery/exists-within-and-or.sql", - "postgreSQL/boolean.sql", - "postgreSQL/select_distinct.sql", - "postgreSQL/window_part1.sql", - "postgreSQL/union.sql", - "udf/udf-group-analytics.sql", /** Cannot reproduce */ -// "cte-nonlegacy.sql", - "subquery/exists-subquery/exists-cte.sql", // segfault: arrow::RecordBatch::num_columns() -// "subquery/exists-subquery/exists-basic.sql", -// "subquery/exists-subquery/exists-orderby-limit.sql", - "subquery/exists-subquery/exists-joins-and-set-ops.sql", +// "subquery/exists-subquery/exists-joins-and-set-ops.sql", /** incorrect result */ "count.sql", // interrupted by signal 9: SIGKILL +// "subquery/in-subquery/in-set-operations.sql", +// "subquery/in-subquery/in-order-by.sql", // "postgreSQL/join.sql", // "udf/postgreSQL/udf-join.sql", // "udf/udf-window.sql", @@ -297,7 +289,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper } /** To run only the set test */ // if (testList.exists(t => -// testCase.name.toLowerCase(Locale.ROOT).contains(t.toLowerCase(Locale.ROOT)))) { +// testCase.name.toLowerCase(Locale.ROOT).contains(t.toLowerCase(Locale.ROOT)))) { // test(testCase.name) { // runTest(testCase) // } diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala index c06cd8955..231ff91c4 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala @@ -64,8 +64,8 @@ trait SharedSparkSessionBase with Eventually { self: Suite => protected def sparkConf = { - val zoneID = "GMT-8" - val locale = Locale.US + val zoneID = "UTC" + val locale = 
     TimeZone.setDefault(TimeZone.getTimeZone(zoneID))
     Locale.setDefault(locale)

@@ -90,7 +90,7 @@ trait SharedSparkSessionBase
       .set("spark.memory.offHeap.size", "10g")
       .set("spark.sql.join.preferSortMergeJoin", "false")
       .set("spark.unsafe.exceptionOnMemoryLeak", "false")
-      .set("spark.oap.sql.columnar.tmp_dir", "/codegen/nativesql/")
+      // .set("spark.oap.sql.columnar.tmp_dir", "/codegen/nativesql/")
       .set("spark.oap.sql.columnar.preferColumnar", "true")
       .set("spark.sql.parquet.enableVectorizedReader", "false")
       .set("spark.sql.orc.enableVectorizedReader", "false")
diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.cc
index 635ca4861..31451b86e 100644
--- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.cc
+++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.cc
@@ -278,6 +278,18 @@ std::string GetTemplateString(std::shared_ptr<arrow::DataType> type,
   }
 }

+bool StrCmpCaseInsensitive(const std::string& str1, const std::string& str2) {
+  auto left_str = str1;
+  auto right_str = str2;
+  std::transform(left_str.begin(), left_str.end(), left_str.begin(), ::toupper);
+  std::transform(right_str.begin(), right_str.end(), right_str.begin(), ::toupper);
+  return left_str == right_str;
+}
+
 std::string GetParameterList(std::vector<std::string> parameter_list_in, bool comma_ahead,
                              std::string split) {
   std::vector<std::string> parameter_list;
@@ -430,7 +442,7 @@ arrow::Status GetIndexList(const std::vector<std::shared_ptr<arrow::Field>>& tar
   int i = 0;
   found = false;
   for (auto field : source_list) {
-    if (key_field->name() == field->name()) {
+    if (StrCmpCaseInsensitive(key_field->name(), field->name())) {
       found = true;
       break;
     }
@@ -457,7 +469,7 @@ arrow::Status GetIndexList(
     i = 0;
     found = false;
     for (auto field : left_field_list) {
-      if (target_field->name() == field->name()) {
+      if (StrCmpCaseInsensitive(target_field->name(), field->name())) {
         (*result_schema_index_list).push_back(std::make_pair(0, i));
         found = true;
         break;
@@ -467,7 +479,7 @@ arrow::Status GetIndexList(
     if (found == true) continue;
     i = 0;
     for (auto field : right_field_list) {
-      if (target_field->name() == field->name()) {
+      if (StrCmpCaseInsensitive(target_field->name(), field->name())) {
         (*result_schema_index_list).push_back(std::make_pair(1, i));
         found = true;
         right_found++;
@@ -485,13 +497,31 @@ arrow::Status GetIndexList(
   return arrow::Status::OK();
 }

+std::vector<int> GetIndicesFromSchemaCaseInsensitive(
+    const std::shared_ptr<arrow::Schema>& result_schema, const std::string& field_name) {
+  auto fields = result_schema->fields();
+  std::vector<std::shared_ptr<arrow::Field>> upper_fields;
+  for (auto field : fields) {
+    auto upper_field_name = field->name();
+    std::transform(upper_field_name.begin(), upper_field_name.end(),
+                   upper_field_name.begin(), ::toupper);
+    auto upper_field = arrow::field(upper_field_name, field->type());
+    upper_fields.push_back(upper_field);
+  }
+  std::shared_ptr<arrow::Schema> upper_schema =
+      std::make_shared<arrow::Schema>(upper_fields);
+  auto upper_name = field_name;
+  std::transform(upper_name.begin(), upper_name.end(), upper_name.begin(), ::toupper);
+  return upper_schema->GetAllFieldIndices(upper_name);
+}
+
 arrow::Status GetIndexListFromSchema(
     const std::shared_ptr<arrow::Schema>& result_schema,
     const std::vector<std::shared_ptr<arrow::Field>>& field_list,
     std::vector<int>* index_list) {
   int i = 0;
   for (auto field : field_list) {
-    auto indices = result_schema->GetAllFieldIndices(field->name());
+    auto indices = GetIndicesFromSchemaCaseInsensitive(result_schema, field->name());
     if (indices.size() >= 1) {
       (*index_list).push_back(i);
     }
diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.h b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.h
index b08fca3ad..96c76f476 100644
--- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.h
+++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.h
@@ -48,6 +48,7 @@ std::string GetTypeString(std::shared_ptr<arrow::DataType> type,
 std::string GetTemplateString(std::shared_ptr<arrow::DataType> type,
                               std::string template_name, std::string tail = "",
                               std::string prefix = "");
+bool StrCmpCaseInsensitive(const std::string& str1, const std::string& str2);
 gandiva::ExpressionPtr GetConcatedKernel(std::vector<std::shared_ptr<arrow::Field>> key_list);
 gandiva::ExpressionPtr GetHash32Kernel(std::vector<std::shared_ptr<arrow::Field>> key_list);
 gandiva::ExpressionPtr GetHash32Kernel(std::vector<std::shared_ptr<arrow::Field>> key_list,
@@ -72,6 +73,8 @@ arrow::Status GetIndexList(
     const std::vector<std::shared_ptr<arrow::Field>>& right_field_list,
     const bool isExistJoin, int* exist_index,
     std::vector<std::pair<int, int>>* result_schema_index_list);
+std::vector<int> GetIndicesFromSchemaCaseInsensitive(
+    const std::shared_ptr<arrow::Schema>& result_schema, const std::string& field_name);
 arrow::Status GetIndexListFromSchema(
     const std::shared_ptr<arrow::Schema>& result_schema,
     const std::vector<std::shared_ptr<arrow::Field>>& field_list,
diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc
index c0ac168f7..43ecc7e28 100644
--- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc
+++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.cc
@@ -327,9 +327,9 @@ class CachedRelationKernel::Impl {
         result_type_(result_type) {
     pool_ = ctx_->memory_pool();
     for (auto field : key_field_list) {
-      auto indices = result_schema->GetAllFieldIndices(field->name());
+      auto indices = GetIndicesFromSchemaCaseInsensitive(result_schema, field->name());
       if (indices.size() != 1) {
-        std::cout << "[ERROR] SortArraysToIndicesKernel::Impl can't find key "
+        std::cout << "[ERROR] CachedRelationKernel::Impl can't find key "
                   << field->ToString() << " from " << result_schema->ToString()
                   << std::endl;
         throw;
diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc
index 9dc0afcc3..e62c7552f 100644
--- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc
+++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc
@@ -106,7 +106,7 @@ class SortArraysToIndicesKernel::Impl {
     std::cout << "use SortArraysToIndicesKernel::Impl" << std::endl;
 #endif
     for (auto field : key_field_list) {
-      auto indices = result_schema->GetAllFieldIndices(field->name());
+      auto indices = GetIndicesFromSchemaCaseInsensitive(result_schema, field->name());
       if (indices.size() != 1) {
         std::cout << "[ERROR] SortArraysToIndicesKernel::Impl can't find key "
                   << field->ToString() << " from " << result_schema->ToString()
@@ -1208,7 +1208,8 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
 #ifdef DEBUG
     std::cout << "UseSortOnekeyKernel" << std::endl;
 #endif
-    auto indices = result_schema->GetAllFieldIndices(key_field_list[0]->name());
+    auto indices =
+        GetIndicesFromSchemaCaseInsensitive(result_schema, key_field_list[0]->name());
     if (indices.size() < 1) {
       std::cout << "[ERROR] SortOnekeyKernel for arithmetic can't find key "
                 << key_field_list[0]->ToString() << " from " << result_schema->ToString()
@@ -1547,7 +1548,7 @@ class SortMultiplekeyKernel : public SortArraysToIndicesKernel::Impl {
     std::cout << "UseSortMultiplekeyKernel" << std::endl;
 #endif
     for (auto field : key_field_list) {
-      auto indices = result_schema->GetAllFieldIndices(field->name());
+      auto indices = GetIndicesFromSchemaCaseInsensitive(result_schema, field->name());
       if (indices.size() != 1) {
         std::cout << "[ERROR] SortArraysToIndicesKernel::Impl can't find key "
                   << field->ToString() << " from " << result_schema->ToString()
diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h
index ea9b2e7a1..38bb17da2 100644
--- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h
+++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h
@@ -55,7 +55,7 @@ class WindowSortKernel::Impl {
        std::shared_ptr<arrow::Schema> result_schema, bool nulls_first, bool asc)
       : ctx_(ctx), nulls_first_(nulls_first), asc_(asc) {
     for (auto field : key_field_list) {
-      auto indices = result_schema->GetAllFieldIndices(field->name());
+      auto indices = GetIndicesFromSchemaCaseInsensitive(result_schema, field->name());
       if (indices.size() != 1) {
         std::cout << "[ERROR] WindowSortKernel::Impl can't find key " << field->ToString()
                   << " from " << result_schema->ToString() << std::endl;
@@ -519,7 +519,8 @@ class WindowSortOnekeyKernel : public WindowSortKernel::Impl {
                         std::shared_ptr<arrow::Schema> result_schema, bool nulls_first,
                         bool asc)
       : ctx_(ctx), nulls_first_(nulls_first), asc_(asc), result_schema_(result_schema) {
-    auto indices = result_schema->GetAllFieldIndices(key_field_list[0]->name());
+    auto indices =
+        GetIndicesFromSchemaCaseInsensitive(result_schema, key_field_list[0]->name());
     key_id_ = indices[0];
     col_num_ = result_schema->num_fields();
   }
diff --git a/native-sql-engine/tools/run_ut.sh b/native-sql-engine/tools/run_ut.sh
index 6f1d55997..767c98ec7 100755
--- a/native-sql-engine/tools/run_ut.sh
+++ b/native-sql-engine/tools/run_ut.sh
@@ -13,9 +13,7 @@ then
 else
   echo "SPARK_HOME is $spark_home"
 fi
-
-mvn clean test -am -pl native-sql-engine/core -Dbuild_arrow=OFF -Dbuild_protobuf=OFF -DfailIfNoTests=false -Dmaven.test.failure.ignore=true -DargLine="-Dspark.test.home=$spark_home" -DargLine="-Dspark.testing=true" &> native-sql-engine/tools/log-file.log
-
+mvn clean test -P full-scala-compiler -am -pl native-sql-engine/core -Dbuild_arrow=OFF -Dbuild_protobuf=OFF -DfailIfNoTests=false -DargLine="-Dspark.test.home=$spark_home" -Dexec.skip=true -Dmaven.test.failure.ignore=true &> native-sql-engine/tools/log-file.log
 cd native-sql-engine/tools/
 tests_total=0
 module_tested=0
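
Reviewer note, not part of the commit: the Scala codegen above now upper-cases
attribute names when it builds Arrow fields (s"${attr.name.toUpperCase()}#${attr.exprId.id}"),
while the schemas already held by the native kernels keep their original casing,
so every exact-match arrow::Schema::GetAllFieldIndices() call is switched to the
new case-insensitive helper. WindowSortOnekeyKernel shows why the old lookup
crashed: its constructor reads indices[0] without checking that the lookup
succeeded, so an empty result from a case mismatch is undefined behavior rather
than a clean error. The standalone sketch below reproduces the helper's behavior
against the public Arrow C++ API; it is illustrative only -- the file name,
main(), and the sample field names are invented for the demo.

    // case_insensitive_lookup.cc -- illustrative sketch, not part of the patch.
    // Build (assumed): g++ -std=c++17 case_insensitive_lookup.cc $(pkg-config --cflags --libs arrow)
    #include <arrow/api.h>

    #include <algorithm>
    #include <cctype>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <vector>

    // Upper-case a copy of a string, the same std::transform idiom the patch uses.
    static std::string ToUpper(std::string s) {
      std::transform(s.begin(), s.end(), s.begin(), ::toupper);
      return s;
    }

    // Mirrors GetIndicesFromSchemaCaseInsensitive(): build an upper-cased copy
    // of the schema, then reuse Arrow's exact-match GetAllFieldIndices().
    static std::vector<int> GetIndicesCaseInsensitive(
        const std::shared_ptr<arrow::Schema>& schema, const std::string& name) {
      std::vector<std::shared_ptr<arrow::Field>> upper_fields;
      for (const auto& field : schema->fields()) {
        upper_fields.push_back(arrow::field(ToUpper(field->name()), field->type()));
      }
      auto upper_schema = std::make_shared<arrow::Schema>(upper_fields);
      return upper_schema->GetAllFieldIndices(ToUpper(name));
    }

    int main() {
      // Hypothetical schema in the "name#exprId" style the codegen produces.
      auto schema = arrow::schema({arrow::field("col_a#13", arrow::int32()),
                                   arrow::field("col_b#14", arrow::utf8())});

      // The Scala side now asks for the upper-cased name, so the exact-match
      // lookup comes back empty ...
      std::cout << schema->GetAllFieldIndices("COL_A#13").size() << std::endl;  // prints 0

      // ... and code like WindowSortOnekeyKernel's `key_id_ = indices[0];`
      // then indexes into an empty vector -- the reported segfault. The
      // helper resolves the same name regardless of case:
      auto indices = GetIndicesCaseInsensitive(schema, "COL_A#13");
      std::cout << (indices.empty() ? -1 : indices[0]) << std::endl;  // prints 0
      return 0;
    }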
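On the design choice: rebuilding an upper-cased copy of the schema on every
lookup costs an allocation per call, but these lookups run once per kernel
construction rather than per batch, so the overhead is negligible and the
change stays local to codegen_common.cc. An alternative would be to normalize
field names once where the schemas are created, at the cost of touching every
producer of those schemas.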