Spark 3.4: Add write and SQL options to override compression config (#…
jerqi authored Aug 29, 2023
1 parent 96bda46 commit ab0a2fd
Showing 9 changed files with 386 additions and 8 deletions.
docs/spark-configuration.md (3 additions, 0 deletions)
@@ -194,6 +194,9 @@ df.write
| check-ordering | true | Checks if input schema and table schema are the same |
| isolation-level | null | Desired isolation level for Dataframe overwrite operations. `null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => checks for concurrent deletes in destination partitions. |
| validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via [Table API](../../api#table-metadata) or [Snapshots table](../spark-queries#snapshots). If null, the table's oldest known snapshot is used. |
| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
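For example, a single write can override the table's compression settings through the three options above. A minimal Java sketch, where the table names and option values are purely illustrative:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PerWriteCompressionExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Illustrative table names; the options apply only to this write.
    Dataset<Row> df = spark.table("db.source");
    df.writeTo("db.target")
        .option("compression-codec", "zstd")
        .option("compression-level", "3")
        .append();
  }
}
```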

CommitMetadata provides an interface to add custom metadata to a snapshot summary during a SQL execution, which can be beneficial for purposes such as auditing or change tracking. If properties start with `snapshot-property.`, then that prefix will be removed from each property. Here is an example:

SparkSQLProperties.java
@@ -63,4 +63,9 @@ private SparkSQLProperties() {}
// Controls the WAP branch used for write-audit-publish workflow.
// When set, new snapshots will be committed to this branch.
public static final String WAP_BRANCH = "spark.wap.branch";

// Controls write compression options
public static final String COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec";
public static final String COMPRESSION_LEVEL = "spark.sql.iceberg.compression-level";
public static final String COMPRESSION_STRATEGY = "spark.sql.iceberg.compression-strategy";
}
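These session properties apply to every subsequent Iceberg write in the session. A hedged sketch of the programmatic and SQL forms, with illustrative values:

```java
import org.apache.spark.sql.SparkSession;

public class SessionCompressionExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Programmatic form; affects subsequent Iceberg writes in this session.
    spark.conf().set("spark.sql.iceberg.compression-codec", "zstd");
    spark.conf().set("spark.sql.iceberg.compression-level", "3");

    // Equivalent SQL form (ORC compression strategy shown as an example).
    spark.sql("SET spark.sql.iceberg.compression-strategy = speed");
  }
}
```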
SparkWriteConf.java
@@ -21,6 +21,12 @@
import static org.apache.iceberg.DistributionMode.HASH;
import static org.apache.iceberg.DistributionMode.NONE;
import static org.apache.iceberg.DistributionMode.RANGE;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;

import java.util.Locale;
import java.util.Map;
@@ -418,4 +424,96 @@ public String branch() {

return branch;
}

public String parquetCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(TableProperties.PARQUET_COMPRESSION)
.defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT)
.parse();
}

public String parquetCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
.sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
.tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL)
.defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT)
.parseOptional();
}

public String avroCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(TableProperties.AVRO_COMPRESSION)
.defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT)
.parse();
}

public String avroCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
.sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
.tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL)
.defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT)
.parseOptional();
}

public String orcCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(TableProperties.ORC_COMPRESSION)
.defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT)
.parse();
}

public String orcCompressionStrategy() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_STRATEGY)
.sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY)
.tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY)
.defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT)
.parse();
}

public Map<String, String> writeProperties(FileFormat format) {
Map<String, String> writeProperties = Maps.newHashMap();

switch (format) {
case PARQUET:
writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
String parquetCompressionLevel = parquetCompressionLevel();
if (parquetCompressionLevel != null) {
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}
break;

case AVRO:
writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec());
String avroCompressionLevel = avroCompressionLevel();
if (avroCompressionLevel != null) {
writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
}
break;

case ORC:
writeProperties.put(ORC_COMPRESSION, orcCompressionCodec());
writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy());
break;

default:
// skip
}

return writeProperties;
}
}
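Each accessor above resolves its value in the order the confParser chain is declared: per-write option first, then the session property, then the table property, then the format default. A minimal standalone sketch of that precedence (not Iceberg's internal ConfParser):

```java
public class CompressionPrecedenceSketch {
  // Write option > Spark session conf > table property > format default.
  static String resolve(
      String writeOption, String sessionConf, String tableProperty, String defaultValue) {
    if (writeOption != null) {
      return writeOption;
    }
    if (sessionConf != null) {
      return sessionConf;
    }
    if (tableProperty != null) {
      return tableProperty;
    }
    return defaultValue;
  }

  public static void main(String[] args) {
    // With no per-write option, the session conf wins over the table property.
    System.out.println(resolve(null, "zstd", "gzip", "snappy")); // prints "zstd"
  }
}
```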
SparkWriteOptions.java
@@ -80,4 +80,9 @@ private SparkWriteOptions() {}

// Isolation Level for DataFrame calls. Currently supported by overwritePartitions
public static final String ISOLATION_LEVEL = "isolation-level";

// Controls write compression options
public static final String COMPRESSION_CODEC = "compression-codec";
public static final String COMPRESSION_LEVEL = "compression-level";
public static final String COMPRESSION_STRATEGY = "compression-strategy";
}
SparkFileWriterFactory.java
@@ -34,6 +34,7 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.SparkAvroWriter;
import org.apache.iceberg.spark.data.SparkOrcWriter;
@@ -47,6 +48,7 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
private StructType dataSparkType;
private StructType equalityDeleteSparkType;
private StructType positionDeleteSparkType;
private Map<String, String> writeProperties;

SparkFileWriterFactory(
Table table,
@@ -60,7 +62,8 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
StructType equalityDeleteSparkType,
SortOrder equalityDeleteSortOrder,
Schema positionDeleteRowSchema,
StructType positionDeleteSparkType) {
StructType positionDeleteSparkType,
Map<String, String> writeProperties) {

super(
table,
@@ -76,6 +79,7 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
this.dataSparkType = dataSparkType;
this.equalityDeleteSparkType = equalityDeleteSparkType;
this.positionDeleteSparkType = positionDeleteSparkType;
this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
}

static Builder builderFor(Table table) {
@@ -85,11 +89,13 @@ static Builder builderFor(Table table) {
@Override
protected void configureDataWrite(Avro.DataWriteBuilder builder) {
builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType()));
builder.setAll(writeProperties);
}

@Override
protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType()));
builder.setAll(writeProperties);
}

@Override
@@ -102,40 +108,48 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
StructType positionDeleteRowSparkType = (StructType) rowField.dataType();
builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType));
}

builder.setAll(writeProperties);
}

@Override
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType));
builder.setAll(writeProperties);
}

@Override
protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
builder.createWriterFunc(
msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType));
builder.setAll(writeProperties);
}

@Override
protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
builder.createWriterFunc(
msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType));
builder.transformPaths(path -> UTF8String.fromString(path.toString()));
builder.setAll(writeProperties);
}

@Override
protected void configureDataWrite(ORC.DataWriteBuilder builder) {
builder.createWriterFunc(SparkOrcWriter::new);
builder.setAll(writeProperties);
}

@Override
protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
builder.createWriterFunc(SparkOrcWriter::new);
builder.setAll(writeProperties);
}

@Override
protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
builder.createWriterFunc(SparkOrcWriter::new);
builder.transformPaths(path -> UTF8String.fromString(path.toString()));
builder.setAll(writeProperties);
}

private StructType dataSparkType() {
@@ -180,6 +194,7 @@ static class Builder {
private SortOrder equalityDeleteSortOrder;
private Schema positionDeleteRowSchema;
private StructType positionDeleteSparkType;
private Map<String, String> writeProperties;

Builder(Table table) {
this.table = table;
@@ -250,6 +265,11 @@ Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) {
return this;
}

Builder writeProperties(Map<String, String> properties) {
this.writeProperties = properties;
return this;
}

SparkFileWriterFactory build() {
boolean noEqualityDeleteConf = equalityFieldIds == null && equalityDeleteRowSchema == null;
boolean fullEqualityDeleteConf = equalityFieldIds != null && equalityDeleteRowSchema != null;
@@ -269,7 +289,8 @@ SparkFileWriterFactory build() {
equalityDeleteSparkType,
equalityDeleteSortOrder,
positionDeleteRowSchema,
positionDeleteSparkType);
positionDeleteSparkType,
writeProperties);
}
}
}
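The writeProperties map injected above via builder.setAll(writeProperties) is simply format-specific property keys mapped to the resolved values. An illustrative sketch of its contents for Parquet; the key strings assume Iceberg's write.parquet.* table property names:

```java
import java.util.Map;

public class WritePropertiesShapeExample {
  public static void main(String[] args) {
    // Illustrative contents of SparkWriteConf.writeProperties(FileFormat.PARQUET)
    // when the codec is overridden to zstd at level 3; the factory forwards this
    // map to the file-format writer builders with setAll(writeProperties).
    Map<String, String> writeProperties =
        Map.of(
            "write.parquet.compression-codec", "zstd",
            "write.parquet.compression-level", "3");

    writeProperties.forEach((key, value) -> System.out.println(key + " = " + value));
  }
}
```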
SparkPositionDeletesRewrite.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
@@ -74,6 +75,7 @@ public class SparkPositionDeletesRewrite implements Write {
private final String fileSetId;
private final int specId;
private final StructLike partition;
private final Map<String, String> writeProperties;

/**
* Constructs a {@link SparkPositionDeletesRewrite}.
@@ -106,6 +108,7 @@ public class SparkPositionDeletesRewrite implements Write {
this.fileSetId = writeConf.rewrittenFileSetId();
this.specId = specId;
this.partition = partition;
this.writeProperties = writeConf.writeProperties(format);
}

@Override
@@ -129,7 +132,8 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
writeSchema,
dsSchema,
specId,
partition);
partition,
writeProperties);
}

@Override
@@ -174,6 +178,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
private final StructType dsSchema;
private final int specId;
private final StructLike partition;
private final Map<String, String> writeProperties;

PositionDeletesWriterFactory(
Broadcast<Table> tableBroadcast,
@@ -183,7 +188,8 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
Schema writeSchema,
StructType dsSchema,
int specId,
StructLike partition) {
StructLike partition,
Map<String, String> writeProperties) {
this.tableBroadcast = tableBroadcast;
this.queryId = queryId;
this.format = format;
@@ -192,6 +198,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
this.dsSchema = dsSchema;
this.specId = specId;
this.partition = partition;
this.writeProperties = writeProperties;
}

@Override
@@ -219,6 +226,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
SparkFileWriterFactory.builderFor(table)
.deleteFileFormat(format)
.positionDeleteSparkType(deleteSparkTypeWithoutRow)
.writeProperties(writeProperties)
.build();

return new DeleteWriter(
SparkPositionDeltaWrite.java
@@ -104,6 +104,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrdering {
private final Context context;

private boolean cleanupOnAbort = true;
private final Map<String, String> writeProperties;

SparkPositionDeltaWrite(
SparkSession spark,
@@ -126,6 +127,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrdering {
this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
this.writeRequirements = writeConf.positionDeltaRequirements(command);
this.context = new Context(dataSchema, writeConf, info, writeRequirements);
this.writeProperties = writeConf.writeProperties(context.dataFileFormat());
}

@Override
@@ -155,7 +157,7 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
// broadcast the table metadata as the writer factory will be sent to executors
Broadcast<Table> tableBroadcast =
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
return new PositionDeltaWriteFactory(tableBroadcast, command, context);
return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties);
}

@Override
@@ -326,11 +328,17 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory {
private final Broadcast<Table> tableBroadcast;
private final Command command;
private final Context context;
private final Map<String, String> writeProperties;

PositionDeltaWriteFactory(Broadcast<Table> tableBroadcast, Command command, Context context) {
PositionDeltaWriteFactory(
Broadcast<Table> tableBroadcast,
Command command,
Context context,
Map<String, String> writeProperties) {
this.tableBroadcast = tableBroadcast;
this.command = command;
this.context = context;
this.writeProperties = writeProperties;
}

@Override
@@ -356,6 +364,7 @@ public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
.dataSparkType(context.dataSparkType())
.deleteFileFormat(context.deleteFileFormat())
.positionDeleteSparkType(context.deleteSparkType())
.writeProperties(writeProperties)
.build();

if (command == DELETE) {