Skip to content

Commit

Permalink
Move rx.joins to rxjava-joins module
Browse files Browse the repository at this point in the history
Migrating rx.joins into a separate contrib module as part of roadmap to 1.0: ReactiveX#1001 (comment)
This is being done until the rx.joins API has further time to mature as it is likely to change and we can't make breaking changes any further once we hit 1.0.
  • Loading branch information
benjchristensen committed Apr 19, 2014
1 parent 320495f commit 6896110
Show file tree
Hide file tree
Showing 20 changed files with 368 additions and 325 deletions.
21 changes: 21 additions & 0 deletions rxjava-contrib/rxjava-joins/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apply plugin: 'osgi'

sourceCompatibility = JavaVersion.VERSION_1_6
targetCompatibility = JavaVersion.VERSION_1_6

dependencies {
compile project(':rxjava-core')
testCompile project(":rxjava-core").sourceSets.test.output
provided 'junit:junit-dep:4.10'
provided 'org.mockito:mockito-core:1.8.5'
}

jar {
manifest {
name = 'rxjava-joins'
instruction 'Bundle-Vendor', 'Netflix'
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
instruction 'Fragment-Host', 'com.netflix.rxjava.core'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.operators;
package rx.joins.operators;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.operators;
package rx.joins.operators;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
Expand All @@ -36,6 +36,7 @@
import rx.functions.Func3;
import rx.functions.Functions;
import rx.joins.Plan0;
import rx.observables.JoinObservable;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

Expand Down Expand Up @@ -95,20 +96,20 @@ public void before() {
@Test(expected = NullPointerException.class)
public void and2ArgumentNull() {
Observable<Integer> some = Observable.just(1);
some.and(null);
JoinObservable.from(some).and(null);
}

@Test(expected = NullPointerException.class)
public void and3argumentNull() {
Observable<Integer> some = Observable.just(1);
some.and(some).and(null);
JoinObservable.from(some).and(some).and(null);
}

@Test
public void and2() {
Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.and(some).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).and(some).then(add2)).toObservable();

m.subscribe(observer);

Expand All @@ -123,7 +124,7 @@ public void and2Error1() {

Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(error.and(some).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(error).and(some).then(add2)).toObservable();

m.subscribe(observer);

Expand All @@ -138,7 +139,7 @@ public void and2Error2() {

Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.and(error).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).and(error).then(add2)).toObservable();

m.subscribe(observer);

Expand All @@ -151,7 +152,7 @@ public void and2Error2() {
public void and3() {
Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.and(some).and(some).then(add3));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).and(some).and(some).then(add3)).toObservable();

m.subscribe(observer);

Expand All @@ -166,7 +167,7 @@ public void and3Error1() {

Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(error.and(some).and(some).then(add3));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(error).and(some).and(some).then(add3)).toObservable();

m.subscribe(observer);

Expand All @@ -181,7 +182,7 @@ public void and3Error2() {

Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.and(error).and(some).then(add3));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).and(error).and(some).then(add3)).toObservable();

m.subscribe(observer);

Expand All @@ -196,7 +197,7 @@ public void and3Error3() {

Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.and(some).and(error).then(add3));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).and(some).and(error).then(add3)).toObservable();

m.subscribe(observer);

Expand All @@ -209,28 +210,28 @@ public void and3Error3() {
public void thenArgumentNull() {
Observable<Integer> some = Observable.just(1);

some.then(null);
JoinObservable.from(some).then(null);
}

@Test(expected = NullPointerException.class)
public void then2ArgumentNull() {
Observable<Integer> some = Observable.just(1);

some.and(some).then(null);
JoinObservable.from(some).and(some).then(null);
}

@Test(expected = NullPointerException.class)
public void then3ArgumentNull() {
Observable<Integer> some = Observable.just(1);

some.and(some).and(some).then(null);
JoinObservable.from(some).and(some).and(some).then(null);
}

@Test
public void then1() {
Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.then(Functions.<Integer> identity()));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).then(Functions.<Integer> identity())).toObservable();
m.subscribe(observer);

verify(observer, never()).onError(any(Throwable.class));
Expand All @@ -242,7 +243,7 @@ public void then1() {
public void then1Error() {
Observable<Integer> some = Observable.error(new RuntimeException("Forced failure"));

Observable<Integer> m = Observable.when(some.then(Functions.<Integer> identity()));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).then(Functions.<Integer> identity())).toObservable();
m.subscribe(observer);

verify(observer, times(1)).onError(any(Throwable.class));
Expand All @@ -254,7 +255,7 @@ public void then1Error() {
public void then1Throws() {
Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.then(func1Throw));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).then(func1Throw)).toObservable();
m.subscribe(observer);

verify(observer, times(1)).onError(any(Throwable.class));
Expand All @@ -266,7 +267,7 @@ public void then1Throws() {
public void then2Throws() {
Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.and(some).then(func2Throw));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).and(some).then(func2Throw)).toObservable();
m.subscribe(observer);

verify(observer, times(1)).onError(any(Throwable.class));
Expand All @@ -278,7 +279,7 @@ public void then2Throws() {
public void then3Throws() {
Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.and(some).and(some).then(func3Throw));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).and(some).and(some).then(func3Throw)).toObservable();
m.subscribe(observer);

verify(observer, times(1)).onError(any(Throwable.class));
Expand All @@ -288,20 +289,20 @@ public void then3Throws() {

@Test(expected = NullPointerException.class)
public void whenArgumentNull1() {
Observable.when((Plan0<Object>[]) null);
JoinObservable.when((Plan0<Object>[]) null);
}

@Test(expected = NullPointerException.class)
public void whenArgumentNull2() {
Observable.when((Iterable<Plan0<Object>>) null);
JoinObservable.when((Iterable<Plan0<Object>>) null);
}

@Test
public void whenMultipleSymmetric() {
Observable<Integer> source1 = Observable.from(1, 2, 3);
Observable<Integer> source2 = Observable.from(4, 5, 6);

Observable<Integer> m = Observable.when(source1.and(source2).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add2)).toObservable();
m.subscribe(observer);

verify(observer, never()).onError(any(Throwable.class));
Expand All @@ -316,7 +317,7 @@ public void whenMultipleAsymSymmetric() {
Observable<Integer> source1 = Observable.from(1, 2, 3);
Observable<Integer> source2 = Observable.from(4, 5);

Observable<Integer> m = Observable.when(source1.and(source2).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add2)).toObservable();
m.subscribe(observer);

verify(observer, never()).onError(any(Throwable.class));
Expand All @@ -330,7 +331,7 @@ public void whenEmptyEmpty() {
Observable<Integer> source1 = Observable.empty();
Observable<Integer> source2 = Observable.empty();

Observable<Integer> m = Observable.when(source1.and(source2).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add2)).toObservable();
m.subscribe(observer);

verify(observer, never()).onError(any(Throwable.class));
Expand All @@ -343,7 +344,7 @@ public void whenNeverNever() {
Observable<Integer> source1 = Observable.never();
Observable<Integer> source2 = Observable.never();

Observable<Integer> m = Observable.when(source1.and(source2).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add2)).toObservable();
m.subscribe(observer);

verify(observer, never()).onError(any(Throwable.class));
Expand All @@ -356,7 +357,7 @@ public void whenThrowNonEmpty() {
Observable<Integer> source1 = Observable.empty();
Observable<Integer> source2 = Observable.error(new RuntimeException("Forced failure"));

Observable<Integer> m = Observable.when(source1.and(source2).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add2)).toObservable();
m.subscribe(observer);

verify(observer, times(1)).onError(any(Throwable.class));
Expand All @@ -370,11 +371,11 @@ public void whenComplicated() {
PublishSubject<Integer> ys = PublishSubject.create();
PublishSubject<Integer> zs = PublishSubject.create();

Observable<Integer> m = Observable.when(
xs.and(ys).then(add2), // 1+4=5, 2+5=7, 3+6=9
xs.and(zs).then(mul2), // 1*7=7, 2*8=16, 3*9=27
ys.and(zs).then(sub2) // 4-7=-3, 5-8=-3, 6-9=-3
);
Observable<Integer> m = JoinObservable.when(
JoinObservable.from(xs).and(ys).then(add2), // 1+4=5, 2+5=7, 3+6=9
JoinObservable.from(xs).and(zs).then(mul2), // 1*7=7, 2*8=16, 3*9=27
JoinObservable.from(ys).and(zs).then(sub2) // 4-7=-3, 5-8=-3, 6-9=-3
).toObservable();

TestSubscriber<Integer> to = new TestSubscriber<Integer>(observer);
m.subscribe(to);
Expand Down
Loading

0 comments on commit 6896110

Please sign in to comment.