This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

[NSE-207] improve the fix for join optimization (#541)
* add back cond project optimization

* fix UT
rui-mo authored Nov 1, 2021
1 parent 996f01c commit 1cad018
Showing 2 changed files with 17 additions and 6 deletions.
@@ -137,7 +137,7 @@ case class ColumnarBroadcastHashJoinExec(
   }
 
   override def output: Seq[Attribute] =
-    if (projectList == null || projectList.isEmpty) super.output
+    if (projectList == null) super.output
     else projectList.map(_.toAttribute)
   def getBuildPlan: SparkPlan = buildPlan
   override def supportsColumnar = true
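
With the `isEmpty` guard removed, an empty but non-null projectList now produces an empty output schema instead of falling back to the child's output. A minimal plain-Scala sketch of that behavior, using a hypothetical `Attr` stand-in rather than Spark's real `Attribute` class:

    // Plain-Scala model of the new output resolution; Attr stands in for
    // Spark's Attribute and superOutput for the join's default output.
    case class Attr(name: String)

    def output(projectList: Seq[Attr], superOutput: Seq[Attr]): Seq[Attr] =
      if (projectList == null) superOutput   // no projection attached to the join
      else projectList                        // possibly empty: zero output columns

    // Previously an empty projection also fell back to superOutput; now it
    // reports zero columns, matching the empty-batch path in the next hunk.
    assert(output(null, Seq(Attr("a"))) == Seq(Attr("a")))
    assert(output(Seq.empty, Seq(Attr("a"))).isEmpty)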
@@ -429,11 +429,22 @@ case class ColumnarBroadcastHashJoinExec(
         }
         val outputNumRows = output_rb.getLength
         ConverterUtils.releaseArrowRecordBatch(input_rb)
-        val output = ConverterUtils.fromArrowRecordBatch(probe_out_schema, output_rb)
-        ConverterUtils.releaseArrowRecordBatch(output_rb)
-        eval_elapse += System.nanoTime() - beforeEval
+        val resBatch = if (probe_out_schema.getFields.size() == 0) {
+          // If no col is selected by Projection, empty batch will be returned.
+          val resultColumnVectors =
+            ArrowWritableColumnVector.allocateColumns(0, resultStructType)
+          ConverterUtils.releaseArrowRecordBatch(output_rb)
+          eval_elapse += System.nanoTime() - beforeEval
+          new ColumnarBatch(
+            resultColumnVectors.map(_.asInstanceOf[ColumnVector]), outputNumRows)
+        } else {
+          val output = ConverterUtils.fromArrowRecordBatch(probe_out_schema, output_rb)
+          ConverterUtils.releaseArrowRecordBatch(output_rb)
+          eval_elapse += System.nanoTime() - beforeEval
+          new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]).toArray, outputNumRows)
+        }
         numOutputRows += outputNumRows
-        new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]).toArray, outputNumRows)
+        resBatch
       }
     }
     SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => {
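The new branch covers projections that select no columns at all: `probe_out_schema` then has zero fields, and the operator still has to emit a batch that carries the correct row count. A simplified sketch of the two result paths, with hypothetical `RecordBatch`/`Batch` stand-ins for the Arrow record batch and Spark's `ColumnarBatch`:

    // Simplified model of the two result paths above; RecordBatch and Batch
    // are stand-ins, not the real Arrow/Spark classes.
    case class RecordBatch(numFields: Int, numRows: Int)
    case class Batch(columns: Seq[String], numRows: Int)

    def toResultBatch(outputRb: RecordBatch, columnNames: Seq[String]): Batch =
      if (outputRb.numFields == 0)
        // Projection selected no columns: emit a zero-column batch that still
        // carries the row count, so downstream row accounting stays correct.
        Batch(Seq.empty, outputRb.numRows)
      else
        // Normal path: materialize the projected columns.
        Batch(columnNames, outputRb.numRows)

    // e.g. a plan where the join output needs no columns, only a row count:
    assert(toResultBatch(RecordBatch(0, 42), Seq.empty).numRows == 42)
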
@@ -178,7 +178,7 @@ case class ColumnarCollapseCodegenStages(
       plan: ColumnarConditionProjectExec,
       skip_smj: Boolean = false): SparkPlan = plan.child match {
     case p: ColumnarBroadcastHashJoinExec
-        if plan.condition == null && plan.projectList == null =>
+        if plan.condition == null && !containsExpression(plan.projectList) =>
       ColumnarBroadcastHashJoinExec(
         p.leftKeys,
         p.rightKeys,
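The relaxed guard lets `ColumnarCollapseCodegenStages` fold a `ColumnarConditionProjectExec` into the join whenever its project list holds only plain attribute references, not just when it is null. The body of `containsExpression` is not part of this diff; one plausible form, assuming it flags any item that is not a bare attribute reference (the actual helper may differ), could look like:

    // Hypothetical sketch only; the real containsExpression is defined
    // elsewhere in the codebase and may use a different criterion.
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}

    def containsExpression(projectList: Seq[NamedExpression]): Boolean =
      projectList != null && projectList.exists {
        case _: AttributeReference => false  // bare column reference: safe to fold into the join
        case _ => true                       // alias/computed expression: keep the project node
      }

A null project list yields false here, so `!containsExpression(plan.projectList)` still accepts the cases the old `plan.projectList == null` check allowed.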
