diff --git a/core/src/main/scala/com/criteo/cuttle/Executor.scala b/core/src/main/scala/com/criteo/cuttle/Executor.scala index 9ff691de3..3f3e745ba 100644 --- a/core/src/main/scala/com/criteo/cuttle/Executor.scala +++ b/core/src/main/scala/com/criteo/cuttle/Executor.scala @@ -71,7 +71,11 @@ private[cuttle] class ExecutionStat( val status : ExecutionStatus ) -private object ExecutionCancelledException extends RuntimeException("Execution cancelled") +object ExecutionCancelledException extends RuntimeException("Execution cancelled") +class CancellationListener private[cuttle](execution: Execution[_], private[cuttle] val thunk: () => Unit) { + def unsubscribe() = execution.removeCancelListener(this) + def unsubscribeOn[A](f: Future[A]) = f.andThen { case _ => unsubscribe() } +} case class Execution[S <: Scheduling]( id: String, @@ -81,23 +85,49 @@ case class Execution[S <: Scheduling]( platforms: Seq[ExecutionPlatform], executionContext: ExecutionContext ) { - private[cuttle] val cancelSignal = Promise[Nothing] - def isCancelled = cancelSignal.isCompleted - val cancelled = cancelSignal.future private var waitingSeconds = 0 - var startTime: Option[Instant] = None - - def onCancelled(thunk: () => Unit) = cancelled.andThen { - case Failure(_) => + private[cuttle] var startTime: Option[Instant] = None + private val cancelListeners = TSet.empty[CancellationListener] + private val cancelled = Ref(false) + + def isCancelled = cancelled.single() + def onCancel(thunk: () => Unit): CancellationListener = { + val listener = new CancellationListener(this, thunk) + val alreadyCancelled = atomic { implicit txn => + if(!cancelled()) { + cancelListeners += listener + false + } + else { + true + } + } + if(alreadyCancelled) { thunk() - case Success(_) => - sys.error("Panic, the cancelled future can never succeed!") + } + listener } - def cancel()(implicit user: User): Boolean = - if (cancelSignal.tryFailure(ExecutionCancelledException)) { + private[cuttle] def removeCancelListener(listener: CancellationListener): Unit = atomic { implicit txn => + cancelListeners -= listener + } + + def cancel()(implicit user: User): Boolean = { + val (hasBeenCancelled, listeners) = atomic { implicit txn => + if(cancelled()) { + (false, Nil) + } else { + cancelled() = true + val listeners = cancelListeners.snapshot + cancelListeners.clear() + (true, listeners) + } + } + if(hasBeenCancelled) { streams.debug(s"Execution has been cancelled by user ${user.userId}.") - true - } else false + listeners.foreach(listener => Try(listener.thunk())) + } + hasBeenCancelled + } private[cuttle] def toExecutionLog(status: ExecutionStatus) = ExecutionLog( @@ -141,7 +171,12 @@ case class Execution[S <: Scheduling]( } else { streams.debug(s"Execution parked for $duration...") isWaiting.set(true) - utils.Timeout(duration).andThen { + val p = Promise[Unit] + p.tryCompleteWith(utils.Timeout(duration)) + this.onCancel(() => { + p.tryComplete(Failure(ExecutionCancelledException)) + }).unsubscribeOn(p.future) + p.future.andThen { case _ => streams.debug(s"Resuming") isWaiting.set(false) @@ -586,7 +621,7 @@ class Executor[S <: Scheduling] private[cuttle] ( unsafeDoRun(execution, promise) case Paused => execution.streams.debug(s"Delayed because job ${execution.job.id} is paused") - execution.onCancelled { () => + execution.onCancel { () => val cancelNow = atomic { implicit tx => val isStillPaused = pausedState.get(job.id).getOrElse(Map.empty).contains(execution) if (isStillPaused) { @@ -617,7 +652,7 @@ class Executor[S <: Scheduling] private[cuttle] ( } } timer.schedule(timerTask, java.util.Date.from(launchDate.atZone(ZoneId.systemDefault()).toInstant)) - execution.onCancelled { + execution.onCancel { () => val cancelNow = atomic { implicit tx => val failureKey = (execution.job -> execution.context) diff --git a/core/src/main/scala/com/criteo/cuttle/platforms/WaitingExecutionQueue.scala b/core/src/main/scala/com/criteo/cuttle/platforms/WaitingExecutionQueue.scala index 41eef1bc1..411e2adb8 100644 --- a/core/src/main/scala/com/criteo/cuttle/platforms/WaitingExecutionQueue.scala +++ b/core/src/main/scala/com/criteo/cuttle/platforms/WaitingExecutionQueue.scala @@ -11,6 +11,7 @@ import scala.concurrent._ import scala.concurrent.stm._ import scala.collection.{SortedSet} +import scala.util._ import scala.concurrent.ExecutionContext.Implicits.global import io.circe._ @@ -49,12 +50,12 @@ private[cuttle] trait WaitingExecutionQueue { atomic { implicit txn => _waiting() = _waiting() + entry } - execution.onCancelled(() => { + execution.onCancel(() => { atomic { implicit txn => _waiting() = _waiting() - entry } - result.promise.tryCompleteWith(execution.cancelled) - }) + result.promise.tryComplete(Failure(ExecutionCancelledException)) + }).unsubscribeOn(result.promise.future) runNext() result.promise.future } diff --git a/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala b/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala index 39c7d7b9e..4b36aae79 100644 --- a/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala +++ b/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala @@ -70,9 +70,9 @@ class LocalProcess(command: String) { process.setProcessListener(handler) val fork = process.start() streams.debug(s"forked with PID ${fork.getPID}") - execution.onCancelled(() => { + execution.onCancel(() => { fork.destroy(true) - }) + }).unsubscribeOn(result.future) result.future } }