From ecef6f021b84708cd9ffdbaccd60e4bec26c63d8 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 26 Apr 2021 19:15:44 +0800
Subject: [PATCH 1/5] [NSE-285] ColumnarWindow: Support Date/Timestamp input in MAX/MIN

---
 .../com/intel/oap/ColumnarGuardRule.scala     |   2 +-
 .../scala/com/intel/oap/ColumnarPlugin.scala  |  39 +-
 .../oap/execution/ColumnarWindowExec.scala    | 344 ++++++++++++------
 .../com/intel/oap/tpc/ds/TPCDSSuite.scala     |   9 +-
 4 files changed, 245 insertions(+), 149 deletions(-)

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala
index 39bd2465c..e8877acc7 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala
@@ -170,7 +170,7 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
           plan.isSkewJoin)
       case plan: WindowExec =>
         if (!enableColumnarWindow) return false
-        val window = ColumnarWindowExec.create(
+        val window = ColumnarWindowExec.createWithOptimizations(
           plan.windowExpression,
           plan.partitionSpec,
           plan.orderSpec,
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
index 34bcd55e2..e0734e987 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
@@ -221,36 +221,17 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
       }
     case plan: WindowExec =>
-      if (columnarConf.enableColumnarWindow) {
-        val sortRemoved = plan.child match {
-          case sort: SortExec => // remove ordering requirements
-            replaceWithColumnarPlan(sort.child)
-          case _ =>
-            replaceWithColumnarPlan(plan.child)
-        }
-        // disable CoalesceBatchesExec to reduce Netty direct memory usage
-        val coalesceBatchRemoved = sortRemoved match {
-          case s: CoalesceBatchesExec =>
-            s.child
-          case _ => sortRemoved
-        }
-        logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
-        try {
-          val window = ColumnarWindowExec.create(
-            plan.windowExpression,
-            plan.partitionSpec,
-            plan.orderSpec,
-            coalesceBatchRemoved)
-          return window
-        } catch {
-          case _: Throwable =>
-            logInfo("Columnar Window: Falling back to regular Window...")
-        }
+      try {
+        ColumnarWindowExec.createWithOptimizations(
+          plan.windowExpression,
+          plan.partitionSpec,
+          plan.orderSpec,
+          replaceWithColumnarPlan(plan.child))
+      } catch {
+        case _: Throwable =>
+          logInfo("Columnar Window: Falling back to regular Window...")
+          plan
       }
-      logDebug(s"Columnar Processing for ${plan.getClass} is not currently supported.")
-      val children = plan.children.map(replaceWithColumnarPlan)
-      plan.withNewChildren(children)
-
     case p =>
       val children = plan.children.map(replaceWithColumnarPlan)
       logDebug(s"Columnar Processing for ${p.getClass} is currently not supported.")
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala
index 79907245a..3d184ab0f 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala
@@ -21,41 +21,60 @@ import java.util.concurrent.TimeUnit

 import com.google.flatbuffers.FlatBufferBuilder
 import com.intel.oap.ColumnarPluginConfig
-import com.intel.oap.expression.{CodeGeneration, ColumnarLiteral, ConverterUtils}
+import com.intel.oap.expression.{CodeGeneration, ConverterUtils}
 import com.intel.oap.vectorized.{ArrowWritableColumnVector, CloseableColumnBatchIterator, ExpressionEvaluator}
 import org.apache.arrow.gandiva.expression.TreeBuilder
-import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
 import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, Cast, Descending, Expression, Literal, MakeDecimal, NamedExpression, Rank, SortOrder, UnscaledValue, WindowExpression, WindowFunction, WindowSpecDefinition}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Average, Count, Max, Min, Sum}
-import org.apache.spark.sql.execution.window.WindowExec
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, Cast, Descending, Expression, Literal, MakeDecimal, NamedExpression, PredicateHelper, Rank, SortOrder, UnscaledValue, WindowExpression, WindowFunction, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.{SortExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.window.WindowExecBase
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ArrayType, BooleanType, DataType, DecimalType, DoubleType, FloatType, IntegerType, LongType}
+import org.apache.spark.sql.types.{DataType, DateType, DecimalType, DoubleType, IntegerType, LongType, StringType, TimestampType}
 import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.ExecutorManager

 import scala.collection.JavaConverters._
+import scala.collection.immutable.Stream.Empty
 import scala.collection.mutable.ListBuffer
 import scala.util.Random

-class ColumnarWindowExec(windowExpression: Seq[NamedExpression],
+case class ColumnarWindowExec(windowExpression: Seq[NamedExpression],
     partitionSpec: Seq[Expression],
     orderSpec: Seq[SortOrder],
-    child: SparkPlan) extends WindowExec(windowExpression,
+    child: SparkPlan) extends WindowExecBase(windowExpression,
   partitionSpec, orderSpec, child) {

-  override def supportsColumnar = true
+  override def supportsColumnar: Boolean = true

   override def output: Seq[Attribute] = child.output ++ windowExpression.map(_.toAttribute)

+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (partitionSpec.isEmpty) {
+      // Only show warning when the number of bytes is larger than 100 MiB?
+      logWarning("No Partition Defined for Window operation! Moving all data to a single " +
+          "partition, this can cause serious performance degradation.")
+      AllTuples :: Nil
+    } else ClusteredDistribution(partitionSpec) :: Nil
+  }
+
   // We no longer require sorted input for the columnar window
   override def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)

+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "output_batches"),
@@ -82,75 +101,78 @@ class ColumnarWindowExec(windowExpression: Seq[NamedExpression],
     // leave it empty for now
   }

-  val windowFunctions: Seq[(String, Expression)] = windowExpression
-      .map(e => e.asInstanceOf[Alias])
-      .map(a => a.child.asInstanceOf[WindowExpression])
-      .map(w => (w, w.windowFunction))
-      .map {
-        case (expr, func) =>
-          (expr, func match {
-            case a: AggregateExpression => a.aggregateFunction
-            case b: WindowFunction => b
-            case f =>
-              throw new UnsupportedOperationException("unsupported window function type: " +
-                  f)
-          })
-      }
-      .map {
-        case (expr, func) =>
-          val name = func match {
-            case _: Sum =>
-              checkAggFunctionSpec(expr.windowSpec)
-              "sum"
-            case _: Average =>
-              checkAggFunctionSpec(expr.windowSpec)
-              "avg"
-            case _: Min =>
-              checkAggFunctionSpec(expr.windowSpec)
-              "min"
-            case _: Max =>
-              checkAggFunctionSpec(expr.windowSpec)
-              "max"
-            case c: Count =>
-              checkAggFunctionSpec(expr.windowSpec)
-              if (c.children.exists(_.isInstanceOf[Literal])) {
-                "count_literal"
-              } else {
-                "count"
-              }
-            case _: Rank =>
-              checkRankSpec(expr.windowSpec)
-              val desc: Option[Boolean] = orderSpec.foldLeft[Option[Boolean]](None) {
-                (desc, s) =>
-                  val currentDesc = s.direction match {
-                    case Ascending => false
-                    case Descending => true
-                    case _ => throw new IllegalStateException
-                  }
-                  if (desc.isEmpty) {
-                    Some(currentDesc)
-                  } else if (currentDesc == desc.get) {
-                    Some(currentDesc)
-                  } else {
-                    throw new UnsupportedOperationException("Rank: clashed rank order found")
-                  }
-              }
-              desc match {
-                case Some(true) => "rank_desc"
-                case Some(false) => "rank_asc"
-                case None => "rank_asc"
-              }
-            case f => throw new UnsupportedOperationException("unsupported window function: " + f)
-          }
-          (name, func)
-      }
-
-  if (windowFunctions.isEmpty) {
-    throw new UnsupportedOperationException("zero window functions" +
-        "specified in window")
+  def validateWindowFunctions(): Seq[(String, Expression)] = {
+    val windowFunctions = windowExpression
+        .map(e => e.asInstanceOf[Alias])
+        .map(a => a.child.asInstanceOf[WindowExpression])
+        .map(w => (w, w.windowFunction))
+        .map {
+          case (expr, func) =>
+            (expr, func match {
+              case a: AggregateExpression => a.aggregateFunction
+              case b: WindowFunction => b
+              case f =>
+                throw new UnsupportedOperationException("unsupported window function type: " +
+                    f)
+            })
+        }
+        .map {
+          case (expr, func) =>
+            val name = func match {
+              case _: Sum =>
+                checkAggFunctionSpec(expr.windowSpec)
+                "sum"
+              case _: Average =>
+                checkAggFunctionSpec(expr.windowSpec)
+                "avg"
+              case _: Min =>
+                checkAggFunctionSpec(expr.windowSpec)
+                "min"
+              case _: Max =>
+                checkAggFunctionSpec(expr.windowSpec)
+                "max"
+              case c: Count =>
+                checkAggFunctionSpec(expr.windowSpec)
+                if (c.children.exists(_.isInstanceOf[Literal])) {
+                  "count_literal"
+                } else {
+                  "count"
+                }
+              case _: Rank =>
+                checkRankSpec(expr.windowSpec)
+                val desc: Option[Boolean] =
+                    orderSpec.foldLeft[Option[Boolean]](None) {
+                  (desc, s) =>
+                    val currentDesc = s.direction match {
+                      case Ascending => false
+                      case Descending => true
+                      case _ => throw new IllegalStateException
+                    }
+                    if (desc.isEmpty) {
+                      Some(currentDesc)
+                    } else if (currentDesc == desc.get) {
+                      Some(currentDesc)
+                    } else {
+                      throw new UnsupportedOperationException("Rank: clashed rank order found")
+                    }
+                }
+                desc match {
+                  case Some(true) => "rank_desc"
+                  case Some(false) => "rank_asc"
+                  case None => "rank_asc"
+                }
+              case f => throw new UnsupportedOperationException("unsupported window function: " + f)
+            }
+            (name, func)
+        }
+    if (windowFunctions.isEmpty) {
+      throw new UnsupportedOperationException("zero window functions " +
+          "specified in window")
+    }
+    windowFunctions
   }

   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val windowFunctions = validateWindowFunctions()
     child.executeColumnar().mapPartitionsWithIndex { (partIndex, iter) =>
       ExecutorManager.tryTaskSet(numaBindingInfo)
       if (!iter.hasNext) {
@@ -228,24 +250,27 @@ class ColumnarWindowExec(windowExpression: Seq[NamedExpression],
         SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => evaluator.close())
         val windowFinishCost = System.nanoTime() - prev3
         totalTime += TimeUnit.NANOSECONDS.toMillis(windowFinishCost)
-        val itr = batches.zipWithIndex.map { case (recordBatch, i) => {
-          val prev4 = System.nanoTime()
-          val length = recordBatch.getLength
-          val vectors = try {
-            ArrowWritableColumnVector.loadColumns(length, resultSchema, recordBatch)
-          } finally {
-            recordBatch.close()
-          }
-          val correspondingInputBatch = inputCache(i)
-          val batch = new ColumnarBatch(
-            (0 until correspondingInputBatch.numCols()).map(i => correspondingInputBatch.column(i)).toArray
-              ++ vectors, correspondingInputBatch.numRows())
-          val emitCost = System.nanoTime() - prev4
-          totalTime += TimeUnit.NANOSECONDS.toMillis(emitCost)
-          numOutputRows += batch.numRows()
-          numOutputBatches += 1
-          batch
-        }}.toIterator
+        val itr = batches.zipWithIndex.map {
+          case (recordBatch, i) =>
+            val prev4 = System.nanoTime()
+            val length = recordBatch.getLength
+            val vectors = try {
+              ArrowWritableColumnVector.loadColumns(length, resultSchema, recordBatch)
+            } finally {
+              recordBatch.close()
+            }
+            val correspondingInputBatch = inputCache(i)
+            val batch = new ColumnarBatch(
+              (0 until correspondingInputBatch.numCols())
+                  .map(i => correspondingInputBatch.column(i))
+                  .toArray
+                ++ vectors, correspondingInputBatch.numRows())
+            val emitCost = System.nanoTime() - prev4
+            totalTime += TimeUnit.NANOSECONDS.toMillis(emitCost)
+            numOutputRows += batch.numRows()
+            numOutputBatches += 1
+            batch
+        }.toIterator
         new CloseableColumnBatchIterator(itr)
       }
     }
@@ -284,26 +309,27 @@ class ColumnarWindowExec(windowExpression: Seq[NamedExpression],
     override def isComplex: Boolean = false
   }
-}

-object ColumnarWindowExec {
+  override protected def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException()
+  }
+}

-  def createWithProjection(
-      windowExpression: Seq[NamedExpression],
-      partitionSpec: Seq[Expression],
-      orderSpec: Seq[SortOrder],
-      child: SparkPlan): SparkPlan = {
+object ColumnarWindowExec extends Logging {
+  object AddProjectionsAroundWindow extends Rule[SparkPlan] with PredicateHelper {
     def makeInputProject(ex: Expression, inputProjects: ListBuffer[NamedExpression]): Expression = {
       ex match {
-        case ae: AggregateExpression => ae.withNewChildren(ae.children.map(makeInputProject(_, inputProjects)))
-        case ae: WindowExpression => ae.withNewChildren(ae.children.map(makeInputProject(_, inputProjects)))
+        case ae: AggregateExpression => ae.withNewChildren(
+          ae.children.map(makeInputProject(_, inputProjects)))
+        case ae: WindowExpression => ae.withNewChildren(
+          ae.children.map(makeInputProject(_, inputProjects)))
         case func @ (_: AggregateFunction | _: WindowFunction) =>
           val params = func.children
           // rewrite
           val rewritten = func match {
             case _: Average =>
-              // rewrite params for AVG
+              // rewrite params for AVG
               params.map {
                 param =>
                   param.dataType match {
@@ -338,7 +364,8 @@ object ColumnarWindowExec {
       DataType.equalsStructurally(from, to)
     }

-    def makeOutputProject(ex: Expression, windows: ListBuffer[NamedExpression], inputProjects: ListBuffer[NamedExpression]): Expression = {
+    def makeOutputProject(ex: Expression, windows: ListBuffer[NamedExpression],
+        inputProjects: ListBuffer[NamedExpression]): Expression = {
       val out = ex match {
         case we: WindowExpression =>
           val aliasName = "__alias_%d__".format(Random.nextLong())
@@ -357,34 +384,115 @@ object ColumnarWindowExec {
         }
       } catch {
         case t: Throwable =>
+          // scalastyle:off println
           System.err.println("Warning: " + t.getMessage)
           Cast(out, ex.dataType)
+          // scalastyle:on println
       }
       casted
     }

-    val windows = ListBuffer[NamedExpression]()
-    val inProjectExpressions = ListBuffer[NamedExpression]()
-    val outProjectExpressions = windowExpression.map(e => e.asInstanceOf[Alias])
-        .map { a =>
-          a.withNewChildren(List(makeOutputProject(a.child, windows, inProjectExpressions)))
-              .asInstanceOf[NamedExpression]
-        }
+    override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
+      case p @ ColumnarWindowExec(windowExpression, partitionSpec, orderSpec, child) =>
+        val windows = ListBuffer[NamedExpression]()
+        val inProjectExpressions = ListBuffer[NamedExpression]()
+        val outProjectExpressions = windowExpression.map(e => e.asInstanceOf[Alias])
+            .map { a =>
+              a.withNewChildren(List(makeOutputProject(a.child, windows, inProjectExpressions)))
+                  .asInstanceOf[NamedExpression]
+            }
+        val inputProject = ColumnarConditionProjectExec(null,
+          child.output ++ inProjectExpressions, child)
+        val window = new ColumnarWindowExec(windows, partitionSpec, orderSpec, inputProject)
+        val outputProject = ColumnarConditionProjectExec(null,
+          child.output ++ outProjectExpressions, window)
+        outputProject
+    }
+  }

+  object RemoveSort extends Rule[SparkPlan] with PredicateHelper {
+    override def apply(plan: SparkPlan): SparkPlan = plan transform {
+      case p1 @ ColumnarWindowExec(_, _, _, p2 @ (_: SortExec | _: ColumnarSortExec)) =>
+        p1.withNewChildren(p2.children)
+    }
+  }

-    val inputProject = ColumnarConditionProjectExec(null, child.output ++ inProjectExpressions, child)
+  object RemoveCoalesceBatches extends Rule[SparkPlan] with PredicateHelper {
+    override def apply(plan: SparkPlan): SparkPlan = plan transform {
+      case p1 @ ColumnarWindowExec(_, _, _, p2: CoalesceBatchesExec) =>
+        p1.withNewChildren(p2.children)
+    }
+  }
+
+  object CastMutableTypes extends Rule[SparkPlan] with PredicateHelper {
+    override def apply(plan: SparkPlan): SparkPlan = plan transform {
+      case p: ColumnarWindowExec => p.transformExpressionsDown {
+        case we @ WindowExpression(ae @ AggregateExpression(af, _, _, _, _), _) => af match {
+          case Min(e) => e.dataType match {
+            case t @ (_: TimestampType) =>
+              Cast(we.copy(
+                windowFunction =
+                    ae.copy(aggregateFunction = Min(Cast(e, LongType)))), TimestampType)
+            case t @ (_: DateType) =>
+              Cast(
+                Cast(we.copy(
+                  windowFunction =
+                      ae.copy(aggregateFunction = Min(Cast(Cast(e, TimestampType,
+                        Some(DateTimeUtils.TimeZoneUTC.getID)), LongType)))),
+                  TimestampType), DateType, Some(DateTimeUtils.TimeZoneUTC.getID))
+            case _ => we
+          }
+          case Max(e) => e.dataType match {
+            case t @ (_: TimestampType) =>
+              Cast(we.copy(
+                windowFunction =
+                    ae.copy(aggregateFunction = Max(Cast(e, LongType)))), TimestampType)
+            case t @ (_: DateType) =>
+              Cast(
+                Cast(we.copy(
+                  windowFunction =
+                      ae.copy(aggregateFunction = Max(Cast(Cast(e, TimestampType,
+                        Some(DateTimeUtils.TimeZoneUTC.getID)), LongType)))),
+                  TimestampType), DateType, Some(DateTimeUtils.TimeZoneUTC.getID))
+            case _ => we
+          }
+          case _ => we
+        }
+      }
+    }
+  }

-    val window = new ColumnarWindowExec(windows, partitionSpec, orderSpec, inputProject)
+  object Validate extends Rule[SparkPlan] with PredicateHelper {
+    override def apply(plan: SparkPlan): SparkPlan = plan transform {
+      case w: ColumnarWindowExec =>
+        w.validateWindowFunctions()
+        w
+    }
+  }

-    val outputProject = ColumnarConditionProjectExec(null, child.output ++ outProjectExpressions, window)
+  object ColumnarWindowOptimizations extends RuleExecutor[SparkPlan] {
+    override protected def batches: Seq[ColumnarWindowOptimizations.Batch] =
+      Batch("Remove Sort", FixedPoint(10), RemoveSort) ::
+          Batch("Remove Coalesce Batches", FixedPoint(10), RemoveCoalesceBatches) ::
+          Batch("Cast Mutable Types", Once, CastMutableTypes) ::
+          Batch("Add Projections", FixedPoint(1), AddProjectionsAroundWindow) ::
+          Batch("Validate", Once, Validate) ::
+          Nil
+  }

-    outputProject
+  def optimize(plan: ColumnarWindowExec): SparkPlan = {
+    ColumnarWindowOptimizations.execute(plan)
   }

-  def create(
-      windowExpression: Seq[NamedExpression],
+  def createWithOptimizations(windowExpression: Seq[NamedExpression],
       partitionSpec: Seq[Expression],
       orderSpec: Seq[SortOrder],
       child: SparkPlan): SparkPlan = {
-    createWithProjection(windowExpression, partitionSpec, orderSpec, child)
+    val columnar = new ColumnarWindowExec(
+      windowExpression,
+      partitionSpec,
+      orderSpec,
+      child)
+    ColumnarWindowExec.optimize(columnar)
   }
 }
diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala
index e4ef15cfd..3f609551b 100644
--- a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala
+++ b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala
@@ -105,7 +105,14 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {
     df.show()
   }

-  test("window function with decimal input 2") {
+  test("window function with date input") {
+    val df = spark.sql("SELECT MAX(cc_rec_start_date) OVER (PARTITION BY cc_company)" +
+        "FROM call_center LIMIT 100")
+    df.explain()
+    df.show()
+  }
+
+  ignore("window function with decimal input 2") {
     val df = spark.sql("SELECT i_item_sk, i_class_id, RANK()" +
         " OVER (PARTITION BY i_class_id ORDER BY i_current_price) FROM item LIMIT 1000")
     df.explain()
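Note: patch 1 replaces the inline plan surgery that ColumnarPreOverrides used to do (manually unwrapping SortExec and CoalesceBatchesExec before constructing the window) with a Catalyst-style RuleExecutor pipeline: RemoveSort and RemoveCoalesceBatches run to a fixed point, CastMutableTypes and Validate run once, and AddProjectionsAroundWindow wraps the window with input/output projections. The following self-contained Scala sketch illustrates only the fixed-point rule pattern; the toy Node/Sort/Window types are illustrative and are not part of the patch:

    // Toy plan nodes standing in for SparkPlan operators (illustrative only).
    sealed trait Node
    case class Sort(child: Node) extends Node
    case class Window(child: Node) extends Node
    case object Scan extends Node

    // Counterpart of the RemoveSort rule above: the columnar window needs no
    // sorted input, so a sort directly beneath it can be bypassed.
    def removeSortUnderWindow(n: Node): Node = n match {
      case Window(Sort(c)) => Window(removeSortUnderWindow(c))
      case Window(c)       => Window(removeSortUnderWindow(c))
      case Sort(c)         => Sort(removeSortUnderWindow(c))
      case Scan            => Scan
    }

    // Counterpart of Batch("Remove Sort", FixedPoint(10), RemoveSort):
    // reapply the rule until the plan stops changing or the cap is hit.
    @annotation.tailrec
    def fixedPoint(rule: Node => Node, maxIters: Int, plan: Node): Node = {
      val next = rule(plan)
      if (next == plan || maxIters <= 1) next
      else fixedPoint(rule, maxIters - 1, next)
    }

    // fixedPoint(removeSortUnderWindow, 10, Window(Sort(Scan))) == Window(Scan)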
From a8d6bcf89ef7612d6807fb654e931546f4751257 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Tue, 27 Apr 2021 18:33:54 +0800
Subject: [PATCH 2/5] fix

---
 .../com/intel/oap/execution/ColumnarWindowExec.scala   |  2 +-
 .../test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala   |  3 ++-
 .../src/codegen/arrow_compute/ext/actions_impl.cc      | 12 ++++++++++++
 .../cpp/src/codegen/arrow_compute/ext/kernels_ext.h    |  5 +++++
 .../src/codegen/arrow_compute/ext/window_kernel.cc     |  7 +++++++
 5 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala
index 3d184ab0f..517d37689 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala
@@ -474,7 +474,7 @@ object ColumnarWindowExec extends Logging {
     override protected def batches: Seq[ColumnarWindowOptimizations.Batch] =
       Batch("Remove Sort", FixedPoint(10), RemoveSort) ::
           Batch("Remove Coalesce Batches", FixedPoint(10), RemoveCoalesceBatches) ::
-          Batch("Cast Mutable Types", Once, CastMutableTypes) ::
+//          Batch("Cast Mutable Types", Once, CastMutableTypes) ::
           Batch("Add Projections", FixedPoint(1), AddProjectionsAroundWindow) ::
           Batch("Validate", Once, Validate) ::
           Nil
diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala
index 3f609551b..128f68bb7 100644
--- a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala
+++ b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala
@@ -106,7 +106,8 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {
   }

   test("window function with date input") {
-    val df = spark.sql("SELECT MAX(cc_rec_start_date) OVER (PARTITION BY cc_company)" +
+    val df = spark.sql("SELECT MAX(cc_rec_end_date) OVER (PARTITION BY cc_company)," +
+        "MIN(cc_rec_end_date) OVER (PARTITION BY cc_company)" +
         "FROM call_center LIMIT 100")
     df.explain()
     df.show()
diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc
index c930d5cce..67978fd12 100644
--- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc
+++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc
@@ -3696,6 +3696,12 @@ arrow::Status MakeMinAction(arrow::compute::ExecContext* ctx,
    *out = std::dynamic_pointer_cast<ActionBase>(action_ptr);                      \
   } break;
     PROCESS_SUPPORTED_TYPES(PROCESS)
+    case arrow::Date32Type::type_id: {
+      using CType = typename arrow::TypeTraits<arrow::Date32Type>::CType;auto action_ptr = std::make_shared<
+          MinAction<arrow::Date32Type, CType>>
+          (ctx, type);
+      *out = std::dynamic_pointer_cast<ActionBase>(action_ptr);
+    } break;
     case arrow::Decimal128Type::type_id: {
       auto action_ptr = std::make_shared<MinAction<arrow::Decimal128Type, arrow::Decimal128>>(ctx,
@@ -3721,6 +3727,12 @@ arrow::Status MakeMaxAction(arrow::compute::ExecContext* ctx,
    *out = std::dynamic_pointer_cast<ActionBase>(action_ptr);                      \
   } break;
     PROCESS_SUPPORTED_TYPES(PROCESS)
+    case arrow::Date32Type::type_id: {
+      using CType = typename arrow::TypeTraits<arrow::Date32Type>::CType;auto action_ptr = std::make_shared<
+          MaxAction<arrow::Date32Type, CType>>
+          (ctx, type);
+      *out = std::dynamic_pointer_cast<ActionBase>(action_ptr);
+    } break;
     case arrow::Decimal128Type::type_id: {
       auto action_ptr = std::make_shared<MaxAction<arrow::Decimal128Type, arrow::Decimal128>>(ctx,
diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h
index 4921141c6..9225aa19f 100644
--- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h
+++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h
@@ -143,6 +143,11 @@ class WindowAggregateFunctionKernel : public KernalBase {
   arrow::Result<std::shared_ptr<arrow::ArrayBuilder>>
   createBuilder(std::shared_ptr<arrow::DataType> data_type);

+  template <typename ArrowType>
+  typename arrow::enable_if_date<ArrowType,
+                                 arrow::Result<std::shared_ptr<arrow::ArrayBuilder>>>
+  createBuilder(std::shared_ptr<arrow::DataType> data_type);
+
   template <typename ArrowType>
   typename arrow::enable_if_number<ArrowType, arrow::Result<std::shared_ptr<arrow::ArrayBuilder>>>
   createBuilder(std::shared_ptr<arrow::DataType> data_type);
diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc
index 003c22f63..ce01fc870 100644
--- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc
+++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc
@@ -153,6 +153,7 @@ arrow::Status WindowAggregateFunctionKernel::Evaluate(const ArrayList& in) {
   PROC(arrow::Int64Type, arrow::Int64Builder, arrow::Int64Array)               \
   PROC(arrow::FloatType, arrow::FloatBuilder, arrow::FloatArray)               \
   PROC(arrow::DoubleType, arrow::DoubleBuilder, arrow::DoubleArray)            \
+  PROC(arrow::Date32Type, arrow::Date32Builder, arrow::Date32Array)            \
   PROC(arrow::Decimal128Type, arrow::Decimal128Builder, arrow::Decimal128Array)

 arrow::Status WindowAggregateFunctionKernel::Finish(ArrayList* out) {
@@ -211,6 +212,12 @@ WindowAggregateFunctionKernel::createBuilder(std::shared_ptr<arrow::DataType> da
   return std::make_shared<arrow::Decimal128Builder>(data_type, ctx_->memory_pool());
 }

+template <typename ArrowType>
+typename arrow::enable_if_date<ArrowType, arrow::Result<std::shared_ptr<arrow::ArrayBuilder>>>
+WindowAggregateFunctionKernel::createBuilder(std::shared_ptr<arrow::DataType> data_type) {
+  return std::make_shared<arrow::Date32Builder>(ctx_->memory_pool());
+}
+
 template <typename ArrowType>
 typename arrow::enable_if_number<ArrowType, arrow::Result<std::shared_ptr<arrow::ArrayBuilder>>>
 WindowAggregateFunctionKernel::createBuilder(std::shared_ptr<arrow::DataType> data_type) {
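Note: the native path added in patch 2 works because Arrow's Date32 type stores a date as the number of days since the Unix epoch in a signed 32-bit integer, so MIN/MAX over dates degenerates to plain integer comparison — the same comparison MinAction/MaxAction already perform for numeric types, instantiated here with Date32Type's CType. A standalone Scala illustration (not patch code):

    import java.time.LocalDate

    // Date32 values are epoch-day offsets; min/max is ordinary Int comparison.
    val daysSinceEpoch = Seq(18738, 18001, 18542)
    val minDays = daysSinceEpoch.min
    val minDate = LocalDate.ofEpochDay(minDays.toLong)  // 2019-04-15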
From d6d66fb611ebdbda8b4787fab806b9993ff1edfd Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Tue, 27 Apr 2021 18:35:43 +0800
Subject: [PATCH 3/5] comment

---
 .../scala/com/intel/oap/execution/ColumnarWindowExec.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala
index 517d37689..8c23f0e50 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala
@@ -424,6 +424,9 @@ object ColumnarWindowExec extends Logging {
     }
   }

+  /**
+   * FIXME casting solution for timestamp/date32 support
+   */
   object CastMutableTypes extends Rule[SparkPlan] with PredicateHelper {
     override def apply(plan: SparkPlan): SparkPlan = plan transform {
       case p: ColumnarWindowExec => p.transformExpressionsDown {
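Note: the FIXME above refers to the casting workaround that patch 2 disabled. Instead of aggregating dates natively, CastMutableTypes rewrote MIN/MAX over DateType/TimestampType into MIN/MAX over LongType, wrapped in casts in both directions (any order-preserving mapping to Long works; Spark's timestamp-to-long cast yields epoch seconds). A sketch of that detour on plain values rather than Catalyst expressions — the helper names are illustrative, not from the patch:

    import java.time.{Instant, LocalDate, ZoneOffset}

    // date -> timestamp -> Long, mirroring the inner
    // Cast(Cast(e, TimestampType, UTC), LongType) in CastMutableTypes
    def dateToEpochSec(d: LocalDate): Long =
      d.atStartOfDay(ZoneOffset.UTC).toEpochSecond

    // Long -> timestamp -> date, mirroring the outer casts back to DateType
    def epochSecToDate(s: Long): LocalDate =
      Instant.ofEpochSecond(s).atZone(ZoneOffset.UTC).toLocalDate

    val dates = Seq(LocalDate.parse("2019-04-15"), LocalDate.parse("2021-04-26"))
    val maxDate = epochSecToDate(dates.map(dateToEpochSec).max)  // 2021-04-26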
From 0e9874df06638f62dca25418fc4a0322d55378b5 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Wed, 28 Apr 2021 11:57:22 +0800
Subject: [PATCH 4/5] format

---
 .../cpp/src/codegen/arrow_compute/ext/actions_impl.cc | 10 ++++------
 .../cpp/src/codegen/arrow_compute/ext/kernels_ext.h   |  3 +--
 2 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc
index 67978fd12..43f530cb2 100644
--- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc
+++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc
@@ -3697,9 +3697,8 @@ arrow::Status MakeMinAction(arrow::compute::ExecContext* ctx,
   } break;
     PROCESS_SUPPORTED_TYPES(PROCESS)
     case arrow::Date32Type::type_id: {
-      using CType = typename arrow::TypeTraits<arrow::Date32Type>::CType;auto action_ptr = std::make_shared<
-          MinAction<arrow::Date32Type, CType>>
-          (ctx, type);
+      using CType = typename arrow::TypeTraits<arrow::Date32Type>::CType;
+      auto action_ptr = std::make_shared<MinAction<arrow::Date32Type, CType>>(ctx, type);
       *out = std::dynamic_pointer_cast<ActionBase>(action_ptr);
     } break;
     case arrow::Decimal128Type::type_id: {
       auto action_ptr = std::make_shared<MinAction<arrow::Decimal128Type, arrow::Decimal128>>(ctx,
@@ -3728,9 +3727,8 @@ arrow::Status MakeMaxAction(arrow::compute::ExecContext* ctx,
   } break;
     PROCESS_SUPPORTED_TYPES(PROCESS)
     case arrow::Date32Type::type_id: {
-      using CType = typename arrow::TypeTraits<arrow::Date32Type>::CType;auto action_ptr = std::make_shared<
-          MaxAction<arrow::Date32Type, CType>>
-          (ctx, type);
+      using CType = typename arrow::TypeTraits<arrow::Date32Type>::CType;
+      auto action_ptr = std::make_shared<MaxAction<arrow::Date32Type, CType>>(ctx, type);
       *out = std::dynamic_pointer_cast<ActionBase>(action_ptr);
     } break;
     case arrow::Decimal128Type::type_id: {
       auto action_ptr = std::make_shared<MaxAction<arrow::Decimal128Type, arrow::Decimal128>>(ctx,
diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h
index 9225aa19f..734d8c727 100644
--- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h
+++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h
@@ -144,8 +144,7 @@ class WindowAggregateFunctionKernel : public KernalBase {
   createBuilder(std::shared_ptr<arrow::DataType> data_type);

   template <typename ArrowType>
-  typename arrow::enable_if_date<ArrowType,
-                                 arrow::Result<std::shared_ptr<arrow::ArrayBuilder>>>
+  typename arrow::enable_if_date<ArrowType, arrow::Result<std::shared_ptr<arrow::ArrayBuilder>>>
   createBuilder(std::shared_ptr<arrow::DataType> data_type);

   template <typename ArrowType>
From 421f50b641c242aff76e16863d843de3c6814adc Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Sat, 8 May 2021 14:13:32 +0800
Subject: [PATCH 5/5] dependency

---
 arrow-data-source/pom.xml      | 56 ++++-----------
 native-sql-engine/core/pom.xml | 28 +---------
 pom.xml                        | 71 ++++++++++++++++++++++++++++
 3 files changed, 80 insertions(+), 75 deletions(-)

diff --git a/arrow-data-source/pom.xml b/arrow-data-source/pom.xml
index c3cafb0f4..f6c1f368e 100644
--- a/arrow-data-source/pom.xml
+++ b/arrow-data-source/pom.xml
@@ -100,26 +100,6 @@
       <version>${scala.version}</version>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_2.12</artifactId>
-      <version>${spark.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.arrow</groupId>
-          <artifactId>arrow-vector</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>log4j</groupId>
-          <artifactId>log4j</artifactId>
-        </exclusion>
-      </exclusions>
-      <scope>provided</scope>
-    </dependency>
     <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
@@ -128,44 +108,24 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_2.12</artifactId>
-      <version>${spark.version}</version>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
       <type>test-jar</type>
       <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>log4j</groupId>
-          <artifactId>log4j</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-catalyst_2.12</artifactId>
-      <version>${spark.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.arrow</groupId>
-          <artifactId>arrow-vector</artifactId>
-        </exclusion>
-      </exclusions>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_2.12</artifactId>
-      <version>${spark.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.arrow</groupId>
-          <artifactId>arrow-vector</artifactId>
-        </exclusion>
-      </exclusions>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
diff --git a/native-sql-engine/core/pom.xml b/native-sql-engine/core/pom.xml
index 15b0eff21..a6cbb89f5 100644
--- a/native-sql-engine/core/pom.xml
+++ b/native-sql-engine/core/pom.xml
@@ -48,50 +48,24 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
       <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>log4j</groupId>
-          <artifactId>log4j</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
diff --git a/pom.xml b/pom.xml
index f2ff9888d..10cd1bdbb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,6 +51,77 @@
     <module>native-sql-engine/core</module>
   </modules>

+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-sql_${scala.binary.version}</artifactId>
+        <version>${spark.version}</version>
+        <scope>provided</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-vector</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+        <version>${spark.version}</version>
+        <scope>provided</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-sql_${scala.binary.version}</artifactId>
+        <version>${spark.version}</version>
+        <scope>provided</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-core_${scala.binary.version}</artifactId>
+        <version>${spark.version}</version>
+        <type>test-jar</type>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+        <version>${spark.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>*</artifactId>
+          </exclusion>
+        </exclusions>
+        <type>test-jar</type>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-sql_${scala.binary.version}</artifactId>
+        <version>${spark.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>*</artifactId>
+          </exclusion>
+        </exclusions>
+        <type>test-jar</type>
+        <scope>test</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <profiles>
     <profile>
       <id>hadoop-3.2</id>
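Note: patch 5 centralizes the shared Spark dependency metadata (versions, exclusions, default scopes) in the root pom's <dependencyManagement>, so each module now states only the coordinates and scope it needs. A minimal sketch of what a child-module declaration reduces to under this arrangement — version and exclusions are inherited from the parent:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <scope>provided</scope>
    </dependency>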