diff --git a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/SetExecutionEnvironmentCommand.java b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/SetExecutionEnvironmentCommand.java index 7bf93656c239..3434f747ee3c 100644 --- a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/SetExecutionEnvironmentCommand.java +++ b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/SetExecutionEnvironmentCommand.java @@ -52,7 +52,9 @@ private void setExecutionEnvironment( if (!oldEnvironmentName.equals(executionEnvironment.name())) { ctx.jobControlPlane() .abortJobs( - contextId, "set execution environment to " + executionEnvironment.name()); + contextId, + "set execution environment to " + executionEnvironment.name(), + false); ctx.locking() .withWriteCompilationLock( this.getClass(), diff --git a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/execution/JobControlPlane.java b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/execution/JobControlPlane.java index 58a316c0c247..748f1932969d 100644 --- a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/execution/JobControlPlane.java +++ b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/execution/JobControlPlane.java @@ -21,11 +21,15 @@ public interface JobControlPlane { * Aborts jobs that relates to the specified execution context. * * @param contextId an identifier of a context + * @param reason reason for aborting job(s) + * @param softAbortFirst true if ongoing jobs should be aborted with safepoints first, even if + * marked as interruptible * @param classOf abort jobs of a given class only. If empty all jobs for the given context are * aborted */ @SuppressWarnings("unchecked") - void abortJobs(UUID contextId, String reason, Class>... 
classOf); + void abortJobs( + UUID contextId, String reason, boolean softAbortFirst, Class>... classOf); /** * Aborts jobs that relate to the specified execution context. diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/DestroyContextCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/DestroyContextCmd.scala index bf8996869e7d..8b59c8c10804 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/DestroyContextCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/DestroyContextCmd.scala @@ -34,7 +34,7 @@ class DestroyContextCmd( } private def removeContext()(implicit ctx: RuntimeContext): Unit = { - ctx.jobControlPlane.abortJobs(request.contextId, "destroy context") + ctx.jobControlPlane.abortJobs(request.contextId, "destroy context", false) val contextLock = ctx.locking.getOrCreateContextLock(request.contextId) try { ctx.locking.withContextLock( diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/InterruptContextCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/InterruptContextCmd.scala index bf2fd3ac3198..c30a61f5bc74 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/InterruptContextCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/InterruptContextCmd.scala @@ -23,7 +23,11 @@ class InterruptContextCmd( ): Future[Unit] = if (doesContextExist) { Future { - ctx.jobControlPlane.abortJobs(request.contextId, "interrupt context") + ctx.jobControlPlane.abortJobs( + request.contextId, + "interrupt context", + false + ) reply(Api.InterruptContextResponse(request.contextId)) } } else { diff --git 
a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/PopContextCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/PopContextCmd.scala index 1b79127bd3ce..5074644fb826 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/PopContextCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/PopContextCmd.scala @@ -43,7 +43,7 @@ class PopContextCmd( ec: ExecutionContext ): Future[Unit] = Future { - ctx.jobControlPlane.abortJobs(request.contextId, "pop context") + ctx.jobControlPlane.abortJobs(request.contextId, "pop context", false) val maybeTopItem = ctx.contextManager.pop(request.contextId) if (maybeTopItem.isDefined) { reply(Api.PopContextResponse(request.contextId)) diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/PushContextCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/PushContextCmd.scala index f77f3490a825..486209ad83a8 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/PushContextCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/PushContextCmd.scala @@ -46,7 +46,7 @@ class PushContextCmd( ec: ExecutionContext ): Future[Boolean] = Future { - ctx.jobControlPlane.abortJobs(request.contextId, "push context") + ctx.jobControlPlane.abortJobs(request.contextId, "push context", false) val stack = ctx.contextManager.getStack(request.contextId) val pushed = request.stackItem match { case _: Api.StackItem.ExplicitCall if stack.isEmpty => diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/RecomputeContextCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/RecomputeContextCmd.scala index 
12ac8453f94f..43b6df2bab42 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/RecomputeContextCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/RecomputeContextCmd.scala @@ -42,7 +42,11 @@ class RecomputeContextCmd( ec: ExecutionContext ): Future[Boolean] = { Future { - ctx.jobControlPlane.abortJobs(request.contextId, "recompute context") + ctx.jobControlPlane.abortJobs( + request.contextId, + "recompute context", + false + ) val stack = ctx.contextManager.getStack(request.contextId) if (stack.isEmpty) { reply(Api.EmptyStackError(request.contextId)) diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala index e76bff7909d9..1b3b13156b08 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala @@ -84,8 +84,11 @@ final class JobExecutionEngine( private lazy val logger: TruffleLogger = runtimeContext.executionService.getLogger + // Independent thread that keeps track of the existing list of pending + // job cancellations. 
+ // private class ForceJobCancellations extends Runnable { - private val forceInterruptTimeout: Long = 30 * 1000 // 30sec + private val forceInterruptTimeout: Long = 50 * 1000 // 50sec override def run(): Unit = { while (pendingCancellations.get().nonEmpty) { @@ -117,23 +120,48 @@ final class JobExecutionEngine( } } logger.log(Level.WARNING, sb.toString()) - runningJob.future.cancel(true) + runningJob.future.cancel(runningJob.job.mayInterruptIfRunning) } try { - if (pending.length != outdated.length) - Thread.sleep(forceInterruptTimeout) + if (pending.length != outdated.length) { + // Sleep only until the oldest not-yet-expired job reaches its deadline (startTime + forceInterruptTimeout) + val nextToCancel = pending + .filter { case (startTime, _) => + startTime + forceInterruptTimeout > at + } + .sortBy(_._1) + .headOption + val timeout = nextToCancel + .map(j => Math.min(j._1 + forceInterruptTimeout - at, forceInterruptTimeout)) + .getOrElse(forceInterruptTimeout) + Thread.sleep(timeout) + } } catch { case e: InterruptedException => logger.log( Level.WARNING, - "Encountered InterruptedException while on status of pending jobs", + "Encountered InterruptedException while waiting on status of pending jobs", e ) + throw new RuntimeException(e) } } } } + private def maybeForceCancelRunningJob( + runningJob: RunningJob, + softAbortFirst: Boolean + ): Option[RunningJob] = { + val delayJobCancellation = + runningJob.job.mayInterruptIfRunning && softAbortFirst + if (delayJobCancellation) Some(runningJob) + else { + runningJob.future.cancel(runningJob.job.mayInterruptIfRunning) + None + } + } + /** @inheritdoc */ override def runBackground[A](job: BackgroundJob[A]): Unit = synchronized { @@ -174,10 +202,12 @@ final class JobExecutionEngine( case jobRef: UniqueJob[_] if jobRef.equalsTo(job) => logger .log(Level.FINEST, s"Cancelling duplicate job [$jobRef].") - runningJob.future.cancel(false) //jobRef.mayInterruptIfRunning) - if (jobRef.mayInterruptIfRunning) { - updatePendingCancellations(Seq(runningJob)) - } + 
updatePendingCancellations( + maybeForceCancelRunningJob( + runningJob, + softAbortFirst = true + ).toSeq + ) case _ => } } @@ -189,6 +219,13 @@ final class JobExecutionEngine( jobsToCancel: Seq[RunningJob] ): Unit = { val at = System.currentTimeMillis() + if (jobsToCancel.nonEmpty) { + logger.log( + Level.FINEST, + "Submitting {0} job(s) for future cancellation", + jobsToCancel.map(j => (j.job.getClass, j.id)) + ) + } pendingCancellations.updateAndGet(_ ++ jobsToCancel.map((at, _))) pendingCancellationsExecutor.submit(new ForceJobCancellations) } @@ -259,10 +296,10 @@ final class JobExecutionEngine( "Aborting {0} jobs because {1}: {2}", Array[Any](cancellableJobs.length, reason, cancellableJobs.map(_.id)) ) - val pending = cancellableJobs.flatMap { runningJob => - runningJob.future.cancel(false) - Option(runningJob.job.mayInterruptIfRunning).map(_ => runningJob) - } + + val pending = cancellableJobs.flatMap( + maybeForceCancelRunningJob(_, softAbortFirst = true) + ) updatePendingCancellations(pending) runtimeContext.executionService.getContext.getThreadManager .interruptThreads() @@ -272,24 +309,26 @@ final class JobExecutionEngine( override def abortJobs( contextId: UUID, reason: String, + softAbortFirst: Boolean, toAbort: Class[_ <: Job[_]]* ): Unit = { val allJobs = runningJobsRef.get() val contextJobs = allJobs.filter(_.job.contextIds.contains(contextId)) - val pending = contextJobs.flatMap { runningJob => - if ( - runningJob.job.isCancellable && (toAbort.isEmpty || toAbort - .contains(runningJob.getClass)) - ) { - logger.log( - Level.FINE, - "Aborting job {0} because {1}", - Array[Any](runningJob.id, reason) - ) - runningJob.future.cancel(false) - Option(runningJob.job.mayInterruptIfRunning).map(_ => runningJob) - } else None - } + val pending = contextJobs + .flatMap { runningJob => + if ( + runningJob.job.isCancellable && (toAbort.isEmpty || toAbort + .contains(runningJob.job.getClass)) + ) { + logger.log( + Level.FINE, + "Aborting job {0} because {1}", + 
Array[Any](runningJob.id, reason) + ) + Some(runningJob) + } else None + } + .flatMap(maybeForceCancelRunningJob(_, softAbortFirst)) updatePendingCancellations(pending) runtimeContext.executionService.getContext.getThreadManager .interruptThreads() @@ -303,17 +342,18 @@ final class JobExecutionEngine( ): Unit = { val allJobs = runningJobsRef.get() val contextJobs = allJobs.filter(_.job.contextIds.contains(contextId)) - val pending = contextJobs.flatMap { runningJob => - if (runningJob.job.isCancellable && accept.apply(runningJob.job)) { - logger.log( - Level.FINE, - "Aborting job {0} because {1}", - Array[Any](runningJob.id, reason) - ) - runningJob.future.cancel(false) - Option(runningJob.job.mayInterruptIfRunning).map(_ => runningJob) - } else None - } + val pending = contextJobs + .flatMap { runningJob => + if (runningJob.job.isCancellable && accept.apply(runningJob.job)) { + logger.log( + Level.FINE, + "Aborting job {0} because {1}", + Array[Any](runningJob.id, reason) + ) + Some(runningJob) + } else None + } + .flatMap(maybeForceCancelRunningJob(_, softAbortFirst = true)) updatePendingCancellations(pending) runtimeContext.executionService.getContext.getThreadManager .interruptThreads() @@ -335,10 +375,9 @@ final class JobExecutionEngine( "Aborting {0} background jobs because {1}: {2}", Array[Any](cancellableJobs.length, reason, cancellableJobs.map(_.id)) ) - val pending = cancellableJobs.flatMap { runningJob => - runningJob.future.cancel(false) - Option(runningJob.job.mayInterruptIfRunning).map(_ => runningJob) - } + val pending = cancellableJobs.flatMap( + maybeForceCancelRunningJob(_, softAbortFirst = true) + ) updatePendingCancellations(pending) } diff --git a/engine/runtime-integration-tests/src/test/scala/org/enso/interpreter/test/instrument/RuntimeAsyncCommandsTest.scala b/engine/runtime-integration-tests/src/test/scala/org/enso/interpreter/test/instrument/RuntimeAsyncCommandsTest.scala index a36ce16a7eee..f38b1ef6dd02 100644 --- 
a/engine/runtime-integration-tests/src/test/scala/org/enso/interpreter/test/instrument/RuntimeAsyncCommandsTest.scala +++ b/engine/runtime-integration-tests/src/test/scala/org/enso/interpreter/test/instrument/RuntimeAsyncCommandsTest.scala @@ -230,7 +230,7 @@ class RuntimeAsyncCommandsTest var iteration = 0 while (!isProgramStarted && iteration < 100) { val out = context.consumeOut - Thread.sleep(200) + Thread.sleep(100) isProgramStarted = out == List("started") iteration += 1 } @@ -361,7 +361,7 @@ class RuntimeAsyncCommandsTest var iteration = 0 while (!isProgramStarted && iteration < 100) { val out = context.consumeOut - Thread.sleep(200) + Thread.sleep(100) isProgramStarted = out == List("started") iteration += 1 } @@ -386,7 +386,7 @@ class RuntimeAsyncCommandsTest ) ) ) - val responses1 = context.receiveNIgnorePendingExpressionUpdates(3) + val responses1 = context.receiveNIgnorePendingExpressionUpdates(2) responses1 should contain allOf ( TestMessages.update( contextId,