Skip to content
jstrachan edited this page Mar 6, 2013 · 45 revisions

RxJava is an implementation of Reactive Extensions – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

It extends the observer pattern to support sequences of data/events and adds operators for composing sequences together declaratively while abstracting away low-level threading, synchronization, thread-safety, concurrent data structures, non-blocking IO and other such concerns.

It supports Java 5 or higher and JVM based languages such as Groovy, Clojure, Scala and JRuby.

Why?

Futures are Expensive to Compose

Java Futures are straight-forward to use for a single level of asynchronous execution but they start to add non-trivial complexity when they're nested.

Conditional asynchronous execution flows become difficult to optimally compose (or impossible as latencies of each request vary at runtime) using Futures. It can be done of course, but it quickly becomes complicated (and thus error prone) or prematurely blocks on 'future.get()' - eliminating the benefit of asynchronous execution.

Callbacks Have Their Own Problems

Callbacks offer a solution to the tendency to block on Future.get() by not allowing anything to block. They are naturally efficient because they execute when the response is ready.

Similar to Futures though, they are easy to use with a single level of asynchronous execution but become unwieldy with nested composition.

Functional Reactive Programming (FRP)

Rx offers efficient execution and composition by providing a collection of operators capable of filtering, selecting, transforming, combining and composing Observable's.

The Observable data type can be thought of as a "push" equivalent to Iterable which is "pull". With an Iterable, the consumer pulls values from the producer and the thread blocks until those values arrive. By contrast with the observable type, the producer pushes values to the consumer whenever values are available. This approach is more flexible, because values can arrive synchronously or asynchronously.

The Observable type adds two missing semantics to the Gang of Four's Observer pattern, which are available in the Iterable type:

  1. The ability for the producer to signal to the consumer that there is no more data available.
  2. The ability for the producer to signal to the consumer that an error has occurred.

With these two simple additions, we have unified the Iterable and Observable types. The only difference between them is the direction in which the data flows. This is very important because now any operation we perform on an Iterable, can also be performed on an Observable. Let's take a look at an example…

/**
 * Asynchronously calls 'customObservableNonBlocking' and defines 
 * a chain of operators to apply to the callback sequence.
 */
def simpleComposition() {
  // fetch an asynchronous Observable<String> 
  // that emits 75 Strings of 'anotherValue_#'
  customObservableNonBlocking()
    // skip the first 10
    .skip(10)
    // take the next 5
    .take(5)
    // transform each String with the provided function
    .map({ stringValue -> return stringValue + "_transformed"})
    // subscribe to the sequence and print each transformed String
    .subscribe({ println "onNext => " + it})
}
 
// output
onNext => anotherValue_10_transformed
onNext => anotherValue_11_transformed
onNext => anotherValue_12_transformed
onNext => anotherValue_13_transformed
onNext => anotherValue_14_transformed

More Information

RxJava Libraries

The following external libraries can work with RxJava:

sidebar

Clone this wiki locally