
[SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0. #21070

Closed

Conversation

@rdblue (Contributor) commented Apr 13, 2018

What changes were proposed in this pull request?

This updates Parquet to 1.10.0 and updates the vectorized path for buffer management changes. Parquet 1.10.0 uses ByteBufferInputStream instead of byte arrays in encoders. This allows Parquet to break allocations into smaller chunks that are better for garbage collection.

How was this patch tested?

Existing Parquet tests. Running in production at Netflix for about 3 months.
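
To make the buffer-management change concrete, here is a minimal sketch of the read pattern the vectorized decoders move to: instead of indexing into a byte[] at a manually tracked offset, they slice a little-endian ByteBuffer from Parquet's ByteBufferInputStream and read values from it. The class and method names below are illustrative rather than the exact code in this diff; only ByteBufferInputStream.slice(int) and the little-endian ordering are taken from the changes under review.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.io.ParquetDecodingException;

// Illustrative plain-encoding reader: values come out of ByteBufferInputStream
// slices, which may be backed by several smaller buffers rather than one byte[].
class PlainIntReader {
  private final ByteBufferInputStream in;

  PlainIntReader(ByteBufferInputStream in) {
    this.in = in;
  }

  private ByteBuffer getBuffer(int length) {
    try {
      // slice(length) returns a view over the next `length` bytes without copying
      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
    } catch (IOException e) {
      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
    }
  }

  int readInteger() {
    return getBuffer(4).getInt();
  }

  long readLong() {
    return getBuffer(8).getLong();
  }
}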

@SparkQA commented Apr 13, 2018

Test build #89362 has finished for PR 21070 at commit 4df17a6.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member) commented:

Could you share the performance numbers?

@rdblue (Contributor Author) commented Apr 13, 2018

Upstream benchmarks for buffer management changes are here: apache/parquet-java#390 (comment)

That doesn't show the GC benefit for smaller buffer allocations because of the heap size. It is just to show that the changes do no harm.

int requiredBytes = total * 4;
ByteBuffer buffer = getBuffer(requiredBytes);

for (int i = 0; i < total; i += 1) {
Contributor:

Here and elsewhere a bulk copy has been replaced by many smaller copies. It would be better to be able to use the bulk version. I think it would be preferable to at least have:

if (buffer.hasArray()) {
  c.putIntsLittleEndian(rowId, total, buffer.array(), 0);
} else {
  for (int i = 0; i < total; i += 1) {
    c.putInt(rowId + i, buffer.getInt());
  }
}

Contributor Author:

It looks that way, but it actually replaces a similar loop: https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java#L283-L291

The main problem is that ByteBuffer isn't supported in the column vectors. That seems beyond the scope of this PR.

Contributor:

Couldn't it also be writing to an OffHeapColumnVector? https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L199

If so, I think the copy is 1MB at a time: https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L189

I agree that adding ByteBuffer support to ColumnVector shouldn't happen in this PR. But there's an opportunity here to use the bulk copy APIs, which would benefit from any future optimization. Plus, even if the copy does eventually become a loop inside the column vector implementation, there's a better chance of the JIT unrolling that loop since it's smaller.

Member:

I think that better integration between ByteBuffer and ColumnVector should be addressed in another PR.
Since tableCache also uses ByteBuffer, it would be good to discuss it there.
cc: @cloud-fan

Contributor:

Agreed that fixing the ByteBuffer / ColumnVector interaction should be dealt with elsewhere. I'm just raising the possibility of regressing the read path here because the copies are less efficient. Since it's going to be a while before 2.4.0, that might be ok if we commit to fixing it - but it superficially seems like a manageable change to the PR since the code to call the bulk APIs is already there. What do you think?

Contributor Author:

The reason why I moved the loop out was that I didn't think that using the byte[] API would actually be better. Parquet doesn't guarantee that these byte buffers are on the heap and backed by byte arrays, so we would need to copy the bytes out of the buffer into an array only to copy them again in the column vector call. Two copies (and possibly allocation) seemed worse than moving the loop.

We could catch the case where the buffers are on-heap and make the bulk call. The drawback is that this will be temporary and will be removed when ColumnVector supports ByteBuffer. And, it only works/matters when Parquet uses on-heap buffers and Spark uses off-heap buffers. Is that worth the change to this PR? I can take a shot at it if you think it is. I'd rather update ColumnVector and then follow up though.

Contributor:

Isn't that what hasArray() is for though? If the buffers are backed by a byte array, hasArray() returns true and accessing the byte array via array() should be 0 cost. (If array() actually copies any data, that would invalidate this line of reasoning but would also be unexpected).

So for example, here you'd have:

public final void readIntegers(int total, WritableColumnVector c, int rowId) {
  int requiredBytes = total * 4;
  ByteBuffer buffer = getBuffer(requiredBytes);
  if (buffer.hasArray()) {
    c.putIntsLittleEndian(rowId, total, buffer.array(), 0);
  } else {
    for (int i = 0; i < total; i += 1) {
      c.putInt(rowId + i, buffer.getInt());
    }
  }
}

This seems to be the same pattern that's in readBinary(), below. Let me know if I'm missing something!

Contributor Author:

Yeah, exactly. We can detect that the buffer is backed by an array and use the other call. If we think this is worth it as a short-term fix, I'll update this PR.

Contributor:

It seems worth it to me, to be defensive against performance changes - but feel free to punt it to me as a follow-on patch if you'd rather.

Contributor Author:

Updated.

@HyukjinKwon (Member) commented:

@rdblue, BTW mind fixing the title to [SPARK-23972][...] ...? It's actually written in the guide.

@@ -129,7 +129,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>1.2.1</hive.version.short>
     <derby.version>10.12.1.1</derby.version>
-    <parquet.version>1.8.2</parquet.version>
+    <parquet.version>1.10.0</parquet.version>
Member:

@rdblue, to see the Jenkins result, could you resolve the dependency check failure with the following?

./dev/test-dependencies.sh --replace-manifest

Member:

Unlike last time, it seems that this PR also touches the commons-pool dependency. Can we avoid this?

-commons-pool-1.5.4.jar
+commons-pool-1.6.jar

Contributor Author:

I excluded the commons-pool dependency from parquet-hadoop to avoid this. I also tested the latest Parquet release with commons-pool 1.5.4 and everything passes. I don't think it actually requires 1.6.

@rdblue rdblue changed the title SPARK-23972: Update Parquet to 1.10.0. [SPARK-23972][SQL] Update Parquet to 1.10.0. Apr 16, 2018
@rdblue rdblue changed the title [SPARK-23972][SQL] Update Parquet to 1.10.0. [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0. Apr 16, 2018
@SparkQA commented Apr 16, 2018

Test build #89408 has finished for PR 21070 at commit 8446b40.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue force-pushed the SPARK-23972-update-parquet-to-1.10.0 branch from 8446b40 to d724811 on April 16, 2018 at 17:38
@SparkQA commented Apr 16, 2018

Test build #89409 has finished for PR 21070 at commit d724811.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 16, 2018

Test build #89412 has finished for PR 21070 at commit 839cf28.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@henryr (Contributor) left a comment:

Thanks for making this change! Looks pretty good to me, let's see what others have to say.

}
}

private byte getByte() {
Contributor:

Is this used anywhere other than line 154? If not, can be inlined.

ByteBuffer buffer = getBuffer(requiredBytes);

for (int i = 0; i < total; i += 1) {
  c.putByte(rowId + i, buffer.get());
Contributor:

Could you preserve the comment about "Bytes are stored as 4-byte little endian int. Just read the first byte."?

@SparkQA commented Apr 17, 2018

Test build #89415 has finished for PR 21070 at commit 27a66d8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
-offset += 8;
-return v;
+return getBuffer(8).getDouble();
@scottcarey commented Apr 17, 2018:

It would seem to me that if ByteBufferInputStream had a few extra methods on it, this would all be easier and potentially faster:

(probably addressed in another issue though):

getDouble(), getLong(), getInt(), getBuffer(...), etc. The ByteBufferInputStream itself could have a byte order set, and it could clone().order(...) any buffers passed to it with the wrong order.

In other words, a lot of what is done here would probably be both more efficient for ByteBufferInputStream to do itself and useful to other users of that class. For example, inside ByteBufferInputStream it would not need to create a new slice for every read of an int or long, so it would create less garbage.
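
As a rough illustration of this suggestion: the helpers below do not exist on Parquet's ByteBufferInputStream at the time of this PR, so the wrapper class, method names, and ordering behavior are hypothetical. A real implementation inside ByteBufferInputStream could read from its current backing buffer directly instead of slicing per value, which is where the garbage reduction would come from.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.parquet.bytes.ByteBufferInputStream;

// Hypothetical primitive-read helpers layered over ByteBufferInputStream.
final class PrimitiveReads {
  private final ByteBufferInputStream in;
  private final ByteOrder order;

  PrimitiveReads(ByteBufferInputStream in, ByteOrder order) {
    this.in = in;
    this.order = order;
  }

  int getInt() throws IOException {
    return in.slice(4).order(order).getInt();
  }

  long getLong() throws IOException {
    return in.slice(8).order(order).getLong();
  }

  double getDouble() throws IOException {
    return in.slice(8).order(order).getDouble();
  }
}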

Contributor Author:

Yeah, that's a good point. It isn't read this way in Parquet, but it would be useful for libraries that use this deep integration. Feel free to submit a PR on Parquet and I'll review it there.

@scottcarey commented Apr 17, 2018

This PR should include changes to ParquetOptions.scala to expose the LZ4, ZSTD, and BROTLI Parquet compression codecs; otherwise Spark users won't be able to leverage those Parquet 1.10.0 features.

(As an aside, I find it strange that Spark has to know about and keep in sync with Parquet's options, meaning any new option in Parquet requires corresponding changes in Spark. Wouldn't it be better if that were fully delegated to the Parquet artifacts?)

@henryr (Contributor) commented Apr 17, 2018

@scottcarey I agree that's important. Perhaps it could be done as a follow-up PR?

@rdblue force-pushed the SPARK-23972-update-parquet-to-1.10.0 branch from d036ce3 to 5a78030 on April 18, 2018 at 21:04
@scottcarey commented:

I tested this with the addition of some changes to ParquetOptions.scala, but that alone does not allow writing or reading zstd-compressed Parquet files, because Parquet uses reflection to acquire Hadoop compression classes that are not in the supplied dependencies.

From what I can see, anyone who wants to use the new compression codecs is going to have to build a custom version of Spark, and probably with modified versions of the Hadoop libraries as well, including changes to how the native bindings are built... because that would be easier than updating the whole thing to hadoop-common 3.0, where the required compressors exist.

Alternatively, Spark and Parquet should avoid the Hadoop dependencies like the plague for compression/decompression. They bring in a steaming heap of dependencies and possible library conflicts, and users often have versions (or CDH versions) that don't exactly match.
In my mind, Parquet should handle the compression itself, or with a lightweight dependency.
Perhaps it could use the Hadoop flavor if present, another one if not, or even a user-supplied one, so that it works standalone or from inside Hadoop without issue.
Right now it is bound together with reflection and an awkward stack of brittle dependencies, with no escape hatch.

Or am I missing something here, and it is possible to read/write with the new codecs if I configure things differently?

@rdblue (Contributor Author) commented Apr 18, 2018

@scottcarey, Parquet will use the compressors if they are available. You can add them from an external Jar and it will work. LZ4 should also work out of the box because it is included in Hadoop 2.7.

I agree that it would be nice if Parquet didn't rely on Hadoop for compression libraries, but that's how it is at the moment. Feel free to fix it.

@scottcarey commented:
@rdblue
The problem with zstd is that it is only in Hadoop 3.0, and dropping that jar in breaks things, since it is a major release. Extracting only the ZStandardCodec from it and recompiling it against a 2.x release does not work either, because it depends on some low-level Hadoop native-library management to load the native library (it does not appear to use https://github.com/luben/zstd-jni).

The alternative is to write a custom ZStandardCodec implementation that uses luben:zstd-jni.

Furthermore, if you add an o.a.h.io.codecs.ZStandardCodec class to a jar on the client side, it is still not found -- my guess is that there is some classloader isolation between client code and Spark itself, and Spark is what needs to find the class. So one has to have it installed inside the Spark distribution.

I may take you up on fixing the compression codec dependency mess in a couple months. The hardest part will be lining up the configuration options with what users already expect -- the raw codecs aren't that hard to do.

@rdblue (Contributor Author) commented Apr 18, 2018

I backported the Hadoop zstd codec to 2.7.3 without much trouble. But either way, I think that's a separate concern. This is about getting Parquet updated, not about worrying whether users can easily add compression implementations to their classpath.

@scottcarey commented:

> This is about getting Parquet updated, not about worrying whether users can easily add compression implementations to their classpath.

Yes, of course.

My hunch is that someone else will read the release notes saying that Spark 2.3.0 supports Zstandard and Parquet 1.10.0 supports Zstandard, then realize the combination doesn't work, and end up here. So I feel this is the right place to discuss the state of these features until there is a more specific place to do so.

The discussion here has been useful to get closer to understanding what further tasks there may be. If there are any follow-on issues, the discussion can move there.

I would love to be able to test this with my full use case, and give it a big thumbs up if it works. Unfortunately my only motivation for this upgrade is access to ZStandard, and I'm not as excited to say 'works for me if I don't use new parquet codecs'.

@SparkQA commented Apr 19, 2018

Test build #89531 has finished for PR 21070 at commit 5a78030.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 19, 2018

Test build #89533 has finished for PR 21070 at commit 5fca3ce.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 20, 2018

Test build #89600 has finished for PR 21070 at commit 40a9cdd.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue (Contributor Author) commented Apr 20, 2018

Retest this please.

@SparkQA commented Apr 20, 2018

Test build #89644 has finished for PR 21070 at commit 40a9cdd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@henryr (Contributor) commented Apr 23, 2018

This looks pretty good to me - are there any committers that can give it a (hopefully) final review?

@cloud-fan (Contributor) commented:

Can we run TPC-DS and show that this upgrade doesn't cause a performance regression in Spark? I can see that the new version has no perf regression on the Parquet side; I just want to be sure the Spark/Parquet integration is also OK.

@rdblue force-pushed the SPARK-23972-update-parquet-to-1.10.0 branch from 40a9cdd to 6c9d47b on May 7, 2018 at 16:04
@SparkQA commented May 7, 2018

Test build #90327 has finished for PR 21070 at commit 6c9d47b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
-offset += 4;
-return v;
+return getBuffer(4).getInt();
Contributor:

Will we have a performance problem here, i.e., creating ByteBuffers too frequently? cc @michal-databricks

Contributor:

I'd also assume ByteBuffer.getInt is slower than Platform.getInt.

Contributor Author:

The performance testing didn't surface any performance problems. Is there a case you'd suggest testing?

Contributor:

I'm just thinking https://github.com/apache/spark/pull/21070/files#diff-ebb4762e900515a0cb6b0fc6327bb04cR569 should be faster than getBuffer(4).getInt(), but either way should not be slower than the previous code.

This may not have a big performance impact and may be a premature optimization; it's just my intuition, so it's up to you to decide whether to change it or not.

Contributor Author:

I think this should be better than readIntLittleEndian because that's doing length checks when it accesses the underlying byte array. Using Platform.getInt is a good idea, but I'd rather test that in a follow-up if that's alright, since we are confident that this as-is doesn't harm performance. We need to follow up with better unsafe support for byte buffers anyway.
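
For reference, a sketch of what the Platform.getInt follow-up might look like. This is not part of the PR: it assumes the buffer is the little-endian slice returned by getBuffer elsewhere in this diff, and Platform.getInt reads in native byte order, so it only matches the little-endian read on little-endian hardware such as x86.

import java.nio.ByteBuffer;
import org.apache.spark.unsafe.Platform;

// Hypothetical follow-up sketch: use Platform for heap-backed slices and fall
// back to ByteBuffer.getInt() otherwise. Note that, unlike ByteBuffer.getInt(),
// the Platform branch does not advance the buffer position.
final class PlatformIntReads {
  static int readInt(ByteBuffer buffer) {
    if (buffer.hasArray()) {
      return Platform.getInt(
          buffer.array(),
          Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + buffer.position());
    }
    return buffer.getInt();
  }
}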

// values are bit packed 8 at a time, so reading bitWidth will always work
ByteBuffer buffer = in.slice(bitWidth);
this.packer.unpack8Values(
  buffer, buffer.arrayOffset() + buffer.position(), this.currentBuffer, valueIndex);
Contributor:

Shall we assume the ByteBuffer may not be on-heap?

Contributor Author:

Good catch. Fixed to remove the call to arrayOffset. It should work with both on- and off-heap buffers now.
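
For clarity, the presumed shape of that fix, based on the description above rather than the exact merged code: drop arrayOffset() and pass only the buffer position, so the call no longer assumes an array-backed (on-heap) buffer.

// values are bit packed 8 at a time, so reading bitWidth will always work
ByteBuffer buffer = in.slice(bitWidth);
this.packer.unpack8Values(buffer, buffer.position(), this.currentBuffer, valueIndex);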

@cloud-fan (Contributor) commented:

LGTM except 2 comments

@HyukjinKwon (Member) commented May 8, 2018

(While I am here) seems fine to me otherwise too.

@SparkQA commented May 8, 2018

Test build #90375 has finished for PR 21070 at commit 95ecde0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor) commented:

Thanks, merging to master!

@asfgit closed this in cac9b1d on May 9, 2018
asfgit pushed a commit that referenced this pull request May 24, 2018
…y filters.

## What changes were proposed in this pull request?

I missed this commit when preparing #21070.

When Parquet is able to filter blocks with dictionary filtering, the expected total value count in Spark can be too high, leading to an error when there are fewer row groups to process than expected. Spark should get the row groups from Parquet so that it picks up new filter schemes in Parquet, like dictionary filtering.

## How was this patch tested?

Using in production at Netflix. Added test case for dictionary-filtered blocks.

Author: Ryan Blue <[email protected]>

Closes #21295 from rdblue/SPARK-24230-fix-parquet-block-tracking.
asfgit pushed a commit that referenced this pull request May 24, 2018

(Same commit message as above; cherry picked from commit 3469f5c, signed off by Wenchen Fan <[email protected]>.)
curtishoward pushed a commit to twosigma/spark that referenced this pull request Oct 12, 2018
This updates Parquet to 1.10.0 and updates the vectorized path for buffer management changes. Parquet 1.10.0 uses ByteBufferInputStream instead of byte arrays in encoders. This allows Parquet to break allocations into smaller chunks that are better for garbage collection.

Existing Parquet tests. Running in production at Netflix for about 3 months.

Author: Ryan Blue <[email protected]>

Closes apache#21070 from rdblue/SPARK-23972-update-parquet-to-1.10.0.

(cherry picked from commit cac9b1d)
curtishoward pushed a second commit to twosigma/spark that referenced this pull request Oct 12, 2018, with the same commit message.