Skip to content

Commit

Permalink
Spark: Support creating views via SQL
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Jan 16, 2024
1 parent a420f0b commit c7410ca
Show file tree
Hide file tree
Showing 9 changed files with 377 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion
import org.apache.spark.sql.catalyst.analysis.ResolveProcedures
import org.apache.spark.sql.catalyst.analysis.ResolveViews
import org.apache.spark.sql.catalyst.analysis.ViewCheck
import org.apache.spark.sql.catalyst.optimizer.ReplaceStaticInvoke
import org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser
import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy
Expand All @@ -37,6 +38,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
extensions.injectResolutionRule { spark => ResolveProcedures(spark) }
extensions.injectResolutionRule { spark => ResolveViews(spark) }
extensions.injectResolutionRule { _ => ProcedureArgumentCoercion }
extensions.injectCheckRule(_ => ViewCheck)

// optimizer extensions
extensions.injectOptimizerRule { _ => ReplaceStaticInvoke }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.CreateView
import org.apache.spark.sql.catalyst.plans.logical.DropView
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
Expand All @@ -40,6 +42,11 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case DropView(ResolvedView(resolved), ifExists) =>
DropIcebergView(resolved, ifExists)

case CreateView(r@ResolvedView(_), userSpecifiedColumns, comment, properties,
originalText, query, allowExisting, replace) =>
CreateIcebergView(r, userSpecifiedColumns, comment, properties, originalText,
query, allowExisting, replace)
}

private def isTempView(nameParts: Seq[String]): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.iceberg.spark.Spark3Util
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.execution.command.ViewHelper

object ViewCheck extends (LogicalPlan => Unit) {

override def apply(plan: LogicalPlan): Unit = {
plan foreach {
case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, _, _, query, _, _) =>
val identifier = Spark3Util.toV1TableIdentifier(ident)
ViewHelper.verifyTemporaryObjectsNotExists(false, identifier, query, Seq.empty)
ViewHelper.verifyAutoGeneratedAliasesNotExists(query, false, identifier)
case _ => // OK
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical.views

import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

case class CreateIcebergView(
child: LogicalPlan,
userSpecifiedColumns: Seq[(String, Option[String])],
comment: Option[String],
properties: Map[String, String],
originalText: Option[String],
query: LogicalPlan,
allowExisting: Boolean,
replace: Boolean) extends BinaryCommand {
override def left: LogicalPlan = child

override def right: LogicalPlan = query

override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
copy(child = newLeft, query = newRight)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.Spark3Util
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.CommandExecutionMode
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.sql.util.SchemaUtils
import scala.collection.JavaConverters._


case class CreateV2ViewExec(
catalog: ViewCatalog,
ident: Identifier,
originalText: String,
query: LogicalPlan,
userSpecifiedColumns: Seq[(String, Option[String])],
comment: Option[String],
properties: Map[String, String],
allowExisting: Boolean,
replace: Boolean) extends LeafV2CommandExec {

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP)
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed

val identifier = Spark3Util.toV1TableIdentifier(ident)

if (userSpecifiedColumns.nonEmpty) {
if (userSpecifiedColumns.length > analyzedPlan.output.length) {
throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError(
identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
} else if (userSpecifiedColumns.length < analyzedPlan.output.length) {
throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError(
identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
}
}

val queryColumnNames = analyzedPlan.schema.fieldNames
SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)

val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema
val columnAliases = userSpecifiedColumns.map(_._1).toArray
val columnComments = userSpecifiedColumns.map(_._2.getOrElse("")).toArray

val currentCatalog = session.sessionState.catalogManager.currentCatalog.name
val currentNamespace = session.sessionState.catalogManager.currentNamespace

val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION
val createEngineVersion = Some(engineVersion)
val newProperties = properties ++
comment.map(ViewCatalog.PROP_COMMENT -> _) ++
createEngineVersion.map(ViewCatalog.PROP_CREATE_ENGINE_VERSION -> _) +
(ViewCatalog.PROP_ENGINE_VERSION -> engineVersion)

if (replace) {
// CREATE OR REPLACE VIEW
if (catalog.viewExists(ident)) {
catalog.dropView(ident)
}
// FIXME: replaceView API doesn't exist in Spark 3.5
catalog.createView(
ident,
originalText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames,
columnAliases,
columnComments,
newProperties.asJava)
} else {
try {
// CREATE VIEW [IF NOT EXISTS]
catalog.createView(
ident,
originalText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames,
columnAliases,
columnComments,
newProperties.asJava)
} catch {
case _: ViewAlreadyExistsException if allowExisting => // Ignore
}
}

Nil
}

override def simpleString(maxFields: Int): String = {
s"CreateV2ViewExec: ${ident}"
}

/**
* If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
* else return the analyzed plan directly.
*/
private def aliasPlan(
analyzedPlan: LogicalPlan,
userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = {
if (userSpecifiedColumns.isEmpty) {
analyzedPlan
} else {
val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
case (attr, (colName, None)) => Alias(attr, colName)()
case (attr, (colName, Some(colComment))) =>
val meta = new MetadataBuilder().putString("comment", colComment).build()
Alias(attr, colName)(explicitMetadata = Some(meta))
}
val projectedPlan = Project(projectList, analyzedPlan)
session.sessionState.executePlan(projectedPlan, CommandExecutionMode.SKIP).analyzed
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.plans.logical.RenameTable
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
import org.apache.spark.sql.connector.catalog.Identifier
Expand Down Expand Up @@ -107,6 +108,19 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil

case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), userSpecifiedColumns, comment,
properties, Some(originalText), query, allowExisting, replace) =>
CreateV2ViewExec(
catalog = viewCatalog,
ident = ident,
originalText = originalText,
query = query,
userSpecifiedColumns = userSpecifiedColumns,
comment = comment,
properties = properties,
allowExisting = allowExisting,
replace = replace) :: Nil

case _ => Nil
}

Expand Down
Loading

0 comments on commit c7410ca

Please sign in to comment.