Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Don't dispose the winner of {Single|Maybe|Completable}.amb() #6375

Merged
merged 2 commits into from
Jan 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public void subscribeActual(final CompletableObserver observer) {

final AtomicBoolean once = new AtomicBoolean();

CompletableObserver inner = new Amb(once, set, observer);

for (int i = 0; i < count; i++) {
CompletableSource c = sources[i];
if (set.isDisposed()) {
Expand All @@ -82,7 +80,7 @@ public void subscribeActual(final CompletableObserver observer) {
}

// no need to have separate subscribers because inner is stateless
c.subscribe(inner);
c.subscribe(new Amb(once, set, observer));
}

if (count == 0) {
Expand All @@ -91,9 +89,14 @@ public void subscribeActual(final CompletableObserver observer) {
}

static final class Amb implements CompletableObserver {
private final AtomicBoolean once;
private final CompositeDisposable set;
private final CompletableObserver downstream;

final AtomicBoolean once;

final CompositeDisposable set;

final CompletableObserver downstream;

Disposable upstream;

Amb(AtomicBoolean once, CompositeDisposable set, CompletableObserver observer) {
this.once = once;
Expand All @@ -104,6 +107,7 @@ static final class Amb implements CompletableObserver {
@Override
public void onComplete() {
if (once.compareAndSet(false, true)) {
set.delete(upstream);
set.dispose();
downstream.onComplete();
}
Expand All @@ -112,6 +116,7 @@ public void onComplete() {
@Override
public void onError(Throwable e) {
if (once.compareAndSet(false, true)) {
set.delete(upstream);
set.dispose();
downstream.onError(e);
} else {
Expand All @@ -121,8 +126,8 @@ public void onError(Throwable e) {

@Override
public void onSubscribe(Disposable d) {
upstream = d;
set.add(d);
}

}
}
56 changes: 28 additions & 28 deletions src/main/java/io/reactivex/internal/operators/maybe/MaybeAmb.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,64 +64,63 @@ protected void subscribeActual(MaybeObserver<? super T> observer) {
count = sources.length;
}

AmbMaybeObserver<T> parent = new AmbMaybeObserver<T>(observer);
observer.onSubscribe(parent);
CompositeDisposable set = new CompositeDisposable();
observer.onSubscribe(set);

AtomicBoolean winner = new AtomicBoolean();

for (int i = 0; i < count; i++) {
MaybeSource<? extends T> s = sources[i];
if (parent.isDisposed()) {
if (set.isDisposed()) {
return;
}

if (s == null) {
parent.onError(new NullPointerException("One of the MaybeSources is null"));
set.dispose();
NullPointerException ex = new NullPointerException("One of the MaybeSources is null");
if (winner.compareAndSet(false, true)) {
observer.onError(ex);
} else {
RxJavaPlugins.onError(ex);
}
return;
}

s.subscribe(parent);
s.subscribe(new AmbMaybeObserver<T>(observer, set, winner));
}

if (count == 0) {
observer.onComplete();
}

}

static final class AmbMaybeObserver<T>
extends AtomicBoolean
implements MaybeObserver<T>, Disposable {

private static final long serialVersionUID = -7044685185359438206L;
implements MaybeObserver<T> {

final MaybeObserver<? super T> downstream;

final CompositeDisposable set;
final AtomicBoolean winner;

AmbMaybeObserver(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
this.set = new CompositeDisposable();
}
final CompositeDisposable set;

@Override
public void dispose() {
if (compareAndSet(false, true)) {
set.dispose();
}
}
Disposable upstream;

@Override
public boolean isDisposed() {
return get();
AmbMaybeObserver(MaybeObserver<? super T> downstream, CompositeDisposable set, AtomicBoolean winner) {
this.downstream = downstream;
this.set = set;
this.winner = winner;
}

@Override
public void onSubscribe(Disposable d) {
upstream = d;
set.add(d);
}

@Override
public void onSuccess(T value) {
if (compareAndSet(false, true)) {
if (winner.compareAndSet(false, true)) {
set.delete(upstream);
set.dispose();

downstream.onSuccess(value);
Expand All @@ -130,7 +129,8 @@ public void onSuccess(T value) {

@Override
public void onError(Throwable e) {
if (compareAndSet(false, true)) {
if (winner.compareAndSet(false, true)) {
set.delete(upstream);
set.dispose();

downstream.onError(e);
Expand All @@ -141,12 +141,12 @@ public void onError(Throwable e) {

@Override
public void onComplete() {
if (compareAndSet(false, true)) {
if (winner.compareAndSet(false, true)) {
set.delete(upstream);
set.dispose();

downstream.onComplete();
}
}

}
}
26 changes: 16 additions & 10 deletions src/main/java/io/reactivex/internal/operators/single/SingleAmb.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,61 +59,67 @@ protected void subscribeActual(final SingleObserver<? super T> observer) {
count = sources.length;
}

final AtomicBoolean winner = new AtomicBoolean();
final CompositeDisposable set = new CompositeDisposable();

AmbSingleObserver<T> shared = new AmbSingleObserver<T>(observer, set);
observer.onSubscribe(set);

for (int i = 0; i < count; i++) {
SingleSource<? extends T> s1 = sources[i];
if (shared.get()) {
if (set.isDisposed()) {
return;
}

if (s1 == null) {
set.dispose();
Throwable e = new NullPointerException("One of the sources is null");
if (shared.compareAndSet(false, true)) {
if (winner.compareAndSet(false, true)) {
observer.onError(e);
} else {
RxJavaPlugins.onError(e);
}
return;
}

s1.subscribe(shared);
s1.subscribe(new AmbSingleObserver<T>(observer, set, winner));
}
}

static final class AmbSingleObserver<T> extends AtomicBoolean implements SingleObserver<T> {

private static final long serialVersionUID = -1944085461036028108L;
static final class AmbSingleObserver<T> implements SingleObserver<T> {

final CompositeDisposable set;

final SingleObserver<? super T> downstream;

AmbSingleObserver(SingleObserver<? super T> observer, CompositeDisposable set) {
final AtomicBoolean winner;

Disposable upstream;

AmbSingleObserver(SingleObserver<? super T> observer, CompositeDisposable set, AtomicBoolean winner) {
this.downstream = observer;
this.set = set;
this.winner = winner;
}

@Override
public void onSubscribe(Disposable d) {
this.upstream = d;
set.add(d);
}

@Override
public void onSuccess(T value) {
if (compareAndSet(false, true)) {
if (winner.compareAndSet(false, true)) {
set.delete(upstream);
set.dispose();
downstream.onSuccess(value);
}
}

@Override
public void onError(Throwable e) {
if (compareAndSet(false, true)) {
if (winner.compareAndSet(false, true)) {
set.delete(upstream);
set.dispose();
downstream.onError(e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@
import static org.junit.Assert.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.operators.completable.CompletableAmb.Amb;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.*;

public class CompletableAmbTest {
Expand Down Expand Up @@ -173,6 +177,7 @@ public void ambRace() {
CompositeDisposable cd = new CompositeDisposable();
AtomicBoolean once = new AtomicBoolean();
Amb a = new Amb(once, cd, to);
a.onSubscribe(Disposables.empty());

a.onComplete();
a.onComplete();
Expand Down Expand Up @@ -259,4 +264,54 @@ public void untilCompletableOtherError() {
to.assertFailure(TestException.class);
}

@Test
public void noWinnerErrorDispose() throws Exception {
final TestException ex = new TestException();
for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
final AtomicBoolean interrupted = new AtomicBoolean();
final CountDownLatch cdl = new CountDownLatch(1);

Completable.ambArray(
Completable.error(ex)
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.computation()),
Completable.never()
)
.subscribe(Functions.EMPTY_ACTION, new Consumer<Throwable>() {
@Override
public void accept(Throwable e) throws Exception {
interrupted.set(Thread.currentThread().isInterrupted());
cdl.countDown();
}
});

assertTrue(cdl.await(500, TimeUnit.SECONDS));
assertFalse("Interrupted!", interrupted.get());
}
}

@Test
public void noWinnerCompleteDispose() throws Exception {
for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
final AtomicBoolean interrupted = new AtomicBoolean();
final CountDownLatch cdl = new CountDownLatch(1);

Completable.ambArray(
Completable.complete()
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.computation()),
Completable.never()
)
.subscribe(new Action() {
@Override
public void run() throws Exception {
interrupted.set(Thread.currentThread().isInterrupted());
cdl.countDown();
}
});

assertTrue(cdl.await(500, TimeUnit.SECONDS));
assertFalse("Interrupted!", interrupted.get());
}
}
}
Loading