
PARQUET-787: Limit read allocation size #390

Closed

Conversation

rdblue (Contributor) commented Dec 6, 2016

WIP: This updates the ParquetFileReader to use multiple buffers when reading a row group, instead of a single humongous allocation. As a consequence, many classes needed to be updated to accept a stream backed by multiple buffers rather than a single buffer, since assuming a single contiguous buffer would require too many copies.

julienledem (Member) commented Feb 2, 2017

@rdblue Could you detail how big, specifically, a humongous allocation is? My concern with this change is that it makes the APIs for decoding more complex. For example, to create a vectorized reader I want to have a full page in a single buffer.
In particular, I want to avoid extra if statements in decoders like ByteBitPackingValuesReader.
The main issues are managing few seeks (because of S3) and having a page fit in a single buffer (for faster decoding and fewer cases). Would it help if the page headers were in the Parquet footer?

rdblue (Contributor, Author) commented Oct 22, 2017

@julienledem, could you have another look at this? I'd like to get it in for the next release.

To address the concerns about extra complexity in the read path leading to poor performance, I've run this code through the benchmarks. First, the results without this change:

# Run complete. Total time: 00:18:31

Benchmark                                                      Mode  Cnt  Score   Error  Units
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeBROTLI        thrpt   15  0.541 ± 0.014  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeGZIP          thrpt   15  0.714 ± 0.054  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeLZ4           thrpt   15  0.855 ± 0.067  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeSNAPPY        thrpt   15  0.867 ± 0.068  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeUncompressed  thrpt   15  0.924 ± 0.092  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeZSTD          thrpt   15  0.887 ± 0.019  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeBROTLI         avgt   15  1.857 ± 0.046   s/op
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeGZIP           avgt   15  1.382 ± 0.125   s/op
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeLZ4            avgt   15  1.172 ± 0.111   s/op
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeSNAPPY         avgt   15  1.167 ± 0.104   s/op
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeUncompressed   avgt   15  1.093 ± 0.088   s/op
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeZSTD           avgt   15  1.166 ± 0.070   s/op

And with this PR applied:

# Run complete. Total time: 00:18:05

Benchmark                                                      Mode  Cnt  Score   Error  Units
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeBROTLI        thrpt   15  0.531 ± 0.019  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeGZIP          thrpt   15  0.741 ± 0.017  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeLZ4           thrpt   15  0.898 ± 0.024  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeSNAPPY        thrpt   15  0.923 ± 0.020  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeUncompressed  thrpt   15  0.965 ± 0.048  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeZSTD          thrpt   15  0.877 ± 0.020  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeBROTLI         avgt   15  1.827 ± 0.008   s/op
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeGZIP           avgt   15  1.340 ± 0.011   s/op
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeLZ4            avgt   15  1.114 ± 0.013   s/op
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeSNAPPY         avgt   15  1.084 ± 0.032   s/op
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeUncompressed   avgt   15  1.020 ± 0.023   s/op
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeZSTD           avgt   15  1.145 ± 0.032   s/op

The measured average time and throughput are very close, but differ in most cases in favor of these changes. The reason is that although this change allows pages to be split across byte buffers, they are not actually split in the majority of cases. This ensures that, at most, one page will read from multiple buffers, and most reads proceed without any additional if statements. In some cases, like ByteBitPackingValuesReader, there are fewer if statements this way.

For your question on the humongous allocations: a humongous allocation is half of the G1GC region size or more. Here's a table of region sizes from total heap size. These allocations are automatically considered tenured, so will sit around until a full GC. I tested with 8MB allocations, which are not humongous for applications with > 32G heaps, like Presto nodes. I think this should help in long-running database processes.
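
For context, here is a hedged sketch (my addition, not from the PR) of how to check the effective G1 region size at runtime, since any allocation of at least half a region is treated as humongous. It uses the HotSpot diagnostic MXBean; the class name is hypothetical:

import java.lang.management.ManagementFactory;
import com.sun.management.HotSpotDiagnosticMXBean;

public class HumongousCheck {
  public static void main(String[] args) {
    // Read the effective G1 region size (in bytes) from the running JVM.
    HotSpotDiagnosticMXBean hotspot =
        ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
    long regionSize = Long.parseLong(
        hotspot.getVMOption("G1HeapRegionSize").getValue());

    // In G1, an allocation of at least half a region is humongous.
    long allocationSize = 8L * 1024 * 1024; // the 8MB buffers tested above
    System.out.println("region size: " + regionSize
        + ", 8MB is humongous: " + (allocationSize >= regionSize / 2));
  }
}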

rdblue (Contributor, Author) commented Oct 22, 2017

@danielcweeks, you may want to review this as well.

offset += length;
try {
in.skipFully(length);
} catch (IOException e) {
Contributor:

Since we already moved to Java 7, you could use a single catch block with IOException | RuntimeException.
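
For example, the quoted snippet could become something like this (a sketch of the suggestion; the handler body is hypothetical, since the original handling is not shown here):

offset += length;
try {
  in.skipFully(length);
} catch (IOException | RuntimeException e) {
  // Java 7 multi-catch: one handler for both exception types
  throw new RuntimeException("could not skip " + length + " bytes", e);
}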

Contributor Author:

Fixed.

* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one
Contributor:

Was this white-space change intentional?

Contributor Author:

No, I'll revert it.

Contributor Author:

Fixed.


ByteBuffer slice;
if (length > current.remaining()) {
// a copy is needed to return a single buffer
Contributor:

This behaviour seems unintuitive to me. Sometimes the buffers will share content (which is probably what programmers expect based on Java's ByteBuffer.slice()), sometimes not. Sometimes the slice operation will be cheap (again, what programmers expect from ByteBuffer.slice()), sometimes not.

Even if the ByteBuffers are never meant to be modified, are we okay with this inconsistency? It could lead to some hard-to-reproduce bugs/slowdowns that only happen when requested slices cross buffer boundaries. I can't suggest a better solution either, but I thought the issue was worth mentioning nevertheless.

Contributor Author:

Copies are sometimes needed in order to get buffers back as ByteBuffer without one massive allocation. There's no way to avoid copying in cases where a read crosses the boundary between buffers.

However, cross-buffer reads hardly ever happen, because the buffers inside these input streams are on the order of megabytes while slices from this method are on the order of bytes. Callers that need large slices can also use sliceBuffers to avoid the problem. The benchmarks I ran show that this holds in practice: this buffer management is no slower than the existing code that depends on a huge allocation.
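
To illustrate the trade-off, here is a minimal sketch (my reconstruction of the behaviour described above, not the PR's actual class): the common case returns a cheap zero-copy slice, and only a request that spans buffers pays for a copy.

import java.nio.ByteBuffer;
import java.util.Iterator;

class MultiBufferSlicer {
  private final Iterator<ByteBuffer> buffers;
  private ByteBuffer current;

  MultiBufferSlicer(Iterator<ByteBuffer> buffers) {
    this.buffers = buffers;
    this.current = buffers.next();
  }

  ByteBuffer slice(int length) {
    if (length <= current.remaining()) {
      // common case: share the backing memory, no copy
      ByteBuffer slice = current.duplicate();
      slice.limit(slice.position() + length);
      current.position(current.position() + length);
      return slice.slice();
    }
    // rare case: the request crosses a buffer boundary, so copy
    ByteBuffer copy = ByteBuffer.allocate(length);
    while (copy.hasRemaining()) {
      if (!current.hasRemaining()) {
        current = buffers.next(); // throws if the stream is exhausted
      }
      int n = Math.min(copy.remaining(), current.remaining());
      ByteBuffer src = current.duplicate();
      src.limit(src.position() + n);
      copy.put(src);
      current.position(current.position() + n);
    }
    copy.flip();
    return copy;
  }
}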

Contributor Author:

Another point: copying is far more expensive in the current read path, where it happens fairly often because everything is based on byte arrays rather than byte buffers. I think the copy here isn't significant enough to worry about in comparison.

int bytesAccumulated = 0;
while (bytesAccumulated < len) {
if (current.remaining() > 0) {
// get a slice of the current buffer to return
Contributor:

It seems to me that we could avoid slicing ByteBuffers that fit entirely in the requested range, but since it's a cheap operation, I'm not sure it is worth the effort of explicitly handling them.

Contributor Author:

We need to duplicate the buffer either way, so we'd only be able to avoid setting the position and limit, which doesn't seem worth it to me.

rdblue force-pushed the PARQUET-787-limit-read-allocation-size branch from 31bc20b to 9e72839 on December 1, 2017
rdblue force-pushed the PARQUET-787-limit-read-allocation-size branch 3 times, most recently from 75b7fcb to 1616403 on December 28, 2017
gszadovszky (Contributor) left a comment:

Most of my findings are minor but there are some that matter, in my opinion.

}

@Override
public synchronized void mark(int readlimit) {
Contributor:

This class is not thread safe. Using synchronized here is unnecessary and I think it is also misleading.

Contributor Author:

This is from InputStream and can't be changed.

Contributor:

I don't see why it cannot be changed. Whether or not the super method is synchronized is not part of the signature. InputStream is implemented to be thread-safe, but this implementation clearly is not. I think using synchronized here suggests that this implementation is also thread-safe, which is misleading. In addition, it might have some performance penalty as well, but I don't think it is measurable.
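
A quick illustration of that point (hypothetical class, just to show it compiles): synchronized is not part of the override contract, so a subclass may drop it even though InputStream declares mark() and reset() as synchronized.

import java.io.IOException;
import java.io.InputStream;

class UnsynchronizedStream extends InputStream {
  @Override
  public int read() {
    return -1; // stub implementation for the sketch
  }

  @Override
  public void mark(int readlimit) {
    // no synchronized keyword, still a valid override
  }

  @Override
  public void reset() throws IOException {
    throw new IOException("No mark defined");
  }
}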

Contributor Author:

I think you're right and they're not in the method signature. I've removed them and we'll see if the tests pass.

}

@Override
public synchronized void reset() throws IOException {
Contributor:

This class is not thread safe. Using synchronized here is unnecessary and I think it is also misleading.

}

@Override
public synchronized void reset() throws IOException {
Contributor:

This class is not thread safe. Using synchronized here is unnecessary and I think it is also misleading.

}
}

private synchronized void discardMark() {
Contributor:

This class is not thread safe. Using synchronized here is unnecessary and I think it is also misleading.

}

@Override
public synchronized void mark(int readlimit) {
Contributor:

This class is not thread safe. Using synchronized here is unnecessary and I think it is also misleading.


@Override
public int read(byte[] bytes, int off, int len) {
if (len <= 0) {
Contributor:

We might swallow an error here if len is negative. It might be worth checking and throwing an IndexOutOfBoundsException if required.
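
Something like the standard InputStream.read(byte[], int, int) precondition check would cover it (a sketch of the suggestion):

if (off < 0 || len < 0 || len > bytes.length - off) {
  throw new IndexOutOfBoundsException();
}
if (len == 0) {
  return 0;
}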

Contributor Author:

Fixed.

discardMark();
nextBuffer(); // go back to the marked buffers
} else {
throw new IOException("No mark defined");
Contributor:

The error message might mislead the user. Reaching the mark limit should also be mentioned.

Contributor Author:

Added.

throw new NoSuchElementException();
}

if (useFirst) {
Contributor:

An Iterator must not rely on hasNext() being invoked every time before next(). As a private implementation it is good enough; however, it might be worth a comment.

Contributor Author:

Will do, good catch.

Contributor Author:

I updated the logic to conform to the contract.
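
For reference, a next() that conforms without assuming a prior hasNext() call might look like this (a sketch; the first and iterator fields are hypothetical, only useFirst appears in the quoted code):

@Override
public ByteBuffer next() {
  // do not rely on the caller having invoked hasNext() first
  if (!hasNext()) {
    throw new NoSuchElementException();
  }
  if (useFirst) {
    useFirst = false;
    return first;
  }
  return iterator.next();
}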

*/
public static BytesInput from(ByteBuffer buffer, int offset, int length) {
Contributor:

Is it correct to simply remove this public method instead of deprecating it first?

rdblue (Contributor, Author) commented Feb 14, 2018:

BytesInput is internal, so I think it is fine.

Contributor:

I agree if it is internal, but I am always confused about which classes are public and which are for internal use only. It would be great if someone who has the required knowledge (not pointing at you...) would mark these classes.

*
* @throws IOException
*/
public abstract void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException;
Contributor:

Don't we want to deprecate these methods instead of removing them?

Contributor Author:

It's almost always better to remove methods, as long as they aren't part of the public API.

Contributor:

Agreed.

rdblue force-pushed the PARQUET-787-limit-read-allocation-size branch from 1616403 to 4abba3e on February 21, 2018
gszadovszky (Contributor) left a comment:

Thanks a lot for the changes.
LGTM.

zivanfi (Contributor) left a comment:

LGTM, thanks!

rdblue (Contributor, Author) commented Feb 21, 2018

@zivanfi, @gszadovszky thank you for taking the time to review! I'm going to merge this.

asfgit closed this in 8bbc6cb on Feb 21, 2018
legend-hua pushed a commit to legend-hua/parquet-mr that referenced this pull request Mar 23, 2018
WIP: This updates the `ParquetFileReader` to use multiple buffers when reading a row group, instead of a single humongous allocation. As a consequence, many classes needed to be updated to accept a stream backed by multiple buffers rather than a single buffer, since assuming a single contiguous buffer would require too many copies.

Author: Ryan Blue <[email protected]>

Closes apache#390 from rdblue/PARQUET-787-limit-read-allocation-size and squashes the following commits:

4abba3e [Ryan Blue] PARQUET-787: Update byte buffer input streams for review comments.
e7c6c5d [Ryan Blue] PARQUET-787: Fix problems from Zoltan's review.
be52b59 [Ryan Blue] PARQUET-787: Update tests for both ByteBufferInputStreams.
b0b6147 [Ryan Blue] PARQUET-787: Update encodings to use ByteBufferInputStream.
a4fa05a [Ryan Blue] Refactor ByteBufferInputStream implementations.
56b22a6 [Ryan Blue] Make allocation size configurable.
103ed3d [Ryan Blue] Add tests for ByteBufferInputStream and fix bugs.
614a2bb [Ryan Blue] Limit allocation size to 8MB chunks for better garbage collection.