diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 56a4664654a..07c89c0b195 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit; import rx.concurrency.Schedulers; +import rx.joins.Pattern2; +import rx.joins.Plan0; import rx.observables.BlockingObservable; import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; @@ -51,6 +53,7 @@ import rx.operators.OperationFirstOrDefault; import rx.operators.OperationGroupBy; import rx.operators.OperationInterval; +import rx.operators.OperationJoinPatterns; import rx.operators.OperationLast; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; @@ -5689,5 +5692,173 @@ private boolean isInternalImplementation(Object o) { return isInternal; } } - + /** + * Creates a pattern that matches when both observable sequences have an available element. + * @param right Observable sequence to match with the left sequence. + * @return Pattern object that matches when both observable sequences have an available element. + * @see MSDN: Observable.And + * @throws NullPointerException if right is null + */ + public Pattern2 and(Observable right) { + return OperationJoinPatterns.and(this, right); + } + /** + * Matches when the observable sequence has an available element and projects the element by invoking the selector function. + * @param selector Selector that will be invoked for elements in the source sequence. + * @return Plan that produces the projected results, to be fed (with other plans) to the When operator. + * @see MSDN: Observable.Then + * @throws NullPointerException if selector is null + */ + public Plan0 then(Func1 selector) { + return OperationJoinPatterns.then(this, selector); + } + /** + * Joins together the results from several patterns. + * @param plans A series of plans created by use of the Then operator on patterns. + * @return An observable sequence with the results from matching several patterns. + * @see MSDN: Observable.When + * @throws NullPointerException if plans is null + */ + public static Observable when(Plan0... plans) { + return create(OperationJoinPatterns.when(plans)); + } + /** + * Joins together the results from several patterns. + * @param plans A series of plans created by use of the Then operator on patterns. + * @return An observable sequence with the results from matching several patterns. + * @see MSDN: Observable.When + * @throws NullPointerException if plans is null + */ + public static Observable when(Iterable> plans) { + if (plans == null) { + throw new NullPointerException("plans"); + } + return create(OperationJoinPatterns.when(plans)); + } + /** + * Joins the results from a pattern. + * @param p1 the plan to join + * @return An observable sequence with the results from matching a pattern + * @see MSDN: Observable.When + */ + @SuppressWarnings("unchecked") + public static Observable when(Plan0 p1) { + return create(OperationJoinPatterns.when(p1)); + } + /** + * Joins together the results from several patterns. + * @param p1 a plan + * @param p2 a plan + * @return An observable sequence with the results from matching several patterns + * @see MSDN: Observable.When + */ + @SuppressWarnings("unchecked") + public static Observable when(Plan0 p1, Plan0 p2) { + return create(OperationJoinPatterns.when(p1, p2)); + } + /** + * Joins together the results from several patterns. + * @param p1 a plan + * @param p2 a plan + * @param p3 a plan + * @return An observable sequence with the results from matching several patterns + * @see MSDN: Observable.When + */ + @SuppressWarnings("unchecked") + public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3) { + return create(OperationJoinPatterns.when(p1, p2, p3)); + } + /** + * Joins together the results from several patterns. + * @param p1 a plan + * @param p2 a plan + * @param p3 a plan + * @param p4 a plan + * @return An observable sequence with the results from matching several patterns + * @see MSDN: Observable.When + */ + @SuppressWarnings("unchecked") + public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4) { + return create(OperationJoinPatterns.when(p1, p2, p3, p4)); + } + /** + * Joins together the results from several patterns. + * @param p1 a plan + * @param p2 a plan + * @param p3 a plan + * @param p4 a plan + * @param p5 a plan + * @return An observable sequence with the results from matching several patterns + * @see MSDN: Observable.When + */ + @SuppressWarnings("unchecked") + public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5) { + return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5)); + } + /** + * Joins together the results from several patterns. + * @param p1 a plan + * @param p2 a plan + * @param p3 a plan + * @param p4 a plan + * @param p5 a plan + * @param p6 a plan + * @return An observable sequence with the results from matching several patterns + * @see MSDN: Observable.When + */ + @SuppressWarnings("unchecked") + public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6) { + return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6)); + } + /** + * Joins together the results from several patterns. + * @param p1 a plan + * @param p2 a plan + * @param p3 a plan + * @param p4 a plan + * @param p5 a plan + * @param p6 a plan + * @param p7 a plan + * @return An observable sequence with the results from matching several patterns + * @see MSDN: Observable.When + */ + @SuppressWarnings("unchecked") + public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6, Plan0 p7) { + return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7)); + } + /** + * Joins together the results from several patterns. + * @param p1 a plan + * @param p2 a plan + * @param p3 a plan + * @param p4 a plan + * @param p5 a plan + * @param p6 a plan + * @param p7 a plan + * @param p8 a plan + * @return An observable sequence with the results from matching several patterns + * @see MSDN: Observable.When + */ + @SuppressWarnings("unchecked") + public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6, Plan0 p7, Plan0 p8) { + return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8)); + } + /** + * Joins together the results from several patterns. + * @param p1 a plan + * @param p2 a plan + * @param p3 a plan + * @param p4 a plan + * @param p5 a plan + * @param p6 a plan + * @param p7 a plan + * @param p8 a plan + * @param p9 a plan + * @return An observable sequence with the results from matching several patterns + * @see MSDN: Observable.When + */ + @SuppressWarnings("unchecked") + public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6, Plan0 p7, Plan0 p8, Plan0 p9) { + return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8, p9)); + } } diff --git a/rxjava-core/src/main/java/rx/joins/ActivePlan0.java b/rxjava-core/src/main/java/rx/joins/ActivePlan0.java new file mode 100644 index 00000000000..d8690136b75 --- /dev/null +++ b/rxjava-core/src/main/java/rx/joins/ActivePlan0.java @@ -0,0 +1,37 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.joins; + +import java.util.HashMap; +import java.util.Map; + +/** + * Represents an activated plan. + */ +public abstract class ActivePlan0 { + protected final Map joinObservers = new HashMap(); + + public abstract void match(); + + protected void addJoinObserver(JoinObserver joinObserver) { + joinObservers.put(joinObserver, joinObserver); + } + protected void dequeue() { + for (JoinObserver jo : joinObservers.values()) { + jo.dequeue(); + } + } +} diff --git a/rxjava-core/src/main/java/rx/joins/ActivePlan1.java b/rxjava-core/src/main/java/rx/joins/ActivePlan1.java new file mode 100644 index 00000000000..65e50e9908e --- /dev/null +++ b/rxjava-core/src/main/java/rx/joins/ActivePlan1.java @@ -0,0 +1,49 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Notification; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +/** + * Represents an active plan. + */ +public class ActivePlan1 extends ActivePlan0 { + private final Action1 onNext; + private final Action0 onCompleted; + private final JoinObserver1 first; + public ActivePlan1(JoinObserver1 first, Action1 onNext, Action0 onCompleted) { + this.onNext = onNext; + this.onCompleted = onCompleted; + this.first = first; + addJoinObserver(first); + } + + @Override + public void match() { + if (!first.queue().isEmpty()) { + Notification n1 = first.queue().peek(); + if (n1.isOnCompleted()) { + onCompleted.call(); + } else { + dequeue(); + onNext.call(n1.getValue()); + } + } + } + +} diff --git a/rxjava-core/src/main/java/rx/joins/ActivePlan2.java b/rxjava-core/src/main/java/rx/joins/ActivePlan2.java new file mode 100644 index 00000000000..3ae19714ce6 --- /dev/null +++ b/rxjava-core/src/main/java/rx/joins/ActivePlan2.java @@ -0,0 +1,54 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Notification; +import rx.util.functions.Action0; +import rx.util.functions.Action2; + +/** + * Represents an active plan. + */ +public class ActivePlan2 extends ActivePlan0 { + private final Action2 onNext; + private final Action0 onCompleted; + private final JoinObserver1 first; + private final JoinObserver1 second; + public ActivePlan2(JoinObserver1 first, JoinObserver1 second, Action2 onNext, Action0 onCompleted) { + this.onNext = onNext; + this.onCompleted = onCompleted; + this.first = first; + this.second = second; + addJoinObserver(first); + addJoinObserver(second); + } + + @Override + public void match() { + if (!first.queue().isEmpty() && !second.queue().isEmpty()) { + Notification n1 = first.queue().peek(); + Notification n2 = second.queue().peek(); + + if (n1.isOnCompleted() || n2.isOnCompleted()) { + onCompleted.call(); + } else { + dequeue(); + onNext.call(n1.getValue(), n2.getValue()); + } + } + } + +} diff --git a/rxjava-core/src/main/java/rx/joins/ActivePlan3.java b/rxjava-core/src/main/java/rx/joins/ActivePlan3.java new file mode 100644 index 00000000000..cff23dd7522 --- /dev/null +++ b/rxjava-core/src/main/java/rx/joins/ActivePlan3.java @@ -0,0 +1,64 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Notification; +import rx.util.functions.Action0; +import rx.util.functions.Action3; + +/** + * Represents an active plan. + */ +public class ActivePlan3 extends ActivePlan0 { + private final Action3 onNext; + private final Action0 onCompleted; + private final JoinObserver1 first; + private final JoinObserver1 second; + private final JoinObserver1 third; + public ActivePlan3(JoinObserver1 first, + JoinObserver1 second, + JoinObserver1 third, + Action3 onNext, + Action0 onCompleted) { + this.onNext = onNext; + this.onCompleted = onCompleted; + this.first = first; + this.second = second; + this.third = third; + addJoinObserver(first); + addJoinObserver(second); + addJoinObserver(third); + } + + @Override + public void match() { + if (!first.queue().isEmpty() + && !second.queue().isEmpty() + && !third.queue().isEmpty()) { + Notification n1 = first.queue().peek(); + Notification n2 = second.queue().peek(); + Notification n3 = third.queue().peek(); + + if (n1.isOnCompleted() || n2.isOnCompleted() || n3.isOnCompleted()) { + onCompleted.call(); + } else { + dequeue(); + onNext.call(n1.getValue(), n2.getValue(), n3.getValue()); + } + } + } + +} diff --git a/rxjava-core/src/main/java/rx/joins/JoinObserver.java b/rxjava-core/src/main/java/rx/joins/JoinObserver.java new file mode 100644 index 00000000000..d191ed89588 --- /dev/null +++ b/rxjava-core/src/main/java/rx/joins/JoinObserver.java @@ -0,0 +1,26 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Subscription; + +/** + * Base interface to manage joined observations. + */ +public interface JoinObserver extends Subscription { + void subscribe(Object gate); + void dequeue(); +} diff --git a/rxjava-core/src/main/java/rx/joins/JoinObserver1.java b/rxjava-core/src/main/java/rx/joins/JoinObserver1.java new file mode 100644 index 00000000000..873d3d1a7fa --- /dev/null +++ b/rxjava-core/src/main/java/rx/joins/JoinObserver1.java @@ -0,0 +1,107 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.joins; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import rx.Notification; +import rx.Observable; +import rx.subscriptions.SingleAssignmentSubscription; +import rx.util.functions.Action1; + +/** + * Default implementation of a join observer. + */ +public final class JoinObserver1 extends ObserverBase> implements JoinObserver { + private Object gate; + private final Observable source; + private final Action1 onError; + private final List activePlans; + private final Queue> queue; + private final SingleAssignmentSubscription subscription; + private volatile boolean done; + + public JoinObserver1(Observable source, Action1 onError) { + this.source = source; + this.onError = onError; + queue = new LinkedList>(); + subscription = new SingleAssignmentSubscription(); + activePlans = new ArrayList(); + } + public Queue> queue() { + return queue; + } + public void addActivePlan(ActivePlan0 activePlan) { + activePlans.add(activePlan); + } + @Override + public void subscribe(Object gate) { + this.gate = gate; + subscription.set(source.materialize().subscribe(this)); + } + + @Override + public void dequeue() { + queue.remove(); + } + + @Override + protected void onNextCore(Notification args) { + synchronized (gate) { + if (!done) { + if (args.isOnError()) { + onError.call(args.getThrowable()); + return; + } + queue.add(args); + + // remark: activePlans might change while iterating + for (ActivePlan0 a : new ArrayList(activePlans)) { + a.match(); + } + } + } + } + + @Override + protected void onErrorCore(Throwable e) { + // not expected + } + + @Override + protected void onCompletedCore() { + // not expected or ignored + } + + + void removeActivePlan(ActivePlan0 activePlan) { + activePlans.remove(activePlan); + if (activePlans.isEmpty()) { + unsubscribe(); + } + } + + @Override + public void unsubscribe() { + if (!done) { + done = true; + subscription.unsubscribe(); + } + } + +} diff --git a/rxjava-core/src/main/java/rx/joins/ObserverBase.java b/rxjava-core/src/main/java/rx/joins/ObserverBase.java new file mode 100644 index 00000000000..f1144a8ad26 --- /dev/null +++ b/rxjava-core/src/main/java/rx/joins/ObserverBase.java @@ -0,0 +1,72 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.joins; + +import java.util.concurrent.atomic.AtomicBoolean; +import rx.Observer; + +/** + * Implements an observer that ensures proper event delivery + * semantics to its abstract onXxxxCore methods. + */ +public abstract class ObserverBase implements Observer { + private final AtomicBoolean completed = new AtomicBoolean(); + + @Override + public void onNext(T args) { + if (!completed.get()) { + onNextCore(args); + } + } + + @Override + public void onError(Throwable e) { + if (completed.compareAndSet(false, true)) { + onErrorCore(e); + } + } + + @Override + public void onCompleted() { + if (completed.compareAndSet(false, true)) { + onCompletedCore(); + } + } + /** + * Implement this method to react to the receival of a new element in the sequence. + */ + protected abstract void onNextCore(T args); + /** + * Implement this method to react to the occurrence of an exception. + */ + protected abstract void onErrorCore(Throwable e); + /** + * Implement this method to react to the end of the sequence. + */ + protected abstract void onCompletedCore(); + /** + * Try to trigger the error state. + * @param t + * @return false if already completed + */ + protected boolean fail(Throwable t) { + if (completed.compareAndSet(false, true)) { + onErrorCore(t); + return true; + } + return false; + } +} diff --git a/rxjava-core/src/main/java/rx/joins/Pattern.java b/rxjava-core/src/main/java/rx/joins/Pattern.java new file mode 100644 index 00000000000..1dc65ca3b88 --- /dev/null +++ b/rxjava-core/src/main/java/rx/joins/Pattern.java @@ -0,0 +1,25 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.joins; + +/** + * Base interface for join patterns. + * @see MSDN: Pattern + */ +public interface Pattern { + +} diff --git a/rxjava-core/src/main/java/rx/joins/Pattern1.java b/rxjava-core/src/main/java/rx/joins/Pattern1.java new file mode 100644 index 00000000000..324daf79fa5 --- /dev/null +++ b/rxjava-core/src/main/java/rx/joins/Pattern1.java @@ -0,0 +1,45 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Observable; +import rx.util.functions.Func1; + +/** + * Represents a join pattern over one observable sequence. + */ +public class Pattern1 implements Pattern { + private final Observable first; + public Pattern1(Observable first) { + this.first = first; + } + public Observable first() { + return first; + } + /** + * Matches when all observable sequences have an available + * element and projects the elements by invoking the selector function. + * @param selector the function that will be invoked for elements in the source sequences. + * @return + * @throws NullPointerException if selector is null + */ + public Plan0 then(Func1 selector) { + if (selector == null) { + throw new NullPointerException(); + } + return new Plan1(this, selector); + } +} diff --git a/rxjava-core/src/main/java/rx/joins/Pattern2.java b/rxjava-core/src/main/java/rx/joins/Pattern2.java new file mode 100644 index 00000000000..6f82a66d49f --- /dev/null +++ b/rxjava-core/src/main/java/rx/joins/Pattern2.java @@ -0,0 +1,54 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Observable; +import rx.util.functions.Func2; + +/** + * Represents a join pattern over observable sequences. + */ +public class Pattern2 implements Pattern { + private final Observable first; + private final Observable second; + public Pattern2(Observable first, Observable second) { + this.first = first; + this.second = second; + } + public Observable first() { + return first; + } + public Observable second() { + return second; + } + /** + * Creates a pattern that matches when all three observable sequences have an available element. + * @param other Observable sequence to match with the two previous sequences. + * @return Pattern object that matches when all observable sequences have an available element. + */ + public Pattern3 and(Observable other) { + if (other == null) { + throw new NullPointerException(); + } + return new Pattern3(first, second, other); + } + public Plan0 then(Func2 selector) { + if (selector == null) { + throw new NullPointerException(); + } + return new Plan2(this, selector); + } +} diff --git a/rxjava-core/src/main/java/rx/joins/Pattern3.java b/rxjava-core/src/main/java/rx/joins/Pattern3.java new file mode 100644 index 00000000000..c43b9d0b9e7 --- /dev/null +++ b/rxjava-core/src/main/java/rx/joins/Pattern3.java @@ -0,0 +1,55 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import rx.Observable; +import rx.util.functions.Func3; + +/** + * Represents a join pattern over observable sequences. + */ +public class Pattern3 implements Pattern { + private final Observable first; + private final Observable second; + private final Observable third; + public Pattern3(Observable first, Observable second, + Observable third) { + this.first = first; + this.second = second; + this.third = third; + } + public Observable first() { + return first; + } + public Observable second() { + return second; + } + public Observable third() { + return third; + } +// public Pattern4 and(Observable other) { +// if (other == null) { +// throw new NullPointerException(); +// } +// return new Pattern4(first, second, third, other); +// } + public Plan0 then(Func3 selector) { + if (selector == null) { + throw new NullPointerException(); + } + return new Plan3(this, selector); + } +} diff --git a/rxjava-core/src/main/java/rx/joins/Plan0.java b/rxjava-core/src/main/java/rx/joins/Plan0.java new file mode 100644 index 00000000000..2a647aff3de --- /dev/null +++ b/rxjava-core/src/main/java/rx/joins/Plan0.java @@ -0,0 +1,46 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.joins; + +import java.util.Map; +import rx.Observable; +import rx.Observer; +import rx.util.functions.Action1; + +/** + * Represents an execution plan for join patterns. + */ +public abstract class Plan0 { + public abstract ActivePlan0 activate(Map externalSubscriptions, + Observer observer, Action1 deactivate); + + @SuppressWarnings("unchecked") + public static JoinObserver1 createObserver( + Map externalSubscriptions, + Observable observable, + Action1 onError + ) { + JoinObserver1 observer; + JoinObserver nonGeneric = externalSubscriptions.get(observable); + if (nonGeneric == null) { + observer = new JoinObserver1(observable, onError); + externalSubscriptions.put(observable, observer); + } else { + observer = (JoinObserver1)nonGeneric; + } + return observer; + } +} diff --git a/rxjava-core/src/main/java/rx/joins/Plan1.java b/rxjava-core/src/main/java/rx/joins/Plan1.java new file mode 100644 index 00000000000..b64742e27bd --- /dev/null +++ b/rxjava-core/src/main/java/rx/joins/Plan1.java @@ -0,0 +1,80 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.joins; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import rx.Observer; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Func1; +import rx.util.functions.Actions; + +/** + * Represents an execution plan for join patterns. + */ +public class Plan1 extends Plan0 { + protected Pattern1 expression; + protected Func1 selector; + + public Plan1(Pattern1 expression, Func1 selector) { + this.expression = expression; + this.selector = selector; + } + + public Pattern1 expression() { + return expression; + } + public Func1 selector() { + return selector; + } + + @Override + public ActivePlan0 activate(Map externalSubscriptions, final Observer observer, final Action1 deactivate) { + Action1 onError = Actions.onErrorFrom(observer); + + final JoinObserver1 firstJoinObserver = createObserver(externalSubscriptions, expression.first(), onError); + + final AtomicReference> self = new AtomicReference>(); + + ActivePlan1 activePlan = new ActivePlan1(firstJoinObserver, new Action1() { + @Override + public void call(T1 t1) { + R result; + try { + result = selector.call(t1); + } catch (Throwable t) { + observer.onError(t); + return; + } + observer.onNext(result); + } + }, + new Action0() { + @Override + public void call() { + firstJoinObserver.removeActivePlan(self.get()); + deactivate.call(self.get()); + } + }); + + self.set(activePlan); + + firstJoinObserver.addActivePlan(activePlan); + return activePlan; + } + +} diff --git a/rxjava-core/src/main/java/rx/joins/Plan2.java b/rxjava-core/src/main/java/rx/joins/Plan2.java new file mode 100644 index 00000000000..71b931ea0de --- /dev/null +++ b/rxjava-core/src/main/java/rx/joins/Plan2.java @@ -0,0 +1,79 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import rx.Observer; +import static rx.joins.Plan0.createObserver; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Action2; +import rx.util.functions.Actions; +import rx.util.functions.Func2; + +/** + * Represents an execution plan for join patterns. + */ +public class Plan2 extends Plan0 { + protected Pattern2 expression; + protected Func2 selector; + public Plan2(Pattern2 expression, Func2 selector) { + this.expression = expression; + this.selector = selector; + } + + @Override + public ActivePlan0 activate(Map externalSubscriptions, + final Observer observer, final Action1 deactivate) { + Action1 onError = Actions.onErrorFrom(observer); + + final JoinObserver1 firstJoinObserver = createObserver(externalSubscriptions, expression.first(), onError); + final JoinObserver1 secondJoinObserver = createObserver(externalSubscriptions, expression.second(), onError); + + final AtomicReference> self = new AtomicReference>(); + + ActivePlan2 activePlan = new ActivePlan2(firstJoinObserver, secondJoinObserver, new Action2() { + @Override + public void call(T1 t1, T2 t2) { + R result; + try { + result = selector.call(t1, t2); + } catch (Throwable t) { + observer.onError(t); + return; + } + observer.onNext(result); + } + }, + new Action0() { + @Override + public void call() { + firstJoinObserver.removeActivePlan(self.get()); + secondJoinObserver.removeActivePlan(self.get()); + deactivate.call(self.get()); + } + }); + + self.set(activePlan); + + firstJoinObserver.addActivePlan(activePlan); + secondJoinObserver.addActivePlan(activePlan); + + return activePlan; + } + +} diff --git a/rxjava-core/src/main/java/rx/joins/Plan3.java b/rxjava-core/src/main/java/rx/joins/Plan3.java new file mode 100644 index 00000000000..386ef82dfec --- /dev/null +++ b/rxjava-core/src/main/java/rx/joins/Plan3.java @@ -0,0 +1,83 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.joins; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import rx.Observer; +import static rx.joins.Plan0.createObserver; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Action3; +import rx.util.functions.Actions; +import rx.util.functions.Func3; + +/** + * Represents an execution plan for join patterns. + */ +public class Plan3 extends Plan0 { + protected Pattern3 expression; + protected Func3 selector; + public Plan3(Pattern3 expression, Func3 selector) { + this.expression = expression; + this.selector = selector; + } + + @Override + public ActivePlan0 activate(Map externalSubscriptions, + final Observer observer, final Action1 deactivate) { + Action1 onError = Actions.onErrorFrom(observer); + + final JoinObserver1 firstJoinObserver = createObserver(externalSubscriptions, expression.first(), onError); + final JoinObserver1 secondJoinObserver = createObserver(externalSubscriptions, expression.second(), onError); + final JoinObserver1 thirdJoinObserver = createObserver(externalSubscriptions, expression.third(), onError); + + final AtomicReference> self = new AtomicReference>(); + + ActivePlan3 activePlan = new ActivePlan3(firstJoinObserver, secondJoinObserver, + thirdJoinObserver, new Action3() { + @Override + public void call(T1 t1, T2 t2, T3 t3) { + R result; + try { + result = selector.call(t1, t2, t3); + } catch (Throwable t) { + observer.onError(t); + return; + } + observer.onNext(result); + } + }, + new Action0() { + @Override + public void call() { + firstJoinObserver.removeActivePlan(self.get()); + secondJoinObserver.removeActivePlan(self.get()); + thirdJoinObserver.removeActivePlan(self.get()); + deactivate.call(self.get()); + } + }); + + self.set(activePlan); + + firstJoinObserver.addActivePlan(activePlan); + secondJoinObserver.addActivePlan(activePlan); + thirdJoinObserver.addActivePlan(activePlan); + + return activePlan; + } + +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationJoinPatterns.java b/rxjava-core/src/main/java/rx/operators/OperationJoinPatterns.java new file mode 100644 index 00000000000..6ff1c4346ea --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationJoinPatterns.java @@ -0,0 +1,131 @@ + /** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.joins.ActivePlan0; +import rx.joins.JoinObserver; +import rx.joins.Pattern1; +import rx.joins.Pattern2; +import rx.joins.Plan0; +import rx.subjects.PublishSubject; +import rx.subscriptions.CompositeSubscription; +import rx.util.functions.Action1; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +/** + * Join patterns: And, Then, When. + */ +public class OperationJoinPatterns { + /** + * Creates a pattern that matches when both observable sequences have an available element. + */ + public static Pattern2 and(/* this */Observable left, Observable right) { + if (left == null) { + throw new NullPointerException("left"); + } + if (right == null) { + throw new NullPointerException("right"); + } + return new Pattern2(left, right); + } + /** + * Matches when the observable sequence has an available element and projects the element by invoking the selector function. + */ + public static Plan0 then(/* this */Observable source, Func1 selector) { + if (source == null) { + throw new NullPointerException("source"); + } + if (selector == null) { + throw new NullPointerException("selector"); + } + return new Pattern1(source).then(selector); + } + /** + * Joins together the results from several patterns. + */ + public static OnSubscribeFunc when(Plan0... plans) { + if (plans == null) { + throw new NullPointerException("plans"); + } + return when(Arrays.asList(plans)); + } + /** + * Joins together the results from several patterns. + */ + public static OnSubscribeFunc when(final Iterable> plans) { + if (plans == null) { + throw new NullPointerException("plans"); + } + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(final Observer t1) { + final Map externalSubscriptions = new HashMap(); + final Object gate = new Object(); + final List activePlans = new ArrayList(); + + final Observer out = new Observer() { + @Override + public void onNext(R args) { + t1.onNext(args); + } + @Override + public void onError(Throwable e) { + for (JoinObserver po : externalSubscriptions.values()) { + po.unsubscribe(); + } + t1.onError(e); + } + @Override + public void onCompleted() { + t1.onCompleted(); + } + }; + + try { + for (Plan0 plan : plans) { + activePlans.add(plan.activate(externalSubscriptions, out, new Action1() { + @Override + public void call(ActivePlan0 activePlan) { + activePlans.remove(activePlan); + if (activePlans.isEmpty()) { + out.onCompleted(); + } + } + })); + } + } catch (Throwable t) { + return Observable.error(t).subscribe(t1); + } + CompositeSubscription group = new CompositeSubscription(); + for (JoinObserver jo : externalSubscriptions.values()) { + jo.subscribe(gate); + group.add(jo); + } + return group; + } + }; + } +} diff --git a/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java new file mode 100644 index 00000000000..492c4e6dbf4 --- /dev/null +++ b/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java @@ -0,0 +1,81 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.subscriptions; + +import java.util.concurrent.atomic.AtomicReference; +import rx.Subscription; + +/** + * A subscription that allows only a single resource to be assigned. + *

+ * If this subscription is live, no other subscription may be set() and + * yields an {@link IllegalStateException}. + *

+ * If the unsubscribe has been called, setting a new subscription will + * unsubscribe it immediately. + */ +public final class SingleAssignmentSubscription implements Subscription { + /** Holds the current resource. */ + private final AtomicReference current = new AtomicReference(); + /** Sentinel for the unsubscribed state. */ + private static final Subscription SENTINEL = new Subscription() { + @Override + public void unsubscribe() { + } + }; + /** + * Returns the current subscription or null if not yet set. + */ + public Subscription get() { + Subscription s = current.get(); + if (s == SENTINEL) { + return Subscriptions.empty(); + } + return s; + } + /** + * Sets a new subscription if not already set. + * @param s the new subscription + * @throws IllegalStateException if this subscription is live and contains + * another subscription. + */ + public void set(Subscription s) { + if (current.compareAndSet(null, s)) { + return; + } + if (current.get() != SENTINEL) { + throw new IllegalStateException("Subscription already set"); + } + if (s != null) { + s.unsubscribe(); + } + } + @Override + public void unsubscribe() { + Subscription old = current.getAndSet(SENTINEL); + if (old != null) { + old.unsubscribe(); + } + } + /** + * Test if this subscription is already unsubscribed. + */ + public boolean isUnsubscribed() { + return current.get() == SENTINEL; + } + +} diff --git a/rxjava-core/src/main/java/rx/util/functions/Actions.java b/rxjava-core/src/main/java/rx/util/functions/Actions.java new file mode 100644 index 00000000000..2bffbb9ec8a --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/functions/Actions.java @@ -0,0 +1,72 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.util.functions; + +import rx.Observer; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +/** + * Utility class for the Action interfaces. + */ +public final class Actions { + private Actions() { throw new IllegalStateException("No instances!"); } + /** + * Extracts a method reference to the observer's onNext method + * in the form of an Action1. + *

Java 8: observer::onNext

+ * @param observer the observer to use + * @return an action which calls the observer's onNext method. + */ + public static Action1 onNextFrom(final Observer observer) { + return new Action1() { + @Override + public void call(T t1) { + observer.onNext(t1); + } + }; + } + /** + * Extracts a method reference to the observer's onError method + * in the form of an Action1. + *

Java 8: observer::onError

+ * @param observer the observer to use + * @return an action which calls the observer's onError method. + */ + public static Action1 onErrorFrom(final Observer observer) { + return new Action1() { + @Override + public void call(Throwable t1) { + observer.onError(t1); + } + }; + } + /** + * Extracts a method reference to the observer's onCompleted method + * in the form of an Action0. + *

Java 8: observer::onCompleted

+ * @param observer the observer to use + * @return an action which calls the observer's onCompleted method. + */ + public static Action0 onCompletedFrom(final Observer observer) { + return new Action0() { + @Override + public void call() { + observer.onCompleted(); + } + }; + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationJoinsTest.java b/rxjava-core/src/test/java/rx/operators/OperationJoinsTest.java new file mode 100644 index 00000000000..8a3b0c79c98 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationJoinsTest.java @@ -0,0 +1,382 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import static org.mockito.Matchers.any; +import org.mockito.Mock; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import org.mockito.MockitoAnnotations; +import rx.Observable; +import rx.Observer; +import rx.joins.Plan0; +import rx.subjects.PublishSubject; +import rx.util.functions.Func1; +import rx.util.functions.Func2; +import rx.util.functions.Func3; +import rx.util.functions.Functions; + +public class OperationJoinsTest { + @Mock + Observer observer; + + + Func2 add2 = new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + }; + Func2 mul2 = new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + return t1 * t2; + } + }; + Func2 sub2 = new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + return t1 - t2; + } + }; + + Func3 add3 = new Func3() { + @Override + public Integer call(Integer t1, Integer t2, Integer t3) { + return t1 + t2 + t3; + } + }; + Func1 func1Throw = new Func1() { + @Override + public Integer call(Integer t1) { + throw new RuntimeException("Forced failure"); + } + }; + Func2 func2Throw = new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + throw new RuntimeException("Forced failure"); + } + }; + Func3 func3Throw = new Func3() { + @Override + public Integer call(Integer t1, Integer t2, Integer t3) { + throw new RuntimeException("Forced failure"); + } + }; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + + @Test(expected = NullPointerException.class) + public void and2ArgumentNull() { + Observable some = Observable.just(1); + some.and(null); + } + @Test(expected = NullPointerException.class) + public void and3argumentNull() { + Observable some = Observable.just(1); + some.and(some).and(null); + } + @Test + public void and2() { + Observable some = Observable.just(1); + + Observable m = Observable.when(some.and(some).then(add2)); + + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onCompleted(); + } + @Test + public void and2Error1() { + Observable error = Observable.error(new RuntimeException("Forced failure")); + + Observable some = Observable.just(1); + + Observable m = Observable.when(error.and(some).then(add2)); + + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + } + @Test + public void and2Error2() { + Observable error = Observable.error(new RuntimeException("Forced failure")); + + Observable some = Observable.just(1); + + Observable m = Observable.when(some.and(error).then(add2)); + + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + } + @Test + public void and3() { + Observable some = Observable.just(1); + + Observable m = Observable.when(some.and(some).and(some).then(add3)); + + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(3); + verify(observer, times(1)).onCompleted(); + } + @Test + public void and3Error1() { + Observable error = Observable.error(new RuntimeException("Forced failure")); + + Observable some = Observable.just(1); + + Observable m = Observable.when(error.and(some).and(some).then(add3)); + + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + } + @Test + public void and3Error2() { + Observable error = Observable.error(new RuntimeException("Forced failure")); + + Observable some = Observable.just(1); + + Observable m = Observable.when(some.and(error).and(some).then(add3)); + + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + } + @Test + public void and3Error3() { + Observable error = Observable.error(new RuntimeException("Forced failure")); + + Observable some = Observable.just(1); + + Observable m = Observable.when(some.and(some).and(error).then(add3)); + + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + } + @Test(expected = NullPointerException.class) + public void thenArgumentNull() { + Observable some = Observable.just(1); + + some.then(null); + } + @Test(expected = NullPointerException.class) + public void then2ArgumentNull() { + Observable some = Observable.just(1); + + some.and(some).then(null); + } + @Test(expected = NullPointerException.class) + public void then3ArgumentNull() { + Observable some = Observable.just(1); + + some.and(some).and(some).then(null); + } + @Test + public void then1() { + Observable some = Observable.just(1); + + Observable m = Observable.when(some.then(Functions.identity())); + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(1); + verify(observer, times(1)).onCompleted(); + } + @Test + public void then1Error() { + Observable some = Observable.error(new RuntimeException("Forced failure")); + + Observable m = Observable.when(some.then(Functions.identity())); + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + } + @Test + public void then1Throws() { + Observable some = Observable.just(1); + + Observable m = Observable.when(some.then(func1Throw)); + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + } + @Test + public void then2Throws() { + Observable some = Observable.just(1); + + Observable m = Observable.when(some.and(some).then(func2Throw)); + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + } + @Test + public void then3Throws() { + Observable some = Observable.just(1); + + Observable m = Observable.when(some.and(some).and(some).then(func3Throw)); + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + } + @Test(expected = NullPointerException.class) + public void whenArgumentNull1() { + Observable.when((Plan0[])null); + } + @Test(expected = NullPointerException.class) + public void whenArgumentNull2() { + Observable.when((Iterable>)null); + } + @Test + public void whenMultipleSymmetric() { + Observable source1 = Observable.from(1, 2, 3); + Observable source2 = Observable.from(4, 5, 6); + + Observable m = Observable.when(source1.and(source2).then(add2)); + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(1 + 4); + verify(observer, times(1)).onNext(2 + 5); + verify(observer, times(1)).onNext(3 + 6); + verify(observer, times(1)).onCompleted(); + } + + @Test + public void whenMultipleAsymSymmetric() { + Observable source1 = Observable.from(1, 2, 3); + Observable source2 = Observable.from(4, 5); + + Observable m = Observable.when(source1.and(source2).then(add2)); + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(1 + 4); + verify(observer, times(1)).onNext(2 + 5); + verify(observer, times(1)).onCompleted(); + } + @Test + public void whenEmptyEmpty() { + Observable source1 = Observable.empty(); + Observable source2 = Observable.empty(); + + Observable m = Observable.when(source1.and(source2).then(add2)); + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, times(1)).onCompleted(); + } + + @Test + public void whenNeverNever() { + Observable source1 = Observable.never(); + Observable source2 = Observable.never(); + + Observable m = Observable.when(source1.and(source2).then(add2)); + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + } + @Test + public void whenThrowNonEmpty() { + Observable source1 = Observable.empty(); + Observable source2 = Observable.error(new RuntimeException("Forced failure")); + + Observable m = Observable.when(source1.and(source2).then(add2)); + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + } + @Test + public void whenComplicated() { + PublishSubject xs = PublishSubject.create(); + PublishSubject ys = PublishSubject.create(); + PublishSubject zs = PublishSubject.create(); + + Observable m = Observable.when( + xs.and(ys).then(add2), + xs.and(zs).then(mul2), + ys.and(zs).then(sub2) + ); + m.subscribe(observer); + + xs.onNext(1); // t == 210 + + xs.onNext(2); // t == 220 + zs.onNext(7); // t == 220 + + xs.onNext(3); // t == 230 + zs.onNext(8); // t == 230 + + ys.onNext(4); // t == 240 + zs.onNext(9); // t == 240 + xs.onCompleted(); // t == 240 + + ys.onNext(5); // t == 250 + + ys.onNext(6); // t == 260 + + ys.onCompleted(); // t == 270 + + zs.onCompleted(); // t == 300 + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1 * 7); + inOrder.verify(observer, times(1)).onNext(2 * 8); + inOrder.verify(observer, times(1)).onNext(3 + 4); + inOrder.verify(observer, times(1)).onNext(5 - 9); + inOrder.verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } +}