Skip to content

Commit

Permalink
add merge operation and examples
Browse files Browse the repository at this point in the history
and try to make Olympics groupBy work with timing, but did not
work due to problems with RxJava groupBy, see pull ReactiveX#289
  • Loading branch information
samuelgruetter committed Sep 19, 2013
1 parent a078522 commit 2b7f062
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,26 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
Observable[U](rx.Observable.merge(thisJava, thatJava))
}

/**
* Flattens the sequence of Observables emitted by {@code this} into one Observable, without any
* transformation.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine the items emitted by multiple Observables so that they act like a single
* Observable, by using the {@code merge} method.
*
* @return an Observable that emits items that are the result of flattening the items emitted
* by the Observables emitted by {@code this}
*/
def merge[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
val o2: Observable[Observable[U]] = this
val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJava)
val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJava
val o5 = rx.Observable.merge[U](o4)
Observable[U](o5)
}

/**
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ object Olympics {
def fourYearsEmpty: Observable[Medal] = {
// TODO this should return an observable which emits nothing during fourYears and then completes
// Because of https://github.com/Netflix/RxJava/issues/388, we get non-terminating tests
// And this https://github.com/Netflix/RxJava/pull/289#issuecomment-24738668 also causes problems
// So we don't use this:
// Observable.interval(fourYears).take(1).map(i => neverUsedDummyMedal).filter(m => false)
// But we just return empty, which completes immediately
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,33 @@ class RxScalaDemo extends JUnitSuite {
println((before ++ source).toBlockingObservable.toList)
}

@Test def mergeExample() {
@Test def mergeTwoExample() {
val slowNumbers = Observable.interval(400 millis).take(5).map("slow " + _)
val fastNumbers = Observable.interval(200 millis).take(10).map("fast " + _)
val o = (slowNumbers merge fastNumbers)
o.subscribe(output(_))
waitFor(o)
}

def myInterval(period: Long): Observable[String] = {
Observable.interval(period.millis).map(n => s"Obs-$period emits $n")
}

@Test def mergeManyExample() {
val o = Observable.interval(500 millis).map(n => myInterval((n+1)*100))
val stopper = Observable.interval(5 seconds)
o.merge.takeUntil(stopper).toBlockingObservable.foreach(println(_))
}

@Test def mergeSomeExample() {
// To merge some observables which are all known already:
Observable(
Observable.interval(200 millis),
Observable.interval(400 millis),
Observable.interval(800 millis)
).merge.take(12).toBlockingObservable.foreach(println(_))
}

@Test def rangeAndBufferExample() {
val o = Observable(1 to 18)
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
Expand Down Expand Up @@ -178,6 +197,29 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(List(0, 1, 2, 3), t.toBlockingObservable.toList)
}

@Test def timingTest() {
val firstOnly = false
val numbersByModulo3 = Observable.interval(1000 millis).take(9).groupBy(_ % 3)

(for ((modulo, numbers) <- numbersByModulo3) yield {
println("Observable for modulo" + modulo + " started")

if (firstOnly) numbers.take(1) else numbers
}).merge.toBlockingObservable.foreach(println(_))
}

@Test def timingTest1() {
val numbersByModulo3 = Observable.interval(1000 millis).take(9).groupBy(_ % 3)

val t0 = System.currentTimeMillis

(for ((modulo, numbers) <- numbersByModulo3) yield {
println("Observable for modulo" + modulo + " started at t = " + (System.currentTimeMillis - t0))
numbers.take(1) // <- TODO very unexpected
//numbers
}).merge.toBlockingObservable.foreach(println(_))
}

@Test def groupByExample() {
val medalsByCountry = Olympics.mountainBikeMedals.groupBy(medal => medal.country)

Expand All @@ -191,6 +233,13 @@ class RxScalaDemo extends JUnitSuite {
waitFor(firstMedalOfEachCountry)
}

@Test def olympicsExample() {
val (go, medals) = Olympics.mountainBikeMedals.publish
medals.subscribe(println(_))
go()
waitFor(medals)
}

@Test def exampleWithoutPublish() {
val unshared = Observable(1 to 4)
unshared.subscribe(n => println(s"subscriber 1 gets $n"))
Expand Down Expand Up @@ -260,7 +309,7 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(8, Observable(4, 2).product.toBlockingObservable.single)
assertEquals(1, Observable[Int]().product.toBlockingObservable.single)
}

def output(s: String): Unit = println(s)

// blocks until obs has completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class CompletenessTest extends JUnitSuite {
"error(Throwable)" -> "apply(Throwable)",
"from(Array[T])" -> "apply(T*)",
"from(Iterable[_ <: T])" -> "apply(T*)",
"merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[T])",
"merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])",
"merge(Observable[_ <: Observable[_ <: T]])" -> "merge(<:<[Observable[T], Observable[Observable[U]]])",
"mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[T])",
"range(Int, Int)" -> "apply(Range)",
"sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "[use (first zip second) map (p => p._1 == p._2)]",
Expand All @@ -67,8 +68,8 @@ class CompletenessTest extends JUnitSuite {
"sumDoubles(Observable[Double])" -> "sum(Numeric[U])",
"sumFloats(Observable[Float])" -> "sum(Numeric[U])",
"sumLongs(Observable[Long])" -> "sum(Numeric[U])",
"synchronize(Observable[T])" -> "synchronize",
"switchDo(Observable[_ <: Observable[_ <: T]])" -> "switch",
"synchronize(Observable[_ <: T])" -> "synchronize",
"zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method zip and map]"
) ++ List.iterate("T", 9)(s => s + ", T").map(
// all 9 overloads of startWith:
Expand All @@ -84,8 +85,11 @@ class CompletenessTest extends JUnitSuite {
val obsArgs = (1 to i).map(j => s"Observable[_ <: T$j], ").mkString("")
val funcParams = (1 to i).map(j => s"_ >: T$j, ").mkString("")
("zip(" + obsArgs + "Func" + i + "[" + funcParams + "_ <: R])", unnecessary)
}).toMap

}).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
// merge 3-9:
"merge(" + _ + ")" -> "[unnecessary because we can use Observable(o1, o2, ...).merge instead]"
).drop(2).toMap

def removePackage(s: String) = s.replaceAll("(\\w+\\.)+(\\w+)", "$2")

def methodMembersToMethodStrings(members: Iterable[Symbol]): Iterable[String] = {
Expand Down

0 comments on commit 2b7f062

Please sign in to comment.