Skip to content

Commit

Permalink
log
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Sep 16, 2024
1 parent 8e2b1b3 commit b3483c7
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -349,51 +349,55 @@ class BatchJobSubmission(
if (!isClosedOrCanceled) {
MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType)))

// fast fail
if (isTerminalState(state)) {
killMessage = (false, s"batch $batchId is already terminal so can not kill it.")
builder.close(true)
cleanupUploadedResourceIfNeeded()
return
}

try {
killMessage = killBatchApplication()
builder.close(true)
cleanupUploadedResourceIfNeeded()
} finally {
if (state == OperationState.INITIALIZED) {
// if state is INITIALIZED, it means that the batch submission has not started to run, set
// the state to CANCELED manually and regardless of kill result
setState(OperationState.CANCELED)
updateBatchMetadata()
} else {
if (killMessage._1 && !isTerminalState(state)) {
// kill success and we can change state safely
// note that, the batch operation state should never be closed
// fast fail
if (isTerminalState(state)) {
killMessage = (false, s"batch $batchId is already terminal so can not kill it.")
builder.close(true)
cleanupUploadedResourceIfNeeded()
return
}

try {
killMessage = killBatchApplication()
builder.close(true)
cleanupUploadedResourceIfNeeded()
} finally {
if (state == OperationState.INITIALIZED) {
// if state is INITIALIZED, it means that the batch submission has not started to run,
// set the state to CANCELED manually and regardless of kill result
setState(OperationState.CANCELED)
updateBatchMetadata()
} else if (killMessage._1) {
// we can not change state safely
killMessage = (false, s"batch $batchId is already terminal so can not kill it.")
} else if (!isTerminalState(state)) {
_applicationInfo = currentApplicationInfo()
_applicationInfo.map(_.state) match {
case Some(ApplicationState.FINISHED) =>
setState(OperationState.FINISHED)
updateBatchMetadata()
case Some(ApplicationState.FAILED) =>
setState(OperationState.ERROR)
updateBatchMetadata()
case Some(ApplicationState.UNKNOWN) |
Some(ApplicationState.NOT_FOUND) |
Some(ApplicationState.KILLED) =>
setState(OperationState.CANCELED)
updateBatchMetadata()
case _ => // failed to kill, the kill message is enough
} else {
if (killMessage._1 && !isTerminalState(state)) {
// kill success and we can change state safely
// note that, the batch operation state should never be closed
setState(OperationState.CANCELED)
updateBatchMetadata()
} else if (killMessage._1) {
// we can not change state safely
killMessage = (false, s"batch $batchId is already terminal so can not kill it.")
} else if (!isTerminalState(state)) {
_applicationInfo = currentApplicationInfo()
_applicationInfo.map(_.state) match {
case Some(ApplicationState.FINISHED) =>
setState(OperationState.FINISHED)
updateBatchMetadata()
case Some(ApplicationState.FAILED) =>
setState(OperationState.ERROR)
updateBatchMetadata()
case Some(ApplicationState.UNKNOWN) |
Some(ApplicationState.NOT_FOUND) |
Some(ApplicationState.KILLED) =>
setState(OperationState.CANCELED)
updateBatchMetadata()
case _ => // failed to kill, the kill message is enough
}
}
}
}
} finally {
withOperationLog(warn(s"Kill batch response: $killMessage."))
}
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,12 +501,11 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
sessionManager.getBatchSession(sessionHandle).map { batchSession =>
val userName = fe.getSessionUser(batchSession.user)
val ipAddress = fe.getIpAddress
sessionManager.closeSession(batchSession.handle)
val (killed, msg) = batchSession.batchJobSubmissionOp.getKillMessage
batchSession.batchJobSubmissionOp.withOperationLog {
warn(s"Received kill batch request from $userName/$ipAddress")
warn(s"Kill batch response: killed: $killed, msg: $msg.")
}
sessionManager.closeSession(batchSession.handle)
val (killed, msg) = batchSession.batchJobSubmissionOp.getKillMessage
new CloseBatchResponse(killed, msg)
}.getOrElse {
sessionManager.getBatchMetadata(batchId).map { metadata =>
Expand Down

0 comments on commit b3483c7

Please sign in to comment.