Skip to content

Commit

Permalink
Add support for writer version 7 in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jul 31, 2023
1 parent ac4a863 commit c6c4cad
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.SchemaTableName;

Expand All @@ -30,6 +31,7 @@ public class DeltaLakeInsertTableHandle
private final SchemaTableName tableName;
private final String location;
private final MetadataEntry metadataEntry;
private final ProtocolEntry protocolEntry;
private final List<DeltaLakeColumnHandle> inputColumns;
private final long readVersion;
private final boolean retriesEnabled;
Expand All @@ -39,12 +41,14 @@ public DeltaLakeInsertTableHandle(
@JsonProperty("tableName") SchemaTableName tableName,
@JsonProperty("location") String location,
@JsonProperty("metadataEntry") MetadataEntry metadataEntry,
@JsonProperty("protocolEntry") ProtocolEntry protocolEntry,
@JsonProperty("inputColumns") List<DeltaLakeColumnHandle> inputColumns,
@JsonProperty("readVersion") long readVersion,
@JsonProperty("retriesEnabled") boolean retriesEnabled)
{
this.tableName = requireNonNull(tableName, "tableName is null");
this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
this.inputColumns = ImmutableList.copyOf(inputColumns);
this.location = requireNonNull(location, "location is null");
this.readVersion = readVersion;
Expand All @@ -69,6 +73,12 @@ public MetadataEntry getMetadataEntry()
return metadataEntry;
}

/**
 * Returns the Delta Lake protocol entry this insert handle was created with.
 * Annotated for Jackson so the value is serialized alongside the handle and can be
 * fed back into the {@code protocolEntry} constructor parameter.
 */
@JsonProperty
public ProtocolEntry getProtocolEntry()
{
    return this.protocolEntry;
}

@JsonProperty
public List<DeltaLakeColumnHandle> getInputColumns()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeSchemaAsJson;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedReaderFeatures;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedWriterFeatures;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.validateType;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.verifySupportedColumnMapping;
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY;
Expand Down Expand Up @@ -319,7 +320,7 @@ public class DeltaLakeMetadata
public static final int DEFAULT_WRITER_VERSION = 2;
// The highest reader and writer versions Trino supports
private static final int MAX_READER_VERSION = 3;
public static final int MAX_WRITER_VERSION = 6;
public static final int MAX_WRITER_VERSION = 7;
private static final int CDF_SUPPORTED_WRITER_VERSION = 4;
private static final int COLUMN_MAPPING_MODE_SUPPORTED_READER_VERSION = 2;
private static final int COLUMN_MAPPING_MODE_SUPPORTED_WRITER_VERSION = 5;
Expand Down Expand Up @@ -1224,6 +1225,8 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table
if (columnMappingMode != ID && columnMappingMode != NAME && columnMappingMode != NONE) {
throw new TrinoException(NOT_SUPPORTED, "Setting a table comment with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH)));
}
ProtocolEntry protocolEntry = getProtocolEntry(session, handle);
checkUnsupportedWriterFeature(protocolEntry);

ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);

Expand All @@ -1241,7 +1244,7 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table
SET_TBLPROPERTIES_OPERATION,
session,
comment,
getProtocolEntry(session, handle));
protocolEntry);
transactionLogWriter.flush();
}
catch (Exception e) {
Expand All @@ -1260,6 +1263,8 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl
if (columnMappingMode != ID && columnMappingMode != NAME && columnMappingMode != NONE) {
throw new TrinoException(NOT_SUPPORTED, "Setting a column comment with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH)));
}
ProtocolEntry protocolEntry = getProtocolEntry(session, deltaLakeTableHandle);
checkUnsupportedWriterFeature(protocolEntry);

ConnectorTableMetadata tableMetadata = getTableMetadata(session, deltaLakeTableHandle);

Expand Down Expand Up @@ -1299,7 +1304,7 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl
CHANGE_COLUMN_OPERATION,
session,
Optional.ofNullable(deltaLakeTableHandle.getMetadataEntry().getDescription()),
getProtocolEntry(session, deltaLakeTableHandle));
protocolEntry);
transactionLogWriter.flush();
}
catch (Exception e) {
Expand All @@ -1325,9 +1330,11 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
checkSupportedWriterVersion(session, handle);
ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry());
if (changeDataFeedEnabled(handle.getMetadataEntry()) && CHANGE_DATA_FEED_COLUMN_NAMES.contains(newColumnMetadata.getName())) {
ProtocolEntry protocolEntry = getProtocolEntry(session, handle);
if (changeDataFeedEnabled(handle.getMetadataEntry(), protocolEntry) && CHANGE_DATA_FEED_COLUMN_NAMES.contains(newColumnMetadata.getName())) {
throw new TrinoException(NOT_SUPPORTED, "Column name %s is forbidden when change data feed is enabled".formatted(newColumnMetadata.getName()));
}
checkUnsupportedWriterFeature(protocolEntry);

if (!newColumnMetadata.isNullable() && !transactionLogAccess.getActiveFiles(getSnapshot(handle.getSchemaTableName(), handle.getLocation(), session), session).isEmpty()) {
throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to add NOT NULL column '%s' for non-empty table: %s.%s", newColumnMetadata.getName(), handle.getSchemaName(), handle.getTableName()));
Expand Down Expand Up @@ -1391,7 +1398,7 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
ADD_COLUMN_OPERATION,
session,
Optional.ofNullable(handle.getMetadataEntry().getDescription()),
getProtocolEntry(session, handle));
protocolEntry);
transactionLogWriter.flush();
}
catch (Exception e) {
Expand All @@ -1407,6 +1414,8 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl
verify(deltaLakeColumn.isBaseColumn(), "Unexpected dereference: %s", deltaLakeColumn);
String dropColumnName = deltaLakeColumn.getBaseColumnName();
MetadataEntry metadataEntry = table.getMetadataEntry();
ProtocolEntry protocolEntry = getProtocolEntry(session, table);
checkUnsupportedWriterFeature(protocolEntry);

checkSupportedWriterVersion(session, table);
ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry);
Expand Down Expand Up @@ -1453,7 +1462,7 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl
DROP_COLUMN_OPERATION,
session,
Optional.ofNullable(metadataEntry.getDescription()),
getProtocolEntry(session, table));
protocolEntry);
transactionLogWriter.flush();
}
catch (Exception e) {
Expand Down Expand Up @@ -1484,9 +1493,11 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan
DeltaLakeColumnHandle deltaLakeColumn = (DeltaLakeColumnHandle) columnHandle;
verify(deltaLakeColumn.isBaseColumn(), "Unexpected dereference: %s", deltaLakeColumn);
String sourceColumnName = deltaLakeColumn.getBaseColumnName();
ProtocolEntry protocolEntry = getProtocolEntry(session, table);
checkUnsupportedWriterFeature(protocolEntry);

checkSupportedWriterVersion(session, table);
if (changeDataFeedEnabled(table.getMetadataEntry())) {
if (changeDataFeedEnabled(table.getMetadataEntry(), protocolEntry)) {
throw new TrinoException(NOT_SUPPORTED, "Cannot rename column when change data feed is enabled");
}

Expand Down Expand Up @@ -1535,7 +1546,7 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan
RENAME_COLUMN_OPERATION,
session,
Optional.ofNullable(metadataEntry.getDescription()),
getProtocolEntry(session, table));
protocolEntry);
transactionLogWriter.flush();
// Don't update extended statistics because it uses physical column names internally
}
Expand Down Expand Up @@ -1654,6 +1665,7 @@ private DeltaLakeInsertTableHandle createInsertHandle(ConnectorSession session,
table.getSchemaTableName(),
tableLocation,
table.getMetadataEntry(),
getProtocolEntry(session, table),
inputColumns,
getMandatoryCurrentVersion(fileSystem, tableLocation),
retryMode != NO_RETRIES);
Expand Down Expand Up @@ -1798,12 +1810,13 @@ public Optional<ConnectorPartitioningHandle> getUpdateLayout(ConnectorSession se
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode)
{
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
if (isAppendOnly(handle.getMetadataEntry())) {
ProtocolEntry protocolEntry = getProtocolEntry(session, handle);
if (isAppendOnly(handle.getMetadataEntry(), protocolEntry)) {
throw new TrinoException(NOT_SUPPORTED, "Cannot modify rows from a table with '" + APPEND_ONLY_CONFIGURATION_KEY + "' set to true");
}
checkWriteAllowed(session, handle);
ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry());
if (changeDataFeedEnabled(handle.getMetadataEntry()) && columnMappingMode != NONE) {
if (changeDataFeedEnabled(handle.getMetadataEntry(), protocolEntry) && columnMappingMode != NONE) {
// TODO https://github.com/trinodb/trino/issues/16967 Support CDF for tables with 'id' and 'name' column mapping
throw new TrinoException(NOT_SUPPORTED, "Unsupported column mapping mode for tables with change data feed enabled: " + columnMappingMode.name().toLowerCase(ENGLISH));
}
Expand Down Expand Up @@ -1929,6 +1942,8 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
RetryMode retryMode)
{
DeltaLakeTableHandle tableHandle = checkValidTableHandle(connectorTableHandle);
ProtocolEntry protocolEntry = getProtocolEntry(session, tableHandle);
checkUnsupportedWriterFeature(protocolEntry);

DeltaLakeTableProcedureId procedureId;
try {
Expand Down Expand Up @@ -2130,7 +2145,15 @@ private void checkWriteSupported(ConnectorSession session, DeltaLakeTableHandle
if (getColumnIdentities(handle.getMetadataEntry()).values().stream().anyMatch(identity -> identity)) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with identity columns is not supported");
}
// TODO: Check writer-features
checkUnsupportedWriterFeature(getProtocolEntry(session, handle));
}

/**
 * Rejects the operation when the protocol entry lists a writer feature that
 * {@code unsupportedWriterFeatures} reports as unsupported.
 *
 * @param protocolEntry protocol entry whose writer features are inspected; a missing
 *        feature list is treated as an empty set
 * @throws TrinoException with {@code NOT_SUPPORTED} when at least one unsupported feature is listed
 */
private static void checkUnsupportedWriterFeature(ProtocolEntry protocolEntry)
{
    Set<String> declaredFeatures = protocolEntry.getWriterFeatures().orElse(ImmutableSet.of());
    Set<String> rejectedFeatures = unsupportedWriterFeatures(declaredFeatures);
    if (rejectedFeatures.isEmpty()) {
        // Nothing the writer cannot handle
        return;
    }
    throw new TrinoException(NOT_SUPPORTED, "Unsupported writer feature: " + rejectedFeatures);
}

private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
tableHandle.getInputColumns(),
domainCompactionThreshold,
() -> createCdfPageSink(merge, session),
changeDataFeedEnabled(tableHandle.getMetadataEntry()),
changeDataFeedEnabled(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry()),
parquetSchemaMapping);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.deltalake.procedure;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.airlift.log.Logger;
Expand Down Expand Up @@ -60,6 +61,7 @@
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedWriterFeatures;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
Expand Down Expand Up @@ -175,11 +177,15 @@ private void doVacuum(
accessControl.checkCanDeleteFromTable(null, tableName);

TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(tableName, handle.getLocation(), session);
// TODO https://github.com/trinodb/trino/issues/15873 Check writer features when supporting writer version 7
ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot);
if (protocolEntry.getMinWriterVersion() > MAX_WRITER_VERSION) {
throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %d writer version".formatted(protocolEntry.getMinWriterVersion()));
}
Set<String> unsupportedWriterFeatures = unsupportedWriterFeatures(protocolEntry.getWriterFeatures().orElse(ImmutableSet.of()));
if (!unsupportedWriterFeatures.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %s writer features".formatted(unsupportedWriterFeatures));
}

String tableLocation = tableSnapshot.getTableLocation();
String transactionLogDir = getTransactionLogDir(tableLocation);
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ private DeltaLakeSchemaSupport() {}
.add("columnMapping")
.add("timestampNtz")
.build();
// Writer feature names that unsupportedWriterFeatures() accepts; any other name passed to
// that method is returned as unsupported.
private static final Set<String> SUPPORTED_WRITER_FEATURES = ImmutableSet.<String>builder()
.add("appendOnly")
.add("invariants")
.add("checkConstraints")
.add("changeDataFeed")
.add("columnMapping")
.build();

public enum ColumnMappingMode
{
Expand All @@ -119,9 +126,10 @@ public enum ColumnMappingMode

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapperProvider().get();

public static boolean isAppendOnly(MetadataEntry metadataEntry)
public static boolean isAppendOnly(MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
{
return parseBoolean(metadataEntry.getConfiguration().getOrDefault(APPEND_ONLY_CONFIGURATION_KEY, "false"));
return parseBoolean(metadataEntry.getConfiguration().getOrDefault(APPEND_ONLY_CONFIGURATION_KEY, "false")) ||
protocolEntry.getWriterFeatures().map(features -> features.contains("appendOnly")).orElse(false);
}

public static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata)
Expand Down Expand Up @@ -498,10 +506,10 @@ public static Map<String, String> getCheckConstraints(MetadataEntry metadataEntr
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
}

public static boolean changeDataFeedEnabled(MetadataEntry metadataEntry)
public static boolean changeDataFeedEnabled(MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
{
String enableChangeDataFeed = metadataEntry.getConfiguration().getOrDefault("delta.enableChangeDataFeed", "false");
return parseBoolean(enableChangeDataFeed);
return parseBoolean(enableChangeDataFeed) || protocolEntry.getWriterFeatures().map(features -> features.contains("changeDataFeed")).orElse(false);
}

public static Map<String, Map<String, Object>> getColumnsMetadata(MetadataEntry metadataEntry)
Expand Down Expand Up @@ -549,6 +557,11 @@ public static Set<String> unsupportedReaderFeatures(Set<String> features)
return Sets.difference(features, SUPPORTED_READER_FEATURES);
}

/**
 * Computes which of the given writer feature names are absent from SUPPORTED_WRITER_FEATURES.
 *
 * @param features writer feature names to check
 * @return the result of {@code Sets.difference(features, SUPPORTED_WRITER_FEATURES)}; empty when
 *         every given feature is supported
 */
public static Set<String> unsupportedWriterFeatures(Set<String> features)
{
    Set<String> notSupported = Sets.difference(features, SUPPORTED_WRITER_FEATURES);
    return notSupported;
}

public static Type deserializeType(TypeManager typeManager, Object type, boolean usePhysicalName)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ public void testTimestampNtz()
""");

// TODO https://github.com/trinodb/trino/issues/15873 Support writing timestamp_ntz type (writes currently fail because the 'timestampNtz' writer feature is unsupported)
assertQueryFails("INSERT INTO timestamp_ntz VALUES NULL", "Table .* requires Delta Lake writer version 7 which is not supported");
assertQueryFails("INSERT INTO timestamp_ntz VALUES NULL", "\\QUnsupported writer feature: [timestampNtz]");
}

/**
Expand Down Expand Up @@ -503,7 +503,7 @@ public void testTimestampNtzPartitioned()
// TODO https://github.com/trinodb/trino/issues/15873 Support writing timestamp_ntz type (writes currently fail because the 'timestampNtz' writer feature is unsupported)
assertQueryFails(
"INSERT INTO timestamp_ntz_partition VALUES (NULL, NULL)",
"Table .* requires Delta Lake writer version 7 which is not supported");
"\\QUnsupported writer feature: [timestampNtz]");
}

@Test
Expand Down
Loading

0 comments on commit c6c4cad

Please sign in to comment.