[SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
## What changes were proposed in this pull request?

This updates Parquet to 1.10.0 and adapts the vectorized read path to its buffer-management changes. Parquet 1.10.0 uses ByteBufferInputStream instead of byte arrays in its encoders, which allows Parquet to break allocations into smaller chunks that are better for garbage collection.
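For readers who have not followed the Parquet API change, here is a minimal sketch of the initialization pattern the vectorized path moves to. It is not Spark's actual reader: the class and method names (`PageInitSketch`, `initReaders`) are invented for illustration, and the only Parquet calls assumed are the ones visible in the diff below (`DataPageV1.getBytes`, `BytesInput.toInputStream`, and `ValuesReader.initFromPage(int, ByteBufferInputStream)`).

```java
import java.io.IOException;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.values.ValuesReader;

// Hypothetical helper, not part of Spark or Parquet; it only illustrates the
// Parquet 1.10.0 page-initialization pattern used by the updated reader.
final class PageInitSketch {

  // Pre-1.10 pattern, for comparison:
  //   byte[] bytes = page.getBytes().toByteArray();   // one large copy
  //   rlReader.initFromPage(valueCount, bytes, 0);
  //   int next = rlReader.getNextOffset();            // manual offset tracking
  //   dlReader.initFromPage(valueCount, bytes, next);
  //   ...
  static void initReaders(DataPageV1 page, ValuesReader rlReader,
      ValuesReader dlReader, ValuesReader dataReader) throws IOException {
    int valueCount = page.getValueCount();
    // Expose the page bytes as a stream over the underlying buffers; no
    // single contiguous byte[] needs to be allocated and filled.
    BytesInput bytes = page.getBytes();
    ByteBufferInputStream in = bytes.toInputStream();
    // Each reader consumes its portion and leaves the stream positioned
    // for the next one, so no explicit offsets are passed around.
    rlReader.initFromPage(valueCount, in);
    dlReader.initFromPage(valueCount, in);
    dataReader.initFromPage(valueCount, in);
  }

  private PageInitSketch() {}
}
```

Because each reader advances the shared stream, the caller no longer asks a reader where it stopped (`getNextOffset`) and threads offsets through by hand, and Parquet is free to back the stream with several smaller buffers instead of one large array.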

## How was this patch tested?

Existing Parquet tests. The change has also been running in production at Netflix for about 3 months.

Author: Ryan Blue <[email protected]>

Closes #21070 from rdblue/SPARK-23972-update-parquet-to-1.10.0.
rdblue authored and cloud-fan committed May 9, 2018
1 parent 7e73502 commit cac9b1d
Showing 13 changed files with 241 additions and 198 deletions.
12 changes: 6 additions & 6 deletions dev/deps/spark-deps-hadoop-2.6
@@ -162,13 +162,13 @@ orc-mapreduce-1.4.3-nohive.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
protobuf-java-2.5.0.jar
py4j-0.10.6.jar
pyrolite-4.13.jar
12 changes: 6 additions & 6 deletions dev/deps/spark-deps-hadoop-2.7
@@ -163,13 +163,13 @@ orc-mapreduce-1.4.3-nohive.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
protobuf-java-2.5.0.jar
py4j-0.10.6.jar
pyrolite-4.13.jar
12 changes: 6 additions & 6 deletions dev/deps/spark-deps-hadoop-3.1
@@ -181,13 +181,13 @@ orc-mapreduce-1.4.3-nohive.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
protobuf-java-2.5.0.jar
py4j-0.10.6.jar
pyrolite-4.13.jar
2 changes: 1 addition & 1 deletion docs/sql-programming-guide.md
@@ -964,7 +964,7 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession`
Sets the compression codec used when writing Parquet files. If either `compression` or
`parquet.compression` is specified in the table-specific options/properties, the precedence would be
`compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include:
-none, uncompressed, snappy, gzip, lzo.
+none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
</td>
</tr>
<tr>
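As a usage note, not part of this commit's diff: a minimal sketch of where the codec can be set and how the precedence described above plays out. The output path is hypothetical, and actually writing with the newer codecs (e.g. `zstd`, `brotli`) additionally assumes the corresponding compression libraries are available at runtime.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParquetCodecExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("parquet-codec-example")
        .master("local[*]")
        .getOrCreate();

    // Session-wide default (lowest precedence of the three listed above).
    spark.conf().set("spark.sql.parquet.compression.codec", "gzip");

    Dataset<Row> df = spark.range(1000).toDF("id");

    // The per-write `compression` option takes precedence over the session
    // conf, so this particular write uses zstd rather than gzip.
    df.write()
        .option("compression", "zstd")
        .parquet("/tmp/parquet-codec-example");  // hypothetical output path

    spark.stop();
  }
}
```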
8 changes: 7 additions & 1 deletion pom.xml
@@ -129,7 +129,7 @@
<!-- Version used for internal directory structure -->
<hive.version.short>1.2.1</hive.version.short>
<derby.version>10.12.1.1</derby.version>
-<parquet.version>1.8.2</parquet.version>
+<parquet.version>1.10.0</parquet.version>
<orc.version>1.4.3</orc.version>
<orc.classifier>nohive</orc.classifier>
<hive.parquet.version>1.6.0</hive.parquet.version>
@@ -1778,6 +1778,12 @@
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
<scope>${parquet.deps.scope}</scope>
+<exclusions>
+<exclusion>
+<groupId>commons-pool</groupId>
+<artifactId>commons-pool</artifactId>
+</exclusion>
+</exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
@@ -345,7 +345,7 @@ object SQLConf {
"snappy, gzip, lzo.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
-.checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo"))
+.checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo", "lz4", "brotli", "zstd"))
.createWithDefault("snappy")

val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
@@ -293,7 +293,7 @@ protected static IntIterator createRLEIterator(
return new RLEIntIterator(
new RunLengthBitPackingHybridDecoder(
BytesUtils.getWidthFromMaxInt(maxLevel),
-new ByteArrayInputStream(bytes.toByteArray())));
+bytes.toInputStream()));
} catch (IOException e) {
throw new IOException("could not read levels in page for col " + descriptor, e);
}
@@ -21,6 +21,8 @@
import java.util.Arrays;
import java.util.TimeZone;

+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@@ -388,15 +390,16 @@ private void decodeDictionaryIds(
* is guaranteed that num is smaller than the number of values left in the current page.
*/

-private void readBooleanBatch(int rowId, int num, WritableColumnVector column) {
+private void readBooleanBatch(int rowId, int num, WritableColumnVector column)
+throws IOException {
if (column.dataType() != DataTypes.BooleanType) {
throw constructConvertNotSupportedException(descriptor, column);
}
defColumn.readBooleans(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
}

-private void readIntBatch(int rowId, int num, WritableColumnVector column) {
+private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
@@ -414,7 +417,7 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) {
}
}

-private void readLongBatch(int rowId, int num, WritableColumnVector column) {
+private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType()) ||
@@ -434,7 +437,7 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) {
}
}

-private void readFloatBatch(int rowId, int num, WritableColumnVector column) {
+private void readFloatBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: support implicit cast to double?
if (column.dataType() == DataTypes.FloatType) {
@@ -445,7 +448,7 @@ private void readFloatBatch(int rowId, int num, WritableColumnVector column) {
}
}

-private void readDoubleBatch(int rowId, int num, WritableColumnVector column) {
+private void readDoubleBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.DoubleType) {
@@ -456,7 +459,7 @@ private void readDoubleBatch(int rowId, int num, WritableColumnVector column) {
}
}

-private void readBinaryBatch(int rowId, int num, WritableColumnVector column) {
+private void readBinaryBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
@@ -556,7 +559,7 @@ public Void visit(DataPageV2 dataPageV2) {
});
}

-private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException {
+private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) throws IOException {
this.endOfPageValueCount = valuesRead + pageValueCount;
if (dataEncoding.usesDictionary()) {
this.dataColumn = null;
@@ -581,7 +584,7 @@ private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException {
}

try {
-dataColumn.initFromPage(pageValueCount, bytes, offset);
+dataColumn.initFromPage(pageValueCount, in);
} catch (IOException e) {
throw new IOException("could not read page in col " + descriptor, e);
}
@@ -602,12 +605,11 @@ private void readPageV1(DataPageV1 page) throws IOException {
this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
try {
-byte[] bytes = page.getBytes().toByteArray();
-rlReader.initFromPage(pageValueCount, bytes, 0);
-int next = rlReader.getNextOffset();
-dlReader.initFromPage(pageValueCount, bytes, next);
-next = dlReader.getNextOffset();
-initDataReader(page.getValueEncoding(), bytes, next);
+BytesInput bytes = page.getBytes();
+ByteBufferInputStream in = bytes.toInputStream();
+rlReader.initFromPage(pageValueCount, in);
+dlReader.initFromPage(pageValueCount, in);
+initDataReader(page.getValueEncoding(), in);
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " + descriptor, e);
}
@@ -619,12 +621,13 @@ private void readPageV2(DataPageV2 page) throws IOException {
page.getRepetitionLevels(), descriptor);

int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
-this.defColumn = new VectorizedRleValuesReader(bitWidth);
+// do not read the length from the stream. v2 pages handle dividing the page bytes.
+this.defColumn = new VectorizedRleValuesReader(bitWidth, false);
this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
-this.defColumn.initFromBuffer(
-this.pageValueCount, page.getDefinitionLevels().toByteArray());
+this.defColumn.initFromPage(
+this.pageValueCount, page.getDefinitionLevels().toInputStream());
try {
-initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
+initDataReader(page.getDataEncoding(), page.getData().toInputStream());
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " + descriptor, e);
}
(Diffs for the remaining changed files are not shown.)
