Skip to content

Commit

Permalink
Merge pull request #1623 from zsxwing/rxscala-operators
Browse files Browse the repository at this point in the history
RxScala: Add more operators to match RxJava
  • Loading branch information
benjchristensen committed Aug 26, 2014
2 parents 85d8a95 + 45563a3 commit 1693324
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,18 @@ class RxScalaDemo extends JUnitSuite {
waitFor(Olympics.yearTicks)
}

@Test def groupByExample2() {
val medalByYear = Olympics.mountainBikeMedals.groupBy(medal => medal.year, medal => medal.country)

for ((year, countries) <- medalByYear; country <- countries) {
println(s"${year}: ${country}")
}

Olympics.yearTicks.subscribe(year => println(s"\nYear $year starts."))

waitFor(Olympics.yearTicks)
}

@Test def groupByUntilExample() {
val numbers = Observable.interval(250 millis).take(14)
val grouped = numbers.groupByUntil(x => x % 2){ case (key, obs) => obs.filter(x => x == 7) }
Expand Down Expand Up @@ -1510,4 +1522,104 @@ class RxScalaDemo extends JUnitSuite {
o.take(3).toBlocking.foreach(println)
}

@Test def collectExample() {
val o = Observable.just(1, 1.0, "a", 2, 2.0, "b")
o.collect { case s: String => "Item: " + s }.foreach(println(_))
}

@Test def usingExample() {
import scala.io.{Codec, Source}

Observable.using { new java.net.URL("http://rxscala.github.io/").openStream() }(
input => Source.fromInputStream(input)(Codec.UTF8).getLines().toList.toObservable,
input => input.close
).foreach(println(_))
}

def createFastObservable: Observable[Int] = {
Observable {
subscriber: Subscriber[Int] => {
(0 to 2000).takeWhile(_ => !subscriber.isUnsubscribed).foreach(subscriber.onNext(_))
subscriber.onCompleted()
}
}
}

@Test def withoutBackpressureExample() {
val o = createFastObservable
val l = new CountDownLatch(1)
o.observeOn(NewThreadScheduler()).subscribe(new Subscriber[Int] {
override def onStart() {
request(1)
}

override def onNext(n: Int) {
println(n)
Thread.sleep(10) // emulate a slow subscriber
request(1)
}

override def onError(e: Throwable) {
e.printStackTrace()
l.countDown()
}

override def onCompleted() {
l.countDown()
}
})
l.await()
}

@Test def onBackpressureDropExample() {
val o = createFastObservable.onBackpressureDrop
val l = new CountDownLatch(1)
o.observeOn(NewThreadScheduler()).subscribe(new Subscriber[Int] {
override def onStart() {
request(1)
}

override def onNext(n: Int) {
println(n)
Thread.sleep(10) // emulate a slow subscriber
request(1)
}

override def onError(e: Throwable) {
e.printStackTrace()
l.countDown()
}

override def onCompleted() {
l.countDown()
}
})
l.await()
}

@Test def onBackpressureBufferExample() {
val o = createFastObservable.onBackpressureBuffer
val l = new CountDownLatch(1)
o.observeOn(NewThreadScheduler()).subscribe(new Subscriber[Int] {
override def onStart() {
request(1)
}

override def onNext(n: Int) {
println(n)
Thread.sleep(10) // emulate a slow subscriber
request(1)
}

override def onError(e: Throwable) {
e.printStackTrace()
l.countDown()
}

override def onCompleted() {
l.countDown()
}
})
l.await()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,12 @@ object JavaConversions {
}
}
}

implicit def toJavaTransformer[T, R](transformer: Observable[T] => Observable[R]): rx.Observable.Transformer[T, R] = {
new rx.Observable.Transformer[T, R] {
override def call(o: rx.Observable[_ <: T]): rx.Observable[R] = {
transformer(toScalaObservable(o)).asJavaObservable.asInstanceOf[rx.Observable[R]]
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,38 @@ trait Observable[+T]
: Observable[Observable[T]] // SI-7818
}

/**
* Returns an Observable that emits windows of items it collects from the source Observable. The resulting
* Observable starts a new window periodically, as determined by the `timeshift` argument or a maximum
* size as specified by the `count` argument (whichever is reached first). It emits
* each window after a fixed timespan, specified by the `timespan` argument. When the source
* Observable completes or Observable completes or encounters an error, the resulting Observable emits the
* current window and propagates the notification from the source Observable.
*
* <img width="640" height="335" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window7.s.png" alt="">
*
* ===Backpressure Support:===
* This operator does not support backpressure as it uses time to control data flow.
*
* ===Scheduler:===
* you specify which `Scheduler` this operator will use
*
* @param timespan the period of time each window collects items before it should be emitted
* @param timeshift the period of time after which a new window will be created
* @param count the maximum size of each window before it should be emitted
* @param scheduler the `Scheduler` to use when determining the end and start of a window
* @return an Observable that emits new windows periodically as a fixed timespan elapses
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#window">RxJava wiki: window</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.window.aspx">MSDN: Observable.Window</a>
*/
def sliding(timespan: Duration, timeshift: Duration, count: Int, scheduler: Scheduler): Observable[Observable[T]] = {
val span: Long = timespan.length
val shift: Long = timespan.unit.convert(timeshift.length, timeshift.unit)
val unit: TimeUnit = timespan.unit
Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(span, shift, unit, count, scheduler))
: Observable[Observable[T]] // SI-7818
}

/**
* Returns an Observable which only emits those items for which a given predicate holds.
*
Expand Down Expand Up @@ -1577,6 +1609,41 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.cache())
}

/**
* Caches emissions from the source Observable and replays them in order to any subsequent Subscribers.
* This method has similar behavior to [[Observable.replay]] except that this auto-subscribes to the source
* Observable rather than returning a [[ConnectableObservable]] for which you must call
* `connect` to activate the subscription.
* <p>
* <img width="640" height="410" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/cache.png" alt="">
* <p>
* This is useful when you want an Observable to cache responses and you can't control the
* `subscribe/unsubscribe` behavior of all the [[Subscriber]]s.
* <p>
* When you call `cache`, it does not yet subscribe to the source Observable and so does not yet
* begin cacheing items. This only happens when the first Subscriber calls the resulting Observable's
* `subscribe` method.
* <p>
* <em>Note:</em> You sacrifice the ability to unsubscribe from the origin when you use the `cache`
* Observer so be careful not to use this Observer on Observables that emit an infinite or very large number
* of items that will use up memory.
*
* ===Backpressure Support:===
* This operator does not support upstream backpressure as it is purposefully requesting and caching everything emitted.
*
* ===Scheduler:===
* `cache` does not operate by default on a particular `Scheduler`.
*
* @param capacity hint for number of items to cache (for optimizing underlying data structure)
* @return an Observable that, when first subscribed to, caches all of its items and notifications for the
* benefit of subsequent subscribers
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#cache">RxJava wiki: cache</a>
* @since 0.20
*/
def cache(capacity: Int): Observable[T] = {
toScalaObservable[T](asJavaObservable.cache(capacity))
}

/**
* Returns a new [[Observable]] that multicasts (shares) the original [[Observable]]. As long a
* there is more than 1 [[Subscriber]], this [[Observable]] will be subscribed and emitting data.
Expand Down Expand Up @@ -2176,6 +2243,41 @@ trait Observable[+T]
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
}

/**
* Groups the items emitted by an [[Observable]] according to a specified criterion, and emits these
* grouped items as `(key, observable)` pairs.
*
* <img width="640" height="360" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupBy.png" alt="">
*
* Note: A `(key, observable)` will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* `(key, observable)` pairs that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like `take(0)` to them.
*
* ===Backpressure Support:===
* This operator does not support backpressure as splitting a stream effectively turns it into a "hot observable"
* and blocking any one group would block the entire parent stream. If you need backpressure on individual groups
* then you should use operators such as `nBackpressureDrop` or `@link #onBackpressureBuffer`.</dd>
* ===Scheduler:===
* groupBy` does not operate by default on a particular `Scheduler`.
*
* @param keySelector a function that extracts the key for each item
* @param valueSelector a function that extracts the return element for each item
* @tparam K the key type
* @tparam V the value type
* @return an [[Observable]] that emits `(key, observable)` pairs, each of which corresponds to a
* unique key value and each of which emits those items from the source Observable that share that
* key value
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava wiki: groupBy</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.groupby.aspx">MSDN: Observable.GroupBy</a>
*/
def groupBy[K, V](keySelector: T => K, valueSelector: T => V): Observable[(K, Observable[V])] = {
val jo: rx.Observable[rx.observables.GroupedObservable[K, V]] = asJavaObservable.groupBy[K, V](keySelector, valueSelector)
toScalaObservable[rx.observables.GroupedObservable[K, V]](jo).map {
go: rx.observables.GroupedObservable[K, V] => (go.getKey, toScalaObservable[V](go))
}
}

/**
* Groups the items emitted by this Observable according to a specified discriminator function and terminates these groups
* according to a function.
Expand Down Expand Up @@ -4298,6 +4400,75 @@ trait Observable[+T]
def nonEmpty: Observable[Boolean] = {
isEmpty.map(!_)
}

/**
* Transform an Observable by applying a particular Transformer function to it.
*
* This method operates on the Observable itself whereas [[Observable.lift]] operates on the Observable's
* Subscribers or Observers.
*
* If the operator you are creating is designed to act on the individual items emitted by a source
* Observable, use [[Observable.lift]]. If your operator is designed to transform the source Observable as a whole
* (for instance, by applying a particular set of existing RxJava operators to it) use `compose`.
*
* ===Scheduler:===
* `compose` does not operate by default on a particular [[Scheduler]].
*
* @param transformer implements the function that transforms the source Observable
* @return the source Observable, transformed by the transformer function
* @see <a href="https://github.com/Netflix/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
*/
def compose[R](transformer: Observable[T] => Observable[R]): Observable[R] = {
toScalaObservable[R](asJavaObservable.compose(toJavaTransformer(transformer)))
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to buffer these
* items indefinitely until they can be emitted.
*
* <img width="640" height="300" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
*
* ===Scheduler:===
* `onBackpressureBuffer` does not operate by default on a particular `Scheduler`.
*
* @return the source Observable modified to buffer items to the extent system resources allow
* @see <a href="https://github.com/Netflix/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
*/
def onBackpressureBuffer: Observable[T] = {
toScalaObservable[T](asJavaObservable.onBackpressureBuffer)
}

/**
* Use this operator when the upstream does not natively support backpressure and you wish to drop
* `onNext` when unable to handle further events.
*
* <img width="640" height="245" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/bp.obp.drop.png" alt="">
*
* If the downstream request count hits 0 then `onNext` will be dropped until `request(long n)`
* is invoked again to increase the request count.
*
* ===Scheduler:===
* onBackpressureDrop` does not operate by default on a particular `Scheduler`.
*
* @return the source Observable modified to drop `onNext` notifications on overflow
* @see <a href="https://github.com/Netflix/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
*/
def onBackpressureDrop: Observable[T] = {
toScalaObservable[T](asJavaObservable.onBackpressureDrop)
}

/**
* Return a new [[Observable]] by applying a partial function to all elements of this [[Observable]]
* on which the function is defined.
*
* @tparam R the element type of the returned [[Observable]].
* @param pf the partial function which filters and maps the [[Observable]].
* @return a new [[Observable]] by applying a partial function to all elements of this [[Observable]]
* on which the function is defined.
*/
def collect[R](pf: PartialFunction[T, R]): Observable[R] = {
filter(pf.isDefinedAt(_)).map(pf)
}
}

/**
Expand Down Expand Up @@ -4750,6 +4921,7 @@ object Observable {
* @param observableFactory the factory function to obtain an Observable
* @return the Observable whose lifetime controls the lifetime of the dependent resource object
*/
@deprecated("Use `using(=> Resource)(Resource => Observable[T], Resource => Unit)` instead", "0.20.1")
def using[T, Resource <: Subscription](resourceFactory: () => Resource, observableFactory: Resource => Observable[T]): Observable[T] = {
class ResourceSubscription(val resource: Resource) extends rx.Subscription {
def unsubscribe = resource.unsubscribe
Expand All @@ -4763,6 +4935,32 @@ object Observable {
))
}

/**
* Constructs an Observable that creates a dependent resource object.
*
* <img width="640" height="400" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/using.png" alt="" />
*
* ===Scheduler:===
* `using` does not operate by default on a particular `Scheduler`.
*
* @param resourceFactory the factory function to create a resource object that depends on the Observable.
* Note: this is a by-name parameter.
* @param observableFactory the factory function to create an Observable
* @param dispose the function that will dispose of the resource
* @return the Observable whose lifetime controls the lifetime of the dependent resource object
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#using">RxJava wiki: using</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229585.aspx">MSDN: Observable.Using</a>
*/
def using[T, Resource](resourceFactory: => Resource)(observableFactory: Resource => Observable[T], dispose: Resource => Unit): Observable[T] = {
val jResourceFactory = new rx.functions.Func0[Resource] {
override def call: Resource = resourceFactory
}
val jObservableFactory = new rx.functions.Func1[Resource, rx.Observable[_ <: T]] {
override def call(r: Resource) = observableFactory(r).asJavaObservable
}
toScalaObservable[T](rx.Observable.using[T, Resource](jResourceFactory, jObservableFactory, dispose))
}

/**
* Mirror the one Observable in an Iterable of several Observables that first emits an item.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,7 @@ private[scala] class WithFilter[+T] (p: T => Boolean, asJava: rx.Observable[_ <:
toScalaObservable[T](asJava.filter((x: T) => p(x) && q(x)))
}

// there is no foreach here, that's only available on BlockingObservable
def foreach(onNext: T => Unit): Unit = {
toScalaObservable[T](asJava.filter(p)).foreach(onNext)
}
}
Loading

0 comments on commit 1693324

Please sign in to comment.