Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce Subscription Object Allocation #1281

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions rxjava-core/src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx;

import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.CompositeSubscription;

/**
Expand All @@ -30,17 +31,23 @@
*/
public abstract class Subscriber<T> implements Observer<T>, Subscription {

private final CompositeSubscription cs;
private final ChainedSubscription cs;

protected Subscriber(CompositeSubscription cs) {
protected Subscriber(ChainedSubscription cs) {
if (cs == null) {
throw new IllegalArgumentException("The CompositeSubscription can not be null");
}
this.cs = cs;
}

@Deprecated
protected Subscriber(CompositeSubscription cs) {
this(new ChainedSubscription());
add(cs);
}

protected Subscriber() {
this(new CompositeSubscription());
this(new ChainedSubscription());
}

protected Subscriber(Subscriber<?> op) {
Expand Down
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import rx.functions.Action0;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.Subscriptions;

/**
Expand Down Expand Up @@ -55,7 +55,7 @@ static final class GroupBySubscriber<K, T> extends Subscriber<T> {
public GroupBySubscriber(Func1<? super T, ? extends K> keySelector, Subscriber<? super GroupedObservable<K, T>> childObserver) {
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
// and will unsubscribe on this parent if they are all unsubscribed
super(new CompositeSubscription());
super(new ChainedSubscription());
this.keySelector = keySelector;
this.childObserver = childObserver;
}
Expand Down
12 changes: 6 additions & 6 deletions rxjava-core/src/main/java/rx/operators/OperatorPivot.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
import rx.Subscriber;
import rx.functions.Action0;
import rx.observables.GroupedObservable;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.Subscriptions;

public final class OperatorPivot<K1, K2, T> implements Operator<GroupedObservable<K2, GroupedObservable<K1, T>>, GroupedObservable<K1, GroupedObservable<K2, T>>> {

@Override
public Subscriber<? super GroupedObservable<K1, GroupedObservable<K2, T>>> call(final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
final AtomicReference<State> state = new AtomicReference<State>(State.create());
final PivotSubscriber<K1, K2, T> pivotSubscriber = new PivotSubscriber<K1, K2, T>(new CompositeSubscription(), child, state);
final PivotSubscriber<K1, K2, T> pivotSubscriber = new PivotSubscriber<K1, K2, T>(new ChainedSubscription(), child, state);
child.add(Subscriptions.create(new Action0() {

@Override
Expand Down Expand Up @@ -65,12 +65,12 @@ private static final class PivotSubscriber<K1, K2, T> extends Subscriber<Grouped
* needs to decouple the subscription as the inner subscriptions need a separate lifecycle
* and will unsubscribe on this parent if they are all unsubscribed
*/
private final CompositeSubscription parentSubscription;
private final ChainedSubscription parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
private final AtomicReference<State> state;
private final GroupState<K1, K2, T> groups;

private PivotSubscriber(CompositeSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
private PivotSubscriber(ChainedSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
super(parentSubscription);
this.parentSubscription = parentSubscription;
this.child = child;
Expand Down Expand Up @@ -158,7 +158,7 @@ public void onNext(T t) {
private static final class GroupState<K1, K2, T> {
private final ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>> innerSubjects = new ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>>();
private final ConcurrentHashMap<K2, Outer<K1, K2, T>> outerSubjects = new ConcurrentHashMap<K2, Outer<K1, K2, T>>();
private final CompositeSubscription parentSubscription;
private final ChainedSubscription parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
/** Indicates a terminal state. */
volatile int completed;
Expand All @@ -167,7 +167,7 @@ private static final class GroupState<K1, K2, T> {
static final AtomicIntegerFieldUpdater<GroupState> COMPLETED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(GroupState.class, "completed");

public GroupState(CompositeSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
public GroupState(ChainedSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
this.parentSubscription = parentSubscription;
this.child = child;
}
Expand Down
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperatorTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import rx.Observable.Operator;
import rx.Subscriber;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.ChainedSubscription;

/**
* Returns an Observable that emits the first <code>num</code> items emitted by the source
Expand All @@ -40,7 +40,7 @@ public OperatorTake(int limit) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final CompositeSubscription parent = new CompositeSubscription();
final ChainedSubscription parent = new ChainedSubscription();
if (limit == 0) {
child.onCompleted();
parent.unsubscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.Subscriptions;

/**
Expand All @@ -36,7 +36,7 @@ public OperatorUnsubscribeOn(Scheduler scheduler) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
final CompositeSubscription parentSubscription = new CompositeSubscription();
final ChainedSubscription parentSubscription = new ChainedSubscription();
subscriber.add(Subscriptions.create(new Action0() {

@Override
Expand Down
109 changes: 109 additions & 0 deletions rxjava-core/src/main/java/rx/subscriptions/ChainedSubscription.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;

import rx.Subscription;
import rx.exceptions.CompositeException;

/**
* Subscription that represents a group of Subscriptions that are unsubscribed together.
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
*/
public final class ChainedSubscription implements Subscription {

private List<Subscription> subscriptions;
private boolean unsubscribed = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth using volatile here and double-checked later because isUnsubscribed is called in many places and the synchronized call is expensive.


public ChainedSubscription() {
}

public ChainedSubscription(final Subscription... subscriptions) {
this.subscriptions = new LinkedList<Subscription>(Arrays.asList(subscriptions));
}

@Override
public synchronized boolean isUnsubscribed() {
return unsubscribed;
}

public void add(final Subscription s) {
Subscription unsubscribe = null;
synchronized (this) {
if (unsubscribed) {
unsubscribe = s;
} else {
if (subscriptions == null) {
subscriptions = new LinkedList<Subscription>();
}
subscriptions.add(s);
}
}
if (unsubscribe != null) {
// call after leaving the synchronized block so we're not holding a lock while executing this
unsubscribe.unsubscribe();
}
}

@Override
public void unsubscribe() {
synchronized (this) {
if (unsubscribed) {
return;
}
unsubscribed = true;
}
// we will only get here once
unsubscribeFromAll(subscriptions);
}

private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
if (subscriptions == null) {
return;
}
List<Throwable> es = null;
for (Subscription s : subscriptions) {
try {
s.unsubscribe();
} catch (Throwable e) {
if (es == null) {
es = new ArrayList<Throwable>();
}
es.add(e);
}
}
if (es != null) {
if (es.size() == 1) {
Throwable t = es.get(0);
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new CompositeException(
"Failed to unsubscribe to 1 or more subscriptions.", es);
}
} else {
throw new CompositeException(
"Failed to unsubscribe to 2 or more subscriptions.", es);
}
}
}
}
Loading