From b7542d70213e65c1f4d06bf2bb17064efe41a11a Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Mon, 4 Jan 2021 16:37:22 +0800 Subject: [PATCH 1/2] fix columnar bhj Signed-off-by: Yuan Zhou --- .../execution/ColumnarSortMergeJoinExec.scala | 38 ++++++++++--------- .../expression/ColumnarSortMergeJoin.scala | 1 + .../ColumnarBroadcastExchangeExec.scala | 5 +-- 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala index 0d7f65ef5..d22369edd 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala @@ -85,24 +85,24 @@ case class ColumnarSortMergeJoinExec( val prepareTime = longMetric("prepareTime") val totaltime_sortmegejoin = longMetric("totaltime_sortmergejoin") val resultSchema = this.schema - try { - ColumnarSortMergeJoin.precheck( - leftKeys, - rightKeys, - resultSchema, - joinType, - condition, - left, - right, - joinTime, - prepareTime, - totaltime_sortmegejoin, - numOutputRows, - sparkConf) - } catch { - case e: Throwable => - throw e - } + //try { + // ColumnarSortMergeJoin.precheck( + // leftKeys, + // rightKeys, + // resultSchema, + // joinType, + // condition, + // left, + // right, + // joinTime, + // prepareTime, + // totaltime_sortmegejoin, + // numOutputRows, + // sparkConf) + //} catch { + // case e: Throwable => + // throw e + //} override def supportsColumnar = true @@ -319,6 +319,8 @@ case class ColumnarSortMergeJoinExec( } ColumnarCodegenContext(inputSchema, outputSchema, codeGenNode) } + val triggerBuildSignature = getCodeGenSignature + /***********************************************************/ def getCodeGenSignature: String = diff --git a/core/src/main/scala/com/intel/oap/expression/ColumnarSortMergeJoin.scala b/core/src/main/scala/com/intel/oap/expression/ColumnarSortMergeJoin.scala index ab7de2911..7447e4b2c 100644 --- a/core/src/main/scala/com/intel/oap/expression/ColumnarSortMergeJoin.scala +++ b/core/src/main/scala/com/intel/oap/expression/ColumnarSortMergeJoin.scala @@ -257,6 +257,7 @@ object ColumnarSortMergeJoin extends Logging { case (expr, i) => { val (nativeNode, returnType) = ConverterUtils.getColumnarFuncNode(expr) if (s"${nativeNode.toProtobuf}".contains("fnNode")) { + throw new UnsupportedOperationException(s"join key with expression is not supported.") rkeyProjectOrdinalList += i Field.nullable(s"${expr}", returnType) } else { diff --git a/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala index 7831ea3a2..91736b697 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala @@ -113,9 +113,8 @@ class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) hashRelationKernel.build( hash_relation_schema, Lists.newArrayList(hash_relation_expr), - null, - true, - SparkMemoryUtils.globalMemoryPool()) + true + ) val iter = ConverterUtils.convertFromNetty(output, input) var numRows: Long = 0 val _input = new ArrayBuffer[ColumnarBatch]() From 807c161f67abe2d852c9e81949b0eb7332353711 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Mon, 4 Jan 2021 17:11:54 +0800 Subject: [PATCH 2/2] fix Signed-off-by: Yuan Zhou --- .../execution/ColumnarSortMergeJoinExec.scala | 18 ------------------ .../ColumnarBroadcastExchangeExec.scala | 5 +++-- 2 files changed, 3 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala index d22369edd..ddaffb742 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala @@ -85,24 +85,6 @@ case class ColumnarSortMergeJoinExec( val prepareTime = longMetric("prepareTime") val totaltime_sortmegejoin = longMetric("totaltime_sortmergejoin") val resultSchema = this.schema - //try { - // ColumnarSortMergeJoin.precheck( - // leftKeys, - // rightKeys, - // resultSchema, - // joinType, - // condition, - // left, - // right, - // joinTime, - // prepareTime, - // totaltime_sortmegejoin, - // numOutputRows, - // sparkConf) - //} catch { - // case e: Throwable => - // throw e - //} override def supportsColumnar = true diff --git a/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala index 91736b697..7831ea3a2 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala @@ -113,8 +113,9 @@ class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) hashRelationKernel.build( hash_relation_schema, Lists.newArrayList(hash_relation_expr), - true - ) + null, + true, + SparkMemoryUtils.globalMemoryPool()) val iter = ConverterUtils.convertFromNetty(output, input) var numRows: Long = 0 val _input = new ArrayBuffer[ColumnarBatch]()