Skip to content

Commit

Permalink
Core: Default to zstd compression for Parquet in new tables
Browse files Browse the repository at this point in the history
Co-authored-by: Szehon Ho <[email protected]>
Co-authored-by: Kyle Bendickson <[email protected]>
  • Loading branch information
3 people authored and aokolnychyi committed Sep 20, 2023
1 parent 16b6cef commit 22f9039
Show file tree
Hide file tree
Showing 15 changed files with 133 additions and 114 deletions.
21 changes: 18 additions & 3 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static TableMetadata newTableMetadata(
PropertyUtil.propertyAsInt(
properties, TableProperties.FORMAT_VERSION, DEFAULT_TABLE_FORMAT_VERSION);
return newTableMetadata(
schema, spec, sortOrder, location, unreservedProperties(properties), formatVersion);
schema, spec, sortOrder, location, persistedProperties(properties), formatVersion);
}

public static TableMetadata newTableMetadata(
Expand All @@ -78,7 +78,7 @@ public static TableMetadata newTableMetadata(
PropertyUtil.propertyAsInt(
properties, TableProperties.FORMAT_VERSION, DEFAULT_TABLE_FORMAT_VERSION);
return newTableMetadata(
schema, spec, sortOrder, location, unreservedProperties(properties), formatVersion);
schema, spec, sortOrder, location, persistedProperties(properties), formatVersion);
}

private static Map<String, String> unreservedProperties(Map<String, String> rawProperties) {
Expand All @@ -87,6 +87,21 @@ private static Map<String, String> unreservedProperties(Map<String, String> rawP
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
 * Builds the property map that is written into new table metadata.
 *
 * <p>Seeds defaults that apply only to newly created tables (currently the Parquet
 * compression codec introduced in 1.4.0), then layers the caller-supplied properties on
 * top so an explicit user value always wins. Reserved properties are never persisted.
 *
 * @param rawProperties caller-supplied table properties, possibly containing reserved keys
 * @return a mutable map of properties safe to persist in table metadata
 */
private static Map<String, String> persistedProperties(Map<String, String> rawProperties) {
  Map<String, String> persisted = Maps.newHashMap();

  // defaults for new tables go in first so user-supplied values below can override them
  persisted.put(
      TableProperties.PARQUET_COMPRESSION,
      TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);

  for (Map.Entry<String, String> entry : rawProperties.entrySet()) {
    // reserved properties (e.g. format-version) are handled separately, never persisted
    if (!TableProperties.RESERVED_PROPERTIES.contains(entry.getKey())) {
      persisted.put(entry.getKey(), entry.getValue());
    }
  }

  return persisted;
}

static TableMetadata newTableMetadata(
Schema schema,
PartitionSpec spec,
Expand Down Expand Up @@ -685,7 +700,7 @@ public TableMetadata buildReplacement(
.setDefaultPartitionSpec(freshSpec)
.setDefaultSortOrder(freshSortOrder)
.setLocation(newLocation)
.setProperties(unreservedProperties(updatedProperties))
.setProperties(persistedProperties(updatedProperties))
.build();
}

Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ private TableProperties() {}
public static final String PARQUET_COMPRESSION = "write.parquet.compression-codec";
public static final String DELETE_PARQUET_COMPRESSION = "write.delete.parquet.compression-codec";
public static final String PARQUET_COMPRESSION_DEFAULT = "gzip";
public static final String PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0 = "zstd";

public static final String PARQUET_COMPRESSION_LEVEL = "write.parquet.compression-level";
public static final String DELETE_PARQUET_COMPRESSION_LEVEL =
Expand Down
26 changes: 10 additions & 16 deletions core/src/test/java/org/apache/iceberg/TestTableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.apache.iceberg.TableMetadataParser.SCHEMA;
import static org.apache.iceberg.TableMetadataParser.SNAPSHOTS;
import static org.apache.iceberg.TestHelpers.assertSameSchemaList;
import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.core.JsonGenerator;
import java.io.File;
Expand Down Expand Up @@ -1457,14 +1458,10 @@ public void testCreateV2MetadataThroughTableProperty() {
null,
ImmutableMap.of(TableProperties.FORMAT_VERSION, "2", "key", "val"));

Assert.assertEquals(
"format version should be configured based on the format-version key",
2,
meta.formatVersion());
Assert.assertEquals(
"should not contain format-version in properties",
ImmutableMap.of("key", "val"),
meta.properties());
assertThat(meta.formatVersion()).isEqualTo(2);
assertThat(meta.properties())
.containsEntry("key", "val")
.doesNotContainKey(TableProperties.FORMAT_VERSION);
}

@Test
Expand All @@ -1486,14 +1483,11 @@ public void testReplaceV1MetadataToV2ThroughTableProperty() {
meta.location(),
ImmutableMap.of(TableProperties.FORMAT_VERSION, "2", "key2", "val2"));

Assert.assertEquals(
"format version should be configured based on the format-version key",
2,
meta.formatVersion());
Assert.assertEquals(
"should not contain format-version but should contain old and new properties",
ImmutableMap.of("key", "val", "key2", "val2"),
meta.properties());
assertThat(meta.formatVersion()).isEqualTo(2);
assertThat(meta.properties())
.containsEntry("key", "val")
.containsEntry("key2", "val2")
.doesNotContainKey(TableProperties.FORMAT_VERSION);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
Expand All @@ -32,7 +34,6 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
Expand All @@ -48,12 +49,12 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
Expand Down Expand Up @@ -121,12 +122,10 @@ public void testCreateTable() throws TableNotExistException {
Assert.assertEquals(
new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(),
table.schema().asStruct());
Assert.assertEquals(Maps.newHashMap(), table.properties());

CatalogTable catalogTable = catalogTable("tl");
Assert.assertEquals(
TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema());
Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
}

@Test
Expand Down Expand Up @@ -176,7 +175,7 @@ public void testCreateTableIfNotExists() {
sql("CREATE TABLE tl(id BIGINT)");

// Assert that table does exist.
Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
assertThat(table("tl")).isNotNull();

sql("DROP TABLE tl");
AssertHelpers.assertThrows(
Expand All @@ -186,15 +185,13 @@ public void testCreateTableIfNotExists() {
() -> table("tl"));

sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT)");
Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
assertThat(table("tl").properties()).doesNotContainKey("key");

final Map<String, String> expectedProperties = ImmutableMap.of("key", "value");
table("tl").updateProperties().set("key", "value").commit();
Assert.assertEquals(expectedProperties, table("tl").properties());
assertThat(table("tl").properties()).containsEntry("key", "value");

sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT)");
Assert.assertEquals(
"Should still be the old table.", expectedProperties, table("tl").properties());
assertThat(table("tl").properties()).containsEntry("key", "value");
}

@Test
Expand All @@ -206,12 +203,10 @@ public void testCreateTableLike() throws TableNotExistException {
Assert.assertEquals(
new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(),
table.schema().asStruct());
Assert.assertEquals(Maps.newHashMap(), table.properties());

CatalogTable catalogTable = catalogTable("tl2");
Assert.assertEquals(
TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema());
Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
}

@Test
Expand All @@ -226,7 +221,6 @@ public void testCreateTableLocation() {
new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(),
table.schema().asStruct());
Assert.assertEquals("file:///tmp/location", table.location());
Assert.assertEquals(Maps.newHashMap(), table.properties());
}

@Test
Expand All @@ -242,7 +236,6 @@ public void testCreatePartitionTable() throws TableNotExistException {
table.schema().asStruct());
Assert.assertEquals(
PartitionSpec.builderFor(table.schema()).identity("dt").build(), table.spec());
Assert.assertEquals(Maps.newHashMap(), table.properties());

CatalogTable catalogTable = catalogTable("tl");
Assert.assertEquals(
Expand All @@ -251,7 +244,6 @@ public void testCreatePartitionTable() throws TableNotExistException {
.field("dt", DataTypes.STRING())
.build(),
catalogTable.getSchema());
Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
Assert.assertEquals(Collections.singletonList("dt"), catalogTable.getPartitionKeys());
}

Expand Down Expand Up @@ -304,7 +296,6 @@ public void testLoadTransformPartitionTable() throws TableNotExistException {
CatalogTable catalogTable = catalogTable("tl");
Assert.assertEquals(
TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema());
Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
Assert.assertEquals(Collections.emptyList(), catalogTable.getPartitionKeys());
}

Expand All @@ -317,12 +308,12 @@ public void testAlterTable() throws TableNotExistException {
// new
sql("ALTER TABLE tl SET('newK'='newV')");
properties.put("newK", "newV");
Assert.assertEquals(properties, table("tl").properties());
assertThat(table("tl").properties()).containsAllEntriesOf(properties);

// update old
sql("ALTER TABLE tl SET('oldK'='oldV2')");
properties.put("oldK", "oldV2");
Assert.assertEquals(properties, table("tl").properties());
assertThat(table("tl").properties()).containsAllEntriesOf(properties);

// remove property
CatalogTable catalogTable = catalogTable("tl");
Expand All @@ -331,7 +322,7 @@ public void testAlterTable() throws TableNotExistException {
.getCatalog(getTableEnv().getCurrentCatalog())
.get()
.alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false);
Assert.assertEquals(properties, table("tl").properties());
assertThat(table("tl").properties()).containsAllEntriesOf(properties);
}

@Test
Expand All @@ -343,12 +334,12 @@ public void testAlterTableWithPrimaryKey() throws TableNotExistException {
// new
sql("ALTER TABLE tl SET('newK'='newV')");
properties.put("newK", "newV");
Assert.assertEquals(properties, table("tl").properties());
assertThat(table("tl").properties()).containsAllEntriesOf(properties);

// update old
sql("ALTER TABLE tl SET('oldK'='oldV2')");
properties.put("oldK", "oldV2");
Assert.assertEquals(properties, table("tl").properties());
assertThat(table("tl").properties()).containsAllEntriesOf(properties);

// remove property
CatalogTable catalogTable = catalogTable("tl");
Expand All @@ -357,7 +348,7 @@ public void testAlterTableWithPrimaryKey() throws TableNotExistException {
.getCatalog(getTableEnv().getCurrentCatalog())
.get()
.alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false);
Assert.assertEquals(properties, table("tl").properties());
assertThat(table("tl").properties()).containsAllEntriesOf(properties);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ public void testCompressionParquet() throws Exception {

if (initProperties.get(TableProperties.PARQUET_COMPRESSION) == null) {
Assert.assertEquals(
TableProperties.PARQUET_COMPRESSION_DEFAULT,
TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0,
resultProperties.get(TableProperties.PARQUET_COMPRESSION));
Assert.assertEquals(
TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT,
resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL));
resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT));
} else {
Assert.assertEquals(
initProperties.get(TableProperties.PARQUET_COMPRESSION),
Expand Down
Loading

0 comments on commit 22f9039

Please sign in to comment.