Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-5][SCALA] Fix ColumnarBroadcastExchange didn't fallback issue w/…
Browse files Browse the repository at this point in the history
… DPP (#6)

* [SCALA] Fix ColumnarBroadcastExchange didn't fallback issue when DPP is enabled

Signed-off-by: Chendi Xue <[email protected]>

* [SCALA & JAVA] Fix ExpressionEvaluator api change null pointer issue

Signed-off-by: Chendi Xue <[email protected]>
  • Loading branch information
xuechendi authored Jan 5, 2021
1 parent cea3f04 commit 089ccf9
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public void close() {
}

byte[] getSchemaBytesBuf(Schema schema) throws IOException {
if (schema == null) return null;
ByteArrayOutputStream out = new ByteArrayOutputStream();
MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), schema);
return out.toByteArray();
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
*/
private def insertRowGuardRecursive(plan: SparkPlan): SparkPlan = {
plan match {
case p: ShuffleExchangeExec =>
RowGuard(p.withNewChildren(p.children.map(insertRowGuardOrNot)))
case p: BroadcastExchangeExec =>
RowGuard(p.withNewChildren(p.children.map(insertRowGuardOrNot)))
case p: ShuffledHashJoinExec =>
RowGuard(p.withNewChildren(p.children.map(insertRowGuardRecursive)))
case p if !supportCodegen(p) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
throw e
} finally {
val timeout: Int = SQLConf.get.broadcastTimeout.toInt
relation.asInstanceOf[ColumnarHashedRelation].countDownClose(timeout)
if (relation != null)
relation.asInstanceOf[ColumnarHashedRelation].countDownClose(timeout)
}
}
}
Expand Down

0 comments on commit 089ccf9

Please sign in to comment.