diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LazyByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LazyByteBufferInputStream.java new file mode 100644 index 0000000000..07eb7f3c69 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/LazyByteBufferInputStream.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.parquet.io.SeekableInputStream; + +public class LazyByteBufferInputStream extends InputStream { + private final SeekableInputStream underlying; + private final int bufferSize; + private final long totalReadableBytes; + + private long streamOffset; + private int bytesReadInBuffer; + private int totalBytesRead; + private ByteBuffer delegate; + private ByteBufferInputStream buffer; + + public static LazyByteBufferInputStream wrap( + SeekableInputStream underlying, + ByteBufferAllocator allocator, + long startingOffset, + long readableBytes, + int bufferSize) + throws IOException { + return new LazyByteBufferInputStream(underlying, allocator, startingOffset, readableBytes, bufferSize); + } + + LazyByteBufferInputStream( + SeekableInputStream underlying, + ByteBufferAllocator allocator, + long startingOffset, + long readableBytes, + int bufferSize) { + this.underlying = underlying; + this.totalReadableBytes = readableBytes; + this.bytesReadInBuffer = 0; + this.bufferSize = (int) Math.min(readableBytes, bufferSize); // @Todo clean up casting... 
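+    // Eagerly fill the first buffer: seek to the chunk's starting offset, read up to
+    // bufferSize bytes into the delegate buffer, and serve subsequent reads from that
+    // in-memory copy until checkRefill() detects it has been exhausted.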
+ this.streamOffset = startingOffset; + this.delegate = allocator.allocate(this.bufferSize); + try { + underlying.seek(startingOffset); + underlying.read(delegate); + delegate.flip(); + } catch (IOException e) { + throw new RuntimeException("Failed to initialize LazyByteBufferInputStream", e); + } + this.buffer = ByteBufferInputStream.wrap(delegate); + } + + public ByteBuffer getDelegate() { + return delegate; + } + + public void refillBuffers() { + try { + final byte[] unreadBytes = + Arrays.copyOfRange(delegate.array(), delegate.capacity() - buffer.available(), delegate.capacity()); + this.streamOffset += bytesReadInBuffer; + delegate.put(unreadBytes); + underlying.seek(streamOffset + unreadBytes.length); + underlying.read(delegate); // Todo check that we're not reading past total chunk size + delegate.flip(); + } catch (IOException e) { + throw new RuntimeException(e); // @Todo better exception handling + } + + this.bytesReadInBuffer = 0; + this.buffer = ByteBufferInputStream.wrap(delegate); + } + + private void checkRefill() { + checkRefill(1); + } + + private void checkRefill(long bytesNeeded) { + if (buffer.available() < bytesNeeded) { + refillBuffers(); + } + } + + public List sliceBuffers(int len) throws IOException { + if (len == 0) { + return Collections.emptyList(); + } + + checkRefill(len); + + final ByteBuffer slice = + ByteBuffer.wrap(Arrays.copyOfRange(delegate.array(), bytesReadInBuffer, bytesReadInBuffer + len)); + slice.position(0); + + buffer.skip(len); + bytesReadInBuffer += len; + totalBytesRead += len; + return Collections.singletonList(slice); + } + + public long position() { + return totalBytesRead; + } + + private int wrapRead(int bytesRead) { + this.totalBytesRead += bytesRead; + this.bytesReadInBuffer += bytesRead; + return bytesRead; + } + + @Override + public int read() throws IOException { + checkRefill(); + wrapRead(1); + return buffer.read(); + } + + @Override + public int read(byte[] b) throws IOException { + checkRefill(b.length); + return wrapRead(buffer.read(b)); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + checkRefill(len); + return wrapRead(buffer.read(b, off, len)); + } + + @Override + public byte[] readAllBytes() throws IOException { + throw new UnsupportedOperationException("readAllBytes"); + } + + @Override + public byte[] readNBytes(int len) throws IOException { + throw new UnsupportedOperationException("readNBytes"); + } + + @Override + public int readNBytes(byte[] b, int off, int len) throws IOException { + throw new UnsupportedOperationException("readNBytes"); + } + + @Override + public long skip(long n) { + checkRefill(n); + totalBytesRead += n; + bytesReadInBuffer += n; + return buffer.skip(n); + } + + @Override + public int available() { + return (int) (totalReadableBytes - totalBytesRead); + } + + @Override + public void close() throws IOException { + buffer.close(); + } + + @Override + public synchronized void mark(int readlimit) { + buffer.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + buffer.reset(); + } + + @Override + public boolean markSupported() { + return buffer.markSupported(); + } +} diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java index 82b804bec4..89d67691a2 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java +++ 
b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java @@ -129,7 +129,7 @@ public List sliceBuffers(long length) throws EOFException { if (length == 0) { return Collections.emptyList(); } - + // System.out.println("Length = " + length + ", remaining= " + buffer.remaining()); if (length > buffer.remaining()) { throw new EOFException(); } diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestLazyBufferInputStream.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestLazyBufferInputStream.java new file mode 100644 index 0000000000..e2e23e27a6 --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestLazyBufferInputStream.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.apache.parquet.io.DelegatingSeekableInputStream; +import org.apache.parquet.io.SeekableInputStream; +import org.junit.Assert; +import org.junit.Test; + +public class TestLazyBufferInputStream { + static final ByteBuffer DATA = ByteBuffer.wrap(new byte[] { + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, + 30, 31, 32, 33, 34 + }); + + protected LazyByteBufferInputStream newStream() { + ByteBufferInputStream stream = ByteBufferInputStream.wrap(DATA); + SeekableInputStream inputStream = new DelegatingSeekableInputStream(stream) { + @Override + public long getPos() throws IOException { + return stream.position(); + } + + @Override + public void seek(long newPos) throws IOException { + stream.skip(newPos - stream.position()); + } + }; + + return new LazyByteBufferInputStream(inputStream, new HeapByteBufferAllocator(), 2, 20, 6); + } + + @Test + public void testReadBytes() throws Exception { + LazyByteBufferInputStream stream = newStream(); + Assert.assertEquals(20, stream.available()); + + Assert.assertEquals(stream.read(), DATA.array()[2]); + Assert.assertEquals(stream.read(), DATA.array()[3]); + Assert.assertEquals(stream.read(), DATA.array()[4]); + Assert.assertEquals(stream.read(), DATA.array()[5]); + Assert.assertEquals(stream.read(), DATA.array()[6]); + + Assert.assertEquals(15, stream.available()); + + Assert.assertEquals(stream.read(), DATA.array()[7]); + Assert.assertEquals(stream.read(), DATA.array()[8]); + Assert.assertEquals(stream.read(), DATA.array()[9]); + Assert.assertEquals(stream.read(), DATA.array()[10]); + Assert.assertEquals(stream.read(), DATA.array()[11]); + + Assert.assertEquals(10, stream.available()); + stream.close(); + } + + @Test + public void testReadByteBuffers() throws Exception { + LazyByteBufferInputStream stream = newStream(); + byte[] buf1 = new 
byte[4]; + Assert.assertEquals(stream.read(buf1), 4); + System.out.println( + "actual bytes = " + Arrays.toString(buf1) + ", expected =" + Arrays.toString(new byte[] {2, 3, 4, 5})); + Assert.assertArrayEquals(new byte[] {2, 3, 4, 5}, buf1); + Assert.assertEquals(stream.available(), 16); + Assert.assertEquals(stream.position(), 4); + + byte[] buf2 = new byte[4]; + Assert.assertEquals(stream.read(buf2), 4); + Assert.assertArrayEquals(new byte[] {6, 7, 8, 9}, buf2); + Assert.assertEquals(stream.available(), 12); + Assert.assertEquals(stream.position(), 8); + + byte[] buf3 = new byte[6]; + Assert.assertEquals(stream.read(buf3), 6); + Assert.assertArrayEquals(new byte[] {10, 11, 12, 13, 14, 15}, buf3); + Assert.assertEquals(stream.available(), 6); + Assert.assertEquals(stream.position(), 14); + + byte[] buf4 = new byte[6]; + Assert.assertEquals(stream.read(buf4), 6); + Assert.assertArrayEquals(new byte[] {16, 17, 18, 19, 20, 21}, buf4); + Assert.assertEquals(stream.available(), 0); + Assert.assertEquals(stream.position(), 20); + + Assert.assertEquals(0, stream.available()); + stream.close(); + } + + @Test + public void testSliceBuffers() throws Exception { + final LazyByteBufferInputStream stream = newStream(); + + // Initialize with a few reads to test when position != 0 + stream.read(); + stream.read(); + + Assert.assertEquals(18, stream.available()); + Assert.assertEquals(2, stream.position()); + + ByteBuffer slice1 = stream.sliceBuffers(4).get(0); + Assert.assertEquals(0, slice1.position()); + Assert.assertEquals(4, slice1.remaining()); + + byte[] buf = new byte[4]; + buf[0] = slice1.get(); + buf[1] = slice1.get(); + buf[2] = slice1.get(); + buf[3] = slice1.get(); + Assert.assertArrayEquals(new byte[] {4, 5, 6, 7}, buf); + Assert.assertEquals(14, stream.available()); + Assert.assertEquals(6, stream.position()); + + ByteBuffer slice2 = stream.sliceBuffers(6).get(0); + Assert.assertEquals(0, slice2.position()); + Assert.assertEquals(6, slice2.remaining()); + + byte[] buf2 = new byte[6]; + buf2[0] = slice2.get(); + buf2[1] = slice2.get(); + buf2[2] = slice2.get(); + buf2[3] = slice2.get(); + buf2[4] = slice2.get(); + buf2[5] = slice2.get(); + Assert.assertArrayEquals(new byte[] {8, 9, 10, 11, 12, 13}, buf2); + Assert.assertEquals(8, stream.available()); + Assert.assertEquals(12, stream.position()); + + stream.close(); + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java index b27f280349..c0d2da530e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java @@ -43,7 +43,7 @@ private HadoopReadOptions( boolean usePageChecksumVerification, boolean useBloomFilter, boolean useOffHeapDecryptBuffer, - boolean eagerlyReadFullRowGroup, + int columnChunkReadSize, FilterCompat.Filter recordFilter, MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -62,7 +62,7 @@ private HadoopReadOptions( usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, - eagerlyReadFullRowGroup, + columnChunkReadSize, recordFilter, metadataFilter, codecFactory, @@ -125,7 +125,7 @@ public ParquetReadOptions build() { usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, - eagerlyReadFullRowGroup, + columnChunkBufferSize, recordFilter, metadataFilter, codecFactory, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java index f5170038d4..d8192b1b0e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -21,9 +21,9 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; import static org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_CHUNK_BUFFER_SIZE; import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.EAGERLY_READ_FULL_ROW_GROUP; import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; @@ -61,7 +61,7 @@ public class ParquetReadOptions { private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true; private static final boolean USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT = false; - private static final boolean EAGERLY_READ_FULL_ROW_GROUP_DEFAULT = true; + private static final int COLUMN_CHUNK_BUFFER_SIZE_DEFAULT = -1; private final boolean useSignedStringMinMax; private final boolean useStatsFilter; @@ -72,7 +72,7 @@ public class ParquetReadOptions { private final boolean useBloomFilter; private final boolean useOffHeapDecryptBuffer; - private final boolean eagerlyReadFullRowGroup; + private final int columnChunkBufferSize; private final FilterCompat.Filter recordFilter; private final ParquetMetadataConverter.MetadataFilter metadataFilter; private final CompressionCodecFactory codecFactory; @@ -92,7 +92,7 @@ public class ParquetReadOptions { boolean usePageChecksumVerification, boolean useBloomFilter, boolean useOffHeapDecryptBuffer, - boolean eagerlyReadFullRowGroup, + int columnChunkBufferSize, FilterCompat.Filter recordFilter, ParquetMetadataConverter.MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -110,7 +110,7 @@ public class ParquetReadOptions { usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, - eagerlyReadFullRowGroup, + columnChunkBufferSize, recordFilter, metadataFilter, codecFactory, @@ -131,7 +131,7 @@ public class ParquetReadOptions { boolean usePageChecksumVerification, boolean useBloomFilter, boolean useOffHeapDecryptBuffer, - boolean eagerlyReadFullRowGroup, + int columnChunkBufferSize, FilterCompat.Filter recordFilter, ParquetMetadataConverter.MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -149,7 +149,7 @@ public class ParquetReadOptions { this.usePageChecksumVerification = usePageChecksumVerification; this.useBloomFilter = useBloomFilter; this.useOffHeapDecryptBuffer = useOffHeapDecryptBuffer; - this.eagerlyReadFullRowGroup = eagerlyReadFullRowGroup; + this.columnChunkBufferSize = columnChunkBufferSize; this.recordFilter = recordFilter; this.metadataFilter = metadataFilter; this.codecFactory = codecFactory; @@ -193,8 +193,8 @@ public boolean usePageChecksumVerification() { return usePageChecksumVerification; } - public boolean eagerlyReadFullRowGroup() { - return eagerlyReadFullRowGroup; + public int columnChunkBufferSize() { + return columnChunkBufferSize; } public FilterCompat.Filter getRecordFilter() { @@ -259,7 +259,7 @@ public 
static class Builder { protected boolean usePageChecksumVerification = PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT; protected boolean useBloomFilter = BLOOM_FILTER_ENABLED_DEFAULT; protected boolean useOffHeapDecryptBuffer = USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT; - protected boolean eagerlyReadFullRowGroup = EAGERLY_READ_FULL_ROW_GROUP_DEFAULT; + protected int columnChunkBufferSize = COLUMN_CHUNK_BUFFER_SIZE_DEFAULT; protected FilterCompat.Filter recordFilter = null; protected ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER; // the page size parameter isn't used when only using the codec factory to get decompressors @@ -285,10 +285,10 @@ public Builder(ParquetConfiguration conf) { usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED, usePageChecksumVerification)); useBloomFilter(conf.getBoolean(BLOOM_FILTERING_ENABLED, true)); useOffHeapDecryptBuffer(conf.getBoolean(OFF_HEAP_DECRYPT_BUFFER_ENABLED, false)); - eagerlyReadFullRowGroup(conf.getBoolean(EAGERLY_READ_FULL_ROW_GROUP, true)); withCodecFactory(HadoopCodecs.newFactory(conf, 0)); withRecordFilter(getFilter(conf)); withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608)); + withColumnChunkBufferSize(conf.getInt(COLUMN_CHUNK_BUFFER_SIZE, -1)); String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY); if (badRecordThresh != null) { set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh); @@ -367,12 +367,8 @@ public Builder useOffHeapDecryptBuffer(boolean useOffHeapDecryptBuffer) { return this; } - public Builder eagerlyReadFullRowGroup() { - return eagerlyReadFullRowGroup(true); - } - - public Builder eagerlyReadFullRowGroup(boolean eagerlyReadFullRowGroup) { - this.eagerlyReadFullRowGroup = eagerlyReadFullRowGroup; + public Builder withColumnChunkBufferSize(int columnChunkBufferSize) { + this.columnChunkBufferSize = columnChunkBufferSize; return this; } @@ -448,6 +444,7 @@ public Builder copy(ParquetReadOptions options) { withPageChecksumVerification(options.usePageChecksumVerification); withDecryption(options.fileDecryptionProperties); withMetricsCallback(options.metricsCallback); + withColumnChunkBufferSize(options.columnChunkBufferSize); conf = options.conf; for (Map.Entry keyValue : options.properties.entrySet()) { set(keyValue.getKey(), keyValue.getValue()); @@ -473,7 +470,7 @@ public ParquetReadOptions build() { usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, - eagerlyReadFullRowGroup, + columnChunkBufferSize, recordFilter, metadataFilter, codecFactory, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 52f106c93c..d6a9567f19 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -66,6 +66,7 @@ import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.ByteBufferReleaser; import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.LazyByteBufferInputStream; import org.apache.parquet.bytes.ReusingByteBufferAllocator; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.DataPage; @@ -73,6 +74,7 @@ import org.apache.parquet.column.page.DataPageV2; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.DictionaryPageReadStore; +import org.apache.parquet.column.page.Page; import 
org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter; import org.apache.parquet.column.values.bloomfilter.BloomFilter; @@ -1092,10 +1094,14 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE } } // actually read all the chunks + final ParquetFileStream parquetFileStream = new ParquetFileStream(currentParts.chunks); + ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount()); + for (ConsecutivePartList consecutiveChunks : allParts) { - consecutiveChunks.readAll(f, builder); + consecutiveChunks.readAll(parquetFileStream, builder); } + rowGroup.setReleaser(builder.releaser); for (Chunk chunk : builder.build()) { readChunkPages(chunk, block, rowGroup); @@ -1251,8 +1257,9 @@ private ColumnChunkPageReadStore internalReadFilteredRowGroup( } } // actually read all the chunks + final ParquetFileStream parquetFileStream = new ParquetFileStream(currentParts.chunks); for (ConsecutivePartList consecutiveChunks : allParts) { - consecutiveChunks.readAll(f, builder); + consecutiveChunks.readAll(parquetFileStream, builder); } rowGroup.setReleaser(builder.releaser); for (Chunk chunk : builder.build()) { @@ -1276,7 +1283,7 @@ private void readChunkPages(Chunk chunk, BlockMetaData block, ColumnChunkPageRea } else { // encrypted column rowGroup.addColumn( chunk.descriptor.col, - chunk.readAllPages( // @Todo this must be made lazy too? + chunk.readAllPages( columnDecryptionSetup.getMetaDataDecryptor(), columnDecryptionSetup.getDataDecryptor(), fileDecryptor.getFileAAD(), @@ -1614,13 +1621,262 @@ public void close() throws IOException { } } + private class PageWithHeader { + final Page page; + final PageHeader header; + + public PageWithHeader(PageHeader header, Page page) { + this.page = page; + this.header = header; + } + } + + abstract class ChunkStream { + private StreamT stream; + + StreamT getStream() { + if (stream == null) { + try { + this.stream = computeStream(); + } catch (IOException e) { + throw new ParquetDecodingException("Failed to initialize ParquetFileStream", e); + } + } + return this.stream; + } + + abstract StreamT computeStream() throws IOException; + + abstract List sliceBuffers(int len) throws IOException; + + abstract CombinatorT getCombinator(); + + // @Todo clean up typing/generic type casting... 
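+    // combine() merges the state of another ChunkStream built for the same column chunk
+    // descriptor, e.g. when column-index filtering splits one chunk into discontinuous
+    // parts that are buffered separately but must be read back as a single stream.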
+ abstract void combine(ChunkStream other); + + PageWithHeader readNextPage( + Chunk chunk, PrimitiveType type, BlockCipher.Decryptor blockDecryptor, byte[] pageHeaderAAD) + throws IOException { + final PageHeader pageHeader = chunk.readPageHeader(blockDecryptor, pageHeaderAAD); + + final int uncompressedPageSize = pageHeader.getUncompressed_page_size(); + final int compressedPageSize = pageHeader.getCompressed_page_size(); + Page page; + switch (pageHeader.type) { + case DATA_PAGE: + DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); + final BytesInput pageV1Bytes = chunk.readAsBytesInput(pageHeader.compressed_page_size); + if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + verifyCrc( + pageHeader.getCrc(), + pageV1Bytes, + "could not verify page integrity, CRC checksum verification failed"); + } + page = new DataPageV1( + pageV1Bytes, + dataHeaderV1.getNum_values(), + uncompressedPageSize, + converter.fromParquetStatistics( + getFileMetaData().getCreatedBy(), dataHeaderV1.getStatistics(), type), + converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()), + converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()), + converter.getEncoding(dataHeaderV1.getEncoding())); + break; + case DATA_PAGE_V2: + DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2(); + int dataSize = compressedPageSize + - dataHeaderV2.getRepetition_levels_byte_length() + - dataHeaderV2.getDefinition_levels_byte_length(); + final BytesInput repetitionLevels = + chunk.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()); + final BytesInput definitionLevels = + chunk.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()); + final BytesInput values = chunk.readAsBytesInput(dataSize); + final BytesInput pageV2Bytes; + if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + pageV2Bytes = BytesInput.concat(repetitionLevels, definitionLevels, values); + verifyCrc( + pageHeader.getCrc(), + pageV2Bytes, + "could not verify page integrity, CRC checksum verification failed"); + } + + page = new DataPageV2( + dataHeaderV2.getNum_rows(), + dataHeaderV2.getNum_nulls(), + dataHeaderV2.getNum_values(), + repetitionLevels, + definitionLevels, + converter.getEncoding(dataHeaderV2.getEncoding()), + values, + uncompressedPageSize, + converter.fromParquetStatistics( + getFileMetaData().getCreatedBy(), dataHeaderV2.getStatistics(), type), + dataHeaderV2.isIs_compressed()); + break; + case DICTIONARY_PAGE: + final BytesInput dictPageBytes = chunk.readAsBytesInput(compressedPageSize); + if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + verifyCrc( + pageHeader.getCrc(), + dictPageBytes, + "could not verify dictionary page integrity, CRC checksum verification failed"); + } + final DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header(); + page = new DictionaryPage( + dictPageBytes, + uncompressedPageSize, + dicHeader.getNum_values(), + converter.getEncoding(dicHeader.getEncoding())); + break; + default: + LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize); + stream.skip(compressedPageSize); + page = null; + } + + if (page != null) { + // Copy crc to new page, used for testing + if (pageHeader.isSetCrc()) { + page.setCrc(pageHeader.getCrc()); + } + } + + return new PageWithHeader(pageHeader, page); + } + } + + class ParquetFileStream { + final List chunkDescriptors; + + ParquetFileStream(List chunkDescriptors) { + this.chunkDescriptors = chunkDescriptors; + } + + 
void createEagerChunkStream(ChunkListBuilder builder, long offset, long length) throws IOException { + f.seek(offset); + + int fullAllocations = Math.toIntExact(length / options.getMaxAllocationSize()); + int lastAllocationSize = Math.toIntExact(length % options.getMaxAllocationSize()); + + int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0); + List buffers = new ArrayList<>(numAllocations); + + for (int i = 0; i < fullAllocations; i += 1) { + buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize())); + } + + if (lastAllocationSize > 0) { + buffers.add(options.getAllocator().allocate(lastAllocationSize)); + } + + builder.addBuffersToRelease(buffers); + + long readStart = System.nanoTime(); + for (ByteBuffer buffer : buffers) { + f.readFully(buffer); + buffer.flip(); + } + + ParquetMetricsCallback metricsCallback = options.getMetricsCallback(); + if (metricsCallback != null) { + long totalFileReadTimeNs = Math.max(System.nanoTime() - readStart, 0); + double sizeInMb = ((double) length) / (1024 * 1024); + double timeInSec = ((double) totalFileReadTimeNs) / 1000_0000_0000L; + double throughput = sizeInMb / timeInSec; + LOG.debug( + "Parquet: File Read stats: Length: {} MB, Time: {} secs, throughput: {} MB/sec ", + sizeInMb, + timeInSec, + throughput); + metricsCallback.setDuration(ParquetFileReaderMetrics.ReadTime.name(), totalFileReadTimeNs); + metricsCallback.setValueLong(ParquetFileReaderMetrics.ReadSize.name(), length); + metricsCallback.setValueDouble(ParquetFileReaderMetrics.ReadThroughput.name(), throughput); + } + final ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers); + for (final ChunkDescriptor descriptor : chunkDescriptors) { + final List chunkBuffers = stream.sliceBuffers(descriptor.size); + builder.add( + descriptor, + new ChunkStream, ByteBufferInputStream>() { + @Override + ByteBufferInputStream computeStream() { + return ByteBufferInputStream.wrap(chunkBuffers); + } + + @Override + List sliceBuffers(int len) throws IOException { + return getStream().sliceBuffers(len); + } + + @Override + List getCombinator() { + return chunkBuffers; + } + + @Override + void combine(ChunkStream other) { + chunkBuffers.addAll((List) other.getCombinator()); + } + }, + f); + } + } + + // @Todo: how to implement read metrics for lazy reads? 
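+    // Lazy counterpart to createEagerChunkStream(): instead of materializing the whole
+    // consecutive part up front, each chunk is backed by a LazyByteBufferInputStream that
+    // reads at most bufferSizeBytes from the file at a time and refills on demand as pages
+    // are decoded.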
+ void createLazyChunkStream(ChunkListBuilder builder, int bufferSizeBytes) throws IOException { + for (final ChunkDescriptor descriptor : chunkDescriptors) { + builder.add( + descriptor, + new ChunkStream() { + private Long chunkLength = descriptor.size; + + @Override + LazyByteBufferInputStream computeStream() throws IOException { + final LazyByteBufferInputStream stream = LazyByteBufferInputStream.wrap( + f, options.getAllocator(), descriptor.fileOffset, chunkLength, bufferSizeBytes); + builder.addBuffersToRelease(Collections.singletonList(stream.getDelegate())); + return stream; + } + + @Override + List sliceBuffers(int len) throws IOException { + return getStream().sliceBuffers(len); + } + + @Override + Long getCombinator() { + return chunkLength; + } + + @Override + void combine(ChunkStream other) { + this.chunkLength += (Long) other.getCombinator(); + } + }, + f); + } + } + } + + private void verifyCrc(int referenceCrc, BytesInput bytes, String exceptionMsg) { + crc.reset(); + try (ByteBufferReleaser releaser = crcAllocator.getReleaser()) { + crc.update(bytes.toByteBuffer(releaser)); + } + if (crc.getValue() != ((long) referenceCrc & 0xffffffffL)) { + throw new ParquetDecodingException(exceptionMsg); + } + } + /* * Builder to concatenate the buffers of the discontinuous parts for the same column. These parts are generated as a * result of the column-index based filtering when some pages might be skipped at reading. */ private class ChunkListBuilder { + private class ChunkData { - final List buffers = new ArrayList<>(); + ChunkStream stream; OffsetIndex offsetIndex; } @@ -1634,8 +1890,15 @@ public ChunkListBuilder(long rowCount) { this.rowCount = rowCount; } - void add(ChunkDescriptor descriptor, List buffers, SeekableInputStream f) { - map.computeIfAbsent(descriptor, d -> new ChunkData()).buffers.addAll(buffers); + void add(ChunkDescriptor descriptor, ChunkStream chunkStream, SeekableInputStream f) { + final ChunkData chunkData = map.computeIfAbsent(descriptor, d -> new ChunkData()); + if (chunkData.stream == null) { + chunkData.stream = chunkStream; + } else { + chunkData.stream.combine(chunkStream); + } + + map.put(descriptor, chunkData); lastDescriptor = descriptor; this.f = f; } @@ -1656,9 +1919,9 @@ List build() { ChunkData data = entry.getValue(); if (descriptor.equals(lastDescriptor)) { // because of a bug, the last chunk might be larger than descriptor.size - chunks.add(new WorkaroundChunk(lastDescriptor, data.buffers, f, data.offsetIndex, rowCount)); + chunks.add(new WorkaroundChunk(lastDescriptor, data.stream, f, data.offsetIndex, rowCount)); } else { - chunks.add(new Chunk(descriptor, data.buffers, data.offsetIndex, rowCount)); + chunks.add(new Chunk(descriptor, data.stream, data.offsetIndex, rowCount)); } } return chunks; @@ -1671,18 +1934,18 @@ List build() { private class Chunk { protected final ChunkDescriptor descriptor; - protected final ByteBufferInputStream stream; + protected final ChunkStream chunkStream; final OffsetIndex offsetIndex; final long rowCount; /** * @param descriptor descriptor for the chunk - * @param buffers ByteBuffers that contain the chunk * @param offsetIndex the offset index for this column; might be null */ - public Chunk(ChunkDescriptor descriptor, List buffers, OffsetIndex offsetIndex, long rowCount) { + public Chunk( + ChunkDescriptor descriptor, ChunkStream chunkStream, OffsetIndex offsetIndex, long rowCount) { this.descriptor = descriptor; - this.stream = ByteBufferInputStream.wrap(buffers); + this.chunkStream = chunkStream; 
this.offsetIndex = offsetIndex; this.rowCount = rowCount; } @@ -1693,21 +1956,7 @@ protected PageHeader readPageHeader() throws IOException { protected PageHeader readPageHeader(BlockCipher.Decryptor blockDecryptor, byte[] pageHeaderAAD) throws IOException { - return Util.readPageHeader(stream, blockDecryptor, pageHeaderAAD); - } - - /** - * Calculate checksum of input bytes, throw decoding exception if it does not match the provided - * reference crc - */ - private void verifyCrc(int referenceCrc, BytesInput bytes, String exceptionMsg) { - crc.reset(); - try (ByteBufferReleaser releaser = crcAllocator.getReleaser()) { - crc.update(bytes.toByteBuffer(releaser)); - } - if (crc.getValue() != ((long) referenceCrc & 0xffffffffL)) { - throw new ParquetDecodingException(exceptionMsg); - } + return Util.readPageHeader(chunkStream.getStream(), blockDecryptor, pageHeaderAAD); } /** @@ -1736,13 +1985,14 @@ public ColumnChunkPageReader readAllPages( moduleAAD = AesCipher.createModuleAAD( aadPrefix, ModuleType.DataPageHeader, rowGroupOrdinal, columnOrdinal, getPageOrdinal(0)); } + final Chunk chunk = this; final PageIterator iterator = new PageIterator(moduleAAD) { private long valuesCountReadSoFar = 0L; private int dataPageCountReadSoFar = 0; - private DictionaryPage dictionaryPage = null; - private PageHeader currentPageHeader = null; + private DictionaryPage dictionaryPage; + private PageWithHeader nextPage = null; private boolean bufferedToFirstDataPage = false; @Override @@ -1758,18 +2008,10 @@ public boolean hasNext() { if (!bufferedToFirstDataPage) { bufferNextDataPage(); } - return hasMorePages(); - } - - private BytesInput readAsBytesInput(int size) { - try { - return BytesInput.from(stream.sliceBuffers(size)); - } catch (IOException e) { - throw new ParquetDecodingException("Failed to read page bytes", e); - } + return nextPage != null; } - private boolean hasMorePages() { + private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) { return offsetIndex == null ? valuesCountReadSoFar < descriptor.metadata.getValueCount() : dataPageCountReadSoFar < offsetIndex.getPageCount(); @@ -1777,26 +2019,12 @@ private boolean hasMorePages() { private void bufferNextDataPage() { bufferedToFirstDataPage = true; - while (true) { - if (!hasMorePages()) { - this.currentPageHeader = null; - // Done reading, validate all bytes have been read - if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) { - // Would be nice to have a CorruptParquetFileException or something as a subclass? 
- throw new ParquetDecodingException(new IOException("Expected " - + descriptor.metadata.getValueCount() - + " values in column chunk at " + getPath() - + " offset " + descriptor.metadata.getFirstDataPageOffset() + " but got " - + valuesCountReadSoFar + " values instead over " + dataPageCountReadSoFar - + " pages ending at file offset " - + (descriptor.fileOffset + stream.position()))); - } + if (!hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) { return; } - try { - byte[] pageHeaderAAD = this.dataPageHeaderAAD; + byte[] pageHeaderAAD = dataPageHeaderAAD; if (null != headerBlockDecryptor) { // Important: this verifies file integrity (makes sure dictionary page had not been // removed) @@ -1809,138 +2037,45 @@ private void bufferNextDataPage() { -1); } else { int pageOrdinal = getPageOrdinal(dataPageCountReadSoFar); - AesCipher.quickUpdatePageAAD(this.dataPageHeaderAAD, pageOrdinal); + AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal); } } - currentPageHeader = readPageHeader(headerBlockDecryptor, pageHeaderAAD); - } catch (IOException e) { - throw new ParquetDecodingException("Exception reading page header", e); - } - - final int compressedPageSize = currentPageHeader.getCompressed_page_size(); - final PageType pageType = currentPageHeader.type; - if (pageType == PageType.DATA_PAGE || pageType == PageType.DATA_PAGE_V2) { - return; - } - - final int uncompressedPageSize = currentPageHeader.getUncompressed_page_size(); - if (pageType == PageType.DICTIONARY_PAGE) { - final BytesInput pageBytes = readAsBytesInput(compressedPageSize); - if (options.usePageChecksumVerification() && currentPageHeader.isSetCrc()) { - verifyCrc( - currentPageHeader.getCrc(), - pageBytes, - "could not verify dictionary page integrity, CRC checksum verification failed"); + PageWithHeader pageWithHeader = + chunkStream.readNextPage(chunk, type, headerBlockDecryptor, pageHeaderAAD); + if (pageWithHeader == null) { + return; } - DictionaryPageHeader dicHeader = currentPageHeader.getDictionary_page_header(); - this.dictionaryPage = new DictionaryPage( - pageBytes, - uncompressedPageSize, - dicHeader.getNum_values(), - converter.getEncoding(dicHeader.getEncoding())); - // Copy crc to new page, used for testing - if (currentPageHeader.isSetCrc()) { - dictionaryPage.setCrc(currentPageHeader.getCrc()); - } - } else { - LOG.debug( - "skipping page of type {} of size {}", - currentPageHeader.getType(), - compressedPageSize); - try { - stream.skipFully(compressedPageSize); - } catch (IOException e) { - throw new ParquetDecodingException(e); + + if (pageWithHeader.header.type == PageType.DICTIONARY_PAGE) { + this.dictionaryPage = (DictionaryPage) pageWithHeader.page; + } else if (pageWithHeader.header.type == PageType.DATA_PAGE + || pageWithHeader.header.type == PageType.DATA_PAGE_V2) { + this.nextPage = pageWithHeader; + return; } + } catch (IOException e) { + throw new ParquetDecodingException(e); } } } @Override public DataPage next() { - if (currentPageHeader == null) { + if (!bufferedToFirstDataPage) { bufferNextDataPage(); } - - int uncompressedPageSize = currentPageHeader.getUncompressed_page_size(); - int compressedPageSize = currentPageHeader.getCompressed_page_size(); - - final DataPage page; - final BytesInput pageBytes; - switch (currentPageHeader.type) { - case DATA_PAGE: - DataPageHeader dataHeaderV1 = currentPageHeader.getData_page_header(); - pageBytes = readAsBytesInput(compressedPageSize); - if (options.usePageChecksumVerification() && currentPageHeader.isSetCrc()) { - verifyCrc( - 
currentPageHeader.getCrc(), - pageBytes, - "could not verify page integrity, CRC checksum verification failed"); - } - DataPageV1 dataPageV1 = new DataPageV1( - pageBytes, - dataHeaderV1.getNum_values(), - uncompressedPageSize, - converter.fromParquetStatistics( - getFileMetaData().getCreatedBy(), dataHeaderV1.getStatistics(), type), - converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()), - converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()), - converter.getEncoding(dataHeaderV1.getEncoding())); - // Copy crc to new page, used for testing - if (currentPageHeader.isSetCrc()) { - dataPageV1.setCrc(currentPageHeader.getCrc()); - } - valuesCountReadSoFar += dataHeaderV1.getNum_values(); - ++dataPageCountReadSoFar; - - page = dataPageV1; - break; - case DATA_PAGE_V2: - DataPageHeaderV2 dataHeaderV2 = currentPageHeader.getData_page_header_v2(); - int dataSize = compressedPageSize - - dataHeaderV2.getRepetition_levels_byte_length() - - dataHeaderV2.getDefinition_levels_byte_length(); - final BytesInput repetitionLevels = - readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()); - final BytesInput definitionLevels = - readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()); - final BytesInput values = readAsBytesInput(dataSize); - - if (options.usePageChecksumVerification() && currentPageHeader.isSetCrc()) { - pageBytes = BytesInput.concat(repetitionLevels, definitionLevels, values); - verifyCrc( - currentPageHeader.getCrc(), - pageBytes, - "could not verify page integrity, CRC checksum verification failed"); - } - DataPageV2 dataPageV2 = new DataPageV2( - dataHeaderV2.getNum_rows(), - dataHeaderV2.getNum_nulls(), - dataHeaderV2.getNum_values(), - repetitionLevels, - definitionLevels, - converter.getEncoding(dataHeaderV2.getEncoding()), - values, - uncompressedPageSize, - converter.fromParquetStatistics( - getFileMetaData().getCreatedBy(), dataHeaderV2.getStatistics(), type), - dataHeaderV2.isIs_compressed()); - // Copy crc to new page, used for testing - if (currentPageHeader.isSetCrc()) { - dataPageV2.setCrc(currentPageHeader.getCrc()); - } - - valuesCountReadSoFar += dataHeaderV2.getNum_values(); - ++dataPageCountReadSoFar; - page = dataPageV2; - break; - default: - throw new IllegalStateException("Encountered unknown page type " + currentPageHeader.type); + if (nextPage == null) { + return null; } - currentPageHeader = null; - return page; + valuesCountReadSoFar += ((DataPage) nextPage.page).getValueCount(); + ++dataPageCountReadSoFar; + + final DataPage next = (DataPage) nextPage.page; + nextPage = null; + bufferNextDataPage(); + return next; } }; @@ -1958,7 +2093,7 @@ public DataPage next() { rowGroupOrdinal, columnOrdinal, options, - options.eagerlyReadFullRowGroup()); + options.columnChunkBufferSize() <= 0); } private int getPageOrdinal(int dataPageCountReadSoFar) { @@ -1969,13 +2104,8 @@ private int getPageOrdinal(int dataPageCountReadSoFar) { return offsetIndex.getPageOrdinal(dataPageCountReadSoFar); } - /** - * @param size the size of the page - * @return the page - * @throws IOException if there is an error while reading from the file stream - */ public BytesInput readAsBytesInput(int size) throws IOException { - return BytesInput.from(stream.sliceBuffers(size)); + return BytesInput.from(chunkStream.sliceBuffers(size)); } } @@ -1993,44 +2123,41 @@ abstract static class PageIterator implements Iterator { * deals with a now fixed bug where compressedLength was missing a few bytes. 
*/ private class WorkaroundChunk extends Chunk { - - private final SeekableInputStream f; - + SeekableInputStream f; /** * @param descriptor the descriptor of the chunk - * @param f the file stream positioned at the end of this chunk */ private WorkaroundChunk( ChunkDescriptor descriptor, - List buffers, + ChunkStream stream, SeekableInputStream f, OffsetIndex offsetIndex, long rowCount) { - super(descriptor, buffers, offsetIndex, rowCount); + super(descriptor, stream, offsetIndex, rowCount); this.f = f; } protected PageHeader readPageHeader() throws IOException { PageHeader pageHeader; - stream.mark(8192); // headers should not be larger than 8k + chunkStream.getStream().mark(8192); // headers should not be larger than 8k try { - pageHeader = Util.readPageHeader(stream); + pageHeader = Util.readPageHeader(chunkStream.stream); } catch (IOException e) { // this is to workaround a bug where the compressedLength // of the chunk is missing the size of the header of the dictionary // to allow reading older files (using dictionary) we need this. // usually 13 to 19 bytes are missing // if the last page is smaller than this, the page header itself is truncated in the buffer. - stream.reset(); // resetting the buffer to the position before we got the error + chunkStream.getStream().reset(); // resetting the buffer to the position before we got the error LOG.info("completing the column chunk to read the page header"); - pageHeader = Util.readPageHeader( - new SequenceInputStream(stream, f)); // trying again from the buffer + remainder of the stream. + pageHeader = Util.readPageHeader(new SequenceInputStream( + chunkStream.getStream(), f)); // trying again from the buffer + remainder of the stream. } return pageHeader; } public BytesInput readAsBytesInput(int size) throws IOException { - int available = stream.available(); + int available = chunkStream.getStream().available(); if (size > available) { // this is to workaround a bug where the compressedLength // of the chunk is missing the size of the header of the dictionary @@ -2038,8 +2165,7 @@ public BytesInput readAsBytesInput(int size) throws IOException { // usually 13 to 19 bytes are missing int missingBytes = size - available; LOG.info("completed the column chunk with {} bytes", missingBytes); - - List streamBuffers = stream.sliceBuffers(available); + List streamBuffers = chunkStream.sliceBuffers(available); ByteBuffer lastBuffer = ByteBuffer.allocate(missingBytes); f.readFully(lastBuffer); @@ -2125,58 +2251,15 @@ public void addChunk(ChunkDescriptor descriptor) { } /** - * @param f file to read the chunks from + * @param fileStream file to read the chunks from * @param builder used to build chunk list to read the pages for the different columns * @throws IOException if there is an error while reading from the stream */ - public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException { - f.seek(offset); - - int fullAllocations = Math.toIntExact(length / options.getMaxAllocationSize()); - int lastAllocationSize = Math.toIntExact(length % options.getMaxAllocationSize()); - - int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 
1 : 0); - List buffers = new ArrayList<>(numAllocations); - - for (int i = 0; i < fullAllocations; i += 1) { - buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize())); - } - - if (lastAllocationSize > 0) { - buffers.add(options.getAllocator().allocate(lastAllocationSize)); - } - builder.addBuffersToRelease(buffers); - - long readStart = System.nanoTime(); - for (ByteBuffer buffer : buffers) { - f.readFully(buffer); - buffer.flip(); - } - setReadMetrics(readStart); - - // report in a counter the data we just scanned - BenchmarkCounter.incrementBytesRead(length); - ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers); - for (final ChunkDescriptor descriptor : chunks) { - builder.add(descriptor, stream.sliceBuffers(descriptor.size), f); - } - } - - private void setReadMetrics(long startNs) { - ParquetMetricsCallback metricsCallback = options.getMetricsCallback(); - if (metricsCallback != null) { - long totalFileReadTimeNs = Math.max(System.nanoTime() - startNs, 0); - double sizeInMb = ((double) length) / (1024 * 1024); - double timeInSec = ((double) totalFileReadTimeNs) / 1000_0000_0000L; - double throughput = sizeInMb / timeInSec; - LOG.debug( - "Parquet: File Read stats: Length: {} MB, Time: {} secs, throughput: {} MB/sec ", - sizeInMb, - timeInSec, - throughput); - metricsCallback.setDuration(ParquetFileReaderMetrics.ReadTime.name(), totalFileReadTimeNs); - metricsCallback.setValueLong(ParquetFileReaderMetrics.ReadSize.name(), length); - metricsCallback.setValueDouble(ParquetFileReaderMetrics.ReadThroughput.name(), throughput); + public void readAll(ParquetFileStream fileStream, ChunkListBuilder builder) throws IOException { + if (options.columnChunkBufferSize() <= 0) { + fileStream.createEagerChunkStream(builder, offset, length); + } else { + fileStream.createLazyChunkStream(builder, options.columnChunkBufferSize()); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index 104aedecf6..95e22190e9 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -147,7 +147,7 @@ public class ParquetInputFormat extends FileInputFormat { */ public static final String OFF_HEAP_DECRYPT_BUFFER_ENABLED = "parquet.decrypt.off-heap.buffer.enabled"; - public static final String EAGERLY_READ_FULL_ROW_GROUP = "parquet.read.materializerowgroup"; + public static final String COLUMN_CHUNK_BUFFER_SIZE = "parquet.read.column-chunk-buffer-size"; /** * key to turn on or off task side metadata loading (default true) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index ffa31edec9..87609d306d 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -356,7 +356,7 @@ public void testWriteReadLazy() throws Exception { final ParquetMetadata readFooter = ParquetFileReader.readFooter(readConfiguration, path); final BlockMetaData rowGroup = readFooter.getBlocks().get(0); - readConfiguration.setBoolean(ParquetInputFormat.EAGERLY_READ_FULL_ROW_GROUP, false); + readConfiguration.setInt(ParquetInputFormat.COLUMN_CHUNK_BUFFER_SIZE, 25); { // read first block of col #1 try (ParquetFileReader r = 
new ParquetFileReader( @@ -1197,7 +1197,9 @@ private void validateContains(MessageType schema, PageReadStore pages, String[] PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path)); DataPage page = pageReader.readPage(); assertEquals(values, page.getValueCount()); - assertArrayEquals(bytes.toByteArray(), ((DataPageV1) page).getBytes().toByteArray()); + byte[] expected = bytes.toByteArray(); + byte[] actual = ((DataPageV1) page).getBytes().toByteArray(); + assertArrayEquals(expected, actual); } @Test diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java index 569b916821..ad11de162c 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java @@ -159,7 +159,10 @@ private List readUsers( long rangeEnd) throws IOException { Configuration conf = new Configuration(); - conf.setBoolean(ParquetInputFormat.EAGERLY_READ_FULL_ROW_GROUP, materializeRowGroup); + if (!materializeRowGroup) { + // @Todo parameterize this value? + conf.setInt(ParquetInputFormat.COLUMN_CHUNK_BUFFER_SIZE, 25); + } return PhoneBookWriter.readUsers( ParquetReader.builder(new GroupReadSupport(), file) .withAllocator(allocator)