Skip to content

Commit

Permalink
[Iceberg] Refine the partition specs that really need to be checked
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd committed May 24, 2024
1 parent 887bc5b commit e17d22d
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
Expand Down Expand Up @@ -125,6 +124,7 @@
import static com.facebook.presto.iceberg.IcebergUtil.getDeleteMode;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeyColumnHandles;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionSpecsIncludingValidData;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitions;
import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdAsOfTime;
import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
Expand Down Expand Up @@ -874,11 +874,7 @@ public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHa

// Get partition specs that really need to be checked
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
Set<Integer> partitionSpecIds = handle.getIcebergTableName().getSnapshotId().map(
snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream()
.map(ManifestFile::partitionSpecId)
.collect(toImmutableSet()))
.orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().keySet())); // No snapshot, so no data. This case doesn't matter.
Set<Integer> partitionSpecIds = getPartitionSpecsIncludingValidData(icebergTable, handle.getIcebergTableName().getSnapshotId());

Set<Integer> enforcedColumnIds = getEnforcedColumns(icebergTable, partitionSpecIds, domainPredicate, session)
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,15 @@ public static Map<PartitionField, Integer> getIdentityPartitions(PartitionSpec p
return columns.build();
}

public static Set<Integer> getPartitionSpecsIncludingValidData(Table icebergTable, Optional<Long> snapshotId)
{
return snapshotId.map(snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream()
.filter(manifestFile -> manifestFile.hasAddedFiles() || manifestFile.hasExistingFiles())
.map(ManifestFile::partitionSpecId)
.collect(toImmutableSet()))
.orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().keySet())); // No snapshot, so no data. This case doesn't matter.
}

public static List<Column> toHiveColumns(List<NestedField> columns)
{
return columns.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
Expand All @@ -71,11 +70,11 @@
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
import static com.facebook.presto.iceberg.IcebergUtil.getAdjacentValue;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionSpecsIncludingValidData;
import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

Expand Down Expand Up @@ -177,12 +176,7 @@ public PlanNode visitFilter(FilterNode filter, RewriteContext<Void> context)
RowExpression subfieldPredicate = rowExpressionService.getDomainTranslator().toPredicate(subfieldTupleDomain);

// Get partition specs that really need to be checked
Set<Integer> partitionSpecIds = tableHandle.getIcebergTableName().getSnapshotId().map(
snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream()
.map(ManifestFile::partitionSpecId)
.collect(toImmutableSet()))
.orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().keySet())); // No snapshot, so no data. This case doesn't matter.

Set<Integer> partitionSpecIds = getPartitionSpecsIncludingValidData(icebergTable, tableHandle.getIcebergTableName().getSnapshotId());
Set<IcebergColumnHandle> enforcedColumns = getEnforcedColumns(icebergTable,
partitionSpecIds,
entireColumnDomain,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1588,6 +1588,58 @@ public void testDeleteOnPartitionedV1Table()
dropTable(session, tableName);
}

@Test(dataProvider = "version_and_mode")
public void testMetadataDeleteOnTableWithUnsupportedSpecsIncludingNoData(String version, String mode)
{
String tableName = "test_empty_partition_spec_table";
try {
// Create a table with no partition
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "')");

// Do not insert data, and evaluate the partition spec by adding a partition column `c`
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");

// Insert data under the new partition spec
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)", 4);

// We can do metadata delete on partition column `c`, because the initial partition spec contains no data
assertUpdate("DELETE FROM " + tableName + " WHERE c in (1, 3)", 2);
assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', 2), (4, '1004', 4)");
}
finally {
dropTable(getSession(), tableName);
}
}

@Test(dataProvider = "version_and_mode")
public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(String version, String mode)
{
String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
String tableName = "test_data_deleted_partition_spec_table";
try {
// Create a table with partition column `a`, and insert some data under this partition spec
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "', partitioning = ARRAY['a'])");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

// Then evaluate the partition spec by adding a partition column `c`, and insert some data under the new partition spec
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

// Do not support metadata delete with filter on column `c`, because we have data with old partition spec
assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);

// Do metadata delete on column `a`, because all partition specs contains partition column `a`
assertUpdate("DELETE FROM " + tableName + " WHERE a in (1, 2)", 2);

// Then we can do metadata delete on column `c`, because the old partition spec contains no data now
assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
assertQuery("SELECT * FROM " + tableName, "VALUES (3, '1003', 3)");
}
finally {
dropTable(getSession(), tableName);
}
}

@DataProvider(name = "version_and_mode")
public Object[][] versionAndMode()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,82 @@ public void testMetadataDeleteOnPartitionedTableWithDeleteFiles()
}
}

@Test
public void testMetadataDeleteOnV2MorTableWithEmptyUnsupportedSpecs()
{
String tableName = "test_empty_partition_spec_table";
try {
// Create a table with no partition
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '2', delete_mode = 'merge-on-read')");

// Do not insert data, and evaluate the partition spec by adding a partition column `c`
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");

// Insert data under the new partition spec
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)", 4);

Table icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 4);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);

// Do metadata delete on partition column `c`, because the initial partition spec contains no data
assertUpdate("DELETE FROM " + tableName + " WHERE c in (1, 3)", 2);
assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', 2), (4, '1004', 4)");

icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 2);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tableName);
}
}

@Test
public void testMetadataDeleteOnV2MorTableWithUnsupportedSpecsWhoseDataAllDeleted()
{
String tableName = "test_data_deleted_partition_spec_table";
try {
// Create a table with partition column `a`, and insert some data under this partition spec
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '2', delete_mode = 'merge-on-read', partitioning = ARRAY['a'])");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

Table icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 2);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);

// Evaluate the partition spec by adding a partition column `c`, and insert some data under the new partition spec
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 5);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);

// Execute row level delete with filter on column `c`, because we have data with old partition spec
assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 5);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 2);

// Do metadata delete on column `a`, because all partition specs contains partition column `a`
assertUpdate("DELETE FROM " + tableName + " WHERE a in (1, 2)", 2);
icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 3);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 2);

// Then do metadata delete on column `c`, because the old partition spec contains no data now
assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 0);
assertQuery("SELECT * FROM " + tableName, "VALUES (3, '1003', 3)");
icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 1);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tableName);
}
}

private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List<FileContent> expectedFileContent)
{
// check delete file list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.cost.StatsProvider;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.FilterNode;
Expand Down Expand Up @@ -88,6 +90,7 @@
import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
import static com.facebook.presto.iceberg.IcebergSessionProperties.PARQUET_DEREFERENCE_PUSHDOWN_ENABLED;
import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled;
import static com.facebook.presto.parquet.ParquetTypeUtils.pushdownColumnNameForSubfield;
import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.AND;
import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.OR;
Expand Down Expand Up @@ -635,6 +638,96 @@ public void testFiltersWithPushdownDisable()
assertUpdate("DROP TABLE test_filters_with_pushdown_disable");
}

@Test
public void testThoroughlyPushdownForTableWithUnsupportedSpecsIncludingNoData()
{
// The filter pushdown session property is disabled by default
Session sessionWithoutFilterPushdown = getQueryRunner().getDefaultSession();
assertEquals(isPushdownFilterEnabled(sessionWithoutFilterPushdown.toConnectorSession(new ConnectorId(ICEBERG_CATALOG))), false);

String tableName = "test_empty_partition_spec_table";
try {
// Create a table with no partition
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '1')");

// Do not insert data, and evaluate the partition spec by adding a partition column `c`
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");

// Insert data under the new partition spec
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)", 4);

// Only identity partition column predicates, would be enforced totally by tableScan
assertPlan(sessionWithoutFilterPushdown, "SELECT a, b FROM " + tableName + " WHERE c > 2",
output(exchange(
strictTableScan(tableName, identityMap("a", "b")))),
plan -> assertTableLayout(
plan,
tableName,
withColumnDomains(ImmutableMap.of(new Subfield(
"c",
ImmutableList.of()),
Domain.create(ValueSet.ofRanges(greaterThan(INTEGER, 2L)), false))),
TRUE_CONSTANT,
ImmutableSet.of("c")));
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)");
}
finally {
assertUpdate("DROP TABLE " + tableName);
}
}

@Test
public void testThoroughlyPushdownForTableWithUnsupportedSpecsWhoseDataAllDeleted()
{
// The filter pushdown session property is disabled by default
Session sessionWithoutFilterPushdown = getQueryRunner().getDefaultSession();
assertEquals(isPushdownFilterEnabled(sessionWithoutFilterPushdown.toConnectorSession(new ConnectorId(ICEBERG_CATALOG))), false);

String tableName = "test_data_deleted_partition_spec_table";
try {
// Create a table with partition column `a`, and insert some data under this partition spec
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '1', partitioning = ARRAY['a'])");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

// Then evaluate the partition spec by adding a partition column `c`, and insert some data under the new partition spec
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

// The predicate was enforced partially by tableScan, filter on `c` could not be thoroughly pushed down, so the filterNode drop it's filter condition `a > 2`
assertPlan(sessionWithoutFilterPushdown, "SELECT b FROM " + tableName + " WHERE a > 2 and c = 4",
output(exchange(project(
filter("c = 4",
strictTableScan(tableName, identityMap("b", "c")))))));
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)");

// Do metadata delete on column `a`, because all partition specs contains partition column `a`
assertUpdate("DELETE FROM " + tableName + " WHERE a IN (1, 2)", 2);

// Only identity partition column predicates, would be enforced totally by tableScan
assertPlan(sessionWithoutFilterPushdown, "SELECT b FROM " + tableName + " WHERE a > 2 and c = 4",
output(exchange(
strictTableScan(tableName, identityMap("b")))),
plan -> assertTableLayout(
plan,
tableName,
withColumnDomains(ImmutableMap.of(
new Subfield(
"a",
ImmutableList.of()),
Domain.create(ValueSet.ofRanges(greaterThan(INTEGER, 2L)), false),
new Subfield(
"c",
ImmutableList.of()),
singleValue(INTEGER, 4L))),
TRUE_CONSTANT,
ImmutableSet.of("a", "c")));
assertQuery("SELECT * FROM " + tableName, "VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)");
}
finally {
assertUpdate("DROP TABLE " + tableName);
}
}

@DataProvider(name = "timezones")
public Object[][] timezones()
{
Expand Down
Loading

0 comments on commit e17d22d

Please sign in to comment.