Skip to content

Commit

Permalink
Merge pull request #717 from Applied-Duality/ScalaPublishFix
Browse files Browse the repository at this point in the history
Added ConnectableObservable
  • Loading branch information
benjchristensen committed Jan 3, 2014
2 parents 7333bf1 + 679a88f commit 7a75d4b
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,9 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def olympicsExample() {
val (go, medals) = Olympics.mountainBikeMedals.publish
val medals = Olympics.mountainBikeMedals.publish
medals.subscribe(println(_))
go()
medals.connect
//waitFor(medals)
}

Expand All @@ -257,10 +257,10 @@ class RxScalaDemo extends JUnitSuite {

@Test def exampleWithPublish() {
val unshared = List(1 to 4).toObservable
val (startFunc, shared) = unshared.publish
val shared = unshared.publish
shared.subscribe(n => println(s"subscriber 1 gets $n"))
shared.subscribe(n => println(s"subscriber 2 gets $n"))
startFunc()
shared.connect
}

def doLater(waitTime: Duration, action: () => Unit): Unit = {
Expand All @@ -269,9 +269,9 @@ class RxScalaDemo extends JUnitSuite {

@Test def exampleWithoutReplay() {
val numbers = Observable.interval(1000 millis).take(6)
val (startFunc, sharedNumbers) = numbers.publish
val sharedNumbers = numbers.publish
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
startFunc()
sharedNumbers.connect
// subscriber 2 misses 0, 1, 2!
doLater(3500 millis, () => { sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) })
waitFor(sharedNumbers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package rx.lang.scala

import rx.util.functions.FuncN
import rx.Observable.OnSubscribeFunc

import rx.lang.scala.observables.ConnectableObservable


/**
Expand Down Expand Up @@ -1052,12 +1052,10 @@ trait Observable[+T]
*
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/publishConnect.png">
*
* @return a pair of a start function and an [[rx.lang.scala.Observable]] such that when the start function
* is called, the Observable starts to emit items to its [[rx.lang.scala.Observer]]s
* @return an [[rx.lang.scala.observables.ConnectableObservable]].
*/
def publish: (() => Subscription, Observable[T]) = {
val javaCO = asJavaObservable.publish()
(() => javaCO.connect(), toScalaObservable[T](javaCO))
def publish: ConnectableObservable[T] = {
new ConnectableObservable[T](asJavaObservable.publish())
}

// TODO add Scala-like aggregate function
Expand Down Expand Up @@ -1136,7 +1134,8 @@ trait Observable[+T]
* the initial (seed) accumulator value
* @param accumulator
* an accumulator function to be invoked on each item emitted by the source
* Observable, whose result will be emitted to [[rx.lang.scala.Observer]]s via [[rx.lang.scala.Observer.onNext onNext]] and used in the next accumulator call.
* Observable, whose result will be emitted to [[rx.lang.scala.Observer]]s via
* [[rx.lang.scala.Observer.onNext onNext]] and used in the next accumulator call.
* @return an Observable that emits the results of each call to the accumulator function
*/
def scan[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] = {
Expand All @@ -1145,6 +1144,30 @@ trait Observable[+T]
}))
}

/**
* Returns an Observable that applies a function of your choosing to the
* first item emitted by a source Observable, then feeds the result of that
* function along with the second item emitted by an Observable into the
* same function, and so on until all items have been emitted by the source
* Observable, emitting the result of each of these iterations.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/scan.png">
* <p>
*
* @param accumulator
* an accumulator function to be invoked on each item emitted by the source
* Observable, whose result will be emitted to [[rx.lang.scala.Observer]]s via
* [[rx.lang.scala.Observer.onNext onNext]] and used in the next accumulator call.
* @return
* an Observable that emits the results of each call to the
* accumulator function
*/
def scan[U >: T](accumulator: (U, U) => U): Observable[U] = {
val func: Func2[_ >: U, _ >: U, _ <: U] = accumulator
val func2 = func.asInstanceOf[Func2[T, T, T]]
toScalaObservable[U](asJavaObservable.asInstanceOf[rx.Observable[T]].scan(func2))
}

/**
* Returns an Observable that emits a Boolean that indicates whether all of the items emitted by
* the source Observable satisfy a condition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package rx.lang.scala.observables
import scala.collection.JavaConverters._
import rx.lang.scala.ImplicitFunctionConversions._


/**
* An Observable that provides blocking operators.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.lang.scala.observables

import rx.lang.scala.{Observable, Subscription}
import rx.lang.scala.JavaConversions._

class ConnectableObservable[+T] private[scala](val asJavaObservable: rx.observables.ConnectableObservable[_ <: T])
extends Observable[T] {

/**
* Call a ConnectableObservable's connect method to instruct it to begin emitting the
* items from its underlying [[rx.lang.scala.Observable]] to its [[rx.lang.scala.Observer]]s.
*/
def connect: Subscription = toScalaSubscription(asJavaObservable.connect())

/**
* Returns an observable sequence that stays connected to the source as long
* as there is at least one subscription to the observable sequence.
*
* @return a [[rx.lang.scala.Observable]]
*/
def refCount: Observable[T] = toScalaObservable[T](asJavaObservable.refCount())
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ class ObservableTests extends JUnitSuite {
assertEquals(demat.toBlockingObservable.toIterable.toList, List(1, 2, 3))
}

@Test def TestScan() {
val xs = Observable.items(0,1,2,3)
val ys = xs.scan(0)(_+_)
assertEquals(List(0,0,1,3,6), ys.toBlockingObservable.toList)
val zs = xs.scan((x: Int, y:Int) => x*y)
assertEquals(List(0, 0, 0, 0), zs.toBlockingObservable.toList)
}

// Test that Java's firstOrDefault propagates errors.
// If this changes (i.e. it suppresses errors and returns default) then Scala's firstOrElse
// should be changed accordingly.
Expand Down

0 comments on commit 7a75d4b

Please sign in to comment.