This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-574] implement columnar limit #575

Merged 5 commits on Nov 26, 2021
@@ -118,6 +118,14 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
  val enableColumnarArrowUDF: Boolean =
    conf.getConfString("spark.oap.sql.columnar.arrowudf", "true").toBoolean && enableCpu

  // enable or disable columnar local limit
  val enableColumnarLocalLimit: Boolean =
    conf.getConfString("spark.oap.sql.columnar.locallimit", "true").toBoolean && enableCpu

  // enable or disable columnar global limit
  val enableColumnarGlobalLimit: Boolean =
    conf.getConfString("spark.oap.sql.columnar.globallimit", "true").toBoolean && enableCpu

  // enable or disable columnar wholestagecodegen
  val enableColumnarWholeStageCodegen: Boolean =
    conf.getConfString("spark.oap.sql.columnar.wholestagecodegen", "true").toBoolean && enableCpu
@@ -23,6 +23,7 @@ import com.intel.oap.vectorized._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution._
@@ -308,3 +309,146 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
  }
}

// TODO(): consolidate LocalLimit and GlobalLimit
case class ColumnarLocalLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
  // verify that every column type in the child schema is supported by the columnar backend
  buildCheck()

  def buildCheck(): Unit = {
    for (child <- children) {
      for (schema <- child.schema) {
        try {
          ConverterUtils.checkIfTypeSupported(schema.dataType)
        } catch {
          case e: UnsupportedOperationException =>
            throw new UnsupportedOperationException(
              s"${schema.dataType} is not supported in ColumnarLocalLimitExec")
        }
      }
    }
  }

  override def outputOrdering: Seq[SortOrder] = child.outputOrdering

  override def outputPartitioning: Partitioning = child.outputPartitioning

  override def supportsColumnar = true

  override def output: Seq[Attribute] = child.output

  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    child.executeColumnar().mapPartitions { iter =>
      val hasInput = iter.hasNext
      val res = if (hasInput) {
        new Iterator[ColumnarBatch] {
          var rowCount = 0

          override def hasNext: Boolean = {
            // stop once `limit` rows have been emitted from this partition
            iter.hasNext && (rowCount < limit)
          }

          override def next(): ColumnarBatch = {
            if (!hasNext) {
              throw new NoSuchElementException("End of ColumnarBatch iterator")
            }
            if (rowCount < limit) {
              val delta = iter.next()
              val preRowCount = rowCount
              rowCount += delta.numRows
              if (rowCount > limit) {
                // truncate the batch that crosses the limit so at most `limit` rows are returned
                val newSize = limit - preRowCount
                delta.setNumRows(newSize)
              }
              delta
            } else {
              throw new NoSuchElementException("End of ColumnarBatch iterator")
            }
          }
        }
      } else {
        Iterator.empty
      }
      new CloseableColumnBatchIterator(res)
    }
  }

  protected override def doExecute(): org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
  }
}

case class ColumnarGlobalLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
  // verify that every column type in the child schema is supported by the columnar backend
  buildCheck()

  def buildCheck(): Unit = {
    for (child <- children) {
      for (schema <- child.schema) {
        try {
          ConverterUtils.checkIfTypeSupported(schema.dataType)
        } catch {
          case e: UnsupportedOperationException =>
            throw new UnsupportedOperationException(
              s"${schema.dataType} is not supported in ColumnarGlobalLimitExec")
        }
      }
    }
  }

  override def outputOrdering: Seq[SortOrder] = child.outputOrdering

  override def outputPartitioning: Partitioning = child.outputPartitioning

  // a global limit requires all rows to be gathered into a single partition
  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil

  override def supportsColumnar = true

  override def output: Seq[Attribute] = child.output

  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    child.executeColumnar().mapPartitions { iter =>
      val hasInput = iter.hasNext
      val res = if (hasInput) {
        new Iterator[ColumnarBatch] {
          var rowCount = 0

          override def hasNext: Boolean = {
            // stop once `limit` rows have been emitted
            iter.hasNext && (rowCount < limit)
          }

          override def next(): ColumnarBatch = {
            if (!hasNext) {
              throw new NoSuchElementException("End of ColumnarBatch iterator")
            }
            if (rowCount < limit) {
              val delta = iter.next()
              val preRowCount = rowCount
              rowCount += delta.numRows
              if (rowCount > limit) {
                // truncate the batch that crosses the limit so at most `limit` rows are returned
                val newSize = limit - preRowCount
                delta.setNumRows(newSize)
              }
              delta
            } else {
              throw new NoSuchElementException("End of ColumnarBatch iterator")
            }
          }
        }
      } else {
        Iterator.empty
      }
      new CloseableColumnBatchIterator(res)
    }
  }

  protected override def doExecute(): org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
  }
}
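Both new operators apply the limit per partition by counting rows across incoming batches and shrinking the batch that crosses the limit with `setNumRows`. A small self-contained sketch of that counting logic on plain batch sizes — the names `batchSizes` and `emitted` and the numbers are hypothetical, with no Spark dependency:

```scala
// Hypothetical row counts for the batches arriving in one partition (not part of the PR).
val batchSizes = Seq(40, 40, 40)
val limit = 100

var rowCount = 0
val emitted = scala.collection.mutable.ArrayBuffer.empty[Int]
for (size <- batchSizes if rowCount < limit) {
  val preRowCount = rowCount
  rowCount += size
  // The batch that crosses the limit is shrunk, mirroring delta.setNumRows(limit - preRowCount).
  emitted += (if (rowCount > limit) limit - preRowCount else size)
}

println(emitted) // ArrayBuffer(40, 40, 20): exactly `limit` rows in total
```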
@@ -107,6 +107,14 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
        val children = plan.children.map(replaceWithColumnarPlan)
        logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
        ColumnarUnionExec(children)
      case plan: LocalLimitExec =>
        val child = replaceWithColumnarPlan(plan.child)
        logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
        ColumnarLocalLimitExec(plan.limit, child)
      case plan: GlobalLimitExec =>
        val child = replaceWithColumnarPlan(plan.child)
        logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
        ColumnarGlobalLimitExec(plan.limit, child)
      case plan: ExpandExec =>
        val child = replaceWithColumnarPlan(plan.child)
        logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
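With the rule registered, a plain `limit` query is expected to plan into the new operators when the flags are enabled. A hedged sketch of how one might verify this from a Spark shell — the `spark.range` data is illustrative, and the fully qualified class names follow the `com.intel.oap.execution` package used by the test import further down:

```scala
// Assumes a SparkSession with the Gazelle plugin active and the limit flags at their defaults.
val df = spark.range(0, 1000).toDF("id").limit(10)

// After planning, the columnar operators are expected to stand in for
// LocalLimitExec / GlobalLimitExec.
val plan = df.queryExecution.executedPlan
val columnarLimits = plan.collect {
  case l: com.intel.oap.execution.ColumnarLocalLimitExec  => l
  case g: com.intel.oap.execution.ColumnarGlobalLimitExec => g
}

println(s"columnar limit operators found: ${columnarLimits.size}")
```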
@@ -61,6 +61,8 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
val enableColumnarBroadcastExchange = columnarConf.enableColumnarBroadcastExchange
val enableColumnarBroadcastJoin = columnarConf.enableColumnarBroadcastJoin
val enableColumnarArrowUDF = columnarConf.enableColumnarArrowUDF
val enableColumnarLocalLimit = columnarConf.enableColumnarLocalLimit
val enableColumnarGlobalLimit = columnarConf.enableColumnarGlobalLimit

private def tryConvertToColumnar(plan: SparkPlan): Boolean = {
try {
@@ -101,6 +103,12 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
      case plan: UnionExec =>
        if (!enableColumnarUnion) return false
        new ColumnarUnionExec(plan.children)
      case plan: LocalLimitExec =>
        if (!enableColumnarLocalLimit) return false
        new ColumnarLocalLimitExec(plan.limit, plan.child)
      case plan: GlobalLimitExec =>
        if (!enableColumnarGlobalLimit) return false
        new ColumnarGlobalLimitExec(plan.limit, plan.child)
      case plan: ExpandExec =>
        if (!enableColumnarExpand) return false
        new ColumnarExpandExec(plan.projections, plan.output, plan.child)
@@ -31,6 +31,8 @@ import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.scalatest.time.SpanSugar._

import com.intel.oap.execution.ColumnarLocalLimitExec

import org.apache.spark.{SparkConf, SparkContext, TaskContext, TestUtils}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql._
@@ -1096,7 +1098,7 @@ class StreamSuite extends StreamTest {
require(execPlan != null)

val localLimits = execPlan.collect {
-case l: LocalLimitExec => l
+case l: ColumnarLocalLimitExec => l
case l: StreamingLocalLimitExec => l
}

@@ -1110,7 +1112,7 @@
s"Local limit was not StreamingLocalLimitExec:\n$execPlan")
} else {
assert(
-localLimits.head.isInstanceOf[LocalLimitExec],
+localLimits.head.isInstanceOf[ColumnarLocalLimitExec],
s"Local limit was not LocalLimitExec:\n$execPlan")
}
}