Skip to content

Commit

Permalink
2.x: Fix the error/race in Obs.repeatWhen due to flooding repeat sign…
Browse files Browse the repository at this point in the history
…al (#6359)
  • Loading branch information
akarnokd authored Jan 4, 2019
1 parent 2e0c8b5 commit 090f99e
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ public void onError(Throwable e) {

@Override
public void onComplete() {
active = false;
DisposableHelper.replace(upstream, null);
active = false;
signaller.onNext(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;
Expand Down Expand Up @@ -441,4 +442,56 @@ public boolean test(Object v) throws Exception {

assertEquals(0, counter.get());
}

@Test
public void repeatFloodNoSubscriptionError() {
List<Throwable> errors = TestHelper.trackPluginErrors();

try {
final PublishProcessor<Integer> source = PublishProcessor.create();
final PublishProcessor<Integer> signaller = PublishProcessor.create();

for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {

TestSubscriber<Integer> ts = source.take(1)
.repeatWhen(new Function<Flowable<Object>, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Flowable<Object> v)
throws Exception {
return signaller;
}
}).test();

Runnable r1 = new Runnable() {
@Override
public void run() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
source.onNext(1);
}
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
signaller.offer(1);
}
}
};

TestHelper.race(r1, r2);

ts.dispose();
}

if (!errors.isEmpty()) {
for (Throwable e : errors) {
e.printStackTrace();
}
fail(errors + "");
}
} finally {
RxJavaPlugins.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.*;
Expand Down Expand Up @@ -1221,4 +1222,64 @@ public boolean test(Object v) throws Exception {

assertEquals(0, counter.get());
}

@Test
public void repeatFloodNoSubscriptionError() {
List<Throwable> errors = TestHelper.trackPluginErrors();

final TestException error = new TestException();

try {
final PublishProcessor<Integer> source = PublishProcessor.create();
final PublishProcessor<Integer> signaller = PublishProcessor.create();

for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {

TestSubscriber<Integer> ts = source.take(1)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer v) throws Exception {
throw error;
}
})
.retryWhen(new Function<Flowable<Throwable>, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Flowable<Throwable> v)
throws Exception {
return signaller;
}
}).test();

Runnable r1 = new Runnable() {
@Override
public void run() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
source.onNext(1);
}
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
signaller.offer(1);
}
}
};

TestHelper.race(r1, r2);

ts.dispose();
}

if (!errors.isEmpty()) {
for (Throwable e : errors) {
e.printStackTrace();
}
fail(errors + "");
}
} finally {
RxJavaPlugins.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

Expand Down Expand Up @@ -397,4 +398,56 @@ public boolean test(Object v) throws Exception {

assertEquals(0, counter.get());
}

@Test
public void repeatFloodNoSubscriptionError() {
List<Throwable> errors = TestHelper.trackPluginErrors();

try {
final PublishSubject<Integer> source = PublishSubject.create();
final PublishSubject<Integer> signaller = PublishSubject.create();

for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {

TestObserver<Integer> to = source.take(1)
.repeatWhen(new Function<Observable<Object>, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Observable<Object> v)
throws Exception {
return signaller;
}
}).test();

Runnable r1 = new Runnable() {
@Override
public void run() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
source.onNext(1);
}
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
signaller.onNext(1);
}
}
};

TestHelper.race(r1, r2);

to.dispose();
}

if (!errors.isEmpty()) {
for (Throwable e : errors) {
e.printStackTrace();
}
fail(errors + "");
}
} finally {
RxJavaPlugins.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.observables.GroupedObservable;
import io.reactivex.observers.*;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

Expand Down Expand Up @@ -1131,4 +1132,64 @@ public boolean test(Object v) throws Exception {

assertEquals(0, counter.get());
}

@Test
public void repeatFloodNoSubscriptionError() {
List<Throwable> errors = TestHelper.trackPluginErrors();

final TestException error = new TestException();

try {
final PublishSubject<Integer> source = PublishSubject.create();
final PublishSubject<Integer> signaller = PublishSubject.create();

for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {

TestObserver<Integer> to = source.take(1)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer v) throws Exception {
throw error;
}
})
.retryWhen(new Function<Observable<Throwable>, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Observable<Throwable> v)
throws Exception {
return signaller;
}
}).test();

Runnable r1 = new Runnable() {
@Override
public void run() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
source.onNext(1);
}
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
signaller.onNext(1);
}
}
};

TestHelper.race(r1, r2);

to.dispose();
}

if (!errors.isEmpty()) {
for (Throwable e : errors) {
e.printStackTrace();
}
fail(errors + "");
}
} finally {
RxJavaPlugins.reset();
}
}
}

0 comments on commit 090f99e

Please sign in to comment.