diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 4a8ee56345..be7a30e9d6 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -30,8 +30,7 @@ import org.scalatest.junit.JUnitSuite import rx.lang.scala.Notification import rx.lang.scala.Observable -import rx.lang.scala.observable -import rx.lang.scala.concurrency.Schedulers +import rx.lang.scala.concurrency._ @Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily class RxScalaDemo extends JUnitSuite { @@ -167,21 +166,21 @@ class RxScalaDemo extends JUnitSuite { @Test def testTwoSubscriptionsToOneInterval() { val o = Observable.interval(100 millis).take(8) o.subscribe( - i => println(s"${i}a (on thread #${Thread.currentThread().getId()})") + i => println(s"${i}a (on thread #${Thread.currentThread().getId})") ) o.subscribe( - i => println(s"${i}b (on thread #${Thread.currentThread().getId()})") + i => println(s"${i}b (on thread #${Thread.currentThread().getId})") ) waitFor(o) } @Test def schedulersExample() { val o = Observable.interval(100 millis).take(8) - o.observeOn(Schedulers.newThread).subscribe( - i => println(s"${i}a (on thread #${Thread.currentThread().getId()})") + o.observeOn(NewThreadScheduler()).subscribe( + i => println(s"${i}a (on thread #${Thread.currentThread().getId})") ) - o.observeOn(Schedulers.newThread).subscribe( - i => println(s"${i}b (on thread #${Thread.currentThread().getId()})") + o.observeOn(NewThreadScheduler()).subscribe( + i => println(s"${i}b (on thread #${Thread.currentThread().getId})") ) waitFor(o) } @@ -357,13 +356,13 @@ class RxScalaDemo extends JUnitSuite { } def square(x: Int): Int = { - println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId()}") + println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId}") Thread.sleep(100) // calculating a square is heavy work :) x*x } def work(o1: Observable[Int]): Observable[String] = { - println(s"map() is being called on thread ${Thread.currentThread().getId()}") + println(s"map() is being called on thread ${Thread.currentThread().getId}") o1.map(i => s"The square of $i is ${square(i)}") } @@ -428,40 +427,6 @@ class RxScalaDemo extends JUnitSuite { assertEquals("!!", Observable("a", "b", "c").drop(10).firstOrElse("!!").toBlockingObservable.single) } - @Test def observableLikeFuture1() { - implicit val scheduler = Schedulers.threadPoolForIO - val o1 = observable { - Thread.sleep(1000) - 5 - } - val o2 = observable { - Thread.sleep(500) - 4 - } - Thread.sleep(500) - val t1 = System.currentTimeMillis - println((o1 merge o2).first.toBlockingObservable.single) - println(System.currentTimeMillis - t1) - } - - @Test def observableLikeFuture2() { - class Friend {} - val session = new Object { - def getFriends: List[Friend] = List(new Friend, new Friend) - } - - implicit val scheduler = Schedulers.threadPoolForIO - val o: Observable[List[Friend]] = observable { - session.getFriends - } - o.subscribe( - friendList => println(friendList), - err => println(err.getMessage) - ) - - Thread.sleep(1500) // or convert to BlockingObservable - } - @Test def takeWhileWithIndexAlternative { val condition = true Observable("a", "b").zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala index 55df673708..bc1817bdf4 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala @@ -28,7 +28,7 @@ class TestSchedulerExample extends JUnitSuite { scheduler.advanceTimeTo(2 seconds) - val inOrdr = inOrder(observer); + val inOrdr = inOrder(observer) inOrdr.verify(observer, times(1)).onNext(0L) inOrdr.verify(observer, times(1)).onNext(1L) inOrdr.verify(observer, never).onNext(2L) @@ -37,7 +37,7 @@ class TestSchedulerExample extends JUnitSuite { verify(observer, never).onNext(2L) - sub.unsubscribe(); + sub.unsubscribe() scheduler.advanceTimeTo(4 seconds) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala index ead475bda9..a932eca936 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala @@ -17,12 +17,12 @@ package rx.lang.scala import java.lang.Exception import java.{ lang => jlang } -import rx.lang.scala._ -import rx.util.functions._ + import scala.collection.Seq -import java.{lang => jlang} import scala.language.implicitConversions +import rx.util.functions._ + /** * These function conversions convert between Scala functions and Rx `Func`s and `Action`s. * Most RxScala users won't need them, but they might be useful if one wants to use @@ -32,10 +32,17 @@ import scala.language.implicitConversions object ImplicitFunctionConversions { import language.implicitConversions +// implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription): Func2[rx.Scheduler, T, rx.Subscription] with Object {def call(s: rx.Scheduler, t: T): rx.Subscription} = +// new Func2[rx.Scheduler, T, rx.Subscription] { +// def call(s: rx.Scheduler, t: T): rx.Subscription = { +// action(rx.lang.scala.Scheduler(s), t).asJavaSubscription +// } +// } + implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription) = new Func2[rx.Scheduler, T, rx.Subscription] { def call(s: rx.Scheduler, t: T): rx.Subscription = { - action(s, t).asJavaSubscription + action(Scheduler(s), t).asJavaSubscription } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 9f0e1134d6..f031a2730c 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -22,8 +22,6 @@ import rx.Observable.OnSubscribeFunc /** * The Observable interface that implements the Reactive Pattern. * - * @param asJavaObservable the underlying Java observable - * * @define subscribeObserverMain * Call this method to subscribe an [[rx.lang.scala.Observer]] for receiving * items and notifications from the Observable. @@ -227,7 +225,6 @@ trait Observable[+T] * otherwise you'll get a compilation error. * * @usecase def concat[U]: Observable[U] - * @inheritdoc */ def concat[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = { val o2: Observable[Observable[U]] = this @@ -273,7 +270,19 @@ trait Observable[+T] * is the minumum of the number of `onNext` invocations of `this` and `that`. */ def zip[U](that: Observable[U]): Observable[(T, U)] = { - Observable[(T, U)](rx.Observable.zip[T, U, (T, U)](this.asJavaObservable, that.asJavaObservable, (t: T, u: U) => (t, u))) + zip(that, (t: T, u: U) => (t, u)) + } + + /** + * Returns an Observable formed from this Observable and another Observable by combining + * corresponding elements using the selector function. + * The number of `onNext` invocations of the resulting `Observable[(T, U)]` + * is the minumum of the number of `onNext` invocations of `this` and `that`. + * + * Note that this function is private because Scala collections don't have such a function. + */ + private def zip[U, R](that: Observable[U], selector: (T,U) => R): Observable[R] = { + Observable[R](rx.Observable.zip[T, U, R](this.asJavaObservable, that.asJavaObservable, selector)) } /** @@ -1903,7 +1912,7 @@ object Observable { /** * Creates a new Scala Observable from a given Java Observable. */ - def apply[T](observable: rx.Observable[_ <: T]): Observable[T] = { + private [scala] def apply[T](observable: rx.Observable[_ <: T]): Observable[T] = { new Observable[T]{ def asJavaObservable = observable } @@ -1926,13 +1935,13 @@ object Observable { * * * @tparam T - * the type of the items that this Observable emits + * the type of the items that this Observable emits. * @param func * a function that accepts an `Observer[T]`, invokes its `onNext`, `onError`, and `onCompleted` methods * as appropriate, and returns a [[rx.lang.scala.Subscription]] to allow the Observer to - * canceling the subscription - * @return an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given - * function + * canceling the subscription. + * @return + * an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given function. */ def apply[T](func: Observer[T] => Subscription): Observable[T] = { Observable[T](rx.Observable.create(new OnSubscribeFunc[T] { @@ -1942,8 +1951,17 @@ object Observable { })) } + def create[T](func: Observer[T] => Subscription): Observable[T] = { + Observable[T](rx.Observable.create(new OnSubscribeFunc[T] { + def onSubscribe(t1: rx.Observer[_ >: T]): rx.Subscription = { + func(Observer(t1)) + } + })) + } + /** - * Returns an Observable that invokes an [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]] method when the Observer subscribes to it + * Returns an Observable that invokes an [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]] + * method when the Observer subscribes to it. * * * @@ -1951,7 +1969,8 @@ object Observable { * the particular error to report * @tparam T * the type of the items (ostensibly) emitted by the Observable - * @return an Observable that invokes the [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]] method when the Observer subscribes to it + * @return an Observable that invokes the [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]] + * method when the Observer subscribes to it */ def apply[T](exception: Throwable): Observable[T] = { Observable[T](rx.Observable.error(exception)) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observer.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observer.scala index 73f865ef84..68f3e95cd1 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observer.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observer.scala @@ -15,6 +15,8 @@ */ package rx.lang.scala +import rx.joins.ObserverBase + /** Provides a mechanism for receiving push-based notifications. * @@ -24,7 +26,11 @@ package rx.lang.scala */ trait Observer[-T] { - def asJavaObserver: rx.Observer[_ >: T] + private [scala] def asJavaObserver: rx.Observer[_ >: T] = new ObserverBase[T] { + protected def onCompletedCore(): Unit = onCompleted() + protected def onErrorCore(error: Throwable): Unit = onError(error) + protected def onNextCore(value: T): Unit = onNext(value) + } /** * Provides the Observer with new data. @@ -33,30 +39,37 @@ trait Observer[-T] { * * The [[rx.lang.scala.Observable]] will not call this method again after it calls either `onCompleted` or `onError`. */ - def onNext(value: T): Unit = asJavaObserver.onNext(value) + def onNext(value: T): Unit /** * Notifies the Observer that the [[rx.lang.scala.Observable]] has experienced an error condition. * * If the [[rx.lang.scala.Observable]] calls this method, it will not thereafter call `onNext` or `onCompleted`. */ - def onError(error: Throwable): Unit = asJavaObserver.onError(error) + def onError(error: Throwable): Unit /** * Notifies the Observer that the [[rx.lang.scala.Observable]] has finished sending push-based notifications. * * The [[rx.lang.scala.Observable]] will not call this method if it calls `onError`. */ - def onCompleted(): Unit = asJavaObserver.onCompleted() + def onCompleted(): Unit } object Observer { - def apply[T](observer: rx.Observer[T]) : Observer[T] = { - new Observer[T]() { - def asJavaObserver: rx.Observer[_ >: T] = observer - } - } -} + /** + * Assume that the underlying rx.Observer does not need to be wrapped. + */ + private [scala] def apply[T](observer: rx.Observer[T]) : Observer[T] = { + new Observer[T] { + + override def asJavaObserver = observer + def onNext(value: T): Unit = asJavaObserver.onNext(value) + def onError(error: Throwable): Unit = asJavaObserver.onError(error) + def onCompleted(): Unit = asJavaObserver.onCompleted() + } + } +} \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala index 4c84eed840..4f1f89e808 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala @@ -17,16 +17,15 @@ package rx.lang.scala import java.util.Date import scala.concurrent.duration.Duration -import ImplicitFunctionConversions.scalaFunction0ProducingUnitToAction0 -import ImplicitFunctionConversions.schedulerActionToFunc2 import rx.util.functions.{Action0, Action1, Func2} /** - * Represents an object thatimport rx.lang.scala.ImplicitFunctionConversions - schedules units of work. + * Represents an object that schedules units of work. */ trait Scheduler { - def asJavaScheduler: rx.Scheduler + import rx.lang.scala.ImplicitFunctionConversions._ + + val asJavaScheduler: rx.Scheduler /** * Schedules a cancelable action to be executed. @@ -34,7 +33,7 @@ trait Scheduler { * @param action Action to schedule. * @return a subscription to be able to unsubscribe from action. */ - def schedule(action: rx.lang.scala.Scheduler => Subscription): Subscription = { + def schedule(action: Scheduler => Subscription): Subscription = { this.schedule[Integer](0, (s: Scheduler, x: Integer) => action(s): Subscription): Subscription } @@ -60,7 +59,7 @@ trait Scheduler { * @param delayTime Time the action is to be delayed before executing. * @return a subscription to be able to unsubscribe from action. */ - def schedule(delayTime: Duration)(action: Scheduler => Subscription): Subscription = { + def schedule(delayTime: Duration, action: Scheduler => Subscription): Subscription = { this.schedule[Integer](0, (s: Scheduler, x: Integer) => action(s), delayTime: Duration): Subscription } @@ -76,7 +75,8 @@ trait Scheduler { * @return a subscription to be able to unsubscribe from action. */ private def schedule[T](state: T, action: (Scheduler, T) => Subscription, delayTime: Duration): Subscription = { - Subscription(asJavaScheduler.schedule(state, action, delayTime.length, delayTime.unit)) + val xxx = schedulerActionToFunc2(action) + Subscription(asJavaScheduler.schedule(state, xxx, delayTime.length, delayTime.unit)) } /** @@ -89,7 +89,7 @@ trait Scheduler { * @param period The time interval to wait each time in between executing the action. * @return A subscription to be able to unsubscribe from action. */ - def schedule(initialDelay: Duration, period: Duration)(action: Scheduler => Subscription): Subscription = { + def schedule(initialDelay: Duration, period: Duration, action: Scheduler => Subscription): Subscription = { this.schedulePeriodically[Integer](0, (s: Scheduler, x:Integer) => action(s): Subscription, initialDelay: Duration, period: Duration): Subscription } @@ -119,7 +119,7 @@ trait Scheduler { * @param dueTime Time the action is to be executed. If in the past it will be executed immediately. * @return a subscription to be able to unsubscribe from action. */ - def schedule(dueTime: Date)(action: Scheduler => Subscription): Subscription = { + def schedule(dueTime: Date, action: Scheduler => Subscription): Subscription = { this.schedule(0: Integer, (s: Scheduler, x: Integer) => action(s): Subscription, dueTime: Date): Subscription } @@ -155,7 +155,7 @@ trait Scheduler { * @param action action * @return a subscription to be able to unsubscribe from action. */ - def schedule(delayTime: Duration)(action: =>Unit): Subscription = { + def schedule(delayTime: Duration, action: =>Unit): Subscription = { Subscription(asJavaScheduler.schedule(()=>action, delayTime.length, delayTime.unit)) } @@ -170,7 +170,7 @@ trait Scheduler { * The time interval to wait each time in between executing the action. * @return A subscription to be able to unsubscribe from action. */ - def schedule(initialDelay: Duration, period: Duration)(action: =>Unit): Subscription = { + def schedule(initialDelay: Duration, period: Duration, action: =>Unit): Subscription = { Subscription(asJavaScheduler.schedulePeriodically(()=>action, initialDelay.length, initialDelay.unit.convert(period.length, period.unit), initialDelay.unit)) } @@ -213,14 +213,12 @@ trait Scheduler { } -/** - * Provides constructors for Schedulers. - */ object Scheduler { - private class WrapJavaScheduler(val asJavaScheduler: rx.Scheduler) extends Scheduler - - /** - * Constructs a Scala Scheduler from a Java Scheduler. - */ - def apply(s: rx.Scheduler): Scheduler = new WrapJavaScheduler(s) + private [scala] def apply(scheduler: rx.Scheduler): Scheduler = { + new Scheduler() { + val asJavaScheduler = scheduler + } + } } + + diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/Subject.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subject.scala similarity index 60% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/Subject.scala rename to language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subject.scala index 08ba9e404c..d60b2f3c3d 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/Subject.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subject.scala @@ -15,12 +15,26 @@ */ package rx.lang.scala +import rx.joins.ObserverBase + /** * A Subject is an Observable and an Observer at the same time. */ trait Subject[-T, +R] extends Observable[R] with Observer[T] { val asJavaSubject: rx.subjects.Subject[_ >: T, _<: R] + def asJavaObservable: rx.Observable[_ <: R] = asJavaSubject - def asJavaObserver: rx.Observer[_ >: T] = asJavaSubject + + // temporary hack to workaround bugs in rx Subjects + override def asJavaObserver: rx.Observer[_ >: T] = new ObserverBase[T] { + protected def onNextCore(value: T) = asJavaSubject.onNext(value) + protected def onErrorCore(error: Throwable) = asJavaSubject.onError(error) + protected def onCompletedCore() = asJavaSubject.onCompleted() + } + + def onNext(value: T): Unit = asJavaObserver.onNext(value) + def onError(error: Throwable): Unit = asJavaObserver.onError(error) + def onCompleted(): Unit = asJavaObserver.onCompleted() + } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala index bd3c6849a9..93e76e4524 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala @@ -1,3 +1,4 @@ + /** * Copyright 2013 Netflix, Inc. * @@ -22,7 +23,6 @@ package rx.lang.scala * This interface is the equivalent of `IDisposable` in the .NET Rx implementation. */ trait Subscription { - val asJavaSubscription: rx.Subscription /** @@ -38,19 +38,21 @@ trait Subscription { } object Subscription { + import java.util.concurrent.atomic.AtomicBoolean import rx.lang.scala.subscriptions._ - + + /** * Creates an [[rx.lang.scala.Subscription]] from an [[rx.Subscription]]. */ - def apply(subscription: rx.Subscription): Subscription = { + private [scala] def apply(subscription: rx.Subscription): Subscription = { subscription match { case x: rx.subscriptions.BooleanSubscription => new BooleanSubscription(x) case x: rx.subscriptions.CompositeSubscription => new CompositeSubscription(x) case x: rx.subscriptions.MultipleAssignmentSubscription => new MultipleAssignmentSubscription(x) case x: rx.subscriptions.SerialSubscription => new SerialSubscription(x) - case x: rx.Subscription => Subscription { x.unsubscribe() } + case x: rx.Subscription => apply { x.unsubscribe() } } } @@ -61,6 +63,7 @@ object Subscription { new Subscription() { private val unsubscribed = new AtomicBoolean(false) + def isUnsubscribed = unsubscribed.get() val asJavaSubscription = new rx.Subscription { @@ -69,10 +72,4 @@ object Subscription { } } -} - - - - - - +} \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/CurrentThreadScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/CurrentThreadScheduler.scala new file mode 100644 index 0000000000..500d9c1d33 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/CurrentThreadScheduler.scala @@ -0,0 +1,31 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.concurrency + +import rx.lang.scala.Scheduler + +object CurrentThreadScheduler { + + /** + * Returns a [[rx.lang.scala.Scheduler]] that queues work on the current thread to be executed after the current work completes. + */ + def apply(): CurrentThreadScheduler = { + new CurrentThreadScheduler(rx.concurrency.Schedulers.currentThread()) + } +} + +class CurrentThreadScheduler private[scala] (val asJavaScheduler: rx.Scheduler) + extends Scheduler {} \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ExecutorScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ExecutorScheduler.scala new file mode 100644 index 0000000000..03c7e79d44 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ExecutorScheduler.scala @@ -0,0 +1,38 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.concurrency + +import java.util.concurrent.Executor +import rx.lang.scala.Scheduler + +object ExecutorScheduler { + + /** + * Returns a [[rx.lang.scala.Scheduler]] that queues work on an `java.util.concurrent.Executor`. + * + * Note that this does not support scheduled actions with a delay. + */ + def apply(executor: Executor): ExecutorScheduler = { + new ExecutorScheduler(rx.concurrency.Schedulers.executor(executor)) + } +} + + +class ExecutorScheduler private[scala] (val asJavaScheduler: rx.Scheduler) + extends Scheduler {} + + + diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ImmediateScheduler.scala similarity index 58% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/package.scala rename to language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ImmediateScheduler.scala index c54f83c982..91843a5746 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/package.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ImmediateScheduler.scala @@ -13,9 +13,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.lang.scala +package rx.lang.scala.concurrency + +import rx.lang.scala.Scheduler + +object ImmediateScheduler { + + /** + * Returns a [[rx.lang.scala.Scheduler]] that executes work immediately on the current thread. + */ + def apply(): ImmediateScheduler = { + new ImmediateScheduler(rx.concurrency.Schedulers.immediate()) + } +} + +class ImmediateScheduler private[scala] (val asJavaScheduler: rx.Scheduler) + extends Scheduler {} + -/** - * Provides `Subscription`, and specialized versions of it. - */ -package object subscriptions {} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/NewThreadScheduler.scala similarity index 53% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/package.scala rename to language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/NewThreadScheduler.scala index a3e61c0021..dc69578082 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/package.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/NewThreadScheduler.scala @@ -1,31 +1,31 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.lang.scala +package rx.lang.scala.concurrency -import rx.concurrency.CurrentThreadScheduler +import rx.lang.scala.Scheduler -/** - * Provides schedulers. - */ -package object concurrency { +object NewThreadScheduler { - // These classes are not exposed to Scala users, but are accessible through rx.lang.scala.concurrency.Schedulers: - - // rx.concurrency.CurrentThreadScheduler - // rx.concurrency.ExecutorScheduler - // rx.concurrency.ImmediateScheduler - // rx.concurrency.NewThreadScheduler + /** + * Returns a [[rx.lang.scala.Scheduler]] that creates a new {@link Thread} for each unit of work. + */ + def apply(): NewThreadScheduler = { + new NewThreadScheduler(rx.concurrency.Schedulers.newThread()) + } } + +class NewThreadScheduler private[scala] (val asJavaScheduler: rx.Scheduler) + extends Scheduler {} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ScheduledExecutorServiceScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ScheduledExecutorServiceScheduler.scala new file mode 100644 index 0000000000..d6e8b11d98 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ScheduledExecutorServiceScheduler.scala @@ -0,0 +1,33 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.concurrency + +import java.util.concurrent.ScheduledExecutorService +import rx.lang.scala.Scheduler + +object ScheduledExecutorServiceScheduler { + + /** + * Returns a [[rx.lang.scala.Scheduler]] that queues work on an `java.util.concurrent.ScheduledExecutorService`. + */ + def apply(executor: ScheduledExecutorService): ScheduledExecutorServiceScheduler = { + new ScheduledExecutorServiceScheduler(rx.concurrency.Schedulers.executor(executor)) + } +} + +class ScheduledExecutorServiceScheduler private[scala] (val asJavaScheduler: rx.Scheduler) + extends Scheduler {} + diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/Schedulers.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/Schedulers.scala deleted file mode 100644 index cbba7fd1db..0000000000 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/Schedulers.scala +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.lang.scala.concurrency - -import java.util.concurrent.Executor -import java.util.concurrent.ScheduledExecutorService -import rx.lang.scala.Scheduler -import rx.lang.scala.ImplicitFunctionConversions._ - -/** - * Factory methods for creating Schedulers. - */ -object Schedulers { - - /** - * Returns a [[rx.lang.scala.Scheduler]] that executes work immediately on the current thread. - */ - def immediate: Scheduler = Scheduler(rx.concurrency.Schedulers.immediate()) - - /** - * Returns a [[rx.lang.scala.Scheduler]] that queues work on the current thread to be executed after the current work completes. - */ - def currentThread: Scheduler = Scheduler(rx.concurrency.Schedulers.currentThread()) - - /** - * Returns a [[rx.lang.scala.Scheduler]] that creates a new {@link Thread} for each unit of work. - */ - def newThread: Scheduler = Scheduler(rx.concurrency.Schedulers.newThread) - - /** - * Returns a [[rx.lang.scala.Scheduler]] that queues work on an `java.util.concurrent.Executor`. - * - * Note that this does not support scheduled actions with a delay. - */ - def executor(executor: Executor): Scheduler = Scheduler(rx.concurrency.Schedulers.executor(executor)) - - /** - * Returns a [[rx.lang.scala.Scheduler]] that queues work on an `java.util.concurrent.ScheduledExecutorService`. - */ - def executor(executor: ScheduledExecutorService): Scheduler = Scheduler(rx.concurrency.Schedulers.executor(executor)) - - /** - * Returns a [[rx.lang.scala.Scheduler]] intended for computational work. - * - * The implementation is backed by a `java.util.concurrent.ScheduledExecutorService` thread-pool sized to the number of CPU cores. - * - * This can be used for event-loops, processing callbacks and other computational work. - * - * Do not perform IO-bound work on this scheduler. Use [[rx.lang.scala.concurrency.Schedulers.threadPoolForIO]] instead. - */ - def threadPoolForComputation: Scheduler = Scheduler(rx.concurrency.Schedulers.threadPoolForComputation()) - - /** - * [[rx.lang.scala.Scheduler]] intended for IO-bound work. - * - * The implementation is backed by an `java.util.concurrent.Executor` thread-pool that will grow as needed. - * - * This can be used for asynchronously performing blocking IO. - * - * Do not perform computational work on this scheduler. Use [[rx.lang.scala.concurrency.Schedulers.threadPoolForComputation]] instead. - */ - def threadPoolForIO: Scheduler = Scheduler(rx.concurrency.Schedulers.threadPoolForIO()) - -} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/TestScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/TestScheduler.scala index f7b7e4beba..98a0d241b8 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/TestScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/TestScheduler.scala @@ -18,16 +18,25 @@ package rx.lang.scala.concurrency import scala.concurrent.duration.Duration import rx.lang.scala.Scheduler +/** + * Provides constructors for `TestScheduler`. + */ +object TestScheduler { + def apply(): TestScheduler = { + new TestScheduler(new rx.concurrency.TestScheduler()) + } +} + /** * Scheduler with artificial time, useful for testing. - * + * * For example, you could test the `Observable.interval` operation using a `TestScheduler` as follows: - * + * * {{{ * @Test def testInterval() { * import org.mockito.Matchers._ * import org.mockito.Mockito._ - * + * * val scheduler = TestScheduler() * val observer = mock(classOf[rx.Observer[Long]]) * @@ -55,8 +64,7 @@ import rx.lang.scala.Scheduler * } * }}} */ -class TestScheduler extends Scheduler { - val asJavaScheduler = new rx.concurrency.TestScheduler +class TestScheduler private[scala] (val asJavaScheduler: rx.concurrency.TestScheduler) extends Scheduler { def advanceTimeBy(time: Duration) { asJavaScheduler.advanceTimeBy(time.length, time.unit) @@ -70,13 +78,3 @@ class TestScheduler extends Scheduler { asJavaScheduler.triggerActions() } } - -/** - * Provides constructors for `TestScheduler`. - */ -object TestScheduler { - def apply(): TestScheduler = { - new TestScheduler - } -} - diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ThreadPoolForComputationScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ThreadPoolForComputationScheduler.scala new file mode 100644 index 0000000000..8b51083b89 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ThreadPoolForComputationScheduler.scala @@ -0,0 +1,37 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.concurrency + +import rx.lang.scala.Scheduler + +object ThreadPoolForComputationScheduler { + + /** + * Returns a [[rx.lang.scala.Scheduler]] intended for computational work. + * + * The implementation is backed by a `java.util.concurrent.ScheduledExecutorService` thread-pool sized to the number of CPU cores. + * + * This can be used for event-loops, processing callbacks and other computational work. + * + * Do not perform IO-bound work on this scheduler. Use [[rx.lang.scala.concurrency.ThreadPoolForIOScheduler]] instead. + */ + def apply(): ThreadPoolForComputationScheduler = { + new ThreadPoolForComputationScheduler(rx.concurrency.Schedulers.threadPoolForComputation()) + } +} + +class ThreadPoolForComputationScheduler private[scala] (val asJavaScheduler: rx.Scheduler) + extends Scheduler {} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ThreadPoolForIOScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ThreadPoolForIOScheduler.scala new file mode 100644 index 0000000000..8869cf96b3 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ThreadPoolForIOScheduler.scala @@ -0,0 +1,37 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.concurrency + +import rx.lang.scala.Scheduler + +object ThreadPoolForIOScheduler { + + /** + * [[rx.lang.scala.Scheduler]] intended for IO-bound work. + * + * The implementation is backed by an `java.util.concurrent.Executor` thread-pool that will grow as needed. + * + * This can be used for asynchronously performing blocking IO. + * + * Do not perform computational work on this scheduler. Use [[rx.lang.scala.concurrency.ThreadPoolForComputationScheduler]] instead. + */ + def apply(): ThreadPoolForIOScheduler = { + new ThreadPoolForIOScheduler(rx.concurrency.Schedulers.threadPoolForIO()) + } +} + +class ThreadPoolForIOScheduler private[scala] (val asJavaScheduler: rx.Scheduler) + extends Scheduler {} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/package.scala index 2b43860b53..12f830dc9a 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/package.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/package.scala @@ -18,7 +18,7 @@ package rx.lang.scala /** * Contains special Observables. * - * In Scala, this package only contains [[BlockingObservable]]. + * In Scala, this package only contains [[rx.lang.scala.observables.BlockingObservable]]. * In the corresponding Java package `rx.observables`, there is also a * `GroupedObservable` and a `ConnectableObservable`, but these are not needed * in Scala, because we use a pair `(key, observable)` instead of `GroupedObservable` diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala index 0809e1fb2b..83352c8426 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala @@ -15,34 +15,9 @@ */ package rx.lang -import java.util.concurrent.TimeUnit -import java.util.Date - /** * This package contains all classes that RxScala users need. - * - * It mirrors the structure of package `rx`, but implementation classes that RxScala users - * will not need are left out. + * + * It basically mirrors the structure of package `rx`, but some changes were made to make it more Scala-idiomatic. */ -package object scala { - - /** - * Allows to construct observables in a similar way as futures. - * - * Example: - * - * {{{ - * implicit val scheduler = Schedulers.threadPoolForIO - * val o: Observable[List[Friend]] = observable { - * session.getFriends - * } - * o.subscribe( - * friendList => println(friendList), - * err => println(err.getMessage) - * ) - * }}} - */ - def observable[T](body: => T)(implicit scheduler: Scheduler): Observable[T] = { - Observable(1).observeOn(scheduler).map(_ => body) - } -} +package object scala {} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/BehaviorSubject.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/BehaviorSubject.scala index 5b358aba9a..9ee8ba9db4 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/BehaviorSubject.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/BehaviorSubject.scala @@ -19,7 +19,7 @@ import rx.lang.scala.Subject object BehaviorSubject { def apply[T](value: T): BehaviorSubject[T] = { - new BehaviorSubject[T](rx.subjects.BehaviorSubject.createWithDefaultValue(value)) + new BehaviorSubject[T](rx.subjects.BehaviorSubject.create(value)) } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala index be2cb5f392..a9f2070345 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala @@ -32,7 +32,7 @@ object BooleanSubscription { def apply(u: => Unit): BooleanSubscription = { new BooleanSubscription(new rx.subscriptions.BooleanSubscription { override def unsubscribe(): Unit = { - if(!super.isUnsubscribed()) { + if(!super.isUnsubscribed) { u super.unsubscribe() } diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index f38ac0d521..7a161c0f8f 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -189,25 +189,25 @@ class CompletenessTest extends JUnitSuite { } @Ignore // because spams output - @Test def printJavaInstanceMethods: Unit = { + @Test def printJavaInstanceMethods(): Unit = { printMethodSet("Instance methods of rx.Observable", typeOf[rx.Observable[_]]) } @Ignore // because spams output - @Test def printScalaInstanceMethods: Unit = { + @Test def printScalaInstanceMethods(): Unit = { printMethodSet("Instance methods of rx.lang.scala.Observable", typeOf[rx.lang.scala.Observable[_]]) } @Ignore // because spams output - @Test def printJavaStaticMethods: Unit = { + @Test def printJavaStaticMethods(): Unit = { printMethodSet("Static methods of rx.Observable", typeOf[rx.Observable[_]].typeSymbol.companionSymbol.typeSignature) } @Ignore // because spams output - @Test def printScalaCompanionMethods: Unit = { + @Test def printScalaCompanionMethods(): Unit = { printMethodSet("Companion methods of rx.lang.scala.Observable", typeOf[rx.lang.scala.Observable.type]) } @@ -227,7 +227,7 @@ class CompletenessTest extends JUnitSuite { } @Ignore // because spams output - @Test def printDefaultMethodCorrespondence: Unit = { + @Test def printDefaultMethodCorrespondence(): Unit = { println("\nDefault Method Correspondence") println( "-----------------------------\n") val c = SortedMap(defaultMethodCorrespondence.toSeq : _*) @@ -238,7 +238,7 @@ class CompletenessTest extends JUnitSuite { } @Ignore // because spams output - @Test def printCorrectedMethodCorrespondence: Unit = { + @Test def printCorrectedMethodCorrespondence(): Unit = { println("\nCorrected Method Correspondence") println( "-------------------------------\n") val c = SortedMap(correspondence.toSeq : _*) @@ -262,7 +262,7 @@ class CompletenessTest extends JUnitSuite { println(s"$status: $bad out of ${bad+good} methods were not found in $tp") } - @Test def checkScalaMethodPresenceVerbose: Unit = { + @Test def checkScalaMethodPresenceVerbose(): Unit = { println("\nTesting that all mentioned Scala methods exist") println( "----------------------------------------------\n") @@ -289,14 +289,14 @@ class CompletenessTest extends JUnitSuite { (javaM, if (actualMethods.contains(scalaM) || scalaM.charAt(0) == '[') scalaM else "[**TODO: missing**]") } - @Test def checkJavaMethodPresence: Unit = { + @Test def checkJavaMethodPresence(): Unit = { println("\nTesting that all mentioned Java methods exist") println( "---------------------------------------------\n") checkMethodPresence(correspondence.keys, typeOf[rx.Observable[_]]) } @Ignore // because we prefer the verbose version - @Test def checkScalaMethodPresence: Unit = { + @Test def checkScalaMethodPresence(): Unit = { checkMethodPresence(correspondence.values, typeOf[rx.lang.scala.Observable[_]]) } @@ -347,8 +347,8 @@ Note: (for (((javaName, scalaCol), pairs) <- ps.groupBy(groupingKey(_)).toList.sortBy(_._1._1)) yield { "| " + formatJavaCol(javaName, pairs.map(_._1)) + " | " + formatScalaCol(scalaCol) + " |" }).foreach(println(_)) - println(s"\nThis table was generated on ${Calendar.getInstance().getTime()}.") - println(s"**Do not edit**. Instead, edit `${getClass().getCanonicalName()}`.") + println(s"\nThis table was generated on ${Calendar.getInstance().getTime}.") + println(s"**Do not edit**. Instead, edit `${getClass.getCanonicalName}`.") } } diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala index d96b23fe43..21f272d03b 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala @@ -37,14 +37,14 @@ class ObservableTests extends JUnitSuite { // If this changes (i.e. it suppresses errors and returns default) then Scala's firstOrElse // should be changed accordingly. @Test def testJavaFirstOrDefault() { - assertEquals(1, rx.Observable.from(1, 2).firstOrDefault(10).toBlockingObservable().single) - assertEquals(10, rx.Observable.empty().firstOrDefault(10).toBlockingObservable().single) + assertEquals(1, rx.Observable.from(1, 2).firstOrDefault(10).toBlockingObservable.single) + assertEquals(10, rx.Observable.empty().firstOrDefault(10).toBlockingObservable.single) val msg = "msg6251" var receivedMsg = "none" try { - rx.Observable.error(new Exception(msg)).firstOrDefault(10).toBlockingObservable().single + rx.Observable.error(new Exception(msg)).firstOrDefault(10).toBlockingObservable.single } catch { - case e: Exception => receivedMsg = e.getCause().getMessage() + case e: Exception => receivedMsg = e.getCause.getMessage } assertEquals(receivedMsg, msg) } @@ -62,7 +62,7 @@ class ObservableTests extends JUnitSuite { try { Observable[Int](new Exception(msg)).firstOrElse(10).toBlockingObservable.single } catch { - case e: Exception => receivedMsg = e.getCause().getMessage() + case e: Exception => receivedMsg = e.getCause.getMessage } assertEquals(receivedMsg, msg) }