PARQUET-787: Update encodings to use ByteBufferInputStream.

rdblue committed Feb 20, 2018
1 parent a4fa05a commit b0b6147

Showing 36 changed files with 469 additions and 353 deletions.
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java

@@ -24,12 +24,11 @@
 import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
 import static org.apache.parquet.column.ValuesType.VALUES;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.parquet.CorruptDeltaByteArrays;
 import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -549,7 +548,7 @@ public Void visit(DataPageV2 dataPageV2) {
     });
   }
 
-  private void initDataReader(Encoding dataEncoding, ByteBuffer bytes, int offset, int valueCount) {
+  private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
     ValuesReader previousReader = this.dataColumn;
 
     this.currentEncoding = dataEncoding;
@@ -565,13 +564,15 @@ private void initDataReader(Encoding dataEncoding, ByteBuffer bytes, int offset,
     } else {
       this.dataColumn = dataEncoding.getValuesReader(path, VALUES);
     }
+
     if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) {
       bindToDictionary(dictionary);
     } else {
       bind(path.getType());
     }
+
     try {
-      dataColumn.initFromPage(pageValueCount, bytes, offset);
+      dataColumn.initFromPage(pageValueCount, in);
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page in col " + path, e);
     }
@@ -589,16 +590,15 @@ private void readPageV1(DataPageV1 page) {
     this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
     this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
     try {
-      ByteBuffer bytes = page.getBytes().toByteBuffer();
-      LOG.debug("page size {} bytes and {} records", bytes.remaining(), pageValueCount);
+      BytesInput bytes = page.getBytes();
+      LOG.debug("page size {} bytes and {} records", bytes.size(), pageValueCount);
       LOG.debug("reading repetition levels at 0");
-      rlReader.initFromPage(pageValueCount, bytes, 0);
-      int next = rlReader.getNextOffset();
-      LOG.debug("reading definition levels at {}", next);
-      dlReader.initFromPage(pageValueCount, bytes, next);
-      next = dlReader.getNextOffset();
-      LOG.debug("reading data at {}", next);
-      initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
+      ByteBufferInputStream in = bytes.toInputStream();
+      rlReader.initFromPage(pageValueCount, in);
+      LOG.debug("reading definition levels at {}", in.position());
+      dlReader.initFromPage(pageValueCount, in);
+      LOG.debug("reading data at {}", in.position());
+      initDataReader(page.getValueEncoding(), in, page.getValueCount());
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
     }
@@ -607,9 +607,9 @@ private void readPageV1(DataPageV1 page) {
   private void readPageV2(DataPageV2 page) {
     this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
     this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
+    LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
     try {
-      LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
-      initDataReader(page.getDataEncoding(), page.getData().toByteBuffer(), 0, page.getValueCount());
+      initDataReader(page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount());
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
     }
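The readPageV1 rewrite above captures the core idea of the commit: instead of each reader reporting where the next section starts through getNextOffset(), the shared ByteBufferInputStream tracks the position as each reader consumes its section. A minimal sketch of that pattern (not part of the commit; the page layout is invented for illustration), using only stream methods that appear in this diff:

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.parquet.bytes.ByteBufferInputStream;

public class PagePositionExample {
  public static void main(String[] args) throws IOException {
    // hypothetical layout: [2 bytes rep levels][2 bytes def levels][4 bytes data]
    ByteBufferInputStream in = ByteBufferInputStream.wrap(
        ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5, 6, 7, 8}));

    ByteBuffer repLevels = in.slice(2);  // first reader consumes its section
    System.out.println(in.position());  // 2: next section starts here

    ByteBuffer defLevels = in.slice(2);  // second reader does the same
    System.out.println(in.position());  // 4: where the data section begins
  }
}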
parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java

@@ -20,8 +20,7 @@
 
 import java.io.IOException;
 
-import java.nio.ByteBuffer;
-import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.io.api.Binary;
 
 /**
@@ -40,8 +39,9 @@ public abstract class ValuesReader {
   /**
    * Called to initialize the column reader from a part of a page.
    *
-   * The underlying implementation knows how much data to read, so a length
-   * is not provided.
+   * Implementations must consume all bytes from the input stream, leaving the
+   * stream ready to read the next section of data. The underlying
+   * implementation knows how much data to read, so a length is not provided.
    *
    * Each page may contain several sections:
    * <ul>
@@ -50,36 +50,12 @@ public abstract class ValuesReader {
    * <li> data column
    * </ul>
    *
-   * This function is called with 'offset' pointing to the beginning of one of these sections,
-   * and should return the offset to the section following it.
-   *
    * @param valueCount count of values in this page
-   * @param page the array to read from containing the page data (repetition levels, definition levels, data)
-   * @param offset where to start reading from in the page
+   * @param in an input stream containing the page data at the correct offset
    *
    * @throws IOException
    */
-  public abstract void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException;
-
-  /**
-   * Same functionality as method of the same name that takes a ByteBuffer instead of a byte[].
-   *
-   * This method is only provided for backward compatibility and will be removed in a future release.
-   * Please update any code using it as soon as possible.
-   * @see #initFromPage(int, ByteBuffer, int)
-   */
-  @Deprecated
-  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
-    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
-  }
-
-  /**
-   * Called to return offset of the next section
-   * @return offset of the next section
-   */
-  public int getNextOffset() {
-    throw new ParquetDecodingException("Unsupported: cannot get offset of the next section.");
-  }
+  public abstract void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException;
 
   /**
    * usable when the encoding is dictionary based
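To make the new contract concrete, here is a minimal hypothetical reader (not part of this commit) that consumes exactly its own section, leaving the stream positioned for the next reader. The 4-bytes-per-value layout and the little-endian order (Parquet's plain encoding is little-endian) are assumptions for the sketch:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;

public class FixedWidthIntValuesReader extends ValuesReader {
  private ByteBuffer buffer;

  @Override
  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
    // slice exactly this section's bytes; `in` is left at the next section
    this.buffer = in.slice(valueCount * 4).order(ByteOrder.LITTLE_ENDIAN);
  }

  @Override
  public int readInteger() {
    return buffer.getInt();
  }

  @Override
  public void skip() {
    buffer.position(buffer.position() + 4);
  }
}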
parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java

@@ -22,7 +22,6 @@
 import static org.apache.parquet.column.values.bitpacking.BitPacking.createBitPackingReader;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
@@ -44,7 +43,6 @@ public class BitPackingValuesReader extends ValuesReader {
   private ByteBufferInputStream in;
   private BitPackingReader bitPackingReader;
   private final int bitsPerValue;
-  private int nextOffset;
 
   /**
    * @param bound the maximum value stored by this column
@@ -68,25 +66,16 @@ public int readInteger() {
 
   /**
    * {@inheritDoc}
-   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int)
+   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBufferInputStream)
    */
   @Override
-  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
+  public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
     int effectiveBitLength = valueCount * bitsPerValue;
     int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
     LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitsPerValue);
 
-    ByteBuffer buffer = in.duplicate();
-    in.position(in.position() + offset);
-    in.limit(in.position() + length);
-    this.in = ByteBufferInputStream.wrap(buffer);
+    this.in = stream.sliceStream(length);
     this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
-    this.nextOffset = offset + length;
   }
-
-  @Override
-  public int getNextOffset() {
-    return nextOffset;
-  }
 
   @Override
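The sliceStream(length) call above replaces the old duplicate/position/limit sequence. A small sketch (not from the commit) of its behavior as used here: it returns a sub-stream over the next length bytes and advances the parent past them.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.parquet.bytes.ByteBufferInputStream;

public class SliceStreamExample {
  public static void main(String[] args) throws IOException {
    ByteBufferInputStream page = ByteBufferInputStream.wrap(
        ByteBuffer.wrap(new byte[] {10, 11, 12, 13, 14, 15}));

    // carve off a 4-byte sub-stream for this reader's section
    ByteBufferInputStream section = page.sliceStream(4);

    System.out.println(section.read());  // 10, read from the slice
    System.out.println(page.position()); // 4: parent is already past the section
  }
}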
parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java

@@ -19,11 +19,12 @@
 package org.apache.parquet.column.values.bitpacking;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -36,9 +37,7 @@ public class ByteBitPackingValuesReader extends ValuesReader {
   private final BytePacker packer;
   private final int[] decoded = new int[VALUES_AT_A_TIME];
   private int decodedPosition = VALUES_AT_A_TIME - 1;
-  private ByteBuffer encoded;
-  private int encodedPos;
-  private int nextOffset;
+  private ByteBufferInputStream in;
 
   public ByteBitPackingValuesReader(int bound, Packer packer) {
     this.bitWidth = BytesUtils.getWidthFromMaxInt(bound);
@@ -49,37 +48,38 @@ public ByteBitPackingValuesReader(int bound, Packer packer) {
   public int readInteger() {
     ++ decodedPosition;
     if (decodedPosition == decoded.length) {
-      encoded.position(encodedPos);
-      if (encodedPos + bitWidth > encoded.limit()) {
-        // unpack8Values needs at least bitWidth bytes to read from,
-        // We have to fill in 0 byte at the end of encoded bytes.
-        byte[] tempEncode = new byte[bitWidth];
-        encoded.get(tempEncode, 0, encoded.limit() - encodedPos);
-        packer.unpack8Values(tempEncode, 0, decoded, 0);
-      } else {
-        packer.unpack8Values(encoded, encodedPos, decoded, 0);
-      }
-      encodedPos += bitWidth;
+      try {
+        if (in.available() < bitWidth) {
+          // unpack8Values needs at least bitWidth bytes to read from,
+          // We have to fill in 0 byte at the end of encoded bytes.
+          byte[] tempEncode = new byte[bitWidth];
+          in.read(tempEncode, 0, in.available());
+          packer.unpack8Values(tempEncode, 0, decoded, 0);
+        } else {
+          ByteBuffer encoded = in.slice(bitWidth);
+          packer.unpack8Values(encoded, encoded.position(), decoded, 0);
+        }
+      } catch (IOException e) {
+        throw new ParquetDecodingException("Failed to read packed values", e);
+      }
       decodedPosition = 0;
     }
     return decoded[decodedPosition];
   }
 
   @Override
-  public void initFromPage(int valueCount, ByteBuffer page, int offset)
+  public void initFromPage(int valueCount, ByteBufferInputStream stream)
       throws IOException {
     int effectiveBitLength = valueCount * bitWidth;
     int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); // ceil
-    LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitWidth);
-    this.encoded = page;
-    this.encodedPos = offset;
+    LOG.debug("reading {} bytes for {} values of size {} bits.",
+        length, valueCount, bitWidth);
+    // work-around for null values. this will not happen for repetition or
+    // definition levels (never null), but will happen when valueCount has not
+    // been adjusted for null values in the data.
+    length = Math.min(length, stream.available());
+    this.in = stream.sliceStream(length);
     this.decodedPosition = VALUES_AT_A_TIME - 1;
-    this.nextOffset = offset + length;
   }
-
-  @Override
-  public int getNextOffset() {
-    return nextOffset;
-  }
 
   @Override
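The branch on in.available() < bitWidth exists because a byte packer of width W always consumes exactly W bytes to produce 8 values. A short sketch of that contract (not from the commit; the bit width is chosen arbitrarily):

import java.nio.ByteBuffer;

import org.apache.parquet.column.values.bitpacking.BytePacker;
import org.apache.parquet.column.values.bitpacking.Packer;

public class Unpack8ValuesExample {
  public static void main(String[] args) {
    int bitWidth = 3;
    BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);

    int[] values = {0, 1, 2, 3, 4, 5, 6, 7};
    byte[] packed = new byte[bitWidth]; // 8 values * 3 bits = 3 bytes
    packer.pack8Values(values, 0, packed, 0);

    int[] unpacked = new int[8];
    packer.unpack8Values(ByteBuffer.wrap(packed), 0, unpacked, 0);
    // unpacked is {0, 1, 2, 3, 4, 5, 6, 7} again; with fewer than bitWidth
    // input bytes the unpacker would read past the end, hence the
    // zero-padding work-around in readInteger() above
  }
}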
parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java

@@ -41,7 +41,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
    */
   private int valuesRead;
   private long minDeltaInCurrentBlock;
-  private ByteBuffer page;
+
   /**
    * stores the decoded values including the first value which is written to the header
    */
@@ -52,25 +52,16 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
    */
   private int valuesBuffered;
   private ByteBufferInputStream in;
-  private int nextOffset;
   private DeltaBinaryPackingConfig config;
   private int[] bitWidths;
 
   /**
-   * eagerly load all the data into memory
-   *
-   * @param valueCount count of values in this page
-   * @param page the array to read from containing the page data (repetition levels, definition levels, data)
-   * @param offset where to start reading from in the page
-   * @throws IOException
+   * eagerly loads all the data into memory
    */
   @Override
-  public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException {
-    ByteBuffer buffer = page.duplicate();
-    buffer.position(buffer.position() + offset);
-    in = ByteBufferInputStream.wrap(buffer);
+  public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
+    this.in = stream;
     this.config = DeltaBinaryPackingConfig.readConfig(in);
-    this.page = page;
     this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
     allocateValuesBuffer();
     bitWidths = new int[config.miniBlockNumInABlock];
@@ -81,14 +72,8 @@ public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOE
     while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis
       loadNewBlockToBuffer();
     }
-    this.nextOffset = page.limit() - in.available();
   }
 
-  @Override
-  public int getNextOffset() {
-    return nextOffset;
-  }
-
   /**
    * the value buffer is allocated so that the size of it is multiple of mini block
    * because when writing, data is flushed on a mini block basis
@@ -159,12 +144,11 @@ private void unpackMiniBlock(BytePackerForLong packer) throws IOException {
   }
 
   private void unpack8Values(BytePackerForLong packer) throws IOException {
-    //calculate the pos because the packer api uses array not stream
-    int pos = page.limit() - in.available();
-    packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered);
+    // get a single buffer of 8 values. most of the time, this won't require a copy
+    // TODO: update the packer to consume from an InputStream
+    ByteBuffer buffer = in.slice(packer.getBitWidth());
+    packer.unpack8Values(buffer, buffer.position(), valuesBuffer, valuesBuffered);
     this.valuesBuffered += 8;
-    //sync the pos in stream
-    in.skip(packer.getBitWidth());
   }
 
   private void readBitWidthsForMiniBlocks() {
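Here in.slice(n), unlike sliceStream, returns a ByteBuffer view over the next n bytes for buffer-oriented APIs like BytePackerForLong.unpack8Values, and it still advances the stream, so the old position arithmetic (page.limit() - in.available()) and the explicit in.skip(...) both disappear. A sketch of the pattern (not from the commit):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.parquet.bytes.ByteBufferInputStream;

public class SliceForBufferApiExample {
  // stand-in for a buffer-oriented API like BytePackerForLong.unpack8Values
  static int firstByte(ByteBuffer buf) {
    return buf.get(buf.position());
  }

  public static void main(String[] args) throws IOException {
    ByteBufferInputStream in = ByteBufferInputStream.wrap(
        ByteBuffer.wrap(new byte[] {7, 8, 9, 10}));

    ByteBuffer miniBlock = in.slice(2);       // view over the next 2 bytes
    System.out.println(firstByte(miniBlock)); // 7
    System.out.println(in.position());        // 2: no explicit skip needed
  }
}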
parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java

@@ -22,8 +22,10 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,34 +40,38 @@ public class DeltaLengthByteArrayValuesReader extends ValuesReader {
 
   private static final Logger LOG = LoggerFactory.getLogger(DeltaLengthByteArrayValuesReader.class);
   private ValuesReader lengthReader;
-  private ByteBuffer in;
-  private int offset;
+  private ByteBufferInputStream in;
 
   public DeltaLengthByteArrayValuesReader() {
     this.lengthReader = new DeltaBinaryPackingValuesReader();
   }
 
   @Override
-  public void initFromPage(int valueCount, ByteBuffer in, int offset)
+  public void initFromPage(int valueCount, ByteBufferInputStream stream)
       throws IOException {
-    LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset));
-    lengthReader.initFromPage(valueCount, in, offset);
-    offset = lengthReader.getNextOffset();
-    this.in = in;
-    this.offset = offset;
+    LOG.debug("init from page at offset {} for length {}",
+        stream.position(), stream.available());
+    lengthReader.initFromPage(valueCount, stream);
+    this.in = stream.remainingStream();
   }
 
   @Override
   public Binary readBytes() {
     int length = lengthReader.readInteger();
-    int start = offset;
-    offset = start + length;
-    return Binary.fromConstantByteBuffer(in, start, length);
+    try {
+      return Binary.fromConstantByteBuffer(in.slice(length));
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes");
+    }
   }
 
   @Override
   public void skip() {
     int length = lengthReader.readInteger();
-    offset = offset + length;
+    try {
+      in.skipFully(length);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to skip " + length + " bytes");
+    }
   }
 }
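Both paths above advance the stream by the same amount, which is what keeps reads and skips interchangeable once the per-object offset field is gone. A sketch of the same calls (not from the commit; the values are invented):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.io.api.Binary;

public class ReadOrSkipExample {
  public static void main(String[] args) throws IOException {
    // two concatenated byte arrays: "abc" (length 3) then "de" (length 2)
    ByteBufferInputStream in = ByteBufferInputStream.wrap(
        ByteBuffer.wrap("abcde".getBytes(StandardCharsets.UTF_8)));

    Binary first = Binary.fromConstantByteBuffer(in.slice(3)); // read "abc"
    System.out.println(first.toStringUsingUTF8());

    in.skipFully(2); // skip "de"; the stream advances exactly as a read would
  }
}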
(The remaining changed files are not shown.)