Skip to content

Commit

Permalink
Support merge into table command for Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
zml1206 committed Nov 6, 2023
1 parent f23b6de commit 71af24a
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2029,6 +2029,28 @@
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.delta.commands.MergeIntoCommand",
"tableDescs" : [ {
"fieldName" : "target",
"fieldExtractor" : "SubqueryAliasTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ {
"fieldName" : "source",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.delta.commands.UpdateCommand",
"tableDescs" : [ {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,19 @@ object DeltaCommands extends CommandSpecs[TableCommandSpec] {
DeleteCommand.copy(classname = cmd)
}

val MergeIntoCommand = {
val cmd = "org.apache.spark.sql.delta.commands.MergeIntoCommand"
val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
val tableDesc = TableDesc(
"target",
classOf[SubqueryAliasTableExtractor],
actionTypeDesc = Some(actionTypeDesc))
val queryDesc = QueryDesc("source")
TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDesc))
}

override def specs: Seq[TableCommandSpec] = Seq(
DeleteCommand,
MergeIntoCommand,
UpdateCommand)
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,51 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
doAs(admin, sql(updateTableSql))
}
}

test("merge into table") {
withSingleCallEnabled {
withCleanTmpResources(Seq(
(s"$namespace1.$table1", "table"),
(s"$namespace1.$table2", "table"),
(s"$namespace1", "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
doAs(admin, sql(createTableSql(namespace1, table1)))
doAs(admin, sql(createTableSql(namespace1, table2)))

val mergeIntoSql =
s"""
|MERGE INTO $namespace1.$table1 AS target
|USING $namespace1.$table2 AS source
|ON target.id = source.id
|WHEN MATCHED THEN
| UPDATE SET
| id = source.id,
| name = source.name,
| gender = source.gender,
| birthDate = source.birthDate
|WHEN NOT MATCHED
| THEN INSERT (
| id,
| name,
| gender,
| birthDate
| )
| VALUES (
| source.id,
| source.name,
| source.gender,
| source.birthDate
| )
|""".stripMargin
interceptContains[AccessControlException](
doAs(someone, sql(mergeIntoSql)))(
s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," +
s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
s" [update] privilege on [$namespace1/$table1]")
doAs(admin, sql(mergeIntoSql))
}
}
}
}

object DeltaCatalogRangerSparkExtensionSuite {
Expand Down

0 comments on commit 71af24a

Please sign in to comment.