Skip to content

Commit

Permalink
fix:Compare computatation token versions in Millbase during exception…
Browse files Browse the repository at this point in the history
… handling. (#1920)
  • Loading branch information
renjiezh authored Dec 4, 2024
1 parent e9856aa commit f11bf48
Showing 1 changed file with 44 additions and 26 deletions.
70 changes: 44 additions & 26 deletions src/main/kotlin/org/wfanet/measurement/duchy/mill/MillBase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,7 @@ abstract class MillBase(
try {
processComputationImpl(token)
} catch (e: Exception) {
try {
// The token version may have already changed. We need the latest token in order to complete
// or enqueue the computation.
val latestToken = getLatestComputationToken(globalId)
handleExceptions(latestToken, e)
} catch (e: Exception) {
handleExceptions(token, e)
}
handleExceptions(token, e)
}
logger.info("$globalId@$millId: Processed computation ")

Expand All @@ -271,27 +264,52 @@ abstract class MillBase(

private suspend fun handleExceptions(token: ComputationToken, e: Exception) {
val globalId = token.globalComputationId
val latestToken =
try {
getLatestComputationToken(globalId)
} catch (e: Exception) {
logger.log(Level.WARNING, e) {
"$globalId@$millId: Fail to get latest token during exception handling."
}
return
}

if (latestToken.computationStage == endingStage) {
logger.log(Level.WARNING, e) {
"$globalId@$millId: Skip exception handling as computation has been terminated."
}
return
}

if (token.attempt > maximumAttempts) {
failComputation(
token,
message = "Failing computation due to too many failed attempts. Last message: ${e.message}",
cause = e,
)
return
}

if (latestToken.version != token.version) {
logger.log(Level.WARNING, e) {
"$globalId@$millId: Skip exception handling as token has been outdated."
}
return
}

when (e) {
is ComputationDataClients.TransientErrorException -> {
if (token.attempt > maximumAttempts) {
failComputation(
logger.log(Level.WARNING, e) { "$globalId@$millId: TRANSIENT error" }
sendStatusUpdateToKingdom(
globalId,
buildErrorLogEntry(
token,
message = "Failing computation due to too many failed attempts.",
cause = e,
)
} else {
logger.log(Level.WARNING, e) { "$globalId@$millId: TRANSIENT error" }
sendStatusUpdateToKingdom(
globalId,
buildErrorLogEntry(
token,
"Transient error processing Computation $globalId at attempt ${token.attempt} of " +
"stage ${token.computationStage}: ${e.message}",
),
)
// Enqueue the computation again for future retry
enqueueComputation(token)
}
"Transient error processing Computation $globalId at attempt ${token.attempt} of " +
"stage ${token.computationStage}: ${e.message}",
),
)
// Enqueue the computation again for future retry
enqueueComputation(token)
}
is StatusException -> throw IllegalStateException("Programming bug: uncaught gRPC error", e)
else -> {
Expand Down

0 comments on commit f11bf48

Please sign in to comment.