Skip to content

Commit

Permalink
[SPARK-20868][CORE] UnsafeShuffleWriter should verify the position af…
Browse files Browse the repository at this point in the history
…ter FileChannel.transferTo

## What changes were proposed in this pull request?

Long time ago we fixed a [bug](https://issues.apache.org/jira/browse/SPARK-3948) in shuffle writer about `FileChannel.transferTo`. We were not very confident about that fix, so we added a position check after the writing, try to discover the bug earlier.

 However this checking is missing in the new `UnsafeShuffleWriter`, this PR adds it.

https://issues.apache.org/jira/browse/SPARK-18105 maybe related to that `FileChannel.transferTo` bug, hopefully we can find out the root cause after adding this position check.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes #18091 from cloud-fan/shuffle.
  • Loading branch information
cloud-fan committed May 26, 2017
1 parent a97c497 commit d9ad789
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -422,17 +422,14 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
for (int partition = 0; partition < numPartitions; partition++) {
for (int i = 0; i < spills.length; i++) {
final long partitionLengthInSpill = spills[i].partitionLengths[partition];
long bytesToTransfer = partitionLengthInSpill;
final FileChannel spillInputChannel = spillInputChannels[i];
final long writeStartTime = System.nanoTime();
while (bytesToTransfer > 0) {
final long actualBytesTransferred = spillInputChannel.transferTo(
spillInputChannelPositions[i],
bytesToTransfer,
mergedFileOutputChannel);
spillInputChannelPositions[i] += actualBytesTransferred;
bytesToTransfer -= actualBytesTransferred;
}
Utils.copyFileStreamNIO(
spillInputChannel,
mergedFileOutputChannel,
spillInputChannelPositions[i],
partitionLengthInSpill);
spillInputChannelPositions[i] += partitionLengthInSpill;
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
bytesWrittenToMergedFile += partitionLengthInSpill;
partitionLengths[partition] += partitionLengthInSpill;
Expand Down
71 changes: 41 additions & 30 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo, ThreadInf
import java.math.{MathContext, RoundingMode}
import java.net._
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.channels.{Channels, FileChannel}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util.{Locale, Properties, Random, UUID}
Expand Down Expand Up @@ -60,7 +60,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.util.logging.RollingFileAppender

/** CallSite represents a place in user code. It can have a short and a long form. */
private[spark] case class CallSite(shortForm: String, longForm: String)
Expand Down Expand Up @@ -319,41 +318,22 @@ private[spark] object Utils extends Logging {
* copying is disabled by default unless explicitly set transferToEnabled as true,
* the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false].
*/
def copyStream(in: InputStream,
out: OutputStream,
closeStreams: Boolean = false,
transferToEnabled: Boolean = false): Long =
{
var count = 0L
def copyStream(
in: InputStream,
out: OutputStream,
closeStreams: Boolean = false,
transferToEnabled: Boolean = false): Long = {
tryWithSafeFinally {
if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
&& transferToEnabled) {
// When both streams are File stream, use transferTo to improve copy performance.
val inChannel = in.asInstanceOf[FileInputStream].getChannel()
val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
val initialPos = outChannel.position()
val size = inChannel.size()

// In case transferTo method transferred less data than we have required.
while (count < size) {
count += inChannel.transferTo(count, size - count, outChannel)
}

// Check the position after transferTo loop to see if it is in the right position and
// give user information if not.
// Position will not be increased to the expected length after calling transferTo in
// kernel version 2.6.32, this issue can be seen in
// https://bugs.openjdk.java.net/browse/JDK-7052359
// This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
val finalPos = outChannel.position()
assert(finalPos == initialPos + size,
s"""
|Current position $finalPos do not equal to expected position ${initialPos + size}
|after transferTo, please check your kernel version to see if it is 2.6.32,
|this is a kernel bug which will lead to unexpected behavior when using transferTo.
|You can set spark.file.transferTo = false to disable this NIO feature.
""".stripMargin)
copyFileStreamNIO(inChannel, outChannel, 0, size)
size
} else {
var count = 0L
val buf = new Array[Byte](8192)
var n = 0
while (n != -1) {
Expand All @@ -363,8 +343,8 @@ private[spark] object Utils extends Logging {
count += n
}
}
count
}
count
} {
if (closeStreams) {
try {
Expand All @@ -376,6 +356,37 @@ private[spark] object Utils extends Logging {
}
}

def copyFileStreamNIO(
input: FileChannel,
output: FileChannel,
startPosition: Long,
bytesToCopy: Long): Unit = {
val initialPos = output.position()
var count = 0L
// In case transferTo method transferred less data than we have required.
while (count < bytesToCopy) {
count += input.transferTo(count + startPosition, bytesToCopy - count, output)
}
assert(count == bytesToCopy,
s"request to copy $bytesToCopy bytes, but actually copied $count bytes.")

// Check the position after transferTo loop to see if it is in the right position and
// give user information if not.
// Position will not be increased to the expected length after calling transferTo in
// kernel version 2.6.32, this issue can be seen in
// https://bugs.openjdk.java.net/browse/JDK-7052359
// This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
val finalPos = output.position()
val expectedPos = initialPos + bytesToCopy
assert(finalPos == expectedPos,
s"""
|Current position $finalPos do not equal to expected position $expectedPos
|after transferTo, please check your kernel version to see if it is 2.6.32,
|this is a kernel bug which will lead to unexpected behavior when using transferTo.
|You can set spark.file.transferTo = false to disable this NIO feature.
""".stripMargin)
}

/**
* Construct a URI container information used for authentication.
* This also sets the default authenticator to properly negotiation the
Expand Down

0 comments on commit d9ad789

Please sign in to comment.