Skip to content

Commit

Permalink
Ensure we exhaust row iterator before closing the hostBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
abellina committed Jun 27, 2023
1 parent 0535357 commit 48e8c05
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ case class GpuBroadcastToRowExec(
BoundReference(idx, buildKeys(idx).dataType, buildKeys(idx).nullable))
rowProject(broadcastRow).copy().asInstanceOf[InternalRow]
}
}.toArray
}.toArray // force evaluation so we don't close hostBatch too soon
}

gpuLongMetric("dataSize") += serBatch.dataSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,13 @@ case class GpuSubqueryBroadcastExec(
hostBatch.rowIterator().asScala.map { row =>
val broadcastRow = broadcastModeProject.map(_(row)).getOrElse(row)
rowProject(broadcastRow).copy().asInstanceOf[InternalRow]
}
}.toArray // force evaluation so we don't close hostBatch too soon
}

gpuLongMetric("dataSize") += serBatch.dataSize
gpuLongMetric(COLLECT_TIME) += System.nanoTime() - beforeCollect

result.toArray
result
}

protected override def doExecute(): RDD[InternalRow] = {
Expand Down

0 comments on commit 48e8c05

Please sign in to comment.