diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 2ac2fb7b5d..eee85509cf 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2626,6 +2626,40 @@ public static Observable zip(Observable o1, Observa return create(OperationZip.zip(o1, o2, zipFunction)); } + /** + * Return an Observable that pairs up values from this Observable and the other + * Observable and applies a function. + * @param the other value type + * @param the result type + * @param other the other Observable sequence + * @param zipFunction the function that combines the pairs of items from both + * observables and returns a new value + * @return an Observable that pairs up values from this Observable and the other + * Observable and applies a function. + */ + public Observable zip(Observable other, Func2 zipFunction) { + return zip(this, other, zipFunction); + } + + /** + * Return an Observable that pairs up values from this Observable and an + * Iterable sequence and applies a function. + *

+ * Note that the other Iterable is evaluated as items appear from this + * Observable and is not pre-consumed, allowing zipping infinite streams + * on either side. + * @param the other value type + * @param the result type + * @param other the other Iterable sequence + * @param zipFunction the function that combines the pairs of items of + * this Observable and the Iterable + * @return an Observable that pairs up values from this Observable and an + * Iterable sequence and applies a function. + */ + public Observable zip(Iterable other, Func2 zipFunction) { + return create(OperationZip.zipIterable(this, other, zipFunction)); + } + /** * Returns an Observable that emits the results of a function of your * choosing applied to combinations of three items emitted, in sequence, by diff --git a/rxjava-core/src/main/java/rx/operators/OperationZip.java b/rxjava-core/src/main/java/rx/operators/OperationZip.java index 6ecbb87e49..093eeb74d0 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationZip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationZip.java @@ -17,12 +17,10 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -32,6 +30,7 @@ import rx.Subscription; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.SerialSubscription; +import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; import rx.util.functions.Func3; import rx.util.functions.Func4; @@ -108,211 +107,6 @@ public static OnSubscribeFunc zip(Iterable> ws, f return a; } - /* - * ThreadSafe - */ - /* package accessible for unit tests */static class ZipObserver implements Observer { - final Observable w; - final Aggregator a; - private final SafeObservableSubscription subscription = new SafeObservableSubscription(); - private final AtomicBoolean subscribed = new AtomicBoolean(false); - - public ZipObserver(Aggregator a, Observable w) { - this.a = a; - this.w = w; - } - - public void startWatching() { - if (subscribed.compareAndSet(false, true)) { - // only subscribe once even if called more than once - subscription.wrap(w.subscribe(this)); - } - } - - @Override - public void onCompleted() { - a.complete(this); - } - - @Override - public void onError(Throwable e) { - a.error(this, e); - } - - @Override - public void onNext(T args) { - try { - a.next(this, args); - } catch (Throwable e) { - onError(e); - } - } - } - - /** - * Receive notifications from each of the Observables we are reducing and execute the zipFunction whenever we have received events from all Observables. - * - * This class is thread-safe. - * - * @param - */ - /* package accessible for unit tests */static class Aggregator implements OnSubscribeFunc { - - private volatile SynchronizedObserver observer; - private final FuncN zipFunction; - private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean running = new AtomicBoolean(true); - private final ConcurrentHashMap, Boolean> completed = new ConcurrentHashMap, Boolean>(); - - /* we use ConcurrentHashMap despite synchronization of methods because stop() does NOT use synchronization and this map is used by it and can be called by other threads */ - private ConcurrentHashMap, ConcurrentLinkedQueue> receivedValuesPerObserver = new ConcurrentHashMap, ConcurrentLinkedQueue>(); - /* we use a ConcurrentLinkedQueue to retain ordering (I'd like to just use a ConcurrentLinkedHashMap for 'receivedValuesPerObserver' but that doesn't exist in standard java */ - private ConcurrentLinkedQueue> observers = new ConcurrentLinkedQueue>(); - - public Aggregator(FuncN zipFunction) { - this.zipFunction = zipFunction; - } - - /** - * Receive notification of a Observer starting (meaning we should require it for aggregation) - * - * Thread Safety => Invoke ONLY from the static factory methods at top of this class which are always an atomic execution by a single thread. - * - * @param w - */ - void addObserver(ZipObserver w) { - // initialize this ZipObserver - observers.add(w); - receivedValuesPerObserver.put(w, new ConcurrentLinkedQueue()); - } - - /** - * Receive notification of a Observer completing its iterations. - * - * @param w - */ - void complete(ZipObserver w) { - // store that this ZipObserver is completed - completed.put(w, Boolean.TRUE); - // if all ZipObservers are completed, we mark the whole thing as completed - if (completed.size() == observers.size()) { - if (running.compareAndSet(true, false)) { - // this thread succeeded in setting running=false so let's propagate the completion - // mark ourselves as done - observer.onCompleted(); - } - } - } - - /** - * Receive error for a Observer. Throw the error up the chain and stop processing. - * - * @param w - */ - void error(ZipObserver w, Throwable e) { - if (running.compareAndSet(true, false)) { - // this thread succeeded in setting running=false so let's propagate the error - observer.onError(e); - /* since we receive an error we want to tell everyone to stop */ - stop(); - } - } - - /** - * Receive the next value from a Observer. - *

- * If we have received values from all Observers, trigger the zip function, otherwise store the value and keep waiting. - * - * @param w - * @param arg - */ - void next(ZipObserver w, Object arg) { - if (observer == null) { - throw new RuntimeException("This shouldn't be running if a Observer isn't registered"); - } - - /* if we've been 'unsubscribed' don't process anything further even if the things we're watching keep sending (likely because they are not responding to the unsubscribe call) */ - if (!running.get()) { - return; - } - - // store the value we received and below we'll decide if we are to send it to the Observer - receivedValuesPerObserver.get(w).add(arg); - - // define here so the variable is out of the synchronized scope - Object[] argsToZip = new Object[observers.size()]; - - /* we have to synchronize here despite using concurrent data structures because the compound logic here must all be done atomically */ - synchronized (this) { - // if all ZipObservers in 'receivedValues' map have a value, invoke the zipFunction - for (ZipObserver rw : receivedValuesPerObserver.keySet()) { - if (receivedValuesPerObserver.get(rw).peek() == null) { - // we have a null meaning the queues aren't all populated so won't do anything - return; - } - } - // if we get to here this means all the queues have data - int i = 0; - for (ZipObserver rw : observers) { - argsToZip[i++] = receivedValuesPerObserver.get(rw).remove(); - } - } - // if we did not return above from the synchronized block we can now invoke the zipFunction with all of the args - // we do this outside the synchronized block as it is now safe to call this concurrently and don't need to block other threads from calling - // this 'next' method while another thread finishes calling this zipFunction - observer.onNext(zipFunction.call(argsToZip)); - } - - @Override - public Subscription onSubscribe(Observer observer) { - if (started.compareAndSet(false, true)) { - SafeObservableSubscription subscription = new SafeObservableSubscription(); - this.observer = new SynchronizedObserver(observer, subscription); - /* start the Observers */ - for (ZipObserver rw : observers) { - rw.startWatching(); - } - - return subscription.wrap(new Subscription() { - - @Override - public void unsubscribe() { - stop(); - } - - }); - } else { - /* a Observer already has subscribed so blow up */ - throw new IllegalStateException("Only one Observer can subscribe to this Observable."); - } - } - - /* - * Do NOT synchronize this because it gets called via unsubscribe which can occur on other threads - * and result in deadlocks. (http://jira/browse/API-4060) - * - * AtomicObservableSubscription uses compareAndSet instead of locking to avoid deadlocks but ensure single-execution. - * - * We do the same in the implementation of this method. - * - * ThreadSafety of this method is provided by: - * - AtomicBoolean[running].compareAndSet - * - ConcurrentLinkedQueue[Observers] - * - ZipObserver.subscription being an AtomicObservableSubscription - */ - private void stop() { - /* tell ourselves to stop processing onNext events by setting running=false */ - if (running.compareAndSet(true, false)) { - /* propogate to all Observers to unsubscribe if this thread succeeded in setting running=false */ - for (ZipObserver o : observers) { - if (o.subscription != null) { - o.subscription.unsubscribe(); - } - } - } - } - - } /** * Merges the values across multiple sources and applies the selector * function. @@ -324,7 +118,7 @@ private void stop() { * @param the common element type * @param the result element type */ - public static class ManyObservables implements OnSubscribeFunc { + private static final class ManyObservables implements OnSubscribeFunc { /** */ protected final Iterable> sources; /** */ @@ -384,7 +178,7 @@ public void onNext(List value) { * @author akarnokd, 2013.01.14. * @param the element type */ - public static class ItemObserver implements Observer, Subscription { + private static final class ItemObserver implements Observer, Subscription { /** Reader-writer lock. */ protected final ReadWriteLock rwLock; /** The queue. */ @@ -528,4 +322,109 @@ public void unsubscribe() { } } + + /** + * Zips an Observable and an iterable sequence and applies + * a function to the pair of values. + */ + public static OnSubscribeFunc zipIterable(Observable source, Iterable other, Func2 zipFunction) { + return new ZipIterable(source, other, zipFunction); + } + + /** + * Zips an Observable and an iterable sequence and applies + * a function to the pair of values. + */ + private static final class ZipIterable implements OnSubscribeFunc { + final Observable source; + final Iterable other; + final Func2 zipFunction; + + public ZipIterable(Observable source, Iterable other, Func2 zipFunction) { + this.source = source; + this.other = other; + this.zipFunction = zipFunction; + } + + @Override + public Subscription onSubscribe(Observer t1) { + + Iterator it; + boolean first; + try { + it = other.iterator(); + first = it.hasNext(); + } catch (Throwable t) { + t1.onError(t); + return Subscriptions.empty(); + } + + + if (!first) { + t1.onCompleted(); + return Subscriptions.empty(); + } + + SerialSubscription ssub = new SerialSubscription(); + + ssub.set(source.subscribe(new SourceObserver(t1, it, zipFunction, ssub))); + + return ssub; + } + /** Observe the source. */ + private static final class SourceObserver implements Observer { + final Observer observer; + final Iterator other; + final Func2 zipFunction; + final Subscription cancel; + + public SourceObserver(Observer observer, Iterator other, + Func2 zipFunction, Subscription cancel) { + this.observer = observer; + this.other = other; + this.zipFunction = zipFunction; + this.cancel = cancel; + } + + @Override + public void onNext(T args) { + U u = other.next(); + + R r; + try { + r = zipFunction.call(args, u); + } catch (Throwable t) { + onError(t); + return; + } + + observer.onNext(r); + + boolean has; + try { + has = other.hasNext(); + } catch (Throwable t) { + onError(t); + return; + } + + if (!has) { + onCompleted(); + } + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + cancel.unsubscribe(); + } + + @Override + public void onCompleted() { + observer.onCompleted(); + cancel.unsubscribe(); + } + + } + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationZipTest.java b/rxjava-core/src/test/java/rx/operators/OperationZipTest.java index 3f4ac10a55..6bd8b36b62 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationZipTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationZipTest.java @@ -26,6 +26,8 @@ import java.util.Arrays; import java.util.Collection; +import java.util.Iterator; +import java.util.List; import org.junit.Before; import org.junit.Test; @@ -34,8 +36,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.operators.OperationZip.Aggregator; -import rx.operators.OperationZip.ZipObserver; +import rx.operators.OperationReduceTest.CustomException; import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; @@ -118,10 +119,10 @@ public void testZippingDifferentLengthObservableSequences1() { w3.observer.onCompleted(); /* we should have been called 1 time on the Observer */ - InOrder inOrder = inOrder(w); - inOrder.verify(w).onNext("1a2a3a"); + InOrder io = inOrder(w); + io.verify(w).onNext("1a2a3a"); - inOrder.verify(w, times(1)).onCompleted(); + io.verify(w, times(1)).onCompleted(); } @Test @@ -152,13 +153,30 @@ public void testZippingDifferentLengthObservableSequences2() { w3.observer.onCompleted(); /* we should have been called 1 time on the Observer */ - InOrder inOrder = inOrder(w); - inOrder.verify(w).onNext("1a2a3a"); + InOrder io = inOrder(w); + io.verify(w).onNext("1a2a3a"); - inOrder.verify(w, times(1)).onCompleted(); + io.verify(w, times(1)).onCompleted(); } + + Func2 zipr2 = new Func2() { + + @Override + public String call(Object t1, Object t2) { + return "" + t1 + t2; + } + + }; + Func3 zipr3 = new Func3() { + + @Override + public String call(Object t1, Object t2, Object t3) { + return "" + t1 + t2 + t3; + } + + }; /** * Testing internal private logic due to the complexity so I want to use TDD to test as a I build it rather than relying purely on the overall functionality expected by the public methods. */ @@ -166,25 +184,16 @@ public void testZippingDifferentLengthObservableSequences2() { /* mock calls don't do generics */ @Test public void testAggregatorSimple() { - FuncN zipr = getConcatZipr(); - /* create the aggregator which will execute the zip function when all Observables provide values */ - Aggregator a = new Aggregator(zipr); - + PublishSubject r1 = PublishSubject.create(); + PublishSubject r2 = PublishSubject.create(); /* define a Observer to receive aggregated events */ Observer aObserver = mock(Observer.class); - a.onSubscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - ZipObserver r1 = mock(ZipObserver.class); - ZipObserver r2 = mock(ZipObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); + + Observable.zip(r1, r2, zipr2).subscribe(aObserver); /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "hello"); - a.next(r2, "world"); + r1.onNext("hello"); + r2.onNext("world"); InOrder inOrder = inOrder(aObserver); @@ -192,15 +201,15 @@ public void testAggregatorSimple() { verify(aObserver, never()).onCompleted(); inOrder.verify(aObserver, times(1)).onNext("helloworld"); - a.next(r1, "hello "); - a.next(r2, "again"); + r1.onNext("hello "); + r2.onNext("again"); verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); inOrder.verify(aObserver, times(1)).onNext("hello again"); - a.complete(r1); - a.complete(r2); + r1.onCompleted(); + r2.onCompleted(); inOrder.verify(aObserver, never()).onNext(anyString()); verify(aObserver, times(1)).onCompleted(); @@ -210,38 +219,31 @@ public void testAggregatorSimple() { /* mock calls don't do generics */ @Test public void testAggregatorDifferentSizedResultsWithOnComplete() { - FuncN zipr = getConcatZipr(); /* create the aggregator which will execute the zip function when all Observables provide values */ - Aggregator a = new Aggregator(zipr); - + /* define a Observer to receive aggregated events */ + PublishSubject r1 = PublishSubject.create(); + PublishSubject r2 = PublishSubject.create(); /* define a Observer to receive aggregated events */ Observer aObserver = mock(Observer.class); - a.onSubscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - ZipObserver r1 = mock(ZipObserver.class); - ZipObserver r2 = mock(ZipObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); + + Observable.zip(r1, r2, zipr2).subscribe(aObserver); /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "hello"); - a.next(r2, "world"); - a.complete(r2); + r1.onNext("hello"); + r2.onNext("world"); + r2.onCompleted(); InOrder inOrder = inOrder(aObserver); inOrder.verify(aObserver, never()).onError(any(Throwable.class)); - inOrder.verify(aObserver, never()).onCompleted(); inOrder.verify(aObserver, times(1)).onNext("helloworld"); + inOrder.verify(aObserver, times(1)).onCompleted(); - a.next(r1, "hi"); - a.complete(r1); + r1.onNext("hi"); + r1.onCompleted(); inOrder.verify(aObserver, never()).onError(any(Throwable.class)); - inOrder.verify(aObserver, times(1)).onCompleted(); + inOrder.verify(aObserver, never()).onCompleted(); inOrder.verify(aObserver, never()).onNext(anyString()); } @@ -249,38 +251,29 @@ public void testAggregatorDifferentSizedResultsWithOnComplete() { /* mock calls don't do generics */ @Test public void testAggregateMultipleTypes() { - FuncN zipr = getConcatZipr(); - /* create the aggregator which will execute the zip function when all Observables provide values */ - Aggregator a = new Aggregator(zipr); - + PublishSubject r1 = PublishSubject.create(); + PublishSubject r2 = PublishSubject.create(); /* define a Observer to receive aggregated events */ Observer aObserver = mock(Observer.class); - a.onSubscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - ZipObserver r1 = mock(ZipObserver.class); - ZipObserver r2 = mock(ZipObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); + + Observable.zip(r1, r2, zipr2).subscribe(aObserver); /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "hello"); - a.next(r2, "world"); - a.complete(r2); + r1.onNext("hello"); + r2.onNext(1); + r2.onCompleted(); InOrder inOrder = inOrder(aObserver); inOrder.verify(aObserver, never()).onError(any(Throwable.class)); - inOrder.verify(aObserver, never()).onCompleted(); - inOrder.verify(aObserver, times(1)).onNext("helloworld"); + inOrder.verify(aObserver, times(1)).onNext("hello1"); + inOrder.verify(aObserver, times(1)).onCompleted(); - a.next(r1, "hi"); - a.complete(r1); + r1.onNext("hi"); + r1.onCompleted(); inOrder.verify(aObserver, never()).onError(any(Throwable.class)); - inOrder.verify(aObserver, times(1)).onCompleted(); + inOrder.verify(aObserver, never()).onCompleted(); inOrder.verify(aObserver, never()).onNext(anyString()); } @@ -288,28 +281,18 @@ public void testAggregateMultipleTypes() { /* mock calls don't do generics */ @Test public void testAggregate3Types() { - FuncN zipr = getConcatZipr(); - /* create the aggregator which will execute the zip function when all Observables provide values */ - Aggregator a = new Aggregator(zipr); - + PublishSubject r1 = PublishSubject.create(); + PublishSubject r2 = PublishSubject.create(); + PublishSubject> r3 = PublishSubject.create(); /* define a Observer to receive aggregated events */ Observer aObserver = mock(Observer.class); - a.onSubscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - ZipObserver r1 = mock(ZipObserver.class); - ZipObserver r2 = mock(ZipObserver.class); - ZipObserver r3 = mock(ZipObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); - a.addObserver(r3); + + Observable.zip(r1, r2, r3, zipr3).subscribe(aObserver); /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "hello"); - a.next(r2, 2); - a.next(r3, new int[] { 5, 6, 7 }); + r1.onNext("hello"); + r2.onNext(2); + r3.onNext(Arrays.asList( 5, 6, 7 )); verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); @@ -320,43 +303,34 @@ public void testAggregate3Types() { /* mock calls don't do generics */ @Test public void testAggregatorsWithDifferentSizesAndTiming() { - FuncN zipr = getConcatZipr(); - /* create the aggregator which will execute the zip function when all Observables provide values */ - Aggregator a = new Aggregator(zipr); - + PublishSubject r1 = PublishSubject.create(); + PublishSubject r2 = PublishSubject.create(); /* define a Observer to receive aggregated events */ Observer aObserver = mock(Observer.class); - a.onSubscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - ZipObserver r1 = mock(ZipObserver.class); - ZipObserver r2 = mock(ZipObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); + + Observable.zip(r1, r2, zipr2).subscribe(aObserver); /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "one"); - a.next(r1, "two"); - a.next(r1, "three"); - a.next(r2, "A"); + r1.onNext("one"); + r1.onNext("two"); + r1.onNext("three"); + r2.onNext("A"); verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); verify(aObserver, times(1)).onNext("oneA"); - a.next(r1, "four"); - a.complete(r1); - a.next(r2, "B"); + r1.onNext("four"); + r1.onCompleted(); + r2.onNext("B"); verify(aObserver, times(1)).onNext("twoB"); - a.next(r2, "C"); + r2.onNext("C"); verify(aObserver, times(1)).onNext("threeC"); - a.next(r2, "D"); + r2.onNext("D"); verify(aObserver, times(1)).onNext("fourD"); - a.next(r2, "E"); + r2.onNext("E"); verify(aObserver, never()).onNext("E"); - a.complete(r2); + r2.onCompleted(); verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); @@ -366,33 +340,24 @@ public void testAggregatorsWithDifferentSizesAndTiming() { /* mock calls don't do generics */ @Test public void testAggregatorError() { - FuncN zipr = getConcatZipr(); - /* create the aggregator which will execute the zip function when all Observables provide values */ - Aggregator a = new Aggregator(zipr); - + PublishSubject r1 = PublishSubject.create(); + PublishSubject r2 = PublishSubject.create(); /* define a Observer to receive aggregated events */ Observer aObserver = mock(Observer.class); - a.onSubscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - ZipObserver r1 = mock(ZipObserver.class); - ZipObserver r2 = mock(ZipObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); + + Observable.zip(r1, r2, zipr2).subscribe(aObserver); /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "hello"); - a.next(r2, "world"); + r1.onNext("hello"); + r2.onNext("world"); verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); verify(aObserver, times(1)).onNext("helloworld"); - a.error(r1, new RuntimeException("")); - a.next(r1, "hello"); - a.next(r2, "again"); + r1.onError(new RuntimeException("")); + r1.onNext("hello"); + r2.onNext("again"); verify(aObserver, times(1)).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); @@ -404,33 +369,24 @@ public void testAggregatorError() { /* mock calls don't do generics */ @Test public void testAggregatorUnsubscribe() { - FuncN zipr = getConcatZipr(); - /* create the aggregator which will execute the zip function when all Observables provide values */ - Aggregator a = new Aggregator(zipr); - + PublishSubject r1 = PublishSubject.create(); + PublishSubject r2 = PublishSubject.create(); /* define a Observer to receive aggregated events */ Observer aObserver = mock(Observer.class); - Subscription subscription = a.onSubscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - ZipObserver r1 = mock(ZipObserver.class); - ZipObserver r2 = mock(ZipObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); + + Subscription subscription = Observable.zip(r1, r2, zipr2).subscribe(aObserver); /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "hello"); - a.next(r2, "world"); + r1.onNext("hello"); + r2.onNext("world"); verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); verify(aObserver, times(1)).onNext("helloworld"); subscription.unsubscribe(); - a.next(r1, "hello"); - a.next(r2, "again"); + r1.onNext("hello"); + r2.onNext("again"); verify(aObserver, times(0)).onError(any(Throwable.class)); verify(aObserver, never()).onCompleted(); @@ -442,27 +398,18 @@ public void testAggregatorUnsubscribe() { /* mock calls don't do generics */ @Test public void testAggregatorEarlyCompletion() { - FuncN zipr = getConcatZipr(); - /* create the aggregator which will execute the zip function when all Observables provide values */ - Aggregator a = new Aggregator(zipr); - + PublishSubject r1 = PublishSubject.create(); + PublishSubject r2 = PublishSubject.create(); /* define a Observer to receive aggregated events */ Observer aObserver = mock(Observer.class); - a.onSubscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - ZipObserver r1 = mock(ZipObserver.class); - ZipObserver r2 = mock(ZipObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); + + Observable.zip(r1, r2, zipr2).subscribe(aObserver); /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "one"); - a.next(r1, "two"); - a.complete(r1); - a.next(r2, "A"); + r1.onNext("one"); + r1.onNext("two"); + r1.onCompleted(); + r2.onNext("A"); InOrder inOrder = inOrder(aObserver); @@ -470,7 +417,7 @@ public void testAggregatorEarlyCompletion() { inOrder.verify(aObserver, never()).onCompleted(); inOrder.verify(aObserver, times(1)).onNext("oneA"); - a.complete(r2); + r2.onCompleted(); inOrder.verify(aObserver, never()).onError(any(Throwable.class)); inOrder.verify(aObserver, times(1)).onCompleted(); @@ -533,21 +480,21 @@ public void testOnFirstCompletion() { PublishSubject oB = PublishSubject.create(); @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); + Observer obs = mock(Observer.class); Observable o = Observable.create(zip(oA, oB, getConcat2Strings())); - o.subscribe(observer); + o.subscribe(obs); - InOrder inOrder = inOrder(observer); + InOrder io = inOrder(obs); oA.onNext("a1"); - inOrder.verify(observer, never()).onNext(anyString()); + io.verify(obs, never()).onNext(anyString()); oB.onNext("b1"); - inOrder.verify(observer, times(1)).onNext("a1-b1"); + io.verify(obs, times(1)).onNext("a1-b1"); oB.onNext("b2"); - inOrder.verify(observer, never()).onNext(anyString()); + io.verify(obs, never()).onNext(anyString()); oA.onNext("a2"); - inOrder.verify(observer, times(1)).onNext("a2-b2"); + io.verify(obs, times(1)).onNext("a2-b2"); oA.onNext("a3"); oA.onNext("a4"); @@ -561,12 +508,12 @@ public void testOnFirstCompletion() { oB.onNext("b4"); oB.onNext("b5"); - inOrder.verify(observer, times(1)).onNext("a3-b3"); - inOrder.verify(observer, times(1)).onNext("a4-b4"); - inOrder.verify(observer, times(1)).onNext("a5-b5"); + io.verify(obs, times(1)).onNext("a3-b3"); + io.verify(obs, times(1)).onNext("a4-b4"); + io.verify(obs, times(1)).onNext("a5-b5"); // WE RECEIVE THE ONCOMPLETE HERE - inOrder.verify(observer, times(1)).onCompleted(); + io.verify(obs, times(1)).onCompleted(); oB.onNext("b6"); oB.onNext("b7"); @@ -575,7 +522,7 @@ public void testOnFirstCompletion() { // never completes (infinite stream for example) // we should receive nothing else despite oB continuing after oA completed - inOrder.verifyNoMoreInteractions(); + io.verifyNoMoreInteractions(); } @Test @@ -584,21 +531,21 @@ public void testOnErrorTermination() { PublishSubject oB = PublishSubject.create(); @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); + Observer obs = mock(Observer.class); Observable o = Observable.create(zip(oA, oB, getConcat2Strings())); - o.subscribe(observer); + o.subscribe(obs); - InOrder inOrder = inOrder(observer); + InOrder io = inOrder(obs); oA.onNext("a1"); - inOrder.verify(observer, never()).onNext(anyString()); + io.verify(obs, never()).onNext(anyString()); oB.onNext("b1"); - inOrder.verify(observer, times(1)).onNext("a1-b1"); + io.verify(obs, times(1)).onNext("a1-b1"); oB.onNext("b2"); - inOrder.verify(observer, never()).onNext(anyString()); + io.verify(obs, never()).onNext(anyString()); oA.onNext("a2"); - inOrder.verify(observer, times(1)).onNext("a2-b2"); + io.verify(obs, times(1)).onNext("a2-b2"); oA.onNext("a3"); oA.onNext("a4"); @@ -606,7 +553,7 @@ public void testOnErrorTermination() { oA.onError(new RuntimeException("forced failure")); // it should emit failure immediately - inOrder.verify(observer, times(1)).onError(any(RuntimeException.class)); + io.verify(obs, times(1)).onError(any(RuntimeException.class)); oB.onNext("b3"); oB.onNext("b4"); @@ -618,7 +565,7 @@ public void testOnErrorTermination() { // never completes (infinite stream for example) // we should receive nothing else despite oB continuing after oA completed - inOrder.verifyNoMoreInteractions(); + io.verifyNoMoreInteractions(); } private Func2 getConcat2Strings() { @@ -815,4 +762,255 @@ public void testSecondFails() { inOrder.verify(observer, never()).onNext(any(String.class)); inOrder.verifyNoMoreInteractions(); } + + @Test + public void testZipIterableSameSize() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = Arrays.asList("1", "2", "3"); + + r1.zip(r2, zipr2).subscribe(o); + + r1.onNext("one-"); + r1.onNext("two-"); + r1.onNext("three-"); + r1.onCompleted(); + + io.verify(o).onNext("one-1"); + io.verify(o).onNext("two-2"); + io.verify(o).onNext("three-3"); + io.verify(o).onCompleted(); + + verify(o, never()).onError(any(Throwable.class)); + + } + @Test + public void testZipIterableEmptyFirstSize() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = Arrays.asList("1", "2", "3"); + + r1.zip(r2, zipr2).subscribe(o); + + r1.onCompleted(); + + io.verify(o).onCompleted(); + + verify(o, never()).onNext(any(String.class)); + verify(o, never()).onError(any(Throwable.class)); + + } + @Test + public void testZipIterableEmptySecond() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = Arrays.asList(); + + r1.zip(r2, zipr2).subscribe(o); + + r1.onNext("one-"); + r1.onNext("two-"); + r1.onNext("three-"); + r1.onCompleted(); + + io.verify(o).onCompleted(); + + verify(o, never()).onNext(any(String.class)); + verify(o, never()).onError(any(Throwable.class)); + } + @Test + public void testZipIterableFirstShorter() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = Arrays.asList("1", "2", "3"); + + r1.zip(r2, zipr2).subscribe(o); + + r1.onNext("one-"); + r1.onNext("two-"); + r1.onCompleted(); + + io.verify(o).onNext("one-1"); + io.verify(o).onNext("two-2"); + io.verify(o).onCompleted(); + + verify(o, never()).onError(any(Throwable.class)); + + } + + @Test + public void testZipIterableSecondShorter() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = Arrays.asList("1", "2"); + + r1.zip(r2, zipr2).subscribe(o); + + r1.onNext("one-"); + r1.onNext("two-"); + r1.onNext("three-"); + r1.onCompleted(); + + io.verify(o).onNext("one-1"); + io.verify(o).onNext("two-2"); + io.verify(o).onCompleted(); + + verify(o, never()).onError(any(Throwable.class)); + + } + @Test + public void testZipIterableFirstThrows() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = Arrays.asList("1", "2", "3"); + + r1.zip(r2, zipr2).subscribe(o); + + r1.onNext("one-"); + r1.onNext("two-"); + r1.onError(new OperationReduceTest.CustomException()); + + io.verify(o).onNext("one-1"); + io.verify(o).onNext("two-2"); + io.verify(o).onError(any(OperationReduceTest.CustomException.class)); + + verify(o, never()).onCompleted(); + + } + + @Test + public void testZipIterableIteratorThrows() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = new Iterable() { + @Override + public Iterator iterator() { + throw new OperationReduceTest.CustomException(); + } + }; + + r1.zip(r2, zipr2).subscribe(o); + + r1.onNext("one-"); + r1.onNext("two-"); + r1.onError(new OperationReduceTest.CustomException()); + + io.verify(o).onError(any(OperationReduceTest.CustomException.class)); + + verify(o, never()).onCompleted(); + verify(o, never()).onNext(any(String.class)); + + } + @Test + public void testZipIterableHasNextThrows() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + int count; + @Override + public boolean hasNext() { + if (count == 0) { + return true; + } + throw new CustomException(); + } + + @Override + public String next() { + count++; + return "1"; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Not supported yet."); + } + + }; + } + + }; + + r1.zip(r2, zipr2).subscribe(o); + + r1.onNext("one-"); + r1.onError(new OperationReduceTest.CustomException()); + + io.verify(o).onNext("one-1"); + io.verify(o).onError(any(OperationReduceTest.CustomException.class)); + + verify(o, never()).onCompleted(); + + } + @Test + public void testZipIterableNextThrows() { + PublishSubject r1 = PublishSubject.create(); + /* define a Observer to receive aggregated events */ + Observer o = mock(Observer.class); + InOrder io = inOrder(o); + + Iterable r2 = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + int count; + @Override + public boolean hasNext() { + return true; + } + + @Override + public String next() { + throw new CustomException(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Not supported yet."); + } + + }; + } + + }; + + r1.zip(r2, zipr2).subscribe(o); + + r1.onError(new OperationReduceTest.CustomException()); + + io.verify(o).onError(any(OperationReduceTest.CustomException.class)); + + verify(o, never()).onNext(any(String.class)); + verify(o, never()).onCompleted(); + + } }