[SPARK-12992] [SQL] Update parquet reader to support more types when decoding to ColumnarBatch.

This patch implements support for more types when doing the vectorized decode. A few more types
remain, but they should be straightforward to add after this. The code has a few copy-and-paste
pieces, but they are difficult to eliminate due to performance considerations. A simplified sketch
of the dictionary-decode pattern follows the list below.

Specifically, this patch adds support for:
  - String, Long, Byte types
  - Dictionary encoding for those types.
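
The following is a minimal, self-contained sketch of the dictionary-decode pattern this patch
applies: first decode the dictionary ids for a batch, then remap each id through the dictionary
into the destination column. The names here (IntDictionary, DictionaryDecodeSketch) are
illustrative only and are not part of the Spark or Parquet APIs; the real implementation in the
diff below works on ColumnVector and the Parquet Dictionary instead of plain arrays.

// Illustrative only: plain arrays stand in for ColumnVector and the Parquet Dictionary.
final class IntDictionary {
  private final int[] values;                      // dictionary entries, indexed by id
  IntDictionary(int[] values) { this.values = values; }
  int decodeToInt(int id) { return values[id]; }
}

final class DictionaryDecodeSketch {
  // Remaps `num` previously decoded dictionary ids, starting at `rowId`,
  // into the destination column via the dictionary.
  static void decodeDictionaryIds(int rowId, int num, int[] column,
                                  int[] dictionaryIds, IntDictionary dictionary) {
    for (int i = rowId; i < rowId + num; ++i) {
      column[i] = dictionary.decodeToInt(dictionaryIds[i]);
    }
  }

  public static void main(String[] args) {
    IntDictionary dict = new IntDictionary(new int[] {100, 200, 300});
    int[] ids = {0, 2, 2, 1};                      // ids as read from the encoded page
    int[] column = new int[ids.length];            // destination column for this batch
    decodeDictionaryIds(0, ids.length, column, ids, dict);
    // column now holds {100, 300, 300, 200}
  }
}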

Author: Nong Li <[email protected]>

Closes #10908 from nongli/spark-12992.
nongli authored and davies committed Feb 3, 2016
1 parent 672032d commit 21112e8
Showing 6 changed files with 424 additions and 21 deletions.
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.parquet.Preconditions;
@@ -41,6 +42,7 @@
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.types.UTF8String;
@@ -207,13 +209,7 @@ public boolean nextBatch() throws IOException {

int num = (int)Math.min((long) columnarBatch.capacity(), totalRowCount - rowsReturned);
for (int i = 0; i < columnReaders.length; ++i) {
switch (columnReaders[i].descriptor.getType()) {
case INT32:
columnReaders[i].readIntBatch(num, columnarBatch.column(i));
break;
default:
throw new IOException("Unsupported type: " + columnReaders[i].descriptor.getType());
}
columnReaders[i].readBatch(num, columnarBatch.column(i));
}
rowsReturned += num;
columnarBatch.setNumRows(num);
@@ -237,7 +233,8 @@ private void initializeInternal() throws IOException {

// TODO: Be extremely cautious in what is supported. Expand this.
if (originalTypes[i] != null && originalTypes[i] != OriginalType.DECIMAL &&
originalTypes[i] != OriginalType.UTF8 && originalTypes[i] != OriginalType.DATE) {
originalTypes[i] != OriginalType.UTF8 && originalTypes[i] != OriginalType.DATE &&
originalTypes[i] != OriginalType.INT_8 && originalTypes[i] != OriginalType.INT_16) {
throw new IOException("Unsupported type: " + t);
}
if (originalTypes[i] == OriginalType.DECIMAL &&
@@ -464,6 +461,11 @@ private final class ColumnReader {
*/
private boolean useDictionary;

/**
* If useDictionary is true, the staging vector used to decode the ids.
*/
private ColumnVector dictionaryIds;

/**
* Maximum definition level for this column.
*/
@@ -587,9 +589,8 @@ private boolean next() throws IOException {

/**
* Reads `total` values from this columnReader into column.
* TODO: implement the other encodings.
*/
private void readIntBatch(int total, ColumnVector column) throws IOException {
private void readBatch(int total, ColumnVector column) throws IOException {
int rowId = 0;
while (total > 0) {
// Compute the number of values we want to read in this page.
@@ -599,21 +600,134 @@ private void readIntBatch(int total, ColumnVector column) throws IOException {
leftInPage = (int)(endOfPageValueCount - valuesRead);
}
int num = Math.min(total, leftInPage);
defColumn.readIntegers(
num, column, rowId, maxDefLevel, (VectorizedValuesReader)dataColumn, 0);

// Remap the values if it is dictionary encoded.
if (useDictionary) {
for (int i = rowId; i < rowId + num; ++i) {
column.putInt(i, dictionary.decodeToInt(column.getInt(i)));
// Data is dictionary encoded. We will vector decode the ids and then resolve the values.
if (dictionaryIds == null) {
dictionaryIds = ColumnVector.allocate(total, DataTypes.IntegerType, MemoryMode.ON_HEAP);
} else {
dictionaryIds.reset();
dictionaryIds.reserve(total);
}
// Read and decode dictionary ids.
readIntBatch(rowId, num, dictionaryIds);
decodeDictionaryIds(rowId, num, column);
} else {
switch (descriptor.getType()) {
case INT32:
readIntBatch(rowId, num, column);
break;
case INT64:
readLongBatch(rowId, num, column);
break;
case BINARY:
readBinaryBatch(rowId, num, column);
break;
default:
throw new IOException("Unsupported type: " + descriptor.getType());
}
}

valuesRead += num;
rowId += num;
total -= num;
}
}

/**
* Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
*/
private void decodeDictionaryIds(int rowId, int num, ColumnVector column) {
switch (descriptor.getType()) {
case INT32:
if (column.dataType() == DataTypes.IntegerType) {
for (int i = rowId; i < rowId + num; ++i) {
column.putInt(i, dictionary.decodeToInt(dictionaryIds.getInt(i)));
}
} else if (column.dataType() == DataTypes.ByteType) {
for (int i = rowId; i < rowId + num; ++i) {
column.putByte(i, (byte)dictionary.decodeToInt(dictionaryIds.getInt(i)));
}
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
}
break;

case INT64:
for (int i = rowId; i < rowId + num; ++i) {
column.putLong(i, dictionary.decodeToLong(dictionaryIds.getInt(i)));
}
break;

case BINARY:
// TODO: this is incredibly inefficient as it blows up the dictionary right here. We
// need to do this better. We should probably add the dictionary data to the ColumnVector
// and reuse it across batches. This should mean adding a ByteArray would just update
// the length and offset.
for (int i = rowId; i < rowId + num; ++i) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
column.putByteArray(i, v.getBytes());
}
break;

default:
throw new NotImplementedException("Unsupported type: " + descriptor.getType());
}

if (dictionaryIds.numNulls() > 0) {
// Copy the NULLs over.
// TODO: we can improve this by decoding the NULLs directly into column. This would
// mean we decode the int ids into `dictionaryIds` and the NULLs into `column` and then
// just do the ID remapping as above.
for (int i = 0; i < num; ++i) {
if (dictionaryIds.getIsNull(rowId + i)) {
column.putNull(rowId + i);
}
}
}
}

/**
* For all the read*Batch functions, reads `num` values from this columnReader into column. It
* is guaranteed that num is smaller than the number of values left in the current page.
*/

private void readIntBatch(int rowId, int num, ColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.IntegerType) {
defColumn.readIntegers(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, 0);
} else if (column.dataType() == DataTypes.ByteType) {
defColumn.readBytes(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
}
}

private void readLongBatch(int rowId, int num, ColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.LongType) {
defColumn.readLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
}
}

private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (column.isArray()) {
defColumn.readBinarys(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
}
}


private void readPage() throws IOException {
DataPage page = pageReader.readPage();
// TODO: Why is this a visitor?
@@ -18,10 +18,13 @@

import java.io.IOException;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.unsafe.Platform;

import org.apache.commons.lang.NotImplementedException;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;

/**
* An implementation of the Parquet PLAIN decoder that supports the vectorized interface.
@@ -52,15 +55,53 @@ public void skip(int n) {
}

@Override
public void readIntegers(int total, ColumnVector c, int rowId) {
public final void readIntegers(int total, ColumnVector c, int rowId) {
c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
offset += 4 * total;
}

@Override
public int readInteger() {
public final void readLongs(int total, ColumnVector c, int rowId) {
c.putLongsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
offset += 8 * total;
}

@Override
public final void readBytes(int total, ColumnVector c, int rowId) {
for (int i = 0; i < total; i++) {
// Bytes are stored as a 4-byte little endian int. Just read the first byte.
// TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
c.putInt(rowId + i, buffer[offset]);
offset += 4;
}
}

@Override
public final int readInteger() {
int v = Platform.getInt(buffer, offset);
offset += 4;
return v;
}

@Override
public final long readLong() {
long v = Platform.getLong(buffer, offset);
offset += 8;
return v;
}

@Override
public final byte readByte() {
return (byte)readInteger();
}

@Override
public final void readBinary(int total, ColumnVector v, int rowId) {
for (int i = 0; i < total; i++) {
int len = readInteger();
int start = offset;
offset += len;
v.putByteArray(rowId + i, buffer, start - Platform.BYTE_ARRAY_OFFSET, len);
}
}
}