[SPARK-20801] Record accurate size of blocks in MapStatus when it's above threshold. #18031
Conversation
To resolve the comments in #16989:
I propose two configs:
Another idea is just to only keep
I try to give the user a way to control the memory strictly so that no blocks are underestimated (by setting spark.shuffle.accurateBlockThreshold=0 and spark.shuffle.accurateBlockThresholdByTimesAverage=1). I'm a little bit hesitant to remove the huge blocks from the numerator in that calculation for the average size.
  }
  i += 1
}
Sorry for bringing in another while loop here. I have to calculate the average size first, then filter out the huge blocks. I don't have a better implementation to merge the two while loops into one :(
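For reference, a REPL-style sketch of the two passes described above; the variable names and sample sizes are illustrative, not the exact code in the patch:

```scala
// Sample per-reduce-partition sizes (bytes); one block is clearly "huge".
val uncompressedSizes: Array[Long] = Array(150L * 1024 * 1024, 4096L, 8192L, 0L)

// Pass 1: compute the average over non-empty blocks.
var i = 0
var numNonEmpty = 0
var totalSize = 0L
while (i < uncompressedSizes.length) {
  if (uncompressedSizes(i) > 0) {
    numNonEmpty += 1
    totalSize += uncompressedSizes(i)
  }
  i += 1
}
val avgSize = if (numNonEmpty > 0) totalSize / numNonEmpty else 0L

// Pass 2: with avgSize known, pick out the blocks that count as "huge"
// (max of the absolute threshold and the times-average threshold).
val threshold = math.max(100L * 1024 * 1024, 2 * avgSize)
val hugeBlockIds = (0 until uncompressedSizes.length).filter(uncompressedSizes(_) > threshold)
```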
Test build #77056 has finished for PR 18031 at commit
Jenkins, retest this please
Gentle ping to @JoshRosen @cloud-fan @mridulm
Test build #77058 has finished for PR 18031 at commit
Force-pushed from f6670d8 to 970421b
Test build #77069 has finished for PR 18031 at commit
Test build #77070 has finished for PR 18031 at commit
 * plus a bitmap for tracking which blocks are empty.
 * A [[MapStatus]] implementation that stores the accurate size of huge blocks, which are larger
 * than both [[config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD]] and
 * [[config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE]] * averageSize. It stores the
It looks like the documentation generation for Javadoc 8 is failing due to these links:
[error] /home/jenkins/workspace/SparkPullRequestBuilder@2/core/target/java/org/apache/spark/scheduler/HighlyCompressedMapStatus.java:4: error: reference not found
[error] * than both {@link config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD} and
[error] ^
[error] /home/jenkins/workspace/SparkPullRequestBuilder@2/core/target/java/org/apache/spark/scheduler/HighlyCompressedMapStatus.java:5: error: reference not found
[error] * {@link config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE} * averageSize. It stores the
[error] ^
Probably, we should wrap these references in `...` as I did before in #16013, or find a way to make the links resolve properly.
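For example, the doc comment could keep the config references as inline code instead of [[...]] links (a sketch, not the exact final wording):

```scala
/**
 * A [[MapStatus]] implementation that stores the accurate size of huge blocks, which are larger
 * than both `config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD` and
 * `config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE` * averageSize. It stores the
 * average size of other non-empty blocks, plus a bitmap for tracking which blocks are empty.
 */
```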
The other errors seem spurious. Please refer to my observation in #17389 (comment).
(I think we should fix it or at least document it somewhere.)
@HyukjinKwon
Test build #77072 has finished for PR 18031 at commit
    val threshold2 = avgSize * Option(SparkEnv.get)
      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE))
      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.defaultValue.get)
    val threshold = math.max(threshold1, threshold2)
Just out of curiosity: is there any reason we compute the threshold this way? Is it an empirical threshold?
Suppose each map task produces a 90MB bucket and many small buckets (skewed data); then avgSize can be very small, and the threshold would be 100MB because 100MB (threshold1) > 2 * avgSize (threshold2). If the number of map tasks is large (several hundred or more), OOM can still happen, right?
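To make the arithmetic concrete, a REPL-style illustration with assumed numbers matching this scenario:

```scala
val threshold1 = 100L * 1024 * 1024   // spark.shuffle.accurateBlockThreshold (default 100MB)
val avgSize    = 256L * 1024          // many tiny buckets drag the average way down
val threshold2 = 2 * avgSize          // spark.shuffle.accurateBlockThresholdByTimesAverage = 2
val threshold  = math.max(threshold1, threshold2)   // = 100MB

val bucket = 90L * 1024 * 1024
// bucket < threshold, so it is reported as ~avgSize; with N map tasks the reducer
// underestimates the total fetch size by roughly N * (90MB - 256KB).
```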
Yes, the case you mentioned above is a really good one. But setting spark.shuffle.accurateBlockThreshold means we accept sacrificing accuracy for blocks smaller than spark.shuffle.accurateBlockThreshold. If we want it to be more accurate, we can set it smaller (in this case we can set it to 50M); thus the size of the big bucket will be accurate.
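As the two configs stand at this point in the PR, that tuning would look roughly like this (a REPL-style sketch; the values are illustrative):

```scala
import org.apache.spark.SparkConf

// Make any block larger than 50MB keep its accurate size in MapStatus.
val conf = new SparkConf()
  .set("spark.shuffle.accurateBlockThreshold", (50L * 1024 * 1024).toString)
  .set("spark.shuffle.accurateBlockThresholdByTimesAverage", "1")
```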
Yes, but my point is that these two configs are difficult for users to set. It seems we still need to adjust them case by case.
But I agree that with this PR, at least we have a workaround for the OOM problem.
Yes, this is to avoid the OOM. To adjust the value of this config, the user needs to be sophisticated. I agree that these two configs are difficult to tune. But with the default settings, we can really avoid some OOM situations (e.g. a super huge block when skew happens).
    emptyBlocks.trim()
    emptyBlocks.runOptimize()
-   new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
+   new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
It seems to me that this https://github.com/apache/spark/pull/16989/files#r117174623 is a good suggestion: have accurate sizes for the smaller blocks.
+1
@viirya Thanks a lot for taking the time to look into this PR :)

"remove the huge blocks from the numerator in that calculation so that you more accurately size the smaller blocks"

Yes, I think this is a really good idea, to get accurate sizes for the smaller blocks. But since I'm proposing two configs (spark.shuffle.accurateBlockThreshold and spark.shuffle.accurateBlockThresholdByTimesAverage) in the current change, I have to compute the average twice: 1) the average calculated including the huge blocks, so that I can filter out the huge blocks; 2) the average calculated without the huge blocks, so that the smaller blocks get accurate sizes. A little bit complicated, right? How about removing spark.shuffle.accurateBlockThresholdByTimesAverage? That would simplify the logic. @cloud-fan Any ideas about this?
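If only spark.shuffle.accurateBlockThreshold were kept, the threshold would no longer depend on the average, so a single pass would suffice. A REPL-style sketch with illustrative names and sample sizes, not the exact code in the patch:

```scala
import scala.collection.mutable.ArrayBuffer

val uncompressedSizes: Array[Long] = Array(150L * 1024 * 1024, 4096L, 8192L, 0L)
val threshold = 100L * 1024 * 1024          // fixed, independent of the average

val hugeBlocks = ArrayBuffer[(Int, Long)]()
var smallTotal = 0L
var smallCount = 0
var i = 0
while (i < uncompressedSizes.length) {
  val size = uncompressedSizes(i)
  if (size > threshold) {
    hugeBlocks += ((i, size))               // record huge blocks accurately
  } else if (size > 0) {
    smallTotal += size                      // only non-huge blocks feed the average
    smallCount += 1
  }
  i += 1
}
val avgSize = if (smallCount > 0) smallTotal / smallCount else 0L
```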
In the current change, if almost all blocks are huge (that is, it is not a skew case), we won't mark the blocks as huge ones. Then will we still fetch them into memory?
With the default values (spark.shuffle.accurateBlockThreshold=100M and spark.shuffle.accurateBlockThresholdByTimesAverage=2), yes.
But the user can make it stricter by setting spark.shuffle.accurateBlockThreshold=0 and spark.shuffle.accurateBlockThresholdByTimesAverage=1.
I'd tend to have just one flag and simplify the configuration.
+1 for one flag, let's only keep spark.shuffle.accurateBlockThreshold
 */
private[spark] class HighlyCompressedMapStatus private (
    private[this] var loc: BlockManagerId,
    private[this] var numNonEmptyBlocks: Int,
    private[this] var emptyBlocks: RoaringBitmap,
-   private[this] var avgSize: Long)
+   private[this] var avgSize: Long,
+   @transient private var hugeBlockSizes: Map[Int, Byte])
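For context on the Map[Int, Byte] values: each huge block's size is packed into a single byte using the same log-1.1 scheme MapStatus already uses for sizes. A REPL-style sketch that mirrors that encoding (illustrative re-implementations, not the production helpers themselves):

```scala
// Pack a size into one byte on a log(1.1) scale; mirrors MapStatus.compressSize.
def compressSize(size: Long): Byte =
  if (size == 0) 0.toByte
  else if (size <= 1L) 1.toByte
  else math.min(255, math.ceil(math.log(size) / math.log(1.1)).toInt).toByte

// Recover an approximate size; mirrors MapStatus.decompressSize.
def decompressSize(compressed: Byte): Long =
  if (compressed == 0) 0L else math.pow(1.1, compressed & 0xFF).toLong

val hugeBlockSizes: Map[Int, Byte] = Map(42 -> compressSize(300L * 1024 * 1024))
decompressSize(hugeBlockSizes(42))  // ~3.4e8 bytes: accurate only up to the 1.1 quantization
```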
I went through part of the code in #16989. It seems to me that if what we want is to know which shuffle requests should go to disk instead of memory, do we really need to record the mapping from block ids to accurate sizes?
A simpler approach could be adding a bitmap for huge blocks, and we could simply fetch those blocks to disk. Another benefit of doing this is avoiding the introduction of another config, REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM, to decide which blocks go to disk.
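A rough sketch of that alternative (not what this PR implements): track only which blocks are huge with a second RoaringBitmap, and let the reducer check membership to decide disk vs. memory. The sample values are illustrative.

```scala
import org.roaringbitmap.RoaringBitmap

val uncompressedSizes: Array[Long] = Array(120L * 1024 * 1024, 4096L, 8192L)
val threshold = 100L * 1024 * 1024

val hugeBlocks = new RoaringBitmap()
var i = 0
while (i < uncompressedSizes.length) {
  if (uncompressedSizes(i) > threshold) hugeBlocks.add(i)
  i += 1
}

// Reducer side: no size needed, just membership.
val fetchToDisk = hugeBlocks.contains(0)   // true for the 120MB block
```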
Yes, I think it makes sense to add a bitmap for huge blocks, but I'm a little bit hesitant. I still prefer to keep hugeBlockSizes more independent from the upper-level logic. In addition, accurate block sizes can also have a positive effect on pending requests (e.g. spark.reducer.maxSizeInFlight can control the size of pending requests better).
The control of spark.reducer.maxSizeInFlight is not a big problem. It seems to me that any block considered huge will break maxSizeInFlight anyway and can't be fetched in parallel. We actually don't need to know the accurate size of a huge block; we just need to know that it's huge and that it's larger than maxSizeInFlight.
@viirya We had this discussion before in the earlier PR (which this one is split from).
maxSizeInFlight is meant to control how much data can be fetched in parallel; it is tuned based on network throughput, not memory (though currently the two are directly dependent due to an implementation detail).
In reality, it is fairly small compared to what can be held in memory (48MB is the default, IIRC). Since the memory and IO subsystems have different characteristics, using the same config to control behavior in both will lead to suboptimal behavior (for example, on large-memory systems where large amounts can be held in memory but network bandwidth is not proportionally higher).
Force-pushed from 94fa7bb to a313744
Test build #77165 has finished for PR 18031 at commit
Test build #77166 has finished for PR 18031 at commit
      // Remove the huge blocks from the calculation for average size and have accurate size for
      // smaller blocks.
      if (size > threshold) {
        totalSize += size
this should be put in the else branch
    val threshold = Option(SparkEnv.get)
      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
    val hugeBlockSizesArray = ArrayBuffer[Tuple2[Int, Byte]]()
    while (i < totalNumBlocks) {
      var size = uncompressedSizes(i)
Not related, but why is this a var?
Test build #77169 has started for PR 18031 at commit
Test build #77171 has finished for PR 18031 at commit
Test build #77182 has finished for PR 18031 at commit
In the current change:
…bove threshold.

## What changes were proposed in this pull request?
Currently, when the number of reduces is above 2000, HighlyCompressedMapStatus is used to store the sizes of blocks. In HighlyCompressedMapStatus, only the average size is stored for non-empty blocks, which is not good for memory control when we shuffle blocks. It makes sense to store the accurate size of a block when it's above a threshold.

## How was this patch tested?
Added a test in MapStatusSuite.

Author: jinxing <[email protected]>

Closes #18031 from jinxing64/SPARK-20801.

(cherry picked from commit 2597674)
Signed-off-by: Wenchen Fan <[email protected]>
thanks, merging to master/2.2!
For other reviewers: this is kind of a stability fix, so I backported it to branch-2.2
@cloud-fan
What changes were proposed in this pull request?
Currently, when the number of reduces is above 2000, HighlyCompressedMapStatus is used to store the sizes of blocks. In HighlyCompressedMapStatus, only the average size is stored for non-empty blocks, which is not good for memory control when we shuffle blocks. It makes sense to store the accurate size of a block when it's above a threshold.
How was this patch tested?
Added test in MapStatusSuite.
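For reference, an illustrative MapStatusSuite-style check of the new behavior (the class name, numbers, and exact assertions are assumptions, not the test actually added):

```scala
package org.apache.spark.scheduler

import org.apache.spark.SparkFunSuite
import org.apache.spark.storage.BlockManagerId

class HugeBlockSizesSketchSuite extends SparkFunSuite {
  test("blocks above the threshold keep (roughly) accurate sizes") {
    // More than 2000 blocks forces HighlyCompressedMapStatus.
    val sizes = Array.fill[Long](3000)(100L)
    sizes(1000) = 300L * 1024 * 1024  // one 300MB block, above the default 100MB threshold
    val status = MapStatus(BlockManagerId("exec-1", "host-a", 7337), sizes)
    // The huge block is reported from its own (log-compressed) size, not the average.
    assert(status.getSizeForBlock(1000) >= 270L * 1024 * 1024)
    // Small blocks are still reported as an average, which stays tiny here.
    assert(status.getSizeForBlock(0) < 1024L * 1024)
  }
}
```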