diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 0d9f74d60e..235e1e62e4 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -52,7 +52,6 @@ import rx.operators.OnSubscribeFromIterable; import rx.operators.OnSubscribeRange; import rx.operators.OperationAll; -import rx.operators.OperatorAmb; import rx.operators.OperationAny; import rx.operators.OperationAsObservable; import rx.operators.OperationAverage; @@ -91,10 +90,8 @@ import rx.operators.OperationSkip; import rx.operators.OperationSkipLast; import rx.operators.OperationSkipUntil; -import rx.operators.OperatorSkipWhile; import rx.operators.OperationSum; import rx.operators.OperationSwitch; -import rx.operators.OperationSynchronize; import rx.operators.OperationTakeLast; import rx.operators.OperationTakeTimed; import rx.operators.OperationTakeUntil; @@ -107,6 +104,7 @@ import rx.operators.OperationToObservableFuture; import rx.operators.OperationUsing; import rx.operators.OperationWindow; +import rx.operators.OperatorAmb; import rx.operators.OperatorCast; import rx.operators.OperatorDoOnEach; import rx.operators.OperatorFilter; @@ -120,8 +118,11 @@ import rx.operators.OperatorRepeat; import rx.operators.OperatorRetry; import rx.operators.OperatorScan; +import rx.operators.OperatorSerialize; import rx.operators.OperatorSkip; +import rx.operators.OperatorSkipWhile; import rx.operators.OperatorSubscribeOn; +import rx.operators.OperatorSynchronize; import rx.operators.OperatorTake; import rx.operators.OperatorTimeout; import rx.operators.OperatorTimeoutWithSelector; @@ -2712,7 +2713,7 @@ public final static Observable switchOnNext(Observable Observable synchronize(Observable source) { - return create(OperationSynchronize.synchronize(source)); + return source.synchronize(); } /** @@ -6197,6 +6198,10 @@ public final Observable scan(R initialValue, Func2 accum return lift(new OperatorScan(initialValue, accumulator)); } + public final Observable serialize() { + return lift(new OperatorSerialize()); + } + /** * If the source Observable completes after emitting a single item, return an Observable that emits that * item. If the source Observable emits more than one item or no items, throw an @@ -7259,9 +7264,10 @@ public final Observable switchMap(Func1RxJava Wiki: synchronize() + * @deprecated Use {@link #serialize()} instead as it doesn't block threads while emitting notification. */ public final Observable synchronize() { - return create(OperationSynchronize.synchronize(this)); + return lift(new OperatorSynchronize()); } /** @@ -7283,9 +7289,10 @@ public final Observable synchronize() { * @return an Observable that is a chronologically well-behaved version of the source Observable, and that * synchronously notifies its {@link Observer}s * @see RxJava Wiki: synchronize() + * @deprecated Use {@link #serialize()} instead as it doesn't block threads while emitting notification. */ public final Observable synchronize(Object lock) { - return create(OperationSynchronize.synchronize(this, lock)); + return lift(new OperatorSynchronize(lock)); } /** diff --git a/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java b/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java index 922b61de95..ff121eeb77 100644 --- a/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java +++ b/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java @@ -52,7 +52,7 @@ *
  • When onError or onComplete occur it will unsubscribe from the Observable (if executing asynchronously).
  • * *

    - * It will not synchronize onNext execution. Use the {@link SynchronizedObserver} to do that. + * It will not synchronize onNext execution. Use the {@link SerializedSubscriber} to do that. * * @param */ diff --git a/rxjava-core/src/main/java/rx/observers/SerializedObserver.java b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java new file mode 100644 index 0000000000..93f1632973 --- /dev/null +++ b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java @@ -0,0 +1,186 @@ +package rx.observers; + +import java.util.ArrayList; + +import rx.Observer; + +/** + * Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError. + *

    + * When multiple threads are notifying they will be serialized by: + *

    + *

  • Allowing only one thread at a time to emit
  • + *
  • Adding notifications to a queue if another thread is already emitting
  • + *
  • Not holding any locks or blocking any threads while emitting
  • + *

    + * + * @param + */ +public class SerializedObserver implements Observer { + private final Observer actual; + + private boolean emitting = false; + private boolean terminated = false; + private ArrayList queue = new ArrayList(); + + private static Sentinel NULL_SENTINEL = new Sentinel(); + private static Sentinel COMPLETE_SENTINEL = new Sentinel(); + + private static class Sentinel { + + } + + private static class ErrorSentinel extends Sentinel { + final Throwable e; + + ErrorSentinel(Throwable e) { + this.e = e; + } + } + + public SerializedObserver(Observer s) { + this.actual = s; + } + + @Override + public void onCompleted() { + boolean canEmit = false; + ArrayList list = null; + synchronized (this) { + if (terminated) { + return; + } + terminated = true; + if (!emitting) { + // emit immediately + emitting = true; + canEmit = true; + if (queue.size() > 0) { + list = queue; // copy reference + queue = new ArrayList(); // new version; + } + } else { + // someone else is already emitting so just queue it + queue.add(COMPLETE_SENTINEL); + } + } + if (canEmit) { + // we won the right to emit + try { + drainQueue(list); + actual.onCompleted(); + } finally { + synchronized (this) { + emitting = false; + } + } + } + } + + @Override + public void onError(final Throwable e) { + boolean canEmit = false; + ArrayList list = null; + synchronized (this) { + if (terminated) { + return; + } + terminated = true; + if (!emitting) { + // emit immediately + emitting = true; + canEmit = true; + if (queue.size() > 0) { + list = queue; // copy reference + queue = new ArrayList(); // new version; + } + } else { + // someone else is already emitting so just queue it ... after eliminating the queue to shortcut + queue.clear(); + queue.add(new ErrorSentinel(e)); + } + } + if (canEmit) { + // we won the right to emit + try { + drainQueue(list); + actual.onError(e); + } finally { + synchronized (this) { + emitting = false; + } + } + } + } + + @Override + public void onNext(T t) { + boolean canEmit = false; + ArrayList list = null; + synchronized (this) { + if (terminated) { + return; + } + if (!emitting) { + // emit immediately + emitting = true; + canEmit = true; + if (queue.size() > 0) { + list = queue; // copy reference + queue = new ArrayList(); // new version; + } + } else { + // someone else is already emitting so just queue it + if (t == null) { + queue.add(NULL_SENTINEL); + } else { + queue.add(t); + } + } + } + if (canEmit) { + // we won the right to emit + try { + drainQueue(list); + actual.onNext(t); + } finally { + synchronized (this) { + if (terminated) { + list = queue; // copy reference + queue = new ArrayList(); // new version; + } else { + // release this thread + emitting = false; + canEmit = false; + } + } + } + } + + // if terminated this will still be true so let's drain the rest of the queue + if (canEmit) { + drainQueue(list); + } + } + + public void drainQueue(ArrayList list) { + if (list == null || list.size() == 0) { + return; + } + for (Object v : list) { + if (v != null) { + if (v instanceof Sentinel) { + if (v == NULL_SENTINEL) { + actual.onNext(null); + } else if (v == COMPLETE_SENTINEL) { + actual.onCompleted(); + } else if (v instanceof ErrorSentinel) { + actual.onError(((ErrorSentinel) v).e); + } + } else { + actual.onNext((T) v); + } + } + } + } +} diff --git a/rxjava-core/src/main/java/rx/observers/SerializedSubscriber.java b/rxjava-core/src/main/java/rx/observers/SerializedSubscriber.java new file mode 100644 index 0000000000..db545ff430 --- /dev/null +++ b/rxjava-core/src/main/java/rx/observers/SerializedSubscriber.java @@ -0,0 +1,40 @@ +package rx.observers; + +import rx.Observer; +import rx.Subscriber; + +/** + * Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError. + *

    + * When multiple threads are notifying they will be serialized by: + *

    + *

  • Allowing only one thread at a time to emit
  • + *
  • Adding notifications to a queue if another thread is already emitting
  • + *
  • Not holding any locks or blocking any threads while emitting
  • + *

    + * + * @param + */ +public class SerializedSubscriber extends Subscriber { + + private final Observer s; + + public SerializedSubscriber(Subscriber s) { + this.s = new SerializedObserver(s); + } + + @Override + public void onCompleted() { + s.onCompleted(); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + @Override + public void onNext(T t) { + s.onNext(t); + } +} diff --git a/rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java b/rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java index 25ea8c3403..dbcbb9bc35 100644 --- a/rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java +++ b/rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java @@ -23,7 +23,9 @@ * This ONLY does synchronization. It does not involve itself in safety or subscriptions. See SafeSubscriber for that. * * @param + * @deprecated Use SerializedObserver instead as it doesn't block threads during event notification. */ +@Deprecated public final class SynchronizedObserver implements Observer { /** diff --git a/rxjava-core/src/main/java/rx/observers/SynchronizedSubscriber.java b/rxjava-core/src/main/java/rx/observers/SynchronizedSubscriber.java index 8dbf1b20f5..9f1b3dec90 100644 --- a/rxjava-core/src/main/java/rx/observers/SynchronizedSubscriber.java +++ b/rxjava-core/src/main/java/rx/observers/SynchronizedSubscriber.java @@ -29,7 +29,9 @@ * * * @param + * @deprecated Use SerializedSubscriber instead as it doesn't block threads during event notification. */ +@Deprecated public final class SynchronizedSubscriber extends Subscriber { private final Observer observer; diff --git a/rxjava-core/src/main/java/rx/operators/OperationDebounce.java b/rxjava-core/src/main/java/rx/operators/OperationDebounce.java index 5a7da4a1bb..e970176c8b 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDebounce.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDebounce.java @@ -26,6 +26,7 @@ import rx.Subscription; import rx.functions.Action1; import rx.functions.Func1; +import rx.observers.SerializedObserver; import rx.observers.SynchronizedObserver; import rx.schedulers.Schedulers; import rx.subscriptions.CompositeSubscription; @@ -111,7 +112,7 @@ private static class DebounceObserver implements Observer { public DebounceObserver(Observer observer, long timeout, TimeUnit unit, Scheduler scheduler) { // we need to synchronize the observer since the on* events can be coming from different // threads and are thus non-deterministic and could be interleaved - this.observer = new SynchronizedObserver(observer); + this.observer = new SerializedObserver(observer); this.timeout = timeout; this.unit = unit; this.scheduler = scheduler; diff --git a/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java b/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java index b2498df6c2..4c97223a87 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java @@ -25,7 +25,7 @@ import rx.Observer; import rx.Subscription; import rx.exceptions.CompositeException; -import rx.observers.SynchronizedObserver; +import rx.observers.SerializedObserver; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.CompositeSubscription; @@ -141,15 +141,7 @@ private MergeDelayErrorObservable(Observable> public Subscription onSubscribe(Observer actualObserver) { CompositeSubscription completeSubscription = new CompositeSubscription(); - - /** - * We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting. - *

    - * The calls from each sequence must be serialized. - *

    - * Bug report: https://github.com/Netflix/RxJava/issues/614 - */ - SynchronizedObserver synchronizedObserver = new SynchronizedObserver(actualObserver); + SerializedObserver synchronizedObserver = new SerializedObserver(actualObserver); /** * Subscribe to the parent Observable to get to the children Observables diff --git a/rxjava-core/src/main/java/rx/operators/OperationMergeMaxConcurrent.java b/rxjava-core/src/main/java/rx/operators/OperationMergeMaxConcurrent.java index ec27002d4a..d0c296b06d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMergeMaxConcurrent.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMergeMaxConcurrent.java @@ -21,7 +21,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; -import rx.observers.SynchronizedObserver; +import rx.observers.SerializedObserver; import rx.subscriptions.CompositeSubscription; /** @@ -85,9 +85,8 @@ public Subscription onSubscribe(Observer actualObserver) { * Bug report: https://github.com/Netflix/RxJava/issues/200 */ SafeObservableSubscription subscription = new SafeObservableSubscription(ourSubscription); - SynchronizedObserver synchronizedObserver = new SynchronizedObserver( - new SafeObserver(subscription, actualObserver), // Create a SafeObserver as SynchronizedObserver does not automatically unsubscribe - subscription); + SerializedObserver synchronizedObserver = new SerializedObserver( + new SafeObserver(subscription, actualObserver)); // Create a SafeObserver as SynchronizedObserver does not automatically unsubscribe /** * Subscribe to the parent Observable to get to the children Observables @@ -103,10 +102,10 @@ public Subscription onSubscribe(Observer actualObserver) { * @param */ private class ParentObserver implements Observer> { - private final SynchronizedObserver synchronizedObserver; + private final SerializedObserver serializedObserver; - public ParentObserver(SynchronizedObserver synchronizedObserver) { - this.synchronizedObserver = synchronizedObserver; + public ParentObserver(SerializedObserver serializedObserver) { + this.serializedObserver = serializedObserver; } @Override @@ -119,13 +118,13 @@ public void onCompleted() { // but will let the child worry about it // if however this completes and there are no children processing, then we will send onCompleted if (isStopped()) { - synchronizedObserver.onCompleted(); + serializedObserver.onCompleted(); } } @Override public void onError(Throwable e) { - synchronizedObserver.onError(e); + serializedObserver.onError(e); } @Override @@ -151,7 +150,7 @@ public void onNext(Observable childObservable) { } if (observable != null) { ourSubscription.add(observable.subscribe(new ChildObserver( - synchronizedObserver))); + serializedObserver))); } } } @@ -162,10 +161,10 @@ public void onNext(Observable childObservable) { */ private class ChildObserver implements Observer { - private final SynchronizedObserver synchronizedObserver; + private final SerializedObserver serializedObserver; - public ChildObserver(SynchronizedObserver synchronizedObserver) { - this.synchronizedObserver = synchronizedObserver; + public ChildObserver(SerializedObserver serializedObserver) { + this.serializedObserver = serializedObserver; } @Override @@ -192,19 +191,19 @@ public void onCompleted() { } else { // No pending observable. Need to check if it's necessary to emit an onCompleted if (isStopped()) { - synchronizedObserver.onCompleted(); + serializedObserver.onCompleted(); } } } @Override public void onError(Throwable e) { - synchronizedObserver.onError(e); + serializedObserver.onError(e); } @Override public void onNext(T args) { - synchronizedObserver.onNext(args); + serializedObserver.onNext(args); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java b/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java deleted file mode 100644 index e94411b313..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscription; -import rx.observers.SynchronizedObserver; - -/** - * Wraps an Observable in another Observable that ensures that the resulting Observable is - * chronologically well-behaved. - *

    - * - *

    - * A well-behaved Observable does not interleave its invocations of the onNext, - * onCompleted, and onError methods of its Observers; it invokes - * onCompleted or onError only once; and it never invokes - * onNext after invoking either onCompleted or onError. The - * synchronize operation enforces this, and the Observable it returns invokes onNext - * and onCompleted or onError synchronously. - *

    - * NOTE: {@link Observable#create} already wraps Observables so this is generally redundant. - * - * @param - * The type of the observable sequence. - */ -public final class OperationSynchronize { - - /** - * Accepts an observable and wraps it in another observable which ensures that the resulting observable is well-behaved. - * - * A well-behaved observable ensures onNext, onCompleted, or onError calls to its subscribers are - * not interleaved, onCompleted and onError are only called once respectively, and no - * onNext calls follow onCompleted and onError calls. - * - * @param observable - * @param - * @return the wrapped synchronized observable sequence - */ - public static OnSubscribeFunc synchronize(Observable observable) { - return new Synchronize(observable, null); - } - - /** - * Accepts an observable and wraps it in another observable which ensures that the resulting observable is well-behaved. - * This is accomplished by acquiring a mutual-exclusion lock for the object provided as the lock parameter. - * - * A well-behaved observable ensures onNext, onCompleted, or onError calls to its subscribers are - * not interleaved, onCompleted and onError are only called once respectively, and no - * onNext calls follow onCompleted and onError calls. - * - * @param observable - * @param lock - * The lock object to synchronize each observer call on - * @param - * @return the wrapped synchronized observable sequence - */ - public static OnSubscribeFunc synchronize(Observable observable, Object lock) { - return new Synchronize(observable, lock); - } - - private static class Synchronize implements OnSubscribeFunc { - - public Synchronize(Observable innerObservable, Object lock) { - this.innerObservable = innerObservable; - this.lock = lock; - } - - private Observable innerObservable; - private SynchronizedObserver atomicObserver; - private Object lock; - - public Subscription onSubscribe(Observer observer) { - if (lock == null) { - atomicObserver = new SynchronizedObserver(observer); - } - else { - atomicObserver = new SynchronizedObserver(observer, lock); - } - return innerObservable.subscribe(atomicObserver); - } - - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMerge.java b/rxjava-core/src/main/java/rx/operators/OperatorMerge.java index 5b51485224..b3d822ca84 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorMerge.java @@ -20,7 +20,7 @@ import rx.Observable; import rx.Observable.Operator; import rx.Subscriber; -import rx.observers.SynchronizedSubscriber; +import rx.observers.SerializedSubscriber; import rx.subscriptions.CompositeSubscription; /** @@ -36,7 +36,7 @@ public final class OperatorMerge implements Operator> call(final Subscriber outerOperation) { - final Subscriber o = new SynchronizedSubscriber(outerOperation); + final Subscriber o = new SerializedSubscriber(outerOperation); final CompositeSubscription childrenSubscriptions = new CompositeSubscription(); outerOperation.add(childrenSubscriptions); diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSerialize.java b/rxjava-core/src/main/java/rx/operators/OperatorSerialize.java new file mode 100644 index 0000000000..f41620bab2 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorSerialize.java @@ -0,0 +1,46 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Observable.Operator; +import rx.Subscriber; +import rx.observers.SerializedSubscriber; + +public final class OperatorSerialize implements Operator { + + @Override + public Subscriber call(final Subscriber s) { + return new SerializedSubscriber(new Subscriber(s) { + + @Override + public void onCompleted() { + s.onCompleted(); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + @Override + public void onNext(T t) { + s.onNext(t); + } + + }); + } + +} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSynchronize.java b/rxjava-core/src/main/java/rx/operators/OperatorSynchronize.java new file mode 100644 index 0000000000..0cdc9eded8 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorSynchronize.java @@ -0,0 +1,75 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Observable; +import rx.Observable.Operator; +import rx.Subscriber; +import rx.observers.SynchronizedSubscriber; + +/** + * Wraps an Observable in another Observable that ensures that the resulting Observable is + * chronologically well-behaved. + *

    + * + *

    + * A well-behaved Observable does not interleave its invocations of the onNext, + * onCompleted, and onError methods of its Observers; it invokes + * onCompleted or onError only once; and it never invokes + * onNext after invoking either onCompleted or onError. The + * synchronize operation enforces this, and the Observable it returns invokes onNext + * and onCompleted or onError synchronously. + *

    + * NOTE: {@link Observable#create} already wraps Observables so this is generally redundant. + * + * @param + * The type of the observable sequence. + */ +public final class OperatorSynchronize implements Operator { + + final Object lock; + + public OperatorSynchronize(Object lock) { + this.lock = lock; + } + + public OperatorSynchronize() { + this.lock = new Object(); + } + + @Override + public Subscriber call(final Subscriber s) { + return new SynchronizedSubscriber(new Subscriber(s) { + + @Override + public void onCompleted() { + s.onCompleted(); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + } + + @Override + public void onNext(T t) { + s.onNext(t); + } + + }, lock); + } + +} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorTimeoutBase.java b/rxjava-core/src/main/java/rx/operators/OperatorTimeoutBase.java index 3d6dc97998..7a6ea8e8d8 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorTimeoutBase.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorTimeoutBase.java @@ -25,7 +25,7 @@ import rx.Subscription; import rx.functions.Func2; import rx.functions.Func3; -import rx.observers.SynchronizedSubscriber; +import rx.observers.SerializedSubscriber; import rx.subscriptions.SerialSubscription; class OperatorTimeoutBase implements Operator { @@ -67,8 +67,7 @@ public Subscriber call(Subscriber subscriber) { // Use SynchronizedSubscriber for safe memory access // as the subscriber will be accessed in the current thread or the // scheduler or other Observables. - final SynchronizedSubscriber synchronizedSubscriber = new SynchronizedSubscriber( - subscriber); + final SerializedSubscriber synchronizedSubscriber = new SerializedSubscriber(subscriber); TimeoutSubscriber timeoutSubscriber = new TimeoutSubscriber( synchronizedSubscriber, timeoutStub, serial, other); @@ -84,17 +83,17 @@ public Subscriber call(Subscriber subscriber) { private final SerialSubscription serial; private final Object gate = new Object(); - private final SynchronizedSubscriber synchronizedSubscriber; + private final SerializedSubscriber serializedSubscriber; private final TimeoutStub timeoutStub; private final Observable other; private TimeoutSubscriber( - SynchronizedSubscriber synchronizedSubscriber, + SerializedSubscriber serializedSubscriber, TimeoutStub timeoutStub, SerialSubscription serial, Observable other) { - this.synchronizedSubscriber = synchronizedSubscriber; + this.serializedSubscriber = serializedSubscriber; this.timeoutStub = timeoutStub; this.serial = serial; this.other = other; @@ -110,7 +109,7 @@ public void onNext(T value) { } } if (onNextWins) { - synchronizedSubscriber.onNext(value); + serializedSubscriber.onNext(value); serial.set(timeoutStub.call(this, actual.get(), value)); } } @@ -125,7 +124,7 @@ public void onError(Throwable error) { } if (onErrorWins) { serial.unsubscribe(); - synchronizedSubscriber.onError(error); + serializedSubscriber.onError(error); } } @@ -139,7 +138,7 @@ public void onCompleted() { } if (onCompletedWins) { serial.unsubscribe(); - synchronizedSubscriber.onCompleted(); + serializedSubscriber.onCompleted(); } } @@ -153,9 +152,9 @@ public void onTimeout(long seqId) { } if (timeoutWins) { if (other == null) { - synchronizedSubscriber.onError(new TimeoutException()); + serializedSubscriber.onError(new TimeoutException()); } else { - serial.set(other.subscribe(synchronizedSubscriber)); + serial.set(other.subscribe(serializedSubscriber)); } } } diff --git a/rxjava-core/src/main/java/rx/operators/SafeObserver.java b/rxjava-core/src/main/java/rx/operators/SafeObserver.java index cf632b4dc5..08984deae3 100644 --- a/rxjava-core/src/main/java/rx/operators/SafeObserver.java +++ b/rxjava-core/src/main/java/rx/operators/SafeObserver.java @@ -22,7 +22,7 @@ import rx.Subscription; import rx.exceptions.CompositeException; import rx.exceptions.OnErrorNotImplementedException; -import rx.observers.SynchronizedObserver; +import rx.observers.SerializedObserver; import rx.plugins.RxJavaPlugins; import rx.subscriptions.Subscriptions; @@ -55,7 +55,7 @@ *

  • When onError or onComplete occur it will unsubscribe from the Observable (if executing asynchronously).
  • * *

    - * It will not synchronize onNext execution. Use the {@link SynchronizedObserver} to do that. + * It will not synchronize onNext execution. Use the {@link SerializedObserver} to do that. * * @param * @deprecated replaced by SafeSubscriber diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorSerializePerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorSerializePerformance.java new file mode 100644 index 0000000000..cfe70889cf --- /dev/null +++ b/rxjava-core/src/perf/java/rx/operators/OperatorSerializePerformance.java @@ -0,0 +1,324 @@ +package rx.operators; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Subscriber; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.perf.AbstractPerformanceTester; +import rx.perf.IntegerSumObserver; +import rx.schedulers.Schedulers; + +public class OperatorSerializePerformance extends AbstractPerformanceTester { + static int reps = Integer.MAX_VALUE / 16384; // timeTwoStreams + + // static int reps = Integer.MAX_VALUE / 1024; // timeSingleStream + // static int reps = 1000; // interval streams + + OperatorSerializePerformance() { + super(reps); + } + + public static void main(String args[]) { + + final OperatorSerializePerformance spt = new OperatorSerializePerformance(); + try { + spt.runTest(new Action0() { + + @Override + public void call() { + spt.timeTwoStreams(); + // spt.timeSingleStream(); + // spt.timeTwoStreamsIntervals(); + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + /** + * 1 streams emitting in a tight loop. Testing for single-threaded overhead. + * + * -> blocking synchronization (SynchronizedObserver) + * + * Run: 10 - 58,186,310 ops/sec + * Run: 11 - 60,592,037 ops/sec + * Run: 12 - 58,099,263 ops/sec + * Run: 13 - 59,034,765 ops/sec + * Run: 14 - 58,231,548 ops/sec + * + * -> state machine technique (SerializedObserverViaStateMachine) + * + * Run: 10 - 34,668,810 ops/sec + * Run: 11 - 32,874,312 ops/sec + * Run: 12 - 33,389,339 ops/sec + * Run: 13 - 35,269,946 ops/sec + * Run: 14 - 34,165,013 ops/sec + * + * -> using queue and counter technique (SerializedObserverViaQueueAndCounter) + * + * Run: 10 - 19,548,387 ops/sec + * Run: 11 - 19,471,069 ops/sec + * Run: 12 - 19,480,112 ops/sec + * Run: 13 - 18,720,550 ops/sec + * Run: 14 - 19,070,383 ops/sec + * + * -> using queue and lock technique (SerializedObserverViaQueueAndLock) + * + * Run: 10 - 51,295,152 ops/sec + * Run: 11 - 50,317,937 ops/sec + * Run: 12 - 51,126,331 ops/sec + * Run: 13 - 52,418,291 ops/sec + * Run: 14 - 51,694,710 ops/sec + */ + public long timeSingleStream() { + + final Observable s1 = Observable.range(0, reps).subscribeOn(Schedulers.newThread()); + + Observable s = Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber s) { + final CountDownLatch latch = new CountDownLatch(1); + // first + s1.doOnTerminate(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + + }).subscribe(new Action1() { + + @Override + public void call(Integer t1) { + s.onNext(t1); + } + + }); + + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + s.onCompleted(); + } + + }).serialize(); + + IntegerSumObserver o = new IntegerSumObserver(); + s.subscribe(o); + // System.out.println("sum : " + o.sum); + + return o.sum; + } + + /** + * 2 streams emitting in tight loops so very high contention. + * + * -> blocking synchronization (SynchronizedObserver) + * + * Run: 10 - 8,361,252 ops/sec + * Run: 11 - 7,184,728 ops/sec + * Run: 12 - 8,249,685 ops/sec + * Run: 13 - 6,831,595 ops/sec + * Run: 14 - 8,003,358 ops/sec + * + * (faster because it allows each thread to be "single threaded" while blocking the other) + * + * -> state machine technique (SerializedObserverViaStateMachine) + * + * Run: 10 - 4,060,062 ops/sec + * Run: 11 - 3,561,131 ops/sec + * Run: 12 - 3,721,387 ops/sec + * Run: 13 - 3,693,909 ops/sec + * Run: 14 - 3,516,324 ops/sec + * + * -> using queue and counter technique (SerializedObserverViaQueueAndCounter) + * + * Run: 10 - 4,300,229 ops/sec + * Run: 11 - 4,395,995 ops/sec + * Run: 12 - 4,551,550 ops/sec + * Run: 13 - 4,443,235 ops/sec + * Run: 14 - 4,158,475 ops/sec + * + * -> using queue and lock technique (SerializedObserverViaQueueAndLock) + * + * Run: 10 - 6,369,781 ops/sec + * Run: 11 - 6,933,872 ops/sec + * Run: 12 - 5,652,535 ops/sec + * Run: 13 - 5,503,716 ops/sec + * Run: 14 - 6,219,264 ops/sec + */ + public long timeTwoStreams() { + + final Observable s1 = Observable.range(0, reps).subscribeOn(Schedulers.newThread()); + final Observable s2 = Observable.range(0, reps).subscribeOn(Schedulers.newThread()); + + Observable s = Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber s) { + final CountDownLatch latch = new CountDownLatch(2); + // first + s1.doOnTerminate(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + + }).subscribe(new Action1() { + + @Override + public void call(Integer t1) { + s.onNext(t1); + } + + }); + + // second + s2.doOnTerminate(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + + }).subscribe(new Action1() { + + @Override + public void call(Integer t1) { + s.onNext(t1); + } + + }); + + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + s.onCompleted(); + } + + }).serialize(); + + IntegerSumObserver o = new IntegerSumObserver(); + s.subscribe(o); + // System.out.println("sum : " + o.sum); + + return o.sum; + } + + /** + * 2 streams emitting once a millisecond. Slow emission so little to no contention. + * + * -> blocking synchronization (SynchronizedObserver) + * + * Run: 10 - 1,996 ops/sec + * Run: 11 - 1,996 ops/sec + * Run: 12 - 1,995 ops/sec + * Run: 13 - 1,997 ops/sec + * Run: 14 - 1,996 ops/sec + * + * -> state machine technique (SerializedObserverViaStateMachine) + * + * Run: 10 - 1,996 ops/sec + * Run: 11 - 1,996 ops/sec + * Run: 12 - 1,996 ops/sec + * Run: 13 - 1,996 ops/sec + * Run: 14 - 1,996 ops/sec + * + * -> using queue and counter technique (SerializedObserverViaQueueAndCounter) + * + * Run: 10 - 1,996 ops/sec + * Run: 11 - 1,996 ops/sec + * Run: 12 - 1,996 ops/sec + * Run: 13 - 1,996 ops/sec + * Run: 14 - 1,995 ops/sec + * + * -> using queue and lock technique (SerializedObserverViaQueueAndLock) + * + * Run: 10 - 1,996 ops/sec + * Run: 11 - 1,996 ops/sec + * Run: 12 - 1,997 ops/sec + * Run: 13 - 1,996 ops/sec + * Run: 14 - 1,995 ops/sec + */ + public long timeTwoStreamsIntervals() { + + final Observable s1 = Observable.interval(1, TimeUnit.MILLISECONDS).take(reps / 2).flatMap(new Func1>() { + + @Override + public Observable call(Long l) { + return Observable.range(l.intValue(), 100); + } + + }).subscribeOn(Schedulers.newThread()); + final Observable s2 = Observable.range(1, reps / 2).subscribeOn(Schedulers.newThread()); + + Observable s = Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber s) { + final CountDownLatch latch = new CountDownLatch(2); + // first + s1.doOnTerminate(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + + }).subscribe(new Action1() { + + @Override + public void call(Integer t1) { + s.onNext(t1); + } + + }); + + // second + s2.doOnTerminate(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + + }).subscribe(new Action1() { + + @Override + public void call(Integer t1) { + s.onNext(t1); + } + + }); + + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + s.onCompleted(); + } + + }).serialize(); + + IntegerSumObserver o = new IntegerSumObserver(); + s.subscribe(o); + // System.out.println("sum : " + o.sum); + + return o.sum; + } + +} diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorSynchronizePerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorSynchronizePerformance.java new file mode 100644 index 0000000000..a7396693fb --- /dev/null +++ b/rxjava-core/src/perf/java/rx/operators/OperatorSynchronizePerformance.java @@ -0,0 +1,236 @@ +package rx.operators; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Subscriber; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.perf.AbstractPerformanceTester; +import rx.perf.IntegerSumObserver; +import rx.schedulers.Schedulers; + +public class OperatorSynchronizePerformance extends AbstractPerformanceTester { + static int reps = Integer.MAX_VALUE / 1024; + + // static int reps = 1000; // timeTwoStreamsIntervals + + OperatorSynchronizePerformance() { + super(reps); + } + + public static void main(String args[]) { + + final OperatorSynchronizePerformance spt = new OperatorSynchronizePerformance(); + try { + spt.runTest(new Action0() { + + @Override + public void call() { + spt.timeTwoStreams(); + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + /** + * Run: 10 - 59,593,390 ops/sec + * Run: 11 - 55,784,194 ops/sec + * Run: 12 - 58,778,300 ops/sec + * Run: 13 - 60,679,696 ops/sec + * Run: 14 - 59,370,693 ops/sec + */ + public long timeSingleStream() { + + final Observable s1 = Observable.range(0, reps).subscribeOn(Schedulers.newThread()); + + Observable s = Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber s) { + final CountDownLatch latch = new CountDownLatch(1); + // first + s1.doOnTerminate(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + + }).subscribe(new Action1() { + + @Override + public void call(Integer t1) { + s.onNext(t1); + } + + }); + + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + s.onCompleted(); + } + + }).synchronize(); + + IntegerSumObserver o = new IntegerSumObserver(); + s.subscribe(o); + // System.out.println("sum : " + o.sum); + + return o.sum; + } + + /** + * Run: 10 - 9,139,226 ops/sec + * Run: 11 - 8,456,526 ops/sec + * Run: 12 - 8,072,174 ops/sec + * Run: 13 - 8,667,381 ops/sec + * Run: 14 - 8,853,370 ops/sec + */ + public long timeTwoStreams() { + + final Observable s1 = Observable.range(0, reps).subscribeOn(Schedulers.newThread()); + final Observable s2 = Observable.range(0, reps).subscribeOn(Schedulers.newThread()); + + Observable s = Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber s) { + final CountDownLatch latch = new CountDownLatch(2); + // first + s1.doOnTerminate(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + + }).subscribe(new Action1() { + + @Override + public void call(Integer t1) { + s.onNext(t1); + } + + }); + + // second + s2.doOnTerminate(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + + }).subscribe(new Action1() { + + @Override + public void call(Integer t1) { + s.onNext(t1); + } + + }); + + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + s.onCompleted(); + } + + }).synchronize(); + + IntegerSumObserver o = new IntegerSumObserver(); + s.subscribe(o); + // System.out.println("sum : " + o.sum); + + return o.sum; + } + + /** + * Run: 10 - 1,996 ops/sec + * Run: 11 - 1,997 ops/sec + * Run: 12 - 1,996 ops/sec + * Run: 13 - 1,996 ops/sec + * Run: 14 - 1,995 ops/sec + * + * @return + */ + public long timeTwoStreamsIntervals() { + + final Observable s1 = Observable.interval(1, TimeUnit.MILLISECONDS).take(reps / 2).flatMap(new Func1>() { + + @Override + public Observable call(Long l) { + return Observable.range(l.intValue(), 100); + } + + }).subscribeOn(Schedulers.newThread()); + final Observable s2 = Observable.range(1, reps / 2).subscribeOn(Schedulers.newThread()); + + Observable s = Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber s) { + final CountDownLatch latch = new CountDownLatch(2); + // first + s1.doOnTerminate(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + + }).subscribe(new Action1() { + + @Override + public void call(Integer t1) { + s.onNext(t1); + } + + }); + + // second + s2.doOnTerminate(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + + }).subscribe(new Action1() { + + @Override + public void call(Integer t1) { + s.onNext(t1); + } + + }); + + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + s.onCompleted(); + } + + }).synchronize(); + + IntegerSumObserver o = new IntegerSumObserver(); + s.subscribe(o); + // System.out.println("sum : " + o.sum); + + return o.sum; + } +} diff --git a/rxjava-core/src/perf/java/rx/perf/IntegerSumObserver.java b/rxjava-core/src/perf/java/rx/perf/IntegerSumObserver.java index 31d10f0cab..28a522fbd8 100644 --- a/rxjava-core/src/perf/java/rx/perf/IntegerSumObserver.java +++ b/rxjava-core/src/perf/java/rx/perf/IntegerSumObserver.java @@ -13,6 +13,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { + e.printStackTrace(); throw new RuntimeException(e); } diff --git a/rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java b/rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java new file mode 100644 index 0000000000..e6043edd39 --- /dev/null +++ b/rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java @@ -0,0 +1,632 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.observers; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import rx.Observable; +import rx.Observer; +import rx.Subscriber; +import rx.Subscription; + +public class SerializedObserverTest { + + @Mock + Subscriber observer; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + + private Observer serializedObserver(Observer o) { + return new SerializedObserver(o); + } + + @Test + public void testSingleThreadedBasic() { + Subscription s = mock(Subscription.class); + TestSingleThreadedObservable onSubscribe = new TestSingleThreadedObservable(s, "one", "two", "three"); + Observable w = Observable.create(onSubscribe); + + Observer aw = serializedObserver(observer); + + w.subscribe(aw); + onSubscribe.waitToFinish(); + + verify(observer, times(1)).onNext("one"); + verify(observer, times(1)).onNext("two"); + verify(observer, times(1)).onNext("three"); + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onCompleted(); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + // verify(s, times(1)).unsubscribe(); + } + + @Test + public void testMultiThreadedBasic() { + Subscription s = mock(Subscription.class); + TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three"); + Observable w = Observable.create(onSubscribe); + + BusyObserver busyObserver = new BusyObserver(); + Observer aw = serializedObserver(busyObserver); + + w.subscribe(aw); + onSubscribe.waitToFinish(); + + assertEquals(3, busyObserver.onNextCount.get()); + assertFalse(busyObserver.onError); + assertTrue(busyObserver.onCompleted); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + // verify(s, times(1)).unsubscribe(); + + // we can have concurrency ... + assertTrue(onSubscribe.maxConcurrentThreads.get() > 1); + // ... but the onNext execution should be single threaded + assertEquals(1, busyObserver.maxConcurrentThreads.get()); + } + + @Test(timeout = 1000) + public void testMultiThreadedWithNPE() throws InterruptedException { + Subscription s = mock(Subscription.class); + TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null); + Observable w = Observable.create(onSubscribe); + + BusyObserver busyObserver = new BusyObserver(); + Observer aw = serializedObserver(busyObserver); + + w.subscribe(aw); + onSubscribe.waitToFinish(); + busyObserver.terminalEvent.await(); + + System.out.println("OnSubscribe maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get() + " Observer maxConcurrentThreads: " + busyObserver.maxConcurrentThreads.get()); + + // we can't know how many onNext calls will occur since they each run on a separate thread + // that depends on thread scheduling so 0, 1, 2 and 3 are all valid options + // assertEquals(3, busyObserver.onNextCount.get()); + assertTrue(busyObserver.onNextCount.get() < 4); + assertTrue(busyObserver.onError); + // no onCompleted because onError was invoked + assertFalse(busyObserver.onCompleted); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + //verify(s, times(1)).unsubscribe(); + + // we can have concurrency ... + assertTrue(onSubscribe.maxConcurrentThreads.get() > 1); + // ... but the onNext execution should be single threaded + assertEquals(1, busyObserver.maxConcurrentThreads.get()); + } + + @Test + public void testMultiThreadedWithNPEinMiddle() { + Subscription s = mock(Subscription.class); + TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine"); + Observable w = Observable.create(onSubscribe); + + BusyObserver busyObserver = new BusyObserver(); + Observer aw = serializedObserver(busyObserver); + + w.subscribe(aw); + onSubscribe.waitToFinish(); + + System.out.println("OnSubscribe maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get() + " Observer maxConcurrentThreads: " + busyObserver.maxConcurrentThreads.get()); + + // we can have concurrency ... + assertTrue(onSubscribe.maxConcurrentThreads.get() > 1); + // ... but the onNext execution should be single threaded + assertEquals(1, busyObserver.maxConcurrentThreads.get()); + + // this should not be the full number of items since the error should stop it before it completes all 9 + System.out.println("onNext count: " + busyObserver.onNextCount.get()); + assertTrue(busyObserver.onNextCount.get() < 9); + assertTrue(busyObserver.onError); + // no onCompleted because onError was invoked + assertFalse(busyObserver.onCompleted); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + // verify(s, times(1)).unsubscribe(); + } + + /** + * A non-realistic use case that tries to expose thread-safety issues by throwing lots of out-of-order + * events on many threads. + * + * @param w + * @param tw + */ + @Test + public void runOutOfOrderConcurrencyTest() { + ExecutorService tp = Executors.newFixedThreadPool(20); + try { + TestConcurrencyObserver tw = new TestConcurrencyObserver(); + // we need Synchronized + SafeSubscriber to handle synchronization plus life-cycle + Observer w = serializedObserver(new SafeSubscriber(tw)); + + Future f1 = tp.submit(new OnNextThread(w, 12000)); + Future f2 = tp.submit(new OnNextThread(w, 5000)); + Future f3 = tp.submit(new OnNextThread(w, 75000)); + Future f4 = tp.submit(new OnNextThread(w, 13500)); + Future f5 = tp.submit(new OnNextThread(w, 22000)); + Future f6 = tp.submit(new OnNextThread(w, 15000)); + Future f7 = tp.submit(new OnNextThread(w, 7500)); + Future f8 = tp.submit(new OnNextThread(w, 23500)); + + Future f10 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onCompleted, f1, f2, f3, f4)); + try { + Thread.sleep(1); + } catch (InterruptedException e) { + // ignore + } + Future f11 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onCompleted, f4, f6, f7)); + Future f12 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onCompleted, f4, f6, f7)); + Future f13 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onCompleted, f4, f6, f7)); + Future f14 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onCompleted, f4, f6, f7)); + // // the next 4 onError events should wait on same as f10 + Future f15 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onError, f1, f2, f3, f4)); + Future f16 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onError, f1, f2, f3, f4)); + Future f17 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onError, f1, f2, f3, f4)); + Future f18 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onError, f1, f2, f3, f4)); + + waitOnThreads(f1, f2, f3, f4, f5, f6, f7, f8, f10, f11, f12, f13, f14, f15, f16, f17, f18); + @SuppressWarnings("unused") + int numNextEvents = tw.assertEvents(null); // no check of type since we don't want to test barging results here, just interleaving behavior + // System.out.println("Number of events executed: " + numNextEvents); + } catch (Throwable e) { + fail("Concurrency test failed: " + e.getMessage()); + e.printStackTrace(); + } finally { + tp.shutdown(); + try { + tp.awaitTermination(5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + /** + * + * @param w + * @param tw + */ + @Test + public void runConcurrencyTest() { + ExecutorService tp = Executors.newFixedThreadPool(20); + try { + TestConcurrencyObserver tw = new TestConcurrencyObserver(); + // we need Synchronized + SafeSubscriber to handle synchronization plus life-cycle + Observer w = serializedObserver(new SafeSubscriber(tw)); + + Future f1 = tp.submit(new OnNextThread(w, 12000)); + Future f2 = tp.submit(new OnNextThread(w, 5000)); + Future f3 = tp.submit(new OnNextThread(w, 75000)); + Future f4 = tp.submit(new OnNextThread(w, 13500)); + Future f5 = tp.submit(new OnNextThread(w, 22000)); + Future f6 = tp.submit(new OnNextThread(w, 15000)); + Future f7 = tp.submit(new OnNextThread(w, 7500)); + Future f8 = tp.submit(new OnNextThread(w, 23500)); + + // 12000 + 5000 + 75000 + 13500 + 22000 + 15000 + 7500 + 23500 = 173500 + + Future f10 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onCompleted, f1, f2, f3, f4, f5, f6, f7, f8)); + try { + Thread.sleep(1); + } catch (InterruptedException e) { + // ignore + } + + waitOnThreads(f1, f2, f3, f4, f5, f6, f7, f8, f10); + @SuppressWarnings("unused") + int numNextEvents = tw.assertEvents(null); // no check of type since we don't want to test barging results here, just interleaving behavior + assertEquals(173500, numNextEvents); + // System.out.println("Number of events executed: " + numNextEvents); + } catch (Throwable e) { + fail("Concurrency test failed: " + e.getMessage()); + e.printStackTrace(); + } finally { + tp.shutdown(); + try { + tp.awaitTermination(25000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + private static void waitOnThreads(Future... futures) { + for (Future f : futures) { + try { + f.get(20, TimeUnit.SECONDS); + } catch (Throwable e) { + System.err.println("Failed while waiting on future."); + e.printStackTrace(); + } + } + } + + /** + * A thread that will pass data to onNext + */ + public static class OnNextThread implements Runnable { + + private final Observer Observer; + private final int numStringsToSend; + + OnNextThread(Observer Observer, int numStringsToSend) { + this.Observer = Observer; + this.numStringsToSend = numStringsToSend; + } + + @Override + public void run() { + for (int i = 0; i < numStringsToSend; i++) { + Observer.onNext(Thread.currentThread().getId() + "-" + i); + } + } + } + + /** + * A thread that will call onError or onNext + */ + public static class CompletionThread implements Runnable { + + private final Observer observer; + private final TestConcurrencyObserverEvent event; + private final Future[] waitOnThese; + + CompletionThread(Observer Observer, TestConcurrencyObserverEvent event, Future... waitOnThese) { + this.observer = Observer; + this.event = event; + this.waitOnThese = waitOnThese; + } + + @Override + public void run() { + /* if we have 'waitOnThese' futures, we'll wait on them before proceeding */ + if (waitOnThese != null) { + for (Future f : waitOnThese) { + try { + f.get(); + } catch (Throwable e) { + System.err.println("Error while waiting on future in CompletionThread"); + } + } + } + + /* send the event */ + if (event == TestConcurrencyObserverEvent.onError) { + observer.onError(new RuntimeException("mocked exception")); + } else if (event == TestConcurrencyObserverEvent.onCompleted) { + observer.onCompleted(); + + } else { + throw new IllegalArgumentException("Expecting either onError or onCompleted"); + } + } + } + + private static enum TestConcurrencyObserverEvent { + onCompleted, onError, onNext + } + + private static class TestConcurrencyObserver extends Subscriber { + + /** + * used to store the order and number of events received + */ + private final LinkedBlockingQueue events = new LinkedBlockingQueue(); + private final int waitTime; + + @SuppressWarnings("unused") + public TestConcurrencyObserver(int waitTimeInNext) { + this.waitTime = waitTimeInNext; + } + + public TestConcurrencyObserver() { + this.waitTime = 0; + } + + @Override + public void onCompleted() { + events.add(TestConcurrencyObserverEvent.onCompleted); + } + + @Override + public void onError(Throwable e) { + events.add(TestConcurrencyObserverEvent.onError); + } + + @Override + public void onNext(String args) { + events.add(TestConcurrencyObserverEvent.onNext); + // do some artificial work to make the thread scheduling/timing vary + int s = 0; + for (int i = 0; i < 20; i++) { + s += s * i; + } + + if (waitTime > 0) { + try { + Thread.sleep(waitTime); + } catch (InterruptedException e) { + // ignore + } + } + } + + /** + * Assert the order of events is correct and return the number of onNext executions. + * + * @param expectedEndingEvent + * @return int count of onNext calls + * @throws IllegalStateException + * If order of events was invalid. + */ + public int assertEvents(TestConcurrencyObserverEvent expectedEndingEvent) throws IllegalStateException { + int nextCount = 0; + boolean finished = false; + for (TestConcurrencyObserverEvent e : events) { + if (e == TestConcurrencyObserverEvent.onNext) { + if (finished) { + // already finished, we shouldn't get this again + throw new IllegalStateException("Received onNext but we're already finished."); + } + nextCount++; + } else if (e == TestConcurrencyObserverEvent.onError) { + if (finished) { + // already finished, we shouldn't get this again + throw new IllegalStateException("Received onError but we're already finished."); + } + if (expectedEndingEvent != null && TestConcurrencyObserverEvent.onError != expectedEndingEvent) { + throw new IllegalStateException("Received onError ending event but expected " + expectedEndingEvent); + } + finished = true; + } else if (e == TestConcurrencyObserverEvent.onCompleted) { + if (finished) { + // already finished, we shouldn't get this again + throw new IllegalStateException("Received onCompleted but we're already finished."); + } + if (expectedEndingEvent != null && TestConcurrencyObserverEvent.onCompleted != expectedEndingEvent) { + throw new IllegalStateException("Received onCompleted ending event but expected " + expectedEndingEvent); + } + finished = true; + } + } + + return nextCount; + } + + } + + /** + * This spawns a single thread for the subscribe execution + */ + private static class TestSingleThreadedObservable implements Observable.OnSubscribeFunc { + + final Subscription s; + final String[] values; + private Thread t = null; + + public TestSingleThreadedObservable(final Subscription s, final String... values) { + this.s = s; + this.values = values; + + } + + public Subscription onSubscribe(final Observer observer) { + System.out.println("TestSingleThreadedObservable subscribed to ..."); + t = new Thread(new Runnable() { + + @Override + public void run() { + try { + System.out.println("running TestSingleThreadedObservable thread"); + for (String s : values) { + System.out.println("TestSingleThreadedObservable onNext: " + s); + observer.onNext(s); + } + observer.onCompleted(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + }); + System.out.println("starting TestSingleThreadedObservable thread"); + t.start(); + System.out.println("done starting TestSingleThreadedObservable thread"); + return s; + } + + public void waitToFinish() { + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + } + + /** + * This spawns a thread for the subscription, then a separate thread for each onNext call. + */ + private static class TestMultiThreadedObservable implements Observable.OnSubscribeFunc { + + final Subscription s; + final String[] values; + Thread t = null; + AtomicInteger threadsRunning = new AtomicInteger(); + AtomicInteger maxConcurrentThreads = new AtomicInteger(); + ExecutorService threadPool; + + public TestMultiThreadedObservable(Subscription s, String... values) { + this.s = s; + this.values = values; + this.threadPool = Executors.newCachedThreadPool(); + } + + @Override + public Subscription onSubscribe(final Observer observer) { + System.out.println("TestMultiThreadedObservable subscribed to ..."); + t = new Thread(new Runnable() { + + @Override + public void run() { + try { + System.out.println("running TestMultiThreadedObservable thread"); + for (final String s : values) { + threadPool.execute(new Runnable() { + + @Override + public void run() { + threadsRunning.incrementAndGet(); + try { + // perform onNext call + System.out.println("TestMultiThreadedObservable onNext: " + s); + if (s == null) { + // force an error + throw new NullPointerException(); + } + observer.onNext(s); + // capture 'maxThreads' + int concurrentThreads = threadsRunning.get(); + int maxThreads = maxConcurrentThreads.get(); + if (concurrentThreads > maxThreads) { + maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads); + } + } catch (Throwable e) { + observer.onError(e); + } finally { + threadsRunning.decrementAndGet(); + } + } + }); + } + // we are done spawning threads + threadPool.shutdown(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + + // wait until all threads are done, then mark it as COMPLETED + try { + // wait for all the threads to finish + threadPool.awaitTermination(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + observer.onCompleted(); + } + }); + System.out.println("starting TestMultiThreadedObservable thread"); + t.start(); + System.out.println("done starting TestMultiThreadedObservable thread"); + return s; + } + + public void waitToFinish() { + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private static class BusyObserver extends Subscriber { + volatile boolean onCompleted = false; + volatile boolean onError = false; + AtomicInteger onNextCount = new AtomicInteger(); + AtomicInteger threadsRunning = new AtomicInteger(); + AtomicInteger maxConcurrentThreads = new AtomicInteger(); + final CountDownLatch terminalEvent = new CountDownLatch(1); + + @Override + public void onCompleted() { + threadsRunning.incrementAndGet(); + try { + onCompleted = true; + } finally { + captureMaxThreads(); + threadsRunning.decrementAndGet(); + terminalEvent.countDown(); + } + } + + @Override + public void onError(Throwable e) { + System.out.println(">>>>>>>>>>>>>>>>>>>> onError received: " + e); + threadsRunning.incrementAndGet(); + try { + onError = true; + } finally { + captureMaxThreads(); + threadsRunning.decrementAndGet(); + terminalEvent.countDown(); + } + } + + @Override + public void onNext(String args) { + threadsRunning.incrementAndGet(); + try { + onNextCount.incrementAndGet(); + try { + // simulate doing something computational + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } finally { + // capture 'maxThreads' + captureMaxThreads(); + threadsRunning.decrementAndGet(); + } + } + + protected void captureMaxThreads() { + int concurrentThreads = threadsRunning.get(); + int maxThreads = maxConcurrentThreads.get(); + if (concurrentThreads > maxThreads) { + maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads); + if (concurrentThreads > 1) { + new RuntimeException("should not be greater than 1").printStackTrace(); + } + } + } + + } +} diff --git a/rxjava-core/src/test/java/rx/observers/SynchronizedObserverTest.java b/rxjava-core/src/test/java/rx/observers/SynchronizedObserverTest.java index bdaad282c6..aad021d2e3 100644 --- a/rxjava-core/src/test/java/rx/observers/SynchronizedObserverTest.java +++ b/rxjava-core/src/test/java/rx/observers/SynchronizedObserverTest.java @@ -288,7 +288,7 @@ public void testMultiThreadedWithNPEinMiddleAndLock() { * @param tw */ @Test - public void runConcurrencyTest() { + public void runOutOfOrderConcurrencyTest() { ExecutorService tp = Executors.newFixedThreadPool(20); try { TestConcurrencyObserver tw = new TestConcurrencyObserver(); @@ -304,7 +304,7 @@ public void runConcurrencyTest() { Future f7 = tp.submit(new OnNextThread(w, 7500)); Future f8 = tp.submit(new OnNextThread(w, 23500)); - Future f10 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onCompleted, f1, f2, f3, f4)); + Future f10 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onCompleted, f1, f2, f3, f4, f5, f6, f7, f8)); try { Thread.sleep(1); } catch (InterruptedException e) { @@ -337,6 +337,55 @@ public void runConcurrencyTest() { } } + /** + * + * @param w + * @param tw + */ + @Test + public void runConcurrencyTest() { + ExecutorService tp = Executors.newFixedThreadPool(20); + try { + TestConcurrencyObserver tw = new TestConcurrencyObserver(); + // we need Synchronized + SafeSubscriber to handle synchronization plus life-cycle + SynchronizedObserver w = new SynchronizedObserver(new SafeSubscriber(tw)); + + Future f1 = tp.submit(new OnNextThread(w, 12000)); + Future f2 = tp.submit(new OnNextThread(w, 5000)); + Future f3 = tp.submit(new OnNextThread(w, 75000)); + Future f4 = tp.submit(new OnNextThread(w, 13500)); + Future f5 = tp.submit(new OnNextThread(w, 22000)); + Future f6 = tp.submit(new OnNextThread(w, 15000)); + Future f7 = tp.submit(new OnNextThread(w, 7500)); + Future f8 = tp.submit(new OnNextThread(w, 23500)); + + // 12000 + 5000 + 75000 + 13500 + 22000 + 15000 + 7500 + 23500 = 173500 + + Future f10 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onCompleted, f1, f2, f3, f4)); + try { + Thread.sleep(1); + } catch (InterruptedException e) { + // ignore + } + + waitOnThreads(f1, f2, f3, f4, f5, f6, f7, f8, f10); + @SuppressWarnings("unused") + int numNextEvents = tw.assertEvents(null); // no check of type since we don't want to test barging results here, just interleaving behavior + assertEquals(173500, numNextEvents); + // System.out.println("Number of events executed: " + numNextEvents); + } catch (Throwable e) { + fail("Concurrency test failed: " + e.getMessage()); + e.printStackTrace(); + } finally { + tp.shutdown(); + try { + tp.awaitTermination(5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + private static void waitOnThreads(Future... futures) { for (Future f : futures) { try { diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSerializeTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSerializeTest.java new file mode 100644 index 0000000000..6e396b2139 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperatorSerializeTest.java @@ -0,0 +1,566 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import rx.Observable; +import rx.Observer; +import rx.Subscriber; +import rx.Subscription; + +public class OperatorSerializeTest { + + @Mock + Observer observer; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testSingleThreadedBasic() { + Subscription s = mock(Subscription.class); + TestSingleThreadedObservable onSubscribe = new TestSingleThreadedObservable(s, "one", "two", "three"); + Observable w = Observable.create(onSubscribe); + + w.serialize().subscribe(observer); + onSubscribe.waitToFinish(); + + verify(observer, times(1)).onNext("one"); + verify(observer, times(1)).onNext("two"); + verify(observer, times(1)).onNext("three"); + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onCompleted(); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + // verify(s, times(1)).unsubscribe(); + } + + @Test + public void testMultiThreadedBasic() { + Subscription s = mock(Subscription.class); + TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three"); + Observable w = Observable.create(onSubscribe); + + BusyObserver busyobserver = new BusyObserver(); + + w.serialize().subscribe(busyobserver); + onSubscribe.waitToFinish(); + + assertEquals(3, busyobserver.onNextCount.get()); + assertFalse(busyobserver.onError); + assertTrue(busyobserver.onCompleted); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + // verify(s, times(1)).unsubscribe(); + + // we can have concurrency ... + assertTrue(onSubscribe.maxConcurrentThreads.get() > 1); + // ... but the onNext execution should be single threaded + assertEquals(1, busyobserver.maxConcurrentThreads.get()); + } + + @Test + public void testMultiThreadedWithNPE() { + Subscription s = mock(Subscription.class); + TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null); + Observable w = Observable.create(onSubscribe); + + BusyObserver busyobserver = new BusyObserver(); + + w.serialize().subscribe(busyobserver); + onSubscribe.waitToFinish(); + + System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get()); + + // we can't know how many onNext calls will occur since they each run on a separate thread + // that depends on thread scheduling so 0, 1, 2 and 3 are all valid options + // assertEquals(3, busyobserver.onNextCount.get()); + assertTrue(busyobserver.onNextCount.get() < 4); + assertTrue(busyobserver.onError); + // no onCompleted because onError was invoked + assertFalse(busyobserver.onCompleted); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + //verify(s, times(1)).unsubscribe(); + + // we can have concurrency ... + assertTrue(onSubscribe.maxConcurrentThreads.get() > 1); + // ... but the onNext execution should be single threaded + assertEquals(1, busyobserver.maxConcurrentThreads.get()); + } + + @Test + public void testMultiThreadedWithNPEinMiddle() { + Subscription s = mock(Subscription.class); + TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine"); + Observable w = Observable.create(onSubscribe); + + BusyObserver busyobserver = new BusyObserver(); + + w.serialize().subscribe(busyobserver); + onSubscribe.waitToFinish(); + + System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get()); + // this should not be the full number of items since the error should stop it before it completes all 9 + System.out.println("onNext count: " + busyobserver.onNextCount.get()); + assertTrue(busyobserver.onNextCount.get() < 9); + assertTrue(busyobserver.onError); + // no onCompleted because onError was invoked + assertFalse(busyobserver.onCompleted); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + // verify(s, times(1)).unsubscribe(); + + // we can have concurrency ... + assertTrue(onSubscribe.maxConcurrentThreads.get() > 1); + // ... but the onNext execution should be single threaded + assertEquals(1, busyobserver.maxConcurrentThreads.get()); + } + + /** + * A thread that will pass data to onNext + */ + public static class OnNextThread implements Runnable { + + private final Observer observer; + private final int numStringsToSend; + + OnNextThread(Observer observer, int numStringsToSend) { + this.observer = observer; + this.numStringsToSend = numStringsToSend; + } + + @Override + public void run() { + for (int i = 0; i < numStringsToSend; i++) { + observer.onNext("aString"); + } + } + } + + /** + * A thread that will call onError or onNext + */ + public static class CompletionThread implements Runnable { + + private final Observer observer; + private final TestConcurrencyobserverEvent event; + private final Future[] waitOnThese; + + CompletionThread(Observer observer, TestConcurrencyobserverEvent event, Future... waitOnThese) { + this.observer = observer; + this.event = event; + this.waitOnThese = waitOnThese; + } + + @Override + public void run() { + /* if we have 'waitOnThese' futures, we'll wait on them before proceeding */ + if (waitOnThese != null) { + for (Future f : waitOnThese) { + try { + f.get(); + } catch (Throwable e) { + System.err.println("Error while waiting on future in CompletionThread"); + } + } + } + + /* send the event */ + if (event == TestConcurrencyobserverEvent.onError) { + observer.onError(new RuntimeException("mocked exception")); + } else if (event == TestConcurrencyobserverEvent.onCompleted) { + observer.onCompleted(); + + } else { + throw new IllegalArgumentException("Expecting either onError or onCompleted"); + } + } + } + + private static enum TestConcurrencyobserverEvent { + onCompleted, onError, onNext + } + + private static class TestConcurrencyobserver extends Subscriber { + + /** + * used to store the order and number of events received + */ + private final LinkedBlockingQueue events = new LinkedBlockingQueue(); + private final int waitTime; + + @SuppressWarnings("unused") + public TestConcurrencyobserver(int waitTimeInNext) { + this.waitTime = waitTimeInNext; + } + + public TestConcurrencyobserver() { + this.waitTime = 0; + } + + @Override + public void onCompleted() { + events.add(TestConcurrencyobserverEvent.onCompleted); + } + + @Override + public void onError(Throwable e) { + events.add(TestConcurrencyobserverEvent.onError); + } + + @Override + public void onNext(String args) { + events.add(TestConcurrencyobserverEvent.onNext); + // do some artificial work to make the thread scheduling/timing vary + int s = 0; + for (int i = 0; i < 20; i++) { + s += s * i; + } + + if (waitTime > 0) { + try { + Thread.sleep(waitTime); + } catch (InterruptedException e) { + // ignore + } + } + } + + /** + * Assert the order of events is correct and return the number of onNext executions. + * + * @param expectedEndingEvent + * @return int count of onNext calls + * @throws IllegalStateException + * If order of events was invalid. + */ + public int assertEvents(TestConcurrencyobserverEvent expectedEndingEvent) throws IllegalStateException { + int nextCount = 0; + boolean finished = false; + for (TestConcurrencyobserverEvent e : events) { + if (e == TestConcurrencyobserverEvent.onNext) { + if (finished) { + // already finished, we shouldn't get this again + throw new IllegalStateException("Received onNext but we're already finished."); + } + nextCount++; + } else if (e == TestConcurrencyobserverEvent.onError) { + if (finished) { + // already finished, we shouldn't get this again + throw new IllegalStateException("Received onError but we're already finished."); + } + if (expectedEndingEvent != null && TestConcurrencyobserverEvent.onError != expectedEndingEvent) { + throw new IllegalStateException("Received onError ending event but expected " + expectedEndingEvent); + } + finished = true; + } else if (e == TestConcurrencyobserverEvent.onCompleted) { + if (finished) { + // already finished, we shouldn't get this again + throw new IllegalStateException("Received onCompleted but we're already finished."); + } + if (expectedEndingEvent != null && TestConcurrencyobserverEvent.onCompleted != expectedEndingEvent) { + throw new IllegalStateException("Received onCompleted ending event but expected " + expectedEndingEvent); + } + finished = true; + } + } + + return nextCount; + } + + } + + /** + * This spawns a single thread for the subscribe execution + */ + private static class TestSingleThreadedObservable implements Observable.OnSubscribeFunc { + + final Subscription s; + final String[] values; + private Thread t = null; + + public TestSingleThreadedObservable(final Subscription s, final String... values) { + this.s = s; + this.values = values; + + } + + public Subscription onSubscribe(final Observer observer) { + System.out.println("TestSingleThreadedObservable subscribed to ..."); + t = new Thread(new Runnable() { + + @Override + public void run() { + try { + System.out.println("running TestSingleThreadedObservable thread"); + for (String s : values) { + System.out.println("TestSingleThreadedObservable onNext: " + s); + observer.onNext(s); + } + observer.onCompleted(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + }); + System.out.println("starting TestSingleThreadedObservable thread"); + t.start(); + System.out.println("done starting TestSingleThreadedObservable thread"); + return s; + } + + public void waitToFinish() { + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + } + + /** + * This spawns a thread for the subscription, then a separate thread for each onNext call. + */ + private static class TestMultiThreadedObservable implements Observable.OnSubscribeFunc { + + final Subscription s; + final String[] values; + Thread t = null; + AtomicInteger threadsRunning = new AtomicInteger(); + AtomicInteger maxConcurrentThreads = new AtomicInteger(); + ExecutorService threadPool; + + public TestMultiThreadedObservable(Subscription s, String... values) { + this.s = s; + this.values = values; + this.threadPool = Executors.newCachedThreadPool(); + } + + @Override + public Subscription onSubscribe(final Observer observer) { + System.out.println("TestMultiThreadedObservable subscribed to ..."); + t = new Thread(new Runnable() { + + @Override + public void run() { + try { + System.out.println("running TestMultiThreadedObservable thread"); + for (final String s : values) { + threadPool.execute(new Runnable() { + + @Override + public void run() { + threadsRunning.incrementAndGet(); + try { + // perform onNext call + System.out.println("TestMultiThreadedObservable onNext: " + s); + if (s == null) { + // force an error + throw new NullPointerException(); + } + observer.onNext(s); + // capture 'maxThreads' + int concurrentThreads = threadsRunning.get(); + int maxThreads = maxConcurrentThreads.get(); + if (concurrentThreads > maxThreads) { + maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads); + } + } catch (Throwable e) { + observer.onError(e); + } finally { + threadsRunning.decrementAndGet(); + } + } + }); + } + // we are done spawning threads + threadPool.shutdown(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + + // wait until all threads are done, then mark it as COMPLETED + try { + // wait for all the threads to finish + threadPool.awaitTermination(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + observer.onCompleted(); + } + }); + System.out.println("starting TestMultiThreadedObservable thread"); + t.start(); + System.out.println("done starting TestMultiThreadedObservable thread"); + return s; + } + + public void waitToFinish() { + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private static class BusyObserver extends Subscriber { + volatile boolean onCompleted = false; + volatile boolean onError = false; + AtomicInteger onNextCount = new AtomicInteger(); + AtomicInteger threadsRunning = new AtomicInteger(); + AtomicInteger maxConcurrentThreads = new AtomicInteger(); + + @Override + public void onCompleted() { + threadsRunning.incrementAndGet(); + + System.out.println(">>> Busyobserver received onCompleted"); + onCompleted = true; + + int concurrentThreads = threadsRunning.get(); + int maxThreads = maxConcurrentThreads.get(); + if (concurrentThreads > maxThreads) { + maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads); + } + threadsRunning.decrementAndGet(); + } + + @Override + public void onError(Throwable e) { + threadsRunning.incrementAndGet(); + + System.out.println(">>> Busyobserver received onError: " + e.getMessage()); + onError = true; + + int concurrentThreads = threadsRunning.get(); + int maxThreads = maxConcurrentThreads.get(); + if (concurrentThreads > maxThreads) { + maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads); + } + threadsRunning.decrementAndGet(); + } + + @Override + public void onNext(String args) { + threadsRunning.incrementAndGet(); + try { + onNextCount.incrementAndGet(); + System.out.println(">>> Busyobserver received onNext: " + args); + try { + // simulate doing something computational + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } finally { + // capture 'maxThreads' + int concurrentThreads = threadsRunning.get(); + int maxThreads = maxConcurrentThreads.get(); + if (concurrentThreads > maxThreads) { + maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads); + } + threadsRunning.decrementAndGet(); + } + } + + } + + private static class ExternalBusyThread extends Thread { + + private BusyObserver observer; + private Object lock; + private int lockTimes; + private int waitTime; + public volatile boolean fail; + + public ExternalBusyThread(BusyObserver observer, Object lock, int lockTimes, int waitTime) { + this.observer = observer; + this.lock = lock; + this.lockTimes = lockTimes; + this.waitTime = waitTime; + this.fail = false; + } + + @Override + public void run() { + Random r = new Random(); + for (int i = 0; i < lockTimes; i++) { + synchronized (lock) { + int oldOnNextCount = observer.onNextCount.get(); + boolean oldOnCompleted = observer.onCompleted; + boolean oldOnError = observer.onError; + try { + Thread.sleep(r.nextInt(waitTime)); + } catch (InterruptedException e) { + // ignore + } + // Since we own the lock, onNextCount, onCompleted and + // onError must not be changed. + int newOnNextCount = observer.onNextCount.get(); + boolean newOnCompleted = observer.onCompleted; + boolean newOnError = observer.onError; + if (oldOnNextCount != newOnNextCount) { + System.out.println(">>> ExternalBusyThread received different onNextCount: " + + oldOnNextCount + + " -> " + + newOnNextCount); + fail = true; + break; + } + if (oldOnCompleted != newOnCompleted) { + System.out.println(">>> ExternalBusyThread received different onCompleted: " + + oldOnCompleted + + " -> " + + newOnCompleted); + fail = true; + break; + } + if (oldOnError != newOnError) { + System.out.println(">>> ExternalBusyThread received different onError: " + + oldOnError + + " -> " + + newOnError); + fail = true; + break; + } + } + } + } + + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSynchronizeTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSynchronizeTest.java new file mode 100644 index 0000000000..9ec9396f5b --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperatorSynchronizeTest.java @@ -0,0 +1,688 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import rx.Observable; +import rx.Observer; +import rx.Subscriber; +import rx.Subscription; +import rx.observers.TestSubscriber; + +public class OperatorSynchronizeTest { + + @Mock + Observer observer; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testSingleThreadedBasic() { + Subscription s = mock(Subscription.class); + TestSingleThreadedObservable onSubscribe = new TestSingleThreadedObservable(s, "one", "two", "three"); + Observable w = Observable.create(onSubscribe); + + w.synchronize().subscribe(observer); + onSubscribe.waitToFinish(); + + verify(observer, times(1)).onNext("one"); + verify(observer, times(1)).onNext("two"); + verify(observer, times(1)).onNext("three"); + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onCompleted(); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + // verify(s, times(1)).unsubscribe(); + } + + @Test + public void testMultiThreadedBasic() { + Subscription s = mock(Subscription.class); + TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three"); + Observable w = Observable.create(onSubscribe); + + BusyObserver busyobserver = new BusyObserver(); + + w.synchronize().subscribe(busyobserver); + onSubscribe.waitToFinish(); + + assertEquals(3, busyobserver.onNextCount.get()); + assertFalse(busyobserver.onError); + assertTrue(busyobserver.onCompleted); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + // verify(s, times(1)).unsubscribe(); + + // we can have concurrency ... + assertTrue(onSubscribe.maxConcurrentThreads.get() > 1); + // ... but the onNext execution should be single threaded + assertEquals(1, busyobserver.maxConcurrentThreads.get()); + } + + @Test + public void testMultiThreadedBasicWithLock() { + Subscription s = mock(Subscription.class); + TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three"); + Observable w = Observable.create(onSubscribe); + + BusyObserver busyobserver = new BusyObserver(); + + Object lock = new Object(); + ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyobserver, lock, 10, 100); + + externalBusyThread.start(); + + w.synchronize(lock).subscribe(busyobserver); + onSubscribe.waitToFinish(); + + try { + externalBusyThread.join(10000); + assertFalse(externalBusyThread.isAlive()); + assertFalse(externalBusyThread.fail); + } catch (InterruptedException e) { + // ignore + } + + assertEquals(3, busyobserver.onNextCount.get()); + assertFalse(busyobserver.onError); + assertTrue(busyobserver.onCompleted); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + // verify(s, times(1)).unsubscribe(); + + // we can have concurrency ... + assertTrue(onSubscribe.maxConcurrentThreads.get() > 1); + // ... but the onNext execution should be single threaded + assertEquals(1, busyobserver.maxConcurrentThreads.get()); + } + + @Test + public void testMultiThreadedWithNPE() { + Subscription s = mock(Subscription.class); + TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null); + Observable w = Observable.create(onSubscribe); + + BusyObserver busyobserver = new BusyObserver(); + + w.synchronize().subscribe(busyobserver); + onSubscribe.waitToFinish(); + + System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get()); + + // we can't know how many onNext calls will occur since they each run on a separate thread + // that depends on thread scheduling so 0, 1, 2 and 3 are all valid options + // assertEquals(3, busyobserver.onNextCount.get()); + assertTrue(busyobserver.onNextCount.get() < 4); + assertTrue(busyobserver.onError); + // no onCompleted because onError was invoked + assertFalse(busyobserver.onCompleted); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + //verify(s, times(1)).unsubscribe(); + + // we can have concurrency ... + assertTrue(onSubscribe.maxConcurrentThreads.get() > 1); + // ... but the onNext execution should be single threaded + assertEquals(1, busyobserver.maxConcurrentThreads.get()); + } + + @Test + public void testMultiThreadedWithNPEAndLock() { + Subscription s = mock(Subscription.class); + TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null); + Observable w = Observable.create(onSubscribe); + + BusyObserver busyobserver = new BusyObserver(); + + Object lock = new Object(); + ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyobserver, lock, 10, 100); + + externalBusyThread.start(); + + w.synchronize(lock).subscribe(busyobserver); + onSubscribe.waitToFinish(); + + try { + externalBusyThread.join(10000); + assertFalse(externalBusyThread.isAlive()); + assertFalse(externalBusyThread.fail); + } catch (InterruptedException e) { + // ignore + } + + System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get()); + + // we can't know how many onNext calls will occur since they each run on a separate thread + // that depends on thread scheduling so 0, 1, 2 and 3 are all valid options + // assertEquals(3, busyobserver.onNextCount.get()); + assertTrue(busyobserver.onNextCount.get() < 4); + assertTrue(busyobserver.onError); + // no onCompleted because onError was invoked + assertFalse(busyobserver.onCompleted); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + //verify(s, times(1)).unsubscribe(); + + // we can have concurrency ... + assertTrue(onSubscribe.maxConcurrentThreads.get() > 1); + // ... but the onNext execution should be single threaded + assertEquals(1, busyobserver.maxConcurrentThreads.get()); + } + + @Test + public void testMultiThreadedWithNPEinMiddle() { + Subscription s = mock(Subscription.class); + TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine"); + Observable w = Observable.create(onSubscribe); + + BusyObserver busyobserver = new BusyObserver(); + + w.synchronize().subscribe(busyobserver); + onSubscribe.waitToFinish(); + + System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get()); + // this should not be the full number of items since the error should stop it before it completes all 9 + System.out.println("onNext count: " + busyobserver.onNextCount.get()); + assertTrue(busyobserver.onNextCount.get() < 9); + assertTrue(busyobserver.onError); + // no onCompleted because onError was invoked + assertFalse(busyobserver.onCompleted); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + // verify(s, times(1)).unsubscribe(); + + // we can have concurrency ... + assertTrue(onSubscribe.maxConcurrentThreads.get() > 1); + // ... but the onNext execution should be single threaded + assertEquals(1, busyobserver.maxConcurrentThreads.get()); + } + + @Test + public void testMultiThreadedWithNPEinMiddleAndLock() { + Subscription s = mock(Subscription.class); + TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine"); + Observable w = Observable.create(onSubscribe); + + BusyObserver busyobserver = new BusyObserver(); + + Object lock = new Object(); + ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyobserver, lock, 10, 100); + + externalBusyThread.start(); + + w.synchronize(lock).subscribe(busyobserver); + onSubscribe.waitToFinish(); + + try { + externalBusyThread.join(10000); + assertFalse(externalBusyThread.isAlive()); + assertFalse(externalBusyThread.fail); + } catch (InterruptedException e) { + // ignore + } + + System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get()); + // this should not be the full number of items since the error should stop it before it completes all 9 + System.out.println("onNext count: " + busyobserver.onNextCount.get()); + assertTrue(busyobserver.onNextCount.get() < 9); + assertTrue(busyobserver.onError); + // no onCompleted because onError was invoked + assertFalse(busyobserver.onCompleted); + // non-deterministic because unsubscribe happens after 'waitToFinish' releases + // so commenting out for now as this is not a critical thing to test here + // verify(s, times(1)).unsubscribe(); + + // we can have concurrency ... + assertTrue(onSubscribe.maxConcurrentThreads.get() > 1); + // ... but the onNext execution should be single threaded + assertEquals(1, busyobserver.maxConcurrentThreads.get()); + } + + /** + * A thread that will pass data to onNext + */ + public static class OnNextThread implements Runnable { + + private final Observer observer; + private final int numStringsToSend; + + OnNextThread(Observer observer, int numStringsToSend) { + this.observer = observer; + this.numStringsToSend = numStringsToSend; + } + + @Override + public void run() { + for (int i = 0; i < numStringsToSend; i++) { + observer.onNext("aString"); + } + } + } + + /** + * A thread that will call onError or onNext + */ + public static class CompletionThread implements Runnable { + + private final Observer observer; + private final TestConcurrencyobserverEvent event; + private final Future[] waitOnThese; + + CompletionThread(Observer observer, TestConcurrencyobserverEvent event, Future... waitOnThese) { + this.observer = observer; + this.event = event; + this.waitOnThese = waitOnThese; + } + + @Override + public void run() { + /* if we have 'waitOnThese' futures, we'll wait on them before proceeding */ + if (waitOnThese != null) { + for (Future f : waitOnThese) { + try { + f.get(); + } catch (Throwable e) { + System.err.println("Error while waiting on future in CompletionThread"); + } + } + } + + /* send the event */ + if (event == TestConcurrencyobserverEvent.onError) { + observer.onError(new RuntimeException("mocked exception")); + } else if (event == TestConcurrencyobserverEvent.onCompleted) { + observer.onCompleted(); + + } else { + throw new IllegalArgumentException("Expecting either onError or onCompleted"); + } + } + } + + private static enum TestConcurrencyobserverEvent { + onCompleted, onError, onNext + } + + private static class TestConcurrencyobserver extends Subscriber { + + /** + * used to store the order and number of events received + */ + private final LinkedBlockingQueue events = new LinkedBlockingQueue(); + private final int waitTime; + + @SuppressWarnings("unused") + public TestConcurrencyobserver(int waitTimeInNext) { + this.waitTime = waitTimeInNext; + } + + public TestConcurrencyobserver() { + this.waitTime = 0; + } + + @Override + public void onCompleted() { + events.add(TestConcurrencyobserverEvent.onCompleted); + } + + @Override + public void onError(Throwable e) { + events.add(TestConcurrencyobserverEvent.onError); + } + + @Override + public void onNext(String args) { + events.add(TestConcurrencyobserverEvent.onNext); + // do some artificial work to make the thread scheduling/timing vary + int s = 0; + for (int i = 0; i < 20; i++) { + s += s * i; + } + + if (waitTime > 0) { + try { + Thread.sleep(waitTime); + } catch (InterruptedException e) { + // ignore + } + } + } + + /** + * Assert the order of events is correct and return the number of onNext executions. + * + * @param expectedEndingEvent + * @return int count of onNext calls + * @throws IllegalStateException + * If order of events was invalid. + */ + public int assertEvents(TestConcurrencyobserverEvent expectedEndingEvent) throws IllegalStateException { + int nextCount = 0; + boolean finished = false; + for (TestConcurrencyobserverEvent e : events) { + if (e == TestConcurrencyobserverEvent.onNext) { + if (finished) { + // already finished, we shouldn't get this again + throw new IllegalStateException("Received onNext but we're already finished."); + } + nextCount++; + } else if (e == TestConcurrencyobserverEvent.onError) { + if (finished) { + // already finished, we shouldn't get this again + throw new IllegalStateException("Received onError but we're already finished."); + } + if (expectedEndingEvent != null && TestConcurrencyobserverEvent.onError != expectedEndingEvent) { + throw new IllegalStateException("Received onError ending event but expected " + expectedEndingEvent); + } + finished = true; + } else if (e == TestConcurrencyobserverEvent.onCompleted) { + if (finished) { + // already finished, we shouldn't get this again + throw new IllegalStateException("Received onCompleted but we're already finished."); + } + if (expectedEndingEvent != null && TestConcurrencyobserverEvent.onCompleted != expectedEndingEvent) { + throw new IllegalStateException("Received onCompleted ending event but expected " + expectedEndingEvent); + } + finished = true; + } + } + + return nextCount; + } + + } + + /** + * This spawns a single thread for the subscribe execution + */ + private static class TestSingleThreadedObservable implements Observable.OnSubscribeFunc { + + final Subscription s; + final String[] values; + private Thread t = null; + + public TestSingleThreadedObservable(final Subscription s, final String... values) { + this.s = s; + this.values = values; + + } + + public Subscription onSubscribe(final Observer observer) { + System.out.println("TestSingleThreadedObservable subscribed to ..."); + t = new Thread(new Runnable() { + + @Override + public void run() { + try { + System.out.println("running TestSingleThreadedObservable thread"); + for (String s : values) { + System.out.println("TestSingleThreadedObservable onNext: " + s); + observer.onNext(s); + } + observer.onCompleted(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + }); + System.out.println("starting TestSingleThreadedObservable thread"); + t.start(); + System.out.println("done starting TestSingleThreadedObservable thread"); + return s; + } + + public void waitToFinish() { + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + } + + /** + * This spawns a thread for the subscription, then a separate thread for each onNext call. + */ + private static class TestMultiThreadedObservable implements Observable.OnSubscribeFunc { + + final Subscription s; + final String[] values; + Thread t = null; + AtomicInteger threadsRunning = new AtomicInteger(); + AtomicInteger maxConcurrentThreads = new AtomicInteger(); + ExecutorService threadPool; + + public TestMultiThreadedObservable(Subscription s, String... values) { + this.s = s; + this.values = values; + this.threadPool = Executors.newCachedThreadPool(); + } + + @Override + public Subscription onSubscribe(final Observer observer) { + System.out.println("TestMultiThreadedObservable subscribed to ..."); + t = new Thread(new Runnable() { + + @Override + public void run() { + try { + System.out.println("running TestMultiThreadedObservable thread"); + for (final String s : values) { + threadPool.execute(new Runnable() { + + @Override + public void run() { + threadsRunning.incrementAndGet(); + try { + // perform onNext call + System.out.println("TestMultiThreadedObservable onNext: " + s); + if (s == null) { + // force an error + throw new NullPointerException(); + } + observer.onNext(s); + // capture 'maxThreads' + int concurrentThreads = threadsRunning.get(); + int maxThreads = maxConcurrentThreads.get(); + if (concurrentThreads > maxThreads) { + maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads); + } + } catch (Throwable e) { + observer.onError(e); + } finally { + threadsRunning.decrementAndGet(); + } + } + }); + } + // we are done spawning threads + threadPool.shutdown(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + + // wait until all threads are done, then mark it as COMPLETED + try { + // wait for all the threads to finish + threadPool.awaitTermination(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + observer.onCompleted(); + } + }); + System.out.println("starting TestMultiThreadedObservable thread"); + t.start(); + System.out.println("done starting TestMultiThreadedObservable thread"); + return s; + } + + public void waitToFinish() { + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private static class BusyObserver extends Subscriber { + volatile boolean onCompleted = false; + volatile boolean onError = false; + AtomicInteger onNextCount = new AtomicInteger(); + AtomicInteger threadsRunning = new AtomicInteger(); + AtomicInteger maxConcurrentThreads = new AtomicInteger(); + + @Override + public void onCompleted() { + threadsRunning.incrementAndGet(); + + System.out.println(">>> Busyobserver received onCompleted"); + onCompleted = true; + + int concurrentThreads = threadsRunning.get(); + int maxThreads = maxConcurrentThreads.get(); + if (concurrentThreads > maxThreads) { + maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads); + } + threadsRunning.decrementAndGet(); + } + + @Override + public void onError(Throwable e) { + threadsRunning.incrementAndGet(); + + System.out.println(">>> Busyobserver received onError: " + e.getMessage()); + onError = true; + + int concurrentThreads = threadsRunning.get(); + int maxThreads = maxConcurrentThreads.get(); + if (concurrentThreads > maxThreads) { + maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads); + } + threadsRunning.decrementAndGet(); + } + + @Override + public void onNext(String args) { + threadsRunning.incrementAndGet(); + try { + onNextCount.incrementAndGet(); + System.out.println(">>> Busyobserver received onNext: " + args); + try { + // simulate doing something computational + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } finally { + // capture 'maxThreads' + int concurrentThreads = threadsRunning.get(); + int maxThreads = maxConcurrentThreads.get(); + if (concurrentThreads > maxThreads) { + maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads); + } + threadsRunning.decrementAndGet(); + } + } + + } + + private static class ExternalBusyThread extends Thread { + + private BusyObserver observer; + private Object lock; + private int lockTimes; + private int waitTime; + public volatile boolean fail; + + public ExternalBusyThread(BusyObserver observer, Object lock, int lockTimes, int waitTime) { + this.observer = observer; + this.lock = lock; + this.lockTimes = lockTimes; + this.waitTime = waitTime; + this.fail = false; + } + + @Override + public void run() { + Random r = new Random(); + for (int i = 0; i < lockTimes; i++) { + synchronized (lock) { + int oldOnNextCount = observer.onNextCount.get(); + boolean oldOnCompleted = observer.onCompleted; + boolean oldOnError = observer.onError; + try { + Thread.sleep(r.nextInt(waitTime)); + } catch (InterruptedException e) { + // ignore + } + // Since we own the lock, onNextCount, onCompleted and + // onError must not be changed. + int newOnNextCount = observer.onNextCount.get(); + boolean newOnCompleted = observer.onCompleted; + boolean newOnError = observer.onError; + if (oldOnNextCount != newOnNextCount) { + System.out.println(">>> ExternalBusyThread received different onNextCount: " + + oldOnNextCount + + " -> " + + newOnNextCount); + fail = true; + break; + } + if (oldOnCompleted != newOnCompleted) { + System.out.println(">>> ExternalBusyThread received different onCompleted: " + + oldOnCompleted + + " -> " + + newOnCompleted); + fail = true; + break; + } + if (oldOnError != newOnError) { + System.out.println(">>> ExternalBusyThread received different onError: " + + oldOnError + + " -> " + + newOnError); + fail = true; + break; + } + } + } + } + + } +}