[SPARK-29435][Core]MapId in Shuffle Block is inconsistent at the writer and reader part when spark.shuffle.useOldFetchProtocol=true #26095
Conversation
cc @cloud-fan @xuanyuanking Please take a look at the fix; if it is okay I will add a UT and refactor the code.
@@ -905,15 +905,8 @@ private[spark] object MapOutputTracker extends Logging {
    for (part <- startPartition until endPartition) {
      val size = status.getSizeForBlock(part)
      if (size != 0) {
        if (useOldFetchProtocol) {
Thanks for the report and fix!
The root cause is that when we set useOldFetchProtocol=true here, the map id on the reader side and the writer side are inconsistent.
But we can't fix it like this, because when useOldFetchProtocol=true, we'll use the old version of the fetch protocol, OpenBlocks, which assumes the map id is an Integer and will directly parse the string. So for a big and long-running application, it will still not work. See the code:
Line 296 in 148cd26
mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
So the right way, I think, is to do the fix in ShuffleWriteProcessor: we should fill mapId with mapTaskId or mapIndex depending on the config spark.shuffle.useOldFetchProtocol.
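The suggestion above can be sketched as a small decision helper. This is a hypothetical illustration, not Spark's actual API: the names `chooseMapId`, `mapTaskAttemptId`, and `mapIndex` are illustrative; the idea is only that the write side picks an Int-sized map index when the old OpenBlocks protocol is in use, and the Long task attempt id otherwise.

```java
// Hypothetical sketch of the suggested write-side fix (not Spark's real code).
public class MapIdChooser {
    static long chooseMapId(boolean useOldFetchProtocol,
                            long mapTaskAttemptId, int mapIndex) {
        // Old protocol decodes the map id with Integer.parseInt, so it must
        // fit in an int: use the per-stage map index. The new protocol can
        // carry the globally unique Long task attempt id.
        return useOldFetchProtocol ? (long) mapIndex : mapTaskAttemptId;
    }

    public static void main(String[] args) {
        System.out.println(chooseMapId(true, 5_000_000_000L, 3));
        System.out.println(chooseMapId(false, 5_000_000_000L, 3));
    }
}
```

With this shape, both reader and writer derive the same id from the same config value, which is the consistency property the fix needs.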
Sorry, could you explain why directly parsing the string with Integer.parseInt does not work for a big and long-running application?
Is it a performance problem?
Looking forward to your reply.
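The concern can be illustrated with a self-contained snippet (not Spark code): since SPARK-25341 the mapId is context.taskAttemptId(), a Long that keeps growing for the lifetime of the application, while the old OpenBlocks path decodes it with Integer.parseInt, which rejects anything above Integer.MAX_VALUE. The helper name `parseFails` below is illustrative.

```java
// Illustration: Integer.parseInt cannot decode a Long-sized task attempt id.
public class MapIdOverflow {
    static boolean parseFails(String mapId) {
        try {
            Integer.parseInt(mapId); // old protocol's decoding step
            return false;
        } catch (NumberFormatException e) {
            return true; // a correctness failure, not a performance problem
        }
    }

    public static void main(String[] args) {
        System.out.println(parseFails("123"));        // prints "false"
        System.out.println(parseFails("2147483648")); // Int.MaxValue + 1: prints "true"
    }
}
```

So once a long-running application's task attempt ids exceed Integer.MAX_VALUE, block-id parsing throws instead of merely slowing down.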
@@ -47,8 +47,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
      context,
      blockManager.blockStoreClient,
      blockManager,
-     mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition,
-       SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)),
+     mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
This is the shuffle read side, and we need to know the value of SHUFFLE_USE_OLD_FETCH_PROTOCOL here. I think the bug is on the shuffle write side, which is fixed in this PR. Do we really need to change the shuffle read side?
This is redundant code: since the ShuffleWriter writes the mapId based on the spark.shuffle.useOldFetchProtocol flag, MapStatus.mapTaskId always gives the mapId that was set by the ShuffleWriter.
          ((ShuffleBlockId(shuffleId, status.mapTaskId, part), size, mapIndex))
      }
      splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
        ((ShuffleBlockId(shuffleId, status.mapTaskId, part), size, mapIndex))
Here we always pick status.mapTaskId as the mapId; is this correct?
OK, I get it now. We should rename MapStatus.mapTaskId to mapId.
ok to test
Please also change the PR description: the detailed log and code can be skipped, since even code linked against master will keep changing.
Test build #112010 has finished for PR 26095 at commit
@sandeep-katta For the consistency issue, we'd better add a UT for the old fetch protocol config. I gave a PR to your branch, please have a review: sandeep-katta#3.
Test build #112014 has finished for PR 26095 at commit
Test build #112028 has finished for PR 26095 at commit
thanks, merging to master!
What changes were proposed in this pull request?
Shuffle Block Construction during Shuffle Write and Read is wrong
Shuffle Map Task (Shuffle Write)
19/10/11 22:07:32| ERROR| [Executor task launch worker for task 3] org.apache.spark.shuffle.IndexShuffleBlockResolver: ####### For Debug ############ /tmp/hadoop-root1/nm-local-dir/usercache/root1/appcache/application_1570422377362_0008/blockmgr-6d03250d-6e7c-4bc2-bbb7-22b8e3981c35/0d/shuffle_0_3_0.index
Result Task (Shuffle Read)
19/10/11 22:07:32| ERROR| [Executor task launch worker for task 6] org.apache.spark.storage.ShuffleBlockFetcherIterator: Error occurred while fetching local blocks
java.nio.file.NoSuchFileException: /tmp/hadoop-root1/nm-local-dir/usercache/root1/appcache/application_1570422377362_0008/blockmgr-6d03250d-6e7c-4bc2-bbb7-22b8e3981c35/30/shuffle_0_0_0.index
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
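The mismatch in the two logs above can be seen in the block file names: shuffle files are named "shuffle_&lt;shuffleId&gt;_&lt;mapId&gt;_&lt;reduceId&gt;". The writer produced shuffle_0_3_0.index using the task attempt id (3), while the reader rebuilt the block id from the partition id (0) and asked for shuffle_0_0_0.index, a file that was never written. A minimal sketch (the class below is illustrative, not Spark's ShuffleBlockId):

```java
// Sketch of the shuffle block naming scheme behind the NoSuchFileException.
public class ShuffleBlockName {
    static String name(int shuffleId, long mapId, int reduceId) {
        return "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;
    }

    public static void main(String[] args) {
        System.out.println(name(0, 3L, 0)); // writer side: "shuffle_0_3_0"
        System.out.println(name(0, 0L, 0)); // reader side: "shuffle_0_0_0" -> missing file
    }
}
```

Because the two sides computed different mapIds from the same shuffle, the reader's lookup could never match the file on disk.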
As per SPARK-25341, the mapId of SortShuffleManager.getWriter was changed from partitionId to context.taskAttemptId() (code). But MapOutputTracker.convertMapStatuses returns the wrong ShuffleBlock: if spark.shuffle.useOldFetchProtocol is enabled, it returns partitionId as the mapId, which is wrong (code).
Why are the changes needed?
The MapStatus returned by the ShuffleWriter already has the mapId (e.g. code here), so it is natural to use status.mapTaskId.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing UTs, and manually tested with spark.shuffle.useOldFetchProtocol set to both true and false.