Skip to content

Commit

Permalink
PARQUET-2413: Make extraMetadata configurable via ParquetProperties
Browse files Browse the repository at this point in the history
  • Loading branch information
clairemcginty committed Jan 2, 2024
1 parent 8ed9f3c commit 0a8e7c1
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalDouble;
import java.util.OptionalLong;
Expand Down Expand Up @@ -113,6 +115,7 @@ public static WriterVersion fromString(String name) {
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;
private final boolean enableByteStreamSplit;
private final Map<String, String> extraMetaData;

private ParquetProperties(Builder builder) {
this.pageSizeThreshold = builder.pageSize;
Expand All @@ -139,6 +142,7 @@ private ParquetProperties(Builder builder) {
this.pageRowCountLimit = builder.pageRowCountLimit;
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
this.enableByteStreamSplit = builder.enableByteStreamSplit;
this.extraMetaData = builder.extraMetaData;
}

public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
Expand Down Expand Up @@ -293,6 +297,10 @@ public int getBloomFilterCandidatesCount(ColumnDescriptor column) {
return numBloomFilterCandidates.getValue(column);
}

/**
 * Returns the extra key/value metadata entries configured to be written into the file footer.
 *
 * @return an unmodifiable view of the extra metadata map (never null; empty by default)
 */
public Map<String, String> getExtraMetaData() {
  // Wrap in an unmodifiable view so callers cannot mutate the properties'
  // internal state after construction (ParquetProperties is immutable).
  return Collections.unmodifiableMap(extraMetaData);
}

/**
 * Returns a new {@link Builder} initialized with the default values
 * for every writer property.
 */
public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -342,6 +350,7 @@ public static class Builder {
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED;
private Map<String, String> extraMetaData = new HashMap<>();

private Builder() {
enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
Expand Down Expand Up @@ -373,6 +382,7 @@ private Builder(ParquetProperties toCopy) {
this.numBloomFilterCandidates = ColumnProperty.<Integer>builder(toCopy.numBloomFilterCandidates);
this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
this.enableByteStreamSplit = toCopy.enableByteStreamSplit;
this.extraMetaData = toCopy.extraMetaData;
}

/**
Expand Down Expand Up @@ -584,6 +594,11 @@ public Builder withPageWriteChecksumEnabled(boolean val) {
return this;
}

/**
 * Sets additional key/value metadata entries to be included in the file footer.
 *
 * <p>The supplied map is defensively copied, so subsequent changes made by the
 * caller to its map do not leak into this builder or the built
 * {@code ParquetProperties}.
 *
 * @param extraMetaData additional footer metadata entries; must not be null
 * @return this builder for method chaining
 * @throws NullPointerException if {@code extraMetaData} is null
 */
public Builder withExtraMetaData(Map<String, String> extraMetaData) {
  this.extraMetaData = new HashMap<>(Objects.requireNonNull(extraMetaData, "extraMetaData cannot be null"));
  return this;
}

public ParquetProperties build() {
ParquetProperties properties = new ParquetProperties(this);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ public static enum JobSummaryLevel {
public static final String ENABLE_DICTIONARY = "parquet.enable.dictionary";
public static final String VALIDATION = "parquet.validation";
public static final String WRITER_VERSION = "parquet.writer.version";

public static final String EXTRA_WRITE_METADATA = "parquet.write.metadata.extra";
public static final String MEMORY_POOL_RATIO = "parquet.memory.pool.ratio";
public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size";
public static final String MAX_PADDING_BYTES = "parquet.writer.max-padding";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,25 +406,25 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
this.codecFactory = codecFactory;
CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(compressionCodecName);

final String extraMetadataConfPrefix = ParquetOutputFormat.EXTRA_WRITE_METADATA + ".";
final Map<String, String> extraMetadata = new HashMap<>(writeContext.getExtraMetaData());

conf.iterator().forEachRemaining(entry -> {
if (entry.getKey().startsWith(extraMetadataConfPrefix)) {
final String metadataKey = entry.getKey().replaceFirst(extraMetadataConfPrefix, "");

final Map<String, String> extraMetadata;
if (encodingProps.getExtraMetaData() == null
|| encodingProps.getExtraMetaData().isEmpty()) {
extraMetadata = writeContext.getExtraMetaData();
} else {
extraMetadata = new HashMap<>(writeContext.getExtraMetaData());

encodingProps.getExtraMetaData().forEach((metadataKey, metadataValue) -> {
if (metadataKey.equals(OBJECT_MODEL_NAME_PROP)) {
throw new IllegalArgumentException("Cannot overwrite metadata key " + OBJECT_MODEL_NAME_PROP
+ ". Please use another key name.");
}

if (extraMetadata.put(metadataKey, entry.getValue()) != null) {
throw new IllegalArgumentException("Extra metadata key " + metadataKey
+ " conflicts with reserved metadata keys present in "
+ writeSupport.getClass().getName() + ". Please use another key name.");
if (extraMetadata.put(metadataKey, metadataValue) != null) {
throw new IllegalArgumentException(
"Duplicate metadata key " + metadataKey + ". Please use another key name.");
}
}
});
});
}

this.writer = new InternalParquetRecordWriter<T>(
fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, compressor, validating, encodingProps);
Expand Down Expand Up @@ -865,6 +865,17 @@ public SELF withStatisticsTruncateLength(int length) {
return self();
}

/**
 * Sets additional String key/value metadata entries to be included in the file footer,
 * alongside the metadata provided by the {@code WriteSupport}.
 *
 * <p>Delegates to the underlying encoding-properties builder; entries are merged into
 * the footer when the writer is built.
 *
 * @param extraMetaData a map of additional String-keyed, String-valued metadata entries
 * @return this builder for method chaining
 */
public SELF withExtraMetaData(Map<String, String> extraMetaData) {
encodingPropsBuilder.withExtraMetaData(extraMetaData);
return self();
}

/**
* Set a property that will be available to the read path. For writers that use a Hadoop
* configuration, this is the recommended way to add configuration values.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.parquet.hadoop.example;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
Expand Down Expand Up @@ -98,7 +96,6 @@ public static Builder builder(OutputFile file) {

public static class Builder extends ParquetWriter.Builder<Group, Builder> {
private MessageType type = null;
private Map<String, String> extraMetaData = new HashMap<String, String>();

private Builder(Path file) {
super(file);
Expand All @@ -113,11 +110,6 @@ public Builder withType(MessageType type) {
return this;
}

public Builder withExtraMetaData(Map<String, String> extraMetaData) {
this.extraMetaData = extraMetaData;
return this;
}

@Override
protected Builder self() {
return this;
Expand All @@ -130,7 +122,7 @@ protected WriteSupport<Group> getWriteSupport(Configuration conf) {

@Override
protected WriteSupport<Group> getWriteSupport(ParquetConfiguration conf) {
return new GroupWriteSupport(type, extraMetaData);
return new GroupWriteSupport(type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,6 @@ public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException {
@Test
public void testExtraMetaData() throws Exception {
final Configuration conf = new Configuration();
conf.set("parquet.write.metadata.extra.simple-key", "some-value-1");
conf.set("parquet.write.metadata.extra.nested.key", "some-value-2");

final File testDir = temp.newFile();
testDir.delete();

Expand All @@ -425,6 +422,7 @@ public void testExtraMetaData() throws Exception {
final Path filePath = new Path(testDir.getAbsolutePath(), version.name());
final ParquetWriter<Group> writer = ExampleParquetWriter.builder(new TestOutputFile(filePath, conf))
.withConf(conf)
.withExtraMetaData(ImmutableMap.of("simple-key", "some-value-1", "nested.key", "some-value-2"))
.build();
for (int i = 0; i < 1000; i++) {
writer.write(f.newGroup().append("int32_field", 32));
Expand All @@ -451,8 +449,6 @@ public void testExtraMetaData() throws Exception {
@Test
public void testFailsOnConflictingExtraMetaDataKey() throws Exception {
final Configuration conf = new Configuration();
conf.set("parquet.write.metadata.extra.simple.key", "some-value-1");

final File testDir = temp.newFile();
testDir.delete();

Expand All @@ -461,10 +457,11 @@ public void testFailsOnConflictingExtraMetaDataKey() throws Exception {

for (WriterVersion version : WriterVersion.values()) {
final Path filePath = new Path(testDir.getAbsolutePath(), version.name());

Assert.assertThrows(IllegalArgumentException.class, () -> ExampleParquetWriter.builder(
new TestOutputFile(filePath, conf))
.withConf(conf)
.withExtraMetaData(ImmutableMap.of("simple.key", "some-value-2"))
.withExtraMetaData(ImmutableMap.of(ParquetWriter.OBJECT_MODEL_NAME_PROP, "some-value-3"))
.build());
}
}
Expand Down

0 comments on commit 0a8e7c1

Please sign in to comment.