From ddc8631b6649419b2f0e62195b3c86c7ac3f7521 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Sun, 20 Mar 2022 15:46:02 +0800 Subject: [PATCH] fallback on reused broadcast exchange Signed-off-by: Yuan Zhou --- .../com/intel/oap/extension/ColumnarOverrides.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala index d902305c8..933202117 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala @@ -209,8 +209,15 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { right) case plan: BroadcastQueryStageExec => logDebug( - s"Columnar Processing for ${plan.getClass} is currently supported, actual plan is ${plan.plan.getClass}.") - plan + s"Columnar Processing for ${plan.getClass} is currently supported, actual plan is ${plan.plan}.") + plan.plan match { + case ReusedExchangeExec(_, originalBroadcastPlan: ColumnarBroadcastExchangeAdaptor) => + val newBroadcast = BroadcastExchangeExec( + originalBroadcastPlan.mode, + DataToArrowColumnarExec(plan.plan, 1)) + SparkShimLoader.getSparkShims.newBroadcastQueryStageExec(plan.id, newBroadcast) + case other => plan + } case plan: BroadcastExchangeExec => val child = replaceWithColumnarPlan(plan.child) logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")