From 458a92c36a0f7180f8f47bf6c52e11cab5576954 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Thu, 19 Oct 2023 17:29:17 +0800
Subject: [PATCH] interface IndexCache

---
 .../org/apache/parquet/hadoop/IndexCache.java |  98 ++++++++++
 .../apache/parquet/hadoop/NoneIndexCache.java |  63 +++++++
 ...dexCacher.java => PrefetchIndexCache.java} |  94 ++++++----
 .../hadoop/rewrite/ParquetRewriter.java       |  31 ++--
 .../hadoop/rewrite/RewriteOptions.java        |  26 +--
 .../apache/parquet/hadoop/TestIndexCache.java | 168 ++++++++++++++++++
 .../hadoop/rewrite/ParquetRewriterTest.java   |  25 +--
 7 files changed, 428 insertions(+), 77 deletions(-)
 create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/IndexCache.java
 create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/NoneIndexCache.java
 rename parquet-hadoop/src/main/java/org/apache/parquet/hadoop/{rewrite/IndexCacher.java => PrefetchIndexCache.java} (56%)
 create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/IndexCache.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/IndexCache.java
new file mode 100644
index 0000000000..365900c43d
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/IndexCache.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * A cache for the file indexes (ColumnIndex, OffsetIndex and BloomFilter).
+ */
+public interface IndexCache {
+
+  enum CacheStrategy {
+    NONE, /* No cache */
+    PRECACHE_BLOCK /* Precache the indexes of the current block */
+  }
+
+  /**
+   * Create an index cache for the given file reader.
+   *
+   * @param fileReader the file reader
+   * @param columns the columns whose indexes need to be cached
+   * @param cacheStrategy the cache strategy, supports NONE and PRECACHE_BLOCK
+   * @return the index cache
+   */
+  static IndexCache create(
+      ParquetFileReader fileReader,
+      Set<ColumnPath> columns,
+      CacheStrategy cacheStrategy) {
+    if (cacheStrategy == CacheStrategy.NONE) {
+      return new NoneIndexCache(fileReader);
+    } else if (cacheStrategy == CacheStrategy.PRECACHE_BLOCK) {
+      return new PrefetchIndexCache(fileReader, columns);
+    } else {
+      throw new UnsupportedOperationException("Unknown cache strategy: " + cacheStrategy);
+    }
+  }
+
+  /**
+   * Set the current BlockMetaData.
+   */
+  void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException;
+
+  /**
+   * Get the ColumnIndex for the given column in the set row group.
+   *
+   * @param chunk the given column chunk
+   * @return the ColumnIndex for the given column
+   * @throws IOException if any I/O error occurs while reading the ColumnIndex
+   */
+  ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException;
+
+  /**
+   * Get the OffsetIndex for the given column in the set row group.
+   *
+   * @param chunk the given column chunk
+   * @return the OffsetIndex for the given column
+   * @throws IOException if any I/O error occurs while reading the OffsetIndex
+   */
+  OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException;
+
+  /**
+   * Get the BloomFilter for the given column in the set row group.
+   *
+   * @param chunk the given column chunk
+   * @return the BloomFilter for the given column
+   * @throws IOException if any I/O error occurs while reading the BloomFilter
+   */
+  BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException;
+
+  /**
+   * Clean the cache.
+   */
+  void clean();
+}
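For illustration only (not part of the patch): a minimal sketch of how a caller might drive the new IndexCache per row group. The helper name copyAllIndexes is hypothetical; it assumes an already opened ParquetFileReader and a set of column paths of interest, and uses the same imports as IndexCache.java above.

  // Illustrative sketch, not part of the patch.
  static void copyAllIndexes(ParquetFileReader reader, Set<ColumnPath> columns) throws IOException {
    IndexCache cache = IndexCache.create(reader, columns, IndexCache.CacheStrategy.PRECACHE_BLOCK);
    try {
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        cache.setBlockMetadata(block);                       // prefetches this block's indexes in one pass
        for (ColumnChunkMetaData chunk : block.getColumns()) {
          ColumnIndex columnIndex = cache.getColumnIndex(chunk);
          OffsetIndex offsetIndex = cache.getOffsetIndex(chunk);
          BloomFilter bloomFilter = cache.getBloomFilter(chunk);
          // ... hand the indexes to a writer, collect statistics, etc. ...
        }
      }
    } finally {
      cache.clean();
    }
  }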
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/NoneIndexCache.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/NoneIndexCache.java
new file mode 100644
index 0000000000..e1aded1990
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/NoneIndexCache.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+
+import java.io.IOException;
+
+/**
+ * Cache nothing. All get methods are delegated to ParquetFileReader to read the requested index.
+ */
+class NoneIndexCache implements IndexCache {
+  private final ParquetFileReader fileReader;
+
+  NoneIndexCache(ParquetFileReader fileReader) {
+    this.fileReader = fileReader;
+  }
+
+  @Override
+  public void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException {
+    // Do nothing
+  }
+
+  @Override
+  public ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException {
+    return fileReader.readColumnIndex(chunk);
+  }
+
+  @Override
+  public OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException {
+    return fileReader.readOffsetIndex(chunk);
+  }
+
+  @Override
+  public BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException {
+    return fileReader.readBloomFilter(chunk);
+  }
+
+  @Override
+  public void clean() {
+    // Do nothing
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/IndexCacher.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrefetchIndexCache.java
similarity index 56%
rename from parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/IndexCacher.java
rename to parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrefetchIndexCache.java
index 8beedee02b..ac829ef375 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/IndexCacher.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrefetchIndexCache.java
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.parquet.hadoop.rewrite;
+package org.apache.parquet.hadoop;
 
+import org.apache.parquet.Preconditions;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
-import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -32,61 +32,79 @@
 import java.util.Set;
 
 /**
- * A cacher for caching file indexes(ColumnIndex, OffsetIndex, BloomFilter)
+ * This index cache will prefetch the given columns' indexes when calling {@link #setBlockMetadata(BlockMetaData)}.
+ * <p>
+ * Note: the cached index will be freed from the cache after the related get method is called.
  */
-class IndexCacher {
+class PrefetchIndexCache implements IndexCache {
   private final ParquetFileReader fileReader;
-  private final Set<ColumnPath> columnPathSet;
-  private final boolean prefetchBlockAllIndexes;
+  private final Set<ColumnPath> columns;
 
-  // Only used when prefetchBlockAllIndexes is true
   private Map<ColumnPath, ColumnIndex> columnIndexCache;
   private Map<ColumnPath, OffsetIndex> offsetIndexCache;
   private Map<ColumnPath, BloomFilter> bloomIndexCache;
 
-  IndexCacher(
+  PrefetchIndexCache(
       ParquetFileReader fileReader,
-      Set<ColumnPath> columnPathSet,
-      boolean prefetchBlockAllIndexes) {
+      Set<ColumnPath> columns) {
     this.fileReader = fileReader;
-    this.columnPathSet = columnPathSet;
-    this.prefetchBlockAllIndexes = prefetchBlockAllIndexes;
+    this.columns = columns;
   }
 
-  void setCurrentBlockMetadata(BlockMetaData blockMetaData) throws IOException {
-    if (prefetchBlockAllIndexes) {
-      free();
-      this.columnIndexCache = readAllColumnIndexes(blockMetaData);
-      this.offsetIndexCache = readAllOffsetIndexes(blockMetaData);
-      this.bloomIndexCache = readAllBloomFilters(blockMetaData);
-    }
+  @Override
+  public void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException {
+    this.columnIndexCache = readAllColumnIndexes(currentBlockMetadata);
+    this.offsetIndexCache = readAllOffsetIndexes(currentBlockMetadata);
+    this.bloomIndexCache = readAllBloomFilters(currentBlockMetadata);
   }
 
-  ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException {
-    if (prefetchBlockAllIndexes) {
-      return columnIndexCache.remove(chunk.getPath());
+  @Override
+  public ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException {
+    ColumnPath columnPath = chunk.getPath();
+    if (columns.contains(columnPath)) {
+      Preconditions.checkState(
+        columnIndexCache.containsKey(columnPath),
+        "Not found cached ColumnIndex for column: %s with cache strategy: %s",
+        columnPath.toDotString(),
+        CacheStrategy.PRECACHE_BLOCK);
     }
-    return fileReader.readColumnIndex(chunk);
+    return columnIndexCache.remove(columnPath);
   }
 
-  OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException {
-    if (prefetchBlockAllIndexes) {
-      return offsetIndexCache.remove(chunk.getPath());
+  @Override
+  public OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException {
+    ColumnPath columnPath = chunk.getPath();
+
+    if (columns.contains(columnPath)) {
+      Preconditions.checkState(
+        offsetIndexCache.containsKey(columnPath),
+        "Not found cached OffsetIndex for column: %s with cache strategy: %s",
+        columnPath.toDotString(),
+        CacheStrategy.PRECACHE_BLOCK);
     }
-    return fileReader.readOffsetIndex(chunk);
+    return offsetIndexCache.remove(columnPath);
   }
 
-  BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException {
-    if (prefetchBlockAllIndexes) {
-      return bloomIndexCache.remove(chunk.getPath());
+  @Override
+  public BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException {
+    ColumnPath columnPath = chunk.getPath();
+
+    if (columns.contains(columnPath)) {
+      Preconditions.checkState(
+        bloomIndexCache.containsKey(columnPath),
+        "Not found cached BloomFilter for column: %s with cache strategy: %s",
+        columnPath.toDotString(),
+        CacheStrategy.PRECACHE_BLOCK);
     }
-    return fileReader.readBloomFilter(chunk);
+    return bloomIndexCache.remove(columnPath);
   }
 
-  void free() {
+  @Override
+  public void clean() {
     if (columnIndexCache != null) {
       columnIndexCache.clear();
       columnIndexCache = null;
     }
@@ -104,9 +122,9 @@ void free() {
   }
 
   private Map<ColumnPath, ColumnIndex> readAllColumnIndexes(BlockMetaData blockMetaData) throws IOException {
-    Map<ColumnPath, ColumnIndex> columnIndexMap = new HashMap<>(columnPathSet.size());
+    Map<ColumnPath, ColumnIndex> columnIndexMap = new HashMap<>(columns.size());
     for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
-      if (columnPathSet.contains(chunk.getPath())) {
+      if (columns.contains(chunk.getPath())) {
         columnIndexMap.put(chunk.getPath(), fileReader.readColumnIndex(chunk));
       }
     }
@@ -115,9 +133,9 @@ private Map<ColumnPath, ColumnIndex> readAllColumnIndexes(BlockMetaData blockMet
   }
 
   private Map<ColumnPath, OffsetIndex> readAllOffsetIndexes(BlockMetaData blockMetaData) throws IOException {
-    Map<ColumnPath, OffsetIndex> offsetIndexMap = new HashMap<>(columnPathSet.size());
+    Map<ColumnPath, OffsetIndex> offsetIndexMap = new HashMap<>(columns.size());
     for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
-      if (columnPathSet.contains(chunk.getPath())) {
+      if (columns.contains(chunk.getPath())) {
         offsetIndexMap.put(chunk.getPath(), fileReader.readOffsetIndex(chunk));
       }
     }
@@ -126,9 +144,9 @@ private Map<ColumnPath, OffsetIndex> readAllOffsetIndexes(BlockMetaData blockMet
   }
 
   private Map<ColumnPath, BloomFilter> readAllBloomFilters(BlockMetaData blockMetaData) throws IOException {
-    Map<ColumnPath, BloomFilter> bloomFilterMap = new HashMap<>(columnPathSet.size());
+    Map<ColumnPath, BloomFilter> bloomFilterMap = new HashMap<>(columns.size());
    for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
-      if (columnPathSet.contains(chunk.getPath())) {
+      if (columns.contains(chunk.getPath())) {
         bloomFilterMap.put(chunk.getPath(), fileReader.readBloomFilter(chunk));
       }
     }
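For context (not part of the diff): with the PRECACHE_BLOCK strategy every index is served from the prefetched maps and removed on first access, so requesting the same index twice for a tracked column fails. A small sketch, assuming cache, block and chunk are set up as in the earlier snippet and chunk belongs to one of the tracked columns:

  cache.setBlockMetadata(block);
  ColumnIndex first = cache.getColumnIndex(chunk);   // served from the prefetched map and removed from it
  ColumnIndex second = cache.getColumnIndex(chunk);  // throws IllegalStateException for a tracked column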
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 80b99427c2..8467d4b1e0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -45,6 +45,7 @@
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
+import org.apache.parquet.hadoop.IndexCache;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -117,8 +118,8 @@ public class ParquetRewriter implements Closeable {
   private String originalCreatedBy = "";
   // Unique created_by information from all input files
   private final Set<String> allOriginalCreatedBys = new HashSet<>();
-  // Whether prefetch all block indexes
-  private final boolean prefetchBlockAllIndexes;
+  // The index cache strategy
+  private final IndexCache.CacheStrategy indexCacheStrategy;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
     Configuration conf = options.getConf();
@@ -161,7 +162,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
       this.encryptMode = true;
     }
 
-    this.prefetchBlockAllIndexes = options.prefetchBlockAllIndexes();
+    this.indexCacheStrategy = options.getIndexCacheStrategy();
 
     ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
     writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
@@ -196,7 +197,7 @@ public ParquetRewriter(TransParquetFileReader reader,
         this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
       }
     }
-    this.prefetchBlockAllIndexes = false;
+    this.indexCacheStrategy = IndexCache.CacheStrategy.NONE;
   }
 
   // Open all input files to validate their schemas are compatible to merge
@@ -256,14 +257,14 @@ public void close() throws IOException {
 
   public void processBlocks() throws IOException {
     while (reader != null) {
-      IndexCacher indexCacher = new IndexCacher(reader, descriptorsMap.keySet(), prefetchBlockAllIndexes);
-      processBlocksFromReader(indexCacher);
-      indexCacher.free();
+      IndexCache indexCache = IndexCache.create(reader, descriptorsMap.keySet(), indexCacheStrategy);
+      processBlocksFromReader(indexCache);
+      indexCache.clean();
       initNextReader();
     }
   }
 
-  private void processBlocksFromReader(IndexCacher indexCacher) throws IOException {
+  private void processBlocksFromReader(IndexCache indexCache) throws IOException {
     PageReadStore store = reader.readNextRowGroup();
     ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
 
@@ -272,7 +273,7 @@ private void processBlocksFromReader(IndexCacher indexCacher) throws IOException
       writer.startBlock(store.getRowCount());
 
       BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
-      indexCacher.setCurrentBlockMetadata(blockMetaData);
+      indexCache.setBlockMetadata(blockMetaData);
       List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
       for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
         ColumnChunkMetaData chunk = columnsInOrder.get(i);
@@ -328,15 +329,15 @@ private void processBlocksFromReader(IndexCacher indexCacher) throws IOException
                   newCodecName,
                   columnChunkEncryptorRunTime,
                   encryptColumn,
-                  indexCacher.getBloomFilter(chunk),
-                  indexCacher.getColumnIndex(chunk),
-                  indexCacher.getOffsetIndex(chunk));
+                  indexCache.getBloomFilter(chunk),
+                  indexCache.getColumnIndex(chunk),
+                  indexCache.getOffsetIndex(chunk));
           writer.endColumn();
         } else {
           // Nothing changed, simply copy the binary data.
-          BloomFilter bloomFilter = indexCacher.getBloomFilter(chunk);
-          ColumnIndex columnIndex = indexCacher.getColumnIndex(chunk);
-          OffsetIndex offsetIndex = indexCacher.getOffsetIndex(chunk);
+          BloomFilter bloomFilter = indexCache.getBloomFilter(chunk);
+          ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
+          OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
           writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
         }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index 81ebc5b62b..d817b06168 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.hadoop.IndexCache;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 import java.util.Arrays;
@@ -41,7 +42,7 @@ public class RewriteOptions {
   private final Map<String, MaskMode> maskColumns;
   private final List<String> encryptColumns;
   private final FileEncryptionProperties fileEncryptionProperties;
-  private final boolean prefetchBlockAllIndexes;
+  private final IndexCache.CacheStrategy indexCacheStrategy;
 
   private RewriteOptions(Configuration conf,
                          List<Path> inputFiles,
@@ -51,7 +52,7 @@ private RewriteOptions(Configuration conf,
                          Map<String, MaskMode> maskColumns,
                          List<String> encryptColumns,
                          FileEncryptionProperties fileEncryptionProperties,
-                         boolean prefetchBlockAllIndexes) {
+                         IndexCache.CacheStrategy indexCacheStrategy) {
     this.conf = conf;
     this.inputFiles = inputFiles;
     this.outputFile = outputFile;
@@ -60,7 +61,7 @@ private RewriteOptions(Configuration conf,
     this.maskColumns = maskColumns;
     this.encryptColumns = encryptColumns;
     this.fileEncryptionProperties = fileEncryptionProperties;
-    this.prefetchBlockAllIndexes = prefetchBlockAllIndexes;
+    this.indexCacheStrategy = indexCacheStrategy;
   }
 
   public Configuration getConf() {
@@ -95,8 +96,8 @@ public FileEncryptionProperties getFileEncryptionProperties() {
     return fileEncryptionProperties;
   }
 
-  public boolean prefetchBlockAllIndexes() {
-    return prefetchBlockAllIndexes;
+  public IndexCache.CacheStrategy getIndexCacheStrategy() {
+    return indexCacheStrategy;
   }
 
   // Builder to create a RewriterOptions.
@@ -109,7 +110,7 @@ public static class Builder {
     private Map<String, MaskMode> maskColumns;
     private List<String> encryptColumns;
     private FileEncryptionProperties fileEncryptionProperties;
-    private boolean prefetchBlockAllIndexes;
+    private IndexCache.CacheStrategy indexCacheStrategy = IndexCache.CacheStrategy.NONE;
 
     /**
      * Create a builder to create a RewriterOptions.
@@ -222,15 +223,16 @@ public Builder addInputFile(Path path) {
     }
 
     /**
-     * Whether enable prefetch block indexes into cache.
+     * Set the index (ColumnIndex, OffsetIndex and BloomFilter) cache strategy.
      * <p>
-     * This could reduce the random seek while rewriting, disabled by default.
+     * The PRECACHE_BLOCK strategy can reduce random seeks while rewriting; the default is NONE.
      *
-     * @param prefetchBlockAllIndexes enable or not
+     * @param cacheStrategy the index cache strategy, supports: {@link IndexCache.CacheStrategy#NONE} or
+     *                      {@link IndexCache.CacheStrategy#PRECACHE_BLOCK}
      * @return self
      */
-    public Builder prefetchBlockAllIndex(boolean prefetchBlockAllIndexes) {
-      this.prefetchBlockAllIndexes = prefetchBlockAllIndexes;
+    public Builder indexCacheStrategy(IndexCache.CacheStrategy cacheStrategy) {
+      this.indexCacheStrategy = cacheStrategy;
       return this;
     }
 
@@ -277,7 +279,7 @@ public RewriteOptions build() {
           maskColumns,
           encryptColumns,
           fileEncryptionProperties,
-          prefetchBlockAllIndexes);
+          indexCacheStrategy);
     }
   }
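For illustration only (not part of the patch): a sketch of wiring the new option through a rewrite, assuming conf, inputPaths and outputPath are already prepared as in the tests below.

  // Illustrative sketch, not part of the patch.
  RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, outputPath)
      .transform(CompressionCodecName.ZSTD)                          // optional codec translation
      .indexCacheStrategy(IndexCache.CacheStrategy.PRECACHE_BLOCK)   // prefetch indexes per row group
      .build();
  ParquetRewriter rewriter = new ParquetRewriter(options);
  rewriter.processBlocks();
  rewriter.close();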
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java
new file mode 100644
index 0000000000..bd63fdbedf
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.util.TestFileBuilder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.LocalInputFile;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+@RunWith(Parameterized.class)
+public class TestIndexCache {
+  private final Configuration conf = new Configuration();
+  private final int numRecords = 100000;
+  private final MessageType schema = new MessageType("schema",
+    new PrimitiveType(OPTIONAL, INT64, "DocId"),
+    new PrimitiveType(REQUIRED, BINARY, "Name"),
+    new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+    new GroupType(OPTIONAL, "Links",
+      new PrimitiveType(REPEATED, BINARY, "Backward"),
+      new PrimitiveType(REPEATED, BINARY, "Forward")));
+
+  private final ParquetProperties.WriterVersion writerVersion;
+
+  @Parameterized.Parameters(name = "WriterVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] {"v1", "v2"};
+  }
+
+  public TestIndexCache(String writerVersion) {
+    this.writerVersion = ParquetProperties.WriterVersion.fromString(writerVersion);
+  }
+
+  @Test
+  public void testNoneCacheStrategy() throws IOException {
+    String file = createTestFile("DocID");
+
+    ParquetReadOptions options = ParquetReadOptions.builder().build();
+    ParquetFileReader fileReader = new ParquetFileReader(
+      new LocalInputFile(Paths.get(file)), options);
+    IndexCache indexCache = IndexCache.create(fileReader, new HashSet<>(), IndexCache.CacheStrategy.NONE);
+    Assert.assertTrue(indexCache instanceof NoneIndexCache);
+    List<BlockMetaData> blocks = fileReader.getFooter().getBlocks();
+    for (BlockMetaData blockMetaData : blocks) {
+      indexCache.setBlockMetadata(blockMetaData);
+      for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+        validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk));
+        validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk));
+
+        Assert.assertEquals(
+          "BloomFilter should match",
+          fileReader.readBloomFilter(chunk),
+          indexCache.getBloomFilter(chunk));
+      }
+    }
+  }
+
+  @Test
+  public void testPrefetchCacheStrategy() throws IOException {
+    String file = createTestFile("DocID", "Name");
+
+    ParquetReadOptions options = ParquetReadOptions.builder().build();
+    ParquetFileReader fileReader = new ParquetFileReader(
+      new LocalInputFile(Paths.get(file)), options);
+    Set<ColumnPath> columns = new HashSet<>();
+    columns.add(ColumnPath.fromDotString("DocId"));
+    columns.add(ColumnPath.fromDotString("Name"));
+    columns.add(ColumnPath.fromDotString("Gender"));
+    columns.add(ColumnPath.fromDotString("Links.Backward"));
+    columns.add(ColumnPath.fromDotString("Links.Forward"));
+    IndexCache indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PRECACHE_BLOCK);
+    Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
+    List<BlockMetaData> blocks = fileReader.getFooter().getBlocks();
+    for (BlockMetaData blockMetaData : blocks) {
+      indexCache.setBlockMetadata(blockMetaData);
+      for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+        validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk));
+        validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk));
+
+        Assert.assertEquals(
+          "BloomFilter should match",
+          fileReader.readBloomFilter(chunk),
+          indexCache.getBloomFilter(chunk));
+
+        Assert.assertThrows(IllegalStateException.class, () -> indexCache.getColumnIndex(chunk));
+        Assert.assertThrows(IllegalStateException.class, () -> indexCache.getOffsetIndex(chunk));
+        if (columns.contains(chunk.getPath())) {
+          Assert.assertThrows(IllegalStateException.class, () -> indexCache.getBloomFilter(chunk));
+        }
+      }
+    }
+  }
+
+  private String createTestFile(String... bloomFilterEnabledColumns) throws IOException {
+    return new TestFileBuilder(conf, schema)
+      .withNumRecord(numRecords)
+      .withCodec("UNCOMPRESSED")
+      .withRowGroupSize(1024L)
+      .withBloomFilterEnabled(bloomFilterEnabledColumns)
+      .withWriterVersion(writerVersion)
+      .build()
+      .getFileName();
+  }
+
+  private void validateColumnIndex(ColumnIndex expected, ColumnIndex target) {
+    if (expected == null) {
+      Assert.assertEquals("ColumnIndex should be equal", expected, target);
+    } else {
+      Assert.assertNotNull("ColumnIndex should not be null", target);
+      Assert.assertEquals(expected.getClass(), target.getClass());
+      Assert.assertEquals(expected.getMinValues(), target.getMinValues());
+      Assert.assertEquals(expected.getMaxValues(), target.getMaxValues());
+      Assert.assertEquals(expected.getBoundaryOrder(), target.getBoundaryOrder());
+      Assert.assertEquals(expected.getNullCounts(), target.getNullCounts());
+      Assert.assertEquals(expected.getNullPages(), target.getNullPages());
+    }
+  }
+
+  private void validateOffsetIndex(OffsetIndex expected, OffsetIndex target) {
+    if (expected == null) {
+      Assert.assertEquals("OffsetIndex should be equal", expected, target);
+    } else {
+      Assert.assertNotNull("OffsetIndex should not be null", target);
+      Assert.assertEquals(expected.getClass(), target.getClass());
+      Assert.assertEquals(expected.toString(), target.toString());
+    }
+  }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 9faf01f133..00dc46ae06 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -37,6 +37,7 @@
 import org.apache.parquet.format.DataPageHeaderV2;
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.IndexCache;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
@@ -97,20 +98,20 @@ public class ParquetRewriterTest {
   private final int numRecord = 100000;
   private final Configuration conf = new Configuration();
   private final ParquetProperties.WriterVersion writerVersion;
-  private final boolean prefetchBlockAllIndexes;
+  private final IndexCache.CacheStrategy indexCacheStrategy;
 
   private List<EncryptionTestFile> inputFiles = null;
   private String outputFile = null;
   private ParquetRewriter rewriter = null;
 
-  @Parameterized.Parameters(name = "WriterVersion = {0}, PrefetchBlockAllIndexes = {1}")
+  @Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy = {1}")
   public static Object[][] parameters() {
-    return new Object[][] {{"v1", true}, {"v1", false}, {"v2", true}, {"v2", false}};
+    return new Object[][] {{"v1", "NONE"}, {"v1", "PRECACHE_BLOCK"}, {"v2", "NONE"}, {"v2", "PRECACHE_BLOCK"}};
   }
 
-  public ParquetRewriterTest(String writerVersion, boolean prefetchBlockAllIndexes) {
+  public ParquetRewriterTest(String writerVersion, String indexCacheStrategy) {
     this.writerVersion = ParquetProperties.WriterVersion.fromString(writerVersion);
-    this.prefetchBlockAllIndexes = prefetchBlockAllIndexes;
+    this.indexCacheStrategy = IndexCache.CacheStrategy.valueOf(indexCacheStrategy);
   }
 
   private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws Exception {
@@ -119,7 +120,7 @@ private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws E
     CompressionCodecName newCodec = CompressionCodecName.ZSTD;
     RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
     RewriteOptions options =
-            builder.prune(pruneColumns).transform(newCodec).prefetchBlockAllIndex(prefetchBlockAllIndexes).build();
+            builder.prune(pruneColumns).transform(newCodec).indexCacheStrategy(indexCacheStrategy).build();
 
     rewriter = new ParquetRewriter(options);
     rewriter.processBlocks();
@@ -191,7 +192,7 @@ private void testPruneNullifyTranslateCodec(List<Path> inputPaths) throws Except
     CompressionCodecName newCodec = CompressionCodecName.ZSTD;
     RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
     RewriteOptions options =
-            builder.prune(pruneColumns).mask(maskColumns).transform(newCodec).prefetchBlockAllIndex(prefetchBlockAllIndexes).build();
+            builder.prune(pruneColumns).mask(maskColumns).transform(newCodec).indexCacheStrategy(indexCacheStrategy).build();
 
     rewriter = new ParquetRewriter(options);
     rewriter.processBlocks();
@@ -267,7 +268,7 @@ private void testPruneEncryptTranslateCodec(List<Path> inputPaths) throws Except
             EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false);
     builder.encrypt(Arrays.asList(encryptColumns)).encryptionProperties(fileEncryptionProperties);
-    builder.prefetchBlockAllIndex(prefetchBlockAllIndexes);
+    builder.indexCacheStrategy(indexCacheStrategy);
     RewriteOptions options = builder.build();
 
     rewriter = new ParquetRewriter(options);
@@ -352,7 +353,7 @@ public void testRewriteWithoutColumnIndexes() throws Exception {
     List<String> pruneCols = Lists.newArrayList("phoneNumbers");
     RewriteOptions options =
-            builder.mask(maskCols).prune(pruneCols).prefetchBlockAllIndex(prefetchBlockAllIndexes).build();
+            builder.mask(maskCols).prune(pruneCols).indexCacheStrategy(indexCacheStrategy).build();
     rewriter = new ParquetRewriter(options);
     rewriter.processBlocks();
     rewriter.close();
@@ -413,7 +414,7 @@ private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws Exception
             .transform(CompressionCodecName.ZSTD)
             .encrypt(Arrays.asList(encryptColumns))
             .encryptionProperties(fileEncryptionProperties)
-            .prefetchBlockAllIndex(prefetchBlockAllIndexes)
+            .indexCacheStrategy(indexCacheStrategy)
             .build();
 
     rewriter = new ParquetRewriter(options);
@@ -485,7 +486,7 @@ public void testMergeTwoFilesOnly() throws Exception {
     }
     Path outputPath = new Path(outputFile);
     RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
-    RewriteOptions options = builder.prefetchBlockAllIndex(prefetchBlockAllIndexes).build();
+    RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build();
 
     rewriter = new ParquetRewriter(options);
     rewriter.processBlocks();
@@ -553,7 +554,7 @@ public void testMergeTwoFilesWithDifferentSchema() throws Exception {
     }
     Path outputPath = new Path(outputFile);
     RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
-    RewriteOptions options = builder.prefetchBlockAllIndex(prefetchBlockAllIndexes).build();
+    RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build();
 
     // This should throw an exception because the schemas are different
     rewriter = new ParquetRewriter(options);