Skip to content

Commit

Permalink
[KYUUBI #6007] AuthZ should check hoodie procedures path resource pri…
Browse files Browse the repository at this point in the history
…vileges

# 🔍 Description
## Issue References 🔗

This pull request aims to make authz check hoodie procedures path resource privileges.

## Describe Your Solution 🔧

Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️
When the Hoodie procedure operation is on the path, the check can pass regardless of whether the path resource has permissions.

#### Behavior With This Pull Request 🎉
Check the path permissions correctly.

#### Related Unit Tests
New tests added.

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #5972 from Yikf/hudi-call-path.

Closes #6007

e7dd28b [yikaifei] AuthZ should check hoodie procedures path resource privileges

Authored-by: yikaifei <[email protected]>
Signed-off-by: yikaifei <[email protected]>
  • Loading branch information
yikf committed Jan 26, 2024
1 parent 47a1091 commit 7806812
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ org.apache.kyuubi.plugin.spark.authz.serde.BaseRelationFileIndexURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.CatalogStorageFormatURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiCallProcedureInputUriExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiCallProcedureOutputUriExtractor
org.apache.kyuubi.plugin.spark.authz.serde.IdentifierURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.PartitionLocsSeqURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.PropertiesLocationUriExtractor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2025,7 +2025,17 @@
} ],
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
"uriDescs" : [ {
"fieldName" : "clone",
"fieldExtractor" : "HudiCallProcedureInputUriExtractor",
"isInput" : true,
"comment" : "Hudi"
}, {
"fieldName" : "clone",
"fieldExtractor" : "HudiCallProcedureOutputUriExtractor",
"isInput" : false,
"comment" : "Hudi"
} ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.CompactionHoodiePathCommand",
"tableDescs" : [ ],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ class HudiMergeIntoTargetTableExtractor extends TableExtractor {
}
}

abstract class HudiCallProcedureTableExtractor extends TableExtractor {
trait HudiCallProcedureExtractor {

protected def extractTableIdentifier(
procedure: AnyRef,
Expand All @@ -334,204 +334,236 @@ abstract class HudiCallProcedureTableExtractor extends TableExtractor {
}
}

case class ProcedureArgsInputOutputPair(
input: Option[String] = None,
output: Option[String] = None)
case class ProcedureArgsInputOutputTuple(
inputTable: Option[String] = None,
outputTable: Option[String] = None,
inputUri: Option[String] = None,
outputUri: Option[String] = None)

protected val PROCEDURE_CLASS_PATH = "org.apache.spark.sql.hudi.command.procedures"

protected val INVALID_INDEX = -1

// These pairs are used to get the procedure input/output args which user passed in call command.
protected val procedureArgsInputOutputPairs: Map[String, ProcedureArgsInputOutputPair] = Map(
protected val procedureArgsInputOutputPairs: Map[String, ProcedureArgsInputOutputTuple] = Map(
(
s"$PROCEDURE_CLASS_PATH.ArchiveCommitsProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.CommitsCompareProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.CopyToTableProcedure",
ProcedureArgsInputOutputPair(
input = Some("table"),
output = Some("new_table"))),
ProcedureArgsInputOutputTuple(
inputTable = Some("table"),
outputTable = Some("new_table"))),
(
s"$PROCEDURE_CLASS_PATH.CopyToTempViewProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.CreateMetadataTableProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.CreateSavepointProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.DeleteMarkerProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.DeleteMetadataTableProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.DeleteSavepointProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ExportInstantsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.HdfsParquetImportProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.HelpProcedure",
ProcedureArgsInputOutputPair()),
ProcedureArgsInputOutputTuple()),
(
s"$PROCEDURE_CLASS_PATH.HiveSyncProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.InitMetadataTableProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RepairAddpartitionmetaProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RepairCorruptedCleanFilesProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RepairDeduplicateProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RepairMigratePartitionMetaProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RepairOverwriteHoodiePropsProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RollbackToInstantTimeProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RollbackToSavepointProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RunBootstrapProcedure",
ProcedureArgsInputOutputPair(
input = Some("table"),
output = Some("table"))),
ProcedureArgsInputOutputTuple(
inputTable = Some("table"),
outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RunCleanProcedure",
ProcedureArgsInputOutputPair(
input = Some("table"),
output = Some("table"))),
ProcedureArgsInputOutputTuple(
inputTable = Some("table"),
outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RunClusteringProcedure",
ProcedureArgsInputOutputPair(
input = Some("table"),
output = Some("table"))),
ProcedureArgsInputOutputTuple(
inputTable = Some("table"),
outputTable = Some("table"),
outputUri = Some("path"))),
(
s"$PROCEDURE_CLASS_PATH.RunCompactionProcedure",
ProcedureArgsInputOutputPair(
input = Some("table"),
output = Some("table"))),
ProcedureArgsInputOutputTuple(
inputTable = Some("table"),
outputTable = Some("table"),
outputUri = Some("path"))),
(
s"$PROCEDURE_CLASS_PATH.ShowArchivedCommitsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowBootstrapMappingProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowClusteringProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"), inputUri = Some("path"))),
(
s"$PROCEDURE_CLASS_PATH.ShowCommitsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowCommitExtraMetadataProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowCommitFilesProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowCommitPartitionsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowCommitWriteStatsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowCompactionProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"), inputUri = Some("path"))),
(
s"$PROCEDURE_CLASS_PATH.ShowFileSystemViewProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowFsPathDetailProcedure",
ProcedureArgsInputOutputPair()),
ProcedureArgsInputOutputTuple()),
(
s"$PROCEDURE_CLASS_PATH.ShowHoodieLogFileMetadataProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowHoodieLogFileRecordsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowInvalidParquetProcedure",
ProcedureArgsInputOutputPair()),
ProcedureArgsInputOutputTuple()),
(
s"$PROCEDURE_CLASS_PATH.ShowMetadataTableFilesProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowMetadataTablePartitionsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowMetadataTableStatsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowRollbacksProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowSavepointsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowTablePropertiesProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.StatsFileSizeProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.StatsWriteAmplificationProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.UpgradeOrDowngradeProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ValidateHoodieSyncProcedure",
ProcedureArgsInputOutputPair(
input = Some("src_table"),
output = Some("dst_table"))),
ProcedureArgsInputOutputTuple(
inputTable = Some("src_table"),
outputTable = Some("dst_table"))),
(
s"$PROCEDURE_CLASS_PATH.ValidateMetadataTableFilesProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))))
ProcedureArgsInputOutputTuple(inputTable = Some("table"))))
}

class HudiCallProcedureOutputTableExtractor
extends HudiCallProcedureTableExtractor {
extends TableExtractor with HudiCallProcedureExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
val procedure = invokeAs[AnyRef](v1, "procedure")
val args = invokeAs[AnyRef](v1, "args")
procedureArgsInputOutputPairs.get(procedure.getClass.getName)
.filter(_.output.isDefined)
.filter(_.outputTable.isDefined)
.map { argsPairs =>
val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.output.get)
val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.outputTable.get)
lookupExtractor[StringTableExtractor].apply(spark, tableIdentifier.get).orNull
}
}
}

class HudiCallProcedureInputTableExtractor
extends HudiCallProcedureTableExtractor {
extends TableExtractor with HudiCallProcedureExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
val procedure = invokeAs[AnyRef](v1, "procedure")
val args = invokeAs[AnyRef](v1, "args")
procedureArgsInputOutputPairs.get(procedure.getClass.getName)
.filter(_.input.isDefined)
.filter(_.inputTable.isDefined)
.map { argsPairs =>
val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.input.get)
val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.inputTable.get)
lookupExtractor[StringTableExtractor].apply(spark, tableIdentifier.get).orNull
}
}
}

class HudiCallProcedureInputUriExtractor
extends URIExtractor with HudiCallProcedureExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
val procedure = invokeAs[AnyRef](v1, "procedure")
val args = invokeAs[AnyRef](v1, "args")
procedureArgsInputOutputPairs.get(procedure.getClass.getName)
.filter(_.inputUri.isDefined)
.map { argsPairs =>
val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.inputUri.get)
lookupExtractor[StringURIExtractor].apply(spark, tableIdentifier.get)
}.getOrElse(Nil)
}
}

class HudiCallProcedureOutputUriExtractor
extends URIExtractor with HudiCallProcedureExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
val procedure = invokeAs[AnyRef](v1, "procedure")
val args = invokeAs[AnyRef](v1, "args")
procedureArgsInputOutputPairs.get(procedure.getClass.getName)
.filter(_.outputUri.isDefined)
.map { argsPairs =>
val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.outputUri.get)
lookupExtractor[StringURIExtractor].apply(spark, tableIdentifier.get)
}.getOrElse(Nil)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,14 @@ object HudiCommands extends CommandSpecs[TableCommandSpec] {
"clone",
classOf[HudiCallProcedureOutputTableExtractor],
actionTypeDesc = Some(ActionTypeDesc(actionType = Some(UPDATE))),
setCurrentDatabaseIfMissing = true)))
setCurrentDatabaseIfMissing = true)),
uriDescs = Seq(
UriDesc(
"clone",
classOf[HudiCallProcedureInputUriExtractor],
isInput = true,
comment = "Hudi"),
UriDesc("clone", classOf[HudiCallProcedureOutputUriExtractor], comment = "Hudi")))
}

override def specs: Seq[TableCommandSpec] = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,4 +644,28 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
doAs(admin, sql(showCommitsSql))
}
}

test("ShowClusteringProcedure") {
val path = "hdfs://demo/test/hudi/path"
val showCommitsSql = s"CALL SHOW_CLUSTERING(path => '$path')"
interceptEndsWith[AccessControlException] {
doAs(someone, sql(showCommitsSql))
}(s"does not have [read] privilege on [[$path, $path/]]")
}

test("RunClusteringProcedure") {
val path = "hdfs://demo/test/hudi/path"
val showCommitsSql = s"CALL RUN_CLUSTERING(path => '$path')"
interceptEndsWith[AccessControlException] {
doAs(someone, sql(showCommitsSql))
}(s"does not have [write] privilege on [[$path, $path/]]")
}

test("RunCompactionProcedure") {
val path = "hdfs://demo/test/hudi/path"
val showCommitsSql = s"CALL RUN_COMPACTION(path => '$path')"
interceptEndsWith[AccessControlException] {
doAs(someone, sql(showCommitsSql))
}(s"does not have [write] privilege on [[$path, $path/]]")
}
}

0 comments on commit 7806812

Please sign in to comment.