Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Make observeOn not let worker.dispose() called prematurely #6167

Merged
merged 2 commits into from
Aug 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.plugins.RxJavaPlugins;

public final class CompletableFromCallable extends Completable {

Expand All @@ -37,6 +38,8 @@ protected void subscribeActual(CompletableObserver observer) {
Exceptions.throwIfFatal(e);
if (!d.isDisposed()) {
observer.onError(e);
} else {
RxJavaPlugins.onError(e);
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ final boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) {
if (d) {
if (delayError) {
if (empty) {
cancelled = true;
Throwable e = error;
if (e != null) {
a.onError(e);
Expand All @@ -203,12 +204,14 @@ final boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) {
} else {
Throwable e = error;
if (e != null) {
cancelled = true;
clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
cancelled = true;
a.onComplete();
worker.dispose();
return true;
Expand Down Expand Up @@ -314,6 +317,7 @@ void runSync() {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancelled = true;
upstream.cancel();
a.onError(ex);
worker.dispose();
Expand All @@ -324,6 +328,7 @@ void runSync() {
return;
}
if (v == null) {
cancelled = true;
a.onComplete();
worker.dispose();
return;
Expand All @@ -339,6 +344,7 @@ void runSync() {
}

if (q.isEmpty()) {
cancelled = true;
a.onComplete();
worker.dispose();
return;
Expand Down Expand Up @@ -379,6 +385,7 @@ void runAsync() {
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);

cancelled = true;
upstream.cancel();
q.clear();

Expand Down Expand Up @@ -441,6 +448,7 @@ void runBackfused() {
downstream.onNext(null);

if (d) {
cancelled = true;
Throwable e = error;
if (e != null) {
downstream.onError(e);
Expand Down Expand Up @@ -552,6 +560,7 @@ void runSync() {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancelled = true;
upstream.cancel();
a.onError(ex);
worker.dispose();
Expand All @@ -562,6 +571,7 @@ void runSync() {
return;
}
if (v == null) {
cancelled = true;
a.onComplete();
worker.dispose();
return;
Expand All @@ -577,6 +587,7 @@ void runSync() {
}

if (q.isEmpty()) {
cancelled = true;
a.onComplete();
worker.dispose();
return;
Expand Down Expand Up @@ -617,6 +628,7 @@ void runAsync() {
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);

cancelled = true;
upstream.cancel();
q.clear();

Expand Down Expand Up @@ -680,6 +692,7 @@ void runBackfused() {
downstream.onNext(null);

if (d) {
cancelled = true;
Throwable e = error;
if (e != null) {
downstream.onError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
Throwable error;
volatile boolean done;

volatile boolean cancelled;
volatile boolean disposed;

int sourceMode;

Expand Down Expand Up @@ -141,8 +141,8 @@ public void onComplete() {

@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
if (!disposed) {
disposed = true;
upstream.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
Expand All @@ -153,7 +153,7 @@ public void dispose() {

@Override
public boolean isDisposed() {
return cancelled;
return disposed;
}

void schedule() {
Expand Down Expand Up @@ -181,6 +181,7 @@ void drainNormal() {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
Expand Down Expand Up @@ -211,14 +212,15 @@ void drainFused() {
int missed = 1;

for (;;) {
if (cancelled) {
if (disposed) {
return;
}

boolean d = done;
Throwable ex = error;

if (!delayError && d && ex != null) {
disposed = true;
downstream.onError(error);
worker.dispose();
return;
Expand All @@ -227,6 +229,7 @@ void drainFused() {
downstream.onNext(null);

if (d) {
disposed = true;
ex = error;
if (ex != null) {
downstream.onError(ex);
Expand Down Expand Up @@ -254,14 +257,15 @@ public void run() {
}

boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (cancelled) {
if (disposed) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (delayError) {
if (empty) {
disposed = true;
if (e != null) {
a.onError(e);
} else {
Expand All @@ -272,12 +276,14 @@ boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
}
} else {
if (e != null) {
disposed = true;
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
disposed = true;
a.onComplete();
worker.dispose();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import io.reactivex.annotations.Nullable;
import org.junit.Test;
import org.mockito.InOrder;
import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.annotations.Nullable;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
Expand Down Expand Up @@ -1781,4 +1782,163 @@ public void syncFusedRequestOneByOneConditional() {
.test()
.assertResult(1, 2, 3, 4, 5);
}

public static final class DisposeTrackingScheduler extends Scheduler {

public final AtomicInteger disposedCount = new AtomicInteger();

@Override
public Worker createWorker() {
return new TrackingWorker();
}

final class TrackingWorker extends Scheduler.Worker {

@Override
public void dispose() {
disposedCount.getAndIncrement();
}

@Override
public boolean isDisposed() {
return false;
}

@Override
public Disposable schedule(Runnable run, long delay,
TimeUnit unit) {
run.run();
return Disposables.empty();
}
}
}

@Test
public void workerNotDisposedPrematurelyNormalInNormalOut() {
DisposeTrackingScheduler s = new DisposeTrackingScheduler();

Flowable.concat(
Flowable.just(1).hide().observeOn(s),
Flowable.just(2)
)
.test()
.assertResult(1, 2);

assertEquals(1, s.disposedCount.get());
}

@Test
public void workerNotDisposedPrematurelySyncInNormalOut() {
DisposeTrackingScheduler s = new DisposeTrackingScheduler();

Flowable.concat(
Flowable.just(1).observeOn(s),
Flowable.just(2)
)
.test()
.assertResult(1, 2);

assertEquals(1, s.disposedCount.get());
}

@Test
public void workerNotDisposedPrematurelyAsyncInNormalOut() {
DisposeTrackingScheduler s = new DisposeTrackingScheduler();

UnicastProcessor<Integer> up = UnicastProcessor.create();
up.onNext(1);
up.onComplete();

Flowable.concat(
up.observeOn(s),
Flowable.just(2)
)
.test()
.assertResult(1, 2);

assertEquals(1, s.disposedCount.get());
}

static final class TestSubscriberFusedCanceling
extends TestSubscriber<Integer> {

public TestSubscriberFusedCanceling() {
super();
initialFusionMode = QueueFuseable.ANY;
}

@Override
public void onComplete() {
cancel();
super.onComplete();
}
}

@Test
public void workerNotDisposedPrematurelyNormalInAsyncOut() {
DisposeTrackingScheduler s = new DisposeTrackingScheduler();

TestSubscriber<Integer> ts = new TestSubscriberFusedCanceling();

Flowable.just(1).hide().observeOn(s).subscribe(ts);

assertEquals(1, s.disposedCount.get());
}

@Test
public void workerNotDisposedPrematurelyNormalInNormalOutConditional() {
DisposeTrackingScheduler s = new DisposeTrackingScheduler();

Flowable.concat(
Flowable.just(1).hide().observeOn(s).filter(Functions.alwaysTrue()),
Flowable.just(2)
)
.test()
.assertResult(1, 2);

assertEquals(1, s.disposedCount.get());
}

@Test
public void workerNotDisposedPrematurelySyncInNormalOutConditional() {
DisposeTrackingScheduler s = new DisposeTrackingScheduler();

Flowable.concat(
Flowable.just(1).observeOn(s).filter(Functions.alwaysTrue()),
Flowable.just(2)
)
.test()
.assertResult(1, 2);

assertEquals(1, s.disposedCount.get());
}

@Test
public void workerNotDisposedPrematurelyAsyncInNormalOutConditional() {
DisposeTrackingScheduler s = new DisposeTrackingScheduler();

UnicastProcessor<Integer> up = UnicastProcessor.create();
up.onNext(1);
up.onComplete();

Flowable.concat(
up.observeOn(s).filter(Functions.alwaysTrue()),
Flowable.just(2)
)
.test()
.assertResult(1, 2);

assertEquals(1, s.disposedCount.get());
}

@Test
public void workerNotDisposedPrematurelyNormalInAsyncOutConditional() {
DisposeTrackingScheduler s = new DisposeTrackingScheduler();

TestSubscriber<Integer> ts = new TestSubscriberFusedCanceling();

Flowable.just(1).hide().observeOn(s).filter(Functions.alwaysTrue()).subscribe(ts);

assertEquals(1, s.disposedCount.get());
}
}
Loading