Skip to content

Commit

Permalink
Merge pull request #3384 from akarnokd/TakeReentrancyFix1x
Browse files Browse the repository at this point in the history
Fix for take() reentrancy bug.
  • Loading branch information
abersnaze committed Sep 30, 2015
2 parents f2410f8 + 53ff799 commit ebe28de
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/main/java/rx/internal/operators/OperatorTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public void onError(Throwable e) {

@Override
public void onNext(T i) {
if (!isUnsubscribed()) {
boolean stop = ++count >= limit;
if (!isUnsubscribed() && count++ < limit) {
boolean stop = count == limit;
child.onNext(i);
if (stop && !completed) {
completed = true;
Expand Down
21 changes: 21 additions & 0 deletions src/test/java/rx/internal/operators/OperatorTakeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import rx.functions.*;
import rx.observers.*;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

public class OperatorTakeTest {

Expand Down Expand Up @@ -417,4 +418,24 @@ public void onNext(Integer t) {
ts.assertError(TestException.class);
ts.assertNotCompleted();
}

@Test
public void testReentrantTake() {
final PublishSubject<Integer> source = PublishSubject.create();

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

source.take(1).doOnNext(new Action1<Integer>() {
@Override
public void call(Integer v) {
source.onNext(2);
}
}).subscribe(ts);

source.onNext(1);

ts.assertValue(1);
ts.assertNoErrors();
ts.assertCompleted();
}
}

0 comments on commit ebe28de

Please sign in to comment.