Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
fix shuffle redudant reat
Browse files Browse the repository at this point in the history
Signed-off-by: Yuan Zhou <[email protected]>
  • Loading branch information
zhouyuan committed May 10, 2021
1 parent 39cf3c2 commit 045666f
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class ColumnarShuffleManager(conf: SparkConf) extends ShuffleManager with Loggin
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
val blocksByAddress = SparkEnv.get.mapOutputTracker
.getMapSizesByExecutorId(handle.shuffleId, 0, Int.MaxValue, startPartition, endPartition)
.getMapSizesByExecutorId(handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
if (handle.isInstanceOf[ColumnarShuffleHandle[K, _]]) {
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ import scala.concurrent.Future

case class ColumnarShuffleExchangeExec(
override val outputPartitioning: Partitioning,
child: SparkPlan)
child: SparkPlan,
shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS)
extends Exchange {

private[sql] lazy val writeMetrics =
Expand Down

0 comments on commit 045666f

Please sign in to comment.