This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-574] implement columnar limit (#575)
* implement columnar limit

Signed-off-by: Yuan Zhou <[email protected]>

* adding configuration for locallimit/globallimit

Signed-off-by: Yuan Zhou <[email protected]>

* fix

Signed-off-by: Yuan Zhou <[email protected]>

* fix

Signed-off-by: Yuan Zhou <[email protected]>

* fix unit test

Signed-off-by: Yuan Zhou <[email protected]>
zhouyuan authored Nov 26, 2021
1 parent 52ecd29 commit dcdc459
Showing 5 changed files with 172 additions and 2 deletions.
@@ -118,6 +118,14 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
val enableColumnarArrowUDF: Boolean =
conf.getConfString("spark.oap.sql.columnar.arrowudf", "true").toBoolean && enableCpu

// enable or disable columnar local limit
val enableColumnarLocalLimit: Boolean =
conf.getConfString("spark.oap.sql.columnar.locallimit", "true").toBoolean && enableCpu

// enable or disable columnar global limit
val enableColumnarGlobalLimit: Boolean =
conf.getConfString("spark.oap.sql.columnar.globallimit", "true").toBoolean && enableCpu

// enable or disable columnar wholestagecodegen
val enableColumnarWholeStageCodegen: Boolean =
conf.getConfString("spark.oap.sql.columnar.wholestagecodegen", "true").toBoolean && enableCpu
@@ -23,6 +23,7 @@ import com.intel.oap.vectorized._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution._
@@ -308,3 +309,146 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}
}

// TODO(): consolidate locallimit and globallimit
case class ColumnarLocalLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
// verify that every child output type is supported before claiming columnar support

buildCheck()

def buildCheck(): Unit = {
for (child <- children) {
for (schema <- child.schema) {
try {
ConverterUtils.checkIfTypeSupported(schema.dataType)
} catch {
case e: UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${schema.dataType} is not supported in ColumnarLocalLimitExec")
}
}
}
}


override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def outputPartitioning: Partitioning = child.outputPartitioning

override def supportsColumnar = true
override def output: Seq[Attribute] = child.output
protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
child.executeColumnar().mapPartitions { iter =>
val hasInput = iter.hasNext
val res = if (hasInput) {
new Iterator[ColumnarBatch] {
var rowCount = 0
override def hasNext: Boolean = {
val hasNext = iter.hasNext
hasNext && (rowCount < limit)
}

override def next(): ColumnarBatch = {

if (!hasNext) {
throw new NoSuchElementException("End of ColumnarBatch iterator")
}

if (rowCount < limit) {
val delta = iter.next()
val preRowCount = rowCount
rowCount += delta.numRows
if (rowCount > limit) {
val newSize = limit - preRowCount
delta.setNumRows(newSize)
}
delta
} else {
throw new NoSuchElementException("End of ColumnarBatch iterator")
}
}
}
} else {
Iterator.empty
}
new CloseableColumnBatchIterator(res)
}
}

protected override def doExecute()
: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}

}

case class ColumnarGlobalLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
// verify that every child output type is supported before claiming columnar support

buildCheck()

def buildCheck(): Unit = {
for (child <- children) {
for (schema <- child.schema) {
try {
ConverterUtils.checkIfTypeSupported(schema.dataType)
} catch {
case e: UnsupportedOperationException =>
throw new UnsupportedOperationException(
s"${schema.dataType} is not supported in ColumnarGlobalLimitExec")
}
}
}
}


override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def outputPartitioning: Partitioning = child.outputPartitioning
override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil

override def supportsColumnar = true
override def output: Seq[Attribute] = child.output
protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
child.executeColumnar().mapPartitions { iter =>
val hasInput = iter.hasNext
val res = if (hasInput) {
new Iterator[ColumnarBatch] {
var rowCount = 0
override def hasNext: Boolean = {
val hasNext = iter.hasNext
hasNext && (rowCount < limit)
}

override def next(): ColumnarBatch = {

if (!hasNext) {
throw new NoSuchElementException("End of ColumnarBatch iterator")
}

if (rowCount < limit) {
val delta = iter.next()
val preRowCount = rowCount
rowCount += delta.numRows
if (rowCount > limit) {
val newSize = limit - preRowCount
delta.setNumRows(newSize)
}
delta
} else {
throw new NoSuchElementException("End of ColumnarBatch iterator")
}
}
}
} else {
Iterator.empty
}
new CloseableColumnBatchIterator(res)
}
}

protected override def doExecute()
: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}
}
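
Both operators apply the limit per batch rather than per row: whole batches pass through until the running row count crosses limit, and the batch that straddles the boundary is truncated in place with setNumRows. A minimal standalone sketch of that bookkeeping, using plain row counts instead of real ColumnarBatch objects (clipBatches and batchSizes are illustrative names, not part of this commit):

import scala.collection.mutable.ListBuffer

// Illustrative only: given per-batch row counts, compute how many rows of each
// batch survive a LIMIT, mirroring the rowCount / preRowCount logic above.
def clipBatches(batchSizes: Seq[Int], limit: Int): List[Int] = {
  var rowCount = 0
  val kept = ListBuffer.empty[Int]
  for (size <- batchSizes if rowCount < limit) {
    val preRowCount = rowCount
    rowCount += size
    // the batch that crosses the limit is clipped, like setNumRows() does above
    kept += (if (rowCount > limit) limit - preRowCount else size)
  }
  kept.toList
}

// Example: three 40-row batches under LIMIT 100 keep 40, 40 and 20 rows.
assert(clipBatches(Seq(40, 40, 40), limit = 100) == List(40, 40, 20))

The straddling batch is never split into two outputs; its row count is simply lowered, so downstream operators see at most limit rows per partition.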
@@ -107,6 +107,14 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
val children = plan.children.map(replaceWithColumnarPlan)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ColumnarUnionExec(children)
case plan: LocalLimitExec =>
val child = replaceWithColumnarPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ColumnarLocalLimitExec(plan.limit, child)
case plan: GlobalLimitExec =>
val child = replaceWithColumnarPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ColumnarGlobalLimitExec(plan.limit, child)
case plan: ExpandExec =>
val child = replaceWithColumnarPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
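
With the rule above in place, a plain LIMIT query should plan into the new operators whenever the flags are enabled. A hedged sketch of how that could be checked from a Gazelle-enabled session named spark; the view name lineitem and the use of executedPlan are assumptions for illustration, not taken from the commit:

// Illustrative only: run a LIMIT query and look for the columnar limit operators
// in the executed physical plan.
import com.intel.oap.execution.{ColumnarGlobalLimitExec, ColumnarLocalLimitExec}

val df = spark.sql("SELECT * FROM lineitem LIMIT 10")
val limitNodes = df.queryExecution.executedPlan.collect {
  case l: ColumnarLocalLimitExec => l
  case g: ColumnarGlobalLimitExec => g
}
println(s"columnar limit operators found: ${limitNodes.size}")

Conversely, setting spark.oap.sql.columnar.locallimit to false should make ColumnarGuardRule (below) keep the vanilla LocalLimitExec in place.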
@@ -61,6 +61,8 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
val enableColumnarBroadcastExchange = columnarConf.enableColumnarBroadcastExchange
val enableColumnarBroadcastJoin = columnarConf.enableColumnarBroadcastJoin
val enableColumnarArrowUDF = columnarConf.enableColumnarArrowUDF
val enableColumnarLocalLimit = columnarConf.enableColumnarLocalLimit
val enableColumnarGlobalLimit = columnarConf.enableColumnarGlobalLimit

private def tryConvertToColumnar(plan: SparkPlan): Boolean = {
try {
@@ -101,6 +103,12 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
case plan: UnionExec =>
if (!enableColumnarUnion) return false
new ColumnarUnionExec(plan.children)
case plan: LocalLimitExec =>
if (!enableColumnarLocalLimit) return false
new ColumnarLocalLimitExec(plan.limit, plan.child)
case plan: GlobalLimitExec =>
if (!enableColumnarGlobalLimit) return false
new ColumnarGlobalLimitExec(plan.limit, plan.child)
case plan: ExpandExec =>
if (!enableColumnarExpand) return false
new ColumnarExpandExec(plan.projections, plan.output, plan.child)
@@ -31,6 +31,8 @@ import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.scalatest.time.SpanSugar._

import com.intel.oap.execution.ColumnarLocalLimitExec

import org.apache.spark.{SparkConf, SparkContext, TaskContext, TestUtils}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql._
@@ -1096,7 +1098,7 @@ class StreamSuite extends StreamTest {
require(execPlan != null)

val localLimits = execPlan.collect {
case l: LocalLimitExec => l
case l: ColumnarLocalLimitExec => l
case l: StreamingLocalLimitExec => l
}

@@ -1110,7 +1112,7 @@ class StreamSuite extends StreamTest {
s"Local limit was not StreamingLocalLimitExec:\n$execPlan")
} else {
assert(
localLimits.head.isInstanceOf[LocalLimitExec],
localLimits.head.isInstanceOf[ColumnarLocalLimitExec],
s"Local limit was not LocalLimitExec:\n$execPlan")
}
}
