Skip to content

Commit

Permalink
Merge pull request ReactiveX#506 from akarnokd/AndPattern2
Browse files Browse the repository at this point in the history
Operators: And, Then, When
  • Loading branch information
benjchristensen committed Nov 22, 2013
2 parents fa48bf9 + 7c22333 commit a718be0
Show file tree
Hide file tree
Showing 20 changed files with 1,714 additions and 1 deletion.
173 changes: 172 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.concurrent.TimeUnit;

import rx.concurrency.Schedulers;
import rx.joins.Pattern2;
import rx.joins.Plan0;
import rx.observables.BlockingObservable;
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
Expand All @@ -51,6 +53,7 @@
import rx.operators.OperationFirstOrDefault;
import rx.operators.OperationGroupBy;
import rx.operators.OperationInterval;
import rx.operators.OperationJoinPatterns;
import rx.operators.OperationLast;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
Expand Down Expand Up @@ -5689,5 +5692,173 @@ private boolean isInternalImplementation(Object o) {
return isInternal;
}
}

/**
* Creates a pattern that matches when both observable sequences have an available element.
* @param right Observable sequence to match with the left sequence.
* @return Pattern object that matches when both observable sequences have an available element.
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229153.aspx'>MSDN: Observable.And</a>
* @throws NullPointerException if right is null
*/
public <T2> Pattern2<T, T2> and(Observable<T2> right) {
return OperationJoinPatterns.and(this, right);
}
/**
* Matches when the observable sequence has an available element and projects the element by invoking the selector function.
* @param selector Selector that will be invoked for elements in the source sequence.
* @return Plan that produces the projected results, to be fed (with other plans) to the When operator.
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211662.aspx'>MSDN: Observable.Then</a>
* @throws NullPointerException if selector is null
*/
public <R> Plan0<R> then(Func1<T, R> selector) {
return OperationJoinPatterns.then(this, selector);
}
/**
* Joins together the results from several patterns.
* @param plans A series of plans created by use of the Then operator on patterns.
* @return An observable sequence with the results from matching several patterns.
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
* @throws NullPointerException if plans is null
*/
public static <R> Observable<R> when(Plan0<R>... plans) {
return create(OperationJoinPatterns.when(plans));
}
/**
* Joins together the results from several patterns.
* @param plans A series of plans created by use of the Then operator on patterns.
* @return An observable sequence with the results from matching several patterns.
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229558.aspx'>MSDN: Observable.When</a>
* @throws NullPointerException if plans is null
*/
public static <R> Observable<R> when(Iterable<? extends Plan0<R>> plans) {
if (plans == null) {
throw new NullPointerException("plans");
}
return create(OperationJoinPatterns.when(plans));
}
/**
* Joins the results from a pattern.
* @param p1 the plan to join
* @return An observable sequence with the results from matching a pattern
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
*/
@SuppressWarnings("unchecked")
public static <R> Observable<R> when(Plan0<R> p1) {
return create(OperationJoinPatterns.when(p1));
}
/**
* Joins together the results from several patterns.
* @param p1 a plan
* @param p2 a plan
* @return An observable sequence with the results from matching several patterns
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
*/
@SuppressWarnings("unchecked")
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2) {
return create(OperationJoinPatterns.when(p1, p2));
}
/**
* Joins together the results from several patterns.
* @param p1 a plan
* @param p2 a plan
* @param p3 a plan
* @return An observable sequence with the results from matching several patterns
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
*/
@SuppressWarnings("unchecked")
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3) {
return create(OperationJoinPatterns.when(p1, p2, p3));
}
/**
* Joins together the results from several patterns.
* @param p1 a plan
* @param p2 a plan
* @param p3 a plan
* @param p4 a plan
* @return An observable sequence with the results from matching several patterns
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
*/
@SuppressWarnings("unchecked")
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4) {
return create(OperationJoinPatterns.when(p1, p2, p3, p4));
}
/**
* Joins together the results from several patterns.
* @param p1 a plan
* @param p2 a plan
* @param p3 a plan
* @param p4 a plan
* @param p5 a plan
* @return An observable sequence with the results from matching several patterns
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
*/
@SuppressWarnings("unchecked")
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5) {
return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5));
}
/**
* Joins together the results from several patterns.
* @param p1 a plan
* @param p2 a plan
* @param p3 a plan
* @param p4 a plan
* @param p5 a plan
* @param p6 a plan
* @return An observable sequence with the results from matching several patterns
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
*/
@SuppressWarnings("unchecked")
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6) {
return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6));
}
/**
* Joins together the results from several patterns.
* @param p1 a plan
* @param p2 a plan
* @param p3 a plan
* @param p4 a plan
* @param p5 a plan
* @param p6 a plan
* @param p7 a plan
* @return An observable sequence with the results from matching several patterns
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
*/
@SuppressWarnings("unchecked")
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6, Plan0<R> p7) {
return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7));
}
/**
* Joins together the results from several patterns.
* @param p1 a plan
* @param p2 a plan
* @param p3 a plan
* @param p4 a plan
* @param p5 a plan
* @param p6 a plan
* @param p7 a plan
* @param p8 a plan
* @return An observable sequence with the results from matching several patterns
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
*/
@SuppressWarnings("unchecked")
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6, Plan0<R> p7, Plan0<R> p8) {
return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8));
}
/**
* Joins together the results from several patterns.
* @param p1 a plan
* @param p2 a plan
* @param p3 a plan
* @param p4 a plan
* @param p5 a plan
* @param p6 a plan
* @param p7 a plan
* @param p8 a plan
* @param p9 a plan
* @return An observable sequence with the results from matching several patterns
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229889.aspx'>MSDN: Observable.When</a>
*/
@SuppressWarnings("unchecked")
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6, Plan0<R> p7, Plan0<R> p8, Plan0<R> p9) {
return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8, p9));
}
}
37 changes: 37 additions & 0 deletions rxjava-core/src/main/java/rx/joins/ActivePlan0.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.joins;

import java.util.HashMap;
import java.util.Map;

/**
* Represents an activated plan.
*/
public abstract class ActivePlan0 {
protected final Map<JoinObserver, JoinObserver> joinObservers = new HashMap<JoinObserver, JoinObserver>();

public abstract void match();

protected void addJoinObserver(JoinObserver joinObserver) {
joinObservers.put(joinObserver, joinObserver);
}
protected void dequeue() {
for (JoinObserver jo : joinObservers.values()) {
jo.dequeue();
}
}
}
49 changes: 49 additions & 0 deletions rxjava-core/src/main/java/rx/joins/ActivePlan1.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.joins;

import rx.Notification;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

/**
* Represents an active plan.
*/
public class ActivePlan1<T1> extends ActivePlan0 {
private final Action1<T1> onNext;
private final Action0 onCompleted;
private final JoinObserver1<T1> first;
public ActivePlan1(JoinObserver1<T1> first, Action1<T1> onNext, Action0 onCompleted) {
this.onNext = onNext;
this.onCompleted = onCompleted;
this.first = first;
addJoinObserver(first);
}

@Override
public void match() {
if (!first.queue().isEmpty()) {
Notification<T1> n1 = first.queue().peek();
if (n1.isOnCompleted()) {
onCompleted.call();
} else {
dequeue();
onNext.call(n1.getValue());
}
}
}

}
54 changes: 54 additions & 0 deletions rxjava-core/src/main/java/rx/joins/ActivePlan2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.joins;

import rx.Notification;
import rx.util.functions.Action0;
import rx.util.functions.Action2;

/**
* Represents an active plan.
*/
public class ActivePlan2<T1, T2> extends ActivePlan0 {
private final Action2<T1, T2> onNext;
private final Action0 onCompleted;
private final JoinObserver1<T1> first;
private final JoinObserver1<T2> second;
public ActivePlan2(JoinObserver1<T1> first, JoinObserver1<T2> second, Action2<T1, T2> onNext, Action0 onCompleted) {
this.onNext = onNext;
this.onCompleted = onCompleted;
this.first = first;
this.second = second;
addJoinObserver(first);
addJoinObserver(second);
}

@Override
public void match() {
if (!first.queue().isEmpty() && !second.queue().isEmpty()) {
Notification<T1> n1 = first.queue().peek();
Notification<T2> n2 = second.queue().peek();

if (n1.isOnCompleted() || n2.isOnCompleted()) {
onCompleted.call();
} else {
dequeue();
onNext.call(n1.getValue(), n2.getValue());
}
}
}

}
64 changes: 64 additions & 0 deletions rxjava-core/src/main/java/rx/joins/ActivePlan3.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.joins;

import rx.Notification;
import rx.util.functions.Action0;
import rx.util.functions.Action3;

/**
* Represents an active plan.
*/
public class ActivePlan3<T1, T2, T3> extends ActivePlan0 {
private final Action3<T1, T2, T3> onNext;
private final Action0 onCompleted;
private final JoinObserver1<T1> first;
private final JoinObserver1<T2> second;
private final JoinObserver1<T3> third;
public ActivePlan3(JoinObserver1<T1> first,
JoinObserver1<T2> second,
JoinObserver1<T3> third,
Action3<T1, T2, T3> onNext,
Action0 onCompleted) {
this.onNext = onNext;
this.onCompleted = onCompleted;
this.first = first;
this.second = second;
this.third = third;
addJoinObserver(first);
addJoinObserver(second);
addJoinObserver(third);
}

@Override
public void match() {
if (!first.queue().isEmpty()
&& !second.queue().isEmpty()
&& !third.queue().isEmpty()) {
Notification<T1> n1 = first.queue().peek();
Notification<T2> n2 = second.queue().peek();
Notification<T3> n3 = third.queue().peek();

if (n1.isOnCompleted() || n2.isOnCompleted() || n3.isOnCompleted()) {
onCompleted.call();
} else {
dequeue();
onNext.call(n1.getValue(), n2.getValue(), n3.getValue());
}
}
}

}
Loading

0 comments on commit a718be0

Please sign in to comment.