Skip to content

Commit

Permalink
Merge pull request #2 from jmhofer/reactivate-core-tests
Browse files Browse the repository at this point in the history
Reactivate core tests and combineLatest
  • Loading branch information
benjchristensen committed Aug 28, 2013
2 parents 077b148 + e3e148a commit 64f433b
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 14 deletions.
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ subprojects {
}
}

project(':rxjava-core') {
sourceSets.test.java.srcDir 'src/test/java'
}

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;
import rx.util.functions.Func2;

/**
Expand Down Expand Up @@ -187,14 +186,12 @@ public void testPeriodicScheduling() throws Exception {
final CountDownLatch latch = new CountDownLatch(4);

final Action0 innerAction = mock(Action0.class);
final Action0 unsubscribe = mock(Action0.class);
final Func0<Subscription> action = new Func0<Subscription>() {
final Action0 action = new Action0() {
@Override
public Subscription call() {
public void call() {
try {
innerAction.call();
assertTrue(SwingUtilities.isEventDispatchThread());
return Subscriptions.create(unsubscribe);
} finally {
latch.countDown();
}
Expand All @@ -210,7 +207,6 @@ public Subscription call() {
sub.unsubscribe();
waitForEmptyEventQueue();
verify(innerAction, times(4)).call();
verify(unsubscribe, times(4)).call();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import javax.swing.AbstractButton;

import rx.Observable;
import static rx.Observable.filter;
import rx.swing.sources.AbstractButtonSource;
import rx.swing.sources.ComponentEventSource;
import rx.swing.sources.KeyEventSource;
Expand Down Expand Up @@ -68,7 +67,7 @@ public static Observable<KeyEvent> fromKeyEvents(Component component) {
* @return Observable of key events.
*/
public static Observable<KeyEvent> fromKeyEvents(Component component, final Set<Integer> keyCodes) {
return filter(fromKeyEvents(component), new Func1<KeyEvent, Boolean>() {
return fromKeyEvents(component).filter(new Func1<KeyEvent, Boolean>() {
@Override
public Boolean call(KeyEvent event) {
return keyCodes.contains(event.getKeyCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void call() {
* @see SwingObservable.fromKeyEvents(Component, Set)
*/
public static Observable<Set<Integer>> currentlyPressedKeysOf(Component component) {
return Observable.<KeyEvent, Set<Integer>>scan(fromKeyEventsOf(component), new HashSet<Integer>(), new Func2<Set<Integer>, KeyEvent, Set<Integer>>() {
return fromKeyEventsOf(component).<Set<Integer>>scan(new HashSet<Integer>(), new Func2<Set<Integer>, KeyEvent, Set<Integer>>() {
@Override
public Set<Integer> call(Set<Integer> pressedKeys, KeyEvent event) {
Set<Integer> afterEvent = new HashSet<Integer>(pressedKeys);
Expand Down
38 changes: 33 additions & 5 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
*/
package rx;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -27,14 +23,14 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;


import rx.concurrency.Schedulers;
import rx.observables.BlockingObservable;
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
import rx.operators.OperationAll;
import rx.operators.OperationBuffer;
import rx.operators.OperationCache;
import rx.operators.OperationCombineLatest;
import rx.operators.OperationConcat;
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
Expand Down Expand Up @@ -1085,6 +1081,38 @@ public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observabl
return create(OperationZip.zip(w0, w1, w2, w3, reduceFunction));
}

/**
* Combines the given observables, emitting an event containing an aggregation of the latest values of each of the source observables
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/combineLatest.png">
*
* @param w0
* The first source observable.
* @param w1
* The second source observable.
* @param combineFunction
* The aggregation function used to combine the source observable values.
* @return An Observable that combines the source Observables with the given combine function
*/
public static <R, T0, T1> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, combineFunction));
}

/**
* @see #combineLatest(Observable, Observable, Func2)
*/
public static <R, T0, T1, T2> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<T0, T1, T2, R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, combineFunction));
}

/**
* @see #combineLatest(Observable, Observable, Func2)
*/
public static <R, T0, T1, T2, T3> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<T0, T1, T2, T3, R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction));
}

/**
* Creates an Observable which produces buffers of collected values.
*
Expand Down

0 comments on commit 64f433b

Please sign in to comment.