# [SPARK-22982] Remove unsafe asynchronous close() call from FileDownloadChannel (#20179)
Changes shown are from 3 commits.
**core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala**

```diff
@@ -332,16 +332,14 @@ private[netty] class NettyRpcEnv(
     val pipe = Pipe.open()
     val source = new FileDownloadChannel(pipe.source())
-    try {
+    Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       val client = downloadClient(parsedUri.getHost(), parsedUri.getPort())
       val callback = new FileDownloadCallback(pipe.sink(), source, client)
       client.stream(parsedUri.getPath(), callback)
-    } catch {
-      case e: Exception =>
-        pipe.sink().close()
-        source.close()
-        throw e
-    }
+    })(catchBlock = {
+      pipe.sink().close()
+      source.close()
+    })
     source
   }
```
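For readers unfamiliar with the helper used above: the value of routing cleanup through a utility rather than a hand-written `catch` is that an exception thrown by the cleanup itself does not mask the original failure. Here is a minimal sketch of that pattern; the helper name is hypothetical and this is not Spark's actual `Utils` implementation:

```scala
// Simplified sketch of the cleanup pattern (not Spark's actual implementation).
object CleanupPattern {
  // Runs `block`; on failure, runs `catchBlock` and rethrows the original error.
  // If the cleanup itself throws, that error is attached as a suppressed
  // exception instead of replacing the one we actually care about.
  def tryWithFailureCallback[T](block: => T)(catchBlock: => Unit): T = {
    try {
      block
    } catch {
      case t: Throwable =>
        try {
          catchBlock
        } catch {
          case cleanupErr: Throwable => t.addSuppressed(cleanupErr)
        }
        throw t
    }
  }
}
```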
```diff
@@ -376,18 +374,13 @@ private[netty] class NettyRpcEnv(
     def setError(e: Throwable): Unit = {
       error = e
-      source.close()
     }

     override def read(dst: ByteBuffer): Int = {
       Try(source.read(dst)) match {
+        case _ if error != null => throw error
         case Success(bytesRead) => bytesRead
-        case Failure(readErr) =>
-          if (error != null) {
-            throw error
-          } else {
-            throw readErr
-          }
+        case Failure(readErr) => throw readErr
       }
     }
```

**Review comment** (on the new `case _ if error != null` branch): I think it is better to also add a short comment here. This bug is subtle, and there is no test against it now. Just from this code, it is hard to know why we check `error` even on success.

**Author reply:** I added a pair of comments to explain the flow of calls involving […]
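To make the reviewer's point concrete, here is a self-contained sketch of the shape the patched channel takes (the class name is hypothetical; the real class is `FileDownloadChannel` above). The key idea: a producer thread records a remote failure via `setError` without closing the source channel out from under the reader, and the reader checks `error` even on a successful read, so a transfer that failed remotely surfaces the real error rather than looking like a clean end-of-stream:

```scala
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, ReadableByteChannel}
import scala.util.{Failure, Success, Try}

// Hypothetical minimal version of the pattern the diff lands on.
class ErrorAwareChannel(source: Pipe.SourceChannel) extends ReadableByteChannel {
  @volatile private var error: Throwable = _

  // Called by a producer/RPC thread on remote failure. Note: no source.close()
  // here; asynchronously closing a channel another thread is reading is unsafe.
  def setError(e: Throwable): Unit = {
    error = e
  }

  override def read(dst: ByteBuffer): Int = {
    Try(source.read(dst)) match {
      // Check error first: a read may "succeed" (e.g. hit EOF after the writer
      // side was closed) even though the download actually failed.
      case _ if error != null => throw error
      case Success(bytesRead) => bytesRead
      case Failure(readErr)   => throw readErr
    }
  }

  override def isOpen: Boolean = source.isOpen
  override def close(): Unit = source.close()
}
```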
**core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala**

```diff
@@ -18,8 +18,8 @@
 package org.apache.spark.shuffle

 import java.io._
-
-import com.google.common.io.ByteStreams
+import java.nio.channels.Channels
+import java.nio.file.Files

 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
```
```diff
@@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver(
     // find out the consolidated file, then the offset within that from our index
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)

-    val in = new DataInputStream(new FileInputStream(indexFile))
+    // SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code
+    // which is incorrectly using our file descriptor then this code will fetch the wrong offsets
+    // (which may cause a reducer to be sent a different reducer's data). The explicit position
+    // checks added here were a useful debugging aid during SPARK-22982 and may help prevent this
+    // class of issue from re-occurring in the future which is why they are left here even though
+    // SPARK-22982 is fixed.
+    val channel = Files.newByteChannel(indexFile.toPath)
+    channel.position(blockId.reduceId * 8)
+    val in = new DataInputStream(Channels.newInputStream(channel))
     try {
-      ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
       val nextOffset = in.readLong()
+      val actualPosition = channel.position()
+      val expectedPosition = blockId.reduceId * 8 + 16
+      if (actualPosition != expectedPosition) {
+        throw new Exception(s"SPARK-22982: Incorrect channel position after index file reads: " +
+          s"expected $expectedPosition but actual position was $actualPosition.")
+      }
       new FileSegmentManagedBuffer(
         transportConf,
         getDataFile(blockId.shuffleId, blockId.mapId),
```

**Review comment** (on `channel.position(blockId.reduceId * 8)`): cc @zsxwing. I recall you mentioned a performance issue with skipping data in the file channel. Do we have that problem here?

**Author reply:** I made sure to incorporate @zsxwing's changes here. The problem originally related to calling […]

**Review comment:** Sorry, I'm not clear on whether the change here is related to the "asynchronous close()" issue.

**Author reply:** It's used to detect bugs like the asynchronous close() one earlier in the future.

**Review comment:** I see. Thanks!

**Author comment:** For some more background: the asynchronous […] Given this, I see how it might be puzzling that this patch is adding a check only here. There are two reasons for this: […]

While investigating, I added these checks so that the index read fails fast when this issue occurs, which made it significantly easier to reproduce and diagnose the root cause (fixed by the other changes in this patch). There are a number of interesting details in the story of how we worked from the original high-level data-corruption symptom to this low-level IO bug. I'll see about writing up the complete story in a blog post at some point.
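To illustrate the class of bug the in-code comment describes, here is a hypothetical, self-contained demonstration (file contents and names are made up) of how a second reader that incorrectly shares a file descriptor seeks the first reader's position forward, so that offsets are read "successfully" but are wrong:

```scala
import java.io.{DataInputStream, DataOutputStream, File, FileInputStream, FileOutputStream}

// Hypothetical demo: two streams sharing one file descriptor corrupt each
// other's position, so readers fetch wrong (but plausible-looking) longs.
object SharedPositionDemo {
  def main(args: Array[String]): Unit = {
    val f = File.createTempFile("index", ".bin")
    val out = new DataOutputStream(new FileOutputStream(f))
    (0L until 4L).foreach(out.writeLong) // write offsets 0, 1, 2, 3
    out.close()

    val fis = new FileInputStream(f) // one descriptor...
    val reader1 = new DataInputStream(fis)
    println(reader1.readLong()) // 0: advances the shared position to byte 8

    val reader2 = new DataInputStream(fis) // ...incorrectly shared
    println(reader2.readLong()) // 1, not 0: a silently wrong "first" offset
  }
}
```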
**Review comment** (on the position check): Maybe an `assert`?

**Author reply:** I considered this, but I don't think there's ever a case where we want to elide this particular check: if we read an incorrect offset here then there's (potentially) no other mechanism to detect this error, leading to silent wrong answers.

**Review comment** (on the `throw`): Maybe we'd better change it to some specific […]

**Author reply:** Any suggestions for a better exception subtype? I don't expect this to be a recoverable error, and I wanted to avoid the possibility that downstream code catches and handles this error. Maybe I should go further and make it a RuntimeException to make it even more fatal?

**Review comment:** I see. Thanks!
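The elision concern is concrete in Scala: `scala.Predef.assert` is annotated `@elidable`, so builds compiled with an elision threshold above the assertion level remove such checks entirely. A small sketch of the difference (names hypothetical):

```scala
// scala.Predef.assert is @elidable(ASSERTION): compiling with, e.g.,
// `scalac -Xelide-below 2001` removes the call and its message entirely.
object PositionChecks {
  def checkWithAssert(actual: Long, expected: Long): Unit =
    assert(actual == expected, s"expected $expected but was $actual")

  // An explicit throw can never be compiled away, so a corrupted index read
  // always fails fast instead of producing silent wrong answers.
  def checkWithThrow(actual: Long, expected: Long): Unit =
    if (actual != expected) {
      throw new IllegalStateException(s"expected $expected but was $actual")
    }
}
```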
**Review comment** (on `FileDownloadChannel.read`): So the caller of `read` would close the source channel?

**Author reply:** Yes. This currently happens in two places:

1. A `Utils.copyStream(closeStreams = true)` call, which is guaranteed to clean up the stream (`spark/core/src/main/scala/org/apache/spark/util/Utils.scala`, line 661 at commit 2831571).
2. The `fetchFn` from `getClassFileInputStreamFromRpc`, whose stream is then closed in a `finally` block in `findClassLocally` (`spark/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala`, line 167 at commit e08d06b).
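As an illustration of the first call site's contract: with `closeStreams = true`, the copy routine takes ownership of both ends and closes them even when the copy fails partway through. A minimal sketch of that behavior (a hypothetical helper, not Spark's actual `Utils.copyStream`):

```scala
import java.io.{InputStream, OutputStream}

// Hypothetical helper mirroring the closeStreams = true contract: the caller
// hands over ownership, and both streams are closed in a finally block even
// if the copy fails midway.
object StreamCopy {
  def copyAndClose(in: InputStream, out: OutputStream): Long = {
    var count = 0L
    try {
      val buf = new Array[Byte](8192)
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        count += n
        n = in.read(buf)
      }
      count
    } finally {
      try in.close()
      finally out.close()
    }
  }
}
```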