diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index df3e2051f771..327184546de6 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -179,7 +179,7 @@ public boolean fanoutWriterEnabled() { .booleanConf() .option(SparkWriteOptions.FANOUT_ENABLED) .tableProperty(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED) - .defaultValue(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT) + .defaultValue(true) .parse(); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 15881098e7a3..9aac9cf1f822 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -98,7 +98,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { private final Schema writeSchema; private final StructType dsSchema; private final Map extraSnapshotMetadata; - private final boolean partitionedFanoutEnabled; + private final boolean useFanoutWriters; private final SparkWriteRequirements writeRequirements; private final Map writeProperties; @@ -126,7 +126,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { this.writeSchema = writeSchema; this.dsSchema = dsSchema; this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata(); - this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled(); + this.useFanoutWriters = writeConf.fanoutWriterEnabled() && !writeRequirements.hasOrdering(); this.writeRequirements = writeRequirements; this.outputSpecId = writeConf.outputSpecId(); this.writeProperties = writeConf.writeProperties(); @@ -188,7 +188,7 @@ private WriterFactory createWriterFactory() { targetFileSize, writeSchema, dsSchema, - partitionedFanoutEnabled, + useFanoutWriters, writeProperties); } @@ -617,7 +617,7 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr private final long targetFileSize; private final Schema writeSchema; private final StructType dsSchema; - private final boolean partitionedFanoutEnabled; + private final boolean useFanoutWriters; private final String queryId; private final Map writeProperties; @@ -629,7 +629,7 @@ protected WriterFactory( long targetFileSize, Schema writeSchema, StructType dsSchema, - boolean partitionedFanoutEnabled, + boolean useFanoutWriters, Map writeProperties) { this.tableBroadcast = tableBroadcast; this.format = format; @@ -637,7 +637,7 @@ protected WriterFactory( this.targetFileSize = targetFileSize; this.writeSchema = writeSchema; this.dsSchema = dsSchema; - this.partitionedFanoutEnabled = partitionedFanoutEnabled; + this.useFanoutWriters = useFanoutWriters; this.queryId = queryId; this.writeProperties = writeProperties; } @@ -678,7 +678,7 @@ public DataWriter createWriter(int partitionId, long taskId, long e writeSchema, dsSchema, targetFileSize, - partitionedFanoutEnabled); + useFanoutWriters); } } } @@ -742,8 +742,8 @@ private PartitionedDataWriter( Schema dataSchema, StructType dataSparkType, long targetFileSize, - boolean fanoutEnabled) { - if (fanoutEnabled) { + boolean fanout) { + if (fanout) { this.delegate = new FanoutDataWriter<>(writerFactory, fileFactory, io, targetFileSize); } else { this.delegate = new ClusteredDataWriter<>(writerFactory, fileFactory, io, targetFileSize); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java index 79374edc3f16..7ed34d4016ba 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java @@ -241,6 +241,8 @@ public void testDefaultWritePartitionedUnsortedTable() { Table table = validationCatalog.loadTable(tableIdent); + disableFanoutWriters(table); + Expression[] expectedClustering = new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; Distribution expectedDistribution = Distributions.clustered(expectedClustering); @@ -252,23 +254,8 @@ public void testDefaultWritePartitionedUnsortedTable() { }; checkWriteDistributionAndOrdering(table, expectedDistribution, expectedOrdering); - } - - @Test - public void testDefaultWritePartitionedUnsortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date, days(ts))", - tableName); - Table table = validationCatalog.loadTable(tableIdent); - - table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit(); - - Expression[] expectedClustering = - new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; - Distribution expectedDistribution = Distributions.clustered(expectedClustering); + enableFanoutWriters(table); checkWriteDistributionAndOrdering(table, expectedDistribution, EMPTY_ORDERING); } @@ -285,6 +272,8 @@ public void testHashWritePartitionedUnsortedTable() { table.updateProperties().set(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit(); + disableFanoutWriters(table); + Expression[] expectedClustering = new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; Distribution expectedDistribution = Distributions.clustered(expectedClustering); @@ -296,27 +285,8 @@ public void testHashWritePartitionedUnsortedTable() { }; checkWriteDistributionAndOrdering(table, expectedDistribution, expectedOrdering); - } - @Test - public void testHashWritePartitionedUnsortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date, days(ts))", - tableName); - - Table table = validationCatalog.loadTable(tableIdent); - - table - .updateProperties() - .set(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH) - .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true") - .commit(); - - Expression[] expectedClustering = - new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; - Distribution expectedDistribution = Distributions.clustered(expectedClustering); + enableFanoutWriters(table); checkWriteDistributionAndOrdering(table, expectedDistribution, EMPTY_ORDERING); } @@ -333,6 +303,8 @@ public void testRangeWritePartitionedUnsortedTable() { table.updateProperties().set(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit(); + disableFanoutWriters(table); + SortOrder[] expectedOrdering = new SortOrder[] { Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING), @@ -342,31 +314,8 @@ public void testRangeWritePartitionedUnsortedTable() { Distribution expectedDistribution = Distributions.ordered(expectedOrdering); checkWriteDistributionAndOrdering(table, expectedDistribution, expectedOrdering); - } - - @Test - public void testRangeWritePartitionedUnsortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date, days(ts))", - tableName); - Table table = validationCatalog.loadTable(tableIdent); - - table - .updateProperties() - .set(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE) - .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true") - .commit(); - - SortOrder[] expectedOrdering = - new SortOrder[] { - Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING), - Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING) - }; - - Distribution expectedDistribution = Distributions.ordered(expectedOrdering); + enableFanoutWriters(table); checkWriteDistributionAndOrdering(table, expectedDistribution, EMPTY_ORDERING); } @@ -434,6 +383,8 @@ public void testRangeWritePartitionedSortedTable() { table.replaceSortOrder().asc("id").commit(); + disableFanoutWriters(table); + SortOrder[] expectedOrdering = new SortOrder[] { Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING), @@ -443,29 +394,8 @@ public void testRangeWritePartitionedSortedTable() { Distribution expectedDistribution = Distributions.ordered(expectedOrdering); checkWriteDistributionAndOrdering(table, expectedDistribution, expectedOrdering); - } - - @Test - public void testRangeWritePartitionedSortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date)", - tableName); - - Table table = validationCatalog.loadTable(tableIdent); - - table.replaceSortOrder().asc("id").commit(); - - table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit(); - - SortOrder[] expectedOrdering = - new SortOrder[] { - Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING), - Expressions.sort(Expressions.column("id"), SortDirection.ASCENDING) - }; - Distribution expectedDistribution = Distributions.ordered(expectedOrdering); + enableFanoutWriters(table); checkWriteDistributionAndOrdering(table, expectedDistribution, expectedOrdering); } @@ -642,6 +572,8 @@ public void testDefaultCopyOnWriteDeletePartitionedUnsortedTable() { Table table = validationCatalog.loadTable(tableIdent); + disableFanoutWriters(table); + Expression[] expectedClustering = new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; Distribution expectedDistribution = Distributions.clustered(expectedClustering); @@ -653,23 +585,8 @@ public void testDefaultCopyOnWriteDeletePartitionedUnsortedTable() { }; checkCopyOnWriteDistributionAndOrdering(table, DELETE, expectedDistribution, expectedOrdering); - } - @Test - public void testDefaultCopyOnWriteDeletePartitionedUnsortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date, days(ts))", - tableName); - - Table table = validationCatalog.loadTable(tableIdent); - - table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit(); - - Expression[] expectedClustering = - new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; - Distribution expectedDistribution = Distributions.clustered(expectedClustering); + enableFanoutWriters(table); checkCopyOnWriteDistributionAndOrdering(table, DELETE, expectedDistribution, EMPTY_ORDERING); } @@ -686,6 +603,8 @@ public void testNoneCopyOnWriteDeletePartitionedUnsortedTable() { table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit(); + disableFanoutWriters(table); + SortOrder[] expectedOrdering = new SortOrder[] { Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING), @@ -694,23 +613,8 @@ public void testNoneCopyOnWriteDeletePartitionedUnsortedTable() { checkCopyOnWriteDistributionAndOrdering( table, DELETE, UNSPECIFIED_DISTRIBUTION, expectedOrdering); - } - - @Test - public void testNoneCopyOnWriteDeletePartitionedUnsortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date, days(ts))", - tableName); - Table table = validationCatalog.loadTable(tableIdent); - - table - .updateProperties() - .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE) - .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true") - .commit(); + enableFanoutWriters(table); checkCopyOnWriteDistributionAndOrdering( table, DELETE, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING); @@ -728,6 +632,8 @@ public void testHashCopyOnWriteDeletePartitionedUnsortedTable() { table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit(); + disableFanoutWriters(table); + Expression[] expectedClustering = new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; Distribution expectedDistribution = Distributions.clustered(expectedClustering); @@ -739,27 +645,8 @@ public void testHashCopyOnWriteDeletePartitionedUnsortedTable() { }; checkCopyOnWriteDistributionAndOrdering(table, DELETE, expectedDistribution, expectedOrdering); - } - - @Test - public void testHashCopyOnWriteDeletePartitionedUnsortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date, days(ts))", - tableName); - - Table table = validationCatalog.loadTable(tableIdent); - table - .updateProperties() - .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH) - .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true") - .commit(); - - Expression[] expectedClustering = - new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; - Distribution expectedDistribution = Distributions.clustered(expectedClustering); + enableFanoutWriters(table); checkCopyOnWriteDistributionAndOrdering(table, DELETE, expectedDistribution, EMPTY_ORDERING); } @@ -776,6 +663,8 @@ public void testRangeCopyOnWriteDeletePartitionedUnsortedTable() { table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit(); + disableFanoutWriters(table); + SortOrder[] expectedOrdering = new SortOrder[] { Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING), @@ -785,30 +674,8 @@ public void testRangeCopyOnWriteDeletePartitionedUnsortedTable() { Distribution expectedDistribution = Distributions.ordered(expectedOrdering); checkCopyOnWriteDistributionAndOrdering(table, DELETE, expectedDistribution, expectedOrdering); - } - - @Test - public void testRangeCopyOnWriteDeletePartitionedUnsortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date, days(ts))", - tableName); - Table table = validationCatalog.loadTable(tableIdent); - - table - .updateProperties() - .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE) - .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true") - .commit(); - - SortOrder[] expectedOrdering = - new SortOrder[] { - Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING), - Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING) - }; - Distribution expectedDistribution = Distributions.ordered(expectedOrdering); + enableFanoutWriters(table); checkCopyOnWriteDistributionAndOrdering(table, DELETE, expectedDistribution, EMPTY_ORDERING); } @@ -1086,6 +953,8 @@ public void testDefaultCopyOnWriteUpdatePartitionedUnsortedTable() { Table table = validationCatalog.loadTable(tableIdent); + disableFanoutWriters(table); + Expression[] expectedClustering = new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; Distribution expectedDistribution = Distributions.clustered(expectedClustering); @@ -1097,23 +966,8 @@ public void testDefaultCopyOnWriteUpdatePartitionedUnsortedTable() { }; checkCopyOnWriteDistributionAndOrdering(table, UPDATE, expectedDistribution, expectedOrdering); - } - - @Test - public void testDefaultCopyOnWriteUpdatePartitionedUnsortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date, days(ts))", - tableName); - - Table table = validationCatalog.loadTable(tableIdent); - - table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit(); - Expression[] expectedClustering = - new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; - Distribution expectedDistribution = Distributions.clustered(expectedClustering); + enableFanoutWriters(table); checkCopyOnWriteDistributionAndOrdering(table, UPDATE, expectedDistribution, EMPTY_ORDERING); } @@ -1130,6 +984,8 @@ public void testNoneCopyOnWriteUpdatePartitionedUnsortedTable() { table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit(); + disableFanoutWriters(table); + SortOrder[] expectedOrdering = new SortOrder[] { Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING), @@ -1138,23 +994,8 @@ public void testNoneCopyOnWriteUpdatePartitionedUnsortedTable() { checkCopyOnWriteDistributionAndOrdering( table, UPDATE, UNSPECIFIED_DISTRIBUTION, expectedOrdering); - } - @Test - public void testNoneCopyOnWriteUpdatePartitionedUnsortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date, days(ts))", - tableName); - - Table table = validationCatalog.loadTable(tableIdent); - - table - .updateProperties() - .set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE) - .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true") - .commit(); + enableFanoutWriters(table); checkCopyOnWriteDistributionAndOrdering( table, UPDATE, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING); @@ -1172,6 +1013,8 @@ public void testHashCopyOnWriteUpdatePartitionedUnsortedTable() { table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit(); + disableFanoutWriters(table); + Expression[] expectedClustering = new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; Distribution expectedDistribution = Distributions.clustered(expectedClustering); @@ -1183,27 +1026,8 @@ public void testHashCopyOnWriteUpdatePartitionedUnsortedTable() { }; checkCopyOnWriteDistributionAndOrdering(table, UPDATE, expectedDistribution, expectedOrdering); - } - - @Test - public void testHashCopyOnWriteUpdatePartitionedUnsortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date, days(ts))", - tableName); - - Table table = validationCatalog.loadTable(tableIdent); - - table - .updateProperties() - .set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH) - .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true") - .commit(); - Expression[] expectedClustering = - new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; - Distribution expectedDistribution = Distributions.clustered(expectedClustering); + enableFanoutWriters(table); checkCopyOnWriteDistributionAndOrdering(table, UPDATE, expectedDistribution, EMPTY_ORDERING); } @@ -1220,6 +1044,8 @@ public void testRangeCopyOnWriteUpdatePartitionedUnsortedTable() { table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit(); + disableFanoutWriters(table); + SortOrder[] expectedOrdering = new SortOrder[] { Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING), @@ -1229,30 +1055,8 @@ public void testRangeCopyOnWriteUpdatePartitionedUnsortedTable() { Distribution expectedDistribution = Distributions.ordered(expectedOrdering); checkCopyOnWriteDistributionAndOrdering(table, UPDATE, expectedDistribution, expectedOrdering); - } - - @Test - public void testRangeCopyOnWriteUpdatePartitionedUnsortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date, days(ts))", - tableName); - Table table = validationCatalog.loadTable(tableIdent); - - table - .updateProperties() - .set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE) - .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true") - .commit(); - - SortOrder[] expectedOrdering = - new SortOrder[] { - Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING), - Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING) - }; - Distribution expectedDistribution = Distributions.ordered(expectedOrdering); + enableFanoutWriters(table); checkCopyOnWriteDistributionAndOrdering(table, UPDATE, expectedDistribution, EMPTY_ORDERING); } @@ -1526,6 +1330,8 @@ public void testDefaultCopyOnWriteMergePartitionedUnsortedTable() { Table table = validationCatalog.loadTable(tableIdent); + disableFanoutWriters(table); + Expression[] expectedClustering = new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; Distribution expectedDistribution = Distributions.clustered(expectedClustering); @@ -1537,23 +1343,8 @@ public void testDefaultCopyOnWriteMergePartitionedUnsortedTable() { }; checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering); - } - - @Test - public void testDefaultCopyOnWriteMergePartitionedUnsortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date, days(ts))", - tableName); - Table table = validationCatalog.loadTable(tableIdent); - - table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit(); - - Expression[] expectedClustering = - new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; - Distribution expectedDistribution = Distributions.clustered(expectedClustering); + enableFanoutWriters(table); checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, EMPTY_ORDERING); } @@ -1570,6 +1361,8 @@ public void testNoneCopyOnWriteMergePartitionedUnsortedTable() { table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit(); + disableFanoutWriters(table); + SortOrder[] expectedOrdering = new SortOrder[] { Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING), @@ -1578,23 +1371,8 @@ public void testNoneCopyOnWriteMergePartitionedUnsortedTable() { checkCopyOnWriteDistributionAndOrdering( table, MERGE, UNSPECIFIED_DISTRIBUTION, expectedOrdering); - } - - @Test - public void testNoneCopyOnWriteMergePartitionedUnsortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date, days(ts))", - tableName); - - Table table = validationCatalog.loadTable(tableIdent); - table - .updateProperties() - .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE) - .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true") - .commit(); + enableFanoutWriters(table); checkCopyOnWriteDistributionAndOrdering(table, MERGE, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING); } @@ -1611,6 +1389,8 @@ public void testHashCopyOnWriteMergePartitionedUnsortedTable() { table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit(); + disableFanoutWriters(table); + Expression[] expectedClustering = new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; Distribution expectedDistribution = Distributions.clustered(expectedClustering); @@ -1622,27 +1402,8 @@ public void testHashCopyOnWriteMergePartitionedUnsortedTable() { }; checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering); - } - @Test - public void testHashCopyOnWriteMergePartitionedUnsortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date, days(ts))", - tableName); - - Table table = validationCatalog.loadTable(tableIdent); - - table - .updateProperties() - .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH) - .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true") - .commit(); - - Expression[] expectedClustering = - new Expression[] {Expressions.identity("date"), Expressions.days("ts")}; - Distribution expectedDistribution = Distributions.clustered(expectedClustering); + enableFanoutWriters(table); checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, EMPTY_ORDERING); } @@ -1659,6 +1420,8 @@ public void testRangeCopyOnWriteMergePartitionedUnsortedTable() { table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit(); + disableFanoutWriters(table); + SortOrder[] expectedOrdering = new SortOrder[] { Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING), @@ -1668,30 +1431,8 @@ public void testRangeCopyOnWriteMergePartitionedUnsortedTable() { Distribution expectedDistribution = Distributions.ordered(expectedOrdering); checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering); - } - - @Test - public void testRangeCopyOnWriteMergePartitionedUnsortedTableFanout() { - sql( - "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " - + "USING iceberg " - + "PARTITIONED BY (date, days(ts))", - tableName); - - Table table = validationCatalog.loadTable(tableIdent); - - table - .updateProperties() - .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE) - .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true") - .commit(); - SortOrder[] expectedOrdering = - new SortOrder[] { - Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING), - Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING) - }; - Distribution expectedDistribution = Distributions.ordered(expectedOrdering); + enableFanoutWriters(table); checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, EMPTY_ORDERING); } @@ -1838,6 +1579,8 @@ public void testDefaultPositionDeltaDeleteUnpartitionedTable() { Table table = validationCatalog.loadTable(tableIdent); + disableFanoutWriters(table); + checkPositionDeltaDistributionAndOrdering( table, DELETE, @@ -1858,6 +1601,8 @@ public void testNonePositionDeltaDeleteUnpartitionedTable() { table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit(); + disableFanoutWriters(table); + checkPositionDeltaDistributionAndOrdering( table, DELETE, UNSPECIFIED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING); @@ -1875,6 +1620,8 @@ public void testHashPositionDeltaDeleteUnpartitionedTable() { table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit(); + disableFanoutWriters(table); + checkPositionDeltaDistributionAndOrdering( table, DELETE, @@ -1895,6 +1642,8 @@ public void testRangePositionDeltaDeleteUnpartitionedTable() { table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit(); + disableFanoutWriters(table); + Distribution expectedDistribution = Distributions.ordered(SPEC_ID_PARTITION_FILE_ORDERING); checkPositionDeltaDistributionAndOrdering( @@ -1915,6 +1664,8 @@ public void testDefaultPositionDeltaDeletePartitionedTable() { Table table = validationCatalog.loadTable(tableIdent); + disableFanoutWriters(table); + checkPositionDeltaDistributionAndOrdering( table, DELETE, @@ -1939,6 +1690,8 @@ public void testNonePositionDeltaDeletePartitionedTable() { table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit(); + disableFanoutWriters(table); + checkPositionDeltaDistributionAndOrdering( table, DELETE, UNSPECIFIED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING); @@ -1960,6 +1713,8 @@ public void testHashPositionDeltaDeletePartitionedTable() { table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit(); + disableFanoutWriters(table); + checkPositionDeltaDistributionAndOrdering( table, DELETE, @@ -1984,6 +1739,8 @@ public void testRangePositionDeltaDeletePartitionedTable() { table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit(); + disableFanoutWriters(table); + Distribution expectedDistribution = Distributions.ordered(SPEC_ID_PARTITION_ORDERING); checkPositionDeltaDistributionAndOrdering( @@ -2063,6 +1820,8 @@ public void testDefaultPositionDeltaUpdateUnpartitionedUnsortedTable() { Table table = validationCatalog.loadTable(tableIdent); + disableFanoutWriters(table); + checkPositionDeltaDistributionAndOrdering( table, UPDATE, @@ -2083,6 +1842,8 @@ public void testNonePositionDeltaUpdateUnpartitionedUnsortedTable() { table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit(); + disableFanoutWriters(table); + checkPositionDeltaDistributionAndOrdering( table, UPDATE, UNSPECIFIED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING); @@ -2100,6 +1861,8 @@ public void testHashPositionDeltaUpdateUnpartitionedUnsortedTable() { table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit(); + disableFanoutWriters(table); + checkPositionDeltaDistributionAndOrdering( table, UPDATE, @@ -2120,12 +1883,14 @@ public void testRangePositionDeltaUpdateUnpartitionedUnsortedTable() { table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit(); + disableFanoutWriters(table); + Distribution expectedDistribution = Distributions.ordered(SPEC_ID_PARTITION_FILE_ORDERING); checkPositionDeltaDistributionAndOrdering( table, UPDATE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING); - table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit(); + enableFanoutWriters(table); checkPositionDeltaDistributionAndOrdering(table, UPDATE, expectedDistribution, EMPTY_ORDERING); } @@ -2263,6 +2028,8 @@ public void testDefaultPositionDeltaUpdatePartitionedUnsortedTable() { Table table = validationCatalog.loadTable(tableIdent); + disableFanoutWriters(table); + Expression[] expectedClustering = new Expression[] { Expressions.column(MetadataColumns.SPEC_ID.name()), @@ -2306,6 +2073,8 @@ public void testNonePositionDeltaUpdatePartitionedUnsortedTable() { table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit(); + disableFanoutWriters(table); + SortOrder[] expectedOrdering = new SortOrder[] { Expressions.sort( @@ -2341,6 +2110,8 @@ public void testHashPositionDeltaUpdatePartitionedUnsortedTable() { table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit(); + disableFanoutWriters(table); + Expression[] expectedClustering = new Expression[] { Expressions.column(MetadataColumns.SPEC_ID.name()), @@ -2384,6 +2155,8 @@ public void testRangePositionDeltaUpdatePartitionedUnsortedTable() { table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit(); + disableFanoutWriters(table); + SortOrder[] expectedDistributionOrdering = new SortOrder[] { Expressions.sort( @@ -2647,6 +2420,8 @@ public void testDefaultPositionDeltaMergeUnpartitionedUnsortedTable() { Table table = validationCatalog.loadTable(tableIdent); + disableFanoutWriters(table); + Expression[] expectedClustering = new Expression[] { Expressions.column(MetadataColumns.SPEC_ID.name()), @@ -2671,6 +2446,8 @@ public void testNonePositionDeltaMergeUnpartitionedUnsortedTable() { table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit(); + disableFanoutWriters(table); + checkPositionDeltaDistributionAndOrdering( table, MERGE, UNSPECIFIED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING); @@ -2688,6 +2465,8 @@ public void testHashPositionDeltaMergeUnpartitionedUnsortedTable() { table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit(); + disableFanoutWriters(table); + Expression[] expectedClustering = new Expression[] { Expressions.column(MetadataColumns.SPEC_ID.name()), @@ -2712,6 +2491,8 @@ public void testRangePositionDeltaMergeUnpartitionedUnsortedTable() { table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit(); + disableFanoutWriters(table); + SortOrder[] expectedDistributionOrdering = new SortOrder[] { Expressions.sort( @@ -2877,6 +2658,8 @@ public void testDefaultPositionDeltaMergePartitionedUnsortedTable() { Table table = validationCatalog.loadTable(tableIdent); + disableFanoutWriters(table); + Expression[] expectedClustering = new Expression[] { Expressions.column(MetadataColumns.SPEC_ID.name()), @@ -2919,6 +2702,8 @@ public void testNonePositionDeltaMergePartitionedUnsortedTable() { table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit(); + disableFanoutWriters(table); + SortOrder[] expectedOrdering = new SortOrder[] { Expressions.sort( @@ -2954,6 +2739,8 @@ public void testHashPositionDeltaMergePartitionedUnsortedTable() { table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit(); + disableFanoutWriters(table); + Expression[] expectedClustering = new Expression[] { Expressions.column(MetadataColumns.SPEC_ID.name()), @@ -2996,6 +2783,8 @@ public void testRangePositionDeltaMergePartitionedUnsortedTable() { table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit(); + disableFanoutWriters(table); + SortOrder[] expectedDistributionOrdering = new SortOrder[] { Expressions.sort( @@ -3227,6 +3016,10 @@ private void checkPositionDeltaDistributionAndOrdering( Assert.assertArrayEquals("Ordering must match", expectedOrdering, ordering); } + private void disableFanoutWriters(Table table) { + table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "false").commit(); + } + private void enableFanoutWriters(Table table) { table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit(); }