Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scala Scheduler Bindings Fix #817

Merged
merged 1 commit into from
Feb 5, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Tue Sep 03 10:20:57 PDT 2013
#Wed Feb 05 12:05:54 CET 2014
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=http\://services.gradle.org/distributions/gradle-1.10-bin.zip
distributionUrl=http\://services.gradle.org/distributions/gradle-1.10-all.zip
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ object JavaConversions {
implicit def toScalaSubscription(s: rx.Subscription): Subscription = Subscription(s)

implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJavaScheduler

implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s)

implicit def scalaInnerToJavaInner(s: Inner): rx.Scheduler.Inner = s.asJavaInner
implicit def javaInnerToScalaInner(s: rx.Scheduler.Inner): Inner = Inner(s)


implicit def toJavaObserver[T](s: Observer[T]): rx.Observer[_ >: T] = s.asJavaObserver

implicit def toScalaObserver[T](s: rx.Observer[_ >: T]): Observer[T] = Observer(s)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ object Notification {
* The item passed to the onNext method.
*/
def apply[T](value: T): Notification[T] = {
Notification(new rx.Notification[T](value))
Notification(rx.Notification.createOnNext[T](value))
}

/**
Expand Down Expand Up @@ -128,7 +128,7 @@ object Notification {
* The exception passed to the onNext method.
*/
def apply[T](error: Throwable): Notification[T] = {
Notification(new rx.Notification[T](error))
Notification(rx.Notification.createOnError[T](error))
}

/**
Expand Down Expand Up @@ -156,7 +156,7 @@ object Notification {
* Constructor for onCompleted notifications.
*/
def apply[T](): Notification[T] = {
Notification(new rx.Notification())
Notification(rx.Notification.createOnCompleted[T]())
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package rx.lang.scala
import rx.util.functions.FuncN
import rx.Observable.OnSubscribeFunc
import rx.lang.scala.observables.ConnectableObservable
import scala.concurrent.duration


/**
Expand Down Expand Up @@ -2342,17 +2343,53 @@ object Observable {
*
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/interval.png">
*
* @param duration
* @param period
* duration between two consecutive numbers
* @param scheduler
* the scheduler to use
* @return An Observable that emits a number each time interval.
*/
def interval(duration: Duration, scheduler: Scheduler): Observable[Long] = {
toScalaObservable[java.lang.Long](rx.Observable.interval(duration.length, duration.unit, scheduler)).map(_.longValue())
def interval(period: Duration, scheduler: Scheduler): Observable[Long] = {
toScalaObservable[java.lang.Long](rx.Observable.interval(period.length, period.unit, scheduler)).map(_.longValue())
}

/**
* Return an Observable that emits a 0L after the {@code initialDelay} and ever increasing
* numbers after each {@code period} of time thereafter, on a specified Scheduler.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timer.ps.png">
*
* @param initialDelay
* the initial delay time to wait before emitting the first value of 0L
* @param period
* the period of time between emissions of the subsequent numbers
* @return an Observable that emits a 0L after the { @code initialDelay} and ever increasing
* numbers after each { @code period} of time thereafter, while running on the given { @code scheduler}
*/
def timer(initialDelay: Duration, period: Duration): Observable[Long] = {
toScalaObservable[java.lang.Long](rx.Observable.timer(initialDelay.toNanos, period.toNanos, duration.NANOSECONDS)).map(_.longValue())
/*XXX*/
}

/**
* Return an Observable that emits a 0L after the {@code initialDelay} and ever increasing
* numbers after each {@code period} of time thereafter, on a specified Scheduler.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timer.ps.png">
*
* @param initialDelay
* the initial delay time to wait before emitting the first value of 0L
* @param period
* the period of time between emissions of the subsequent numbers
* @param scheduler
* the scheduler on which the waiting happens and items are emitted
* @return an Observable that emits a 0L after the { @code initialDelay} and ever increasing
* numbers after each { @code period} of time thereafter, while running on the given { @code scheduler}
*/
def timer(initialDelay: Duration, period: Duration, scheduler: Scheduler): Observable[Long] = {
toScalaObservable[java.lang.Long](rx.Observable.timer(initialDelay.toNanos, period.toNanos, duration.NANOSECONDS, scheduler)).map(_.longValue())
}

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,199 +15,107 @@
*/
package rx.lang.scala

import java.util.Date
import scala.concurrent.duration.Duration
import rx.util.functions.{Action0, Action1, Func2}
import rx.util.functions.Action1
import rx.lang.scala.schedulers._
import scala.concurrent.duration
import rx.lang.scala.JavaConversions._

/**
* Represents an object that schedules units of work.
*/
trait Scheduler {
import rx.lang.scala.ImplicitFunctionConversions._

private [scala] val asJavaScheduler: rx.Scheduler

/**
* Schedules a cancelable action to be executed.
* Parallelism available to a Scheduler.
*
* @param action Action to schedule.
* @return a subscription to be able to unsubscribe from action.
*/
def schedule(action: Scheduler => Subscription): Subscription = {
this.schedule[Integer](0, (s: Scheduler, x: Integer) => action(s): Subscription): Subscription
}

/**
* Schedules a cancelable action to be executed.
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
*
* @param state State to pass into the action.
* @param action Action to schedule.
* @return a subscription to be able to unsubscribe from action.
* @return the scheduler's available degree of parallelism.
*/
private [scala] def schedule[T](state: T, action: (Scheduler, T) => Subscription): Subscription = {
Subscription(asJavaScheduler.schedule(state, new Func2[rx.Scheduler, T, rx.Subscription] {
def call(t1: rx.Scheduler, t2: T): rx.Subscription = {
action(Scheduler(t1), t2).asJavaSubscription
}
}))
}
def degreeOfParallelism: Int = asJavaScheduler.degreeOfParallelism

/**
* Schedules a cancelable action to be executed in delayTime.
*
* @param action Action to schedule.
* @param delayTime Time the action is to be delayed before executing.
* @return a subscription to be able to unsubscribe from action.
* @return the scheduler's notion of current absolute time in milliseconds.
*/
def schedule(delayTime: Duration, action: Scheduler => Subscription): Subscription = {
this.schedule[Integer](0, (s: Scheduler, x: Integer) => action(s), delayTime: Duration): Subscription
}
def now: Long = this.asJavaScheduler.now()

/**
* Schedules a cancelable action to be executed in delayTime.
* Schedules a cancelable action to be executed.
*
* @param state
* State to pass into the action.
* @param action
* Action to schedule.
* @param delayTime
* Time the action is to be delayed before executing.
* @param action Action to schedule.
* @return a subscription to be able to unsubscribe from action.
*/
private [scala] def schedule[T](state: T, action: (Scheduler, T) => Subscription, delayTime: Duration): Subscription = {
Subscription(asJavaScheduler.schedule(state, schedulerActionToFunc2(action), delayTime.length, delayTime.unit))
}

/**
* Schedules a cancelable action to be executed periodically.
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
*
* @param action The action to execute periodically.
* @param initialDelay Time to wait before executing the action for the first time.
* @param period The time interval to wait each time in between executing the action.
* @return A subscription to be able to unsubscribe from action.
*/
def schedule(initialDelay: Duration, period: Duration, action: Scheduler => Subscription): Subscription = {
this.schedulePeriodically[Integer](0, (s: Scheduler, x:Integer) => action(s): Subscription, initialDelay: Duration, period: Duration): Subscription
}
def schedule(action: Inner => Unit): Subscription = this.asJavaScheduler.schedule(action)

/**
* Schedules a cancelable action to be executed periodically.
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
*
* @param state
* State to pass into the action.
* @param action
* The action to execute periodically.
* The action to execute periodically.
* @param initialDelay
* Time to wait before executing the action for the first time.
* Time to wait before executing the action for the first time.
* @param period
* The time interval to wait each time in between executing the action.
* The time interval to wait each time in between executing the action.
* @return A subscription to be able to unsubscribe from action.
*/
private [scala] def schedulePeriodically[T](state: T, action: (Scheduler, T) => Subscription, initialDelay: Duration, period: Duration): Subscription = {
Subscription(asJavaScheduler.schedulePeriodically(state, action, initialDelay.length, initialDelay.unit.convert(period.length, period.unit), initialDelay.unit))
}

/**
* Schedules a cancelable action to be executed at dueTime.
*
* @param action Action to schedule.
* @param dueTime Time the action is to be executed. If in the past it will be executed immediately.
* @return a subscription to be able to unsubscribe from action.
*/
def schedule(dueTime: Date, action: Scheduler => Subscription): Subscription = {
this.schedule(0: Integer, (s: Scheduler, x: Integer) => action(s): Subscription, dueTime: Date): Subscription
}
def schedulePeriodically(action: Inner => Unit, initialDelay: Duration, period: Duration): Subscription =
this.asJavaScheduler.schedulePeriodically (
new Action1[rx.Scheduler.Inner] {
override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner))
},
initialDelay.toNanos,
period.toNanos,
duration.NANOSECONDS
)

/**
* Schedules a cancelable action to be executed at dueTime.
*
* @param state
* State to pass into the action.
* @param action
* Action to schedule.
* @param dueTime
* Time the action is to be executed. If in the past it will be executed immediately.
* @return a subscription to be able to unsubscribe from action.
*/
private [scala] def schedule[T](state: T, action: (Scheduler, T) => Subscription, dueTime: Date): Subscription = {
Subscription(asJavaScheduler.schedule(state, action, dueTime))
def scheduleRec(work: (=>Unit)=>Unit): Subscription = {
Subscription(asJavaScheduler.schedule(new Action1[rx.Scheduler.Inner] {
override def call(inner: rx.Scheduler.Inner): Unit = work{ inner.schedule(this) }
}))
}
}

/**
* Schedules an action to be executed.
*
* @param action
* action
* @return a subscription to be able to unsubscribe from action.
*/
def schedule(action: =>Unit): Subscription = {
Subscription(asJavaScheduler.schedule(()=>action))
}
object Inner {
def apply(inner: rx.Scheduler.Inner): Inner = new Inner { private[scala] val asJavaInner = inner }
}

/**
* Schedules an action to be executed in delayTime.
*
* @param action action
* @return a subscription to be able to unsubscribe from action.
*/
def schedule(delayTime: Duration, action: =>Unit): Subscription = {
Subscription(asJavaScheduler.schedule(()=>action, delayTime.length, delayTime.unit))
}
trait Inner extends Subscription {
private [scala] val asJavaInner: rx.Scheduler.Inner

/**
* Schedules an action to be executed periodically.
*
* @param action
* The action to execute periodically.
* @param initialDelay
* Time to wait before executing the action for the first time.
* @param period
* The time interval to wait each time in between executing the action.
* @return A subscription to be able to unsubscribe from action.
* Schedules a cancelable action to be executed in delayTime.
*/
def schedule(initialDelay: Duration, period: Duration, action: =>Unit): Subscription = {
Subscription(asJavaScheduler.schedulePeriodically(()=>action, initialDelay.length, initialDelay.unit.convert(period.length, period.unit), initialDelay.unit))
}

def scheduleRec(work: (=>Unit)=>Unit): Subscription = {
Subscription(asJavaScheduler.schedule(new Action1[Action0] {
def call(t1: Action0){
work{ t1.call() }
}
}))
}
def schedule(action: Inner => Unit, delayTime: Duration): Unit =
this.asJavaInner.schedule(
new Action1[rx.Scheduler.Inner] {
override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner))
},
delayTime.length,
delayTime.unit)

/**
* Returns the scheduler's notion of current absolute time in milliseconds.
* Schedules a cancelable action to be executed immediately.
*/
def now: Long = {
asJavaScheduler.now
}
def schedule(action: Inner=>Unit): Unit = this.asJavaInner.schedule(
new Action1[rx.Scheduler.Inner]{
override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner))
}
)

/**
* Parallelism available to a Scheduler.
*
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
*
* @return the scheduler's available degree of parallelism.
* @return the scheduler's notion of current absolute time in milliseconds.
*/
def degreeOfParallelism: Int = {
asJavaScheduler.degreeOfParallelism
}

def now: Long = this.asJavaInner.now()
}


private [scala] object Scheduler {
def apply(scheduler: rx.Scheduler): Scheduler = scheduler match {
case s: rx.schedulers.CurrentThreadScheduler => new CurrentThreadScheduler(s)
case s: rx.schedulers.ExecutorScheduler => new ExecutorScheduler(s)
case s: rx.schedulers.ImmediateScheduler => new ImmediateScheduler(s)
case s: rx.schedulers.NewThreadScheduler => new NewThreadScheduler(s)
case s: rx.schedulers.TestScheduler => new TestScheduler(s)
case s: rx.Scheduler => new Scheduler{ val asJavaScheduler = s }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package rx.lang.scala.schedulers

import rx.lang.scala.Scheduler


object ComputationScheduler {
/**
* {@link Scheduler} intended for computational work.
* <p>
* This can be used for event-loops, processing callbacks and other computational work.
* <p>
* Do not perform IO-bound work on this scheduler. Use {@link IOScheduler()} instead.
*
* @return { @link Scheduler} for computation-bound work.
*/
def apply(): IOScheduler = {
new IOScheduler(rx.schedulers.Schedulers.computation())
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be ComputationScheduler I guess ;-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix that.

On Thu, Feb 6, 2014 at 11:27 AM, samuelgruetter [email protected]:

In
language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ComputationScheduler.scala:

+import rx.lang.scala.Scheduler
+
+
+object ComputationScheduler {

  • /**
  • * {@link Scheduler} intended for computational work.
  • *

  • * This can be used for event-loops, processing callbacks and other computational work.
  • *

  • * Do not perform IO-bound work on this scheduler. Use {@link IOScheduler()} instead.
  • * @return { @link Scheduler} for computation-bound work.
  • */
  • def apply(): IOScheduler = {
  • new IOScheduler(rx.schedulers.Schedulers.computation())
  • }

This should be ComputationScheduler I guess ;-)

Reply to this email directly or view it on GitHubhttps://github.com//pull/817/files#r9495519
.

}

/**
* Created by netflix on 2/5/14.
*/
class ComputationScheduler private[scala] (val asJavaScheduler: rx.Scheduler)
extends Scheduler {}
Loading