Skip to content

Commit

Permalink
Add unit tests for kyuubi.session.proxy.user in RESTful API
Browse files Browse the repository at this point in the history
  • Loading branch information
mrtisttt committed Nov 15, 2023
1 parent 10d0aec commit cf09958
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,76 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
}
}

test("delete engine - user share level & proxyUser") {
// we use superUser to impersonate commonUser
val superUser = "superUser"
val commonUser = "commonUser"

val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test")
conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop")

// In EngineRef, when use hive.server2.proxy.user or kyuubi.session.proxy.user
// the user is the proxyUser, and in our test it is commonUser
val engine = new EngineRef(conf.clone, user = commonUser, PluginLoader.loadGroupProvider(conf),
id, null)

// so as the firstChild in engineSpace we use commonUser
val engineSpace = DiscoveryPaths.makePath(
s"kyuubi_test_${KYUUBI_VERSION}_USER_SPARK_SQL",
commonUser,
"default")

withDiscoveryClient(conf) { client =>
engine.getOrCreate(client)

assert(client.pathExists(engineSpace))
assert(client.getChildren(engineSpace).size == 1)

// use proxyUser
val response = webTarget.path("api/v1/admin/engine")
.queryParam("sharelevel", "USER")
.queryParam("type", "spark_sql")
.queryParam("proxyUser", commonUser)
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(superUser))
.delete()

assert(response.getStatus === 405)
val errorMessage = s"Failed to validate proxy privilege of $superUser for $commonUser"
assert(response.readEntity(classOf[String]).contains(errorMessage))

// it should be the same behavior as hive.server2.proxy.user
val response2 = webTarget.path("api/v1/admin/engine")
.queryParam("sharelevel", "USER")
.queryParam("type", "spark_sql")
.queryParam("hive.server2.proxy.user", commonUser)
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(superUser))
.delete()

assert(response2.getStatus === 405)
assert(response2.readEntity(classOf[String]).contains(errorMessage))

// when both set, proxyUser takes precedence
val response3 = webTarget.path("api/v1/admin/engine")
.queryParam("sharelevel", "USER")
.queryParam("type", "spark_sql")
.queryParam("proxyUser", commonUser)
// here, we also set hive.server2.proxy.user, but the different value from proxyUser
.queryParam("hive.server2.proxy.user", s"${commonUser}HiveServer2")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(superUser))
.delete()

assert(response3.getStatus === 405)
assert(response3.readEntity(classOf[String]).contains(errorMessage))
}
}

test("list engine - user share level") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
Expand Down Expand Up @@ -545,6 +615,73 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
}
}

test("list engine - user share level & proxyUser") {
// we use superUser to impersonate commonUser
val superUser = "superUser"
val commonUser = "commonUser"

val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test")
conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop")

// In EngineRef, when use hive.server2.proxy.user or kyuubi.session.proxy.user
// the user is the proxyUser, and in our test it is commonUser
val engine = new EngineRef(conf.clone, user = commonUser, PluginLoader.loadGroupProvider(conf),
id, null)

// so as the firstChild in engineSpace we use commonUser
val engineSpace = DiscoveryPaths.makePath(
s"kyuubi_test_${KYUUBI_VERSION}_USER_SPARK_SQL",
commonUser,
"")

withDiscoveryClient(conf) { client =>
engine.getOrCreate(client)

assert(client.pathExists(engineSpace))
assert(client.getChildren(engineSpace).size == 1)

// use proxyUser
val response = webTarget.path("api/v1/admin/engine")
.queryParam("type", "spark_sql")
.queryParam("proxyUser", commonUser)
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(superUser))
.get

assert(response.getStatus === 405)
val errorMessage = s"Failed to validate proxy privilege of $superUser for $commonUser"
assert(response.readEntity(classOf[String]).contains(errorMessage))

// it should be the same behavior as hive.server2.proxy.user
val response2 = webTarget.path("api/v1/admin/engine")
.queryParam("type", "spark_sql")
.queryParam("hive.server2.proxy.user", commonUser)
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(superUser))
.get

assert(response2.getStatus === 405)
assert(response2.readEntity(classOf[String]).contains(errorMessage))

// when both set, proxyUser takes precedence
val response3 = webTarget.path("api/v1/admin/engine")
.queryParam("type", "spark_sql")
.queryParam("proxyUser", commonUser)
// here, we also set hive.server2.proxy.user, but the different value from proxyUser
.queryParam("hive.server2.proxy.user", s"${commonUser}HiveServer2")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(superUser))
.get

assert(response3.getStatus === 405)
assert(response3.readEntity(classOf[String]).contains(errorMessage))
}
}

test("list server") {
// Mock Kyuubi Server
val serverDiscovery = mock[ServiceDiscovery]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -852,4 +852,103 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
val getBatchListResponse = response.readEntity(classOf[GetBatchesResponse])
assert(getBatchListResponse.getTotal == 1)
}

test("open batch session with proxyUser") {
// we use superUser to impersonate commonUser
val superUser = "superUser"
val commonUser = "commonUser"

// use kyuubi.session.proxy.user
val proxyUserRequest = newSparkBatchRequest(
Map("spark.master" -> "local", PROXY_USER.key -> commonUser))

val proxyUserResponse = webTarget.path("api/v1/batches")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader(superUser))
.post(Entity.entity(proxyUserRequest, MediaType.APPLICATION_JSON_TYPE))

assert(proxyUserResponse.getStatus === 405)
val errorMessage = s"Failed to validate proxy privilege of $superUser for $commonUser"
assert(proxyUserResponse.readEntity(classOf[String]).contains(errorMessage))

// it should be the same behavior as hive.server2.proxy.user
val proxyUserRequest2 = newSparkBatchRequest(
Map("spark.master" -> "local", KyuubiAuthenticationFactory.HS2_PROXY_USER -> commonUser))

val proxyUserResponse2 = webTarget.path("api/v1/batches")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader(superUser))
.post(Entity.entity(proxyUserRequest2, MediaType.APPLICATION_JSON_TYPE))

assert(proxyUserResponse2.getStatus === 405)
assert(proxyUserResponse2.readEntity(classOf[String]).contains(errorMessage))

// when both set, kyuubi.session.proxy.user takes precedence
val proxyUserRequest3 = newSparkBatchRequest(
Map("spark.master" -> "local", PROXY_USER.key -> commonUser,
// here, we also set hive.server2.proxy.user,
// but the different value from kyuubi.session.proxy.user
KyuubiAuthenticationFactory.HS2_PROXY_USER -> s"${commonUser}HiveServer2"))

val proxyUserResponse3 = webTarget.path("api/v1/batches")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader(superUser))
.post(Entity.entity(proxyUserRequest3, MediaType.APPLICATION_JSON_TYPE))
assert(proxyUserResponse3.getStatus === 405)
assert(proxyUserResponse3.readEntity(classOf[String]).contains(errorMessage))
}

test("delete batch with proxyUser") {
// we use superUser to impersonate commonUser
val superUser = "superUser"
val commonUser = "commonUser"

// Thought it doesn't have kerberos environment in these unit tests,
// that means we don't need to add the metadata as it will not execute,
// we still add these code to make the test more integrity and standard.
val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
val metadata = Metadata(
identifier = UUID.randomUUID().toString,
sessionType = SessionType.BATCH,
realUser = superUser,
username = commonUser,
ipAddress = "localhost",
kyuubiInstance = fe.connectionUrl,
state = "PENDING",
resource = "resource",
className = "className",
requestName = "LOCAL_LOG_NOT_FOUND",
engineType = "SPARK")
sessionManager.insertMetadata(metadata)

// use proxyUser
val deleteBatchResponse = webTarget.path(s"api/v1/batches/${metadata.identifier}")
.queryParam("proxyUser", commonUser)
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader(superUser))
.delete()
assert(deleteBatchResponse.getStatus === 405)
val errorMessage = s"Failed to validate proxy privilege of $superUser for $commonUser"
assert(deleteBatchResponse.readEntity(classOf[String]).contains(errorMessage))

// it should be the same behavior as hive.server2.proxy.user
val deleteBatchResponse2 = webTarget.path(s"api/v1/batches/${metadata.identifier}")
.queryParam("proxyUser", commonUser)
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader(superUser))
.delete()
assert(deleteBatchResponse2.getStatus === 405)
assert(deleteBatchResponse2.readEntity(classOf[String]).contains(errorMessage))

// when both set, proxyUser takes precedence
val deleteBatchResponse3 = webTarget.path(s"api/v1/batches/${metadata.identifier}")
.queryParam("proxyUser", commonUser)
// here, we also set hive.server2.proxy.user, but the different value from proxyUser
.queryParam("hive.server2.proxy.user", s"${commonUser}HiveServer2")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader(superUser))
.delete()
assert(deleteBatchResponse3.getStatus === 405)
assert(deleteBatchResponse3.readEntity(classOf[String]).contains(errorMessage))
}
}

0 comments on commit cf09958

Please sign in to comment.