This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-770] [NSE-774] Fix runtime issues on spark 3.2 #773

Merged · 22 commits · Mar 23, 2022
Commits (22)
6d807e5  Fix issues in ArrowRowToColumnarExec and writing shuffle metadata (PHILO-HE, Mar 15, 2022)
2ac622b  Remove AdaptiveSparkPlanExec in spark-3.2 shim layer (PHILO-HE, Mar 15, 2022)
95d8554  Fix issues reported by unit tests (PHILO-HE, Mar 15, 2022)
fdf48f9  fix AQE (zhouyuan, Mar 16, 2022)
138d6b8  Use correct type for equals in ColumnarBroadcastExchangeAdaptor (PHILO-HE, Mar 16, 2022)
ea104db  Fix compatibility issues to make the changes work on spark 3.1 also (PHILO-HE, Mar 16, 2022)
31c32bc  Refine the code (PHILO-HE, Mar 16, 2022)
8ad5a35  Change the parent class of ColumnarBroadcastExchangeAdaptor (PHILO-HE, Mar 17, 2022)
1c6965d  Remove override keyword to make the code compatible with spark 3.1 (PHILO-HE, Mar 18, 2022)
929aaf8  Add missing match case (PHILO-HE, Mar 18, 2022)
b4bdd9e  Try to fix BHJ issue (PHILO-HE, Mar 18, 2022)
5d2060d  Remove canEqual & Equal for two case classes (PHILO-HE, Mar 18, 2022)
5e66540  fallback on reused broadcast exchange (zhouyuan, Mar 20, 2022)
f0c8be5  Fix the case that BroadcastExchangeExec can connect to BHJ (PHILO-HE, Mar 20, 2022)
35a15d4  Override withNewChildInternal in ArrowColumnarToRowExec [To fix an as… (PHILO-HE, Mar 20, 2022)
912aec4  Remove override keyword for compatibility (PHILO-HE, Mar 20, 2022)
f9a03d0  Revert "fallback on reused broadcast exchange" (PHILO-HE, Mar 21, 2022)
1eb1927  Revert "Fix the case that BroadcastExchangeExec can connect to BHJ" (PHILO-HE, Mar 21, 2022)
49d9b05  Correct the assert statement in testing broadcast exchange reuse acro… (PHILO-HE, Mar 21, 2022)
d7ae403  fallback on reused broadcast exchange (zhouyuan, Mar 20, 2022)
d404095  fix reused columnar broadcast xchg (PHILO-HE, Mar 22, 2022)
50594bc  Add guard logic (PHILO-HE, Mar 22, 2022)
Files changed
@@ -20,22 +20,33 @@ package com.intel.oap.execution
 import com.intel.oap.expression.ConverterUtils
 import com.intel.oap.vectorized.{ArrowColumnarToRowJniWrapper, ArrowWritableColumnVector}
 import org.apache.arrow.vector.types.pojo.{Field, Schema}

 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
+import org.apache.spark.sql.execution.{CodegenSupport, ColumnarToRowTransition, SparkPlan}
 import org.apache.spark.sql.types._

 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.concurrent.duration._

-class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child = child) {
+case class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport {
   override def nodeName: String = "ArrowColumnarToRow"

+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  // `ColumnarToRowExec` processes the input RDD directly, which is kind of a leaf node in the
+  // codegen stage and needs to do the limit check.
+  protected override def canCheckLimitNotReached: Boolean = true
+
+  override def supportCodegen: Boolean = false
+
   buildCheck()
@@ -68,6 +79,10 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child =
     "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to convert")
   )

+  protected def doProduce(ctx: CodegenContext): String = {
+    throw new RuntimeException("Codegen is not supported!")
+  }
+
   override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     val numInputBatches = longMetric("numInputBatches")
@@ -154,12 +169,11 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child =
     }
   }

-  override def canEqual(other: Any): Boolean = other.isInstanceOf[ArrowColumnarToRowExec]
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ArrowColumnarToRowExec =>
-      (that canEqual this) && super.equals(that)
-    case _ => false
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    Seq(child.executeColumnar().asInstanceOf[RDD[InternalRow]]) // Hack because of type erasure
   }
+
+  protected def withNewChildInternal(newChild: SparkPlan): ArrowColumnarToRowExec =
+    copy(child = newChild)
 }
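
Why this shape works: turning the node into a `case class` is what makes the hand-written `canEqual`/`equals` above removable, since the compiler derives structural equality, and Spark 3.2's new child-rewrite hook leans on the generated `copy` method. A minimal sketch of the pattern, using a hypothetical `MyNoopExec` node rather than code from this PR:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// Hypothetical node, for illustration only: a case class gets structural
// equals/hashCode and a copy method from the compiler, so no manual
// canEqual/equals boilerplate is needed for plan comparison.
case class MyNoopExec(child: SparkPlan) extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output

  override protected def doExecute(): RDD[InternalRow] = child.execute()

  // Spark 3.2's tree-rewrite machinery calls this; copy(...) keeps every
  // other constructor field intact. Declared without `override` so the same
  // source also compiles against Spark 3.1, where the method does not exist.
  protected def withNewChildInternal(newChild: SparkPlan): MyNoopExec =
    copy(child = newChild)
}
```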

@@ -29,20 +29,21 @@ import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
 import org.apache.arrow.vector.types.pojo.Schema
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.{RowToColumnarExec, SparkPlan}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{RowToColumnarExec, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.datasources.v2.arrow.{SparkMemoryUtils, SparkSchemaUtils}
 import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils.UnsafeItr
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
-import org.apache.spark.TaskContext
+import org.apache.spark.{TaskContext, broadcast}
 import org.apache.spark.unsafe.Platform


-class ArrowRowToColumnarExec(child: SparkPlan) extends RowToColumnarExec(child = child) {
+case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
   override def nodeName: String = "ArrowRowToColumnarExec"

   buildCheck()
@@ -77,6 +78,26 @@ class ArrowRowToColumnarExec(child: SparkPlan) extends RowToColumnarExec(child =
     "processTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to convert")
   )

+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def supportsColumnar: Boolean = true
+
+  // For spark 3.2.
+  protected def withNewChildInternal(newChild: SparkPlan): ArrowRowToColumnarExec =
+    copy(child = newChild)
+
+  override def doExecute(): RDD[InternalRow] = {
+    child.execute()
+  }
+
+  override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
+    child.executeBroadcast()
+  }
+
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val numInputRows = longMetric("numInputRows")
     val numOutputBatches = longMetric("numOutputBatches")
@@ -206,12 +227,4 @@ class ArrowRowToColumnarExec(child: SparkPlan) extends RowToColumnarExec(child =
     }
   }

-  override def canEqual(other: Any): Boolean = other.isInstanceOf[ArrowRowToColumnarExec]
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ArrowRowToColumnarExec =>
-      (that canEqual this) && super.equals(that)
-    case _ => false
-  }
-
 }
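
A note on the two execution paths `ArrowRowToColumnarExec` now carries: `supportsColumnar` is the switch Spark consults, so `doExecuteColumnar()` does the real Arrow conversion while the row-based `doExecute()` simply passes the child's rows through. A simplified sketch of the dispatch (paraphrasing Spark's `SparkPlan` behavior, not code from this PR):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.SparkPlan

object DispatchSketch {
  // Simplified view: when supportsColumnar is true the planner arranges for
  // executeColumnar() (hence doExecuteColumnar above) to be called, while
  // execute()/doExecute() remains available as a row-based passthrough.
  def run(plan: SparkPlan): RDD[_] =
    if (plan.supportsColumnar) plan.executeColumnar()
    else plan.execute()
}
```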
@@ -213,13 +213,20 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
         right)
     case plan: BroadcastQueryStageExec =>
       logDebug(
-        s"Columnar Processing for ${plan.getClass} is currently supported, actual plan is ${plan.plan.getClass}.")
-      plan
+        s"Columnar Processing for ${plan.getClass} is currently supported, actual plan is ${plan.plan}.")
+      plan.plan match {
+        case ReusedExchangeExec(_, originalBroadcastPlan: ColumnarBroadcastExchangeAdaptor) =>
+          val newBroadcast = BroadcastExchangeExec(
+            originalBroadcastPlan.mode,
+            DataToArrowColumnarExec(plan.plan, 1))
+          SparkShimLoader.getSparkShims.newBroadcastQueryStageExec(plan.id, newBroadcast)
+        case other => plan
+      }
     case plan: BroadcastExchangeExec =>
       val child = replaceWithColumnarPlan(plan.child)
       logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
       if (isSupportAdaptive)
-        new ColumnarBroadcastExchangeAdaptor(plan.mode, child)
+        ColumnarBroadcastExchangeAdaptor(plan.mode, child)
       else
         ColumnarBroadcastExchangeExec(plan.mode, child)
     case plan: BroadcastHashJoinExec =>
@@ -372,12 +379,23 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {
   var isSupportAdaptive: Boolean = true

   def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match {
+    // To get ColumnarBroadcastExchangeExec back from the fallback inserted for DPP reuse.
+    case RowToColumnarExec(broadcastQueryStageExec: BroadcastQueryStageExec)
+        if (broadcastQueryStageExec.plan match {
+          case BroadcastExchangeExec(_, _: DataToArrowColumnarExec) => true
+          case _ => false
+        }) =>
+      logDebug(s"Due to a fallback of BHJ inserted into plan." +
+        s" See above override in BroadcastQueryStageExec")
+      val localBroadcastXchg = broadcastQueryStageExec.plan.asInstanceOf[BroadcastExchangeExec]
+      val dataToArrowColumnar = localBroadcastXchg.child.asInstanceOf[DataToArrowColumnarExec]
+      ColumnarBroadcastExchangeExec(localBroadcastXchg.mode, dataToArrowColumnar)
     case plan: RowToColumnarExec =>
       val child = replaceWithColumnarPlan(plan.child)
       if (columnarConf.enableArrowRowToColumnar) {
         logDebug(s"ColumnarPostOverrides ArrowRowToColumnarExec(${child.getClass})")
         try {
-          new ArrowRowToColumnarExec(child)
+          ArrowRowToColumnarExec(child)
         } catch {
           case _: Throwable =>
             logInfo("ArrowRowToColumnar: Falling back to RowToColumnar...")
@@ -398,7 +416,7 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {
       val child = replaceWithColumnarPlan(plan.child)
       logDebug(s"ColumnarPostOverrides ArrowColumnarToRowExec(${child.getClass})")
       try {
-        new ArrowColumnarToRowExec(child)
+        ArrowColumnarToRowExec(child)
       } catch {
         case _: Throwable =>
           logInfo("ArrowColumnarToRowExec: Falling back to ColumnarToRow...")
@@ -418,7 +436,7 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {
       if (columnarConf.enableArrowColumnarToRow) {
         try {
           val child = replaceWithColumnarPlan(c.child)
-          new ArrowColumnarToRowExec(child)
+          ArrowColumnarToRowExec(child)
         } catch {
           case _: Throwable =>
             logInfo("ArrowColumnarToRow : Falling back to ColumnarToRow...")
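
Taken together, the two override rules above form a round trip for a reused columnar broadcast that must feed a row-based broadcast hash join. A plan-shape trace of the intent, inferred from the pattern matches in this diff (comment-only sketch, not runnable):

```scala
// Plan shapes only, inferred from the matches above:
//
// AQE hands ColumnarPreOverrides:  BroadcastQueryStageExec(ReusedExchangeExec(ColumnarBroadcastExchangeAdaptor))
// PreOverrides rebuilds it as:     BroadcastQueryStageExec(BroadcastExchangeExec(DataToArrowColumnarExec(...)))
// Spark's fallback may then add:   RowToColumnarExec(BroadcastQueryStageExec(...))
// ColumnarPostOverrides collapses: ColumnarBroadcastExchangeExec(DataToArrowColumnarExec(...))
```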
@@ -173,10 +173,14 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
           broadcastQueryStageExec.plan match {
             case plan: BroadcastExchangeExec =>
               new ColumnarBroadcastExchangeExec(plan.mode, plan.child)
+            case plan: ColumnarBroadcastExchangeAdaptor =>
+              new ColumnarBroadcastExchangeExec(plan.mode, plan.child)
             case plan: ReusedExchangeExec =>
               plan match {
                 case ReusedExchangeExec(_, b: BroadcastExchangeExec) =>
                   new ColumnarBroadcastExchangeExec(b.mode, b.child)
+                case ReusedExchangeExec(_, b: ColumnarBroadcastExchangeAdaptor) =>
+                  new ColumnarBroadcastExchangeExec(b.mode, b.child)
                 case _ =>
               }
           }
@@ -190,10 +194,14 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
           broadcastQueryStageExec.plan match {
             case plan: BroadcastExchangeExec =>
               new ColumnarBroadcastExchangeExec(plan.mode, plan.child)
+            case plan: ColumnarBroadcastExchangeAdaptor =>
+              new ColumnarBroadcastExchangeExec(plan.mode, plan.child)
             case plan: ReusedExchangeExec =>
               plan match {
                 case ReusedExchangeExec(_, b: BroadcastExchangeExec) =>
                   new ColumnarBroadcastExchangeExec(b.mode, b.child)
+                case ReusedExchangeExec(_, b: ColumnarBroadcastExchangeAdaptor) =>
+                  new ColumnarBroadcastExchangeExec(b.mode, b.child)
                 case _ =>
               }
           }
@@ -29,6 +29,7 @@ import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, _}
 import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, _}
@@ -263,8 +264,8 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
     copy(child = newChild)
 }

-class ColumnarBroadcastExchangeAdaptor(mode: BroadcastMode, child: SparkPlan)
-    extends BroadcastExchangeExec(mode, child) {
+case class ColumnarBroadcastExchangeAdaptor(mode: BroadcastMode, child: SparkPlan)
+    extends BroadcastExchangeLike {
   val plan: ColumnarBroadcastExchangeExec = new ColumnarBroadcastExchangeExec(mode, child)

   override def supportsColumnar = true
@@ -277,6 +278,13 @@ class ColumnarBroadcastExchangeAdaptor(mode: BroadcastMode, child: SparkPlan)

   override def doCanonicalize(): SparkPlan = plan.doCanonicalize()

+  // Ported from BroadcastExchangeExec
+  override def runtimeStatistics: Statistics = {
+    val dataSize = metrics("dataSize").value
+    val rowCount = metrics("numOutputRows").value
+    Statistics(dataSize, Some(rowCount))
+  }
+
   @transient
   private val timeout: Long = SQLConf.get.broadcastTimeout
@@ -303,11 +311,7 @@ class ColumnarBroadcastExchangeAdaptor(mode: BroadcastMode, child: SparkPlan)
   override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] =
     plan.doExecuteBroadcast[T]()

-  override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarShuffleExchangeAdaptor]
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ColumnarShuffleExchangeAdaptor =>
-      (that canEqual this) && super.equals(that)
-    case _ => false
-  }
+  // For spark3.2.
+  protected def withNewChildInternal(newChild: SparkPlan): ColumnarBroadcastExchangeAdaptor =
+    copy(child = newChild)
 }
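
Two things to note here. The deleted `equals`/`canEqual` pair compared against `ColumnarShuffleExchangeAdaptor`, the wrong class for a broadcast node (a copy-paste slip); case-class-derived equality removes that hazard outright. And once the adaptor extends the `BroadcastExchangeLike` trait instead of the concrete `BroadcastExchangeExec`, it must supply the trait's abstract members itself, which is why `runtimeStatistics` is ported in. A paraphrase of that part of Spark 3.2's contract, from memory and possibly differing in detail:

```scala
import java.util.UUID
import java.util.concurrent.{Future => JFuture}
import scala.concurrent.{Future => SFuture}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.plans.logical.Statistics

// Approximate shape of BroadcastExchangeLike's abstract surface (paraphrased,
// not copied from Spark): a subclass that bypasses BroadcastExchangeExec
// inherits none of these, so it has to fill them in itself.
trait BroadcastExchangeLikeSketch {
  def runId: UUID
  def relationFuture: JFuture[Broadcast[Any]]
  def completionFuture: SFuture[Broadcast[Any]]
  def runtimeStatistics: Statistics
}
```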
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.CoalesceExec.EmptyPartition
 import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
@@ -200,15 +201,15 @@ case class ColumnarShuffleExchangeExec(
     copy(child = newChild)
 }

-class ColumnarShuffleExchangeAdaptor(
+case class ColumnarShuffleExchangeAdaptor(
     override val outputPartitioning: Partitioning,
     child: SparkPlan,
     shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS)
-    extends ShuffleExchangeExec(outputPartitioning, child) {
+    extends ShuffleExchangeLike {

   private[sql] lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
-  private[sql] override lazy val readMetrics =
+  private[sql] lazy val readMetrics =
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
@@ -228,7 +229,17 @@ class ColumnarShuffleExchangeAdaptor(
   override def output: Seq[Attribute] = child.output

   override def supportsColumnar: Boolean = true
+  override def numMappers: Int = shuffleDependency.rdd.getNumPartitions

+  override def numPartitions: Int = shuffleDependency.partitioner.numPartitions
+  override def runtimeStatistics: Statistics = {
+    val dataSize = metrics("dataSize").value
+    val rowCount = metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN).value
+    Statistics(dataSize, Some(rowCount))
+  }
+  override def getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[ColumnarBatch] = {
+    cachedShuffleRDD
+  }
   override def stringArgs =
     super.stringArgs ++ Iterator(s"[id=#$id]")
   //super.stringArgs ++ Iterator(output.map(o => s"${o}#${o.dataType.simpleString}"))
@@ -285,7 +296,7 @@ class ColumnarShuffleExchangeAdaptor(

   // 'shuffleDependency' is only needed when enable AQE. Columnar shuffle will use 'columnarShuffleDependency'
   @transient
-  override lazy val shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow] =
+  lazy val shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow] =
     new ShuffleDependency[Int, InternalRow, InternalRow](
       _rdd = new ColumnarShuffleExchangeExec.DummyPairRDDWithPartitions(
         sparkContext,
@@ -297,14 +308,6 @@ class ColumnarShuffleExchangeAdaptor(
   override val shuffleHandle: ShuffleHandle = columnarShuffleDependency.shuffleHandle
 }

-  override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarShuffleExchangeAdaptor]
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ColumnarShuffleExchangeAdaptor =>
-      (that canEqual this) && super.equals(that)
-    case _ => false
-  }
-
   override def verboseString(maxFields: Int): String = toString(super.verboseString(maxFields))

   override def simpleString(maxFields: Int): String = toString(super.simpleString(maxFields))
@@ -316,6 +319,9 @@ class ColumnarShuffleExchangeAdaptor(
     }.toString()
   }

+  // For spark3.2.
+  protected def withNewChildInternal(newChild: SparkPlan): ColumnarShuffleExchangeAdaptor =
+    copy(child = newChild)
 }

 object ColumnarShuffleExchangeExec extends Logging {
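
The shuffle side tells the same story: `ShuffleExchangeLike` leaves these members abstract, which is what the `numMappers`, `numPartitions`, `getShuffleRDD`, and `runtimeStatistics` additions above satisfy. A paraphrase of that contract, from memory and possibly differing in detail:

```scala
import scala.concurrent.Future
import org.apache.spark.MapOutputStatistics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.ShufflePartitionSpec

// Approximate shape of ShuffleExchangeLike's abstract surface (paraphrased,
// not copied from Spark). AQE drives partition coalescing and skew handling
// through these, so a shuffle node that no longer extends ShuffleExchangeExec
// must implement them itself.
trait ShuffleExchangeLikeSketch {
  def numMappers: Int
  def numPartitions: Int
  def mapOutputStatisticsFuture: Future[MapOutputStatistics]
  def getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[_]
  def runtimeStatistics: Statistics
}
```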