RxJava does not follow Rx design guidelines for serialized onNext calls #200

Closed
thegeez opened this issue Mar 19, 2013 · 5 comments

@thegeez
Contributor

thegeez commented Mar 19, 2013

Observable.merge is not well-behaved in the sense of section 6.7 of the "Reactive Extensions Design Guidelines" (http://go.microsoft.com/fwlink/?LinkID=205219).

This may also hold for other operators that combine multiple Observables.

Calls to onNext made by an Observable should be synchronized. The following
example shows the desired behavior in Rx.NET:

(The example can be run in LINQPad as C# statements.)

Console.WriteLine("Starting example");
var o1 = Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => "stream ONE" + i);
var o2 = Observable.Interval(TimeSpan.FromSeconds(2)).Select(i => "stream TWO" + i);

Observable.Merge(o1,o2).Subscribe(x => 
  { 
    if (x == "stream ONE0") { 
      Console.WriteLine("slow onNext processing start");
      Thread.Sleep(TimeSpan.FromSeconds(3));
      Console.WriteLine("slow onNext processing done");
    }
    Console.WriteLine("OnNext" + x);
  },
  e => Console.WriteLine("Error" + e),
  () => Console.WriteLine("Completed"));

Output, with a long wait between "processing start" and "processing done".
No other OnNext call is interleaved during that wait:

Starting example
slow onNext processing start
slow onNext processing done
OnNextstream ONE0
OnNextstream TWO0
OnNextstream ONE1
OnNextstream ONE2
OnNextstream ONE3
OnNextstream TWO1
OnNextstream ONE4
OnNextstream ONE5
OnNextstream TWO2
// Etc...

The same example using RxJava (written in Clojure):

(defn interval [secs]
  ;; rough equivalent of Observable.Interval(TimeSpan.FromSeconds(secs))
  (let [s (Subject/create)
        index (AtomicLong. 0)
        t (Thread. (fn []
                     (while true
                       (Thread/sleep (* secs 1000))
                       (.onNext s (.getAndIncrement index)))))]
    (.start t)
    s))

(defn -main [& args]
  (println "Starting example")
  (let [o1 (-> (interval 1)
               (.map (fn [i] (str "stream ONE" i))))
        o2 (-> (interval 2)
               (.map (fn [i] (str "stream TWO" i))))]
    (-> (Observable/merge [o1 o2])
        (.subscribe 
         (fn [x]
           (when (= x "stream ONE0")
             (println "slow onNext processing start")
             (Thread/sleep (* 3 1000))
             (println "slow onNext processing done"))
           (println "onNext" x))
         (fn [e] (println "onError" e))
         (fn [] (println "onCompleted"))))))

"onNext stream TWO0" shows that the onNext calls by a merge Observable are not synchronized

Starting example
slow onNext processing start
onNext stream TWO0
slow onNext processing done
onNext stream ONE0
onNext stream TWO1
onNext stream ONE1
onNext stream ONE2
onNext stream TWO2
onNext stream ONE3
onNext stream ONE4

The code for this example can be found at https://github.com/thegeez/rxjava-merge-behave
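
For readers without a Clojure setup, the interleaving reported here can be reproduced in plain Java without RxJava. This is only a sketch: the class name `InterleavedOnNextSketch` and the use of latches in place of the timers and `Thread/sleep` are assumptions made to keep the run repeatable, not code from the linked repository.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

class InterleavedOnNextSketch {

    // Two threads call the same unsynchronized onNext handler. Latches force the
    // overlap (instead of Thread.sleep) so the event order is repeatable.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch slowStarted = new CountDownLatch(1);
        CountDownLatch fastDelivered = new CountDownLatch(1);

        Consumer<String> onNext = x -> {
            if (x.equals("stream ONE0")) {
                events.add("slow onNext processing start");
                slowStarted.countDown();
                await(fastDelivered); // stands in for the 3 second sleep
                events.add("slow onNext processing done");
            }
            events.add("onNext " + x);
        };

        Thread one = new Thread(() -> onNext.accept("stream ONE0"));
        Thread two = new Thread(() -> {
            await(slowStarted);           // wait until ONE0 is mid-processing
            onNext.accept("stream TWO0"); // nothing stops this call from overlapping
            fastDelivered.countDown();
        });
        one.start();
        two.start();
        join(one);
        join(two);
        return events;
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running `main` prints "slow onNext processing start", then "onNext stream TWO0", then "slow onNext processing done", then "onNext stream ONE0", which is the same shape as the first four lines of the RxJava output in this report.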

@thegeez
Contributor Author

thegeez commented Mar 19, 2013

(defn -main [& args]
  (println "Starting example")
  (let [o1 (-> (interval 1)
               (.map (fn [i] (str "stream ONE" i))))
        o2 (-> (interval 2)
               (.map (fn [i] (str "stream TWO" i))))]
    (-> (Observable/merge [o1 o2])
        (Observable/synchronize)
        (.subscribe 
         (fn [x]
           (when (= x "stream ONE0")
             (println "slow onNext processing start")
             (Thread/sleep (* 3 1000))
             (println "slow onNext processing done"))
           (println "onNext" x))
         (fn [e] (println "onError" e))
         (fn [] (println "onCompleted"))))))

With (Observable/synchronize) added explicitly, the example produces the same output as the Rx.NET code.
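
As a rough illustration of why wrapping the stream helps, here is a minimal plain-Java sketch of a serializing wrapper. It is not RxJava's implementation of synchronize: `SimpleObserver`, `SerializingObserver`, and `maxConcurrentOnNext` are hypothetical names, and a single lock around onNext is assumed to be sufficient for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class SynchronizedObserverSketch {

    // Hypothetical stand-in for an observer; not RxJava's actual interface.
    interface SimpleObserver<T> {
        void onNext(T value);
    }

    // Wraps a downstream observer so that only one onNext call runs at a time.
    static final class SerializingObserver<T> implements SimpleObserver<T> {
        private final SimpleObserver<T> downstream;
        private final Object lock = new Object();

        SerializingObserver(SimpleObserver<T> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void onNext(T value) {
            synchronized (lock) {
                downstream.onNext(value);
            }
        }
    }

    // Starts `producers` threads that each push `perProducer` values through the
    // wrapper, and returns the largest number of onNext calls in flight at once.
    static int maxConcurrentOnNext(int producers, int perProducer) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        SimpleObserver<Integer> slow = v -> {
            maxSeen.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            sleepMillis(1); // give other producers a chance to overlap
            inFlight.decrementAndGet();
        };
        SimpleObserver<Integer> serialized = new SerializingObserver<>(slow);

        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[producers];
        for (int i = 0; i < producers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    start.await(); // release all producers together
                    for (int j = 0; j < perProducer; j++) {
                        serialized.onNext(j);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return maxSeen.get();
    }

    private static void sleepMillis(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("max concurrent onNext calls: " + maxConcurrentOnNext(4, 5));
    }
}
```

Because every call goes through the same lock, the reported maximum is 1 regardless of how many producer threads are used. The cost is that a slow observer blocks all producers, which is the trade-off the slow "stream ONE0" handler makes visible.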

@benjchristensen
Member

Based on your code, it seems the Merge operation has a bug: it does not ensure synchronization when it subscribes to multiple sequences. Good catch.

(btw ... nice Clojure!)


At a higher level (since the title of the issue suggests the entire library doesn't comply), here is some further information:

The same guidelines you refer to (http://go.microsoft.com/fwlink/?LinkID=205219) for creating an Observable are linked to from the Javadocs: http://netflix.github.com/RxJava/javadoc/rx/Observable.html#create(rx.util.functions.Func1)

Section 6.7 specifies that an Observable implementation must serialize its calls to onNext. It does not say that calls will be serialized automatically, since that would be very bad for performance (see section 6.8).

(That said, all operators implemented in RxJava can actually handle interleaved onNext calls. We built them to be thread-safe in that regard from the beginning, even though the contract does not officially require it, since Observables are assumed not to interleave calls.)

An Observable implementation is responsible for serializing its onNext calls and complying with the contract. If it expects interleaving onNext calls and doesn't want to handle them itself, the Synchronize (http://netflix.github.com/RxJava/javadoc/rx/Observable.html#synchronize(rx.Observable)) operation can be used to synchronize it, as you did in your Clojure example.

This contract is what allows guideline 6.8 to apply: operators do not all need to synchronize, because they can assume the Observable complies with the guidelines.

For these reasons automatic synchronization is not added when an Observable is created. Only certain operators (like merge) that subscribe concurrently to multiple sequences need to concern themselves with this issue.
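
To make the merge case concrete, here is a minimal plain-Java sketch of an operator that subscribes to several sources on separate threads and funnels them into one observer. It is not the OperationMerge code and not the fix in the pull request: the `Observer`, `Source`, `merge`, and `counting` names are hypothetical, error handling is omitted, and one thread per source is assumed purely for the demonstration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class MergeSerializationSketch {

    // Hypothetical minimal observer; a real Rx observer also has onError.
    interface Observer<T> {
        void onNext(T value);
        void onCompleted();
    }

    // A source pushes values into an observer and then completes.
    interface Source<T> {
        void subscribe(Observer<T> observer);
    }

    // Subscribes to every source concurrently, but serializes what goes downstream.
    // Assumes at least one source; with none, onCompleted is never sent.
    static <T> Source<T> merge(List<Source<T>> sources) {
        return downstream -> {
            Object lock = new Object();
            AtomicInteger remaining = new AtomicInteger(sources.size());
            Observer<T> inner = new Observer<T>() {
                @Override
                public void onNext(T value) {
                    synchronized (lock) { // one onNext at a time
                        downstream.onNext(value);
                    }
                }

                @Override
                public void onCompleted() {
                    // Complete downstream once, after the last source finishes.
                    if (remaining.decrementAndGet() == 0) {
                        synchronized (lock) {
                            downstream.onCompleted();
                        }
                    }
                }
            };
            List<Thread> threads = new ArrayList<>();
            for (Source<T> source : sources) {
                Thread t = new Thread(() -> source.subscribe(inner));
                threads.add(t);
                t.start();
            }
            for (Thread t : threads) { // block until done, to keep the demo simple
                try {
                    t.join();
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }
        };
    }

    // Emits name0 .. name(n-1), then completes.
    static Source<String> counting(String name, int n) {
        return observer -> {
            for (int i = 0; i < n; i++) {
                observer.onNext(name + i);
            }
            observer.onCompleted();
        };
    }

    static List<String> run() {
        // A plain ArrayList is safe here only because merge serializes the calls.
        List<String> events = new ArrayList<>();
        merge(Arrays.asList(counting("stream ONE", 3), counting("stream TWO", 3)))
            .subscribe(new Observer<String>() {
                @Override
                public void onNext(String value) {
                    events.add(value);
                }

                @Override
                public void onCompleted() {
                    events.add("completed");
                }
            });
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The relative order of "stream ONE" and "stream TWO" items varies between runs, but each source's own order is preserved, no two onNext calls overlap, and "completed" is always last. Individual source implementations stay lock-free; only the operator that introduces concurrency pays for synchronization.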


In the case of the Merge operator it is a simple oversight and easily remedied. It was not caught in a year of production usage, primarily because Netflix uses merge in purely functional environments (no side effects, and order doesn't matter) where we don't care whether onNext calls are interleaved. I'll submit a fix shortly.

Other than the merge operator missing synchronization, are there other places doing the wrong thing?

Thank you for reporting this issue and helping make RxJava better!

@benjchristensen
Member

@thegeez Can you please review the pull request I just submitted with a unit test and fix for the Merge operator?

@thegeez
Contributor Author

thegeez commented Mar 20, 2013

I understand why the synchronization belongs in OperationMerge, where it was missed, and why it should not be standard for every created Observable.

My review of the fix pull request is at #201

@benjchristensen
Member

Fix has been merged.

rickbw pushed a commit to rickbw/RxJava that referenced this issue Jan 9, 2014
fixes ReactiveX#200

This is necessary because by definition Merge is subscribing to multiple sequences in parallel and is supposed to serialize them into a single Observable.
jihoonson pushed a commit to jihoonson/RxJava that referenced this issue Mar 6, 2020
…t_breaker_disabled_forced_open

Feature/159 fixing build