From 6d807e5d951a80be434085a498c07b7473881f3b Mon Sep 17 00:00:00 2001
From: philo
Date: Tue, 15 Mar 2022 10:34:27 +0800
Subject: [PATCH 01/22] Fix issues in ArrowRowToColumnarExec and writing shuffle metadata

---
 .../execution/ArrowRowToColumnarExec.scala    |  16 +-
 .../oap/extension/ColumnarOverrides.scala     |   2 +-
 .../adaptive/AdaptiveSparkPlanExec.scala      | 810 ++++++++++++++++++
 .../org/apache/spark/util/ShimUtils.scala     |   3 +-
 4 files changed, 826 insertions(+), 5 deletions(-)
 create mode 100644 shims/spark321/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
index c386a1523..d2ffe9e67 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
@@ -29,8 +29,8 @@ import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
 import org.apache.arrow.vector.types.pojo.Schema
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.{RowToColumnarExec, SparkPlan}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import org.apache.spark.sql.execution.{RowToColumnarExec, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.datasources.v2.arrow.{SparkMemoryUtils, SparkSchemaUtils}
 import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils.UnsafeItr
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -42,7 +42,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.unsafe.Platform
 
 
-class ArrowRowToColumnarExec(child: SparkPlan) extends RowToColumnarExec(child = child) {
+case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
   override def nodeName: String = "ArrowRowToColumnarExec"
 
   buildCheck()
@@ -77,6 +77,16 @@ class ArrowRowToColumnarExec(child: SparkPlan) extends RowToColumnarExec(child =
     "processTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to convert")
   )
 
+  override def output: Seq[Attribute] = child.output
+
+  // For spark 3.2.
+  protected def withNewChildInternal(newChild: SparkPlan): ArrowRowToColumnarExec =
+    copy(child = newChild)
+
+  override def doExecute(): RDD[InternalRow] = {
+    child.execute()
+  }
+
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val numInputRows = longMetric("numInputRows")
     val numOutputBatches = longMetric("numOutputBatches")
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
index 1c3b430fd..4b45d33c7 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
@@ -377,7 +377,7 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {
       if (columnarConf.enableArrowRowToColumnar) {
         logDebug(s"ColumnarPostOverrides ArrowRowToColumnarExec(${child.getClass})")
         try {
-          new ArrowRowToColumnarExec(child)
+          ArrowRowToColumnarExec(child)
         } catch {
           case _: Throwable =>
             logInfo("ArrowRowToColumnar: Falling back to RowToColumnar...")
diff --git a/shims/spark321/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/shims/spark321/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
new file mode 100644
index 000000000..d87e2459c
--- /dev/null
+++ b/shims/spark321/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -0,0 +1,810 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.sql.execution.adaptive + +import java.util +import java.util.concurrent.LinkedBlockingQueue + +import scala.collection.JavaConverters._ +import scala.collection.concurrent.TrieMap +import scala.collection.mutable +import scala.concurrent.ExecutionContext +import scala.util.control.NonFatal + +import org.apache.spark.broadcast +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution} +import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule} +import org.apache.spark.sql.catalyst.trees.TreeNodeTag +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec._ +import org.apache.spark.sql.execution.bucketing.DisableUnnecessaryBucketedScan +import org.apache.spark.sql.execution.exchange._ +import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SQLPlanMetric} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.ThreadUtils + +/** + * A root node to execute the query plan adaptively. It splits the query plan into independent + * stages and executes them in order according to their dependencies. The query stage + * materializes its output at the end. When one stage completes, the data statistics of the + * materialized output will be used to optimize the remainder of the query. + * + * To create query stages, we traverse the query tree bottom up. When we hit an exchange node, + * and if all the child query stages of this exchange node are materialized, we create a new + * query stage for this exchange node. The new stage is then materialized asynchronously once it + * is created. + * + * When one query stage finishes materialization, the rest query is re-optimized and planned based + * on the latest statistics provided by all materialized stages. Then we traverse the query plan + * again and create more stages if possible. After all stages have been materialized, we execute + * the rest of the plan. + */ +case class AdaptiveSparkPlanExec( + inputPlan: SparkPlan, + @transient context: AdaptiveExecutionContext, + @transient preprocessingRules: Seq[Rule[SparkPlan]], + @transient isSubquery: Boolean, + @transient override val supportsColumnar: Boolean = false) + extends LeafExecNode { + + @transient private val lock = new Object() + + @transient private val logOnLevel: ( => String) => Unit = conf.adaptiveExecutionLogLevel match { + case "TRACE" => logTrace(_) + case "DEBUG" => logDebug(_) + case "INFO" => logInfo(_) + case "WARN" => logWarning(_) + case "ERROR" => logError(_) + case _ => logDebug(_) + } + + @transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]() + + // The logical plan optimizer for re-optimizing the current logical plan. + @transient private val optimizer = new AQEOptimizer(conf) + + // `EnsureRequirements` may remove user-specified repartition and assume the query plan won't + // change its output partitioning. This assumption is not true in AQE. 
Here we check the + // `inputPlan` which has not been processed by `EnsureRequirements` yet, to find out the + // effective user-specified repartition. Later on, the AQE framework will make sure the final + // output partitioning is not changed w.r.t the effective user-specified repartition. + @transient private val requiredDistribution: Option[Distribution] = if (isSubquery) { + // Subquery output does not need a specific output partitioning. + Some(UnspecifiedDistribution) + } else { + AQEUtils.getRequiredDistribution(inputPlan) + } + + // A list of physical plan rules to be applied before creation of query stages. The physical + // plan should reach a final status of query stages (i.e., no more addition or removal of + // Exchange nodes) after running these rules. + @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq( + RemoveRedundantProjects, + // For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for + // the final plan, but we do need to respect the user-specified repartition. Here we ask + // `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work + // around this case. + EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined), + RemoveRedundantSorts, + DisableUnnecessaryBucketedScan + ) ++ context.session.sessionState.queryStagePrepRules + + // A list of physical optimizer rules to be applied to a new stage before its execution. These + // optimizations should be stage-independent. + @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( + PlanAdaptiveDynamicPruningFilters(this), + ReuseAdaptiveSubquery(context.subqueryCache), + // Skew join does not handle `AQEShuffleRead` so needs to be applied first. + OptimizeSkewedJoin, + OptimizeSkewInRebalancePartitions, + CoalesceShufflePartitions(context.session), + // `OptimizeShuffleWithLocalRead` needs to make use of 'AQEShuffleReadExec.partitionSpecs' + // added by `CoalesceShufflePartitions`, and must be executed after it. + OptimizeShuffleWithLocalRead + ) + + // This rule is stateful as it maintains the codegen stage ID. We can't create a fresh one every + // time and need to keep it in a variable. + @transient private val collapseCodegenStagesRule: Rule[SparkPlan] = + CollapseCodegenStages() + + // A list of physical optimizer rules to be applied right after a new stage is created. The input + // plan to these rules has exchange as its root node. + private def postStageCreationRules(outputsColumnar: Boolean) = Seq( + ApplyColumnarRulesAndInsertTransitions( + context.session.sessionState.columnarRules, outputsColumnar), + collapseCodegenStagesRule + ) + + private def optimizeQueryStage( + plan: SparkPlan, + isFinalStage: Boolean): SparkPlan = context.qe.withCteMap { + val optimized = queryStageOptimizerRules.foldLeft(plan) { case (latestPlan, rule) => + val applied = rule.apply(latestPlan) + val result = rule match { + case _: AQEShuffleReadRule if !applied.fastEquals(latestPlan) => + val distribution = if (isFinalStage) { + // If `requiredDistribution` is None, it means `EnsureRequirements` will not optimize + // out the user-specified repartition, thus we don't have a distribution requirement + // for the final plan. 
+ requiredDistribution.getOrElse(UnspecifiedDistribution) + } else { + UnspecifiedDistribution + } + if (ValidateRequirements.validate(applied, distribution)) { + applied + } else { + logDebug(s"Rule ${rule.ruleName} is not applied as it breaks the " + + "distribution requirement of the query plan.") + latestPlan + } + case _ => applied + } + planChangeLogger.logRule(rule.ruleName, latestPlan, result) + result + } + planChangeLogger.logBatch("AQE Query Stage Optimization", plan, optimized) + optimized + } + + @transient private val costEvaluator = + conf.getConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS) match { + case Some(className) => CostEvaluator.instantiate(className, session.sparkContext.getConf) + case _ => SimpleCostEvaluator + } + + @transient val initialPlan = context.session.withActive { + applyPhysicalRules( + inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations"))) + } + + @volatile private var currentPhysicalPlan = initialPlan + + private var isFinalPlan = false + + private var currentStageId = 0 + + /** + * Return type for `createQueryStages` + * @param newPlan the new plan with created query stages. + * @param allChildStagesMaterialized whether all child stages have been materialized. + * @param newStages the newly created query stages, including new reused query stages. + */ + private case class CreateStageResult( + newPlan: SparkPlan, + allChildStagesMaterialized: Boolean, + newStages: Seq[QueryStageExec]) + + def executedPlan: SparkPlan = currentPhysicalPlan + + override def conf: SQLConf = context.session.sessionState.conf + + override def output: Seq[Attribute] = inputPlan.output + + override def doCanonicalize(): SparkPlan = inputPlan.canonicalized + + override def resetMetrics(): Unit = { + metrics.valuesIterator.foreach(_.reset()) + executedPlan.resetMetrics() + } + + private def getExecutionId: Option[Long] = { + // If the `QueryExecution` does not match the current execution ID, it means the execution ID + // belongs to another (parent) query, and we should not call update UI in this query. + Option(context.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)) + .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe) + } + + private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized { + if (isFinalPlan) return currentPhysicalPlan + + // In case of this adaptive plan being executed out of `withActive` scoped functions, e.g., + // `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be + // created in the middle of the execution. + context.session.withActive { + val executionId = getExecutionId + // Use inputPlan logicalLink here in case some top level physical nodes may be removed + // during `initialPlan` + var currentLogicalPlan = inputPlan.logicalLink.get + var result = createQueryStages(currentPhysicalPlan) + val events = new LinkedBlockingQueue[StageMaterializationEvent]() + val errors = new mutable.ArrayBuffer[Throwable]() + var stagesToReplace = Seq.empty[QueryStageExec] + while (!result.allChildStagesMaterialized) { + currentPhysicalPlan = result.newPlan + if (result.newStages.nonEmpty) { + stagesToReplace = result.newStages ++ stagesToReplace + executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan))) + + // SPARK-33933: we should submit tasks of broadcast stages first, to avoid waiting + // for tasks to be scheduled and leading to broadcast timeout. 
+ // This partial fix only guarantees the start of materialization for BroadcastQueryStage + // is prior to others, but because the submission of collect job for broadcasting is + // running in another thread, the issue is not completely resolved. + val reorderedNewStages = result.newStages + .sortWith { + case (_: BroadcastQueryStageExec, _: BroadcastQueryStageExec) => false + case (_: BroadcastQueryStageExec, _) => true + case _ => false + } + + // Start materialization of all new stages and fail fast if any stages failed eagerly + reorderedNewStages.foreach { stage => + try { + stage.materialize().onComplete { res => + if (res.isSuccess) { + events.offer(StageSuccess(stage, res.get)) + } else { + events.offer(StageFailure(stage, res.failed.get)) + } + }(AdaptiveSparkPlanExec.executionContext) + } catch { + case e: Throwable => + cleanUpAndThrowException(Seq(e), Some(stage.id)) + } + } + } + + // Wait on the next completed stage, which indicates new stats are available and probably + // new stages can be created. There might be other stages that finish at around the same + // time, so we process those stages too in order to reduce re-planning. + val nextMsg = events.take() + val rem = new util.ArrayList[StageMaterializationEvent]() + events.drainTo(rem) + (Seq(nextMsg) ++ rem.asScala).foreach { + case StageSuccess(stage, res) => + stage.resultOption.set(Some(res)) + case StageFailure(stage, ex) => + errors.append(ex) + } + + // In case of errors, we cancel all running stages and throw exception. + if (errors.nonEmpty) { + cleanUpAndThrowException(errors.toSeq, None) + } + + // Try re-optimizing and re-planning. Adopt the new plan if its cost is equal to or less + // than that of the current plan; otherwise keep the current physical plan together with + // the current logical plan since the physical plan's logical links point to the logical + // plan it has originated from. + // Meanwhile, we keep a list of the query stages that have been created since last plan + // update, which stands for the "semantic gap" between the current logical and physical + // plans. And each time before re-planning, we replace the corresponding nodes in the + // current logical plan with logical query stages to make it semantically in sync with + // the current physical plan. Once a new plan is adopted and both logical and physical + // plans are updated, we can clear the query stage list because at this point the two plans + // are semantically and physically in sync again. + val logicalPlan = replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace) + val (newPhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan) + val origCost = costEvaluator.evaluateCost(currentPhysicalPlan) + val newCost = costEvaluator.evaluateCost(newPhysicalPlan) + if (newCost < origCost || + (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) { + logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan") + cleanUpTempTags(newPhysicalPlan) + currentPhysicalPlan = newPhysicalPlan + currentLogicalPlan = newLogicalPlan + stagesToReplace = Seq.empty[QueryStageExec] + } + // Now that some stages have finished, we can try creating new stages. + result = createQueryStages(currentPhysicalPlan) + } + + // Run the final plan when there's no more unfinished stages. 
+ currentPhysicalPlan = applyPhysicalRules( + optimizeQueryStage(result.newPlan, isFinalStage = true), + postStageCreationRules(supportsColumnar), + Some((planChangeLogger, "AQE Post Stage Creation"))) + isFinalPlan = true + executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan))) + currentPhysicalPlan + } + } + + // Use a lazy val to avoid this being called more than once. + @transient private lazy val finalPlanUpdate: Unit = { + // Subqueries that don't belong to any query stage of the main query will execute after the + // last UI update in `getFinalPhysicalPlan`, so we need to update UI here again to make sure + // the newly generated nodes of those subqueries are updated. + if (!isSubquery && currentPhysicalPlan.find(_.subqueries.nonEmpty).isDefined) { + getExecutionId.foreach(onUpdatePlan(_, Seq.empty)) + } + logOnLevel(s"Final plan: $currentPhysicalPlan") + } + + override def executeCollect(): Array[InternalRow] = { + withFinalPlanUpdate(_.executeCollect()) + } + + override def executeTake(n: Int): Array[InternalRow] = { + withFinalPlanUpdate(_.executeTake(n)) + } + + override def executeTail(n: Int): Array[InternalRow] = { + withFinalPlanUpdate(_.executeTail(n)) + } + + override def doExecute(): RDD[InternalRow] = { + withFinalPlanUpdate(_.execute()) + } + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + withFinalPlanUpdate(_.executeColumnar()) + } + + override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = { + withFinalPlanUpdate { finalPlan => + assert(finalPlan.isInstanceOf[BroadcastQueryStageExec]) + finalPlan.doExecuteBroadcast() + } + } + + private def withFinalPlanUpdate[T](fun: SparkPlan => T): T = { + val plan = getFinalPhysicalPlan() + val result = fun(plan) + finalPlanUpdate + result + } + + protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan") + + override def generateTreeString( + depth: Int, + lastChildren: Seq[Boolean], + append: String => Unit, + verbose: Boolean, + prefix: String = "", + addSuffix: Boolean = false, + maxFields: Int, + printNodeId: Boolean, + indent: Int = 0): Unit = { + super.generateTreeString( + depth, + lastChildren, + append, + verbose, + prefix, + addSuffix, + maxFields, + printNodeId, + indent) + if (currentPhysicalPlan.fastEquals(initialPlan)) { + currentPhysicalPlan.generateTreeString( + depth + 1, + lastChildren :+ true, + append, + verbose, + prefix = "", + addSuffix = false, + maxFields, + printNodeId, + indent) + } else { + generateTreeStringWithHeader( + if (isFinalPlan) "Final Plan" else "Current Plan", + currentPhysicalPlan, + depth, + lastChildren, + append, + verbose, + maxFields, + printNodeId) + generateTreeStringWithHeader( + "Initial Plan", + initialPlan, + depth, + lastChildren, + append, + verbose, + maxFields, + printNodeId) + } + } + + + private def generateTreeStringWithHeader( + header: String, + plan: SparkPlan, + depth: Int, + lastChildren: Seq[Boolean], + append: String => Unit, + verbose: Boolean, + maxFields: Int, + printNodeId: Boolean): Unit = { + append(" " * depth) + append(s"+- == $header ==\n") + plan.generateTreeString( + 0, + Nil, + append, + verbose, + prefix = "", + addSuffix = false, + maxFields, + printNodeId, + indent = depth + 1) + } + + override def hashCode(): Int = inputPlan.hashCode() + + override def equals(obj: Any): Boolean = { + if (!obj.isInstanceOf[AdaptiveSparkPlanExec]) { + return false + } + + this.inputPlan == obj.asInstanceOf[AdaptiveSparkPlanExec].inputPlan + } + + /** + * This method is called recursively to traverse the plan 
tree bottom-up and create a new query + * stage or try reusing an existing stage if the current node is an [[Exchange]] node and all of + * its child stages have been materialized. + * + * With each call, it returns: + * 1) The new plan replaced with [[QueryStageExec]] nodes where new stages are created. + * 2) Whether the child query stages (if any) of the current node have all been materialized. + * 3) A list of the new query stages that have been created. + */ + private def createQueryStages(plan: SparkPlan): CreateStageResult = plan match { + case e: Exchange => + // First have a quick check in the `stageCache` without having to traverse down the node. + context.stageCache.get(e.canonicalized) match { + case Some(existingStage) if conf.exchangeReuseEnabled && (existingStage.schema == e.schema) => + val stage = reuseQueryStage(existingStage, e) + val isMaterialized = stage.isMaterialized + CreateStageResult( + newPlan = stage, + allChildStagesMaterialized = isMaterialized, + newStages = if (isMaterialized) Seq.empty else Seq(stage)) + + case _ => + val result = createQueryStages(e.child) + val newPlan = e.withNewChildren(Seq(result.newPlan)).asInstanceOf[Exchange] + // Create a query stage only when all the child query stages are ready. + if (result.allChildStagesMaterialized) { + var newStage = newQueryStage(newPlan) + if (conf.exchangeReuseEnabled) { + // Check the `stageCache` again for reuse. If a match is found, ditch the new stage + // and reuse the existing stage found in the `stageCache`, otherwise update the + // `stageCache` with the new stage. + val queryStage = context.stageCache.getOrElseUpdate( + newStage.plan.canonicalized, newStage) + if (queryStage.ne(newStage) && (queryStage.schema == e.schema)) { + newStage = reuseQueryStage(queryStage, e) + } + } + val isMaterialized = newStage.isMaterialized + CreateStageResult( + newPlan = newStage, + allChildStagesMaterialized = isMaterialized, + newStages = if (isMaterialized) Seq.empty else Seq(newStage)) + } else { + CreateStageResult(newPlan = newPlan, + allChildStagesMaterialized = false, newStages = result.newStages) + } + } + + case q: QueryStageExec => + CreateStageResult(newPlan = q, + allChildStagesMaterialized = q.isMaterialized, newStages = Seq.empty) + + case _ => + if (plan.children.isEmpty) { + CreateStageResult(newPlan = plan, allChildStagesMaterialized = true, newStages = Seq.empty) + } else { + val results = plan.children.map(createQueryStages) + CreateStageResult( + newPlan = plan.withNewChildren(results.map(_.newPlan)), + allChildStagesMaterialized = results.forall(_.allChildStagesMaterialized), + newStages = results.flatMap(_.newStages)) + } + } + + private def newQueryStage(e: Exchange): QueryStageExec = { + val optimizedPlan = optimizeQueryStage(e.child, isFinalStage = false) + val queryStage = e match { + case s: ShuffleExchangeLike => + val newShuffle = applyPhysicalRules( + s.withNewChildren(Seq(optimizedPlan)), + postStageCreationRules(outputsColumnar = s.supportsColumnar), + Some((planChangeLogger, "AQE Post Stage Creation"))) + if (!newShuffle.isInstanceOf[ShuffleExchangeLike]) { + throw new IllegalStateException( + "Custom columnar rules cannot transform shuffle node to something else.") + } + ShuffleQueryStageExec(currentStageId, newShuffle, s.canonicalized) + case b: BroadcastExchangeLike => + val newBroadcast = applyPhysicalRules( + b.withNewChildren(Seq(optimizedPlan)), + postStageCreationRules(outputsColumnar = b.supportsColumnar), + Some((planChangeLogger, "AQE Post Stage Creation"))) + if 
(!newBroadcast.isInstanceOf[BroadcastExchangeLike]) { + throw new IllegalStateException( + "Custom columnar rules cannot transform broadcast node to something else.") + } + BroadcastQueryStageExec(currentStageId, newBroadcast, b.canonicalized) + } + currentStageId += 1 + setLogicalLinkForNewQueryStage(queryStage, e) + queryStage + } + + private def reuseQueryStage(existing: QueryStageExec, exchange: Exchange): QueryStageExec = { + val queryStage = existing.newReuseInstance(currentStageId, exchange.output) + currentStageId += 1 + setLogicalLinkForNewQueryStage(queryStage, exchange) + queryStage + } + + /** + * Set the logical node link of the `stage` as the corresponding logical node of the `plan` it + * encloses. If an `plan` has been transformed from a `Repartition`, it should have `logicalLink` + * available by itself; otherwise traverse down to find the first node that is not generated by + * `EnsureRequirements`. + */ + private def setLogicalLinkForNewQueryStage(stage: QueryStageExec, plan: SparkPlan): Unit = { + val link = plan.getTagValue(TEMP_LOGICAL_PLAN_TAG).orElse( + plan.logicalLink.orElse(plan.collectFirst { + case p if p.getTagValue(TEMP_LOGICAL_PLAN_TAG).isDefined => + p.getTagValue(TEMP_LOGICAL_PLAN_TAG).get + case p if p.logicalLink.isDefined => p.logicalLink.get + })) + assert(link.isDefined) + stage.setLogicalLink(link.get) + } + + /** + * For each query stage in `stagesToReplace`, find their corresponding logical nodes in the + * `logicalPlan` and replace them with new [[LogicalQueryStage]] nodes. + * 1. If the query stage can be mapped to an integral logical sub-tree, replace the corresponding + * logical sub-tree with a leaf node [[LogicalQueryStage]] referencing this query stage. For + * example: + * Join SMJ SMJ + * / \ / \ / \ + * r1 r2 => Xchg1 Xchg2 => Stage1 Stage2 + * | | + * r1 r2 + * The updated plan node will be: + * Join + * / \ + * LogicalQueryStage1(Stage1) LogicalQueryStage2(Stage2) + * + * 2. Otherwise (which means the query stage can only be mapped to part of a logical sub-tree), + * replace the corresponding logical sub-tree with a leaf node [[LogicalQueryStage]] + * referencing to the top physical node into which this logical node is transformed during + * physical planning. For example: + * Agg HashAgg HashAgg + * | | | + * child => Xchg => Stage1 + * | + * HashAgg + * | + * child + * The updated plan node will be: + * LogicalQueryStage(HashAgg - Stage1) + */ + private def replaceWithQueryStagesInLogicalPlan( + plan: LogicalPlan, + stagesToReplace: Seq[QueryStageExec]): LogicalPlan = { + var logicalPlan = plan + stagesToReplace.foreach { + case stage if currentPhysicalPlan.find(_.eq(stage)).isDefined => + val logicalNodeOpt = stage.getTagValue(TEMP_LOGICAL_PLAN_TAG).orElse(stage.logicalLink) + assert(logicalNodeOpt.isDefined) + val logicalNode = logicalNodeOpt.get + val physicalNode = currentPhysicalPlan.collectFirst { + case p if p.eq(stage) || + p.getTagValue(TEMP_LOGICAL_PLAN_TAG).exists(logicalNode.eq) || + p.logicalLink.exists(logicalNode.eq) => p + } + assert(physicalNode.isDefined) + // Set the temp link for those nodes that are wrapped inside a `LogicalQueryStage` node for + // they will be shared and reused by different physical plans and their usual logical links + // can be overwritten through re-planning processes. 
+ setTempTagRecursive(physicalNode.get, logicalNode) + // Replace the corresponding logical node with LogicalQueryStage + val newLogicalNode = LogicalQueryStage(logicalNode, physicalNode.get) + val newLogicalPlan = logicalPlan.transformDown { + case p if p.eq(logicalNode) => newLogicalNode + } + logicalPlan = newLogicalPlan + + case _ => // Ignore those earlier stages that have been wrapped in later stages. + } + logicalPlan + } + + /** + * Re-optimize and run physical planning on the current logical plan based on the latest stats. + */ + private def reOptimize( + logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = context.qe.withCteMap { + logicalPlan.invalidateStatsCache() + val optimized = optimizer.execute(logicalPlan) + val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next() + val newPlan = applyPhysicalRules( + sparkPlan, + preprocessingRules ++ queryStagePreparationRules, + Some((planChangeLogger, "AQE Replanning"))) + + // When both enabling AQE and DPP, `PlanAdaptiveDynamicPruningFilters` rule will + // add the `BroadcastExchangeExec` node manually in the DPP subquery, + // not through `EnsureRequirements` rule. Therefore, when the DPP subquery is complicated + // and need to be re-optimized, AQE also need to manually insert the `BroadcastExchangeExec` + // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery. + // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan + // is already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule. + val finalPlan = currentPhysicalPlan match { + case b: BroadcastExchangeLike + if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan)) + case _ => newPlan + } + + (finalPlan, optimized) + } + + /** + * Recursively set `TEMP_LOGICAL_PLAN_TAG` for the current `plan` node. + */ + private def setTempTagRecursive(plan: SparkPlan, logicalPlan: LogicalPlan): Unit = { + plan.setTagValue(TEMP_LOGICAL_PLAN_TAG, logicalPlan) + plan.children.foreach(c => setTempTagRecursive(c, logicalPlan)) + } + + /** + * Unset all `TEMP_LOGICAL_PLAN_TAG` tags. + */ + private def cleanUpTempTags(plan: SparkPlan): Unit = { + plan.foreach { + case plan: SparkPlan if plan.getTagValue(TEMP_LOGICAL_PLAN_TAG).isDefined => + plan.unsetTagValue(TEMP_LOGICAL_PLAN_TAG) + case _ => + } + } + + /** + * Notify the listeners of the physical plan change. + */ + private def onUpdatePlan(executionId: Long, newSubPlans: Seq[SparkPlan]): Unit = { + if (isSubquery) { + // When executing subqueries, we can't update the query plan in the UI as the + // UI doesn't support partial update yet. However, the subquery may have been + // optimized into a different plan and we must let the UI know the SQL metrics + // of the new plan nodes, so that it can track the valid accumulator updates later + // and display SQL metrics correctly. 
+ val newMetrics = newSubPlans.flatMap { p => + p.flatMap(_.metrics.values.map(m => SQLPlanMetric(m.name.get, m.id, m.metricType))) + } + context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveSQLMetricUpdates( + executionId.toLong, newMetrics)) + } else { + val planDescriptionMode = ExplainMode.fromString(conf.uiExplainMode) + context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate( + executionId, + context.qe.explainString(planDescriptionMode), + SparkPlanInfo.fromSparkPlan(context.qe.executedPlan))) + } + } + + /** + * Cancel all running stages with best effort and throw an Exception containing all stage + * materialization errors and stage cancellation errors. + */ + private def cleanUpAndThrowException( + errors: Seq[Throwable], + earlyFailedStage: Option[Int]): Unit = { + currentPhysicalPlan.foreach { + // earlyFailedStage is the stage which failed before calling doMaterialize, + // so we should avoid calling cancel on it to re-trigger the failure again. + case s: QueryStageExec if !earlyFailedStage.contains(s.id) => + try { + s.cancel() + } catch { + case NonFatal(t) => + logError(s"Exception in cancelling query stage: ${s.treeString}", t) + } + case _ => + } + val e = if (errors.size == 1) { + errors.head + } else { + val se = QueryExecutionErrors.multiFailuresInStageMaterializationError(errors.head) + errors.tail.foreach(se.addSuppressed) + se + } + throw e + } +} + +object AdaptiveSparkPlanExec { + private[adaptive] val executionContext = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonCachedThreadPool("QueryStageCreator", 16)) + + /** + * The temporary [[LogicalPlan]] link for query stages. + * + * Physical nodes wrapped in a [[LogicalQueryStage]] can be shared among different physical plans + * and thus their usual logical links can be overwritten during query planning, leading to + * situations where those nodes point to a new logical plan and the rest point to the current + * logical plan. In this case we use temp logical links to make sure we can always trace back to + * the original logical links until a new physical plan is adopted, by which time we can clear up + * the temp logical links. + */ + val TEMP_LOGICAL_PLAN_TAG = TreeNodeTag[LogicalPlan]("temp_logical_plan") + + /** + * Apply a list of physical operator rules on a [[SparkPlan]]. + */ + def applyPhysicalRules( + plan: SparkPlan, + rules: Seq[Rule[SparkPlan]], + loggerAndBatchName: Option[(PlanChangeLogger[SparkPlan], String)] = None): SparkPlan = { + if (loggerAndBatchName.isEmpty) { + rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp) } + } else { + val (logger, batchName) = loggerAndBatchName.get + val newPlan = rules.foldLeft(plan) { case (sp, rule) => + val result = rule.apply(sp) + logger.logRule(rule.ruleName, sp, result) + result + } + logger.logBatch(batchName, plan, newPlan) + newPlan + } + } +} + +/** + * The execution context shared between the main query and all sub-queries. + */ +case class AdaptiveExecutionContext(session: SparkSession, qe: QueryExecution) { + + /** + * The subquery-reuse map shared across the entire query. + */ + val subqueryCache: TrieMap[SparkPlan, BaseSubqueryExec] = + new TrieMap[SparkPlan, BaseSubqueryExec]() + + /** + * The exchange-reuse map shared across the entire query, including sub-queries. + */ + val stageCache: TrieMap[SparkPlan, QueryStageExec] = + new TrieMap[SparkPlan, QueryStageExec]() +} + +/** + * The event type for stage materialization. 
+ */
+
+sealed trait StageMaterializationEvent
+
+/**
+ * The materialization of a query stage completed with success.
+ */
+case class StageSuccess(stage: QueryStageExec, result: Any) extends StageMaterializationEvent
+
+/**
+ * The materialization of a query stage hit an error and failed.
+ */
+case class StageFailure(stage: QueryStageExec, error: Throwable) extends StageMaterializationEvent
diff --git a/shims/spark321/src/main/scala/org/apache/spark/util/ShimUtils.scala b/shims/spark321/src/main/scala/org/apache/spark/util/ShimUtils.scala
index 551b03749..5eaf1f0ff 100644
--- a/shims/spark321/src/main/scala/org/apache/spark/util/ShimUtils.scala
+++ b/shims/spark321/src/main/scala/org/apache/spark/util/ShimUtils.scala
@@ -37,7 +37,8 @@ object ShimUtils {
     shuffleId: Int, mapId: Long, partitionLengths: Array[Long], dataTmp: File): Unit = {
     shuffleBlockResolver match {
       case resolver: IndexShuffleBlockResolver =>
-        resolver.writeMetadataFileAndCommit(shuffleId, mapId, partitionLengths, null, dataTmp)
+        // Set checksums with an empty array. TODO: do we need to pass the actual checksum?
+        resolver.writeMetadataFileAndCommit(shuffleId, mapId, partitionLengths, Array[Long](), dataTmp)
       case _ => throw new RuntimeException ("IndexShuffleBlockResolver is expected!")
     }
   }

From 2ac622ba1e8983024a8f5ca3c9d6fc6666a75a9c Mon Sep 17 00:00:00 2001
From: philo
Date: Tue, 15 Mar 2022 11:55:24 +0800
Subject: [PATCH 02/22] Remove AdaptiveSparkPlanExec in spark-3.2 shim layer

---
 .../adaptive/AdaptiveSparkPlanExec.scala | 810 ------------------
 1 file changed, 810 deletions(-)
 delete mode 100644 shims/spark321/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

diff --git a/shims/spark321/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/shims/spark321/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
deleted file mode 100644
index d87e2459c..000000000
--- a/shims/spark321/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ /dev/null
@@ -1,810 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.spark.sql.execution.adaptive - -import java.util -import java.util.concurrent.LinkedBlockingQueue - -import scala.collection.JavaConverters._ -import scala.collection.concurrent.TrieMap -import scala.collection.mutable -import scala.concurrent.ExecutionContext -import scala.util.control.NonFatal - -import org.apache.spark.broadcast -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} -import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution} -import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule} -import org.apache.spark.sql.catalyst.trees.TreeNodeTag -import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec._ -import org.apache.spark.sql.execution.bucketing.DisableUnnecessaryBucketedScan -import org.apache.spark.sql.execution.exchange._ -import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SQLPlanMetric} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.ThreadUtils - -/** - * A root node to execute the query plan adaptively. It splits the query plan into independent - * stages and executes them in order according to their dependencies. The query stage - * materializes its output at the end. When one stage completes, the data statistics of the - * materialized output will be used to optimize the remainder of the query. - * - * To create query stages, we traverse the query tree bottom up. When we hit an exchange node, - * and if all the child query stages of this exchange node are materialized, we create a new - * query stage for this exchange node. The new stage is then materialized asynchronously once it - * is created. - * - * When one query stage finishes materialization, the rest query is re-optimized and planned based - * on the latest statistics provided by all materialized stages. Then we traverse the query plan - * again and create more stages if possible. After all stages have been materialized, we execute - * the rest of the plan. - */ -case class AdaptiveSparkPlanExec( - inputPlan: SparkPlan, - @transient context: AdaptiveExecutionContext, - @transient preprocessingRules: Seq[Rule[SparkPlan]], - @transient isSubquery: Boolean, - @transient override val supportsColumnar: Boolean = false) - extends LeafExecNode { - - @transient private val lock = new Object() - - @transient private val logOnLevel: ( => String) => Unit = conf.adaptiveExecutionLogLevel match { - case "TRACE" => logTrace(_) - case "DEBUG" => logDebug(_) - case "INFO" => logInfo(_) - case "WARN" => logWarning(_) - case "ERROR" => logError(_) - case _ => logDebug(_) - } - - @transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]() - - // The logical plan optimizer for re-optimizing the current logical plan. - @transient private val optimizer = new AQEOptimizer(conf) - - // `EnsureRequirements` may remove user-specified repartition and assume the query plan won't - // change its output partitioning. This assumption is not true in AQE. 
Here we check the - // `inputPlan` which has not been processed by `EnsureRequirements` yet, to find out the - // effective user-specified repartition. Later on, the AQE framework will make sure the final - // output partitioning is not changed w.r.t the effective user-specified repartition. - @transient private val requiredDistribution: Option[Distribution] = if (isSubquery) { - // Subquery output does not need a specific output partitioning. - Some(UnspecifiedDistribution) - } else { - AQEUtils.getRequiredDistribution(inputPlan) - } - - // A list of physical plan rules to be applied before creation of query stages. The physical - // plan should reach a final status of query stages (i.e., no more addition or removal of - // Exchange nodes) after running these rules. - @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq( - RemoveRedundantProjects, - // For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for - // the final plan, but we do need to respect the user-specified repartition. Here we ask - // `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work - // around this case. - EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined), - RemoveRedundantSorts, - DisableUnnecessaryBucketedScan - ) ++ context.session.sessionState.queryStagePrepRules - - // A list of physical optimizer rules to be applied to a new stage before its execution. These - // optimizations should be stage-independent. - @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( - PlanAdaptiveDynamicPruningFilters(this), - ReuseAdaptiveSubquery(context.subqueryCache), - // Skew join does not handle `AQEShuffleRead` so needs to be applied first. - OptimizeSkewedJoin, - OptimizeSkewInRebalancePartitions, - CoalesceShufflePartitions(context.session), - // `OptimizeShuffleWithLocalRead` needs to make use of 'AQEShuffleReadExec.partitionSpecs' - // added by `CoalesceShufflePartitions`, and must be executed after it. - OptimizeShuffleWithLocalRead - ) - - // This rule is stateful as it maintains the codegen stage ID. We can't create a fresh one every - // time and need to keep it in a variable. - @transient private val collapseCodegenStagesRule: Rule[SparkPlan] = - CollapseCodegenStages() - - // A list of physical optimizer rules to be applied right after a new stage is created. The input - // plan to these rules has exchange as its root node. - private def postStageCreationRules(outputsColumnar: Boolean) = Seq( - ApplyColumnarRulesAndInsertTransitions( - context.session.sessionState.columnarRules, outputsColumnar), - collapseCodegenStagesRule - ) - - private def optimizeQueryStage( - plan: SparkPlan, - isFinalStage: Boolean): SparkPlan = context.qe.withCteMap { - val optimized = queryStageOptimizerRules.foldLeft(plan) { case (latestPlan, rule) => - val applied = rule.apply(latestPlan) - val result = rule match { - case _: AQEShuffleReadRule if !applied.fastEquals(latestPlan) => - val distribution = if (isFinalStage) { - // If `requiredDistribution` is None, it means `EnsureRequirements` will not optimize - // out the user-specified repartition, thus we don't have a distribution requirement - // for the final plan. 
- requiredDistribution.getOrElse(UnspecifiedDistribution) - } else { - UnspecifiedDistribution - } - if (ValidateRequirements.validate(applied, distribution)) { - applied - } else { - logDebug(s"Rule ${rule.ruleName} is not applied as it breaks the " + - "distribution requirement of the query plan.") - latestPlan - } - case _ => applied - } - planChangeLogger.logRule(rule.ruleName, latestPlan, result) - result - } - planChangeLogger.logBatch("AQE Query Stage Optimization", plan, optimized) - optimized - } - - @transient private val costEvaluator = - conf.getConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS) match { - case Some(className) => CostEvaluator.instantiate(className, session.sparkContext.getConf) - case _ => SimpleCostEvaluator - } - - @transient val initialPlan = context.session.withActive { - applyPhysicalRules( - inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations"))) - } - - @volatile private var currentPhysicalPlan = initialPlan - - private var isFinalPlan = false - - private var currentStageId = 0 - - /** - * Return type for `createQueryStages` - * @param newPlan the new plan with created query stages. - * @param allChildStagesMaterialized whether all child stages have been materialized. - * @param newStages the newly created query stages, including new reused query stages. - */ - private case class CreateStageResult( - newPlan: SparkPlan, - allChildStagesMaterialized: Boolean, - newStages: Seq[QueryStageExec]) - - def executedPlan: SparkPlan = currentPhysicalPlan - - override def conf: SQLConf = context.session.sessionState.conf - - override def output: Seq[Attribute] = inputPlan.output - - override def doCanonicalize(): SparkPlan = inputPlan.canonicalized - - override def resetMetrics(): Unit = { - metrics.valuesIterator.foreach(_.reset()) - executedPlan.resetMetrics() - } - - private def getExecutionId: Option[Long] = { - // If the `QueryExecution` does not match the current execution ID, it means the execution ID - // belongs to another (parent) query, and we should not call update UI in this query. - Option(context.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)) - .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe) - } - - private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized { - if (isFinalPlan) return currentPhysicalPlan - - // In case of this adaptive plan being executed out of `withActive` scoped functions, e.g., - // `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be - // created in the middle of the execution. - context.session.withActive { - val executionId = getExecutionId - // Use inputPlan logicalLink here in case some top level physical nodes may be removed - // during `initialPlan` - var currentLogicalPlan = inputPlan.logicalLink.get - var result = createQueryStages(currentPhysicalPlan) - val events = new LinkedBlockingQueue[StageMaterializationEvent]() - val errors = new mutable.ArrayBuffer[Throwable]() - var stagesToReplace = Seq.empty[QueryStageExec] - while (!result.allChildStagesMaterialized) { - currentPhysicalPlan = result.newPlan - if (result.newStages.nonEmpty) { - stagesToReplace = result.newStages ++ stagesToReplace - executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan))) - - // SPARK-33933: we should submit tasks of broadcast stages first, to avoid waiting - // for tasks to be scheduled and leading to broadcast timeout. 
- // This partial fix only guarantees the start of materialization for BroadcastQueryStage - // is prior to others, but because the submission of collect job for broadcasting is - // running in another thread, the issue is not completely resolved. - val reorderedNewStages = result.newStages - .sortWith { - case (_: BroadcastQueryStageExec, _: BroadcastQueryStageExec) => false - case (_: BroadcastQueryStageExec, _) => true - case _ => false - } - - // Start materialization of all new stages and fail fast if any stages failed eagerly - reorderedNewStages.foreach { stage => - try { - stage.materialize().onComplete { res => - if (res.isSuccess) { - events.offer(StageSuccess(stage, res.get)) - } else { - events.offer(StageFailure(stage, res.failed.get)) - } - }(AdaptiveSparkPlanExec.executionContext) - } catch { - case e: Throwable => - cleanUpAndThrowException(Seq(e), Some(stage.id)) - } - } - } - - // Wait on the next completed stage, which indicates new stats are available and probably - // new stages can be created. There might be other stages that finish at around the same - // time, so we process those stages too in order to reduce re-planning. - val nextMsg = events.take() - val rem = new util.ArrayList[StageMaterializationEvent]() - events.drainTo(rem) - (Seq(nextMsg) ++ rem.asScala).foreach { - case StageSuccess(stage, res) => - stage.resultOption.set(Some(res)) - case StageFailure(stage, ex) => - errors.append(ex) - } - - // In case of errors, we cancel all running stages and throw exception. - if (errors.nonEmpty) { - cleanUpAndThrowException(errors.toSeq, None) - } - - // Try re-optimizing and re-planning. Adopt the new plan if its cost is equal to or less - // than that of the current plan; otherwise keep the current physical plan together with - // the current logical plan since the physical plan's logical links point to the logical - // plan it has originated from. - // Meanwhile, we keep a list of the query stages that have been created since last plan - // update, which stands for the "semantic gap" between the current logical and physical - // plans. And each time before re-planning, we replace the corresponding nodes in the - // current logical plan with logical query stages to make it semantically in sync with - // the current physical plan. Once a new plan is adopted and both logical and physical - // plans are updated, we can clear the query stage list because at this point the two plans - // are semantically and physically in sync again. - val logicalPlan = replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace) - val (newPhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan) - val origCost = costEvaluator.evaluateCost(currentPhysicalPlan) - val newCost = costEvaluator.evaluateCost(newPhysicalPlan) - if (newCost < origCost || - (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) { - logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan") - cleanUpTempTags(newPhysicalPlan) - currentPhysicalPlan = newPhysicalPlan - currentLogicalPlan = newLogicalPlan - stagesToReplace = Seq.empty[QueryStageExec] - } - // Now that some stages have finished, we can try creating new stages. - result = createQueryStages(currentPhysicalPlan) - } - - // Run the final plan when there's no more unfinished stages. 
- currentPhysicalPlan = applyPhysicalRules( - optimizeQueryStage(result.newPlan, isFinalStage = true), - postStageCreationRules(supportsColumnar), - Some((planChangeLogger, "AQE Post Stage Creation"))) - isFinalPlan = true - executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan))) - currentPhysicalPlan - } - } - - // Use a lazy val to avoid this being called more than once. - @transient private lazy val finalPlanUpdate: Unit = { - // Subqueries that don't belong to any query stage of the main query will execute after the - // last UI update in `getFinalPhysicalPlan`, so we need to update UI here again to make sure - // the newly generated nodes of those subqueries are updated. - if (!isSubquery && currentPhysicalPlan.find(_.subqueries.nonEmpty).isDefined) { - getExecutionId.foreach(onUpdatePlan(_, Seq.empty)) - } - logOnLevel(s"Final plan: $currentPhysicalPlan") - } - - override def executeCollect(): Array[InternalRow] = { - withFinalPlanUpdate(_.executeCollect()) - } - - override def executeTake(n: Int): Array[InternalRow] = { - withFinalPlanUpdate(_.executeTake(n)) - } - - override def executeTail(n: Int): Array[InternalRow] = { - withFinalPlanUpdate(_.executeTail(n)) - } - - override def doExecute(): RDD[InternalRow] = { - withFinalPlanUpdate(_.execute()) - } - - override def doExecuteColumnar(): RDD[ColumnarBatch] = { - withFinalPlanUpdate(_.executeColumnar()) - } - - override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = { - withFinalPlanUpdate { finalPlan => - assert(finalPlan.isInstanceOf[BroadcastQueryStageExec]) - finalPlan.doExecuteBroadcast() - } - } - - private def withFinalPlanUpdate[T](fun: SparkPlan => T): T = { - val plan = getFinalPhysicalPlan() - val result = fun(plan) - finalPlanUpdate - result - } - - protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan") - - override def generateTreeString( - depth: Int, - lastChildren: Seq[Boolean], - append: String => Unit, - verbose: Boolean, - prefix: String = "", - addSuffix: Boolean = false, - maxFields: Int, - printNodeId: Boolean, - indent: Int = 0): Unit = { - super.generateTreeString( - depth, - lastChildren, - append, - verbose, - prefix, - addSuffix, - maxFields, - printNodeId, - indent) - if (currentPhysicalPlan.fastEquals(initialPlan)) { - currentPhysicalPlan.generateTreeString( - depth + 1, - lastChildren :+ true, - append, - verbose, - prefix = "", - addSuffix = false, - maxFields, - printNodeId, - indent) - } else { - generateTreeStringWithHeader( - if (isFinalPlan) "Final Plan" else "Current Plan", - currentPhysicalPlan, - depth, - lastChildren, - append, - verbose, - maxFields, - printNodeId) - generateTreeStringWithHeader( - "Initial Plan", - initialPlan, - depth, - lastChildren, - append, - verbose, - maxFields, - printNodeId) - } - } - - - private def generateTreeStringWithHeader( - header: String, - plan: SparkPlan, - depth: Int, - lastChildren: Seq[Boolean], - append: String => Unit, - verbose: Boolean, - maxFields: Int, - printNodeId: Boolean): Unit = { - append(" " * depth) - append(s"+- == $header ==\n") - plan.generateTreeString( - 0, - Nil, - append, - verbose, - prefix = "", - addSuffix = false, - maxFields, - printNodeId, - indent = depth + 1) - } - - override def hashCode(): Int = inputPlan.hashCode() - - override def equals(obj: Any): Boolean = { - if (!obj.isInstanceOf[AdaptiveSparkPlanExec]) { - return false - } - - this.inputPlan == obj.asInstanceOf[AdaptiveSparkPlanExec].inputPlan - } - - /** - * This method is called recursively to traverse the plan 
tree bottom-up and create a new query - * stage or try reusing an existing stage if the current node is an [[Exchange]] node and all of - * its child stages have been materialized. - * - * With each call, it returns: - * 1) The new plan replaced with [[QueryStageExec]] nodes where new stages are created. - * 2) Whether the child query stages (if any) of the current node have all been materialized. - * 3) A list of the new query stages that have been created. - */ - private def createQueryStages(plan: SparkPlan): CreateStageResult = plan match { - case e: Exchange => - // First have a quick check in the `stageCache` without having to traverse down the node. - context.stageCache.get(e.canonicalized) match { - case Some(existingStage) if conf.exchangeReuseEnabled && (existingStage.schema == e.schema) => - val stage = reuseQueryStage(existingStage, e) - val isMaterialized = stage.isMaterialized - CreateStageResult( - newPlan = stage, - allChildStagesMaterialized = isMaterialized, - newStages = if (isMaterialized) Seq.empty else Seq(stage)) - - case _ => - val result = createQueryStages(e.child) - val newPlan = e.withNewChildren(Seq(result.newPlan)).asInstanceOf[Exchange] - // Create a query stage only when all the child query stages are ready. - if (result.allChildStagesMaterialized) { - var newStage = newQueryStage(newPlan) - if (conf.exchangeReuseEnabled) { - // Check the `stageCache` again for reuse. If a match is found, ditch the new stage - // and reuse the existing stage found in the `stageCache`, otherwise update the - // `stageCache` with the new stage. - val queryStage = context.stageCache.getOrElseUpdate( - newStage.plan.canonicalized, newStage) - if (queryStage.ne(newStage) && (queryStage.schema == e.schema)) { - newStage = reuseQueryStage(queryStage, e) - } - } - val isMaterialized = newStage.isMaterialized - CreateStageResult( - newPlan = newStage, - allChildStagesMaterialized = isMaterialized, - newStages = if (isMaterialized) Seq.empty else Seq(newStage)) - } else { - CreateStageResult(newPlan = newPlan, - allChildStagesMaterialized = false, newStages = result.newStages) - } - } - - case q: QueryStageExec => - CreateStageResult(newPlan = q, - allChildStagesMaterialized = q.isMaterialized, newStages = Seq.empty) - - case _ => - if (plan.children.isEmpty) { - CreateStageResult(newPlan = plan, allChildStagesMaterialized = true, newStages = Seq.empty) - } else { - val results = plan.children.map(createQueryStages) - CreateStageResult( - newPlan = plan.withNewChildren(results.map(_.newPlan)), - allChildStagesMaterialized = results.forall(_.allChildStagesMaterialized), - newStages = results.flatMap(_.newStages)) - } - } - - private def newQueryStage(e: Exchange): QueryStageExec = { - val optimizedPlan = optimizeQueryStage(e.child, isFinalStage = false) - val queryStage = e match { - case s: ShuffleExchangeLike => - val newShuffle = applyPhysicalRules( - s.withNewChildren(Seq(optimizedPlan)), - postStageCreationRules(outputsColumnar = s.supportsColumnar), - Some((planChangeLogger, "AQE Post Stage Creation"))) - if (!newShuffle.isInstanceOf[ShuffleExchangeLike]) { - throw new IllegalStateException( - "Custom columnar rules cannot transform shuffle node to something else.") - } - ShuffleQueryStageExec(currentStageId, newShuffle, s.canonicalized) - case b: BroadcastExchangeLike => - val newBroadcast = applyPhysicalRules( - b.withNewChildren(Seq(optimizedPlan)), - postStageCreationRules(outputsColumnar = b.supportsColumnar), - Some((planChangeLogger, "AQE Post Stage Creation"))) - if 
(!newBroadcast.isInstanceOf[BroadcastExchangeLike]) { - throw new IllegalStateException( - "Custom columnar rules cannot transform broadcast node to something else.") - } - BroadcastQueryStageExec(currentStageId, newBroadcast, b.canonicalized) - } - currentStageId += 1 - setLogicalLinkForNewQueryStage(queryStage, e) - queryStage - } - - private def reuseQueryStage(existing: QueryStageExec, exchange: Exchange): QueryStageExec = { - val queryStage = existing.newReuseInstance(currentStageId, exchange.output) - currentStageId += 1 - setLogicalLinkForNewQueryStage(queryStage, exchange) - queryStage - } - - /** - * Set the logical node link of the `stage` as the corresponding logical node of the `plan` it - * encloses. If an `plan` has been transformed from a `Repartition`, it should have `logicalLink` - * available by itself; otherwise traverse down to find the first node that is not generated by - * `EnsureRequirements`. - */ - private def setLogicalLinkForNewQueryStage(stage: QueryStageExec, plan: SparkPlan): Unit = { - val link = plan.getTagValue(TEMP_LOGICAL_PLAN_TAG).orElse( - plan.logicalLink.orElse(plan.collectFirst { - case p if p.getTagValue(TEMP_LOGICAL_PLAN_TAG).isDefined => - p.getTagValue(TEMP_LOGICAL_PLAN_TAG).get - case p if p.logicalLink.isDefined => p.logicalLink.get - })) - assert(link.isDefined) - stage.setLogicalLink(link.get) - } - - /** - * For each query stage in `stagesToReplace`, find their corresponding logical nodes in the - * `logicalPlan` and replace them with new [[LogicalQueryStage]] nodes. - * 1. If the query stage can be mapped to an integral logical sub-tree, replace the corresponding - * logical sub-tree with a leaf node [[LogicalQueryStage]] referencing this query stage. For - * example: - * Join SMJ SMJ - * / \ / \ / \ - * r1 r2 => Xchg1 Xchg2 => Stage1 Stage2 - * | | - * r1 r2 - * The updated plan node will be: - * Join - * / \ - * LogicalQueryStage1(Stage1) LogicalQueryStage2(Stage2) - * - * 2. Otherwise (which means the query stage can only be mapped to part of a logical sub-tree), - * replace the corresponding logical sub-tree with a leaf node [[LogicalQueryStage]] - * referencing to the top physical node into which this logical node is transformed during - * physical planning. For example: - * Agg HashAgg HashAgg - * | | | - * child => Xchg => Stage1 - * | - * HashAgg - * | - * child - * The updated plan node will be: - * LogicalQueryStage(HashAgg - Stage1) - */ - private def replaceWithQueryStagesInLogicalPlan( - plan: LogicalPlan, - stagesToReplace: Seq[QueryStageExec]): LogicalPlan = { - var logicalPlan = plan - stagesToReplace.foreach { - case stage if currentPhysicalPlan.find(_.eq(stage)).isDefined => - val logicalNodeOpt = stage.getTagValue(TEMP_LOGICAL_PLAN_TAG).orElse(stage.logicalLink) - assert(logicalNodeOpt.isDefined) - val logicalNode = logicalNodeOpt.get - val physicalNode = currentPhysicalPlan.collectFirst { - case p if p.eq(stage) || - p.getTagValue(TEMP_LOGICAL_PLAN_TAG).exists(logicalNode.eq) || - p.logicalLink.exists(logicalNode.eq) => p - } - assert(physicalNode.isDefined) - // Set the temp link for those nodes that are wrapped inside a `LogicalQueryStage` node for - // they will be shared and reused by different physical plans and their usual logical links - // can be overwritten through re-planning processes. 
- setTempTagRecursive(physicalNode.get, logicalNode) - // Replace the corresponding logical node with LogicalQueryStage - val newLogicalNode = LogicalQueryStage(logicalNode, physicalNode.get) - val newLogicalPlan = logicalPlan.transformDown { - case p if p.eq(logicalNode) => newLogicalNode - } - logicalPlan = newLogicalPlan - - case _ => // Ignore those earlier stages that have been wrapped in later stages. - } - logicalPlan - } - - /** - * Re-optimize and run physical planning on the current logical plan based on the latest stats. - */ - private def reOptimize( - logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = context.qe.withCteMap { - logicalPlan.invalidateStatsCache() - val optimized = optimizer.execute(logicalPlan) - val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next() - val newPlan = applyPhysicalRules( - sparkPlan, - preprocessingRules ++ queryStagePreparationRules, - Some((planChangeLogger, "AQE Replanning"))) - - // When both enabling AQE and DPP, `PlanAdaptiveDynamicPruningFilters` rule will - // add the `BroadcastExchangeExec` node manually in the DPP subquery, - // not through `EnsureRequirements` rule. Therefore, when the DPP subquery is complicated - // and need to be re-optimized, AQE also need to manually insert the `BroadcastExchangeExec` - // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery. - // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan - // is already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule. - val finalPlan = currentPhysicalPlan match { - case b: BroadcastExchangeLike - if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan)) - case _ => newPlan - } - - (finalPlan, optimized) - } - - /** - * Recursively set `TEMP_LOGICAL_PLAN_TAG` for the current `plan` node. - */ - private def setTempTagRecursive(plan: SparkPlan, logicalPlan: LogicalPlan): Unit = { - plan.setTagValue(TEMP_LOGICAL_PLAN_TAG, logicalPlan) - plan.children.foreach(c => setTempTagRecursive(c, logicalPlan)) - } - - /** - * Unset all `TEMP_LOGICAL_PLAN_TAG` tags. - */ - private def cleanUpTempTags(plan: SparkPlan): Unit = { - plan.foreach { - case plan: SparkPlan if plan.getTagValue(TEMP_LOGICAL_PLAN_TAG).isDefined => - plan.unsetTagValue(TEMP_LOGICAL_PLAN_TAG) - case _ => - } - } - - /** - * Notify the listeners of the physical plan change. - */ - private def onUpdatePlan(executionId: Long, newSubPlans: Seq[SparkPlan]): Unit = { - if (isSubquery) { - // When executing subqueries, we can't update the query plan in the UI as the - // UI doesn't support partial update yet. However, the subquery may have been - // optimized into a different plan and we must let the UI know the SQL metrics - // of the new plan nodes, so that it can track the valid accumulator updates later - // and display SQL metrics correctly. 
- val newMetrics = newSubPlans.flatMap { p => - p.flatMap(_.metrics.values.map(m => SQLPlanMetric(m.name.get, m.id, m.metricType))) - } - context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveSQLMetricUpdates( - executionId.toLong, newMetrics)) - } else { - val planDescriptionMode = ExplainMode.fromString(conf.uiExplainMode) - context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate( - executionId, - context.qe.explainString(planDescriptionMode), - SparkPlanInfo.fromSparkPlan(context.qe.executedPlan))) - } - } - - /** - * Cancel all running stages with best effort and throw an Exception containing all stage - * materialization errors and stage cancellation errors. - */ - private def cleanUpAndThrowException( - errors: Seq[Throwable], - earlyFailedStage: Option[Int]): Unit = { - currentPhysicalPlan.foreach { - // earlyFailedStage is the stage which failed before calling doMaterialize, - // so we should avoid calling cancel on it to re-trigger the failure again. - case s: QueryStageExec if !earlyFailedStage.contains(s.id) => - try { - s.cancel() - } catch { - case NonFatal(t) => - logError(s"Exception in cancelling query stage: ${s.treeString}", t) - } - case _ => - } - val e = if (errors.size == 1) { - errors.head - } else { - val se = QueryExecutionErrors.multiFailuresInStageMaterializationError(errors.head) - errors.tail.foreach(se.addSuppressed) - se - } - throw e - } -} - -object AdaptiveSparkPlanExec { - private[adaptive] val executionContext = ExecutionContext.fromExecutorService( - ThreadUtils.newDaemonCachedThreadPool("QueryStageCreator", 16)) - - /** - * The temporary [[LogicalPlan]] link for query stages. - * - * Physical nodes wrapped in a [[LogicalQueryStage]] can be shared among different physical plans - * and thus their usual logical links can be overwritten during query planning, leading to - * situations where those nodes point to a new logical plan and the rest point to the current - * logical plan. In this case we use temp logical links to make sure we can always trace back to - * the original logical links until a new physical plan is adopted, by which time we can clear up - * the temp logical links. - */ - val TEMP_LOGICAL_PLAN_TAG = TreeNodeTag[LogicalPlan]("temp_logical_plan") - - /** - * Apply a list of physical operator rules on a [[SparkPlan]]. - */ - def applyPhysicalRules( - plan: SparkPlan, - rules: Seq[Rule[SparkPlan]], - loggerAndBatchName: Option[(PlanChangeLogger[SparkPlan], String)] = None): SparkPlan = { - if (loggerAndBatchName.isEmpty) { - rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp) } - } else { - val (logger, batchName) = loggerAndBatchName.get - val newPlan = rules.foldLeft(plan) { case (sp, rule) => - val result = rule.apply(sp) - logger.logRule(rule.ruleName, sp, result) - result - } - logger.logBatch(batchName, plan, newPlan) - newPlan - } - } -} - -/** - * The execution context shared between the main query and all sub-queries. - */ -case class AdaptiveExecutionContext(session: SparkSession, qe: QueryExecution) { - - /** - * The subquery-reuse map shared across the entire query. - */ - val subqueryCache: TrieMap[SparkPlan, BaseSubqueryExec] = - new TrieMap[SparkPlan, BaseSubqueryExec]() - - /** - * The exchange-reuse map shared across the entire query, including sub-queries. - */ - val stageCache: TrieMap[SparkPlan, QueryStageExec] = - new TrieMap[SparkPlan, QueryStageExec]() -} - -/** - * The event type for stage materialization. 
- */ -sealed trait StageMaterializationEvent - -/** - * The materialization of a query stage completed with success. - */ -case class StageSuccess(stage: QueryStageExec, result: Any) extends StageMaterializationEvent - -/** - * The materialization of a query stage hit an error and failed. - */ -case class StageFailure(stage: QueryStageExec, error: Throwable) extends StageMaterializationEvent From 95d85547b917bde482a29e84335b43a832b81334 Mon Sep 17 00:00:00 2001 From: philo Date: Tue, 15 Mar 2022 23:08:47 +0800 Subject: [PATCH 03/22] Fix issues reported by unit tests --- .../execution/ArrowRowToColumnarExec.scala | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala index d2ffe9e67..60f90e684 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala @@ -29,7 +29,8 @@ import org.apache.arrow.vector.ipc.message.ArrowRecordBatch import org.apache.arrow.vector.types.pojo.Schema import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, UnsafeRow} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{RowToColumnarExec, SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.datasources.v2.arrow.{SparkMemoryUtils, SparkSchemaUtils} import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils.UnsafeItr @@ -38,7 +39,7 @@ import org.apache.spark.sql.execution.vectorized.WritableColumnVector import org.apache.spark.sql.types._ import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} -import org.apache.spark.TaskContext +import org.apache.spark.{TaskContext, broadcast} import org.apache.spark.unsafe.Platform @@ -79,6 +80,12 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode { override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override def supportsColumnar: Boolean = true + // For spark 3.2. 
protected def withNewChildInternal(newChild: SparkPlan): ArrowRowToColumnarExec = copy(child = newChild) @@ -87,6 +94,10 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode { child.execute() } + override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = { + child.executeBroadcast() + } + override def doExecuteColumnar(): RDD[ColumnarBatch] = { val numInputRows = longMetric("numInputRows") val numOutputBatches = longMetric("numOutputBatches") @@ -216,12 +227,4 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode { } } - override def canEqual(other: Any): Boolean = other.isInstanceOf[ArrowRowToColumnarExec] - - override def equals(other: Any): Boolean = other match { - case that: ArrowRowToColumnarExec => - (that canEqual this) && super.equals(that) - case _ => false - } - } From fdf48f98b3c323e4274beeeb637b17fecac020b8 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Wed, 16 Mar 2022 17:19:42 +0800 Subject: [PATCH 04/22] fix AQE Signed-off-by: Yuan Zhou --- .../ColumnarShuffleExchangeExec.scala | 22 ++++++++++++++----- .../execution/ShuffledColumnarBatchRDD.scala | 14 ++++++++++++ 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index 84721424f..60128425b 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection} import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.CoalesceExec.EmptyPartition import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils @@ -200,15 +201,15 @@ case class ColumnarShuffleExchangeExec( copy(child = newChild) } -class ColumnarShuffleExchangeAdaptor( +case class ColumnarShuffleExchangeAdaptor( override val outputPartitioning: Partitioning, child: SparkPlan, shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS) - extends ShuffleExchangeExec(outputPartitioning, child) { + extends ShuffleExchangeLike { private[sql] lazy val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) - private[sql] override lazy val readMetrics = + private[sql] lazy val readMetrics = SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics: Map[String, SQLMetric] = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), @@ -228,7 +229,17 @@ class ColumnarShuffleExchangeAdaptor( override def output: Seq[Attribute] = child.output override def supportsColumnar: Boolean = true + override def numMappers: Int = shuffleDependency.rdd.getNumPartitions + override def numPartitions: Int = shuffleDependency.partitioner.numPartitions + override def runtimeStatistics: Statistics = { + val dataSize = metrics("dataSize").value + val rowCount = metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN).value + Statistics(dataSize, Some(rowCount)) + } + override def 
getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[ColumnarBatch] = { + cachedShuffleRDD + } override def stringArgs = super.stringArgs ++ Iterator(s"[id=#$id]") //super.stringArgs ++ Iterator(output.map(o => s"${o}#${o.dataType.simpleString}")) @@ -285,7 +296,7 @@ class ColumnarShuffleExchangeAdaptor( // 'shuffleDependency' is only needed when enable AQE. Columnar shuffle will use 'columnarShuffleDependency' @transient - override lazy val shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow] = + lazy val shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow] = new ShuffleDependency[Int, InternalRow, InternalRow]( _rdd = new ColumnarShuffleExchangeExec.DummyPairRDDWithPartitions( sparkContext, @@ -315,7 +326,8 @@ class ColumnarShuffleExchangeAdaptor( attr.name + ":" + attr.dataType }.toString() } - + override protected def withNewChildInternal(newChild: SparkPlan): ColumnarShuffleExchangeAdaptor = + copy(child = newChild) } object ColumnarShuffleExchangeExec extends Logging { diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala index 1d64aa271..1b57f8381 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.shuffle.sort.SortShuffleManager +import org.apache.spark.sql.execution.CoalescedMapperPartitionSpec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch @@ -86,6 +87,9 @@ class ShuffledColumnarBatchRDD( coalescedPartitionSpec.endReducerIndex).flatMap { reducerIndex => tracker.getPreferredLocationsForShuffle(dependency, reducerIndex) } + case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, _) => + tracker.getMapLocation(dependency, startMapIndex, endMapIndex) + case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex, _) => tracker.getMapLocation(dependency, startMapIndex, endMapIndex) @@ -128,6 +132,16 @@ class ShuffledColumnarBatchRDD( endReducerIndex, context, sqlMetricsReporter) + + case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) => + SparkEnv.get.shuffleManager.getReader( + dependency.shuffleHandle, + startMapIndex, + endMapIndex, + 0, + numReducers, + context, + sqlMetricsReporter) } reader.read().asInstanceOf[Iterator[Product2[Int, ColumnarBatch]]].map(_._2) } From 138d6b86b18ab2274ff86f1ebc02f387089fe357 Mon Sep 17 00:00:00 2001 From: philo Date: Wed, 16 Mar 2022 22:34:09 +0800 Subject: [PATCH 05/22] Use correct type for equals in ColumnarBroadcastExchangeAdaptor --- .../spark/sql/execution/ColumnarBroadcastExchangeExec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala index 99f7f7f69..fe030561b 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala +++ 
b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala @@ -303,10 +303,10 @@ class ColumnarBroadcastExchangeAdaptor(mode: BroadcastMode, child: SparkPlan) override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = plan.doExecuteBroadcast[T]() - override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarShuffleExchangeAdaptor] + override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarBroadcastExchangeAdaptor] override def equals(other: Any): Boolean = other match { - case that: ColumnarShuffleExchangeAdaptor => + case that: ColumnarBroadcastExchangeAdaptor => (that canEqual this) && super.equals(that) case _ => false } From ea104db07f8c4ad39a1d87f3cd3c4cb7fbfde2af Mon Sep 17 00:00:00 2001 From: philo Date: Thu, 17 Mar 2022 00:08:45 +0800 Subject: [PATCH 06/22] Fix compatibility issues to make the changes work on spark 3.1 also --- .../ColumnarShuffleExchangeExec.scala | 4 ++- .../execution/ShuffledColumnarBatchRDD.scala | 17 ++++++++-- .../com/intel/oap/sql/shims/SparkShims.scala | 8 +++++ .../sql/shims/spark311/Spark311Shims.scala | 31 +++++++++++++++++-- .../sql/shims/spark321/Spark321Shims.scala | 31 +++++++++++++++++-- 5 files changed, 83 insertions(+), 8 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index 60128425b..47d0d634f 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -326,7 +326,9 @@ case class ColumnarShuffleExchangeAdaptor( attr.name + ":" + attr.dataType }.toString() } - override protected def withNewChildInternal(newChild: SparkPlan): ColumnarShuffleExchangeAdaptor = + + // For spark3.2. 
+ protected def withNewChildInternal(newChild: SparkPlan): ColumnarShuffleExchangeAdaptor = copy(child = newChild) } diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala index 1b57f8381..ea074c4c9 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala @@ -17,10 +17,11 @@ package org.apache.spark.sql.execution +import com.intel.oap.sql.shims.SparkShimLoader + import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.shuffle.sort.SortShuffleManager -import org.apache.spark.sql.execution.CoalescedMapperPartitionSpec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch @@ -87,7 +88,11 @@ class ShuffledColumnarBatchRDD( coalescedPartitionSpec.endReducerIndex).flatMap { reducerIndex => tracker.getPreferredLocationsForShuffle(dependency, reducerIndex) } - case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, _) => + case spec if SparkShimLoader.getSparkShims.isCoalescedMapperPartitionSpec(spec) => + val startMapIndex = + SparkShimLoader.getSparkShims.getStartMapIndexOfCoalescedMapperPartitionSpec(spec) + val endMapIndex = + SparkShimLoader.getSparkShims.getEndMapIndexOfCoalescedMapperPartitionSpec(spec) tracker.getMapLocation(dependency, startMapIndex, endMapIndex) case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex, _) => @@ -133,7 +138,13 @@ class ShuffledColumnarBatchRDD( context, sqlMetricsReporter) - case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) => + case spec if SparkShimLoader.getSparkShims.isCoalescedMapperPartitionSpec(spec) => + val startMapIndex = + SparkShimLoader.getSparkShims.getStartMapIndexOfCoalescedMapperPartitionSpec(spec) + val endMapIndex = + SparkShimLoader.getSparkShims.getEndMapIndexOfCoalescedMapperPartitionSpec(spec) + val numReducers = + SparkShimLoader.getSparkShims.getNumReducersOfCoalescedMapperPartitionSpec(spec) SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, startMapIndex, diff --git a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala index 62420a6f8..61bd49b57 100644 --- a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala @@ -113,4 +113,12 @@ trait SparkShims { * REPARTITION is changed to REPARTITION_BY_COL from spark 3.2. 
*/ def isRepartition(shuffleOrigin: ShuffleOrigin): Boolean + + def isCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Boolean + + def getStartMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int + + def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int + + def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int } diff --git a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala index e637877e9..790b4cf33 100644 --- a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala +++ b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala @@ -36,8 +36,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} -import org.apache.spark.sql.execution.ShufflePartitionSpec -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan} import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, CustomShuffleReaderExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, ParquetOptions, ParquetReadSupport, VectorizedParquetRecordReader} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec @@ -179,4 +178,32 @@ class Spark311Shims extends SparkShims { } } + /** + * CoalescedMapperPartitionSpec is introduced in spark3.2. So always return false for spark3.1. + */ + override def isCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Boolean = { + false + } + + /** + * CoalescedMapperPartitionSpec is introduced in spark3.2. So always return -1 for spark3.1. + */ + override def getStartMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + -1 + } + + /** + * CoalescedMapperPartitionSpec is introduced in spark3.2. So always return -1 for spark3.1. + */ + override def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + -1 + } + + /** + * CoalescedMapperPartitionSpec is introduced in spark3.2. So always return -1 for spark3.1. 
+ */ + override def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + -1 + } + } \ No newline at end of file diff --git a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala index ce8d2d33e..8512e1b2b 100644 --- a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala +++ b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala @@ -38,8 +38,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec -import org.apache.spark.sql.execution.ShufflePartitionSpec -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{CoalescedMapperPartitionSpec, ShufflePartitionSpec, SparkPlan} import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, ParquetOptions, ParquetReadSupport, VectorizedParquetRecordReader} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec @@ -230,4 +229,32 @@ class Spark321Shims extends SparkShims { } } + override def isCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Boolean = { + spec match { + case _: CoalescedMapperPartitionSpec => true + case _ => false + } + } + + override def getStartMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + spec match { + case c: CoalescedMapperPartitionSpec => c.startMapIndex + case _ => throw new RuntimeException("CoalescedMapperPartitionSpec is expected!") + } + } + + override def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + spec match { + case c: CoalescedMapperPartitionSpec => c.endMapIndex + case _ => throw new RuntimeException("CoalescedMapperPartitionSpec is expected!") + } + } + + override def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { + spec match { + case c: CoalescedMapperPartitionSpec => c.numReducers + case _ => throw new RuntimeException("CoalescedMapperPartitionSpec is expected!") + } + } + } \ No newline at end of file From 31c32bc9044878391588be39f6c08c36e954ef76 Mon Sep 17 00:00:00 2001 From: philo Date: Thu, 17 Mar 2022 00:27:33 +0800 Subject: [PATCH 07/22] Refine the code --- .../intel/oap/sql/shims/spark311/Spark311Shims.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala index 790b4cf33..7535c4eeb 100644 --- a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala +++ b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala @@ -186,24 +186,24 @@ class Spark311Shims extends SparkShims { } /** - * CoalescedMapperPartitionSpec is introduced in spark3.2. So always return -1 for spark3.1. + * This method cannot be invoked in spark3.1. */ override def getStartMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { - -1 + throw new RuntimeException("This method should not be invoked in spark 3.1.") } /** - * CoalescedMapperPartitionSpec is introduced in spark3.2. 
So always return -1 for spark3.1. + * This method cannot be invoked in spark3.1. */ override def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { - -1 + throw new RuntimeException("This method should not be invoked in spark 3.1.") } /** - * CoalescedMapperPartitionSpec is introduced in spark3.2. So always return -1 for spark3.1. + * This method cannot be invoked in spark3.1. */ override def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int = { - -1 + throw new RuntimeException("This method should not be invoked in spark 3.1.") } } \ No newline at end of file From 8ad5a35e936863801b074529ca26efd8771a08bd Mon Sep 17 00:00:00 2001 From: philo Date: Thu, 17 Mar 2022 21:45:32 +0800 Subject: [PATCH 08/22] Change the parent class of ColumnarBroadcastExchangeAdaptor --- .../intel/oap/extension/ColumnarOverrides.scala | 2 +- .../extension/columnar/ColumnarGuardRule.scala | 2 ++ .../ColumnarBroadcastExchangeExec.scala | 16 ++++++++++++++-- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala index 4b45d33c7..6f099fee9 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala @@ -219,7 +219,7 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { val child = replaceWithColumnarPlan(plan.child) logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") if (isSupportAdaptive) - new ColumnarBroadcastExchangeAdaptor(plan.mode, child) + ColumnarBroadcastExchangeAdaptor(plan.mode, child) else ColumnarBroadcastExchangeExec(plan.mode, child) case plan: BroadcastHashJoinExec => diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala index 68e186b8f..24cb78f14 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala @@ -190,6 +190,8 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] { broadcastQueryStageExec.plan match { case plan: BroadcastExchangeExec => new ColumnarBroadcastExchangeExec(plan.mode, plan.child) + case plan: ColumnarBroadcastExchangeAdaptor => + new ColumnarBroadcastExchangeExec(plan.mode, plan.child) case plan: ReusedExchangeExec => plan match { case ReusedExchangeExec(_, b: BroadcastExchangeExec) => diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala index fe030561b..0650894ec 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.launcher.SparkLauncher import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.plans.logical.Statistics import 
org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, _} import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, _} @@ -263,8 +264,8 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) copy(child = newChild) } -class ColumnarBroadcastExchangeAdaptor(mode: BroadcastMode, child: SparkPlan) - extends BroadcastExchangeExec(mode, child) { +case class ColumnarBroadcastExchangeAdaptor(mode: BroadcastMode, child: SparkPlan) + extends BroadcastExchangeLike { val plan: ColumnarBroadcastExchangeExec = new ColumnarBroadcastExchangeExec(mode, child) override def supportsColumnar = true @@ -277,6 +278,13 @@ class ColumnarBroadcastExchangeAdaptor(mode: BroadcastMode, child: SparkPlan) override def doCanonicalize(): SparkPlan = plan.doCanonicalize() + // Ported from BroadcastExchangeExec + override def runtimeStatistics: Statistics = { + val dataSize = metrics("dataSize").value + val rowCount = metrics("numOutputRows").value + Statistics(dataSize, Some(rowCount)) + } + @transient private val timeout: Long = SQLConf.get.broadcastTimeout @@ -310,4 +318,8 @@ class ColumnarBroadcastExchangeAdaptor(mode: BroadcastMode, child: SparkPlan) (that canEqual this) && super.equals(that) case _ => false } + + // For spark3.2. + override protected def withNewChildInternal(newChild: SparkPlan): ColumnarBroadcastExchangeAdaptor = + copy(child = newChild) } From 1c6965dd51c6bb68f5fc693ac9b7a862545a6854 Mon Sep 17 00:00:00 2001 From: philo Date: Fri, 18 Mar 2022 09:25:23 +0800 Subject: [PATCH 09/22] Remove override keyword to make the code compatible with spark 3.1 --- .../spark/sql/execution/ColumnarBroadcastExchangeExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala index 0650894ec..23aa893bc 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala @@ -320,6 +320,6 @@ case class ColumnarBroadcastExchangeAdaptor(mode: BroadcastMode, child: SparkPla } // For spark3.2. 
- override protected def withNewChildInternal(newChild: SparkPlan): ColumnarBroadcastExchangeAdaptor = + protected def withNewChildInternal(newChild: SparkPlan): ColumnarBroadcastExchangeAdaptor = copy(child = newChild) } From 929aaf899172e9ac88d9fce019470cc9efcf7751 Mon Sep 17 00:00:00 2001 From: philo Date: Fri, 18 Mar 2022 11:46:56 +0800 Subject: [PATCH 10/22] Add missing match case --- .../com/intel/oap/extension/columnar/ColumnarGuardRule.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala index 24cb78f14..942b4f529 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala @@ -173,6 +173,8 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] { broadcastQueryStageExec.plan match { case plan: BroadcastExchangeExec => new ColumnarBroadcastExchangeExec(plan.mode, plan.child) + case plan: ColumnarBroadcastExchangeAdaptor => + new ColumnarBroadcastExchangeExec(plan.mode, plan.child) case plan: ReusedExchangeExec => plan match { case ReusedExchangeExec(_, b: BroadcastExchangeExec) => From b4bdd9eb0218ae2c3bc205afe13d9a4662ba163a Mon Sep 17 00:00:00 2001 From: philo Date: Fri, 18 Mar 2022 15:00:48 +0800 Subject: [PATCH 11/22] Try to fix BHJ issue --- .../com/intel/oap/extension/columnar/ColumnarGuardRule.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala index 942b4f529..875704492 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala @@ -179,6 +179,8 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] { plan match { case ReusedExchangeExec(_, b: BroadcastExchangeExec) => new ColumnarBroadcastExchangeExec(b.mode, b.child) + case ReusedExchangeExec(_, b: ColumnarBroadcastExchangeAdaptor) => + new ColumnarBroadcastExchangeExec(b.mode, b.child) case _ => } } @@ -198,6 +200,8 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] { plan match { case ReusedExchangeExec(_, b: BroadcastExchangeExec) => new ColumnarBroadcastExchangeExec(b.mode, b.child) + case ReusedExchangeExec(_, b: ColumnarBroadcastExchangeAdaptor) => + new ColumnarBroadcastExchangeExec(b.mode, b.child) case _ => } } From 5d2060d7f4ae294732c2e7466f430c4fe0b1c98c Mon Sep 17 00:00:00 2001 From: philo Date: Fri, 18 Mar 2022 18:15:43 +0800 Subject: [PATCH 12/22] Remove canEqual & Equal for two case classes --- .../sql/execution/ColumnarBroadcastExchangeExec.scala | 8 -------- .../spark/sql/execution/ColumnarShuffleExchangeExec.scala | 8 -------- 2 files changed, 16 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala index 23aa893bc..13749f9dc 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala +++ 
b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala @@ -311,14 +311,6 @@ case class ColumnarBroadcastExchangeAdaptor(mode: BroadcastMode, child: SparkPla override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = plan.doExecuteBroadcast[T]() - override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarBroadcastExchangeAdaptor] - - override def equals(other: Any): Boolean = other match { - case that: ColumnarBroadcastExchangeAdaptor => - (that canEqual this) && super.equals(that) - case _ => false - } - // For spark3.2. protected def withNewChildInternal(newChild: SparkPlan): ColumnarBroadcastExchangeAdaptor = copy(child = newChild) diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index 47d0d634f..54dcfebf3 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -308,14 +308,6 @@ case class ColumnarShuffleExchangeAdaptor( override val shuffleHandle: ShuffleHandle = columnarShuffleDependency.shuffleHandle } - override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarShuffleExchangeAdaptor] - - override def equals(other: Any): Boolean = other match { - case that: ColumnarShuffleExchangeAdaptor => - (that canEqual this) && super.equals(that) - case _ => false - } - override def verboseString(maxFields: Int): String = toString(super.verboseString(maxFields)) override def simpleString(maxFields: Int): String = toString(super.simpleString(maxFields)) From 5e66540d2e34e32b43111d515f0135985bc30005 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Sun, 20 Mar 2022 15:46:02 +0800 Subject: [PATCH 13/22] fallback on reused broadcast exchange Signed-off-by: Yuan Zhou --- .../com/intel/oap/extension/ColumnarOverrides.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala index 6f099fee9..c5b5a28e7 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala @@ -213,8 +213,15 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { right) case plan: BroadcastQueryStageExec => logDebug( - s"Columnar Processing for ${plan.getClass} is currently supported, actual plan is ${plan.plan.getClass}.") - plan + s"Columnar Processing for ${plan.getClass} is currently supported, actual plan is ${plan.plan}.") + plan.plan match { + case ReusedExchangeExec(_, originalBroadcastPlan: ColumnarBroadcastExchangeAdaptor) => + val newBroadcast = BroadcastExchangeExec( + originalBroadcastPlan.mode, + DataToArrowColumnarExec(plan.plan, 1)) + SparkShimLoader.getSparkShims.newBroadcastQueryStageExec(plan.id, newBroadcast) + case other => plan + } case plan: BroadcastExchangeExec => val child = replaceWithColumnarPlan(plan.child) logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") From f0c8be50e2c23716ee4ecb34171cdd351144c253 Mon Sep 17 00:00:00 2001 From: philo Date: Sun, 20 Mar 2022 22:09:40 +0800 Subject: [PATCH 14/22] Fix the case that 
BroadcastExchangeExec can connect to BHJ --- .../oap/extension/ColumnarOverrides.scala | 43 +++++++++++++++---- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala index c5b5a28e7..021174e47 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala @@ -234,15 +234,40 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { val left = replaceWithColumnarPlan(plan.left) val right = replaceWithColumnarPlan(plan.right) logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - ColumnarBroadcastHashJoinExec( - plan.leftKeys, - plan.rightKeys, - plan.joinType, - plan.buildSide, - plan.condition, - left, - right, - nullAware = plan.isNullAwareAntiJoin) + // Due to the tackling for BroadcastQueryStageExec with ReusedExchangeExec(_, + // originalBroadcastPlan: ColumnarBroadcastExchangeAdaptor), it is possible that + // left or right child has row based BroadcastExchangeExec. If so, we should fallback. + val leftBEE = left match { + case broadcastQueryStageExec: BroadcastQueryStageExec => + broadcastQueryStageExec.plan match { + case _: BroadcastExchangeExec => true + case _ => false + } + case _ => false + } + val rightBEE = right match { + case broadcastQueryStageExec: BroadcastQueryStageExec => + broadcastQueryStageExec.plan match { + case _: BroadcastExchangeExec => true + case _ => false + } + case _ => false + } + if (leftBEE || rightBEE) { + val children = plan.children.map(replaceWithColumnarPlan) + logDebug(s"Columnar Processing for ${plan.getClass} is not currently supported.") + plan.withNewChildren(children) + } else { + ColumnarBroadcastHashJoinExec( + plan.leftKeys, + plan.rightKeys, + plan.joinType, + plan.buildSide, + plan.condition, + left, + right, + nullAware = plan.isNullAwareAntiJoin) + } } else { val children = plan.children.map(replaceWithColumnarPlan) logDebug(s"Columnar Processing for ${plan.getClass} is not currently supported.") From 35a15d436ad6ba64f936c98aef3fefafa394e67a Mon Sep 17 00:00:00 2001 From: philo Date: Mon, 21 Mar 2022 00:50:19 +0800 Subject: [PATCH 15/22] Override withNewChildInternal in ArrowColumnarToRowExec [To fix an assert error] --- .../execution/ArrowColumnarToRowExec.scala | 34 +++++++++++++------ .../oap/extension/ColumnarOverrides.scala | 4 +-- .../sql/shims/spark321/Spark321Shims.scala | 23 +------------ 3 files changed, 27 insertions(+), 34 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala index e83c0740a..5eeb7c9d2 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala @@ -20,22 +20,33 @@ package com.intel.oap.execution import com.intel.oap.expression.ConverterUtils import com.intel.oap.vectorized.{ArrowColumnarToRowJniWrapper, ArrowWritableColumnVector} import org.apache.arrow.vector.types.pojo.{Field, Schema} - import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow} +import 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan} +import org.apache.spark.sql.execution.{CodegenSupport, ColumnarToRowTransition, SparkPlan} import org.apache.spark.sql.types._ import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer import scala.concurrent.duration._ -class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child = child) { +case class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport { override def nodeName: String = "ArrowColumnarToRow" + override def output: Seq[Attribute] = child.output + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + // `ColumnarToRowExec` processes the input RDD directly, which is kind of a leaf node in the + // codegen stage and needs to do the limit check. + protected override def canCheckLimitNotReached: Boolean = true + override def supportCodegen: Boolean = false buildCheck() @@ -68,6 +79,10 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child = "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to convert") ) + protected def doProduce(ctx: CodegenContext): String = { + throw new RuntimeException("Codegen is not supported!") + } + override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") val numInputBatches = longMetric("numInputBatches") @@ -154,12 +169,11 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child = } } - override def canEqual(other: Any): Boolean = other.isInstanceOf[ArrowColumnarToRowExec] - - override def equals(other: Any): Boolean = other match { - case that: ArrowColumnarToRowExec => - (that canEqual this) && super.equals(that) - case _ => false + override def inputRDDs(): Seq[RDD[InternalRow]] = { + Seq(child.executeColumnar().asInstanceOf[RDD[InternalRow]]) // Hack because of type erasure } + + override protected def withNewChildInternal(newChild: SparkPlan): ArrowColumnarToRowExec = + copy(child = newChild) } diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala index 021174e47..fcde80a8f 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala @@ -430,7 +430,7 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] { val child = replaceWithColumnarPlan(plan.child) logDebug(s"ColumnarPostOverrides ArrowColumnarToRowExec(${child.getClass})") try { - new ArrowColumnarToRowExec(child) + ArrowColumnarToRowExec(child) } catch { case _: Throwable => logInfo("ArrowColumnarToRowExec: Falling back to ColumnarToRow...") @@ -450,7 +450,7 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] { if (columnarConf.enableArrowColumnarToRow) { try { val child = replaceWithColumnarPlan(c.child) - new ArrowColumnarToRowExec(child) + ArrowColumnarToRowExec(child) } catch { case _: 
Throwable => logInfo("ArrowColumnarToRow : Falling back to ColumnarToRow...") diff --git a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala index 8512e1b2b..3fafa8d3f 100644 --- a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala +++ b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, Parqu import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.arrow.SparkVectorUtils import org.apache.spark.sql.execution.datasources.{DataSourceUtils, OutputWriter} -import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, REPARTITION_BY_COL, ReusedExchangeExec, ShuffleExchangeExec, ShuffleOrigin} +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, REPARTITION_BY_COL, ReusedExchangeExec, ShuffleOrigin} import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.LongType @@ -160,27 +160,6 @@ class Spark321Shims extends SparkShims { ShimUtils.doFetchFile(urlString, targetDirHandler, targetFileName, sparkConf) } -// /** -// * Fix compatibility issue that ShuffleQueryStageExec has an additional argument in spark 3.2. -// * ShuffleExchangeExec replaces ColumnarShuffleExchangeAdaptor to avoid cyclic dependency. This -// * changes need futher test to verify. -// */ -// override def outputPartitioningForColumnarCustomShuffleReaderExec(child: SparkPlan): Partitioning = { -// child match { -// case ShuffleQueryStageExec(_, s: ShuffleExchangeExec, _) => -// s.child.outputPartitioning -// case ShuffleQueryStageExec( -// _, -// r @ ReusedExchangeExec(_, s: ShuffleExchangeExec), _) => -// s.child.outputPartitioning match { -// case e: Expression => r.updateAttr(e).asInstanceOf[Partitioning] -// case other => other -// } -// case _ => -// throw new IllegalStateException("operating on canonicalization plan") -// } -// } - override def newBroadcastQueryStageExec(id: Int, plan: BroadcastExchangeExec): BroadcastQueryStageExec = { BroadcastQueryStageExec(id, plan, plan.doCanonicalize) From 912aec46d254fb30340543a02d208f91206dbe58 Mon Sep 17 00:00:00 2001 From: philo Date: Mon, 21 Mar 2022 01:31:02 +0800 Subject: [PATCH 16/22] Remove override keyword for compatibility --- .../scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala index 5eeb7c9d2..6a34b882c 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala @@ -173,7 +173,7 @@ case class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransit Seq(child.executeColumnar().asInstanceOf[RDD[InternalRow]]) // Hack because of type erasure } - override protected def withNewChildInternal(newChild: SparkPlan): ArrowColumnarToRowExec = + protected def withNewChildInternal(newChild: SparkPlan): ArrowColumnarToRowExec = copy(child = newChild) } From f9a03d06785bece4f1fec6ec5432c6f4b8652f6d Mon Sep 17 00:00:00 2001 From: 
philo Date: Mon, 21 Mar 2022 14:56:34 +0800 Subject: [PATCH 17/22] Revert "fallback on reused broadcast exchange" This reverts commit ddc8631b6649419b2f0e62195b3c86c7ac3f7521. --- .../com/intel/oap/extension/ColumnarOverrides.scala | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala index fcde80a8f..a5fba3d1d 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala @@ -213,15 +213,8 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { right) case plan: BroadcastQueryStageExec => logDebug( - s"Columnar Processing for ${plan.getClass} is currently supported, actual plan is ${plan.plan}.") - plan.plan match { - case ReusedExchangeExec(_, originalBroadcastPlan: ColumnarBroadcastExchangeAdaptor) => - val newBroadcast = BroadcastExchangeExec( - originalBroadcastPlan.mode, - DataToArrowColumnarExec(plan.plan, 1)) - SparkShimLoader.getSparkShims.newBroadcastQueryStageExec(plan.id, newBroadcast) - case other => plan - } + s"Columnar Processing for ${plan.getClass} is currently supported, actual plan is ${plan.plan.getClass}.") + plan case plan: BroadcastExchangeExec => val child = replaceWithColumnarPlan(plan.child) logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") From 1eb192768291b3f28021c96174a5193c6075b58b Mon Sep 17 00:00:00 2001 From: philo Date: Mon, 21 Mar 2022 14:57:00 +0800 Subject: [PATCH 18/22] Revert "Fix the case that BroadcastExchangeExec can connect to BHJ" This reverts commit 2268205aaa2087a6491e9344458ce70c49427faf. --- .../oap/extension/ColumnarOverrides.scala | 43 ++++--------------- 1 file changed, 9 insertions(+), 34 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala index a5fba3d1d..d1eda8011 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala @@ -227,40 +227,15 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { val left = replaceWithColumnarPlan(plan.left) val right = replaceWithColumnarPlan(plan.right) logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - // Due to the tackling for BroadcastQueryStageExec with ReusedExchangeExec(_, - // originalBroadcastPlan: ColumnarBroadcastExchangeAdaptor), it is possible that - // left or right child has row based BroadcastExchangeExec. If so, we should fallback. 
- val leftBEE = left match { - case broadcastQueryStageExec: BroadcastQueryStageExec => - broadcastQueryStageExec.plan match { - case _: BroadcastExchangeExec => true - case _ => false - } - case _ => false - } - val rightBEE = right match { - case broadcastQueryStageExec: BroadcastQueryStageExec => - broadcastQueryStageExec.plan match { - case _: BroadcastExchangeExec => true - case _ => false - } - case _ => false - } - if (leftBEE || rightBEE) { - val children = plan.children.map(replaceWithColumnarPlan) - logDebug(s"Columnar Processing for ${plan.getClass} is not currently supported.") - plan.withNewChildren(children) - } else { - ColumnarBroadcastHashJoinExec( - plan.leftKeys, - plan.rightKeys, - plan.joinType, - plan.buildSide, - plan.condition, - left, - right, - nullAware = plan.isNullAwareAntiJoin) - } + ColumnarBroadcastHashJoinExec( + plan.leftKeys, + plan.rightKeys, + plan.joinType, + plan.buildSide, + plan.condition, + left, + right, + nullAware = plan.isNullAwareAntiJoin) } else { val children = plan.children.map(replaceWithColumnarPlan) logDebug(s"Columnar Processing for ${plan.getClass} is not currently supported.") From 49d9b0567d412ea4b503a46a600b5eedcddf8b02 Mon Sep 17 00:00:00 2001 From: philo Date: Mon, 21 Mar 2022 15:03:35 +0800 Subject: [PATCH 19/22] Correct the assert statement in testing broadcast exchange reuse across subqueries --- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index d0839d484..d1f90d580 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -26,11 +26,11 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} -import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeAdaptor, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.datasources.noop.NoopDataSource import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec -import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, REPARTITION, REPARTITION_WITH_NUM, ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike} +import org.apache.spark.sql.execution.exchange.{Exchange, REPARTITION, REPARTITION_WITH_NUM, ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike} import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate @@ -537,7 +537,7 @@ class AdaptiveQueryExecSuite // Even with local shuffle reader, the query stage reuse can also 
work. val ex = findReusedExchange(adaptivePlan) assert(ex.nonEmpty) - assert(ex.head.child.isInstanceOf[BroadcastExchangeExec]) + assert(ex.head.child.isInstanceOf[ColumnarBroadcastExchangeAdaptor]) val sub = findReusedSubquery(adaptivePlan) assert(sub.isEmpty) } From d7ae40331b73367b0e152f0be6444e949ce4bad8 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Sun, 20 Mar 2022 15:46:02 +0800 Subject: [PATCH 20/22] fallback on reused broadcast exchange Signed-off-by: Yuan Zhou --- .../com/intel/oap/extension/ColumnarOverrides.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala index d1eda8011..e11b3a054 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala @@ -213,8 +213,15 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { right) case plan: BroadcastQueryStageExec => logDebug( - s"Columnar Processing for ${plan.getClass} is currently supported, actual plan is ${plan.plan.getClass}.") - plan + s"Columnar Processing for ${plan.getClass} is currently supported, actual plan is ${plan.plan}.") + plan.plan match { + case ReusedExchangeExec(_, originalBroadcastPlan: ColumnarBroadcastExchangeAdaptor) => + val newBroadcast = BroadcastExchangeExec( + originalBroadcastPlan.mode, + DataToArrowColumnarExec(plan.plan, 1)) + SparkShimLoader.getSparkShims.newBroadcastQueryStageExec(plan.id, newBroadcast) + case other => plan + } case plan: BroadcastExchangeExec => val child = replaceWithColumnarPlan(plan.child) logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") From d404095525c98e5955ef93ce367a6a6f03641800 Mon Sep 17 00:00:00 2001 From: philo Date: Tue, 22 Mar 2022 10:24:37 +0800 Subject: [PATCH 21/22] fix reused columnar broadcast xchg --- .../scala/com/intel/oap/extension/ColumnarOverrides.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala index e11b3a054..18c15538d 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala @@ -379,6 +379,12 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] { var isSupportAdaptive: Boolean = true def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match { + case RowToColumnarExec(child: BroadcastQueryStageExec) => + logDebug(s"$child Due to a fallback of BHJ inserted into plan." 
+        s" See above override in BroadcastQueryStageExec")
+      val localBroadcastXchg = child.plan.asInstanceOf[BroadcastExchangeExec]
+      val dataToRow = localBroadcastXchg.child.asInstanceOf[DataToArrowColumnarExec]
+      ColumnarBroadcastExchangeExec(localBroadcastXchg.mode, dataToRow)
     case plan: RowToColumnarExec =>
       val child = replaceWithColumnarPlan(plan.child)
       if (columnarConf.enableArrowRowToColumnar) {

From 50594bcfaec29178b4eafbb80088058a6eb3afca Mon Sep 17 00:00:00 2001
From: philo
Date: Tue, 22 Mar 2022 12:11:48 +0800
Subject: [PATCH 22/22] Add guard logic

---
 .../intel/oap/extension/ColumnarOverrides.scala   | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
index 18c15538d..e498d2050 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
@@ -379,12 +379,17 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {
   var isSupportAdaptive: Boolean = true
 
   def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match {
-    case RowToColumnarExec(child: BroadcastQueryStageExec) =>
-      logDebug(s"$child Due to a fallback of BHJ inserted into plan." +
+    // To get ColumnarBroadcastExchangeExec back from the fallback inserted for DPP reuse.
+    case RowToColumnarExec(broadcastQueryStageExec: BroadcastQueryStageExec)
+      if (broadcastQueryStageExec.plan match {
+        case BroadcastExchangeExec(_, _: DataToArrowColumnarExec) => true
+        case _ => false
+      }) =>
+      logDebug(s"Due to a fallback of BHJ inserted into plan." +
         s" See above override in BroadcastQueryStageExec")
-      val localBroadcastXchg = child.plan.asInstanceOf[BroadcastExchangeExec]
-      val dataToRow = localBroadcastXchg.child.asInstanceOf[DataToArrowColumnarExec]
-      ColumnarBroadcastExchangeExec(localBroadcastXchg.mode, dataToRow)
+      val localBroadcastXchg = broadcastQueryStageExec.plan.asInstanceOf[BroadcastExchangeExec]
+      val dataToArrowColumnar = localBroadcastXchg.child.asInstanceOf[DataToArrowColumnarExec]
+      ColumnarBroadcastExchangeExec(localBroadcastXchg.mode, dataToArrowColumnar)
     case plan: RowToColumnarExec =>
       val child = replaceWithColumnarPlan(plan.child)
       if (columnarConf.enableArrowRowToColumnar) {
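
The guard in [PATCH 22/22] only restores a ColumnarBroadcastExchangeExec when the matched BroadcastQueryStageExec wraps a row-based BroadcastExchangeExec whose child is DataToArrowColumnarExec, which is exactly the fallback shape built in [PATCH 20/22]; any other RowToColumnarExec falls through to the generic handling. The following is a minimal, self-contained sketch of that match-with-guard shape. The case classes and names (Plan, BroadcastQueryStage, GuardSketch, and so on) are simplified stand-ins invented for illustration, not the real Spark/OAP operators.

// Simplified stand-ins for the plan nodes involved (illustration only).
sealed trait Plan
case class Scan(table: String) extends Plan
case class DataToArrowColumnar(child: Plan) extends Plan
case class BroadcastExchange(mode: String, child: Plan) extends Plan
case class BroadcastQueryStage(id: Int, plan: Plan) extends Plan
case class RowToColumnar(child: Plan) extends Plan
case class ColumnarBroadcastExchange(mode: String, child: Plan) extends Plan

object GuardSketch {
  // Rewrite RowToColumnar(BroadcastQueryStage(...)) back to a columnar broadcast
  // exchange only when the stage holds BroadcastExchange over DataToArrowColumnar;
  // every other plan is returned unchanged, which is the point of the guard.
  def replaceWithColumnarPlan(plan: Plan): Plan = plan match {
    case RowToColumnar(stage: BroadcastQueryStage)
        if (stage.plan match {
          case BroadcastExchange(_, _: DataToArrowColumnar) => true
          case _ => false
        }) =>
      val exchange = stage.plan.asInstanceOf[BroadcastExchange]
      ColumnarBroadcastExchange(exchange.mode, exchange.child)
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val guarded = RowToColumnar(
      BroadcastQueryStage(0, BroadcastExchange("hashed", DataToArrowColumnar(Scan("t")))))
    val unguarded = RowToColumnar(BroadcastQueryStage(1, Scan("t")))
    println(replaceWithColumnarPlan(guarded))   // rewritten to ColumnarBroadcastExchange(...)
    println(replaceWithColumnarPlan(unguarded)) // left as-is: the guard does not match
  }
}

In the real rule the same effect could be obtained by matching the nested structure directly, e.g. case RowToColumnar(BroadcastQueryStage(_, BroadcastExchange(mode, d: DataToArrowColumnar))) => ..., but the explicit guard keeps the fallthrough cases, the debug logging, and the casts in [PATCH 22/22] closest to the shape of the code it replaces.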