Merge branch 'branch-1.8.0' into 'branch-1.8.0'
Add ReplaceIcebergTableParser Rule

See merge request !42
wForget committed Nov 16, 2023
2 parents 2b1a093 + dc742bf commit f6a411a
Showing 5 changed files with 385 additions and 0 deletions.
6 changes: 6 additions & 0 deletions extensions/spark/kyuubi-extension-spark-3-5/pom.xml
@@ -105,6 +105,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-${spark.binary.version}_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
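
Note: with spark.binary.version 3.5 and scala.binary.version 2.12, this test-scoped dependency resolves to Iceberg's published iceberg-spark-runtime-3.5_2.12 artifact (no version is declared here, so it is presumably managed in a parent pom). A one-line sanity check a test could run, assuming only that the jar is on the test classpath:

    // Loads Iceberg's Spark catalog class; throws ClassNotFoundException if the
    // test-scoped runtime jar is absent from the classpath.
    Class.forName("org.apache.iceberg.spark.SparkCatalog")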

<!--
Spark requires `commons-collections` and `commons-io` but got them from transitive
dependencies of `hadoop-client`. As we are using Hadoop Shaded Client, we need add
KyuubiSQLConf.scala
@@ -288,4 +288,10 @@ object KyuubiSQLConf
.booleanConf
.createWithDefault(false)

val ICEBERG_TABLE_IMPLICIT_CONVERSION =
buildConf("spark.sql.qiyi.optimizer.icebergTableImplicitConversion")
.doc("Implicit conversion of iceberg table to automatically add `iceberg_catalog`.")
.version("1.5.0")
.booleanConf
.createWithDefault(false)
}
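
A minimal sketch of enabling the new flag at runtime, assuming KyuubiSparkSQLCommonExtension (wired up in the next file) is registered directly through spark.sql.extensions; the session setup itself is illustrative:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      // Registers the Kyuubi extension so ReplaceIcebergTableParser is injected.
      .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLCommonExtension")
      .getOrCreate()

    // ICEBERG_TABLE_IMPLICIT_CONVERSION defaults to false; enable it per session.
    spark.sql("SET spark.sql.qiyi.optimizer.icebergTableImplicitConversion=true")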
KyuubiSparkSQLCommonExtension.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.sql

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.parser.ReplaceIcebergTableParser

import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33, InsertZorderBeforeWritingHive33, ResolveZorder}

@@ -29,6 +30,8 @@ class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {

object KyuubiSparkSQLCommonExtension {
def injectCommonExtensions(extensions: SparkSessionExtensions): Unit = {
extensions.injectParser((spark, parser) => new ReplaceIcebergTableParser(spark, parser))

// inject zorder parser and related rules
extensions.injectParser { case (_, parser) => new SparkKyuubiSparkSQLParser(parser) }
extensions.injectResolutionRule(ResolveZorder)
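For context, parsers registered through injectParser compose: Spark folds the registered builders over its default parser in injection order, so each builder receives the previously built parser as its delegate. ReplaceIcebergTableParser therefore wraps the default parser, and SparkKyuubiSparkSQLParser wraps that in turn. A minimal sketch of the same wiring (the package of SparkKyuubiSparkSQLParser is assumed):

    import org.apache.spark.sql.SparkSessionExtensions
    import org.apache.spark.sql.catalyst.parser.ReplaceIcebergTableParser

    import org.apache.kyuubi.sql.SparkKyuubiSparkSQLParser // assumed package

    val wire: SparkSessionExtensions => Unit = { extensions =>
      // First registration: delegate is Spark's default parser.
      extensions.injectParser((spark, delegate) => new ReplaceIcebergTableParser(spark, delegate))
      // Second registration: delegate is the ReplaceIcebergTableParser built above,
      // so this parser ends up outermost.
      extensions.injectParser { case (_, delegate) => new SparkKyuubiSparkSQLParser(delegate) }
    }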
ReplaceIcebergTableParser.scala
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.parser

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedIdentifier, UnresolvedRelation, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ReplaceIcebergTableParser.ICEBERG_CATALOG
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, CreateTableAsSelect, CreateView, InsertIntoStatement, LogicalPlan, MergeIntoTable, SubqueryAlias, UnresolvedWith}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, LookupCatalog, TableCatalog}
import org.apache.spark.sql.types.{DataType, StructType}

import org.apache.kyuubi.sql.KyuubiSQLConf.ICEBERG_TABLE_IMPLICIT_CONVERSION

class ReplaceIcebergTableParser(sparkSession: SparkSession, delegate: ParserInterface)
extends ParserInterface with LookupCatalog with SQLConfHelper with Logging {

/** Creates LogicalPlan for a given SQL string. */
override def parsePlan(sqlText: String): LogicalPlan = {
val plan = delegate.parsePlan(sqlText)
if (conf.getConf(ICEBERG_TABLE_IMPLICIT_CONVERSION)) {
resolvePlan(plan)
} else {
plan
}
}

private def resolvePlan(plan: LogicalPlan): LogicalPlan = {
plan resolveOperators {
// insert into/overwrite
case i @ InsertIntoStatement(tableIdent: UnresolvedRelation, _, _, _, _, _, _) =>
i.copy(table = tableIdent
.copy(multipartIdentifier = replaceIcebergTable(tableIdent.multipartIdentifier)))
// CTE
case i @ UnresolvedWith(_, cteRelations) =>
i.copy(cteRelations =
cteRelations.map(r => (r._1, resolvePlan(r._2).asInstanceOf[SubqueryAlias])))
// unresolved table
case i @ UnresolvedRelation(multipartIdentifier, _, _) =>
i.copy(multipartIdentifier = replaceIcebergTable(multipartIdentifier))
case i @ UnresolvedTableOrView(multipartIdentifier, _, _) =>
i.copy(multipartIdentifier = replaceIcebergTable(multipartIdentifier))
// create table
case i @ CreateTable(UnresolvedIdentifier(nameParts, false), _, _, tableSpec, _)
if tableSpec.provider.isDefined && tableSpec.provider.get.toLowerCase.equals(
"iceberg") =>
val newNameParts = nameParts match {
case Seq(table) => Seq(ICEBERG_CATALOG, catalogManager.currentNamespace.last, table)
case Seq(db, table) => Seq(ICEBERG_CATALOG, db, table)
case _ => nameParts
}
i.copy(name = UnresolvedIdentifier(newNameParts))
case i @ CreateTableAsSelect(
UnresolvedIdentifier(nameParts, false),
_,
_,
tableSpec,
_,
_,
_)
if tableSpec.provider.isDefined && tableSpec.provider.get.toLowerCase.equals(
"iceberg") =>
val newNameParts = nameParts match {
case Seq(table) => Seq(ICEBERG_CATALOG, catalogManager.currentNamespace.last, table)
case Seq(db, table) => Seq(ICEBERG_CATALOG, db, table)
case _ => nameParts
}
i.copy(name = UnresolvedIdentifier(newNameParts))
case i @ CreateView(_, _, _, _, _, query, _, _) =>
i.copy(query = resolvePlan(query))
// merge into
case i @ MergeIntoTable(targetTable, sourceTable, _, _, _, _) =>
i.copy(targetTable = resolvePlan(targetTable), sourceTable = resolvePlan(sourceTable))
}
}

override protected lazy val catalogManager: CatalogManager =
sparkSession.sessionState.catalogManager

/**
* If the identifier resolves through the session catalog and a table with the
* same namespace and name exists in the `iceberg_catalog` catalog, return the
* identifier re-qualified with `iceberg_catalog`; otherwise return it unchanged.
*/
private def replaceIcebergTable(multipartIdentifier: Seq[String]): Seq[String] = {
multipartIdentifier match {
case SessionCatalogAndIdentifier(_, ident) =>
try {
val isIcebergTable = catalogManager.catalog(ICEBERG_CATALOG) match {
case tableCatalog: TableCatalog =>
tableCatalog.tableExists(ident)
case _ => false
}
if (isIcebergTable) {
val newIdentifier = Seq(ICEBERG_CATALOG, ident.namespace().head, ident.name())
logInfo(s"replace iceberg table identifier, origin: $multipartIdentifier, " +
s"replaced: $newIdentifier")
newIdentifier
} else {
multipartIdentifier
}
} catch {
case _: CatalogNotFoundException =>
multipartIdentifier
}
case _ => multipartIdentifier
}
}

override def parseExpression(sqlText: String): Expression =
delegate.parseExpression(sqlText)

override def parseTableIdentifier(sqlText: String): TableIdentifier =
delegate.parseTableIdentifier(sqlText)

override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
delegate.parseFunctionIdentifier(sqlText)

override def parseMultipartIdentifier(sqlText: String): Seq[String] =
delegate.parseMultipartIdentifier(sqlText)

override def parseTableSchema(sqlText: String): StructType =
delegate.parseTableSchema(sqlText)

override def parseDataType(sqlText: String): DataType =
delegate.parseDataType(sqlText)

override def parseQuery(sqlText: String): LogicalPlan = {
val plan = delegate.parseQuery(sqlText)
if (conf.getConf(ICEBERG_TABLE_IMPLICIT_CONVERSION)) {
resolvePlan(plan)
} else {
plan
}
}
}

object ReplaceIcebergTableParser {

val ICEBERG_CATALOG = "iceberg_catalog"

}
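
Finally, a hedged end-to-end sketch of the behavior this parser adds. Only the iceberg_catalog name, the config key, and the rewrite rules come from the code above; the catalog wiring (Iceberg's documented org.apache.iceberg.spark.SparkCatalog with a local Hadoop warehouse) and the table names are illustrative, and `spark` is the session from the earlier sketch:

    // Register an Iceberg catalog under the exact name the parser probes for.
    spark.conf.set("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
    spark.conf.set("spark.sql.catalog.iceberg_catalog.type", "hadoop")
    spark.conf.set("spark.sql.catalog.iceberg_catalog.warehouse", "/tmp/iceberg-warehouse")
    spark.sql("SET spark.sql.qiyi.optimizer.icebergTableImplicitConversion=true")

    // The CreateTable branch re-qualifies a one- or two-part name for USING iceberg,
    // so this table is actually created as iceberg_catalog.db.tbl.
    spark.sql("CREATE TABLE db.tbl (id INT) USING iceberg")

    // Because iceberg_catalog reports db.tbl as existing, replaceIcebergTable
    // rewrites this session-catalog reference to iceberg_catalog.db.tbl.
    spark.sql("SELECT * FROM db.tbl").show()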
