Skip to content

Commit

Permalink
[KYUUBI #5447][AUTHZ] Support Hudi DeleteHoodieTableCommand/UpdateHoo…
Browse files Browse the repository at this point in the history
…dieTableCommand/MergeIntoHoodieTableCommand

### _Why are the changes needed?_
To close #5447. Kyuubi AuthZ should support the Hudi commands DeleteHoodieTableCommand, UpdateHoodieTableCommand, and MergeIntoHoodieTableCommand.

- DeleteHoodieTableCommand: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
- UpdateHoodieTableCommand: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
- MergeIntoHoodieTableCommand: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

### _Was this patch authored or co-authored using generative AI tooling?_
No

Closes #5482 from AngersZhuuuu/KYUUBI-5447.

Closes #5447

2598af2 [Angerszhuuuu] Update HudiCatalogRangerSparkExtensionSuite.scala
08be589 [Angerszhuuuu] Update org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor
19497d1 [Angerszhuuuu] Update tableExtractors.scala
df6e244 [Angerszhuuuu] update
1a72f13 [Angerszhuuuu] update
f7ca684 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5447
3700686 [Angerszhuuuu] [KYUUBI #5447][AUTHZ] Support hudi DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand

Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: liangbowen <[email protected]>
  • Loading branch information
AngersZhuuuu authored and bowenliang123 committed Oct 21, 2023
1 parent 59c25b9 commit abaa369
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
# limitations under the License.
#

org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoSourceTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.LogicalPlanOptionQueryExtractor
org.apache.kyuubi.plugin.spark.authz.serde.LogicalPlanQueryExtractor
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableOptionTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ExpressionSeqTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiDataSourceV2RelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoTargetTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.IdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.LogicalRelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1604,6 +1604,27 @@
} ],
"opType" : "CREATETABLE",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand",
"tableDescs" : [ {
"fieldName" : "dft",
"fieldExtractor" : "HudiDataSourceV2RelationTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ {
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.DropHoodieTableCommand",
"tableDescs" : [ {
Expand Down Expand Up @@ -1643,6 +1664,27 @@
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand",
"tableDescs" : [ {
"fieldName" : "mergeInto",
"fieldExtractor" : "HudiMergeIntoTargetTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ {
"fieldName" : "mergeInto",
"fieldExtractor" : "HudiMergeIntoSourceTableExtractor"
} ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.RepairHoodieTableCommand",
"tableDescs" : [ {
Expand Down Expand Up @@ -1705,4 +1747,25 @@
} ],
"opType" : "TRUNCATETABLE",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand",
"tableDescs" : [ {
"fieldName" : "ut",
"fieldExtractor" : "HudiDataSourceV2RelationTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ {
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ]
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.kyuubi.plugin.spark.authz.serde

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs

trait QueryExtractor extends (AnyRef => Option[LogicalPlan]) with Extractor

object QueryExtractor {
Expand All @@ -44,3 +46,9 @@ class LogicalPlanOptionQueryExtractor extends QueryExtractor {
v1.asInstanceOf[Option[LogicalPlan]]
}
}

/**
 * Query extractor for the source side of a Hudi MERGE INTO command.
 *
 * Reads the `sourceTable` logical plan from the given object via reflection and hands it
 * to [[LogicalPlanQueryExtractor]] to produce the result.
 */
class HudiMergeIntoSourceTableExtractor extends QueryExtractor {
  override def apply(v1: AnyRef): Option[LogicalPlan] = {
    // The source relation is exposed by the wrapped plan under the name `sourceTable`.
    val sourcePlan = invokeAs[LogicalPlan](v1, "sourceTable")
    val planExtractor = new LogicalPlanQueryExtractor()
    planExtractor(sourcePlan)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}

import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
import org.apache.kyuubi.util.reflect.ReflectUtils._
Expand Down Expand Up @@ -80,7 +80,9 @@ class TableIdentifierTableExtractor extends TableExtractor {
val catalogTable = spark.sessionState.catalog.getTableMetadata(identifier)
Option(catalogTable.owner).filter(_.nonEmpty)
} catch {
case _: Exception => None
case e: Exception =>
e.printStackTrace()
None
}
Some(Table(None, identifier.database, identifier.table, owner))
}
Expand Down Expand Up @@ -240,3 +242,29 @@ class TableTableExtractor extends TableExtractor {
lookupExtractor[StringTableExtractor].apply(spark, tableName)
}
}

/**
 * Extracts the table referenced by the logical plan stored in the `table` field of the
 * given object.
 *
 * The plan is expected to be a `SubqueryAlias` whose identifier names the table, optionally
 * wrapped in a second `SubqueryAlias` when the statement assigns a table alias. Any other
 * plan shape yields `None` rather than failing with a `MatchError`.
 */
class HudiDataSourceV2RelationTableExtractor extends TableExtractor {
  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
    val tableIdentifier = invokeAs[LogicalPlan](v1, "table") match {
      // Match multipartIdentifier with tableAlias: the inner alias carries the table name
      case SubqueryAlias(_, SubqueryAlias(identifier, _)) => Some(identifier)
      // Match multipartIdentifier without tableAlias
      case SubqueryAlias(identifier, _) => Some(identifier)
      // Unrecognized plan shape: there is no identifier to resolve
      case _ => None
    }
    // Resolve through the shared extractor lookup, as the other extractors in this file do.
    tableIdentifier.flatMap { identifier =>
      lookupExtractor[StringTableExtractor].apply(spark, identifier.toString())
    }
  }
}

/**
 * Extracts the target table of a Hudi MERGE INTO from the logical plan stored in the
 * `targetTable` field of the given object.
 *
 * The plan is expected to be a `SubqueryAlias` whose identifier names the table, optionally
 * wrapped in a second `SubqueryAlias` when the statement assigns a table alias. Any other
 * plan shape yields `None` rather than failing with a `MatchError`.
 */
class HudiMergeIntoTargetTableExtractor extends TableExtractor {
  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
    val tableIdentifier = invokeAs[LogicalPlan](v1, "targetTable") match {
      // Match multipartIdentifier with tableAlias: the inner alias carries the table name
      case SubqueryAlias(_, SubqueryAlias(identifier, _)) => Some(identifier)
      // Match multipartIdentifier without tableAlias
      case SubqueryAlias(identifier, _) => Some(identifier)
      // Unrecognized plan shape: there is no identifier to resolve
      case _ => None
    }
    // Resolve through the shared extractor lookup, as the other extractors in this file do.
    tableIdentifier.flatMap { identifier =>
      lookupExtractor[StringTableExtractor].apply(spark, identifier.toString())
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kyuubi.plugin.spark.authz.gen

import org.apache.kyuubi.plugin.spark.authz.OperationType._
import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
import org.apache.kyuubi.plugin.spark.authz.serde._
import org.apache.kyuubi.plugin.spark.authz.serde.TableType._

Expand Down Expand Up @@ -165,6 +166,40 @@ object HudiCommands {
TableCommandSpec(cmd, Seq(tableDesc), SHOWPARTITIONS)
}

/**
 * Spec for Hudi's DeleteHoodieTableCommand: the table read from field `dft` is checked
 * with the UPDATE action, and field `query` is registered as the command's query.
 */
val DeleteHoodieTableCommand = {
  val updateAction = ActionTypeDesc(actionType = Some(UPDATE))
  val targetTableDesc = TableDesc(
    "dft",
    classOf[HudiDataSourceV2RelationTableExtractor],
    actionTypeDesc = Some(updateAction))
  val queryFieldDescs = Seq(QueryDesc("query"))
  TableCommandSpec(
    "org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand",
    Seq(targetTableDesc),
    queryDescs = queryFieldDescs)
}

/**
 * Spec for Hudi's UpdateHoodieTableCommand: the table read from field `ut` is checked
 * with the UPDATE action, and field `query` is registered as the command's query.
 */
val UpdateHoodieTableCommand = {
  val updateAction = ActionTypeDesc(actionType = Some(UPDATE))
  val targetTableDesc = TableDesc(
    "ut",
    classOf[HudiDataSourceV2RelationTableExtractor],
    actionTypeDesc = Some(updateAction))
  val queryFieldDescs = Seq(QueryDesc("query"))
  TableCommandSpec(
    "org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand",
    Seq(targetTableDesc),
    queryDescs = queryFieldDescs)
}

/**
 * Spec for Hudi's MergeIntoHoodieTableCommand. Both descriptors read the `mergeInto`
 * field: the target table is extracted with [[HudiMergeIntoTargetTableExtractor]] and
 * checked with the UPDATE action, while the source side is extracted as a query with
 * [[HudiMergeIntoSourceTableExtractor]].
 */
val MergeIntoHoodieTableCommand = {
  val commandClass = "org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand"
  // Target side of the merge
  val updateAction = ActionTypeDesc(actionType = Some(UPDATE))
  val targetTableDesc = TableDesc(
    "mergeInto",
    classOf[HudiMergeIntoTargetTableExtractor],
    actionTypeDesc = Some(updateAction))
  // Source side of the merge
  val sourceQueryDesc = QueryDesc("mergeInto", classOf[HudiMergeIntoSourceTableExtractor])
  TableCommandSpec(commandClass, Seq(targetTableDesc), queryDescs = Seq(sourceQueryDesc))
}

val data: Array[TableCommandSpec] = Array(
AlterHoodieTableAddColumnsCommand,
AlterHoodieTableChangeColumnCommand,
Expand All @@ -176,10 +211,13 @@ object HudiCommands {
CreateHoodieTableLikeCommand,
CompactionHoodieTableCommand,
CompactionShowHoodieTableCommand,
DeleteHoodieTableCommand,
DropHoodieTableCommand,
InsertIntoHoodieTableCommand,
MergeIntoHoodieTableCommand,
RepairHoodieTableCommand,
TruncateHoodieTableCommand,
ShowHoodieTablePartitionsCommand,
Spark31AlterTableCommand)
Spark31AlterTableCommand,
UpdateHoodieTableCommand)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.kyuubi.util.AssertionUtils.interceptContains
*/
@HudiTest
class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
override protected val catalogImpl: String = "hive"
override protected val catalogImpl: String = "in-memory"
// TODO: Apache Hudi does not support Spark 3.5 and Scala 2.13 yet;
// this check should change once Apache Hudi supports Spark 3.5 and Scala 2.13.
private def isSupportedVersion = !isSparkV35OrGreater && !isScalaV213
Expand Down Expand Up @@ -407,4 +407,69 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
}
}

// Checks authorization of DELETE, UPDATE and MERGE INTO on Hudi tables: each statement is
// first run as `someone` and must fail with an AccessControlException carrying the expected
// privilege message, then the same statement is run as `admin`.
test("DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand") {
withSingleCallEnabled {
// NOTE(review): presumably drops the listed tables and database after the body — confirm
// against the helper's definition.
withCleanTmpResources(Seq(
(s"$namespace1.$table1", "table"),
(s"$namespace1.$table2", "table"),
(namespace1, "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
// table1 is the target of every DML statement below.
doAs(
admin,
sql(
s"""
|CREATE TABLE IF NOT EXISTS $namespace1.$table1(id int, name string, city string)
|USING HUDI
|OPTIONS (
| type = 'cow',
| primaryKey = 'id',
| 'hoodie.datasource.hive_sync.enable' = 'false'
|)
|PARTITIONED BY(city)
|""".stripMargin))

// table2 has the same schema and is used only as the MERGE INTO source.
doAs(
admin,
sql(
s"""
|CREATE TABLE IF NOT EXISTS $namespace1.$table2(id int, name string, city string)
|USING HUDI
|OPTIONS (
| type = 'cow',
| primaryKey = 'id',
| 'hoodie.datasource.hive_sync.enable' = 'false'
|)
|PARTITIONED BY(city)
|""".stripMargin))

// DELETE is expected to be denied for lack of the [update] privilege on table1.
val deleteFrom = s"DELETE FROM $namespace1.$table1 WHERE id = 10"
interceptContains[AccessControlException] {
doAs(someone, sql(deleteFrom))
}(s"does not have [update] privilege on [$namespace1/$table1]")
doAs(admin, sql(deleteFrom))

// UPDATE is expected to be denied with the same [update] privilege message.
val updateSql = s"UPDATE $namespace1.$table1 SET name = 'test' WHERE id > 10"
interceptContains[AccessControlException] {
doAs(someone, sql(updateSql))
}(s"does not have [update] privilege on [$namespace1/$table1]")
doAs(admin, sql(updateSql))

// MERGE INTO reads table2 (source) and writes table1 (target); the expected denial is
// the [select] privilege on the source table's columns.
val mergeIntoSQL =
s"""
|MERGE INTO $namespace1.$table1 target
|USING $namespace1.$table2 source
|ON target.id = source.id
|WHEN MATCHED
|AND target.name == 'test'
| THEN UPDATE SET id = source.id, name = source.name, city = source.city
|""".stripMargin
interceptContains[AccessControlException] {
doAs(someone, sql(mergeIntoSQL))
}(s"does not have [select] privilege on " +
s"[$namespace1/$table2/id,$namespace1/$table2/name,$namespace1/$table2/city]")
doAs(admin, sql(mergeIntoSQL))
}
}
}
}

0 comments on commit abaa369

Please sign in to comment.