Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing changeDataFeed writer feature when table version supports reader/writer features in Delta Lake #23644

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@
import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.verifyDeltaLakeTable;
import static io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId.OPTIMIZE;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.APPEND_ONLY_CONFIGURATION_KEY;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.CHANGE_DATA_FEED_FEATURE_NAME;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.COLUMN_MAPPING_PHYSICAL_NAME_CONFIGURATION_KEY;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode.ID;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode.NAME;
Expand Down Expand Up @@ -381,6 +382,9 @@ public class DeltaLakeMetadata
// The highest reader and writer versions Trino supports
private static final int MAX_READER_VERSION = 3;
public static final int MAX_WRITER_VERSION = 7;
// The lowest versions Delta Lake supports reader and writer features
private static final int MIN_READER_FEATURE_VERSION = 3;
private static final int MIN_WRITER_FEATURE_VERSION = 7;
private static final int CDF_SUPPORTED_WRITER_VERSION = 4;
private static final int COLUMN_MAPPING_MODE_SUPPORTED_READER_VERSION = 2;
private static final int COLUMN_MAPPING_MODE_SUPPORTED_WRITER_VERSION = 5;
Expand Down Expand Up @@ -2905,6 +2909,7 @@ private ProtocolEntry protocolEntry(int readerVersion, int writerVersion, boolea
if (changeDataFeedEnabled.isPresent() && changeDataFeedEnabled.get()) {
// Enabling cdf (change data feed) requires setting the writer version to 4
writerVersion = CDF_SUPPORTED_WRITER_VERSION;
writerFeatures.add(CHANGE_DATA_FEED_FEATURE_NAME);
}
ColumnMappingMode columnMappingMode = getColumnMappingMode(properties);
if (columnMappingMode == ID || columnMappingMode == NAME) {
Expand All @@ -2926,8 +2931,8 @@ private ProtocolEntry protocolEntry(int readerVersion, int writerVersion, boolea
return new ProtocolEntry(
readerVersion,
writerVersion,
readerFeatures.isEmpty() ? Optional.empty() : Optional.of(readerFeatures),
writerFeatures.isEmpty() ? Optional.empty() : Optional.of(writerFeatures));
readerVersion < MIN_READER_FEATURE_VERSION || readerFeatures.isEmpty() ? Optional.empty() : Optional.of(readerFeatures),
writerVersion < MIN_WRITER_FEATURE_VERSION || writerFeatures.isEmpty() ? Optional.empty() : Optional.of(writerFeatures));
}

private void writeCheckpointIfNeeded(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private DeltaLakeSchemaSupport() {}

// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#valid-feature-names-in-table-features
private static final String APPEND_ONLY_FEATURE_NAME = "appendOnly";
private static final String CHANGE_DATA_FEED_FEATURE_NAME = "changeDataFeed";
public static final String CHANGE_DATA_FEED_FEATURE_NAME = "changeDataFeed";
private static final String CHECK_CONSTRAINTS_FEATURE_NAME = "checkConstraints";
private static final String COLUMN_MAPPING_FEATURE_NAME = "columnMapping";
public static final String DELETION_VECTORS_FEATURE_NAME = "deletionVectors";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,33 @@ public void testCreateTableWithChangeDataFeedColumnName()
}
}

@Test
public void testCreateTableWithChangeDataFeed()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_cdf", "(x int) WITH (change_data_feed_enabled = true)")) {
assertThat(query("SELECT * FROM \"" + table.getName() + "$properties\""))
.skippingTypesCheck()
.matches("VALUES " +
"('delta.enableChangeDataFeed', 'true')," +
"('delta.enableDeletionVectors', 'false')," +
"('delta.minReaderVersion', '1')," +
"('delta.minWriterVersion', '4')");
}

// timestamp type requires reader version 3 and writer version 7
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_cdf", "(x timestamp) WITH (change_data_feed_enabled = true)")) {
assertThat(query("SELECT * FROM \"" + table.getName() + "$properties\""))
.skippingTypesCheck()
.matches("VALUES " +
"('delta.enableChangeDataFeed', 'true')," +
"('delta.enableDeletionVectors', 'false')," +
"('delta.minReaderVersion', '3')," +
"('delta.minWriterVersion', '7')," +
"('delta.feature.timestampNtz', 'supported')," +
"('delta.feature.changeDataFeed', 'supported')");
}
}

@Test
public void testUnsupportedCreateTableWithChangeDataFeed()
{
Expand Down