From 92948e73713f6f6629e1610ed0975fa8e619f1a8 Mon Sep 17 00:00:00 2001
From: Dejan Krakovic
Date: Thu, 26 Dec 2024 17:05:24 +0800
Subject: [PATCH] [SPARK-50675][SQL] Table and view level collations support
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### What changes were proposed in this pull request?

This change introduces table- and view-level collation support in Spark SQL, allowing the CREATE TABLE, ALTER TABLE and CREATE VIEW commands to specify the DEFAULT COLLATION to use. For CREATE commands, the default collation applies to all columns added as part of the table/view creation. For the ALTER TABLE command, it applies only to columns added afterwards; existing columns are not affected, i.e. their collation remains the same.

The PR is modelled after the original changes made by stefankandic in https://github.com/apache/spark/pull/48090. This PR covers table- and view-level collations, while a follow-up PR will cover schema-level collations.

This PR adds/extends the corresponding DDL commands for specifying the table/view-level collation; a separate follow-up PR will leverage the table/view collation to determine the default collations for the input queries of DML commands.

### Why are the changes needed?

Based on feedback from our internal users, many people would like to be able to specify a collation for their objects instead of for each individual column. This change adds support for table- and view-level collations; subsequent changes will add support for other objects, such as schema-level collations.

### Does this PR introduce _any_ user-facing change?

The change follows the agreed additions in syntax for collation support. The following syntax is now supported (**bold** parts denote additions):

{ { [CREATE OR] REPLACE TABLE | CREATE [EXTERNAL] TABLE [ IF NOT EXISTS ] }
    table_name
    [ table_specification ]
    [ USING data_source ]
    [ table_clauses ]
    [ AS query ] }

table_specification
  ( { column_identifier column_type [ column_properties ] } [, ...]
    [ , table_constraint ] [...] )

table_clauses
  { OPTIONS clause |
    PARTITIONED BY clause |
    CLUSTER BY clause |
    clustered_by_clause |
    LOCATION path [ WITH ( CREDENTIAL credential_name ) ] |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    **DEFAULT COLLATION table_collation_name |**
    WITH { ROW FILTER clause } } [...]

CREATE [ OR REPLACE ] [ TEMPORARY ] VIEW [ IF NOT EXISTS ] view_name
  [ column_list ]
  [ schema_binding |
    COMMENT view_comment |
    TBLPROPERTIES clause |
    **DEFAULT COLLATION collation_name** ] [...]
  AS query

ALTER TABLE table_name
  { ADD COLUMN clause |
    ALTER COLUMN clause |
    DROP COLUMN clause |
    RENAME COLUMN clause |
    **DEFAULT COLLATION clause | …** }

### How was this patch tested?

Tests for the new syntax/functionality were added as part of the change. Some of the existing tests were also extended/amended to cover the new DEFAULT COLLATION for table/view objects.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #49084 from dejankrak-db/object-level-collations.
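To make the user-facing behavior concrete, here is a minimal sketch of the new clauses in use. The table and view names are hypothetical; the collation names and statement shapes follow the tests added in this patch.

```sql
-- Table-level default collation set at creation time
CREATE TABLE t (c STRING) USING parquet DEFAULT COLLATION UNICODE_CI;

-- Change the default collation used for columns added from now on;
-- existing columns keep their current collation
ALTER TABLE t DEFAULT COLLATION UTF8_LCASE;

-- View-level default collation
CREATE OR REPLACE VIEW v DEFAULT COLLATION UNICODE AS SELECT c FROM t;

-- The table collation is surfaced as a 'Collation' row
DESCRIBE TABLE EXTENDED t;
```

SHOW CREATE TABLE output likewise includes a COLLATION clause for tables that have a default collation, per the ShowCreateTableExec change below.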
Authored-by: Dejan Krakovic Signed-off-by: Wenchen Fan --- .../sql/catalyst/parser/SqlBaseParser.g4 | 7 +++ .../sql/connector/catalog/TableCatalog.java | 5 ++ .../catalyst/analysis/ResolveTableSpec.scala | 1 + .../sql/catalyst/catalog/interface.scala | 2 + .../sql/catalyst/parser/AstBuilder.scala | 54 ++++++++++++++---- .../plans/logical/v2AlterTableCommands.scala | 12 ++++ .../catalyst/plans/logical/v2Commands.scala | 6 +- .../sql/connector/catalog/CatalogV2Util.scala | 5 +- .../spark/sql/connector/catalog/V1Table.scala | 1 + ...eateTablePartitioningValidationSuite.scala | 2 +- .../sql/catalyst/parser/DDLParserSuite.scala | 55 +++++++++++++++++-- .../connect/planner/SparkConnectPlanner.scala | 1 + .../scala/org/apache/spark/sql/Dataset.scala | 1 + .../analysis/ResolveSessionCatalog.scala | 14 +++-- .../spark/sql/execution/SparkSqlParser.scala | 6 +- .../spark/sql/execution/command/views.scala | 5 +- .../datasources/v2/CacheTableExec.scala | 1 + .../datasources/v2/ShowCreateTableExec.scala | 7 +++ .../datasources/v2/V2SessionCatalog.scala | 6 +- .../spark/sql/internal/CatalogImpl.scala | 1 + .../sql/internal/DataFrameWriterImpl.scala | 3 + .../sql/internal/DataFrameWriterV2Impl.scala | 2 + .../sql/streaming/DataStreamWriter.scala | 1 + .../sql/connector/DataSourceV2SQLSuite.scala | 10 +++- .../V2CommandsCaseSensitivitySuite.scala | 12 ++-- .../AlterTableSetTblPropertiesSuiteBase.scala | 6 +- ...lterTableUnsetTblPropertiesSuiteBase.scala | 6 +- .../command/DDLCommandTestUtils.scala | 6 ++ .../execution/command/DDLParserSuite.scala | 6 ++ .../command/DescribeTableSuiteBase.scala | 25 +++++++++ .../command/v1/DescribeTableSuite.scala | 2 + .../sql/hive/client/HiveClientImpl.scala | 5 ++ .../sql/hive/client/HiveClientSuite.scala | 19 +++++++ 33 files changed, 257 insertions(+), 38 deletions(-) diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index e743aa2a744f7..a5d217486bf20 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -236,6 +236,7 @@ statement | ALTER TABLE identifierReference RECOVER PARTITIONS #recoverPartitions | ALTER TABLE identifierReference (clusterBySpec | CLUSTER BY NONE) #alterClusterBy + | ALTER TABLE identifierReference collationSpec #alterTableCollation | DROP TABLE (IF EXISTS)? identifierReference PURGE? #dropTable | DROP VIEW (IF EXISTS)? identifierReference #dropView | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)? @@ -243,6 +244,7 @@ statement identifierCommentList? 
(commentSpec | schemaBinding | + collationSpec | (PARTITIONED ON identifierList) | (TBLPROPERTIES propertyList))* AS query #createView @@ -528,6 +530,7 @@ createTableClauses createFileFormat | locationSpec | commentSpec | + collationSpec | (TBLPROPERTIES tableProps=propertyList))* ; @@ -1232,6 +1235,10 @@ colPosition : position=FIRST | position=AFTER afterCol=errorCapturingIdentifier ; +collationSpec + : DEFAULT COLLATION collationName=identifier + ; + collateClause : COLLATE collationName=multipartIdentifier ; diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index ba3470f85338c..77dbaa7687b41 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -67,6 +67,11 @@ public interface TableCatalog extends CatalogPlugin { */ String PROP_COMMENT = "comment"; + /** + * A reserved property to specify the collation of the table. + */ + String PROP_COLLATION = "collation"; + /** * A reserved property to specify the provider of the table. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala index cc9979ad4c5e5..05158fbee3de6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala @@ -92,6 +92,7 @@ object ResolveTableSpec extends Rule[LogicalPlan] { options = newOptions.toMap, location = u.location, comment = u.comment, + collation = u.collation, serde = u.serde, external = u.external) withNewSpec(newTableSpec) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index dcd1d3137da3f..32a90833e2e7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -350,6 +350,7 @@ case class CatalogTable( stats: Option[CatalogStatistics] = None, viewText: Option[String] = None, comment: Option[String] = None, + collation: Option[String] = None, unsupportedFeatures: Seq[String] = Seq.empty, tracksPartitionsInCatalog: Boolean = false, schemaPreservesCase: Boolean = true, @@ -546,6 +547,7 @@ case class CatalogTable( provider.foreach(map.put("Provider", _)) bucketSpec.foreach(map ++= _.toLinkedHashMap) comment.foreach(map.put("Comment", _)) + collation.foreach(map.put("Collation", _)) if (tableType == CatalogTableType.VIEW) { viewText.foreach(map.put("View Text", _)) viewOriginalText.foreach(map.put("View Original Text", _)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index f37879ecd9356..aa32cc9100515 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin} import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER import 
org.apache.spark.sql.catalyst.types.DataTypeUtils -import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, IntervalUtils, SparkParserUtils} +import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, CollationFactory, DateTimeUtils, IntervalUtils, SparkParserUtils} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, stringToTimestamp, stringToTimestampWithoutTimeZone} import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces, TableCatalog, TableWritePrivilege} import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition @@ -3869,6 +3869,16 @@ class AstBuilder extends DataTypeAstBuilder ctx.asScala.headOption.map(visitCommentSpec) } + protected def visitCollationSpecList( + ctx: java.util.List[CollationSpecContext]): Option[String] = { + ctx.asScala.headOption.map(visitCollationSpec) + } + + override def visitCollationSpec(ctx: CollationSpecContext): String = withOrigin(ctx) { + val collationName = ctx.identifier.getText + CollationFactory.fetchCollation(collationName).collationName + } + /** * Create a [[BucketSpec]]. */ @@ -4000,6 +4010,7 @@ class AstBuilder extends DataTypeAstBuilder * - options * - location * - comment + * - collation * - serde * - clusterBySpec * @@ -4008,8 +4019,8 @@ class AstBuilder extends DataTypeAstBuilder * types like `i INT`, which should be appended to the existing table schema. */ type TableClauses = ( - Seq[Transform], Seq[ColumnDefinition], Option[BucketSpec], Map[String, String], - OptionList, Option[String], Option[String], Option[SerdeInfo], Option[ClusterBySpec]) + Seq[Transform], Seq[ColumnDefinition], Option[BucketSpec], Map[String, String], OptionList, + Option[String], Option[String], Option[String], Option[SerdeInfo], Option[ClusterBySpec]) /** * Validate a create table statement and return the [[TableIdentifier]]. @@ -4296,6 +4307,10 @@ class AstBuilder extends DataTypeAstBuilder throw QueryParsingErrors.cannotCleanReservedTablePropertyError( PROP_EXTERNAL, ctx, "please use CREATE EXTERNAL TABLE") case (PROP_EXTERNAL, _) => false + case (PROP_COLLATION, _) if !legacyOn => + throw QueryParsingErrors.cannotCleanReservedTablePropertyError( + PROP_COLLATION, ctx, "please use the DEFAULT COLLATION clause to specify it") + case (PROP_COLLATION, _) => false // It's safe to set whatever table comment, so we don't make it a reserved table property. 
case (PROP_COMMENT, _) => true case (k, _) => @@ -4475,6 +4490,7 @@ class AstBuilder extends DataTypeAstBuilder checkDuplicateClauses(ctx.createFileFormat, "STORED AS/BY", ctx) checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx) checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx) + checkDuplicateClauses(ctx.collationSpec(), "DEFAULT COLLATION", ctx) checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx) checkDuplicateClauses(ctx.clusterBySpec(), "CLUSTER BY", ctx) checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) @@ -4493,6 +4509,7 @@ class AstBuilder extends DataTypeAstBuilder val location = visitLocationSpecList(ctx.locationSpec()) val (cleanedOptions, newLocation) = cleanTableOptions(ctx, options, location) val comment = visitCommentSpecList(ctx.commentSpec()) + val collation = visitCollationSpecList(ctx.collationSpec()) val serdeInfo = getSerdeInfo(ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx) val clusterBySpec = ctx.clusterBySpec().asScala.headOption.map(visitClusterBySpec) @@ -4507,7 +4524,7 @@ class AstBuilder extends DataTypeAstBuilder } (partTransforms, partCols, bucketSpec, cleanedProperties, cleanedOptions, newLocation, comment, - serdeInfo, clusterBySpec) + collation, serdeInfo, clusterBySpec) } protected def getSerdeInfo( @@ -4567,6 +4584,7 @@ class AstBuilder extends DataTypeAstBuilder * ] * [LOCATION path] * [COMMENT table_comment] + * [DEFAULT COLLATION collation_name] * [TBLPROPERTIES (property_name=property_value, ...)] * * partition_fields: @@ -4580,8 +4598,8 @@ class AstBuilder extends DataTypeAstBuilder val columns = Option(ctx.colDefinitionList()).map(visitColDefinitionList).getOrElse(Nil) val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText) - val (partTransforms, partCols, bucketSpec, properties, options, location, - comment, serdeInfo, clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses()) + val (partTransforms, partCols, bucketSpec, properties, options, location, comment, + collation, serdeInfo, clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses()) if (provider.isDefined && serdeInfo.isDefined) { invalidStatement(s"CREATE TABLE ... USING ... 
${serdeInfo.get.describe}", ctx) @@ -4599,7 +4617,7 @@ class AstBuilder extends DataTypeAstBuilder clusterBySpec.map(_.asTransform) val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, - serdeInfo, external) + collation, serdeInfo, external) Option(ctx.query).map(plan) match { case Some(_) if columns.nonEmpty => @@ -4648,6 +4666,7 @@ class AstBuilder extends DataTypeAstBuilder * ] * [LOCATION path] * [COMMENT table_comment] + * [DEFAULT COLLATION collation_name] * [TBLPROPERTIES (property_name=property_value, ...)] * * partition_fields: @@ -4657,8 +4676,8 @@ class AstBuilder extends DataTypeAstBuilder */ override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = withOrigin(ctx) { val orCreate = ctx.replaceTableHeader().CREATE() != null - val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo, - clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses()) + val (partTransforms, partCols, bucketSpec, properties, options, location, comment, collation, + serdeInfo, clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses()) val columns = Option(ctx.colDefinitionList()).map(visitColDefinitionList).getOrElse(Nil) val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText) @@ -4672,7 +4691,7 @@ class AstBuilder extends DataTypeAstBuilder clusterBySpec.map(_.asTransform) val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, - serdeInfo, external = false) + collation, serdeInfo, external = false) Option(ctx.query).map(plan) match { case Some(_) if columns.nonEmpty => @@ -5078,6 +5097,21 @@ class AstBuilder extends DataTypeAstBuilder } } + /** + * Parse a [[AlterTableCollation]] command. + * + * For example: + * {{{ + * ALTER TABLE table1 DEFAULT COLLATION name + * }}} + */ + override def visitAlterTableCollation(ctx: AlterTableCollationContext): LogicalPlan = + withOrigin(ctx) { + val table = createUnresolvedTable( + ctx.identifierReference, "ALTER TABLE ... DEFAULT COLLATION") + AlterTableCollation(table, visitCollationSpec(ctx.collationSpec())) + } + /** * Parse [[SetViewProperties]] or [[SetTableProperties]] commands. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala index 2f5d4b9c86e25..dbd2c0ba8e420 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala @@ -261,3 +261,15 @@ case class AlterTableClusterBy( protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(table = newChild) } + +/** + * The logical plan of the ALTER TABLE ... DEFAULT COLLATION name command. 
+ */ +case class AlterTableCollation( + table: LogicalPlan, collation: String) extends AlterTableCommand { + override def changes: Seq[TableChange] = { + Seq(TableChange.setProperty(TableCatalog.PROP_COLLATION, collation)) + } + + protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(table = newChild) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 857522728eaff..85b5e8379d3d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -1338,6 +1338,7 @@ case class CreateView( child: LogicalPlan, userSpecifiedColumns: Seq[(String, Option[String])], comment: Option[String], + collation: Option[String], properties: Map[String, String], originalText: Option[String], query: LogicalPlan, @@ -1486,6 +1487,7 @@ trait TableSpecBase { def provider: Option[String] def location: Option[String] def comment: Option[String] + def collation: Option[String] def serde: Option[SerdeInfo] def external: Boolean } @@ -1496,6 +1498,7 @@ case class UnresolvedTableSpec( optionExpression: OptionList, location: Option[String], comment: Option[String], + collation: Option[String], serde: Option[SerdeInfo], external: Boolean) extends UnaryExpression with Unevaluable with TableSpecBase { @@ -1541,10 +1544,11 @@ case class TableSpec( options: Map[String, String], location: Option[String], comment: Option[String], + collation: Option[String], serde: Option[SerdeInfo], external: Boolean) extends TableSpecBase { def withNewLocation(newLocation: Option[String]): TableSpec = { - TableSpec(properties, provider, options, newLocation, comment, serde, external) + TableSpec(properties, provider, options, newLocation, comment, collation, serde, external) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index e1f114a6170a4..97cc263c56c5f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -53,6 +53,7 @@ private[sql] object CatalogV2Util { */ val TABLE_RESERVED_PROPERTIES = Seq(TableCatalog.PROP_COMMENT, + TableCatalog.PROP_COLLATION, TableCatalog.PROP_LOCATION, TableCatalog.PROP_PROVIDER, TableCatalog.PROP_OWNER, @@ -459,7 +460,7 @@ private[sql] object CatalogV2Util { def convertTableProperties(t: TableSpec): Map[String, String] = { val props = convertTableProperties( t.properties, t.options, t.serde, t.location, t.comment, - t.provider, t.external) + t.collation, t.provider, t.external) withDefaultOwnership(props) } @@ -469,6 +470,7 @@ private[sql] object CatalogV2Util { serdeInfo: Option[SerdeInfo], location: Option[String], comment: Option[String], + collation: Option[String], provider: Option[String], external: Boolean = false): Map[String, String] = { properties ++ @@ -478,6 +480,7 @@ private[sql] object CatalogV2Util { (if (external) Some(TableCatalog.PROP_EXTERNAL -> "true") else None) ++ provider.map(TableCatalog.PROP_PROVIDER -> _) ++ comment.map(TableCatalog.PROP_COMMENT -> _) ++ + collation.map(TableCatalog.PROP_COLLATION -> _) ++ location.map(TableCatalog.PROP_LOCATION -> _) } diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala index 4a5a607e8a8ae..570ab1338dbf2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala @@ -85,6 +85,7 @@ private[sql] object V1Table { TableCatalog.OPTION_PREFIX + key -> value } ++ v1Table.provider.map(TableCatalog.PROP_PROVIDER -> _) ++ v1Table.comment.map(TableCatalog.PROP_COMMENT -> _) ++ + v1Table.collation.map(TableCatalog.PROP_COLLATION -> _) ++ v1Table.storage.locationUri.map { loc => TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(loc) } ++ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala index 6b034d3dbee09..133670d5fcced 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.util.ArrayImplicits._ class CreateTablePartitioningValidationSuite extends AnalysisTest { val tableSpec = - UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false) + UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, None, false) test("CreateTableAsSelect: fail missing top-level column") { val plan = CreateTableAsSelect( UnresolvedIdentifier(Array("table_name").toImmutableArraySeq), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 5e871208698af..0ec2c80282fc2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -2655,7 +2655,7 @@ class DDLParserSuite extends AnalysisTest { val createTableResult = CreateTable(UnresolvedIdentifier(Seq("my_tab")), columnsWithDefaultValue, Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"), - OptionList(Seq.empty), None, None, None, false), false) + OptionList(Seq.empty), None, None, None, None, false), false) // Parse the CREATE TABLE statement twice, swapping the order of the NOT NULL and DEFAULT // options, to make sure that the parser accepts any ordering of these options. comparePlans(parsePlan( @@ -2668,7 +2668,7 @@ class DDLParserSuite extends AnalysisTest { "b STRING NOT NULL DEFAULT 'abc') USING parquet"), ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), columnsWithDefaultValue, Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"), - OptionList(Seq.empty), None, None, None, false), false)) + OptionList(Seq.empty), None, None, None, None, false), false)) // These ALTER TABLE statements should parse successfully. 
comparePlans( parsePlan("ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42"), @@ -2828,12 +2828,12 @@ class DDLParserSuite extends AnalysisTest { "CREATE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"), CreateTable(UnresolvedIdentifier(Seq("my_tab")), columnsWithGenerationExpr, Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"), - OptionList(Seq.empty), None, None, None, false), false)) + OptionList(Seq.empty), None, None, None, None, false), false)) comparePlans(parsePlan( "REPLACE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"), ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), columnsWithGenerationExpr, Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"), - OptionList(Seq.empty), None, None, None, false), false)) + OptionList(Seq.empty), None, None, None, None, false), false)) // Two generation expressions checkError( exception = parseException("CREATE TABLE my_tab(a INT, " + @@ -2903,6 +2903,7 @@ class DDLParserSuite extends AnalysisTest { None, None, None, + None, false ), false @@ -2925,6 +2926,7 @@ class DDLParserSuite extends AnalysisTest { None, None, None, + None, false ), false @@ -3198,4 +3200,49 @@ class DDLParserSuite extends AnalysisTest { condition = "INTERNAL_ERROR", parameters = Map("message" -> "INSERT OVERWRITE DIRECTORY is not supported.")) } + + test("create table with bad collation name") { + checkError( + exception = internalException("CREATE TABLE t DEFAULT COLLATION XD"), + condition = "COLLATION_INVALID_NAME", + parameters = Map("proposals" -> "id, xh, af", "collationName" -> "XD") + ) + } + + private val testSuppCollations = + Seq("UTF8_BINARY", "UTF8_LCASE", "UNICODE", "UNICODE_CI", "UNICODE_CI_RTRIM", "sr", "sr_CI_AI") + + test("create table with default collation") { + testSuppCollations.foreach { collation => + comparePlans(parsePlan( + s"CREATE TABLE t (c STRING) USING parquet DEFAULT COLLATION ${collation.toLowerCase()}"), + CreateTable(UnresolvedIdentifier(Seq("t")), + Seq(ColumnDefinition("c", StringType)), + Seq.empty[Transform], + UnresolvedTableSpec(Map.empty[String, String], Some("parquet"), OptionList(Seq.empty), + None, None, Some(collation), None, false), false)) + } + } + + test("replace table with default collation") { + testSuppCollations.foreach { collation => + comparePlans(parsePlan( + s"REPLACE TABLE t (c STRING) USING parquet DEFAULT COLLATION ${collation.toLowerCase()}"), + ReplaceTable(UnresolvedIdentifier(Seq("t")), + Seq(ColumnDefinition("c", StringType)), + Seq.empty[Transform], + UnresolvedTableSpec(Map.empty[String, String], Some("parquet"), OptionList(Seq.empty), + None, None, Some(collation), None, false), false)) + } + } + + test("alter table collation") { + testSuppCollations.foreach { collation => + comparePlans(parsePlan( + s"ALTER TABLE t DEFAULT COLLATION ${collation.toLowerCase()}"), + AlterTableCollation(UnresolvedTable(Seq("t"), + "ALTER TABLE ... 
DEFAULT COLLATION"), collation) + ) + } + } } diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 82dfcf7a3694e..bfb5f8f3fab75 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -2719,6 +2719,7 @@ class SparkConnectPlanner( name = tableIdentifier, userSpecifiedColumns = Nil, comment = None, + collation = None, properties = Map.empty, originalText = None, plan = transformRelation(createView.getInput), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index c8c1bacfb9ded..b9ae0e5b91318 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1618,6 +1618,7 @@ class Dataset[T] private[sql]( name = TableIdentifier(identifier.last), userSpecifiedColumns = Nil, comment = None, + collation = None, properties = Map.empty, originalText = None, plan = logicalPlan, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 5f1ab089cf3e9..fa28a2cb9ead6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -421,11 +421,12 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) AlterViewSchemaBindingCommand(ident, viewSchemaMode) case CreateView(ResolvedIdentifierInSessionCatalog(ident), userSpecifiedColumns, comment, - properties, originalText, child, allowExisting, replace, viewSchemaMode) => + collation, properties, originalText, child, allowExisting, replace, viewSchemaMode) => CreateViewCommand( name = ident, userSpecifiedColumns = userSpecifiedColumns, comment = comment, + collation = collation, properties = properties, originalText = originalText, plan = child, @@ -434,7 +435,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) viewType = PersistedView, viewSchemaMode = viewSchemaMode) - case CreateView(ResolvedIdentifier(catalog, _), _, _, _, _, _, _, _, _) => + case CreateView(ResolvedIdentifier(catalog, _), _, _, _, _, _, _, _, _, _) => throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "views") case ShowViews(ns: ResolvedNamespace, pattern, output) => @@ -508,8 +509,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) storageFormat: CatalogStorageFormat, provider: String): CreateTableV1 = { val tableDesc = buildCatalogTable( - ident, tableSchema, partitioning, tableSpec.properties, provider, - tableSpec.location, tableSpec.comment, storageFormat, tableSpec.external) + ident, tableSchema, partitioning, tableSpec.properties, provider, tableSpec.location, + tableSpec.comment, tableSpec.collation, storageFormat, tableSpec.external) val mode = if (ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists CreateTableV1(tableDesc, mode, query) } @@ -585,6 +586,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) provider: String, location: Option[String], comment: Option[String], + collation: Option[String], storageFormat: CatalogStorageFormat, 
external: Boolean): CatalogTable = { val tableType = if (external || location.isDefined) { @@ -605,7 +607,9 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) properties = properties ++ maybeClusterBySpec.map( clusterBySpec => ClusterBySpec.toProperty(schema, clusterBySpec, conf.resolver)), - comment = comment) + comment = comment, + collation = collation + ) } object ResolvedViewIdentifier { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index e15250eb46b5c..8d5ddb2d85c4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -377,7 +377,7 @@ class SparkSqlAstBuilder extends AstBuilder { invalidStatement("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx) } - val (_, _, _, _, options, location, _, _, _) = + val (_, _, _, _, options, location, _, _, _, _) = visitCreateTableClauses(ctx.createTableClauses()) val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText).getOrElse( throw QueryParsingErrors.createTempTableNotSpecifyProviderError(ctx)) @@ -520,6 +520,7 @@ class SparkSqlAstBuilder extends AstBuilder { * * create_view_clauses (order insensitive): * [COMMENT view_comment] + * [DEFAULT COLLATION collation_name] * [TBLPROPERTIES (property_name = property_value, ...)] * }}} */ @@ -529,6 +530,7 @@ class SparkSqlAstBuilder extends AstBuilder { } checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx) + checkDuplicateClauses(ctx.collationSpec(), "DEFAULT COLLATION", ctx) checkDuplicateClauses(ctx.schemaBinding(), "WITH SCHEMA", ctx) checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED ON", ctx) checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx) @@ -584,6 +586,7 @@ class SparkSqlAstBuilder extends AstBuilder { withIdentClause(ctx.identifierReference(), UnresolvedIdentifier(_)), userSpecifiedColumns, visitCommentSpecList(ctx.commentSpec()), + visitCollationSpecList(ctx.collationSpec()), properties, Some(originalText), qPlan, @@ -609,6 +612,7 @@ class SparkSqlAstBuilder extends AstBuilder { tableIdentifier, userSpecifiedColumns, visitCommentSpecList(ctx.commentSpec()), + visitCollationSpecList(ctx.collationSpec()), properties, Option(source(ctx.query)), otherPlans.head, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index a98d9886a2730..d5a72fd6c441a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -47,6 +47,7 @@ import org.apache.spark.util.ArrayImplicits._ * @param userSpecifiedColumns the output column names and optional comments specified by users, * can be Nil if not specified. * @param comment the comment of this view. + * @param collation the collation of this view. * @param properties the properties of this view. * @param originalText the original SQL text of this view, can be None if this view is created via * Dataset API. 
@@ -64,6 +65,7 @@ case class CreateViewCommand( name: TableIdentifier, userSpecifiedColumns: Seq[(String, Option[String])], comment: Option[String], + collation: Option[String], properties: Map[String, String], originalText: Option[String], plan: LogicalPlan, @@ -220,7 +222,8 @@ case class CreateViewCommand( properties = newProperties, viewOriginalText = originalText, viewText = originalText, - comment = comment + comment = comment, + collation = collation ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala index 56c44a1256815..86fa0c8523f1e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala @@ -89,6 +89,7 @@ case class CacheTableAsSelectExec( name = TableIdentifier(tempViewName), userSpecifiedColumns = Nil, comment = None, + collation = None, properties = Map.empty, originalText = Some(originalText), plan = query, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala index 37339a34af3db..4195560c5cc1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala @@ -57,6 +57,7 @@ case class ShowCreateTableExec( showTableOptions(builder, tableOptions) showTablePartitioning(table, builder) showTableComment(table, builder) + showTableCollation(table, builder) showTableLocation(table, builder) showTableProperties(table, builder, tableOptions) } @@ -155,6 +156,12 @@ case class ShowCreateTableExec( .foreach(builder.append) } + private def showTableCollation(table: Table, builder: StringBuilder): Unit = { + Option(table.properties.get(TableCatalog.PROP_COLLATION)) + .map("COLLATION '" + escapeSingleQuotedString(_) + "'\n") + .foreach(builder.append) + } + private def concatByMultiLines(iter: Iterable[String]): String = { iter.mkString("(\n ", ",\n ", ")\n") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 22c13fd98ced1..e9927cdcc7a33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -239,7 +239,8 @@ class V2SessionCatalog(catalog: SessionCatalog) maybeClusterBySpec.map( clusterBySpec => ClusterBySpec.toProperty(newSchema, clusterBySpec, conf.resolver)), tracksPartitionsInCatalog = conf.manageFilesourcePartitions, - comment = Option(properties.get(TableCatalog.PROP_COMMENT))) + comment = Option(properties.get(TableCatalog.PROP_COMMENT)), + collation = Option(properties.get(TableCatalog.PROP_COLLATION))) try { catalog.createTable(tableDesc, ignoreIfExists = false) @@ -290,6 +291,7 @@ class V2SessionCatalog(catalog: SessionCatalog) val schema = CatalogV2Util.applySchemaChanges( catalogTable.schema, changes, catalogTable.provider, "ALTER TABLE") val comment = properties.get(TableCatalog.PROP_COMMENT) + val collation = properties.get(TableCatalog.PROP_COLLATION) val owner = 
properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner) val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI) val storage = if (location.isDefined) { @@ -303,7 +305,7 @@ class V2SessionCatalog(catalog: SessionCatalog) catalog.alterTable( catalogTable.copy( properties = finalProperties, schema = schema, owner = owner, comment = comment, - storage = storage)) + collation = collation, storage = storage)) } catch { case _: NoSuchTableException => throw QueryCompilationErrors.noSuchTableError(ident) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 64689e75e2e5e..5fd88b417ac44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -685,6 +685,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { optionExpression = newOptions, location = location, comment = { if (description.isEmpty) None else Some(description) }, + collation = None, serde = None, external = tableType == CatalogTableType.EXTERNAL) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala index 16f9fcf77d622..0069062e63078 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala @@ -209,6 +209,7 @@ final class DataFrameWriterImpl[T] private[sql](ds: Dataset[T]) extends DataFram optionExpression = OptionList(Seq.empty), location = extraOptions.get("path"), comment = extraOptions.get(TableCatalog.PROP_COMMENT), + collation = extraOptions.get(TableCatalog.PROP_COLLATION), serde = None, external = false) runCommand(df.sparkSession) { @@ -469,6 +470,7 @@ final class DataFrameWriterImpl[T] private[sql](ds: Dataset[T]) extends DataFram optionExpression = OptionList(Seq.empty), location = extraOptions.get("path"), comment = extraOptions.get(TableCatalog.PROP_COMMENT), + collation = extraOptions.get(TableCatalog.PROP_COLLATION), serde = None, external = false) ReplaceTableAsSelect( @@ -489,6 +491,7 @@ final class DataFrameWriterImpl[T] private[sql](ds: Dataset[T]) extends DataFram optionExpression = OptionList(Seq.empty), location = extraOptions.get("path"), comment = extraOptions.get(TableCatalog.PROP_COMMENT), + collation = extraOptions.get(TableCatalog.PROP_COLLATION), serde = None, external = false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterV2Impl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterV2Impl.scala index 0a19e6c47afa9..86ea55bc59b7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterV2Impl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterV2Impl.scala @@ -150,6 +150,7 @@ final class DataFrameWriterV2Impl[T] private[sql](table: String, ds: Dataset[T]) optionExpression = OptionList(Seq.empty), location = None, comment = None, + collation = None, serde = None, external = false) runCommand( @@ -215,6 +216,7 @@ final class DataFrameWriterV2Impl[T] private[sql](table: String, ds: Dataset[T]) optionExpression = OptionList(Seq.empty), location = None, comment = None, + collation = None, serde = None, external = false) runCommand(ReplaceTableAsSelect( diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index b0233d2c51b75..d41933c6a135c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -175,6 +175,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends api.DataStr extraOptions.get("path"), None, None, + None, external = false) val cmd = CreateTable( UnresolvedIdentifier(originalMultipartIdentifier), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 6a659fa6e3ee9..87d0a1ff4e7bf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1260,8 +1260,12 @@ class DataSourceV2SQLSuiteV1Filter PROP_OWNER -> "it will be set to the current user", PROP_EXTERNAL -> "please use CREATE EXTERNAL TABLE" ) + val excludedProperties = Set(TableCatalog.PROP_COMMENT, TableCatalog.PROP_COLLATION) + val tableLegacyProperties = CatalogV2Util.TABLE_RESERVED_PROPERTIES + .filterNot(excludedProperties.contains) + withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) { - CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => + tableLegacyProperties.foreach { key => Seq("OPTIONS", "TBLPROPERTIES").foreach { clause => Seq("CREATE", "REPLACE").foreach { action => val sqlText = s"$action TABLE testcat.reservedTest (key int) " + @@ -1314,7 +1318,7 @@ class DataSourceV2SQLSuiteV1Filter } } withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) { - CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => + tableLegacyProperties.foreach { key => Seq("OPTIONS", "TBLPROPERTIES").foreach { clause => withTable("testcat.reservedTest") { Seq("CREATE", "REPLACE").foreach { action => @@ -3389,6 +3393,7 @@ class DataSourceV2SQLSuiteV1Filter |TBLPROPERTIES ('prop1' = '1', 'prop2' = '2') |PARTITIONED BY (a) |LOCATION '/tmp' + |DEFAULT COLLATION sr_CI_AI """.stripMargin) val table = spark.sessionState.catalogManager.v2SessionCatalog.asTableCatalog @@ -3396,6 +3401,7 @@ class DataSourceV2SQLSuiteV1Filter val properties = table.properties assert(properties.get(TableCatalog.PROP_PROVIDER) == "parquet") assert(properties.get(TableCatalog.PROP_COMMENT) == "This is a comment") + assert(properties.get(TableCatalog.PROP_COLLATION) == "sr_CI_AI") assert(properties.get(TableCatalog.PROP_LOCATION) == "file:/tmp") assert(properties.containsKey(TableCatalog.PROP_OWNER)) assert(properties.get(TableCatalog.PROP_EXTERNAL) == "true") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala index 5091c72ef96ac..67fca09802139 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala @@ -53,7 +53,8 @@ class V2CommandsCaseSensitivitySuite withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { Seq("ID", "iD").foreach { ref => val tableSpec = - UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, 
false) + UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), + None, None, None, None, false) val plan = CreateTableAsSelect( UnresolvedIdentifier(Array("table_name").toImmutableArraySeq), Expressions.identity(ref) :: Nil, @@ -77,7 +78,8 @@ class V2CommandsCaseSensitivitySuite withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref => val tableSpec = - UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false) + UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), + None, None, None, None, false) val plan = CreateTableAsSelect( UnresolvedIdentifier(Array("table_name").toImmutableArraySeq), Expressions.bucket(4, ref) :: Nil, @@ -102,7 +104,8 @@ class V2CommandsCaseSensitivitySuite withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { Seq("ID", "iD").foreach { ref => val tableSpec = - UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false) + UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), + None, None, None, None, false) val plan = ReplaceTableAsSelect( UnresolvedIdentifier(Array("table_name").toImmutableArraySeq), Expressions.identity(ref) :: Nil, @@ -126,7 +129,8 @@ class V2CommandsCaseSensitivitySuite withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref => val tableSpec = - UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false) + UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), + None, None, None, None, false) val plan = ReplaceTableAsSelect( UnresolvedIdentifier(Array("table_name").toImmutableArraySeq), Expressions.bucket(4, ref) :: Nil, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableSetTblPropertiesSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableSetTblPropertiesSuiteBase.scala index 52a90497fdd37..9ec63acb1d3a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableSetTblPropertiesSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableSetTblPropertiesSuiteBase.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{AnalysisException, QueryTest} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog} +import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.errors.DataTypeErrors.toSQLId import org.apache.spark.sql.internal.SQLConf @@ -89,7 +89,7 @@ trait AlterTableSetTblPropertiesSuiteBase extends QueryTest with DDLCommandTestU PROP_EXTERNAL -> "please use CREATE EXTERNAL TABLE" ) withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) { - CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => + tableLegacyProperties.foreach { key => withNamespaceAndTable("ns", "tbl") { t => val sqlText = s"ALTER TABLE $t SET TBLPROPERTIES ('$key'='bar')" checkError( @@ -109,7 +109,7 @@ trait AlterTableSetTblPropertiesSuiteBase extends QueryTest with DDLCommandTestU } } withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) { - CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => + tableLegacyProperties.foreach { key => Seq("OPTIONS", "TBLPROPERTIES").foreach { clause => 
withNamespaceAndTable("ns", "tbl") { t => sql(s"CREATE TABLE $t (key int) USING parquet $clause ('$key'='bar')") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableUnsetTblPropertiesSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableUnsetTblPropertiesSuiteBase.scala index 0013919fca08f..0e9e9d9c60815 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableUnsetTblPropertiesSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableUnsetTblPropertiesSuiteBase.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{AnalysisException, QueryTest} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog} +import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.errors.DataTypeErrors.toSQLId import org.apache.spark.sql.internal.SQLConf @@ -109,7 +109,7 @@ trait AlterTableUnsetTblPropertiesSuiteBase extends QueryTest with DDLCommandTes PROP_EXTERNAL -> "please use CREATE EXTERNAL TABLE" ) withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) { - CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => + tableLegacyProperties.foreach { key => withNamespaceAndTable("ns", "tbl") { t => val sqlText = s"ALTER TABLE $t UNSET TBLPROPERTIES ('$key')" checkError( @@ -129,7 +129,7 @@ trait AlterTableUnsetTblPropertiesSuiteBase extends QueryTest with DDLCommandTes } } withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) { - CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => + tableLegacyProperties.foreach { key => Seq("OPTIONS", "TBLPROPERTIES").foreach { clause => withNamespaceAndTable("ns", "tbl") { t => sql(s"CREATE TABLE $t (key int) USING parquet $clause ('$key'='bar')") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala index 39f2abd35c2b5..39624a33d8614 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala @@ -26,6 +26,7 @@ import org.scalatest.Tag import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog} import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.test.SQLTestUtils @@ -172,6 +173,11 @@ trait DDLCommandTestUtils extends SQLTestUtils { FileUtils.copyDirectory(new File(part0Loc), new File(part1Loc)) part1Loc } + + def tableLegacyProperties: Seq[String] = { + val excludedProperties = Set(TableCatalog.PROP_COMMENT, TableCatalog.PROP_COLLATION) + CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(excludedProperties.contains) + } } object DDLCommandTestUtils { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 8b868c0e17230..d38708ab3745c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -498,6 +498,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { UnresolvedIdentifier(Seq("view1")), Seq.empty[(String, Option[String])], None, + None, Map.empty[String, String], Some("SELECT * FROM tab1"), parser.parsePlan("SELECT * FROM tab1"), @@ -513,6 +514,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { Seq("a").asTableIdentifier, Seq.empty[(String, Option[String])], None, + None, Map.empty[String, String], Some("SELECT * FROM tab1"), parser.parsePlan("SELECT * FROM tab1"), @@ -539,6 +541,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { |(col1, col3 COMMENT 'hello') |TBLPROPERTIES('prop1Key'="prop1Val") |COMMENT 'BLABLA' + |DEFAULT COLLATION uNiCodE |AS SELECT * FROM tab1 """.stripMargin val parsed1 = parser.parsePlan(v1) @@ -546,6 +549,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { UnresolvedIdentifier(Seq("view1")), Seq("col1" -> None, "col3" -> Some("hello")), Some("BLABLA"), + Some("UNICODE"), Map("prop1Key" -> "prop1Val"), Some("SELECT * FROM tab1"), parser.parsePlan("SELECT * FROM tab1"), @@ -559,6 +563,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { |CREATE OR REPLACE GLOBAL TEMPORARY VIEW a |(col1, col3 COMMENT 'hello') |COMMENT 'BLABLA' + |DEFAULT COLLATION uNiCoDe |AS SELECT * FROM tab1 """.stripMargin val parsed2 = parser.parsePlan(v2) @@ -566,6 +571,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { Seq("a").asTableIdentifier, Seq("col1" -> None, "col3" -> Some("hello")), Some("BLABLA"), + Some("UNICODE"), Map(), Some("SELECT * FROM tab1"), parser.parsePlan("SELECT * FROM tab1"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala index c4e9ff93ef85d..f8d2e9dd3a3cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala @@ -293,4 +293,29 @@ trait DescribeTableSuiteBase extends QueryTest with DDLCommandTestUtils { Row("col1", "string", null))) } } + + Seq(true, false).foreach { hasCollations => + test(s"DESCRIBE TABLE EXTENDED with collation specified = $hasCollations") { + + withNamespaceAndTable("ns", "tbl") { tbl => + val getCollationDescription = () => sql(s"DESCRIBE TABLE EXTENDED $tbl") + .where("col_name = 'Collation'") + + val defaultCollation = if (hasCollations) "DEFAULT COLLATION uNiCoDe" else "" + + sql(s"CREATE TABLE $tbl (id string) $defaultUsing $defaultCollation") + val descriptionDf = getCollationDescription() + + if (hasCollations) { + checkAnswer(descriptionDf, Seq(Row("Collation", "UNICODE", ""))) + } else { + assert(descriptionDf.isEmpty) + } + + sql(s"ALTER TABLE $tbl DEFAULT COLLATION UniCode_cI_rTrIm") + val newDescription = getCollationDescription() + checkAnswer(newDescription, Seq(Row("Collation", "UNICODE_CI_RTRIM", ""))) + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala index eaf016ac2fa9f..164ac2bff9f63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala @@ -218,6 +218,7 @@ class DescribeTableSuite extends DescribeTableSuiteBase with CommandSuiteBase { " PARTITIONED BY (id)" + " TBLPROPERTIES ('bar'='baz')" + " COMMENT 'this is a test table'" + + " DEFAULT COLLATION unicode" + " LOCATION 'file:/tmp/testcat/table_name'") val descriptionDf = spark.sql(s"DESCRIBE TABLE EXTENDED $tbl") assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq( @@ -241,6 +242,7 @@ class DescribeTableSuite extends DescribeTableSuiteBase with CommandSuiteBase { Row("Type", "EXTERNAL", ""), Row("Provider", getProvider(), ""), Row("Comment", "this is a test table", ""), + Row("Collation", "UNICODE", ""), Row("Table Properties", "[bar=baz]", ""), Row("Location", "file:/tmp/testcat/table_name", ""), Row("Partition Provider", "Catalog", ""))) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index fd4d3220f367d..acc588fb719c2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -518,6 +518,8 @@ private[hive] class HiveClientImpl( val excludedTableProperties = HiveStatisticsProperties ++ Set( // The property value of "comment" is moved to the dedicated field "comment" "comment", + // The property value of "collation" is moved to the dedicated field "collation" + "collation", // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added // in the function toHiveTable. "EXTERNAL" @@ -527,6 +529,7 @@ private[hive] class HiveClientImpl( case (key, _) => excludedTableProperties.contains(key) } val comment = properties.get("comment") + val collation = properties.get("collation") CatalogTable( identifier = TableIdentifier(h.getTableName, Option(h.getDbName)), @@ -569,6 +572,7 @@ private[hive] class HiveClientImpl( properties = filteredProperties, stats = readHiveStats(properties), comment = comment, + collation = collation, // In older versions of Spark(before 2.2.0), we expand the view original text and // store that into `viewExpandedText`, that should be used in view resolution. // We get `viewExpandedText` as viewText, and also get `viewOriginalText` in order to @@ -1212,6 +1216,7 @@ private[hive] object HiveClientImpl extends Logging { table.storage.properties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) } table.properties.foreach { case (k, v) => hiveTable.setProperty(k, v) } table.comment.foreach { c => hiveTable.setProperty("comment", c) } + table.collation.foreach { c => hiveTable.setProperty("collation", c) } // Hive will expand the view text, so it needs 2 fields: viewOriginalText and viewExpandedText. // Since we don't expand the view text, but only add table properties, we map the `viewText` to // the both fields in hive table. 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala index 5c65eb8b12bac..27dc80fbfc173 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, PartitionsAlreadyExistException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal} +import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.test.TestHiveVersion import org.apache.spark.sql.types.{IntegerType, StructType} @@ -68,11 +69,13 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) { } def table(database: String, tableName: String, + collation: Option[String] = None, tableType: CatalogTableType = CatalogTableType.MANAGED): CatalogTable = { CatalogTable( identifier = TableIdentifier(tableName, Some(database)), tableType = tableType, schema = new StructType().add("key", "int"), + collation = collation, storage = CatalogStorageFormat( locationUri = None, inputFormat = Some(classOf[TextInputFormat].getName), @@ -204,6 +207,22 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) { ignoreIfExists = false) } + test("create/alter table with collations") { + client.createTable(table("default", tableName = "collation_table", + collation = Some("UNICODE")), ignoreIfExists = false) + + val readBack = client.getTable("default", "collation_table") + assert(!readBack.properties.contains(TableCatalog.PROP_COLLATION)) + assert(readBack.collation === Some("UNICODE")) + + client.alterTable("default", "collation_table", + readBack.copy(collation = Some("UNICODE_CI"))) + val alteredTbl = client.getTable("default", "collation_table") + assert(alteredTbl.collation === Some("UNICODE_CI")) + + client.dropTable("default", "collation_table", ignoreIfNotExists = true, purge = true) + } + test("loadTable") { client.loadTable( emptyDir,