This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit: Check whether AQE is supported
PHILO-HE committed Jul 18, 2022
1 parent 0da5459 commit 2a99d57
Showing 4 changed files with 37 additions and 20 deletions.
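In brief, this commit lifts the AQE-support check out of ColumnarOverrideRules and into the version-specific SparkShims layer: the Exchange-as-leaf short-circuit moves into a common supportAdaptiveWithExchangeConsidered helper on the trait, while the per-release acceptance rules become supportAdaptive overrides in Spark311Shims and Spark321Shims. A minimal sketch of the resulting call flow (the shim-loader import path and the enclosing rule are simplified scaffolding, not copied from the repository):

    // Sketch only: how the plugin consults the shim layer after this change.
    // SparkShimLoader.getSparkShims and supportAdaptiveWithExchangeConsidered
    // are the names used in the diff below; everything else is illustrative.
    import org.apache.spark.sql.execution.SparkPlan

    def checkAdaptiveSupport(plan: SparkPlan): Boolean =
      // Dispatches to Spark311Shims or Spark321Shims, depending on which
      // Spark release the plugin was built and loaded against.
      SparkShimLoader.getSparkShims.supportAdaptiveWithExchangeConsidered(plan)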
@@ -562,29 +562,13 @@ case class ColumnarOverrideRules(session: SparkSession) extends ColumnarRule wit
   var originalPlan: SparkPlan = _
   var fallbacks = 0

-  private def supportAdaptive(plan: SparkPlan): Boolean = {
-    // TODO migrate dynamic-partition-pruning onto adaptive execution.
-    // Only QueryStage will have Exchange as Leaf Plan
-    val isLeafPlanExchange = plan match {
-      case e: Exchange => true
-      case other => false
-    }
-    isLeafPlanExchange || (SQLConf.get.adaptiveExecutionEnabled && (sanityCheck(plan) &&
-      !plan.logicalLink.exists(_.isStreaming) &&
-      !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
-      plan.children.forall(supportAdaptive)))
-  }
-
-  private def sanityCheck(plan: SparkPlan): Boolean =
-    plan.logicalLink.isDefined
-
   override def preColumnarTransitions: Rule[SparkPlan] = plan => {
     if (columnarEnabled) {
       // According to Spark's Columnar.scala, the plan is tackled one by one.
       // By recording the original plan, we can easily let the whole stage
       // fall back at #postColumnarTransitions.
       originalPlan = plan
-      isSupportAdaptive = supportAdaptive(plan)
+      isSupportAdaptive = SparkShimLoader.getSparkShims.supportAdaptiveWithExchangeConsidered(plan)
       val rule = preOverrides
       rule.setAdaptiveSupport(isSupportAdaptive)
       rule(rowGuardOverrides(plan))
@@ -651,7 +635,7 @@ case class ColumnarOverrideRules(session: SparkSession) extends ColumnarRule wit

   override def postColumnarTransitions: Rule[SparkPlan] = plan => {
     if (columnarEnabled) {
-      if (SQLConf.get.adaptiveExecutionEnabled && fallbackWholeStage(plan)) {
+      if (isSupportAdaptive && fallbackWholeStage(plan)) {
         // BatchScan with ArrowScan initialized can still connect
         // to ColumnarToRow for transition.
         insertTransitions(originalPlan, false)
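Note the effect of this second hunk: whole-stage fallback is now gated on the isSupportAdaptive flag computed in preColumnarTransitions, rather than on the raw spark.sql.adaptive.enabled setting, so a plan that AQE cannot actually handle (per the shim check above) no longer takes the adaptive fallback path.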
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
 import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, ParquetOptions, ParquetReadSupport, VectorizedParquetRecordReader}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleOrigin}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ShuffleOrigin}
 import org.apache.spark.sql.internal.SQLConf

 sealed abstract class ShimDescriptor
@@ -121,4 +121,20 @@ trait SparkShims {
   def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int

   def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int
+
+  def isLeafPlanExchange(plan: SparkPlan): Boolean = {
+    plan match {
+      case _: Exchange => true
+      case _ => false
+    }
+  }
+
+  def sanityCheck(plan: SparkPlan): Boolean =
+    plan.logicalLink.isDefined
+
+  def supportAdaptive(plan: SparkPlan): Boolean
+
+  def supportAdaptiveWithExchangeConsidered(plan: SparkPlan): Boolean = {
+    isLeafPlanExchange(plan) || supportAdaptive(plan)
+  }
 }
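The trait deliberately keeps isLeafPlanExchange, sanityCheck, and the combined supportAdaptiveWithExchangeConsidered concrete, leaving only supportAdaptive abstract: each Spark release supplies just its own acceptance rules and inherits the Exchange short-circuit for free (only a query stage has an Exchange as a leaf plan, per the comment in the removed code). A hedged sketch of what a hypothetical new shim would add; SparkXYZShims is illustrative only, and the trait's many other abstract members are elided:

    // Hypothetical per-version shim, for illustration only. A real shim
    // must also implement the trait's other abstract members (elided).
    class SparkXYZShims extends SparkShims {
      override def supportAdaptive(plan: SparkPlan): Boolean =
        sanityCheck(plan) &&                          // plan must carry a logical link
          !plan.logicalLink.exists(_.isStreaming) &&  // AQE does not apply to streaming plans
          plan.children.forall(supportAdaptive)       // every child must qualify too
    }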
@@ -33,7 +33,7 @@ import org.apache.spark.shuffle.sort.SortShuffleWriter
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{DynamicPruningSubquery, Expression}
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
 import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, CustomShuffleReaderExec, ShuffleQueryStageExec}
@@ -205,4 +205,14 @@ class Spark311Shims extends SparkShims {
     throw new RuntimeException("This method should not be invoked in spark 3.1.")
   }

+  /**
+   * Ported from InsertAdaptiveSparkPlan.
+   */
+  override def supportAdaptive(plan: SparkPlan): Boolean = {
+    // TODO migrate dynamic-partition-pruning onto adaptive execution.
+    sanityCheck(plan) &&
+      !plan.logicalLink.exists(_.isStreaming) &&
+      !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
+      plan.children.forall(supportAdaptive)
+  }
 }
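The Spark 3.1 rules additionally reject any plan that references a DynamicPruningSubquery, in line with the TODO above: on this release, dynamic partition pruning has not yet been migrated onto adaptive execution. The check, unpacked for readability (an equivalent rewrite, not part of the commit):

    // Equivalent form of the expression scan above: walk every expression
    // tree in the node and report whether any subexpression is a
    // dynamic-pruning subquery.
    def hasDynamicPruningSubquery(plan: SparkPlan): Boolean =
      plan.expressions.exists { expr =>
        expr.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined
      }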
@@ -235,4 +235,11 @@ class Spark321Shims extends SparkShims {
     }
   }

+  /**
+   * Ported from InsertAdaptiveSparkPlan.
+   */
+  override def supportAdaptive(plan: SparkPlan): Boolean = {
+    sanityCheck(plan) && !plan.logicalLink.exists(_.isStreaming) &&
+      plan.children.forall(supportAdaptive)
+  }
 }
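The 3.2 variant drops the DynamicPruningSubquery exclusion that the 3.1 shim carries, consistent with upstream Spark: as of 3.2, adaptive query execution and dynamic partition pruning can be used together, so dynamic-pruning subqueries no longer disqualify a plan. Hypothetical usage, assuming plan is a physical plan handed to the columnar rule; the same entry point works under either shim:

    // Resolve the shim for the running Spark version, then ask it whether
    // AQE applies to this plan (Exchange leaves short-circuit to true).
    val shims: SparkShims = SparkShimLoader.getSparkShims
    val aqeApplies: Boolean = shims.supportAdaptiveWithExchangeConsidered(plan)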
