Skip to content

Commit

Permalink
Merge pull request ReactiveX#290 from abersnaze/issue103
Browse files Browse the repository at this point in the history
Issue 103
  • Loading branch information
benjchristensen committed Jul 5, 2013
2 parents 29c52e7 + 33ad107 commit 7c0348c
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 0 deletions.
114 changes: 114 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -2688,6 +2689,119 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) {
});
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <code>zip</code> applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by
* all of the Observalbes; the second item emitted by the new Observable will be the result of
* the function applied to the second item emitted by each of those Observables; and so forth.
* <p>
* The resulting <code>Observable<R></code> returned from <code>zip</code> will invoke
* <code>onNext</code> as many times as the number of <code>onNext</code> invokations of the
* source Observable that emits the fewest items.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
*
* @param ws
* An Observable of source Observables
* @param reduceFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <R> Observable<R> zip(Observable<Observable<?>> ws, final FuncN<R> reduceFunction) {
return ws.toList().mapMany(new Func1<List<Observable<?>>, Observable<R>>() {
@Override
public Observable<R> call(List<Observable<?>> wsList) {
return create(OperationZip.zip(wsList, reduceFunction));
}
});
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <code>zip</code> applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by
* all of the Observalbes; the second item emitted by the new Observable will be the result of
* the function applied to the second item emitted by each of those Observables; and so forth.
* <p>
* The resulting <code>Observable<R></code> returned from <code>zip</code> will invoke
* <code>onNext</code> as many times as the number of <code>onNext</code> invocations of the
* source Observable that emits the fewest items.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
*
* @param ws
* An Observable of source Observables
* @param function
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <R> Observable<R> zip(Observable<Observable<?>> ws, final Object function) {
@SuppressWarnings({ "unchecked" })
final FuncN<R> _f = Functions.from(function);
return zip(ws, _f);
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <code>zip</code> applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by
* all of the Observalbes; the second item emitted by the new Observable will be the result of
* the function applied to the second item emitted by each of those Observables; and so forth.
* <p>
* The resulting <code>Observable<R></code> returned from <code>zip</code> will invoke
* <code>onNext</code> as many times as the number of <code>onNext</code> invokations of the
* source Observable that emits the fewest items.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
*
* @param ws
* A collection of source Observables
* @param reduceFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <R> Observable<R> zip(Collection<Observable<?>> ws, FuncN<R> reduceFunction) {
return create(OperationZip.zip(ws, reduceFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <code>zip</code> applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by
* all of the Observalbes; the second item emitted by the new Observable will be the result of
* the function applied to the second item emitted by each of those Observables; and so forth.
* <p>
* The resulting <code>Observable<R></code> returned from <code>zip</code> will invoke
* <code>onNext</code> as many times as the number of <code>onNext</code> invocations of the
* source Observable that emits the fewest items.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
*
* @param ws
* A collection of source Observables
* @param function
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <R> Observable<R> zip(Collection<Observable<?>> ws, final Object function) {
@SuppressWarnings({ "unchecked" })
final FuncN<R> _f = Functions.from(function);
return zip(ws, _f);
}

/**
* Filters an Observable by discarding any items it emits that do not meet some test.
* <p>
Expand Down
28 changes: 28 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationZip.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -77,6 +78,16 @@ public static <T0, T1, T2, T3, R> Func1<Observer<R>, Subscription> zip(Observabl
return a;
}

@SuppressWarnings("unchecked")
public static <R> Func1<Observer<R>, Subscription> zip(Collection<Observable<?>> ws, FuncN<R> zipFunction) {
Aggregator a = new Aggregator(zipFunction);
for (Observable w : ws) {
ZipObserver zipObserver = new ZipObserver(a, w);
a.addObserver(zipObserver);
}
return a;
}

/*
* ThreadSafe
*/
Expand Down Expand Up @@ -284,6 +295,23 @@ private void stop() {
}

public static class UnitTest {

@SuppressWarnings("unchecked")
@Test
public void testCollectionSizeDifferentThanFunction() {
FuncN<String> zipr = Functions.from(getConcatStringIntegerIntArrayZipr());

/* define a Observer to receive aggregated events */
Observer<String> aObserver = mock(Observer.class);

Collection ws = java.util.Collections.singleton(Observable.from("one", "two"));
Observable<String> w = Observable.create(zip(ws, zipr));
w.subscribe(aObserver);

verify(aObserver, times(1)).onError(any(Exception.class));
verify(aObserver, never()).onCompleted();
verify(aObserver, never()).onNext(any(String.class));
}

@SuppressWarnings("unchecked")
/* mock calls don't do generics */
Expand Down

0 comments on commit 7c0348c

Please sign in to comment.