From e19fe198a538f1929cbc49cd66f61e31c5ed2c97 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 5 Nov 2021 11:13:31 +0800
Subject: [PATCH] [NSE-542] Add rule to propagate local window for rank +
 filter pattern (#545)

Closes #542
---
 .../scala/com/intel/oap/GazellePlugin.scala   |  19 +-
 .../oap/execution/ColumnarWindowExec.scala    |  23 +-
 .../com/intel/oap/execution/Patterns.scala    |  62 +++++
 .../oap/extension/ColumnarOverrides.scala     |  16 +-
 .../oap/extension/OptimizerOverrides.scala    | 225 ++++++++++++++++++
 .../oap/extension/StrategyOverrides.scala     |  59 ++++-
 .../columnar/ColumnarGuardRule.scala          |  12 +-
 .../com/intel/oap/tpc/util/TPCRunner.scala    |   2 +-
 8 files changed, 394 insertions(+), 24 deletions(-)
 create mode 100644 native-sql-engine/core/src/main/scala/com/intel/oap/execution/Patterns.scala
 create mode 100644 native-sql-engine/core/src/main/scala/com/intel/oap/extension/OptimizerOverrides.scala

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala
index fa9124b2e..9d17d3f6e 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala
@@ -20,13 +20,10 @@ package com.intel.oap
 import java.util
 import java.util.Collections
 import java.util.Objects
-
 import scala.language.implicitConversions
-
 import com.intel.oap.GazellePlugin.GAZELLE_SESSION_EXTENSION_NAME
 import com.intel.oap.GazellePlugin.SPARK_SESSION_EXTS_KEY
-import com.intel.oap.extension.StrategyOverrides
-
+import com.intel.oap.extension.{OptimizerOverrides, StrategyOverrides}
 import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.api.plugin.DriverPlugin
@@ -34,7 +31,7 @@ import org.apache.spark.api.plugin.ExecutorPlugin
 import org.apache.spark.api.plugin.PluginContext
 import org.apache.spark.api.plugin.SparkPlugin
 import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
 class GazellePlugin extends SparkPlugin {
   override def driverPlugin(): DriverPlugin = {
@@ -87,6 +84,11 @@ private[oap] trait GazelleSparkExtensionsInjector {
 }
 
 private[oap] object GazellePlugin {
+
+  val LOCAL_OVERRIDDEN_CLASSES: Seq[Class[_]] = Seq()
+
+  initialLocalOverriddenClasses()
+
   // To enable GazellePlugin in production, set "spark.plugins=com.intel.oap.GazellePlugin"
   val SPARK_SQL_PLUGINS_KEY: String = "spark.plugins"
   val GAZELLE_PLUGIN_NAME: String = Objects.requireNonNull(classOf[GazellePlugin]
@@ -94,15 +96,20 @@ private[oap] object GazellePlugin {
     .getCanonicalName)
   val SPARK_SESSION_EXTS_KEY: String = StaticSQLConf.SPARK_SESSION_EXTENSIONS.key
   val GAZELLE_SESSION_EXTENSION_NAME: String = Objects.requireNonNull(
     classOf[GazelleSessionExtensions].getCanonicalName)
-
   /**
    * Specify all injectors that Gazelle is using in following list.
    */
   val DEFAULT_INJECTORS: List[GazelleSparkExtensionsInjector] = List(
     ColumnarOverrides,
+    OptimizerOverrides,
     StrategyOverrides
   )
 
+  def initialLocalOverriddenClasses(): Unit = {
+    LOCAL_OVERRIDDEN_CLASSES.foreach(clazz =>
+      GazellePlugin.getClass.getClassLoader.loadClass(clazz.getName))
+  }
+
   implicit def sparkConfImplicit(conf: SparkConf): SparkConfImplicits = {
     new SparkConfImplicits(conf)
   }

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala
index 9975ded0a..3b4709ae9 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala
@@ -18,7 +18,6 @@
 package com.intel.oap.execution
 
 import java.util.concurrent.TimeUnit
-
 import com.google.flatbuffers.FlatBufferBuilder
 import com.intel.oap.GazellePluginConfig
 import com.intel.oap.expression.{CodeGeneration, ConverterUtils}
@@ -26,13 +25,12 @@ import com.intel.oap.vectorized.{ArrowWritableColumnVector, CloseableColumnBatch
 import org.apache.arrow.gandiva.expression.TreeBuilder
 import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID
 import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, Cast, Descending, Expression, Literal, MakeDecimal, NamedExpression, PredicateHelper, Rank, SortOrder, UnscaledValue, WindowExpression, WindowFunction, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning, UnspecifiedDistribution}
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.{SortExec, SparkPlan}
@@ -44,17 +42,19 @@ import org.apache.spark.sql.types.{DataType, DateType, DecimalType, DoubleType,
 import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.ExecutorManager
+
 import scala.collection.JavaConverters._
 import scala.collection.immutable.Stream.Empty
 import scala.collection.mutable.ListBuffer
 import scala.util.Random
-
 import org.apache.spark.sql.execution.datasources.v2.arrow.SparkSchemaUtils
+
 import util.control.Breaks._
 
 case class ColumnarWindowExec(windowExpression: Seq[NamedExpression],
     partitionSpec: Seq[Expression],
     orderSpec: Seq[SortOrder],
+    isLocal: Boolean,
     child: SparkPlan) extends WindowExecBase {
 
   override def supportsColumnar: Boolean = true
@@ -64,6 +64,10 @@
   buildCheck()
 
   override def requiredChildDistribution: Seq[Distribution] = {
+    if (isLocal) {
+      // localized window doesn't require distribution
+      return Seq.fill(children.size)(UnspecifiedDistribution)
+    }
     if (partitionSpec.isEmpty) {
       // Only show warning when the number of bytes is larger than 100 MiB?
       logWarning("No Partition Defined for Window operation! Moving all data to a single "
@@ -421,7 +425,7 @@ object ColumnarWindowExec extends Logging {
     }
 
     override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
-      case p @ ColumnarWindowExec(windowExpression, partitionSpec, orderSpec, child) =>
+      case p @ ColumnarWindowExec(windowExpression, partitionSpec, orderSpec, isLocalized, child) =>
         val windows = ListBuffer[NamedExpression]()
         val inProjectExpressions = ListBuffer[NamedExpression]()
         val outProjectExpressions = windowExpression.map(e => e.asInstanceOf[Alias])
@@ -431,7 +435,8 @@
         }
         val inputProject = ColumnarConditionProjectExec(null,
           child.output ++ inProjectExpressions, child)
-        val window = new ColumnarWindowExec(windows, partitionSpec, orderSpec, inputProject)
+        val window = new ColumnarWindowExec(windows, partitionSpec, orderSpec, isLocalized,
+          inputProject)
         val outputProject = ColumnarConditionProjectExec(null,
           child.output ++ outProjectExpressions, window)
         outputProject
@@ -440,14 +445,14 @@ object ColumnarWindowExec extends Logging {
 
   object RemoveSort extends Rule[SparkPlan] with PredicateHelper {
     override def apply(plan: SparkPlan): SparkPlan = plan transform {
-      case p1 @ ColumnarWindowExec(_, _, _, p2 @ (_: SortExec | _: ColumnarSortExec)) =>
+      case p1 @ ColumnarWindowExec(_, _, _, _, p2 @ (_: SortExec | _: ColumnarSortExec)) =>
         p1.withNewChildren(p2.children)
     }
   }
 
   object RemoveCoalesceBatches extends Rule[SparkPlan] with PredicateHelper {
     override def apply(plan: SparkPlan): SparkPlan = plan transform {
-      case p1 @ ColumnarWindowExec(_, _, _, p2: CoalesceBatchesExec) =>
+      case p1 @ ColumnarWindowExec(_, _, _, _, p2: CoalesceBatchesExec) =>
         p1.withNewChildren(p2.children)
     }
   }
@@ -516,11 +521,13 @@ object ColumnarWindowExec extends Logging {
   def createWithOptimizations(windowExpression: Seq[NamedExpression],
       partitionSpec: Seq[Expression],
       orderSpec: Seq[SortOrder],
+      isLocalized: Boolean,
      child: SparkPlan): SparkPlan = {
     val columnar = new ColumnarWindowExec(
       windowExpression,
       partitionSpec,
       orderSpec,
+      isLocalized,
       child)
     ColumnarWindowExec.optimize(columnar)
   }

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/Patterns.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/Patterns.scala
new file mode 100644
index 000000000..a3617d4ca
--- /dev/null
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/Patterns.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.intel.oap.execution
+
+import com.intel.oap.extension.LocalRankWindow
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression, SortOrder, WindowFunctionType}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window}
+
+object LocalPhysicalWindow {
+  // windowFunctionType, windowExpression, partitionSpec, orderSpec, child
+  private type ReturnType =
+    (WindowFunctionType, Seq[NamedExpression], Seq[Expression], Seq[SortOrder], LogicalPlan)
+
+  def unapply(a: Any): Option[ReturnType] = a match {
+    case expr @ Window(windowExpressions, partitionSpec, orderSpec, child) =>
+
+      // The window expression should not be empty here, otherwise it's a bug.
+      if (windowExpressions.isEmpty) {
+        throw new IllegalArgumentException(s"Window expression is empty in $expr")
+      }
+
+      if (!windowExpressions.exists(expr => {
+        expr.isInstanceOf[Alias] &&
+            LocalRankWindow.isLocalWindowColumnName(expr.asInstanceOf[Alias].name)
+      })) {
+        return None
+      }
+
+      val windowFunctionType = windowExpressions.map(WindowFunctionType.functionType)
+        .reduceLeft { (t1: WindowFunctionType, t2: WindowFunctionType) =>
+          if (t1 != t2) {
+            // We shouldn't have different window function type here, otherwise it's a bug.
+            throw new IllegalArgumentException(
+              s"Found different window function type in $windowExpressions")
+          } else {
+            t1
+          }
+        }
+
+      Some((windowFunctionType, windowExpressions, partitionSpec, orderSpec, child))
+
+    case _ => None
+  }
+}
+
+object Patterns {
+
+}
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
index 08d0902ea..6a7f5b641 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
@@ -18,10 +18,10 @@
 package com.intel.oap
 
 import com.intel.oap.execution._
+import com.intel.oap.extension.LocalWindowExec
 import com.intel.oap.extension.columnar.ColumnarGuardRule
 import com.intel.oap.extension.columnar.RowGuard
 import com.intel.oap.sql.execution.RowToArrowColumnarExec
-
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
@@ -235,12 +235,26 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
           plan.windowExpression,
           plan.partitionSpec,
           plan.orderSpec,
+          isLocalized = false,
           replaceWithColumnarPlan(plan.child))
       } catch {
         case _: Throwable =>
           logInfo("Columnar Window: Falling back to regular Window...")
           plan
       }
+    case plan: LocalWindowExec =>
+      try {
+        ColumnarWindowExec.createWithOptimizations(
+          plan.windowExpression,
+          plan.partitionSpec,
+          plan.orderSpec,
+          isLocalized = true,
+          replaceWithColumnarPlan(plan.child))
+      } catch {
+        case _: Throwable =>
+          logInfo("Localized Columnar Window: Falling back to regular Window...")
+          plan
+      }
     case p =>
       val children = plan.children.map(replaceWithColumnarPlan)
       logDebug(s"Columnar Processing for ${p.getClass} is currently not supported.")
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/OptimizerOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/OptimizerOverrides.scala
new file mode 100644
index 000000000..1aff863e2
--- /dev/null
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/OptimizerOverrides.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.extension
+
+import java.util.Objects
+
+import com.intel.oap.GazelleSparkExtensionsInjector
+
+import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, NamedExpression, Rank, SortOrder, WindowExpression, WindowFunctionType}
+import org.apache.spark.sql.catalyst.planning.PhysicalWindow
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+
+import java.util
+
+/**
+ * This contains workarounds for vanilla Spark's restrictions on extending
+ * optimizer rule sets. An ideal implementation may start from the SQL parser,
+ * separating window into local/global operators the same way local/global
+ * limit were created. Also, the rules around specific pattern matching of
+ * logical.Window may need to be tuned, as it's possible that we are adding
+ * LocalWindow and GlobalWindow operators.
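+ *
+ * As an illustrative sketch only (plan shapes simplified; the table and
+ * column names below are hypothetical, not taken from this patch), a query
+ * such as
+ *
+ *   SELECT * FROM (
+ *     SELECT *, RANK() OVER (PARTITION BY cat ORDER BY val) AS rk FROM tbl
+ *   ) WHERE rk <= 100
+ *
+ * is rewritten from Filter(rk <= 100, Window(rank AS rk)) into roughly
+ *
+ *   Filter(rk <= 100,
+ *     Window(rank AS rk,                        <- global rank, post-shuffle
+ *       Filter(rk<>local <= 100,
+ *         Window(rank AS rk<>local, child))))   <- local rank, pre-shuffle
+ *
+ * A row's rank over any subset of its window partition is at most its global
+ * rank, so rows that fail the local filter can never pass the global one and
+ * can safely be dropped before the exchange.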
+ */
+object LocalRankWindow extends Rule[LogicalPlan] with SQLConfHelper {
+  val LOCAL_WINDOW_COLUMN_SUFFIX = "<>local"
+  val GLOBAL_WINDOW_COLUMN_SUFFIX = "<>global"
+
+  // rank->filter into rank(local)->filter->rank->filter
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformUp {
+      // up
+      case p @ RankFilterPattern(filterCond, matchedWindow, filteredRankColumns) =>
+        p.transformDown {
+          // down
+          case w @ PhysicalWindow(WindowFunctionType.SQL, windowExprs, partitionSpec, orderSpec,
+              windowChild) =>
+            if (w eq matchedWindow) {
+              val innerWindow = Window(windowExprs.map(expr => expr.transformDown {
+                case alias: Alias =>
+                  if (filteredRankColumns.contains(alias.name)) {
+                    Alias(alias.child, toLocalWindowColumnName(alias.name))()
+                  } else {
+                    alias
+                  }
+                case other => other
+              }.asInstanceOf[NamedExpression]), partitionSpec, orderSpec, windowChild)
+
+              val innerFilter = Filter(filterCond.transformDown {
+                case attr: AttributeReference =>
+                  if (filteredRankColumns.contains(attr.name)) {
+                    val windowOutAttr = innerWindow.output
+                        .find(windowAttr => windowAttr.name ==
+                            toLocalWindowColumnName(attr.name)).get
+                    windowOutAttr.toAttribute
+                  } else {
+                    attr
+                  }
+                case other => other
+              }, innerWindow)
+
+              val originalOutputAttributes = new util.HashMap[String, Alias]()
+
+              val outerWindow = Window(windowExprs.map {
+                expr => expr.transformDown {
+                  case alias: Alias =>
+                    if (filteredRankColumns.contains(alias.name)) {
+                      val globalName = toGlobalWindowColumnName(alias.name)
+                      val globalAlias = Alias(alias.child, globalName)()
+                      originalOutputAttributes.put(globalName, alias)
+                      globalAlias
+                    } else {
+                      alias
+                    }
+                  case other => other
+                }.asInstanceOf[NamedExpression]
+              }, partitionSpec, orderSpec,
+                Project(innerFilter.output.flatMap {
+                  attr: Attribute =>
+                    if (isLocalWindowColumnName(attr.name)) {
+                      None
+                    } else {
+                      Some(attr)
+                    }
+                }, innerFilter))
+
+              Project(outerWindow.output.map {
+                attr: Attribute =>
+                  if (isGlobalWindowColumnName(attr.name)) {
+                    val restoredAlias = originalOutputAttributes.get(attr.name)
+                    Alias(attr, restoredAlias.name)(restoredAlias.exprId,
+                      restoredAlias.qualifier, restoredAlias.explicitMetadata,
+                      restoredAlias.nonInheritableMetadataKeys)
+                  } else {
+                    attr
+                  }
+              }, outerWindow)
+            } else {
+              w
+            }
+          case plan => plan
+        }
+      case other @ _ => other
+    }
+  }
+
+  def toLocalWindowColumnName(col: String): String = {
+    col + LOCAL_WINDOW_COLUMN_SUFFIX
+  }
+
+  def isLocalWindowColumnName(col: String): Boolean = {
+    col.endsWith(LOCAL_WINDOW_COLUMN_SUFFIX)
+  }
+
+  def toGlobalWindowColumnName(col: String): String = {
+    col + GLOBAL_WINDOW_COLUMN_SUFFIX
+  }
+
+  def isGlobalWindowColumnName(col: String): Boolean = {
+    col.endsWith(GLOBAL_WINDOW_COLUMN_SUFFIX)
+  }
+}
+
+object RankFilterPattern {
+  // filterExpression, window relation, filtered rank column name
+  private type ReturnType = (Expression, Window, Seq[String])
+
+  def getRankColumns(plan: LogicalPlan): (Seq[Option[String]], Option[Window]) = {
+    plan match {
+      case p @ Project(expressions, child) =>
+        val nameMapping = new java.util.HashMap[String, String]()
+        expressions.foreach {
+          case ar @ AttributeReference(n, _, _, _) =>
+            nameMapping.put(n, n)
+          // todo alias
+          case _ =>
+        }
+        val tuple = getRankColumns(child)
+        (tuple._1.map(c => Some(nameMapping.get(c))), tuple._2)
+      case s @ SubqueryAlias(identifier, child) =>
+        getRankColumns(child)
+      case w @ Window(windowExpressions, partitionSpec, orderSpec, child) =>
+        if (w.windowExpressions.size != 1) {
+          (Nil, None)
+        } else {
+          w.windowExpressions.head.collectFirst {
+            case a @ Alias(WindowExpression(Rank(children), _), aliasName) =>
+              if (LocalRankWindow.isLocalWindowColumnName(a.name) ||
+                  LocalRankWindow.isGlobalWindowColumnName(a.name)) {
+                // already optimized
+                (Nil, None)
+              } else {
+                (Seq(Some(aliasName)), Some(w))
+              }
+          }.getOrElse((Nil, None))
+        }
+      case _ => (Nil, None)
+    }
+  }
+
+  def isColumnReference(expr: Expression, col: String): Boolean = {
+    expr match {
+      case attr: AttributeReference =>
+        Objects.equals(attr.name, col)
+      case _ =>
+        false
+    }
+  }
+
+  def isLiteral(expr: Expression): Boolean = {
+    expr match {
+      case lit: Literal => true
+      case _ => false
+    }
+  }
+
+  def unapply(a: Any): Option[ReturnType] = a match {
+    case f @ Filter(cond, child) =>
+      val (rankColumns, window) = getRankColumns(f.child)
+      val filteredRankColumns: Seq[String] = rankColumns.flatten.flatMap { col =>
+        val isDesiredPattern = cond match {
+          // todo rk < 100 && xxx ?
+          case lt @ LessThan(l, r) =>
+            isColumnReference(l, col) && isLiteral(r)
+          case lte @ LessThanOrEqual(l, r) =>
+            isColumnReference(l, col) && isLiteral(r)
+          case gt @ GreaterThan(l, r) =>
+            isColumnReference(r, col) && isLiteral(l)
+          case gt @ GreaterThanOrEqual(l, r) =>
+            isColumnReference(r, col) && isLiteral(l)
+          case _ =>
+            false
+        }
+        if (isDesiredPattern) Some(col) else None
+      }
+
+      if (filteredRankColumns.nonEmpty) {
+        Some(cond, window.get, filteredRankColumns)
+      } else {
+        None
+      }
+    case _ =>
+      None
+  }
+}
+
+object OptimizerOverrides extends GazelleSparkExtensionsInjector {
+  override def inject(extensions: SparkSessionExtensions): Unit = {
+    extensions.injectOptimizerRule(_ => LocalRankWindow)
+    // not in use for now
+  }
+}
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/StrategyOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/StrategyOverrides.scala
index de4b467fc..148661e38 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/StrategyOverrides.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/StrategyOverrides.scala
@@ -19,17 +19,19 @@
 package com.intel.oap.extension
 
 import com.intel.oap.GazellePluginConfig
 import com.intel.oap.GazelleSparkExtensionsInjector
-
-import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.spark.sql.Strategy
-import org.apache.spark.sql.catalyst.SQLConfHelper
+import com.intel.oap.execution.LocalPhysicalWindow
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{SparkSessionExtensions, Strategy, execution}
+import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, JoinedRow, NamedExpression, SortOrder, SpecificInternalRow, UnsafeProjection, UnsafeRow, WindowFunctionType}
 import org.apache.spark.sql.catalyst.optimizer.BuildLeft
 import org.apache.spark.sql.catalyst.optimizer.BuildRight
 import org.apache.spark.sql.catalyst.optimizer.JoinSelectionHelper
-import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, PhysicalWindow}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.joins
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan, joins}
+import org.apache.spark.sql.execution.window.WindowExecBase
 
 object JoinSelectionOverrides extends Strategy with JoinSelectionHelper with SQLConfHelper {
@@ -73,8 +75,51 @@ object JoinSelectionOverrides extends Strategy with JoinSelectionHelper with SQL
   }
 }
 
+case class LocalWindowExec(
+    windowExpression: Seq[NamedExpression],
+    partitionSpec: Seq[Expression],
+    orderSpec: Seq[SortOrder],
+    child: SparkPlan)
+    extends WindowExecBase {
+
+  override def output: Seq[Attribute] =
+    child.output ++ windowExpression.map(_.toAttribute)
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    super.requiredChildDistribution
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    // todo implement this to fall back
+    throw new UnsupportedOperationException()
+  }
+}
+
+object LocalWindowApply extends Strategy {
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case LocalPhysicalWindow(
+        WindowFunctionType.SQL, windowExprs, partitionSpec, orderSpec, child) =>
+      LocalWindowExec(
+        windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
+
+    case LocalPhysicalWindow(
+        WindowFunctionType.Python, windowExprs, partitionSpec, orderSpec, child) =>
+      Nil // python window not supported
+
+    case _ => Nil
+  }
+}
+
 object StrategyOverrides extends GazelleSparkExtensionsInjector {
   override def inject(extensions: SparkSessionExtensions): Unit = {
     extensions.injectPlannerStrategy(_ => JoinSelectionOverrides)
+    extensions.injectPlannerStrategy(_ => LocalWindowApply)
   }
 }
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala
index ee02e165c..d19c12e38 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala
@@ -19,7 +19,7 @@ package com.intel.oap.extension.columnar
 
 import com.intel.oap.GazellePluginConfig
 import com.intel.oap.execution._
-
+import com.intel.oap.extension.LocalWindowExec
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -181,6 +181,16 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
           plan.windowExpression,
           plan.partitionSpec,
           plan.orderSpec,
+          isLocalized = false,
           plan.child)
         window
+      case plan: LocalWindowExec =>
+        if (!enableColumnarWindow) return false
+        val window = ColumnarWindowExec.createWithOptimizations(
+          plan.windowExpression,
+          plan.partitionSpec,
+          plan.orderSpec,
+          isLocalized = true,
+          plan.child)
+        window
       case plan: CoalesceExec =>
diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/util/TPCRunner.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/util/TPCRunner.scala
index 8b7559253..2009aa7c0 100644
--- a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/util/TPCRunner.scala
+++ b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/util/TPCRunner.scala
@@ -33,7 +33,7 @@ class TPCRunner(val spark: SparkSession, val resource: String) {
     println("Running query %s (round %d)... ".format(caseId, roundId))
     val df = spark.sql(sql)
     if (explain) {
-      df.explain(extended = false)
+      df.explain(extended = true)
    }
     df.show(100)
   }