Skip to content

Commit

Permalink
Implement HiveTBinaryFrontendService#RenewDelegationToken
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyifan279 committed Nov 27, 2023
1 parent 5481bf5 commit 9f6c452
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,15 @@ object HiveSQLEngine extends Logging {
} else {
val effectiveUser = UserGroupInformation.createProxyUser(sessionUser.get, realUser)
effectiveUser.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = startEngine()
override def run(): Unit = {
val engineCredentials =
kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
engineCredentials.filter(_.nonEmpty).foreach { credentials =>
HiveTBinaryFrontendService.renewDelegationToken(credentials)
}
startEngine()
}
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@

package org.apache.kyuubi.engine.hive

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.rpc.thrift.{TRenewDelegationTokenReq, TRenewDelegationTokenResp}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery}
import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService}
import org.apache.kyuubi.service.TFrontendService.OK_STATUS
import org.apache.kyuubi.util.KyuubiHadoopUtils

class HiveTBinaryFrontendService(override val serverable: Serverable)
extends TBinaryFrontendService("HiveTBinaryFrontend") {
import HiveTBinaryFrontendService._

override lazy val discoveryService: Option[Service] = {
if (ServiceDiscovery.supportServiceDiscovery(conf)) {
Expand All @@ -30,4 +38,39 @@ class HiveTBinaryFrontendService(override val serverable: Serverable)
None
}
}

override def RenewDelegationToken(req: TRenewDelegationTokenReq): TRenewDelegationTokenResp = {
debug(req.toString)

// We hacked `TCLIService.Iface.RenewDelegationToken` to transfer Credentials from Kyuubi
// Server to Hive SQL engine
val resp = new TRenewDelegationTokenResp()
try {
renewDelegationToken(req.getDelegationToken)
resp.setStatus(OK_STATUS)
} catch {
case e: Exception =>
warn("Error renew delegation tokens: ", e)
resp.setStatus(KyuubiSQLException.toTStatus(e))
}
resp
}
}

object HiveTBinaryFrontendService {

def renewDelegationToken(tokenStr: String): Unit = {
val currentUser = UserGroupInformation.getCurrentUser
// `currentUser` is either `UserGroupInformation.getLoginUser` or a proxy user.
// If `currentUser` is a proxy user, it needs a HIVE_DELEGATION_TOKEN to pass
// HiveMetastoreClient authentication.
if (currentUser.getAuthenticationMethod == UserGroupInformation.AuthenticationMethod.PROXY) {
val newCreds = KyuubiHadoopUtils.decodeCredentials(tokenStr)
KyuubiHadoopUtils.getTokenMap(newCreds).values.find(_.getKind == new Text(
"HIVE_DELEGATION_TOKEN"))
.foreach { token =>
UserGroupInformation.getCurrentUser.addToken(token)
}
}
}
}

0 comments on commit 9f6c452

Please sign in to comment.