Skip to content

Commit

Permalink
feat: Add SchedulerTask which will be notified once cancelled.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Dec 16, 2024
1 parent 7af03e5 commit db126f1
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
final override protected def scheduledFirst(): Cancellable =
schedule(
executor,
new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
new AtomicLong(clock() + initialDelay.toNanos) with SchedulerTask {
override def run(): Unit = {
try {
runnable.run()
Expand All @@ -150,6 +150,11 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
case _: SchedulerException => // ignore failure to enqueue or terminated target actor
}
}

override def cancelled(): Unit = runnable match {
case task: SchedulerTask => task.cancelled()
case _ =>
}
},
roundUp(initialDelay))
}
Expand Down Expand Up @@ -390,7 +395,19 @@ object LightArrayRevolverScheduler {

override def cancel(): Boolean = extractTask(CancelledTask) match {
case ExecutedTask | CancelledTask => false
case _ => true
case task: SchedulerTask =>
notifyCancellation(task)
true
case _ => true
}

private def notifyCancellation(task: SchedulerTask): Unit = {
try {
task.cancelled()
} catch {
case _: Exception =>
// TODO how to logging here?
}
}

override def isCancelled: Boolean = task eq CancelledTask
Expand Down
19 changes: 18 additions & 1 deletion actor/src/main/scala/org/apache/pekko/actor/Scheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ trait Scheduler {
final override protected def scheduledFirst(): Cancellable =
scheduleOnce(
initialDelay,
new Runnable {
new SchedulerTask {
override def run(): Unit = {
try {
runnable.run()
Expand All @@ -97,6 +97,11 @@ trait Scheduler {
case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] =>
}
}

override def cancelled(): Unit = runnable match {
case task: SchedulerTask => task.cancelled()
case _ =>
}
})
}

Expand Down Expand Up @@ -498,6 +503,18 @@ trait Scheduler {
// this one is just here so we can present a nice AbstractScheduler for Java
abstract class AbstractSchedulerBase extends Scheduler

/**
* A Task that will be notified when it is cancelled.
*/
trait SchedulerTask extends Runnable {

/**
* Called for [[SchedulerTask]]s that are successfully canceled via [[Cancellable#cancel]].
* Overriding this method allows to for example run some cleanup.
*/
def cancelled(): Unit = ()
}

/**
* Signifies something that can be cancelled
* There is no strict guarantee that the implementation is thread-safe,
Expand Down

0 comments on commit db126f1

Please sign in to comment.