Skip to content

Commit

Permalink
Better cancel listener design (#170)
Browse files Browse the repository at this point in the history
To avoid memory leak for long execution registering several cancel listeners,
we now allow to unsubscribe when the code is not anymore interested by the cancel
signal.
  • Loading branch information
guillaumebort authored Sep 20, 2017
1 parent 499aeb1 commit 7d1386c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 22 deletions.
69 changes: 52 additions & 17 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ private[cuttle] class ExecutionStat(
val status : ExecutionStatus
)

private object ExecutionCancelledException extends RuntimeException("Execution cancelled")
object ExecutionCancelledException extends RuntimeException("Execution cancelled")
class CancellationListener private[cuttle](execution: Execution[_], private[cuttle] val thunk: () => Unit) {
def unsubscribe() = execution.removeCancelListener(this)
def unsubscribeOn[A](f: Future[A]) = f.andThen { case _ => unsubscribe() }
}

case class Execution[S <: Scheduling](
id: String,
Expand All @@ -81,23 +85,49 @@ case class Execution[S <: Scheduling](
platforms: Seq[ExecutionPlatform],
executionContext: ExecutionContext
) {
private[cuttle] val cancelSignal = Promise[Nothing]
def isCancelled = cancelSignal.isCompleted
val cancelled = cancelSignal.future
private var waitingSeconds = 0
var startTime: Option[Instant] = None

def onCancelled(thunk: () => Unit) = cancelled.andThen {
case Failure(_) =>
private[cuttle] var startTime: Option[Instant] = None
private val cancelListeners = TSet.empty[CancellationListener]
private val cancelled = Ref(false)

def isCancelled = cancelled.single()
def onCancel(thunk: () => Unit): CancellationListener = {
val listener = new CancellationListener(this, thunk)
val alreadyCancelled = atomic { implicit txn =>
if(!cancelled()) {
cancelListeners += listener
false
}
else {
true
}
}
if(alreadyCancelled) {
thunk()
case Success(_) =>
sys.error("Panic, the cancelled future can never succeed!")
}
listener
}
def cancel()(implicit user: User): Boolean =
if (cancelSignal.tryFailure(ExecutionCancelledException)) {
private[cuttle] def removeCancelListener(listener: CancellationListener): Unit = atomic { implicit txn =>
cancelListeners -= listener
}

def cancel()(implicit user: User): Boolean = {
val (hasBeenCancelled, listeners) = atomic { implicit txn =>
if(cancelled()) {
(false, Nil)
} else {
cancelled() = true
val listeners = cancelListeners.snapshot
cancelListeners.clear()
(true, listeners)
}
}
if(hasBeenCancelled) {
streams.debug(s"Execution has been cancelled by user ${user.userId}.")
true
} else false
listeners.foreach(listener => Try(listener.thunk()))
}
hasBeenCancelled
}

private[cuttle] def toExecutionLog(status: ExecutionStatus) =
ExecutionLog(
Expand Down Expand Up @@ -141,7 +171,12 @@ case class Execution[S <: Scheduling](
} else {
streams.debug(s"Execution parked for $duration...")
isWaiting.set(true)
utils.Timeout(duration).andThen {
val p = Promise[Unit]
p.tryCompleteWith(utils.Timeout(duration))
this.onCancel(() => {
p.tryComplete(Failure(ExecutionCancelledException))
}).unsubscribeOn(p.future)
p.future.andThen {
case _ =>
streams.debug(s"Resuming")
isWaiting.set(false)
Expand Down Expand Up @@ -586,7 +621,7 @@ class Executor[S <: Scheduling] private[cuttle] (
unsafeDoRun(execution, promise)
case Paused =>
execution.streams.debug(s"Delayed because job ${execution.job.id} is paused")
execution.onCancelled { () =>
execution.onCancel { () =>
val cancelNow = atomic { implicit tx =>
val isStillPaused = pausedState.get(job.id).getOrElse(Map.empty).contains(execution)
if (isStillPaused) {
Expand Down Expand Up @@ -617,7 +652,7 @@ class Executor[S <: Scheduling] private[cuttle] (
}
}
timer.schedule(timerTask, java.util.Date.from(launchDate.atZone(ZoneId.systemDefault()).toInstant))
execution.onCancelled {
execution.onCancel {
() =>
val cancelNow = atomic { implicit tx =>
val failureKey = (execution.job -> execution.context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import scala.concurrent._
import scala.concurrent.stm._
import scala.collection.{SortedSet}

import scala.util._
import scala.concurrent.ExecutionContext.Implicits.global

import io.circe._
Expand Down Expand Up @@ -49,12 +50,12 @@ private[cuttle] trait WaitingExecutionQueue {
atomic { implicit txn =>
_waiting() = _waiting() + entry
}
execution.onCancelled(() => {
execution.onCancel(() => {
atomic { implicit txn =>
_waiting() = _waiting() - entry
}
result.promise.tryCompleteWith(execution.cancelled)
})
result.promise.tryComplete(Failure(ExecutionCancelledException))
}).unsubscribeOn(result.promise.future)
runNext()
result.promise.future
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class LocalProcess(command: String) {
process.setProcessListener(handler)
val fork = process.start()
streams.debug(s"forked with PID ${fork.getPID}")
execution.onCancelled(() => {
execution.onCancel(() => {
fork.destroy(true)
})
}).unsubscribeOn(result.future)
result.future
}
}
Expand Down

0 comments on commit 7d1386c

Please sign in to comment.