Occasionally hanging sequence of takeUntil()+window()+flatMap() #1677

Closed
spodila opened this issue Sep 8, 2014 · 11 comments

spodila (Contributor) commented Sep 8, 2014

This test case brings out two problems that seem to be caused by concurrency bugs. They don't happen every time, but they do show up if you run the test a handful of times.

  1. The Observable sequence hangs and eventually times out: Assert.fail() is called for innerLatch.
  2. We get one more onNext() than expected: the final assertEquals() fails with expected=250 and actual=251.

Problem 1 reproduces easily if you repeat the test, say, 10 times. Problem 2 is rare, although I am not completely sure my expectation for problem 2 is correct.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.Assert;
import org.junit.Test;

import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

public class ParallelBatchTest {

    @Test
    public void testParallelBatch() throws Exception {
        final AtomicLong counter = new AtomicLong();
        final Integer[] numbers = new Integer[5000];
        for (int i = 0; i < numbers.length; i++)
            numbers[i] = i + 1;
        final int NITERS = 250;
        // Counted down once per iteration, when toList() emits its list.
        final CountDownLatch latch = new CountDownLatch(NITERS);
        for (int iters = 0; iters < NITERS; iters++) {
            final CountDownLatch innerLatch = new CountDownLatch(1);
            final PublishSubject<Object> s = PublishSubject.create();
            Observable.from(numbers)
                    .takeUntil(s)
                    .window(50)
                    .flatMap(new Func1<Observable<Integer>, Observable<Integer>>() {
                        @Override
                        public Observable<Integer> call(Observable<Integer> integerObservable) {
                            return integerObservable
                                    .subscribeOn(Schedulers.computation())
                                    .map(new Func1<Integer, Integer>() {
                                        @Override
                                        public Integer call(Integer integer) {
                                            if (integer >= 5) {
                                                // Runs on several computation threads at once,
                                                // so these onCompleted() calls are not serialized.
                                                s.onCompleted();
                                            }
                                            // do some work
                                            Math.pow(Math.random(), Math.random());
                                            return integer * 2;
                                        }
                                    });
                        }
                    })
                    .toList()
                    .doOnNext(new Action1<List<Integer>>() {
                        @Override
                        public void call(List<Integer> integers) {
                            counter.incrementAndGet();
                            latch.countDown();
                            innerLatch.countDown();
                        }
                    })
                    .subscribe();
            if (!innerLatch.await(10, TimeUnit.SECONDS))
                Assert.fail("Failed inner latch wait, iteration " + iters);
        }
        if (!latch.await(15, TimeUnit.SECONDS))
            // getCount() is the number of iterations that have NOT completed.
            Assert.fail("Incomplete! " + latch.getCount() + " of " + NITERS + " iterations did not finish");
        else
            Assert.assertEquals(NITERS, counter.get());
    }
}
```
spodila (Contributor, Author) commented Sep 9, 2014

Replacing subscribeOn() with observeOn() shows the same hang.

zsxwing (Member) commented Sep 11, 2014

Which version are you using? Could it be caused by #1656?

spodila (Contributor, Author) commented Sep 11, 2014

I'm on rxjava-0.20.4.

Which release is #1656 in?

zsxwing (Member) commented Sep 12, 2014

Found a concurrency issue in ReplaySubject.BufferUntilSubscriber: buffered.buffer may receive messages after buffered.buffer has been drained but before state.setObserverRef is called. If no further messages arrive after state.setObserverRef, the messages left in buffered.buffer are swallowed. Here is a commit that fixes it using a lock: zsxwing@158f531
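The race described here is a check-then-act problem: draining the buffer and switching to direct delivery are two separate steps, so a message that arrives in between is stranded. The following is a stdlib-only sketch of the lock-based idea, not RxJava's BufferUntilSubscriber; the class names `LockedBuffer` and `LockedBufferDemo` are hypothetical. Buffering, draining, and installing the consumer all happen under one lock, so nothing can land in the buffer after the drain but before the hand-off.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a "buffer until subscriber" helper (not RxJava's class).
// Buffering, draining and installing the consumer all happen under one lock, so an
// item can never land in the buffer after the drain but before the hand-off.
final class LockedBuffer<T> {
    private final Object lock = new Object();
    private final List<T> buffer = new ArrayList<>();
    private Consumer<? super T> consumer; // guarded by lock

    void emit(T value) {
        synchronized (lock) {
            if (consumer == null) {
                buffer.add(value);      // no consumer yet: park the item
            } else {
                consumer.accept(value); // consumer installed: deliver directly
            }
        }
    }

    void setConsumer(Consumer<? super T> c) {
        synchronized (lock) {
            for (T v : buffer) {
                c.accept(v);            // replay everything buffered so far, in order
            }
            buffer.clear();
            consumer = c;               // same critical section as the drain
        }
    }
}

final class LockedBufferDemo {
    // A background thread emits 0..total-1 while the calling thread installs the
    // consumer concurrently; returns what the consumer received.
    static List<Integer> run(int total) {
        LockedBuffer<Integer> buf = new LockedBuffer<>();
        List<Integer> received = new ArrayList<>(); // only mutated while buf holds its lock
        Thread producer = new Thread(() -> {
            for (int i = 0; i < total; i++) buf.emit(i);
        });
        producer.start();
        buf.setConsumer(received::add);
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }
}
```

Holding the lock while calling the consumer keeps delivery serialized and ordered, at the cost of blocking emitters while a slow consumer runs; that blocking is what lock-free designs try to avoid.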

Still investigating a lockless fix.
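For comparison with the lock-based fix, one well-known lock-free technique is the queue-drain pattern: producers enqueue into a concurrent queue, and whichever thread moves a work-in-progress counter from zero becomes the only drainer, looping until no request was missed. This stdlib-only sketch uses hypothetical names (`DrainingBuffer`, `wip`) and illustrates the general technique, not whatever fix RxJava eventually adopted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical lock-free "buffer until consumer" using the queue-drain pattern.
final class DrainingBuffer<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger(); // outstanding drain requests
    private volatile Consumer<? super T> consumer;

    void emit(T value) {
        queue.offer(value); // never blocks; the item waits until a drain pass
        drain();
    }

    void setConsumer(Consumer<? super T> c) {
        consumer = c;
        drain();            // flush whatever was queued before the consumer arrived
    }

    private void drain() {
        if (wip.getAndIncrement() != 0) {
            return;         // another thread is draining and will pick up this request
        }
        int missed = 1;
        for (;;) {
            Consumer<? super T> c = consumer;
            if (c != null) {
                T v;
                while ((v = queue.poll()) != null) {
                    c.accept(v); // only one thread at a time runs this loop
                }
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;     // no emit/setConsumer raced with this pass
            }
        }
    }
}

final class DrainingBufferDemo {
    // Several producers emit concurrently while the consumer is installed;
    // returns how many items were delivered.
    static int run(int producers, int perProducer) {
        DrainingBuffer<Integer> buf = new DrainingBuffer<>();
        List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) buf.emit(i);
            });
            threads.add(t);
            t.start();
        }
        buf.setConsumer(received::add);
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return received.size();
    }
}
```

A late emit or setConsumer that finds the counter non-zero bumps it instead of draining, which forces the active drainer to run one more pass, so no item is left behind.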

I also found that takeUntil is still not fixed; I fixed it in #1686.

zsxwing (Member) commented Sep 13, 2014

Fixed in #1686

zsxwing (Member) commented Sep 14, 2014

BTW, you should not call PublishSubject.onCompleted from different threads concurrently.

spodila (Contributor, Author) commented Sep 15, 2014

Is that an RxJava requirement? I do not see any such constraints mentioned in the JavaDocs for the method.

zsxwing (Member) commented Sep 15, 2014

> Is that an RxJava requirement? I do not see any such constraints mentioned in the JavaDocs for the method.

Right. When you call PublishSubject.onCompleted, the PublishSubject is acting as an Observer. Per the Rx Design Guidelines (http://go.microsoft.com/fwlink/?LinkID=205219):

> 4.2. Assume observer instances are called in a serialized fashion

So PublishSubject expects its events to be serialized.
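To make guideline 4.2 concrete: an observer may assume that no two of its calls overlap, so code that calls a subject from several threads (as the map() function in the test does) has to restore that guarantee itself. This stdlib-only sketch uses hypothetical names (`SimpleObserver`, `LockedObserver`, `OverlapDetector`) and a plain monitor; it is not rx.Observer or RxJava's SerializedObserver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal observer contract (not rx.Observer).
interface SimpleObserver<T> {
    void onNext(T value);
    void onCompleted();
}

// Hypothetical wrapper: every call takes the same monitor, so the wrapped
// observer never sees two calls at once.
final class LockedObserver<T> implements SimpleObserver<T> {
    private final SimpleObserver<T> actual;

    LockedObserver(SimpleObserver<T> actual) {
        this.actual = actual;
    }

    @Override
    public synchronized void onNext(T value) {
        actual.onNext(value);
    }

    @Override
    public synchronized void onCompleted() {
        actual.onCompleted();
    }
}

// Downstream that assumes serialized calls and records whether two ever overlapped.
final class OverlapDetector implements SimpleObserver<Integer> {
    private final AtomicInteger active = new AtomicInteger();
    volatile boolean overlapped;
    int count;          // plain field: only safe because calls are serialized
    boolean completed;

    @Override
    public void onNext(Integer value) {
        if (active.incrementAndGet() > 1) overlapped = true;
        count++;
        Thread.yield(); // widen the window in which an overlap would show up
        active.decrementAndGet();
    }

    @Override
    public void onCompleted() {
        completed = true;
    }
}

final class LockedObserverDemo {
    // Calls onNext from `threads` threads at once through the wrapper,
    // then completes once after all of them have finished.
    static OverlapDetector run(int threads, int perThread) {
        OverlapDetector downstream = new OverlapDetector();
        SimpleObserver<Integer> serialized = new LockedObserver<>(downstream);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                for (int i = 0; i < perThread; i++) serialized.onNext(i);
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        serialized.onCompleted();
        return downstream;
    }
}
```

Calling `downstream` directly instead of through the wrapper would let the `active` counter exceed 1 and make the plain `count++` lose updates.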

spodila (Contributor, Author) commented Sep 15, 2014

Based on this, for correctness, I changed the test case to synchronize on 's' around "s.onCompleted()". I can confirm that problem 2 still occurs after that change. However, I can't be entirely sure this is a valid test case. I expect counter to equal NITERS: possibly less, but never greater. When the test fails, counter is 1 greater than NITERS. If you have any insight into this part of the test case, let me know.

On the other hand, I can no longer reproduce problem 1 (the hang), with or without the synchronized block, and I can't tell why. So this test case effectively no longer reproduces the problem this issue was opened for.

zsxwing (Member) commented Sep 16, 2014

> When it fails, it shows counter to be 1 greater than the expected, NITERS.

The takeUntil bug I fixed in #1686 may emit extra onCompleted events, which would make toList emit extra onNext events. Sorry, I cannot confirm this because I cannot reproduce it on my machine.
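The mechanism suggested here can be shown in isolation: a toList()-style stage emits its list when it sees onCompleted, so an upstream that delivers onCompleted twice makes it emit twice, which is consistent with the 251-vs-250 count reported in the issue. This stdlib-only sketch uses hypothetical names (`ListCollector`, `GuardedListCollector`) and is not the code changed in #1686; the guarded variant uses an AtomicBoolean so only the first terminal event is acted on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical toList()-like stage: buffers items and emits the whole list on onCompleted.
class ListCollector<T> {
    private final List<T> items = new ArrayList<>();
    private final Consumer<List<T>> downstream;

    ListCollector(Consumer<List<T>> downstream) {
        this.downstream = downstream;
    }

    void onNext(T value) {
        items.add(value);
    }

    // Naive: every onCompleted that arrives produces another list emission.
    void onCompleted() {
        downstream.accept(new ArrayList<>(items));
    }
}

// Guarded: only the first terminal event is acted on; duplicates are ignored.
final class GuardedListCollector<T> extends ListCollector<T> {
    private final AtomicBoolean terminated = new AtomicBoolean();

    GuardedListCollector(Consumer<List<T>> downstream) {
        super(downstream);
    }

    @Override
    void onCompleted() {
        if (terminated.compareAndSet(false, true)) {
            super.onCompleted();
        }
    }
}

final class DuplicateCompletionDemo {
    // Feeds three items followed by two onCompleted calls, as a buggy upstream might,
    // and returns how many lists reached the downstream.
    static int emissions(boolean guarded) {
        List<List<Integer>> lists = new ArrayList<>();
        Consumer<List<Integer>> sink = lists::add;
        ListCollector<Integer> collector = guarded
                ? new GuardedListCollector<Integer>(sink)
                : new ListCollector<Integer>(sink);
        collector.onNext(1);
        collector.onNext(2);
        collector.onNext(3);
        collector.onCompleted();
        collector.onCompleted(); // duplicate terminal event
        return lists.size();
    }
}
```

Here `emissions(false)` returns 2 and `emissions(true)` returns 1: one duplicate onCompleted upstream turns into exactly one extra onNext downstream unless the terminal event is guarded.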

benjchristensen (Member)

I am unable to replicate this issue on 1.0.0-rc.5 while running the above code in a tight loop for several minutes.

Also, Sharma, the easy and idiomatic way to serialize access to the Subject is like this:

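```java
import rx.Observer;
import rx.observers.SerializedObserver;
import rx.subjects.PublishSubject;

final PublishSubject<Object> s = PublishSubject.create();
// Emit through _s (e.g. _s.onCompleted()) from the worker threads, not through s directly.
final Observer<Object> _s = new SerializedObserver<Object>(s);
```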

I think this is common enough that I'm going to add a SerializedSubject wrapper.
