Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PARQUET-787: Limit read allocation size #390

Closed
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@
import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
import static org.apache.parquet.column.ValuesType.VALUES;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
Expand Down Expand Up @@ -549,7 +548,7 @@ public Void visit(DataPageV2 dataPageV2) {
});
}

private void initDataReader(Encoding dataEncoding, ByteBuffer bytes, int offset, int valueCount) {
private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
ValuesReader previousReader = this.dataColumn;

this.currentEncoding = dataEncoding;
Expand All @@ -565,13 +564,15 @@ private void initDataReader(Encoding dataEncoding, ByteBuffer bytes, int offset,
} else {
this.dataColumn = dataEncoding.getValuesReader(path, VALUES);
}

if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) {
bindToDictionary(dictionary);
} else {
bind(path.getType());
}

try {
dataColumn.initFromPage(pageValueCount, bytes, offset);
dataColumn.initFromPage(pageValueCount, in);
} catch (IOException e) {
throw new ParquetDecodingException("could not read page in col " + path, e);
}
Expand All @@ -589,16 +590,15 @@ private void readPageV1(DataPageV1 page) {
this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
try {
ByteBuffer bytes = page.getBytes().toByteBuffer();
LOG.debug("page size {} bytes and {} records", bytes.remaining(), pageValueCount);
BytesInput bytes = page.getBytes();
LOG.debug("page size {} bytes and {} records", bytes.size(), pageValueCount);
LOG.debug("reading repetition levels at 0");
rlReader.initFromPage(pageValueCount, bytes, 0);
int next = rlReader.getNextOffset();
LOG.debug("reading definition levels at {}", next);
dlReader.initFromPage(pageValueCount, bytes, next);
next = dlReader.getNextOffset();
LOG.debug("reading data at {}", next);
initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
ByteBufferInputStream in = bytes.toInputStream();
rlReader.initFromPage(pageValueCount, in);
LOG.debug("reading definition levels at {}", in.position());
dlReader.initFromPage(pageValueCount, in);
LOG.debug("reading data at {}", in.position());
initDataReader(page.getValueEncoding(), in, page.getValueCount());
} catch (IOException e) {
throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
}
Expand All @@ -607,9 +607,9 @@ private void readPageV1(DataPageV1 page) {
private void readPageV2(DataPageV2 page) {
this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
try {
LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
initDataReader(page.getDataEncoding(), page.getData().toByteBuffer(), 0, page.getValueCount());
initDataReader(page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount());
} catch (IOException e) {
throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@

import java.io.IOException;

import java.nio.ByteBuffer;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.io.api.Binary;

/**
Expand All @@ -40,8 +39,9 @@ public abstract class ValuesReader {
/**
* Called to initialize the column reader from a part of a page.
*
* The underlying implementation knows how much data to read, so a length
* is not provided.
* Implementations must consume all bytes belonging to their section from the
* input stream, leaving the stream positioned at the start of the next section
* of data. The underlying implementation knows how much data to read, so a
* length is not provided.
*
* Each page may contain several sections:
* <ul>
Expand All @@ -50,36 +50,12 @@ public abstract class ValuesReader {
* <li> data column
* </ul>
*
* This function is called with 'offset' pointing to the beginning of one of these sections,
* and should return the offset to the section following it.
*
* @param valueCount count of values in this page
* @param page the array to read from containing the page data (repetition levels, definition levels, data)
* @param offset where to start reading from in the page
* @param in an input stream containing the page data at the correct offset
*
* @throws IOException
*/
public abstract void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to deprecate these methods instead of removing them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's almost always better to remove methods, as long as they aren't part of the public API.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.


/**
* Same functionality as method of the same name that takes a ByteBuffer instead of a byte[].
*
* This method is only provided for backward compatibility and will be removed in a future release.
* Please update any code using it as soon as possible.
* @see #initFromPage(int, ByteBuffer, int)
*/
@Deprecated
public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
}

/**
* Called to return offset of the next section
* @return offset of the next section
*/
public int getNextOffset() {
throw new ParquetDecodingException("Unsupported: cannot get offset of the next section.");
}
public abstract void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException;

/**
* usable when the encoding is dictionary based
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.apache.parquet.column.values.bitpacking.BitPacking.createBitPackingReader;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
Expand All @@ -44,7 +43,6 @@ public class BitPackingValuesReader extends ValuesReader {
private ByteBufferInputStream in;
private BitPackingReader bitPackingReader;
private final int bitsPerValue;
private int nextOffset;

/**
* @param bound the maximum value stored by this column
Expand All @@ -68,21 +66,16 @@ public int readInteger() {

/**
* {@inheritDoc}
* @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int)
* @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBufferInputStream)
*/
@Override
public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
int effectiveBitLength = valueCount * bitsPerValue;
int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitsPerValue);
this.in = new ByteBufferInputStream(in, offset, length);
this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
this.nextOffset = offset + length;
}

@Override
public int getNextOffset() {
return nextOffset;
this.in = stream.sliceStream(length);
this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
package org.apache.parquet.column.values.bitpacking;

import java.io.IOException;
import java.util.Arrays;
import java.nio.ByteBuffer;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,9 +37,7 @@ public class ByteBitPackingValuesReader extends ValuesReader {
private final BytePacker packer;
private final int[] decoded = new int[VALUES_AT_A_TIME];
private int decodedPosition = VALUES_AT_A_TIME - 1;
private ByteBuffer encoded;
private int encodedPos;
private int nextOffset;
private ByteBufferInputStream in;

public ByteBitPackingValuesReader(int bound, Packer packer) {
this.bitWidth = BytesUtils.getWidthFromMaxInt(bound);
Expand All @@ -49,37 +48,38 @@ public ByteBitPackingValuesReader(int bound, Packer packer) {
public int readInteger() {
++ decodedPosition;
if (decodedPosition == decoded.length) {
encoded.position(encodedPos);
if (encodedPos + bitWidth > encoded.limit()) {
// unpack8Values needs at least bitWidth bytes to read from,
// We have to fill in 0 byte at the end of encoded bytes.
byte[] tempEncode = new byte[bitWidth];
encoded.get(tempEncode, 0, encoded.limit() - encodedPos);
packer.unpack8Values(tempEncode, 0, decoded, 0);
} else {
packer.unpack8Values(encoded, encodedPos, decoded, 0);
try {
if (in.available() < bitWidth) {
// unpack8Values needs at least bitWidth bytes to read from,
// We have to fill in 0 byte at the end of encoded bytes.
byte[] tempEncode = new byte[bitWidth];
in.read(tempEncode, 0, in.available());
packer.unpack8Values(tempEncode, 0, decoded, 0);
} else {
ByteBuffer encoded = in.slice(bitWidth);
packer.unpack8Values(encoded, encoded.position(), decoded, 0);
}
} catch (IOException e) {
throw new ParquetDecodingException("Failed to read packed values", e);
}
encodedPos += bitWidth;
decodedPosition = 0;
}
return decoded[decodedPosition];
}

@Override
public void initFromPage(int valueCount, ByteBuffer page, int offset)
public void initFromPage(int valueCount, ByteBufferInputStream stream)
throws IOException {
int effectiveBitLength = valueCount * bitWidth;
int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); // ceil
LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitWidth);
this.encoded = page;
this.encodedPos = offset;
LOG.debug("reading {} bytes for {} values of size {} bits.",
length, valueCount, bitWidth);
// work-around for null values. this will not happen for repetition or
// definition levels (never null), but will happen when valueCount has not
// been adjusted for null values in the data.
length = Math.min(length, stream.available());
this.in = stream.sliceStream(length);
this.decodedPosition = VALUES_AT_A_TIME - 1;
this.nextOffset = offset + length;
}

@Override
public int getNextOffset() {
return nextOffset;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.parquet.column.values.delta;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.parquet.bytes.ByteBufferInputStream;
Expand All @@ -28,7 +27,6 @@
import org.apache.parquet.column.values.bitpacking.Packer;
import org.apache.parquet.io.ParquetDecodingException;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
Expand All @@ -43,7 +41,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
*/
private int valuesRead;
private long minDeltaInCurrentBlock;
private ByteBuffer page;

/**
* stores the decoded values including the first value which is written to the header
*/
Expand All @@ -54,23 +52,16 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
*/
private int valuesBuffered;
private ByteBufferInputStream in;
private int nextOffset;
private DeltaBinaryPackingConfig config;
private int[] bitWidths;

/**
* eagerly load all the data into memory
*
* @param valueCount count of values in this page
* @param page the array to read from containing the page data (repetition levels, definition levels, data)
* @param offset where to start reading from in the page
* @throws IOException
* eagerly loads all the data into memory
*/
@Override
public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException {
in = new ByteBufferInputStream(page, offset, page.limit() - offset);
public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
this.in = stream;
this.config = DeltaBinaryPackingConfig.readConfig(in);
this.page = page;
this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
allocateValuesBuffer();
bitWidths = new int[config.miniBlockNumInABlock];
Expand All @@ -81,14 +72,8 @@ public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOE
while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis
loadNewBlockToBuffer();
}
this.nextOffset = page.limit() - in.available();
}

@Override
public int getNextOffset() {
return nextOffset;
}


/**
* the value buffer is allocated so that the size of it is multiple of mini block
* because when writing, data is flushed on a mini block basis
Expand Down Expand Up @@ -123,7 +108,7 @@ private void checkRead() {
}
}

private void loadNewBlockToBuffer() {
private void loadNewBlockToBuffer() throws IOException {
try {
minDeltaInCurrentBlock = BytesUtils.readZigZagVarLong(in);
} catch (IOException e) {
Expand Down Expand Up @@ -152,19 +137,18 @@ private void loadNewBlockToBuffer() {
*
* @param packer the packer created from bitwidth of current mini block
*/
private void unpackMiniBlock(BytePackerForLong packer) {
private void unpackMiniBlock(BytePackerForLong packer) throws IOException {
for (int j = 0; j < config.miniBlockSizeInValues; j += 8) {
unpack8Values(packer);
}
}

private void unpack8Values(BytePackerForLong packer) {
//calculate the pos because the packer api uses array not stream
int pos = page.limit() - in.available();
packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered);
private void unpack8Values(BytePackerForLong packer) throws IOException {
// get a single buffer of 8 values. most of the time, this won't require a copy
// TODO: update the packer to consume from an InputStream
ByteBuffer buffer = in.slice(packer.getBitWidth());
packer.unpack8Values(buffer, buffer.position(), valuesBuffer, valuesBuffered);
this.valuesBuffered += 8;
//sync the pos in stream
in.skip(packer.getBitWidth());
}

private void readBitWidthsForMiniBlocks() {
Expand Down
Loading