diff --git a/core/src/main/resources/error/delta-error-classes.json b/core/src/main/resources/error/delta-error-classes.json index 42c7b7b757f..6a8db40f997 100644 --- a/core/src/main/resources/error/delta-error-classes.json +++ b/core/src/main/resources/error/delta-error-classes.json @@ -663,6 +663,19 @@ ], "sqlState" : "0AKDE" }, + "DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT" : { + "message" : [ + "Your table schema requires manual enablement of the following table feature(s): .", + "", + "To do this, run the following command for each of the features listed above:", + " ALTER TABLE table_name SET TBLPROPERTIES ('delta.feature.feature_name' = 'supported')", + "Replace \"table_name\" and \"feature_name\" with real values.", + "Note that the procedure is irreversible: once supported, a feature can never be unsupported again.", + "", + "Current supported feature(s): ." + ], + "sqlState" : "0AKDE" + }, "DELTA_FEATURE_REQUIRES_HIGHER_READER_VERSION" : { "message" : [ "Unable to enable table feature because it requires a higher reader protocol version (current ). Consider upgrading the table's reader protocol version to , or to a version which supports reader table features. Refer to for more information on table protocol versions." ], "sqlState" : "0AKDE" }, @@ -2075,21 +2088,6 @@ ], "sqlState" : "0AKDC" }, - "DELTA_UNSUPPORTED_TYPE_TIMESTAMP_NTZ" : { - "message" : [ - "Your table schema contains a column of type TimestampNTZ.", - "TimestampNTZ type is not supported by your table's protocol.", - "", - "Required Delta protocol version and features for TimestampNTZ:", - "", - "Your table's current Delta protocol version and enabled features:", - "", - "", - "Run the following command to add TimestampNTZ support to your table.", - "ALTER TABLE table_name SET TBLPROPERTIES ('delta.feature.timestampNtz' = 'supported')" - ], - "sqlState" : "0A000" - }, "DELTA_UNSUPPORTED_VACUUM_SPECIFIC_PARTITION" : { "message" : [ "Please provide the base path () when Vacuuming Delta tables. 
Vacuuming specific partitions is currently not supported." diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index f8bd8044d52..581d7c56a5f 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -1713,20 +1713,6 @@ trait DeltaErrorsBase ) } - def schemaContainsTimestampNTZType( - schema: StructType, - requiredProtocol: Protocol, - currentProtocol: Protocol): Throwable = { - new DeltaAnalysisException( - errorClass = "DELTA_UNSUPPORTED_TYPE_TIMESTAMP_NTZ", - messageParameters = Array( - s"${formatSchema(schema)}", - s"$requiredProtocol", - s"$currentProtocol" - ) - ) - } - def tableAlreadyExists(table: CatalogTable): Throwable = { new DeltaAnalysisException( errorClass = "DELTA_TABLE_ALREADY_EXISTS", @@ -2168,6 +2154,16 @@ trait DeltaErrorsBase messageParameters = Array(features.mkString(", "))) } + def tableFeaturesRequireManualEnablementException( + unsupportedFeatures: Iterable[TableFeature], + supportedFeatures: Iterable[TableFeature]): Throwable = { + new DeltaTableFeatureException( + errorClass = "DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT", + messageParameters = Array( + unsupportedFeatures.map(_.name).toSeq.sorted.mkString(", "), + supportedFeatures.map(_.name).toSeq.sorted.mkString(", "))) + } + def concurrentAppendException( conflictingCommit: Option[CommitInfo], partition: String, diff --git a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index b6450682e78..54319745169 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -404,29 +404,10 @@ trait OptimisticTransactionImpl extends TransactionalWrite newMetadataTmp, isCreatingNewTable) - // Check for 
existence of TimestampNTZ in the schema and throw an error if the feature - // is not enabled. - if (!protocolBeforeUpdate.isFeatureSupported(TimestampNTZTableFeature) && - SchemaUtils.checkForTimestampNTZColumnsRecursively(newMetadataTmp.schema)) { - // The timestampNTZ feature is enabled if there is a table prop in this transaction, - // or if this is a new table - val isEnabled = isCreatingNewTable || TableFeatureProtocolUtils - .getSupportedFeaturesFromConfigs( - newMetadataTmp.configuration, TableFeatureProtocolUtils.FEATURE_PROP_PREFIX) - .contains(TimestampNTZTableFeature) - - if (!isEnabled) { - throw DeltaErrors.schemaContainsTimestampNTZType( - newMetadataTmp.schema, - TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature), - snapshot.protocol - ) - } - } - if (newMetadataTmp.schemaString != null) { // Replace CHAR and VARCHAR with StringType - var schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(newMetadataTmp.schema) + val schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema( + newMetadataTmp.schema) newMetadataTmp = newMetadataTmp.copy(schemaString = schema.json) } @@ -449,16 +430,16 @@ trait OptimisticTransactionImpl extends TransactionalWrite newProtocol = Some(Protocol.forNewTable(spark, Some(newMetadataTmp))) } else if (newMetadataTmp.configuration.contains(Protocol.MIN_READER_VERSION_PROP) || newMetadataTmp.configuration.contains(Protocol.MIN_WRITER_VERSION_PROP)) { + // Table features Part 1: bump protocol version numbers + // // Collect new reader and writer versions from table properties, which could be provided by // the user in `ALTER TABLE TBLPROPERTIES` or copied over from session defaults. 
- val readerVersionInNewMetadataTmp = newMetadataTmp.configuration - .get(Protocol.MIN_READER_VERSION_PROP) - .map(Protocol.getVersion(Protocol.MIN_READER_VERSION_PROP, _)) - .getOrElse(protocolBeforeUpdate.minReaderVersion) - val writerVersionInNewMetadataTmp = newMetadataTmp.configuration - .get(Protocol.MIN_WRITER_VERSION_PROP) - .map(Protocol.getVersion(Protocol.MIN_WRITER_VERSION_PROP, _)) - .getOrElse(protocolBeforeUpdate.minWriterVersion) + val readerVersionInNewMetadataTmp = + Protocol.getReaderVersionFromTableConf(newMetadataTmp.configuration) + .getOrElse(protocolBeforeUpdate.minReaderVersion) + val writerVersionInNewMetadataTmp = + Protocol.getWriterVersionFromTableConf(newMetadataTmp.configuration) + .getOrElse(protocolBeforeUpdate.minWriterVersion) // If the collected reader and writer versions are provided by the user, we must use them, // and throw ProtocolDowngradeException when they are lower than what the table have before @@ -526,8 +507,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite } } - // Enabling table features Part 1: add manually-supported features in table properties start - // with [[FEATURE_PROP_PREFIX]]. + // Table features Part 2: add manually-supported features specified in table properties, aka + // those start with [[FEATURE_PROP_PREFIX]]. // // This transaction's new metadata might contain some table properties to support some // features (props start with [[FEATURE_PROP_PREFIX]]). We silently add them to the `protocol` @@ -535,9 +516,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite // any reader-writer feature. 
val newProtocolBeforeAddingFeatures = newProtocol.getOrElse(protocolBeforeUpdate) val newFeaturesFromTableConf = - TableFeatureProtocolUtils.getSupportedFeaturesFromConfigs( - newMetadataTmp.configuration, - TableFeatureProtocolUtils.FEATURE_PROP_PREFIX) + TableFeatureProtocolUtils.getSupportedFeaturesFromTableConfigs(newMetadataTmp.configuration) val readerVersionForNewProtocol = if (newFeaturesFromTableConf.exists(_.isReaderWriterFeature)) { TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION @@ -561,7 +540,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite newMetadataTmp = newMetadataTmp.copy(configuration = configsWithoutProtocolProps) assertMetadata(newMetadataTmp) - // Enabling table features Part 2: add automatically-enabled features. + // Table features Part 3: add automatically-enabled features by looking at the new table + // metadata. // // This code path is for existing tables. The new table case has been handled by // [[Protocol.forNewTable]] earlier in this method. 
@@ -633,7 +613,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite } private def setNewProtocolWithFeaturesEnabledByMetadata(metadata: Metadata): Unit = { - val requiredProtocolOpt = Protocol.checkProtocolRequirements(spark, metadata, protocol) + val requiredProtocolOpt = + Protocol.upgradeProtocolFromMetadataForExistingTable(spark, metadata, protocol) if (requiredProtocolOpt.isDefined) { newProtocol = requiredProtocolOpt } diff --git a/core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index d36ca2e1ba9..3d37cd7b41f 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -111,18 +111,32 @@ sealed trait LegacyFeatureType * A trait indicating this feature can be automatically enabled via a change in a table's * metadata, e.g., through setting particular values of certain feature-specific table properties. * - * When the feature's metadata requirements are satisfied during table creation - * ([[actions.Protocol.forNewTable]]) or commit ([[OptimisticTransaction.updateMetadata]]), the + * When the feature's metadata requirements are satisfied for new tables, or for + * existing tables when [[automaticallyUpdateProtocolOfExistingTables]] set to `true`, the * client will silently add the feature to the protocol's `readerFeatures` and/or - * `writerFeatures`. + * `writerFeatures`. Otherwise, a proper protocol version bump must be present in the same + * transaction. */ -sealed trait FeatureAutomaticallyEnabledByMetadata { +sealed trait FeatureAutomaticallyEnabledByMetadata { this: TableFeature => + + /** + * Whether the feature can automatically update the protocol of an existing table when the + * metadata requirements are satisfied. 
As a rule of thumb, a table feature that requires + * explicit operations (e.g., turning on a table property) should set this flag to `true`, while + * features that are used implicitly (e.g., when using a new data type) should set this flag to + * `false`. + */ + def automaticallyUpdateProtocolOfExistingTables: Boolean = this.isLegacyFeature /** * Determine whether the feature must be supported and enabled because its metadata requirements * are satisfied. */ def metadataRequiresFeatureToBeEnabled(metadata: Metadata, spark: SparkSession): Boolean + + require( + !this.isLegacyFeature || automaticallyUpdateProtocolOfExistingTables, + "Legacy feature must be auto-update capable.") } /** @@ -205,10 +219,12 @@ object TableFeature { if (DeltaUtils.isTesting) { features ++= Set( TestLegacyWriterFeature, - TestWriterFeature, TestLegacyReaderWriterFeature, + TestWriterFeature, + TestWriterMetadataNoAutoUpdateFeature, TestReaderWriterFeature, - TestReaderWriterMetadataFeature) + TestReaderWriterMetadataAutoUpdateFeature, + TestReaderWriterMetadataNoAutoUpdateFeature) } val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap require(features.size == featureMap.size, "Lowercase feature names must not duplicate.") @@ -302,7 +318,6 @@ object IdentityColumnsTableFeature object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz") with FeatureAutomaticallyEnabledByMetadata { - override def metadataRequiresFeatureToBeEnabled( metadata: Metadata, spark: SparkSession): Boolean = { SchemaUtils.checkForTimestampNTZColumnsRecursively(metadata.schema) @@ -312,6 +327,8 @@ object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz object DeletionVectorsTableFeature extends ReaderWriterFeature(name = "deletionVectors") with FeatureAutomaticallyEnabledByMetadata { + override def automaticallyUpdateProtocolOfExistingTables: Boolean = true + override def metadataRequiresFeatureToBeEnabled( metadata: Metadata, spark: 
SparkSession): Boolean = { @@ -330,6 +347,17 @@ object TestLegacyWriterFeature object TestWriterFeature extends WriterFeature(name = "testWriter") +object TestWriterMetadataNoAutoUpdateFeature + extends WriterFeature(name = "testWriterMetadataNoAutoUpdate") + with FeatureAutomaticallyEnabledByMetadata { + val TABLE_PROP_KEY = "_123testWriterMetadataNoAutoUpdate321_" + override def metadataRequiresFeatureToBeEnabled( + metadata: Metadata, + spark: SparkSession): Boolean = { + metadata.configuration.get(TABLE_PROP_KEY).exists(_.toBoolean) + } +} + object TestLegacyReaderWriterFeature extends LegacyReaderWriterFeature( name = "testLegacyReaderWriter", @@ -338,10 +366,24 @@ object TestLegacyReaderWriterFeature object TestReaderWriterFeature extends ReaderWriterFeature(name = "testReaderWriter") -object TestReaderWriterMetadataFeature - extends ReaderWriterFeature(name = "testReaderWriterMetadata") +object TestReaderWriterMetadataNoAutoUpdateFeature + extends ReaderWriterFeature(name = "testReaderWriterMetadataNoAutoUpdate") with FeatureAutomaticallyEnabledByMetadata { - val TABLE_PROP_KEY = "_123testReaderWriterMetadata321_" + val TABLE_PROP_KEY = "_123testReaderWriterMetadataNoAutoUpdate321_" + override def metadataRequiresFeatureToBeEnabled( + metadata: Metadata, + spark: SparkSession): Boolean = { + metadata.configuration.get(TABLE_PROP_KEY).exists(_.toBoolean) + } +} + +object TestReaderWriterMetadataAutoUpdateFeature + extends ReaderWriterFeature(name = "testReaderWriterMetadataAutoUpdate") + with FeatureAutomaticallyEnabledByMetadata { + val TABLE_PROP_KEY = "_123testReaderWriterMetadataAutoUpdate321_" + + override def automaticallyUpdateProtocolOfExistingTables: Boolean = true + override def metadataRequiresFeatureToBeEnabled( metadata: Metadata, spark: SparkSession): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala b/core/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala index 
41043247530..e602552a01c 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala @@ -195,6 +195,12 @@ trait TableFeatureSupport { this: Protocol => } } + @JsonIgnore + lazy val implicitlyAndExplicitlySupportedFeatures: Set[TableFeature] = { + readerAndWriterFeatureNames.flatMap(TableFeature.featureNameToFeature) ++ + implicitlySupportedFeatures + } + /** * Determine whether this protocol can be safely upgraded to a new protocol `to`. This means: * - this protocol has reader protocol version less than or equals to `to`. @@ -309,18 +315,15 @@ object TableFeatureProtocolUtils { } /** - * Get a set of [[TableFeature]]s representing supported features set in a `config` map (table - * properties or Spark session configs). + * Get a set of [[TableFeature]]s representing supported features set in a table properties map. */ - def getSupportedFeaturesFromConfigs( - configs: Map[String, String], - propertyPrefix: String): Set[TableFeature] = { - val featureConfigs = configs.filterKeys(_.startsWith(propertyPrefix)) + def getSupportedFeaturesFromTableConfigs(configs: Map[String, String]): Set[TableFeature] = { + val featureConfigs = configs.filterKeys(_.startsWith(FEATURE_PROP_PREFIX)) val unsupportedFeatureConfigs = mutable.Set.empty[String] val collectedFeatures = featureConfigs.flatMap { case (key, value) => // Feature name is lower cased in table properties but not in Spark session configs. // Feature status is not lower cased in any case. 
- val name = key.stripPrefix(propertyPrefix).toLowerCase(Locale.ROOT) + val name = key.stripPrefix(FEATURE_PROP_PREFIX).toLowerCase(Locale.ROOT) val status = value.toLowerCase(Locale.ROOT) if (status != FEATURE_PROP_SUPPORTED && status != FEATURE_PROP_ENABLED) { throw DeltaErrors.unsupportedTableFeatureStatusException(name, status) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala index 8d05753afc3..fd3e925060e 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala @@ -250,7 +250,7 @@ object Protocol { val tableConf = metadata.configuration // There might be features enabled by the table properties aka // `CREATE TABLE ... TBLPROPERTIES ...`. - val tablePropEnabledFeatures = getSupportedFeaturesFromConfigs(tableConf, FEATURE_PROP_PREFIX) + val tablePropEnabledFeatures = getSupportedFeaturesFromTableConfigs(tableConf) val metaEnabledFeatures = extractAutomaticallyEnabledFeatures(spark, metadata) val allEnabledFeatures = tablePropEnabledFeatures ++ metaEnabledFeatures @@ -282,10 +282,8 @@ object Protocol { // Protocol version provided in table properties can upgrade the protocol, but only when they // are higher than which required by the enabled features. - val readerVersionFromTableConfOpt = - tableConf.get(MIN_READER_VERSION_PROP).map(getVersion(MIN_READER_VERSION_PROP, _)) - val writerVersionFromTableConfOpt = - tableConf.get(MIN_WRITER_VERSION_PROP).map(getVersion(MIN_WRITER_VERSION_PROP, _)) + val (readerVersionFromTableConfOpt, writerVersionFromTableConfOpt) = + getProtocolVersionsFromTableConf(tableConf) // Decide the final protocol version: // a. 1, aka the lowest version possible @@ -313,7 +311,8 @@ object Protocol { * and [[FEATURE_PROP_PREFIX]]. 
*/ def minProtocolComponentsFromAutomaticallyEnabledFeatures( - spark: SparkSession, metadata: Metadata): (Int, Int, Set[TableFeature]) = { + spark: SparkSession, + metadata: Metadata): (Int, Int, Set[TableFeature]) = { val enabledFeatures = extractAutomaticallyEnabledFeatures(spark, metadata) var (readerVersion, writerVersion) = (0, 0) enabledFeatures.foreach { feature => @@ -325,20 +324,28 @@ object Protocol { } /** Cast the table property for the protocol version to an integer. */ - def getVersion(key: String, value: String): Int = { - try value.toInt catch { - case n: NumberFormatException => throw DeltaErrors.protocolPropNotIntException(key, value) + private def tryCastProtocolVersionToInt(key: String, value: String): Int = { + try value.toInt + catch { + case _: NumberFormatException => + throw DeltaErrors.protocolPropNotIntException(key, value) } } - /** - * Verify that the protocol version of the table satisfies the version requirements of all the - * configurations to be set for the table. Returns the minimum required protocol if not. - */ - def checkProtocolRequirements( - spark: SparkSession, - metadata: Metadata, - current: Protocol): Option[Protocol] = { + def getReaderVersionFromTableConf(conf: Map[String, String]): Option[Int] = { + conf.get(MIN_READER_VERSION_PROP).map(tryCastProtocolVersionToInt(MIN_READER_VERSION_PROP, _)) + } + + def getWriterVersionFromTableConf(conf: Map[String, String]): Option[Int] = { + conf.get(MIN_WRITER_VERSION_PROP).map(tryCastProtocolVersionToInt(MIN_WRITER_VERSION_PROP, _)) + } + + def getProtocolVersionsFromTableConf(conf: Map[String, String]): (Option[Int], Option[Int]) = { + (getReaderVersionFromTableConf(conf), getWriterVersionFromTableConf(conf)) + } + + /** Assert a table metadata contains no protocol-related table properties. 
*/ + private def assertMetadataContainsNoProtocolProps(metadata: Metadata): Unit = { assert( !metadata.configuration.contains(MIN_READER_VERSION_PROP), "Should not have the " + @@ -355,21 +362,77 @@ object Protocol { !metadata.configuration.contains(DeltaConfigs.CREATE_TABLE_IGNORE_PROTOCOL_DEFAULTS.key), "Should not have the table property " + s"${DeltaConfigs.CREATE_TABLE_IGNORE_PROTOCOL_DEFAULTS.key} stored in table metadata") - val (readerVersion, writerVersion, enabledFeatures) = + } + + /** + * Upgrade the current protocol to satisfy all auto-update capable features required by the table + * metadata. A Delta error will be thrown if a non-auto-update capable feature is required by + * the metadata and not in the resulting protocol; in such a case the user must run `ALTER TABLE` + * to add support for this feature beforehand using the `delta.feature.featureName` table + * property. + * + * Refer to [[FeatureAutomaticallyEnabledByMetadata.automaticallyUpdateProtocolOfExistingTables]] + * to know more about "auto-update capable" features. + * + * Note: this method only considers metadata-enabled features. To avoid confusion, the caller + * must apply and remove protocol-related table properties from the metadata before calling this + * method. 
+ */ + def upgradeProtocolFromMetadataForExistingTable( + spark: SparkSession, + metadata: Metadata, + current: Protocol): Option[Protocol] = { + assertMetadataContainsNoProtocolProps(metadata) + + val (readerVersion, writerVersion, minRequiredFeatures) = minProtocolComponentsFromAutomaticallyEnabledFeatures(spark, metadata) // Increment the reader and writer version to accurately add enabled legacy table features // either to the implicitly enabled table features or the table feature lists val required = Protocol( - readerVersion.max(current.minReaderVersion), writerVersion.max(current.minWriterVersion)) - .withFeatures(enabledFeatures) + readerVersion.max(current.minReaderVersion), writerVersion.max(current.minWriterVersion)) + .withFeatures(minRequiredFeatures) if (!required.canUpgradeTo(current)) { + // When the current protocol does not satisfy metadata requirement, some additional features + // must be supported by the protocol. We assert those features can actually perform the + // auto-update. + assertMetadataTableFeaturesAutomaticallySupported( + current.implicitlyAndExplicitlySupportedFeatures, + required.implicitlyAndExplicitlySupportedFeatures) Some(required.merge(current)) } else { None } } + /** + * Ensure all features listed in `currentFeatures` are also listed in `requiredFeatures`, or, if + * one is not listed, it must be capable to auto-update a protocol. + * + * Refer to [[FeatureAutomaticallyEnabledByMetadata.automaticallyUpdateProtocolOfExistingTables]] + * to know more about "auto-update capable" features. + * + * Note: Caller must make sure `requiredFeatures` is obtained from a min protocol that satisfies + * a table metadata. 
+ */ + private def assertMetadataTableFeaturesAutomaticallySupported( + currentFeatures: Set[TableFeature], + requiredFeatures: Set[TableFeature]): Unit = { + val (autoUpdateCapableFeatures, nonAutoUpdateCapableFeatures) = + requiredFeatures.diff(currentFeatures) + .collect { case f: FeatureAutomaticallyEnabledByMetadata => f } + .partition(_.automaticallyUpdateProtocolOfExistingTables) + if (nonAutoUpdateCapableFeatures.nonEmpty) { + // The "current features" we give the user are which from the original protocol, plus + // features newly supported by table properties in the current transaction, plus + // metadata-enabled features that are auto-update capable. The first two are provided by + // `currentFeatures`. + throw DeltaErrors.tableFeaturesRequireManualEnablementException( + nonAutoUpdateCapableFeatures, + currentFeatures ++ autoUpdateCapableFeatures) + } + } + /** * Verify that the table properties satisfy legality constraints. Throw an exception if not. */ diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala index 4007accd9b3..14fd85a99b1 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.util.FileNames.deltaFile -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkThrowable} import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.test.SharedSparkSession @@ -973,14 +973,14 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest val deltaLog = DeltaLog.forTable(spark, dir) sql( s"CREATE TABLE delta.`${dir.getCanonicalPath}` (id 
bigint) USING delta TBLPROPERTIES (" + - s" ${TestReaderWriterMetadataFeature.TABLE_PROP_KEY}='true'" + + s" ${TestReaderWriterMetadataAutoUpdateFeature.TABLE_PROP_KEY}='true'" + ")") assert( deltaLog.snapshot.protocol === Protocol( minReaderVersion = TABLE_FEATURES_MIN_READER_VERSION, minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, - readerFeatures = Some(Set(TestReaderWriterMetadataFeature.name)), - writerFeatures = Some(Set(TestReaderWriterMetadataFeature.name)))) + readerFeatures = Some(Set(TestReaderWriterMetadataAutoUpdateFeature.name)), + writerFeatures = Some(Set(TestReaderWriterMetadataAutoUpdateFeature.name)))) assertPropertiesAndShowTblProperties(deltaLog, tableHasFeatures = true) } } @@ -1006,14 +1006,14 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest s"CREATE TABLE delta.`${dir.getCanonicalPath}` (id bigint) USING delta TBLPROPERTIES (" + " delta.minReaderVersion='1'," + " delta.minWriterVersion='2'," + - s" ${TestReaderWriterMetadataFeature.TABLE_PROP_KEY}='true'" + + s" ${TestReaderWriterMetadataAutoUpdateFeature.TABLE_PROP_KEY}='true'" + ")") assert( deltaLog.snapshot.protocol === Protocol( minReaderVersion = TABLE_FEATURES_MIN_READER_VERSION, minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, - readerFeatures = Some(Set(TestReaderWriterMetadataFeature.name)), - writerFeatures = Some(Set(TestReaderWriterMetadataFeature.name)))) + readerFeatures = Some(Set(TestReaderWriterMetadataAutoUpdateFeature.name)), + writerFeatures = Some(Set(TestReaderWriterMetadataAutoUpdateFeature.name)))) assertPropertiesAndShowTblProperties(deltaLog, tableHasFeatures = true) } } @@ -1040,19 +1040,19 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest def testCreateTable( name: String, props: Map[String, String], - expectedExceptionMessage: Option[String] = None, + expectedExceptionClass: Option[String] = None, expectedFinalProtocol: Option[Protocol] = None): Unit = { test(s"create table - $name") { withTempDir { dir => val log = 
DeltaLog.forTable(spark, dir) val propString = props.map(kv => s"'${kv._1}'='${kv._2}'").mkString(",") - if (expectedExceptionMessage.isDefined) { - intercept[Exception] { + if (expectedExceptionClass.isDefined) { + assert(intercept[DeltaTableFeatureException] { sql( s"CREATE TABLE delta.`${dir.getCanonicalPath}` (id bigint) USING delta " + s"TBLPROPERTIES ($propString)") - }.getMessage.contains(expectedExceptionMessage.get) + }.getErrorClass === expectedExceptionClass.get) } else { sql( s"CREATE TABLE delta.`${dir.getCanonicalPath}` (id bigint) USING delta " + @@ -1086,18 +1086,32 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest .withFeature(TestLegacyWriterFeature))) testCreateTable( - "legacy protocol, native feature, metadata", - Map(TestReaderWriterMetadataFeature.TABLE_PROP_KEY -> "true"), + "legacy protocol, native auto-update feature, metadata", + Map(TestReaderWriterMetadataAutoUpdateFeature.TABLE_PROP_KEY -> "true"), expectedFinalProtocol = Some( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestReaderWriterMetadataFeature))) + .withFeature(TestReaderWriterMetadataAutoUpdateFeature))) testCreateTable( - "legacy protocol, native feature, feature property", - Map(s"delta.feature.${TestReaderWriterMetadataFeature.name}" -> "enabled"), + "legacy protocol, native non-auto-update feature, metadata", + Map(TestReaderWriterMetadataNoAutoUpdateFeature.TABLE_PROP_KEY -> "true"), expectedFinalProtocol = Some( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestReaderWriterMetadataFeature))) + .withFeature(TestReaderWriterMetadataNoAutoUpdateFeature))) + + testCreateTable( + "legacy protocol, native auto-update feature, feature property", + Map(s"delta.feature.${TestReaderWriterMetadataAutoUpdateFeature.name}" -> "enabled"), + expectedFinalProtocol = Some( + Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) + 
.withFeature(TestReaderWriterMetadataAutoUpdateFeature))) + + testCreateTable( + "legacy protocol, native non-auto-update feature, feature property", + Map(s"delta.feature.${TestReaderWriterMetadataNoAutoUpdateFeature.name}" -> "enabled"), + expectedFinalProtocol = Some( + Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) + .withFeature(TestReaderWriterMetadataNoAutoUpdateFeature))) testCreateTable( "legacy protocol with supported version props, legacy feature, feature property", @@ -1126,10 +1140,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest Map( DeltaConfigs.MIN_READER_VERSION.key -> TABLE_FEATURES_MIN_READER_VERSION.toString, DeltaConfigs.MIN_WRITER_VERSION.key -> TABLE_FEATURES_MIN_WRITER_VERSION.toString, - s"delta.feature.${TestReaderWriterMetadataFeature.name}" -> "enabled"), + s"delta.feature.${TestReaderWriterMetadataAutoUpdateFeature.name}" -> "enabled"), expectedFinalProtocol = Some( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestReaderWriterMetadataFeature))) + .withFeature(TestReaderWriterMetadataAutoUpdateFeature))) testCreateTable( "table features protocol, legacy feature, metadata", @@ -1152,29 +1166,49 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest .withFeature(TestLegacyReaderWriterFeature))) testCreateTable( - "table features protocol, native feature, metadata", + "table features protocol, native auto-update feature, metadata", + Map( + DeltaConfigs.MIN_READER_VERSION.key -> TABLE_FEATURES_MIN_READER_VERSION.toString, + DeltaConfigs.MIN_WRITER_VERSION.key -> TABLE_FEATURES_MIN_WRITER_VERSION.toString, + TestReaderWriterMetadataAutoUpdateFeature.TABLE_PROP_KEY -> "true"), + expectedFinalProtocol = Some( + Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) + .withFeature(TestReaderWriterMetadataAutoUpdateFeature))) + + testCreateTable( + "table features protocol, native non-auto-update feature, metadata", + Map( + 
DeltaConfigs.MIN_READER_VERSION.key -> TABLE_FEATURES_MIN_READER_VERSION.toString, + DeltaConfigs.MIN_WRITER_VERSION.key -> TABLE_FEATURES_MIN_WRITER_VERSION.toString, + TestReaderWriterMetadataNoAutoUpdateFeature.TABLE_PROP_KEY -> "true"), + expectedFinalProtocol = Some( + Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) + .withFeature(TestReaderWriterMetadataNoAutoUpdateFeature))) + + testCreateTable( + "table features protocol, native auto-update feature, feature property", Map( DeltaConfigs.MIN_READER_VERSION.key -> TABLE_FEATURES_MIN_READER_VERSION.toString, DeltaConfigs.MIN_WRITER_VERSION.key -> TABLE_FEATURES_MIN_WRITER_VERSION.toString, - TestReaderWriterMetadataFeature.TABLE_PROP_KEY -> "true"), + s"delta.feature.${TestReaderWriterMetadataAutoUpdateFeature.name}" -> "enabled"), expectedFinalProtocol = Some( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestReaderWriterMetadataFeature))) + .withFeature(TestReaderWriterMetadataAutoUpdateFeature))) testCreateTable( - "table features protocol, native feature, feature property", + "table features protocol, native non-auto-update feature, feature property", Map( DeltaConfigs.MIN_READER_VERSION.key -> TABLE_FEATURES_MIN_READER_VERSION.toString, DeltaConfigs.MIN_WRITER_VERSION.key -> TABLE_FEATURES_MIN_WRITER_VERSION.toString, - s"delta.feature.${TestReaderWriterMetadataFeature.name}" -> "enabled"), + s"delta.feature.${TestReaderWriterMetadataNoAutoUpdateFeature.name}" -> "enabled"), expectedFinalProtocol = Some( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestReaderWriterMetadataFeature))) + .withFeature(TestReaderWriterMetadataNoAutoUpdateFeature))) def testAlterTable( name: String, props: Map[String, String], - expectedExceptionMessage: Option[String] = None, + expectedExceptionClass: Option[String] = None, expectedFinalProtocol: Option[Protocol] = None, tableProtocol: Protocol = 
Protocol(1, 1)): Unit = { test(s"alter table - $name") { @@ -1182,10 +1216,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest val log = createTableWithProtocol(tableProtocol, dir) val propString = props.map(kv => s"'${kv._1}'='${kv._2}'").mkString(",") - if (expectedExceptionMessage.isDefined) { - intercept[Exception] { + if (expectedExceptionClass.isDefined) { + assert(intercept[DeltaTableFeatureException] { sql(s"ALTER TABLE delta.`${dir.getCanonicalPath}` SET TBLPROPERTIES ($propString)") - }.getMessage.contains(expectedExceptionMessage.get) + }.getErrorClass === expectedExceptionClass.get) } else { sql(s"ALTER TABLE delta.`${dir.getCanonicalPath}` SET TBLPROPERTIES ($propString)") } @@ -1219,18 +1253,39 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest tableProtocol = Protocol(1, 2)) testAlterTable( - "legacy protocol, native feature, metadata", - Map(TestReaderWriterMetadataFeature.TABLE_PROP_KEY -> "true"), + "legacy protocol, native auto-update feature, metadata", + Map(TestReaderWriterMetadataAutoUpdateFeature.TABLE_PROP_KEY -> "true"), + expectedFinalProtocol = Some( + Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) + .withFeature(TestReaderWriterMetadataAutoUpdateFeature))) + + testAlterTable( + "legacy protocol, native non-auto-update feature, metadata", + Map(TestReaderWriterMetadataNoAutoUpdateFeature.TABLE_PROP_KEY -> "true"), + expectedExceptionClass = Some("DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT")) + + testAlterTable( + "legacy protocol, native non-auto-update feature, metadata and feature property", + Map( + TestReaderWriterMetadataNoAutoUpdateFeature.TABLE_PROP_KEY -> "true", + s"delta.feature.${TestReaderWriterMetadataNoAutoUpdateFeature.name}" -> "enabled"), + expectedFinalProtocol = Some( + Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) + .withFeature(TestReaderWriterMetadataNoAutoUpdateFeature))) + + testAlterTable( + "legacy protocol, native auto-update feature, 
feature property", + Map(s"delta.feature.${TestReaderWriterMetadataAutoUpdateFeature.name}" -> "supported"), expectedFinalProtocol = Some( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestReaderWriterMetadataFeature))) + .withFeature(TestReaderWriterMetadataAutoUpdateFeature))) testAlterTable( - "legacy protocol, native feature, feature property", - Map(s"delta.feature.${TestReaderWriterMetadataFeature.name}" -> "enabled"), + "legacy protocol, native non-auto-update feature, feature property", + Map(s"delta.feature.${TestReaderWriterMetadataNoAutoUpdateFeature.name}" -> "enabled"), expectedFinalProtocol = Some( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestReaderWriterMetadataFeature))) + .withFeature(TestReaderWriterMetadataNoAutoUpdateFeature))) testAlterTable( "legacy protocol with supported version props, legacy feature, feature property", @@ -1259,10 +1314,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest Map( DeltaConfigs.MIN_READER_VERSION.key -> TABLE_FEATURES_MIN_READER_VERSION.toString, DeltaConfigs.MIN_WRITER_VERSION.key -> TABLE_FEATURES_MIN_WRITER_VERSION.toString, - s"delta.feature.${TestReaderWriterMetadataFeature.name}" -> "enabled"), + s"delta.feature.${TestReaderWriterMetadataAutoUpdateFeature.name}" -> "enabled"), expectedFinalProtocol = Some( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestReaderWriterMetadataFeature))) + .withFeature(TestReaderWriterMetadataAutoUpdateFeature))) testAlterTable( "table features protocol, legacy feature, metadata", @@ -1283,31 +1338,132 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)) testAlterTable( - "table features protocol, native feature, metadata", - Map(TestReaderWriterMetadataFeature.TABLE_PROP_KEY -> "true"), + "table features protocol, native auto-update 
feature, metadata", + Map(TestReaderWriterMetadataAutoUpdateFeature.TABLE_PROP_KEY -> "true"), + expectedFinalProtocol = Some( + Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) + .withFeature(TestReaderWriterMetadataAutoUpdateFeature)), + tableProtocol = + Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)) + + testAlterTable( + "table features protocol, native non-auto-update feature, metadata", + Map(TestReaderWriterMetadataNoAutoUpdateFeature.TABLE_PROP_KEY -> "true"), + tableProtocol = + Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION), + expectedExceptionClass = Some("DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT")) + + testAlterTable( + "table features protocol, native non-auto-update feature, metadata and feature property", + Map( + TestReaderWriterMetadataNoAutoUpdateFeature.TABLE_PROP_KEY -> "true", + s"delta.feature.${TestReaderWriterMetadataNoAutoUpdateFeature.name}" -> "enabled"), + tableProtocol = + Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION), + expectedFinalProtocol = Some( + Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) + .withFeature(TestReaderWriterMetadataNoAutoUpdateFeature))) + + testAlterTable( + "table features protocol, native auto-update feature, feature property", + Map(s"delta.feature.${TestReaderWriterMetadataAutoUpdateFeature.name}" -> "enabled"), expectedFinalProtocol = Some( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestReaderWriterMetadataFeature)), + .withFeature(TestReaderWriterMetadataAutoUpdateFeature)), tableProtocol = Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)) testAlterTable( - "table features protocol, native feature, feature property", - Map(s"delta.feature.${TestReaderWriterMetadataFeature.name}" -> "enabled"), + "table features protocol, native non-auto-update feature, feature property", 
+ Map(s"delta.feature.${TestReaderWriterMetadataNoAutoUpdateFeature.name}" -> "enabled"), expectedFinalProtocol = Some( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestReaderWriterMetadataFeature)), + .withFeature(TestReaderWriterMetadataNoAutoUpdateFeature)), tableProtocol = Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)) testAlterTable( "feature property merges the old protocol", - Map(s"delta.feature.${TestReaderWriterMetadataFeature.name}" -> "enabled"), + Map(s"delta.feature.${TestReaderWriterMetadataAutoUpdateFeature.name}" -> "enabled"), expectedFinalProtocol = Some( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestReaderWriterMetadataFeature).merge(Protocol(1, 2))), + .withFeature(TestReaderWriterMetadataAutoUpdateFeature).merge(Protocol(1, 2))), tableProtocol = Protocol(1, 2)) + test("non-auto-update capable feature requires manual enablement (via feature prop)") { + withTempDir { dir => + val deltaLog = DeltaLog.forTable(spark, dir) + withSQLConf( + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION.key -> "1", + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION.key -> "1") { + spark.range(10).writeTo(s"delta.`${dir.getCanonicalPath}`").using("delta").create() + } + val expectedProtocolOnCreation = Protocol(1, 1) + assert(deltaLog.update().protocol === expectedProtocolOnCreation) + + assert(intercept[DeltaTableFeatureException] { + withSQLConf(defaultPropertyKey(TestWriterMetadataNoAutoUpdateFeature) -> "supported") { + sql( + s"ALTER TABLE delta.`${dir.getCanonicalPath}` SET TBLPROPERTIES (" + + s" '${TestWriterMetadataNoAutoUpdateFeature.TABLE_PROP_KEY}' = 'true')") + } + }.getErrorClass === "DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT", + "existing tables should ignore session defaults.") + + sql( + s"ALTER TABLE delta.`${dir.getCanonicalPath}` SET TBLPROPERTIES (" + + s" 
'${propertyKey(TestWriterMetadataNoAutoUpdateFeature)}' = 'supported'," + + s" '${TestWriterMetadataNoAutoUpdateFeature.TABLE_PROP_KEY}' = 'true')") + assert( + deltaLog.update().protocol === + expectedProtocolOnCreation + .merge(TestWriterMetadataNoAutoUpdateFeature.minProtocolVersion) + .withFeature(TestWriterMetadataNoAutoUpdateFeature)) + } + } + + test("non-auto-update capable error message is correct") { + withTempDir { dir => + val deltaLog = DeltaLog.forTable(spark, dir) + + withSQLConf( + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION.key -> "1", + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION.key -> "1") { + spark.range(10).writeTo(s"delta.`${dir.getCanonicalPath}`") + .tableProperty("delta.appendOnly", "true") + .using("delta") + .create() + val protocolOfNewTable = Protocol(1, 2) + assert(deltaLog.update().protocol === protocolOfNewTable) + + val e = intercept[DeltaTableFeatureException] { + // ALTER TABLE must not consider this SQL config + withSQLConf(defaultPropertyKey(TestWriterFeature) -> "supported") { + sql( + s"ALTER TABLE delta.`${dir.getCanonicalPath}` SET TBLPROPERTIES (" + + s" 'delta.appendOnly' = 'false'," + + s" 'delta.enableChangeDataFeed' = 'true'," + + s" '${TestReaderWriterMetadataAutoUpdateFeature.TABLE_PROP_KEY}' = 'true'," + + s" '${TestWriterMetadataNoAutoUpdateFeature.TABLE_PROP_KEY}' = 'true')") + } + } + + val unsupportedFeatures = TestWriterMetadataNoAutoUpdateFeature.name + val supportedFeatures = + (protocolOfNewTable.implicitlyAndExplicitlySupportedFeatures + + ChangeDataFeedTableFeature + + TestReaderWriterMetadataAutoUpdateFeature).map(_.name).toSeq.sorted.mkString(", ") + assert(e.getErrorClass === "DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT") + + // `getMessageParameters` is available starting from Spark 3.4. + // For now we have to check for substrings. 
+ assert(e.getMessage.contains(s" $unsupportedFeatures.")) + assert(e.getMessage.contains(s" $supportedFeatures.")) + + } + } + } + test("table creation with protocol as table property - property wins over conf") { withTempDir { dir => val deltaLog = DeltaLog.forTable(spark, dir) diff --git a/core/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala b/core/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala index 1a0316aea29..d91f24454f6 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala @@ -33,7 +33,7 @@ object DeltaTestImplicits { if (txn.readVersion == -1) { val metadataOpt = if (!actions.exists(_.isInstanceOf[Metadata])) Some(Metadata()) else None val metadata = metadataOpt.getOrElse(actions.collectFirst { case m: Metadata => m }.get) - val needsProtocolUpdate = Protocol.checkProtocolRequirements( + val needsProtocolUpdate = Protocol.upgradeProtocolFromMetadataForExistingTable( SparkSession.active, metadata, txn.protocol) if (needsProtocolUpdate.isDefined) { // if the metadata triggers a protocol upgrade, commit without an explicit protocol