diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 22c5f0f683..043eb24235 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -329,6 +329,7 @@ private void processBlocksFromReader() throws IOException {
 
       writer.endBlock();
       store = reader.readNextRowGroup();
+      crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
       blockId++;
       numBlocksRewritten++;
     }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index bc8d451994..a08633d15a 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -37,6 +37,7 @@
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -135,6 +136,7 @@ private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws E
 
     // Verify original.created.by is preserved
     validateCreatedBy();
+    validateRowGroupRowCount();
   }
 
   @Before
@@ -204,6 +206,7 @@ private void testPruneNullifyTranslateCodec(List<Path> inputPaths) throws Except
 
     // Verify original.created.by is preserved
     validateCreatedBy();
+    validateRowGroupRowCount();
   }
 
   @Test
@@ -285,6 +288,7 @@ private void testPruneEncryptTranslateCodec(List<Path> inputPaths) throws Except
 
     // Verify original.created.by is preserved
     validateCreatedBy();
+    validateRowGroupRowCount();
   }
 
   @Test
@@ -368,6 +372,7 @@ public void testRewriteWithoutColumnIndexes() throws Exception {
 
     // Verify original.created.by is preserved
     validateCreatedBy();
+    validateRowGroupRowCount();
   }
 
   private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws Exception {
@@ -484,6 +489,7 @@ public void testMergeTwoFilesOnly() throws Exception {
 
     // Verify original.created.by is preserved
     validateCreatedBy();
+    validateRowGroupRowCount();
   }
 
   @Test(expected = InvalidSchemaException.class)
@@ -523,14 +529,28 @@ public void testMergeTwoFilesWithDifferentSchema() throws Exception {
     rewriter = new ParquetRewriter(options);
   }
 
+  @Test
+  public void testRewriteFileWithMultipleBlocks() throws Exception {
+    testSingleInputFileSetup("GZIP", 1024L);
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+    }};
+    testPruneSingleColumnTranslateCodec(inputPaths);
+  }
+
   private void testSingleInputFileSetup(String compression) throws IOException {
+    testSingleInputFileSetup(compression, ParquetWriter.DEFAULT_BLOCK_SIZE);
+  }
+
+  private void testSingleInputFileSetup(String compression, long rowGroupSize) throws IOException {
     MessageType schema = createSchema();
     inputFiles = Lists.newArrayList();
     inputFiles.add(new TestFileBuilder(conf, schema)
-            .withNumRecord(numRecord)
-            .withCodec(compression)
-            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
-            .build());
+      .withNumRecord(numRecord)
+      .withCodec(compression)
+      .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+      .withRowGroupSize(rowGroupSize)
+      .build());
   }
 
   private void testMultipleInputFilesSetup() throws IOException {
@@ -769,4 +789,22 @@ private void validateCreatedBy() throws Exception {
     assertEquals(inputCreatedBy, originalCreatedBy);
   }
 
+  private void validateRowGroupRowCount() throws Exception {
+    List<Long> inputRowCounts = new ArrayList<>();
+    for (EncryptionTestFile inputFile : inputFiles) {
+      ParquetMetadata inputPmd = getFileMetaData(inputFile.getFileName(), null);
+      for (BlockMetaData blockMetaData: inputPmd.getBlocks()) {
+        inputRowCounts.add(blockMetaData.getRowCount());
+      }
+    }
+
+    List<Long> outputRowCounts = new ArrayList<>();
+    ParquetMetadata outPmd = getFileMetaData(outputFile, null);
+    for (BlockMetaData blockMetaData: outPmd.getBlocks()) {
+      outputRowCounts.add(blockMetaData.getRowCount());
+    }
+
+    assertEquals(inputRowCounts, outputRowCounts);
+  }
+
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
index 5018beb3ce..bea744cf5d 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
@@ -51,6 +51,7 @@ public class TestFileBuilder
   private String[] encryptColumns = {};
   private ParquetCipher cipher = ParquetCipher.AES_GCM_V1;
   private Boolean footerEncryption = false;
+  private long rowGroupSize = ParquetWriter.DEFAULT_BLOCK_SIZE;
 
   public TestFileBuilder(Configuration conf, MessageType schema)
   {
@@ -107,6 +108,12 @@ public TestFileBuilder withFooterEncryption()
     return this;
   }
 
+  public TestFileBuilder withRowGroupSize(long rowGroupSize)
+  {
+    this.rowGroupSize = rowGroupSize;
+    return this;
+  }
+
   public EncryptionTestFile build()
     throws IOException
   {
@@ -119,6 +126,7 @@ public EncryptionTestFile build()
       .withExtraMetaData(extraMeta)
       .withValidation(true)
       .withPageSize(pageSize)
+      .withRowGroupSize(rowGroupSize)
       .withEncryption(encryptionProperties)
       .withCompressionCodec(CompressionCodecName.valueOf(codec));
     try (ParquetWriter writer = builder.build()) {