Skip to content

Commit

Permalink
Only cancel Future when you mean it
Browse files Browse the repository at this point in the history
Soft-cancelling a future only to later call it with `mayInterrupt` set
to `true` has no effect in the latter case.
Changed the logic so that interrupting a Future will really enforce it.

Occasionally, some commands should not attempt to run soft cancellations
- we know they will re-execute the program.
  • Loading branch information
hubertp committed Oct 22, 2024
1 parent 29de7a4 commit f73ad73
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ private void setExecutionEnvironment(
if (!oldEnvironmentName.equals(executionEnvironment.name())) {
ctx.jobControlPlane()
.abortJobs(
contextId, "set execution environment to " + executionEnvironment.name());
contextId,
"set execution environment to " + executionEnvironment.name(),
false);
ctx.locking()
.withWriteCompilationLock(
this.getClass(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ public interface JobControlPlane {
* Aborts jobs that relate to the specified execution context.
*
* @param contextId an identifier of a context
* @param reason reason for aborting job(s)
* @param softAbortFirst true if ongoing jobs should be aborted with safepoints first, even if
* marked as interruptible
* @param classOf abort jobs of a given class only. If empty all jobs for the given context are
* aborted
*/
@SuppressWarnings("unchecked")
void abortJobs(UUID contextId, String reason, Class<? extends Job<?>>... classOf);
void abortJobs(
UUID contextId, String reason, boolean softAbortFirst, Class<? extends Job<?>>... classOf);

/**
* Aborts jobs that relate to the specified execution context.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class DestroyContextCmd(
}

private def removeContext()(implicit ctx: RuntimeContext): Unit = {
ctx.jobControlPlane.abortJobs(request.contextId, "destroy context")
ctx.jobControlPlane.abortJobs(request.contextId, "destroy context", false)
val contextLock = ctx.locking.getOrCreateContextLock(request.contextId)
try {
ctx.locking.withContextLock(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ class InterruptContextCmd(
): Future[Unit] =
if (doesContextExist) {
Future {
ctx.jobControlPlane.abortJobs(request.contextId, "interrupt context")
ctx.jobControlPlane.abortJobs(
request.contextId,
"interrupt context",
false
)
reply(Api.InterruptContextResponse(request.contextId))
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class PopContextCmd(
ec: ExecutionContext
): Future[Unit] =
Future {
ctx.jobControlPlane.abortJobs(request.contextId, "pop context")
ctx.jobControlPlane.abortJobs(request.contextId, "pop context", false)
val maybeTopItem = ctx.contextManager.pop(request.contextId)
if (maybeTopItem.isDefined) {
reply(Api.PopContextResponse(request.contextId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class PushContextCmd(
ec: ExecutionContext
): Future[Boolean] =
Future {
ctx.jobControlPlane.abortJobs(request.contextId, "push context")
ctx.jobControlPlane.abortJobs(request.contextId, "push context", false)
val stack = ctx.contextManager.getStack(request.contextId)
val pushed = request.stackItem match {
case _: Api.StackItem.ExplicitCall if stack.isEmpty =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ class RecomputeContextCmd(
ec: ExecutionContext
): Future[Boolean] = {
Future {
ctx.jobControlPlane.abortJobs(request.contextId, "recompute context")
ctx.jobControlPlane.abortJobs(
request.contextId,
"recompute context",
false
)
val stack = ctx.contextManager.getStack(request.contextId)
if (stack.isEmpty) {
reply(Api.EmptyStackError(request.contextId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,11 @@ final class JobExecutionEngine(
private lazy val logger: TruffleLogger =
runtimeContext.executionService.getLogger

// Independent thread that keeps track of the existing list of pending
// job cancellations.
//
private class ForceJobCancellations extends Runnable {
private val forceInterruptTimeout: Long = 30 * 1000 // 30sec
private val forceInterruptTimeout: Long = 50 * 1000

override def run(): Unit = {
while (pendingCancellations.get().nonEmpty) {
Expand Down Expand Up @@ -117,23 +120,48 @@ final class JobExecutionEngine(
}
}
logger.log(Level.WARNING, sb.toString())
runningJob.future.cancel(true)
runningJob.future.cancel(runningJob.job.mayInterruptIfRunning)
}
try {
if (pending.length != outdated.length)
Thread.sleep(forceInterruptTimeout)
if (pending.length != outdated.length) {
// Calculate optimal sleep time to ensure that Job is killed roughly on time
val nextToCancel = pending
.filter { case (startTime, _) =>
startTime + forceInterruptTimeout > at
}
.sortBy(_._1)
.headOption
val timeout = nextToCancel
.map(j => Math.min(j._1 - at, forceInterruptTimeout))
.getOrElse(forceInterruptTimeout)
Thread.sleep(timeout)
}
} catch {
case e: InterruptedException =>
logger.log(
Level.WARNING,
"Encountered InterruptedException while on status of pending jobs",
"Encountered InterruptedException while waiting on status of pending jobs",
e
)
throw new RuntimeException(e)
}
}
}
}

/** Either cancels the given running job immediately or hands it back so
  * that the caller can schedule it for a delayed cancellation.
  *
  * No `cancel` call is made here when the job reports that it may be
  * interrupted while running and `softAbortFirst` is set; the job is
  * returned untouched instead. In every other case the job's future is
  * cancelled on the spot, passing the job's own `mayInterruptIfRunning`
  * flag to `cancel`.
  *
  * @param runningJob the job under consideration
  * @param softAbortFirst whether an interruptible job should be left running
  *                       for now rather than being cancelled immediately
  * @return `Some(runningJob)` when cancellation was deferred, `None` when
  *         the future has been cancelled by this call
  */
private def maybeForceCancelRunningJob(
  runningJob: RunningJob,
  softAbortFirst: Boolean
): Option[RunningJob] = {
  val interruptible = runningJob.job.mayInterruptIfRunning
  if (interruptible && softAbortFirst) {
    // Deferred: the caller decides when (and whether) to force the cancel.
    Some(runningJob)
  } else {
    runningJob.future.cancel(interruptible)
    None
  }
}

/** @inheritdoc */
override def runBackground[A](job: BackgroundJob[A]): Unit =
synchronized {
Expand Down Expand Up @@ -174,10 +202,12 @@ final class JobExecutionEngine(
case jobRef: UniqueJob[_] if jobRef.equalsTo(job) =>
logger
.log(Level.FINEST, s"Cancelling duplicate job [$jobRef].")
runningJob.future.cancel(false) //jobRef.mayInterruptIfRunning)
if (jobRef.mayInterruptIfRunning) {
updatePendingCancellations(Seq(runningJob))
}
updatePendingCancellations(
maybeForceCancelRunningJob(
runningJob,
softAbortFirst = true
).toSeq
)
case _ =>
}
}
Expand All @@ -189,6 +219,13 @@ final class JobExecutionEngine(
jobsToCancel: Seq[RunningJob]
): Unit = {
val at = System.currentTimeMillis()
if (jobsToCancel.nonEmpty) {
logger.log(
Level.FINEST,
"Submitting {0} job(s) for future cancellation",
jobsToCancel.map(j => (j.job.getClass, j.id))
)
}
pendingCancellations.updateAndGet(_ ++ jobsToCancel.map((at, _)))
pendingCancellationsExecutor.submit(new ForceJobCancellations)
}
Expand Down Expand Up @@ -259,10 +296,10 @@ final class JobExecutionEngine(
"Aborting {0} jobs because {1}: {2}",
Array[Any](cancellableJobs.length, reason, cancellableJobs.map(_.id))
)
val pending = cancellableJobs.flatMap { runningJob =>
runningJob.future.cancel(false)
Option(runningJob.job.mayInterruptIfRunning).map(_ => runningJob)
}

val pending = cancellableJobs.flatMap(
maybeForceCancelRunningJob(_, softAbortFirst = true)
)
updatePendingCancellations(pending)
runtimeContext.executionService.getContext.getThreadManager
.interruptThreads()
Expand All @@ -272,24 +309,26 @@ final class JobExecutionEngine(
override def abortJobs(
contextId: UUID,
reason: String,
softAbortFirst: Boolean,
toAbort: Class[_ <: Job[_]]*
): Unit = {
val allJobs = runningJobsRef.get()
val contextJobs = allJobs.filter(_.job.contextIds.contains(contextId))
val pending = contextJobs.flatMap { runningJob =>
if (
runningJob.job.isCancellable && (toAbort.isEmpty || toAbort
.contains(runningJob.getClass))
) {
logger.log(
Level.FINE,
"Aborting job {0} because {1}",
Array[Any](runningJob.id, reason)
)
runningJob.future.cancel(false)
Option(runningJob.job.mayInterruptIfRunning).map(_ => runningJob)
} else None
}
val pending = contextJobs
.flatMap { runningJob =>
if (
runningJob.job.isCancellable && (toAbort.isEmpty || toAbort
.contains(runningJob.getClass))
) {
logger.log(
Level.FINE,
"Aborting job {0} because {1}",
Array[Any](runningJob.id, reason)
)
Some(runningJob)
} else None
}
.flatMap(maybeForceCancelRunningJob(_, softAbortFirst))
updatePendingCancellations(pending)
runtimeContext.executionService.getContext.getThreadManager
.interruptThreads()
Expand All @@ -303,17 +342,18 @@ final class JobExecutionEngine(
): Unit = {
val allJobs = runningJobsRef.get()
val contextJobs = allJobs.filter(_.job.contextIds.contains(contextId))
val pending = contextJobs.flatMap { runningJob =>
if (runningJob.job.isCancellable && accept.apply(runningJob.job)) {
logger.log(
Level.FINE,
"Aborting job {0} because {1}",
Array[Any](runningJob.id, reason)
)
runningJob.future.cancel(false)
Option(runningJob.job.mayInterruptIfRunning).map(_ => runningJob)
} else None
}
val pending = contextJobs
.flatMap { runningJob =>
if (runningJob.job.isCancellable && accept.apply(runningJob.job)) {
logger.log(
Level.FINE,
"Aborting job {0} because {1}",
Array[Any](runningJob.id, reason)
)
Some(runningJob)
} else None
}
.flatMap(maybeForceCancelRunningJob(_, softAbortFirst = true))
updatePendingCancellations(pending)
runtimeContext.executionService.getContext.getThreadManager
.interruptThreads()
Expand All @@ -335,10 +375,9 @@ final class JobExecutionEngine(
"Aborting {0} background jobs because {1}: {2}",
Array[Any](cancellableJobs.length, reason, cancellableJobs.map(_.id))
)
val pending = cancellableJobs.flatMap { runningJob =>
runningJob.future.cancel(false)
Option(runningJob.job.mayInterruptIfRunning).map(_ => runningJob)
}
val pending = cancellableJobs.flatMap(
maybeForceCancelRunningJob(_, softAbortFirst = true)
)
updatePendingCancellations(pending)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ class RuntimeAsyncCommandsTest
var iteration = 0
while (!isProgramStarted && iteration < 100) {
val out = context.consumeOut
Thread.sleep(200)
Thread.sleep(100)
isProgramStarted = out == List("started")
iteration += 1
}
Expand Down Expand Up @@ -361,7 +361,7 @@ class RuntimeAsyncCommandsTest
var iteration = 0
while (!isProgramStarted && iteration < 100) {
val out = context.consumeOut
Thread.sleep(200)
Thread.sleep(100)
isProgramStarted = out == List("started")
iteration += 1
}
Expand All @@ -386,7 +386,7 @@ class RuntimeAsyncCommandsTest
)
)
)
val responses1 = context.receiveNIgnorePendingExpressionUpdates(3)
val responses1 = context.receiveNIgnorePendingExpressionUpdates(2)
responses1 should contain allOf (
TestMessages.update(
contextId,
Expand Down

0 comments on commit f73ad73

Please sign in to comment.