From 1cad0184416c07667ed64b1918fc7f0294cc59e2 Mon Sep 17 00:00:00 2001
From: Rui Mo
Date: Mon, 1 Nov 2021 21:47:06 +0800
Subject: [PATCH] [NSE-207] improve the fix for join optimization (#541)

* add back cond project optimization

* fix UT
---
 .../ColumnarBroadcastHashJoinExec.scala       | 21 +++++++++++++++-----
 .../ColumnarCollapseCodegenStages.scala       |  2 +-
 2 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala
index 2f050fc78..f5b3109a6 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala
@@ -137,7 +137,7 @@ case class ColumnarBroadcastHashJoinExec(
   }
 
   override def output: Seq[Attribute] =
-    if (projectList == null || projectList.isEmpty) super.output
+    if (projectList == null) super.output
     else projectList.map(_.toAttribute)
   def getBuildPlan: SparkPlan = buildPlan
   override def supportsColumnar = true
@@ -429,11 +429,22 @@ case class ColumnarBroadcastHashJoinExec(
           }
           val outputNumRows = output_rb.getLength
           ConverterUtils.releaseArrowRecordBatch(input_rb)
-          val output = ConverterUtils.fromArrowRecordBatch(probe_out_schema, output_rb)
-          ConverterUtils.releaseArrowRecordBatch(output_rb)
-          eval_elapse += System.nanoTime() - beforeEval
+          val resBatch = if (probe_out_schema.getFields.size() == 0) {
+            // If no col is selected by Projection, empty batch will be returned.
+            val resultColumnVectors =
+              ArrowWritableColumnVector.allocateColumns(0, resultStructType)
+            ConverterUtils.releaseArrowRecordBatch(output_rb)
+            eval_elapse += System.nanoTime() - beforeEval
+            new ColumnarBatch(
+              resultColumnVectors.map(_.asInstanceOf[ColumnVector]), outputNumRows)
+          } else {
+            val output = ConverterUtils.fromArrowRecordBatch(probe_out_schema, output_rb)
+            ConverterUtils.releaseArrowRecordBatch(output_rb)
+            eval_elapse += System.nanoTime() - beforeEval
+            new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]).toArray, outputNumRows)
+          }
           numOutputRows += outputNumRows
-          new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]).toArray, outputNumRows)
+          resBatch
         }
       }
       SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => {
diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala
index b8694b81b..07e69f3d9 100644
--- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala
+++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseCodegenStages.scala
@@ -178,7 +178,7 @@ case class ColumnarCollapseCodegenStages(
       plan: ColumnarConditionProjectExec,
       skip_smj: Boolean = false): SparkPlan = plan.child match {
     case p: ColumnarBroadcastHashJoinExec
-        if plan.condition == null && plan.projectList == null =>
+        if plan.condition == null && !containsExpression(plan.projectList) =>
       ColumnarBroadcastHashJoinExec(
         p.leftKeys,
         p.rightKeys,