Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blocking zip operator #871

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 129 additions & 17 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3041,13 +3041,77 @@ public final static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
return zip(ws, zipFunction, -1);
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations items emitted, in sequence, by an Iterable of other Observables,
* blocking individual fast Observables by a bounded queue.
* <p>
* Note that the source observables should emit their values from a thread which can be freely
* blocked to avoid deadlock situations. Generally such safe source threads are the {@link Schedulers#io()} and
* {@link Schedulers#newThread()} schedulers, producers running in a regular java Thread outside RxJava (including
* standard thread pools).<br>
* Multiple synchronous sources can't be zipped with this operator as it will result in deadlock.
* However, some synchronous sources can be made asynchronous by using {@link #subscribeOn(rx.Scheduler)}
* with {@link Schedulers#io()} or {@link Schedulers#newThread()} scheduler.
*
* @param <R> the result type
* @param ws an Iterable of source Observables
* @param zipFunction a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @param bufferSize the number of elements to buffer without blocking per source Observables, less
* than 0 indicates an unbounded/non-blocking buffering
* @return an Observable that emits the zipped results
*/
public final static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction, int bufferSize) {
List<Observable<?>> os = new ArrayList<Observable<?>>();
for (Observable<?> o : ws) {
os.add(o);
}
return Observable.just(os.toArray(new Observable<?>[os.size()])).lift(new OperatorZip<R>(zipFunction));
if (os.isEmpty()) {
return empty();
}
return Observable.just(os.toArray(new Observable<?>[os.size()])).lift(new OperatorZip<R>(zipFunction, bufferSize));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations items emitted, in sequence, by an Observable of other Observables,
* blocking individual fast Observables by a bounded queue.
* <p>
* Note that the source observables should emit their values from a thread which can be freely
* blocked to avoid deadlock situations. Generally such safe source threads are the {@link Schedulers#io()} and
* {@link Schedulers#newThread()} schedulers, producers running in a regular java Thread outside RxJava (including
* standard thread pools).<br>
* Multiple synchronous sources can't be zipped with this operator as it will result in deadlock.
* However, some synchronous sources can be made asynchronous by using {@link #subscribeOn(rx.Scheduler)}
* with {@link Schedulers#io()} or {@link Schedulers#newThread()} scheduler.
*
* @param <R> the result type
* @param ws an Observable of source Observables
* @param zipFunction a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @param bufferSize the number of elements to buffer without blocking per source Observables, less
* than 0 indicates an unbounded/non-blocking buffering
* @return an Observable that emits the zipped results
*/
public final static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction, int bufferSize) {
return ws.toList().map(new Func1<List<? extends Observable<?>>, Observable<?>[]>() {

@Override
public Observable<?>[] call(List<? extends Observable<?>> o) {
if (o.isEmpty()) {
return new Observable<?>[] { empty() };
}
return o.toArray(new Observable<?>[o.size()]);
}

}).lift(new OperatorZip<R>(zipFunction, bufferSize));
}


/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of <i>n</i> items emitted, in sequence, by the <i>n</i> Observables emitted by
Expand All @@ -3074,14 +3138,7 @@ public final static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws,
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, final FuncN<? extends R> zipFunction) {
return ws.toList().map(new Func1<List<? extends Observable<?>>, Observable<?>[]>() {

@Override
public Observable<?>[] call(List<? extends Observable<?>> o) {
return o.toArray(new Observable<?>[o.size()]);
}

}).lift(new OperatorZip<R>(zipFunction));
return zip(ws, zipFunction, -1);
}

/**
Expand All @@ -3108,9 +3165,40 @@ public Observable<?>[] call(List<? extends Observable<?>> o) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) {
return just(new Observable<?>[] { o1, o2 }).lift(new OperatorZip<R>(zipFunction));
return zip(o1, o2, zipFunction, -1);
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of two items emitted, in sequence, by two other Observables, using a
* blocking the faster Observable by a bounded queue.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the new
* Observable will be the result of the function applied to the first item emitted by {@code o1} and the first item emitted by {@code o2}; the second item emitted by the new Observable will
* be the result of the function applied to the second item emitted by {@code o1} and the second
* item emitted by {@code o2}; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of
* the source Observable that emits the fewest items.
*
* @param o1
* the first source Observable
* @param o2
* a second source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @param bufferSize the number of elements to buffer without blocking per source Observables, less
* than 0 indicates an unbounded/non-blocking buffering
* @return an Observable that emits the zipped results
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2,
final Func2<? super T1, ? super T2, ? extends R> zipFunction, int bufferSize) {
return just(new Observable<?>[] { o1, o2 }).lift(new OperatorZip<R>(zipFunction, bufferSize));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of three items emitted, in sequence, by three other Observables.
Expand Down Expand Up @@ -3138,7 +3226,7 @@ public final static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, O
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <T1, T2, T3, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> zipFunction) {
return just(new Observable<?>[] { o1, o2, o3 }).lift(new OperatorZip<R>(zipFunction));
return just(new Observable<?>[] { o1, o2, o3 }).lift(new OperatorZip<R>(zipFunction, -1));
}

/**
Expand Down Expand Up @@ -3170,7 +3258,7 @@ public final static <T1, T2, T3, R> Observable<R> zip(Observable<? extends T1> o
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> zipFunction) {
return just(new Observable<?>[] { o1, o2, o3, o4 }).lift(new OperatorZip<R>(zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4 }).lift(new OperatorZip<R>(zipFunction, -1));
}

/**
Expand Down Expand Up @@ -3204,7 +3292,7 @@ public final static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-zip">RxJava Wiki: zip()</a>
*/
public final static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Func5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> zipFunction) {
return just(new Observable<?>[] { o1, o2, o3, o4, o5 }).lift(new OperatorZip<R>(zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4, o5 }).lift(new OperatorZip<R>(zipFunction, -1));
}

/**
Expand Down Expand Up @@ -3240,7 +3328,7 @@ public final static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? exten
*/
public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6 }).lift(new OperatorZip<R>(zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6 }).lift(new OperatorZip<R>(zipFunction, -1));
}

/**
Expand Down Expand Up @@ -3278,7 +3366,7 @@ public final static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? e
*/
public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7,
Func7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> zipFunction) {
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7 }).lift(new OperatorZip<R>(zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7 }).lift(new OperatorZip<R>(zipFunction, -1));
}

/**
Expand Down Expand Up @@ -3318,7 +3406,7 @@ public final static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable
*/
public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
Func8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> zipFunction) {
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8 }).lift(new OperatorZip<R>(zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8 }).lift(new OperatorZip<R>(zipFunction, -1));
}

/**
Expand Down Expand Up @@ -3360,7 +3448,7 @@ public final static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observ
*/
public final static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
Observable<? extends T9> o9, Func9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipFunction) {
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8, o9 }).lift(new OperatorZip<R>(zipFunction));
return just(new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8, o9 }).lift(new OperatorZip<R>(zipFunction, -1));
}

/**
Expand Down Expand Up @@ -8479,6 +8567,30 @@ public final <T2, R> Observable<R> zip(Observable<? extends T2> other, Func2<? s
return zip(this, other, zipFunction);
}

/**
* Returns an Observable that emits items that are the result of applying a specified function
* to pairs of values, one each from the source Observable and another specified Observable,
* blocking the faster Observable by a bounded queue.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
*
* @param <T2>
* the type of items emitted by the {@code other} Observable
* @param <R>
* the type of items emitted by the resulting Observable
* @param other
* the other Observable
* @param zipFunction
* a function that combines the pairs of items from the two Observables to generate
* the items to be emitted by the resulting Observable
* @param bufferSize the number of elements to buffer without blocking per source Observables, less
* than 0 indicates an unbounded/non-blocking buffering
* @return an Observable that pairs up values from the source Observable and the {@code other} Observable and emits the results of {@code zipFunction} applied to these pairs
*/
public final <T2, R> Observable<R> zip(Observable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction, int bufferSize) {
return zip(this, other, zipFunction, bufferSize);
}

/**
* An Observable that never sends any information to an {@link Observer}.
*
Expand Down
25 changes: 18 additions & 7 deletions rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public OperatorObserveOn(Scheduler scheduler) {
this(scheduler, 1);
}

private static int roundToNextPowerOfTwoIfNecessary(int num) {
static int roundToNextPowerOfTwoIfNecessary(int num) {
if ((num & -num) == num) {
return num;
} else {
Expand Down Expand Up @@ -229,15 +229,15 @@ private void pollQueue() {
*
* @param <E>
*/
private static class InterruptibleBlockingQueue<E> {
static class InterruptibleBlockingQueue<E> {

private final Semaphore semaphore;
private volatile boolean interrupted = false;

private final E[] buffer;

private AtomicLong tail = new AtomicLong();
private AtomicLong head = new AtomicLong();
private final AtomicLong tail = new AtomicLong();
private final AtomicLong head = new AtomicLong();
private final int capacity;
private final int mask;

Expand Down Expand Up @@ -269,9 +269,7 @@ public void addBlocking(final E e) throws InterruptedException {
throw new IllegalArgumentException("Can not put null");
}

if (offer(e)) {
return;
} else {
if (!offer(e)) {
throw new IllegalStateException("Queue is full");
}
}
Expand All @@ -289,7 +287,20 @@ private boolean offer(final E e) {

return true;
}
public E peek() {
if (interrupted) {
return null;
}
final long _h = head.get();
if (tail.get() == _h) {
// nothing available
return null;
}
int index = (int) (_h & mask);

// fetch the item
return buffer[index];
}
public E poll() {
if (interrupted) {
return null;
Expand Down
Loading