Skip to content

Commit

Permalink
Add support for non nullable columns in delta lake
Browse files Browse the repository at this point in the history
  • Loading branch information
homar authored and ebyhr committed Aug 6, 2022
1 parent ba858c7 commit 8ca27d3
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.plugin.hive.HiveTransactionHandle;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorCapabilities;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPageSinkProvider;
Expand All @@ -41,6 +42,8 @@
import java.util.Set;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Sets.immutableEnumSet;
import static io.trino.spi.connector.ConnectorCapabilities.NOT_NULL_COLUMN_CONSTRAINT;
import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -214,4 +217,10 @@ public final void shutdown()
{
lifeCycleManager.stop();
}

@Override
public Set<ConnectorCapabilities> getCapabilities()
{
return immutableEnumSet(NOT_NULL_COLUMN_CONSTRAINT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,9 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) table;
String location = metastore.getTableLocation(tableHandle.getSchemaTableName(), session);
Map<String, String> columnComments = getColumnComments(tableHandle.getMetadataEntry());
Map<String, Boolean> columnsNullability = getColumnsNullability(tableHandle.getMetadataEntry());
List<ColumnMetadata> columns = getColumns(tableHandle.getMetadataEntry()).stream()
.map(column -> getColumnMetadata(column, columnComments.get(column.getName())))
.map(column -> getColumnMetadata(column, columnComments.get(column.getName()), columnsNullability.getOrDefault(column.getName(), true)))
.collect(toImmutableList());

ImmutableMap.Builder<String, Object> properties = ImmutableMap.<String, Object>builder()
Expand Down Expand Up @@ -474,7 +475,10 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
{
DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle;
DeltaLakeColumnHandle column = (DeltaLakeColumnHandle) columnHandle;
return getColumnMetadata(column, getColumnComments(table.getMetadataEntry()).get(column.getName()));
return getColumnMetadata(
column,
getColumnComments(table.getMetadataEntry()).get(column.getName()),
getColumnsNullability(table.getMetadataEntry()).getOrDefault(column.getName(), true));
}

/**
Expand Down Expand Up @@ -536,8 +540,9 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
// intentionally skip case when table snapshot is present but it lacks metadata portion
return metastore.getMetadata(metastore.getSnapshot(table, session), session).stream().map(metadata -> {
Map<String, String> columnComments = getColumnComments(metadata);
Map<String, Boolean> columnsNullability = getColumnsNullability(metadata);
List<ColumnMetadata> columnMetadata = getColumns(metadata).stream()
.map(column -> getColumnMetadata(column, columnComments.get(column.getName())))
.map(column -> getColumnMetadata(column, columnComments.get(column.getName()), columnsNullability.getOrDefault(column.getName(), true)))
.collect(toImmutableList());
return TableColumnsMetadata.forTable(table, columnMetadata);
});
Expand Down Expand Up @@ -686,6 +691,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
Map<String, String> columnComments = tableMetadata.getColumns().stream()
.filter(column -> column.getComment() != null)
.collect(toImmutableMap(ColumnMetadata::getName, ColumnMetadata::getComment));
Map<String, Boolean> columnsNullability = tableMetadata.getColumns().stream()
.collect(toImmutableMap(ColumnMetadata::getName, ColumnMetadata::isNullable));
TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, targetPath.toString());
appendTableEntries(
0,
Expand All @@ -694,7 +701,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
deltaLakeColumns,
partitionColumns,
columnComments,
deltaLakeColumns.stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> true)),
columnsNullability,
deltaLakeColumns.stream().collect(toImmutableMap(DeltaLakeColumnHandle::getName, ignored -> ImmutableMap.of())),
configurationForNewTable(checkpointInterval),
CREATE_TABLE_OPERATION,
Expand Down Expand Up @@ -1126,6 +1133,9 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
if (newColumnMetadata.getComment() != null) {
columnComments.put(newColumnMetadata.getName(), newColumnMetadata.getComment());
}
ImmutableMap.Builder<String, Boolean> columnsNullability = ImmutableMap.builder();
columnsNullability.putAll(getColumnsNullability(handle.getMetadataEntry()));
columnsNullability.put(newColumnMetadata.getName(), newColumnMetadata.isNullable());

ImmutableMap.Builder<String, Boolean> columnNullability = ImmutableMap.builder();
columnNullability.putAll(getColumnsNullability(handle.getMetadataEntry()));
Expand Down Expand Up @@ -1236,11 +1246,6 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
String fileSystem = new Path(table.getLocation()).toUri().getScheme();
throw new TrinoException(NOT_SUPPORTED, format("Inserts are not supported on the %s filesystem", fileSystem));
}
Map<String, Boolean> columnNullabilities = getColumnsNullability(table.getMetadataEntry());
boolean nonNullableColumnsExist = columnNullabilities.values().stream().anyMatch(nullability -> !nullability);
if (nonNullableColumnsExist) {
throw new TrinoException(NOT_SUPPORTED, "Inserts are not supported for tables with non-nullable columns");
}
Map<String, String> columnInvariants = getColumnInvariants(table.getMetadataEntry());
if (!columnInvariants.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Inserts are not supported for tables with delta invariants");
Expand Down Expand Up @@ -1431,11 +1436,7 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable
String fileSystem = new Path(handle.getLocation()).toUri().getScheme();
throw new TrinoException(NOT_SUPPORTED, format("Updates are not supported on the %s filesystem", fileSystem));
}
Map<String, Boolean> columnNullabilities = getColumnsNullability(handle.getMetadataEntry());
boolean nonNullableColumnsExist = columnNullabilities.values().stream().anyMatch(nullability -> !nullability);
if (nonNullableColumnsExist) {
throw new TrinoException(NOT_SUPPORTED, "Updates are not supported for tables with non-nullable columns");
}

Map<String, String> columnInvariants = getColumnInvariants(handle.getMetadataEntry());
if (!columnInvariants.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Updates are not supported for tables with delta invariants");
Expand Down Expand Up @@ -2548,13 +2549,14 @@ public DeltaLakeMetastore getMetastore()
return metastore;
}

private static ColumnMetadata getColumnMetadata(DeltaLakeColumnHandle column, @Nullable String comment)
private static ColumnMetadata getColumnMetadata(DeltaLakeColumnHandle column, @Nullable String comment, boolean nullability)
{
return ColumnMetadata.builder()
.setName(column.getName())
.setType(column.getType())
.setHidden(column.getColumnType() == SYNTHESIZED)
.setComment(Optional.ofNullable(comment))
.setNullable(nullability)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
switch (connectorBehavior) {
case SUPPORTS_DELETE:
case SUPPORTS_ROW_LEVEL_DELETE:
return true;
case SUPPORTS_UPDATE:
case SUPPORTS_NOT_NULL_CONSTRAINT:
return true;
case SUPPORTS_MERGE:
return true;
Expand All @@ -116,13 +116,18 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
case SUPPORTS_DROP_COLUMN:
case SUPPORTS_RENAME_COLUMN:
case SUPPORTS_RENAME_SCHEMA:
case SUPPORTS_NOT_NULL_CONSTRAINT:
return false;
default:
return super.hasBehavior(connectorBehavior);
}
}

@Override
protected String errorMessageForInsertIntoNotNullColumn(String columnName)
{
return "NULL value not allowed for NOT NULL column: " + columnName;
}

@Override
protected void verifyConcurrentUpdateFailurePermissible(Exception e)
{
Expand Down Expand Up @@ -770,6 +775,25 @@ public Object[][] targetAndSourceWithDifferentPartitioning()
};
}

@Test
public void testTableWithNonNullableColumns()
{
String tableName = "test_table_with_non_nullable_columns_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + "(col1 INTEGER NOT NULL, col2 INTEGER, col3 INTEGER)");
assertUpdate("INSERT INTO " + tableName + " VALUES(1, 10, 100)", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES(2, 20, 200)", 1);
assertThatThrownBy(() -> query("INSERT INTO " + tableName + " VALUES(null, 30, 300)"))
.hasMessageContaining("NULL value not allowed for NOT NULL column: col1");

//TODO this should fail https://github.com/trinodb/trino/issues/13434
assertUpdate("INSERT INTO " + tableName + " VALUES(TRY(5/0), 40, 400)", 1);
//TODO these 2 should fail https://github.com/trinodb/trino/issues/13435
assertUpdate("UPDATE " + tableName + " SET col2 = NULL where col3 = 100", 1);
assertUpdate("UPDATE " + tableName + " SET col2 = TRY(5/0) where col3 = 200", 1);

assertQuery("SELECT * FROM " + tableName, "VALUES(1, null, 100), (2, null, 200), (null, 40, 400)");
}

@Override
protected String createSchemaSql(String schemaName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,34 +406,6 @@ public void verifyCompressionCodecsDataProvider()
.collect(toImmutableList())));
}

@Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
public void testPreventingWritesToTableWithNotNullableColumns()
{
String tableName = "test_preventing_inserts_into_table_with_not_nullable_columns_" + randomTableSuffix();

try {
onDelta().executeQuery("CREATE TABLE default." + tableName + "( " +
" id INT NOT NULL, " +
" a_number INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'");

onDelta().executeQuery("INSERT INTO " + tableName + " VALUES(1,1)");
assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO " + tableName + " VALUES (2, 2)"))
.hasMessageContaining("Inserts are not supported for tables with non-nullable columns");
assertThat(onTrino().executeQuery("SELECT * FROM " + tableName))
.containsOnly(row(1, 1));
onDelta().executeQuery("UPDATE " + tableName + " SET a_number = 2 WHERE id = 1");
assertQueryFailure(() -> onTrino().executeQuery("UPDATE " + tableName + " SET a_number = 3 WHERE id = 1"))
.hasMessageContaining("Updates are not supported for tables with non-nullable columns");
assertThat(onTrino().executeQuery("SELECT * FROM " + tableName))
.containsOnly(row(1, 2));
}
finally {
onDelta().executeQuery("DROP TABLE IF EXISTS " + tableName);
}
}

@DataProvider
public Object[][] compressionCodecs()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure;
import static io.trino.tempto.assertions.QueryAssert.assertThat;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73;
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix;
import static io.trino.tests.product.utils.QueryExecutors.onDelta;
Expand Down Expand Up @@ -204,6 +205,87 @@ public void testCaseDeleteEntirePartition(String partitionColumn)
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
public void testTrinoRespectsDatabricksSettingNonNullableColumn()
{
String tableName = "test_databricks_table_with_nonnullable_columns_" + randomTableSuffix();

onDelta().executeQuery(format(
"CREATE TABLE default.%1$s (non_nullable_col INT NOT NULL, nullable_col INT) USING DELTA LOCATION '%2$s%1$s'",
tableName,
getBaseLocation()));

try {
onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 2)");
assertQueryFailure(() -> onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (null, 4)"))
.hasMessageContaining("NOT NULL constraint violated for column: non_nullable_col");
assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (null, 5)"))
.hasMessageContaining("NULL value not allowed for NOT NULL column: non_nullable_col");

assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
.containsOnly(row(1, 2));
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
.containsOnly(row(1, 2));
}
finally {
onDelta().executeQuery("DROP TABLE default." + tableName);
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
public void testDatabricksRespectsTrinoSettingNonNullableColumn()
{
String tableName = "test_trino_table_with_nonnullable_columns_" + randomTableSuffix();

onTrino().executeQuery("CREATE TABLE delta.default.\"" + tableName + "\" " +
"(non_nullable_col INT NOT NULL, nullable_col INT) " +
"WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "')");

try {
onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 2)");
assertQueryFailure(() -> onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (null, 4)"))
.hasMessageContaining("NOT NULL constraint violated for column: non_nullable_col");
assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (null, 5)"))
.hasMessageContaining("NULL value not allowed for NOT NULL column: non_nullable_col");

assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
.containsOnly(row(1, 2));
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
.containsOnly(row(1, 2));
}
finally {
onTrino().executeQuery("DROP TABLE delta.default." + tableName);
}
}

@Test(groups = {DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
public void testInsertingIntoDatabricksTableWithAddedNotNullConstraint()
{
String tableName = "test_databricks_table_altered_after_initial_write_" + randomTableSuffix();

onDelta().executeQuery(format(
"CREATE TABLE default.%1$s (non_nullable_col INT, nullable_col INT) USING DELTA LOCATION '%2$s%1$s'",
tableName,
getBaseLocation()));

try {
onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 2)");
onDelta().executeQuery("ALTER TABLE default." + tableName + " ALTER COLUMN non_nullable_col SET NOT NULL");
assertQueryFailure(() -> onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (null, 4)"))
.hasMessageContaining("NOT NULL constraint violated for column: non_nullable_col");
assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (null, 5)"))
.hasMessageContaining("NULL value not allowed for NOT NULL column: non_nullable_col");

assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
.containsOnly(row(1, 2));
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
.containsOnly(row(1, 2));
}
finally {
onDelta().executeQuery("DROP TABLE default." + tableName);
}
}

@DataProvider(name = "partition_column_names")
public static Object[][] partitionColumns()
{
Expand Down

0 comments on commit 8ca27d3

Please sign in to comment.