Skip to content

Commit

Permalink
Lock-free subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Dec 9, 2013
1 parent ceeab36 commit ba1d448
Show file tree
Hide file tree
Showing 8 changed files with 369 additions and 120 deletions.
137 changes: 80 additions & 57 deletions rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;

import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableSet;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -30,50 +31,60 @@
/**
* Subscription that represents a group of Subscriptions that are unsubscribed
* together.
*
*
* @see <a
* href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net
* equivalent CompositeDisposable</a>
*/
public class CompositeSubscription implements Subscription {
private static final Set<Subscription> MUTATE_STATE = unmodifiableSet(new HashSet<Subscription>());
private static final Set<Subscription> UNSUBSCRIBED_STATE = unmodifiableSet(new HashSet<Subscription>());

/** Sentinel to indicate a thread is modifying the subscription set. */
private static final Set<Subscription> MUTATE_SENTINEL = unmodifiableSet(Collections.<Subscription>emptySet());
/** Sentinel to indicate the entire CompositeSubscription has been unsubscribed.*/
private static final Set<Subscription> UNSUBSCRIBED_SENTINEL = unmodifiableSet(Collections.<Subscription>emptySet());
/** The reference to the set of subscriptions. */
private final AtomicReference<Set<Subscription>> reference = new AtomicReference<Set<Subscription>>();

public CompositeSubscription(final Subscription... subscriptions) {
reference.set(new HashSet<Subscription>(asList(subscriptions)));
}

public boolean isUnsubscribed() {
return reference.get() == UNSUBSCRIBED_STATE;
return reference.get() == UNSUBSCRIBED_SENTINEL;
}

public void add(final Subscription s) {
do {
final Set<Subscription> existing = reference.get();
if (existing == UNSUBSCRIBED_STATE) {
if (existing == UNSUBSCRIBED_SENTINEL) {
s.unsubscribe();
break;
}

if (reference.compareAndSet(existing, MUTATE_STATE)) {

if (existing == MUTATE_SENTINEL) {
continue;
}

if (reference.compareAndSet(existing, MUTATE_SENTINEL)) {
existing.add(s);
reference.set(existing);
break;
}
} while (true);
}

public void remove(final Subscription s) {
do {
final Set<Subscription> subscriptions = reference.get();
if (subscriptions == UNSUBSCRIBED_STATE) {
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
s.unsubscribe();
break;
}

if (reference.compareAndSet(subscriptions, MUTATE_STATE)) {

if (subscriptions == MUTATE_SENTINEL) {
continue;
}

if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) {
// also unsubscribe from it:
// http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
subscriptions.remove(s);
Expand All @@ -83,54 +94,66 @@ public void remove(final Subscription s) {
}
} while (true);
}

public void clear() {
do {
final Set<Subscription> subscriptions = reference.get();
if (subscriptions == UNSUBSCRIBED_STATE) {
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
break;
}

if (reference.compareAndSet(subscriptions, MUTATE_STATE)) {

if (subscriptions == MUTATE_SENTINEL) {
continue;
}

if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) {
final Set<Subscription> copy = new HashSet<Subscription>(
subscriptions);
subscriptions.clear();
reference.set(subscriptions);

for (final Subscription subscription : copy) {
subscription.unsubscribe();
}

unsubscribeAll(copy);
break;
}
} while (true);
}

/**
* Unsubscribe from the collection of subscriptions.
* <p>
* Exceptions thrown by any of the {@code unsubscribe()} methods are
* collected into a {@link CompositeException} and thrown once
* all unsubscriptions have been attempted.
* @param subs the collection of subscriptions
*/
private void unsubscribeAll(Collection<Subscription> subs) {
final Collection<Throwable> es = new ArrayList<Throwable>();
for (final Subscription s : subs) {
try {
s.unsubscribe();
} catch (final Throwable e) {
es.add(e);
}
}
if (!es.isEmpty()) {
throw new CompositeException(
"Failed to unsubscribe to 1 or more subscriptions.", es);
}
}
@Override
public void unsubscribe() {
do {
final Set<Subscription> subscriptions = reference.get();
if (subscriptions == UNSUBSCRIBED_STATE) {
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
break;
}

if (subscriptions == MUTATE_STATE) {
if (subscriptions == MUTATE_SENTINEL) {
continue;
}

if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_STATE)) {
final Collection<Throwable> es = new ArrayList<Throwable>();
for (final Subscription s : subscriptions) {
try {
s.unsubscribe();
} catch (final Throwable e) {
es.add(e);
}
}
if (es.isEmpty()) {
break;
}
throw new CompositeException(
"Failed to unsubscribe to 1 or more subscriptions.", es);

if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_SENTINEL)) {
unsubscribeAll(subscriptions);
break;
}
} while (true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,41 @@
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.multipleassignmentdisposable">Rx.Net equivalent MultipleAssignmentDisposable</a>
*/
public class MultipleAssignmentSubscription implements Subscription {

private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
private AtomicReference<Subscription> subscription = new AtomicReference<Subscription>();

private AtomicReference<Subscription> reference = new AtomicReference<Subscription>();
/** Sentinel for the unsubscribed state. */
private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() {
@Override
public void unsubscribe() {
}
};
public boolean isUnsubscribed() {
return unsubscribed.get();
return reference.get() == UNSUBSCRIBED_SENTINEL;
}

@Override
public synchronized void unsubscribe() {
unsubscribed.set(true);
Subscription s = getSubscription();
public void unsubscribe() {
Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL);
if (s != null) {
s.unsubscribe();
}

}

public synchronized void setSubscription(Subscription s) {
if (unsubscribed.get()) {
s.unsubscribe();
} else {
subscription.set(s);
}
public void setSubscription(Subscription s) {
do {
Subscription r = reference.get();
if (r == UNSUBSCRIBED_SENTINEL) {
s.unsubscribe();
return;
}
if (reference.compareAndSet(r, s)) {
break;
}
} while (true);
}

public Subscription getSubscription() {
return subscription.get();
Subscription s = reference.get();
return s != UNSUBSCRIBED_SENTINEL ? s : Subscriptions.empty();
}

}
Loading

0 comments on commit ba1d448

Please sign in to comment.