Skip to content

Commit

Permalink
Merge pull request #562 from samuelgruetter/RxJavaBugFixesSam
Browse files Browse the repository at this point in the history
Scala Adaptor Improvements by Erik
  • Loading branch information
benjchristensen committed Dec 4, 2013
2 parents 0e4cd7b + 38d4371 commit 90e93b3
Show file tree
Hide file tree
Showing 23 changed files with 361 additions and 264 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ import org.scalatest.junit.JUnitSuite

import rx.lang.scala.Notification
import rx.lang.scala.Observable
import rx.lang.scala.observable
import rx.lang.scala.concurrency.Schedulers
import rx.lang.scala.concurrency._

@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
class RxScalaDemo extends JUnitSuite {
Expand Down Expand Up @@ -167,21 +166,21 @@ class RxScalaDemo extends JUnitSuite {
@Test def testTwoSubscriptionsToOneInterval() {
val o = Observable.interval(100 millis).take(8)
o.subscribe(
i => println(s"${i}a (on thread #${Thread.currentThread().getId()})")
i => println(s"${i}a (on thread #${Thread.currentThread().getId})")
)
o.subscribe(
i => println(s"${i}b (on thread #${Thread.currentThread().getId()})")
i => println(s"${i}b (on thread #${Thread.currentThread().getId})")
)
waitFor(o)
}

@Test def schedulersExample() {
val o = Observable.interval(100 millis).take(8)
o.observeOn(Schedulers.newThread).subscribe(
i => println(s"${i}a (on thread #${Thread.currentThread().getId()})")
o.observeOn(NewThreadScheduler()).subscribe(
i => println(s"${i}a (on thread #${Thread.currentThread().getId})")
)
o.observeOn(Schedulers.newThread).subscribe(
i => println(s"${i}b (on thread #${Thread.currentThread().getId()})")
o.observeOn(NewThreadScheduler()).subscribe(
i => println(s"${i}b (on thread #${Thread.currentThread().getId})")
)
waitFor(o)
}
Expand Down Expand Up @@ -357,13 +356,13 @@ class RxScalaDemo extends JUnitSuite {
}

def square(x: Int): Int = {
println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId()}")
println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId}")
Thread.sleep(100) // calculating a square is heavy work :)
x*x
}

def work(o1: Observable[Int]): Observable[String] = {
println(s"map() is being called on thread ${Thread.currentThread().getId()}")
println(s"map() is being called on thread ${Thread.currentThread().getId}")
o1.map(i => s"The square of $i is ${square(i)}")
}

Expand Down Expand Up @@ -428,40 +427,6 @@ class RxScalaDemo extends JUnitSuite {
assertEquals("!!", Observable("a", "b", "c").drop(10).firstOrElse("!!").toBlockingObservable.single)
}

@Test def observableLikeFuture1() {
implicit val scheduler = Schedulers.threadPoolForIO
val o1 = observable {
Thread.sleep(1000)
5
}
val o2 = observable {
Thread.sleep(500)
4
}
Thread.sleep(500)
val t1 = System.currentTimeMillis
println((o1 merge o2).first.toBlockingObservable.single)
println(System.currentTimeMillis - t1)
}

@Test def observableLikeFuture2() {
class Friend {}
val session = new Object {
def getFriends: List[Friend] = List(new Friend, new Friend)
}

implicit val scheduler = Schedulers.threadPoolForIO
val o: Observable[List[Friend]] = observable {
session.getFriends
}
o.subscribe(
friendList => println(friendList),
err => println(err.getMessage)
)

Thread.sleep(1500) // or convert to BlockingObservable
}

@Test def takeWhileWithIndexAlternative {
val condition = true
Observable("a", "b").zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class TestSchedulerExample extends JUnitSuite {

scheduler.advanceTimeTo(2 seconds)

val inOrdr = inOrder(observer);
val inOrdr = inOrder(observer)
inOrdr.verify(observer, times(1)).onNext(0L)
inOrdr.verify(observer, times(1)).onNext(1L)
inOrdr.verify(observer, never).onNext(2L)
Expand All @@ -37,7 +37,7 @@ class TestSchedulerExample extends JUnitSuite {

verify(observer, never).onNext(2L)

sub.unsubscribe();
sub.unsubscribe()

scheduler.advanceTimeTo(4 seconds)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package rx.lang.scala

import java.lang.Exception
import java.{ lang => jlang }
import rx.lang.scala._
import rx.util.functions._

import scala.collection.Seq
import java.{lang => jlang}
import scala.language.implicitConversions

import rx.util.functions._

/**
* These function conversions convert between Scala functions and Rx `Func`s and `Action`s.
* Most RxScala users won't need them, but they might be useful if one wants to use
Expand All @@ -32,10 +32,17 @@ import scala.language.implicitConversions
object ImplicitFunctionConversions {
import language.implicitConversions

// implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription): Func2[rx.Scheduler, T, rx.Subscription] with Object {def call(s: rx.Scheduler, t: T): rx.Subscription} =
// new Func2[rx.Scheduler, T, rx.Subscription] {
// def call(s: rx.Scheduler, t: T): rx.Subscription = {
// action(rx.lang.scala.Scheduler(s), t).asJavaSubscription
// }
// }

implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription) =
new Func2[rx.Scheduler, T, rx.Subscription] {
def call(s: rx.Scheduler, t: T): rx.Subscription = {
action(s, t).asJavaSubscription
action(Scheduler(s), t).asJavaSubscription
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import rx.Observable.OnSubscribeFunc
/**
* The Observable interface that implements the Reactive Pattern.
*
* @param asJavaObservable the underlying Java observable
*
* @define subscribeObserverMain
* Call this method to subscribe an [[rx.lang.scala.Observer]] for receiving
* items and notifications from the Observable.
Expand Down Expand Up @@ -227,7 +225,6 @@ trait Observable[+T]
* otherwise you'll get a compilation error.
*
* @usecase def concat[U]: Observable[U]
* @inheritdoc
*/
def concat[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
val o2: Observable[Observable[U]] = this
Expand Down Expand Up @@ -273,7 +270,19 @@ trait Observable[+T]
* is the minumum of the number of `onNext` invocations of `this` and `that`.
*/
def zip[U](that: Observable[U]): Observable[(T, U)] = {
Observable[(T, U)](rx.Observable.zip[T, U, (T, U)](this.asJavaObservable, that.asJavaObservable, (t: T, u: U) => (t, u)))
zip(that, (t: T, u: U) => (t, u))
}

/**
* Returns an Observable formed from this Observable and another Observable by combining
* corresponding elements using the selector function.
* The number of `onNext` invocations of the resulting `Observable[(T, U)]`
* is the minumum of the number of `onNext` invocations of `this` and `that`.
*
* Note that this function is private because Scala collections don't have such a function.
*/
private def zip[U, R](that: Observable[U], selector: (T,U) => R): Observable[R] = {
Observable[R](rx.Observable.zip[T, U, R](this.asJavaObservable, that.asJavaObservable, selector))
}

/**
Expand Down Expand Up @@ -1903,7 +1912,7 @@ object Observable {
/**
* Creates a new Scala Observable from a given Java Observable.
*/
def apply[T](observable: rx.Observable[_ <: T]): Observable[T] = {
private [scala] def apply[T](observable: rx.Observable[_ <: T]): Observable[T] = {
new Observable[T]{
def asJavaObservable = observable
}
Expand All @@ -1926,13 +1935,13 @@ object Observable {
*
*
* @tparam T
* the type of the items that this Observable emits
* the type of the items that this Observable emits.
* @param func
* a function that accepts an `Observer[T]`, invokes its `onNext`, `onError`, and `onCompleted` methods
* as appropriate, and returns a [[rx.lang.scala.Subscription]] to allow the Observer to
* canceling the subscription
* @return an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given
* function
* canceling the subscription.
* @return
* an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given function.
*/
def apply[T](func: Observer[T] => Subscription): Observable[T] = {
Observable[T](rx.Observable.create(new OnSubscribeFunc[T] {
Expand All @@ -1942,16 +1951,26 @@ object Observable {
}))
}

def create[T](func: Observer[T] => Subscription): Observable[T] = {
Observable[T](rx.Observable.create(new OnSubscribeFunc[T] {
def onSubscribe(t1: rx.Observer[_ >: T]): rx.Subscription = {
func(Observer(t1))
}
}))
}

/**
* Returns an Observable that invokes an [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]] method when the Observer subscribes to it
* Returns an Observable that invokes an [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]]
* method when the Observer subscribes to it.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/error.png">
*
* @param exception
* the particular error to report
* @tparam T
* the type of the items (ostensibly) emitted by the Observable
* @return an Observable that invokes the [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]] method when the Observer subscribes to it
* @return an Observable that invokes the [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]]
* method when the Observer subscribes to it
*/
def apply[T](exception: Throwable): Observable[T] = {
Observable[T](rx.Observable.error(exception))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package rx.lang.scala

import rx.joins.ObserverBase

/**
Provides a mechanism for receiving push-based notifications.
*
Expand All @@ -24,7 +26,11 @@ package rx.lang.scala
*/
trait Observer[-T] {

def asJavaObserver: rx.Observer[_ >: T]
private [scala] def asJavaObserver: rx.Observer[_ >: T] = new ObserverBase[T] {
protected def onCompletedCore(): Unit = onCompleted()
protected def onErrorCore(error: Throwable): Unit = onError(error)
protected def onNextCore(value: T): Unit = onNext(value)
}

/**
* Provides the Observer with new data.
Expand All @@ -33,30 +39,37 @@ trait Observer[-T] {
*
* The [[rx.lang.scala.Observable]] will not call this method again after it calls either `onCompleted` or `onError`.
*/
def onNext(value: T): Unit = asJavaObserver.onNext(value)
def onNext(value: T): Unit

/**
* Notifies the Observer that the [[rx.lang.scala.Observable]] has experienced an error condition.
*
* If the [[rx.lang.scala.Observable]] calls this method, it will not thereafter call `onNext` or `onCompleted`.
*/
def onError(error: Throwable): Unit = asJavaObserver.onError(error)
def onError(error: Throwable): Unit

/**
* Notifies the Observer that the [[rx.lang.scala.Observable]] has finished sending push-based notifications.
*
* The [[rx.lang.scala.Observable]] will not call this method if it calls `onError`.
*/
def onCompleted(): Unit = asJavaObserver.onCompleted()
def onCompleted(): Unit

}

object Observer {
def apply[T](observer: rx.Observer[T]) : Observer[T] = {
new Observer[T]() {
def asJavaObserver: rx.Observer[_ >: T] = observer
}
}
}
/**
* Assume that the underlying rx.Observer does not need to be wrapped.
*/
private [scala] def apply[T](observer: rx.Observer[T]) : Observer[T] = {
new Observer[T] {

override def asJavaObserver = observer

def onNext(value: T): Unit = asJavaObserver.onNext(value)
def onError(error: Throwable): Unit = asJavaObserver.onError(error)
def onCompleted(): Unit = asJavaObserver.onCompleted()

}
}
}
Loading

0 comments on commit 90e93b3

Please sign in to comment.