1.x Take operator can emit more values than its limit #3346

Closed
mgp opened this issue Sep 15, 2015 · 5 comments

@mgp

mgp commented Sep 15, 2015

This can happen if a doOnNext action downstream of take causes the upstream observable to emit another value synchronously, while the first value is still being delivered. Here's a test that demonstrates this behavior:

@Test
public void testEmittingMoreValuesThanLimit() {
    final AtomicBoolean emittedFirstValue = new AtomicBoolean(false);
    final PublishSubject<Integer> subject = PublishSubject.create();
    final int firstValue = 1;
    final int secondValue = 2;

    // Record emitted values with this action.
    Action1<Integer> record = mock(Action1.class);
    InOrder inOrder = inOrder(record);

    subject
            .take(1)
            .doOnNext(record)
            .doOnNext(new Action1<Integer>() {
                @Override
                public void call(final Integer integer) {
                    // As the first value passes through, emit the second value.
                    if (!emittedFirstValue.getAndSet(true)) {
                        subject.onNext(secondValue);
                    }
                }
            })
            .subscribe();

    inOrder.verifyNoMoreInteractions();

    // Will record both values even though take(1) is upstream.
    subject.onNext(firstValue);
    subject.onCompleted();
    inOrder.verify(record).call(firstValue);
    inOrder.verify(record).call(secondValue);
    inOrder.verifyNoMoreInteractions();
}

I first noticed this in a project that uses RxJava 1.1.10. The onNext implementation of OperatorTake has changed a bit in master. Before I submit a PR that fixes this behavior, I wanted to confirm that it isn't deliberate. Thanks!
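
For readers without an RxJava test setup at hand, the reentrancy can be sketched in plain Java. This is not RxJava code: the class `UnguardedTake`, its `done` flag (a stand-in for unsubscription) and the `demo` helper are hypothetical, and the `onNext` body is only assumed to resemble the operator (count the value, forward it, then mark itself finished).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a take(n) subscriber; not RxJava code. */
final class UnguardedTake {
    private final int limit;
    private final Consumer<Integer> child;
    private int count;
    private boolean done; // stand-in for isUnsubscribed()

    UnguardedTake(int limit, Consumer<Integer> child) {
        this.limit = limit;
        this.child = child;
    }

    void onNext(int value) {
        if (!done) {
            boolean stop = ++count >= limit;
            // The child may call onNext again before this call returns.
            child.accept(value);
            if (stop) {
                done = true; // set too late to block a reentrant call
            }
        }
    }

    /** Emits 1; the downstream callback emits 2 while 1 is still in flight. */
    static List<Integer> demo() {
        List<Integer> seen = new ArrayList<>();
        UnguardedTake[] self = new UnguardedTake[1];
        self[0] = new UnguardedTake(1, v -> {
            seen.add(v);
            if (v == 1) {
                self[0].onNext(2);
            }
        });
        self[0].onNext(1);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2] despite the limit of 1
    }
}
```

The second value slips through because the limit is only enforced after the child callback returns.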

@akarnokd
Member

Hi and thanks for discovering this. The behavior is not intentional. We are generally not prepared for synchronous reentry such as this. Solving it is a bit tricky because delicate behavior has to be preserved right at the limit. Here is how I'd fix this:

  • I'd introduce a boolean stop variable on the parent Subscriber
  • I'd change the onNext to have the reentrancy protection:
@Override
public void onNext(T i) {
    if (!isUnsubscribed()) {
        boolean stop = ++count >= limit;
        if (stop) {
            if (this.stop) {
                return;
            }
            this.stop = true;
        }
        child.onNext(i);
        if (stop && !completed) {
            completed = true;
            try {
                child.onCompleted();
            } finally {
                unsubscribe();
            }
        }
    }
}

This will prevent a recursive onNext call from being delivered after the limit.
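
A minimal plain-Java sketch of this stop-flag guard, for experimenting outside RxJava. The class `StopFlagTake`, the `unsubscribed` field (a stand-in for isUnsubscribed() and unsubscribe()) and the `demo` helper are hypothetical; only the body of `onNext` follows the snippet above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a take(n) subscriber; not RxJava code. */
final class StopFlagTake {
    private final int limit;
    private final Consumer<Integer> childOnNext;
    private final Runnable childOnCompleted;
    private int count;
    private boolean stop;         // the proposed field: true once the limit was hit
    private boolean completed;
    private boolean unsubscribed; // stand-in for isUnsubscribed()/unsubscribe()

    StopFlagTake(int limit, Consumer<Integer> childOnNext, Runnable childOnCompleted) {
        this.limit = limit;
        this.childOnNext = childOnNext;
        this.childOnCompleted = childOnCompleted;
    }

    void onNext(int value) {
        if (!unsubscribed) {
            boolean stop = ++count >= limit;
            if (stop) {
                if (this.stop) {
                    return; // a reentrant call past the limit is dropped here
                }
                this.stop = true;
            }
            childOnNext.accept(value);
            if (stop && !completed) {
                completed = true;
                try {
                    childOnCompleted.run();
                } finally {
                    unsubscribed = true;
                }
            }
        }
    }

    /** Emits 1 with a limit of 1; the child tries to emit 2 reentrantly. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        StopFlagTake[] self = new StopFlagTake[1];
        self[0] = new StopFlagTake(1,
                v -> {
                    events.add("onNext(" + v + ")");
                    if (v == 1) {
                        self[0].onNext(2);
                    }
                },
                () -> events.add("onCompleted"));
        self[0].onNext(1);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [onNext(1), onCompleted]
    }
}
```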

akarnokd added the Bug label Sep 15, 2015
@mgp
Author

mgp commented Sep 15, 2015

Thanks for the response! I think alternatively you could do:

if (!isUnsubscribed() && (count++ < limit)) {
    boolean stop = count >= limit;
    child.onNext(i);
    if (stop && !completed) {
        completed = true;
        try {
            child.onCompleted();
        } finally {
            unsubscribe();
        }
    }
}

This only changes the outer conditional and the computation of stop. Let me know if that sounds good and I'll prepare a PR.
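
To check that this variant also holds when the last permitted value itself arrives reentrantly, here is a rough plain-Java sketch. The class `CountGuardTake`, the `unsubscribed` field and the `demo` helper are hypothetical stand-ins rather than RxJava code; only the `onNext` body follows the snippet above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a take(n) subscriber; not RxJava code. */
final class CountGuardTake {
    private final int limit;
    private final Consumer<Integer> childOnNext;
    private final Runnable childOnCompleted;
    private int count;
    private boolean completed;
    private boolean unsubscribed; // stand-in for isUnsubscribed()/unsubscribe()

    CountGuardTake(int limit, Consumer<Integer> childOnNext, Runnable childOnCompleted) {
        this.limit = limit;
        this.childOnNext = childOnNext;
        this.childOnCompleted = childOnCompleted;
    }

    void onNext(int value) {
        // The count check happens before delivery, so reentrant calls see it.
        if (!unsubscribed && (count++ < limit)) {
            boolean stop = count >= limit;
            childOnNext.accept(value);
            if (stop && !completed) {
                completed = true;
                try {
                    childOnCompleted.run();
                } finally {
                    unsubscribed = true;
                }
            }
        }
    }

    /** Limit of 2; every delivered value makes the child emit value + 1. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        CountGuardTake[] self = new CountGuardTake[1];
        self[0] = new CountGuardTake(2,
                v -> {
                    events.add("onNext(" + v + ")");
                    self[0].onNext(v + 1);
                },
                () -> events.add("onCompleted"));
        self[0].onNext(1);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [onNext(1), onNext(2), onCompleted]
    }
}
```

In this run the value 3 is rejected by the outer conditional, and completion is signalled once, from the nested call that delivered 2.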

@akarnokd
Member

Looks even better, go ahead with the PR.

@akarnokd
Member

@mgp Are you going to post a PR against 1.x?

akarnokd added a commit to akarnokd/RxJava that referenced this issue Sep 25, 2015
Discovered by @mgp in ReactiveX#3346 and using his supplied fix. I've already
applied it to NbpObservable's take this Monday so all that's left was
the unit test.
@akarnokd
Member

akarnokd commented Oct 1, 2015

Closing via #3384.

akarnokd closed this as completed Oct 1, 2015