Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #5447][AUTHZ] Support Hudi DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand #5482

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
# limitations under the License.
#

org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoSourceTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.LogicalPlanOptionQueryExtractor
org.apache.kyuubi.plugin.spark.authz.serde.LogicalPlanQueryExtractor
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableOptionTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ExpressionSeqTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiDataSourceV2RelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoTargetTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.IdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.LogicalRelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1604,6 +1604,27 @@
} ],
"opType" : "CREATETABLE",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand",
"tableDescs" : [ {
"fieldName" : "dft",
"fieldExtractor" : "HudiDataSourceV2RelationTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ {
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.DropHoodieTableCommand",
"tableDescs" : [ {
Expand Down Expand Up @@ -1643,6 +1664,27 @@
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand",
"tableDescs" : [ {
"fieldName" : "mergeInto",
"fieldExtractor" : "HudiMergeIntoTargetTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ {
"fieldName" : "mergeInto",
"fieldExtractor" : "HudiMergeIntoSourceTableExtractor"
} ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.RepairHoodieTableCommand",
"tableDescs" : [ {
Expand Down Expand Up @@ -1705,4 +1747,25 @@
} ],
"opType" : "TRUNCATETABLE",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand",
"tableDescs" : [ {
"fieldName" : "ut",
"fieldExtractor" : "HudiDataSourceV2RelationTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ {
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ]
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.kyuubi.plugin.spark.authz.serde

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs

trait QueryExtractor extends (AnyRef => Option[LogicalPlan]) with Extractor

object QueryExtractor {
Expand All @@ -44,3 +46,9 @@ class LogicalPlanOptionQueryExtractor extends QueryExtractor {
v1.asInstanceOf[Option[LogicalPlan]]
}
}

/**
 * Extracts the source-side query of Hudi's `MergeIntoHoodieTableCommand`.
 *
 * The command object's `sourceTable` field is read reflectively and then
 * delegated to [[LogicalPlanQueryExtractor]] so the source plan is treated
 * like any other input query for privilege building.
 */
class HudiMergeIntoSourceTableExtractor extends QueryExtractor {
  override def apply(v1: AnyRef): Option[LogicalPlan] = {
    // Reflective access: MergeIntoHoodieTableCommand is a Hudi class not on
    // the authz plugin's compile classpath.
    val sourcePlan = invokeAs[LogicalPlan](v1, "sourceTable")
    new LogicalPlanQueryExtractor().apply(sourcePlan)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}

import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
import org.apache.kyuubi.util.reflect.ReflectUtils._
Expand Down Expand Up @@ -80,7 +80,9 @@ class TableIdentifierTableExtractor extends TableExtractor {
val catalogTable = spark.sessionState.catalog.getTableMetadata(identifier)
Option(catalogTable.owner).filter(_.nonEmpty)
} catch {
case _: Exception => None
case e: Exception =>
e.printStackTrace()
None
}
Some(Table(None, identifier.database, identifier.table, owner))
}
Expand Down Expand Up @@ -240,3 +242,29 @@ class TableTableExtractor extends TableExtractor {
lookupExtractor[StringTableExtractor].apply(spark, tableName)
}
}

/**
 * Resolves the [[Table]] targeted by Hudi DELETE/UPDATE commands.
 *
 * The command's `table` logical plan is fetched reflectively; the table
 * identifier is carried by a `SubqueryAlias` node. When the statement
 * declares a table alias, one extra alias layer wraps the identifier-carrying
 * node and is peeled off first.
 */
class HudiDataSourceV2RelationTableExtractor extends TableExtractor {
  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
    val plan = invokeAs[LogicalPlan](v1, "table")
    // NOTE(review): any plan shape other than the two below raises a
    // MatchError — presumably unreachable for Hudi commands; confirm.
    val identifier = plan match {
      // aliased statement: outer alias wraps the identifier-carrying alias
      case SubqueryAlias(_, SubqueryAlias(inner, _)) => inner
      // unaliased statement: identifier sits on the single alias node
      case ident @ _ => ident match { case SubqueryAlias(id, _) => id }
    }
    new StringTableExtractor().apply(spark, identifier.toString())
  }
}

/**
 * Resolves the target [[Table]] of Hudi's `MergeIntoHoodieTableCommand`.
 *
 * The command's `targetTable` logical plan is fetched reflectively and the
 * table identifier is taken from its `SubqueryAlias` wrapper(s).
 */
class HudiMergeIntoTargetTableExtractor extends TableExtractor {
  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
    invokeAs[LogicalPlan](v1, "targetTable") match {
      // MERGE INTO db.table AS alias — identifier sits on the inner alias.
      // `_` instead of the previously unused `relation` binding, which
      // produced an unused-variable warning.
      case SubqueryAlias(_, SubqueryAlias(identifier, _)) =>
        new StringTableExtractor().apply(spark, identifier.toString())
      // MERGE INTO db.table without a table alias
      case SubqueryAlias(identifier, _) =>
        new StringTableExtractor().apply(spark, identifier.toString())
      // NOTE(review): other plan shapes raise MatchError — presumably
      // unreachable for Hudi merge commands; confirm before hardening.
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kyuubi.plugin.spark.authz.gen

import org.apache.kyuubi.plugin.spark.authz.OperationType._
import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
import org.apache.kyuubi.plugin.spark.authz.serde._
import org.apache.kyuubi.plugin.spark.authz.serde.TableType._

Expand Down Expand Up @@ -165,6 +166,40 @@ object HudiCommands {
TableCommandSpec(cmd, Seq(tableDesc), SHOWPARTITIONS)
}

// Authz spec for Hudi's DELETE FROM: the target table (field "dft") is
// checked with UPDATE action semantics and the WHERE-clause plan (field
// "query") is scanned for input privileges.
val DeleteHoodieTableCommand = {
  val tableDesc = TableDesc(
    "dft",
    classOf[HudiDataSourceV2RelationTableExtractor],
    actionTypeDesc = Some(ActionTypeDesc(actionType = Some(UPDATE))))
  TableCommandSpec(
    "org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand",
    Seq(tableDesc),
    queryDescs = Seq(QueryDesc("query")))
}

// Authz spec for Hudi's UPDATE: the target table (field "ut") is checked
// with UPDATE action semantics and the condition/assignment plan (field
// "query") is scanned for input privileges.
val UpdateHoodieTableCommand = {
  val tableDesc = TableDesc(
    "ut",
    classOf[HudiDataSourceV2RelationTableExtractor],
    actionTypeDesc = Some(ActionTypeDesc(actionType = Some(UPDATE))))
  TableCommandSpec(
    "org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand",
    Seq(tableDesc),
    queryDescs = Seq(QueryDesc("query")))
}

// Authz spec for Hudi's MERGE INTO: both table and query descriptors point
// at the command itself (field "mergeInto"); dedicated extractors pull the
// target table (UPDATE action) and the source plan (input privileges) out
// of it.
val MergeIntoHoodieTableCommand = {
  val tableDesc = TableDesc(
    "mergeInto",
    classOf[HudiMergeIntoTargetTableExtractor],
    actionTypeDesc = Some(ActionTypeDesc(actionType = Some(UPDATE))))
  TableCommandSpec(
    "org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand",
    Seq(tableDesc),
    queryDescs = Seq(QueryDesc("mergeInto", classOf[HudiMergeIntoSourceTableExtractor])))
}

val data: Array[TableCommandSpec] = Array(
AlterHoodieTableAddColumnsCommand,
AlterHoodieTableChangeColumnCommand,
Expand All @@ -176,10 +211,13 @@ object HudiCommands {
CreateHoodieTableLikeCommand,
CompactionHoodieTableCommand,
CompactionShowHoodieTableCommand,
DeleteHoodieTableCommand,
DropHoodieTableCommand,
InsertIntoHoodieTableCommand,
MergeIntoHoodieTableCommand,
RepairHoodieTableCommand,
TruncateHoodieTableCommand,
ShowHoodieTablePartitionsCommand,
Spark31AlterTableCommand)
Spark31AlterTableCommand,
UpdateHoodieTableCommand)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.kyuubi.util.AssertionUtils.interceptContains
*/
@HudiTest
class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
override protected val catalogImpl: String = "hive"
override protected val catalogImpl: String = "in-memory"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we have to change this?

Copy link
Contributor Author

@AngersZhuuuu AngersZhuuuu Oct 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we have to change this?

To trigger a positive case; if we do not change this, the test hits a Hive HMS-related Derby error and can't run. I have checked that after changing this to in-memory, the plan is still Hudi's command.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

// TODO: Apache Hudi not support Spark 3.5 and Scala 2.13 yet,
// should change after Apache Hudi support Spark 3.5 and Scala 2.13.
private def isSupportedVersion = !isSparkV35OrGreater && !isScalaV213
Expand Down Expand Up @@ -407,4 +407,69 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
}
}

test("DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand") {
  withSingleCallEnabled {
    withCleanTmpResources(Seq(
      (s"$namespace1.$table1", "table"),
      (s"$namespace1.$table2", "table"),
      (namespace1, "database"))) {
      doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
      // Two Hudi COW tables: table1 is the DML target, table2 the merge source.
      doAs(
        admin,
        sql(
          s"""
             |CREATE TABLE IF NOT EXISTS $namespace1.$table1(id int, name string, city string)
             |USING HUDI
             |OPTIONS (
             | type = 'cow',
             | primaryKey = 'id',
             | 'hoodie.datasource.hive_sync.enable' = 'false'
             |)
             |PARTITIONED BY(city)
             |""".stripMargin))

      doAs(
        admin,
        sql(
          s"""
             |CREATE TABLE IF NOT EXISTS $namespace1.$table2(id int, name string, city string)
             |USING HUDI
             |OPTIONS (
             | type = 'cow',
             | primaryKey = 'id',
             | 'hoodie.datasource.hive_sync.enable' = 'false'
             |)
             |PARTITIONED BY(city)
             |""".stripMargin))

      // DELETE: someone lacks [update] on the target table; admin succeeds.
      val deleteFrom = s"DELETE FROM $namespace1.$table1 WHERE id = 10"
      interceptContains[AccessControlException] {
        doAs(someone, sql(deleteFrom))
      }(s"does not have [update] privilege on [$namespace1/$table1]")
      doAs(admin, sql(deleteFrom))

      // UPDATE: same privilege expectation as DELETE.
      val updateSql = s"UPDATE $namespace1.$table1 SET name = 'test' WHERE id > 10"
      interceptContains[AccessControlException] {
        doAs(someone, sql(updateSql))
      }(s"does not have [update] privilege on [$namespace1/$table1]")
      doAs(admin, sql(updateSql))

      // MERGE INTO: source-table column reads are checked first, so the
      // expected failure is the missing [select] privilege on table2.
      val mergeIntoSQL =
        s"""
           |MERGE INTO $namespace1.$table1 target
           |USING $namespace1.$table2 source
           |ON target.id = source.id
           |WHEN MATCHED
           |AND target.name == 'test'
           | THEN UPDATE SET id = source.id, name = source.name, city = source.city
           |""".stripMargin
      interceptContains[AccessControlException] {
        doAs(someone, sql(mergeIntoSQL))
      }(s"does not have [select] privilege on " +
        s"[$namespace1/$table2/id,$namespace1/$table2/name,$namespace1/$table2/city]")
      doAs(admin, sql(mergeIntoSQL))
    }
  }
}
}
Loading