From 2b7f0625019d8ea450d002ce230a4803d79a3809 Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Thu, 19 Sep 2013 16:14:12 +0200 Subject: [PATCH] add merge operation and examples and try to make Olympics groupBy work with timing, but did not work due to problems with RxJava groupBy, see pull #289 --- .../main/scala/rx/lang/scala/Observable.scala | 20 +++++++ .../rx/lang/scala/examples/Olympics.scala | 1 + .../rx/lang/scala/examples/RxScalaDemo.scala | 53 ++++++++++++++++++- .../rx/lang/scala/CompletenessTest.scala | 12 +++-- 4 files changed, 80 insertions(+), 6 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 7bfbbcdd3c..57b43a5e1e 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -1239,6 +1239,26 @@ class Observable[+T](val asJava: rx.Observable[_ <: T]) Observable[U](rx.Observable.merge(thisJava, thatJava)) } + /** + * Flattens the sequence of Observables emitted by {@code this} into one Observable, without any + * transformation. + *

+ * + *

+ * You can combine the items emitted by multiple Observables so that they act like a single + * Observable, by using the {@code merge} method. + * + * @return an Observable that emits items that are the result of flattening the items emitted + * by the Observables emitted by {@code this} + */ + def merge[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = { + val o2: Observable[Observable[U]] = this + val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJava) + val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJava + val o5 = rx.Observable.merge[U](o4) + Observable[U](o5) + } + /** * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. *

diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala index d826aa58e8..a3fa2345bc 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala @@ -51,6 +51,7 @@ object Olympics { def fourYearsEmpty: Observable[Medal] = { // TODO this should return an observable which emits nothing during fourYears and then completes // Because of https://github.com/Netflix/RxJava/issues/388, we get non-terminating tests + // And this https://github.com/Netflix/RxJava/pull/289#issuecomment-24738668 also causes problems // So we don't use this: // Observable.interval(fourYears).take(1).map(i => neverUsedDummyMedal).filter(m => false) // But we just return empty, which completes immediately diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala index 9ebfa7b999..0fe9c3424e 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -95,7 +95,7 @@ class RxScalaDemo extends JUnitSuite { println((before ++ source).toBlockingObservable.toList) } - @Test def mergeExample() { + @Test def mergeTwoExample() { val slowNumbers = Observable.interval(400 millis).take(5).map("slow " + _) val fastNumbers = Observable.interval(200 millis).take(10).map("fast " + _) val o = (slowNumbers merge fastNumbers) @@ -103,6 +103,25 @@ class RxScalaDemo extends JUnitSuite { waitFor(o) } + def myInterval(period: Long): Observable[String] = { + Observable.interval(period.millis).map(n => s"Obs-$period emits $n") + } + + @Test def mergeManyExample() { + val o = Observable.interval(500 millis).map(n => myInterval((n+1)*100)) + val stopper = Observable.interval(5 seconds) + o.merge.takeUntil(stopper).toBlockingObservable.foreach(println(_)) + } + + @Test def mergeSomeExample() { + // To merge some observables which are all known already: + Observable( + Observable.interval(200 millis), + Observable.interval(400 millis), + Observable.interval(800 millis) + ).merge.take(12).toBlockingObservable.foreach(println(_)) + } + @Test def rangeAndBufferExample() { val o = Observable(1 to 18) o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]"))) @@ -178,6 +197,29 @@ class RxScalaDemo extends JUnitSuite { assertEquals(List(0, 1, 2, 3), t.toBlockingObservable.toList) } + @Test def timingTest() { + val firstOnly = false + val numbersByModulo3 = Observable.interval(1000 millis).take(9).groupBy(_ % 3) + + (for ((modulo, numbers) <- numbersByModulo3) yield { + println("Observable for modulo" + modulo + " started") + + if (firstOnly) numbers.take(1) else numbers + }).merge.toBlockingObservable.foreach(println(_)) + } + + @Test def timingTest1() { + val numbersByModulo3 = Observable.interval(1000 millis).take(9).groupBy(_ % 3) + + val t0 = System.currentTimeMillis + + (for ((modulo, numbers) <- numbersByModulo3) yield { + println("Observable for modulo" + modulo + " started at t = " + (System.currentTimeMillis - t0)) + numbers.take(1) // <- TODO very unexpected + //numbers + }).merge.toBlockingObservable.foreach(println(_)) + } + @Test def groupByExample() { val medalsByCountry = Olympics.mountainBikeMedals.groupBy(medal => medal.country) @@ -191,6 +233,13 @@ class RxScalaDemo extends JUnitSuite { waitFor(firstMedalOfEachCountry) } + @Test def olympicsExample() { + val (go, medals) = Olympics.mountainBikeMedals.publish + medals.subscribe(println(_)) + go() + waitFor(medals) + } + @Test def exampleWithoutPublish() { val unshared = Observable(1 to 4) unshared.subscribe(n => println(s"subscriber 1 gets $n")) @@ -260,7 +309,7 @@ class RxScalaDemo extends JUnitSuite { assertEquals(8, Observable(4, 2).product.toBlockingObservable.single) assertEquals(1, Observable[Int]().product.toBlockingObservable.single) } - + def output(s: String): Unit = println(s) // blocks until obs has completed diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index dfd3e196ee..bc6c5a6310 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -58,7 +58,8 @@ class CompletenessTest extends JUnitSuite { "error(Throwable)" -> "apply(Throwable)", "from(Array[T])" -> "apply(T*)", "from(Iterable[_ <: T])" -> "apply(T*)", - "merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[T])", + "merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])", + "merge(Observable[_ <: Observable[_ <: T]])" -> "merge(<:<[Observable[T], Observable[Observable[U]]])", "mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[T])", "range(Int, Int)" -> "apply(Range)", "sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "[use (first zip second) map (p => p._1 == p._2)]", @@ -67,8 +68,8 @@ class CompletenessTest extends JUnitSuite { "sumDoubles(Observable[Double])" -> "sum(Numeric[U])", "sumFloats(Observable[Float])" -> "sum(Numeric[U])", "sumLongs(Observable[Long])" -> "sum(Numeric[U])", + "synchronize(Observable[T])" -> "synchronize", "switchDo(Observable[_ <: Observable[_ <: T]])" -> "switch", - "synchronize(Observable[_ <: T])" -> "synchronize", "zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method zip and map]" ) ++ List.iterate("T", 9)(s => s + ", T").map( // all 9 overloads of startWith: @@ -84,8 +85,11 @@ class CompletenessTest extends JUnitSuite { val obsArgs = (1 to i).map(j => s"Observable[_ <: T$j], ").mkString("") val funcParams = (1 to i).map(j => s"_ >: T$j, ").mkString("") ("zip(" + obsArgs + "Func" + i + "[" + funcParams + "_ <: R])", unnecessary) - }).toMap - + }).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map( + // merge 3-9: + "merge(" + _ + ")" -> "[unnecessary because we can use Observable(o1, o2, ...).merge instead]" + ).drop(2).toMap + def removePackage(s: String) = s.replaceAll("(\\w+\\.)+(\\w+)", "$2") def methodMembersToMethodStrings(members: Iterable[Symbol]): Iterable[String] = {