diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala
index a1816a889..757471bba 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala
@@ -118,6 +118,14 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
   val enableColumnarArrowUDF: Boolean =
     conf.getConfString("spark.oap.sql.columnar.arrowudf", "true").toBoolean && enableCpu
 
+  // enable or disable columnar local limit
+  val enableColumnarLocalLimit: Boolean =
+    conf.getConfString("spark.oap.sql.columnar.locallimit", "true").toBoolean && enableCpu
+
+  // enable or disable columnar global limit
+  val enableColumnarGlobalLimit: Boolean =
+    conf.getConfString("spark.oap.sql.columnar.globallimit", "true").toBoolean && enableCpu
+
   // enable or disable columnar wholestagecodegen
   val enableColumnarWholeStageCodegen: Boolean =
     conf.getConfString("spark.oap.sql.columnar.wholestagecodegen", "true").toBoolean && enableCpu
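Both new flags default to `true` and, like the other operator toggles, are additionally gated by `enableCpu`. A minimal usage sketch (the session boilerplate below is illustrative and not part of this patch; only the two config keys come from the change above):

```scala
import org.apache.spark.sql.SparkSession

// Toggle the new columnar limit operators per session.
val spark = SparkSession.builder()
  .appName("columnar-limit-demo") // hypothetical app name
  .config("spark.oap.sql.columnar.locallimit", "true")   // plan ColumnarLocalLimitExec
  .config("spark.oap.sql.columnar.globallimit", "false") // keep row-based GlobalLimitExec
  .getOrCreate()
```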
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala
index 8b72fcf08..62f79775e 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala
@@ -23,6 +23,7 @@ import com.intel.oap.vectorized._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution._
@@ -308,3 +309,146 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
     throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
   }
 }
+
+// TODO(): consolidate locallimit and globallimit
+case class ColumnarLocalLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
+  // Caps the number of rows emitted by each partition at the given limit.
+
+  buildCheck()
+
+  def buildCheck(): Unit = {
+    for (child <- children) {
+      for (schema <- child.schema) {
+        try {
+          ConverterUtils.checkIfTypeSupported(schema.dataType)
+        } catch {
+          case e: UnsupportedOperationException =>
+            throw new UnsupportedOperationException(
+              s"${schema.dataType} is not supported in ColumnarLocalLimitExec")
+        }
+      }
+    }
+  }
+
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def supportsColumnar = true
+  override def output: Seq[Attribute] = child.output
+  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    child.executeColumnar().mapPartitions { iter =>
+      val hasInput = iter.hasNext
+      val res = if (hasInput) {
+        new Iterator[ColumnarBatch] {
+          var rowCount = 0
+          override def hasNext: Boolean = {
+            val hasNext = iter.hasNext
+            hasNext && (rowCount < limit)
+          }
+
+          override def next(): ColumnarBatch = {
+
+            if (!hasNext) {
+              throw new NoSuchElementException("End of ColumnarBatch iterator")
+            }
+
+            if (rowCount < limit) {
+              val delta = iter.next()
+              val preRowCount = rowCount
+              rowCount += delta.numRows
+              if (rowCount > limit) {
+                val newSize = limit - preRowCount
+                delta.setNumRows(newSize)
+              }
+              delta
+            } else {
+              throw new NoSuchElementException("End of ColumnarBatch iterator")
+            }
+          }
+        }
+      } else {
+        Iterator.empty
+      }
+      new CloseableColumnBatchIterator(res)
+    }
+  }
+
+  protected override def doExecute()
+      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
+    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
+  }
+
+}
+
+case class ColumnarGlobalLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
+  // Caps the total number of rows emitted; all input tuples must be in one partition.
+
+  buildCheck()
+
+  def buildCheck(): Unit = {
+    for (child <- children) {
+      for (schema <- child.schema) {
+        try {
+          ConverterUtils.checkIfTypeSupported(schema.dataType)
+        } catch {
+          case e: UnsupportedOperationException =>
+            throw new UnsupportedOperationException(
+              s"${schema.dataType} is not supported in ColumnarGlobalLimitExec")
+        }
+      }
+    }
+  }
+
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
+
+  override def supportsColumnar = true
+  override def output: Seq[Attribute] = child.output
+  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    child.executeColumnar().mapPartitions { iter =>
+      val hasInput = iter.hasNext
+      val res = if (hasInput) {
+        new Iterator[ColumnarBatch] {
+          var rowCount = 0
+          override def hasNext: Boolean = {
+            val hasNext = iter.hasNext
+            hasNext && (rowCount < limit)
+          }
+
+          override def next(): ColumnarBatch = {
+
+            if (!hasNext) {
+              throw new NoSuchElementException("End of ColumnarBatch iterator")
+            }
+
+            if (rowCount < limit) {
+              val delta = iter.next()
+              val preRowCount = rowCount
+              rowCount += delta.numRows
+              if (rowCount > limit) {
+                val newSize = limit - preRowCount
+                delta.setNumRows(newSize)
+              }
+              delta
+            } else {
+              throw new NoSuchElementException("End of ColumnarBatch iterator")
+            }
+          }
+        }
+      } else {
+        Iterator.empty
+      }
+      new CloseableColumnBatchIterator(res)
+    }
+  }
+
+  protected override def doExecute()
+      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
+    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
+  }
+}
\ No newline at end of file
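The two operators above duplicate the same per-partition capping loop, hence the consolidation TODO. A distilled sketch of that shared logic under the corrected `rowCount < limit` bound (`capBatches` is a hypothetical helper name; the real operators inline this and wrap the result in `CloseableColumnBatchIterator` so column memory gets released):

```scala
import org.apache.spark.sql.vectorized.ColumnarBatch

// Pass batches through until the running row count reaches `limit`; the batch
// that crosses the boundary is shrunk in place via ColumnarBatch.setNumRows.
def capBatches(iter: Iterator[ColumnarBatch], limit: Int): Iterator[ColumnarBatch] =
  new Iterator[ColumnarBatch] {
    private var rowCount = 0

    override def hasNext: Boolean = iter.hasNext && rowCount < limit

    override def next(): ColumnarBatch = {
      if (!hasNext) throw new NoSuchElementException("End of ColumnarBatch iterator")
      val batch = iter.next()
      val preRowCount = rowCount
      rowCount += batch.numRows
      if (rowCount > limit) {
        // Only the reported row count shrinks; the underlying vectors are
        // freed later by the closeable iterator wrapper in the actual operators.
        batch.setNumRows(limit - preRowCount)
      }
      batch
    }
  }
```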
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
index 6a7f5b641..d123b563b 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
@@ -107,6 +107,14 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
       val children = plan.children.map(replaceWithColumnarPlan)
       logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
       ColumnarUnionExec(children)
+    case plan: LocalLimitExec =>
+      val child = replaceWithColumnarPlan(plan.child)
+      logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+      ColumnarLocalLimitExec(plan.limit, child)
+    case plan: GlobalLimitExec =>
+      val child = replaceWithColumnarPlan(plan.child)
+      logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+      ColumnarGlobalLimitExec(plan.limit, child)
     case plan: ExpandExec =>
       val child = replaceWithColumnarPlan(plan.child)
       logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala
index d19c12e38..5da11f660 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala
@@ -61,6 +61,8 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
   val enableColumnarBroadcastExchange = columnarConf.enableColumnarBroadcastExchange
   val enableColumnarBroadcastJoin = columnarConf.enableColumnarBroadcastJoin
   val enableColumnarArrowUDF = columnarConf.enableColumnarArrowUDF
+  val enableColumnarLocalLimit = columnarConf.enableColumnarLocalLimit
+  val enableColumnarGlobalLimit = columnarConf.enableColumnarGlobalLimit
 
   private def tryConvertToColumnar(plan: SparkPlan): Boolean = {
     try {
@@ -101,6 +103,12 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
       case plan: UnionExec =>
         if (!enableColumnarUnion) return false
         new ColumnarUnionExec(plan.children)
+      case plan: LocalLimitExec =>
+        if (!enableColumnarLocalLimit) return false
+        new ColumnarLocalLimitExec(plan.limit, plan.child)
+      case plan: GlobalLimitExec =>
+        if (!enableColumnarGlobalLimit) return false
+        new ColumnarGlobalLimitExec(plan.limit, plan.child)
       case plan: ExpandExec =>
         if (!enableColumnarExpand) return false
         new ColumnarExpandExec(plan.projections, plan.output, plan.child)
diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index ed284df10..962f273b1 100644
--- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -31,6 +31,8 @@ import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.time.SpanSugar._
 
+import com.intel.oap.execution.ColumnarLocalLimitExec
+
 import org.apache.spark.{SparkConf, SparkContext, TaskContext, TestUtils}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
@@ -1096,7 +1098,7 @@
       require(execPlan != null)
 
       val localLimits = execPlan.collect {
-        case l: LocalLimitExec => l
+        case l: ColumnarLocalLimitExec => l
         case l: StreamingLocalLimitExec => l
       }
 
@@ -1110,7 +1112,7 @@
           s"Local limit was not StreamingLocalLimitExec:\n$execPlan")
       } else {
         assert(
-          localLimits.head.isInstanceOf[LocalLimitExec],
-          s"Local limit was not LocalLimitExec:\n$execPlan")
+          localLimits.head.isInstanceOf[ColumnarLocalLimitExec],
+          s"Local limit was not ColumnarLocalLimitExec:\n$execPlan")
       }
     }
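With the overrides and guard rule in place, a LIMIT query should plan the new operators whenever the flags are on. A hypothetical spark-shell check, not part of this patch (depending on the query shape Spark may plan `CollectLimitExec` instead, so treat this as illustrative only):

```scala
import com.intel.oap.execution.{ColumnarGlobalLimitExec, ColumnarLocalLimitExec}

// `spark` is assumed to be a session with the Gazelle plugin enabled.
val plan = spark.range(0, 1000).toDF("id").limit(10).queryExecution.executedPlan

// LIMIT normally shows up as a per-partition local limit plus a global limit.
assert(plan.collectFirst { case l: ColumnarLocalLimitExec => l }.isDefined ||
  plan.collectFirst { case g: ColumnarGlobalLimitExec => g }.isDefined)
```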