Spark 3.5: Use fanout writers for unsorted tables by default
aokolnychyi committed Sep 23, 2023
Commit 53704f3 (parent f2ce4ef)
Showing 3 changed files with 111 additions and 318 deletions.
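In short: the fanout writer flag now defaults to true, and fanout writers are actually used only when the write has no required ordering (writeConf.fanoutWriterEnabled() && !writeRequirements.hasOrdering(), as in the SparkWrite hunks below). A minimal, self-contained sketch of that selection; the class, enum, and method names here are illustrative, not Iceberg's API:

// Illustrative sketch, not Iceberg code: how the new default resolves to a writer kind.
public class FanoutDefaultSketch {
  enum WriterKind { FANOUT, CLUSTERED }

  static WriterKind chooseWriter(boolean fanoutEnabled, boolean hasOrdering) {
    // fanoutEnabled now defaults to true; a required ordering still selects clustered writes.
    return (fanoutEnabled && !hasOrdering) ? WriterKind.FANOUT : WriterKind.CLUSTERED;
  }

  public static void main(String[] args) {
    System.out.println(chooseWriter(true, false));  // FANOUT: unsorted table, new default path
    System.out.println(chooseWriter(true, true));   // CLUSTERED: write requires a sort order
    System.out.println(chooseWriter(false, false)); // CLUSTERED: fanout explicitly disabled
  }
}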
First changed file:

@@ -179,7 +179,7 @@ public boolean fanoutWriterEnabled() {
         .booleanConf()
         .option(SparkWriteOptions.FANOUT_ENABLED)
         .tableProperty(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED)
-        .defaultValue(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)
+        .defaultValue(true)
         .parse();
   }
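With the default flipped to true, unsorted writes to partitioned tables pick up fanout writers automatically. Fanout can still be disabled per write via SparkWriteOptions.FANOUT_ENABLED or per table via TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED. A hedged usage sketch, assuming the documented string value "fanout-enabled" for the write option and a hypothetical table db.events:

// Usage sketch: explicitly disabling fanout for one write restores clustered writers.
// Assumes an existing Iceberg table db.events and a Dataset<Row> df; "fanout-enabled"
// is assumed to be the string value behind SparkWriteOptions.FANOUT_ENABLED.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class DisableFanoutExample {
  static void appendWithoutFanout(Dataset<Row> df) throws Exception {
    df.writeTo("db.events")
        .option("fanout-enabled", "false")
        .append();
    // Alternatively (assumption based on Iceberg docs), set the table property
    // write.spark.fanout.enabled=false to change the default for the whole table.
  }
}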
Second changed file:
@@ -98,7 +98,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
   private final Schema writeSchema;
   private final StructType dsSchema;
   private final Map<String, String> extraSnapshotMetadata;
-  private final boolean partitionedFanoutEnabled;
+  private final boolean useFanoutWriters;
   private final SparkWriteRequirements writeRequirements;
   private final Map<String, String> writeProperties;
@@ -126,7 +126,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     this.writeSchema = writeSchema;
     this.dsSchema = dsSchema;
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
-    this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
+    this.useFanoutWriters = writeConf.fanoutWriterEnabled() && !writeRequirements.hasOrdering();
     this.writeRequirements = writeRequirements;
     this.outputSpecId = writeConf.outputSpecId();
     this.writeProperties = writeConf.writeProperties();
@@ -188,7 +188,7 @@ private WriterFactory createWriterFactory() {
         targetFileSize,
         writeSchema,
         dsSchema,
-        partitionedFanoutEnabled,
+        useFanoutWriters,
         writeProperties);
   }
@@ -617,7 +617,7 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWriterFactory {
     private final long targetFileSize;
     private final Schema writeSchema;
     private final StructType dsSchema;
-    private final boolean partitionedFanoutEnabled;
+    private final boolean useFanoutWriters;
     private final String queryId;
     private final Map<String, String> writeProperties;
@@ -629,15 +629,15 @@ protected WriterFactory(
         long targetFileSize,
         Schema writeSchema,
         StructType dsSchema,
-        boolean partitionedFanoutEnabled,
+        boolean useFanoutWriters,
         Map<String, String> writeProperties) {
       this.tableBroadcast = tableBroadcast;
       this.format = format;
       this.outputSpecId = outputSpecId;
       this.targetFileSize = targetFileSize;
       this.writeSchema = writeSchema;
       this.dsSchema = dsSchema;
-      this.partitionedFanoutEnabled = partitionedFanoutEnabled;
+      this.useFanoutWriters = useFanoutWriters;
       this.queryId = queryId;
       this.writeProperties = writeProperties;
     }
@@ -678,7 +678,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
           writeSchema,
           dsSchema,
           targetFileSize,
-          partitionedFanoutEnabled);
+          useFanoutWriters);
       }
     }
   }
@@ -742,8 +742,8 @@ private PartitionedDataWriter(
         Schema dataSchema,
         StructType dataSparkType,
         long targetFileSize,
-        boolean fanoutEnabled) {
-      if (fanoutEnabled) {
+        boolean fanout) {
+      if (fanout) {
         this.delegate = new FanoutDataWriter<>(writerFactory, fileFactory, io, targetFileSize);
       } else {
         this.delegate = new ClusteredDataWriter<>(writerFactory, fileFactory, io, targetFileSize);
Third changed file: diff not loaded, not shown here.
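For context on why fanout suits unsorted input: the PartitionedDataWriter hunk above picks FanoutDataWriter when fanout is on and ClusteredDataWriter otherwise. A clustered writer keeps a single file open and assumes incoming rows are grouped by partition, while a fanout writer keeps one open file per partition and accepts rows in any order. A toy model of the two behaviors, in plain Java and not the Iceberg implementations:

// Toy model, not Iceberg code: contrasts fanout and clustered writing behavior.
import java.util.*;

public class WriterModes {
  // Fanout: keep one open "file" (list) per partition key; input order does not matter.
  static Map<String, List<String>> fanoutWrite(List<String[]> rows) {
    Map<String, List<String>> openFiles = new TreeMap<>();
    for (String[] r : rows) {
      openFiles.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
    }
    return openFiles;
  }

  // Clustered: one open "file" at a time; a partition must not reappear once closed.
  static Map<String, List<String>> clusteredWrite(List<String[]> rows) {
    Map<String, List<String>> closedFiles = new LinkedHashMap<>();
    String current = null;
    for (String[] r : rows) {
      if (!r[0].equals(current)) {
        if (closedFiles.containsKey(r[0])) {
          throw new IllegalStateException("partition " + r[0] + " reappeared; input must be clustered");
        }
        current = r[0];
        closedFiles.put(current, new ArrayList<>());
      }
      closedFiles.get(current).add(r[1]);
    }
    return closedFiles;
  }

  public static void main(String[] args) {
    List<String[]> unsorted = Arrays.asList(
        new String[] {"p1", "a"}, new String[] {"p2", "b"}, new String[] {"p1", "c"});
    System.out.println(fanoutWrite(unsorted));    // {p1=[a, c], p2=[b]}
    System.out.println(clusteredWrite(unsorted)); // throws: p1 reappears after p2
  }
}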
