diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 3ba3c90ff6..75ae8a5917 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -16,6 +16,8 @@ package rx.lang.scala.examples import java.io.IOException +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit import scala.concurrent.duration.Duration import scala.concurrent.duration.DurationInt @@ -790,4 +792,50 @@ class RxScalaDemo extends JUnitSuite { assertEquals(List(1, 2, 3, 4), o.toBlockingObservable.toList) } + @Test def schedulerExample1(): Unit = { + val latch = new CountDownLatch(1) + val worker = IOScheduler().createWorker + worker.schedule { + println("Hello from Scheduler") + latch.countDown() + } + latch.await(5, TimeUnit.SECONDS) + } + + @Test def schedulerExample2(): Unit = { + val latch = new CountDownLatch(1) + val worker = IOScheduler().createWorker + worker.schedule( + { + println("Hello from Scheduler after 1 second") + latch.countDown() + }, 1 seconds) + latch.await(5, TimeUnit.SECONDS) + } + + @Test def schedulerExample3(): Unit = { + val worker = IOScheduler().createWorker + var no = 1 + val subscription = worker.schedulePeriodically( + { + println(s"Hello(${no}) from Scheduler") + no += 1 + }, initialDelay = 1 seconds, period = 100 millis) + TimeUnit.SECONDS.sleep(2) + subscription.unsubscribe() + } + + @Test def schedulerExample4(): Unit = { + val worker = IOScheduler().createWorker + var no = 1 + def hello: Unit = { + println(s"Hello(${no}) from Scheduler") + no += 1 + worker.schedule(hello, 100 millis) + } + val subscription = worker.schedule(hello, 1 seconds) + TimeUnit.SECONDS.sleep(2) + subscription.unsubscribe() + } + } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala index 6d5a24474f..8d4d0b9902 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala @@ -31,7 +31,7 @@ trait Scheduler { /** * Parallelism available to a Scheduler. * - * This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster. + * This defaults to `Runtime.getRuntime().availableProcessors()` but can be overridden for use cases such as scheduling work on a computer cluster. * * @return the scheduler's available degree of parallelism. */ @@ -42,6 +42,15 @@ trait Scheduler { */ def now: Long = this.asJavaScheduler.now() + /** + * Retrieve or create a new [[rx.lang.scala.Worker]] that represents serial execution of actions. + *
+ * When work is completed it should be unsubscribed using [[rx.lang.scala.Worker unsubscribe]]. + *
+ * Work on a [[rx.lang.scala.Worker]] is guaranteed to be sequential. + * + * @return Inner representing a serial queue of actions to be executed + */ def createWorker: Worker = this.asJavaScheduler.createWorker() } @@ -54,24 +63,55 @@ trait Worker extends Subscription { private [scala] val asJavaWorker: rx.Scheduler.Worker /** - * Schedules a cancelable action to be executed in delayTime. + * Schedules an Action for execution at some point in the future. + * + * @param action the Action to schedule + * @param delay time to wait before executing the action + * @return a subscription to be able to unsubscribe the action (unschedule it if not executed) */ - def schedule(action: Unit => Unit, delayTime: Duration): Subscription = + def schedule(action: => Unit, delay: Duration): Subscription = { this.asJavaWorker.schedule( new Action0 { - override def call(): Unit = action() + override def call(): Unit = action }, - delayTime.length, - delayTime.unit) + delay.length, + delay.unit) + } + + /** + * Schedules an Action for execution. + * + * @param action the Action to schedule + * @return a subscription to be able to unsubscribe the action (unschedule it if not executed) + */ + def schedule(action: => Unit): Subscription = { + this.asJavaWorker.schedule( + new Action0 { + override def call(): Unit = action + } + ) + } /** - * Schedules a cancelable action to be executed immediately. + * Schedules a cancelable action to be executed periodically. This default implementation schedules + * recursively and waits for actions to complete (instead of potentially executing long-running actions + * concurrently). Each scheduler that can do periodic scheduling in a better way should override this. + * + * @param action the Action to execute periodically + * @param initialDelay time to wait before executing the action for the first time + * @param period the time interval to wait each time in between executing the action + * @return a subscription to be able to unsubscribe the action (unschedule it if not executed) */ - def schedule(action: Unit => Unit): Subscription = this.asJavaWorker.schedule( - new Action0 { - override def call(): Unit = action() - } - ) + def schedulePeriodically(action: => Unit, initialDelay: Duration, period: Duration): Subscription = { + this.asJavaWorker.schedulePeriodically( + new Action0 { + override def call(): Unit = action + }, + initialDelay.toNanos, + period.toNanos, + duration.NANOSECONDS + ) + } /** * @return the scheduler's notion of current absolute time in milliseconds.