diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala index 3cc426c435a..3e6b8729db1 100644 --- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala +++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala @@ -130,7 +130,15 @@ object HiveSQLEngine extends Logging { } else { val effectiveUser = UserGroupInformation.createProxyUser(sessionUser.get, realUser) effectiveUser.doAs(new PrivilegedExceptionAction[Unit] { - override def run(): Unit = startEngine() + override def run(): Unit = { + val engineCredentials = + kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY) + kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY) + engineCredentials.filter(_.nonEmpty).foreach { credentials => + HiveTBinaryFrontendService.renewDelegationToken(credentials) + } + startEngine() + } }) } diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveTBinaryFrontendService.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveTBinaryFrontendService.scala index d7cc801d3f6..19356d7c645 100644 --- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveTBinaryFrontendService.scala +++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveTBinaryFrontendService.scala @@ -17,11 +17,19 @@ package org.apache.kyuubi.engine.hive +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hive.service.rpc.thrift.{TRenewDelegationTokenReq, TRenewDelegationTokenResp} + +import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery} import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService} +import org.apache.kyuubi.service.TFrontendService.OK_STATUS +import org.apache.kyuubi.util.KyuubiHadoopUtils class HiveTBinaryFrontendService(override val serverable: Serverable) extends TBinaryFrontendService("HiveTBinaryFrontend") { + import HiveTBinaryFrontendService._ override lazy val discoveryService: Option[Service] = { if (ServiceDiscovery.supportServiceDiscovery(conf)) { @@ -30,4 +38,39 @@ class HiveTBinaryFrontendService(override val serverable: Serverable) None } } + + override def RenewDelegationToken(req: TRenewDelegationTokenReq): TRenewDelegationTokenResp = { + debug(req.toString) + + // We hacked `TCLIService.Iface.RenewDelegationToken` to transfer Credentials from Kyuubi + // Server to Hive SQL engine + val resp = new TRenewDelegationTokenResp() + try { + renewDelegationToken(req.getDelegationToken) + resp.setStatus(OK_STATUS) + } catch { + case e: Exception => + warn("Error renew delegation tokens: ", e) + resp.setStatus(KyuubiSQLException.toTStatus(e)) + } + resp + } +} + +object HiveTBinaryFrontendService { + + def renewDelegationToken(tokenStr: String): Unit = { + val currentUser = UserGroupInformation.getCurrentUser + // `currentUser` is either `UserGroupInformation.getLoginUser` or a proxy user. + // If `currentUser` is a proxy user, it needs a HIVE_DELEGATION_TOKEN to pass + // HiveMetastoreClient authentication. + if (currentUser.getAuthenticationMethod == UserGroupInformation.AuthenticationMethod.PROXY) { + val newCreds = KyuubiHadoopUtils.decodeCredentials(tokenStr) + KyuubiHadoopUtils.getTokenMap(newCreds).values + .find(_.getKind == new Text("HIVE_DELEGATION_TOKEN")) + .foreach { token => + UserGroupInformation.getCurrentUser.addToken(token) + } + } + } }