Write DVs in Spark for V3 tables
amogh-jahagirdar committed Nov 29, 2024
1 parent 5530605 commit 201dc59
Showing 12 changed files with 401 additions and 51 deletions.
63 changes: 63 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/PartitioningDVWriter.java
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import java.io.IOException;
import java.util.function.Function;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.BaseDVFileWriter;
import org.apache.iceberg.deletes.DVFileWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* PartitioningDVWriter is a {@link PartitioningWriter} implementation that writes deletion vectors (DVs) for deleted positions in data files.
*/
public class PartitioningDVWriter<T>
implements PartitioningWriter<PositionDelete<T>, DeleteWriteResult> {
private final DVFileWriter fileWriter;
private DeleteWriteResult result;

public PartitioningDVWriter(
OutputFileFactory fileFactory,
Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes) {
this.fileWriter = new BaseDVFileWriter(fileFactory, loadPreviousDeletes::apply);
}

@Override
public void write(PositionDelete<T> row, PartitionSpec spec, StructLike partition) {
fileWriter.delete(row.path().toString(), row.pos(), spec, partition);
}

@Override
public DeleteWriteResult result() {
Preconditions.checkState(result != null, "Cannot get result from unclosed writer");
return result;
}

@Override
public void close() throws IOException {
if (result == null) {
fileWriter.close();
this.result = fileWriter.result();
}
}
}
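For context, the sketch below shows one way the new writer could be driven directly. It is illustrative only and not part of this commit: the PUFFIN file-factory configuration, the null previous-deletes loader, the helper class and method names, and the unpartitioned-table assumption are all made up for the example; Spark's delta write path wires this up through its own delete writers.

// Illustrative sketch, not part of this commit. Assumes an unpartitioned Iceberg table.
import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitioningDVWriter;

public class DVWriterSketch {
  static DeleteWriteResult writeOneDV(Table table, String dataFilePath) throws IOException {
    // DVs are stored in Puffin files, so the factory is configured for PUFFIN output
    OutputFileFactory fileFactory =
        OutputFileFactory.builderFor(table, 1 /* partitionId */, 1L /* taskId */)
            .format(FileFormat.PUFFIN)
            .build();

    // No previously committed deletes are loaded in this sketch (the loader returns null)
    PartitioningDVWriter<Object> writer = new PartitioningDVWriter<>(fileFactory, path -> null);

    // Mark position 0 of the given data file as deleted; the row payload is not needed for DVs
    PositionDelete<Object> delete = PositionDelete.create();
    delete.set(dataFilePath, 0L, null);
    writer.write(delete, table.spec(), null /* unpartitioned, so no partition value */);

    // Closing materializes the DV; result() then exposes the written delete files
    writer.close();
    return writer.result();
  }
}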
SparkRowLevelOperationsTestBase.java
@@ -23,12 +23,15 @@
import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
import static org.apache.iceberg.PlanningMode.LOCAL;
import static org.apache.iceberg.SnapshotSummary.ADDED_DELETE_FILES_PROP;
import static org.apache.iceberg.SnapshotSummary.ADDED_DVS_PROP;
import static org.apache.iceberg.SnapshotSummary.ADDED_FILES_PROP;
import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_COUNT_PROP;
import static org.apache.iceberg.SnapshotSummary.DELETED_FILES_PROP;
import static org.apache.iceberg.TableProperties.DATA_PLANNING_MODE;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DELETE_PLANNING_MODE;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.ORC_VECTORIZATION_ENABLED;
import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
@@ -59,8 +62,10 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
@@ -98,11 +103,14 @@ public abstract class SparkRowLevelOperationsTestBase extends ExtensionsTestBase
@Parameter(index = 8)
protected PlanningMode planningMode;

@Parameter(index = 9)
protected int formatVersion;

@Parameters(
name =
"catalogName = {0}, implementation = {1}, config = {2},"
+ " format = {3}, vectorized = {4}, distributionMode = {5},"
+ " fanout = {6}, branch = {7}, planningMode = {8}")
+ " fanout = {6}, branch = {7}, planningMode = {8}, formatVersion = {9}")
public static Object[][] parameters() {
return new Object[][] {
{
@@ -116,7 +124,8 @@ public static Object[][] parameters() {
WRITE_DISTRIBUTION_MODE_NONE,
true,
SnapshotRef.MAIN_BRANCH,
- LOCAL
LOCAL,
2
},
{
"testhive",
@@ -129,7 +138,8 @@
WRITE_DISTRIBUTION_MODE_NONE,
false,
"test",
- DISTRIBUTED
DISTRIBUTED,
2
},
{
"testhadoop",
@@ -140,7 +150,8 @@
WRITE_DISTRIBUTION_MODE_HASH,
true,
null,
- LOCAL
LOCAL,
2
},
{
"spark_catalog",
@@ -158,16 +169,52 @@
WRITE_DISTRIBUTION_MODE_RANGE,
false,
"test",
- DISTRIBUTED
- }
DISTRIBUTED,
2
},
{
"testhadoop",
SparkCatalog.class.getName(),
ImmutableMap.of("type", "hadoop"),
FileFormat.PARQUET,
RANDOM.nextBoolean(),
WRITE_DISTRIBUTION_MODE_HASH,
true,
null,
LOCAL,
3
},
{
"spark_catalog",
SparkSessionCatalog.class.getName(),
ImmutableMap.of(
"type",
"hive",
"default-namespace",
"default",
"clients",
"1",
"parquet-enabled",
"false",
"cache-enabled",
"false" // Spark will delete tables using v1, leaving the cache out of sync
),
FileFormat.AVRO,
false,
WRITE_DISTRIBUTION_MODE_RANGE,
false,
"test",
DISTRIBUTED,
3
},
};
}

protected abstract Map<String, String> extraTableProperties();

protected void initTable() {
sql(
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s', '%s' '%s', '%s' '%s', '%s' '%s', '%s' '%s')",
"ALTER TABLE %s SET TBLPROPERTIES('%s' '%s', '%s' '%s', '%s' '%s', '%s' '%s', '%s' '%s', '%s' '%s')",
tableName,
DEFAULT_FILE_FORMAT,
fileFormat,
@@ -178,7 +225,9 @@ protected void initTable() {
DATA_PLANNING_MODE,
planningMode.modeName(),
DELETE_PLANNING_MODE,
- planningMode.modeName());
planningMode.modeName(),
FORMAT_VERSION,
formatVersion);

switch (fileFormat) {
case PARQUET:
@@ -303,6 +352,10 @@ protected void validateSnapshot(
validateProperty(snapshot, DELETED_FILES_PROP, deletedDataFiles);
validateProperty(snapshot, ADDED_DELETE_FILES_PROP, addedDeleteFiles);
validateProperty(snapshot, ADDED_FILES_PROP, addedDataFiles);
if (formatVersion >= 3) {
validateProperty(snapshot, ADDED_DVS_PROP, addedDeleteFiles);
assertThat(snapshot.summary()).doesNotContainKey(ADD_POS_DELETE_FILES_PROP);
}
}

protected void validateProperty(Snapshot snapshot, String property, Set<String> expectedValues) {
@@ -397,4 +450,12 @@ protected void assertAllBatchScansVectorized(SparkPlan plan) {
List<SparkPlan> batchScans = SparkPlanUtil.collectBatchScans(plan);
assertThat(batchScans).hasSizeGreaterThan(0).allMatch(SparkPlan::supportsColumnar);
}

protected void createTableWithDeleteGranularity(
String schema, String partitionedBy, DeleteGranularity deleteGranularity) {
createAndInitTable(schema, partitionedBy, null /* empty */);
sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
}
}
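To make the format-version split above concrete, here is a small illustrative sketch (not part of this commit) of how the snapshot summary reflects it; the class and method names are invented for the example, and the printed counts are hypothetical.

// Illustrative sketch, not part of this commit: v3 row-level deletes surface as DVs in the
// snapshot summary, while v2 deletes keep surfacing as position delete files.
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;

public class DeleteSummarySketch {
  static void describeRowLevelDeletes(Table table, int formatVersion) {
    // Assumes the table already has at least one snapshot
    Snapshot snapshot = table.currentSnapshot();
    Map<String, String> summary = snapshot.summary();

    if (formatVersion >= 3) {
      // merge-on-read deletes are written as deletion vectors; the summary tracks their count
      System.out.println("DVs added: " + summary.get(SnapshotSummary.ADDED_DVS_PROP));
    } else {
      // v2 tables report position delete files instead
      System.out.println(
          "Position delete files added: " + summary.get(SnapshotSummary.ADD_POS_DELETE_FILES_PROP));
    }
  }
}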
TestDelete.java
@@ -20,6 +20,8 @@

import static org.apache.iceberg.DataOperations.DELETE;
import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
import static org.apache.iceberg.SnapshotSummary.ADDED_DVS_PROP;
import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
@@ -187,6 +189,9 @@ public void testCoalesceDelete() throws Exception {
// AQE detects that all shuffle blocks are small and processes them in 1 task
// otherwise, there would be 200 tasks writing to the table
validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "1");
} else if (mode(table) == MERGE_ON_READ && formatVersion >= 3) {
validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
validateProperty(snapshot, SnapshotSummary.ADDED_DVS_PROP, "4");
} else {
// MoR DELETE requests the deleted records to be range distributed by partition and `_file`
// each task contains only 1 file and therefore writes only 1 shuffle block
@@ -521,7 +526,8 @@ public void deleteSingleRecordProducesDeleteOperation() throws NoSuchTableException
} else {
// this is a RowDelta that produces a "delete" instead of "overwrite"
validateMergeOnRead(currentSnapshot, "1", "1", null);
- validateProperty(currentSnapshot, ADD_POS_DELETE_FILES_PROP, "1");
String property = formatVersion >= 3 ? ADDED_DVS_PROP : ADD_POS_DELETE_FILES_PROP;
validateProperty(currentSnapshot, property, "1");
}

assertThat(sql("SELECT * FROM %s", tableName))
@@ -1292,6 +1298,8 @@ public void testDeleteWithMultipleSpecs() {
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
if (mode(table) == COPY_ON_WRITE) {
validateCopyOnWrite(currentSnapshot, "3", "4", "1");
} else if (mode(table) == MERGE_ON_READ && formatVersion >= 3) {
validateMergeOnRead(currentSnapshot, "3", "4", null);
} else {
validateMergeOnRead(currentSnapshot, "3", "3", null);
}
TestMerge.java
@@ -231,6 +231,7 @@ public void testMergeWithVectorizedReads() {

@TestTemplate
public void testCoalesceMerge() {
assumeThat(formatVersion).isLessThan(3);
createAndInitTable("id INT, salary INT, dep STRING");

String[] records = new String[100];
TestMergeOnReadDelete.java
@@ -20,13 +20,17 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.RowLevelOperationMode;
@@ -39,9 +43,11 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.TestSparkCatalog;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -56,10 +62,7 @@ public class TestMergeOnReadDelete extends TestDelete {
@Override
protected Map<String, String> extraTableProperties() {
return ImmutableMap.of(
- TableProperties.FORMAT_VERSION,
- "2",
- TableProperties.DELETE_MODE,
- RowLevelOperationMode.MERGE_ON_READ.modeName());
TableProperties.DELETE_MODE, RowLevelOperationMode.MERGE_ON_READ.modeName());
}

@BeforeEach
@@ -93,11 +96,13 @@ public void testDeleteWithExecutorCacheLocality() throws NoSuchTableException {

@TestTemplate
public void testDeleteFileGranularity() throws NoSuchTableException {
assumeThat(formatVersion).isEqualTo(2);
checkDeleteFileGranularity(DeleteGranularity.FILE);
}

@TestTemplate
public void testDeletePartitionGranularity() throws NoSuchTableException {
assumeThat(formatVersion).isEqualTo(2);
checkDeleteFileGranularity(DeleteGranularity.PARTITION);
}

@@ -182,13 +187,57 @@ public void testUnpartitionedPositionDeletesAreMaintainedDuringDelete()
sql("SELECT * FROM %s ORDER BY id ASC", selectTarget()));
}

- private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
- throws NoSuchTableException {
- createAndInitPartitionedTable();
@TestTemplate
public void testDeleteWithDVAndHistoricalPositionDeletes() {
assumeThat(formatVersion).isEqualTo(2);
createTableWithDeleteGranularity(
"id INT, dep STRING", "PARTITIONED BY (dep)", DeleteGranularity.PARTITION);
createBranchIfNeeded();
append(
commitTarget(),
"{ \"id\": 1, \"dep\": \"hr\" }\n"
+ "{ \"id\": 2, \"dep\": \"hr\" }\n"
+ "{ \"id\": 3, \"dep\": \"hr\" }");
append(
commitTarget(),
"{ \"id\": 4, \"dep\": \"hr\" }\n"
+ "{ \"id\": 5, \"dep\": \"hr\" }\n"
+ "{ \"id\": 6, \"dep\": \"hr\" }");

// Produce partition-scoped deletes for the two modified files
sql("DELETE FROM %s WHERE id = 1 or id = 4", commitTarget());

// Produce one file-scoped delete with the second DELETE
Map<String, String> fileGranularityProps =
ImmutableMap.of(TableProperties.DELETE_GRANULARITY, DeleteGranularity.FILE.toString());
sql(
"ALTER TABLE %s SET TBLPROPERTIES (%s)",
tableName, tablePropsAsString(fileGranularityProps));
sql("DELETE FROM %s WHERE id = 5", commitTarget());

// Produce a DV which will contain 3 positions from the second data file
// 2 existing deleted positions from the earlier file-scoped and partition-scoped deletes
// and 1 new deleted position
Map<String, String> updateFormatProperties =
ImmutableMap.of(TableProperties.FORMAT_VERSION, "3");
sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
"ALTER TABLE %s SET TBLPROPERTIES (%s)",
tableName, tablePropsAsString(updateFormatProperties));
sql("DELETE FROM %s where id = 6", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
Set<DeleteFile> deleteFiles =
TestHelpers.deleteFiles(table, SnapshotUtil.latestSnapshot(table, branch));
List<DeleteFile> dvs =
deleteFiles.stream().filter(ContentFileUtil::isDV).collect(Collectors.toList());
assertThat(dvs).hasSize(1);
assertThat(dvs).allMatch(dv -> dv.recordCount() == 3);
}

private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
throws NoSuchTableException {
createTableWithDeleteGranularity(
"id INT, dep STRING", "(id INT, dep STRING)", deleteGranularity);

append(tableName, new Employee(1, "hr"), new Employee(2, "hr"));
append(tableName, new Employee(3, "hr"), new Employee(4, "hr"));
Remaining changed files not shown.
