Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert 'last' from non-blocking to blocking to match Rx.Net #184

Merged
merged 2 commits into from
Mar 12, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

def class ObservableTests {
Expand Down Expand Up @@ -61,8 +62,12 @@ def class ObservableTests {

@Test
public void testLast() {
new TestFactory().getObservable().last().subscribe({ result -> a.received(result)});
verify(a, times(1)).received("hello_1");
assertEquals("three", Observable.toObservable("one", "two", "three").last())
}

@Test
public void testLastWithPredicate() {
assertEquals("two", Observable.toObservable("one", "two", "three").last({ x -> x.length() == 3}))
}

@Test
Expand Down Expand Up @@ -175,6 +180,12 @@ def class ObservableTests {
verify(a, times(0)).received(3);
}

@Test
public void testTakeLast() {
new TestFactory().getObservable().takeLast(1).subscribe({ result -> a.received(result)});
verify(a, times(1)).received("hello_1");
}

@Test
public void testTakeWhileViaGroovy() {
Observable.takeWhile(Observable.toObservable(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)});
Expand Down Expand Up @@ -280,7 +291,7 @@ def class ObservableTests {
observer.onCompleted();
}
}).start();
return Observable.noOpSubscription();
return Subscriptions.empty();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testFilterViaGroovy() {

@Test
public void testLast() {
String script = "mockApiCall.getObservable().last().subscribe(lambda{|result| a.received(result)})";
String script = "mockApiCall.getObservable().takeLast(1).subscribe(lambda{|result| a.received(result)})";
runGroovyScript(script);
verify(assertion, times(1)).received("hello_1");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ class UnitTestSuite extends JUnitSuite {
verify(assertion, times(1)).received(List(2,4,6,8))
}

@Test def testLast() {
@Test def testTakeLast() {
val numbers = Observable.toObservable[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
numbers.last().subscribe((callback: Int) => {
println("testLast: onNext -> got " + callback)
numbers.takeLast(1).subscribe((callback: Int) => {
println("testTakeLast: onNext -> got " + callback)
assertion.received(callback)
})
verify(assertion, times(1)).received(9)
Expand Down
113 changes: 95 additions & 18 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationFilter;
import rx.operators.OperationLast;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
import rx.operators.OperationMerge;
Expand Down Expand Up @@ -537,7 +536,6 @@ public Subscription call(Observer<T> t1) {
}
}


/**
* an Observable that calls {@link Observer#onError(Exception)} when the Observer subscribes.
*
Expand Down Expand Up @@ -807,18 +805,44 @@ public static <T> Observable<T> just(T value) {
}

/**
* Takes the last item emitted by a source Observable and returns an Observable that emits only
* that item as its sole emission.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/last.png">
* Returns the last element of an observable sequence with a specified source.
*
* @param that
* the source Observable
* @return the last element in the observable sequence.
*/
public static <T> T last(final Observable<T> that) {
T result = null;
for (T value : that.toIterable()) {
result = value;
}
return result;
}

/**
* Returns the last element of an observable sequence that matches the predicate.
*
* @param that
* the source Observable
* @param predicate
* a predicate function to evaluate for elements in the sequence.
* @return the last element in the observable sequence.
*/
public static <T> T last(final Observable<T> that, final Func1<T, Boolean> predicate) {
return last(that.filter(predicate));
}

/**
* Returns the last element of an observable sequence that matches the predicate.
*
* @param that
* the source Observable
* @return an Observable that emits a single item, which is identical to the last item emitted
* by the source Observable
* @param predicate
* a predicate function to evaluate for elements in the sequence.
* @return the last element in the observable sequence.
*/
public static <T> Observable<T> last(final Observable<T> that) {
return _create(OperationLast.last(that));
public static <T> T last(final Observable<T> that, final Object predicate) {
return last(that.filter(predicate));
}

/**
Expand Down Expand Up @@ -1363,7 +1387,7 @@ public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<Ex
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public static <T> Observable<T> reduce(Observable<T> sequence, Func2<T, T, T> accumulator) {
return last(_create(OperationScan.scan(sequence, accumulator)));
return takeLast(_create(OperationScan.scan(sequence, accumulator)), 1);
}

/**
Expand Down Expand Up @@ -1435,7 +1459,7 @@ public T call(T t1, T t2) {
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public static <T> Observable<T> reduce(Observable<T> sequence, T initialValue, Func2<T, T, T> accumulator) {
return last(_create(OperationScan.scan(sequence, initialValue, accumulator)));
return takeLast(_create(OperationScan.scan(sequence, initialValue, accumulator)), 1);
}

/**
Expand Down Expand Up @@ -2364,17 +2388,44 @@ public Boolean call(T t1) {
}

/**
* Converts an Observable that emits a sequence of objects into one that only emits the last
* object in this sequence before completing.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/last.png">
* Returns the last element of an observable sequence with a specified source.
*
* @return an Observable that emits only the last item emitted by the original Observable
* @return the last element in the observable sequence.
*/
public Observable<T> last() {
public T last() {
return last(this);
}

/**
* Returns the last element of an observable sequence that matches the predicate.
*
* @param predicate
* a predicate function to evaluate for elements in the sequence.
* @return the last element in the observable sequence.
*/
public T last(final Func1<T, Boolean> predicate) {
return last(this, predicate);
}

/**
* Returns the last element of an observable sequence that matches the predicate.
*
* @param predicate
* a predicate function to evaluate for elements in the sequence.
* @return the last element in the observable sequence.
*/
public T last(final Object predicate) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(predicate);

return last(this, new Func1<T, Boolean>() {
@Override
public Boolean call(T args) {
return (Boolean) _f.call(args);
}
});
}

/**
* Returns the last element, or a default value if no value is found.
*
Expand Down Expand Up @@ -3333,6 +3384,32 @@ public Boolean call(String args) {
});
}

@Test
public void testLast() {
Observable<String> obs = Observable.toObservable("one", "two", "three");

assertEquals("three", obs.last());
}

@Test
public void testLastWithPredicate() {
Observable<String> obs = Observable.toObservable("one", "two", "three");

assertEquals("two", obs.last(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.length() == 3;
}
}));
}

@Test
public void testLastEmptyObservable() {
Observable<String> obs = Observable.toObservable();

assertNull(obs.last());
}

private static class TestException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
Expand Down
91 changes: 0 additions & 91 deletions rxjava-core/src/main/java/rx/operators/OperationLast.java

This file was deleted.