
fix segfault in SMJ
rui-mo committed Jul 14, 2021
1 parent 4631e48 commit 8d2d2ae
Showing 11 changed files with 63 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/unittests.yml
@@ -147,7 +147,7 @@ jobs:
cd arrow-data-source
mvn clean install -DskipTests -Dbuild_arrow=OFF
cd ..
- mvn clean package -P full-scala-compiler -Phadoop3.2 -am -pl native-sql-engine/core -DskipTests -Dbuild_arrow=OFF
+ mvn clean package -P full-scala-compiler -Phadoop-3.2 -am -pl native-sql-engine/core -DskipTests -Dbuild_arrow=OFF
mvn test -P full-scala-compiler -DmembersOnlySuites=org.apache.spark.sql.nativesql -am -DfailIfNoTests=false -Dexec.skip=true -DargLine="-Dspark.test.home=/tmp/spark-3.1.1-bin-hadoop3.2" &> log-file.log
echo '#!/bin/bash' > grep.sh
echo "module_tested=0; module_should_test=8; tests_total=0; while read -r line; do num=\$(echo \"\$line\" | grep -o -E '[0-9]+'); tests_total=\$((tests_total+num)); done <<<\"\$(grep \"Total number of tests run:\" log-file.log)\"; succeed_total=0; while read -r line; do [[ \$line =~ [^0-9]*([0-9]+)\, ]]; num=\${BASH_REMATCH[1]}; succeed_total=\$((succeed_total+num)); let module_tested++; done <<<\"\$(grep \"succeeded\" log-file.log)\"; if test \$tests_total -eq \$succeed_total -a \$module_tested -eq \$module_should_test; then echo \"All unit tests succeed\"; else echo \"Unit tests failed\"; exit 1; fi" >> grep.sh
@@ -191,18 +191,20 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
outputAttributes: Seq[Attribute]): TreeNode = {
val outputFieldList: List[Field] = outputAttributes.toList.map(attr => {
Field
- .nullable(s"${attr.name}#${attr.exprId.id}", CodeGeneration.getResultType(attr.dataType))
+ .nullable(s"${attr.name.toUpperCase()}#${attr.exprId.id}",
+ CodeGeneration.getResultType(attr.dataType))
})

val keyFieldList: List[Field] = keyAttributes.toList.map(attr => {
val field = Field
- .nullable(s"${attr.name}#${attr.exprId.id}", CodeGeneration.getResultType(attr.dataType))
+ .nullable(s"${attr.name.toUpperCase()}#${attr.exprId.id}",
+ CodeGeneration.getResultType(attr.dataType))
if (outputFieldList.indexOf(field) == -1) {
- throw new UnsupportedOperationException(
- s"CashedRelation not found ${attr.name}#${attr.exprId.id} in ${outputAttributes}")
+ throw new UnsupportedOperationException(s"CachedRelation not found" +
+ s"${attr.name.toUpperCase()}#${attr.exprId.id} in ${outputAttributes}")
}
field
- });
+ })

val key_args_node = TreeBuilder.makeFunction(
"key_field",
@@ -424,7 +426,6 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
iter
}
}
-
idx += 1
}

@@ -553,7 +553,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
Row("b", 2, 4, 8)))
}

- ignore("null inputs") {
+ test("null inputs") {
val df = Seq(("a", 1), ("a", 1), ("a", 2), ("a", 2), ("b", 4), ("b", 3), ("b", 2))
.toDF("key", "value")
val window = Window.orderBy()
@@ -169,23 +169,15 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
"postgreSQL/window_part3.sql", // WindowSortKernel::Impl::GetCompFunction_
"subquery/in-subquery/in-joins.sql", // NullPointerException
"udf/postgreSQL/udf-aggregates_part1.sql", // IllegalStateException: Value at index is null
"subquery/exists-subquery/exists-within-and-or.sql",
"postgreSQL/boolean.sql",
"postgreSQL/select_distinct.sql",
"postgreSQL/window_part1.sql",
"postgreSQL/union.sql",
"udf/udf-group-analytics.sql",

/** Cannot reproduce */

// "cte-nonlegacy.sql",
"subquery/exists-subquery/exists-cte.sql", // segfault: arrow::RecordBatch::num_columns()
// "subquery/exists-subquery/exists-basic.sql",
// "subquery/exists-subquery/exists-orderby-limit.sql",
"subquery/exists-subquery/exists-joins-and-set-ops.sql",
// "subquery/exists-subquery/exists-joins-and-set-ops.sql",

/** incorrect result */
"count.sql", // interrupted by signal 9: SIGKILL
// "subquery/in-subquery/in-set-operations.sql",
// "subquery/in-subquery/in-order-by.sql",
// "postgreSQL/join.sql",
// "udf/postgreSQL/udf-join.sql",
// "udf/udf-window.sql",
@@ -297,7 +289,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
}
/** To run only the set test */
// if (testList.exists(t =>
- // testCase.name.toLowerCase(Locale.ROOT).contains(t.toLowerCase(Locale.ROOT)))) {
+ //   testCase.name.toLowerCase(Locale.ROOT).contains(t.toLowerCase(Locale.ROOT)))) {
// test(testCase.name) {
// runTest(testCase)
// }
@@ -64,8 +64,8 @@ trait SharedSparkSessionBase
with Eventually { self: Suite =>

protected def sparkConf = {
- val zoneID = "GMT-8"
- val locale = Locale.US
+ val zoneID = "UTC"
+ val locale = Locale.ROOT
TimeZone.setDefault(TimeZone.getTimeZone(zoneID))
Locale.setDefault(locale)

@@ -90,7 +90,7 @@ trait SharedSparkSessionBase
.set("spark.memory.offHeap.size", "10g")
.set("spark.sql.join.preferSortMergeJoin", "false")
.set("spark.unsafe.exceptionOnMemoryLeak", "false")
- .set("spark.oap.sql.columnar.tmp_dir", "/codegen/nativesql/")
+ // .set("spark.oap.sql.columnar.tmp_dir", "/codegen/nativesql/")
.set("spark.oap.sql.columnar.preferColumnar", "true")
.set("spark.sql.parquet.enableVectorizedReader", "false")
.set("spark.sql.orc.enableVectorizedReader", "false")
@@ -278,6 +278,18 @@ std::string GetTemplateString(std::shared_ptr<arrow::DataType> type,
}
}

+ bool StrCmpCaseInsensitive(const std::string& str1, const std::string& str2) {
+ auto left_str = str1;
+ auto right_str = str2;
+ std::transform(left_str.begin(), left_str.end(), left_str.begin(), ::toupper);
+ std::transform(right_str.begin(), right_str.end(), right_str.begin(), ::toupper);
+ if (left_str == right_str) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
std::string GetParameterList(std::vector<std::string> parameter_list_in, bool comma_ahead,
std::string split) {
std::vector<std::string> parameter_list;
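
For reference, here is a minimal, self-contained sketch (not part of the commit) of what the new StrCmpCaseInsensitive helper does; the definition mirrors the diff above with the if/else folded into a single return, and main() with its sample names is purely illustrative:

#include <algorithm>
#include <cassert>
#include <string>

// Mirrors the helper added above: uppercase copies of both inputs, then compare.
bool StrCmpCaseInsensitive(const std::string& str1, const std::string& str2) {
  auto left_str = str1;
  auto right_str = str2;
  std::transform(left_str.begin(), left_str.end(), left_str.begin(), ::toupper);
  std::transform(right_str.begin(), right_str.end(), right_str.begin(), ::toupper);
  return left_str == right_str;
}

int main() {
  // The Scala side now uppercases attribute names (see the
  // ColumnarWholeStageCodegenExec hunk above), so a native field named
  // "a#123" must still match a key named "A#123".
  assert(StrCmpCaseInsensitive("a#123", "A#123"));
  assert(!StrCmpCaseInsensitive("a#123", "b#123"));
  return 0;
}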
@@ -430,7 +442,7 @@ arrow::Status GetIndexList(const std::vector<std::shared_ptr<arrow::Field>>& tar
int i = 0;
found = false;
for (auto field : source_list) {
- if (key_field->name() == field->name()) {
+ if (StrCmpCaseInsensitive(key_field->name(), field->name())) {
found = true;
break;
}
@@ -457,7 +469,7 @@ arrow::Status GetIndexList(
i = 0;
found = false;
for (auto field : left_field_list) {
- if (target_field->name() == field->name()) {
+ if (StrCmpCaseInsensitive(target_field->name(), field->name())) {
(*result_schema_index_list).push_back(std::make_pair(0, i));
found = true;
break;
@@ -467,7 +479,7 @@ arrow::Status GetIndexList(
if (found == true) continue;
i = 0;
for (auto field : right_field_list) {
- if (target_field->name() == field->name()) {
+ if (StrCmpCaseInsensitive(target_field->name(), field->name())) {
(*result_schema_index_list).push_back(std::make_pair(1, i));
found = true;
right_found++;
@@ -485,13 +497,31 @@ arrow::Status GetIndexList(
return arrow::Status::OK();
}

+ std::vector<int> GetIndicesFromSchemaCaseInsensitive(
+ const std::shared_ptr<arrow::Schema>& result_schema, const std::string& field_name) {
+ auto fields = result_schema->fields();
+ std::vector<std::shared_ptr<arrow::Field>> upper_fields;
+ for (auto field : fields) {
+ auto upper_field_name = field->name();
+ std::transform(upper_field_name.begin(), upper_field_name.end(),
+ upper_field_name.begin(), ::toupper);
+ auto upper_field = arrow::field(upper_field_name, field->type());
+ upper_fields.push_back(upper_field);
+ }
+ std::shared_ptr<arrow::Schema> upper_shema =
+ std::make_shared<arrow::Schema>(upper_fields);
+ auto upper_name = field_name;
+ std::transform(upper_name.begin(), upper_name.end(), upper_name.begin(), ::toupper);
+ return upper_shema->GetAllFieldIndices(upper_name);
+ }
+
arrow::Status GetIndexListFromSchema(
const std::shared_ptr<arrow::Schema>& result_schema,
const std::vector<std::shared_ptr<arrow::Field>>& field_list,
std::vector<int>* index_list) {
int i = 0;
for (auto field : field_list) {
- auto indices = result_schema->GetAllFieldIndices(field->name());
+ auto indices = GetIndicesFromSchemaCaseInsensitive(result_schema, field->name());
if (indices.size() >= 1) {
(*index_list).push_back(i);
}
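
A hedged usage sketch (not repository code) of the case-insensitive index lookup added above; it assumes the Arrow C++ headers are available, the sample schema and field names are invented, and the helper is re-spelled here as LookupIndicesCaseInsensitive with the same logic:

#include <arrow/api.h>
#include <algorithm>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Same approach as GetIndicesFromSchemaCaseInsensitive above: rebuild the
// schema with uppercased field names, uppercase the probe name, then delegate
// to arrow::Schema::GetAllFieldIndices.
std::vector<int> LookupIndicesCaseInsensitive(
    const std::shared_ptr<arrow::Schema>& result_schema,
    const std::string& field_name) {
  std::vector<std::shared_ptr<arrow::Field>> upper_fields;
  for (const auto& field : result_schema->fields()) {
    auto upper_field_name = field->name();
    std::transform(upper_field_name.begin(), upper_field_name.end(),
                   upper_field_name.begin(), ::toupper);
    upper_fields.push_back(arrow::field(upper_field_name, field->type()));
  }
  auto upper_schema = std::make_shared<arrow::Schema>(upper_fields);
  auto upper_name = field_name;
  std::transform(upper_name.begin(), upper_name.end(), upper_name.begin(), ::toupper);
  return upper_schema->GetAllFieldIndices(upper_name);
}

int main() {
  auto schema = arrow::schema({arrow::field("a#123", arrow::int32()),
                               arrow::field("b#456", arrow::utf8())});
  // The plain lookup is case-sensitive and finds nothing for "A#123":
  std::cout << schema->GetAllFieldIndices("A#123").size() << std::endl;  // 0
  // The case-insensitive lookup resolves it to field 0:
  std::cout << LookupIndicesCaseInsensitive(schema, "A#123").size() << std::endl;  // 1
  return 0;
}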
@@ -48,6 +48,7 @@ std::string GetTypeString(std::shared_ptr<arrow::DataType> type,
std::string GetTemplateString(std::shared_ptr<arrow::DataType> type,
std::string template_name, std::string tail = "",
std::string prefix = "");
+ bool StrCmpCaseInsensitive(const std::string& str1, const std::string& str2);
gandiva::ExpressionPtr GetConcatedKernel(std::vector<gandiva::NodePtr> key_list);
gandiva::ExpressionPtr GetHash32Kernel(std::vector<gandiva::NodePtr> key_list);
gandiva::ExpressionPtr GetHash32Kernel(std::vector<gandiva::NodePtr> key_list,
@@ -72,6 +73,8 @@ arrow::Status GetIndexList(
const std::vector<std::shared_ptr<arrow::Field>>& right_field_list,
const bool isExistJoin, int* exist_index,
std::vector<std::pair<int, int>>* result_schema_index_list);
+ std::vector<int> GetIndicesFromSchemaCaseInsensitive(
+ const std::shared_ptr<arrow::Schema>& result_schema, const std::string& field_name);
arrow::Status GetIndexListFromSchema(
const std::shared_ptr<arrow::Schema>& result_schema,
const std::vector<std::shared_ptr<arrow::Field>>& field_list,
@@ -327,9 +327,9 @@ class CachedRelationKernel::Impl {
result_type_(result_type) {
pool_ = ctx_->memory_pool();
for (auto field : key_field_list) {
- auto indices = result_schema->GetAllFieldIndices(field->name());
+ auto indices = GetIndicesFromSchemaCaseInsensitive(result_schema, field->name());
if (indices.size() != 1) {
- std::cout << "[ERROR] SortArraysToIndicesKernel::Impl can't find key "
+ std::cout << "[ERROR] CachedRelationKernel::Impl can't find key "
<< field->ToString() << " from " << result_schema->ToString()
<< std::endl;
throw;
@@ -106,7 +106,7 @@ class SortArraysToIndicesKernel::Impl {
std::cout << "use SortArraysToIndicesKernel::Impl" << std::endl;
#endif
for (auto field : key_field_list) {
- auto indices = result_schema->GetAllFieldIndices(field->name());
+ auto indices = GetIndicesFromSchemaCaseInsensitive(result_schema, field->name());
if (indices.size() != 1) {
std::cout << "[ERROR] SortArraysToIndicesKernel::Impl can't find key "
<< field->ToString() << " from " << result_schema->ToString()
@@ -1208,7 +1208,8 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl {
#ifdef DEBUG
std::cout << "UseSortOnekeyKernel" << std::endl;
#endif
- auto indices = result_schema->GetAllFieldIndices(key_field_list[0]->name());
+ auto indices =
+ GetIndicesFromSchemaCaseInsensitive(result_schema, key_field_list[0]->name());
if (indices.size() < 1) {
std::cout << "[ERROR] SortOnekeyKernel for arithmetic can't find key "
<< key_field_list[0]->ToString() << " from " << result_schema->ToString()
@@ -1547,7 +1548,7 @@ class SortMultiplekeyKernel : public SortArraysToIndicesKernel::Impl {
std::cout << "UseSortMultiplekeyKernel" << std::endl;
#endif
for (auto field : key_field_list) {
- auto indices = result_schema->GetAllFieldIndices(field->name());
+ auto indices = GetIndicesFromSchemaCaseInsensitive(result_schema, field->name());
if (indices.size() != 1) {
std::cout << "[ERROR] SortArraysToIndicesKernel::Impl can't find key "
<< field->ToString() << " from " << result_schema->ToString()
@@ -55,7 +55,7 @@ class WindowSortKernel::Impl {
std::shared_ptr<arrow::Schema> result_schema, bool nulls_first, bool asc)
: ctx_(ctx), nulls_first_(nulls_first), asc_(asc) {
for (auto field : key_field_list) {
- auto indices = result_schema->GetAllFieldIndices(field->name());
+ auto indices = GetIndicesFromSchemaCaseInsensitive(result_schema, field->name());
if (indices.size() != 1) {
std::cout << "[ERROR] WindowSortKernel::Impl can't find key " << field->ToString()
<< " from " << result_schema->ToString() << std::endl;
@@ -519,7 +519,8 @@ class WindowSortOnekeyKernel : public WindowSortKernel::Impl {
std::shared_ptr<arrow::Schema> result_schema, bool nulls_first,
bool asc)
: ctx_(ctx), nulls_first_(nulls_first), asc_(asc), result_schema_(result_schema) {
- auto indices = result_schema->GetAllFieldIndices(key_field_list[0]->name());
+ auto indices =
+ GetIndicesFromSchemaCaseInsensitive(result_schema, key_field_list[0]->name());
key_id_ = indices[0];
col_num_ = result_schema->num_fields();
}
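
Note that the constructor above assigns indices[0] without checking that the lookup succeeded, so before this change a case-mismatched key name produced an empty index vector and an out-of-bounds read in native code, which is consistent with the sort-merge-join segfault this commit targets. A hedged illustration of that failure mode (not repository code):

#include <iostream>
#include <vector>

int main() {
  // Before the fix: a case-sensitive lookup for "A#123" against a schema
  // storing "a#123" came back empty.
  std::vector<int> indices;
  // key_id_ = indices[0];  // out-of-bounds read: undefined behavior, segfault
  // A defensive alternative (the commit instead fixes the lookup itself):
  int key_id = indices.empty() ? -1 : indices[0];
  std::cout << key_id << std::endl;  // prints -1
  return 0;
}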
4 changes: 1 addition & 3 deletions native-sql-engine/tools/run_ut.sh
@@ -13,9 +13,7 @@ then
else
echo "SPARK_HOME is $spark_home"
fi
-
- mvn clean test -am -pl native-sql-engine/core -Dbuild_arrow=OFF -Dbuild_protobuf=OFF -DfailIfNoTests=false -Dmaven.test.failure.ignore=true -DargLine="-Dspark.test.home=$spark_home" -DargLine="-Dspark.testing=true" &> native-sql-engine/tools/log-file.log
-
+ mvn clean test -P full-scala-compiler -am -pl native-sql-engine/core -Dbuild_arrow=OFF -Dbuild_protobuf=OFF -DfailIfNoTests=false -DargLine="-Dspark.test.home=$spark_home" -Dexec.skip=true -Dmaven.test.failure.ignore=true &> native-sql-engine/tools/log-file.log
cd native-sql-engine/tools/
tests_total=0
module_tested=0