From 71af24a4f9f3a0c37a12732037cdd37a8133f0a5 Mon Sep 17 00:00:00 2001 From: zml1206 Date: Sat, 4 Nov 2023 00:48:54 +0800 Subject: [PATCH] Support merge into table command for Delta Lake --- .../main/resources/table_command_spec.json | 22 +++++++++ .../spark/authz/gen/DeltaCommands.scala | 12 +++++ ...eltaCatalogRangerSparkExtensionSuite.scala | 45 +++++++++++++++++++ 3 files changed, 79 insertions(+) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json index 9a1c0c34c39..b36bce82460 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json +++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json @@ -2029,6 +2029,28 @@ "opType" : "QUERY", "queryDescs" : [ ], "uriDescs" : [ ] +}, { + "classname" : "org.apache.spark.sql.delta.commands.MergeIntoCommand", + "tableDescs" : [ { + "fieldName" : "target", + "fieldExtractor" : "SubqueryAliasTableExtractor", + "columnDesc" : null, + "actionTypeDesc" : { + "fieldName" : null, + "fieldExtractor" : null, + "actionType" : "UPDATE" + }, + "tableTypeDesc" : null, + "catalogDesc" : null, + "isInput" : false, + "setCurrentDatabaseIfMissing" : false + } ], + "opType" : "QUERY", + "queryDescs" : [ { + "fieldName" : "source", + "fieldExtractor" : "LogicalPlanQueryExtractor" + } ], + "uriDescs" : [ ] }, { "classname" : "org.apache.spark.sql.delta.commands.UpdateCommand", "tableDescs" : [ { diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala index e8cce67abcd..38701760d0e 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala @@ -42,7 +42,19 @@ object DeltaCommands extends CommandSpecs[TableCommandSpec] { DeleteCommand.copy(classname = cmd) } + val MergeIntoCommand = { + val cmd = "org.apache.spark.sql.delta.commands.MergeIntoCommand" + val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE)) + val tableDesc = TableDesc( + "target", + classOf[SubqueryAliasTableExtractor], + actionTypeDesc = Some(actionTypeDesc)) + val queryDesc = QueryDesc("source") + TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDesc)) + } + override def specs: Seq[TableCommandSpec] = Seq( DeleteCommand, + MergeIntoCommand, UpdateCommand) } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala index 4fc73adcee2..04d2262df4e 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala @@ -221,6 +221,51 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { doAs(admin, sql(updateTableSql)) } } + + test("merge into table") { + withSingleCallEnabled { + withCleanTmpResources(Seq( + (s"$namespace1.$table1", "table"), + (s"$namespace1.$table2", "table"), + (s"$namespace1", "database"))) { + doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1")) + doAs(admin, sql(createTableSql(namespace1, table1))) + doAs(admin, sql(createTableSql(namespace1, table2))) + + val mergeIntoSql = + s""" + |MERGE INTO $namespace1.$table1 AS target + |USING $namespace1.$table2 AS source + |ON target.id = source.id + |WHEN MATCHED THEN + | UPDATE SET + | id = source.id, + | name = source.name, + | gender = source.gender, + | birthDate = source.birthDate + |WHEN NOT MATCHED + | THEN INSERT ( + | id, + | name, + | gender, + | birthDate + | ) + | VALUES ( + | source.id, + | source.name, + | source.gender, + | source.birthDate + | ) + |""".stripMargin + interceptContains[AccessControlException]( + doAs(someone, sql(mergeIntoSql)))( + s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," + + s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," + + s" [update] privilege on [$namespace1/$table1]") + doAs(admin, sql(mergeIntoSql)) + } + } + } } object DeltaCatalogRangerSparkExtensionSuite {