
[SPARK-22982] Remove unsafe asynchronous close() call from FileDownloadChannel #20179

Conversation

JoshRosen
Contributor

What changes were proposed in this pull request?

This patch fixes a severe asynchronous IO bug in Spark's Netty-based file transfer code. At a high level, the problem is that an unsafe asynchronous `close()` of a pipe's source channel creates a race condition where file transfer code closes a file descriptor then attempts to read from it. If the closed file descriptor's number has been reused by an `open()` call then this invalid read may cause unrelated file operations to return incorrect results. **One manifestation of this problem is incorrect query results.**

For a high-level overview of how file download works, take a look at the control flow in `NettyRpcEnv.openChannel()`: this code creates a pipe to buffer results, then submits an asynchronous stream request to a lower-level TransportClient. The callback passes received data to the sink end of the pipe. The source end of the pipe is passed back to the caller of `openChannel()`. Thus `openChannel()` returns immediately and callers interact with the returned pipe source channel.
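For readers skimming the thread, here is a minimal, self-contained illustration of that pipe pattern. Everything in it (PipeDownloadSketch, the payload parameter, the background thread standing in for the Netty callback) is invented for the example; it shows the shape of the control flow, not Spark's actual NettyRpcEnv code.

import java.nio.ByteBuffer
import java.nio.channels.{Pipe, ReadableByteChannel}

object PipeDownloadSketch {
  // A toy openChannel(): the pipe buffers asynchronously delivered data, a background
  // "callback" feeds the sink end, and the source end is handed back to the caller
  // immediately, before the transfer has finished.
  def openChannel(payload: Array[Byte]): ReadableByteChannel = {
    val pipe = Pipe.open()
    val sink = pipe.sink()
    new Thread(new Runnable {
      override def run(): Unit = {
        // Stand-in for the stream callback: write the received data, then close the sink.
        try sink.write(ByteBuffer.wrap(payload)) finally sink.close()
      }
    }).start()
    pipe.source() // the caller read()s from this and is responsible for closing it
  }

  def main(args: Array[String]): Unit = {
    val source = openChannel("hello".getBytes("UTF-8"))
    val buf = ByteBuffer.allocate(64) // large enough for the toy payload
    try {
      while (source.read(buf) != -1) {} // returns -1 once the sink side is closed
    } finally {
      source.close()
    }
    println(new String(buf.array(), 0, buf.position(), "UTF-8"))
  }
}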

Because the underlying stream request is asynchronous, errors may occur after `openChannel()` has returned and after that method's caller has started to `read()` from the returned channel. For example, if a client requests an invalid stream from a remote server then the "stream does not exist" error may not be received from the remote server until after `openChannel()` has returned. In order to be able to propagate the "stream does not exist" error to the file-fetching application thread, this code wraps the pipe's source channel in a special `FileDownloadChannel` which adds a `setError(t: Throwable)` method, then calls this `setError()` method in the FileDownloadCallback's `onFailure` method.

It is possible for `FileDownloadChannel`'s `read()` and `setError()` methods to be called concurrently from different threads: the `setError()` method is called from within the Netty RPC system's stream callback handlers, while the `read()` methods are called from higher-level application code performing remote stream reads.

The problem lies in `setError()`: the existing code closed the wrapped pipe source channel. Because `read()` and `setError()` occur in different threads, this means it is possible for one thread to be calling `source.read()` while another asynchronously calls `source.close()`. Java's IO libraries do not guarantee that this will be safe and, in fact, it's possible for these operations to interleave in such a way that a lower-level `read()` system call occurs right after a `close()` call. In the best case, this fails as a read of a closed file descriptor; in the worst case, the file descriptor number has been re-used by an intervening `open()` operation and the read corrupts the result of an unrelated file IO operation being performed by a different thread.

The solution here is to remove the `source.close()` call from `setError()`: the thread that is performing the `read()` calls is responsible for closing the stream in a `finally` block, so there's no need to close it here. If that thread is blocked in a `read()` then it will become unblocked when the sink end of the pipe is closed in `FileDownloadCallback.onFailure()`.

After making this change, we also need to refine the `read()` method to always check for a `setError()` result, even if the underlying channel `read()` call has succeeded.
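Putting the last three paragraphs together, the fixed FileDownloadChannel has roughly the following shape (reconstructed from this description and the diff hunks quoted later in the thread; it is a sketch, not the verbatim Spark source):

import java.nio.ByteBuffer
import java.nio.channels.{Pipe, ReadableByteChannel}
import scala.util.{Failure, Success, Try}

class FileDownloadChannel(source: Pipe.SourceChannel) extends ReadableByteChannel {

  @volatile private var error: Throwable = null

  // After the fix: only record the error. The source channel is NOT closed here;
  // the reading thread owns it and closes it in its own finally block. The failure
  // callback closes the sink end of the pipe instead, which unblocks a pending read().
  def setError(e: Throwable): Unit = {
    error = e
  }

  // Check for an asynchronously reported error even when the underlying read succeeded.
  override def read(dst: ByteBuffer): Int = Try(source.read(dst)) match {
    case _ if error != null => throw error
    case Success(bytesRead) => bytesRead
    case Failure(readErr) => throw readErr
  }

  override def close(): Unit = source.close()
  override def isOpen(): Boolean = source.isOpen()
}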

This patch also makes a slight cleanup to a dodgy-looking `catch e: Exception` block to use a safer `try-finally` error handling idiom.

This bug was introduced in SPARK-11956 / #9941 and is present in Spark 1.6.0+.

How was this patch tested?

This fix was tested manually against a workload which non-deterministically hit this bug.

@SparkQA

SparkQA commented Jan 8, 2018

Test build #85773 has finished for PR 20179 at commit 8e5ffa4.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Jan 8, 2018

Test build #85795 has finished for PR 20179 at commit 8e5ffa4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case e: Exception =>
exceptionThrown = false
} finally {
if (exceptionThrown) {
pipe.sink().close()

@pathikrit pathikrit Jan 9, 2018


Instead of using this `var exceptionThrown`, can we move these two `close()` calls into the `catch` block? Current code:

var exceptionThrown = true
try {
  // code
  exceptionThrown = false
} finally {
  if (exceptionThrown) {
    pipe.sink().close()
    source.close()
  }
}

Proposed code:

try {
  // code
} catch {
  case e: Throwable =>
    pipe.sink().close()
    source.close()
    throw e
}

Contributor Author


To preserve the same behavior, I think we'd need to do

try {
  // code
} catch {
  case t: Throwable =>
    pipe.sink().close()
    source.close()
    throw t
}

to ensure that we propagate the original Throwable.

It could be clearer (and safer, in the case of exceptions thrown from `close()` calls) to use `Utils.tryWithSafeFinallyAndFailureCallbacks` here, so let me make that change.
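For reference, the refactor being described could look roughly like the following. Utils here is Spark's internal org.apache.spark.util.Utils helper; openChannelSketch and startDownload are hypothetical placeholders standing in for the surrounding NettyRpcEnv code, so treat this as a sketch rather than the merged change.

import java.nio.channels.{Pipe, ReadableByteChannel}
import org.apache.spark.util.Utils

def openChannelSketch(
    startDownload: (Pipe.SinkChannel, FileDownloadChannel) => Unit): ReadableByteChannel = {
  val pipe = Pipe.open()
  val source = new FileDownloadChannel(pipe.source())
  Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
    // Stand-in for creating the client and submitting the async stream request,
    // with the callback wired to pipe.sink() and source.setError(...).
    startDownload(pipe.sink(), source)
  })(catchBlock = {
    // Runs only if the setup itself threw; exceptions from these close() calls are
    // recorded as suppressed exceptions instead of masking the original failure.
    pipe.sink().close()
    source.close()
  })
  source
}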

val offset = in.readLong()
val nextOffset = in.readLong()
val actualPosition = channel.position()
val expectedPosition = blockId.reduceId * 8 + 16
if (actualPosition != expectedPosition) {


Maybe an assert like `assert(actualPosition == expectedPosition, $msg)` would be better for things like this, so we can elide these checks using compiler flags if desired.
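For context on the compiler-flag point: Scala's Predef.assert is marked @elidable, so a check written in that form can be stripped at build time. A hypothetical version of the check (message text invented for the example):

// Compiled out entirely when building with: scalac -Xdisable-assertions ...
assert(actualPosition == expectedPosition,
  s"Expected index channel position $expectedPosition but found $actualPosition")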

Contributor Author


I considered this, but I don't think there's ever a case where we want to elide this particular check: if we read an incorrect offset here then there's (potentially) no other mechanism to detect this error, leading to silent wrong answers.

@cloud-fan
Contributor

also cc @jerryshao

@@ -376,18 +374,13 @@ private[netty] class NettyRpcEnv(

     def setError(e: Throwable): Unit = {
       error = e
-      source.close()
     }

     override def read(dst: ByteBuffer): Int = {
Contributor


So the caller of `read()` would close the source channel?

Contributor Author


Yes. This currently happens in two places:

// class of issue from re-occurring in the future which is why they are left here even though
// SPARK-22982 is fixed.
val channel = Files.newByteChannel(indexFile.toPath)
channel.position(blockId.reduceId * 8)
Contributor


cc @zsxwing I recall you mentioned a performance issue with skipping data in the file channel; do we have this problem here?

Contributor Author


I made sure to incorporate @zsxwing's changes here. The problem originally related to calling skip(), but this change is from his fix to explicitly use position on a FileChannel instead.
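For context, the positioned-read pattern under discussion looks roughly like this. readIndexEntry is a hypothetical helper written for illustration (it assumes the shuffle index layout of one 8-byte long per partition boundary described elsewhere in this thread), not the actual shuffle resolver code.

import java.io.{DataInputStream, File}
import java.nio.channels.Channels
import java.nio.file.Files

def readIndexEntry(indexFile: File, reduceId: Int): (Long, Long) = {
  // Seek with position() on the channel instead of skip()-ing through an InputStream.
  val channel = Files.newByteChannel(indexFile.toPath)
  channel.position(reduceId * 8L)
  val in = new DataInputStream(Channels.newInputStream(channel))
  try {
    val offset = in.readLong()
    val nextOffset = in.readLong()
    // Fail fast, in the spirit of the SPARK-22982 guard: after two readLong() calls the
    // channel should sit exactly 16 bytes past where we positioned it.
    val expectedPosition = reduceId * 8L + 16
    if (channel.position() != expectedPosition) {
      throw new Exception("Incorrect channel position after index file reads: " +
        s"expected $expectedPosition but actual position was ${channel.position()}")
    }
    (offset, nextOffset)
  } finally {
    in.close() // closes the underlying channel as well
  }
}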

Contributor


Sorry, I'm not clear whether the change here is related to the "asynchronous close()" issue?

Contributor


It's used to detect bugs like the "asynchronous close()" issue earlier, should they recur in the future.

Contributor


I see. Thanks!

Contributor Author


For some more background: the asynchronous close() bug can cause reads from a closed-and-subsequently-reassigned file descriptor number and in principle this can affect almost any IO operation anywhere in the application. For example, if the closed file descriptor number is immediately recycled by opening a socket then the invalid read can cause that socket read to miss data (since the data would have been consumed by the invalid reader and won't be delivered to the legitimate new user of the file descriptor).

Given this, I see how it might be puzzling that this patch is adding a check only here. There are two reasons for this:

  1. Many other IO operations have implicit checksumming such that dropping data due to an invalid read will be detected and cause an exception. For example, many compression codecs have block-level checksumming (and magic numbers at the beginning of the stream), so dropping data (especially at the start of a read) will be detected. This particular shuffle index file, however, does not have mechanisms to detect corruption: skipping forward in the read by a multiple of 8 bytes will still read structurally valid data (but it will be the wrong data, causing the wrong output to be read from the shuffle data file).

  2. In the investigation which uncovered this bug, the invalid reads were predominantly impacting shuffle index lookups for reading local blocks. In a nutshell, there's a subtle race condition where Janino codegen compilation triggers attempted remote classloading of classes which don't exist, triggering the error-handling / error-propagation paths in FileDownloadChannel and causing the invalid asynchronous close() call to be performed. At the same time that this close() call was being performed, another task from the same stage attempts to read the shuffle index files of local blocks and experiences an invalid read due to the falsely-shared file descriptor.

    This is a very hard-to-trigger bug: we were only able to reproduce it on large clusters with very fast machines and shuffles that contain large numbers of map and reduce tasks (more shuffle blocks means more index file reads and more chances for the race to occur; faster machines increase the likelihood of the race occurring; larger clusters give us more chances for the error to occur). In our reproduction, this race occurred on a microsecond timescale (measured via kernel syscall tracing) and occurred relatively rarely, requiring many iterations until we could trigger a reproduction.

While investigating, I added these checks so that the index read fails fast when this issue occurs, which made it significantly easier to reproduce and diagnose the root cause (fixed by the other changes in this patch).

There are a number of interesting details in the story of how we worked from the original high-level data corruption symptom to this low-level IO bug. I'll see about writing up the complete story in a blog post at some point.

@cloud-fan
Contributor

LGTM

@SparkQA

SparkQA commented Jan 9, 2018

Test build #85828 has finished for PR 20179 at commit c1278b8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val actualPosition = channel.position()
val expectedPosition = blockId.reduceId * 8 + 16
if (actualPosition != expectedPosition) {
  throw new Exception(s"SPARK-22982: Incorrect channel position after index file reads: " +
Contributor


Maybe we'd better change this to a more specific Exception type here.

Contributor Author


Any suggestions for a better exception subtype? I don't expect this to be a recoverable error and wanted to avoid the possibility that downstream code catches and handles this error. Maybe I should go further and make it a RuntimeException to make it even more fatal?

Contributor


I see. Thanks!

    }

    override def read(dst: ByteBuffer): Int = {
      Try(source.read(dst)) match {
        case _ if error != null => throw error
Member


I think it is better to also add a short comment here. This bug is subtle and there is no test against it now. Just from this code, it is hard to know why we check error even on success.

Contributor Author


I added a pair of comments to explain the flow of calls involving setError() and pipe closes.

@viirya
Member

viirya commented Jan 9, 2018

LGTM

Member

@zsxwing zsxwing left a comment


LGTM

@SparkQA

SparkQA commented Jan 10, 2018

Test build #85885 has finished for PR 20179 at commit 9e5f20e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master/2.3!

asfgit pushed a commit that referenced this pull request Jan 10, 2018
[SPARK-22982] Remove unsafe asynchronous close() call from FileDownloadChannel

Author: Josh Rosen <[email protected]>

Closes #20179 from JoshRosen/SPARK-22982-fix-unsafe-async-io-in-file-download-channel.

(cherry picked from commit edf0a48)
Signed-off-by: Wenchen Fan <[email protected]>
@asfgit asfgit closed this in edf0a48 Jan 10, 2018
@JoshRosen JoshRosen deleted the SPARK-22982-fix-unsafe-async-io-in-file-download-channel branch January 10, 2018 07:37
@srowen
Member

srowen commented Jan 12, 2018

I cherry-picked to 2.2 as well, at least.

asfgit pushed a commit that referenced this pull request Jan 12, 2018
[SPARK-22982] Remove unsafe asynchronous close() call from FileDownloadChannel

Author: Josh Rosen <[email protected]>

Closes #20179 from JoshRosen/SPARK-22982-fix-unsafe-async-io-in-file-download-channel.

(cherry picked from commit edf0a48)
Signed-off-by: Sean Owen <[email protected]>
MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018
[SPARK-22982] Remove unsafe asynchronous close() call from FileDownloadChannel

Author: Josh Rosen <[email protected]>

Closes apache#20179 from JoshRosen/SPARK-22982-fix-unsafe-async-io-in-file-download-channel.

(cherry picked from commit edf0a48)
Signed-off-by: Sean Owen <[email protected]>