This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-207] Fix NaN in Max and Min #495

Merged: 7 commits, Oct 25, 2021

@@ -70,6 +70,7 @@ class ColumnarHashAggregation(

  var inputAttrQueue: scala.collection.mutable.Queue[Attribute] = _
  val resultType = CodeGeneration.getResultType()
+  val NaN_check : Boolean = GazellePluginConfig.getConf.enableColumnarNaNCheck

  def getColumnarFuncNode(expr: Expression): TreeNode = {
    try {
@@ -181,7 +182,8 @@ class ColumnarHashAggregation(
          case other =>
            throw new UnsupportedOperationException(s"not currently supported: $other.")
        }
-      TreeBuilder.makeFunction("action_max", childrenColumnarFuncNodeList.asJava, resultType)
+      val actionName = "action_max" + s"_$NaN_check"
+      TreeBuilder.makeFunction(actionName, childrenColumnarFuncNodeList.asJava, resultType)
    case Min(_) =>
      val childrenColumnarFuncNodeList =
        mode match {
@@ -192,7 +194,8 @@ class ColumnarHashAggregation(
          case other =>
            throw new UnsupportedOperationException(s"not currently supported: $other.")
        }
-      TreeBuilder.makeFunction("action_min", childrenColumnarFuncNodeList.asJava, resultType)
+      val actionName = "action_min" + s"_$NaN_check"
+      TreeBuilder.makeFunction(actionName, childrenColumnarFuncNodeList.asJava, resultType)
    case StddevSamp(_, _) =>
      mode match {
        case Partial =>

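For reference, Spark SQL orders NaN as larger than any other double value, so a max aggregation over a column containing NaN returns NaN, while min returns the smallest non-NaN value unless every input is NaN. The sketch below is a minimal, standalone Scala illustration of that comparison rule only; it is not the plugin's native action implementation, and the object and method names (NaNAwareMinMaxSketch, nanAwareMax, nanAwareMin) are hypothetical names used purely for illustration.

// Standalone sketch of the NaN ordering a NaN-aware max/min would follow to match Spark.
// Assumption: Spark SQL's DoubleType ordering, where NaN compares greater than any other
// value; java.lang.Double.compare implements exactly that total order.
object NaNAwareMinMaxSketch {
  private val nanAwareOrdering: Ordering[Double] = new Ordering[Double] {
    override def compare(x: Double, y: Double): Int = java.lang.Double.compare(x, y)
  }

  def nanAwareMax(values: Seq[Double]): Double = values.max(nanAwareOrdering)
  def nanAwareMin(values: Seq[Double]): Double = values.min(nanAwareOrdering)

  def main(args: Array[String]): Unit = {
    val data = Seq(1.0, Double.NaN, 3.0)
    println(nanAwareMax(data)) // prints NaN: NaN is ordered above 3.0
    println(nanAwareMin(data)) // prints 1.0: min is only NaN when every input is NaN
  }
}
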
@@ -178,7 +178,7 @@ case class ColumnarCollapseCodegenStages(
      plan: ColumnarConditionProjectExec,
      skip_smj: Boolean = false): SparkPlan = plan.child match {
    case p: ColumnarBroadcastHashJoinExec
-        if plan.condition == null && !containsExpression(plan.projectList) =>
+        if plan.condition == null && plan.projectList == null =>
      ColumnarBroadcastHashJoinExec(
        p.leftKeys,
        p.rightKeys,

@@ -21,13 +21,17 @@ import java.io.FileInputStream
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}

class ArrowColumnarBatchSerializerSuite extends SparkFunSuite with SharedSparkSession {

  protected var avgBatchNumRows: SQLMetric = _
  protected var outputNumRows: SQLMetric = _

+  override def sparkConf: SparkConf =
+    super.sparkConf
+      .set("spark.shuffle.compress", "false")
+
  override def beforeEach() = {
    avgBatchNumRows = SQLMetrics.createAverageMetric(
      spark.sparkContext,

@@ -22,11 +22,11 @@ import java.net.URI
import java.nio.file.{Files, StandardOpenOption}
import java.util.Locale

-import scala.collection.mutable
+import com.intel.oap.execution.{ColumnarBroadcastHashJoinExec, ColumnarSortMergeJoinExec}

+import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocalFileSystem, Path}
-
import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
@@ -708,21 +708,21 @@
      val joinedDF = df1FromFile.join(df2FromFile, Seq("count"))
      if (compressionFactor == 0.5) {
        val bJoinExec = collect(joinedDF.queryExecution.executedPlan) {
-          case bJoin: BroadcastHashJoinExec => bJoin
+          case bJoin: ColumnarBroadcastHashJoinExec => bJoin
        }
        assert(bJoinExec.nonEmpty)
        val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
-          case smJoin: SortMergeJoinExec => smJoin
+          case smJoin: ColumnarSortMergeJoinExec => smJoin
        }
        assert(smJoinExec.isEmpty)
      } else {
        // compressionFactor is 1.0
        val bJoinExec = collect(joinedDF.queryExecution.executedPlan) {
-          case bJoin: BroadcastHashJoinExec => bJoin
+          case bJoin: ColumnarBroadcastHashJoinExec => bJoin
        }
        assert(bJoinExec.isEmpty)
        val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
-          case smJoin: SortMergeJoinExec => smJoin
+          case smJoin: ColumnarSortMergeJoinExec => smJoin
        }
        assert(smJoinExec.nonEmpty)
      }

@@ -19,11 +19,11 @@ package org.apache.spark.sql

import java.util.Locale

+import com.intel.oap.execution.{ColumnarSortExec, ColumnarSortMergeJoinExec}
+
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

-import org.mockito.Mockito._
-
import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -866,14 +866,14 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
    }
    val executed = df.queryExecution.executedPlan
    val executedJoins = collect(executed) {
-      case j: SortMergeJoinExec => j
+      case j: ColumnarSortMergeJoinExec => j
    }
    // This only applies to the above tested queries, in which a child SortMergeJoin always
    // contains the SortOrder required by its parent SortMergeJoin. Thus, SortExec should never
    // appear as parent of SortMergeJoin.
    executed.foreach {
-      case s: SortExec => s.foreach {
-        case j: SortMergeJoinExec => fail(
+      case s: ColumnarSortExec => s.foreach {
+        case j: ColumnarSortMergeJoinExec => fail(
          s"No extra sort should be added since $j already satisfies the required ordering"
        )
        case _ =>

@@ -1193,7 +1193,7 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
    }
  }

-  test("SPARK-23957 Remove redundant sort from subquery plan(scalar subquery)") {
+  ignore("SPARK-23957 Remove redundant sort from subquery plan(scalar subquery)") {
    withTempView("t1", "t2", "t3") {
      Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
      Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")