
[NSE-207] fix scala unit tests (#393)
* fix string concat w/ null

* add build check for subquery

* fix local function

* fix aggregate

* fix sort by literal

* fix WSCG rules and refine

* fix timestamp, date functions and refine

* fix WSCG rules

* fix condProj

* fix segfault in SMJ

* refine
rui-mo authored Jul 15, 2021
1 parent 8f4f1cb commit dae4d82
Showing 89 changed files with 1,064 additions and 403 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/unittests.yml
@@ -147,7 +147,7 @@ jobs:
cd arrow-data-source
mvn clean install -DskipTests -Dbuild_arrow=OFF
cd ..
mvn clean package -P full-scala-compiler -Phadoop3.2 -am -pl native-sql-engine/core -DskipTests -Dbuild_arrow=OFF
mvn clean package -P full-scala-compiler -Phadoop-3.2 -am -pl native-sql-engine/core -DskipTests -Dbuild_arrow=OFF
mvn test -P full-scala-compiler -DmembersOnlySuites=org.apache.spark.sql.nativesql -am -DfailIfNoTests=false -Dexec.skip=true -DargLine="-Dspark.test.home=/tmp/spark-3.1.1-bin-hadoop3.2" &> log-file.log
echo '#!/bin/bash' > grep.sh
echo "module_tested=0; module_should_test=8; tests_total=0; while read -r line; do num=\$(echo \"\$line\" | grep -o -E '[0-9]+'); tests_total=\$((tests_total+num)); done <<<\"\$(grep \"Total number of tests run:\" log-file.log)\"; succeed_total=0; while read -r line; do [[ \$line =~ [^0-9]*([0-9]+)\, ]]; num=\${BASH_REMATCH[1]}; succeed_total=\$((succeed_total+num)); let module_tested++; done <<<\"\$(grep \"succeeded\" log-file.log)\"; if test \$tests_total -eq \$succeed_total -a \$module_tested -eq \$module_should_test; then echo \"All unit tests succeed\"; else echo \"Unit tests failed\"; exit 1; fi" >> grep.sh
42 changes: 36 additions & 6 deletions native-sql-engine/core/pom.xml
@@ -198,12 +198,6 @@
<version>3.1.0.0-RC2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.11.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.seleniumhq.selenium</groupId>
<artifactId>selenium-htmlunit-driver</artifactId>
@@ -264,6 +258,42 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>2.7.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.18</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.ibm.db2.jcc</groupId>
<artifactId>db2jcc</artifactId>
<version>db2jcc4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>9.2.1.jre8</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.10.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>spark-sql-columnar-shims-common</artifactId>
@@ -184,6 +184,8 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
plan.orderSpec,
plan.child)
window
case plan: CoalesceExec =>
ColumnarCoalesceExec(plan.numPartitions, plan.child)
case p =>
p
}
@@ -17,6 +17,7 @@

package com.intel.oap.execution

import com.intel.oap.expression.ConverterUtils
import org.apache.spark.Partition
import org.apache.spark.SparkContext
import org.apache.spark.TaskContext
@@ -36,6 +37,20 @@ case class ColumnarCoalesceExec(numPartitions: Int, child: SparkPlan) extends Un
override def supportsColumnar: Boolean = true
override def output: Seq[Attribute] = child.output

buildCheck()

def buildCheck(): Unit = {
for (attr <- output) {
try {
ConverterUtils.checkIfTypeSupported(attr.dataType)
} catch {
case e: UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${attr.dataType} is not supported in ColumnarCoalesceExec.")
}
}
}

override def outputPartitioning: Partitioning = {
if (numPartitions == 1) SinglePartition
else UnknownPartitioning(numPartitions)
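Note (added for clarity, not part of the commit): buildCheck() runs in the constructor body, so an unsupported column type surfaces as an UnsupportedOperationException before execution, and the guard rule shown earlier can fall back to the row-based CoalesceExec. A self-contained sketch of that pattern with assumed, simplified names — not code from this project:

```scala
import org.apache.spark.sql.types._

// Hypothetical stand-in for ConverterUtils.checkIfTypeSupported (names assumed).
def checkIfTypeSupported(dt: DataType): Unit = dt match {
  case BooleanType | IntegerType | LongType | DoubleType | StringType | DateType => ()
  case other => throw new UnsupportedOperationException(s"$other is not supported")
}

// Same pattern as buildCheck(): validate every output attribute eagerly.
def buildCheck(schema: StructType): Unit =
  schema.fields.foreach(f => checkIfTypeSupported(f.dataType))

// A guard-style caller keeps the row-based plan when the check throws.
val schema = StructType(Seq(StructField("m", MapType(StringType, IntegerType))))
val useColumnar =
  try { buildCheck(schema); true }
  catch { case _: UnsupportedOperationException => false }
println(useColumnar) // false: MapType is not in the supported list above
```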
@@ -154,7 +154,6 @@ case class ColumnarHashAggregateExec(
*/
var skip_count = false
var skip_native = false
var skip_grouping = false
var onlyResExpr = false
var emptyInput = false
var count_num_row = 0
@@ -172,28 +171,21 @@
ConverterUtils.releaseArrowRecordBatch(input_rb)
} else {
// Special case for no input batch
if (aggregateExpressions.nonEmpty) {
if (aggregateExpressions.head
.aggregateFunction.children.head.isInstanceOf[Literal]) {
// This is a special case used by literal aggregation
skip_native = true
breakable{
for (exp <- aggregateExpressions) {
if (exp.aggregateFunction.isInstanceOf[Count]) {
skip_count = true
count_num_row += cb.numRows
break
}
if ((aggregateExpressions.nonEmpty && aggregateExpressions.head
.aggregateFunction.children.head.isInstanceOf[Literal]) ||
(groupingExpressions.nonEmpty && groupingExpressions.head.children.nonEmpty &&
groupingExpressions.head.children.head.isInstanceOf[Literal])) {
// This is a special case used by literal aggregation
skip_native = true
breakable{
for (exp <- aggregateExpressions) {
if (exp.aggregateFunction.isInstanceOf[Count]) {
skip_count = true
count_num_row += cb.numRows
break
}
}
}
} else {
// This is a special case used by grouping literal
if (groupingExpressions.nonEmpty && groupingExpressions.head.children.nonEmpty &&
groupingExpressions.head.children.head.isInstanceOf[Literal]) {
skip_grouping = true
skip_native = true
}
}
}
eval_elapse += System.nanoTime() - beforeEval
@@ -223,12 +215,9 @@
override def next(): ColumnarBatch = {
if (!processed) process
val beforeEval = System.nanoTime()
if (skip_grouping) {
// special handling for literal grouping
getResForGroupingLiteral
} else if (skip_native) {
// special handling for literal aggregation
getResForAggregateLiteral
if (skip_native) {
// special handling for literal aggregation and grouping
getResForAggregateAndGroupingLiteral
} else if (onlyResExpr) {
// special handling for only result expressions
getResForOnlyResExpr
@@ -287,10 +276,15 @@
}
}
}
def getResForAggregateLiteral: ColumnarBatch = {
def getResForAggregateAndGroupingLiteral: ColumnarBatch = {
val resultColumnVectors =
ArrowWritableColumnVector.allocateColumns(0, resultStructType)
ArrowWritableColumnVector.allocateColumns(1, resultStructType)
var idx = 0
for (exp <- groupingExpressions) {
val out_res = exp.children.head.asInstanceOf[Literal].value
putDataIntoVector(resultColumnVectors, out_res, idx)
idx += 1
}
for (exp <- aggregateExpressions) {
val mode = exp.mode
val aggregateFunc = exp.aggregateFunction
@@ -359,17 +353,6 @@
new ColumnarBatch(
resultColumnVectors.map(_.asInstanceOf[ColumnVector]), 1)
}
def getResForGroupingLiteral: ColumnarBatch = {
val resultColumnVectors =
ArrowWritableColumnVector.allocateColumns(0, resultStructType)
for (idx <- groupingExpressions.indices) {
val out_res =
groupingExpressions(idx).children.head.asInstanceOf[Literal].value
putDataIntoVector(resultColumnVectors, out_res, idx)
}
new ColumnarBatch(
resultColumnVectors.map(_.asInstanceOf[ColumnVector]), 1)
}
def getResForOnlyResExpr: ColumnarBatch = {
// This function has limited support for only-result-expression case.
// Fake input for projection:
@@ -384,13 +367,15 @@
1)
}
def getResForEmptyInput: ColumnarBatch = {
val resultColumnVectors =
ArrowWritableColumnVector.allocateColumns(0, resultStructType)
if (aggregateExpressions.isEmpty) {
// To align with spark, in this case, one empty row is returned.
val resultColumnVectors =
ArrowWritableColumnVector.allocateColumns(0, resultStructType)
return new ColumnarBatch(
resultColumnVectors.map(_.asInstanceOf[ColumnVector]), 1)
}
val resultColumnVectors =
ArrowWritableColumnVector.allocateColumns(1, resultStructType)
// If groupby is not required, for Final mode, a default value will be
// returned if input is empty.
var idx = 0
@@ -509,7 +494,7 @@
for (index <- aggBufferAttr.indices) {
val attr = ConverterUtils.getAttrFromExpr(aggBufferAttr(index))
if (supportedTypes.indexOf(attr.dataType) == -1 &&
!attr.dataType.isInstanceOf[DecimalType]) {
!attr.dataType.isInstanceOf[DecimalType]) {
throw new UnsupportedOperationException(
s"${attr.dataType} is not supported in Columnar Average")
}
@@ -530,7 +515,7 @@
val aggBufferAttr = sum.inputAggBufferAttributes
val attr = ConverterUtils.getAttrFromExpr(aggBufferAttr.head)
if (supportedTypes.indexOf(attr.dataType) == -1 &&
!attr.dataType.isInstanceOf[DecimalType]) {
!attr.dataType.isInstanceOf[DecimalType]) {
throw new UnsupportedOperationException(
s"${attr.dataType} is not supported in Columnar Sum")
}
@@ -584,7 +569,7 @@
val aggBufferAttr = min.inputAggBufferAttributes
val attr = ConverterUtils.getAttrFromExpr(aggBufferAttr.head)
if (supportedTypes.indexOf(attr.dataType) == -1 &&
!attr.dataType.isInstanceOf[DecimalType]) {
!attr.dataType.isInstanceOf[DecimalType]) {
throw new UnsupportedOperationException(
s"${attr.dataType} is not supported in Columnar Min")
}
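Note (added for clarity, not part of the commit): the change above folds the old skip_grouping branch into skip_native, so a single getResForAggregateAndGroupingLiteral now builds the one-row result for both literal grouping keys and literal aggregates. A minimal, hypothetical repro of the two cases this path targets — written for this note, not taken from the commit or its test suites:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("literal-agg").getOrCreate()
val df = spark.range(5).toDF("id")

// Aggregate whose only input is a literal: the existing skip_native path.
df.agg(count(lit(1))).show()

// Grouping key that is itself a literal: previously a separate skip_grouping
// branch, now routed through getResForAggregateAndGroupingLiteral as well.
df.groupBy(lit("a")).agg(max(lit(2))).show()

spark.stop()
```

With the columnar plugin enabled, neither plan references an input column in its grouping or aggregate children, which is the condition the new skip_native check tests for.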
@@ -42,9 +42,11 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
import org.apache.spark.util.{ExecutorManager, UserAddedJarUtils, Utils}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.sql.types.DecimalType

import scala.util.control.Breaks.{break, breakable}


/**
* Columnar Based SortExec.
@@ -106,6 +108,16 @@ case class ColumnarSortExec(
})
}

var allLiteral = true
breakable {
for (expr <- sortOrder) {
if (!expr.child.isInstanceOf[Literal]) {
allLiteral = false
break
}
}
}

/***************** WSCG related function ******************/
override def inputRDDs(): Seq[RDD[ColumnarBatch]] = child match {
case c: ColumnarCodegenSupport if c.supportColumnarCodegen == true =>
@@ -156,8 +168,9 @@
/***********************************************************/
def getCodeGenSignature =
if (sortOrder.exists(expr =>
bindReference(ConverterUtils.getAttrFromExpr(expr.child), child.output, true)
.isInstanceOf[BoundReference])) {
!expr.child.isInstanceOf[Literal] &&
bindReference(ConverterUtils.getAttrFromExpr(expr.child), child.output, true)
.isInstanceOf[BoundReference])) {
ColumnarSorter.prebuild(
sortOrder,
child.output,
@@ -193,6 +206,9 @@
val hasInput = iter.hasNext
val res = if (!hasInput) {
Iterator.empty
} else if (allLiteral) {
// If sortOrder are all Literal, no need to do sorting.
new CloseableColumnBatchIterator(iter)
} else {
ColumnarPluginConfig.getConf
val execTempDir = ColumnarPluginConfig.getTempFile
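Note (added for clarity, not part of the commit): the new allLiteral flag lets the operator skip the native sorter entirely when every sort key is a constant, since a constant ordering cannot change row order. A minimal illustration, written for this note; in practice Spark's optimizer may remove such a sort before it ever reaches ColumnarSortExec:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("sort-literal").getOrCreate()

// Every SortOrder child here is a Literal, so the input batches can simply
// be passed through instead of being spilled and sorted natively.
spark.range(5).toDF("id").orderBy(lit(1)).show()

spark.stop()
```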
@@ -132,8 +132,9 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I

override def doCodeGen: ColumnarCodegenContext = {
val childCtx = child.asInstanceOf[ColumnarCodegenSupport].doCodeGen
if (childCtx == null)
if (childCtx == null) {
throw new NullPointerException(s"ColumnarWSCG can't doCodeGen on ${child}")
}
val wholeStageCodeGenNode = TreeBuilder.makeFunction(
s"wholestagecodegen",
Lists.newArrayList(childCtx.root),
@@ -190,18 +191,20 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
outputAttributes: Seq[Attribute]): TreeNode = {
val outputFieldList: List[Field] = outputAttributes.toList.map(attr => {
Field
.nullable(s"${attr.name}#${attr.exprId.id}", CodeGeneration.getResultType(attr.dataType))
.nullable(s"${attr.name.toUpperCase()}#${attr.exprId.id}",
CodeGeneration.getResultType(attr.dataType))
})

val keyFieldList: List[Field] = keyAttributes.toList.map(attr => {
val field = Field
.nullable(s"${attr.name}#${attr.exprId.id}", CodeGeneration.getResultType(attr.dataType))
.nullable(s"${attr.name.toUpperCase()}#${attr.exprId.id}",
CodeGeneration.getResultType(attr.dataType))
if (outputFieldList.indexOf(field) == -1) {
throw new UnsupportedOperationException(
s"CashedRelation not found ${attr.name}#${attr.exprId.id} in ${outputAttributes}")
throw new UnsupportedOperationException(s"CachedRelation not found" +
s"${attr.name.toUpperCase()}#${attr.exprId.id} in ${outputAttributes}")
}
field
});
})

val key_args_node = TreeBuilder.makeFunction(
"key_field",
@@ -423,7 +426,6 @@
iter
}
}

idx += 1
}

@@ -55,7 +55,7 @@ class ColumnarConcat(exps: Seq[Expression], original: Expression)
exp.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)

val resultType = new ArrowType.Utf8()
val funcNode = TreeBuilder.makeFunction("concat",
val funcNode = TreeBuilder.makeFunction("concatOperator",
Lists.newArrayList(exp_node, rightNode(args, exps, iter, iterFaster)), resultType)
(funcNode, expType)
}
@@ -73,7 +73,7 @@
val (exp_node, expType): (TreeNode, ArrowType) =
exp.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
val resultType = new ArrowType.Utf8()
val funcNode = TreeBuilder.makeFunction("concat",
val funcNode = TreeBuilder.makeFunction("concatOperator",
Lists.newArrayList(exp_node, rightNode(args, exps, iter, iterFaster)), resultType)
funcNode
}
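Note (added for clarity, not part of the commit): the switch from concat to concatOperator lines up with the first commit-message item, "fix string concat w/ null". Spark's Concat returns NULL whenever any argument is NULL, and the renamed Gandiva function is presumably the variant that propagates nulls rather than treating them as empty strings (an assumption; the diff itself does not say so). A small Spark-side sketch of the behaviour the columnar expression must match:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("concat-null").getOrCreate()
import spark.implicits._

val df = Seq((Some("a"), Some("b")), (None: Option[String], Some("b"))).toDF("l", "r")

// Spark's concat is NULL as soon as any argument is NULL,
// so the first row yields "ab" and the second yields null.
df.select(concat($"l", $"r").as("c")).show()

spark.stop()
```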