Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate from SynchronizedObserver to SerializedObserver #962

Merged
19 changes: 13 additions & 6 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import rx.operators.OnSubscribeFromIterable;
import rx.operators.OnSubscribeRange;
import rx.operators.OperationAll;
import rx.operators.OperatorAmb;
import rx.operators.OperationAny;
import rx.operators.OperationAsObservable;
import rx.operators.OperationAverage;
Expand Down Expand Up @@ -91,10 +90,8 @@
import rx.operators.OperationSkip;
import rx.operators.OperationSkipLast;
import rx.operators.OperationSkipUntil;
import rx.operators.OperatorSkipWhile;
import rx.operators.OperationSum;
import rx.operators.OperationSwitch;
import rx.operators.OperationSynchronize;
import rx.operators.OperationTakeLast;
import rx.operators.OperationTakeTimed;
import rx.operators.OperationTakeUntil;
Expand All @@ -107,6 +104,7 @@
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationUsing;
import rx.operators.OperationWindow;
import rx.operators.OperatorAmb;
import rx.operators.OperatorCast;
import rx.operators.OperatorDoOnEach;
import rx.operators.OperatorFilter;
Expand All @@ -120,8 +118,11 @@
import rx.operators.OperatorRepeat;
import rx.operators.OperatorRetry;
import rx.operators.OperatorScan;
import rx.operators.OperatorSerialize;
import rx.operators.OperatorSkip;
import rx.operators.OperatorSkipWhile;
import rx.operators.OperatorSubscribeOn;
import rx.operators.OperatorSynchronize;
import rx.operators.OperatorTake;
import rx.operators.OperatorTimeout;
import rx.operators.OperatorTimeoutWithSelector;
Expand Down Expand Up @@ -2712,7 +2713,7 @@ public final static <T> Observable<T> switchOnNext(Observable<? extends Observab
*/
@Deprecated
public final static <T> Observable<T> synchronize(Observable<T> source) {
return create(OperationSynchronize.synchronize(source));
return source.synchronize();
}

/**
Expand Down Expand Up @@ -6197,6 +6198,10 @@ public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accum
return lift(new OperatorScan<R, T>(initialValue, accumulator));
}

public final Observable<T> serialize() {
return lift(new OperatorSerialize<T>());
}

/**
* If the source Observable completes after emitting a single item, return an Observable that emits that
* item. If the source Observable emits more than one item or no items, throw an
Expand Down Expand Up @@ -7259,9 +7264,10 @@ public final <R> Observable<R> switchMap(Func1<? super T, ? extends Observable<?
* @return an Observable that is a chronologically well-behaved version of the source Observable, and that
* synchronously notifies its {@link Observer}s
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-synchronize">RxJava Wiki: synchronize()</a>
* @deprecated Use {@link #serialize()} instead as it doesn't block threads while emitting notification.
*/
public final Observable<T> synchronize() {
return create(OperationSynchronize.synchronize(this));
return lift(new OperatorSynchronize<T>());
}

/**
Expand All @@ -7283,9 +7289,10 @@ public final Observable<T> synchronize() {
* @return an Observable that is a chronologically well-behaved version of the source Observable, and that
* synchronously notifies its {@link Observer}s
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-synchronize">RxJava Wiki: synchronize()</a>
* @deprecated Use {@link #serialize()} instead as it doesn't block threads while emitting notification.
*/
public final Observable<T> synchronize(Object lock) {
return create(OperationSynchronize.synchronize(this, lock));
return lift(new OperatorSynchronize<T>(lock));
}

/**
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/observers/SafeSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
* <li>When onError or onComplete occur it will unsubscribe from the Observable (if executing asynchronously).</li>
* </ul>
* <p>
* It will not synchronize onNext execution. Use the {@link SynchronizedObserver} to do that.
* It will not synchronize onNext execution. Use the {@link SerializedSubscriber} to do that.
*
* @param <T>
*/
Expand Down
186 changes: 186 additions & 0 deletions rxjava-core/src/main/java/rx/observers/SerializedObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package rx.observers;

import java.util.ArrayList;

import rx.Observer;

/**
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
* <p>
* When multiple threads are notifying they will be serialized by:
* <p>
* <li>Allowing only one thread at a time to emit</li>
* <li>Adding notifications to a queue if another thread is already emitting</li>
* <li>Not holding any locks or blocking any threads while emitting</li>
* <p>
*
* @param <T>
*/
public class SerializedObserver<T> implements Observer<T> {
private final Observer<? super T> actual;

private boolean emitting = false;
private boolean terminated = false;
private ArrayList<Object> queue = new ArrayList<Object>();

private static Sentinel NULL_SENTINEL = new Sentinel();
private static Sentinel COMPLETE_SENTINEL = new Sentinel();

private static class Sentinel {

}

private static class ErrorSentinel extends Sentinel {
final Throwable e;

ErrorSentinel(Throwable e) {
this.e = e;
}
}

public SerializedObserver(Observer<? super T> s) {
this.actual = s;
}

@Override
public void onCompleted() {
boolean canEmit = false;
ArrayList<Object> list = null;
synchronized (this) {
if (terminated) {
return;
}
terminated = true;
if (!emitting) {
// emit immediately
emitting = true;
canEmit = true;
if (queue.size() > 0) {
list = queue; // copy reference
queue = new ArrayList<Object>(); // new version;
}
} else {
// someone else is already emitting so just queue it
queue.add(COMPLETE_SENTINEL);
}
}
if (canEmit) {
// we won the right to emit
try {
drainQueue(list);
actual.onCompleted();
} finally {
synchronized (this) {
emitting = false;
}
}
}
}

@Override
public void onError(final Throwable e) {
boolean canEmit = false;
ArrayList<Object> list = null;
synchronized (this) {
if (terminated) {
return;
}
terminated = true;
if (!emitting) {
// emit immediately
emitting = true;
canEmit = true;
if (queue.size() > 0) {
list = queue; // copy reference
queue = new ArrayList<Object>(); // new version;
}
} else {
// someone else is already emitting so just queue it ... after eliminating the queue to shortcut
queue.clear();
queue.add(new ErrorSentinel(e));
}
}
if (canEmit) {
// we won the right to emit
try {
drainQueue(list);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should onError be draining the queue before sending the error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we can, but if we're winning the right to emit immediately as it's doing here, it's highly unlikely there is anything in the queue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, by "yes we can" I meant we can clear it ... which means skip draining it in the onError case.

actual.onError(e);
} finally {
synchronized (this) {
emitting = false;
}
}
}
}

@Override
public void onNext(T t) {
boolean canEmit = false;
ArrayList<Object> list = null;
synchronized (this) {
if (terminated) {
return;
}
if (!emitting) {
// emit immediately
emitting = true;
canEmit = true;
if (queue.size() > 0) {
list = queue; // copy reference
queue = new ArrayList<Object>(); // new version;
}
} else {
// someone else is already emitting so just queue it
if (t == null) {
queue.add(NULL_SENTINEL);
} else {
queue.add(t);
}
}
}
if (canEmit) {
// we won the right to emit
try {
drainQueue(list);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets assume two concurrent onNext competes for canEmit, the first succeeds and drains an empty queue. The second comes in and enqueues its value. The two exit onNext. In this case, the second onNext's value sits in the queue until another event happens. (Btw., the queue/drain doesn't exhibit this.) Can we accept such delay?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'm aware of that problem and don't like it but do not have a better solution yet. The other two implementations fails the more critical performance tests (the state machine version took down our production server in under 2 minutes).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some experimenting and came up with this rewrite.
For a single producer, it runs, on average, the same ops/sec as your version. For the two producer case, mine runs about 10% more ops per second; which is not much considering the the perf test is only for overhead. However, the memory usage reduction for the single producer case might be worth it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take a look, does it solve both of the tradeoffs?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has the MAX_DRAIN_ITERATION which trades the potential event delay (1) with effectively continuous draining (MAX_VALUE). I can't think of any adaptive adjustment method, only a parameterized serialize() operator.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MAX_DRAIN_ITERATION still has the problem, just pushed back until after N iterations. So in this case are you just optimizing for the case when 1 item is in the queue?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it doesn't allocate the queue if there is no overlapping, some unnecesary synchronization blocks were removed. It doesn't solve the tradeoff problem unfortunately; to avoid the delay and one threaded drain, one would need to have wait-notify which most likely causes poor performance and thread blocking.

actual.onNext(t);
} finally {
synchronized (this) {
if (terminated) {
list = queue; // copy reference
queue = new ArrayList<Object>(); // new version;
} else {
// release this thread
emitting = false;
canEmit = false;
}
}
}
}

// if terminated this will still be true so let's drain the rest of the queue
if (canEmit) {
drainQueue(list);
}
}

public void drainQueue(ArrayList<Object> list) {
if (list == null || list.size() == 0) {
return;
}
for (Object v : list) {
if (v != null) {
if (v instanceof Sentinel) {
if (v == NULL_SENTINEL) {
actual.onNext(null);
} else if (v == COMPLETE_SENTINEL) {
actual.onCompleted();
} else if (v instanceof ErrorSentinel) {
actual.onError(((ErrorSentinel) v).e);
}
} else {
actual.onNext((T) v);
}
}
}
}
}
40 changes: 40 additions & 0 deletions rxjava-core/src/main/java/rx/observers/SerializedSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package rx.observers;

import rx.Observer;
import rx.Subscriber;

/**
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
* <p>
* When multiple threads are notifying they will be serialized by:
* <p>
* <li>Allowing only one thread at a time to emit</li>
* <li>Adding notifications to a queue if another thread is already emitting</li>
* <li>Not holding any locks or blocking any threads while emitting</li>
* <p>
*
* @param <T>
*/
public class SerializedSubscriber<T> extends Subscriber<T> {

private final Observer<T> s;

public SerializedSubscriber(Subscriber<? super T> s) {
this.s = new SerializedObserver<T>(s);
}

@Override
public void onCompleted() {
s.onCompleted();
}

@Override
public void onError(Throwable e) {
s.onError(e);
}

@Override
public void onNext(T t) {
s.onNext(t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
* This ONLY does synchronization. It does not involve itself in safety or subscriptions. See SafeSubscriber for that.
*
* @param <T>
* @deprecated Use SerializedObserver instead as it doesn't block threads during event notification.
*/
@Deprecated
public final class SynchronizedObserver<T> implements Observer<T> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
* </ul>
*
* @param <T>
* @deprecated Use SerializedSubscriber instead as it doesn't block threads during event notification.
*/
@Deprecated
public final class SynchronizedSubscriber<T> extends Subscriber<T> {

private final Observer<? super T> observer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observers.SerializedObserver;
import rx.observers.SynchronizedObserver;
import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription;
Expand Down Expand Up @@ -111,7 +112,7 @@ private static class DebounceObserver<T> implements Observer<T> {
public DebounceObserver(Observer<? super T> observer, long timeout, TimeUnit unit, Scheduler scheduler) {
// we need to synchronize the observer since the on* events can be coming from different
// threads and are thus non-deterministic and could be interleaved
this.observer = new SynchronizedObserver<T>(observer);
this.observer = new SerializedObserver<T>(observer);
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import rx.Observer;
import rx.Subscription;
import rx.exceptions.CompositeException;
import rx.observers.SynchronizedObserver;
import rx.observers.SerializedObserver;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.CompositeSubscription;

Expand Down Expand Up @@ -141,15 +141,7 @@ private MergeDelayErrorObservable(Observable<? extends Observable<? extends T>>

public Subscription onSubscribe(Observer<? super T> actualObserver) {
CompositeSubscription completeSubscription = new CompositeSubscription();

/**
* We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting.
* <p>
* The calls from each sequence must be serialized.
* <p>
* Bug report: https://github.com/Netflix/RxJava/issues/614
*/
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver);
SerializedObserver<T> synchronizedObserver = new SerializedObserver<T>(actualObserver);

/**
* Subscribe to the parent Observable to get to the children Observables
Expand Down
Loading