Skip to content

Commit

Permalink
Fix issue ReactiveX#1187
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing authored and jbripley committed May 17, 2014
1 parent 29b34a7 commit 5e2fb32
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package rx.lang.scala.examples

import java.io.IOException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
Expand Down Expand Up @@ -790,4 +792,50 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(List(1, 2, 3, 4), o.toBlockingObservable.toList)
}

@Test def schedulerExample1(): Unit = {
val latch = new CountDownLatch(1)
val worker = IOScheduler().createWorker
worker.schedule {
println("Hello from Scheduler")
latch.countDown()
}
latch.await(5, TimeUnit.SECONDS)
}

@Test def schedulerExample2(): Unit = {
val latch = new CountDownLatch(1)
val worker = IOScheduler().createWorker
worker.schedule(
{
println("Hello from Scheduler after 1 second")
latch.countDown()
}, 1 seconds)
latch.await(5, TimeUnit.SECONDS)
}

@Test def schedulerExample3(): Unit = {
val worker = IOScheduler().createWorker
var no = 1
val subscription = worker.schedulePeriodically(
{
println(s"Hello(${no}) from Scheduler")
no += 1
}, initialDelay = 1 seconds, period = 100 millis)
TimeUnit.SECONDS.sleep(2)
subscription.unsubscribe()
}

@Test def schedulerExample4(): Unit = {
val worker = IOScheduler().createWorker
var no = 1
def hello: Unit = {
println(s"Hello(${no}) from Scheduler")
no += 1
worker.schedule(hello, 100 millis)
}
val subscription = worker.schedule(hello, 1 seconds)
TimeUnit.SECONDS.sleep(2)
subscription.unsubscribe()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ trait Scheduler {
/**
* Parallelism available to a Scheduler.
*
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
* This defaults to `Runtime.getRuntime().availableProcessors()` but can be overridden for use cases such as scheduling work on a computer cluster.
*
* @return the scheduler's available degree of parallelism.
*/
Expand All @@ -42,6 +42,15 @@ trait Scheduler {
*/
def now: Long = this.asJavaScheduler.now()

/**
* Retrieve or create a new [[rx.lang.scala.Worker]] that represents serial execution of actions.
* <p>
* When work is completed it should be unsubscribed using [[rx.lang.scala.Worker unsubscribe]].
* <p>
* Work on a [[rx.lang.scala.Worker]] is guaranteed to be sequential.
*
* @return Inner representing a serial queue of actions to be executed
*/
def createWorker: Worker = this.asJavaScheduler.createWorker()

}
Expand All @@ -54,24 +63,55 @@ trait Worker extends Subscription {
private [scala] val asJavaWorker: rx.Scheduler.Worker

/**
* Schedules a cancelable action to be executed in delayTime.
* Schedules an Action for execution at some point in the future.
*
* @param action the Action to schedule
* @param delay time to wait before executing the action
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
*/
def schedule(action: Unit => Unit, delayTime: Duration): Subscription =
def schedule(action: => Unit, delay: Duration): Subscription = {
this.asJavaWorker.schedule(
new Action0 {
override def call(): Unit = action()
override def call(): Unit = action
},
delayTime.length,
delayTime.unit)
delay.length,
delay.unit)
}

/**
* Schedules an Action for execution.
*
* @param action the Action to schedule
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
*/
def schedule(action: => Unit): Subscription = {
this.asJavaWorker.schedule(
new Action0 {
override def call(): Unit = action
}
)
}

/**
* Schedules a cancelable action to be executed immediately.
* Schedules a cancelable action to be executed periodically. This default implementation schedules
* recursively and waits for actions to complete (instead of potentially executing long-running actions
* concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
*
* @param action the Action to execute periodically
* @param initialDelay time to wait before executing the action for the first time
* @param period the time interval to wait each time in between executing the action
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
*/
def schedule(action: Unit => Unit): Subscription = this.asJavaWorker.schedule(
new Action0 {
override def call(): Unit = action()
}
)
def schedulePeriodically(action: => Unit, initialDelay: Duration, period: Duration): Subscription = {
this.asJavaWorker.schedulePeriodically(
new Action0 {
override def call(): Unit = action
},
initialDelay.toNanos,
period.toNanos,
duration.NANOSECONDS
)
}

/**
* @return the scheduler's notion of current absolute time in milliseconds.
Expand Down

0 comments on commit 5e2fb32

Please sign in to comment.