From 6bee0922baafebfdf8af2f56ff8517cbe1fbf085 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Fri, 26 Jan 2024 16:44:01 +0100 Subject: [PATCH] Spark: Support creating views via SQL (#9423) --- .../IcebergSparkSessionExtensions.scala | 2 + .../sql/catalyst/analysis/CheckViews.scala | 62 +++++ .../sql/catalyst/analysis/ResolveViews.scala | 28 +- .../analysis/RewriteViewCommands.scala | 61 ++++ .../logical/views/CreateIcebergView.scala | 44 +++ .../datasources/v2/CreateV2ViewExec.scala | 98 +++++++ .../v2/ExtendedDataSourceV2Strategy.scala | 16 ++ .../iceberg/spark/extensions/TestViews.java | 263 ++++++++++++++++++ .../apache/iceberg/spark/SparkCatalog.java | 31 +++ .../iceberg/spark/source/SparkView.java | 7 +- 10 files changed, 609 insertions(+), 3 deletions(-) create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala index ad9df3994fc0..3fca29c294c0 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala @@ -20,6 +20,7 @@ package org.apache.iceberg.spark.extensions import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.catalyst.analysis.CheckViews import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion import org.apache.spark.sql.catalyst.analysis.ResolveProcedures import org.apache.spark.sql.catalyst.analysis.ResolveViews @@ -37,6 +38,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) { extensions.injectResolutionRule { spark => ResolveProcedures(spark) } extensions.injectResolutionRule { spark => ResolveViews(spark) } extensions.injectResolutionRule { _ => ProcedureArgumentCoercion } + extensions.injectCheckRule(_ => CheckViews) // optimizer extensions extensions.injectOptimizerRule { _ => ReplaceStaticInvoke } diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala new file mode 100644 index 000000000000..4a1736764d0d --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.util.SchemaUtils + +object CheckViews extends (LogicalPlan => Unit) { + + override def apply(plan: LogicalPlan): Unit = { + plan foreach { + case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _, + _, _, _, _, _, _) => + verifyColumnCount(ident, columnAliases, query) + SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver) + + case _ => // OK + } + } + + private def verifyColumnCount(ident: Identifier, columns: Seq[String], query: LogicalPlan): Unit = { + if (columns.nonEmpty) { + if (columns.length > query.output.length) { + throw new AnalysisException( + errorClass = "CREATE_VIEW_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + messageParameters = Map( + "viewName" -> ident.toString, + "viewColumns" -> columns.mkString(", "), + "dataColumns" -> query.output.map(c => c.name).mkString(", "))) + } else if (columns.length < query.output.length) { + throw new AnalysisException( + errorClass = "CREATE_VIEW_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", + messageParameters = Map( + "viewName" -> ident.toString, + "viewColumns" -> columns.mkString(", "), + "dataColumns" -> query.output.map(c => c.name).mkString(", "))) + } + } + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala index a1b826569f35..5616f6f70be6 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias +import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.CurrentOrigin @@ -38,6 +39,7 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog import org.apache.spark.sql.connector.catalog.View import org.apache.spark.sql.connector.catalog.ViewCatalog import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.types.MetadataBuilder case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog { @@ -59,6 +61,30 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look 
loadView(catalog, ident) .map(_ => ResolvedV2View(catalog.asViewCatalog, ident)) .getOrElse(u) + + case c@CreateIcebergView(ResolvedIdentifier(_, _), _, query, columnAliases, columnComments, _, _, _, _, _, _) + if query.resolved && !c.rewritten => + val aliased = aliasColumns(query, columnAliases, columnComments) + c.copy(query = aliased, queryColumnNames = query.schema.fieldNames, rewritten = true) + } + + private def aliasColumns( + plan: LogicalPlan, + columnAliases: Seq[String], + columnComments: Seq[Option[String]]): LogicalPlan = { + if (columnAliases.isEmpty || columnAliases.length != plan.output.length) { + plan + } else { + val projectList = plan.output.zipWithIndex.map { case (attr, pos) => + if (columnComments.apply(pos).isDefined) { + val meta = new MetadataBuilder().putString("comment", columnComments.apply(pos).get).build() + Alias(attr, columnAliases.apply(pos))(explicitMetadata = Some(meta)) + } else { + Alias(attr, columnAliases.apply(pos))() + } + } + Project(projectList, plan) + } } def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match { @@ -151,7 +177,7 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look } - implicit class ViewHelper(plugin: CatalogPlugin) { + implicit class IcebergViewHelper(plugin: CatalogPlugin) { def asViewCatalog: ViewCatalog = plugin match { case viewCatalog: ViewCatalog => viewCatalog diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala index 2b35db33c0c5..066ba59394d7 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala @@ -19,13 +19,19 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.SubqueryExpression +import org.apache.spark.sql.catalyst.plans.logical.CreateView import org.apache.spark.sql.catalyst.plans.logical.DropView import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.View +import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.connector.catalog.CatalogPlugin +import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.LookupCatalog import org.apache.spark.sql.connector.catalog.ViewCatalog @@ -40,6 +46,20 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case DropView(ResolvedView(resolved), ifExists) => DropIcebergView(resolved, ifExists) + + case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties, + Some(queryText), query, allowExisting, replace) => + val q = CTESubstitution.apply(query) + verifyTemporaryObjectsDontExist(resolved.identifier, q) + CreateIcebergView(child = resolved, + queryText = queryText, + query = q, + columnAliases = userSpecifiedColumns.map(_._1), + columnComments = 
userSpecifiedColumns.map(_._2.orElse(Option.empty)), + comment = comment, + properties = properties, + allowExisting = allowExisting, + replace = replace) } private def isTempView(nameParts: Seq[String]): Boolean = { @@ -62,4 +82,45 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi None } } + + /** + * Permanent views are not allowed to reference temp objects + */ + private def verifyTemporaryObjectsDontExist( + name: Identifier, + child: LogicalPlan): Unit = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + val tempViews = collectTemporaryViews(child) + tempViews.foreach { nameParts => + throw new AnalysisException( + errorClass = "INVALID_TEMP_OBJ_REFERENCE", + messageParameters = Map( + "obj" -> "VIEW", + "objName" -> name.name(), + "tempObj" -> "VIEW", + "tempObjName" -> nameParts.quoted)) + } + + // TODO: check for temp function names + } + + /** + * Collect all temporary views and return the identifiers separately + */ + private def collectTemporaryViews(child: LogicalPlan): Seq[Seq[String]] = { + def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = { + child.flatMap { + case unresolved: UnresolvedRelation if isTempView(unresolved.multipartIdentifier) => + Seq(unresolved.multipartIdentifier) + case view: View if view.isTempView => Seq(view.desc.identifier.nameParts) + case plan => plan.expressions.flatMap(_.flatMap { + case e: SubqueryExpression => collectTempViews(e.plan) + case _ => Seq.empty + }) + }.distinct + } + + collectTempViews(child) + } } diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala new file mode 100644 index 000000000000..9366d5efe163 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.catalyst.plans.logical.views + +import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +case class CreateIcebergView( + child: LogicalPlan, + queryText: String, + query: LogicalPlan, + columnAliases: Seq[String], + columnComments: Seq[Option[String]], + queryColumnNames: Seq[String] = Seq.empty, + comment: Option[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean, + rewritten: Boolean = false) extends BinaryCommand { + override def left: LogicalPlan = child + + override def right: LogicalPlan = query + + override protected def withNewChildrenInternal( + newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan = + copy(child = newLeft, query = newRight) +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala new file mode 100644 index 000000000000..892e1eb857e4 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.types.StructType +import scala.collection.JavaConverters._ + + +case class CreateV2ViewExec( + catalog: ViewCatalog, + ident: Identifier, + queryText: String, + viewSchema: StructType, + columnAliases: Seq[String], + columnComments: Seq[Option[String]], + queryColumnNames: Seq[String], + comment: Option[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name + val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null + val currentNamespace = session.sessionState.catalogManager.currentNamespace + + val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION + val newProperties = properties ++ + comment.map(ViewCatalog.PROP_COMMENT -> _) + + (ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion, + ViewCatalog.PROP_ENGINE_VERSION -> engineVersion) + + if (replace) { + // CREATE OR REPLACE VIEW + if (catalog.viewExists(ident)) { + catalog.dropView(ident) + } + // FIXME: replaceView API doesn't exist in Spark 3.5 + catalog.createView( + ident, + queryText, + currentCatalog, + currentNamespace, + viewSchema, + queryColumnNames.toArray, + columnAliases.toArray, + columnComments.map(c => c.orNull).toArray, + newProperties.asJava) + } else { + try { + // CREATE VIEW [IF NOT EXISTS] + catalog.createView( + ident, + queryText, + currentCatalog, + currentNamespace, + viewSchema, + queryColumnNames.toArray, + columnAliases.toArray, + columnComments.map(c => c.orNull).toArray, + newProperties.asJava) + } catch { + case _: ViewAlreadyExistsException if allowExisting => // Ignore + } + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"CreateV2ViewExec: ${ident}" + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index 44157fc3823c..0505fe4e3030 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.plans.logical.RenameTable import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering +import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View import org.apache.spark.sql.connector.catalog.Identifier @@ -107,6 +108,21 @@ case class 
ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) => DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil + case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), queryText, query, + columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _) => + CreateV2ViewExec( + catalog = viewCatalog, + ident = ident, + queryText = queryText, + columnAliases = columnAliases, + columnComments = columnComments, + queryColumnNames = queryColumnNames, + viewSchema = query.schema, + comment = comment, + properties = properties, + allowExisting = allowExisting, + replace = replace) :: Nil + case _ => Nil } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java index 0eb8c96b3cc5..bf6509afee77 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -40,6 +40,7 @@ import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.source.SimpleRecord; import org.apache.iceberg.types.Types; +import org.apache.iceberg.view.View; import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -886,6 +887,268 @@ private String viewName(String viewName) { return viewName + new Random().nextInt(1000000); } + @Test + public void createViewIfNotExists() { + String viewName = "viewThatAlreadyExists"; + sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName); + + assertThatThrownBy(() -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining( + String.format( + "Cannot create view %s.%s because it already exists", NAMESPACE, viewName)); + + // using IF NOT EXISTS should work + assertThatNoException() + .isThrownBy( + () -> sql("CREATE VIEW IF NOT EXISTS %s AS SELECT id FROM %s", viewName, tableName)); + } + + @Test + public void createViewWithInvalidSQL() { + assertThatThrownBy(() -> sql("CREATE VIEW simpleViewWithInvalidSQL AS invalid SQL")) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Syntax error"); + } + + @Test + public void createViewReferencingTempView() throws NoSuchTableException { + insertRows(10); + String tempView = "temporaryViewBeingReferencedInAnotherView"; + String viewReferencingTempView = "viewReferencingTemporaryView"; + + sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName); + + // creating a view that references a TEMP VIEW shouldn't be possible + assertThatThrownBy( + () -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewReferencingTempView, tempView)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Cannot create the persistent object") + .hasMessageContaining(viewReferencingTempView) + .hasMessageContaining("of the type VIEW because it references to the temporary object") + .hasMessageContaining(tempView); + } + + @Test + public void createViewReferencingGlobalTempView() throws NoSuchTableException { + insertRows(10); + String globalTempView = "globalTemporaryViewBeingReferenced"; + String viewReferencingTempView = "viewReferencingGlobalTemporaryView"; + + sql( + "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id 
<= 5", + globalTempView, tableName); + + // creating a view that references a GLOBAL TEMP VIEW shouldn't be possible + assertThatThrownBy( + () -> + sql( + "CREATE VIEW %s AS SELECT id FROM global_temp.%s", + viewReferencingTempView, globalTempView)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Cannot create the persistent object") + .hasMessageContaining(viewReferencingTempView) + .hasMessageContaining("of the type VIEW because it references to the temporary object") + .hasMessageContaining(globalTempView); + } + + @Test + public void createViewUsingNonExistingTable() { + assertThatThrownBy( + () -> sql("CREATE VIEW viewWithNonExistingTable AS SELECT id FROM non_existing")) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("The table or view `non_existing` cannot be found"); + } + + @Test + public void createViewWithMismatchedColumnCounts() { + String viewName = "viewWithMismatchedColumnCounts"; + + assertThatThrownBy( + () -> sql("CREATE VIEW %s (id, data) AS SELECT id FROM %s", viewName, tableName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining(String.format("Cannot create view %s.%s", NAMESPACE, viewName)) + .hasMessageContaining("not enough data columns") + .hasMessageContaining("View columns: id, data") + .hasMessageContaining("Data columns: id"); + + assertThatThrownBy( + () -> sql("CREATE VIEW %s (id) AS SELECT id, data FROM %s", viewName, tableName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining(String.format("Cannot create view %s.%s", NAMESPACE, viewName)) + .hasMessageContaining("too many data columns") + .hasMessageContaining("View columns: id") + .hasMessageContaining("Data columns: id, data"); + } + + @Test + public void createViewWithColumnAliases() throws NoSuchTableException { + insertRows(6); + String viewName = "viewWithColumnAliases"; + + sql( + "CREATE VIEW %s (new_id COMMENT 'ID', new_data COMMENT 'DATA') AS SELECT id, data FROM %s WHERE id <= 3", + viewName, tableName); + + View view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName)); + assertThat(view.properties()).containsEntry("queryColumnNames", "id,data"); + + assertThat(view.schema().columns()).hasSize(2); + Types.NestedField first = view.schema().columns().get(0); + assertThat(first.name()).isEqualTo("new_id"); + assertThat(first.doc()).isEqualTo("ID"); + + Types.NestedField second = view.schema().columns().get(1); + assertThat(second.name()).isEqualTo("new_data"); + assertThat(second.doc()).isEqualTo("DATA"); + + assertThat(sql("SELECT new_id FROM %s", viewName)) + .hasSize(3) + .containsExactlyInAnyOrder(row(1), row(2), row(3)); + + sql("DROP VIEW %s", viewName); + + sql( + "CREATE VIEW %s (new_data, new_id) AS SELECT data, id FROM %s WHERE id <= 3", + viewName, tableName); + + assertThat(sql("SELECT new_id FROM %s", viewName)) + .hasSize(3) + .containsExactlyInAnyOrder(row(1), row(2), row(3)); + } + + @Test + public void createViewWithDuplicateColumnNames() { + assertThatThrownBy( + () -> + sql( + "CREATE VIEW viewWithDuplicateColumnNames (new_id, new_id) AS SELECT id, id FROM %s WHERE id <= 3", + tableName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("The column `new_id` already exists"); + } + + @Test + public void createViewWithDuplicateQueryColumnNames() throws NoSuchTableException { + insertRows(3); + String viewName = "viewWithDuplicateQueryColumnNames"; + String sql = String.format("SELECT id, id FROM %s WHERE id <= 3", tableName); + + // not specifying column aliases in the view should 
fail + assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("The column `id` already exists"); + + sql("CREATE VIEW %s (id_one, id_two) AS %s", viewName, sql); + + assertThat(sql("SELECT * FROM %s", viewName)) + .hasSize(3) + .containsExactlyInAnyOrder(row(1, 1), row(2, 2), row(3, 3)); + } + + @Test + public void createViewWithCTE() throws NoSuchTableException { + insertRows(10); + String viewName = "simpleViewWithCTE"; + String sql = + String.format( + "WITH max_by_data AS (SELECT max(id) as max FROM %s) " + + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max", + tableName); + + sql("CREATE VIEW %s AS %s", viewName, sql); + + assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L)); + } + + @Test + public void createViewWithConflictingNamesForCTEAndTempView() throws NoSuchTableException { + insertRows(10); + String viewName = "viewWithConflictingNamesForCTEAndTempView"; + String cteName = "cteName"; + String sql = + String.format( + "WITH %s AS (SELECT max(id) as max FROM %s) " + + "(SELECT max, count(1) AS count FROM %s GROUP BY max)", + cteName, tableName, cteName); + + // create a CTE and a TEMP VIEW with the same name + sql("CREATE TEMPORARY VIEW %s AS SELECT * from %s", cteName, tableName); + sql("CREATE VIEW %s AS %s", viewName, sql); + + // CTE should take precedence over the TEMP VIEW when data is read + assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L)); + } + + @Test + public void createViewWithCTEReferencingTempView() { + String viewName = "viewWithCTEReferencingTempView"; + String tempViewInCTE = "tempViewInCTE"; + String sql = + String.format( + "WITH max_by_data AS (SELECT max(id) as max FROM %s) " + + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max", + tempViewInCTE); + + sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE ID <= 5", tempViewInCTE, tableName); + + assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Cannot create the persistent object") + .hasMessageContaining(viewName) + .hasMessageContaining("of the type VIEW because it references to the temporary object") + .hasMessageContaining(tempViewInCTE); + } + + @Test + public void createViewWithNonExistingQueryColumn() { + assertThatThrownBy( + () -> + sql( + "CREATE VIEW viewWithNonExistingQueryColumn AS SELECT non_existing FROM %s WHERE id <= 3", + tableName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining( + "A column or function parameter with name `non_existing` cannot be resolved"); + } + + @Test + public void createViewWithSubqueryExpressionUsingTempView() { + String viewName = "viewWithSubqueryExpression"; + String tempView = "simpleTempView"; + String sql = + String.format("SELECT * FROM %s WHERE id = (SELECT id FROM %s)", tableName, tempView); + + sql("CREATE TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5", tempView, tableName); + + assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining(String.format("Cannot create the persistent object %s", viewName)) + .hasMessageContaining( + String.format("because it references to the temporary object %s", tempView)); + } + + @Test + public void createViewWithSubqueryExpressionUsingGlobalTempView() { + String viewName = "simpleViewWithSubqueryExpression"; + String globalTempView = "simpleGlobalTempView"; + String 
sql = + String.format( + "SELECT * FROM %s WHERE id = (SELECT id FROM global_temp.%s)", + tableName, globalTempView); + + sql( + "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5", + globalTempView, tableName); + + assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining(String.format("Cannot create the persistent object %s", viewName)) + .hasMessageContaining( + String.format( + "because it references to the temporary object global_temp.%s", globalTempView)); + } + private void insertRows(int numRows) throws NoSuchTableException { List records = Lists.newArrayListWithCapacity(numRows); for (int i = 1; i <= numRows; i++) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index a62f6bb95aaa..37e7387d696d 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.StringJoiner; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; @@ -568,6 +569,36 @@ public View createView( String[] columnComments, Map properties) throws ViewAlreadyExistsException, NoSuchNamespaceException { + if (null != asViewCatalog) { + Schema icebergSchema = SparkSchemaUtil.convert(schema); + + StringJoiner joiner = new StringJoiner(","); + Arrays.stream(queryColumnNames).forEach(joiner::add); + + try { + Map props = + ImmutableMap.builder() + .putAll(Spark3Util.rebuildCreateProperties(properties)) + .put("queryColumnNames", joiner.toString()) + .build(); + org.apache.iceberg.view.View view = + asViewCatalog + .buildView(buildIdentifier(ident)) + .withDefaultCatalog(currentCatalog) + .withDefaultNamespace(Namespace.of(currentNamespace)) + .withQuery("spark", sql) + .withSchema(icebergSchema) + .withLocation(properties.get("location")) + .withProperties(props) + .create(); + return new SparkView(catalogName, view); + } catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) { + throw new NoSuchNamespaceException(currentNamespace); + } catch (AlreadyExistsException e) { + throw new ViewAlreadyExistsException(ident); + } + } + throw new UnsupportedOperationException( "Creating a view is not supported by catalog: " + catalogName); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java index 424519623e4d..5391d75476ce 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java @@ -35,8 +35,9 @@ public class SparkView implements org.apache.spark.sql.connector.catalog.View { + private static final String QUERY_COLUMN_NAMES = "queryColumnNames"; private static final Set RESERVED_PROPERTIES = - ImmutableSet.of("provider", "location", FORMAT_VERSION); + ImmutableSet.of("provider", "location", FORMAT_VERSION, QUERY_COLUMN_NAMES); private final View icebergView; private final String catalogName; @@ -86,7 +87,9 @@ public StructType schema() { @Override public String[] queryColumnNames() { - return new String[0]; + return icebergView.properties().containsKey(QUERY_COLUMN_NAMES) + ? 
icebergView.properties().get(QUERY_COLUMN_NAMES).split(",") + : new String[0]; } @Override
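
Reviewer notes (not part of the patch itself):

The pieces above compose as follows: RewriteViewCommands runs CTE substitution, rejects references to temporary objects, and rewrites Spark's built-in CreateView into CreateIcebergView; once the underlying query is resolved, ResolveViews applies the user-specified column aliases and comments and records the original query column names; CheckViews then enforces column arity and duplicate-name rules; ExtendedDataSourceV2Strategy plans CreateIcebergView into CreateV2ViewExec, which calls ViewCatalog.createView; and SparkCatalog persists the definition through the Iceberg view builder, round-tripping the query column names via the reserved "queryColumnNames" property that SparkView reads back.

A minimal end-to-end sketch of the SQL surface this patch enables. Illustrative only: it assumes a Spark 3.5 session with the Iceberg extensions on the classpath and a catalog whose backing Iceberg catalog supports views (otherwise SparkCatalog.createView throws UnsupportedOperationException); the catalog name `local`, the REST URI, and the table/view names are invented for the example.

    import org.apache.spark.sql.SparkSession

    object CreateViewExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
          .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
          // Assumption: a REST-backed Iceberg catalog that supports views.
          .config("spark.sql.catalog.local.type", "rest")
          .config("spark.sql.catalog.local.uri", "http://localhost:8181")
          .getOrCreate()

        spark.sql("USE local")
        spark.sql("CREATE TABLE IF NOT EXISTS db.events (id BIGINT, data STRING) USING iceberg")

        // Rewritten by RewriteViewCommands into CreateIcebergView.
        spark.sql("CREATE VIEW db.small_events AS SELECT id, data FROM db.events WHERE id <= 5")

        // Column aliases and comments are applied by ResolveViews.aliasColumns;
        // CheckViews verifies the alias count matches the query output.
        spark.sql(
          """CREATE OR REPLACE VIEW db.renamed_events (event_id COMMENT 'ID', payload)
            |AS SELECT id, data FROM db.events""".stripMargin)

        // IF NOT EXISTS maps to allowExisting: ViewAlreadyExistsException is swallowed.
        spark.sql("CREATE VIEW IF NOT EXISTS db.small_events AS SELECT id FROM db.events")

        spark.sql("SELECT event_id FROM db.renamed_events").show()
        spark.stop()
      }
    }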
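
The temporary-object check deserves a call-out: collectTemporaryViews walks not just direct relations but also CTE bodies and subquery expressions, matching the cases covered by the tests above. A sketch, continuing the session from the previous example (view names are illustrative):

    // Permanent views must not capture session-scoped state.
    spark.sql("CREATE TEMPORARY VIEW tmp_events AS SELECT id FROM db.events WHERE id <= 5")

    // Each of these fails analysis with INVALID_TEMP_OBJ_REFERENCE, even though
    // the temp view is hidden behind a CTE or a subquery expression:
    //   CREATE VIEW db.bad1 AS SELECT id FROM tmp_events
    //   CREATE VIEW db.bad2 AS WITH t AS (SELECT id FROM tmp_events) SELECT * FROM t
    //   CREATE VIEW db.bad3 AS SELECT * FROM db.events
    //     WHERE id = (SELECT max(id) FROM tmp_events)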
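
Two behavioral caveats, both flagged in the patch itself: Spark 3.5's ViewCatalog interface has no replaceView API (see the FIXME in CreateV2ViewExec), so CREATE OR REPLACE VIEW is implemented as dropView followed by createView; the two calls are not atomic, and a failure between them leaves the old view dropped without the new one created. Second, temporary function references are not yet rejected (see the TODO in RewriteViewCommands.verifyTemporaryObjectsDontExist) — only temporary views are checked.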