diff --git a/language-adaptors/rxjava-scala/README.md b/language-adaptors/rxjava-scala/README.md index 470a65744e..c4ad66d0af 100644 --- a/language-adaptors/rxjava-scala/README.md +++ b/language-adaptors/rxjava-scala/README.md @@ -1,8 +1,71 @@ # Scala Adaptor for RxJava -There's an old Scala adaptor ( `rx.lang.scala.RxImplicits` with test `rx.lang.scala.RxImplicitsTest` ), which is deprecated. All other classes in `rx.lang.scala` belong to the new adaptor. +This adaptor allows to use RxJava in Scala with anonymous functions, e.g. -# Binaries +```scala +val o = Observable.interval(200 millis).take(5) +o.subscribe(n => println("n = " + n)) +Observable(1, 2, 3, 4).reduce(_ + _) +``` + +For-comprehensions are also supported: + +```scala +val first = Observable(10, 11, 12) +val second = Observable(10, 11, 12) +val booleans = for ((n1, n2) <- (first zip second)) yield (n1 == n2) +``` + +Further, this adaptor attempts to expose an API which is as Scala-idiomatic as possible. This means that certain methods have been renamed, their signature was changed, or static methods were changed to instance methods. Some examples: + +```scala + // instead of concat: +def ++[U >: T](that: Observable[U]): Observable[U] + +// instance method instead of static: +def zip[U](that: Observable[U]): Observable[(T, U)] + +// the implicit evidence argument ensures that dematerialize can only be called on Observables of Notifications: +def dematerialize[U](implicit evidence: T <:< Notification[U]): Observable[U] + +// additional type parameter U with lower bound to get covariance right: +def onErrorResumeNext[U >: T](resumeFunction: Throwable => Observable[U]): Observable[U] + +// curried in Scala collections, so curry fold also here: +def fold[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] + +// using Duration instead of (long timepan, TimeUnit duration): +def sample(duration: Duration): Observable[T] + +// called skip in Java, but drop in Scala +def drop(n: Int): Observable[T] + +// there's only mapWithIndex in Java, because Java doesn't have tuples: +def zipWithIndex: Observable[(T, Int)] + +// corresponds to Java's toList: +def toSeq: Observable[Seq[T]] + +// the implicit evidence argument ensures that switch can only be called on Observables of Observables: +def switch[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] + +// Java's from becomes apply, and we use Scala Range +def apply(range: Range): Observable[Int] + +// use Bottom type: +def never: Observable[Nothing] +``` + +Also, the Scala Observable is fully covariant in its type parameter, whereas the Java Observable only achieves partial covariance due to limitations of Java's type system (or if you can fix this, your suggestions are very welcome). + +For more examples, see [RxScalaDemo.scala](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala). + +Scala code using Rx should only import members from `rx.lang.scala` and below. + +Work on this adaptor is still in progress, and for the moment, the best source of documentation are the comments in the source code of [`rx.lang.scala.Observable`](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala). + + +## Binaries Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-scala%22). diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala new file mode 100644 index 0000000000..b7f13b0783 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala @@ -0,0 +1,129 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala + +import java.{ lang => jlang } +import rx.util.functions._ + +/** + * These function conversions convert between Scala functions and Rx Funcs and Actions. + * Most users RxScala won't need them, but they might be useful if one wants to use + * the rx.Observable directly instead of using rx.lang.scala.Observable or if one wants + * to use a Java library taking/returning Funcs and Actions. + */ +object ImplicitFunctionConversions { + import language.implicitConversions + + implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) = + new rx.Observable.OnSubscribeFunc[T] { + def onSubscribe(obs: Observer[_ >: T]): Subscription = { + f(obs) + } + } + + /** + * Converts a by-name parameter to a Rx Func0 + */ + implicit def scalaByNameParamToFunc0[B](param: => B): Func0[B] = + new Func0[B] { + def call(): B = param + } + + /** + * Converts 0-arg function to Rx Action0 + */ + implicit def scalaFunction0ProducingUnitToAction0(f: (() => Unit)): Action0 = + new Action0 { + def call(): Unit = f() + } + + /** + * Converts 1-arg function to Rx Action1 + */ + implicit def scalaFunction1ProducingUnitToAction1[A](f: (A => Unit)): Action1[A] = + new Action1[A] { + def call(a: A): Unit = f(a) + } + + /** + * Converts 1-arg predicate to Rx Func1[A, java.lang.Boolean] + */ + implicit def scalaBooleanFunction1ToRxBooleanFunc1[A](f: (A => Boolean)): Func1[A, jlang.Boolean] = + new Func1[A, jlang.Boolean] { + def call(a: A): jlang.Boolean = f(a).booleanValue + } + + /** + * Converts 2-arg predicate to Rx Func2[A, B, java.lang.Boolean] + */ + implicit def scalaBooleanFunction2ToRxBooleanFunc1[A, B](f: ((A, B) => Boolean)): Func2[A, B, jlang.Boolean] = + new Func2[A, B, jlang.Boolean] { + def call(a: A, b: B): jlang.Boolean = f(a, b).booleanValue + } + + /** + * Converts a specific function shape (used in takeWhile) to the equivalent Java types with an Rx Func2 + */ + implicit def convertTakeWhileFuncToRxFunc2[A](f: (A, Int) => Boolean): Func2[A, jlang.Integer, jlang.Boolean] = + new Func2[A, jlang.Integer, jlang.Boolean] { + def call(a: A, b: jlang.Integer): jlang.Boolean = f(a, b).booleanValue + } + + /** + * Converts a function shaped ilke compareTo into the equivalent Rx Func2 + */ + implicit def convertComparisonFuncToRxFunc2[A](f: (A, A) => Int): Func2[A, A, jlang.Integer] = + new Func2[A, A, jlang.Integer] { + def call(a1: A, a2: A): jlang.Integer = f(a1, a2).intValue + } + + /** + * This implicit allows Scala code to use any exception type and still work + * with invariant Func1 interface + */ + implicit def exceptionFunction1ToRxExceptionFunc1[A <: Exception, B](f: (A => B)): Func1[Exception, B] = + new Func1[Exception, B] { + def call(ex: Exception): B = f(ex.asInstanceOf[A]) + } + + /** + * The following implicits convert functions of different arities into the Rx equivalents + */ + implicit def scalaFunction0ToRxFunc0[A](f: () => A): Func0[A] = + new Func0[A] { + def call(): A = f() + } + + implicit def scalaFunction1ToRxFunc1[A, B](f: (A => B)): Func1[A, B] = + new Func1[A, B] { + def call(a: A): B = f(a) + } + + implicit def scalaFunction2ToRxFunc2[A, B, C](f: (A, B) => C): Func2[A, B, C] = + new Func2[A, B, C] { + def call(a: A, b: B) = f(a, b) + } + + implicit def scalaFunction3ToRxFunc3[A, B, C, D](f: (A, B, C) => D): Func3[A, B, C, D] = + new Func3[A, B, C, D] { + def call(a: A, b: B, c: C) = f(a, b, c) + } + + implicit def scalaFunction4ToRxFunc4[A, B, C, D, E](f: (A, B, C, D) => E): Func4[A, B, C, D, E] = + new Func4[A, B, C, D, E] { + def call(a: A, b: B, c: C, d: D) = f(a, b, c, d) + } +} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 4c4cccf4a0..d26602bada 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -20,7 +20,6 @@ package rx.lang.scala import org.scalatest.junit.JUnitSuite import scala.collection.Seq import rx.lang.scala.observables.BlockingObservable -import rx.lang.scala.observables.ConnectableObservable /** @@ -38,7 +37,8 @@ class Observable[+T](val asJava: rx.Observable[_ <: T]) import rx.util.functions._ import rx.lang.scala.{Notification, Subscription, Scheduler, Observer} import rx.lang.scala.util._ - import rx.lang.scala.internal.ImplicitFunctionConversions._ + import rx.lang.scala.subjects.Subject + import rx.lang.scala.ImplicitFunctionConversions._ /** * An {@link Observer} must call an Observable's {@code subscribe} method in order to @@ -132,11 +132,13 @@ class Observable[+T](val asJava: rx.Observable[_ <: T]) * into * @param * result type - * @return a {@link ConnectableObservable} that upon connection causes the source Observable to - * push results into the specified {@link Subject} + * @return a pair of a start function and an {@link Observable} such that when the start function + * is called, the Observable starts to push results into the specified {@link Subject} */ - // public ConnectableObservable multicast(Subject subject) TODO - + def multicast[R](subject: Subject[T, R]): (() => Subscription, Observable[R]) = { + val javaCO = asJava.multicast[R](subject) + (() => javaCO.connect(), Observable[R](javaCO)) + } /** * Returns an Observable that first emits the items emitted by this, and then the items emitted @@ -904,11 +906,12 @@ class Observable[+T](val asJava: rx.Observable[_ <: T]) *

* * - * @return a {@link ConnectableObservable} that upon connection causes the source Observable to - * emit items to its {@link Observer}s + * @return a pair of a start function and an {@link Observable} such that when the start function + * is called, the Observable starts to emit items to its {@link Observer}s */ - def replay(): ConnectableObservable[T] = { - new ConnectableObservable[T](asJava.replay()) + def replay(): (() => Subscription, Observable[T]) = { + val javaCO = asJava.replay() + (() => javaCO.connect(), Observable[T](javaCO)) } /** @@ -937,11 +940,12 @@ class Observable[+T](val asJava: rx.Observable[_ <: T]) *

* * - * @return a {@link ConnectableObservable} that upon connection causes the source Observable to - * emit items to its {@link Observer}s + * @return a pair of a start function and an {@link Observable} such that when the start function + * is called, the Observable starts to emit items to its {@link Observer}s */ - def publish: ConnectableObservable[T] = { - new ConnectableObservable[T](asJava.publish()) + def publish: (() => Subscription, Observable[T]) = { + val javaCO = asJava.publish() + (() => javaCO.connect(), Observable[T](javaCO)) } // There is no aggregate function with signature @@ -1215,51 +1219,25 @@ class Observable[+T](val asJava: rx.Observable[_ <: T]) // because we can just use ++ instead /** - * Groups the items emitted by an Observable according to a specified criterion, and emits these - * grouped items as {@link GroupedObservable}s, one GroupedObservable per group. - *

- * + * Groups the items emitted by this Observable according to a specified discriminator function. * - * @param keySelector + * @param f * a function that extracts the key from an item - * @param elementSelector - * a function to map a source item to an item in a {@link GroupedObservable} - * @param - * the key type - * @param - * the type of items emitted by the resulting {@link GroupedObservable}s - * @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a - * unique key value and emits items representing items from the source Observable that - * share that key value - */ - /* TODO make a Scala GroupedObservable and groupBy - def groupBy[K,R](keySelector: T => K, elementSelector: T => R ): Observable[GroupedObservable[K,R]] = { - ??? - } - */ - // public Observable> groupBy(final Func1 keySelector, final Func1 elementSelector) - - /** - * Groups the items emitted by an Observable according to a specified criterion, and emits these - * grouped items as {@link GroupedObservable}s, one GroupedObservable per group. - *

- * - * - * @param keySelector - * a function that extracts the key for each item * @param - * the key type - * @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a - * unique key value and emits items representing items from the source Observable that - * share that key value + * the type of keys returned by the discriminator function. + * @return an Observable that emits {@code (key, observable)} pairs, where {@code observable} + * contains all items for which {@code f} returned {@code key}. */ - /* TODO - def groupBy[K](keySelector: T => K ): Observable[GroupedObservable[K,T]] = { - ??? + def groupBy[K](f: T => K): Observable[(K, Observable[T])] = { + val o1 = asJava.groupBy[K](f) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]] + val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey(), Observable[T](o)) + Observable[(K, Observable[T])](o1.map[(K, Observable[T])](func)) } - */ - // public Observable> groupBy(final Func1 keySelector) + // There's no method corresponding to + // public Observable> groupBy(final Func1 keySelector, final Func1 elementSelector) + // because this can be obtained by combining groupBy and map (as in Scala) + /** * Given an Observable that emits Observables, creates a single Observable that * emits the items emitted by the most recently published of those Observables. @@ -1482,7 +1460,7 @@ object Observable { import rx.{Observable => JObservable} import rx.lang.scala.{Notification, Subscription, Scheduler, Observer} import rx.lang.scala.util._ - import rx.lang.scala.internal.ImplicitFunctionConversions._ + import rx.lang.scala.ImplicitFunctionConversions._ private[scala] def jObsOfListToScObsOfSeq[T](jObs: rx.Observable[_ <: java.util.List[T]]): Observable[Seq[T]] = { @@ -1800,7 +1778,7 @@ object Observable { // "implementation restriction: nested class is not allowed in value class. // This restriction is planned to be removed in subsequent releases." class WithFilter[+T] private[scala] (p: T => Boolean, asJava: rx.Observable[_ <: T]) { - import rx.lang.scala.internal.ImplicitFunctionConversions._ + import rx.lang.scala.ImplicitFunctionConversions._ def map[B](f: T => B): Observable[B] = { Observable[B](asJava.filter(p).map[B](f)) @@ -1852,7 +1830,7 @@ class UnitTestSuite extends JUnitSuite { @Test def testTest() = { val a: Observable[Int] = Observable() - assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.last) + assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.toIterable.last) } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala new file mode 100644 index 0000000000..d826aa58e8 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala @@ -0,0 +1,60 @@ +package rx.lang.scala.examples + +import rx.lang.scala.Observable +import scala.concurrent.duration._ + +object Olympics { + case class Medal(val year: Int, val games: String, val discipline: String, val medal: String, val athlete: String, val country: String) + + def mountainBikeMedals: Observable[Medal] = Observable( + Medal(1996, "Atlanta 1996", "cross-country men", "Gold", "Bart BRENTJENS", "Netherlands"), + Medal(1996, "Atlanta 1996", "cross-country women", "Gold", "Paola PEZZO", "Italy"), + Medal(1996, "Atlanta 1996", "cross-country men", "Silver", "Thomas FRISCHKNECHT", "Switzerland"), + Medal(1996, "Atlanta 1996", "cross-country women", "Silver", "Alison SYDOR", "Canada"), + Medal(1996, "Atlanta 1996", "cross-country men", "Bronze", "Miguel MARTINEZ", "France"), + Medal(1996, "Atlanta 1996", "cross-country women", "Bronze", "Susan DEMATTEI", "United States of America") + ) ++ fourYearsEmpty ++ Observable( + Medal(2000, "Sydney 2000", "cross-country women", "Gold", "Paola PEZZO", "Italy"), + Medal(2000, "Sydney 2000", "cross-country women", "Silver", "Barbara BLATTER", "Switzerland"), + Medal(2000, "Sydney 2000", "cross-country women", "Bronze", "Marga FULLANA", "Spain"), + Medal(2000, "Sydney 2000", "cross-country men", "Gold", "Miguel MARTINEZ", "France"), + Medal(2000, "Sydney 2000", "cross-country men", "Silver", "Filip MEIRHAEGHE", "Belgium"), + Medal(2000, "Sydney 2000", "cross-country men", "Bronze", "Christoph SAUSER", "Switzerland") + ) ++ fourYearsEmpty ++ Observable( + Medal(2004, "Athens 2004", "cross-country men", "Gold", "Julien ABSALON", "France"), + Medal(2004, "Athens 2004", "cross-country men", "Silver", "Jose Antonio HERMIDA RAMOS", "Spain"), + Medal(2004, "Athens 2004", "cross-country men", "Bronze", "Bart BRENTJENS", "Netherlands"), + Medal(2004, "Athens 2004", "cross-country women", "Gold", "Gunn-Rita DAHLE", "Norway"), + Medal(2004, "Athens 2004", "cross-country women", "Silver", "Marie-Helene PREMONT", "Canada"), + Medal(2004, "Athens 2004", "cross-country women", "Bronze", "Sabine SPITZ", "Germany") + ) ++ fourYearsEmpty ++ Observable( + Medal(2008, "Beijing 2008", "cross-country women", "Gold", "Sabine SPITZ", "Germany"), + Medal(2008, "Beijing 2008", "cross-country women", "Silver", "Maja WLOSZCZOWSKA", "Poland"), + Medal(2008, "Beijing 2008", "cross-country women", "Bronze", "Irina KALENTYEVA", "Russian Federation"), + Medal(2008, "Beijing 2008", "cross-country men", "Gold", "Julien ABSALON", "France"), + Medal(2008, "Beijing 2008", "cross-country men", "Silver", "Jean-Christophe PERAUD", "France"), + Medal(2008, "Beijing 2008", "cross-country men", "Bronze", "Nino SCHURTER", "Switzerland") + ) ++ fourYearsEmpty ++ Observable( + Medal(2012, "London 2012", "cross-country men", "Gold", "Jaroslav KULHAVY", "Czech Republic"), + Medal(2012, "London 2012", "cross-country men", "Silver", "Nino SCHURTER", "Switzerland"), + Medal(2012, "London 2012", "cross-country men", "Bronze", "Marco Aurelio FONTANA", "Italy"), + Medal(2012, "London 2012", "cross-country women", "Gold", "Julie BRESSET", "France"), + Medal(2012, "London 2012", "cross-country women", "Silver", "Sabine SPITZ", "Germany"), + Medal(2012, "London 2012", "cross-country women", "Bronze", "Georgia GOULD", "United States of America") + ) + + // speed it up :D + val fourYears = 4000.millis + + val neverUsedDummyMedal = Medal(3333, "?", "?", "?", "?", "?") + + def fourYearsEmpty: Observable[Medal] = { + // TODO this should return an observable which emits nothing during fourYears and then completes + // Because of https://github.com/Netflix/RxJava/issues/388, we get non-terminating tests + // So we don't use this: + // Observable.interval(fourYears).take(1).map(i => neverUsedDummyMedal).filter(m => false) + // But we just return empty, which completes immediately + Observable() + } + +} \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala index d46bfe1d3b..6030329b26 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -136,7 +136,6 @@ class RxScalaDemo extends JUnitSuite { } @Test def testTwoSubscriptionsToOneInterval() { - // TODO this does not yet work as expected! val o = Observable.interval(100 millis).take(8) o.subscribe( i => println(s"${i}a (on thread #${Thread.currentThread().getId()})") @@ -158,11 +157,89 @@ class RxScalaDemo extends JUnitSuite { waitFor(o) } + @Test def testGroupByThenFlatMap() { + val m = Observable(1, 2, 3, 4) + val g = m.groupBy(i => i % 2) + val t = g.flatMap((p: (Int, Observable[Int])) => p._2) + assertEquals(List(1, 2, 3, 4), t.toBlockingObservable.toList) + } + + @Test def testGroupByThenFlatMapByForComprehension() { + val m = Observable(1, 2, 3, 4) + val g = m.groupBy(i => i % 2) + val t = for ((i, o) <- g; n <- o) yield n + assertEquals(List(1, 2, 3, 4), t.toBlockingObservable.toList) + } + + @Test def testGroupByThenFlatMapByForComprehensionWithTiming() { + val m = Observable.interval(100 millis).take(4) + val g = m.groupBy(i => i % 2) + val t = for ((i, o) <- g; n <- o) yield n + assertEquals(List(0, 1, 2, 3), t.toBlockingObservable.toList) + } + + @Test def groupByExample() { + val medalsByCountry = Olympics.mountainBikeMedals.groupBy(medal => medal.country) + + val firstMedalOfEachCountry = + for ((country, medals) <- medalsByCountry; firstMedal <- medals.take(1)) yield firstMedal + + firstMedalOfEachCountry.subscribe(medal => { + println(s"${medal.country} wins its first medal in ${medal.year}") + }) + + waitFor(firstMedalOfEachCountry) + } + + @Test def exampleWithoutPublish() { + val unshared = Observable(1 to 4) + unshared.subscribe(n => println(s"subscriber 1 gets $n")) + unshared.subscribe(n => println(s"subscriber 2 gets $n")) + } + + @Test def exampleWithPublish() { + val unshared = Observable(1 to 4) + val (startFunc, shared) = unshared.publish + shared.subscribe(n => println(s"subscriber 1 gets $n")) + shared.subscribe(n => println(s"subscriber 2 gets $n")) + startFunc() + } + + def doLater(waitTime: Duration, action: () => Unit): Unit = { + Observable.interval(waitTime).take(1).subscribe(_ => action()) + } + + @Test def exampleWithoutReplay() { + val numbers = Observable.interval(1000 millis).take(6) + val (startFunc, sharedNumbers) = numbers.publish + sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n")) + startFunc() + // subscriber 2 misses 0, 1, 2! + doLater(3500 millis, () => { sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) }) + waitFor(sharedNumbers) + } + + @Test def exampleWithReplay() { + val numbers = Observable.interval(1000 millis).take(6) + val (startFunc, sharedNumbers) = numbers.replay + sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n")) + startFunc() + // subscriber 2 subscribes later but still gets all numbers + doLater(3500 millis, () => { sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) }) + waitFor(sharedNumbers) + } + + @Test def testSingleOption() { + assertEquals(None, Observable(1, 2).toBlockingObservable.singleOption) + assertEquals(Some(1), Observable(1) .toBlockingObservable.singleOption) + assertEquals(None, Observable() .toBlockingObservable.singleOption) + } + def output(s: String): Unit = println(s) // blocks until obs has completed def waitFor[T](obs: Observable[T]): Unit = { - obs.toBlockingObservable.last + obs.toBlockingObservable.toIterable.last } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/internal/ImplicitFunctionConversions.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/internal/ImplicitFunctionConversions.scala deleted file mode 100644 index e5f6e49fc6..0000000000 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/internal/ImplicitFunctionConversions.scala +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.lang.scala.internal - - -import java.{lang => jlang} -import rx.util.functions.Action0 -import rx.util.functions.Action1 -import rx.util.functions.Func0 -import rx.util.functions.Func1 -import rx.util.functions.Func2 -import rx.util.functions.Func3 -import rx.util.functions.Func4 -import java.{lang => jlang} -import rx.Observer -import rx.Subscription -import java.{lang => jlang} -import scala.language.implicitConversions - -/** - * These function conversions are only used by the ScalaAdapter, users of RxScala don't need them. - */ -object ImplicitFunctionConversions { - // code below is copied from - // https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/RxImplicits.scala - - import java.{ lang => jlang } - import language.implicitConversions - - import rx.observables.BlockingObservable - import rx.util.functions._ - import rx.{Observer, Subscription} - - implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) = - new rx.Observable.OnSubscribeFunc[T] { - def onSubscribe(obs: Observer[_ >: T]): Subscription = { - f(obs) - } - } - - /*implicit def scalaFunction1ToOnSubscribeFunc[T](f: Observer[_ >: T] => Subscription) = - new rx.Observable.OnSubscribeFunc[T] { - def onSubscribe(obs: Observer[_ >: T]): Subscription = { - f(obs) - } - }*/ - - /** - * Converts a by-name parameter to a Rx Func0 - */ - implicit def scalaByNameParamToFunc0[B](param: => B): Func0[B] = - new Func0[B]{ - def call(): B = param - } - - - /** - * Converts 0-arg function to Rx Action0 - */ - implicit def scalaFunction0ProducingUnitToAction0(f: (() => Unit)): Action0 = - new Action0 { - def call(): Unit = f() - } - - /** - * Converts 1-arg function to Rx Action1 - */ - implicit def scalaFunction1ProducingUnitToAction1[A](f: (A => Unit)): Action1[A] = - new Action1[A] { - def call(a: A): Unit = f(a) - } - - /** - * Converts 1-arg predicate to Rx Func1[A, java.lang.Boolean] - */ - implicit def scalaBooleanFunction1ToRxBooleanFunc1[A](f: (A => Boolean)): Func1[A, jlang.Boolean] = - new Func1[A, jlang.Boolean] { - def call(a: A): jlang.Boolean = f(a).booleanValue - } - - /** - * Converts 2-arg predicate to Rx Func2[A, B, java.lang.Boolean] - */ - implicit def scalaBooleanFunction2ToRxBooleanFunc1[A, B](f: ((A, B) => Boolean)): Func2[A, B, jlang.Boolean] = - new Func2[A, B, jlang.Boolean] { - def call(a: A, b: B): jlang.Boolean = f(a, b).booleanValue - } - - /** - * Converts a specific function shape (used in takeWhile) to the equivalent Java types with an Rx Func2 - */ - implicit def convertTakeWhileFuncToRxFunc2[A](f: (A, Int) => Boolean): Func2[A, jlang.Integer, jlang.Boolean] = - new Func2[A, jlang.Integer, jlang.Boolean] { - def call(a: A, b: jlang.Integer): jlang.Boolean = f(a, b).booleanValue - } - - /** - * Converts a function shaped ilke compareTo into the equivalent Rx Func2 - */ - implicit def convertComparisonFuncToRxFunc2[A](f: (A, A) => Int): Func2[A, A, jlang.Integer] = - new Func2[A, A, jlang.Integer] { - def call(a1: A, a2: A): jlang.Integer = f(a1, a2).intValue - } - - /* - * This implicit allows Scala code to use any exception type and still work - * with invariant Func1 interface - */ - implicit def exceptionFunction1ToRxExceptionFunc1[A <: Exception, B](f: (A => B)): Func1[Exception, B] = - new Func1[Exception, B] { - def call(ex: Exception): B = f(ex.asInstanceOf[A]) - } - - /** - * The following implicits convert functions of different arities into the Rx equivalents - */ - implicit def scalaFunction0ToRxFunc0[A](f: () => A): Func0[A] = - new Func0[A] { - def call(): A = f() - } - - implicit def scalaFunction1ToRxFunc1[A, B](f: (A => B)): Func1[A, B] = - new Func1[A, B] { - def call(a: A): B = f(a) - } - - implicit def scalaFunction2ToRxFunc2[A, B, C](f: (A, B) => C): Func2[A, B, C] = - new Func2[A, B, C] { - def call(a: A, b: B) = f(a, b) - } - - implicit def scalaFunction3ToRxFunc3[A, B, C, D](f: (A, B, C) => D): Func3[A, B, C, D] = - new Func3[A, B, C, D] { - def call(a: A, b: B, c: C) = f(a, b, c) - } - - implicit def scalaFunction4ToRxFunc4[A, B, C, D, E](f: (A, B, C, D) => E): Func4[A, B, C, D, E] = - new Func4[A, B, C, D, E] { - def call(a: A, b: B, c: C, d: D) = f(a, b, c, d) - } - -} \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala index 3b3562d652..5470f6f1cb 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala @@ -16,46 +16,105 @@ package rx.lang.scala.observables import scala.collection.JavaConverters._ -import rx.lang.scala.internal.ImplicitFunctionConversions._ +import rx.lang.scala.ImplicitFunctionConversions._ class BlockingObservable[+T](val asJava: rx.observables.BlockingObservable[_ <: T]) extends AnyVal { + /** + * Invoke a method on each item emitted by the {@link Observable}; block until the Observable + * completes. + *

+ * NOTE: This will block even if the Observable is asynchronous. + *

+ * This is similar to {@link Observable#subscribe(Observer)}, but it blocks. Because it blocks it does + * not need the {@link Observer#onCompleted()} or {@link Observer#onError(Throwable)} methods. + *

+ * + * + * @param onNext + * the {@link Action1} to invoke for every item emitted by the {@link Observable} + * @throws RuntimeException + * if an error occurs + */ def foreach(f: T => Unit): Unit = { - asJava.forEach(f) + asJava.forEach(f); } - def last: T = { - asJava.last() : T // useless ascription because of compiler bug + // last -> use toIterable.last + // lastOrDefault -> use toIterable.lastOption + // first -> use toIterable.head + // firstOrDefault -> use toIterable.headOption + // single(predicate) -> use filter and single + // singleOrDefault -> use singleOption + + /** + * Returns an {@link Iterable} that always returns the item most recently emitted by an {@link Observable}. + *

+ * + * + * @param initialValue + * the initial value that will be yielded by the {@link Iterable} sequence if the {@link Observable} has not yet emitted an item + * @return an {@link Iterable} that on each iteration returns the item that the {@link Observable} has most recently emitted + */ + def mostRecent[U >: T](initialValue: U): Iterable[U] = { + val asJavaU = asJava.asInstanceOf[rx.observables.BlockingObservable[U]] + asJavaU.mostRecent(initialValue).asScala: Iterable[U] // useless ascription because of compiler bug } - // last(Func1) - // lastOrDefault(T) - // lastOrDefault(T, Func1) - // mostRecent(T) - // next() + /** + * Returns an {@link Iterable} that blocks until the {@link Observable} emits another item, + * then returns that item. + *

+ * + * + * @return an {@link Iterable} that blocks upon each iteration until the {@link Observable} emits a new item, whereupon the Iterable returns that item + */ + def next: Iterable[T] = { + asJava.next().asScala: Iterable[T] // useless ascription because of compiler bug + } + /** + * If this {@link Observable} completes after emitting a single item, return that item, + * otherwise throw an exception. + *

+ * + * + * @return the single item emitted by the {@link Observable} + */ def single: T = { - asJava.single() : T // useless ascription because of compiler bug + asJava.single(): T // useless ascription because of compiler bug + } + + /** + * If this {@link Observable} completes after emitting a single item, return an Option containing + * this item, otherwise return {@code None}. + */ + def singleOption: Option[T] = { + var size: Int = 0 + var last: Option[T] = None + for (t <- toIterable) { + size += 1 + last = Some(t) + } + if (size == 1) last else None } - - // single(Func1) - - // def singleOption: Option[T] = { TODO } - // corresponds to Java's - // singleOrDefault(T) - - // singleOrDefault(BlockingObservable, boolean, T) - // singleOrDefault(T, Func1) - // toFuture() - + + // TODO toFuture() + + /** + * Returns an {@link Iterator} that iterates over all items emitted by this {@link Observable}. + */ def toIterable: Iterable[T] = { - asJava.toIterable().asScala : Iterable[T] // useless ascription because of compiler bug + asJava.toIterable().asScala: Iterable[T] // useless ascription because of compiler bug } - + + /** + * Returns a {@link List} that contains all items emitted by this {@link Observable}. + */ def toList: List[T] = { - asJava.toIterable().asScala.toList : List[T] // useless ascription because of compiler bug + asJava.toIterable().asScala.toList: List[T] // useless ascription because of compiler bug } -} \ No newline at end of file +} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala deleted file mode 100644 index 7740ad043f..0000000000 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.lang.scala.observables - - -class ConnectableObservable[+T](val asJava: rx.observables.ConnectableObservable[_ <: T]) extends AnyVal { - import rx.lang.scala._ - import rx.lang.scala.util._ - import rx.{Observable => JObservable} - import rx.lang.scala.internal.ImplicitFunctionConversions._ - -} \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala index 6910599783..0f6ea79d34 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala @@ -53,12 +53,6 @@ rx.plugins.RxJavaErrorHandler rx.plugins.RxJavaObservableExecutionHook rx.plugins.RxJavaPlugins -rx.subjects.AsyncSubject -rx.subjects.BehaviorSubject -rx.subjects.PublishSubject -rx.subjects.ReplaySubject -rx.subjects.Subject - rx.subscriptions.BooleanSubscription rx.subscriptions.CompositeSubscription rx.subscriptions.Subscriptions diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/package.scala new file mode 100644 index 0000000000..8f99d02bf6 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/package.scala @@ -0,0 +1,14 @@ +package rx.lang.scala + +package object subjects { + + // in Java: public abstract class Subject extends Observable implements Observer + type Subject[-T, +R] = rx.subjects.Subject[_ >: T, _ <: R] + + // TODO (including static methods of these classes) + // rx.subjects.AsyncSubject + // rx.subjects.BehaviorSubject + // rx.subjects.PublishSubject + // rx.subjects.ReplaySubject + +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d5d8fb8297..a14e78329c 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -397,7 +397,7 @@ public Subscription subscribe(final Action1 onNext, final Action1 ConnectableObservable multicast(Subject subject) { + public ConnectableObservable multicast(Subject subject) { return OperationMulticast.multicast(this, subject); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java index e83cfdaa03..e24c24a91a 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java @@ -27,7 +27,7 @@ import rx.subjects.Subject; public class OperationMulticast { - public static ConnectableObservable multicast(Observable source, final Subject subject) { + public static ConnectableObservable multicast(Observable source, final Subject subject) { return new MulticastConnectableObservable(source, subject); } @@ -35,11 +35,11 @@ private static class MulticastConnectableObservable extends ConnectableObs private final Object lock = new Object(); private final Observable source; - private final Subject subject; + private final Subject subject; private Subscription subscription; - public MulticastConnectableObservable(Observable source, final Subject subject) { + public MulticastConnectableObservable(Observable source, final Subject subject) { super(new OnSubscribeFunc() { @Override public Subscription onSubscribe(Observer observer) {