Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue #1451 #1454

Merged
merged 2 commits into from
Jul 17, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ public void request(long n) {
}
o.onNext(it.next());
}
o.onCompleted();
if (!o.isUnsubscribed()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this considered such a problem? This is always possible to have happen from an Observable and nothing should be sensitive to it happening since unsubscribe can be a race condition.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Should I send a PR to remove it, or just keep it?

o.onCompleted();
}
} else if(n > 0) {
// backpressure is requested
long _c = REQUESTED_UPDATER.getAndAdd(this, n);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ public void request(long n) {
}
o.onNext((int) i);
}
o.onCompleted();
if (!o.isUnsubscribed()) {
o.onCompleted();
}
} else if (n > 0) {
// backpressure is requested
long _c = REQUESTED_UPDATER.getAndAdd(this, n);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,31 @@ public OperatorDoOnEach(Observer<? super T> doOnEachObserver) {
@Override
public Subscriber<? super T> call(final Subscriber<? super T> observer) {
return new Subscriber<T>(observer) {

private boolean done = false;

@Override
public void onCompleted() {
if (done) {
return;
}
try {
doOnEachObserver.onCompleted();
} catch (Throwable e) {
onError(e);
return;
}
// Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer
done = true;
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
if (done) {
return;
}
done = true;
try {
doOnEachObserver.onError(e);
} catch (Throwable e2) {
Expand All @@ -57,6 +69,9 @@ public void onError(Throwable e) {

@Override
public void onNext(T value) {
if (done) {
return;
}
try {
doOnEachObserver.onNext(value);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,40 @@ public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {

private int counter = 0;

private boolean done = false;

@Override
public void onNext(T args) {
boolean isSelected;
try {
isSelected = predicate.call(args, counter++);
} catch (Throwable e) {
done = true;
subscriber.onError(e);
unsubscribe();
return;
}
if (isSelected) {
subscriber.onNext(args);
} else {
done = true;
subscriber.onCompleted();
unsubscribe();
}
}

@Override
public void onCompleted() {
subscriber.onCompleted();
if (!done) {
subscriber.onCompleted();
}
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
if (!done) {
subscriber.onError(e);
}
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand All @@ -30,6 +31,9 @@
import rx.functions.Action1;
import rx.functions.Func1;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class OperatorDoOnEachTest {

@Mock
Expand Down Expand Up @@ -114,4 +118,55 @@ public void call(String s) {

}

@Test
public void testIssue1451Case1() {
// https://github.com/Netflix/RxJava/issues/1451
int[] nums = {1, 2, 3};
final AtomicInteger count = new AtomicInteger();
for (final int n : nums) {
Observable
.from(Boolean.TRUE, Boolean.FALSE)
.takeWhile(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean value) {
return value;
}
})
.toList()
.doOnNext(new Action1<List<Boolean>>() {
@Override
public void call(List<Boolean> booleans) {
count.incrementAndGet();
}
})
.subscribe();
}
assertEquals(nums.length, count.get());
}

@Test
public void testIssue1451Case2() {
// https://github.com/Netflix/RxJava/issues/1451
int[] nums = {1, 2, 3};
final AtomicInteger count = new AtomicInteger();
for (final int n : nums) {
Observable
.from(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE)
.takeWhile(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean value) {
return value;
}
})
.toList()
.doOnNext(new Action1<List<Boolean>>() {
@Override
public void call(List<Boolean> booleans) {
count.incrementAndGet();
}
})
.subscribe();
}
assertEquals(nums.length, count.get());
}
}