Allow restricting table features to auto-update protocol versions #1660

Closed · wants to merge 10 commits

Changes from 7 commits
28 changes: 13 additions & 15 deletions core/src/main/resources/error/delta-error-classes.json
@@ -663,6 +663,19 @@
],
"sqlState" : "0AKDE"
},
"DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT" : {
"message" : [
"Your table schema requires manually enablement of the following table feature(s): <unsupportedFeatures>.",
"",
"To do this, run the following command for each of features listed above:",
" ALTER TABLE table_name SET TBLPROPERTIES ('delta.feature.feature_name' = 'supported')",
> **Contributor:** Table name can be passed in to this error message?
>
> **Contributor (Author):** Seems not trivial, because we only have the DeltaLog instance, which is based on a path. The name is already lost at this stage.
>
> *xupefei marked this conversation as resolved.*
"Replace \"table_name\" and \"feature_name\" with real values.",
"Note that the procedure is irreversible: once supported, a feature can never be unsupported again.",
"",
"Current supported feature(s): <supportedFeatures>."
],
"sqlState" : "0AKDE"
},
"DELTA_FEATURE_REQUIRES_HIGHER_READER_VERSION" : {
"message" : [
"Unable to enable table feature <feature> because it requires a higher reader protocol version (current <current>). Consider upgrading the table's reader protocol version to <required>, or to a version which supports reader table features. Refer to <docLink> for more information on table protocol versions."
@@ -2075,21 +2088,6 @@
],
"sqlState" : "0AKDC"
},
"DELTA_UNSUPPORTED_TYPE_TIMESTAMP_NTZ" : {
"message" : [
"Your table schema <schema> contains a column of type TimestampNTZ.",
"TimestampNTZ type is not supported by your table's protocol.",
"",
"Required Delta protocol version and features for TimestampNTZ:",
"<requiredVersion>",
"Your table's current Delta protocol version and enabled features:",
"<currentVersion>",
"",
"Run the following command to add TimestampNTZ support to your table.",
"ALTER TABLE table_name SET TBLPROPERTIES ('delta.feature.timestampNtz' = 'supported')"
],
"sqlState" : "0A000"
},
"DELTA_UNSUPPORTED_VACUUM_SPECIFIC_PARTITION" : {
"message" : [
"Please provide the base path (<baseDeltaPath>) when Vacuuming Delta tables. Vacuuming specific partitions is currently not supported."
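To make the remediation path concrete: a user hitting the new error would run the suggested `ALTER TABLE` once per listed feature. A minimal sketch, assuming a table named `events` that needs `timestampNtz` (both names are illustrative, not from this PR):

```scala
// Hypothetical remediation for DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT.
// Note: marking a feature as supported is irreversible.
spark.sql(
  "ALTER TABLE events " +
    "SET TBLPROPERTIES ('delta.feature.timestampNtz' = 'supported')")
```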
24 changes: 10 additions & 14 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -1713,20 +1713,6 @@ trait DeltaErrorsBase
)
}

def schemaContainsTimestampNTZType(
schema: StructType,
requiredProtocol: Protocol,
currentProtocol: Protocol): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_UNSUPPORTED_TYPE_TIMESTAMP_NTZ",
messageParameters = Array(
s"${formatSchema(schema)}",
s"$requiredProtocol",
s"$currentProtocol"
)
)
}

def tableAlreadyExists(table: CatalogTable): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_TABLE_ALREADY_EXISTS",
@@ -2168,6 +2154,16 @@ trait DeltaErrorsBase
messageParameters = Array(features.mkString(", ")))
}

def tableFeaturesRequireManualEnablementException(
unsupportedFeatures: Iterable[TableFeature],
supportedFeatures: Iterable[TableFeature]): Throwable = {
new DeltaTableFeatureException(
errorClass = "DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT",
messageParameters = Array(
unsupportedFeatures.map(_.name).toSeq.sorted.mkString(", "),
supportedFeatures.map(_.name).toSeq.sorted.mkString(", ")))
}

def concurrentAppendException(
conflictingCommit: Option[CommitInfo],
partition: String,
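A sketch of how the new factory might be invoked; the variable names here are assumed for illustration, and the real wiring lives in the transaction code below:

```scala
// Hypothetical call site: the new metadata demands features that are not yet
// supported by the protocol and may not auto-update an existing table.
if (manualOnlyFeatures.nonEmpty) {
  throw DeltaErrors.tableFeaturesRequireManualEnablementException(
    unsupportedFeatures = manualOnlyFeatures,
    supportedFeatures = snapshot.protocol.implicitlyAndExplicitlySupportedFeatures)
}
```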
@@ -404,29 +404,10 @@ trait OptimisticTransactionImpl extends TransactionalWrite
newMetadataTmp,
isCreatingNewTable)

// Check for existence of TimestampNTZ in the schema and throw an error if the feature
// is not enabled.
if (!protocolBeforeUpdate.isFeatureSupported(TimestampNTZTableFeature) &&
SchemaUtils.checkForTimestampNTZColumnsRecursively(newMetadataTmp.schema)) {
// The timestampNTZ feature is enabled if there is a table prop in this transaction,
// or if this is a new table
val isEnabled = isCreatingNewTable || TableFeatureProtocolUtils
.getSupportedFeaturesFromConfigs(
newMetadataTmp.configuration, TableFeatureProtocolUtils.FEATURE_PROP_PREFIX)
.contains(TimestampNTZTableFeature)

if (!isEnabled) {
throw DeltaErrors.schemaContainsTimestampNTZType(
newMetadataTmp.schema,
TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature),
snapshot.protocol
)
}
}

if (newMetadataTmp.schemaString != null) {
// Replace CHAR and VARCHAR with StringType
var schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(newMetadataTmp.schema)
val schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
newMetadataTmp.schema)
newMetadataTmp = newMetadataTmp.copy(schemaString = schema.json)
}

@@ -449,16 +430,16 @@ trait OptimisticTransactionImpl extends TransactionalWrite
newProtocol = Some(Protocol.forNewTable(spark, Some(newMetadataTmp)))
} else if (newMetadataTmp.configuration.contains(Protocol.MIN_READER_VERSION_PROP) ||
newMetadataTmp.configuration.contains(Protocol.MIN_WRITER_VERSION_PROP)) {
// Table features Part 1: bump protocol version numbers
//
// Collect new reader and writer versions from table properties, which could be provided by
// the user in `ALTER TABLE TBLPROPERTIES` or copied over from session defaults.
val readerVersionInNewMetadataTmp = newMetadataTmp.configuration
.get(Protocol.MIN_READER_VERSION_PROP)
.map(Protocol.getVersion(Protocol.MIN_READER_VERSION_PROP, _))
.getOrElse(protocolBeforeUpdate.minReaderVersion)
val writerVersionInNewMetadataTmp = newMetadataTmp.configuration
.get(Protocol.MIN_WRITER_VERSION_PROP)
.map(Protocol.getVersion(Protocol.MIN_WRITER_VERSION_PROP, _))
.getOrElse(protocolBeforeUpdate.minWriterVersion)
val readerVersionInNewMetadataTmp =
Protocol.getReaderVersionFromTableConf(newMetadataTmp.configuration)
.getOrElse(protocolBeforeUpdate.minReaderVersion)
val writerVersionInNewMetadataTmp =
Protocol.getWriterVersionFromTableConf(newMetadataTmp.configuration)
.getOrElse(protocolBeforeUpdate.minWriterVersion)

// If the collected reader and writer versions are provided by the user, we must use them,
// and throw ProtocolDowngradeException when they are lower than what the table had before.
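The new `getReaderVersionFromTableConf`/`getWriterVersionFromTableConf` helpers replace the inline `configuration.get(...).map(Protocol.getVersion(...))` chains removed above. A sketch of the shape their call sites imply (assumed, not copied from this PR; they would live inside `object Protocol`):

```scala
// Assumed shape of the new Protocol helpers, inferred from their call sites.
def getReaderVersionFromTableConf(conf: Map[String, String]): Option[Int] =
  conf.get(Protocol.MIN_READER_VERSION_PROP)
    .map(Protocol.getVersion(Protocol.MIN_READER_VERSION_PROP, _))

def getWriterVersionFromTableConf(conf: Map[String, String]): Option[Int] =
  conf.get(Protocol.MIN_WRITER_VERSION_PROP)
    .map(Protocol.getVersion(Protocol.MIN_WRITER_VERSION_PROP, _))
```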
@@ -526,18 +507,16 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}
}

// Enabling table features Part 1: add manually-supported features in table properties start
// with [[FEATURE_PROP_PREFIX]].
// Table features Part 2: add manually-supported features specified in table properties, i.e.,
// those starting with [[FEATURE_PROP_PREFIX]].
//
// This transaction's new metadata might contain some table properties to support some
// features (props start with [[FEATURE_PROP_PREFIX]]). We silently add them to the `protocol`
// action, and bump the protocol version to (3, 7) or (_, 7), depending on the existence of
// any reader-writer feature.
val newProtocolBeforeAddingFeatures = newProtocol.getOrElse(protocolBeforeUpdate)
val newFeaturesFromTableConf =
TableFeatureProtocolUtils.getSupportedFeaturesFromConfigs(
newMetadataTmp.configuration,
TableFeatureProtocolUtils.FEATURE_PROP_PREFIX)
TableFeatureProtocolUtils.getSupportedFeaturesFromTableConfigs(newMetadataTmp.configuration)
val readerVersionForNewProtocol =
if (newFeaturesFromTableConf.exists(_.isReaderWriterFeature)) {
TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION
@@ -561,7 +540,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
newMetadataTmp = newMetadataTmp.copy(configuration = configsWithoutProtocolProps)
assertMetadata(newMetadataTmp)

// Enabling table features Part 2: add automatically-enabled features.
// Table features Part 3: add automatically-enabled features by looking at the new table
// metadata.
//
// This code path is for existing tables. The new table case has been handled by
// [[Protocol.forNewTable]] earlier in this method.
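To make Parts 2 and 3 concrete: an explicit `delta.feature.<name>` marker is always honored (Part 2), whereas a feature-specific property is honored on an existing table only if the feature opts in via `automaticallyUpdateProtocolOfExistingTables` (Part 3). A hedged illustration, assuming `delta.enableDeletionVectors` is the trigger property for deletion vectors:

```scala
// Part 2: explicit support marker, silently added to the protocol.
spark.sql(
  "ALTER TABLE t SET TBLPROPERTIES ('delta.feature.deletionVectors' = 'supported')")

// Part 3: metadata-triggered enablement; succeeds on an existing table only
// because DeletionVectorsTableFeature allows auto-updating the protocol.
spark.sql(
  "ALTER TABLE t SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")
```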
62 changes: 52 additions & 10 deletions core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -111,18 +111,32 @@ sealed trait LegacyFeatureType
* A trait indicating this feature can be automatically enabled via a change in a table's
* metadata, e.g., through setting particular values of certain feature-specific table properties.
*
* When the feature's metadata requirements are satisfied during table creation
* ([[actions.Protocol.forNewTable]]) or commit ([[OptimisticTransaction.updateMetadata]]), the
* When the feature's metadata requirements are satisfied for <b>new tables</b>, or for
* <b>existing tables when [[automaticallyUpdateProtocolOfExistingTables]] is set to `true`</b>, the
> **Comment on lines +114 to +115**
>
> **Contributor:** Do we allow HTML annotations in our doc comments? I thought it was just markdown?
>
> **Contributor (Author):** I believe so... Someone could correct me if I'm wrong.

* client will silently add the feature to the protocol's `readerFeatures` and/or
* `writerFeatures`.
* `writerFeatures`. Otherwise, a proper protocol version bump must be present in the same
* transaction.
*/
sealed trait FeatureAutomaticallyEnabledByMetadata {
sealed trait FeatureAutomaticallyEnabledByMetadata { this: TableFeature =>

/**
* Whether the feature can automatically update the protocol of an existing table when the
* metadata requirements are satisfied. As a rule of thumb, a table feature that requires
* explicit operations (e.g., turning on a table property) should set this flag to `true`, while
* features that are used implicitly (e.g., when using a new data type) should set this flag to
* `false`.
*/
def automaticallyUpdateProtocolOfExistingTables: Boolean = this.isLegacyFeature

/**
* Determine whether the feature must be supported and enabled because its metadata requirements
* are satisfied.
*/
def metadataRequiresFeatureToBeEnabled(metadata: Metadata, spark: SparkSession): Boolean

require(
!this.isLegacyFeature || automaticallyUpdateProtocolOfExistingTables,
"Legacy feature must be auto-update capable.")
}
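
How a commit path might consult this flag is sketched below; the collection name `requiredFeatures` is assumed, and the real logic lives in `OptimisticTransaction.updateMetadata`:

```scala
// Sketch: split metadata-required features by whether they may silently
// upgrade the protocol of an existing table.
val (autoUpdateCapable, manualOnly) = requiredFeatures.partition {
  case f: FeatureAutomaticallyEnabledByMetadata =>
    f.automaticallyUpdateProtocolOfExistingTables
  case _ => false
}
// `manualOnly` features would surface DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT.
```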

/**
@@ -205,10 +219,12 @@ object TableFeature
if (DeltaUtils.isTesting) {
features ++= Set(
TestLegacyWriterFeature,
TestWriterFeature,
TestLegacyReaderWriterFeature,
TestWriterFeature,
TestWriterMetadataNoAutoUpdateFeature,
TestReaderWriterFeature,
TestReaderWriterMetadataFeature)
TestReaderWriterMetadataAutoUpdateFeature,
TestReaderWriterMetadataNoAutoUpdateFeature)
}
val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap
require(features.size == featureMap.size, "Lowercase feature names must not duplicate.")
@@ -302,7 +318,6 @@ object IdentityColumnsTableFeature

object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz")
with FeatureAutomaticallyEnabledByMetadata {

override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata, spark: SparkSession): Boolean = {
SchemaUtils.checkForTimestampNTZColumnsRecursively(metadata.schema)
@@ -312,6 +327,8 @@ object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz"
object DeletionVectorsTableFeature
extends ReaderWriterFeature(name = "deletionVectors")
with FeatureAutomaticallyEnabledByMetadata {
override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = {
@@ -330,6 +347,17 @@ object TestLegacyWriterFeature

object TestWriterFeature extends WriterFeature(name = "testWriter")

object TestWriterMetadataNoAutoUpdateFeature
extends WriterFeature(name = "testWriterMetadataNoAutoUpdate")
with FeatureAutomaticallyEnabledByMetadata {
val TABLE_PROP_KEY = "_123testWriterMetadataNoAutoUpdate321_"
override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = {
metadata.configuration.get(TABLE_PROP_KEY).exists(_.toBoolean)
}
}

object TestLegacyReaderWriterFeature
extends LegacyReaderWriterFeature(
name = "testLegacyReaderWriter",
@@ -338,10 +366,24 @@ object TestLegacyReaderWriterFeature

object TestReaderWriterFeature extends ReaderWriterFeature(name = "testReaderWriter")

object TestReaderWriterMetadataFeature
extends ReaderWriterFeature(name = "testReaderWriterMetadata")
object TestReaderWriterMetadataNoAutoUpdateFeature
extends ReaderWriterFeature(name = "testReaderWriterMetadataNoAutoUpdate")
with FeatureAutomaticallyEnabledByMetadata {
val TABLE_PROP_KEY = "_123testReaderWriterMetadata321_"
val TABLE_PROP_KEY = "_123testReaderWriterMetadataNoAutoUpdate321_"
override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = {
metadata.configuration.get(TABLE_PROP_KEY).exists(_.toBoolean)
}
}

object TestReaderWriterMetadataAutoUpdateFeature
extends ReaderWriterFeature(name = "testReaderWriterMetadataAutoUpdate")
with FeatureAutomaticallyEnabledByMetadata {
val TABLE_PROP_KEY = "_123testReaderWriterMetadataAutoUpdate321_"

override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = {
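The paired test features make the intended behavior easy to state: the `AutoUpdate` variant should silently upgrade an existing table's protocol when its trigger property is set, while its `NoAutoUpdate` twin should fail until the feature is explicitly supported. A hypothetical test-style sketch of those expectations:

```scala
// Hypothetical expectations for an existing table `t` (test-suite style).

// Auto-update-capable feature: the protocol upgrades silently.
sql("ALTER TABLE t SET TBLPROPERTIES " +
  s"('${TestReaderWriterMetadataAutoUpdateFeature.TABLE_PROP_KEY}' = 'true')")

// No-auto-update feature: the commit is rejected until the feature is
// explicitly marked as supported.
intercept[DeltaTableFeatureException] {
  sql("ALTER TABLE t SET TBLPROPERTIES " +
    s"('${TestReaderWriterMetadataNoAutoUpdateFeature.TABLE_PROP_KEY}' = 'true')")
}
```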
@@ -195,6 +195,12 @@ trait TableFeatureSupport { this: Protocol =>
}
}

@JsonIgnore
lazy val implicitlyAndExplicitlySupportedFeatures: Set[TableFeature] = {
readerAndWriterFeatureNames.flatMap(TableFeature.featureNameToFeature) ++
implicitlySupportedFeatures
}
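
A hedged example of the kind of check this new helper enables (the surrounding names are assumed):

```scala
// Hypothetical: features required by the new metadata but absent from the
// protocol, whether supported explicitly (by name) or implicitly (by version).
val missing =
  requiredFeatures -- protocol.implicitlyAndExplicitlySupportedFeatures
```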

/**
* Determine whether this protocol can be safely upgraded to a new protocol `to`. This means:
* - this protocol has reader protocol version less than or equals to `to`.
@@ -309,18 +315,15 @@ object TableFeatureProtocolUtils
}

/**
* Get a set of [[TableFeature]]s representing supported features set in a `config` map (table
* properties or Spark session configs).
* Get a set of [[TableFeature]]s representing supported features set in a table properties map.
*/
def getSupportedFeaturesFromConfigs(
configs: Map[String, String],
propertyPrefix: String): Set[TableFeature] = {
val featureConfigs = configs.filterKeys(_.startsWith(propertyPrefix))
def getSupportedFeaturesFromTableConfigs(configs: Map[String, String]): Set[TableFeature] = {
val featureConfigs = configs.filterKeys(_.startsWith(FEATURE_PROP_PREFIX))
val unsupportedFeatureConfigs = mutable.Set.empty[String]
val collectedFeatures = featureConfigs.flatMap { case (key, value) =>
// Feature name is lower cased in table properties but not in Spark session configs.
// Feature status is not lower cased in any case.
val name = key.stripPrefix(propertyPrefix).toLowerCase(Locale.ROOT)
val name = key.stripPrefix(FEATURE_PROP_PREFIX).toLowerCase(Locale.ROOT)
val status = value.toLowerCase(Locale.ROOT)
if (status != FEATURE_PROP_SUPPORTED && status != FEATURE_PROP_ENABLED) {
throw DeltaErrors.unsupportedTableFeatureStatusException(name, status)
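Finally, a minimal usage sketch of the renamed helper; the property key is illustrative:

```scala
// Parses `delta.feature.*` table properties; each value must be "supported"
// (or the legacy "enabled"), otherwise an error is thrown.
val features = TableFeatureProtocolUtils.getSupportedFeaturesFromTableConfigs(
  Map("delta.feature.deletionVectors" -> "supported"))
// `features` should resolve to Set(DeletionVectorsTableFeature).
```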