[SPARK-21475][Core] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths #18684

Closed

Conversation

@jerryshao jerryshao (Contributor) commented Jul 19, 2017

What changes were proposed in this pull request?

Java's FileInputStream and FileOutputStream override finalize(). Even when such a stream is closed correctly and promptly, it still leaves a memory footprint that is only cleaned up during a full GC. This introduces two side effects:

  1. Many Finalizer references accumulate on the heap, increasing memory overhead. In our external shuffle service use case, a busy shuffle service holds a large number of these objects, which can potentially lead to OOM.
  2. Finalizers only run during a full GC, which increases the overhead of full GC and leads to long GC pauses.

https://bugs.openjdk.java.net/browse/JDK-8080225

https://www.cloudbees.com/blog/fileinputstream-fileoutputstream-considered-harmful

To fix this potential issue, this PR proposes to use NIO's Files#newInputStream/newOutputStream instead in some critical paths such as shuffle.
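A minimal sketch of the substitution (a standalone example with a hypothetical path, not an actual Spark call site):

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class NioStreamSketch {
    public static void main(String[] args) throws Exception {
        Path file = Paths.get("/tmp/nio-example.bin"); // hypothetical path

        // Before: new FileOutputStream(file, true) / new FileInputStream(file),
        // each of which registers a Finalizer that lives until a full GC.
        // After: the NIO factory methods return streams without finalize(),
        // so a prompt close() releases everything.
        try (OutputStream out = Files.newOutputStream(file,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            out.write(new byte[]{1, 2, 3});
        }
        try (InputStream in = Files.newInputStream(file)) {
            System.out.println(in.read()); // prints the first byte written
        }
    }
}
```

The try-with-resources close is what actually releases the file in both styles; the point of the change is that nothing lingers afterwards waiting for the finalizer thread.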

FileInputStream usages left unchanged in core, which I think are not as critical:

./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:467:    val file = new DataInputStream(new FileInputStream(filename))
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:942:    val in = new FileInputStream(new File(path))
./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:76:    val fileIn = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:248:        val fis = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:910:                input = new FileInputStream(new File(t))
./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:20:import java.io.{FileInputStream, InputStream}
./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:132:        case Some(f) => new FileInputStream(f)
./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:20:import java.io.{FileInputStream, InputStream}
./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:77:        val fis = new FileInputStream(f)
./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:27:import org.apache.spark.io.NioBufferedFileInputStream
./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:94:      new DataInputStream(new NioBufferedFileInputStream(index))
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:111:        val channel = new FileInputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:219:    val channel = new FileInputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
./core/src/main/scala/org/apache/spark/TestUtils.scala:106:      val in = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:89:        inputStream = new FileInputStream(activeFile)
./core/src/main/scala/org/apache/spark/util/Utils.scala:329:      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
./core/src/main/scala/org/apache/spark/util/Utils.scala:332:        val inChannel = in.asInstanceOf[FileInputStream].getChannel()
./core/src/main/scala/org/apache/spark/util/Utils.scala:1533:      gzInputStream = new GZIPInputStream(new FileInputStream(file))
./core/src/main/scala/org/apache/spark/util/Utils.scala:1560:      new GZIPInputStream(new FileInputStream(file))
./core/src/main/scala/org/apache/spark/util/Utils.scala:1562:      new FileInputStream(file)
./core/src/main/scala/org/apache/spark/util/Utils.scala:2090:    val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)

FileOutputStream usages left unchanged in core:

./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:957:    val out = new FileOutputStream(file)
./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:20:import java.io.{DataOutputStream, File, FileOutputStream, IOException}
./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:131:      val dos = new DataOutputStream(new FileOutputStream(f))
./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:62:    val fileOut = new FileOutputStream(file)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:160:          val outStream = new FileOutputStream(outPath)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:239:    val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:949:        val out = new FileOutputStream(tempFile)
./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:106:    val out = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:109:     * Therefore, for local files, use FileOutputStream instead. */
./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:112:        new FileOutputStream(uri.getPath)
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:20:import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:71:  private var fos: FileOutputStream = null
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:102:    fos = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:213:      var truncateStream: FileOutputStream = null
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:215:        truncateStream = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:153:    val out = new FileOutputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
./core/src/main/scala/org/apache/spark/TestUtils.scala:81:    val jarStream = new JarOutputStream(new FileOutputStream(jarFile))
./core/src/main/scala/org/apache/spark/TestUtils.scala:96:    val jarFileStream = new FileOutputStream(jarFile)
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:31:  @volatile private var outputStream: FileOutputStream = null
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:97:    outputStream = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:90:        gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
./core/src/main/scala/org/apache/spark/util/Utils.scala:329:      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
./core/src/main/scala/org/apache/spark/util/Utils.scala:333:        val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
./core/src/main/scala/org/apache/spark/util/Utils.scala:527:      val out = new FileOutputStream(tempFile)

In DiskBlockObjectWriter, the code relies on FileDescriptor, so it is not easy to migrate to the NIO Files API.
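For context, a hedged sketch of that FileDescriptor dependency — the sync-before-close pattern below illustrates the idea and is not DiskBlockObjectWriter's actual code:

```java
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;

public class FdSyncSketch {
    public static void main(String[] args) throws IOException {
        // FileOutputStream exposes its OS-level descriptor, so callers can
        // fsync explicitly; the stream returned by Files.newOutputStream
        // offers no such handle, which is why this call site stays as-is.
        try (FileOutputStream fos = new FileOutputStream("/tmp/fd-example.bin", true)) {
            fos.write(42);
            FileDescriptor fd = fos.getFD();
            fd.sync(); // force the write to disk before closing
        }
    }
}
```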

For the FileInputStream and FileOutputStream usages in common/shuffle*, I changed them all.

How was this patch tested?

Existing tests and manual verification.

Change-Id: I0f11b9e0cbe62ca3d0bac7bfe0e2df838da80b48
@srowen srowen (Member) left a comment

I suppose using NIO makes sense, yeah. Are there more places that could be updated that would make a difference?

@@ -188,17 +189,20 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
       return lengths;
     }

-    final FileOutputStream out = new FileOutputStream(outputFile, true);
+    final FileChannel out = FileChannel.open(outputFile.toPath(),
+        ImmutableSet.of(WRITE, APPEND, CREATE));
Member:

Indent 4 spaces?

     boolean copyThrewException = true;
     try {
-      lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
+      final long size = in.size();
Member:

Why final? We generally don't write that unless it's required, though I know the surrounding code does anyway.

@jerryshao (Contributor Author)

Thanks @srowen for your review. I focused mainly on the shuffle-related parts to avoid using FileInputStream and FileOutputStream, since this code is quite IO-intensive. The other parts are not as critical, so I didn't change all of them.

For other modules like SQL, I didn't address the issue; I'm not sure whether it is needed there as well.

Change-Id: I883b36420471a20615dbb1e7e10421884fa0b690
@SparkQA commented Jul 19, 2017

Test build #79769 has finished for PR 18684 at commit b9dad5a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 19, 2017

Test build #79771 has finished for PR 18684 at commit f2d534a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -132,7 +134,8 @@ public Object convertToNetty() throws IOException {
     if (conf.lazyFileDescriptor()) {
       return new DefaultFileRegion(file, offset, length);
     } else {
-      FileChannel fileChannel = new FileInputStream(file).getChannel();
+      FileChannel fileChannel = FileChannel.open(file.toPath(),
+          ImmutableSet.of(StandardOpenOption.READ));
Contributor:

Why do we need a new set for this? Should we just call:

FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ)

?

Contributor Author:

They're actually the same; the one you mentioned is just a thin wrapper over the set-based overload. I will change to yours for simplicity.
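For illustration, a small standalone sketch of that equivalence (using the JDK's EnumSet in place of Guava's ImmutableSet, with a hypothetical path):

```java
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.EnumSet;
import static java.nio.file.StandardOpenOption.READ;

public class OpenOptionSketch {
    public static void main(String[] args) throws Exception {
        Path path = Paths.get("/tmp/nio-example.bin"); // hypothetical path

        // The set-based overload and the varargs overload open identical
        // channels; the varargs form just collects its options into a set
        // and delegates to the set-based one.
        try (FileChannel viaSet = FileChannel.open(path, EnumSet.of(READ))) {
            System.out.println(viaSet.size());
        }
        try (FileChannel viaVarargs = FileChannel.open(path, READ)) {
            System.out.println(viaVarargs.size());
        }
    }
}
```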

@@ -188,17 +189,20 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
       return lengths;
     }

-    final FileOutputStream out = new FileOutputStream(outputFile, true);
+    final FileChannel out = FileChannel.open(outputFile.toPath(),
+        ImmutableSet.of(WRITE, APPEND, CREATE));
Contributor:

final FileChannel out = FileChannel.open(outputFile.toPath(), WRITE, APPEND, CREATE)

?

-      lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
+      long size = in.size();
+      Utils.copyFileStreamNIO(in, out, 0, size);
+      lengths[i] = size;
Contributor:

Should we modify Utils.copyStream() to support this?

Contributor Author:

There are lots of other places using Utils#copyStream; it would be better not to change that API.
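For reference, a minimal sketch of a channel-to-channel copy in the spirit of Utils#copyFileStreamNIO — an assumption about its shape, not Spark's actual implementation:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;

public class ChannelCopySketch {
    // Copy bytesToCopy bytes from `in` (starting at startPosition) into `out`.
    // Caller must ensure bytesToCopy <= in.size() - startPosition; at EOF
    // transferTo would return 0 and this loop would spin.
    static void copyFileStreamNIO(FileChannel in, FileChannel out,
                                  long startPosition, long bytesToCopy) throws IOException {
        long copied = 0L;
        while (copied < bytesToCopy) {
            // transferTo may transfer fewer bytes than requested, so loop.
            copied += in.transferTo(startPosition + copied, bytesToCopy - copied, out);
        }
    }
}
```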

Change-Id: Ibb0036d0ac88c01310cba817da0bb40535c12351
@SparkQA commented Jul 20, 2017

Test build #79801 has finished for PR 18684 at commit 6d7224c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987 jiangxb1987 (Contributor) left a comment

LGTM

@jerryshao (Contributor Author)

@srowen any further comment?

@srowen srowen (Member) left a comment

I think it's worth updating, yes, if you can see a performance improvement.

@jerryshao (Contributor Author)

@cloud-fan @JoshRosen can you please help to review, thanks!

@srowen srowen (Member) commented Aug 1, 2017

Merged to master

@asfgit asfgit closed this in 5fd0294 Aug 1, 2017
asfgit pushed a commit that referenced this pull request Jan 4, 2018
…ternal shuffle service

## What changes were proposed in this pull request?

This PR is the second attempt at #18684. NIO's Files API doesn't override the `skip` method for `InputStream`, which introduces a performance issue (mentioned in #20119). But using `FileInputStream`/`FileOutputStream` also brings a memory issue (https://dzone.com/articles/fileinputstream-fileoutputstream-considered-harmful), which is severe for a long-running external shuffle service. So this proposal fixes only the external-shuffle-service-related code.

## How was this patch tested?

Existing tests.

Author: jerryshao <[email protected]>

Closes #20144 from jerryshao/SPARK-21475-v2.

(cherry picked from commit 93f92c0)
Signed-off-by: Shixiong Zhu <[email protected]>
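To illustrate the `skip` issue cited in that follow-up: on JDK 8, the stream returned by `Files.newInputStream` inherits InputStream's read-and-discard `skip`, whereas a FileChannel can simply be repositioned. A hedged sketch with a hypothetical path:

```java
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class SkipSketch {
    public static void main(String[] args) throws Exception {
        Path path = Paths.get("/tmp/nio-example.bin"); // hypothetical path

        // Files.newInputStream: skip() here reads and discards bytes
        // (no native seek), which is the performance issue cited above.
        try (InputStream in = Files.newInputStream(path)) {
            in.skip(1024);
        }

        // FileChannel: repositioning is an explicit, cheap seek.
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ)) {
            ch.position(1024);
        }
    }
}
```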