Skip to content

Commit

Permalink
Merge pull request #54 from lucidsoftware/worker-cancellation
Browse files Browse the repository at this point in the history
Support worker cancellation for all workers
  • Loading branch information
jjudd authored Sep 3, 2024
2 parents 3222051 + c43b82d commit 73faf81
Show file tree
Hide file tree
Showing 38 changed files with 796 additions and 77 deletions.
1 change: 1 addition & 0 deletions .bazelrc_shared
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ build --strategy=ScalaCompile=worker
build --worker_max_instances=4
build --worker_sandboxing
build --experimental_worker_multiplex_sandboxing
build --experimental_worker_cancellation
build --verbose_failures

test --test_output=all
Expand Down
1 change: 1 addition & 0 deletions rules/private/phases/phase_coverage_jacoco.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def phase_coverage_jacoco(ctx, g):
"supports-multiplex-workers": "1",
"supports-workers": "1",
"supports-multiplex-sandboxing": "1",
"supports-worker-cancellation": "1",
},
),
arguments = [args],
Expand Down
1 change: 1 addition & 0 deletions rules/private/phases/phase_zinc_compile.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def phase_zinc_compile(ctx, g):
"supports-multiplex-workers": "1",
"supports-workers": "1",
"supports-multiplex-sandboxing": "1",
"supports-worker-cancellation": "1",
}

# Disable several things if incremental compilation features are going to be used
Expand Down
1 change: 1 addition & 0 deletions rules/private/phases/phase_zinc_depscheck.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def phase_zinc_depscheck(ctx, g):
"supports-multiplex-workers": "1",
"supports-workers": "1",
"supports-multiplex-sandboxing": "1",
"supports-worker-cancellation": "1",
},
),
arguments = [deps_args],
Expand Down
1 change: 1 addition & 0 deletions rules/scala/private/doc.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def scaladoc_implementation(ctx):
"supports-multiplex-workers": "1",
"supports-workers": "1",
"supports-multiplex-sandboxing": "1",
"supports-worker-cancellation": "1",
},
),
input_manifests = input_manifests,
Expand Down
1 change: 1 addition & 0 deletions rules/scala_proto/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ scala_binary(
deps = [
"//src/main/scala/higherkindness/rules_scala/common/args",
"//src/main/scala/higherkindness/rules_scala/common/error",
"//src/main/scala/higherkindness/rules_scala/common/interrupt",
"//src/main/scala/higherkindness/rules_scala/common/sandbox",
"//src/main/scala/higherkindness/rules_scala/common/worker",
"@annex//:net_sourceforge_argparse4j_argparse4j",
Expand Down
4 changes: 4 additions & 0 deletions rules/scala_proto/private/ScalaProtoWorker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package annex.scala.proto
import higherkindness.rules_scala.common.args.ArgsUtil
import higherkindness.rules_scala.common.args.ArgsUtil.PathArgumentType
import higherkindness.rules_scala.common.args.implicits._
import higherkindness.rules_scala.common.interrupt.InterruptUtil
import higherkindness.rules_scala.common.error.AnnexWorkerError
import higherkindness.rules_scala.common.sandbox.SandboxUtil
import higherkindness.rules_scala.common.worker.WorkerMain
Expand Down Expand Up @@ -65,6 +66,7 @@ object ScalaProtoWorker extends WorkerMain[Unit] {

protected def work(ctx: Unit, args: Array[String], out: PrintStream, workDir: Path): Unit = {
val workRequest = ScalaProtoRequest(workDir, ArgsUtil.parseArgsOrFailSafe(args, argParser, out))
InterruptUtil.throwIfInterrupted()

val scalaOut = workRequest.outputDir
Files.createDirectories(scalaOut)
Expand All @@ -83,6 +85,7 @@ object ScalaProtoWorker extends WorkerMain[Unit] {
}
}

InterruptUtil.throwIfInterrupted()
val exitCode = ProtocBridge.runWithGenerators(
new MyProtocRunner,
namedGenerators = List("scala" -> ScalaPbCodeGenerator),
Expand All @@ -91,6 +94,7 @@ object ScalaProtoWorker extends WorkerMain[Unit] {
if (exitCode != 0) {
throw new AnnexWorkerError(exitCode)
}
InterruptUtil.throwIfInterrupted()
}

}
1 change: 1 addition & 0 deletions rules/scala_proto/private/core.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def scala_proto_library_implementation(ctx):
"supports-multiplex-workers": supports_workers,
"supports-workers": supports_workers,
"supports-multiplex-sandboxing": supports_workers,
"supports-worker-cancellation": supports_workers,
},
),
arguments = [args],
Expand Down
1 change: 1 addition & 0 deletions rules/scalafmt/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ scala_binary(
scala = "//src/main/scala:bootstrap",
visibility = ["//visibility:public"],
deps = [
"//src/main/scala/higherkindness/rules_scala/common/interrupt",
"//src/main/scala/higherkindness/rules_scala/common/sandbox",
"//src/main/scala/higherkindness/rules_scala/common/worker",
"//src/main/scala/higherkindness/rules_scala/workers/common",
Expand Down
1 change: 1 addition & 0 deletions rules/scalafmt/private/test.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def build_format(ctx):
"supports-multiplex-workers": "1",
"supports-workers": "1",
"supports-multiplex-sandboxing": "1",
"supports-worker-cancellation": "1",
},
),
mnemonic = "ScalaFmt",
Expand Down
4 changes: 4 additions & 0 deletions rules/scalafmt/scalafmt/ScalafmtRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package annex.scalafmt

import higherkindness.rules_scala.common.args.ArgsUtil
import higherkindness.rules_scala.common.args.ArgsUtil.PathArgumentType
import higherkindness.rules_scala.common.interrupt.InterruptUtil
import higherkindness.rules_scala.common.sandbox.SandboxUtil
import higherkindness.rules_scala.common.worker.WorkerMain
import higherkindness.rules_scala.workers.common.Color
Expand Down Expand Up @@ -46,12 +47,14 @@ object ScalafmtRunner extends WorkerMain[Unit] {

protected[this] def work(worker: Unit, args: Array[String], out: PrintStream, workDir: Path): Unit = {
val workRequest = ScalafmtRequest(workDir, ArgsUtil.parseArgsOrFailSafe(args, argParser, out))
InterruptUtil.throwIfInterrupted()

val source = FileOps.readFile(workRequest.inputFile)(Codec.UTF8)

val config = ScalafmtConfig.fromHoconFile(workRequest.configFile).get
@tailrec
def format(code: String): String = {
InterruptUtil.throwIfInterrupted()
val formatted = Scalafmt.format(code, config).get
if (code == formatted) code else format(formatted)
}
Expand All @@ -73,6 +76,7 @@ object ScalafmtRunner extends WorkerMain[Unit] {
}

Files.write(workRequest.outputFile, output.getBytes)
InterruptUtil.throwIfInterrupted()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package higherkindness.rules_scala
package common.error

class AnnexDuplicateActiveRequestException(
val message: String = "",
val cause: Throwable = null,
) extends Exception(message, cause)
14 changes: 14 additions & 0 deletions src/main/scala/higherkindness/rules_scala/common/interrupt/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
load("//rules:scala.bzl", "scala_library")
load("//rules:scalafmt.bzl", "scala_format_test")

scala_library(
name = "interrupt",
srcs = glob(["*.scala"]),
scala = "//src/main/scala:bootstrap",
visibility = ["//visibility:public"],
)

scala_format_test(
name = "format",
srcs = glob(["*.scala"]),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package higherkindness.rules_scala
package common.interrupt

object InterruptUtil {
def throwIfInterrupted(): Unit = {
if (Thread.interrupted()) {
throw new InterruptedException("WorkRequest was cancelled.")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package higherkindness.rules_scala
package common.worker

import java.util.concurrent.{Callable, FutureTask}
import scala.concurrent.{ExecutionContext, ExecutionException, Future, Promise}
import scala.util.Try

/**
* This is more or less a cancellable Future. It stitches together Scala Future, which is not cancellable, with the Java
* FutureTask, which is cancellable.
*
* However, it uses our extension on FutureTask, which, upon cancellation, waits for the callable to be interrupted or
* complete. That way we can be confident the task is no longer running when we respond to Bazel that it has been
* cancelled.
*
* Heavily inspired by the following: https://github.com/NthPortal/cancellable-task/tree/master
* https://stackoverflow.com/a/39986418/6442597
*/
class CancellableTask[S] private (fn: => S) {
private val promise = Promise[S]()
val future: Future[S] = promise.future

private val fnCallable = new Callable[S]() {
def call(): S = fn
}

private val task = new FutureTaskWaitOnCancel[S](fnCallable) {
override def done() = promise.complete {
Try(get()).recover {
// FutureTask wraps exceptions in an ExecutionException. We want to re-throw the underlying
// error because Scala's Future handles things like fatal exception in a special way that
// we miss out on if they're wrapped in that ExecutionException. Put another way: leaving
// them wrapped in the ExecutionException breaks the contract that Scala Future users expect.
case e: ExecutionException => throw e.getCause()
}
}
}

def cancel(mayInterruptIfRunning: Boolean): Boolean = task.cancel(mayInterruptIfRunning)

def execute(executionContext: ExecutionContext): Unit = executionContext.execute(task)
}

object CancellableTask {
def apply[S](fn: => S): CancellableTask[S] = {
new CancellableTask(fn)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package higherkindness.rules_scala
package common.worker

import java.util.concurrent.{Callable, CancellationException, FutureTask, TimeUnit}
import java.util.concurrent.locks.ReentrantLock

/**
* This is a FutureTask that, when cancelled, waits for its callable to end, either by interruption or by completing.
*
* The regular FutureTask is immediately marked done when you cancel it, regardless of the status of the callable. That
* becomes a problem for if the worker receives many work requests, has them cancelled, and then immediately receives
* more work requests. The callable could still be running, but we've received more work to do.
*
* That can create funky book keeping situations for Bazel: imagine you are running compile actions that take 60 seconds
* and can't be interrupted. Bazel asks you to cancel them 30 seconds after they start. You respond to Bazel saying
* they've been cancelled. Bazel sends you more work requests that also take 60 seconds to compile. You don't have any
* threads to run them as they're all still finishing hte original requests. As a result it looks like these new compile
* actions take 90 seconds because they had to wait 30 seconds to get threads in order to start executing.
*
* This class was heavily inspired by the following:
* https://stackoverflow.com/questions/6040962/wait-for-cancel-on-futuretask?rq=3
*/
class FutureTaskWaitOnCancel[S](
callable: CallableLockedWhileRunning[S],
) extends FutureTask[S](callable) {

def this(callable: Callable[S]) = {
this(new CallableLockedWhileRunning[S](callable))
}

private def waitForCallable(): Unit = {
// If the callable is running, wait for it to complete or be interrupted
callable.isRunning.lock()
callable.isRunning.unlock()
}

override def get(): S = {
try {
super.get()
} catch {
case e: CancellationException =>
waitForCallable()
throw e
}
}

override def get(timeout: Long, unit: TimeUnit): S = {
throw new UnsupportedOperationException()
}

override def cancel(mayInterruptIfRunning: Boolean): Boolean = {
val result = super.cancel(mayInterruptIfRunning)
waitForCallable()
result
}
}

private class CallableLockedWhileRunning[S](callable: Callable[S]) extends Callable[S] {
private[worker] val isRunning = new ReentrantLock()

override def call(): S = {
isRunning.lock()
try {
callable.call()
} finally {
isRunning.unlock()
}
}
}
Loading

0 comments on commit 73faf81

Please sign in to comment.