Skip to content

Commit

Permalink
Spark 3.5: Make where clause case sensitive in rewrite data files (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
ludlows authored Dec 2, 2024
1 parent bfeaaeb commit d8326d8
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,29 @@ public void removeTable() {
sql("DROP TABLE IF EXISTS %s", tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
}

@TestTemplate
public void testFilterCaseSensitivity() {
  createTable();
  insertData(10);
  // Run the analyzer in case-insensitive mode, then filter on an upper-cased
  // column name ('C1') to prove the where-clause resolution honors the setting.
  sql("set %s = false", SQLConf.CASE_SENSITIVE().key());
  List<Object[]> recordsBeforeRewrite = currentData();

  List<Object[]> rewriteResult =
      sql(
          "CALL %s.system.rewrite_data_files(table=>'%s', where=>'C1 > 0')",
          catalogName, tableIdent);

  Object[] summaryRow = rewriteResult.get(0);
  // First two columns: rewritten-file count and added-file count.
  assertEquals(
      "Action should rewrite 10 data files and add 1 data files",
      row(10, 1),
      Arrays.copyOf(summaryRow, 2));
  // Third column (rewritten bytes) is checked separately against the snapshot summary.
  assertThat(summaryRow).hasSize(4);
  assertThat(summaryRow[2])
      .isInstanceOf(Long.class)
      .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

  // Compaction must not alter the table's visible data.
  List<Object[]> recordsAfterRewrite = currentData();
  assertEquals("Data after compaction should not change", recordsBeforeRewrite, recordsAfterRewrite);
}

@TestTemplate
public void testZOrderSortExpression() {
List<ExtendedParser.RawOrderField> order =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.iceberg.relocated.com.google.common.math.IntMath;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.StructLikeMap;
Expand Down Expand Up @@ -102,11 +103,13 @@ public class RewriteDataFilesSparkAction
private boolean useStartingSequenceNumber;
private RewriteJobOrder rewriteJobOrder;
private FileRewriter<FileScanTask, DataFile> rewriter = null;
private boolean caseSensitive;

RewriteDataFilesSparkAction(SparkSession spark, Table table) {
  // Work on a cloned session so the config tweaks below don't leak to the caller.
  super(spark.cloneSession());
  this.table = table;
  // Capture case sensitivity from the ORIGINAL session, before any cloning tweaks.
  this.caseSensitive = SparkUtil.caseSensitive(spark);
  // Adaptive Query Execution may repartition our write output, so disable it
  // on the cloned session used by this action.
  spark().conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
}

Expand Down Expand Up @@ -198,6 +201,7 @@ StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId)
table
.newScan()
.useSnapshot(startingSnapshotId)
.caseSensitive(caseSensitive)
.filter(filter)
.ignoreResiduals()
.planFiles();
Expand Down

0 comments on commit d8326d8

Please sign in to comment.