From 679a88fdae553bbab0c841bb0df66e294e90ffaa Mon Sep 17 00:00:00 2001 From: headinthebox Date: Thu, 2 Jan 2014 21:29:11 -0800 Subject: [PATCH] Added ConnectableObservable Fixed test Added overload for scan Added trivial test for scan --- .../rx/lang/scala/examples/RxScalaDemo.scala | 12 +++--- .../main/scala/rx/lang/scala/Observable.scala | 37 ++++++++++++++---- .../observables/BlockingObservable.scala | 1 + .../observables/ConnectableObservable.scala | 38 +++++++++++++++++++ .../scala/rx/lang/scala/ObservableTest.scala | 8 ++++ 5 files changed, 83 insertions(+), 13 deletions(-) create mode 100644 language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 7bf832f58e..048df5dc74 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -243,9 +243,9 @@ class RxScalaDemo extends JUnitSuite { } @Test def olympicsExample() { - val (go, medals) = Olympics.mountainBikeMedals.publish + val medals = Olympics.mountainBikeMedals.publish medals.subscribe(println(_)) - go() + medals.connect //waitFor(medals) } @@ -257,10 +257,10 @@ class RxScalaDemo extends JUnitSuite { @Test def exampleWithPublish() { val unshared = List(1 to 4).toObservable - val (startFunc, shared) = unshared.publish + val shared = unshared.publish shared.subscribe(n => println(s"subscriber 1 gets $n")) shared.subscribe(n => println(s"subscriber 2 gets $n")) - startFunc() + shared.connect } def doLater(waitTime: Duration, action: () => Unit): Unit = { @@ -269,9 +269,9 @@ class RxScalaDemo extends JUnitSuite { @Test def exampleWithoutReplay() { val numbers = Observable.interval(1000 millis).take(6) - val (startFunc, sharedNumbers) = numbers.publish + val sharedNumbers = numbers.publish sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n")) - startFunc() + sharedNumbers.connect // subscriber 2 misses 0, 1, 2! doLater(3500 millis, () => { sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) }) waitFor(sharedNumbers) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 9be58009dd..a39754e593 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -18,7 +18,7 @@ package rx.lang.scala import rx.util.functions.FuncN import rx.Observable.OnSubscribeFunc - +import rx.lang.scala.observables.ConnectableObservable /** @@ -1052,12 +1052,10 @@ trait Observable[+T] * * * - * @return a pair of a start function and an [[rx.lang.scala.Observable]] such that when the start function - * is called, the Observable starts to emit items to its [[rx.lang.scala.Observer]]s + * @return an [[rx.lang.scala.observables.ConnectableObservable]]. */ - def publish: (() => Subscription, Observable[T]) = { - val javaCO = asJavaObservable.publish() - (() => javaCO.connect(), toScalaObservable[T](javaCO)) + def publish: ConnectableObservable[T] = { + new ConnectableObservable[T](asJavaObservable.publish()) } // TODO add Scala-like aggregate function @@ -1136,7 +1134,8 @@ trait Observable[+T] * the initial (seed) accumulator value * @param accumulator * an accumulator function to be invoked on each item emitted by the source - * Observable, whose result will be emitted to [[rx.lang.scala.Observer]]s via [[rx.lang.scala.Observer.onNext onNext]] and used in the next accumulator call. + * Observable, whose result will be emitted to [[rx.lang.scala.Observer]]s via + * [[rx.lang.scala.Observer.onNext onNext]] and used in the next accumulator call. * @return an Observable that emits the results of each call to the accumulator function */ def scan[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] = { @@ -1145,6 +1144,30 @@ trait Observable[+T] })) } + /** + * Returns an Observable that applies a function of your choosing to the + * first item emitted by a source Observable, then feeds the result of that + * function along with the second item emitted by an Observable into the + * same function, and so on until all items have been emitted by the source + * Observable, emitting the result of each of these iterations. + *

+ * + *

+ * + * @param accumulator + * an accumulator function to be invoked on each item emitted by the source + * Observable, whose result will be emitted to [[rx.lang.scala.Observer]]s via + * [[rx.lang.scala.Observer.onNext onNext]] and used in the next accumulator call. + * @return + * an Observable that emits the results of each call to the + * accumulator function + */ + def scan[U >: T](accumulator: (U, U) => U): Observable[U] = { + val func: Func2[_ >: U, _ >: U, _ <: U] = accumulator + val func2 = func.asInstanceOf[Func2[T, T, T]] + toScalaObservable[U](asJavaObservable.asInstanceOf[rx.Observable[T]].scan(func2)) + } + /** * Returns an Observable that emits a Boolean that indicates whether all of the items emitted by * the source Observable satisfy a condition. diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala index f9e98efa63..c1ce913095 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala @@ -18,6 +18,7 @@ package rx.lang.scala.observables import scala.collection.JavaConverters._ import rx.lang.scala.ImplicitFunctionConversions._ + /** * An Observable that provides blocking operators. * diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala new file mode 100644 index 0000000000..bba27ecb22 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala @@ -0,0 +1,38 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.lang.scala.observables + +import rx.lang.scala.{Observable, Subscription} +import rx.lang.scala.JavaConversions._ + +class ConnectableObservable[+T] private[scala](val asJavaObservable: rx.observables.ConnectableObservable[_ <: T]) + extends Observable[T] { + + /** + * Call a ConnectableObservable's connect method to instruct it to begin emitting the + * items from its underlying [[rx.lang.scala.Observable]] to its [[rx.lang.scala.Observer]]s. + */ + def connect: Subscription = toScalaSubscription(asJavaObservable.connect()) + + /** + * Returns an observable sequence that stays connected to the source as long + * as there is at least one subscription to the observable sequence. + * + * @return a [[rx.lang.scala.Observable]] + */ + def refCount: Observable[T] = toScalaObservable[T](asJavaObservable.refCount()) +} diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala index 39f863b4dd..c0e575f4d0 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala @@ -57,6 +57,14 @@ class ObservableTests extends JUnitSuite { assertEquals(demat.toBlockingObservable.toIterable.toList, List(1, 2, 3)) } + @Test def TestScan() { + val xs = Observable.items(0,1,2,3) + val ys = xs.scan(0)(_+_) + assertEquals(List(0,0,1,3,6), ys.toBlockingObservable.toList) + val zs = xs.scan((x: Int, y:Int) => x*y) + assertEquals(List(0, 0, 0, 0), zs.toBlockingObservable.toList) + } + // Test that Java's firstOrDefault propagates errors. // If this changes (i.e. it suppresses errors and returns default) then Scala's firstOrElse // should be changed accordingly.