Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #6358] Fix the incorrect construction of yarnConf #6386

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ trait ApplicationOperation {
* @note For implementations, please suppress exceptions and always return KillResponse
*/
def killApplicationByTag(
sessionConf: Option[KyuubiConf],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
sessionConf: Option[KyuubiConf],
def killApplicationByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String,
proxyUser: Option[String] = None,
sessionConf: Option[KyuubiConf] = None): KillResponse

appMgrInfo: ApplicationManagerInfo,
tag: String,
proxyUser: Option[String] = None): KillResponse
Expand All @@ -73,6 +74,7 @@ trait ApplicationOperation {
* @return [[ApplicationInfo]]
*/
def getApplicationInfoByTag(
sessionConf: Option[KyuubiConf],
appMgrInfo: ApplicationManagerInfo,
tag: String,
proxyUser: Option[String] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,11 @@ private[kyuubi] class EngineRef(

if (started + timeout <= System.currentTimeMillis()) {
val killMessage =
engineManager.killApplication(builder.appMgrInfo(), engineRefId, Some(appUser))
engineManager.killApplication(
Some(conf),
builder.appMgrInfo(),
engineRefId,
Some(appUser))
builder.close(true)
MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
throw KyuubiSQLException(
Expand All @@ -270,6 +274,7 @@ private[kyuubi] class EngineRef(
}

val applicationInfo = engineMgr.getApplicationInfo(
Some(conf),
builder.appMgrInfo(),
engineRefId,
Some(appUser),
Expand Down Expand Up @@ -366,7 +371,7 @@ private[kyuubi] class EngineRef(
try {
val appMgrInfo = builder.appMgrInfo()
builder.close(true)
engineManager.killApplication(appMgrInfo, engineRefId, Some(appUser))
engineManager.killApplication(Some(conf), appMgrInfo, engineRefId, Some(appUser))
} catch {
case e: Exception =>
warn(s"Error closing engine builder, engineRefId: $engineRefId", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,15 @@ class JpsApplicationOperation extends ApplicationOperation {
}

/**
 * Kills the application identified by `tag` by delegating to `killJpsApplicationByTag`.
 *
 * Only `tag` is used here; `sessionConf`, `appMgrInfo` and `proxyUser` are accepted to
 * satisfy the [[ApplicationOperation]] signature and are ignored by this implementation.
 *
 * @param sessionConf optional session-level configuration (unused here)
 * @param appMgrInfo application manager information (unused here)
 * @param tag the tag used to look up the application to kill
 * @param proxyUser optional proxy user (unused here)
 * @return the [[KillResponse]] produced by `killJpsApplicationByTag`
 */
override def killApplicationByTag(
sessionConf: Option[KyuubiConf],
appMgrInfo: ApplicationManagerInfo,
tag: String,
proxyUser: Option[String] = None): KillResponse = {
// NOTE(review): the meaning of the positional `true` flag is not visible here -- confirm
// against the definition of killJpsApplicationByTag.
killJpsApplicationByTag(tag, true)
}

override def getApplicationInfoByTag(
sessionConf: Option[KyuubiConf],
appMgrInfo: ApplicationManagerInfo,
tag: String,
proxyUser: Option[String] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}

override def killApplicationByTag(
sessionConf: Option[KyuubiConf],
appMgrInfo: ApplicationManagerInfo,
tag: String,
proxyUser: Option[String] = None): KillResponse = {
Expand Down Expand Up @@ -224,6 +225,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}

override def getApplicationInfoByTag(
sessionConf: Option[KyuubiConf],
appMgrInfo: ApplicationManagerInfo,
tag: String,
proxyUser: Option[String] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,14 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
}

def killApplication(
sessionConf: Option[KyuubiConf],
appMgrInfo: ApplicationManagerInfo,
tag: String,
proxyUser: Option[String] = None): KillResponse = {
var (killed, lastMessage): KillResponse = (false, null)
for (operation <- operations if !killed) {
if (operation.isSupported(appMgrInfo)) {
val (k, m) = operation.killApplicationByTag(appMgrInfo, tag, proxyUser)
val (k, m) = operation.killApplicationByTag(sessionConf, appMgrInfo, tag, proxyUser)
killed = k
lastMessage = m
}
Expand All @@ -84,13 +85,15 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
}

def getApplicationInfo(
sessionConf: Option[KyuubiConf],
appMgrInfo: ApplicationManagerInfo,
tag: String,
proxyUser: Option[String] = None,
submitTime: Option[Long] = None): Option[ApplicationInfo] = {
val operation = operations.find(_.isSupported(appMgrInfo))
operation match {
case Some(op) => Some(op.getApplicationInfoByTag(appMgrInfo, tag, proxyUser, submitTime))
case Some(op) =>
Some(op.getApplicationInfoByTag(sessionConf, appMgrInfo, tag, proxyUser, submitTime))
case None => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,74 +95,139 @@ class YarnApplicationOperation extends ApplicationOperation with Logging {
}
}

/**
 * Runs `action` against a YARN client, building a dedicated client from the session
 * configuration when one is supplied.
 *
 *  - `sessionConf` and `proxyUser` both defined: a client is created from the session's
 *    YARN configuration inside `Utils.doAs(proxyUser)`, used, then closed.
 *  - only `sessionConf` defined: the client comes from [[createAdminYarnClient]], is used,
 *    then closed.
 *  - `sessionConf` empty: delegates to `withYarnClient(proxyUser)`.
 *
 * @param proxyUser optional user to impersonate while creating and using the client
 * @param sessionConf optional session-level configuration used to build the YARN configuration
 * @param action the work to perform with the client
 * @tparam T result type of `action`
 * @return whatever `action` returns
 * @throws IllegalStateException if `sessionConf` is defined, `proxyUser` is empty and
 *                               [[createAdminYarnClient]] yields no client
 */
private def withNewYarnClient[T](
    proxyUser: Option[String],
    sessionConf: Option[KyuubiConf])(action: YarnClient => T): T = {

  // Creates the client lazily inside the try, runs `action`, and always attempts to close it.
  def useAndClose(create: => YarnClient): T = {
    var client: YarnClient = null
    try {
      client = create
      action(client)
    } finally {
      // `create` may have thrown before the assignment; closing a null reference would only
      // raise a NullPointerException on top of the real failure.
      Option(client).foreach(c => Utils.tryLogNonFatalError(c.close()))
    }
  }

  (sessionConf, proxyUser) match {
    case (Some(conf), Some(user)) =>
      Utils.doAs(user) { () =>
        // NOTE(review): open question raised in review -- does this use
        // kyuubi.engineEnv.HADOOP_CONF_DIR/YARN_CONF_DIR in sessionConf to initialize yarnConf?
        useAndClose(createYarnClient(KyuubiHadoopUtils.newYarnConfiguration(conf)))
      }
    case (Some(conf), None) =>
      useAndClose {
        // createAdminYarnClient returns None for the OWNER strategy; fail with a descriptive
        // error instead of the bare NoSuchElementException that Option.get would raise.
        createAdminYarnClient(conf).getOrElse(
          throw new IllegalStateException(
            "Cannot create an admin YARN client from the session conf " +
              "and no proxy user was provided"))
      }
    case (None, _) =>
      withYarnClient(proxyUser)(action)
  }
}

/**
 * Builds a YARN client from a YARN configuration derived from `conf`, acting as the user
 * selected by the `KyuubiConf.YARN_USER_STRATEGY` setting.
 *
 * @param conf Kyuubi configuration supplying the YARN settings, the user strategy and the
 *             admin user name
 * @return the created client, or `None` for the OWNER strategy, where no client is built
 */
def createAdminYarnClient(conf: KyuubiConf): Option[YarnClient] = {
  val yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)

  // Client created directly as the user running this process.
  def asCurrentUser(): Option[YarnClient] = {
    val client = createYarnClient(yarnConf)
    info(s"Creating admin YARN client with current user: ${Utils.currentUser}.")
    Some(client)
  }

  // Client created inside Utils.doAs for the given user.
  def asProxyUser(proxyUser: String): Option[YarnClient] =
    Utils.doAs(proxyUser) { () =>
      val client = createYarnClient(yarnConf)
      info(s"Creating admin YARN client with proxy user: $proxyUser.")
      Some(client)
    }

  val strategy = YarnUserStrategy.withName(conf.get(KyuubiConf.YARN_USER_STRATEGY))
  strategy match {
    case NONE =>
      asCurrentUser()
    case ADMIN =>
      // Impersonation is only needed when the configured admin differs from the current user.
      val adminUser = conf.get(KyuubiConf.YARN_USER_ADMIN)
      if (adminUser == Utils.currentUser) asCurrentUser() else asProxyUser(adminUser)
    case OWNER =>
      info("Skip initializing admin YARN client")
      None
  }
}

/**
 * Reports whether this operation handles the given application manager.
 *
 * @param appMgrInfo application manager information to inspect
 * @return true when a resource manager is present and its name, lower-cased with
 *         `Locale.ROOT`, starts with "yarn"
 */
override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
  val normalized = appMgrInfo.resourceManager.map(_.toLowerCase(Locale.ROOT))
  normalized.exists(_.startsWith("yarn"))
}

override def killApplicationByTag(
sessionConf: Option[KyuubiConf],
appMgrInfo: ApplicationManagerInfo,
tag: String,
proxyUser: Option[String] = None): KillResponse = withYarnClient(proxyUser) { yarnClient =>
try {
val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
if (reports.isEmpty) {
(false, NOT_FOUND)
} else {
try {
val applicationId = reports.get(0).getApplicationId
yarnClient.killApplication(applicationId)
(true, s"Succeeded to terminate: $applicationId with $tag")
} catch {
case e: Exception =>
(false, s"Failed to terminate application with $tag, due to ${e.getMessage}")
proxyUser: Option[String] = None): KillResponse =
withNewYarnClient(proxyUser, sessionConf) { yarnClient =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that newYarnClient is not always needed. Can we use it only when needed? For example, when the HADOOP_CONF_DIR/YARN_CONF_DIR engine env exists in sessionConf.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your help, you're right, I'll make the changes.

try {
val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
if (reports.isEmpty) {
(false, NOT_FOUND)
} else {
try {
val applicationId = reports.get(0).getApplicationId
yarnClient.killApplication(applicationId)
(true, s"Succeeded to terminate: $applicationId with $tag")
} catch {
case e: Exception =>
(false, s"Failed to terminate application with $tag, due to ${e.getMessage}")
}
}
} catch {
case e: Exception =>
(
false,
s"Failed to get while terminating application with tag $tag, due to ${e.getMessage}")
}
} catch {
case e: Exception =>
(
false,
s"Failed to get while terminating application with tag $tag, due to ${e.getMessage}")
}
}

override def getApplicationInfoByTag(
sessionConf: Option[KyuubiConf],
appMgrInfo: ApplicationManagerInfo,
tag: String,
proxyUser: Option[String] = None,
submitTime: Option[Long] = None): ApplicationInfo = withYarnClient(proxyUser) { yarnClient =>
debug(s"Getting application info from YARN cluster by tag: $tag")
val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
if (reports.isEmpty) {
debug(s"Can't find target application from YARN cluster by tag: $tag")
submitTime match {
case Some(_submitTime) =>
val elapsedTime = System.currentTimeMillis - _submitTime
if (elapsedTime < submitTimeout) {
info(s"Wait for YARN application[tag: $tag] to be submitted, " +
s"elapsed time: ${elapsedTime}ms, return ${ApplicationInfo.UNKNOWN} status")
ApplicationInfo.UNKNOWN
} else {
error(s"Can't find target application from YARN cluster by tag: $tag, " +
s"elapsed time: ${elapsedTime}ms exceeds ${ENGINE_YARN_SUBMIT_TIMEOUT.key}: " +
s"${submitTimeout}ms, return ${ApplicationInfo.NOT_FOUND} status")
ApplicationInfo.NOT_FOUND
}
case _ => ApplicationInfo.NOT_FOUND
submitTime: Option[Long] = None): ApplicationInfo =
withNewYarnClient(proxyUser, sessionConf) { yarnClient =>
debug(s"Getting application info from YARN cluster by tag: $tag")
val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
if (reports.isEmpty) {
debug(s"Can't find target application from YARN cluster by tag: $tag")
submitTime match {
case Some(_submitTime) =>
val elapsedTime = System.currentTimeMillis - _submitTime
if (elapsedTime < submitTimeout) {
info(s"Wait for YARN application[tag: $tag] to be submitted, " +
s"elapsed time: ${elapsedTime}ms, return ${ApplicationInfo.UNKNOWN} status")
ApplicationInfo.UNKNOWN
} else {
error(s"Can't find target application from YARN cluster by tag: $tag, " +
s"elapsed time: ${elapsedTime}ms exceeds ${ENGINE_YARN_SUBMIT_TIMEOUT.key}: " +
s"${submitTimeout}ms, return ${ApplicationInfo.NOT_FOUND} status")
ApplicationInfo.NOT_FOUND
}
case _ => ApplicationInfo.NOT_FOUND
}
} else {
val report = reports.get(0)
val info = ApplicationInfo(
id = report.getApplicationId.toString,
name = report.getName,
state = toApplicationState(
report.getApplicationId.toString,
report.getYarnApplicationState,
report.getFinalApplicationStatus),
url = Option(report.getTrackingUrl),
error = Option(report.getDiagnostics))
debug(s"Successfully got application info by tag: $tag. $info")
info
}
} else {
val report = reports.get(0)
val info = ApplicationInfo(
id = report.getApplicationId.toString,
name = report.getName,
state = toApplicationState(
report.getApplicationId.toString,
report.getYarnApplicationState,
report.getFinalApplicationStatus),
url = Option(report.getTrackingUrl),
error = Option(report.getDiagnostics))
debug(s"Successfully got application info by tag: $tag. $info")
info
}
}

override def stop(): Unit = adminYarnClient.foreach { yarnClient =>
Utils.tryLogNonFatalError(yarnClient.stop())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class BatchJobSubmission(
}
val applicationInfo =
applicationManager.getApplicationInfo(
None,
builder.appMgrInfo(),
batchId,
Some(session.user),
Expand All @@ -126,7 +127,7 @@ class BatchJobSubmission(
}

private[kyuubi] def killBatchApplication(): KillResponse = {
applicationManager.killApplication(builder.appMgrInfo(), batchId, Some(session.user))
applicationManager.killApplication(None, builder.appMgrInfo(), batchId, Some(session.user))
}

private val applicationCheckInterval =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
case e: KyuubiRestException =>
error(s"Error redirecting get batch[$batchId] to ${metadata.kyuubiInstance}", e)
val batchAppStatus = sessionManager.applicationManager.getApplicationInfo(
None,
metadata.appMgrInfo,
batchId,
Some(userName),
Expand Down Expand Up @@ -477,7 +478,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
batchId: String,
user: String): KillResponse = {
val (killed, message) = sessionManager.applicationManager
.killApplication(appMgrInfo, batchId, Some(user))
.killApplication(None, appMgrInfo, batchId, Some(user))
info(s"Mark batch[$batchId] closed by ${fe.connectionUrl}")
sessionManager.updateMetadata(Metadata(identifier = batchId, peerInstanceClosed = true))
(killed, message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,13 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
}
val engineId = sessionManager.allSessions().head.handle.identifier.toString
// kill the engine application and wait the engine terminate
sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), engineId)
sessionManager.applicationManager.killApplication(
None,
ApplicationManagerInfo(None),
engineId)
eventually(timeout(30.seconds), interval(100.milliseconds)) {
assert(sessionManager.applicationManager.getApplicationInfo(
None,
ApplicationManagerInfo(None),
engineId)
.exists(_.state == ApplicationState.NOT_FOUND))
Expand Down
Loading
Loading