Commit 9b5a962

PARQUET-2343: Fixes NPE when rewriting file with multiple rowgroups (#…

ConeyLiu authored Sep 4, 2023 · 1 parent 910bcc4

Showing 3 changed files with 51 additions and 4 deletions.
ParquetRewriter.java

@@ -329,6 +329,7 @@ private void processBlocksFromReader() throws IOException {
 
       writer.endBlock();
       store = reader.readNextRowGroup();
+      crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
       blockId++;
       numBlocksRewritten++;
     }
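
Why the one-line change matters: crStore decodes values through the PageReadStore of the current row group. It was previously constructed once, from the first row group, so after reader.readNextRowGroup() advanced to the next block the rewriter kept reading through a store bound to already-consumed pages, which surfaced as the NPE this commit fixes. A sketch of the loop after the fix (simplified; names follow the hunk above, unrelated rewrite logic elided):

    PageReadStore store = reader.readNextRowGroup();
    ColumnReadStoreImpl crStore =
        new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
    while (store != null) {
      writer.startBlock(store.getRowCount());
      // ... rewrite each column chunk of the current row group; columns that
      // must be re-decoded (e.g. nullified ones) are read through crStore ...
      writer.endBlock();
      store = reader.readNextRowGroup();
      // The fix: rebind the column read store to the row group just fetched,
      // instead of reusing the instance tied to the previous, consumed one.
      crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
      blockId++;
      numBlocksRewritten++;
    }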
ParquetRewriterTest.java

@@ -37,6 +37,7 @@
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;

@@ -135,6 +136,7 @@ private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws Exception {

   // Verify original.created.by is preserved
   validateCreatedBy();
+  validateRowGroupRowCount();
 }
 
 @Before

@@ -204,6 +206,7 @@ private void testPruneNullifyTranslateCodec(List<Path> inputPaths) throws Exception {

   // Verify original.created.by is preserved
   validateCreatedBy();
+  validateRowGroupRowCount();
 }
 
 @Test

@@ -285,6 +288,7 @@ private void testPruneEncryptTranslateCodec(List<Path> inputPaths) throws Exception {

   // Verify original.created.by is preserved
   validateCreatedBy();
+  validateRowGroupRowCount();
 }
 
 @Test

@@ -368,6 +372,7 @@ public void testRewriteWithoutColumnIndexes() throws Exception {

   // Verify original.created.by is preserved
   validateCreatedBy();
+  validateRowGroupRowCount();
 }
 
 private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws Exception {

@@ -484,6 +489,7 @@ public void testMergeTwoFilesOnly() throws Exception {

   // Verify original.created.by is preserved
   validateCreatedBy();
+  validateRowGroupRowCount();
 }
 
 @Test(expected = InvalidSchemaException.class)

@@ -523,14 +529,28 @@ public void testMergeTwoFilesWithDifferentSchema() throws Exception {
   rewriter = new ParquetRewriter(options);
 }
 
+@Test
+public void testRewriteFileWithMultipleBlocks() throws Exception {
+  testSingleInputFileSetup("GZIP", 1024L);
+  List<Path> inputPaths = new ArrayList<Path>() {{
+    add(new Path(inputFiles.get(0).getFileName()));
+  }};
+  testPruneSingleColumnTranslateCodec(inputPaths);
+}
+
 private void testSingleInputFileSetup(String compression) throws IOException {
+  testSingleInputFileSetup(compression, ParquetWriter.DEFAULT_BLOCK_SIZE);
+}
+
+private void testSingleInputFileSetup(String compression, long rowGroupSize) throws IOException {
   MessageType schema = createSchema();
   inputFiles = Lists.newArrayList();
   inputFiles.add(new TestFileBuilder(conf, schema)
-      .withNumRecord(numRecord)
-      .withCodec(compression)
-      .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
-      .build());
+      .withNumRecord(numRecord)
+      .withCodec(compression)
+      .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+      .withRowGroupSize(rowGroupSize)
+      .build());
 }
 
 private void testMultipleInputFilesSetup() throws IOException {
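
The new test writes a single GZIP file with a tiny 1024-byte row-group size so the input is forced to contain multiple blocks, then runs the standard prune/translate-codec path that previously hit the NPE. For orientation, a minimal rewrite invocation of the same shape (a sketch against the parquet-hadoop rewrite API; paths and the target codec are placeholders, not taken from the test):

    Configuration conf = new Configuration();
    Path in = new Path("multi_rowgroup_input.parquet");   // written with rowGroupSize = 1024L
    Path out = new Path("rewritten_output.parquet");
    RewriteOptions options = new RewriteOptions.Builder(conf, in, out)
        .transform(CompressionCodecName.ZSTD)             // translate the compression codec
        .build();
    ParquetRewriter rewriter = new ParquetRewriter(options);
    rewriter.processBlocks();   // iterates row groups; formerly threw NPE on the second one
    rewriter.close();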

@@ -769,4 +789,22 @@ private void validateCreatedBy() throws Exception {
   assertEquals(inputCreatedBy, originalCreatedBy);
 }
 
+private void validateRowGroupRowCount() throws Exception {
+  List<Long> inputRowCounts = new ArrayList<>();
+  for (EncryptionTestFile inputFile : inputFiles) {
+    ParquetMetadata inputPmd = getFileMetaData(inputFile.getFileName(), null);
+    for (BlockMetaData blockMetaData : inputPmd.getBlocks()) {
+      inputRowCounts.add(blockMetaData.getRowCount());
+    }
+  }
+
+  List<Long> outputRowCounts = new ArrayList<>();
+  ParquetMetadata outPmd = getFileMetaData(outputFile, null);
+  for (BlockMetaData blockMetaData : outPmd.getBlocks()) {
+    outputRowCounts.add(blockMetaData.getRowCount());
+  }
+
+  assertEquals(inputRowCounts, outputRowCounts);
+}
 }
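
Note the helper asserts ordered equality of the two List<Long>s: the output must reproduce every input row group's row count, in order, across all input files. Merely preserving the total row count would not pass, so the check pins down that the rewriter carries row-group boundaries through one-to-one.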
TestFileBuilder.java

@@ -51,6 +51,7 @@ public class TestFileBuilder
 private String[] encryptColumns = {};
 private ParquetCipher cipher = ParquetCipher.AES_GCM_V1;
 private Boolean footerEncryption = false;
+private long rowGroupSize = ParquetWriter.DEFAULT_BLOCK_SIZE;
 
 public TestFileBuilder(Configuration conf, MessageType schema)
 {

@@ -107,6 +108,12 @@ public TestFileBuilder withFooterEncryption()
   return this;
 }
 
+public TestFileBuilder withRowGroupSize(long rowGroupSize)
+{
+  this.rowGroupSize = rowGroupSize;
+  return this;
+}
+
 public EncryptionTestFile build()
     throws IOException
 {

@@ -119,6 +126,7 @@ public EncryptionTestFile build()
       .withExtraMetaData(extraMeta)
       .withValidation(true)
       .withPageSize(pageSize)
+      .withRowGroupSize(rowGroupSize)
       .withEncryption(encryptionProperties)
       .withCompressionCodec(CompressionCodecName.valueOf(codec));
 try (ParquetWriter writer = builder.build()) {
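
A usage sketch of the extended builder (mirroring testSingleInputFileSetup above; conf and schema as in the test, the record count is a placeholder):

    EncryptionTestFile inputFile = new TestFileBuilder(conf, schema)
        .withNumRecord(100000)                             // placeholder record count
        .withCodec("GZIP")
        .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
        .withRowGroupSize(1024L)                           // ~1 KB row groups force many blocks
        .build();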
