PARQUET-2413: Support configurable extraMetadata in ParquetWriter
clairemcginty committed Dec 18, 2023
1 parent afd39dd commit 8cfcb06
Showing 2 changed files with 91 additions and 2 deletions.
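
In short: any Hadoop Configuration property prefixed with parquet.writer.metadata.extra. is now copied, with the prefix stripped, into the file footer's key/value metadata. A minimal usage sketch, assuming the example object model from parquet-hadoop; the key, value, and output path are illustrative, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.hadoop.example.GroupWriteSupport;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class ExtraMetadataExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Any "parquet.writer.metadata.extra.*" property is copied into the
        // footer with the prefix stripped; this key/value pair is illustrative.
        conf.set("parquet.writer.metadata.extra.my-key", "my-value");

        MessageType schema = MessageTypeParser.parseMessageType("message test { required int32 int32_field; }");
        GroupWriteSupport.setSchema(schema, conf);

        ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("/tmp/metadata-example.parquet"))
            .withConf(conf)
            .build();
        writer.write(new SimpleGroupFactory(schema).newGroup().append("int32_field", 32));
        writer.close();
        // The footer now contains my-key=my-value alongside writer.model.name=example.
      }
    }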
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -20,6 +20,8 @@

 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ParquetProperties;
@@ -50,6 +52,8 @@ public class ParquetWriter<T> implements Closeable {

   public static final String OBJECT_MODEL_NAME_PROP = "writer.model.name";
 
+  public static final String EXTRA_METADATA_NAME_PROP = "parquet.writer.metadata.extra";
+
   // max size (bytes) to write as padding and the min size of a row group
   public static final int MAX_PADDING_SIZE_DEFAULT = 8 * 1024 * 1024; // 8MB

@@ -403,11 +407,34 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport

     this.codecFactory = codecFactory;
     CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(compressionCodecName);
-    this.writer = new InternalParquetRecordWriter<T>(
+
+    final String extraMetadataConfPrefix = EXTRA_METADATA_NAME_PROP + ".";
+    final Map<String, String> extraMetadata = new HashMap<>(writeContext.getExtraMetaData());
+
+    conf.iterator().forEachRemaining(entry -> {
+      if (entry.getKey().startsWith(extraMetadataConfPrefix)) {
+        final String metadataKey = entry.getKey().replaceFirst(extraMetadataConfPrefix, "");
+
+        if (metadataKey.equals(OBJECT_MODEL_NAME_PROP)) {
+          throw new IllegalArgumentException(
+              "Cannot overwrite metadata key " + OBJECT_MODEL_NAME_PROP + ". Please use another key name."
+          );
+        }
+
+        if (
+            extraMetadata.put(metadataKey, entry.getValue()) != null
+        ) {
+          throw new IllegalArgumentException("Extra metadata key " + metadataKey + " conflicts with reserved metadata " +
+              "keys present in " + writeSupport.getClass().getName() + ". Please use another key name.");
+        }
+      }
+    });
+
+    this.writer = new InternalParquetRecordWriter<T>(
         fileWriter,
         writeSupport,
         schema,
-        writeContext.getExtraMetaData(),
+        extraMetadata,
         rowGroupSize,
         compressor,
         validating,
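Note the two guards in the constructor change above: a configuration-derived key may not equal writer.model.name (OBJECT_MODEL_NAME_PROP), and it may not collide with a key the WriteSupport itself reserves; both fail fast with IllegalArgumentException when the writer is built. A sketch of the first failure mode (the property value is illustrative):

    Configuration conf = new Configuration();
    conf.set("parquet.writer.metadata.extra.writer.model.name", "my-model");
    // Any ParquetWriter built with this conf now throws:
    //   IllegalArgumentException: Cannot overwrite metadata key writer.model.name. Please use another key name.
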
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -43,6 +43,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import com.google.common.collect.ImmutableMap;
 import net.openhft.hashing.LongHashFunction;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -62,6 +63,7 @@
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.LocalInputFile;
 import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.io.PositionOutputStream;
 import org.apache.parquet.io.api.Binary;
@@ -407,6 +409,66 @@ public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException {
     testParquetFileNumberOfBlocks(1, 1, 3);
   }
 
+  @Test
+  public void testExtraMetaData() throws Exception {
+    final Configuration conf = new Configuration();
+    conf.set("parquet.writer.metadata.extra.simple-key", "some-value-1");
+    conf.set("parquet.writer.metadata.extra.nested.key", "some-value-2");
+
+    final File testDir = temp.newFile();
+    testDir.delete();
+
+    final MessageType schema = parseMessageType("message test { required int32 int32_field; }");
+    GroupWriteSupport.setSchema(schema, conf);
+    final SimpleGroupFactory f = new SimpleGroupFactory(schema);
+
+    for (WriterVersion version : WriterVersion.values()) {
+      final Path filePath = new Path(testDir.getAbsolutePath(), version.name());
+      final ParquetWriter<Group> writer = ExampleParquetWriter.builder(new TestOutputFile(filePath, conf))
+          .withConf(conf)
+          .build();
+      for (int i = 0; i < 1000; i++) {
+        writer.write(f.newGroup().append("int32_field", 32));
+      }
+      writer.close();
+
+      final ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(filePath, new Configuration()));
+      assertEquals(1000, reader.readNextRowGroup().getRowCount());
+      assertEquals(
+          ImmutableMap.of(
+              "simple-key", "some-value-1",
+              "nested.key", "some-value-2",
+              ParquetWriter.OBJECT_MODEL_NAME_PROP, "example"
+          ),
+          reader.getFileMetaData().getKeyValueMetaData()
+      );
+
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testFailsOnConflictingExtraMetaDataKey() throws Exception {
+    final Configuration conf = new Configuration();
+    conf.set("parquet.writer.metadata.extra.simple.key", "some-value-1");
+
+    final File testDir = temp.newFile();
+    testDir.delete();
+
+    final MessageType schema = parseMessageType("message test { required int32 int32_field; }");
+    GroupWriteSupport.setSchema(schema, conf);
+
+    for (WriterVersion version : WriterVersion.values()) {
+      final Path filePath = new Path(testDir.getAbsolutePath(), version.name());
+      Assert.assertThrows(
+          IllegalArgumentException.class,
+          () -> ExampleParquetWriter.builder(new TestOutputFile(filePath, conf))
+              .withConf(conf)
+              .withExtraMetaData(ImmutableMap.of("simple.key", "some-value-2"))
+              .build());
+    }
+  }
+
   private void testParquetFileNumberOfBlocks(
       int minRowCountForPageSizeCheck, int maxRowCountForPageSizeCheck, int expectedNumberOfBlocks)
       throws IOException {
