From 382d6dfa95e9f55c48d99883f9db460e9c16da18 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Sat, 28 Feb 2015 23:03:22 +1100 Subject: [PATCH 1/4] OperatorMulticast.connect(connection) should return first subscription on multiple calls, address possible race condition provoking IAE --- .../internal/operators/OperatorMulticast.java | 54 ++++++++++--------- ...stTest.java => OperatorMulticastTest.java} | 12 +++-- 2 files changed, 38 insertions(+), 28 deletions(-) rename src/test/java/rx/internal/operators/{OnSubscribeMulticastTest.java => OperatorMulticastTest.java} (93%) diff --git a/src/main/java/rx/internal/operators/OperatorMulticast.java b/src/main/java/rx/internal/operators/OperatorMulticast.java index e294bfa8f4..8a15dd30ef 100644 --- a/src/main/java/rx/internal/operators/OperatorMulticast.java +++ b/src/main/java/rx/internal/operators/OperatorMulticast.java @@ -38,14 +38,16 @@ * the result value type */ public final class OperatorMulticast extends ConnectableObservable { - final Observable source; - final Object guard; - final Func0> subjectFactory; + private final Observable source; + private final Object guard; + private final Func0> subjectFactory; private final AtomicReference> connectedSubject; private final List> waitingForConnect; /** Guarded by guard. */ - Subscriber subscription; + private Subscriber subscription; + // wraps subscription above with for unsubscription using guard + private Subscription guardedSubscription; public OperatorMulticast(Observable source, final Func0> subjectFactory) { this(new Object(), new AtomicReference>(), new ArrayList>(), source, subjectFactory); @@ -82,7 +84,8 @@ public void connect(Action1 connection) { // subscription is the state of whether we are connected or not synchronized (guard) { if (subscription != null) { - // already connected, return as there is nothing to do + // already connected + connection.call(guardedSubscription); return; } else { shouldSubscribe = true; @@ -106,6 +109,21 @@ public void onNext(T args) { subject.onNext(args); } }; + guardedSubscription = Subscriptions.create(new Action0() { + @Override + public void call() { + Subscription s; + synchronized (guard) { + s = subscription; + subscription = null; + guardedSubscription = null; + connectedSubject.set(null); + } + if (s != null) { + s.unsubscribe(); + } + } + }); // register any subscribers that are waiting with this new subject for(Subscriber s : waitingForConnect) { @@ -116,34 +134,22 @@ public void onNext(T args) { // record the Subject so OnSubscribe can see it connectedSubject.set(subject); } + } // in the lock above we determined we should subscribe, do it now outside the lock if (shouldSubscribe) { // register a subscription that will shut this down - connection.call(Subscriptions.create(new Action0() { - @Override - public void call() { - Subscription s; - synchronized (guard) { - s = subscription; - subscription = null; - connectedSubject.set(null); - } - if (s != null) { - s.unsubscribe(); - } - } - })); + connection.call(guardedSubscription); // now that everything is hooked up let's subscribe // as long as the subscription is not null (which can happen if already unsubscribed) - boolean subscriptionIsNull; - synchronized(guard) { - subscriptionIsNull = subscription == null; + Subscriber sub; + synchronized (guard) { + sub = subscription; } - if (!subscriptionIsNull) - source.subscribe(subscription); + if (sub != null) + source.subscribe(sub); } } } \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OnSubscribeMulticastTest.java b/src/test/java/rx/internal/operators/OperatorMulticastTest.java similarity index 93% rename from src/test/java/rx/internal/operators/OnSubscribeMulticastTest.java rename to src/test/java/rx/internal/operators/OperatorMulticastTest.java index f5c506c8b2..5b3c57e4f6 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeMulticastTest.java +++ b/src/test/java/rx/internal/operators/OperatorMulticastTest.java @@ -15,11 +15,13 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import org.junit.Assert; import org.junit.Test; import rx.Observer; @@ -29,7 +31,7 @@ import rx.subjects.PublishSubject; import rx.subjects.Subject; -public class OnSubscribeMulticastTest { +public class OperatorMulticastTest { @Test public void testMulticast() { @@ -70,15 +72,17 @@ public void testMulticastConnectTwice() { source.onNext("one"); - multicasted.connect(); - multicasted.connect(); - + Subscription sub = multicasted.connect(); + Subscription sub2 = multicasted.connect(); + source.onNext("two"); source.onCompleted(); verify(observer, never()).onNext("one"); verify(observer, times(1)).onNext("two"); verify(observer, times(1)).onCompleted(); + + assertEquals(sub, sub2); } From 3b6707a41c88993ef3913987501dc79319fb171a Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Sat, 28 Feb 2015 23:05:21 +1100 Subject: [PATCH 2/4] fix typo in comment --- src/main/java/rx/internal/operators/OperatorMulticast.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OperatorMulticast.java b/src/main/java/rx/internal/operators/OperatorMulticast.java index 8a15dd30ef..d6e379f4ce 100644 --- a/src/main/java/rx/internal/operators/OperatorMulticast.java +++ b/src/main/java/rx/internal/operators/OperatorMulticast.java @@ -46,7 +46,7 @@ public final class OperatorMulticast extends ConnectableObservable { /** Guarded by guard. */ private Subscriber subscription; - // wraps subscription above with for unsubscription using guard + // wraps subscription above for unsubscription using guard private Subscription guardedSubscription; public OperatorMulticast(Observable source, final Func0> subjectFactory) { From a85a11eb872d04cfe7eb4af799eb1a9ca0d10511 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Sun, 1 Mar 2015 10:46:20 +1100 Subject: [PATCH 3/4] revert visibility of fields and add == this check for guarded subscription --- .../internal/operators/OperatorMulticast.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorMulticast.java b/src/main/java/rx/internal/operators/OperatorMulticast.java index d6e379f4ce..78a93f4dd3 100644 --- a/src/main/java/rx/internal/operators/OperatorMulticast.java +++ b/src/main/java/rx/internal/operators/OperatorMulticast.java @@ -38,11 +38,11 @@ * the result value type */ public final class OperatorMulticast extends ConnectableObservable { - private final Observable source; - private final Object guard; - private final Func0> subjectFactory; - private final AtomicReference> connectedSubject; - private final List> waitingForConnect; + final Observable source; + final Object guard; + final Func0> subjectFactory; + final AtomicReference> connectedSubject; + final List> waitingForConnect; /** Guarded by guard. */ private Subscriber subscription; @@ -109,21 +109,26 @@ public void onNext(T args) { subject.onNext(args); } }; - guardedSubscription = Subscriptions.create(new Action0() { + final AtomicReference gs = new AtomicReference(); + gs.set(Subscriptions.create(new Action0() { @Override public void call() { Subscription s; synchronized (guard) { - s = subscription; - subscription = null; - guardedSubscription = null; - connectedSubject.set(null); + if ( guardedSubscription == gs.get()) { + s = subscription; + subscription = null; + guardedSubscription = null; + connectedSubject.set(null); + } else + return; } if (s != null) { s.unsubscribe(); } } - }); + })); + guardedSubscription = gs.get(); // register any subscribers that are waiting with this new subject for(Subscriber s : waitingForConnect) { From 379f07d902bffa0de2a2d5578ca6a88a2caf3d0e Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Sun, 1 Mar 2015 13:56:26 +1100 Subject: [PATCH 4/4] shouldSubscribe boolean variable not required --- .../internal/operators/OperatorMulticast.java | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorMulticast.java b/src/main/java/rx/internal/operators/OperatorMulticast.java index 78a93f4dd3..4d5d10f4f3 100644 --- a/src/main/java/rx/internal/operators/OperatorMulticast.java +++ b/src/main/java/rx/internal/operators/OperatorMulticast.java @@ -79,8 +79,6 @@ public void call(Subscriber subscriber) { public void connect(Action1 connection) { // each time we connect we create a new Subject and Subscription - boolean shouldSubscribe = false; - // subscription is the state of whether we are connected or not synchronized (guard) { if (subscription != null) { @@ -88,7 +86,6 @@ public void connect(Action1 connection) { connection.call(guardedSubscription); return; } else { - shouldSubscribe = true; // we aren't connected, so let's create a new Subject and connect final Subject subject = subjectFactory.call(); // create new Subscriber that will pass-thru to the subject we just created @@ -143,18 +140,16 @@ public void call() { } // in the lock above we determined we should subscribe, do it now outside the lock - if (shouldSubscribe) { - // register a subscription that will shut this down - connection.call(guardedSubscription); + // register a subscription that will shut this down + connection.call(guardedSubscription); - // now that everything is hooked up let's subscribe - // as long as the subscription is not null (which can happen if already unsubscribed) - Subscriber sub; - synchronized (guard) { - sub = subscription; - } - if (sub != null) - source.subscribe(sub); + // now that everything is hooked up let's subscribe + // as long as the subscription is not null (which can happen if already unsubscribed) + Subscriber sub; + synchronized (guard) { + sub = subscription; } + if (sub != null) + source.subscribe(sub); } } \ No newline at end of file