Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize Observer on OperationMerge #201

Conversation

benjchristensen
Copy link
Member

fix for Merge serialization bug reported in #200

This is necessary because by definition Merge is subscribing to multiple sequences in parallel and is supposed to serialize them into a single Observable.

fixes ReactiveX#200

This is necessary because by definition Merge is subscribing to multiple sequences in parallel and is supposed to serialize them into a single Observable.
@cloudbees-pull-request-builder

RxJava-pull-requests #41 SUCCESS
This pull request looks good

@thegeez
Copy link
Contributor

thegeez commented Mar 20, 2013

The fix looks good.

However, the test case does not catch the original problem.

First, the test case tests an "Observable.create(merge(o1,o2)" (https://github.com/Netflix/RxJava/pull/201/files#L0R409). The create wraps an AtomicObservableSubscription, which defeats the purpose of checking whether OperationMerge is chronologically well behaved itself. This should be "new Observable(merge(o1,o2), false)" (where false = isTrusted) or "Observable.merge(o1,o2)". This occurs in most of the test cases for OperationMerge.

By changing the test case to properly test OperationMerge, the test still didn't fail on the original wrong implementation.

I don't have a test case that shows the original problem, without resorting to threads with fixed sleep delays to orchestrate an interleaving.

@benjchristensen
Copy link
Member Author

The test case in this commit forces concurrent onNext invocations and demonstrates how the changed code no longer allows concurrent onNext calls.

When you say "interleaving" perhaps you're not referring to concurrent execution but instead how merge does not sequentially concatenate sequences? If that's what you mean then merge is the wrong operator and you need the concat operator. Here's the Rx.Net doc for concat: http://msdn.microsoft.com/en-us/library/hh212146(v=vs.103).aspx. This operator is available in RxJava.

Note that concat is described as:

An observable sequence that contains the elements of the first sequence, followed by those of the second the sequence.

whereas merge is:

The observable sequence that merges the elements of the observable sequences.

The merge operator subscribes to all sequences concurrently and emits them into a single serialized sequence. The concat operator subscribes to each sequence once at a time and thus "merges" them sequentially.

As for the comments about AtomicObservableSubscription - that is not the thing that is synchronizing, it is the SynchronizedObserver that wraps the Observer to synchronize onNext calls to ensure they are serialized - i.e. no concurrent execution of onNext.

The isTrusted piece has nothing to do with concurrent execution - the AtomicObservableSubscription is just ensuring that onNext/onCompleted/onError obeys the contract - it does not synchronize anything and thus has nothing to do with concurrent executing, interleaving, etc. Am I missing something that causes you to say "which defeats the purpose of checking whether OperationMerge is chronologically well behaved itself"?

Thus with merge you can have these sequences:

 A1 A2 A3
 B1 B2 B3

If the two are asynchronously and concurrently executing then they will both begin emitting immediately but onNext will not concurrently execute (which was the bug this fixes).

You can then end up with this:

 A1 B1 B2 A2 B3 A3

But no longer end up with this with concurrent onNext calls like this which were possible before this commit:

 A1 B1   B3 A3
        B2      A2

If you're trying to get output like this:

A1 A2 A3 B1 B2 B3

then you want the concat operator instead of merge.

@thegeez
Copy link
Contributor

thegeez commented Mar 20, 2013

You are right, I am confusing AtomicObservableSubscription with SynchronizedObserver, which indeed refer to two completely different things. Sorry for that.

I do not mean the concat operation. With interleaving I meant the interleaving of the calls to onNext and countDown in the test case.

When I revert the change on line https://github.com/Netflix/RxJava/pull/201/files#L0L121 and run the test case, the test case doesn't fail:

mfex@mfex:~/Programming/rx-ben$ git log | head -n4
commit effc08d548518df5a54c916e1b50daadb8bf4228
Author: Ben Christensen <[email protected]>
Date:   Tue Mar 19 16:23:50 2013 -0700

mfex@mfex:~/Programming/rx-ben$ ./gradlew rxjava-core:clean
:rxjava-core:clean

BUILD SUCCESSFUL

Total time: 8.355 secs
mfex@mfex:~/Programming/rx-ben$ ./gradlew rxjava-core:test
:rxjava-core:compileJava
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:rxjava-core:processResources UP-TO-DATE
:rxjava-core:classes
:rxjava-core:compileTestJava
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:rxjava-core:processTestResources UP-TO-DATE
:rxjava-core:testClasses
:rxjava-core:test

BUILD SUCCESSFUL

Total time: 19.126 secs

...edit out the bug fix...

mfex@mfex:~/Programming/rx-ben$ git diff
diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava
index 1e6e6e7..6ad7a51 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java
@@ -132,7 +132,8 @@ public final class OperationMerge {
             /**
              * Subscribe to the parent Observable to get to the children Observ
              */
-            sequences.subscribe(new ParentObserver(synchronizedObserver));
+            //            sequences.subscribe(new ParentObserver(synchronizedOb
+            sequences.subscribe(new ParentObserver(actualObserver));

             /* return our subscription to allow unsubscribing */
             return ourSubscription;
mfex@mfex:~/Programming/rx-ben$ ./gradlew rxjava-core:clean
:rxjava-core:clean

BUILD SUCCESSFUL

Total time: 8.131 secs
mfex@mfex:~/Programming/rx-ben$ ./gradlew rxjava-core:test
:rxjava-core:compileJava
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:rxjava-core:processResources UP-TO-DATE
:rxjava-core:classes
:rxjava-core:compileTestJava
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:rxjava-core:processTestResources UP-TO-DATE
:rxjava-core:testClasses
:rxjava-core:test

BUILD SUCCESSFUL

Total time: 19.567 secs
mfex@mfex:~/Programming/rx-ben$

- return AtomicSubscription not MergeSubscription which I was accidentally still returning
- try/finally in unit test so threads are released even if assertion is thrown
@benjchristensen
Copy link
Member Author

That's rather odd as I see the unit test doing what I expect:

With this code:

sequences.subscribe(new ParentObserver(actualObserver));

I get the failure:

rx.operators.OperationMerge$UnitTest > testSynchronizationOfMultipleSequences FAILED
    java.lang.AssertionError at OperationMerge.java:445

146 tests completed, 1 failed
:rxjava-core:test FAILED

With this code:

sequences.subscribe(new ParentObserver(synchronizedObserver));

the unit test passes.

BUILD SUCCESSFUL

Total time: 49.234 secs

The testSynchronizationOfMultipleSequences unit test is running 2 async Observables that each spawn a thread.

I use a CountDownLatch in the TestASynchronousObservable instances to allow the unit test to wait for each to hit the point where they are calling onNext.

I then use the endLatch inside the onNext to cause the async observables to block inside the onNext call and increment the concurrentCounter AtomicInteger.

If onNext is not synchronized then both threads will block on endLatch and concurrentCounter will be 2. If onNext is synchronized then only one of the threads can enter so concurrentCounter will be 1.

Once I perform the assertion the endLatch is released to allow the callback threads to complete execution of onNext.

Does this unit test correctly test the bug you originally were reporting about concurrent execution of onNext, or is there something else that I am missing?


I just committed a few tweaks to improve the code.

On line 136 it is now:

            sequences.subscribe(new ParentObserver(synchronizedObserver));

If I change this to the following the unit test breaks for me:

            sequences.subscribe(new ParentObserver(actualObserver));

@cloudbees-pull-request-builder

RxJava-pull-requests #43 SUCCESS
This pull request looks good

@thegeez
Copy link
Contributor

thegeez commented Mar 21, 2013

Just to re-iterate: I think the fix is good. This is now about the test case included in the fix.

With this code:

sequences.subscribe(new ParentObserver(actualObserver));

I get the failure:

 rx.operators.OperationMerge$UnitTest > testSynchronizationOfMultipleSequences FAILED
    java.lang.AssertionError at OperationMerge.java:445
146 tests completed, 1 failed
:rxjava-core:test FAILED

I don't always see FAILED, sometimes the test does succeed.

git clone https://github.com/benjchristensen/RxJava rx-200-false-positive
cd rx-200-false-positive
git checkout issue-200-merge-synchronization
git log | head -n7
echo "Remove fix from code"
sed -i 's/SynchronizedObserver<T>\ synchronizedObserver/\/\/test-remove/g' rxjava-core/src/main/java/rx/operators/OperationMerge.java
sed -i 's/sequences.subscribe(new\ ParentObserver(synchronizedObserver))/sequences.subscribe(new ParentObserver(actualObserver))/g' rxjava-core/src/main/java/rx/operators/OperationMerge.java
git diff --exit-code
echo "Running tests"
for i in {1..100}; do ./gradlew cleanTest rxjava-core:test -Dtest.single=**OperationMerge**; done > test_run_results.txt
echo "Saw success out of 100:"
echo `grep -c "BUILD SUCCESSFUL" test_run_results.txt`
echo "Saw fail out of 100: "
echo `grep -c "BUILD FAILED" test_run_results.txt`

Output:

Initialized empty Git repository in /home/mfex/Programming/rx-200-false-positive/rx-200-false-positive/.git/
...
Switched to a new branch 'issue-200-merge-synchronization'
commit fb555df3376301595f6596861662c654d77209d2
Author: Ben Christensen <[email protected]>
Date:   Wed Mar 20 15:12:44 2013 -0700

    Synchronization of Merge operator (fixes)

    - return AtomicSubscription not MergeSubscription which I was accidentally still returning
diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java
index eeb1e96..ded2383 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java
@@ -128,12 +128,12 @@ public final class OperationMerge {
              * Bug report: https://github.com/Netflix/RxJava/issues/200
              */
             AtomicObservableSubscription subscription = new AtomicObservableSubscription(ourSubscription);
-            SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, subscription);
+            //test-remove = new SynchronizedObserver<T>(actualObserver, subscription);

             /**
              * Subscribe to the parent Observable to get to the children Observables
              */
-            sequences.subscribe(new ParentObserver(synchronizedObserver));
+            sequences.subscribe(new ParentObserver(actualObserver));

             /* return our subscription to allow unsubscribing */
             return subscription;
Running tests
...
Saw success out of 100:
98
Saw fail out of 100: 
2

The test succeeding while it shouldn't doesn't happen always. Therefore I think that the test doesn't account for this possible interleaving of the different threads executing:

TestRunner o1 TestASynchronousObservable o2 TestASynchronousObservable
m.subscribe .. - -
- onNextBeingSent.countDown(); -
- observer.onNext("hello"); -
o1.onNextBeingSent.await(); - -
- - onNextBeingSent.countDown();
o2.onNextBeingSent.await(); - -
assertEquals(1, concurrentCounter.get()); - -
- - observer.onNext("hello");
endLatch.countDown(); - -
- observer.onCompleted(); -
- - observer.onCompleted();

@thegeez
Copy link
Contributor

thegeez commented Mar 22, 2013

Saw success out of 100:
98
Saw fail out of 100: 
2

This was on an Ubuntu machine.

On a mac book I needed more samples for this result:

Saw success out of 1000:
3
Saw fail out of 1000: 
997

Something to consider when trying to reproduce the results. I don't have detailed specs of the test machines available at the moment.

@benjchristensen
Copy link
Member Author

Hmm, not sure yet what's wrong the test case but obviously it is not deterministic. Thank you for the detailed testing - I'll explore what I can do.

If you can offer a better unit test that is deterministic (not reliant on Thread.sleep) for this I'll gladly accept it.

@benjchristensen
Copy link
Member Author

I'm holding off committing until I understand the non-determinism and can fix it ... unless you would prefer I just commit the fix and worry about the unit test later.

@thegeez
Copy link
Contributor

thegeez commented Mar 28, 2013

The merge operation is also used to implement mapMany (https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/OperationMap.java#L71), which is even more important to get right. I haven't looked into the impact of the proposed fix of merge on mapMany.

@benjchristensen
Copy link
Member Author

That should work as expected since mapMany is just a composition of merge and map. It is definitely via mapMany that merge gets used most often in my experience.

- not sure of a way other than putting Thread.sleep in here to give time after each CountDownLatch triggers for the process scheduler to execute the next line of each thread

See ReactiveX#201 for more information.
@cloudbees-pull-request-builder

RxJava-pull-requests #70 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member Author

I see in the code how the non-deterministic behavior could occur. I'm not quite sure of a way to solve it other than using Thread.sleep to hopefully give each thread time to complete the onNext call.

I can't use another CountDownLatch for after the call - as we want it in the middle of onNext. I can't use it inside onNext as the whole point of this test is that one of them should be blocked on the synchronized onNext call, thus I have no idea how to know if a thread has actually invoked but is blocked on a method ... other than a thread dump.

I can't replicate the non-deterministic behavior on my machine so @thegeez could you see if the fix I just committed helps on your machine?

I'm going to proceed with the merge instead of waiting as I want to get this fix in since it's passing unit tests (and manual review by both you and me) and if it's still non-deterministic on your machine or elsewhere we can continue to figure out a better way to do this test.

benjchristensen added a commit that referenced this pull request Apr 2, 2013
…ization

Synchronize Observer on OperationMerge
@benjchristensen benjchristensen merged commit 0010178 into ReactiveX:master Apr 2, 2013
@thegeez
Copy link
Contributor

thegeez commented Apr 2, 2013

The fix in benjchristensen@169e7e0 indeed removes the false positives I saw previously for the test case.

@benjchristensen
Copy link
Member Author

Great, thank you for confirming.

rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
- not sure of a way other than putting Thread.sleep in here to give time after each CountDownLatch triggers for the process scheduler to execute the next line of each thread

See ReactiveX#201 for more information.
rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
…-synchronization

Synchronize Observer on OperationMerge
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants