
Spark: Support creating views via SQL #9423

Merged 7 commits on Jan 26, 2024
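The change set below wires CREATE VIEW ... AS SELECT through Iceberg's Spark SQL extensions. As a hedged usage sketch (catalog, namespace, and table names are placeholders, not from this PR), the new path enables statements like:

    // Assumes a session with the Iceberg extensions enabled and a catalog
    // implementing ViewCatalog; all identifiers here are hypothetical.
    spark.sql("CREATE VIEW cat.db.v (a COMMENT 'first column', b) AS SELECT id, data FROM cat.db.t")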
IcebergSparkSessionExtensions.scala
@@ -20,6 +20,7 @@
package org.apache.iceberg.spark.extensions

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.analysis.CheckViews
import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion
import org.apache.spark.sql.catalyst.analysis.ResolveProcedures
import org.apache.spark.sql.catalyst.analysis.ResolveViews
@@ -37,6 +38,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
extensions.injectResolutionRule { spark => ResolveProcedures(spark) }
extensions.injectResolutionRule { spark => ResolveViews(spark) }
extensions.injectResolutionRule { _ => ProcedureArgumentCoercion }
extensions.injectCheckRule(_ => CheckViews)

// optimizer extensions
extensions.injectOptimizerRule { _ => ReplaceStaticInvoke }
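For context, these rules only participate in analysis once the extensions are registered with the session. A minimal sketch using the standard Iceberg configuration (the builder usage is illustrative):

    import org.apache.spark.sql.SparkSession

    // Register the Iceberg extensions so ResolveViews, CheckViews, and the
    // rewrite rules shown below are injected into Spark's analyzer.
    val spark = SparkSession.builder()
      .config("spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .getOrCreate()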
CheckViews.scala (new file)
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.SchemaUtils

object CheckViews extends (LogicalPlan => Unit) {

override def apply(plan: LogicalPlan): Unit = {
plan foreach {
case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _,
_, _, _, _, _, _) =>
verifyColumnCount(ident, columnAliases, query)
SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver)
Review comment (Contributor): Looks good.

case _ => // OK
}
}

private def verifyColumnCount(ident: Identifier, columns: Seq[String], query: LogicalPlan): Unit = {
if (columns.nonEmpty) {
if (columns.length > query.output.length) {
throw new AnalysisException(
errorClass = "CREATE_VIEW_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
messageParameters = Map(
"viewName" -> ident.toString,
"viewColumns" -> columns.mkString(", "),
"dataColumns" -> query.output.map(c => c.name).mkString(", ")))
} else if (columns.length < query.output.length) {
throw new AnalysisException(
errorClass = "CREATE_VIEW_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
messageParameters = Map(
"viewName" -> ident.toString,
"viewColumns" -> columns.mkString(", "),
"dataColumns" -> query.output.map(c => c.name).mkString(", ")))
}
}
}
}
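For illustration, hypothetical statements measured against the arity check above (view and table names are placeholders):

    spark.sql("CREATE VIEW v (a, b) AS SELECT id, data FROM t")    // passes: 2 aliases, 2 data columns
    spark.sql("CREATE VIEW v (a, b, c) AS SELECT id, data FROM t") // fails: NOT_ENOUGH_DATA_COLUMNS
    spark.sql("CREATE VIEW v (a) AS SELECT id, data FROM t")       // fails: TOO_MANY_DATA_COLUMNS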
ResolveViews.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
@@ -38,6 +39,7 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog
import org.apache.spark.sql.connector.catalog.View
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.MetadataBuilder

case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {

@@ -59,6 +61,30 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
loadView(catalog, ident)
.map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
.getOrElse(u)

case c@CreateIcebergView(ResolvedIdentifier(_, _), _, query, columnAliases, columnComments, _, _, _, _, _, _)
if query.resolved && !c.rewritten =>
val aliased = aliasColumns(query, columnAliases, columnComments)
c.copy(query = aliased, queryColumnNames = query.schema.fieldNames, rewritten = true)
}

private def aliasColumns(
plan: LogicalPlan,
columnAliases: Seq[String],
columnComments: Seq[Option[String]]): LogicalPlan = {
if (columnAliases.isEmpty || columnAliases.length != plan.output.length) {
plan
} else {
val projectList = plan.output.zipWithIndex.map { case (attr, pos) =>
if (columnComments.apply(pos).isDefined) {
val meta = new MetadataBuilder().putString("comment", columnComments.apply(pos).get).build()
Alias(attr, columnAliases.apply(pos))(explicitMetadata = Some(meta))
} else {
Alias(attr, columnAliases.apply(pos))()
}
}
Project(projectList, plan)
}
}
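In effect, aliasColumns wraps the resolved view query in a Project that renames each output attribute and carries any column comment as Alias metadata. A hypothetical CREATE VIEW v (a COMMENT 'x', b) AS SELECT id, data FROM t is therefore analyzed as if its body were:

    spark.sql("SELECT id AS a, data AS b FROM t") // with 'x' attached to alias a as metadata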

def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
@@ -151,7 +177,7 @@
}


-  implicit class ViewHelper(plugin: CatalogPlugin) {
+  implicit class IcebergViewHelper(plugin: CatalogPlugin) {
def asViewCatalog: ViewCatalog = plugin match {
case viewCatalog: ViewCatalog =>
viewCatalog
RewriteViewCommands.scala
@@ -19,13 +19,19 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.CreateView
import org.apache.spark.sql.catalyst.plans.logical.DropView
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.View
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.LookupCatalog
import org.apache.spark.sql.connector.catalog.ViewCatalog

@@ -40,6 +46,20 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case DropView(ResolvedView(resolved), ifExists) =>
DropIcebergView(resolved, ifExists)

case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties,
Some(queryText), query, allowExisting, replace) =>
val q = CTESubstitution.apply(query)
verifyTemporaryObjectsDontExist(resolved.identifier, q)
CreateIcebergView(child = resolved,
queryText = queryText,
query = q,
columnAliases = userSpecifiedColumns.map(_._1),
columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty)),
comment = comment,
properties = properties,
allowExisting = allowExisting,
replace = replace)
}

private def isTempView(nameParts: Seq[String]): Boolean = {
@@ -62,4 +82,45 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
None
}
}

/**
* Permanent views are not allowed to reference temp objects
*/
private def verifyTemporaryObjectsDontExist(
name: Identifier,
child: LogicalPlan): Unit = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val tempViews = collectTemporaryViews(child)
tempViews.foreach { nameParts =>
throw new AnalysisException(
errorClass = "INVALID_TEMP_OBJ_REFERENCE",
messageParameters = Map(
"obj" -> "VIEW",
"objName" -> name.name(),
"tempObj" -> "VIEW",
"tempObjName" -> nameParts.quoted))
}

// TODO: check for temp function names
}

/**
* Collect all temporary views and return the identifiers separately
*/
private def collectTemporaryViews(child: LogicalPlan): Seq[Seq[String]] = {
def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = {
child.flatMap {
case unresolved: UnresolvedRelation if isTempView(unresolved.multipartIdentifier) =>
Seq(unresolved.multipartIdentifier)
case view: View if view.isTempView => Seq(view.desc.identifier.nameParts)
case plan => plan.expressions.flatMap(_.flatMap {
case e: SubqueryExpression => collectTempViews(e.plan)
case _ => Seq.empty
})
}.distinct
}

collectTempViews(child)
}
}
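For illustration, the temp-object guard rejects statements like the following (all names are placeholders):

    spark.sql("CREATE TEMPORARY VIEW tmp AS SELECT 1 AS id")
    spark.sql("CREATE VIEW cat.db.v AS SELECT id FROM tmp") // fails: INVALID_TEMP_OBJ_REFERENCE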
CreateIcebergView.scala (new file)
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical.views

import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

case class CreateIcebergView(
child: LogicalPlan,
queryText: String,
query: LogicalPlan,
columnAliases: Seq[String],
columnComments: Seq[Option[String]],
queryColumnNames: Seq[String] = Seq.empty,
comment: Option[String],
properties: Map[String, String],
allowExisting: Boolean,
replace: Boolean,
rewritten: Boolean = false) extends BinaryCommand {
override def left: LogicalPlan = child

override def right: LogicalPlan = query

override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
copy(child = newLeft, query = newRight)
}
CreateV2ViewExec.scala (new file)
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._


case class CreateV2ViewExec(
catalog: ViewCatalog,
ident: Identifier,
queryText: String,
viewSchema: StructType,
columnAliases: Seq[String],
columnComments: Seq[Option[String]],
queryColumnNames: Seq[String],
comment: Option[String],
properties: Map[String, String],
allowExisting: Boolean,
replace: Boolean) extends LeafV2CommandExec {

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name
val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null
val currentNamespace = session.sessionState.catalogManager.currentNamespace

val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION
val newProperties = properties ++
comment.map(ViewCatalog.PROP_COMMENT -> _) +
(ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion,
ViewCatalog.PROP_ENGINE_VERSION -> engineVersion)

if (replace) {
// CREATE OR REPLACE VIEW
if (catalog.viewExists(ident)) {
catalog.dropView(ident)
}
// FIXME: replaceView API doesn't exist in Spark 3.5
catalog.createView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
} else {
try {
// CREATE VIEW [IF NOT EXISTS]
catalog.createView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
} catch {
case _: ViewAlreadyExistsException if allowExisting => // Ignore
}
}

Nil
}

override def simpleString(maxFields: Int): String = {
s"CreateV2ViewExec: ${ident}"
}
}
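The replace and allowExisting branches above map to the usual SQL forms (hypothetical names below). Note that because a replaceView API is unavailable in Spark 3.5, CREATE OR REPLACE is drop-then-create and so is not atomic: a failure between dropView and createView loses the old view.

    spark.sql("CREATE OR REPLACE VIEW cat.db.v AS SELECT id FROM cat.db.t")    // replace = true
    spark.sql("CREATE VIEW IF NOT EXISTS cat.db.v AS SELECT id FROM cat.db.t") // allowExisting = true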
ExtendedDataSourceV2Strategy.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.plans.logical.RenameTable
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
import org.apache.spark.sql.connector.catalog.Identifier
@@ -107,6 +108,21 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil

case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), queryText, query,
columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _) =>
CreateV2ViewExec(
catalog = viewCatalog,
ident = ident,
queryText = queryText,
columnAliases = columnAliases,
columnComments = columnComments,
queryColumnNames = queryColumnNames,
Review comment (Contributor): I think I'd also be fine with not tracking this and passing an empty array through. That's not correct for Spark, but it would work for us since Iceberg doesn't store these.

Reply (Contributor, author): Technically we don't need to track it in this PR, but we'll eventually need it when we show the properties of views and such. I'd probably keep it here, but let me know if you'd like me to remove it and introduce it in an upcoming PR.

Reply (Contributor): It seems reasonable to include it. Let's just leave it as-is since you even have a test for it.

viewSchema = query.schema,
comment = comment,
properties = properties,
allowExisting = allowExisting,
replace = replace) :: Nil

case _ => Nil
}
