From a420f0bd5b0d711fbe83ebbcb5f5bab85f6a231c Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Fri, 5 Jan 2024 12:14:28 +0100
Subject: [PATCH] Spark: Support dropping Views

---
 .../sql/catalyst/analysis/ResolveViews.scala  |   8 +-
 .../analysis/RewriteViewCommands.scala        |  65 +++++++++++
 .../IcebergSparkSqlExtensionsParser.scala     |   3 +-
 .../plans/logical/views/DropIcebergView.scala |  30 ++++++
 .../datasources/v2/DropV2ViewExec.scala       |  48 +++++++++
 .../v2/ExtendedDataSourceV2Strategy.scala     |   5 +
 .../iceberg/spark/extensions/TestViews.java   | 102 ++++++++++++++++++
 .../apache/iceberg/spark/SparkCatalog.java    |   7 +-
 8 files changed, 261 insertions(+), 7 deletions(-)
 create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
 create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DropIcebergView.scala
 create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala

diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index 19522d939b59..a1b826569f35 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -117,8 +117,8 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
   }
 
   private def qualifyFunctionIdentifiers(
-    plan: LogicalPlan,
-    catalogAndNamespace: Seq[String]): LogicalPlan = plan transformExpressions {
+      plan: LogicalPlan,
+      catalogAndNamespace: Seq[String]): LogicalPlan = plan transformExpressions {
     case u@UnresolvedFunction(Seq(name), _, _, _, _) =>
       if (!isBuiltinFunction(name)) {
         u.copy(nameParts = catalogAndNamespace :+ name)
@@ -143,11 +143,11 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
   }
 
   private def isCatalog(name: String): Boolean = {
-    spark.sessionState.catalogManager.isCatalogRegistered(name)
+    catalogManager.isCatalogRegistered(name)
   }
 
   private def isBuiltinFunction(name: String): Boolean = {
-    spark.sessionState.catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))
+    catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))
  }
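The two call sites above can use the short `catalogManager` because Spark's `LookupCatalog` mix-in declares that member abstractly and expects the rule to supply it; `ResolveViews` evidently already defines it from the session state, and the new `RewriteViewCommands` below does the same. A minimal sketch of that contract, assuming Spark 3.5's `LookupCatalog` and a rule living under `org.apache.spark.sql` as these do (`SomeViewRule` is a hypothetical name, not part of the patch):

    // Sketch: satisfying LookupCatalog's abstract `catalogManager` member.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.connector.catalog.CatalogManager
    import org.apache.spark.sql.connector.catalog.LookupCatalog

    case class SomeViewRule(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
      // Deriving the manager from the session keeps catalog lookups consistent
      // with the rest of the analyzer.
      protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager

      override def apply(plan: LogicalPlan): LogicalPlan = plan
    }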
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
new file mode 100644
index 000000000000..2b35db33c0c5
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.DropView
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.sql.connector.catalog.LookupCatalog
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+
+/**
+ * ResolveSessionCatalog exits early for some v2 View commands, so they are
+ * pre-substituted here and then planned by ExtendedDataSourceV2Strategy
+ */
+case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
+
+  protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case DropView(ResolvedView(resolved), ifExists) =>
+      DropIcebergView(resolved, ifExists)
+  }
+
+  private def isTempView(nameParts: Seq[String]): Boolean = {
+    catalogManager.v1SessionCatalog.isTempView(nameParts)
+  }
+
+  private def isViewCatalog(catalog: CatalogPlugin): Boolean = {
+    catalog.isInstanceOf[ViewCatalog]
+  }
+
+  object ResolvedView {
+    def unapply(unresolved: UnresolvedIdentifier): Option[ResolvedIdentifier] = unresolved match {
+      case UnresolvedIdentifier(nameParts, true) if isTempView(nameParts) =>
+        None
+
+      case UnresolvedIdentifier(CatalogAndIdentifier(catalog, ident), _) if isViewCatalog(catalog) =>
+        Some(ResolvedIdentifier(catalog, ident))
+
+      case _ =>
+        None
+    }
+  }
+}
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
index 4fe0b5f283e5..02bd59366c13 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.analysis.RewriteViewCommands
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
@@ -122,7 +123,7 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
     if (isIcebergCommand(sqlTextAfterSubstitution)) {
       parse(sqlTextAfterSubstitution) { parser => astBuilder.visit(parser.singleStatement()) }.asInstanceOf[LogicalPlan]
     } else {
-      delegate.parsePlan(sqlText)
+      RewriteViewCommands(SparkSession.active).apply(delegate.parsePlan(sqlText))
     }
   }
 
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DropIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DropIcebergView.scala
new file mode 100644
index 000000000000..275dba6fbf5e
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DropIcebergView.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.views
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.UnaryCommand
+
+case class DropIcebergView(
+    child: LogicalPlan,
+    ifExists: Boolean) extends UnaryCommand {
+  override protected def withNewChildInternal(newChild: LogicalPlan): DropIcebergView =
+    copy(child = newChild)
+}
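End to end, the pieces so far form a small pipeline: Spark parses DROP VIEW into DropView(UnresolvedIdentifier(...)), RewriteViewCommands substitutes DropIcebergView(ResolvedIdentifier(...)) whenever the identifier resolves to a ViewCatalog, and the execution node and strategy case that follow turn that into physical planning. A rough sketch of the rewrite step in isolation, assuming a session whose registered catalog `my_catalog` implements ViewCatalog (names illustrative, plan shapes approximate):

    // Sketch: what the rewrite does to a freshly parsed DROP VIEW statement.
    val parsed = spark.sessionState.sqlParser.parsePlan("DROP VIEW my_catalog.db.v")
    // parsed is roughly:    DropView(UnresolvedIdentifier(Seq("my_catalog", "db", "v"), ...), ifExists = false)

    val rewritten = RewriteViewCommands(spark).apply(parsed)
    // rewritten is roughly: DropIcebergView(ResolvedIdentifier(my_catalog, db.v), ifExists = false)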
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala
new file mode 100644
index 000000000000..c35af1486fc7
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+
+
+case class DropV2ViewExec(
+    catalog: ViewCatalog,
+    ident: Identifier,
+    ifExists: Boolean) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    val dropped = catalog.dropView(ident)
+    if (!dropped && !ifExists) {
+      throw new NoSuchViewException(ident)
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"DropV2View: ${ident}"
+  }
+}
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 05307d8bf3a1..44157fc3823c 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
@@ -43,6 +44,7 @@ import org.apache.spark.sql.catalyst.plans.logical.RenameTable
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableCatalog
@@ -102,6 +104,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
       }
       RenameV2ViewExec(oldCatalog, oldIdent, newIdent.identifier()) :: Nil
 
+    case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
+      DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil
+
     case _ => Nil
   }
 
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 9b179a6d1151..0eb8c96b3cc5 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -19,6 +19,7 @@ package org.apache.iceberg.spark.extensions;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.List;
@@ -43,6 +44,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -780,6 +782,106 @@ public void renameViewTargetAlreadyExistsAsTable() {
             String.format("Cannot create view default.%s because it already exists", target));
   }
 
+  @Test
+  public void dropView() {
+    String viewName = "viewToBeDropped";
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    TableIdentifier identifier = TableIdentifier.of(NAMESPACE, viewName);
+    viewCatalog
+        .buildView(identifier)
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    assertThat(viewCatalog.viewExists(identifier)).isTrue();
+
+    sql("DROP VIEW %s", viewName);
+    assertThat(viewCatalog.viewExists(identifier)).isFalse();
+  }
+
+  @Test
+  public void dropNonExistingView() {
+    assertThatThrownBy(() -> sql("DROP VIEW non_existing"))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The view %s.%s cannot be found", NAMESPACE, "non_existing");
+  }
+
+  @Test
+  public void dropViewIfExists() {
+    String viewName = "viewToBeDropped";
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    TableIdentifier identifier = TableIdentifier.of(NAMESPACE, viewName);
+    viewCatalog
+        .buildView(identifier)
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    assertThat(viewCatalog.viewExists(identifier)).isTrue();
+
+    sql("DROP VIEW IF EXISTS %s", viewName);
+    assertThat(viewCatalog.viewExists(identifier)).isFalse();
+
+    assertThatNoException().isThrownBy(() -> sql("DROP VIEW IF EXISTS %s", viewName));
+  }
+
+  /** Verifies that dropping a global temp view is still handled by Spark's session catalog. */
+  @Test
+  public void dropGlobalTempView() {
+    String globalTempView = "globalViewToBeDropped";
+    sql("CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s", globalTempView, tableName);
+    assertThat(v1SessionCatalog().getGlobalTempView(globalTempView).isDefined()).isTrue();
+
+    sql("DROP VIEW global_temp.%s", globalTempView);
+    assertThat(v1SessionCatalog().getGlobalTempView(globalTempView).isDefined()).isFalse();
+  }
+
+  /** Verifies that dropping a local temp view is still handled by Spark's session catalog. */
+  @Test
+  public void dropTempView() {
+    String tempView = "tempViewToBeDropped";
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s", tempView, tableName);
+    assertThat(v1SessionCatalog().getTempView(tempView).isDefined()).isTrue();
+
+    sql("DROP VIEW %s", tempView);
+    assertThat(v1SessionCatalog().getTempView(tempView).isDefined()).isFalse();
+  }
+
+  /** Verifies that dropping a v1 view in spark_catalog is still handled by Spark's session catalog. */
+  @Test
+  public void dropV1View() {
+    String v1View = "v1ViewToBeDropped";
+    sql("USE spark_catalog");
+    sql("CREATE NAMESPACE IF NOT EXISTS %s", NAMESPACE);
+    sql("CREATE TABLE %s (id INT, data STRING)", tableName);
+    sql("CREATE VIEW %s AS SELECT id FROM %s", v1View, tableName);
+    sql("USE %s", catalogName);
+    assertThat(
+            v1SessionCatalog()
+                .tableExists(new org.apache.spark.sql.catalyst.TableIdentifier(v1View)))
+        .isTrue();
+
+    sql("DROP VIEW spark_catalog.%s.%s", NAMESPACE, v1View);
+    assertThat(
+            v1SessionCatalog()
+                .tableExists(new org.apache.spark.sql.catalyst.TableIdentifier(v1View)))
+        .isFalse();
+  }
+
+  private SessionCatalog v1SessionCatalog() {
+    return spark.sessionState().catalogManager().v1SessionCatalog();
+  }
+
   private String viewName(String viewName) {
     return viewName + new Random().nextInt(1000000);
   }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 82665194dfa3..a62f6bb95aaa 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -581,8 +581,11 @@ public View alterView(Identifier ident, ViewChange... changes)
 
   @Override
   public boolean dropView(Identifier ident) {
-    throw new UnsupportedOperationException(
-        "Dropping a view is not supported by catalog: " + catalogName);
+    if (null != asViewCatalog) {
+      return asViewCatalog.dropView(buildIdentifier(ident));
+    }
+
+    return false;
   }
 
   @Override
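Taken together, the patch gives DROP VIEW first-class behavior for Iceberg views while deliberately leaving temp views and spark_catalog v1 views on their existing code path. A short usage sketch of the expected behavior, assuming a Spark 3.5 session with the Iceberg SQL extensions and a view-enabled Iceberg catalog registered as `my_catalog` (catalog and view names illustrative, not from the patch):

    // Sketch: user-facing behavior after this patch.
    // Assumes an Iceberg view db.v already exists in my_catalog.
    spark.sql("DROP VIEW my_catalog.db.v")            // rewritten to DropIcebergView,
                                                      // executed by DropV2ViewExec
    spark.sql("DROP VIEW IF EXISTS my_catalog.db.v")  // already gone: quiet no-op
    // Without IF EXISTS, dropping a missing view now fails with an
    // AnalysisException ("The view ... cannot be found") rather than
    // SparkCatalog's old UnsupportedOperationException.

    // Temp views are intentionally skipped by RewriteViewCommands and stay
    // with Spark's session catalog:
    spark.sql("CREATE TEMPORARY VIEW tmp AS SELECT 1 AS id")
    spark.sql("DROP VIEW tmp")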