diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 7bf4009ee1..5152d5b070 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -20,6 +20,8 @@
 import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalDouble;
 import java.util.OptionalLong;
@@ -113,6 +115,7 @@ public static WriterVersion fromString(String name) {
   private final int pageRowCountLimit;
   private final boolean pageWriteChecksumEnabled;
   private final boolean enableByteStreamSplit;
+  private final Map<String, String> extraMetaData;
 
   private ParquetProperties(Builder builder) {
     this.pageSizeThreshold = builder.pageSize;
@@ -139,6 +142,7 @@ private ParquetProperties(Builder builder) {
     this.pageRowCountLimit = builder.pageRowCountLimit;
     this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
     this.enableByteStreamSplit = builder.enableByteStreamSplit;
+    this.extraMetaData = builder.extraMetaData;
   }
 
   public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -293,6 +297,10 @@ public int getBloomFilterCandidatesCount(ColumnDescriptor column) {
     return numBloomFilterCandidates.getValue(column);
   }
 
+  public Map<String, String> getExtraMetaData() {
+    return extraMetaData;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -342,6 +350,7 @@ public static class Builder {
     private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
     private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
     private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED;
+    private Map<String, String> extraMetaData = new HashMap<>();
 
     private Builder() {
       enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
@@ -373,6 +382,7 @@ private Builder(ParquetProperties toCopy) {
       this.numBloomFilterCandidates = ColumnProperty.builder(toCopy.numBloomFilterCandidates);
       this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
       this.enableByteStreamSplit = toCopy.enableByteStreamSplit;
+      this.extraMetaData = toCopy.extraMetaData;
     }
 
     /**
@@ -584,6 +594,11 @@ public Builder withPageWriteChecksumEnabled(boolean val) {
       return this;
     }
 
+    public Builder withExtraMetaData(Map<String, String> extraMetaData) {
+      this.extraMetaData = extraMetaData;
+      return this;
+    }
+
     public ParquetProperties build() {
       ParquetProperties properties = new ParquetProperties(this);
       // we pass a constructed but uninitialized factory to ParquetProperties above as currently
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index befea4488d..158c432458 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -140,8 +140,6 @@ public static enum JobSummaryLevel {
   public static final String ENABLE_DICTIONARY = "parquet.enable.dictionary";
   public static final String VALIDATION = "parquet.validation";
   public static final String WRITER_VERSION = "parquet.writer.version";
-
-  public static final String EXTRA_WRITE_METADATA = "parquet.write.metadata.extra";
   public static final String MEMORY_POOL_RATIO = "parquet.memory.pool.ratio";
   public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size";
   public static final String MAX_PADDING_BYTES = "parquet.writer.max-padding";
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 43f5499560..1838d1db44 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -406,25 +406,25 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
     this.codecFactory = codecFactory;
     CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(compressionCodecName);
 
-    final String extraMetadataConfPrefix = ParquetOutputFormat.EXTRA_WRITE_METADATA + ".";
-    final Map<String, String> extraMetadata = new HashMap<>(writeContext.getExtraMetaData());
-
-    conf.iterator().forEachRemaining(entry -> {
-      if (entry.getKey().startsWith(extraMetadataConfPrefix)) {
-        final String metadataKey = entry.getKey().replaceFirst(extraMetadataConfPrefix, "");
-
+    final Map<String, String> extraMetadata;
+    if (encodingProps.getExtraMetaData() == null
+        || encodingProps.getExtraMetaData().isEmpty()) {
+      extraMetadata = writeContext.getExtraMetaData();
+    } else {
+      extraMetadata = new HashMap<>(writeContext.getExtraMetaData());
+
+      encodingProps.getExtraMetaData().forEach((metadataKey, metadataValue) -> {
         if (metadataKey.equals(OBJECT_MODEL_NAME_PROP)) {
           throw new IllegalArgumentException("Cannot overwrite metadata key " + OBJECT_MODEL_NAME_PROP
               + ". Please use another key name.");
         }
 
-        if (extraMetadata.put(metadataKey, entry.getValue()) != null) {
-          throw new IllegalArgumentException("Extra metadata key " + metadataKey
-              + " conflicts with reserved metadata keys present in "
-              + writeSupport.getClass().getName() + ". Please use another key name.");
+        if (extraMetadata.put(metadataKey, metadataValue) != null) {
+          throw new IllegalArgumentException(
+              "Duplicate metadata key " + metadataKey + ". Please use another key name.");
         }
-      }
-    });
+      });
+    }
 
     this.writer = new InternalParquetRecordWriter<T>(
         fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, compressor, validating, encodingProps);
@@ -865,6 +865,17 @@ public SELF withStatisticsTruncateLength(int length) {
       return self();
     }
 
+    /**
+     * Sets additional metadata entries to be included in the file footer.
+     *
+     * @param extraMetaData a Map of additional stringly-typed metadata entries
+     * @return this builder for method chaining
+     */
+    public SELF withExtraMetaData(Map<String, String> extraMetaData) {
+      encodingPropsBuilder.withExtraMetaData(extraMetaData);
+      return self();
+    }
+
     /**
      * Set a property that will be available to the read path. For writers that use a Hadoop
      * configuration, this is the recommended way to add configuration values.
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
index 23df1faa35..04c629f35c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
@@ -19,8 +19,6 @@
 package org.apache.parquet.hadoop.example;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ParquetProperties;
@@ -98,7 +96,6 @@ public static Builder builder(OutputFile file) {
   public static class Builder extends ParquetWriter.Builder<Group, Builder> {
     private MessageType type = null;
-    private Map<String, String> extraMetaData = new HashMap<String, String>();
 
     private Builder(Path file) {
       super(file);
     }
@@ -113,11 +110,6 @@ public Builder withType(MessageType type) {
       return this;
     }
 
-    public Builder withExtraMetaData(Map<String, String> extraMetaData) {
-      this.extraMetaData = extraMetaData;
-      return this;
-    }
-
     @Override
     protected Builder self() {
       return this;
@@ -130,7 +122,7 @@ protected WriteSupport<Group> getWriteSupport(Configuration conf) {
 
     @Override
     protected WriteSupport<Group> getWriteSupport(ParquetConfiguration conf) {
-      return new GroupWriteSupport(type, extraMetaData);
+      return new GroupWriteSupport(type);
     }
   }
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 0e396b510f..fa9ee865d0 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -411,9 +411,6 @@ public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException {
   @Test
   public void testExtraMetaData() throws Exception {
     final Configuration conf = new Configuration();
-    conf.set("parquet.write.metadata.extra.simple-key", "some-value-1");
-    conf.set("parquet.write.metadata.extra.nested.key", "some-value-2");
-
     final File testDir = temp.newFile();
     testDir.delete();
 
@@ -425,6 +422,7 @@ public void testExtraMetaData() throws Exception {
       final Path filePath = new Path(testDir.getAbsolutePath(), version.name());
       final ParquetWriter<Group> writer = ExampleParquetWriter.builder(new TestOutputFile(filePath, conf))
           .withConf(conf)
+          .withExtraMetaData(ImmutableMap.of("simple-key", "some-value-1", "nested.key", "some-value-2"))
           .build();
       for (int i = 0; i < 1000; i++) {
         writer.write(f.newGroup().append("int32_field", 32));
@@ -451,8 +449,6 @@ public void testExtraMetaData() throws Exception {
   @Test
   public void testFailsOnConflictingExtraMetaDataKey() throws Exception {
     final Configuration conf = new Configuration();
-    conf.set("parquet.write.metadata.extra.simple.key", "some-value-1");
-
     final File testDir = temp.newFile();
     testDir.delete();
 
@@ -461,10 +457,11 @@ public void testFailsOnConflictingExtraMetaDataKey() throws Exception {
 
     for (WriterVersion version : WriterVersion.values()) {
       final Path filePath = new Path(testDir.getAbsolutePath(), version.name());
+
       Assert.assertThrows(IllegalArgumentException.class, () -> ExampleParquetWriter.builder(
               new TestOutputFile(filePath, conf))
           .withConf(conf)
-          .withExtraMetaData(ImmutableMap.of("simple.key", "some-value-2"))
+          .withExtraMetaData(ImmutableMap.of(ParquetWriter.OBJECT_MODEL_NAME_PROP, "some-value-3"))
           .build());
     }
   }
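For reviewers, a minimal usage sketch of the new `withExtraMetaData` builder API. It is not part of the patch; the class name, output path, and schema below are illustrative only. Extra footer metadata is now passed through the writer builder (and on into ParquetProperties) instead of being scraped from `parquet.write.metadata.extra.*` Hadoop configuration keys:

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

// Illustrative class name; not part of the patch.
public class ExtraMetaDataUsage {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required int32 int32_field; }");

    // The extra key/value pairs land in the file footer next to the metadata
    // contributed by the WriteSupport; OBJECT_MODEL_NAME_PROP and keys that
    // collide with WriteSupport metadata are rejected with an
    // IllegalArgumentException at build time.
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(
            new Path("/tmp/example.parquet")) // illustrative path
        .withConf(new Configuration())
        .withType(schema)
        .withExtraMetaData(ImmutableMap.of("simple-key", "some-value-1", "nested.key", "some-value-2"))
        .build()) {
      writer.write(new SimpleGroupFactory(schema).newGroup().append("int32_field", 32));
    }
  }
}

Routing the entries through ParquetProperties keeps the ParquetWriter constructor free of Hadoop Configuration scanning and makes the same capability available to writers that are not configured through Hadoop at all.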