
[SPARK-18658][SQL] Write text records directly to a FileOutputStream #16089

Closed
wants to merge 7 commits

Conversation

NathanHowell

What changes were proposed in this pull request?

This replaces uses of TextOutputFormat with an OutputStream, which will either write directly to the filesystem or indirectly via a compressor (if so configured). This avoids intermediate buffering.

The inverse of this (reading directly from a stream) is necessary for streaming large JSON records (when wholeFile is enabled) so I wanted to keep the read and write paths symmetric.
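
A minimal sketch of the idea (illustrative only; the method name and default codec here are assumptions, loosely following the diff excerpts later in this thread rather than the exact patch):

```scala
import java.io.OutputStream

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.ReflectionUtils

// Open the target file through the Hadoop FileSystem API and, when output
// compression is configured on the job, wrap the raw stream in the codec.
def createOutputStream(context: TaskAttemptContext, file: Path): OutputStream = {
  val fs = file.getFileSystem(context.getConfiguration)
  val raw: OutputStream = fs.create(file, false) // throws if the file already exists

  if (FileOutputFormat.getCompressOutput(context)) {
    val codecClass = FileOutputFormat.getOutputCompressorClass(context, classOf[GzipCodec])
    val codec = ReflectionUtils.newInstance(codecClass, context.getConfiguration)
    codec.createOutputStream(raw) // compressed records go straight to the file stream
  } else {
    raw // uncompressed records are written directly to the filesystem
  }
}
```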

How was this patch tested?

Existing unit tests.

@NathanHowell
Author

This touches a fair number of components. I also haven't done any performance testing to see what the impact is. Curious what your thoughts are.

cc/ @marmbrus @rxin @JoshRosen

@rxin
Contributor

rxin commented Nov 30, 2016

Does this work against file systems with the HDFS API (not local POSIX)? If yes, sounds good!

@NathanHowell
Author

Yep. It uses the Hadoop FileSystem class to open files, just like TextOutputFormat does.

@rxin
Contributor

rxin commented Dec 1, 2016

Yea then this is definitely fine.

@NathanHowell NathanHowell force-pushed the SPARK-18658 branch 4 times, most recently from 1260870 to 298e507 on December 1, 2016 02:06
@JoshRosen
Contributor

Jenkins, this is ok to test

@@ -194,4 +194,8 @@ private[sql] class JacksonGenerator(
writeFields(row, schema, rootFieldWriters)
}
}

private[sql] def writeLineEnding(): Unit = {
gen.writeRaw('\n')
Member

Hm, is it safe to assume the newline is always \n?

buffer.set(utf8string.getBytes)
recordWriter.write(NullWritable.get(), buffer)
writer.write(utf8string.getBytes)
writer.write('\n')
Member

Here too and for CSV as well.


import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.ReflectionUtils

private[spark] object CodecStreams {
Member

I guess we don't need private[spark] here, judging from the commit here (511f52f).

Author

Looks that way, I've removed it.

@SparkQA

SparkQA commented Dec 1, 2016

Test build #69449 has finished for PR 16089 at commit 298e507.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@NathanHowell
Author

Doh, forgot to run the Hive tests. Should be fixed now.

@SparkQA

SparkQA commented Dec 1, 2016

Test build #69457 has finished for PR 16089 at commit 56667bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}

/** Create a new file and open it for writing.
Contributor

Can you fix the comment style here? We don't use Scaladoc-style comments in Spark.
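
Judging from the before and after excerpts quoted elsewhere in this thread, the requested change appears to be moving the description off the opening `/**` line; a small illustration (not the actual diff):

```scala
// Before: the description starts on the same line as the opening /**.
/** Create a new file and open it for writing.
 * ...
 */

// After: the description starts on its own line, matching the later revision.
/**
 * Create a new file and open it for writing.
 * ...
 */
```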

@@ -194,4 +194,8 @@ private[sql] class JacksonGenerator(
writeFields(row, schema, rootFieldWriters)
}
}

private[sql] def writeLineEnding(): Unit = {
Contributor

remove private[sql] here to be consistent with other methods

val fs = file.getFileSystem(context.getConfiguration)
val outputStream: OutputStream = fs.create(file, false)

getCompressionCodec(context, Some(file)).fold(outputStream) { codec =>
Contributor

I know you like Haskell but every time I see a fold I have to spend an extra 5 secs checking what it is doing :) Can we simplify this one?

Author

Yah, it's a terrible name (and it's not a fold). I'll replace them with .map(...).getOrElse.
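
A small sketch of the suggested rewrite (assuming the fold body simply wraps the raw stream in the codec, which the quoted diff does not show):

```scala
// Before: Option.fold takes the default value first, then the mapping function.
getCompressionCodec(context, Some(file)).fold(outputStream) { codec =>
  codec.createOutputStream(outputStream)
}

// After: map/getOrElse reads in the more conventional order.
getCompressionCodec(context, Some(file))
  .map(codec => codec.createOutputStream(outputStream))
  .getOrElse(outputStream)
```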

* If compression is enabled in the [[JobContext]] the stream will write compressed data to disk.
* An exception will be thrown if the file already exists.
*/
def getOutputStream(context: JobContext, file: Path): OutputStream = {
Contributor

getOutputStream -> createOutputStream

}
}

def getOutputStreamWriter(
Contributor

same here - createOutputStreamWriter


/** Returns the compression codec extension to be used in a file name, e.g. ".gzip"). */
def getCompressionExtension(context: JobContext): String = {
getCompressionCodec(context).fold("") { code =>
Contributor

maybe

getCompressionCodec(context).map(_.getDefaultExtension).getOrElse("")


override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")

override protected[sql] def writeInternal(row: InternalRow): Unit = {
val utf8string = row.getUTF8String(0)
buffer.set(utf8string.getBytes)
recordWriter.write(NullWritable.get(), buffer)
writer.write(utf8string.getBytes)
Contributor (@rxin, Dec 1, 2016)

can you check UTF8String's implementation to make sure we are not creating a new byte array for each row?

Author

It is creating a new array, I'll pass the internal one through instead.

Author

Done, but I'm not 100% sure about the implementation. Can you have someone more familiar with UTF8String's internals double check it?
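
A sketch of the change being discussed (not the exact diff; `writer` is the text writer's OutputStream and `utf8string` comes from the row, as in the excerpt above):

```scala
// Before: getBytes can allocate and copy a fresh byte[] for every row.
writer.write(utf8string.getBytes)
writer.write('\n')

// After: the writeTo(OutputStream) overload added in this PR writes the
// backing bytes to the stream without the intermediate copy.
utf8string.writeTo(writer)
writer.write('\n')
```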

text.set(lines)
recordWriter.write(NullWritable.get(), text)
}
csvWriter.writeRow(rowToString(row), printHeader)
Contributor

We can probably optimize this as well - but it's not a huge deal in the first PR.

Author

The uniVocity CSV writer converts every column to a String before writing so it's (probably?) not possible to further optimize this without doing a whole bunch of work. I only did a quick scan through their code though.

Contributor

Yup, I asked this question but never had time to follow up: uniVocity/univocity-parsers#99

@srowen
Member

srowen commented Dec 1, 2016

I was going to say: hm, are we sure we want to reimplement / go around the Hadoop support for this? But in practice it looks like it actually simplifies some things. At the moment I can't think of any particular behaviors we're missing by avoiding the Input/OutputFormat.

But CC @vanzin @steveloughran for any comment.

/**
* Create a new file and open it for writing.
* If compression is enabled in the [[JobContext]] the stream will write compressed data to disk.
* An exception will be thrown if the file already exists.
Contributor

will "probably" be thrown; object stores have issues there

Author

Is this a problem with Hadoop in general? The FileSystem docs also specify this behavior:

  /**
   * Create an FSDataOutputStream at the indicated Path.
   * @param f the file to create
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an exception will be thrown.
   */

@@ -194,4 +194,8 @@ private[sql] class JacksonGenerator(
writeFields(row, schema, rootFieldWriters)
}
}

def writeLineEnding(): Unit = {
gen.writeRaw('\n')
Contributor

TextOutputFormat actually writes the UTF-8 version of a newline; I don't know if that is relevant or not:

"\n".getBytes(StandardCharsets.UTF_8);

Author

7-bit ASCII is a subset of UTF-8; \n is the same in both.

Contributor

Presumably it's just a convoluted way to create a byte array containing the byte 0x0a, then.

Author

That is my assumption. I'm also assuming that writing a single byte is slightly more efficient than writing an array of a single byte.
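
A small illustration of the two equivalent writes (`out` is any java.io.OutputStream; both put the single byte 0x0A on the wire):

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.charset.StandardCharsets

val out: OutputStream = new ByteArrayOutputStream()

// TextOutputFormat's approach: materialize a one-element byte array, then write it.
out.write("\n".getBytes(StandardCharsets.UTF_8))

// This PR's approach: write the byte directly; '\n' is 0x0A in both
// 7-bit ASCII and UTF-8, so the output is identical.
out.write('\n')
```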

@@ -147,6 +147,17 @@ public void writeTo(ByteBuffer buffer) {
buffer.position(pos + numBytes);
}

public void writeTo(OutputStream out) throws IOException {
Contributor

Always good to have tests for the corner-case code paths here, as they are invariably the official home of off-by-one errors.

Author

Agreed.

Author

I've added a few tests for this method.

@steveloughran
Contributor

AFAIK, the big thing the FileOutputFormat really adds is not the compression, but the output committer and the stuff to go with that (working directories, paths, etc.). If you aren't going near that, and just want a fast write of .csv and Jackson with optional compression, well, I don't see anything in the code I'd run away from.

If you do want to think about how to write CSV files during the output of speculative work in the presence of failures, well, that's where the mapred.lib.output code really comes out to play.

Otherwise, in general PR review mode: tests? What if the code asks for a committer that isn't there, passes in null sequences in rows to write, or tries to hit the buffer corner cases? Hopefully those exist already, but if not, now is a good time to try to break things.

@NathanHowell
Author

@steveloughran Spark is handling the output committing somewhere further up the stack. The path being passed in to OutputWriterFactory.newInstance is to a temporary file, such as /private/var/folders/sq/vmncyd7506q_ch43llrwr8sn6zfknl/T/spark-3db2844b-1f3c-45c2-8bf4-8a3c81440e38/_temporary/0/_temporary/attempt_20161201081833_0000_m_000000_0/part-00000-8dd44cea-c01e-4bfe-ab03-641ebce18afb.txt.

I'll make a pass through the existing tests to see if anything obvious is missing.

@steveloughran
Contributor

I ask about committers as I'm staring at the V1 and V2 committer APIs right now related to S3 destinations; not directly related to this though.

@rxin
Contributor

rxin commented Dec 1, 2016

@srowen yea, the Hadoop format API is pretty awkward to use, and actually makes everything more complicated than needed.

@SparkQA

SparkQA commented Dec 1, 2016

Test build #69488 has finished for PR 16089 at commit 5707218.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 2, 2016

Test build #69519 has finished for PR 16089 at commit 27c102d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Dec 2, 2016

Merging in master. Thanks.

@asfgit asfgit closed this in c82f16c Dec 2, 2016
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016
@NathanHowell NathanHowell deleted the SPARK-18658 branch December 2, 2016 18:53
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
ghost pushed a commit to dbtsai/spark that referenced this pull request Apr 6, 2018
… not expected to be supported

## What changes were proposed in this pull request?

This PR excludes an existing UT [`writeToOutputStreamUnderflow()`](https://github.com/apache/spark/blob/master/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java#L519-L532) in `UTF8StringSuite`.

As discussed [here](apache#19222 (comment)), the behavior of this test looks surprising. The test seems to access the metadata area of the JVM object, which is reserved by `Platform.BYTE_ARRAY_OFFSET`.

This test was introduced through apache#16089 by NathanHowell, more specifically by [the commit](apache@27c102d) `Improve test coverage of UTFString.write`. However, I cannot find any discussion about this UT.

I think it would be good to exclude this UT.

```java
  public void writeToOutputStreamUnderflow() throws IOException {
    // offset underflow is apparently supported?
    final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);

    for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
      new UTF8String(
        new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i))
          .writeTo(outputStream);
      final ByteBuffer buffer = ByteBuffer.wrap(outputStream.toByteArray(), i, test.length);
      assertEquals("01234567", StandardCharsets.UTF_8.decode(buffer).toString());
      outputStream.reset();
    }
  }
```

## How was this patch tested?

Existing UTs

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#20995 from kiszk/SPARK-23882.
robert3005 pushed a commit to palantir/spark that referenced this pull request Apr 7, 2018