Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-981] Add more codegen checking in BHJ & SHJ (#995)
Browse files Browse the repository at this point in the history
* Inital commit

* Cover no consition case for BHJ & SHJ
  • Loading branch information
PHILO-HE authored Jul 4, 2022
1 parent bf685d1 commit 81f4195
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ case class ColumnarBroadcastHashJoinExec(
}
}

var supportCodegen = true

buildCheck()

// A method in ShuffledJoin of spark3.2.
Expand All @@ -111,6 +113,7 @@ case class ColumnarBroadcastHashJoinExec(
ColumnarExpressionConverter.replaceWithColumnarExpression(conditionExpr)
val supportCodegen =
columnarConditionExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null)
this.supportCodegen = this.supportCodegen && supportCodegen
// Columnar BHJ with condition only has codegen version of implementation.
if (!supportCodegen) {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -141,12 +144,30 @@ case class ColumnarBroadcastHashJoinExec(
// build check for expr
if (buildKeyExprs != null) {
for (expr <- buildKeyExprs) {
ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
val columnarBuildKeyExpr = ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
val supportCodegen =
columnarBuildKeyExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null)
this.supportCodegen = this.supportCodegen && supportCodegen
// Fall back the join who has join condition, but does not support codegen.
if (condition.isDefined && !supportCodegen) {
throw new UnsupportedOperationException("Fall back due to codegen is" +
" not supported for " + columnarBuildKeyExpr)
}

}
}
if (streamedKeyExprs != null) {
for (expr <- streamedKeyExprs) {
ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
val columnarStreamedKeyExpr =
ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
val supportCodegen =
columnarStreamedKeyExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null)
this.supportCodegen = this.supportCodegen && supportCodegen
// Fall back the join who has join condition, but does not support codegen.
if (condition.isDefined && !supportCodegen) {
throw new UnsupportedOperationException("Fall back due to codegen is" +
" not supported for " + columnarStreamedKeyExpr)
}
}
}
}
Expand Down Expand Up @@ -273,7 +294,9 @@ case class ColumnarBroadcastHashJoinExec(

override def getChild: SparkPlan = streamedPlan

override def supportColumnarCodegen: Boolean = true
override def supportColumnarCodegen: Boolean = {
this.supportCodegen
}

val output_skip_alias =
if (projectList == null || projectList.isEmpty) super.output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ case class ColumnarShuffledHashJoinExec(
"buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"),
"joinTime" -> SQLMetrics.createTimingMetric(sparkContext, "join time"))

var supportCodegen = true

buildCheck()

// For spark 3.2.
Expand Down Expand Up @@ -129,7 +131,15 @@ case class ColumnarShuffledHashJoinExec(
// build check for condition
val conditionExpr: Expression = condition.orNull
if (conditionExpr != null) {
ColumnarExpressionConverter.replaceWithColumnarExpression(conditionExpr)
val columnarConditionExpr =
ColumnarExpressionConverter.replaceWithColumnarExpression(conditionExpr)
val supportCodegen =
columnarConditionExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null)
this.supportCodegen = this.supportCodegen && supportCodegen
if (!supportCodegen) {
throw new UnsupportedOperationException(
"Condition expression is not fully supporting codegen!")
}
}
// build check types
for (attr <- streamedPlan.output) {
Expand All @@ -153,12 +163,29 @@ case class ColumnarShuffledHashJoinExec(
// build check for expr
if (buildKeyExprs != null) {
for (expr <- buildKeyExprs) {
ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
val columnarBuildKeyExpr = ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
val supportCodegen =
columnarBuildKeyExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null)
this.supportCodegen = this.supportCodegen && supportCodegen
// Fall back the join who has join condition, but does not support codegen.
if (condition.isDefined && !supportCodegen) {
throw new UnsupportedOperationException("Fall back due to codegen is" +
" not supported for " + columnarBuildKeyExpr)
}
}
}
if (streamedKeyExprs != null) {
for (expr <- streamedKeyExprs) {
ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
val columnarStreamedKeyExpr =
ColumnarExpressionConverter.replaceWithColumnarExpression(expr)
val supportCodegen =
columnarStreamedKeyExpr.asInstanceOf[ColumnarExpression].supportColumnarCodegen(null)
this.supportCodegen = this.supportCodegen && supportCodegen
// Fall back the join who has join condition, but does not support codegen.
if (condition.isDefined && !supportCodegen) {
throw new UnsupportedOperationException("Fall back due to codegen is" +
" not supported for " + columnarStreamedKeyExpr)
}
}
}
}
Expand Down Expand Up @@ -234,7 +261,9 @@ case class ColumnarShuffledHashJoinExec(
.prepareHashBuildFunction(buildKeyExprs, buildPlan.output, builder_type))
}

override def supportColumnarCodegen: Boolean = true
override def supportColumnarCodegen: Boolean = {
this.supportCodegen
}

val output_skip_alias =
if (projectList == null || projectList.isEmpty) super.output
Expand Down Expand Up @@ -286,7 +315,7 @@ case class ColumnarShuffledHashJoinExec(
}

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
// we will use previous codegen join to handle joins with condition
// we will use previous codegen join to handle joins with condition
if (condition.isDefined) {
return getCodeGenIterator
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ case class ColumnarSortMergeJoinExec(
Seq(streamedPlan.executeColumnar())
}

// Only has codegen implementation.
override def supportColumnarCodegen: Boolean = true

val output_skip_alias =
Expand Down

0 comments on commit 81f4195

Please sign in to comment.