[SPARK-21475][Core] Revert "[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths"

## What changes were proposed in this pull request?

This reverts commit 5fd0294 because of a huge performance regression.
I manually fixed a minor conflict in `OneForOneBlockFetcher.java`.

`Files.newInputStream` returns a `sun.nio.ch.ChannelInputStream`. `ChannelInputStream` doesn't override `InputStream.skip`, so it falls back to the default `InputStream.skip` implementation, which skips by reading and discarding the data instead of seeking. This causes a huge performance regression when reading shuffle files.
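For illustration only (this sketch is not part of the patch), the difference can be reproduced with a small standalone micro-benchmark that skips deep into a large local file through both APIs. The class name, the file-path argument, and the 512 MiB skip distance are placeholders, and absolute timings will vary with the disk, page cache, and JDK:

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical micro-benchmark sketch: compares skip() on the stream returned by
// Files.newInputStream (which, per the description above, does not override skip,
// so the default read-and-discard implementation runs) with skip() on a plain
// FileInputStream (which seeks the underlying file descriptor).
public class SkipComparisonSketch {
  public static void main(String[] args) throws Exception {
    String path = args[0];                 // any large local file (placeholder)
    long offset = 512L * 1024 * 1024;      // skip 512 MiB, as a shuffle fetch might

    try (InputStream in = Files.newInputStream(Paths.get(path))) {
      long start = System.nanoTime();
      long skipped = in.skip(offset);      // falls back to InputStream.skip
      System.out.printf("Files.newInputStream: skipped %d bytes in %d ms%n",
          skipped, (System.nanoTime() - start) / 1_000_000);
    }

    try (FileInputStream in = new FileInputStream(path)) {
      long start = System.nanoTime();
      long skipped = in.skip(offset);      // FileInputStream.skip seeks instead of reading
      System.out.printf("FileInputStream:      skipped %d bytes in %d ms%n",
          skipped, (System.nanoTime() - start) / 1_000_000);
    }
  }
}
```

In `FileSegmentManagedBuffer.createInputStream` (see the diff below), `ByteStreams.skipFully(is, offset)` advances the stream via `skip`, so every shuffle block read at a non-zero offset pays this read-and-discard cost when the stream comes from `Files.newInputStream`.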

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes #20119 from zsxwing/revert-SPARK-21475.
zsxwing committed Dec 30, 2017
1 parent f2b3525 commit 14c4a62
Showing 8 changed files with 37 additions and 49 deletions.
FileSegmentManagedBuffer.java
@@ -18,13 +18,12 @@
 package org.apache.spark.network.buffer;

 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;

 import com.google.common.base.Objects;
 import com.google.common.io.ByteStreams;
@@ -94,9 +93,9 @@ public ByteBuffer nioByteBuffer() throws IOException {

   @Override
   public InputStream createInputStream() throws IOException {
-    InputStream is = null;
+    FileInputStream is = null;
     try {
-      is = Files.newInputStream(file.toPath());
+      is = new FileInputStream(file);
       ByteStreams.skipFully(is, offset);
       return new LimitedInputStream(is, length);
     } catch (IOException e) {
@@ -133,7 +132,7 @@ public Object convertToNetty() throws IOException {
     if (conf.lazyFileDescriptor()) {
       return new DefaultFileRegion(file, offset, length);
     } else {
-      FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+      FileChannel fileChannel = new FileInputStream(file).getChannel();
       return new DefaultFileRegion(fileChannel, offset, length);
     }
   }
OneForOneBlockFetcher.java
@@ -18,11 +18,11 @@
 package org.apache.spark.network.shuffle;

 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.nio.file.Files;
 import java.util.Arrays;

 import org.slf4j.Logger;
@@ -165,7 +165,7 @@ private class DownloadCallback implements StreamCallback {

     DownloadCallback(int chunkIndex) throws IOException {
       this.targetFile = tempFileManager.createTempFile();
-      this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath()));
+      this.channel = Channels.newChannel(new FileOutputStream(targetFile));
       this.chunkIndex = chunkIndex;
     }

ShuffleIndexInformation.java
@@ -19,10 +19,10 @@

 import java.io.DataInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.LongBuffer;
-import java.nio.file.Files;

 /**
  * Keeps the index information for a particular map output
@@ -39,7 +39,7 @@ public ShuffleIndexInformation(File indexFile) throws IOException {
     offsets = buffer.asLongBuffer();
     DataInputStream dis = null;
     try {
-      dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
+      dis = new DataInputStream(new FileInputStream(indexFile));
       dis.readFully(buffer.array());
     } finally {
       if (dis != null) {
BypassMergeSortShuffleWriter.java
@@ -18,9 +18,9 @@
 package org.apache.spark.shuffle.sort;

 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.channels.FileChannel;
-import static java.nio.file.StandardOpenOption.*;
 import javax.annotation.Nullable;

 import scala.None$;
@@ -75,6 +75,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);

   private final int fileBufferSize;
+  private final boolean transferToEnabled;
   private final int numPartitions;
   private final BlockManager blockManager;
   private final Partitioner partitioner;
@@ -106,6 +107,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       SparkConf conf) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.mapId = mapId;
@@ -186,21 +188,17 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
       return lengths;
     }

-    // This file needs to opened in append mode in order to work around a Linux kernel bug that
-    // affects transferTo; see SPARK-3948 for more details.
-    final FileChannel out = FileChannel.open(outputFile.toPath(), WRITE, APPEND, CREATE);
+    final FileOutputStream out = new FileOutputStream(outputFile, true);
     final long writeStartTime = System.nanoTime();
     boolean threwException = true;
     try {
       for (int i = 0; i < numPartitions; i++) {
         final File file = partitionWriterSegments[i].file();
         if (file.exists()) {
-          final FileChannel in = FileChannel.open(file.toPath(), READ);
+          final FileInputStream in = new FileInputStream(file);
           boolean copyThrewException = true;
           try {
-            long size = in.size();
-            Utils.copyFileStreamNIO(in, out, 0, size);
-            lengths[i] = size;
+            lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
             copyThrewException = false;
           } finally {
             Closeables.close(in, copyThrewException);
UnsafeShuffleWriter.java
@@ -20,7 +20,6 @@
 import javax.annotation.Nullable;
 import java.io.*;
 import java.nio.channels.FileChannel;
-import static java.nio.file.StandardOpenOption.*;
 import java.util.Iterator;

 import scala.Option;
@@ -291,7 +290,7 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
     final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
     try {
       if (spills.length == 0) {
-        java.nio.file.Files.newOutputStream(outputFile.toPath()).close(); // Create an empty file
+        new FileOutputStream(outputFile).close(); // Create an empty file
         return new long[partitioner.numPartitions()];
       } else if (spills.length == 1) {
         // Here, we don't need to perform any metrics updates because the bytes written to this
@@ -368,7 +367,7 @@ private long[] mergeSpillsWithFileStream(
     final InputStream[] spillInputStreams = new InputStream[spills.length];

     final OutputStream bos = new BufferedOutputStream(
-        java.nio.file.Files.newOutputStream(outputFile.toPath()),
+        new FileOutputStream(outputFile),
         outputBufferSizeInBytes);
     // Use a counting output stream to avoid having to close the underlying file and ask
     // the file system for its size after each partition is written.
@@ -443,11 +442,11 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {
     boolean threwException = true;
     try {
       for (int i = 0; i < spills.length; i++) {
-        spillInputChannels[i] = FileChannel.open(spills[i].file.toPath(), READ);
+        spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
       }
       // This file needs to opened in append mode in order to work around a Linux kernel bug that
       // affects transferTo; see SPARK-3948 for more details.
-      mergedFileOutputChannel = FileChannel.open(outputFile.toPath(), WRITE, CREATE, APPEND);
+      mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();

       long bytesWrittenToMergedFile = 0;
       for (int partition = 0; partition < numPartitions; partition++) {
IndexShuffleBlockResolver.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.shuffle

 import java.io._
-import java.nio.file.Files

 import com.google.common.io.ByteStreams

@@ -142,8 +141,7 @@ private[spark] class IndexShuffleBlockResolver(
     val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
     try {
-      val out = new DataOutputStream(
-        new BufferedOutputStream(Files.newOutputStream(indexTmp.toPath)))
+      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
       Utils.tryWithSafeFinally {
         // We take in lengths of each block, need to convert it to offsets.
         var offset = 0L
@@ -198,7 +196,7 @@ private[spark] class IndexShuffleBlockResolver(
     // find out the consolidated file, then the offset within that from our index
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)

-    val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
+    val in = new DataInputStream(new FileInputStream(indexFile))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
ExternalAppendOnlyMap.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.util.collection

 import java.io._
-import java.nio.channels.{Channels, FileChannel}
-import java.nio.file.StandardOpenOption
 import java.util.Comparator

 import scala.collection.BufferedIterator
@@ -461,7 +459,7 @@
     )

     private var batchIndex = 0  // Which batch we're in
-    private var fileChannel: FileChannel = null
+    private var fileStream: FileInputStream = null

     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
@@ -478,23 +476,22 @@
       if (batchIndex < batchOffsets.length - 1) {
         if (deserializeStream != null) {
           deserializeStream.close()
-          fileChannel.close()
+          fileStream.close()
           deserializeStream = null
-          fileChannel = null
+          fileStream = null
         }

         val start = batchOffsets(batchIndex)
-        fileChannel = FileChannel.open(file.toPath, StandardOpenOption.READ)
-        fileChannel.position(start)
+        fileStream = new FileInputStream(file)
+        fileStream.getChannel.position(start)
         batchIndex += 1

         val end = batchOffsets(batchIndex)

         assert(end >= start, "start = " + start + ", end = " + end +
           ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))

-        val bufferedStream = new BufferedInputStream(
-          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
         val wrappedStream = serializerManager.wrapStream(blockId, bufferedStream)
         ser.deserializeStream(wrappedStream)
       } else {
@@ -554,9 +551,9 @@ class ExternalAppendOnlyMap[K, V, C](
         ds.close()
         deserializeStream = null
       }
-      if (fileChannel != null) {
-        fileChannel.close()
-        fileChannel = null
+      if (fileStream != null) {
+        fileStream.close()
+        fileStream = null
       }
       if (file.exists()) {
         if (!file.delete()) {
ExternalSorter.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.util.collection

 import java.io._
-import java.nio.channels.{Channels, FileChannel}
-import java.nio.file.StandardOpenOption
 import java.util.Comparator

 import scala.collection.mutable
@@ -494,7 +492,7 @@ private[spark] class ExternalSorter[K, V, C](

     // Intermediate file and deserializer streams that read from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    var fileChannel: FileChannel = null
+    var fileStream: FileInputStream = null
     var deserializeStream = nextBatchStream()  // Also sets fileStream

     var nextItem: (K, C) = null
@@ -507,23 +505,22 @@ private[spark] class ExternalSorter[K, V, C](
       if (batchId < batchOffsets.length - 1) {
         if (deserializeStream != null) {
           deserializeStream.close()
-          fileChannel.close()
+          fileStream.close()
           deserializeStream = null
-          fileChannel = null
+          fileStream = null
         }

         val start = batchOffsets(batchId)
-        fileChannel = FileChannel.open(spill.file.toPath, StandardOpenOption.READ)
-        fileChannel.position(start)
+        fileStream = new FileInputStream(spill.file)
+        fileStream.getChannel.position(start)
         batchId += 1

         val end = batchOffsets(batchId)

         assert(end >= start, "start = " + start + ", end = " + end +
           ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))

-        val bufferedStream = new BufferedInputStream(
-          ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))

         val wrappedStream = serializerManager.wrapStream(spill.blockId, bufferedStream)
         serInstance.deserializeStream(wrappedStream)
@@ -613,7 +610,7 @@ private[spark] class ExternalSorter[K, V, C](
       batchId = batchOffsets.length  // Prevent reading any other batch
       val ds = deserializeStream
       deserializeStream = null
-      fileChannel = null
+      fileStream = null
       if (ds != null) {
         ds.close()
       }
