From 92eba8f858ca39b433a14070fb156cda0b0b697f Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Thu, 21 Dec 2023 16:00:01 +0100 Subject: [PATCH 1/7] Spark: Support creating views via SQL --- .../IcebergSparkSessionExtensions.scala | 2 + .../analysis/RewriteViewCommands.scala | 7 + .../sql/catalyst/analysis/ViewCheck.scala | 39 +++++ .../logical/views/CreateIcebergView.scala | 41 +++++ .../datasources/v2/CreateV2ViewExec.scala | 147 ++++++++++++++++ .../v2/ExtendedDataSourceV2Strategy.scala | 14 ++ .../iceberg/spark/extensions/TestViews.java | 158 ++++++++++-------- .../apache/iceberg/spark/SparkCatalog.java | 27 +++ .../iceberg/spark/source/SparkView.java | 12 +- 9 files changed, 377 insertions(+), 70 deletions(-) create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala index ad9df3994fc0..beea0c10f209 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.SparkSessionExtensions import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion import org.apache.spark.sql.catalyst.analysis.ResolveProcedures import org.apache.spark.sql.catalyst.analysis.ResolveViews +import org.apache.spark.sql.catalyst.analysis.ViewCheck import org.apache.spark.sql.catalyst.optimizer.ReplaceStaticInvoke import org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy @@ -37,6 +38,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) { extensions.injectResolutionRule { spark => ResolveProcedures(spark) } extensions.injectResolutionRule { spark => ResolveViews(spark) } extensions.injectResolutionRule { _ => ProcedureArgumentCoercion } + extensions.injectCheckRule(_ => ViewCheck) // optimizer extensions extensions.injectOptimizerRule { _ => ReplaceStaticInvoke } diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala index 2b35db33c0c5..651be7409781 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala @@ -20,8 +20,10 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.CreateView import org.apache.spark.sql.catalyst.plans.logical.DropView import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView import 
org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.CatalogManager @@ -40,6 +42,11 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case DropView(ResolvedView(resolved), ifExists) => DropIcebergView(resolved, ifExists) + + case CreateView(r@ResolvedView(_), userSpecifiedColumns, comment, properties, + originalText, query, allowExisting, replace) => + CreateIcebergView(r, userSpecifiedColumns, comment, properties, originalText, + query, allowExisting, replace) } private def isTempView(nameParts: Seq[String]): Boolean = { diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala new file mode 100644 index 000000000000..a0e3b7a6639e --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.execution.command.ViewHelper + +object ViewCheck extends (LogicalPlan => Unit) { + + override def apply(plan: LogicalPlan): Unit = { + plan foreach { + case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, _, _, query, _, _) => + val identifier = Spark3Util.toV1TableIdentifier(ident) + ViewHelper.verifyTemporaryObjectsNotExists(false, identifier, query, Seq.empty) + ViewHelper.verifyAutoGeneratedAliasesNotExists(query, false, identifier) + case _ => // OK + } + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala new file mode 100644 index 000000000000..036ec8ce2b95 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.views + +import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +case class CreateIcebergView( + child: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + properties: Map[String, String], + originalText: Option[String], + query: LogicalPlan, + allowExisting: Boolean, + replace: Boolean) extends BinaryCommand { + override def left: LogicalPlan = child + + override def right: LogicalPlan = query + + override protected def withNewChildrenInternal( + newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan = + copy(child = newLeft, query = newRight) +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala new file mode 100644 index 000000000000..4db76601385a --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.CommandExecutionMode +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.MetadataBuilder +import org.apache.spark.sql.util.SchemaUtils +import scala.collection.JavaConverters._ + + +case class CreateV2ViewExec( + catalog: ViewCatalog, + ident: Identifier, + originalText: String, + query: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP) + qe.assertAnalyzed() + val analyzedPlan = qe.analyzed + + val identifier = Spark3Util.toV1TableIdentifier(ident) + + if (userSpecifiedColumns.nonEmpty) { + if (userSpecifiedColumns.length > analyzedPlan.output.length) { + throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError( + identifier, userSpecifiedColumns.map(_._1), analyzedPlan) + } else if (userSpecifiedColumns.length < analyzedPlan.output.length) { + throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError( + identifier, userSpecifiedColumns.map(_._1), analyzedPlan) + } + } + + val queryColumnNames = analyzedPlan.schema.fieldNames + SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver) + + val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema + val columnAliases = userSpecifiedColumns.map(_._1).toArray + val columnComments = userSpecifiedColumns.map(_._2.getOrElse("")).toArray + + val currentCatalog = session.sessionState.catalogManager.currentCatalog.name + val currentNamespace = session.sessionState.catalogManager.currentNamespace + + val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION + val createEngineVersion = Some(engineVersion) + val newProperties = properties ++ + comment.map(ViewCatalog.PROP_COMMENT -> _) ++ + createEngineVersion.map(ViewCatalog.PROP_CREATE_ENGINE_VERSION -> _) + + (ViewCatalog.PROP_ENGINE_VERSION -> engineVersion) + + if (replace) { + // CREATE OR REPLACE VIEW + if (catalog.viewExists(ident)) { + catalog.dropView(ident) + } + // FIXME: replaceView API doesn't exist in Spark 3.5 + catalog.createView( + ident, + originalText, + currentCatalog, + currentNamespace, + viewSchema, + queryColumnNames, + columnAliases, + columnComments, + newProperties.asJava) + } else { + try { + // CREATE VIEW [IF NOT EXISTS] + catalog.createView( + ident, + originalText, + currentCatalog, + currentNamespace, + viewSchema, + queryColumnNames, + columnAliases, + columnComments, + newProperties.asJava) + } catch { + case _: ViewAlreadyExistsException if allowExisting => // Ignore + } + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"CreateV2ViewExec: ${ident}" + } 
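+
+  // For illustration, a sketch of the aliasing performed below (the names `idAttr` and
+  // `meta` are placeholders, not part of this patch): for
+  //   CREATE VIEW v (new_id COMMENT 'ID') AS SELECT id FROM tbl
+  // userSpecifiedColumns is Seq(("new_id", Some("ID"))), so aliasPlan wraps the analyzed
+  // query in Project(Seq(Alias(idAttr, "new_id")(explicitMetadata = Some(meta))), plan),
+  // and the resulting viewSchema is struct<new_id:int> (assuming an int `id` column) with
+  // column metadata {"comment": "ID"}.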
+ + /** + * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns, + * else return the analyzed plan directly. + */ + private def aliasPlan( + analyzedPlan: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = { + if (userSpecifiedColumns.isEmpty) { + analyzedPlan + } else { + val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map { + case (attr, (colName, None)) => Alias(attr, colName)() + case (attr, (colName, Some(colComment))) => + val meta = new MetadataBuilder().putString("comment", colComment).build() + Alias(attr, colName)(explicitMetadata = Some(meta)) + } + val projectedPlan = Project(projectList, analyzedPlan) + session.sessionState.executePlan(projectedPlan, CommandExecutionMode.SKIP).analyzed + } + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index 44157fc3823c..81bb559785c7 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.plans.logical.RenameTable import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering +import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View import org.apache.spark.sql.connector.catalog.Identifier @@ -107,6 +108,19 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) => DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil + case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), userSpecifiedColumns, comment, + properties, Some(originalText), query, allowExisting, replace) => + CreateV2ViewExec( + catalog = viewCatalog, + ident = ident, + originalText = originalText, + query = query, + userSpecifiedColumns = userSpecifiedColumns, + comment = comment, + properties = properties, + allowExisting = allowExisting, + replace = replace) :: Nil + case _ => Nil } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java index 0eb8c96b3cc5..5733a07d9a29 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -139,24 +139,9 @@ public void readFromMultipleViews() throws NoSuchTableException { insertRows(6); String viewName = "firstView"; String secondView = "secondView"; - String viewSQL = String.format("SELECT id FROM %s WHERE id <= 3", tableName); - String secondViewSQL = String.format("SELECT id FROM %s WHERE id > 3", tableName); - ViewCatalog viewCatalog = viewCatalog(); - - viewCatalog - .buildView(TableIdentifier.of(NAMESPACE, viewName)) - 
.withQuery("spark", viewSQL) - .withDefaultNamespace(NAMESPACE) - .withSchema(schema(viewSQL)) - .create(); - - viewCatalog - .buildView(TableIdentifier.of(NAMESPACE, secondView)) - .withQuery("spark", secondViewSQL) - .withDefaultNamespace(NAMESPACE) - .withSchema(schema(secondViewSQL)) - .create(); + sql("CREATE VIEW %s AS SELECT id FROM %s WHERE id <= 3", viewName, tableName); + sql("CREATE VIEW %s AS SELECT id FROM %s WHERE id > 3", secondView, tableName); assertThat(sql("SELECT * FROM %s", viewName)) .hasSize(3) @@ -350,39 +335,22 @@ public void readFromViewReferencingAnotherView() throws NoSuchTableException { } @Test - public void readFromViewReferencingTempView() throws NoSuchTableException { + public void createViewReferencingTempView() throws NoSuchTableException { insertRows(10); String tempView = "tempViewBeingReferencedInAnotherView"; String viewReferencingTempView = "viewReferencingTempView"; String sql = String.format("SELECT id FROM %s", tempView); - ViewCatalog viewCatalog = viewCatalog(); - sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName); - // it wouldn't be possible to reference a TEMP VIEW if the view had been created via SQL, - // but this can't be prevented when using the API directly - viewCatalog - .buildView(TableIdentifier.of(NAMESPACE, viewReferencingTempView)) - .withQuery("spark", sql) - .withDefaultNamespace(NAMESPACE) - .withDefaultCatalog(catalogName) - .withSchema(schema(sql)) - .create(); - - List expected = - IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList()); - - assertThat(sql("SELECT * FROM %s", tempView)) - .hasSize(5) - .containsExactlyInAnyOrderElementsOf(expected); - - // reading from a view that references a TEMP VIEW shouldn't be possible - assertThatThrownBy(() -> sql("SELECT * FROM %s", viewReferencingTempView)) + // creating a view that references a TEMP VIEW shouldn't be possible + assertThatThrownBy( + () -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewReferencingTempView, tempView)) .isInstanceOf(AnalysisException.class) - .hasMessageContaining("The table or view") - .hasMessageContaining(tempView) - .hasMessageContaining("cannot be found"); + .hasMessageContaining("Cannot create the persistent object") + .hasMessageContaining(viewReferencingTempView) + .hasMessageContaining("of the type VIEW because it references to the temporary object") + .hasMessageContaining(tempView); } @Test @@ -434,41 +402,26 @@ public void readFromViewReferencingAnotherViewHiddenByTempView() throws NoSuchTa } @Test - public void readFromViewReferencingGlobalTempView() throws NoSuchTableException { + public void createViewReferencingGlobalTempView() throws NoSuchTableException { insertRows(10); String globalTempView = "globalTempViewBeingReferenced"; String viewReferencingTempView = "viewReferencingGlobalTempView"; - ViewCatalog viewCatalog = viewCatalog(); - Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema(); - sql( "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", globalTempView, tableName); - // it wouldn't be possible to reference a GLOBAL TEMP VIEW if the view had been created via SQL, - // but this can't be prevented when using the API directly - viewCatalog - .buildView(TableIdentifier.of(NAMESPACE, viewReferencingTempView)) - .withQuery("spark", String.format("SELECT id FROM global_temp.%s", globalTempView)) - .withDefaultNamespace(NAMESPACE) - .withDefaultCatalog(catalogName) - .withSchema(schema) - .create(); - - List expected = 
- IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList()); - - assertThat(sql("SELECT * FROM global_temp.%s", globalTempView)) - .hasSize(5) - .containsExactlyInAnyOrderElementsOf(expected); - - // reading from a view that references a GLOBAL TEMP VIEW shouldn't be possible - assertThatThrownBy(() -> sql("SELECT * FROM %s", viewReferencingTempView)) + // creating a view that references a GLOBAL TEMP VIEW shouldn't be possible + assertThatThrownBy( + () -> + sql( + "CREATE VIEW %s AS SELECT id FROM global_temp.%s", + viewReferencingTempView, globalTempView)) .isInstanceOf(AnalysisException.class) - .hasMessageContaining("The table or view") - .hasMessageContaining(globalTempView) - .hasMessageContaining("cannot be found"); + .hasMessageContaining("Cannot create the persistent object") + .hasMessageContaining(viewReferencingTempView) + .hasMessageContaining("of the type VIEW because it references to the temporary object") + .hasMessageContaining(globalTempView); } @Test @@ -886,6 +839,75 @@ private String viewName(String viewName) { return viewName + new Random().nextInt(1000000); } + @Test + public void createViewIfNotExists() { + String viewName = "viewThatAlreadyExists"; + sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName); + + assertThatThrownBy(() -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining( + String.format( + "Cannot create view %s.%s because it already exists", NAMESPACE, viewName)); + + // using IF NOT EXISTS should work + assertThatNoException() + .isThrownBy( + () -> sql("CREATE VIEW IF NOT EXISTS %s AS SELECT id FROM %s", viewName, tableName)); + } + + @Test + public void createViewUsingNonExistingTable() { + assertThatThrownBy(() -> sql("CREATE VIEW %s AS SELECT id FROM %s", "viewName", "non_existing")) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("The table or view `non_existing` cannot be found"); + } + + @Test + public void createViewColumnMismatch() { + String viewName = "viewWithMismatchedColumns"; + + assertThatThrownBy( + () -> sql("CREATE VIEW %s (id, data) AS SELECT id FROM %s", viewName, tableName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining(String.format("Cannot create view `%s`.`%s`", NAMESPACE, viewName)) + .hasMessageContaining("not enough data columns") + .hasMessageContaining("View columns: `id`, `data`") + .hasMessageContaining("Data columns: `id`"); + + assertThatThrownBy( + () -> sql("CREATE VIEW %s (id) AS SELECT id, data FROM %s", viewName, tableName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining(String.format("Cannot create view `%s`.`%s`", NAMESPACE, viewName)) + .hasMessageContaining("too many data columns") + .hasMessageContaining("View columns: `id`") + .hasMessageContaining("Data columns: `id`, `data`"); + } + + @Test + public void createViewWithColumnAliases() throws NoSuchTableException { + insertRows(6); + String viewName = "viewWithColumnAliases"; + + sql( + "CREATE VIEW %s (new_id COMMENT 'ID', new_data COMMENT 'DATA') AS SELECT id, data FROM %s WHERE id <= 3", + viewName, tableName); + + assertThat(sql("SELECT new_id FROM %s", viewName)) + .hasSize(3) + .containsExactlyInAnyOrder(row(1), row(2), row(3)); + + sql("DROP VIEW %s", viewName); + + sql( + "CREATE VIEW %s (new_id, new_data) AS SELECT id, data FROM %s WHERE id <= 3", + viewName, tableName); + + assertThat(sql("SELECT new_id FROM %s", viewName)) + .hasSize(3) + .containsExactlyInAnyOrder(row(1), row(2), 
row(3)); + } + private void insertRows(int numRows) throws NoSuchTableException { List records = Lists.newArrayListWithCapacity(numRows); for (int i = 1; i <= numRows; i++) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index a62f6bb95aaa..5ab86e265ae1 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -568,6 +568,33 @@ public View createView( String[] columnComments, Map properties) throws ViewAlreadyExistsException, NoSuchNamespaceException { + if (null != asViewCatalog) { + Schema icebergSchema = SparkSchemaUtil.convert(schema); + + try { + Map props = + ImmutableMap.builder() + .putAll(Spark3Util.rebuildCreateProperties(properties)) + .put("queryColumnNames", Arrays.toString(queryColumnNames)) + .build(); + org.apache.iceberg.view.View view = + asViewCatalog + .buildView(buildIdentifier(ident)) + .withDefaultCatalog(currentCatalog) + .withDefaultNamespace(Namespace.of(currentNamespace)) + .withQuery("spark", sql) + .withSchema(icebergSchema) + .withLocation(properties.get("location")) + .withProperties(props) + .create(); + return new SparkView(catalogName, view); + } catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) { + throw new NoSuchNamespaceException(currentNamespace); + } catch (AlreadyExistsException e) { + throw new ViewAlreadyExistsException(ident); + } + } + throw new UnsupportedOperationException( "Creating a view is not supported by catalog: " + catalogName); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java index 424519623e4d..3a4a92ce390b 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java @@ -35,8 +35,9 @@ public class SparkView implements org.apache.spark.sql.connector.catalog.View { + private static final String QUERY_COLUMN_NAMES = "queryColumnNames"; private static final Set RESERVED_PROPERTIES = - ImmutableSet.of("provider", "location", FORMAT_VERSION); + ImmutableSet.of("provider", "location", FORMAT_VERSION, QUERY_COLUMN_NAMES); private final View icebergView; private final String catalogName; @@ -86,7 +87,9 @@ public StructType schema() { @Override public String[] queryColumnNames() { - return new String[0]; + return properties().containsKey(QUERY_COLUMN_NAMES) + ? 
properties().get(QUERY_COLUMN_NAMES).split(", ") + : new String[0]; } @Override @@ -109,6 +112,11 @@ public Map properties() { propsBuilder.put("provider", "iceberg"); propsBuilder.put("location", icebergView.location()); + if (icebergView.properties().containsKey(QUERY_COLUMN_NAMES)) { + String queryColumnNames = + icebergView.properties().get(QUERY_COLUMN_NAMES).replace("[", "").replace("]", ""); + propsBuilder.put(QUERY_COLUMN_NAMES, queryColumnNames); + } if (icebergView instanceof BaseView) { ViewOperations ops = ((BaseView) icebergView).operations(); From 5153b4b6b3d110cff409adea6be4a732fa499f96 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Wed, 17 Jan 2024 10:19:36 +0100 Subject: [PATCH 2/7] review feedback --- .../IcebergSparkSessionExtensions.scala | 2 - .../sql/catalyst/analysis/ResolveViews.scala | 9 ++++- .../analysis/RewriteViewCommands.scala | 4 +- .../sql/catalyst/analysis/ViewCheck.scala | 39 ------------------- .../datasources/v2/CreateV2ViewExec.scala | 17 ++++---- .../iceberg/spark/extensions/TestViews.java | 17 +++++++- .../apache/iceberg/spark/SparkCatalog.java | 6 ++- .../iceberg/spark/source/SparkView.java | 4 +- 8 files changed, 40 insertions(+), 58 deletions(-) delete mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala index beea0c10f209..ad9df3994fc0 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala @@ -23,7 +23,6 @@ import org.apache.spark.sql.SparkSessionExtensions import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion import org.apache.spark.sql.catalyst.analysis.ResolveProcedures import org.apache.spark.sql.catalyst.analysis.ResolveViews -import org.apache.spark.sql.catalyst.analysis.ViewCheck import org.apache.spark.sql.catalyst.optimizer.ReplaceStaticInvoke import org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy @@ -38,7 +37,6 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) { extensions.injectResolutionRule { spark => ResolveProcedures(spark) } extensions.injectResolutionRule { spark => ResolveViews(spark) } extensions.injectResolutionRule { _ => ProcedureArgumentCoercion } - extensions.injectCheckRule(_ => ViewCheck) // optimizer extensions extensions.injectOptimizerRule { _ => ReplaceStaticInvoke } diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala index a1b826569f35..57995a49bd24 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.iceberg.spark.Spark3Util import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.FunctionIdentifier import 
org.apache.spark.sql.catalyst.expressions.Alias @@ -27,6 +28,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias +import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.CurrentOrigin @@ -38,6 +40,7 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog import org.apache.spark.sql.connector.catalog.View import org.apache.spark.sql.connector.catalog.ViewCatalog import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.command.ViewHelper case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog { @@ -59,6 +62,10 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look loadView(catalog, ident) .map(_ => ResolvedV2View(catalog.asViewCatalog, ident)) .getOrElse(u) + + case c@CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, _, _, query, _, _) => + ViewHelper.verifyTemporaryObjectsNotExists(false, Spark3Util.toV1TableIdentifier(ident), query, Seq.empty) + c } def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match { @@ -151,7 +158,7 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look } - implicit class ViewHelper(plugin: CatalogPlugin) { + implicit class IcebergViewHelper(plugin: CatalogPlugin) { def asViewCatalog: ViewCatalog = plugin match { case viewCatalog: ViewCatalog => viewCatalog diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala index 651be7409781..09c5e5aff863 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala @@ -43,9 +43,9 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi case DropView(ResolvedView(resolved), ifExists) => DropIcebergView(resolved, ifExists) - case CreateView(r@ResolvedView(_), userSpecifiedColumns, comment, properties, + case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties, originalText, query, allowExisting, replace) => - CreateIcebergView(r, userSpecifiedColumns, comment, properties, originalText, + CreateIcebergView(resolved, userSpecifiedColumns, comment, properties, originalText, query, allowExisting, replace) } diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala deleted file mode 100644 index a0e3b7a6639e..000000000000 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.spark.sql.catalyst.analysis - -import org.apache.iceberg.spark.Spark3Util -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView -import org.apache.spark.sql.connector.catalog.ViewCatalog -import org.apache.spark.sql.execution.command.ViewHelper - -object ViewCheck extends (LogicalPlan => Unit) { - - override def apply(plan: LogicalPlan): Unit = { - plan foreach { - case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, _, _, query, _, _) => - val identifier = Spark3Util.toV1TableIdentifier(ident) - ViewHelper.verifyTemporaryObjectsNotExists(false, identifier, query, Seq.empty) - ViewHelper.verifyAutoGeneratedAliasesNotExists(query, false, identifier) - case _ => // OK - } - } -} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala index 4db76601385a..d20c219ffdd0 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala @@ -50,10 +50,7 @@ case class CreateV2ViewExec( override lazy val output: Seq[Attribute] = Nil override protected def run(): Seq[InternalRow] = { - val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP) - qe.assertAnalyzed() - val analyzedPlan = qe.analyzed - + val analyzedPlan = session.sessionState.executePlan(query, CommandExecutionMode.SKIP).analyzed val identifier = Spark3Util.toV1TableIdentifier(ident) if (userSpecifiedColumns.nonEmpty) { @@ -71,17 +68,17 @@ case class CreateV2ViewExec( val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema val columnAliases = userSpecifiedColumns.map(_._1).toArray - val columnComments = userSpecifiedColumns.map(_._2.getOrElse("")).toArray + val columnComments = userSpecifiedColumns.map(_._2.orNull).toArray - val currentCatalog = session.sessionState.catalogManager.currentCatalog.name + val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name + val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null val currentNamespace = session.sessionState.catalogManager.currentNamespace val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION - val createEngineVersion = Some(engineVersion) val newProperties = properties ++ - comment.map(ViewCatalog.PROP_COMMENT -> _) ++ - createEngineVersion.map(ViewCatalog.PROP_CREATE_ENGINE_VERSION -> _) + - (ViewCatalog.PROP_ENGINE_VERSION -> engineVersion) + comment.map(ViewCatalog.PROP_COMMENT -> _) + + (ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion, + ViewCatalog.PROP_ENGINE_VERSION -> engineVersion) if (replace) { // 
CREATE OR REPLACE VIEW diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java index 5733a07d9a29..ebba124eb4d8 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -339,7 +339,6 @@ public void createViewReferencingTempView() throws NoSuchTableException { insertRows(10); String tempView = "tempViewBeingReferencedInAnotherView"; String viewReferencingTempView = "viewReferencingTempView"; - String sql = String.format("SELECT id FROM %s", tempView); sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName); @@ -893,6 +892,9 @@ public void createViewWithColumnAliases() throws NoSuchTableException { "CREATE VIEW %s (new_id COMMENT 'ID', new_data COMMENT 'DATA') AS SELECT id, data FROM %s WHERE id <= 3", viewName, tableName); + assertThat(viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName)).properties()) + .containsEntry("queryColumnNames", "id, data"); + assertThat(sql("SELECT new_id FROM %s", viewName)) .hasSize(3) .containsExactlyInAnyOrder(row(1), row(2), row(3)); @@ -908,6 +910,19 @@ public void createViewWithColumnAliases() throws NoSuchTableException { .containsExactlyInAnyOrder(row(1), row(2), row(3)); } + @Test + public void createViewWithDuplicateQueryColumnNames() { + String viewName = "viewWithDuplicateQueryColumnNames"; + + assertThatThrownBy( + () -> + sql( + "CREATE VIEW %s (new_id , new_data) AS SELECT id, id FROM %s WHERE id <= 3", + viewName, tableName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("The column `id` already exists"); + } + private void insertRows(int numRows) throws NoSuchTableException { List records = Lists.newArrayListWithCapacity(numRows); for (int i = 1; i <= numRows; i++) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 5ab86e265ae1..db8829261472 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.StringJoiner; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; @@ -571,11 +572,14 @@ public View createView( if (null != asViewCatalog) { Schema icebergSchema = SparkSchemaUtil.convert(schema); + StringJoiner joiner = new StringJoiner(", "); + Arrays.stream(queryColumnNames).forEach(joiner::add); + try { Map props = ImmutableMap.builder() .putAll(Spark3Util.rebuildCreateProperties(properties)) - .put("queryColumnNames", Arrays.toString(queryColumnNames)) + .put("queryColumnNames", joiner.toString()) .build(); org.apache.iceberg.view.View view = asViewCatalog diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java index 3a4a92ce390b..62d300b36a1d 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java @@ -87,8 +87,8 @@ public StructType schema() { @Override public String[] queryColumnNames() { - 
return properties().containsKey(QUERY_COLUMN_NAMES) - ? properties().get(QUERY_COLUMN_NAMES).split(", ") + return icebergView.properties().containsKey(QUERY_COLUMN_NAMES) + ? icebergView.properties().get(QUERY_COLUMN_NAMES).split(", ") : new String[0]; } From 6559f7220e15498c6594ab5d5046d54960d57f04 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Wed, 24 Jan 2024 12:26:01 +0100 Subject: [PATCH 3/7] review feedback --- .../IcebergSparkSessionExtensions.scala | 2 + .../sql/catalyst/analysis/CheckViews.scala | 101 ++++++++++++++++++ .../sql/catalyst/analysis/ResolveViews.scala | 7 -- .../analysis/RewriteViewCommands.scala | 8 +- .../logical/views/CreateIcebergView.scala | 7 +- .../datasources/v2/CreateV2ViewExec.scala | 41 +++---- .../v2/ExtendedDataSourceV2Strategy.scala | 11 +- .../iceberg/spark/extensions/TestViews.java | 45 ++++++-- 8 files changed, 169 insertions(+), 53 deletions(-) create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala index ad9df3994fc0..3fca29c294c0 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala @@ -20,6 +20,7 @@ package org.apache.iceberg.spark.extensions import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.catalyst.analysis.CheckViews import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion import org.apache.spark.sql.catalyst.analysis.ResolveProcedures import org.apache.spark.sql.catalyst.analysis.ResolveViews @@ -37,6 +38,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) { extensions.injectResolutionRule { spark => ResolveProcedures(spark) } extensions.injectResolutionRule { spark => ResolveViews(spark) } extensions.injectResolutionRule { _ => ProcedureArgumentCoercion } + extensions.injectCheckRule(_ => CheckViews) // optimizer extensions extensions.injectOptimizerRule { _ => ReplaceStaticInvoke } diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala new file mode 100644 index 000000000000..e2bfe3a637a4 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.SubqueryExpression +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.View +import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog + +object CheckViews extends (LogicalPlan => Unit) { + + override def apply(plan: LogicalPlan): Unit = { + plan foreach { + case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _, _, _, _, _) => + verifyAmountOfColumns(ident, columnAliases, query) + verifyTemporaryObjectsDontExist(ident, query) + + case _ => // OK + } + } + + private def verifyAmountOfColumns(ident: Identifier, columns: Seq[String], query: LogicalPlan): Unit = { + if (columns.nonEmpty) { + if (columns.length > query.output.length) { + throw new AnalysisException( + errorClass = "CREATE_VIEW_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + messageParameters = Map( + "viewName" -> ident.toString, + "viewColumns" -> columns.mkString(", "), + "dataColumns" -> query.output.map(c => c.name).mkString(", "))) + } else if (columns.length < query.output.length) { + throw new AnalysisException( + errorClass = "CREATE_VIEW_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", + messageParameters = Map( + "viewName" -> ident.toString, + "viewColumns" -> columns.mkString(", "), + "dataColumns" -> query.output.map(c => c.name).mkString(", "))) + } + } + } + + /** + * Permanent views are not allowed to reference temp objects + */ + private def verifyTemporaryObjectsDontExist( + name: Identifier, + child: LogicalPlan): Unit = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + val tempViews = collectTemporaryViews(child) + tempViews.foreach { nameParts => + throw new AnalysisException( + errorClass = "INVALID_TEMP_OBJ_REFERENCE", + messageParameters = Map( + "obj" -> "VIEW", + "objName" -> name.name(), + "tempObj" -> "VIEW", + "tempObjName" -> nameParts.quoted)) + } + + // TODO: check for temp function names + } + + + /** + * Collect all temporary views and return the identifiers separately + */ + private def collectTemporaryViews(child: LogicalPlan): Seq[Seq[String]] = { + def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = { + child.flatMap { + case view: View if view.isTempView => Seq(view.desc.identifier.nameParts) + case plan => plan.expressions.flatMap(_.flatMap { + case e: SubqueryExpression => collectTempViews(e.plan) + case _ => Seq.empty + }) + }.distinct + } + + collectTempViews(child) + } +} \ No newline at end of file diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala index 57995a49bd24..a4ae3dc99def 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.iceberg.spark.Spark3Util import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.FunctionIdentifier 
import org.apache.spark.sql.catalyst.expressions.Alias @@ -28,7 +27,6 @@ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias -import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.CurrentOrigin @@ -40,7 +38,6 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog import org.apache.spark.sql.connector.catalog.View import org.apache.spark.sql.connector.catalog.ViewCatalog import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.execution.command.ViewHelper case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog { @@ -62,10 +59,6 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look loadView(catalog, ident) .map(_ => ResolvedV2View(catalog.asViewCatalog, ident)) .getOrElse(u) - - case c@CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, _, _, query, _, _) => - ViewHelper.verifyTemporaryObjectsNotExists(false, Spark3Util.toV1TableIdentifier(ident), query, Seq.empty) - c } def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match { diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala index 09c5e5aff863..826501157052 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala @@ -44,9 +44,11 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi DropIcebergView(resolved, ifExists) case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties, - originalText, query, allowExisting, replace) => - CreateIcebergView(resolved, userSpecifiedColumns, comment, properties, originalText, - query, allowExisting, replace) + Some(queryText), query, allowExisting, replace) => + val columnAliases = userSpecifiedColumns.map(_._1) + val columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty)) + CreateIcebergView(resolved, queryText, query, columnAliases, columnComments, + comment, properties, allowExisting, replace) } private def isTempView(nameParts: Seq[String]): Boolean = { diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala index 036ec8ce2b95..8ef96586d355 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala @@ -24,11 +24,12 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan case class CreateIcebergView( child: LogicalPlan, - userSpecifiedColumns: Seq[(String, Option[String])], + queryText: String, + query: LogicalPlan, + columnAliases: Seq[String], + columnComments: 
Seq[Option[String]], comment: Option[String], properties: Map[String, String], - originalText: Option[String], - query: LogicalPlan, allowExisting: Boolean, replace: Boolean) extends BinaryCommand { override def left: LogicalPlan = child diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala index d20c219ffdd0..6efd7c0e2efc 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.CommandExecutionMode import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.MetadataBuilder +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.SchemaUtils import scala.collection.JavaConverters._ @@ -39,9 +40,10 @@ import scala.collection.JavaConverters._ case class CreateV2ViewExec( catalog: ViewCatalog, ident: Identifier, - originalText: String, - query: LogicalPlan, - userSpecifiedColumns: Seq[(String, Option[String])], + queryText: String, + viewSchema: StructType, + columnAliases: Seq[String], + columnComments: Seq[Option[String]], comment: Option[String], properties: Map[String, String], allowExisting: Boolean, @@ -50,26 +52,11 @@ case class CreateV2ViewExec( override lazy val output: Seq[Attribute] = Nil override protected def run(): Seq[InternalRow] = { - val analyzedPlan = session.sessionState.executePlan(query, CommandExecutionMode.SKIP).analyzed - val identifier = Spark3Util.toV1TableIdentifier(ident) - - if (userSpecifiedColumns.nonEmpty) { - if (userSpecifiedColumns.length > analyzedPlan.output.length) { - throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError( - identifier, userSpecifiedColumns.map(_._1), analyzedPlan) - } else if (userSpecifiedColumns.length < analyzedPlan.output.length) { - throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError( - identifier, userSpecifiedColumns.map(_._1), analyzedPlan) - } - } - - val queryColumnNames = analyzedPlan.schema.fieldNames +// val analyzedPlan = session.sessionState.executePlan(query, CommandExecutionMode.SKIP).analyzed +// + val queryColumnNames = viewSchema.fieldNames SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver) - val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema - val columnAliases = userSpecifiedColumns.map(_._1).toArray - val columnComments = userSpecifiedColumns.map(_._2.orNull).toArray - val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null val currentNamespace = session.sessionState.catalogManager.currentNamespace @@ -88,26 +75,26 @@ case class CreateV2ViewExec( // FIXME: replaceView API doesn't exist in Spark 3.5 catalog.createView( ident, - originalText, + queryText, currentCatalog, currentNamespace, viewSchema, queryColumnNames, - columnAliases, - columnComments, + columnAliases.toArray, + columnComments.map(c => c.orNull).toArray, newProperties.asJava) } else { try { // CREATE VIEW [IF NOT EXISTS] catalog.createView( ident, - originalText, + queryText, currentCatalog, currentNamespace, viewSchema, queryColumnNames, - 
columnAliases, - columnComments, + columnAliases.toArray, + columnComments.map(c => c.orNull).toArray, newProperties.asJava) } catch { case _: ViewAlreadyExistsException if allowExisting => // Ignore diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index 81bb559785c7..8951d8208494 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -108,14 +108,15 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) => DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil - case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), userSpecifiedColumns, comment, - properties, Some(originalText), query, allowExisting, replace) => + case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), queryText, query, + columnAliases, columnComments, comment, properties, allowExisting, replace) => CreateV2ViewExec( catalog = viewCatalog, ident = ident, - originalText = originalText, - query = query, - userSpecifiedColumns = userSpecifiedColumns, + queryText = queryText, + columnAliases = columnAliases, + columnComments = columnComments, + viewSchema = query.schema, comment = comment, properties = properties, allowExisting = allowExisting, diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java index ebba124eb4d8..65d2c39d437b 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -139,9 +139,24 @@ public void readFromMultipleViews() throws NoSuchTableException { insertRows(6); String viewName = "firstView"; String secondView = "secondView"; + String viewSQL = String.format("SELECT id FROM %s WHERE id <= 3", tableName); + String secondViewSQL = String.format("SELECT id FROM %s WHERE id > 3", tableName); - sql("CREATE VIEW %s AS SELECT id FROM %s WHERE id <= 3", viewName, tableName); - sql("CREATE VIEW %s AS SELECT id FROM %s WHERE id > 3", secondView, tableName); + ViewCatalog viewCatalog = viewCatalog(); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, viewName)) + .withQuery("spark", viewSQL) + .withDefaultNamespace(NAMESPACE) + .withSchema(schema(viewSQL)) + .create(); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, secondView)) + .withQuery("spark", secondViewSQL) + .withDefaultNamespace(NAMESPACE) + .withSchema(schema(secondViewSQL)) + .create(); assertThat(sql("SELECT * FROM %s", viewName)) .hasSize(3) @@ -869,18 +884,18 @@ public void createViewColumnMismatch() { assertThatThrownBy( () -> sql("CREATE VIEW %s (id, data) AS SELECT id FROM %s", viewName, tableName)) .isInstanceOf(AnalysisException.class) - .hasMessageContaining(String.format("Cannot create view `%s`.`%s`", NAMESPACE, viewName)) + .hasMessageContaining(String.format("Cannot create view %s.%s", NAMESPACE, viewName)) .hasMessageContaining("not enough data columns") - 
From ffe9fbae1375ead446893a29a4f6283f11d5dcb5 Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Wed, 24 Jan 2024 18:05:15 +0100
Subject: [PATCH 4/7] WIP: aliasing

---
 .../sql/catalyst/analysis/CheckViews.scala    |  8 +++-
 .../sql/catalyst/analysis/ResolveViews.scala  | 28 +++++++++++++
 .../analysis/RewriteViewCommands.scala        | 13 ++++--
 .../logical/views/CreateIcebergView.scala     |  4 +-
 .../datasources/v2/CreateV2ViewExec.scala     | 42 +++----------------
 .../v2/ExtendedDataSourceV2Strategy.scala     |  4 +-
 6 files changed, 54 insertions(+), 45 deletions(-)

diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
index e2bfe3a637a4..123dc163dd3a 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
@@ -26,14 +26,18 @@ import org.apache.spark.sql.catalyst.plans.logical.View
 import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.SchemaUtils
 
 object CheckViews extends (LogicalPlan => Unit) {
 
   override def apply(plan: LogicalPlan): Unit = {
     plan foreach {
-      case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _, _, _, _, _) =>
+      case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _,
+        queryColumnNames, _, _, _, _, _) =>
         verifyAmountOfColumns(ident, columnAliases, query)
         verifyTemporaryObjectsDontExist(ident, query)
+        SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)
 
       case _ => // OK
     }
@@ -98,4 +102,4 @@
     collectTempViews(child)
   }
-}
\ No newline at end of file
+}
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index a4ae3dc99def..ce20e68767e2 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
@@ -38,6 +39,7 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog
 import org.apache.spark.sql.connector.catalog.View
 import org.apache.spark.sql.connector.catalog.ViewCatalog
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.MetadataBuilder
 
 case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
 
@@ -59,6 +61,32 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
       loadView(catalog, ident)
         .map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
         .getOrElse(u)
+
+    case c@CreateIcebergView(ResolvedIdentifier(_, ident), _, query, columnAliases, columnComments, _, _, _, _, _,
+      rewritten)
+      if query.resolved && !rewritten =>
+      val rewritten = rewriteIdentifiers(query, ident.asMultipartIdentifier)
+      val aliasedPlan = aliasPlan(rewritten, columnAliases, columnComments)
+      c.copy(query = aliasedPlan, queryColumnNames = query.schema.fieldNames, rewritten = true)
+  }
+
+  private def aliasPlan(
+    analyzedPlan: LogicalPlan,
+    columnAliases: Seq[String],
+    columnComments: Seq[Option[String]]): LogicalPlan = {
+    if (columnAliases.isEmpty || columnAliases.length != analyzedPlan.output.length) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zipWithIndex.map { case (attr, pos) =>
+        if (columnComments.apply(pos).isDefined) {
+          val meta = new MetadataBuilder().putString("comment", columnComments.apply(pos).get).build()
+          Alias(attr, columnAliases.apply(pos))(explicitMetadata = Some(meta))
+        } else {
+          Alias(attr, columnAliases.apply(pos))()
+        }
+      }
+      Project(projectList, analyzedPlan)
+    }
   }
 
   def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
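The aliasPlan helper above is the heart of the aliasing change: instead of post-processing the schema in the physical node, the analyzer wraps the query in a Project whose Alias expressions carry the user-specified names and comments. The same idea in isolation, as a self-contained sketch (withAliases is a made-up name; the Catalyst imports are real):

  import org.apache.spark.sql.catalyst.expressions.Alias
  import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
  import org.apache.spark.sql.types.MetadataBuilder

  // a minimal sketch, assuming `plan` is already resolved and the alias and
  // comment lists line up with plan.output
  def withAliases(
      plan: LogicalPlan,
      names: Seq[String],
      comments: Seq[Option[String]]): LogicalPlan = {
    val projectList = plan.output.zip(names.zip(comments)).map {
      case (attr, (name, Some(comment))) =>
        // column comments travel as explicit metadata on the Alias
        val meta = new MetadataBuilder().putString("comment", comment).build()
        Alias(attr, name)(explicitMetadata = Some(meta))
      case (attr, (name, None)) =>
        Alias(attr, name)()
    }
    Project(projectList, plan)
  }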
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
index 826501157052..7b4883494d96 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
@@ -45,10 +45,15 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
 
     case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties,
       Some(queryText), query, allowExisting, replace) =>
-      val columnAliases = userSpecifiedColumns.map(_._1)
-      val columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty))
-      CreateIcebergView(resolved, queryText, query, columnAliases, columnComments,
-        comment, properties, allowExisting, replace)
+      CreateIcebergView(child = resolved,
+        queryText = queryText,
+        query = query,
+        columnAliases = userSpecifiedColumns.map(_._1),
+        columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty)),
+        comment = comment,
+        properties = properties,
+        allowExisting = allowExisting,
+        replace = replace)
   }
 
   private def isTempView(nameParts: Seq[String]): Boolean = {
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
index 8ef96586d355..9366d5efe163 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
@@ -28,10 +28,12 @@ case class CreateIcebergView(
   query: LogicalPlan,
   columnAliases: Seq[String],
   columnComments: Seq[Option[String]],
+  queryColumnNames: Seq[String] = Seq.empty,
   comment: Option[String],
   properties: Map[String, String],
   allowExisting: Boolean,
-  replace: Boolean) extends BinaryCommand {
+  replace: Boolean,
+  rewritten: Boolean = false) extends BinaryCommand {
 
   override def left: LogicalPlan = child
 
   override def right: LogicalPlan = query
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
index 6efd7c0e2efc..2d0fcc69114e 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
@@ -19,21 +19,13 @@
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.iceberg.spark.Spark3Util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
-import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.ViewCatalog
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.CommandExecutionMode
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.MetadataBuilder
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.SchemaUtils
 
 import scala.collection.JavaConverters._
 
@@ -44,19 +36,16 @@ case class CreateV2ViewExec(
   viewSchema: StructType,
   columnAliases: Seq[String],
   columnComments: Seq[Option[String]],
+  queryColumnNames: Seq[String],
   comment: Option[String],
   properties: Map[String, String],
   allowExisting: Boolean,
-  replace: Boolean) extends LeafV2CommandExec {
+  replace: Boolean,
+  query: LogicalPlan) extends LeafV2CommandExec {
 
   override lazy val output: Seq[Attribute] = Nil
 
   override protected def run(): Seq[InternalRow] = {
-//    val analyzedPlan = session.sessionState.executePlan(query, CommandExecutionMode.SKIP).analyzed
-//
-    val queryColumnNames = viewSchema.fieldNames
-    SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)
-
     val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name
     val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null
     val currentNamespace = session.sessionState.catalogManager.currentNamespace
@@ -79,7 +68,7 @@ case class CreateV2ViewExec(
         currentCatalog,
         currentNamespace,
         viewSchema,
-        queryColumnNames,
+        queryColumnNames.toArray,
         columnAliases.toArray,
         columnComments.map(c => c.orNull).toArray,
         newProperties.asJava)
@@ -92,7 +81,7 @@ case class CreateV2ViewExec(
         currentCatalog,
         currentNamespace,
         viewSchema,
-        queryColumnNames,
+        queryColumnNames.toArray,
         columnAliases.toArray,
         columnComments.map(c => c.orNull).toArray,
         newProperties.asJava)
@@ -107,25 +96,4 @@ case class CreateV2ViewExec(
   override def simpleString(maxFields: Int): String = {
     s"CreateV2ViewExec: ${ident}"
   }
-
-  /**
-   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
-   * else return the analyzed plan directly.
-   */
-  private def aliasPlan(
-    analyzedPlan: LogicalPlan,
-    userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = {
-    if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan
-    } else {
-      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, None)) => Alias(attr, colName)()
-        case (attr, (colName, Some(colComment))) =>
-          val meta = new MetadataBuilder().putString("comment", colComment).build()
-          Alias(attr, colName)(explicitMetadata = Some(meta))
-      }
-      val projectedPlan = Project(projectList, analyzedPlan)
-      session.sessionState.executePlan(projectedPlan, CommandExecutionMode.SKIP).analyzed
-    }
-  }
 }
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 8951d8208494..e6148dcca4a3 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -109,13 +109,15 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
       DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil
 
     case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), queryText, query,
-      columnAliases, columnComments, comment, properties, allowExisting, replace) =>
+      columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _) =>
       CreateV2ViewExec(
         catalog = viewCatalog,
         ident = ident,
         queryText = queryText,
         columnAliases = columnAliases,
         columnComments = columnComments,
+        queryColumnNames = queryColumnNames,
+        query = query,
         viewSchema = query.schema,
         comment = comment,
         properties = properties,
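One subtle point in the patch above: analyzer rules run repeatedly until the plan stops changing, so the new rewritten flag is what keeps the CreateIcebergView branch from re-aliasing the query on every pass. A self-contained toy illustration of that guard (Node and the rule are stand-ins, not Catalyst types):

  // a minimal sketch of a fixpoint-safe rewrite: without the `rewritten`
  // guard the rule would fire on every iteration and never stabilize
  case class Node(aliases: List[String], rewritten: Boolean = false)

  def rule(n: Node): Node =
    if (n.rewritten) n // already processed: leave the node untouched
    else n.copy(aliases = n.aliases.map(_.toUpperCase), rewritten = true)

  def toFixpoint(n: Node): Node = {
    val next = rule(n)
    if (next == n) n else toFixpoint(next)
  }

  println(toFixpoint(Node(List("id", "data")))) // Node(List(ID, DATA),true)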
From 7b86876354fa10bde253c506cf6a367a7bb1463d Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Thu, 25 Jan 2024 09:25:16 +0100
Subject: [PATCH 5/7] review feedback

---
 .../sql/catalyst/analysis/CheckViews.scala    |   4 +-
 .../sql/catalyst/analysis/ResolveViews.scala  |  13 +-
 .../datasources/v2/CreateV2ViewExec.scala     |   3 +-
 .../v2/ExtendedDataSourceV2Strategy.scala     |   1 -
 .../iceberg/spark/extensions/TestViews.java   | 192 +++++++++++++++---
 .../iceberg/spark/source/SparkView.java       |   5 -
 6 files changed, 171 insertions(+), 47 deletions(-)

diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
index 123dc163dd3a..2d1645b95ee9 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
@@ -35,7 +35,7 @@ object CheckViews extends (LogicalPlan => Unit) {
     plan foreach {
       case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _,
         queryColumnNames, _, _, _, _, _) =>
-        verifyAmountOfColumns(ident, columnAliases, query)
+        verifyColumnCount(ident, columnAliases, query)
         verifyTemporaryObjectsDontExist(ident, query)
         SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)
 
@@ -43,7 +43,7 @@
     }
   }
 
-  private def verifyAmountOfColumns(ident: Identifier, columns: Seq[String], query: LogicalPlan): Unit = {
+  private def verifyColumnCount(ident: Identifier, columns: Seq[String], query: LogicalPlan): Unit = {
     if (columns.nonEmpty) {
       if (columns.length > query.output.length) {
         throw new AnalysisException(
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index ce20e68767e2..49a6f3df2884 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -62,9 +62,8 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
         .map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
         .getOrElse(u)
 
-    case c@CreateIcebergView(ResolvedIdentifier(_, ident), _, query, columnAliases, columnComments, _, _, _, _, _,
-      rewritten)
-      if query.resolved && !rewritten =>
+    case c@CreateIcebergView(ResolvedIdentifier(_, ident), _, query, columnAliases, columnComments, _, _, _, _, _, _)
+      if query.resolved && !c.rewritten =>
       val rewritten = rewriteIdentifiers(query, ident.asMultipartIdentifier)
       val aliasedPlan = aliasPlan(rewritten, columnAliases, columnComments)
       c.copy(query = aliasedPlan, queryColumnNames = query.schema.fieldNames, rewritten = true)
@@ -77,12 +76,14 @@
     if (columnAliases.isEmpty || columnAliases.length != analyzedPlan.output.length) {
       analyzedPlan
     } else {
-      val projectList = analyzedPlan.output.zipWithIndex.map { case (attr, pos) =>
+      val projectList = analyzedPlan.output.zipWithIndex.map { case (_, pos) =>
+        val column = GetColumnByOrdinal(pos, analyzedPlan.schema.fields.apply(pos).dataType)
+
         if (columnComments.apply(pos).isDefined) {
           val meta = new MetadataBuilder().putString("comment", columnComments.apply(pos).get).build()
-          Alias(attr, columnAliases.apply(pos))(explicitMetadata = Some(meta))
+          Alias(column, columnAliases.apply(pos))(explicitMetadata = Some(meta))
         } else {
-          Alias(attr, columnAliases.apply(pos))()
+          Alias(column, columnAliases.apply(pos))()
         }
       }
       Project(projectList, analyzedPlan)
     }
   }
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
index 2d0fcc69114e..892e1eb857e4 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
@@ -40,8 +40,7 @@ case class CreateV2ViewExec(
   comment: Option[String],
   properties: Map[String, String],
   allowExisting: Boolean,
-  replace: Boolean,
-  query: LogicalPlan) extends LeafV2CommandExec {
+  replace: Boolean) extends LeafV2CommandExec {
 
   override lazy val output: Seq[Attribute] = Nil
 
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index e6148dcca4a3..0505fe4e3030 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -117,7 +117,6 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
         columnAliases = columnAliases,
         columnComments = columnComments,
         queryColumnNames = queryColumnNames,
-        query = query,
         viewSchema = query.schema,
         comment = comment,
         properties = properties,
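With this change the physical node no longer carries the logical plan at all; by the time the strategy runs, ResolveViews has already aliased the query, so query.schema is the final view schema. A tiny sketch of that relationship (assumes a local `spark` session; names are illustrative):

  // the aliased plan's schema is what ends up stored as the view schema
  val df = spark.range(3).selectExpr("id AS new_id")
  println(df.queryExecution.analyzed.schema)
  // StructType(StructField(new_id,LongType,false))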
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 65d2c39d437b..d212cc15aaa3 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -350,21 +350,39 @@ public void readFromViewReferencingAnotherView() throws NoSuchTableException {
   }
 
   @Test
-  public void createViewReferencingTempView() throws NoSuchTableException {
+  public void readFromViewReferencingTempView() throws NoSuchTableException {
     insertRows(10);
     String tempView = "tempViewBeingReferencedInAnotherView";
     String viewReferencingTempView = "viewReferencingTempView";
+    String sql = String.format("SELECT id FROM %s", tempView);
+
+    ViewCatalog viewCatalog = viewCatalog();
 
     sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName);
 
-    // creating a view that references a TEMP VIEW shouldn't be possible
-    assertThatThrownBy(
-            () -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewReferencingTempView, tempView))
+    // it wouldn't be possible to reference a TEMP VIEW if the view had been created via SQL,
+    // but this can't be prevented when using the API directly
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewReferencingTempView))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    List<Object[]> expected =
+        IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList());
+
+    assertThat(sql("SELECT * FROM %s", tempView))
+        .hasSize(5)
+        .containsExactlyInAnyOrderElementsOf(expected);
+
+    // reading from a view that references a TEMP VIEW shouldn't be possible
+    assertThatThrownBy(() -> sql("SELECT * FROM %s", viewReferencingTempView))
         .isInstanceOf(AnalysisException.class)
-        .hasMessageContaining("Cannot create the persistent object")
-        .hasMessageContaining(viewReferencingTempView)
-        .hasMessageContaining("of the type VIEW because it references to the temporary object")
-        .hasMessageContaining(tempView);
+        .hasMessageContaining("The table or view")
+        .hasMessageContaining(tempView)
+        .hasMessageContaining("cannot be found");
   }
 
   @Test
@@ -416,26 +434,41 @@ public void readFromViewReferencingAnotherViewHiddenByTempView() throws NoSuchTa
   }
 
   @Test
-  public void createViewReferencingGlobalTempView() throws NoSuchTableException {
+  public void readFromViewReferencingGlobalTempView() throws NoSuchTableException {
     insertRows(10);
     String globalTempView = "globalTempViewBeingReferenced";
     String viewReferencingTempView = "viewReferencingGlobalTempView";
 
+    ViewCatalog viewCatalog = viewCatalog();
+    Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema();
+
     sql(
         "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5",
         globalTempView, tableName);
 
-    // creating a view that references a GLOBAL TEMP VIEW shouldn't be possible
-    assertThatThrownBy(
-            () ->
-                sql(
-                    "CREATE VIEW %s AS SELECT id FROM global_temp.%s",
-                    viewReferencingTempView, globalTempView))
+    // it wouldn't be possible to reference a GLOBAL TEMP VIEW if the view had been created via SQL,
+    // but this can't be prevented when using the API directly
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewReferencingTempView))
+        .withQuery("spark", String.format("SELECT id FROM global_temp.%s", globalTempView))
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema)
+        .create();
+
+    List<Object[]> expected =
+        IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList());
+
+    assertThat(sql("SELECT * FROM global_temp.%s", globalTempView))
+        .hasSize(5)
+        .containsExactlyInAnyOrderElementsOf(expected);
+
+    // reading from a view that references a GLOBAL TEMP VIEW shouldn't be possible
+    assertThatThrownBy(() -> sql("SELECT * FROM %s", viewReferencingTempView))
         .isInstanceOf(AnalysisException.class)
-        .hasMessageContaining("Cannot create the persistent object")
-        .hasMessageContaining(viewReferencingTempView)
-        .hasMessageContaining("of the type VIEW because it references to the temporary object")
-        .hasMessageContaining(globalTempView);
+        .hasMessageContaining("The table or view")
+        .hasMessageContaining(globalTempView)
+        .hasMessageContaining("cannot be found");
   }
 
   @Test
@@ -870,16 +903,65 @@ public void createViewIfNotExists() {
         () -> sql("CREATE VIEW IF NOT EXISTS %s AS SELECT id FROM %s", viewName, tableName));
   }
 
+  @Test
+  public void createViewWithInvalidSQL() {
+    assertThatThrownBy(() -> sql("CREATE VIEW simpleViewWithInvalidSQL AS invalid SQL"))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Syntax error");
+  }
+
+  @Test
+  public void createViewReferencingTempView() throws NoSuchTableException {
+    insertRows(10);
+    String tempView = "temporaryViewBeingReferencedInAnotherView";
+    String viewReferencingTempView = "viewReferencingTemporaryView";
+
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName);
+
+    // creating a view that references a TEMP VIEW shouldn't be possible
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewReferencingTempView, tempView))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewReferencingTempView)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(tempView);
+  }
+
+  @Test
+  public void createViewReferencingGlobalTempView() throws NoSuchTableException {
+    insertRows(10);
+    String globalTempView = "globalTemporaryViewBeingReferenced";
+    String viewReferencingTempView = "viewReferencingGlobalTemporaryView";
+
+    sql(
+        "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5",
+        globalTempView, tableName);
+
+    // creating a view that references a GLOBAL TEMP VIEW shouldn't be possible
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE VIEW %s AS SELECT id FROM global_temp.%s",
+                    viewReferencingTempView, globalTempView))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewReferencingTempView)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(globalTempView);
+  }
+
   @Test
   public void createViewUsingNonExistingTable() {
-    assertThatThrownBy(() -> sql("CREATE VIEW %s AS SELECT id FROM %s", "viewName", "non_existing"))
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW viewWithNonExistingTable AS SELECT id FROM non_existing"))
        .isInstanceOf(AnalysisException.class)
         .hasMessageContaining("The table or view `non_existing` cannot be found");
   }
 
   @Test
-  public void createViewColumnMismatch() {
-    String viewName = "viewWithMismatchedColumns";
+  public void createViewWithMismatchedColumnCounts() {
+    String viewName = "viewWithMismatchedColumnCounts";
 
     assertThatThrownBy(
             () -> sql("CREATE VIEW %s (id, data) AS SELECT id FROM %s", viewName, tableName))
         .isInstanceOf(AnalysisException.class)
@@ -917,7 +999,7 @@ public void createViewWithColumnAliases() throws NoSuchTableException {
     sql("DROP VIEW %s", viewName);
 
     sql(
-        "CREATE VIEW %s (new_id, new_data) AS SELECT id, data FROM %s WHERE id <= 3",
+        "CREATE VIEW %s (new_data, new_id) AS SELECT data, id FROM %s WHERE id <= 3",
         viewName, tableName);
 
     assertThat(sql("SELECT new_id FROM %s", viewName))
         .hasSize(3)
@@ -927,31 +1009,79 @@
 
   @Test
   public void createViewWithDuplicateQueryColumnNames() {
-    String viewName = "viewWithDuplicateQueryColumnNames";
-
     assertThatThrownBy(
             () ->
                 sql(
-                    "CREATE VIEW %s (new_id , new_data) AS SELECT id, id FROM %s WHERE id <= 3",
-                    viewName, tableName))
+                    "CREATE VIEW viewWithDuplicateQueryColumnNames (new_id , new_data) AS SELECT id, id FROM %s WHERE id <= 3",
+                    tableName))
         .isInstanceOf(AnalysisException.class)
         .hasMessageContaining("The column `id` already exists");
   }
 
   @Test
-  public void createViewWithNonExistingQueryColumn() {
-    String viewName = "viewWithNonExistingQueryColumn";
+  public void createViewWithCTE() throws NoSuchTableException {
+    insertRows(10);
+    String viewName = "simpleViewWithCTE";
+    String sql =
+        String.format(
+            "WITH max_by_data AS (SELECT max(id) as max FROM %s) "
+                + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max",
+            tableName);
+
+    sql("CREATE VIEW %s AS %s", viewName, sql);
+
+    assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L));
+  }
 
+  @Test
+  public void createViewWithNonExistingQueryColumn() {
     assertThatThrownBy(
             () ->
                 sql(
-                    "CREATE VIEW %s AS SELECT non_existing FROM %s WHERE id <= 3",
-                    viewName, tableName))
+                    "CREATE VIEW viewWithNonExistingQueryColumn AS SELECT non_existing FROM %s WHERE id <= 3",
+                    tableName))
         .isInstanceOf(AnalysisException.class)
         .hasMessageContaining(
             "A column or function parameter with name `non_existing` cannot be resolved");
   }
 
+  @Test
+  public void createViewWithSubqueryExpressionUsingTempView() {
+    String viewName = "viewWithSubqueryExpression";
+    String tempView = "simpleTempView";
+    String sql =
+        String.format("SELECT * FROM %s WHERE id = (SELECT id FROM %s)", tableName, tempView);
+
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5", tempView, tableName);
+
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(String.format("Cannot create the persistent object %s", viewName))
+        .hasMessageContaining(
+            String.format("because it references to the temporary object %s", tempView));
+  }
+
+  @Test
+  public void createViewWithSubqueryExpressionUsingGlobalTempView() {
+    String viewName = "simpleViewWithSubqueryExpression";
+    String globalTempView = "simpleGlobalTempView";
+    String sql =
+        String.format(
+            "SELECT * FROM %s WHERE id = (SELECT id FROM global_temp.%s)",
+            tableName, globalTempView);
+
+    sql(
+        "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5",
+        globalTempView, tableName);
+
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(String.format("Cannot create the persistent object %s", viewName))
+        .hasMessageContaining(
+            String.format(
+                "because it references to the temporary object global_temp.%s", globalTempView));
+  }
+
   private void insertRows(int numRows) throws NoSuchTableException {
     List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
     for (int i = 1; i <= numRows; i++) {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
index 62d300b36a1d..d7a13562c408 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
@@ -112,11 +112,6 @@ public Map<String, String> properties() {
     propsBuilder.put("provider", "iceberg");
     propsBuilder.put("location", icebergView.location());
 
-    if (icebergView.properties().containsKey(QUERY_COLUMN_NAMES)) {
-      String queryColumnNames =
-          icebergView.properties().get(QUERY_COLUMN_NAMES).replace("[", "").replace("]", "");
-      propsBuilder.put(QUERY_COLUMN_NAMES, queryColumnNames);
-    }
 
     if (icebergView instanceof BaseView) {
       ViewOperations ops = ((BaseView) icebergView).operations();
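Between these patches it is worth spelling out the queryColumnNames round trip that the SparkView change above simplifies: the original query's column names are joined into a single view property on write and split back apart on read, so the join and split delimiters have to agree (the final patch in this series settles on a plain ","). A minimal sketch of the invariant:

  // a sketch of the property round trip; the delimiter must match on both
  // sides, and the column names themselves must not contain it
  val queryColumnNames = Seq("id", "data")
  val stored = queryColumnNames.mkString(",")    // written as "id,data"
  val restored = stored.split(",").toSeq         // read back
  assert(restored == queryColumnNames)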
From 71a00a400c277515e042afe3a30d4cb6269cd392 Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Thu, 25 Jan 2024 17:15:51 +0100
Subject: [PATCH 6/7] move temp view detection

---
 .../sql/catalyst/analysis/CheckViews.scala    | 43 -----------------
 .../analysis/RewriteViewCommands.scala        | 46 +++++++++++++++++++
 2 files changed, 46 insertions(+), 43 deletions(-)

diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
index 2d1645b95ee9..95f54ccaf724 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
@@ -20,9 +20,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.View
 import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.ViewCatalog
@@ -36,7 +34,6 @@ object CheckViews extends (LogicalPlan => Unit) {
       case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _,
         queryColumnNames, _, _, _, _, _) =>
         verifyColumnCount(ident, columnAliases, query)
-        verifyTemporaryObjectsDontExist(ident, query)
         SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)
 
       case _ => // OK
@@ -62,44 +59,4 @@
       }
     }
   }
-
-  /**
-   * Permanent views are not allowed to reference temp objects
-   */
-  private def verifyTemporaryObjectsDontExist(
-    name: Identifier,
-    child: LogicalPlan): Unit = {
-    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-
-    val tempViews = collectTemporaryViews(child)
-    tempViews.foreach { nameParts =>
-      throw new AnalysisException(
-        errorClass = "INVALID_TEMP_OBJ_REFERENCE",
-        messageParameters = Map(
-          "obj" -> "VIEW",
-          "objName" -> name.name(),
-          "tempObj" -> "VIEW",
-          "tempObjName" -> nameParts.quoted))
-    }
-
-    // TODO: check for temp function names
-  }
-
-
-  /**
-   * Collect all temporary views and return the identifiers separately
-   */
-  private def collectTemporaryViews(child: LogicalPlan): Seq[Seq[String]] = {
-    def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = {
-      child.flatMap {
-        case view: View if view.isTempView => Seq(view.desc.identifier.nameParts)
-        case plan => plan.expressions.flatMap(_.flatMap {
-          case e: SubqueryExpression => collectTempViews(e.plan)
-          case _ => Seq.empty
-        })
-      }.distinct
-    }
-
-    collectTempViews(child)
-  }
 }
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
index 7b4883494d96..884f6c9f774f 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
@@ -19,15 +19,19 @@
 package org.apache.spark.sql.catalyst.analysis
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.CreateView
 import org.apache.spark.sql.catalyst.plans.logical.DropView
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.View
 import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.LookupCatalog
 import org.apache.spark.sql.connector.catalog.ViewCatalog
 
@@ -45,6 +49,7 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
 
     case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties,
       Some(queryText), query, allowExisting, replace) =>
+      verifyTemporaryObjectsDontExist(resolved.identifier, query)
       CreateIcebergView(child = resolved,
         queryText = queryText,
         query = query,
@@ -76,4 +81,45 @@
       None
     }
   }
+
+  /**
+   * Permanent views are not allowed to reference temp objects
+   */
+  private def verifyTemporaryObjectsDontExist(
+    name: Identifier,
+    child: LogicalPlan): Unit = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+    val tempViews = collectTemporaryViews(child)
+    tempViews.foreach { nameParts =>
+      throw new AnalysisException(
+        errorClass = "INVALID_TEMP_OBJ_REFERENCE",
+        messageParameters = Map(
+          "obj" -> "VIEW",
+          "objName" -> name.name(),
+          "tempObj" -> "VIEW",
+          "tempObjName" -> nameParts.quoted))
+    }
+
+    // TODO: check for temp function names
+  }
+
+  /**
+   * Collect all temporary views and return the identifiers separately
+   */
+  private def collectTemporaryViews(child: LogicalPlan): Seq[Seq[String]] = {
+    def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = {
+      child.flatMap {
+        case unresolved: UnresolvedRelation if isTempView(unresolved.multipartIdentifier) =>
+          Seq(unresolved.multipartIdentifier)
+        case view: View if view.isTempView => Seq(view.desc.identifier.nameParts)
+        case plan => plan.expressions.flatMap(_.flatMap {
+          case e: SubqueryExpression => collectTempViews(e.plan)
+          case _ => Seq.empty
+        })
+      }.distinct
+    }
+
+    collectTempViews(child)
+  }
 }
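The traversal that moved here has one important property: it recurses into subquery expressions, so a temp view hidden inside WHERE id = (SELECT ... FROM tempView) is caught as well as one referenced directly in the FROM clause. The shape of that walk, as a self-contained toy (Plan, TempView, and Op are stand-ins, not Catalyst classes):

  sealed trait Plan {
    def children: Seq[Plan]     // relational children
    def subqueries: Seq[Plan]   // plans nested inside expressions
  }
  case class TempView(name: Seq[String]) extends Plan {
    def children: Seq[Plan] = Nil
    def subqueries: Seq[Plan] = Nil
  }
  case class Op(children: Seq[Plan], subqueries: Seq[Plan] = Nil) extends Plan

  def collectTempViews(p: Plan): Seq[Seq[String]] = (p match {
    case TempView(name) => Seq(name)
    case other => (other.children ++ other.subqueries).flatMap(collectTempViews)
  }).distinct

  // a scan whose filter contains a scalar subquery over a temp view
  val plan = Op(Seq(Op(Nil, subqueries = Seq(TempView(Seq("simpleTempView"))))))
  println(collectTempViews(plan)) // List(List(simpleTempView))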
From f348f8b05e40bf0908120f6e1458bc2a0d171e99 Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Fri, 26 Jan 2024 08:05:04 +0100
Subject: [PATCH 7/7] review feedback

---
 .../sql/catalyst/analysis/CheckViews.scala    |  4 +-
 .../sql/catalyst/analysis/ResolveViews.scala  | 25 +++----
 .../analysis/RewriteViewCommands.scala        |  5 +-
 .../iceberg/spark/extensions/TestViews.java   | 75 ++++++++++++++++++-
 .../apache/iceberg/spark/SparkCatalog.java    |  2 +-
 .../iceberg/spark/source/SparkView.java       |  2 +-
 6 files changed, 89 insertions(+), 24 deletions(-)

diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
index 95f54ccaf724..4a1736764d0d 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
@@ -32,9 +32,9 @@ object CheckViews extends (LogicalPlan => Unit) {
   override def apply(plan: LogicalPlan): Unit = {
     plan foreach {
       case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _,
-        queryColumnNames, _, _, _, _, _) =>
+        _, _, _, _, _, _) =>
         verifyColumnCount(ident, columnAliases, query)
-        SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)
+        SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver)
 
       case _ => // OK
     }
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index 49a6f3df2884..5616f6f70be6 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -62,31 +62,28 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
         .map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
         .getOrElse(u)
 
-    case c@CreateIcebergView(ResolvedIdentifier(_, ident), _, query, columnAliases, columnComments, _, _, _, _, _, _)
+    case c@CreateIcebergView(ResolvedIdentifier(_, _), _, query, columnAliases, columnComments, _, _, _, _, _, _)
       if query.resolved && !c.rewritten =>
-      val rewritten = rewriteIdentifiers(query, ident.asMultipartIdentifier)
-      val aliasedPlan = aliasPlan(rewritten, columnAliases, columnComments)
-      c.copy(query = aliasedPlan, queryColumnNames = query.schema.fieldNames, rewritten = true)
+      val aliased = aliasColumns(query, columnAliases, columnComments)
+      c.copy(query = aliased, queryColumnNames = query.schema.fieldNames, rewritten = true)
   }
 
-  private def aliasPlan(
-    analyzedPlan: LogicalPlan,
+  private def aliasColumns(
+    plan: LogicalPlan,
     columnAliases: Seq[String],
     columnComments: Seq[Option[String]]): LogicalPlan = {
-    if (columnAliases.isEmpty || columnAliases.length != analyzedPlan.output.length) {
-      analyzedPlan
+    if (columnAliases.isEmpty || columnAliases.length != plan.output.length) {
+      plan
     } else {
-      val projectList = analyzedPlan.output.zipWithIndex.map { case (_, pos) =>
-        val column = GetColumnByOrdinal(pos, analyzedPlan.schema.fields.apply(pos).dataType)
-
+      val projectList = plan.output.zipWithIndex.map { case (attr, pos) =>
         if (columnComments.apply(pos).isDefined) {
           val meta = new MetadataBuilder().putString("comment", columnComments.apply(pos).get).build()
-          Alias(column, columnAliases.apply(pos))(explicitMetadata = Some(meta))
+          Alias(attr, columnAliases.apply(pos))(explicitMetadata = Some(meta))
         } else {
-          Alias(column, columnAliases.apply(pos))()
+          Alias(attr, columnAliases.apply(pos))()
         }
       }
-      Project(projectList, analyzedPlan)
+      Project(projectList, plan)
     }
   }
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
index 884f6c9f774f..066ba59394d7 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
@@ -49,10 +49,11 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
 
     case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties,
       Some(queryText), query, allowExisting, replace) =>
-      verifyTemporaryObjectsDontExist(resolved.identifier, query)
+      val q = CTESubstitution.apply(query)
+      verifyTemporaryObjectsDontExist(resolved.identifier, q)
       CreateIcebergView(child = resolved,
         queryText = queryText,
-        query = query,
+        query = q,
         columnAliases = userSpecifiedColumns.map(_._1),
         columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty)),
         comment = comment,
         properties = properties,
         allowExisting = allowExisting,
         replace = replace)
   }
 
   private def isTempView(nameParts: Seq[String]): Boolean = {
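Applying CTESubstitution before the temp-object check is what makes the CTE tests below pass: once a WITH clause is inlined, a CTE whose name collides with a temp view no longer resolves to that temp view and is not wrongly rejected, while a CTE body that genuinely reads a temp view still is. In SQL terms (illustrative names, assuming a `spark` session with these extensions and an Iceberg view catalog selected):

  // the CTE named cteName shadows the temp view of the same name, so this
  // persistent view is legal; the CTE is inlined before the check runs
  spark.sql("CREATE TEMPORARY VIEW cteName AS SELECT 1 AS id")
  spark.sql(
    "CREATE VIEW v AS WITH cteName AS (SELECT 2 AS id) SELECT * FROM cteName")

  // but a CTE body that reads a temp view is still caught after inlining:
  // spark.sql("CREATE VIEW v2 AS WITH m AS (SELECT id FROM cteName) SELECT * FROM m")
  // => AnalysisException (INVALID_TEMP_OBJ_REFERENCE)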
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index d212cc15aaa3..bf6509afee77 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -40,6 +40,7 @@
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.view.View;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -989,8 +990,17 @@ public void createViewWithColumnAliases() throws NoSuchTableException {
         "CREATE VIEW %s (new_id COMMENT 'ID', new_data COMMENT 'DATA') AS SELECT id, data FROM %s WHERE id <= 3",
         viewName, tableName);
 
-    assertThat(viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName)).properties())
-        .containsEntry("queryColumnNames", "id, data");
+    View view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName));
+    assertThat(view.properties()).containsEntry("queryColumnNames", "id,data");
+
+    assertThat(view.schema().columns()).hasSize(2);
+    Types.NestedField first = view.schema().columns().get(0);
+    assertThat(first.name()).isEqualTo("new_id");
+    assertThat(first.doc()).isEqualTo("ID");
+
+    Types.NestedField second = view.schema().columns().get(1);
+    assertThat(second.name()).isEqualTo("new_data");
+    assertThat(second.doc()).isEqualTo("DATA");
 
     assertThat(sql("SELECT new_id FROM %s", viewName))
         .hasSize(3)
@@ -1008,14 +1018,32 @@ public void createViewWithColumnAliases() throws NoSuchTableException {
   }
 
   @Test
-  public void createViewWithDuplicateQueryColumnNames() {
+  public void createViewWithDuplicateColumnNames() {
     assertThatThrownBy(
             () ->
                 sql(
-                    "CREATE VIEW viewWithDuplicateQueryColumnNames (new_id , new_data) AS SELECT id, id FROM %s WHERE id <= 3",
+                    "CREATE VIEW viewWithDuplicateColumnNames (new_id, new_id) AS SELECT id, id FROM %s WHERE id <= 3",
                     tableName))
         .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The column `new_id` already exists");
+  }
+
+  @Test
+  public void createViewWithDuplicateQueryColumnNames() throws NoSuchTableException {
+    insertRows(3);
+    String viewName = "viewWithDuplicateQueryColumnNames";
+    String sql = String.format("SELECT id, id FROM %s WHERE id <= 3", tableName);
+
+    // not specifying column aliases in the view should fail
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql))
+        .isInstanceOf(AnalysisException.class)
         .hasMessageContaining("The column `id` already exists");
+
+    sql("CREATE VIEW %s (id_one, id_two) AS %s", viewName, sql);
+
+    assertThat(sql("SELECT * FROM %s", viewName))
+        .hasSize(3)
+        .containsExactlyInAnyOrder(row(1, 1), row(2, 2), row(3, 3));
   }
 
   @Test
@@ -1033,6 +1061,45 @@ public void createViewWithCTE() throws NoSuchTableException {
     assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L));
   }
 
+  @Test
+  public void createViewWithConflictingNamesForCTEAndTempView() throws NoSuchTableException {
+    insertRows(10);
+    String viewName = "viewWithConflictingNamesForCTEAndTempView";
+    String cteName = "cteName";
+    String sql =
+        String.format(
+            "WITH %s AS (SELECT max(id) as max FROM %s) "
+                + "(SELECT max, count(1) AS count FROM %s GROUP BY max)",
+            cteName, tableName, cteName);
+
+    // create a CTE and a TEMP VIEW with the same name
+    sql("CREATE TEMPORARY VIEW %s AS SELECT * from %s", cteName, tableName);
+    sql("CREATE VIEW %s AS %s", viewName, sql);
+
+    // CTE should take precedence over the TEMP VIEW when data is read
+    assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L));
+  }
+
+  @Test
+  public void createViewWithCTEReferencingTempView() {
+    String viewName = "viewWithCTEReferencingTempView";
+    String tempViewInCTE = "tempViewInCTE";
+    String sql =
+        String.format(
+            "WITH max_by_data AS (SELECT max(id) as max FROM %s) "
+                + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max",
+            tempViewInCTE);
+
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE ID <= 5", tempViewInCTE, tableName);
+
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewName)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(tempViewInCTE);
+  }
+
   @Test
   public void createViewWithNonExistingQueryColumn() {
     assertThatThrownBy(
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index db8829261472..37e7387d696d 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -572,7 +572,7 @@ public View createView(
     if (null != asViewCatalog) {
       Schema icebergSchema = SparkSchemaUtil.convert(schema);
 
-      StringJoiner joiner = new StringJoiner(", ");
+      StringJoiner joiner = new StringJoiner(",");
       Arrays.stream(queryColumnNames).forEach(joiner::add);
 
       try {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
index d7a13562c408..5391d75476ce 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
@@ -88,7 +88,7 @@ public StructType schema() {
   @Override
   public String[] queryColumnNames() {
     return icebergView.properties().containsKey(QUERY_COLUMN_NAMES)
-        ? icebergView.properties().get(QUERY_COLUMN_NAMES).split(", ")
+        ? icebergView.properties().get(QUERY_COLUMN_NAMES).split(",")
         : new String[0];
   }
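Taken together, the series leaves view creation fully SQL-driven. A closing sketch of the behavior the final tests pin down (illustrative names, assuming a `spark` session with these extensions and an Iceberg view catalog selected):

  spark.sql("CREATE VIEW v (new_id COMMENT 'ID', new_data COMMENT 'DATA') " +
    "AS SELECT id, data FROM t WHERE id <= 3")
  // the Iceberg view keeps the aliases and comments in its schema
  // (new_id/ID, new_data/DATA) and records the original query's column
  // names in the comma-separated `queryColumnNames` property ("id,data")
  spark.sql("SELECT new_id FROM v").show()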